Copilot commented on code in PR #2042:
URL: https://github.com/apache/fluss/pull/2042#discussion_r2569751569
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlussSinkBuilder.java:
##########
@@ -73,6 +74,9 @@ public class FlussSinkBuilder<InputT> {
private final Map<String, String> configOptions = new HashMap<>();
private FlussSerializationSchema<InputT> serializationSchema;
private boolean shuffleByBucketId = true;
+ // Optional list of columns for partial update. When set, upsert will only update these columns.
+ // The primary key columns must be fully specified in this list.
Review Comment:
The comment should clarify what happens to non-specified columns during a
partial update (they retain their existing values). Consider adding:
'Non-specified columns will retain their existing values in the table.'
```suggestion
// The primary key columns must be fully specified in this list.
// Non-specified columns will retain their existing values in the table.
```
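To illustrate the semantics the suggested comment describes, here is a minimal sketch in plain Java. It does not use the Fluss API: the class `PartialUpdateSketch` and its `merge` method are hypothetical names introduced only for this example, and rows are modelled as simple maps. The column names (`orderId`, `itemId`, `amount`, `address`) are taken from the test in this PR. The starting row, with `amount` and `address` unset, is an assumption about what a first writer targeting only `orderId` and `itemId` would leave behind.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; this is not how Fluss implements partial updates.
public class PartialUpdateSketch {

    /**
     * Returns a new row in which only the target columns are taken from the
     * incoming update. Every other column keeps the value from the existing row.
     */
    public static Map<String, Object> merge(
            Map<String, Object> existing,
            Map<String, Object> update,
            List<String> targetColumns) {
        Map<String, Object> merged = new HashMap<>(existing);
        for (String column : targetColumns) {
            merged.put(column, update.get(column));
        }
        return merged;
    }

    public static void main(String[] args) {
        // Assumed state after a first writer that only targets orderId and itemId.
        Map<String, Object> existing = new HashMap<>();
        existing.put("orderId", 2001);
        existing.put("itemId", 3001);
        existing.put("amount", null);
        existing.put("address", null);

        // A second writer targets orderId, amount and address.
        // Its itemId value (-1) is a placeholder and must be ignored.
        Map<String, Object> update = new HashMap<>();
        update.put("orderId", 2001);
        update.put("itemId", -1);
        update.put("amount", 100);
        update.put("address", "addr1");

        Map<String, Object> merged =
                merge(existing, update, Arrays.asList("orderId", "amount", "address"));

        // itemId is not a target column, so it retains the existing value.
        System.out.println("itemId = " + merged.get("itemId"));
        System.out.println("amount = " + merged.get("amount"));
        System.out.println("address = " + merged.get("address"));
    }
}
```

The point of the sketch is that the primary key appears in every writer's column list, while each non-key column is owned by whichever writer names it. That is why the comment should state explicitly that unnamed columns are left untouched.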
##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlussSinkITCase.java:
##########
@@ -310,6 +310,346 @@ public void testOrdersTablePKSink() throws Exception {
logScanner.close();
}
+ @Test
+ public void testPartialUpdateWithTwoWriters() throws Exception {
+ createTable(TablePath.of(DEFAULT_DB, "partial_update_two_writers_test"), pkTableDescriptor);
+
+ // Initial inserts
+ ArrayList<TestOrder> initialOrders = new ArrayList<>();
+ initialOrders.add(new TestOrder(2001, 3001, -1, null, RowKind.INSERT));
+ initialOrders.add(new TestOrder(2002, 3002, -1, null, RowKind.INSERT));
+ initialOrders.add(new TestOrder(2003, 3003, -1, null, RowKind.INSERT));
+
+ DataStream<TestOrder> initialStream = env.fromData(initialOrders);
Review Comment:
[nitpick] The variable name 'initialStream' is inconsistent with naming used
elsewhere in the test. Consider renaming to 'insertStream' to match the pattern
used in other test methods (e.g., lines 450, 468-469).
##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlussSinkITCase.java:
##########
@@ -310,6 +310,346 @@ public void testOrdersTablePKSink() throws Exception {
logScanner.close();
}
+ @Test
+ public void testPartialUpdateWithTwoWriters() throws Exception {
+ createTable(TablePath.of(DEFAULT_DB, "partial_update_two_writers_test"), pkTableDescriptor);
+
+ // Initial inserts
+ ArrayList<TestOrder> initialOrders = new ArrayList<>();
+ initialOrders.add(new TestOrder(2001, 3001, -1, null, RowKind.INSERT));
+ initialOrders.add(new TestOrder(2002, 3002, -1, null, RowKind.INSERT));
+ initialOrders.add(new TestOrder(2003, 3003, -1, null, RowKind.INSERT));
+
+ DataStream<TestOrder> initialStream = env.fromData(initialOrders);
+
+ FlinkSink<TestOrder> initialSink =
+ FlussSink.<TestOrder>builder()
+ .setBootstrapServers(bootstrapServers)
+ .setDatabase(DEFAULT_DB)
+ .setTable("partial_update_two_writers_test")
+ .setPartialUpdateColumns("orderId", "itemId")
+ .setSerializationSchema(new TestOrderSerializationSchema())
+ .build();
+
+ initialStream.sinkTo(initialSink).name("Fluss Initial Data Sink");
+ env.execute("First Stream Updates");
+
+ ArrayList<TestOrder> itemIdUpdates = new ArrayList<>();
+ itemIdUpdates.add(new TestOrder(2001, -1, 100, "addr1", RowKind.UPDATE_AFTER));
+ itemIdUpdates.add(new TestOrder(2003, -1, 300, "addr3", RowKind.UPDATE_AFTER));
+
+ DataStream<TestOrder> updateStream = env.fromData(itemIdUpdates);
Review Comment:
[nitpick] The variable name 'itemIdUpdates' is misleading as these updates
actually modify 'amount' and 'address' fields, not 'itemId'. Consider renaming
to 'amountAddressUpdates' for clarity.
```suggestion
ArrayList<TestOrder> amountAddressUpdates = new ArrayList<>();
amountAddressUpdates.add(new TestOrder(2001, -1, 100, "addr1", RowKind.UPDATE_AFTER));
amountAddressUpdates.add(new TestOrder(2003, -1, 300, "addr3", RowKind.UPDATE_AFTER));
DataStream<TestOrder> updateStream = env.fromData(amountAddressUpdates);
```