This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 05ab5e86d [server] Handle uncaught exceptions in TabletServer during log recovery from residual data corresponding to already dropped tables (#1487)
05ab5e86d is described below

commit 05ab5e86d1df69a96d56beed9ce258c603a478f3
Author: Yang Wang <[email protected]>
AuthorDate: Tue Sep 16 17:33:22 2025 +0800

    [server] Handle uncaught exceptions in TabletServer during log recovery from residual data corresponding to already dropped tables (#1487)
---
 .../org/apache/fluss/server/TabletManagerBase.java |   3 +-
 .../org/apache/fluss/server/log/LogManager.java    |  85 +++--
 .../fluss/server/log/DroppedTableRecoveryTest.java | 354 +++++++++++++++++++++
 3 files changed, 423 insertions(+), 19 deletions(-)

diff --git a/fluss-server/src/main/java/org/apache/fluss/server/TabletManagerBase.java b/fluss-server/src/main/java/org/apache/fluss/server/TabletManagerBase.java
index 17bfd260b..6525df1c4 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/TabletManagerBase.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/TabletManagerBase.java
@@ -20,6 +20,7 @@ package org.apache.fluss.server;
 import org.apache.fluss.config.Configuration;
 import org.apache.fluss.exception.KvStorageException;
 import org.apache.fluss.exception.LogStorageException;
+import org.apache.fluss.exception.SchemaNotExistException;
 import org.apache.fluss.metadata.PhysicalTablePath;
 import org.apache.fluss.metadata.SchemaInfo;
 import org.apache.fluss.metadata.TableBucket;
@@ -203,7 +204,7 @@ public abstract class TabletManagerBase {
         Optional<SchemaInfo> schemaInfoOpt = zkClient.getSchemaById(tablePath, 
schemaId);
         SchemaInfo schemaInfo;
         if (!schemaInfoOpt.isPresent()) {
-            throw new LogStorageException(
+            throw new SchemaNotExistException(
                     String.format(
                             "Failed to load table '%s': Table schema not found 
in zookeeper metadata.",
                             tablePath));
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/log/LogManager.java b/fluss-server/src/main/java/org/apache/fluss/server/log/LogManager.java
index 644eedfb4..7a30b0d7b 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/log/LogManager.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/log/LogManager.java
@@ -22,6 +22,7 @@ import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.Configuration;
 import org.apache.fluss.exception.FlussRuntimeException;
 import org.apache.fluss.exception.LogStorageException;
+import org.apache.fluss.exception.SchemaNotExistException;
 import org.apache.fluss.metadata.LogFormat;
 import org.apache.fluss.metadata.PhysicalTablePath;
 import org.apache.fluss.metadata.TableBucket;
@@ -189,24 +190,8 @@ public final class LogManager extends TabletManagerBase {
             final boolean cleanShutdown = isCleanShutdown;
             // set runnable job.
             Runnable[] jobsForDir =
-                    tabletsToLoad.stream()
-                            .map(
-                                    tabletDir ->
-                                            (Runnable)
-                                                    () -> {
-                                                        LOG.debug("Loading log 
{}", tabletDir);
-                                                        try {
-                                                            loadLog(
-                                                                    tabletDir,
-                                                                    
cleanShutdown,
-                                                                    
finalRecoveryPoints,
-                                                                    conf,
-                                                                    clock);
-                                                        } catch (Exception e) {
-                                                            throw new 
FlussRuntimeException(e);
-                                                        }
-                                                    })
-                            .toArray(Runnable[]::new);
+                    createLogLoadingJobs(
+                            tabletsToLoad, cleanShutdown, finalRecoveryPoints, 
conf, clock);
 
             long startTime = System.currentTimeMillis();
 
@@ -471,6 +456,70 @@ public final class LogManager extends TabletManagerBase {
         LOG.info("Shut down LogManager complete.");
     }
 
+    /** Create runnable jobs for loading logs from tablet directories. */
+    private Runnable[] createLogLoadingJobs(
+            List<File> tabletsToLoad,
+            boolean cleanShutdown,
+            Map<TableBucket, Long> recoveryPoints,
+            Configuration conf,
+            Clock clock) {
+        Runnable[] jobs = new Runnable[tabletsToLoad.size()];
+        for (int i = 0; i < tabletsToLoad.size(); i++) {
+            final File tabletDir = tabletsToLoad.get(i);
+            jobs[i] = createLogLoadingJob(tabletDir, cleanShutdown, 
recoveryPoints, conf, clock);
+        }
+        return jobs;
+    }
+
+    /** Create a runnable job for loading log from a single tablet directory. 
*/
+    private Runnable createLogLoadingJob(
+            File tabletDir,
+            boolean cleanShutdown,
+            Map<TableBucket, Long> recoveryPoints,
+            Configuration conf,
+            Clock clock) {
+        return new Runnable() {
+            @Override
+            public void run() {
+                LOG.debug("Loading log {}", tabletDir);
+                try {
+                    loadLog(tabletDir, cleanShutdown, recoveryPoints, conf, 
clock);
+                } catch (Exception e) {
+                    LOG.error("Fail to loadLog from {}", tabletDir, e);
+                    if (e instanceof SchemaNotExistException) {
+                        LOG.error(
+                                "schema not exist, table for {} has already 
been dropped, the residual data will be removed.",
+                                tabletDir,
+                                e);
+                        FileUtils.deleteDirectoryQuietly(tabletDir);
+
+                        // Also delete corresponding KV tablet directory if it 
exists
+                        try {
+                            Tuple2<PhysicalTablePath, TableBucket> 
pathAndBucket =
+                                    FlussPaths.parseTabletDir(tabletDir);
+                            File kvTabletDir =
+                                    FlussPaths.kvTabletDir(
+                                            dataDir, pathAndBucket.f0, 
pathAndBucket.f1);
+                            if (kvTabletDir.exists()) {
+                                LOG.info(
+                                        "Also removing corresponding KV tablet 
directory: {}",
+                                        kvTabletDir);
+                                FileUtils.deleteDirectoryQuietly(kvTabletDir);
+                            }
+                        } catch (Exception kvDeleteException) {
+                            LOG.warn(
+                                    "Failed to delete corresponding KV tablet 
directory for log {}: {}",
+                                    tabletDir,
+                                    kvDeleteException.getMessage());
+                        }
+                        return;
+                    }
+                    throw new FlussRuntimeException(e);
+                }
+            }
+        };
+    }
+
     @VisibleForTesting
     void checkpointRecoveryOffsets() {
         // Assuming TableBucket and LogTablet are actual types used in your 
application
diff --git a/fluss-server/src/test/java/org/apache/fluss/server/log/DroppedTableRecoveryTest.java b/fluss-server/src/test/java/org/apache/fluss/server/log/DroppedTableRecoveryTest.java
new file mode 100644
index 000000000..ee5d082b5
--- /dev/null
+++ b/fluss-server/src/test/java/org/apache/fluss/server/log/DroppedTableRecoveryTest.java
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.log;
+
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.config.TableConfig;
+import org.apache.fluss.metadata.KvFormat;
+import org.apache.fluss.metadata.LogFormat;
+import org.apache.fluss.metadata.PhysicalTablePath;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.record.LogTestBase;
+import org.apache.fluss.record.MemoryLogRecords;
+import org.apache.fluss.server.kv.KvManager;
+import org.apache.fluss.server.kv.KvTablet;
+import org.apache.fluss.server.metrics.group.TestingMetricGroups;
+import org.apache.fluss.server.zk.NOPErrorHandler;
+import org.apache.fluss.server.zk.ZooKeeperClient;
+import org.apache.fluss.server.zk.ZooKeeperExtension;
+import org.apache.fluss.server.zk.data.TableRegistration;
+import org.apache.fluss.testutils.common.AllCallbackWrapper;
+import org.apache.fluss.utils.clock.SystemClock;
+import org.apache.fluss.utils.concurrent.FlussScheduler;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+
+import static 
org.apache.fluss.compression.ArrowCompressionInfo.DEFAULT_COMPRESSION;
+import static org.apache.fluss.record.TestData.DATA1;
+import static org.apache.fluss.record.TestData.DATA1_SCHEMA;
+import static org.apache.fluss.record.TestData.DATA1_TABLE_DESCRIPTOR;
+import static org.apache.fluss.record.TestData.DATA1_TABLE_ID;
+import static 
org.apache.fluss.testutils.DataTestUtils.genMemoryLogRecordsByObject;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for handling recovery from residual data of already dropped tables. */
+final class DroppedTableRecoveryTest extends LogTestBase {
+
+    @RegisterExtension
+    public static final AllCallbackWrapper<ZooKeeperExtension> 
ZOO_KEEPER_EXTENSION_WRAPPER =
+            new AllCallbackWrapper<>(new ZooKeeperExtension());
+
+    private static ZooKeeperClient zkClient;
+    private @TempDir File tempDir;
+    private TablePath tablePath;
+    private TableBucket tableBucket;
+    private LogManager logManager;
+    private KvManager kvManager;
+
+    @BeforeAll
+    static void baseBeforeAll() {
+        zkClient =
+                ZOO_KEEPER_EXTENSION_WRAPPER
+                        .getCustomExtension()
+                        .getZooKeeperClient(NOPErrorHandler.INSTANCE);
+    }
+
+    @BeforeEach
+    public void setup() throws Exception {
+        super.before();
+        conf.setString(ConfigOptions.DATA_DIR, tempDir.getAbsolutePath());
+
+        String dbName = "test_db";
+        tablePath = TablePath.of(dbName, "dropped_table");
+        tableBucket = new TableBucket(DATA1_TABLE_ID, 1);
+
+        registerTableInZkClient();
+        logManager =
+                LogManager.create(
+                        conf,
+                        zkClient,
+                        new FlussScheduler(1),
+                        SystemClock.getInstance(),
+                        TestingMetricGroups.TABLET_SERVER_METRICS);
+        logManager.startup();
+
+        kvManager =
+                KvManager.create(
+                        conf, zkClient, logManager, 
TestingMetricGroups.TABLET_SERVER_METRICS);
+        kvManager.startup();
+    }
+
+    private void registerTableInZkClient() throws Exception {
+        ZOO_KEEPER_EXTENSION_WRAPPER.getCustomExtension().cleanupRoot();
+        zkClient.registerTable(
+                tablePath, TableRegistration.newTable(DATA1_TABLE_ID, 
DATA1_TABLE_DESCRIPTOR));
+        zkClient.registerSchema(tablePath, DATA1_SCHEMA);
+    }
+
+    @AfterEach
+    public void tearDown() {
+        if (kvManager != null) {
+            kvManager.shutdown();
+        }
+        if (logManager != null) {
+            logManager.shutdown();
+        }
+    }
+
+    @Test
+    void testMultipleLogTabletResidualDataDirectoriesCleanup() throws 
Exception {
+        // Create multiple logs for the same table
+        TableBucket tableBucket1 = new TableBucket(DATA1_TABLE_ID, 1);
+        TableBucket tableBucket2 = new TableBucket(DATA1_TABLE_ID, 2);
+
+        LogTablet log1 =
+                logManager.getOrCreateLog(
+                        PhysicalTablePath.of(tablePath), tableBucket1, 
LogFormat.ARROW, 1, false);
+        LogTablet log2 =
+                logManager.getOrCreateLog(
+                        PhysicalTablePath.of(tablePath), tableBucket2, 
LogFormat.ARROW, 1, false);
+
+        // Write some data to both logs
+        MemoryLogRecords records = genMemoryLogRecordsByObject(DATA1);
+        log1.appendAsLeader(records);
+        log2.appendAsLeader(records);
+        log1.flush(false);
+        log2.flush(false);
+
+        // Get log directories before shutdown
+        String logDir1 = log1.getLogDir().getAbsolutePath();
+        String logDir2 = log2.getLogDir().getAbsolutePath();
+
+        // Shutdown log manager first
+        logManager.shutdown();
+
+        // Simulate table drop: remove metadata from ZooKeeper
+        zkClient.deleteTable(tablePath);
+
+        // Start LogManager again
+        LogManager newLogManager =
+                LogManager.create(
+                        conf,
+                        zkClient,
+                        new FlussScheduler(1),
+                        SystemClock.getInstance(),
+                        TestingMetricGroups.TABLET_SERVER_METRICS);
+        newLogManager.startup();
+
+        // Verify that both residual data directories were cleaned up
+        assertThat(new File(logDir1)).doesNotExist();
+        assertThat(new File(logDir2)).doesNotExist();
+
+        newLogManager.shutdown();
+    }
+
+    @Test
+    void testLogTabletResidualDataCleanupWithPartitionedTable() throws 
Exception {
+        // Create a partitioned table log
+        TableBucket partitionedTableBucket = new TableBucket(DATA1_TABLE_ID, 
2024L, 1);
+        PhysicalTablePath partitionedTablePath =
+                PhysicalTablePath.of(tablePath.getDatabaseName(), 
tablePath.getTableName(), "2024");
+
+        LogTablet log =
+                logManager.getOrCreateLog(
+                        partitionedTablePath, partitionedTableBucket, 
LogFormat.ARROW, 1, false);
+
+        // Write some data to the log
+        MemoryLogRecords records = genMemoryLogRecordsByObject(DATA1);
+        log.appendAsLeader(records);
+        log.flush(false);
+
+        String logDir = log.getLogDir().getAbsolutePath();
+
+        // Shutdown log manager first
+        logManager.shutdown();
+
+        // Simulate table drop: remove metadata from ZooKeeper
+        zkClient.deleteTable(tablePath);
+
+        // Start LogManager again
+        LogManager newLogManager =
+                LogManager.create(
+                        conf,
+                        zkClient,
+                        new FlussScheduler(1),
+                        SystemClock.getInstance(),
+                        TestingMetricGroups.TABLET_SERVER_METRICS);
+        newLogManager.startup();
+
+        // Verify that the residual data directory was cleaned up
+        assertThat(new File(logDir)).doesNotExist();
+
+        newLogManager.shutdown();
+    }
+
+    @Test
+    void testMultipleKvTabletResidualDataDirectoriesCleanup() throws Exception 
{
+        // Create multiple logs and KV tablets for the same table
+        TableBucket tableBucket1 = new TableBucket(DATA1_TABLE_ID, 1);
+        TableBucket tableBucket2 = new TableBucket(DATA1_TABLE_ID, 2);
+
+        LogTablet log1 =
+                logManager.getOrCreateLog(
+                        PhysicalTablePath.of(tablePath), tableBucket1, 
LogFormat.ARROW, 1, false);
+        LogTablet log2 =
+                logManager.getOrCreateLog(
+                        PhysicalTablePath.of(tablePath), tableBucket2, 
LogFormat.ARROW, 1, false);
+
+        // Write some data to both logs
+        MemoryLogRecords records = genMemoryLogRecordsByObject(DATA1);
+        log1.appendAsLeader(records);
+        log2.appendAsLeader(records);
+        log1.flush(false);
+        log2.flush(false);
+
+        // Create KV tablets
+        TableConfig tableConfig =
+                new 
TableConfig(Configuration.fromMap(DATA1_TABLE_DESCRIPTOR.getProperties()));
+        KvTablet kvTablet1 =
+                kvManager.getOrCreateKv(
+                        PhysicalTablePath.of(tablePath),
+                        tableBucket1,
+                        log1,
+                        KvFormat.COMPACTED,
+                        DATA1_SCHEMA,
+                        tableConfig,
+                        DEFAULT_COMPRESSION);
+        KvTablet kvTablet2 =
+                kvManager.getOrCreateKv(
+                        PhysicalTablePath.of(tablePath),
+                        tableBucket2,
+                        log2,
+                        KvFormat.COMPACTED,
+                        DATA1_SCHEMA,
+                        tableConfig,
+                        DEFAULT_COMPRESSION);
+
+        // Get directories before shutdown
+        String kvDir1 = kvTablet1.getKvTabletDir().getAbsolutePath();
+        String kvDir2 = kvTablet2.getKvTabletDir().getAbsolutePath();
+        String logDir1 = log1.getLogDir().getAbsolutePath();
+        String logDir2 = log2.getLogDir().getAbsolutePath();
+
+        // Shutdown managers first
+        kvManager.shutdown();
+        logManager.shutdown();
+
+        // Simulate table drop: remove metadata from ZooKeeper
+        zkClient.deleteTable(tablePath);
+
+        // Start managers again
+        LogManager newLogManager =
+                LogManager.create(
+                        conf,
+                        zkClient,
+                        new FlussScheduler(1),
+                        SystemClock.getInstance(),
+                        TestingMetricGroups.TABLET_SERVER_METRICS);
+        newLogManager.startup(); // Should clean up log directories
+
+        KvManager newKvManager =
+                KvManager.create(
+                        conf, zkClient, newLogManager, 
TestingMetricGroups.TABLET_SERVER_METRICS);
+        newKvManager.startup();
+
+        // KV tablet directories should be cleaned up by LogManager 
automatically
+
+        newKvManager.shutdown();
+        newLogManager.shutdown();
+
+        // Verify that all residual data directories were cleaned up
+        assertThat(new File(logDir1)).doesNotExist();
+        assertThat(new File(logDir2)).doesNotExist();
+        assertThat(new File(kvDir1)).doesNotExist(); // Also cleaned by 
LogManager
+        assertThat(new File(kvDir2)).doesNotExist();
+    }
+
+    @Test
+    void testKvTabletResidualDataCleanupWithPartitionedTable() throws 
Exception {
+        // Create a partitioned table KV tablet
+        TableBucket partitionedTableBucket = new TableBucket(DATA1_TABLE_ID, 
2024L, 1);
+        PhysicalTablePath partitionedTablePath =
+                PhysicalTablePath.of(tablePath.getDatabaseName(), 
tablePath.getTableName(), "2024");
+
+        LogTablet log =
+                logManager.getOrCreateLog(
+                        partitionedTablePath, partitionedTableBucket, 
LogFormat.ARROW, 1, false);
+
+        // Write some data to the log
+        MemoryLogRecords records = genMemoryLogRecordsByObject(DATA1);
+        log.appendAsLeader(records);
+        log.flush(false);
+
+        // Create KV tablet
+        TableConfig tableConfig =
+                new 
TableConfig(Configuration.fromMap(DATA1_TABLE_DESCRIPTOR.getProperties()));
+        KvTablet kvTablet =
+                kvManager.getOrCreateKv(
+                        partitionedTablePath,
+                        partitionedTableBucket,
+                        log,
+                        KvFormat.COMPACTED,
+                        DATA1_SCHEMA,
+                        tableConfig,
+                        DEFAULT_COMPRESSION);
+
+        String kvDir = kvTablet.getKvTabletDir().getAbsolutePath();
+        String logDir = log.getLogDir().getAbsolutePath();
+
+        // Shutdown managers first
+        kvManager.shutdown();
+        logManager.shutdown();
+
+        // Simulate table drop: remove metadata from ZooKeeper
+        zkClient.deleteTable(tablePath);
+
+        // Start managers again
+        LogManager newLogManager =
+                LogManager.create(
+                        conf,
+                        zkClient,
+                        new FlussScheduler(1),
+                        SystemClock.getInstance(),
+                        TestingMetricGroups.TABLET_SERVER_METRICS);
+        newLogManager.startup();
+
+        KvManager newKvManager =
+                KvManager.create(
+                        conf, zkClient, newLogManager, 
TestingMetricGroups.TABLET_SERVER_METRICS);
+        newKvManager.startup();
+
+        // KV tablet directory should be cleaned up by LogManager automatically
+
+        newKvManager.shutdown();
+        newLogManager.shutdown();
+
+        // Verify that both residual data directories were cleaned up
+        assertThat(new File(logDir)).doesNotExist();
+        assertThat(new File(kvDir)).doesNotExist(); // Also cleaned by 
LogManager
+    }
+}

Reply via email to