This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 22b30ce9f [lake] fix potential dirty commit on re-create table (#2316)
22b30ce9f is described below
commit 22b30ce9f0a24ffca9cbb92826a5bac1158fe733
Author: Junfan Zhang <[email protected]>
AuthorDate: Wed Jan 7 17:58:58 2026 +0800
[lake] fix potential dirty commit on re-create table (#2316)
---
.../tiering/committer/TieringCommitOperator.java | 19 ++++++++--
.../committer/TieringCommitOperatorTest.java | 44 ++++++++++++++++++++++
2 files changed, 60 insertions(+), 3 deletions(-)
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java
index 2992705d9..79bcc5f85 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java
@@ -34,6 +34,7 @@ import org.apache.fluss.lake.committer.LakeCommitter;
import org.apache.fluss.lake.writer.LakeTieringFactory;
import org.apache.fluss.lake.writer.LakeWriter;
import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableInfo;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.utils.ExceptionUtils;
@@ -199,12 +200,24 @@ public class TieringCommitOperator<WriteResult, Committable>
if (committableWriteResults.isEmpty()) {
return null;
}
+
+        // Check if the table was dropped and recreated during tiering.
+        // If the current table id differs from the committable's table id, fail this commit
+        // to avoid dirty commit to a newly created table.
+        TableInfo currentTableInfo = admin.getTableInfo(tablePath).get();
+        if (currentTableInfo.getTableId() != tableId) {
+            throw new IllegalStateException(
+                    String.format(
+                            "The current table id %s for table path %s is different from the table id %s in the committable. "
+                                    + "This usually happens when a table was dropped and recreated during tiering. "
+                                    + "Aborting commit to prevent dirty commit.",
+                            currentTableInfo.getTableId(), tablePath, tableId));
+        }
+
try (LakeCommitter<WriteResult, Committable> lakeCommitter =
lakeTieringFactory.createLakeCommitter(
new TieringCommitterInitContext(
-                                tablePath,
-                                admin.getTableInfo(tablePath).get(),
-                                lakeTieringConfig))) {
+                                tablePath, currentTableInfo, lakeTieringConfig))) {
List<WriteResult> writeResults =
committableWriteResults.stream()
.map(TableBucketWriteResult::writeResult)
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorTest.java
index 731395604..bdecfa8fa 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorTest.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorTest.java
@@ -461,6 +461,50 @@ class TieringCommitOperatorTest extends FlinkTestBase {
assertThat(failedTieringEvent.failReason()).contains(failedReason);
}
+ @Test
+ void testCommitFailsWhenTableRecreated() throws Exception {
+        TablePath tablePath = TablePath.of("fluss", "test_commit_fails_when_table_recreated");
+        long originalTableId = createTable(tablePath, DEFAULT_PK_TABLE_DESCRIPTOR);
+ int numberOfWriteResults = 3;
+
+ // Send write results for the first bucket
+ TableBucket tableBucket = new TableBucket(originalTableId, 0);
+ committerOperator.processElement(
+ createTableBucketWriteResultStreamRecord(
+                        tablePath, tableBucket, 1, 1, 1L, numberOfWriteResults));
+
+ // Drop and recreate the table with the same path
+ admin.dropTable(tablePath, true).get();
+ long newTableId = createTable(tablePath, DEFAULT_PK_TABLE_DESCRIPTOR);
+
+ // Verify that the table id has changed
+ assertThat(newTableId).isNotEqualTo(originalTableId);
+
+        // Try to commit the remaining write results - should fail because table was recreated
+ for (int bucket = 1; bucket < numberOfWriteResults; bucket++) {
+ tableBucket = new TableBucket(originalTableId, bucket);
+ committerOperator.processElement(
+ createTableBucketWriteResultStreamRecord(
+ tablePath,
+ tableBucket,
+ bucket,
+ bucket,
+ (long) bucket,
+ numberOfWriteResults));
+ }
+
+        // Verify that a FailedTieringEvent was sent with the expected error message
+        List<OperatorEvent> operatorEvents = mockOperatorEventGateway.getEventsSent();
+        SourceEventWrapper sourceEventWrapper =
+                (SourceEventWrapper) operatorEvents.get(operatorEvents.size() - 1);
+ FailedTieringEvent failedTieringEvent =
+ (FailedTieringEvent) sourceEventWrapper.getSourceEvent();
+ assertThat(failedTieringEvent.getTableId()).isEqualTo(originalTableId);
+ assertThat(failedTieringEvent.failReason())
+ .contains("different from the table id")
+ .contains("dropped and recreated during tiering");
+ }
+
private CommittedLakeSnapshot mockCommittedLakeSnapshot(
long tableId, TablePath tablePath, int snapshotId,
Map<TableBucket, Long> logEndOffsets)
throws Exception {