wuchong commented on code in PR #2530:
URL: https://github.com/apache/fluss/pull/2530#discussion_r2751139359
##########
fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java:
##########
@@ -1805,4 +1807,108 @@ void testPkCompactedPollFromLatestNoRecords() throws Exception {
         scanner.close();
     }
 }
+
+    // ---------------------- Upsert/Delete Result with LogEndOffset tests ----------------------
+
+    @Test
+    void testUpsertAndDeleteReturnLogEndOffset() throws Exception {
+        // Create a PK table with single bucket for predictable offset tracking
+        TablePath tablePath = TablePath.of("test_db_1", "test_upsert_delete_log_end_offset");
+        Schema schema =
+                Schema.newBuilder()
+                        .column("a", DataTypes.INT())
+                        .column("b", DataTypes.STRING())
+                        .primaryKey("a")
+                        .build();
+        TableDescriptor tableDescriptor =
+                TableDescriptor.builder().schema(schema).distributedBy(1, "a").build();
+        createTable(tablePath, tableDescriptor, true);
+
+        try (Table table = conn.getTable(tablePath)) {
+            UpsertWriter upsertWriter = table.newUpsert().createWriter();
+            TableBucket expectedBucket = new TableBucket(table.getTableInfo().getTableId(), 0);
+
+            // First upsert - should return log end offset > 0
+            UpsertResult upsertResult1 = upsertWriter.upsert(row(1, "a")).get();
+            assertThat(upsertResult1.getBucket()).isEqualTo(expectedBucket);
+            assertThat(upsertResult1.getLogEndOffset()).isGreaterThan(0);
Review Comment:
Since there is only one bucket, I think we can assert the log end offset
against a specific value. This ensures the correctness of the end offset.
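For example (the values are illustrative assumptions about how many changelog
records each operation appends, not verified behavior):

```java
// Hypothetical exact-value assertion for the first upsert, assuming the
// single-bucket log starts empty and the insert appends one changelog record:
assertThat(upsertResult1.getLogEndOffset()).isEqualTo(1);
// Later operations could be pinned the same way, bearing in mind that an
// update of an existing key may append more than one changelog record
// (e.g. UPDATE_BEFORE + UPDATE_AFTER), which would shift the expected values.
```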
##########
fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java:
##########
@@ -1805,4 +1807,108 @@ void testPkCompactedPollFromLatestNoRecords() throws Exception {
         scanner.close();
     }
 }
+
+    // ---------------------- Upsert/Delete Result with LogEndOffset tests ----------------------
+
+    @Test
+    void testUpsertAndDeleteReturnLogEndOffset() throws Exception {
+        // Create a PK table with single bucket for predictable offset tracking
+        TablePath tablePath = TablePath.of("test_db_1", "test_upsert_delete_log_end_offset");
+        Schema schema =
+                Schema.newBuilder()
+                        .column("a", DataTypes.INT())
+                        .column("b", DataTypes.STRING())
+                        .primaryKey("a")
+                        .build();
+        TableDescriptor tableDescriptor =
+                TableDescriptor.builder().schema(schema).distributedBy(1, "a").build();
+        createTable(tablePath, tableDescriptor, true);
+
+        try (Table table = conn.getTable(tablePath)) {
+            UpsertWriter upsertWriter = table.newUpsert().createWriter();
+            TableBucket expectedBucket = new TableBucket(table.getTableInfo().getTableId(), 0);
+
+            // First upsert - should return log end offset > 0
+            UpsertResult upsertResult1 = upsertWriter.upsert(row(1, "a")).get();
+            assertThat(upsertResult1.getBucket()).isEqualTo(expectedBucket);
+            assertThat(upsertResult1.getLogEndOffset()).isGreaterThan(0);
+            long firstOffset = upsertResult1.getLogEndOffset();
+
+            // Second upsert - should return higher log end offset
+            UpsertResult upsertResult2 = upsertWriter.upsert(row(2, "b")).get();
+            assertThat(upsertResult2.getBucket()).isEqualTo(expectedBucket);
+            assertThat(upsertResult2.getLogEndOffset()).isGreaterThan(firstOffset);
+            long secondOffset = upsertResult2.getLogEndOffset();
+
+            // Update existing key - should return higher log end offset
+            UpsertResult upsertResult3 = upsertWriter.upsert(row(1, "aa")).get();
+            assertThat(upsertResult3.getBucket()).isEqualTo(expectedBucket);
+            assertThat(upsertResult3.getLogEndOffset()).isGreaterThan(secondOffset);
+            long thirdOffset = upsertResult3.getLogEndOffset();
+
+            // Delete - should return higher log end offset
+            DeleteResult deleteResult = upsertWriter.delete(row(1, "aa")).get();
+            assertThat(deleteResult.getBucket()).isEqualTo(expectedBucket);
+            assertThat(deleteResult.getLogEndOffset()).isGreaterThan(thirdOffset);
+
+            // Verify the data via lookup
+            Lookuper lookuper = table.newLookup().createLookuper();
+            // key 1 should be deleted
+            assertThat(lookupRow(lookuper, row(1))).isNull();
+            // key 2 should exist
+            assertThat(lookupRow(lookuper, row(2))).isNotNull();
+        }
+    }
+
+    @Test
+    void testBatchedUpsertReturnsSameLogEndOffset() throws Exception {
+        // Test that multiple records in the same batch receive the same log end offset
+        TablePath tablePath = TablePath.of("test_db_1", "test_batched_upsert_log_end_offset");
+        Schema schema =
+                Schema.newBuilder()
+                        .column("a", DataTypes.INT())
+                        .column("b", DataTypes.STRING())
+                        .primaryKey("a")
+                        .build();
+        TableDescriptor tableDescriptor =
+                TableDescriptor.builder().schema(schema).distributedBy(1, "a").build();
+        createTable(tablePath, tableDescriptor, true);
+
+        // Configure small batch size to ensure records are batched together
+        Configuration config = new Configuration(clientConf);
+        config.set(ConfigOptions.CLIENT_WRITER_BATCH_SIZE, new MemorySize(1024 * 1024)); // 1MB
+        config.set(
+                ConfigOptions.CLIENT_WRITER_MAX_INFLIGHT_REQUESTS_PER_BUCKET, 1); // Force batching
+
+        try (Connection connection = ConnectionFactory.createConnection(config);
+                Table table = connection.getTable(tablePath)) {
+            UpsertWriter upsertWriter = table.newUpsert().createWriter();
+
+            // Send multiple upserts without waiting - they should be batched
+            CompletableFuture<UpsertResult> future1 = upsertWriter.upsert(row(1, "a"));
+            CompletableFuture<UpsertResult> future2 = upsertWriter.upsert(row(2, "b"));
+            CompletableFuture<UpsertResult> future3 = upsertWriter.upsert(row(3, "c"));
+
+            // Flush to send the batch
+            upsertWriter.flush();
+
+            // Get results
+            UpsertResult result1 = future1.get();
+            UpsertResult result2 = future2.get();
+            UpsertResult result3 = future3.get();
+
+            // All results should have valid bucket and log end offset
+            assertThat(result1.getBucket()).isNotNull();
+            assertThat(result1.getLogEndOffset()).isGreaterThan(0);
+            assertThat(result2.getBucket()).isNotNull();
+            assertThat(result2.getLogEndOffset()).isGreaterThan(0);
+            assertThat(result3.getBucket()).isNotNull();
+            assertThat(result3.getLogEndOffset()).isGreaterThan(0);
Review Comment:
ditto
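That is, pin the batched offsets to exact values too. For example (the value 3
here assumes the three inserts land in a single batch on an empty
single-bucket log, one changelog record each, which is an assumption rather
than verified behavior):

```java
// Hypothetical exact-value assertions for the batched test above: all three
// records in the same batch would share the batch's log end offset.
assertThat(result1.getLogEndOffset()).isEqualTo(3);
assertThat(result2.getLogEndOffset()).isEqualTo(result1.getLogEndOffset());
assertThat(result3.getLogEndOffset()).isEqualTo(result1.getLogEndOffset());
```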
##########
fluss-client/src/main/java/org/apache/fluss/client/write/WriteBatch.java:
##########
@@ -176,11 +183,18 @@ public long getQueueTimeMs() {
         return drainedMs - createdMs;
     }

-    /** Complete the batch successfully. */
+    /** Complete the batch successfully (for log batches without offset tracking). */
     public boolean complete() {
         return done(null);
     }

+    /** Complete the batch successfully with bucket and log end offset info (for KV batches). */
+    public boolean complete(TableBucket bucket, long logEndOffset) {
+        this.resultBucket = bucket;
+        this.resultLogEndOffset = logEndOffset;
+        return done(null);
Review Comment:
I think we can make `bucket` and `logEndOffset` parameters of `done(..)` and
`completeFutureAndFireCallbacks(..)`, just like how we handle the `exception`
parameter. This avoids the member variables `resultBucket` and
`resultLogEndOffset`, which look prone to concurrency issues.
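A rough sketch of the shape I mean; everything except `done(..)` and
`completeFutureAndFireCallbacks(..)` (result types, fields, callback handling)
is a simplified placeholder, not the actual `WriteBatch` internals:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

class WriteBatchSketch {
    // Placeholder result types; Fluss's real TableBucket/result classes differ.
    record BucketSketch(long tableId, int bucket) {}

    record WriteResultSketch(BucketSketch bucket, long logEndOffset) {}

    private final AtomicBoolean completed = new AtomicBoolean(false);
    private final CompletableFuture<WriteResultSketch> resultFuture = new CompletableFuture<>();

    /** For log batches without offset tracking. */
    public boolean complete() {
        return done(null, null, -1L);
    }

    /** For KV batches: the result travels as parameters, just like the exception does. */
    public boolean complete(BucketSketch bucket, long logEndOffset) {
        return done(null, bucket, logEndOffset);
    }

    public boolean completeExceptionally(Exception exception) {
        return done(exception, null, -1L);
    }

    // No resultBucket/resultLogEndOffset fields, so there is no window in which a
    // concurrent reader could observe partially written result state.
    private boolean done(Exception exception, BucketSketch bucket, long logEndOffset) {
        if (!completed.compareAndSet(false, true)) {
            return false; // the batch may only be completed once
        }
        completeFutureAndFireCallbacks(exception, bucket, logEndOffset);
        return true;
    }

    private void completeFutureAndFireCallbacks(
            Exception exception, BucketSketch bucket, long logEndOffset) {
        if (exception != null) {
            resultFuture.completeExceptionally(exception);
        } else {
            resultFuture.complete(new WriteResultSketch(bucket, logEndOffset));
        }
        // the real implementation would also fire per-record callbacks here
    }
}
```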
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]