This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 22b30ce9f [lake] fix potential dirty commit on re-create table (#2316)
22b30ce9f is described below

commit 22b30ce9f0a24ffca9cbb92826a5bac1158fe733
Author: Junfan Zhang <[email protected]>
AuthorDate: Wed Jan 7 17:58:58 2026 +0800

    [lake] fix potential dirty commit on re-create table (#2316)
---
 .../tiering/committer/TieringCommitOperator.java   | 19 ++++++++--
 .../committer/TieringCommitOperatorTest.java       | 44 ++++++++++++++++++++++
 2 files changed, 60 insertions(+), 3 deletions(-)

diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java
index 2992705d9..79bcc5f85 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java
@@ -34,6 +34,7 @@ import org.apache.fluss.lake.committer.LakeCommitter;
 import org.apache.fluss.lake.writer.LakeTieringFactory;
 import org.apache.fluss.lake.writer.LakeWriter;
 import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableInfo;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.utils.ExceptionUtils;
 
@@ -199,12 +200,24 @@ public class TieringCommitOperator<WriteResult, 
Committable>
         if (committableWriteResults.isEmpty()) {
             return null;
         }
+
+        // Check if the table was dropped and recreated during tiering.
+        // If the current table id differs from the committable's table id, 
fail this commit
+        // to avoid dirty commit to a newly created table.
+        TableInfo currentTableInfo = admin.getTableInfo(tablePath).get();
+        if (currentTableInfo.getTableId() != tableId) {
+            throw new IllegalStateException(
+                    String.format(
+                            "The current table id %s for table path %s is 
different from the table id %s in the committable. "
+                                    + "This usually happens when a table was 
dropped and recreated during tiering. "
+                                    + "Aborting commit to prevent dirty 
commit.",
+                            currentTableInfo.getTableId(), tablePath, 
tableId));
+        }
+
         try (LakeCommitter<WriteResult, Committable> lakeCommitter =
                 lakeTieringFactory.createLakeCommitter(
                         new TieringCommitterInitContext(
-                                tablePath,
-                                admin.getTableInfo(tablePath).get(),
-                                lakeTieringConfig))) {
+                                tablePath, currentTableInfo, 
lakeTieringConfig))) {
             List<WriteResult> writeResults =
                     committableWriteResults.stream()
                             .map(TableBucketWriteResult::writeResult)
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorTest.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorTest.java
index 731395604..bdecfa8fa 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorTest.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorTest.java
@@ -461,6 +461,50 @@ class TieringCommitOperatorTest extends FlinkTestBase {
         assertThat(failedTieringEvent.failReason()).contains(failedReason);
     }
 
+    @Test
+    void testCommitFailsWhenTableRecreated() throws Exception {
+        TablePath tablePath = TablePath.of("fluss", 
"test_commit_fails_when_table_recreated");
+        long originalTableId = createTable(tablePath, 
DEFAULT_PK_TABLE_DESCRIPTOR);
+        int numberOfWriteResults = 3;
+
+        // Send write results for the first bucket
+        TableBucket tableBucket = new TableBucket(originalTableId, 0);
+        committerOperator.processElement(
+                createTableBucketWriteResultStreamRecord(
+                        tablePath, tableBucket, 1, 1, 1L, 
numberOfWriteResults));
+
+        // Drop and recreate the table with the same path
+        admin.dropTable(tablePath, true).get();
+        long newTableId = createTable(tablePath, DEFAULT_PK_TABLE_DESCRIPTOR);
+
+        // Verify that the table id has changed
+        assertThat(newTableId).isNotEqualTo(originalTableId);
+
+        // Try to commit the remaining write results - should fail because 
table was recreated
+        for (int bucket = 1; bucket < numberOfWriteResults; bucket++) {
+            tableBucket = new TableBucket(originalTableId, bucket);
+            committerOperator.processElement(
+                    createTableBucketWriteResultStreamRecord(
+                            tablePath,
+                            tableBucket,
+                            bucket,
+                            bucket,
+                            (long) bucket,
+                            numberOfWriteResults));
+        }
+
+        // Verify that a FailedTieringEvent was sent with the expected error 
message
+        List<OperatorEvent> operatorEvents = 
mockOperatorEventGateway.getEventsSent();
+        SourceEventWrapper sourceEventWrapper =
+                (SourceEventWrapper) operatorEvents.get(operatorEvents.size() 
- 1);
+        FailedTieringEvent failedTieringEvent =
+                (FailedTieringEvent) sourceEventWrapper.getSourceEvent();
+        assertThat(failedTieringEvent.getTableId()).isEqualTo(originalTableId);
+        assertThat(failedTieringEvent.failReason())
+                .contains("different from the table id")
+                .contains("dropped and recreated during tiering");
+    }
+
     private CommittedLakeSnapshot mockCommittedLakeSnapshot(
             long tableId, TablePath tablePath, int snapshotId, 
Map<TableBucket, Long> logEndOffsets)
             throws Exception {

Reply via email to