This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 2671af6c1 [kv] Return log end offset in UpsertResult and DeleteResult 
for exactly-once semantics (#2530)
2671af6c1 is described below

commit 2671af6c198715309c371e778709f79f40ce1eab
Author: Yang Wang <[email protected]>
AuthorDate: Sun Feb 1 21:18:52 2026 +0800

    [kv] Return log end offset in UpsertResult and DeleteResult for 
exactly-once semantics (#2530)
---
 .../client/table/writer/AbstractTableWriter.java   |  33 ++++++-
 .../fluss/client/table/writer/DeleteResult.java    |  50 +++++++++-
 .../fluss/client/table/writer/UpsertResult.java    |  48 +++++++++-
 .../client/table/writer/UpsertWriterImpl.java      |  14 +--
 .../java/org/apache/fluss/client/write/Sender.java |  16 +++-
 .../org/apache/fluss/client/write/WriteBatch.java  |  35 +++++--
 .../apache/fluss/client/write/WriteCallback.java   |  13 ++-
 .../fluss/client/table/FlussTableITCase.java       | 101 +++++++++++++++++++++
 .../fluss/client/write/ArrowLogWriteBatchTest.java |   4 +-
 .../client/write/CompactedLogWriteBatchTest.java   |   4 +-
 .../client/write/IndexedLogWriteBatchTest.java     |   4 +-
 .../fluss/client/write/KvWriteBatchTest.java       |   4 +-
 .../fluss/client/write/RecordAccumulatorTest.java  |   2 +-
 .../org/apache/fluss/client/write/SenderTest.java  |  58 ++++++------
 fluss-rpc/src/main/proto/FlussApi.proto            |   3 +
 .../fluss/server/utils/ServerRpcMessageUtils.java  |   7 ++
 16 files changed, 334 insertions(+), 62 deletions(-)

diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/AbstractTableWriter.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/AbstractTableWriter.java
index 98a93fdd2..96739d327 100644
--- 
a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/AbstractTableWriter.java
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/AbstractTableWriter.java
@@ -22,6 +22,7 @@ import org.apache.fluss.client.write.WriteRecord;
 import org.apache.fluss.client.write.WriterClient;
 import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.metadata.PhysicalTablePath;
+import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.metadata.TableInfo;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.row.InternalRow;
@@ -65,7 +66,7 @@ public abstract class AbstractTableWriter implements 
TableWriter {
         CompletableFuture<Void> future = new CompletableFuture<>();
         writerClient.send(
                 record,
-                (exception) -> {
+                (bucket, offset, exception) -> {
                     if (exception == null) {
                         future.complete(null);
                     } else {
@@ -75,6 +76,36 @@ public abstract class AbstractTableWriter implements 
TableWriter {
         return future;
     }
 
+    /**
+     * Send a record and return a future with a result containing bucket and 
log end offset. This is
+     * used by KV writers that need to track offsets for exactly-once 
semantics.
+     *
+     * @param record the record to send
+     * @param resultFactory factory to create the result from bucket and log 
end offset
+     * @param <T> the result type
+     * @return a future that completes with the result
+     */
+    protected <T> CompletableFuture<T> sendWithResult(
+            WriteRecord record, ResultFactory<T> resultFactory) {
+        CompletableFuture<T> future = new CompletableFuture<>();
+        writerClient.send(
+                record,
+                (bucket, logEndOffset, exception) -> {
+                    if (exception == null) {
+                        future.complete(resultFactory.create(bucket, 
logEndOffset));
+                    } else {
+                        future.completeExceptionally(exception);
+                    }
+                });
+        return future;
+    }
+
+    /** Factory interface for creating result objects from bucket and log end 
offset. */
+    @FunctionalInterface
+    protected interface ResultFactory<T> {
+        T create(TableBucket bucket, long logEndOffset);
+    }
+
     protected PhysicalTablePath getPhysicalPath(InternalRow row) {
         // not partitioned table, return the original physical path
         if (partitionFieldGetter == null) {
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/DeleteResult.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/DeleteResult.java
index 0fefa33cb..56e05933d 100644
--- 
a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/DeleteResult.java
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/DeleteResult.java
@@ -18,15 +18,59 @@
 package org.apache.fluss.client.table.writer;
 
 import org.apache.fluss.annotation.PublicEvolving;
+import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.row.InternalRow;
 
+import javax.annotation.Nullable;
+
 /**
- * The result of deleting a record ({@link UpsertWriter#delete(InternalRow)}.
+ * The result of deleting a record ({@link UpsertWriter#delete(InternalRow)}).
  *
  * @since 0.6
  */
 @PublicEvolving
 public class DeleteResult {
-    // currently, it's an empty class, it will be a compatible evolution if it 
extends
-    // to have offset, timestamp, etc. in the future
+
+    private final @Nullable TableBucket bucket;
+    private final long logEndOffset;
+
+    /**
+     * Creates a result with bucket and log end offset information.
+     *
+     * @param bucket the bucket this record was deleted from
+     * @param logEndOffset the log end offset (LEO) after this delete, i.e., 
the offset of the next
+     *     record to be written
+     */
+    public DeleteResult(@Nullable TableBucket bucket, long logEndOffset) {
+        this.bucket = bucket;
+        this.logEndOffset = logEndOffset;
+    }
+
+    /**
+     * Returns the bucket this record was deleted from.
+     *
+     * @return the bucket, or null if not available
+     */
+    @Nullable
+    public TableBucket getBucket() {
+        return bucket;
+    }
+
+    /**
+     * Returns the log end offset (LEO) after this delete. This is the offset 
of the next record to
+     * be written to the changelog, not the offset of this specific record.
+     *
+     * <p>Note: When multiple records are batched together, all records in the 
same batch will
+     * receive the same log end offset.
+     *
+     * @return the log end offset, or -1 if not available
+     */
+    public long getLogEndOffset() {
+        return logEndOffset;
+    }
+
+    /** Creates an empty result (for testing purposes). */
+    public static DeleteResult empty() {
+        return new DeleteResult(null, -1);
+    }
 }
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/UpsertResult.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/UpsertResult.java
index 99cf4a59e..1ea38848c 100644
--- 
a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/UpsertResult.java
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/UpsertResult.java
@@ -18,8 +18,11 @@
 package org.apache.fluss.client.table.writer;
 
 import org.apache.fluss.annotation.PublicEvolving;
+import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.row.InternalRow;
 
+import javax.annotation.Nullable;
+
 /**
  * The result of upserting a record ({@link UpsertWriter#upsert(InternalRow)}).
  *
@@ -27,6 +30,47 @@ import org.apache.fluss.row.InternalRow;
  */
 @PublicEvolving
 public class UpsertResult {
-    // currently, it's an empty class, it will be a compatible evolution if it 
extends
-    // to have offset, timestamp, etc. in the future
+
+    private final @Nullable TableBucket bucket;
+    private final long logEndOffset;
+
+    /**
+     * Creates a result with bucket and log end offset information.
+     *
+     * @param bucket the bucket this record was written to
+     * @param logEndOffset the log end offset (LEO) after this write, i.e., 
the offset of the next
+     *     record to be written
+     */
+    public UpsertResult(@Nullable TableBucket bucket, long logEndOffset) {
+        this.bucket = bucket;
+        this.logEndOffset = logEndOffset;
+    }
+
+    /**
+     * Returns the bucket this record was written to.
+     *
+     * @return the bucket, or null if not available
+     */
+    @Nullable
+    public TableBucket getBucket() {
+        return bucket;
+    }
+
+    /**
+     * Returns the log end offset (LEO) after this write. This is the offset 
of the next record to
+     * be written to the changelog, not the offset of this specific record.
+     *
+     * <p>Note: When multiple records are batched together, all records in the 
same batch will
+     * receive the same log end offset.
+     *
+     * @return the log end offset, or -1 if not available
+     */
+    public long getLogEndOffset() {
+        return logEndOffset;
+    }
+
+    /** Creates an empty result (for testing purposes). */
+    public static UpsertResult empty() {
+        return new UpsertResult(null, -1);
+    }
 }
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/UpsertWriterImpl.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/UpsertWriterImpl.java
index d2f52d5ca..6b7f821a1 100644
--- 
a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/UpsertWriterImpl.java
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/UpsertWriterImpl.java
@@ -41,9 +41,8 @@ import java.util.concurrent.CompletableFuture;
 
 /** The writer to write data to the primary key table. */
 class UpsertWriterImpl extends AbstractTableWriter implements UpsertWriter {
-    private static final UpsertResult UPSERT_SUCCESS = new UpsertResult();
-    private static final DeleteResult DELETE_SUCCESS = new DeleteResult();
 
+    private final TableInfo tableInfo;
     private final KeyEncoder primaryKeyEncoder;
     private final @Nullable int[] targetColumns;
 
@@ -54,7 +53,6 @@ class UpsertWriterImpl extends AbstractTableWriter implements 
UpsertWriter {
     private final WriteFormat writeFormat;
     private final RowEncoder rowEncoder;
     private final FieldGetter[] fieldGetters;
-    private final TableInfo tableInfo;
 
     /** The merge mode for this writer. This controls how the server handles 
data merging. */
     private final MergeMode mergeMode;
@@ -173,8 +171,9 @@ class UpsertWriterImpl extends AbstractTableWriter 
implements UpsertWriter {
      * Inserts row into Fluss table if they do not already exist, or updates 
them if they do exist.
      *
      * @param row the row to upsert.
-     * @return A {@link CompletableFuture} that always returns null when 
complete normally.
+     * @return A {@link CompletableFuture} that returns upsert result with 
bucket and offset info.
      */
+    @Override
     public CompletableFuture<UpsertResult> upsert(InternalRow row) {
         checkFieldCount(row);
         byte[] key = primaryKeyEncoder.encodeKey(row);
@@ -190,7 +189,7 @@ class UpsertWriterImpl extends AbstractTableWriter 
implements UpsertWriter {
                         writeFormat,
                         targetColumns,
                         mergeMode);
-        return send(record).thenApply(ignored -> UPSERT_SUCCESS);
+        return sendWithResult(record, UpsertResult::new);
     }
 
     /**
@@ -198,8 +197,9 @@ class UpsertWriterImpl extends AbstractTableWriter 
implements UpsertWriter {
      * key.
      *
      * @param row the row to delete.
-     * @return A {@link CompletableFuture} that always returns null when 
complete normally.
+     * @return A {@link CompletableFuture} that returns delete result with 
bucket and offset info.
      */
+    @Override
     public CompletableFuture<DeleteResult> delete(InternalRow row) {
         checkFieldCount(row);
         byte[] key = primaryKeyEncoder.encodeKey(row);
@@ -214,7 +214,7 @@ class UpsertWriterImpl extends AbstractTableWriter 
implements UpsertWriter {
                         writeFormat,
                         targetColumns,
                         mergeMode);
-        return send(record).thenApply(ignored -> DELETE_SUCCESS);
+        return sendWithResult(record, DeleteResult::new);
     }
 
     private BinaryRow encodeRow(InternalRow row) {
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/write/Sender.java 
b/fluss-client/src/main/java/org/apache/fluss/client/write/Sender.java
index e9c285306..66b7719b1 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/write/Sender.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/write/Sender.java
@@ -263,6 +263,17 @@ public class Sender implements Runnable {
         }
     }
 
+    /** Complete batch with bucket and log end offset info (for KV batches). */
+    private void completeBatch(
+            ReadyWriteBatch readyWriteBatch, TableBucket bucket, long 
logEndOffset) {
+        if (idempotenceManager.idempotenceEnabled()) {
+            idempotenceManager.handleCompletedBatch(readyWriteBatch);
+        }
+        if (readyWriteBatch.writeBatch().complete(bucket, logEndOffset)) {
+            maybeRemoveAndDeallocateBatch(readyWriteBatch);
+        }
+    }
+
     private void failBatch(
             ReadyWriteBatch batch, Exception exception, boolean 
adjustBatchSequences) {
         if (batch.writeBatch().completeExceptionally(exception)) {
@@ -492,7 +503,10 @@ public class Sender implements Runnable {
                                 writeBatch, 
ApiError.fromErrorMessage(respForBucket));
                 invalidMetadataTablesSet.addAll(invalidMetadataTables);
             } else {
-                completeBatch(writeBatch);
+                // Get log end offset (LEO) from response for KV batches
+                long logEndOffset =
+                        respForBucket.hasLogEndOffset() ? 
respForBucket.getLogEndOffset() : -1;
+                completeBatch(writeBatch, tb, logEndOffset);
             }
         }
         
metadataUpdater.invalidPhysicalTableBucketMeta(invalidMetadataTablesSet);
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/write/WriteBatch.java 
b/fluss-client/src/main/java/org/apache/fluss/client/write/WriteBatch.java
index a35a9f58b..cb83943cf 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/write/WriteBatch.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/write/WriteBatch.java
@@ -21,6 +21,7 @@ import org.apache.fluss.annotation.Internal;
 import org.apache.fluss.memory.MemorySegment;
 import org.apache.fluss.memory.MemorySegmentPool;
 import org.apache.fluss.metadata.PhysicalTablePath;
+import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.record.bytesview.BytesView;
 
 import org.slf4j.Logger;
@@ -145,7 +146,7 @@ public abstract class WriteBatch {
                 physicalTablePath,
                 bucketId,
                 exception);
-        completeFutureAndFireCallbacks(exception);
+        completeFutureAndFireCallbacks(null, -1, exception);
     }
 
     public boolean sequenceHasBeenReset() {
@@ -176,9 +177,14 @@ public abstract class WriteBatch {
         return drainedMs - createdMs;
     }
 
-    /** Complete the batch successfully. */
+    /** Complete the batch successfully (for log batches without offset 
tracking). */
     public boolean complete() {
-        return done(null);
+        return done(null, -1, null);
+    }
+
+    /** Complete the batch successfully with bucket and log end offset info 
(for KV batches). */
+    public boolean complete(TableBucket bucket, long logEndOffset) {
+        return done(bucket, logEndOffset, null);
     }
 
     /**
@@ -187,15 +193,18 @@ public abstract class WriteBatch {
      */
     public boolean completeExceptionally(Exception exception) {
         checkNotNull(exception);
-        return done(exception);
+        return done(null, -1, exception);
     }
 
-    private void completeFutureAndFireCallbacks(@Nullable Exception exception) 
{
-        // execute callbacks.
+    private void completeFutureAndFireCallbacks(
+            @Nullable TableBucket resultBucket,
+            long resultLogEndOffset,
+            @Nullable Exception exception) {
+        // execute callbacks with result info (for KV batches that track log 
end offsets)
         callbacks.forEach(
                 callback -> {
                     try {
-                        callback.onCompletion(exception);
+                        callback.onCompletion(resultBucket, 
resultLogEndOffset, exception);
                     } catch (Exception e) {
                         LOG.error(
                                 "Error executing user-provided callback on 
message for table path '{}'",
@@ -232,8 +241,16 @@ public abstract class WriteBatch {
      * called twice when an inflight batch expires before a response from the 
tablet server is
      * received. The batch's final state is set to FAILED. But it could 
succeed on the tablet server
      * and second time around batch.done() may try to set SUCCEEDED final 
state.
+     *
+     * @param resultBucket the table bucket of the batch written into
+     * @param resultLogEndOffset the log end offset after the batch write
+     * @param batchException the exception if the batch write failed, null 
otherwise
+     * @return true if this call set the final state, false if the final state 
was already set
      */
-    private boolean done(@Nullable Exception batchException) {
+    private boolean done(
+            @Nullable TableBucket resultBucket,
+            long resultLogEndOffset,
+            @Nullable Exception batchException) {
         final FinalState tryFinalState =
                 (batchException == null) ? FinalState.SUCCEEDED : 
FinalState.FAILED;
         if (tryFinalState == FinalState.SUCCEEDED) {
@@ -243,7 +260,7 @@ public abstract class WriteBatch {
         }
 
         if (finalState.compareAndSet(null, tryFinalState)) {
-            completeFutureAndFireCallbacks(batchException);
+            completeFutureAndFireCallbacks(resultBucket, resultLogEndOffset, 
batchException);
             return true;
         }
 
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/write/WriteCallback.java 
b/fluss-client/src/main/java/org/apache/fluss/client/write/WriteCallback.java
index a5a6f6f9a..fe3d2c5e2 100644
--- 
a/fluss-client/src/main/java/org/apache/fluss/client/write/WriteCallback.java
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/write/WriteCallback.java
@@ -18,6 +18,7 @@
 package org.apache.fluss.client.write;
 
 import org.apache.fluss.annotation.Internal;
+import org.apache.fluss.metadata.TableBucket;
 
 import javax.annotation.Nullable;
 
@@ -29,5 +30,15 @@ import javax.annotation.Nullable;
 @Internal
 public interface WriteCallback {
 
-    void onCompletion(@Nullable Exception exception);
+    /**
+     * Called when the write operation completes.
+     *
+     * @param bucket the bucket this record was written to, or null if not 
available (e.g., log
+     *     tables)
+     * @param logEndOffset the log end offset (LEO) after this write, i.e., 
the offset of the next
+     *     record to be written, or -1 if not available
+     * @param exception the exception if the write failed, or null if 
successful
+     */
+    void onCompletion(
+            @Nullable TableBucket bucket, long logEndOffset, @Nullable 
Exception exception);
 }
diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java
 
b/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java
index 8dcfeb2bb..c6bf09520 100644
--- 
a/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java
+++ 
b/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java
@@ -27,7 +27,9 @@ import org.apache.fluss.client.table.scanner.ScanRecord;
 import org.apache.fluss.client.table.scanner.log.LogScanner;
 import org.apache.fluss.client.table.scanner.log.ScanRecords;
 import org.apache.fluss.client.table.writer.AppendWriter;
+import org.apache.fluss.client.table.writer.DeleteResult;
 import org.apache.fluss.client.table.writer.TableWriter;
+import org.apache.fluss.client.table.writer.UpsertResult;
 import org.apache.fluss.client.table.writer.UpsertWriter;
 import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.Configuration;
@@ -1805,4 +1807,103 @@ class FlussTableITCase extends ClientToServerITCaseBase 
{
             scanner.close();
         }
     }
+
+    // ---------------------- Upsert/Delete Result with LogEndOffset tests 
----------------------
+
+    @Test
+    void testUpsertAndDeleteReturnLogEndOffset() throws Exception {
+        // Create a PK table with single bucket for predictable offset tracking
+        TablePath tablePath = TablePath.of("test_db_1", 
"test_upsert_delete_log_end_offset");
+        Schema schema =
+                Schema.newBuilder()
+                        .column("a", DataTypes.INT())
+                        .column("b", DataTypes.STRING())
+                        .primaryKey("a")
+                        .build();
+        TableDescriptor tableDescriptor =
+                TableDescriptor.builder().schema(schema).distributedBy(1, 
"a").build();
+        createTable(tablePath, tableDescriptor, true);
+
+        try (Table table = conn.getTable(tablePath)) {
+            UpsertWriter upsertWriter = table.newUpsert().createWriter();
+            TableBucket expectedBucket = new 
TableBucket(table.getTableInfo().getTableId(), 0);
+
+            // First upsert - should return log end offset > 0
+            UpsertResult upsertResult1 = upsertWriter.upsert(row(1, 
"a")).get();
+            assertThat(upsertResult1.getBucket()).isEqualTo(expectedBucket);
+            assertThat(upsertResult1.getLogEndOffset()).isEqualTo(1);
+
+            // Second upsert - should return higher log end offset
+            UpsertResult upsertResult2 = upsertWriter.upsert(row(2, 
"b")).get();
+            assertThat(upsertResult2.getBucket()).isEqualTo(expectedBucket);
+            assertThat(upsertResult2.getLogEndOffset()).isEqualTo(2);
+
+            // Update existing key - should return higher log end offset
+            UpsertResult upsertResult3 = upsertWriter.upsert(row(1, 
"aa")).get();
+            assertThat(upsertResult3.getBucket()).isEqualTo(expectedBucket);
+            assertThat(upsertResult3.getLogEndOffset()).isEqualTo(4);
+
+            // Delete - should return higher log end offset
+            DeleteResult deleteResult = upsertWriter.delete(row(1, 
"aa")).get();
+            assertThat(deleteResult.getBucket()).isEqualTo(expectedBucket);
+            assertThat(deleteResult.getLogEndOffset()).isEqualTo(5);
+
+            // Verify the data via lookup
+            Lookuper lookuper = table.newLookup().createLookuper();
+            // key 1 should be deleted
+            assertThat(lookupRow(lookuper, row(1))).isNull();
+            // key 2 should exist
+            assertThat(lookupRow(lookuper, row(2))).isNotNull();
+        }
+    }
+
+    @Test
+    void testBatchedUpsertReturnsSameLogEndOffset() throws Exception {
+        // Test that multiple records in the same batch receive the same log 
end offset
+        TablePath tablePath = TablePath.of("test_db_1", 
"test_batched_upsert_log_end_offset");
+        Schema schema =
+                Schema.newBuilder()
+                        .column("a", DataTypes.INT())
+                        .column("b", DataTypes.STRING())
+                        .primaryKey("a")
+                        .build();
+        TableDescriptor tableDescriptor =
+                TableDescriptor.builder().schema(schema).distributedBy(1, 
"a").build();
+        long tableId = createTable(tablePath, tableDescriptor, true);
+
+        // Configure small batch size to ensure records are batched together
+        Configuration config = new Configuration(clientConf);
+        config.set(ConfigOptions.CLIENT_WRITER_BATCH_SIZE, new MemorySize(1024 
* 1024)); // 1MB
+        config.set(
+                ConfigOptions.CLIENT_WRITER_MAX_INFLIGHT_REQUESTS_PER_BUCKET, 
1); // Force batching
+
+        try (Connection connection = 
ConnectionFactory.createConnection(config);
+                Table table = connection.getTable(tablePath)) {
+            UpsertWriter upsertWriter = table.newUpsert().createWriter();
+
+            // Send multiple upserts without waiting - they should be batched
+            CompletableFuture<UpsertResult> future1 = 
upsertWriter.upsert(row(1, "a"));
+            CompletableFuture<UpsertResult> future2 = 
upsertWriter.upsert(row(2, "b"));
+            CompletableFuture<UpsertResult> future3 = 
upsertWriter.upsert(row(3, "c"));
+
+            // Flush to send the batch
+            upsertWriter.flush();
+
+            // Get results
+            UpsertResult result1 = future1.get();
+            UpsertResult result2 = future2.get();
+            UpsertResult result3 = future3.get();
+
+            TableBucket expectedBucket = new TableBucket(tableId, 0);
+            // All results should have valid bucket and log end offset
+            assertThat(result1.getBucket()).isEqualTo(expectedBucket);
+            assertThat(result2.getBucket()).isEqualTo(expectedBucket);
+            assertThat(result3.getBucket()).isEqualTo(expectedBucket);
+            // Records in the same batch should have the same log end offset
+            // (since they're sent to the same bucket)
+            assertThat(result1.getLogEndOffset()).isEqualTo(3);
+            assertThat(result2.getLogEndOffset()).isEqualTo(3);
+            assertThat(result3.getLogEndOffset()).isEqualTo(3);
+        }
+    }
 }
diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/write/ArrowLogWriteBatchTest.java
 
b/fluss-client/src/test/java/org/apache/fluss/client/write/ArrowLogWriteBatchTest.java
index cb76d7688..afdef5616 100644
--- 
a/fluss-client/src/test/java/org/apache/fluss/client/write/ArrowLogWriteBatchTest.java
+++ 
b/fluss-client/src/test/java/org/apache/fluss/client/write/ArrowLogWriteBatchTest.java
@@ -253,7 +253,7 @@ public class ArrowLogWriteBatchTest {
             CompletableFuture<Void> future = new CompletableFuture<>();
             arrowLogWriteBatch.tryAppend(
                     createWriteRecord(row(i, "a" + i)),
-                    exception -> {
+                    (bucket, offset, exception) -> {
                         if (exception != null) {
                             future.completeExceptionally(exception);
                         } else {
@@ -314,7 +314,7 @@ public class ArrowLogWriteBatchTest {
     }
 
     private WriteCallback newWriteCallback() {
-        return exception -> {
+        return (bucket, offset, exception) -> {
             if (exception != null) {
                 throw new RuntimeException(exception);
             }
diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/write/CompactedLogWriteBatchTest.java
 
b/fluss-client/src/test/java/org/apache/fluss/client/write/CompactedLogWriteBatchTest.java
index cbd6680a7..4f8b37919 100644
--- 
a/fluss-client/src/test/java/org/apache/fluss/client/write/CompactedLogWriteBatchTest.java
+++ 
b/fluss-client/src/test/java/org/apache/fluss/client/write/CompactedLogWriteBatchTest.java
@@ -163,7 +163,7 @@ public class CompactedLogWriteBatchTest {
             CompletableFuture<Void> future = new CompletableFuture<>();
             logProducerBatch.tryAppend(
                     createWriteRecord(),
-                    exception -> {
+                    (bucket, offset, exception) -> {
                         if (exception != null) {
                             future.completeExceptionally(exception);
                         } else {
@@ -236,7 +236,7 @@ public class CompactedLogWriteBatchTest {
     }
 
     private WriteCallback newWriteCallback() {
-        return exception -> {
+        return (bucket, offset, exception) -> {
             if (exception != null) {
                 throw new RuntimeException(exception);
             }
diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/write/IndexedLogWriteBatchTest.java
 
b/fluss-client/src/test/java/org/apache/fluss/client/write/IndexedLogWriteBatchTest.java
index aca1cde1b..683013b89 100644
--- 
a/fluss-client/src/test/java/org/apache/fluss/client/write/IndexedLogWriteBatchTest.java
+++ 
b/fluss-client/src/test/java/org/apache/fluss/client/write/IndexedLogWriteBatchTest.java
@@ -167,7 +167,7 @@ public class IndexedLogWriteBatchTest {
             CompletableFuture<Void> future = new CompletableFuture<>();
             logProducerBatch.tryAppend(
                     createWriteRecord(),
-                    exception -> {
+                    (bucket, offset, exception) -> {
                         if (exception != null) {
                             future.completeExceptionally(exception);
                         } else {
@@ -240,7 +240,7 @@ public class IndexedLogWriteBatchTest {
     }
 
     private WriteCallback newWriteCallback() {
-        return exception -> {
+        return (bucket, offset, exception) -> {
             if (exception != null) {
                 throw new RuntimeException(exception);
             }
diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/write/KvWriteBatchTest.java
 
b/fluss-client/src/test/java/org/apache/fluss/client/write/KvWriteBatchTest.java
index 3fe428388..e2340d170 100644
--- 
a/fluss-client/src/test/java/org/apache/fluss/client/write/KvWriteBatchTest.java
+++ 
b/fluss-client/src/test/java/org/apache/fluss/client/write/KvWriteBatchTest.java
@@ -168,7 +168,7 @@ class KvWriteBatchTest {
             CompletableFuture<Void> future = new CompletableFuture<>();
             kvProducerBatch.tryAppend(
                     createWriteRecord(),
-                    exception -> {
+                    (bucket, offset, exception) -> {
                         if (exception != null) {
                             future.completeExceptionally(exception);
                         } else {
@@ -233,7 +233,7 @@ class KvWriteBatchTest {
     }
 
     private WriteCallback newWriteCallback() {
-        return exception -> {
+        return (bucket, offset, exception) -> {
             if (exception != null) {
                 throw new RuntimeException(exception);
             }
diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java
 
b/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java
index efebb3278..d3f772f38 100644
--- 
a/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java
+++ 
b/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java
@@ -121,7 +121,7 @@ class RecordAccumulatorTest {
                     DATA1_PHYSICAL_TABLE_PATH, DATA1_TABLE_ID, 3, node2.id(), 
serverNodes);
 
     private final WriteCallback writeCallback =
-            exception -> {
+            (bucket, offset, exception) -> {
                 if (exception != null) {
                     throw new RuntimeException(exception);
                 }
diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/write/SenderTest.java 
b/fluss-client/src/test/java/org/apache/fluss/client/write/SenderTest.java
index b0f61c9ea..c32b3f34e 100644
--- a/fluss-client/src/test/java/org/apache/fluss/client/write/SenderTest.java
+++ b/fluss-client/src/test/java/org/apache/fluss/client/write/SenderTest.java
@@ -105,7 +105,7 @@ final class SenderTest {
     void testSimple() throws Exception {
         long offset = 0;
         CompletableFuture<Exception> future = new CompletableFuture<>();
-        appendToAccumulator(tb1, row(1, "a"), future::complete);
+        appendToAccumulator(tb1, row(1, "a"), (tb, leo, e) -> 
future.complete(e));
         sender.runOnce();
         assertThat(sender.numOfInFlightBatches(tb1)).isEqualTo(1);
         finishRequest(tb1, 0, createProduceLogResponse(tb1, offset, 1));
@@ -129,7 +129,7 @@ final class SenderTest {
                         0);
         // do a successful retry.
         CompletableFuture<Exception> future = new CompletableFuture<>();
-        appendToAccumulator(tb1, row(1, "a"), future::complete);
+        appendToAccumulator(tb1, row(1, "a"), (tb, leo, e) -> 
future.complete(e));
         sender1.runOnce();
         assertThat(sender1.numOfInFlightBatches(tb1)).isEqualTo(1);
         long offset = 0;
@@ -140,8 +140,8 @@ final class SenderTest {
         assertThat(future.get()).isNull();
 
         // do an unsuccessful retry.
-        future = new CompletableFuture<>();
-        appendToAccumulator(tb1, row(1, "a"), future::complete);
+        CompletableFuture<Exception> future2 = new CompletableFuture<>();
+        appendToAccumulator(tb1, row(1, "a"), (tb, leo, e) -> 
future2.complete(e));
         sender1.runOnce();
         assertThat(sender1.numOfInFlightBatches(tb1)).isEqualTo(1);
 
@@ -155,7 +155,7 @@ final class SenderTest {
         finishRequest(tb1, 0, createProduceLogResponse(tb1, 
Errors.REQUEST_TIME_OUT));
         sender1.runOnce();
         assertThat(sender1.numOfInFlightBatches(tb1)).isEqualTo(0);
-        assertThat(future.get())
+        assertThat(future2.get())
                 .isInstanceOf(TimeoutException.class)
                 .hasMessageContaining(Errors.REQUEST_TIME_OUT.message());
     }
@@ -174,7 +174,7 @@ final class SenderTest {
     void testCanRetryWithoutIdempotence() throws Exception {
         // do a successful retry.
         CompletableFuture<Exception> future = new CompletableFuture<>();
-        appendToAccumulator(tb1, row(1, "a"), future::complete);
+        appendToAccumulator(tb1, row(1, "a"), (tb, leo, e) -> 
future.complete(e));
         sender.runOnce();
         assertThat(sender.numOfInFlightBatches(tb1)).isEqualTo(1);
         assertThat(future.isDone()).isFalse();
@@ -204,14 +204,14 @@ final class SenderTest {
 
         // Send first ProduceLogRequest.
         CompletableFuture<Exception> future1 = new CompletableFuture<>();
-        appendToAccumulator(tb1, row(1, "a"), future1::complete);
+        appendToAccumulator(tb1, row(1, "a"), (tb, leo, e) -> 
future1.complete(e));
         sender1.runOnce();
         assertThat(idempotenceManager.nextSequence(tb1)).isEqualTo(1);
         
assertThat(idempotenceManager.lastAckedBatchSequence(tb1)).isNotPresent();
 
         // Send second ProduceLogRequest.
         CompletableFuture<Exception> future2 = new CompletableFuture<>();
-        appendToAccumulator(tb1, row(1, "a"), future2::complete);
+        appendToAccumulator(tb1, row(1, "a"), (tb, leo, e) -> 
future2.complete(e));
         sender1.runOnce();
         assertThat(idempotenceManager.nextSequence(tb1)).isEqualTo(2);
         
assertThat(idempotenceManager.lastAckedBatchSequence(tb1)).isNotPresent();
@@ -244,7 +244,7 @@ final class SenderTest {
 
         for (int i = 0; i < MAX_INFLIGHT_REQUEST_PER_BUCKET - 1; i++) {
             CompletableFuture<Exception> future = new CompletableFuture<>();
-            appendToAccumulator(tb1, row(1, "a"), future::complete);
+            appendToAccumulator(tb1, row(1, "a"), (tb, leo, e) -> 
future.complete(e));
             sender1.runOnce();
             assertThat(idempotenceManager.inflightBatchSize(tb1)).isEqualTo(i 
+ 1);
             assertThat(idempotenceManager.canSendMoreRequests(tb1)).isTrue();
@@ -252,7 +252,7 @@ final class SenderTest {
 
         // add one batch to make the inflight request size equal to max.
         CompletableFuture<Exception> future = new CompletableFuture<>();
-        appendToAccumulator(tb1, row(1, "a"), future::complete);
+        appendToAccumulator(tb1, row(1, "a"), (tb, leo, e) -> 
future.complete(e));
         sender1.runOnce();
         assertThat(idempotenceManager.inflightBatchSize(tb1))
                 .isEqualTo(MAX_INFLIGHT_REQUEST_PER_BUCKET);
@@ -260,7 +260,7 @@ final class SenderTest {
 
         // add one more batch, it will not be drained from accumulator.
         CompletableFuture<Exception> future1 = new CompletableFuture<>();
-        appendToAccumulator(tb1, row(1, "a"), future1::complete);
+        appendToAccumulator(tb1, row(1, "a"), (tb, leo, e) -> 
future1.complete(e));
         sender1.runOnce();
         assertThat(idempotenceManager.inflightBatchSize(tb1))
                 .isEqualTo(MAX_INFLIGHT_REQUEST_PER_BUCKET);
@@ -288,7 +288,7 @@ final class SenderTest {
         // 1. send five batches first to full MAX_INFLIGHT_REQUEST_PER_BUCKET.
         for (int i = 0; i < MAX_INFLIGHT_REQUEST_PER_BUCKET; i++) {
             CompletableFuture<Exception> future = new CompletableFuture<>();
-            appendToAccumulator(tb1, row(1, "a"), future::complete);
+            appendToAccumulator(tb1, row(1, "a"), (tb, leo, e) -> 
future.complete(e));
             assertThat(idempotenceManager.canSendMoreRequests(tb1)).isTrue();
             sender.runOnce(); // runOnce to send request.
         }
@@ -298,7 +298,7 @@ final class SenderTest {
         // 2. try to append more data into accumulator, it will not be drained 
from accumulator.
         for (int i = 0; i < 1000; i++) {
             CompletableFuture<Exception> future = new CompletableFuture<>();
-            appendToAccumulator(tb1, row(1, "a"), future::complete);
+            appendToAccumulator(tb1, row(1, "a"), (tb, leo, e) -> 
future.complete(e));
         }
         // No batches can be drained from accumulator as the inflight request 
size is max in
         // IdempotenceManager.
@@ -326,7 +326,7 @@ final class SenderTest {
             // empty.
             for (int j = 0; j < 50; j++) {
                 CompletableFuture<Exception> future = new 
CompletableFuture<>();
-                appendToAccumulator(tb1, row(1, "a"), future::complete);
+                appendToAccumulator(tb1, row(1, "a"), (tb, leo, e) -> 
future.complete(e));
             }
 
             // already have five batches in idempotenceManager inflight 
batches.
@@ -359,19 +359,19 @@ final class SenderTest {
 
         // Send first ProduceLogRequest.
         CompletableFuture<Exception> future1 = new CompletableFuture<>();
-        appendToAccumulator(tb1, row(1, "a"), future1::complete);
+        appendToAccumulator(tb1, row(1, "a"), (tb, leo, e) -> 
future1.complete(e));
         sender1.runOnce();
         assertThat(idempotenceManager.nextSequence(tb1)).isEqualTo(1);
         
assertThat(idempotenceManager.lastAckedBatchSequence(tb1)).isNotPresent();
 
         // Send second ProduceLogRequest.
         CompletableFuture<Exception> future2 = new CompletableFuture<>();
-        appendToAccumulator(tb1, row(1, "a"), future2::complete);
+        appendToAccumulator(tb1, row(1, "a"), (tb, leo, e) -> 
future2.complete(e));
         sender1.runOnce();
 
         // Send third ProduceLogRequest.
         CompletableFuture<Exception> future3 = new CompletableFuture<>();
-        appendToAccumulator(tb1, row(1, "a"), future3::complete);
+        appendToAccumulator(tb1, row(1, "a"), (tb, leo, e) -> 
future3.complete(e));
         sender1.runOnce();
 
         // finish batch one with retriable error.
@@ -381,7 +381,7 @@ final class SenderTest {
 
        // Queue the fourth request, it shouldn't be sent until the first 3 
complete.
         CompletableFuture<Exception> future4 = new CompletableFuture<>();
-        appendToAccumulator(tb1, row(1, "a"), future4::complete);
+        appendToAccumulator(tb1, row(1, "a"), (tb, leo, e) -> 
future4.complete(e));
         
assertThat(idempotenceManager.lastAckedBatchSequence(tb1)).isNotPresent();
 
         finishIdempotentProduceLogRequest(
@@ -445,11 +445,11 @@ final class SenderTest {
 
         // Send the first ProduceLogRequest.
         CompletableFuture<Exception> future1 = new CompletableFuture<>();
-        appendToAccumulator(tb1, row(1, "a"), future1::complete);
+        appendToAccumulator(tb1, row(1, "a"), (tb, leo, e) -> 
future1.complete(e));
         sender1.runOnce();
         // Send the second ProduceLogRequest.
         CompletableFuture<Exception> future2 = new CompletableFuture<>();
-        appendToAccumulator(tb1, row(1, "a"), future2::complete);
+        appendToAccumulator(tb1, row(1, "a"), (tb, leo, e) -> 
future2.complete(e));
         sender1.runOnce();
 
        // response 0 with retriable error which will reEnqueue the batch.
@@ -495,14 +495,14 @@ final class SenderTest {
 
         // Send first ProduceLogRequest.
         CompletableFuture<Exception> future1 = new CompletableFuture<>();
-        appendToAccumulator(tb1, row(1, "a"), future1::complete);
+        appendToAccumulator(tb1, row(1, "a"), (tb, leo, e) -> 
future1.complete(e));
         sender1.runOnce();
         assertThat(idempotenceManager.nextSequence(tb1)).isEqualTo(1);
         
assertThat(idempotenceManager.lastAckedBatchSequence(tb1)).isNotPresent();
 
         // Send second ProduceLogRequest.
         CompletableFuture<Exception> future2 = new CompletableFuture<>();
-        appendToAccumulator(tb1, row(1, "a"), future2::complete);
+        appendToAccumulator(tb1, row(1, "a"), (tb, leo, e) -> 
future2.complete(e));
         sender1.runOnce();
         assertThat(idempotenceManager.nextSequence(tb1)).isEqualTo(2);
         
assertThat(idempotenceManager.lastAckedBatchSequence(tb1)).isNotPresent();
@@ -564,14 +564,14 @@ final class SenderTest {
 
         // Send first ProduceLogRequest.
         CompletableFuture<Exception> future1 = new CompletableFuture<>();
-        appendToAccumulator(tb1, row(1, "a"), future1::complete);
+        appendToAccumulator(tb1, row(1, "a"), (tb, leo, e) -> 
future1.complete(e));
         sender1.runOnce();
         assertThat(idempotenceManager.nextSequence(tb1)).isEqualTo(1);
         
assertThat(idempotenceManager.lastAckedBatchSequence(tb1)).isNotPresent();
 
         // Send second ProduceLogRequest.
         CompletableFuture<Exception> future2 = new CompletableFuture<>();
-        appendToAccumulator(tb1, row(1, "a"), future2::complete);
+        appendToAccumulator(tb1, row(1, "a"), (tb, leo, e) -> 
future2.complete(e));
         sender1.runOnce();
         assertThat(idempotenceManager.nextSequence(tb1)).isEqualTo(2);
         
assertThat(idempotenceManager.lastAckedBatchSequence(tb1)).isNotPresent();
@@ -619,14 +619,14 @@ final class SenderTest {
 
         // first finish second ProduceLogRequest with success.
         CompletableFuture<Exception> future1 = new CompletableFuture<>();
-        appendToAccumulator(tb1, row(1, "a"), future1::complete);
+        appendToAccumulator(tb1, row(1, "a"), (tb, leo, e) -> 
future1.complete(e));
         sender1.runOnce();
         assertThat(idempotenceManager.nextSequence(tb1)).isEqualTo(1);
         
assertThat(idempotenceManager.lastAckedBatchSequence(tb1)).isNotPresent();
 
         // Send second ProduceLogRequest.
         CompletableFuture<Exception> future2 = new CompletableFuture<>();
-        appendToAccumulator(tb1, row(1, "a"), future2::complete);
+        appendToAccumulator(tb1, row(1, "a"), (tb, leo, e) -> 
future2.complete(e));
         sender1.runOnce();
         assertThat(idempotenceManager.nextSequence(tb1)).isEqualTo(2);
         
assertThat(idempotenceManager.lastAckedBatchSequence(tb1)).isNotPresent();
@@ -657,7 +657,7 @@ final class SenderTest {
         assertThat(idempotenceManager.nextSequence(tb1)).isEqualTo(0);
 
         CompletableFuture<Exception> future1 = new CompletableFuture<>();
-        appendToAccumulator(tb1, row(1, "a"), future1::complete);
+        appendToAccumulator(tb1, row(1, "a"), (tb, leo, e) -> 
future1.complete(e));
         sender1.runOnce();
         finishIdempotentProduceLogRequest(0, tb1, 0, 
createProduceLogResponse(tb1, 0L, 1L));
         sender1.runOnce();
@@ -671,7 +671,7 @@ final class SenderTest {
     void testSendWhenDestinationIsNullInMetadata() throws Exception {
         long offset = 0;
         CompletableFuture<Exception> future = new CompletableFuture<>();
-        appendToAccumulator(tb1, row(1, "a"), future::complete);
+        appendToAccumulator(tb1, row(1, "a"), (tb, leo, e) -> 
future.complete(e));
 
         int leaderNode = metadataUpdater.leaderFor(DATA1_TABLE_PATH, tb1);
         // now, remove leader node ,so that send destination
@@ -730,7 +730,7 @@ final class SenderTest {
                         key,
                         WriteFormat.COMPACTED_KV,
                         null),
-                future::complete,
+                (tb, leo, e) -> future.complete(e),
                 metadataUpdater.getCluster(),
                 0,
                 false);
diff --git a/fluss-rpc/src/main/proto/FlussApi.proto 
b/fluss-rpc/src/main/proto/FlussApi.proto
index d800549d8..dc05efd83 100644
--- a/fluss-rpc/src/main/proto/FlussApi.proto
+++ b/fluss-rpc/src/main/proto/FlussApi.proto
@@ -787,6 +787,9 @@ message PbPutKvRespForBucket {
   required int32 bucket_id = 2;
   optional int32 error_code = 3;
   optional string error_message = 4;
+  // the log end offset (LEO) of the changelog after this write
+  // this is the offset of the next record to be written, used for 
exactly-once semantics
+  optional int64 log_end_offset = 5;
 }
 
 message PbLookupReqForBucket {
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java
index a7bed51b8..4e4e59908 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java
@@ -1026,6 +1026,13 @@ public class ServerRpcMessageUtils {
 
             if (bucketResult.failed()) {
                 putKvBucket.setError(bucketResult.getErrorCode(), 
bucketResult.getErrorMessage());
+            } else {
+                // set log end offset for successful writes
+                // this is used for exactly-once semantics to track checkpoint 
offsets
+                long logEndOffset = bucketResult.getWriteLogEndOffset();
+                if (logEndOffset >= 0) {
+                    putKvBucket.setLogEndOffset(logEndOffset);
+                }
             }
             putKvRespForBucketList.add(putKvBucket);
         }


Reply via email to