This is an automated email from the ASF dual-hosted git repository. hongshun pushed a commit to branch fix-write-new-table-id in repository https://gitbox.apache.org/repos/asf/fluss.git
commit 2c6ec9d562170e7d16971e085cc06082525172ce Author: Hongshun Wang <[email protected]> AuthorDate: Sat Feb 28 19:44:23 2026 +0800 [client] Abort write batch if table id changes. --- .../client/write/AbstractRowLogWriteBatch.java | 3 +- .../fluss/client/write/ArrowLogWriteBatch.java | 3 +- .../fluss/client/write/CompactedLogWriteBatch.java | 2 + .../fluss/client/write/IndexedLogWriteBatch.java | 2 + .../apache/fluss/client/write/KvWriteBatch.java | 3 +- .../fluss/client/write/RecordAccumulator.java | 47 +++++++++++++++---- .../java/org/apache/fluss/client/write/Sender.java | 2 +- .../org/apache/fluss/client/write/WriteBatch.java | 9 +++- .../apache/fluss/client/write/WriterClient.java | 2 +- .../client/metadata/TestingMetadataUpdater.java | 5 +++ .../client/utils/ClientRpcMessageUtilsTest.java | 1 + .../fluss/client/write/ArrowLogWriteBatchTest.java | 3 ++ .../client/write/CompactedLogWriteBatchTest.java | 19 ++++---- .../client/write/IndexedLogWriteBatchTest.java | 19 ++++---- .../fluss/client/write/KvWriteBatchTest.java | 2 + .../fluss/client/write/RecordAccumulatorTest.java | 52 ++++++++++++++++++++++ .../org/apache/fluss/client/write/SenderTest.java | 44 +++++++++++++++++- .../fluss/exception/TableNotExistException.java | 4 ++ 18 files changed, 185 insertions(+), 37 deletions(-) diff --git a/fluss-client/src/main/java/org/apache/fluss/client/write/AbstractRowLogWriteBatch.java b/fluss-client/src/main/java/org/apache/fluss/client/write/AbstractRowLogWriteBatch.java index 3c61cce82..fcb71764a 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/write/AbstractRowLogWriteBatch.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/write/AbstractRowLogWriteBatch.java @@ -44,13 +44,14 @@ abstract class AbstractRowLogWriteBatch<R> extends WriteBatch { private final String buildErrorMessage; protected AbstractRowLogWriteBatch( + long tableId, int bucketId, PhysicalTablePath physicalTablePath, long createdMs, AbstractPagedOutputView outputView, MemoryLogRecordsRowBuilder<R> recordsBuilder, String buildErrorMessage) { - super(bucketId, physicalTablePath, createdMs); + super(tableId, bucketId, physicalTablePath, createdMs); this.outputView = outputView; this.recordsBuilder = recordsBuilder; this.buildErrorMessage = buildErrorMessage; diff --git a/fluss-client/src/main/java/org/apache/fluss/client/write/ArrowLogWriteBatch.java b/fluss-client/src/main/java/org/apache/fluss/client/write/ArrowLogWriteBatch.java index 578ff2a25..969f222c0 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/write/ArrowLogWriteBatch.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/write/ArrowLogWriteBatch.java @@ -50,13 +50,14 @@ public class ArrowLogWriteBatch extends WriteBatch { private final AbstractPagedOutputView outputView; public ArrowLogWriteBatch( + long tableId, int bucketId, PhysicalTablePath physicalTablePath, int schemaId, ArrowWriter arrowWriter, AbstractPagedOutputView outputView, long createdMs) { - super(bucketId, physicalTablePath, createdMs); + super(tableId, bucketId, physicalTablePath, createdMs); this.outputView = outputView; this.recordsBuilder = MemoryLogRecordsArrowBuilder.builder(schemaId, arrowWriter, outputView, true); diff --git a/fluss-client/src/main/java/org/apache/fluss/client/write/CompactedLogWriteBatch.java b/fluss-client/src/main/java/org/apache/fluss/client/write/CompactedLogWriteBatch.java index 19366a4dc..9b7cee9a7 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/write/CompactedLogWriteBatch.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/write/CompactedLogWriteBatch.java @@ -40,6 +40,7 @@ import static org.apache.fluss.utils.Preconditions.checkArgument; public final class CompactedLogWriteBatch extends AbstractRowLogWriteBatch<CompactedRow> { public CompactedLogWriteBatch( + long tableId, int bucketId, PhysicalTablePath physicalTablePath, int schemaId, @@ -47,6 +48,7 @@ public final class CompactedLogWriteBatch extends AbstractRowLogWriteBatch<Compa AbstractPagedOutputView outputView, long createdMs) { super( + tableId, bucketId, physicalTablePath, createdMs, diff --git a/fluss-client/src/main/java/org/apache/fluss/client/write/IndexedLogWriteBatch.java b/fluss-client/src/main/java/org/apache/fluss/client/write/IndexedLogWriteBatch.java index c70dd83a8..dc408a384 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/write/IndexedLogWriteBatch.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/write/IndexedLogWriteBatch.java @@ -39,6 +39,7 @@ import static org.apache.fluss.utils.Preconditions.checkArgument; public final class IndexedLogWriteBatch extends AbstractRowLogWriteBatch<IndexedRow> { public IndexedLogWriteBatch( + long tableId, int bucketId, PhysicalTablePath physicalTablePath, int schemaId, @@ -46,6 +47,7 @@ public final class IndexedLogWriteBatch extends AbstractRowLogWriteBatch<Indexed AbstractPagedOutputView outputView, long createdMs) { super( + tableId, bucketId, physicalTablePath, createdMs, diff --git a/fluss-client/src/main/java/org/apache/fluss/client/write/KvWriteBatch.java b/fluss-client/src/main/java/org/apache/fluss/client/write/KvWriteBatch.java index a72d36717..814df03f7 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/write/KvWriteBatch.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/write/KvWriteBatch.java @@ -55,6 +55,7 @@ public class KvWriteBatch extends WriteBatch { private final MergeMode mergeMode; public KvWriteBatch( + long tableId, int bucketId, PhysicalTablePath physicalTablePath, int schemaId, @@ -64,7 +65,7 @@ public class KvWriteBatch extends WriteBatch { @Nullable int[] targetColumns, MergeMode mergeMode, long createdMs) { - super(bucketId, physicalTablePath, createdMs); + super(tableId, bucketId, physicalTablePath, createdMs); this.outputView = outputView; this.recordsBuilder = KvRecordBatchBuilder.builder(schemaId, writeLimit, outputView, kvFormat); diff --git a/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java b/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java index a1f0544a5..1b615bfc3 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java @@ -25,6 +25,7 @@ import org.apache.fluss.cluster.Cluster; import org.apache.fluss.config.ConfigOptions; import org.apache.fluss.config.Configuration; import org.apache.fluss.exception.FlussRuntimeException; +import org.apache.fluss.exception.TableNotExistException; import org.apache.fluss.memory.LazyMemorySegmentPool; import org.apache.fluss.memory.MemorySegment; import org.apache.fluss.memory.PreAllocatedPagedOutputView; @@ -306,16 +307,20 @@ public final class RecordAccumulator { } /** Abort all incomplete batches (whether they have been sent or not). */ - public void abortBatches(final Exception reason) { + public void abortAllBatches(final Exception reason) { for (WriteBatch batch : incomplete.copyAll()) { - Deque<WriteBatch> dq = getDeque(batch.physicalTablePath(), batch.bucketId()); - synchronized (dq) { - batch.abortRecordAppends(); - dq.remove(batch); - } - batch.abort(reason); - deallocate(batch); + abortBatch(reason, batch); + } + } + + private void abortBatch(final Exception reason, WriteBatch batch) { + Deque<WriteBatch> dq = getDeque(batch.physicalTablePath(), batch.bucketId()); + synchronized (dq) { + batch.abortRecordAppends(); + dq.remove(batch); } + batch.abort(reason); + deallocate(batch); } /** Get the deque for the given table-bucket, creating it if necessary. */ @@ -610,6 +615,7 @@ public final class RecordAccumulator { case COMPACTED_KV: case INDEXED_KV: return new KvWriteBatch( + tableInfo.getTableId(), bucketId, physicalTablePath, tableInfo.getSchemaId(), @@ -629,6 +635,7 @@ public final class RecordAccumulator { tableInfo.getRowType(), tableInfo.getTableConfig().getArrowCompressionInfo()); return new ArrowLogWriteBatch( + tableInfo.getTableId(), bucketId, physicalTablePath, tableInfo.getSchemaId(), @@ -638,6 +645,7 @@ public final class RecordAccumulator { case COMPACTED_LOG: return new CompactedLogWriteBatch( + tableInfo.getTableId(), bucketId, physicalTablePath, schemaId, @@ -647,6 +655,7 @@ public final class RecordAccumulator { case INDEXED_LOG: return new IndexedLogWriteBatch( + tableInfo.getTableId(), bucketId, physicalTablePath, tableInfo.getSchemaId(), @@ -712,6 +721,28 @@ public final class RecordAccumulator { continue; } + if (tableBucket.getTableId() != first.tableId()) { + LOG.warn( + "Table {} has been dropped and re-created with a new table ID. Old ID: {}, New ID: {}. " + + "Aborting pending batches for the old table instance.", + physicalTablePath, + first.tableId(), + tableBucket.getTableId()); + TableNotExistException reason = + new TableNotExistException( + String.format( + "Table '%s' has been dropped and re-created with a new table ID (old: %d, new: %d). " + + "Further writes to the old table instance cannot proceed. " + + "Please recreate the writer with the new table metadata.", + physicalTablePath, + first.tableId(), + tableBucket.getTableId()), + null, + true); + abortBatch(reason, deque.pollFirst()); + continue; + } + // TODO retry back off check. if (size + first.estimatedSizeInBytes() > maxSize && !ready.isEmpty()) { diff --git a/fluss-client/src/main/java/org/apache/fluss/client/write/Sender.java b/fluss-client/src/main/java/org/apache/fluss/client/write/Sender.java index 66b7719b1..08b5e0ed6 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/write/Sender.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/write/Sender.java @@ -297,7 +297,7 @@ public class Sender implements Runnable { private void maybeAbortBatches(Exception exception) { if (accumulator.hasIncomplete()) { LOG.error("Aborting write batches due to fatal error", exception); - accumulator.abortBatches(exception); + accumulator.abortAllBatches(exception); } } diff --git a/fluss-client/src/main/java/org/apache/fluss/client/write/WriteBatch.java b/fluss-client/src/main/java/org/apache/fluss/client/write/WriteBatch.java index cb83943cf..9702bb543 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/write/WriteBatch.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/write/WriteBatch.java @@ -46,6 +46,7 @@ public abstract class WriteBatch { private final long createdMs; private final PhysicalTablePath physicalTablePath; private final RequestFuture requestFuture; + private final long tableId; private final int bucketId; protected final List<WriteCallback> callbacks = new ArrayList<>(); @@ -55,9 +56,11 @@ public abstract class WriteBatch { protected int recordCount; private long drainedMs; - public WriteBatch(int bucketId, PhysicalTablePath physicalTablePath, long createdMs) { + public WriteBatch( + long tableId, int bucketId, PhysicalTablePath physicalTablePath, long createdMs) { this.physicalTablePath = physicalTablePath; this.createdMs = createdMs; + this.tableId = tableId; this.bucketId = bucketId; this.requestFuture = new RequestFuture(); this.recordCount = 0; @@ -157,6 +160,10 @@ public abstract class WriteBatch { return bucketId; } + public long tableId() { + return tableId; + } + public PhysicalTablePath physicalTablePath() { return physicalTablePath; } diff --git a/fluss-client/src/main/java/org/apache/fluss/client/write/WriterClient.java b/fluss-client/src/main/java/org/apache/fluss/client/write/WriterClient.java index 0e184f09f..4c01665ee 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/write/WriterClient.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/write/WriterClient.java @@ -225,7 +225,7 @@ public class WriterClient { private void maybeAbortBatches(Throwable t) { if (accumulator.hasIncomplete()) { LOG.error("Aborting all pending write batches due to fatal error", t); - accumulator.abortBatches(toException(t)); + accumulator.abortAllBatches(toException(t)); } } diff --git a/fluss-client/src/test/java/org/apache/fluss/client/metadata/TestingMetadataUpdater.java b/fluss-client/src/test/java/org/apache/fluss/client/metadata/TestingMetadataUpdater.java index 206395135..956231c8d 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/metadata/TestingMetadataUpdater.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/metadata/TestingMetadataUpdater.java @@ -87,6 +87,11 @@ public class TestingMetadataUpdater extends MetadataUpdater { } } + public void updateTableInfos(Map<TablePath, TableInfo> tableInfos) { + initializeCluster( + cluster.getCoordinatorServer(), cluster.getAliveTabletServerList(), tableInfos); + } + /** * Create a builder for constructing TestingMetadataUpdater with custom gateways. * diff --git a/fluss-client/src/test/java/org/apache/fluss/client/utils/ClientRpcMessageUtilsTest.java b/fluss-client/src/test/java/org/apache/fluss/client/utils/ClientRpcMessageUtilsTest.java index 5b1ca58c7..3ed17da7d 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/utils/ClientRpcMessageUtilsTest.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/utils/ClientRpcMessageUtilsTest.java @@ -131,6 +131,7 @@ class ClientRpcMessageUtilsTest { PreAllocatedPagedOutputView outputView = new PreAllocatedPagedOutputView(Collections.singletonList(segment)); return new KvWriteBatch( + DATA1_TABLE_ID_PK, bucketId, PhysicalTablePath.of(DATA1_TABLE_PATH_PK), DATA1_TABLE_INFO_PK.getSchemaId(), diff --git a/fluss-client/src/test/java/org/apache/fluss/client/write/ArrowLogWriteBatchTest.java b/fluss-client/src/test/java/org/apache/fluss/client/write/ArrowLogWriteBatchTest.java index afdef5616..fe80ecd94 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/write/ArrowLogWriteBatchTest.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/write/ArrowLogWriteBatchTest.java @@ -126,6 +126,7 @@ public class ArrowLogWriteBatchTest { TableBucket tb = new TableBucket(DATA1_TABLE_ID, bucketId); ArrowLogWriteBatch arrowLogWriteBatch = new ArrowLogWriteBatch( + tb.getTableId(), tb.getBucket(), DATA1_PHYSICAL_TABLE_PATH, DATA1_TABLE_INFO.getSchemaId(), @@ -205,6 +206,7 @@ public class ArrowLogWriteBatchTest { ArrowLogWriteBatch arrowLogWriteBatch = new ArrowLogWriteBatch( + tb.getTableId(), tb.getBucket(), DATA1_PHYSICAL_TABLE_PATH, DATA1_TABLE_INFO.getSchemaId(), @@ -300,6 +302,7 @@ public class ArrowLogWriteBatchTest { private ArrowLogWriteBatch createArrowLogWriteBatch(TableBucket tb, int maxSizeInBytes) { return new ArrowLogWriteBatch( + tb.getTableId(), tb.getBucket(), DATA1_PHYSICAL_TABLE_PATH, DATA1_TABLE_INFO.getSchemaId(), diff --git a/fluss-client/src/test/java/org/apache/fluss/client/write/CompactedLogWriteBatchTest.java b/fluss-client/src/test/java/org/apache/fluss/client/write/CompactedLogWriteBatchTest.java index 4f8b37919..140058eca 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/write/CompactedLogWriteBatchTest.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/write/CompactedLogWriteBatchTest.java @@ -67,7 +67,6 @@ public class CompactedLogWriteBatchTest { CompactedLogWriteBatch logProducerBatch = createLogWriteBatch( new TableBucket(DATA1_TABLE_ID, bucketId), - 0L, writeLimit, MemorySegment.allocateHeapMemory(writeLimit)); @@ -89,7 +88,7 @@ public class CompactedLogWriteBatchTest { void testToBytes() throws Exception { int bucketId = 0; CompactedLogWriteBatch logProducerBatch = - createLogWriteBatch(new TableBucket(DATA1_TABLE_ID, bucketId), 0L); + createLogWriteBatch(new TableBucket(DATA1_TABLE_ID, bucketId)); boolean appendResult = logProducerBatch.tryAppend(createWriteRecord(), newWriteCallback()); assertThat(appendResult).isTrue(); @@ -105,7 +104,7 @@ public class CompactedLogWriteBatchTest { void testCompleteTwice() throws Exception { int bucketId = 0; CompactedLogWriteBatch logWriteBatch = - createLogWriteBatch(new TableBucket(DATA1_TABLE_ID, bucketId), 0L); + createLogWriteBatch(new TableBucket(DATA1_TABLE_ID, bucketId)); boolean appendResult = logWriteBatch.tryAppend(createWriteRecord(), newWriteCallback()); assertThat(appendResult).isTrue(); @@ -120,7 +119,7 @@ public class CompactedLogWriteBatchTest { void testFailedTwice() throws Exception { int bucketId = 0; CompactedLogWriteBatch logWriteBatch = - createLogWriteBatch(new TableBucket(DATA1_TABLE_ID, bucketId), 0L); + createLogWriteBatch(new TableBucket(DATA1_TABLE_ID, bucketId)); boolean appendResult = logWriteBatch.tryAppend(createWriteRecord(), newWriteCallback()); assertThat(appendResult).isTrue(); @@ -135,7 +134,7 @@ public class CompactedLogWriteBatchTest { void testClose() throws Exception { int bucketId = 0; CompactedLogWriteBatch logProducerBatch = - createLogWriteBatch(new TableBucket(DATA1_TABLE_ID, bucketId), 0L); + createLogWriteBatch(new TableBucket(DATA1_TABLE_ID, bucketId)); boolean appendResult = logProducerBatch.tryAppend(createWriteRecord(), newWriteCallback()); assertThat(appendResult).isTrue(); @@ -153,7 +152,6 @@ public class CompactedLogWriteBatchTest { CompactedLogWriteBatch logProducerBatch = createLogWriteBatch( new TableBucket(DATA1_TABLE_ID, bucketId), - 0L, writeLimit, MemorySegment.allocateHeapMemory(writeLimit)); @@ -202,15 +200,14 @@ public class CompactedLogWriteBatchTest { DATA1_TABLE_INFO, DATA1_PHYSICAL_TABLE_PATH, row, null); } - private CompactedLogWriteBatch createLogWriteBatch(TableBucket tb, long baseLogOffset) - throws Exception { - return createLogWriteBatch( - tb, baseLogOffset, Integer.MAX_VALUE, MemorySegment.allocateHeapMemory(1000)); + private CompactedLogWriteBatch createLogWriteBatch(TableBucket tb) throws Exception { + return createLogWriteBatch(tb, Integer.MAX_VALUE, MemorySegment.allocateHeapMemory(1000)); } private CompactedLogWriteBatch createLogWriteBatch( - TableBucket tb, long baseLogOffset, int writeLimit, MemorySegment memorySegment) { + TableBucket tb, int writeLimit, MemorySegment memorySegment) { return new CompactedLogWriteBatch( + tb.getTableId(), tb.getBucket(), DATA1_PHYSICAL_TABLE_PATH, DATA1_TABLE_INFO.getSchemaId(), diff --git a/fluss-client/src/test/java/org/apache/fluss/client/write/IndexedLogWriteBatchTest.java b/fluss-client/src/test/java/org/apache/fluss/client/write/IndexedLogWriteBatchTest.java index 683013b89..331be4209 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/write/IndexedLogWriteBatchTest.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/write/IndexedLogWriteBatchTest.java @@ -70,7 +70,6 @@ public class IndexedLogWriteBatchTest { IndexedLogWriteBatch logProducerBatch = createLogWriteBatch( new TableBucket(DATA1_TABLE_ID, bucketId), - 0L, writeLimit, MemorySegment.allocateHeapMemory(writeLimit)); @@ -92,7 +91,7 @@ public class IndexedLogWriteBatchTest { void testToBytes() throws Exception { int bucketId = 0; IndexedLogWriteBatch logProducerBatch = - createLogWriteBatch(new TableBucket(DATA1_TABLE_ID, bucketId), 0L); + createLogWriteBatch(new TableBucket(DATA1_TABLE_ID, bucketId)); boolean appendResult = logProducerBatch.tryAppend(createWriteRecord(), newWriteCallback()); assertThat(appendResult).isTrue(); @@ -109,7 +108,7 @@ public class IndexedLogWriteBatchTest { void testCompleteTwice() throws Exception { int bucketId = 0; IndexedLogWriteBatch logWriteBatch = - createLogWriteBatch(new TableBucket(DATA1_TABLE_ID, bucketId), 0L); + createLogWriteBatch(new TableBucket(DATA1_TABLE_ID, bucketId)); boolean appendResult = logWriteBatch.tryAppend(createWriteRecord(), newWriteCallback()); assertThat(appendResult).isTrue(); @@ -124,7 +123,7 @@ public class IndexedLogWriteBatchTest { void testFailedTwice() throws Exception { int bucketId = 0; IndexedLogWriteBatch logWriteBatch = - createLogWriteBatch(new TableBucket(DATA1_TABLE_ID, bucketId), 0L); + createLogWriteBatch(new TableBucket(DATA1_TABLE_ID, bucketId)); boolean appendResult = logWriteBatch.tryAppend(createWriteRecord(), newWriteCallback()); assertThat(appendResult).isTrue(); @@ -139,7 +138,7 @@ public class IndexedLogWriteBatchTest { void testClose() throws Exception { int bucketId = 0; IndexedLogWriteBatch logProducerBatch = - createLogWriteBatch(new TableBucket(DATA1_TABLE_ID, bucketId), 0L); + createLogWriteBatch(new TableBucket(DATA1_TABLE_ID, bucketId)); boolean appendResult = logProducerBatch.tryAppend(createWriteRecord(), newWriteCallback()); assertThat(appendResult).isTrue(); @@ -157,7 +156,6 @@ public class IndexedLogWriteBatchTest { IndexedLogWriteBatch logProducerBatch = createLogWriteBatch( new TableBucket(DATA1_TABLE_ID, bucketId), - 0L, writeLimit, MemorySegment.allocateHeapMemory(writeLimit)); @@ -205,15 +203,14 @@ public class IndexedLogWriteBatchTest { return WriteRecord.forIndexedAppend(DATA1_TABLE_INFO, DATA1_PHYSICAL_TABLE_PATH, row, null); } - private IndexedLogWriteBatch createLogWriteBatch(TableBucket tb, long baseLogOffset) - throws Exception { - return createLogWriteBatch( - tb, baseLogOffset, Integer.MAX_VALUE, MemorySegment.allocateHeapMemory(1000)); + private IndexedLogWriteBatch createLogWriteBatch(TableBucket tb) throws Exception { + return createLogWriteBatch(tb, Integer.MAX_VALUE, MemorySegment.allocateHeapMemory(1000)); } private IndexedLogWriteBatch createLogWriteBatch( - TableBucket tb, long baseLogOffset, int writeLimit, MemorySegment memorySegment) { + TableBucket tb, int writeLimit, MemorySegment memorySegment) { return new IndexedLogWriteBatch( + tb.getTableId(), tb.getBucket(), DATA1_PHYSICAL_TABLE_PATH, DATA1_TABLE_INFO.getSchemaId(), diff --git a/fluss-client/src/test/java/org/apache/fluss/client/write/KvWriteBatchTest.java b/fluss-client/src/test/java/org/apache/fluss/client/write/KvWriteBatchTest.java index e2340d170..7b61976a0 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/write/KvWriteBatchTest.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/write/KvWriteBatchTest.java @@ -221,6 +221,7 @@ class KvWriteBatchTest { PreAllocatedPagedOutputView outputView = new PreAllocatedPagedOutputView(Collections.singletonList(memorySegment)); return new KvWriteBatch( + tb.getTableId(), tb.getBucket(), PhysicalTablePath.of(DATA1_TABLE_PATH_PK), DATA1_TABLE_INFO_PK.getSchemaId(), @@ -316,6 +317,7 @@ class KvWriteBatchTest { new PreAllocatedPagedOutputView( Collections.singletonList(memoryPool.nextSegment())); return new KvWriteBatch( + tb.getTableId(), tb.getBucket(), PhysicalTablePath.of(DATA1_TABLE_PATH_PK), DATA1_TABLE_INFO_PK.getSchemaId(), diff --git a/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java b/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java index 746d251d8..27c74b9f1 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java @@ -656,4 +656,56 @@ class RecordAccumulatorTest { + (bucketBatches2 == null ? 0 : bucketBatches2.size()) + (bucketBatches3 == null ? 0 : bucketBatches3.size()); } + + @Test + void testTableIdChangeAbortsBatches() throws Exception { + IndexedRow row = indexedRow(DATA1_ROW_TYPE, new Object[] {1, "a"}); + long batchSize = getTestBatchSize(row); + RecordAccumulator accum = createTestRecordAccumulator((int) batchSize, Integer.MAX_VALUE); + + // Append records to multiple buckets with the original table ID + List<Exception> callbackExceptions = new ArrayList<>(); + WriteCallback trackingCallback = + (bucket, offset, exception) -> { + if (exception != null) { + callbackExceptions.add(exception); + } + }; + + accum.append(createRecord(row), trackingCallback, cluster, 0, false); + accum.append(createRecord(row), trackingCallback, cluster, 1, false); + accum.append(createRecord(row), trackingCallback, cluster, 2, false); + + // Verify batches exist before table ID change + assertThat(accum.getReadyDeque(DATA1_PHYSICAL_TABLE_PATH, tb1.getBucket())) + .isNotNull() + .hasSize(1); + assertThat(accum.getReadyDeque(DATA1_PHYSICAL_TABLE_PATH, tb2.getBucket())) + .isNotNull() + .hasSize(1); + assertThat(accum.getReadyDeque(DATA1_PHYSICAL_TABLE_PATH, tb3.getBucket())) + .isNotNull() + .hasSize(1); + + // Simulate table ID change by using a different table ID + long newTableId = DATA1_TABLE_ID + 1000; + TableBucket newTb1 = new TableBucket(newTableId, 0); + + // Call getReadyDeque with new table ID should abort stale batches + Deque<WriteBatch> result = + accum.getReadyDeque(DATA1_PHYSICAL_TABLE_PATH, newTb1.getBucket()); + + // Verify result is null (stale batches aborted) + assertThat(result).isNull(); + + // Verify callbacks were fired with TableNotExistException + assertThat(callbackExceptions).hasSize(1); + assertThat(callbackExceptions.get(0)) + .isInstanceOf(org.apache.fluss.exception.TableNotExistException.class) + .hasMessageContaining("has been dropped and re-created") + .hasMessageContaining("old: " + DATA1_TABLE_ID) + .hasMessageContaining("new: " + newTableId) + .hasMessageContaining("Further writes to the old table instance cannot proceed") + .hasMessageContaining("Please recreate the writer"); + } } diff --git a/fluss-client/src/test/java/org/apache/fluss/client/write/SenderTest.java b/fluss-client/src/test/java/org/apache/fluss/client/write/SenderTest.java index 16ba2f2ec..bf65f869e 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/write/SenderTest.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/write/SenderTest.java @@ -24,6 +24,7 @@ import org.apache.fluss.cluster.ServerNode; import org.apache.fluss.config.ConfigOptions; import org.apache.fluss.config.Configuration; import org.apache.fluss.config.MemorySize; +import org.apache.fluss.exception.TableNotExistException; import org.apache.fluss.exception.TimeoutException; import org.apache.fluss.metadata.PhysicalTablePath; import org.apache.fluss.metadata.TableBucket; @@ -61,12 +62,14 @@ import static org.apache.fluss.record.LogRecordBatchFormat.NO_WRITER_ID; import static org.apache.fluss.record.TestData.DATA1_PHYSICAL_TABLE_PATH; import static org.apache.fluss.record.TestData.DATA1_ROW_TYPE; import static org.apache.fluss.record.TestData.DATA1_SCHEMA_PK; +import static org.apache.fluss.record.TestData.DATA1_TABLE_DESCRIPTOR; import static org.apache.fluss.record.TestData.DATA1_TABLE_ID; import static org.apache.fluss.record.TestData.DATA1_TABLE_ID_PK; import static org.apache.fluss.record.TestData.DATA1_TABLE_INFO; import static org.apache.fluss.record.TestData.DATA1_TABLE_INFO_PK; import static org.apache.fluss.record.TestData.DATA1_TABLE_PATH; import static org.apache.fluss.record.TestData.DATA1_TABLE_PATH_PK; +import static org.apache.fluss.record.TestData.DATA2_TABLE_ID; import static org.apache.fluss.rpc.protocol.Errors.SCHEMA_NOT_EXIST; import static org.apache.fluss.server.utils.ServerRpcMessageUtils.getProduceLogData; import static org.apache.fluss.server.utils.ServerRpcMessageUtils.makeProduceLogResponse; @@ -747,6 +750,39 @@ final class SenderTest { assertThat(future.get()).isNull(); } + @Test + void testSendWhenTableIdChanges() throws Exception { + CompletableFuture<Exception> future1 = new CompletableFuture<>(); + appendToAccumulator(tb1, row(1, "a"), (tb, leo, e) -> future1.complete(e)); + TableInfo newTableInfo = + TableInfo.of( + DATA1_TABLE_PATH, + DATA2_TABLE_ID, + 1, + DATA1_TABLE_DESCRIPTOR, + System.currentTimeMillis(), + System.currentTimeMillis()); + TableBucket newTableBucket = new TableBucket(newTableInfo.getTableId(), tb1.getBucket()); + + metadataUpdater.updateTableInfos(Collections.singletonMap(DATA1_TABLE_PATH, newTableInfo)); + sender.runOnce(); + Exception exception = future1.get(); + assertThat(exception).isNotNull(); + assertThat(exception).isExactlyInstanceOf(TableNotExistException.class); + assertThat(exception.getMessage()) + .contains( + String.format( + "Table '%s' has been dropped and re-created with a new table ID (old: %s, new: %s)", + DATA1_TABLE_PATH, DATA1_TABLE_ID, newTableInfo.getTableId())); + + CompletableFuture<Exception> future2 = new CompletableFuture<>(); + appendToAccumulator( + newTableInfo, newTableBucket, row(1, "a"), (tb, leo, e) -> future2.complete(e)); + sender.runOnce(); + finishRequest(newTableBucket, 0, createProduceLogResponse(newTableBucket, 0, 1)); + assertThat(future2.get()).isNull(); + } + private TestingMetadataUpdater initializeMetadataUpdater() { Map<TablePath, TableInfo> tableInfos = new HashMap<>(); tableInfos.put(DATA1_TABLE_PATH, DATA1_TABLE_INFO); @@ -756,8 +792,14 @@ final class SenderTest { private void appendToAccumulator(TableBucket tb, GenericRow row, WriteCallback writeCallback) throws Exception { + appendToAccumulator(DATA1_TABLE_INFO, tb, row, writeCallback); + } + + private void appendToAccumulator( + TableInfo tableInfo, TableBucket tb, GenericRow row, WriteCallback writeCallback) + throws Exception { accumulator.append( - WriteRecord.forArrowAppend(DATA1_TABLE_INFO, DATA1_PHYSICAL_TABLE_PATH, row, null), + WriteRecord.forArrowAppend(tableInfo, DATA1_PHYSICAL_TABLE_PATH, row, null), writeCallback, metadataUpdater.getCluster(), tb.getBucket(), diff --git a/fluss-common/src/main/java/org/apache/fluss/exception/TableNotExistException.java b/fluss-common/src/main/java/org/apache/fluss/exception/TableNotExistException.java index b101a71bd..7c1110690 100644 --- a/fluss-common/src/main/java/org/apache/fluss/exception/TableNotExistException.java +++ b/fluss-common/src/main/java/org/apache/fluss/exception/TableNotExistException.java @@ -33,4 +33,8 @@ public class TableNotExistException extends ApiException { public TableNotExistException(String message, Throwable cause) { super(message, cause); } + + public TableNotExistException(String message, Throwable cause, boolean stackTraceEnabled) { + super(message, cause, stackTraceEnabled); + } }
