This is an automated email from the ASF dual-hosted git repository.

hongshun pushed a commit to branch fix-write-new-table-id
in repository https://gitbox.apache.org/repos/asf/fluss.git

commit 2c6ec9d562170e7d16971e085cc06082525172ce
Author: Hongshun Wang <[email protected]>
AuthorDate: Sat Feb 28 19:44:23 2026 +0800

    [client] Abort pending write batches if the table id changes.
---
 .../client/write/AbstractRowLogWriteBatch.java     |  3 +-
 .../fluss/client/write/ArrowLogWriteBatch.java     |  3 +-
 .../fluss/client/write/CompactedLogWriteBatch.java |  2 +
 .../fluss/client/write/IndexedLogWriteBatch.java   |  2 +
 .../apache/fluss/client/write/KvWriteBatch.java    |  3 +-
 .../fluss/client/write/RecordAccumulator.java      | 47 +++++++++++++++----
 .../java/org/apache/fluss/client/write/Sender.java |  2 +-
 .../org/apache/fluss/client/write/WriteBatch.java  |  9 +++-
 .../apache/fluss/client/write/WriterClient.java    |  2 +-
 .../client/metadata/TestingMetadataUpdater.java    |  5 +++
 .../client/utils/ClientRpcMessageUtilsTest.java    |  1 +
 .../fluss/client/write/ArrowLogWriteBatchTest.java |  3 ++
 .../client/write/CompactedLogWriteBatchTest.java   | 19 ++++----
 .../client/write/IndexedLogWriteBatchTest.java     | 19 ++++----
 .../fluss/client/write/KvWriteBatchTest.java       |  2 +
 .../fluss/client/write/RecordAccumulatorTest.java  | 52 ++++++++++++++++++++++
 .../org/apache/fluss/client/write/SenderTest.java  | 44 +++++++++++++++++-
 .../fluss/exception/TableNotExistException.java    |  4 ++
 18 files changed, 185 insertions(+), 37 deletions(-)

diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/write/AbstractRowLogWriteBatch.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/write/AbstractRowLogWriteBatch.java
index 3c61cce82..fcb71764a 100644
--- 
a/fluss-client/src/main/java/org/apache/fluss/client/write/AbstractRowLogWriteBatch.java
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/write/AbstractRowLogWriteBatch.java
@@ -44,13 +44,14 @@ abstract class AbstractRowLogWriteBatch<R> extends 
WriteBatch {
     private final String buildErrorMessage;
 
     protected AbstractRowLogWriteBatch(
+            long tableId,
             int bucketId,
             PhysicalTablePath physicalTablePath,
             long createdMs,
             AbstractPagedOutputView outputView,
             MemoryLogRecordsRowBuilder<R> recordsBuilder,
             String buildErrorMessage) {
-        super(bucketId, physicalTablePath, createdMs);
+        super(tableId, bucketId, physicalTablePath, createdMs);
         this.outputView = outputView;
         this.recordsBuilder = recordsBuilder;
         this.buildErrorMessage = buildErrorMessage;
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/write/ArrowLogWriteBatch.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/write/ArrowLogWriteBatch.java
index 578ff2a25..969f222c0 100644
--- 
a/fluss-client/src/main/java/org/apache/fluss/client/write/ArrowLogWriteBatch.java
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/write/ArrowLogWriteBatch.java
@@ -50,13 +50,14 @@ public class ArrowLogWriteBatch extends WriteBatch {
     private final AbstractPagedOutputView outputView;
 
     public ArrowLogWriteBatch(
+            long tableId,
             int bucketId,
             PhysicalTablePath physicalTablePath,
             int schemaId,
             ArrowWriter arrowWriter,
             AbstractPagedOutputView outputView,
             long createdMs) {
-        super(bucketId, physicalTablePath, createdMs);
+        super(tableId, bucketId, physicalTablePath, createdMs);
         this.outputView = outputView;
         this.recordsBuilder =
                 MemoryLogRecordsArrowBuilder.builder(schemaId, arrowWriter, 
outputView, true);
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/write/CompactedLogWriteBatch.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/write/CompactedLogWriteBatch.java
index 19366a4dc..9b7cee9a7 100644
--- 
a/fluss-client/src/main/java/org/apache/fluss/client/write/CompactedLogWriteBatch.java
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/write/CompactedLogWriteBatch.java
@@ -40,6 +40,7 @@ import static 
org.apache.fluss.utils.Preconditions.checkArgument;
 public final class CompactedLogWriteBatch extends 
AbstractRowLogWriteBatch<CompactedRow> {
 
     public CompactedLogWriteBatch(
+            long tableId,
             int bucketId,
             PhysicalTablePath physicalTablePath,
             int schemaId,
@@ -47,6 +48,7 @@ public final class CompactedLogWriteBatch extends 
AbstractRowLogWriteBatch<Compa
             AbstractPagedOutputView outputView,
             long createdMs) {
         super(
+                tableId,
                 bucketId,
                 physicalTablePath,
                 createdMs,
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/write/IndexedLogWriteBatch.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/write/IndexedLogWriteBatch.java
index c70dd83a8..dc408a384 100644
--- 
a/fluss-client/src/main/java/org/apache/fluss/client/write/IndexedLogWriteBatch.java
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/write/IndexedLogWriteBatch.java
@@ -39,6 +39,7 @@ import static 
org.apache.fluss.utils.Preconditions.checkArgument;
 public final class IndexedLogWriteBatch extends 
AbstractRowLogWriteBatch<IndexedRow> {
 
     public IndexedLogWriteBatch(
+            long tableId,
             int bucketId,
             PhysicalTablePath physicalTablePath,
             int schemaId,
@@ -46,6 +47,7 @@ public final class IndexedLogWriteBatch extends 
AbstractRowLogWriteBatch<Indexed
             AbstractPagedOutputView outputView,
             long createdMs) {
         super(
+                tableId,
                 bucketId,
                 physicalTablePath,
                 createdMs,
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/write/KvWriteBatch.java 
b/fluss-client/src/main/java/org/apache/fluss/client/write/KvWriteBatch.java
index a72d36717..814df03f7 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/write/KvWriteBatch.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/write/KvWriteBatch.java
@@ -55,6 +55,7 @@ public class KvWriteBatch extends WriteBatch {
     private final MergeMode mergeMode;
 
     public KvWriteBatch(
+            long tableId,
             int bucketId,
             PhysicalTablePath physicalTablePath,
             int schemaId,
@@ -64,7 +65,7 @@ public class KvWriteBatch extends WriteBatch {
             @Nullable int[] targetColumns,
             MergeMode mergeMode,
             long createdMs) {
-        super(bucketId, physicalTablePath, createdMs);
+        super(tableId, bucketId, physicalTablePath, createdMs);
         this.outputView = outputView;
         this.recordsBuilder =
                 KvRecordBatchBuilder.builder(schemaId, writeLimit, outputView, 
kvFormat);
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java
index a1f0544a5..1b615bfc3 100644
--- 
a/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java
@@ -25,6 +25,7 @@ import org.apache.fluss.cluster.Cluster;
 import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.Configuration;
 import org.apache.fluss.exception.FlussRuntimeException;
+import org.apache.fluss.exception.TableNotExistException;
 import org.apache.fluss.memory.LazyMemorySegmentPool;
 import org.apache.fluss.memory.MemorySegment;
 import org.apache.fluss.memory.PreAllocatedPagedOutputView;
@@ -306,16 +307,20 @@ public final class RecordAccumulator {
     }
 
     /** Abort all incomplete batches (whether they have been sent or not). */
-    public void abortBatches(final Exception reason) {
+    public void abortAllBatches(final Exception reason) {
         for (WriteBatch batch : incomplete.copyAll()) {
-            Deque<WriteBatch> dq = getDeque(batch.physicalTablePath(), 
batch.bucketId());
-            synchronized (dq) {
-                batch.abortRecordAppends();
-                dq.remove(batch);
-            }
-            batch.abort(reason);
-            deallocate(batch);
+            abortBatch(reason, batch);
+        }
+    }
+
+    private void abortBatch(final Exception reason, WriteBatch batch) {
+        Deque<WriteBatch> dq = getDeque(batch.physicalTablePath(), 
batch.bucketId());
+        synchronized (dq) {
+            batch.abortRecordAppends();
+            dq.remove(batch);
         }
+        batch.abort(reason);
+        deallocate(batch);
     }
 
     /** Get the deque for the given table-bucket, creating it if necessary. */
@@ -610,6 +615,7 @@ public final class RecordAccumulator {
             case COMPACTED_KV:
             case INDEXED_KV:
                 return new KvWriteBatch(
+                        tableInfo.getTableId(),
                         bucketId,
                         physicalTablePath,
                         tableInfo.getSchemaId(),
@@ -629,6 +635,7 @@ public final class RecordAccumulator {
                                 tableInfo.getRowType(),
                                 
tableInfo.getTableConfig().getArrowCompressionInfo());
                 return new ArrowLogWriteBatch(
+                        tableInfo.getTableId(),
                         bucketId,
                         physicalTablePath,
                         tableInfo.getSchemaId(),
@@ -638,6 +645,7 @@ public final class RecordAccumulator {
 
             case COMPACTED_LOG:
                 return new CompactedLogWriteBatch(
+                        tableInfo.getTableId(),
                         bucketId,
                         physicalTablePath,
                         schemaId,
@@ -647,6 +655,7 @@ public final class RecordAccumulator {
 
             case INDEXED_LOG:
                 return new IndexedLogWriteBatch(
+                        tableInfo.getTableId(),
                         bucketId,
                         physicalTablePath,
                         tableInfo.getSchemaId(),
@@ -712,6 +721,28 @@ public final class RecordAccumulator {
                     continue;
                 }
 
+                if (tableBucket.getTableId() != first.tableId()) {
+                    LOG.warn(
+                            "Table {} has been dropped and re-created with a 
new table ID. Old ID: {}, New ID: {}. "
+                                    + "Aborting pending batches for the old 
table instance.",
+                            physicalTablePath,
+                            first.tableId(),
+                            tableBucket.getTableId());
+                    TableNotExistException reason =
+                            new TableNotExistException(
+                                    String.format(
+                                            "Table '%s' has been dropped and 
re-created with a new table ID (old: %d, new: %d). "
+                                                    + "Further writes to the 
old table instance cannot proceed. "
+                                                    + "Please recreate the 
writer with the new table metadata.",
+                                            physicalTablePath,
+                                            first.tableId(),
+                                            tableBucket.getTableId()),
+                                    null,
+                                    true);
+                    abortBatch(reason, deque.pollFirst());
+                    continue;
+                }
+
                 // TODO retry back off check.
 
                 if (size + first.estimatedSizeInBytes() > maxSize && 
!ready.isEmpty()) {
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/write/Sender.java 
b/fluss-client/src/main/java/org/apache/fluss/client/write/Sender.java
index 66b7719b1..08b5e0ed6 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/write/Sender.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/write/Sender.java
@@ -297,7 +297,7 @@ public class Sender implements Runnable {
     private void maybeAbortBatches(Exception exception) {
         if (accumulator.hasIncomplete()) {
             LOG.error("Aborting write batches due to fatal error", exception);
-            accumulator.abortBatches(exception);
+            accumulator.abortAllBatches(exception);
         }
     }
 
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/write/WriteBatch.java 
b/fluss-client/src/main/java/org/apache/fluss/client/write/WriteBatch.java
index cb83943cf..9702bb543 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/write/WriteBatch.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/write/WriteBatch.java
@@ -46,6 +46,7 @@ public abstract class WriteBatch {
     private final long createdMs;
     private final PhysicalTablePath physicalTablePath;
     private final RequestFuture requestFuture;
+    private final long tableId;
     private final int bucketId;
 
     protected final List<WriteCallback> callbacks = new ArrayList<>();
@@ -55,9 +56,11 @@ public abstract class WriteBatch {
     protected int recordCount;
     private long drainedMs;
 
-    public WriteBatch(int bucketId, PhysicalTablePath physicalTablePath, long 
createdMs) {
+    public WriteBatch(
+            long tableId, int bucketId, PhysicalTablePath physicalTablePath, 
long createdMs) {
         this.physicalTablePath = physicalTablePath;
         this.createdMs = createdMs;
+        this.tableId = tableId;
         this.bucketId = bucketId;
         this.requestFuture = new RequestFuture();
         this.recordCount = 0;
@@ -157,6 +160,10 @@ public abstract class WriteBatch {
         return bucketId;
     }
 
+    public long tableId() {
+        return tableId;
+    }
+
     public PhysicalTablePath physicalTablePath() {
         return physicalTablePath;
     }
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/write/WriterClient.java 
b/fluss-client/src/main/java/org/apache/fluss/client/write/WriterClient.java
index 0e184f09f..4c01665ee 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/write/WriterClient.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/write/WriterClient.java
@@ -225,7 +225,7 @@ public class WriterClient {
     private void maybeAbortBatches(Throwable t) {
         if (accumulator.hasIncomplete()) {
             LOG.error("Aborting all pending write batches due to fatal error", 
t);
-            accumulator.abortBatches(toException(t));
+            accumulator.abortAllBatches(toException(t));
         }
     }
 
diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/metadata/TestingMetadataUpdater.java
 
b/fluss-client/src/test/java/org/apache/fluss/client/metadata/TestingMetadataUpdater.java
index 206395135..956231c8d 100644
--- 
a/fluss-client/src/test/java/org/apache/fluss/client/metadata/TestingMetadataUpdater.java
+++ 
b/fluss-client/src/test/java/org/apache/fluss/client/metadata/TestingMetadataUpdater.java
@@ -87,6 +87,11 @@ public class TestingMetadataUpdater extends MetadataUpdater {
         }
     }
 
+    public void updateTableInfos(Map<TablePath, TableInfo> tableInfos) {
+        initializeCluster(
+                cluster.getCoordinatorServer(), 
cluster.getAliveTabletServerList(), tableInfos);
+    }
+
     /**
      * Create a builder for constructing TestingMetadataUpdater with custom 
gateways.
      *
diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/utils/ClientRpcMessageUtilsTest.java
 
b/fluss-client/src/test/java/org/apache/fluss/client/utils/ClientRpcMessageUtilsTest.java
index 5b1ca58c7..3ed17da7d 100644
--- 
a/fluss-client/src/test/java/org/apache/fluss/client/utils/ClientRpcMessageUtilsTest.java
+++ 
b/fluss-client/src/test/java/org/apache/fluss/client/utils/ClientRpcMessageUtilsTest.java
@@ -131,6 +131,7 @@ class ClientRpcMessageUtilsTest {
         PreAllocatedPagedOutputView outputView =
                 new 
PreAllocatedPagedOutputView(Collections.singletonList(segment));
         return new KvWriteBatch(
+                DATA1_TABLE_ID_PK,
                 bucketId,
                 PhysicalTablePath.of(DATA1_TABLE_PATH_PK),
                 DATA1_TABLE_INFO_PK.getSchemaId(),
diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/write/ArrowLogWriteBatchTest.java
 
b/fluss-client/src/test/java/org/apache/fluss/client/write/ArrowLogWriteBatchTest.java
index afdef5616..fe80ecd94 100644
--- 
a/fluss-client/src/test/java/org/apache/fluss/client/write/ArrowLogWriteBatchTest.java
+++ 
b/fluss-client/src/test/java/org/apache/fluss/client/write/ArrowLogWriteBatchTest.java
@@ -126,6 +126,7 @@ public class ArrowLogWriteBatchTest {
         TableBucket tb = new TableBucket(DATA1_TABLE_ID, bucketId);
         ArrowLogWriteBatch arrowLogWriteBatch =
                 new ArrowLogWriteBatch(
+                        tb.getTableId(),
                         tb.getBucket(),
                         DATA1_PHYSICAL_TABLE_PATH,
                         DATA1_TABLE_INFO.getSchemaId(),
@@ -205,6 +206,7 @@ public class ArrowLogWriteBatchTest {
 
             ArrowLogWriteBatch arrowLogWriteBatch =
                     new ArrowLogWriteBatch(
+                            tb.getTableId(),
                             tb.getBucket(),
                             DATA1_PHYSICAL_TABLE_PATH,
                             DATA1_TABLE_INFO.getSchemaId(),
@@ -300,6 +302,7 @@ public class ArrowLogWriteBatchTest {
 
     private ArrowLogWriteBatch createArrowLogWriteBatch(TableBucket tb, int 
maxSizeInBytes) {
         return new ArrowLogWriteBatch(
+                tb.getTableId(),
                 tb.getBucket(),
                 DATA1_PHYSICAL_TABLE_PATH,
                 DATA1_TABLE_INFO.getSchemaId(),
diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/write/CompactedLogWriteBatchTest.java
 
b/fluss-client/src/test/java/org/apache/fluss/client/write/CompactedLogWriteBatchTest.java
index 4f8b37919..140058eca 100644
--- 
a/fluss-client/src/test/java/org/apache/fluss/client/write/CompactedLogWriteBatchTest.java
+++ 
b/fluss-client/src/test/java/org/apache/fluss/client/write/CompactedLogWriteBatchTest.java
@@ -67,7 +67,6 @@ public class CompactedLogWriteBatchTest {
         CompactedLogWriteBatch logProducerBatch =
                 createLogWriteBatch(
                         new TableBucket(DATA1_TABLE_ID, bucketId),
-                        0L,
                         writeLimit,
                         MemorySegment.allocateHeapMemory(writeLimit));
 
@@ -89,7 +88,7 @@ public class CompactedLogWriteBatchTest {
     void testToBytes() throws Exception {
         int bucketId = 0;
         CompactedLogWriteBatch logProducerBatch =
-                createLogWriteBatch(new TableBucket(DATA1_TABLE_ID, bucketId), 
0L);
+                createLogWriteBatch(new TableBucket(DATA1_TABLE_ID, bucketId));
         boolean appendResult = logProducerBatch.tryAppend(createWriteRecord(), 
newWriteCallback());
         assertThat(appendResult).isTrue();
 
@@ -105,7 +104,7 @@ public class CompactedLogWriteBatchTest {
     void testCompleteTwice() throws Exception {
         int bucketId = 0;
         CompactedLogWriteBatch logWriteBatch =
-                createLogWriteBatch(new TableBucket(DATA1_TABLE_ID, bucketId), 
0L);
+                createLogWriteBatch(new TableBucket(DATA1_TABLE_ID, bucketId));
         boolean appendResult = logWriteBatch.tryAppend(createWriteRecord(), 
newWriteCallback());
         assertThat(appendResult).isTrue();
 
@@ -120,7 +119,7 @@ public class CompactedLogWriteBatchTest {
     void testFailedTwice() throws Exception {
         int bucketId = 0;
         CompactedLogWriteBatch logWriteBatch =
-                createLogWriteBatch(new TableBucket(DATA1_TABLE_ID, bucketId), 
0L);
+                createLogWriteBatch(new TableBucket(DATA1_TABLE_ID, bucketId));
         boolean appendResult = logWriteBatch.tryAppend(createWriteRecord(), 
newWriteCallback());
         assertThat(appendResult).isTrue();
 
@@ -135,7 +134,7 @@ public class CompactedLogWriteBatchTest {
     void testClose() throws Exception {
         int bucketId = 0;
         CompactedLogWriteBatch logProducerBatch =
-                createLogWriteBatch(new TableBucket(DATA1_TABLE_ID, bucketId), 
0L);
+                createLogWriteBatch(new TableBucket(DATA1_TABLE_ID, bucketId));
         boolean appendResult = logProducerBatch.tryAppend(createWriteRecord(), 
newWriteCallback());
         assertThat(appendResult).isTrue();
 
@@ -153,7 +152,6 @@ public class CompactedLogWriteBatchTest {
         CompactedLogWriteBatch logProducerBatch =
                 createLogWriteBatch(
                         new TableBucket(DATA1_TABLE_ID, bucketId),
-                        0L,
                         writeLimit,
                         MemorySegment.allocateHeapMemory(writeLimit));
 
@@ -202,15 +200,14 @@ public class CompactedLogWriteBatchTest {
                 DATA1_TABLE_INFO, DATA1_PHYSICAL_TABLE_PATH, row, null);
     }
 
-    private CompactedLogWriteBatch createLogWriteBatch(TableBucket tb, long 
baseLogOffset)
-            throws Exception {
-        return createLogWriteBatch(
-                tb, baseLogOffset, Integer.MAX_VALUE, 
MemorySegment.allocateHeapMemory(1000));
+    private CompactedLogWriteBatch createLogWriteBatch(TableBucket tb) throws 
Exception {
+        return createLogWriteBatch(tb, Integer.MAX_VALUE, 
MemorySegment.allocateHeapMemory(1000));
     }
 
     private CompactedLogWriteBatch createLogWriteBatch(
-            TableBucket tb, long baseLogOffset, int writeLimit, MemorySegment 
memorySegment) {
+            TableBucket tb, int writeLimit, MemorySegment memorySegment) {
         return new CompactedLogWriteBatch(
+                tb.getTableId(),
                 tb.getBucket(),
                 DATA1_PHYSICAL_TABLE_PATH,
                 DATA1_TABLE_INFO.getSchemaId(),
diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/write/IndexedLogWriteBatchTest.java
 
b/fluss-client/src/test/java/org/apache/fluss/client/write/IndexedLogWriteBatchTest.java
index 683013b89..331be4209 100644
--- 
a/fluss-client/src/test/java/org/apache/fluss/client/write/IndexedLogWriteBatchTest.java
+++ 
b/fluss-client/src/test/java/org/apache/fluss/client/write/IndexedLogWriteBatchTest.java
@@ -70,7 +70,6 @@ public class IndexedLogWriteBatchTest {
         IndexedLogWriteBatch logProducerBatch =
                 createLogWriteBatch(
                         new TableBucket(DATA1_TABLE_ID, bucketId),
-                        0L,
                         writeLimit,
                         MemorySegment.allocateHeapMemory(writeLimit));
 
@@ -92,7 +91,7 @@ public class IndexedLogWriteBatchTest {
     void testToBytes() throws Exception {
         int bucketId = 0;
         IndexedLogWriteBatch logProducerBatch =
-                createLogWriteBatch(new TableBucket(DATA1_TABLE_ID, bucketId), 
0L);
+                createLogWriteBatch(new TableBucket(DATA1_TABLE_ID, bucketId));
         boolean appendResult = logProducerBatch.tryAppend(createWriteRecord(), 
newWriteCallback());
         assertThat(appendResult).isTrue();
 
@@ -109,7 +108,7 @@ public class IndexedLogWriteBatchTest {
     void testCompleteTwice() throws Exception {
         int bucketId = 0;
         IndexedLogWriteBatch logWriteBatch =
-                createLogWriteBatch(new TableBucket(DATA1_TABLE_ID, bucketId), 
0L);
+                createLogWriteBatch(new TableBucket(DATA1_TABLE_ID, bucketId));
         boolean appendResult = logWriteBatch.tryAppend(createWriteRecord(), 
newWriteCallback());
         assertThat(appendResult).isTrue();
 
@@ -124,7 +123,7 @@ public class IndexedLogWriteBatchTest {
     void testFailedTwice() throws Exception {
         int bucketId = 0;
         IndexedLogWriteBatch logWriteBatch =
-                createLogWriteBatch(new TableBucket(DATA1_TABLE_ID, bucketId), 
0L);
+                createLogWriteBatch(new TableBucket(DATA1_TABLE_ID, bucketId));
         boolean appendResult = logWriteBatch.tryAppend(createWriteRecord(), 
newWriteCallback());
         assertThat(appendResult).isTrue();
 
@@ -139,7 +138,7 @@ public class IndexedLogWriteBatchTest {
     void testClose() throws Exception {
         int bucketId = 0;
         IndexedLogWriteBatch logProducerBatch =
-                createLogWriteBatch(new TableBucket(DATA1_TABLE_ID, bucketId), 
0L);
+                createLogWriteBatch(new TableBucket(DATA1_TABLE_ID, bucketId));
         boolean appendResult = logProducerBatch.tryAppend(createWriteRecord(), 
newWriteCallback());
         assertThat(appendResult).isTrue();
 
@@ -157,7 +156,6 @@ public class IndexedLogWriteBatchTest {
         IndexedLogWriteBatch logProducerBatch =
                 createLogWriteBatch(
                         new TableBucket(DATA1_TABLE_ID, bucketId),
-                        0L,
                         writeLimit,
                         MemorySegment.allocateHeapMemory(writeLimit));
 
@@ -205,15 +203,14 @@ public class IndexedLogWriteBatchTest {
         return WriteRecord.forIndexedAppend(DATA1_TABLE_INFO, 
DATA1_PHYSICAL_TABLE_PATH, row, null);
     }
 
-    private IndexedLogWriteBatch createLogWriteBatch(TableBucket tb, long 
baseLogOffset)
-            throws Exception {
-        return createLogWriteBatch(
-                tb, baseLogOffset, Integer.MAX_VALUE, 
MemorySegment.allocateHeapMemory(1000));
+    private IndexedLogWriteBatch createLogWriteBatch(TableBucket tb) throws 
Exception {
+        return createLogWriteBatch(tb, Integer.MAX_VALUE, 
MemorySegment.allocateHeapMemory(1000));
     }
 
     private IndexedLogWriteBatch createLogWriteBatch(
-            TableBucket tb, long baseLogOffset, int writeLimit, MemorySegment 
memorySegment) {
+            TableBucket tb, int writeLimit, MemorySegment memorySegment) {
         return new IndexedLogWriteBatch(
+                tb.getTableId(),
                 tb.getBucket(),
                 DATA1_PHYSICAL_TABLE_PATH,
                 DATA1_TABLE_INFO.getSchemaId(),
diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/write/KvWriteBatchTest.java
 
b/fluss-client/src/test/java/org/apache/fluss/client/write/KvWriteBatchTest.java
index e2340d170..7b61976a0 100644
--- 
a/fluss-client/src/test/java/org/apache/fluss/client/write/KvWriteBatchTest.java
+++ 
b/fluss-client/src/test/java/org/apache/fluss/client/write/KvWriteBatchTest.java
@@ -221,6 +221,7 @@ class KvWriteBatchTest {
         PreAllocatedPagedOutputView outputView =
                 new 
PreAllocatedPagedOutputView(Collections.singletonList(memorySegment));
         return new KvWriteBatch(
+                tb.getTableId(),
                 tb.getBucket(),
                 PhysicalTablePath.of(DATA1_TABLE_PATH_PK),
                 DATA1_TABLE_INFO_PK.getSchemaId(),
@@ -316,6 +317,7 @@ class KvWriteBatchTest {
                 new PreAllocatedPagedOutputView(
                         Collections.singletonList(memoryPool.nextSegment()));
         return new KvWriteBatch(
+                tb.getTableId(),
                 tb.getBucket(),
                 PhysicalTablePath.of(DATA1_TABLE_PATH_PK),
                 DATA1_TABLE_INFO_PK.getSchemaId(),
diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java
 
b/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java
index 746d251d8..27c74b9f1 100644
--- 
a/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java
+++ 
b/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java
@@ -656,4 +656,56 @@ class RecordAccumulatorTest {
                 + (bucketBatches2 == null ? 0 : bucketBatches2.size())
                 + (bucketBatches3 == null ? 0 : bucketBatches3.size());
     }
+
+    @Test
+    void testTableIdChangeAbortsBatches() throws Exception {
+        IndexedRow row = indexedRow(DATA1_ROW_TYPE, new Object[] {1, "a"});
+        long batchSize = getTestBatchSize(row);
+        RecordAccumulator accum = createTestRecordAccumulator((int) batchSize, 
Integer.MAX_VALUE);
+
+        // Append records to multiple buckets with the original table ID
+        List<Exception> callbackExceptions = new ArrayList<>();
+        WriteCallback trackingCallback =
+                (bucket, offset, exception) -> {
+                    if (exception != null) {
+                        callbackExceptions.add(exception);
+                    }
+                };
+
+        accum.append(createRecord(row), trackingCallback, cluster, 0, false);
+        accum.append(createRecord(row), trackingCallback, cluster, 1, false);
+        accum.append(createRecord(row), trackingCallback, cluster, 2, false);
+
+        // Verify batches exist before table ID change
+        assertThat(accum.getReadyDeque(DATA1_PHYSICAL_TABLE_PATH, 
tb1.getBucket()))
+                .isNotNull()
+                .hasSize(1);
+        assertThat(accum.getReadyDeque(DATA1_PHYSICAL_TABLE_PATH, 
tb2.getBucket()))
+                .isNotNull()
+                .hasSize(1);
+        assertThat(accum.getReadyDeque(DATA1_PHYSICAL_TABLE_PATH, 
tb3.getBucket()))
+                .isNotNull()
+                .hasSize(1);
+
+        // Simulate table ID change by using a different table ID
+        long newTableId = DATA1_TABLE_ID + 1000;
+        TableBucket newTb1 = new TableBucket(newTableId, 0);
+
+        // Calling getReadyDeque with the new table ID should abort the stale batches
+        Deque<WriteBatch> result =
+                accum.getReadyDeque(DATA1_PHYSICAL_TABLE_PATH, 
newTb1.getBucket());
+
+        // Verify the returned deque is null (the stale batch was aborted and removed)
+        assertThat(result).isNull();
+
+        // Verify callbacks were fired with TableNotExistException
+        assertThat(callbackExceptions).hasSize(1);
+        assertThat(callbackExceptions.get(0))
+                
.isInstanceOf(org.apache.fluss.exception.TableNotExistException.class)
+                .hasMessageContaining("has been dropped and re-created")
+                .hasMessageContaining("old: " + DATA1_TABLE_ID)
+                .hasMessageContaining("new: " + newTableId)
+                .hasMessageContaining("Further writes to the old table 
instance cannot proceed")
+                .hasMessageContaining("Please recreate the writer");
+    }
 }
diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/write/SenderTest.java 
b/fluss-client/src/test/java/org/apache/fluss/client/write/SenderTest.java
index 16ba2f2ec..bf65f869e 100644
--- a/fluss-client/src/test/java/org/apache/fluss/client/write/SenderTest.java
+++ b/fluss-client/src/test/java/org/apache/fluss/client/write/SenderTest.java
@@ -24,6 +24,7 @@ import org.apache.fluss.cluster.ServerNode;
 import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.Configuration;
 import org.apache.fluss.config.MemorySize;
+import org.apache.fluss.exception.TableNotExistException;
 import org.apache.fluss.exception.TimeoutException;
 import org.apache.fluss.metadata.PhysicalTablePath;
 import org.apache.fluss.metadata.TableBucket;
@@ -61,12 +62,14 @@ import static org.apache.fluss.record.LogRecordBatchFormat.NO_WRITER_ID;
 import static org.apache.fluss.record.TestData.DATA1_PHYSICAL_TABLE_PATH;
 import static org.apache.fluss.record.TestData.DATA1_ROW_TYPE;
 import static org.apache.fluss.record.TestData.DATA1_SCHEMA_PK;
+import static org.apache.fluss.record.TestData.DATA1_TABLE_DESCRIPTOR;
 import static org.apache.fluss.record.TestData.DATA1_TABLE_ID;
 import static org.apache.fluss.record.TestData.DATA1_TABLE_ID_PK;
 import static org.apache.fluss.record.TestData.DATA1_TABLE_INFO;
 import static org.apache.fluss.record.TestData.DATA1_TABLE_INFO_PK;
 import static org.apache.fluss.record.TestData.DATA1_TABLE_PATH;
 import static org.apache.fluss.record.TestData.DATA1_TABLE_PATH_PK;
+import static org.apache.fluss.record.TestData.DATA2_TABLE_ID;
 import static org.apache.fluss.rpc.protocol.Errors.SCHEMA_NOT_EXIST;
 import static org.apache.fluss.server.utils.ServerRpcMessageUtils.getProduceLogData;
 import static org.apache.fluss.server.utils.ServerRpcMessageUtils.makeProduceLogResponse;
@@ -747,6 +750,39 @@ final class SenderTest {
         assertThat(future.get()).isNull();
     }
 
+    @Test
+    void testSendWhenTableIdChanges() throws Exception {
+        CompletableFuture<Exception> future1 = new CompletableFuture<>();
+        appendToAccumulator(tb1, row(1, "a"), (tb, leo, e) -> future1.complete(e));
+        TableInfo newTableInfo =
+                TableInfo.of(
+                        DATA1_TABLE_PATH,
+                        DATA2_TABLE_ID,
+                        1,
+                        DATA1_TABLE_DESCRIPTOR,
+                        System.currentTimeMillis(),
+                        System.currentTimeMillis());
+        TableBucket newTableBucket = new TableBucket(newTableInfo.getTableId(), tb1.getBucket());
+
+        metadataUpdater.updateTableInfos(Collections.singletonMap(DATA1_TABLE_PATH, newTableInfo));
+        sender.runOnce();
+        Exception exception = future1.get();
+        assertThat(exception).isNotNull();
+        assertThat(exception).isExactlyInstanceOf(TableNotExistException.class);
+        assertThat(exception.getMessage())
+                .contains(
+                        String.format(
+                                "Table '%s' has been dropped and re-created with a new table ID (old: %s, new: %s)",
+                                DATA1_TABLE_PATH, DATA1_TABLE_ID, newTableInfo.getTableId()));
+
+        CompletableFuture<Exception> future2 = new CompletableFuture<>();
+        appendToAccumulator(
+                newTableInfo, newTableBucket, row(1, "a"), (tb, leo, e) -> future2.complete(e));
+        sender.runOnce();
+        finishRequest(newTableBucket, 0, createProduceLogResponse(newTableBucket, 0, 1));
+        assertThat(future2.get()).isNull();
+    }
+
     private TestingMetadataUpdater initializeMetadataUpdater() {
         Map<TablePath, TableInfo> tableInfos = new HashMap<>();
         tableInfos.put(DATA1_TABLE_PATH, DATA1_TABLE_INFO);
@@ -756,8 +792,14 @@ final class SenderTest {
 
     private void appendToAccumulator(TableBucket tb, GenericRow row, WriteCallback writeCallback)
             throws Exception {
+        appendToAccumulator(DATA1_TABLE_INFO, tb, row, writeCallback);
+    }
+
+    private void appendToAccumulator(
+            TableInfo tableInfo, TableBucket tb, GenericRow row, WriteCallback writeCallback)
+            throws Exception {
         accumulator.append(
-                WriteRecord.forArrowAppend(DATA1_TABLE_INFO, DATA1_PHYSICAL_TABLE_PATH, row, null),
+                WriteRecord.forArrowAppend(tableInfo, DATA1_PHYSICAL_TABLE_PATH, row, null),
                 writeCallback,
                 metadataUpdater.getCluster(),
                 tb.getBucket(),
diff --git a/fluss-common/src/main/java/org/apache/fluss/exception/TableNotExistException.java b/fluss-common/src/main/java/org/apache/fluss/exception/TableNotExistException.java
index b101a71bd..7c1110690 100644
--- a/fluss-common/src/main/java/org/apache/fluss/exception/TableNotExistException.java
+++ b/fluss-common/src/main/java/org/apache/fluss/exception/TableNotExistException.java
@@ -33,4 +33,8 @@ public class TableNotExistException extends ApiException {
     public TableNotExistException(String message, Throwable cause) {
         super(message, cause);
     }
+
+    public TableNotExistException(String message, Throwable cause, boolean stackTraceEnabled) {
+        super(message, cause, stackTraceEnabled);
+    }
 }


Reply via email to