This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 2671af6c1 [kv] Return log end offset in UpsertResult and DeleteResult
for exactly-once semantics (#2530)
2671af6c1 is described below
commit 2671af6c198715309c371e778709f79f40ce1eab
Author: Yang Wang <[email protected]>
AuthorDate: Sun Feb 1 21:18:52 2026 +0800
[kv] Return log end offset in UpsertResult and DeleteResult for
exactly-once semantics (#2530)
---
.../client/table/writer/AbstractTableWriter.java | 33 ++++++-
.../fluss/client/table/writer/DeleteResult.java | 50 +++++++++-
.../fluss/client/table/writer/UpsertResult.java | 48 +++++++++-
.../client/table/writer/UpsertWriterImpl.java | 14 +--
.../java/org/apache/fluss/client/write/Sender.java | 16 +++-
.../org/apache/fluss/client/write/WriteBatch.java | 35 +++++--
.../apache/fluss/client/write/WriteCallback.java | 13 ++-
.../fluss/client/table/FlussTableITCase.java | 101 +++++++++++++++++++++
.../fluss/client/write/ArrowLogWriteBatchTest.java | 4 +-
.../client/write/CompactedLogWriteBatchTest.java | 4 +-
.../client/write/IndexedLogWriteBatchTest.java | 4 +-
.../fluss/client/write/KvWriteBatchTest.java | 4 +-
.../fluss/client/write/RecordAccumulatorTest.java | 2 +-
.../org/apache/fluss/client/write/SenderTest.java | 58 ++++++------
fluss-rpc/src/main/proto/FlussApi.proto | 3 +
.../fluss/server/utils/ServerRpcMessageUtils.java | 7 ++
16 files changed, 334 insertions(+), 62 deletions(-)
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/AbstractTableWriter.java
b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/AbstractTableWriter.java
index 98a93fdd2..96739d327 100644
---
a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/AbstractTableWriter.java
+++
b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/AbstractTableWriter.java
@@ -22,6 +22,7 @@ import org.apache.fluss.client.write.WriteRecord;
import org.apache.fluss.client.write.WriterClient;
import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.metadata.PhysicalTablePath;
+import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.metadata.TableInfo;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.row.InternalRow;
@@ -65,7 +66,7 @@ public abstract class AbstractTableWriter implements
TableWriter {
CompletableFuture<Void> future = new CompletableFuture<>();
writerClient.send(
record,
- (exception) -> {
+ (bucket, offset, exception) -> {
if (exception == null) {
future.complete(null);
} else {
@@ -75,6 +76,36 @@ public abstract class AbstractTableWriter implements
TableWriter {
return future;
}
+ /**
+ * Send a record and return a future with a result containing bucket and
log end offset. This is
+ * used by KV writers that need to track offsets for exactly-once
semantics.
+ *
+ * @param record the record to send
+ * @param resultFactory factory to create the result from bucket and log
end offset
+ * @param <T> the result type
+ * @return a future that completes with the result
+ */
+ protected <T> CompletableFuture<T> sendWithResult(
+ WriteRecord record, ResultFactory<T> resultFactory) {
+ CompletableFuture<T> future = new CompletableFuture<>();
+ writerClient.send(
+ record,
+ (bucket, logEndOffset, exception) -> {
+ if (exception == null) {
+ future.complete(resultFactory.create(bucket,
logEndOffset));
+ } else {
+ future.completeExceptionally(exception);
+ }
+ });
+ return future;
+ }
+
+ /** Factory interface for creating result objects from bucket and log end
offset. */
+ @FunctionalInterface
+ protected interface ResultFactory<T> {
+ T create(TableBucket bucket, long logEndOffset);
+ }
+
protected PhysicalTablePath getPhysicalPath(InternalRow row) {
// not partitioned table, return the original physical path
if (partitionFieldGetter == null) {
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/DeleteResult.java
b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/DeleteResult.java
index 0fefa33cb..56e05933d 100644
---
a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/DeleteResult.java
+++
b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/DeleteResult.java
@@ -18,15 +18,59 @@
package org.apache.fluss.client.table.writer;
import org.apache.fluss.annotation.PublicEvolving;
+import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.row.InternalRow;
+import javax.annotation.Nullable;
+
/**
- * The result of deleting a record ({@link UpsertWriter#delete(InternalRow)}.
+ * The result of deleting a record ({@link UpsertWriter#delete(InternalRow)}).
*
* @since 0.6
*/
@PublicEvolving
public class DeleteResult {
- // currently, it's an empty class, it will be a compatible evolution if it
extends
- // to have offset, timestamp, etc. in the future
+
+ private final @Nullable TableBucket bucket;
+ private final long logEndOffset;
+
+ /**
+ * Creates a result with bucket and log end offset information.
+ *
+ * @param bucket the bucket this record was deleted from
+ * @param logEndOffset the log end offset (LEO) after this delete, i.e.,
the offset of the next
+ * record to be written
+ */
+ public DeleteResult(@Nullable TableBucket bucket, long logEndOffset) {
+ this.bucket = bucket;
+ this.logEndOffset = logEndOffset;
+ }
+
+ /**
+ * Returns the bucket this record was deleted from.
+ *
+ * @return the bucket, or null if not available
+ */
+ @Nullable
+ public TableBucket getBucket() {
+ return bucket;
+ }
+
+ /**
+ * Returns the log end offset (LEO) after this delete. This is the offset
of the next record to
+ * be written to the changelog, not the offset of this specific record.
+ *
+ * <p>Note: When multiple records are batched together, all records in the
same batch will
+ * receive the same log end offset.
+ *
+ * @return the log end offset, or -1 if not available
+ */
+ public long getLogEndOffset() {
+ return logEndOffset;
+ }
+
+ /** Creates an empty result (for testing purposes). */
+ public static DeleteResult empty() {
+ return new DeleteResult(null, -1);
+ }
}
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/UpsertResult.java
b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/UpsertResult.java
index 99cf4a59e..1ea38848c 100644
---
a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/UpsertResult.java
+++
b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/UpsertResult.java
@@ -18,8 +18,11 @@
package org.apache.fluss.client.table.writer;
import org.apache.fluss.annotation.PublicEvolving;
+import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.row.InternalRow;
+import javax.annotation.Nullable;
+
/**
* The result of upserting a record ({@link UpsertWriter#upsert(InternalRow)}).
*
@@ -27,6 +30,47 @@ import org.apache.fluss.row.InternalRow;
*/
@PublicEvolving
public class UpsertResult {
- // currently, it's an empty class, it will be a compatible evolution if it
extends
- // to have offset, timestamp, etc. in the future
+
+ private final @Nullable TableBucket bucket;
+ private final long logEndOffset;
+
+ /**
+ * Creates a result with bucket and log end offset information.
+ *
+ * @param bucket the bucket this record was written to
+ * @param logEndOffset the log end offset (LEO) after this write, i.e.,
the offset of the next
+ * record to be written
+ */
+ public UpsertResult(@Nullable TableBucket bucket, long logEndOffset) {
+ this.bucket = bucket;
+ this.logEndOffset = logEndOffset;
+ }
+
+ /**
+ * Returns the bucket this record was written to.
+ *
+ * @return the bucket, or null if not available
+ */
+ @Nullable
+ public TableBucket getBucket() {
+ return bucket;
+ }
+
+ /**
+ * Returns the log end offset (LEO) after this write. This is the offset
of the next record to
+ * be written to the changelog, not the offset of this specific record.
+ *
+ * <p>Note: When multiple records are batched together, all records in the
same batch will
+ * receive the same log end offset.
+ *
+ * @return the log end offset, or -1 if not available
+ */
+ public long getLogEndOffset() {
+ return logEndOffset;
+ }
+
+ /** Creates an empty result (for testing purposes). */
+ public static UpsertResult empty() {
+ return new UpsertResult(null, -1);
+ }
}
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/UpsertWriterImpl.java
b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/UpsertWriterImpl.java
index d2f52d5ca..6b7f821a1 100644
---
a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/UpsertWriterImpl.java
+++
b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/UpsertWriterImpl.java
@@ -41,9 +41,8 @@ import java.util.concurrent.CompletableFuture;
/** The writer to write data to the primary key table. */
class UpsertWriterImpl extends AbstractTableWriter implements UpsertWriter {
- private static final UpsertResult UPSERT_SUCCESS = new UpsertResult();
- private static final DeleteResult DELETE_SUCCESS = new DeleteResult();
+ private final TableInfo tableInfo;
private final KeyEncoder primaryKeyEncoder;
private final @Nullable int[] targetColumns;
@@ -54,7 +53,6 @@ class UpsertWriterImpl extends AbstractTableWriter implements
UpsertWriter {
private final WriteFormat writeFormat;
private final RowEncoder rowEncoder;
private final FieldGetter[] fieldGetters;
- private final TableInfo tableInfo;
/** The merge mode for this writer. This controls how the server handles
data merging. */
private final MergeMode mergeMode;
@@ -173,8 +171,9 @@ class UpsertWriterImpl extends AbstractTableWriter
implements UpsertWriter {
* Inserts row into Fluss table if they do not already exist, or updates
them if they do exist.
*
* @param row the row to upsert.
- * @return A {@link CompletableFuture} that always returns null when
complete normally.
+ * @return A {@link CompletableFuture} that returns upsert result with
bucket and offset info.
*/
+ @Override
public CompletableFuture<UpsertResult> upsert(InternalRow row) {
checkFieldCount(row);
byte[] key = primaryKeyEncoder.encodeKey(row);
@@ -190,7 +189,7 @@ class UpsertWriterImpl extends AbstractTableWriter
implements UpsertWriter {
writeFormat,
targetColumns,
mergeMode);
- return send(record).thenApply(ignored -> UPSERT_SUCCESS);
+ return sendWithResult(record, UpsertResult::new);
}
/**
@@ -198,8 +197,9 @@ class UpsertWriterImpl extends AbstractTableWriter
implements UpsertWriter {
* key.
*
* @param row the row to delete.
- * @return A {@link CompletableFuture} that always returns null when
complete normally.
+ * @return A {@link CompletableFuture} that returns delete result with
bucket and offset info.
*/
+ @Override
public CompletableFuture<DeleteResult> delete(InternalRow row) {
checkFieldCount(row);
byte[] key = primaryKeyEncoder.encodeKey(row);
@@ -214,7 +214,7 @@ class UpsertWriterImpl extends AbstractTableWriter
implements UpsertWriter {
writeFormat,
targetColumns,
mergeMode);
- return send(record).thenApply(ignored -> DELETE_SUCCESS);
+ return sendWithResult(record, DeleteResult::new);
}
private BinaryRow encodeRow(InternalRow row) {
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/write/Sender.java
b/fluss-client/src/main/java/org/apache/fluss/client/write/Sender.java
index e9c285306..66b7719b1 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/write/Sender.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/write/Sender.java
@@ -263,6 +263,17 @@ public class Sender implements Runnable {
}
}
+ /** Complete batch with bucket and log end offset info (for KV batches). */
+ private void completeBatch(
+ ReadyWriteBatch readyWriteBatch, TableBucket bucket, long
logEndOffset) {
+ if (idempotenceManager.idempotenceEnabled()) {
+ idempotenceManager.handleCompletedBatch(readyWriteBatch);
+ }
+ if (readyWriteBatch.writeBatch().complete(bucket, logEndOffset)) {
+ maybeRemoveAndDeallocateBatch(readyWriteBatch);
+ }
+ }
+
private void failBatch(
ReadyWriteBatch batch, Exception exception, boolean
adjustBatchSequences) {
if (batch.writeBatch().completeExceptionally(exception)) {
@@ -492,7 +503,10 @@ public class Sender implements Runnable {
writeBatch,
ApiError.fromErrorMessage(respForBucket));
invalidMetadataTablesSet.addAll(invalidMetadataTables);
} else {
- completeBatch(writeBatch);
+ // Get log end offset (LEO) from response for KV batches
+ long logEndOffset =
+ respForBucket.hasLogEndOffset() ?
respForBucket.getLogEndOffset() : -1;
+ completeBatch(writeBatch, tb, logEndOffset);
}
}
metadataUpdater.invalidPhysicalTableBucketMeta(invalidMetadataTablesSet);
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/write/WriteBatch.java
b/fluss-client/src/main/java/org/apache/fluss/client/write/WriteBatch.java
index a35a9f58b..cb83943cf 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/write/WriteBatch.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/write/WriteBatch.java
@@ -21,6 +21,7 @@ import org.apache.fluss.annotation.Internal;
import org.apache.fluss.memory.MemorySegment;
import org.apache.fluss.memory.MemorySegmentPool;
import org.apache.fluss.metadata.PhysicalTablePath;
+import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.record.bytesview.BytesView;
import org.slf4j.Logger;
@@ -145,7 +146,7 @@ public abstract class WriteBatch {
physicalTablePath,
bucketId,
exception);
- completeFutureAndFireCallbacks(exception);
+ completeFutureAndFireCallbacks(null, -1, exception);
}
public boolean sequenceHasBeenReset() {
@@ -176,9 +177,14 @@ public abstract class WriteBatch {
return drainedMs - createdMs;
}
- /** Complete the batch successfully. */
+ /** Complete the batch successfully (for log batches without offset
tracking). */
public boolean complete() {
- return done(null);
+ return done(null, -1, null);
+ }
+
+ /** Complete the batch successfully with bucket and log end offset info
(for KV batches). */
+ public boolean complete(TableBucket bucket, long logEndOffset) {
+ return done(bucket, logEndOffset, null);
}
/**
@@ -187,15 +193,18 @@ public abstract class WriteBatch {
*/
public boolean completeExceptionally(Exception exception) {
checkNotNull(exception);
- return done(exception);
+ return done(null, -1, exception);
}
- private void completeFutureAndFireCallbacks(@Nullable Exception exception)
{
- // execute callbacks.
+ private void completeFutureAndFireCallbacks(
+ @Nullable TableBucket resultBucket,
+ long resultLogEndOffset,
+ @Nullable Exception exception) {
+ // execute callbacks with result info (for KV batches that track log
end offsets)
callbacks.forEach(
callback -> {
try {
- callback.onCompletion(exception);
+ callback.onCompletion(resultBucket,
resultLogEndOffset, exception);
} catch (Exception e) {
LOG.error(
"Error executing user-provided callback on
message for table path '{}'",
@@ -232,8 +241,16 @@ public abstract class WriteBatch {
* called twice when an inflight batch expires before a response from the
tablet server is
* received. The batch's final state is set to FAILED. But it could
succeed on the tablet server
* and second time around batch.done() may try to set SUCCEEDED final
state.
+ *
+ * @param resultBucket the table bucket of the batch written into
+ * @param resultLogEndOffset the log end offset after the batch write
+ * @param batchException the exception if the batch write failed, null
otherwise
+ * @return true if this call set the final state, false if the final state
was already set
*/
- private boolean done(@Nullable Exception batchException) {
+ private boolean done(
+ @Nullable TableBucket resultBucket,
+ long resultLogEndOffset,
+ @Nullable Exception batchException) {
final FinalState tryFinalState =
(batchException == null) ? FinalState.SUCCEEDED :
FinalState.FAILED;
if (tryFinalState == FinalState.SUCCEEDED) {
@@ -243,7 +260,7 @@ public abstract class WriteBatch {
}
if (finalState.compareAndSet(null, tryFinalState)) {
- completeFutureAndFireCallbacks(batchException);
+ completeFutureAndFireCallbacks(resultBucket, resultLogEndOffset,
batchException);
return true;
}
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/write/WriteCallback.java
b/fluss-client/src/main/java/org/apache/fluss/client/write/WriteCallback.java
index a5a6f6f9a..fe3d2c5e2 100644
---
a/fluss-client/src/main/java/org/apache/fluss/client/write/WriteCallback.java
+++
b/fluss-client/src/main/java/org/apache/fluss/client/write/WriteCallback.java
@@ -18,6 +18,7 @@
package org.apache.fluss.client.write;
import org.apache.fluss.annotation.Internal;
+import org.apache.fluss.metadata.TableBucket;
import javax.annotation.Nullable;
@@ -29,5 +30,15 @@ import javax.annotation.Nullable;
@Internal
public interface WriteCallback {
- void onCompletion(@Nullable Exception exception);
+ /**
+ * Called when the write operation completes.
+ *
+ * @param bucket the bucket this record was written to, or null if not
available (e.g., log
+ * tables)
+ * @param logEndOffset the log end offset (LEO) after this write, i.e.,
the offset of the next
+ * record to be written, or -1 if not available
+ * @param exception the exception if the write failed, or null if
successful
+ */
+ void onCompletion(
+ @Nullable TableBucket bucket, long logEndOffset, @Nullable
Exception exception);
}
diff --git
a/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java
b/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java
index 8dcfeb2bb..c6bf09520 100644
---
a/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java
+++
b/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java
@@ -27,7 +27,9 @@ import org.apache.fluss.client.table.scanner.ScanRecord;
import org.apache.fluss.client.table.scanner.log.LogScanner;
import org.apache.fluss.client.table.scanner.log.ScanRecords;
import org.apache.fluss.client.table.writer.AppendWriter;
+import org.apache.fluss.client.table.writer.DeleteResult;
import org.apache.fluss.client.table.writer.TableWriter;
+import org.apache.fluss.client.table.writer.UpsertResult;
import org.apache.fluss.client.table.writer.UpsertWriter;
import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;
@@ -1805,4 +1807,103 @@ class FlussTableITCase extends ClientToServerITCaseBase
{
scanner.close();
}
}
+
+ // ---------------------- Upsert/Delete Result with LogEndOffset tests
----------------------
+
+ @Test
+ void testUpsertAndDeleteReturnLogEndOffset() throws Exception {
+ // Create a PK table with single bucket for predictable offset tracking
+ TablePath tablePath = TablePath.of("test_db_1",
"test_upsert_delete_log_end_offset");
+ Schema schema =
+ Schema.newBuilder()
+ .column("a", DataTypes.INT())
+ .column("b", DataTypes.STRING())
+ .primaryKey("a")
+ .build();
+ TableDescriptor tableDescriptor =
+ TableDescriptor.builder().schema(schema).distributedBy(1,
"a").build();
+ createTable(tablePath, tableDescriptor, true);
+
+ try (Table table = conn.getTable(tablePath)) {
+ UpsertWriter upsertWriter = table.newUpsert().createWriter();
+ TableBucket expectedBucket = new
TableBucket(table.getTableInfo().getTableId(), 0);
+
+ // First upsert - should return log end offset > 0
+ UpsertResult upsertResult1 = upsertWriter.upsert(row(1,
"a")).get();
+ assertThat(upsertResult1.getBucket()).isEqualTo(expectedBucket);
+ assertThat(upsertResult1.getLogEndOffset()).isEqualTo(1);
+
+ // Second upsert - should return higher log end offset
+ UpsertResult upsertResult2 = upsertWriter.upsert(row(2,
"b")).get();
+ assertThat(upsertResult2.getBucket()).isEqualTo(expectedBucket);
+ assertThat(upsertResult2.getLogEndOffset()).isEqualTo(2);
+
+ // Update existing key - should return higher log end offset
+ UpsertResult upsertResult3 = upsertWriter.upsert(row(1,
"aa")).get();
+ assertThat(upsertResult3.getBucket()).isEqualTo(expectedBucket);
+ assertThat(upsertResult3.getLogEndOffset()).isEqualTo(4);
+
+ // Delete - should return higher log end offset
+ DeleteResult deleteResult = upsertWriter.delete(row(1,
"aa")).get();
+ assertThat(deleteResult.getBucket()).isEqualTo(expectedBucket);
+ assertThat(deleteResult.getLogEndOffset()).isEqualTo(5);
+
+ // Verify the data via lookup
+ Lookuper lookuper = table.newLookup().createLookuper();
+ // key 1 should be deleted
+ assertThat(lookupRow(lookuper, row(1))).isNull();
+ // key 2 should exist
+ assertThat(lookupRow(lookuper, row(2))).isNotNull();
+ }
+ }
+
+ @Test
+ void testBatchedUpsertReturnsSameLogEndOffset() throws Exception {
+ // Test that multiple records in the same batch receive the same log
end offset
+ TablePath tablePath = TablePath.of("test_db_1",
"test_batched_upsert_log_end_offset");
+ Schema schema =
+ Schema.newBuilder()
+ .column("a", DataTypes.INT())
+ .column("b", DataTypes.STRING())
+ .primaryKey("a")
+ .build();
+ TableDescriptor tableDescriptor =
+ TableDescriptor.builder().schema(schema).distributedBy(1,
"a").build();
+ long tableId = createTable(tablePath, tableDescriptor, true);
+
+ // Configure a 1MB batch size to ensure records are batched together
+ Configuration config = new Configuration(clientConf);
+ config.set(ConfigOptions.CLIENT_WRITER_BATCH_SIZE, new MemorySize(1024
* 1024)); // 1MB
+ config.set(
+ ConfigOptions.CLIENT_WRITER_MAX_INFLIGHT_REQUESTS_PER_BUCKET,
1); // Force batching
+
+ try (Connection connection =
ConnectionFactory.createConnection(config);
+ Table table = connection.getTable(tablePath)) {
+ UpsertWriter upsertWriter = table.newUpsert().createWriter();
+
+ // Send multiple upserts without waiting - they should be batched
+ CompletableFuture<UpsertResult> future1 =
upsertWriter.upsert(row(1, "a"));
+ CompletableFuture<UpsertResult> future2 =
upsertWriter.upsert(row(2, "b"));
+ CompletableFuture<UpsertResult> future3 =
upsertWriter.upsert(row(3, "c"));
+
+ // Flush to send the batch
+ upsertWriter.flush();
+
+ // Get results
+ UpsertResult result1 = future1.get();
+ UpsertResult result2 = future2.get();
+ UpsertResult result3 = future3.get();
+
+ TableBucket expectedBucket = new TableBucket(tableId, 0);
+ // All results should have valid bucket and log end offset
+ assertThat(result1.getBucket()).isEqualTo(expectedBucket);
+ assertThat(result2.getBucket()).isEqualTo(expectedBucket);
+ assertThat(result3.getBucket()).isEqualTo(expectedBucket);
+ // Records in the same batch should have the same log end offset
+ // (since they're sent to the same bucket)
+ assertThat(result1.getLogEndOffset()).isEqualTo(3);
+ assertThat(result2.getLogEndOffset()).isEqualTo(3);
+ assertThat(result3.getLogEndOffset()).isEqualTo(3);
+ }
+ }
}
diff --git
a/fluss-client/src/test/java/org/apache/fluss/client/write/ArrowLogWriteBatchTest.java
b/fluss-client/src/test/java/org/apache/fluss/client/write/ArrowLogWriteBatchTest.java
index cb76d7688..afdef5616 100644
---
a/fluss-client/src/test/java/org/apache/fluss/client/write/ArrowLogWriteBatchTest.java
+++
b/fluss-client/src/test/java/org/apache/fluss/client/write/ArrowLogWriteBatchTest.java
@@ -253,7 +253,7 @@ public class ArrowLogWriteBatchTest {
CompletableFuture<Void> future = new CompletableFuture<>();
arrowLogWriteBatch.tryAppend(
createWriteRecord(row(i, "a" + i)),
- exception -> {
+ (bucket, offset, exception) -> {
if (exception != null) {
future.completeExceptionally(exception);
} else {
@@ -314,7 +314,7 @@ public class ArrowLogWriteBatchTest {
}
private WriteCallback newWriteCallback() {
- return exception -> {
+ return (bucket, offset, exception) -> {
if (exception != null) {
throw new RuntimeException(exception);
}
diff --git
a/fluss-client/src/test/java/org/apache/fluss/client/write/CompactedLogWriteBatchTest.java
b/fluss-client/src/test/java/org/apache/fluss/client/write/CompactedLogWriteBatchTest.java
index cbd6680a7..4f8b37919 100644
---
a/fluss-client/src/test/java/org/apache/fluss/client/write/CompactedLogWriteBatchTest.java
+++
b/fluss-client/src/test/java/org/apache/fluss/client/write/CompactedLogWriteBatchTest.java
@@ -163,7 +163,7 @@ public class CompactedLogWriteBatchTest {
CompletableFuture<Void> future = new CompletableFuture<>();
logProducerBatch.tryAppend(
createWriteRecord(),
- exception -> {
+ (bucket, offset, exception) -> {
if (exception != null) {
future.completeExceptionally(exception);
} else {
@@ -236,7 +236,7 @@ public class CompactedLogWriteBatchTest {
}
private WriteCallback newWriteCallback() {
- return exception -> {
+ return (bucket, offset, exception) -> {
if (exception != null) {
throw new RuntimeException(exception);
}
diff --git
a/fluss-client/src/test/java/org/apache/fluss/client/write/IndexedLogWriteBatchTest.java
b/fluss-client/src/test/java/org/apache/fluss/client/write/IndexedLogWriteBatchTest.java
index aca1cde1b..683013b89 100644
---
a/fluss-client/src/test/java/org/apache/fluss/client/write/IndexedLogWriteBatchTest.java
+++
b/fluss-client/src/test/java/org/apache/fluss/client/write/IndexedLogWriteBatchTest.java
@@ -167,7 +167,7 @@ public class IndexedLogWriteBatchTest {
CompletableFuture<Void> future = new CompletableFuture<>();
logProducerBatch.tryAppend(
createWriteRecord(),
- exception -> {
+ (bucket, offset, exception) -> {
if (exception != null) {
future.completeExceptionally(exception);
} else {
@@ -240,7 +240,7 @@ public class IndexedLogWriteBatchTest {
}
private WriteCallback newWriteCallback() {
- return exception -> {
+ return (bucket, offset, exception) -> {
if (exception != null) {
throw new RuntimeException(exception);
}
diff --git
a/fluss-client/src/test/java/org/apache/fluss/client/write/KvWriteBatchTest.java
b/fluss-client/src/test/java/org/apache/fluss/client/write/KvWriteBatchTest.java
index 3fe428388..e2340d170 100644
---
a/fluss-client/src/test/java/org/apache/fluss/client/write/KvWriteBatchTest.java
+++
b/fluss-client/src/test/java/org/apache/fluss/client/write/KvWriteBatchTest.java
@@ -168,7 +168,7 @@ class KvWriteBatchTest {
CompletableFuture<Void> future = new CompletableFuture<>();
kvProducerBatch.tryAppend(
createWriteRecord(),
- exception -> {
+ (bucket, offset, exception) -> {
if (exception != null) {
future.completeExceptionally(exception);
} else {
@@ -233,7 +233,7 @@ class KvWriteBatchTest {
}
private WriteCallback newWriteCallback() {
- return exception -> {
+ return (bucket, offset, exception) -> {
if (exception != null) {
throw new RuntimeException(exception);
}
diff --git
a/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java
b/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java
index efebb3278..d3f772f38 100644
---
a/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java
+++
b/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java
@@ -121,7 +121,7 @@ class RecordAccumulatorTest {
DATA1_PHYSICAL_TABLE_PATH, DATA1_TABLE_ID, 3, node2.id(),
serverNodes);
private final WriteCallback writeCallback =
- exception -> {
+ (bucket, offset, exception) -> {
if (exception != null) {
throw new RuntimeException(exception);
}
diff --git
a/fluss-client/src/test/java/org/apache/fluss/client/write/SenderTest.java
b/fluss-client/src/test/java/org/apache/fluss/client/write/SenderTest.java
index b0f61c9ea..c32b3f34e 100644
--- a/fluss-client/src/test/java/org/apache/fluss/client/write/SenderTest.java
+++ b/fluss-client/src/test/java/org/apache/fluss/client/write/SenderTest.java
@@ -105,7 +105,7 @@ final class SenderTest {
void testSimple() throws Exception {
long offset = 0;
CompletableFuture<Exception> future = new CompletableFuture<>();
- appendToAccumulator(tb1, row(1, "a"), future::complete);
+ appendToAccumulator(tb1, row(1, "a"), (tb, leo, e) ->
future.complete(e));
sender.runOnce();
assertThat(sender.numOfInFlightBatches(tb1)).isEqualTo(1);
finishRequest(tb1, 0, createProduceLogResponse(tb1, offset, 1));
@@ -129,7 +129,7 @@ final class SenderTest {
0);
// do a successful retry.
CompletableFuture<Exception> future = new CompletableFuture<>();
- appendToAccumulator(tb1, row(1, "a"), future::complete);
+ appendToAccumulator(tb1, row(1, "a"), (tb, leo, e) ->
future.complete(e));
sender1.runOnce();
assertThat(sender1.numOfInFlightBatches(tb1)).isEqualTo(1);
long offset = 0;
@@ -140,8 +140,8 @@ final class SenderTest {
assertThat(future.get()).isNull();
// do an unsuccessful retry.
- future = new CompletableFuture<>();
- appendToAccumulator(tb1, row(1, "a"), future::complete);
+ CompletableFuture<Exception> future2 = new CompletableFuture<>();
+ appendToAccumulator(tb1, row(1, "a"), (tb, leo, e) ->
future2.complete(e));
sender1.runOnce();
assertThat(sender1.numOfInFlightBatches(tb1)).isEqualTo(1);
@@ -155,7 +155,7 @@ final class SenderTest {
finishRequest(tb1, 0, createProduceLogResponse(tb1,
Errors.REQUEST_TIME_OUT));
sender1.runOnce();
assertThat(sender1.numOfInFlightBatches(tb1)).isEqualTo(0);
- assertThat(future.get())
+ assertThat(future2.get())
.isInstanceOf(TimeoutException.class)
.hasMessageContaining(Errors.REQUEST_TIME_OUT.message());
}
@@ -174,7 +174,7 @@ final class SenderTest {
void testCanRetryWithoutIdempotence() throws Exception {
// do a successful retry.
CompletableFuture<Exception> future = new CompletableFuture<>();
- appendToAccumulator(tb1, row(1, "a"), future::complete);
+ appendToAccumulator(tb1, row(1, "a"), (tb, leo, e) ->
future.complete(e));
sender.runOnce();
assertThat(sender.numOfInFlightBatches(tb1)).isEqualTo(1);
assertThat(future.isDone()).isFalse();
@@ -204,14 +204,14 @@ final class SenderTest {
// Send first ProduceLogRequest.
CompletableFuture<Exception> future1 = new CompletableFuture<>();
- appendToAccumulator(tb1, row(1, "a"), future1::complete);
+ appendToAccumulator(tb1, row(1, "a"), (tb, leo, e) ->
future1.complete(e));
sender1.runOnce();
assertThat(idempotenceManager.nextSequence(tb1)).isEqualTo(1);
assertThat(idempotenceManager.lastAckedBatchSequence(tb1)).isNotPresent();
// Send second ProduceLogRequest.
CompletableFuture<Exception> future2 = new CompletableFuture<>();
- appendToAccumulator(tb1, row(1, "a"), future2::complete);
+ appendToAccumulator(tb1, row(1, "a"), (tb, leo, e) ->
future2.complete(e));
sender1.runOnce();
assertThat(idempotenceManager.nextSequence(tb1)).isEqualTo(2);
assertThat(idempotenceManager.lastAckedBatchSequence(tb1)).isNotPresent();
@@ -244,7 +244,7 @@ final class SenderTest {
for (int i = 0; i < MAX_INFLIGHT_REQUEST_PER_BUCKET - 1; i++) {
CompletableFuture<Exception> future = new CompletableFuture<>();
- appendToAccumulator(tb1, row(1, "a"), future::complete);
+ appendToAccumulator(tb1, row(1, "a"), (tb, leo, e) ->
future.complete(e));
sender1.runOnce();
assertThat(idempotenceManager.inflightBatchSize(tb1)).isEqualTo(i
+ 1);
assertThat(idempotenceManager.canSendMoreRequests(tb1)).isTrue();
@@ -252,7 +252,7 @@ final class SenderTest {
// add one batch to make the inflight request size equal to max.
CompletableFuture<Exception> future = new CompletableFuture<>();
- appendToAccumulator(tb1, row(1, "a"), future::complete);
+ appendToAccumulator(tb1, row(1, "a"), (tb, leo, e) ->
future.complete(e));
sender1.runOnce();
assertThat(idempotenceManager.inflightBatchSize(tb1))
.isEqualTo(MAX_INFLIGHT_REQUEST_PER_BUCKET);
@@ -260,7 +260,7 @@ final class SenderTest {
// add one more batch, it will not be drained from accumulator.
CompletableFuture<Exception> future1 = new CompletableFuture<>();
- appendToAccumulator(tb1, row(1, "a"), future1::complete);
+ appendToAccumulator(tb1, row(1, "a"), (tb, leo, e) ->
future1.complete(e));
sender1.runOnce();
assertThat(idempotenceManager.inflightBatchSize(tb1))
.isEqualTo(MAX_INFLIGHT_REQUEST_PER_BUCKET);
@@ -288,7 +288,7 @@ final class SenderTest {
// 1. send five batches first to fill MAX_INFLIGHT_REQUEST_PER_BUCKET.
for (int i = 0; i < MAX_INFLIGHT_REQUEST_PER_BUCKET; i++) {
CompletableFuture<Exception> future = new CompletableFuture<>();
- appendToAccumulator(tb1, row(1, "a"), future::complete);
+ appendToAccumulator(tb1, row(1, "a"), (tb, leo, e) ->
future.complete(e));
assertThat(idempotenceManager.canSendMoreRequests(tb1)).isTrue();
sender.runOnce(); // runOnce to send request.
}
@@ -298,7 +298,7 @@ final class SenderTest {
// 2. try to append more data into accumulator, it will not be drained
from accumulator.
for (int i = 0; i < 1000; i++) {
CompletableFuture<Exception> future = new CompletableFuture<>();
- appendToAccumulator(tb1, row(1, "a"), future::complete);
+ appendToAccumulator(tb1, row(1, "a"), (tb, leo, e) ->
future.complete(e));
}
// No batches can be drained from accumulator as the inflight request
size is max in
// IdempotenceManager.
@@ -326,7 +326,7 @@ final class SenderTest {
// empty.
for (int j = 0; j < 50; j++) {
CompletableFuture<Exception> future = new
CompletableFuture<>();
- appendToAccumulator(tb1, row(1, "a"), future::complete);
+ appendToAccumulator(tb1, row(1, "a"), (tb, leo, e) ->
future.complete(e));
}
// already have five batches in idempotenceManager inflight
batches.
@@ -359,19 +359,19 @@ final class SenderTest {
// Send first ProduceLogRequest.
CompletableFuture<Exception> future1 = new CompletableFuture<>();
- appendToAccumulator(tb1, row(1, "a"), future1::complete);
+ appendToAccumulator(tb1, row(1, "a"), (tb, leo, e) ->
future1.complete(e));
sender1.runOnce();
assertThat(idempotenceManager.nextSequence(tb1)).isEqualTo(1);
assertThat(idempotenceManager.lastAckedBatchSequence(tb1)).isNotPresent();
// Send second ProduceLogRequest.
CompletableFuture<Exception> future2 = new CompletableFuture<>();
- appendToAccumulator(tb1, row(1, "a"), future2::complete);
+ appendToAccumulator(tb1, row(1, "a"), (tb, leo, e) ->
future2.complete(e));
sender1.runOnce();
// Send third ProduceLogRequest.
CompletableFuture<Exception> future3 = new CompletableFuture<>();
- appendToAccumulator(tb1, row(1, "a"), future3::complete);
+ appendToAccumulator(tb1, row(1, "a"), (tb, leo, e) ->
future3.complete(e));
sender1.runOnce();
// finish batch one with retriable error.
@@ -381,7 +381,7 @@ final class SenderTest {
// Queue the forth request, it shouldn't sent until the first 3
complete.
CompletableFuture<Exception> future4 = new CompletableFuture<>();
- appendToAccumulator(tb1, row(1, "a"), future4::complete);
+ appendToAccumulator(tb1, row(1, "a"), (tb, leo, e) ->
future4.complete(e));
assertThat(idempotenceManager.lastAckedBatchSequence(tb1)).isNotPresent();
finishIdempotentProduceLogRequest(
@@ -445,11 +445,11 @@ final class SenderTest {
// Send the first ProduceLogRequest.
CompletableFuture<Exception> future1 = new CompletableFuture<>();
- appendToAccumulator(tb1, row(1, "a"), future1::complete);
+ appendToAccumulator(tb1, row(1, "a"), (tb, leo, e) ->
future1.complete(e));
sender1.runOnce();
// Send the second ProduceLogRequest.
CompletableFuture<Exception> future2 = new CompletableFuture<>();
- appendToAccumulator(tb1, row(1, "a"), future2::complete);
+ appendToAccumulator(tb1, row(1, "a"), (tb, leo, e) ->
future2.complete(e));
sender1.runOnce();
// response 0 with retrievable error which will reEnqueue the batch.
@@ -495,14 +495,14 @@ final class SenderTest {
// Send first ProduceLogRequest.
CompletableFuture<Exception> future1 = new CompletableFuture<>();
- appendToAccumulator(tb1, row(1, "a"), future1::complete);
+ appendToAccumulator(tb1, row(1, "a"), (tb, leo, e) ->
future1.complete(e));
sender1.runOnce();
assertThat(idempotenceManager.nextSequence(tb1)).isEqualTo(1);
assertThat(idempotenceManager.lastAckedBatchSequence(tb1)).isNotPresent();
// Send second ProduceLogRequest.
CompletableFuture<Exception> future2 = new CompletableFuture<>();
- appendToAccumulator(tb1, row(1, "a"), future2::complete);
+ appendToAccumulator(tb1, row(1, "a"), (tb, leo, e) ->
future2.complete(e));
sender1.runOnce();
assertThat(idempotenceManager.nextSequence(tb1)).isEqualTo(2);
assertThat(idempotenceManager.lastAckedBatchSequence(tb1)).isNotPresent();
@@ -564,14 +564,14 @@ final class SenderTest {
// Send first ProduceLogRequest.
CompletableFuture<Exception> future1 = new CompletableFuture<>();
- appendToAccumulator(tb1, row(1, "a"), future1::complete);
+ appendToAccumulator(tb1, row(1, "a"), (tb, leo, e) ->
future1.complete(e));
sender1.runOnce();
assertThat(idempotenceManager.nextSequence(tb1)).isEqualTo(1);
assertThat(idempotenceManager.lastAckedBatchSequence(tb1)).isNotPresent();
// Send second ProduceLogRequest.
CompletableFuture<Exception> future2 = new CompletableFuture<>();
- appendToAccumulator(tb1, row(1, "a"), future2::complete);
+ appendToAccumulator(tb1, row(1, "a"), (tb, leo, e) ->
future2.complete(e));
sender1.runOnce();
assertThat(idempotenceManager.nextSequence(tb1)).isEqualTo(2);
assertThat(idempotenceManager.lastAckedBatchSequence(tb1)).isNotPresent();
@@ -619,14 +619,14 @@ final class SenderTest {
// first finish second ProduceLogRequest with success.
CompletableFuture<Exception> future1 = new CompletableFuture<>();
- appendToAccumulator(tb1, row(1, "a"), future1::complete);
+ appendToAccumulator(tb1, row(1, "a"), (tb, leo, e) ->
future1.complete(e));
sender1.runOnce();
assertThat(idempotenceManager.nextSequence(tb1)).isEqualTo(1);
assertThat(idempotenceManager.lastAckedBatchSequence(tb1)).isNotPresent();
// Send second ProduceLogRequest.
CompletableFuture<Exception> future2 = new CompletableFuture<>();
- appendToAccumulator(tb1, row(1, "a"), future2::complete);
+ appendToAccumulator(tb1, row(1, "a"), (tb, leo, e) ->
future2.complete(e));
sender1.runOnce();
assertThat(idempotenceManager.nextSequence(tb1)).isEqualTo(2);
assertThat(idempotenceManager.lastAckedBatchSequence(tb1)).isNotPresent();
@@ -657,7 +657,7 @@ final class SenderTest {
assertThat(idempotenceManager.nextSequence(tb1)).isEqualTo(0);
CompletableFuture<Exception> future1 = new CompletableFuture<>();
- appendToAccumulator(tb1, row(1, "a"), future1::complete);
+ appendToAccumulator(tb1, row(1, "a"), (tb, leo, e) ->
future1.complete(e));
sender1.runOnce();
finishIdempotentProduceLogRequest(0, tb1, 0,
createProduceLogResponse(tb1, 0L, 1L));
sender1.runOnce();
@@ -671,7 +671,7 @@ final class SenderTest {
void testSendWhenDestinationIsNullInMetadata() throws Exception {
long offset = 0;
CompletableFuture<Exception> future = new CompletableFuture<>();
- appendToAccumulator(tb1, row(1, "a"), future::complete);
+ appendToAccumulator(tb1, row(1, "a"), (tb, leo, e) ->
future.complete(e));
int leaderNode = metadataUpdater.leaderFor(DATA1_TABLE_PATH, tb1);
// now, remove leader node ,so that send destination
@@ -730,7 +730,7 @@ final class SenderTest {
key,
WriteFormat.COMPACTED_KV,
null),
- future::complete,
+ (tb, leo, e) -> future.complete(e),
metadataUpdater.getCluster(),
0,
false);
diff --git a/fluss-rpc/src/main/proto/FlussApi.proto
b/fluss-rpc/src/main/proto/FlussApi.proto
index d800549d8..dc05efd83 100644
--- a/fluss-rpc/src/main/proto/FlussApi.proto
+++ b/fluss-rpc/src/main/proto/FlussApi.proto
@@ -787,6 +787,9 @@ message PbPutKvRespForBucket {
required int32 bucket_id = 2;
optional int32 error_code = 3;
optional string error_message = 4;
+ // The log end offset (LEO) of the changelog after this write, i.e. the
+ // offset of the next record to be written. Used for exactly-once semantics.
+ optional int64 log_end_offset = 5;
}
message PbLookupReqForBucket {
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java
b/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java
index a7bed51b8..4e4e59908 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java
@@ -1026,6 +1026,13 @@ public class ServerRpcMessageUtils {
if (bucketResult.failed()) {
putKvBucket.setError(bucketResult.getErrorCode(),
bucketResult.getErrorMessage());
+ } else {
+ // Set the log end offset for successful writes (skipped when negative);
+ // it is used to track checkpoint offsets for exactly-once semantics.
+ long logEndOffset = bucketResult.getWriteLogEndOffset();
+ if (logEndOffset >= 0) {
+ putKvBucket.setLogEndOffset(logEndOffset);
+ }
}
putKvRespForBucketList.add(putKvBucket);
}