Copilot commented on code in PR #2042:
URL: https://github.com/apache/fluss/pull/2042#discussion_r2569751569


##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlussSinkBuilder.java:
##########
@@ -73,6 +74,9 @@ public class FlussSinkBuilder<InputT> {
     private final Map<String, String> configOptions = new HashMap<>();
     private FlussSerializationSchema<InputT> serializationSchema;
     private boolean shuffleByBucketId = true;
+    // Optional list of columns for partial update. When set, upsert will only update these columns.
+    // The primary key columns must be fully specified in this list.

Review Comment:
   The comment should state what happens to columns that are not listed during a 
partial update: they retain their existing values. Consider adding: 
'Non-specified columns will retain their existing values in the table.'
   ```suggestion
       // The primary key columns must be fully specified in this list.
       // Non-specified columns will retain their existing values in the table.
   ```
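
To illustrate the semantics described above, here is a minimal in-memory sketch. It does not use the Fluss API: `PartialUpdateSketch`, `partialUpsert`, and `order` are hypothetical names, and the column list of the second write (`orderId`, `amount`, `address`) is an assumption based on the test in this PR.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of partial-update upsert semantics; NOT the Fluss implementation. */
public class PartialUpdateSketch {

    // Primary key -> stored row (column name -> value).
    private final Map<Integer, Map<String, Object>> table = new HashMap<>();

    /** Writes only targetColumns from incoming; all other stored columns are left untouched. */
    public void partialUpsert(int key, Map<String, Object> incoming, List<String> targetColumns) {
        Map<String, Object> stored = table.computeIfAbsent(key, k -> new HashMap<>());
        for (String column : targetColumns) {
            stored.put(column, incoming.get(column));
        }
    }

    public Map<String, Object> get(int key) {
        return table.get(key);
    }

    /** Hypothetical helper that builds a row shaped like TestOrder in the test. */
    public static Map<String, Object> order(int orderId, int itemId, int amount, String address) {
        Map<String, Object> row = new HashMap<>();
        row.put("orderId", orderId);
        row.put("itemId", itemId);
        row.put("amount", amount);
        row.put("address", address);
        return row;
    }

    public static void main(String[] args) {
        PartialUpdateSketch sketch = new PartialUpdateSketch();
        // First writer targets only orderId and itemId.
        sketch.partialUpsert(2001, order(2001, 3001, -1, null),
                Arrays.asList("orderId", "itemId"));
        // Second writer (assumed column list) targets orderId, amount and address.
        sketch.partialUpsert(2001, order(2001, -1, 100, "addr1"),
                Arrays.asList("orderId", "amount", "address"));
        // itemId keeps the value from the first write; the placeholder -1 is never stored.
        System.out.println(sketch.get(2001));
    }
}
```

In this sketch the placeholder values (`-1`, `null`) carried by a record for columns outside its target list never reach the stored row, which is the behavior the suggested comment line documents.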



##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlussSinkITCase.java:
##########
@@ -310,6 +310,346 @@ public void testOrdersTablePKSink() throws Exception {
         logScanner.close();
     }
 
+    @Test
+    public void testPartialUpdateWithTwoWriters() throws Exception {
+        createTable(TablePath.of(DEFAULT_DB, "partial_update_two_writers_test"), pkTableDescriptor);
+
+        // Initial inserts
+        ArrayList<TestOrder> initialOrders = new ArrayList<>();
+        initialOrders.add(new TestOrder(2001, 3001, -1, null, RowKind.INSERT));
+        initialOrders.add(new TestOrder(2002, 3002, -1, null, RowKind.INSERT));
+        initialOrders.add(new TestOrder(2003, 3003, -1, null, RowKind.INSERT));
+
+        DataStream<TestOrder> initialStream = env.fromData(initialOrders);

Review Comment:
   [nitpick] The variable name 'initialStream' is inconsistent with the naming used 
elsewhere in this test class. Consider renaming it to 'insertStream' to match the 
pattern used in other test methods (e.g., lines 450, 468-469).



##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlussSinkITCase.java:
##########
@@ -310,6 +310,346 @@ public void testOrdersTablePKSink() throws Exception {
         logScanner.close();
     }
 
+    @Test
+    public void testPartialUpdateWithTwoWriters() throws Exception {
+        createTable(TablePath.of(DEFAULT_DB, "partial_update_two_writers_test"), pkTableDescriptor);
+
+        // Initial inserts
+        ArrayList<TestOrder> initialOrders = new ArrayList<>();
+        initialOrders.add(new TestOrder(2001, 3001, -1, null, RowKind.INSERT));
+        initialOrders.add(new TestOrder(2002, 3002, -1, null, RowKind.INSERT));
+        initialOrders.add(new TestOrder(2003, 3003, -1, null, RowKind.INSERT));
+
+        DataStream<TestOrder> initialStream = env.fromData(initialOrders);
+
+        FlinkSink<TestOrder> initialSink =
+                FlussSink.<TestOrder>builder()
+                        .setBootstrapServers(bootstrapServers)
+                        .setDatabase(DEFAULT_DB)
+                        .setTable("partial_update_two_writers_test")
+                        .setPartialUpdateColumns("orderId", "itemId")
+                        .setSerializationSchema(new TestOrderSerializationSchema())
+                        .build();
+
+        initialStream.sinkTo(initialSink).name("Fluss Initial Data Sink");
+        env.execute("First Stream Updates");
+
+        ArrayList<TestOrder> itemIdUpdates = new ArrayList<>();
+        itemIdUpdates.add(new TestOrder(2001, -1, 100, "addr1", RowKind.UPDATE_AFTER));
+        itemIdUpdates.add(new TestOrder(2003, -1, 300, "addr3", RowKind.UPDATE_AFTER));
+
+        DataStream<TestOrder> updateStream = env.fromData(itemIdUpdates);

Review Comment:
   [nitpick] The variable name 'itemIdUpdates' is misleading: these updates 
modify the 'amount' and 'address' fields, not 'itemId'. Consider renaming it 
to 'amountAddressUpdates' for clarity.
   ```suggestion
           ArrayList<TestOrder> amountAddressUpdates = new ArrayList<>();
           amountAddressUpdates.add(new TestOrder(2001, -1, 100, "addr1", RowKind.UPDATE_AFTER));
           amountAddressUpdates.add(new TestOrder(2003, -1, 300, "addr3", RowKind.UPDATE_AFTER));
   
           DataStream<TestOrder> updateStream = env.fromData(amountAddressUpdates);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
