This is an automated email from the ASF dual-hosted git repository.

mehulbatra pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new c9370aec6 [Flink] Support Partial Updates to the Flink Sink (#2042)
c9370aec6 is described below

commit c9370aec6035caae43e3f8e5fcc5baa28f7ec627
Author: Giannis Polyzos <[email protected]>
AuthorDate: Thu Jan 15 10:58:56 2026 +0200

    [Flink] Support Partial Updates to the Flink Sink (#2042)
    
    * add partial updates to the datastraem api
    
    * refactor builder
    
    * fix checkstyle violations
    
    * update setter
    
    * fix checkstyle
    
    * update setter
---
 .../apache/fluss/flink/sink/FlussSinkBuilder.java  |  73 ++++-
 .../fluss/flink/sink/FlussSinkBuilderTest.java     |  46 ++-
 .../apache/fluss/flink/sink/FlussSinkITCase.java   | 340 +++++++++++++++++++++
 3 files changed, 457 insertions(+), 2 deletions(-)

diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlussSinkBuilder.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlussSinkBuilder.java
index 489eefedb..2fea5f86a 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlussSinkBuilder.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlussSinkBuilder.java
@@ -33,6 +33,7 @@ import org.apache.flink.table.types.logical.RowType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -73,6 +74,9 @@ public class FlussSinkBuilder<InputT> {
     private final Map<String, String> configOptions = new HashMap<>();
     private FlussSerializationSchema<InputT> serializationSchema;
     private boolean shuffleByBucketId = true;
+    // Optional list of columns for partial update. When set, upsert will only 
update these columns.
+    // The primary key columns must be fully specified in this list.
+    private List<String> partialUpdateColumns;
 
     /** Set the bootstrap server for the sink. */
     public FlussSinkBuilder<InputT> setBootstrapServers(String 
bootstrapServers) {
@@ -98,6 +102,24 @@ public class FlussSinkBuilder<InputT> {
         return this;
     }
 
+    /**
+     * Enable partial update by specifying the column names to update for 
upsert tables. Primary key
+     * columns must be included in this list.
+     */
+    public FlussSinkBuilder<InputT> setPartialUpdateColumns(List<String> 
columns) {
+        this.partialUpdateColumns = columns;
+        return this;
+    }
+
+    /**
+     * Enable partial update by specifying the column names to update for 
upsert tables. Convenience
+     * varargs overload.
+     */
+    public FlussSinkBuilder<InputT> setPartialUpdateColumns(String... columns) 
{
+        this.partialUpdateColumns = Arrays.asList(columns);
+        return this;
+    }
+
     /** Set a configuration option. */
     public FlussSinkBuilder<InputT> setOption(String key, String value) {
         configOptions.put(key, value);
@@ -153,12 +175,17 @@ public class FlussSinkBuilder<InputT> {
 
         if (isUpsert) {
             LOG.info("Initializing Fluss upsert sink writer ...");
+            int[] targetColumnIndexes =
+                    computeTargetColumnIndexes(
+                            tableRowType.getFieldNames(),
+                            tableInfo.getPrimaryKeys(),
+                            partialUpdateColumns);
             writerBuilder =
                     new FlinkSink.UpsertSinkWriterBuilder<>(
                             tablePath,
                             flussConfig,
                             tableRowType,
-                            null, // not support partialUpdateColumns yet
+                            targetColumnIndexes,
                             numBucket,
                             bucketKeys,
                             partitionKeys,
@@ -193,4 +220,48 @@ public class FlussSinkBuilder<InputT> {
         checkNotNull(tableName, "Table name is required but not provided.");
         checkArgument(!tableName.isEmpty(), "Table name cannot be empty.");
     }
+
+    // -------------- Test-visible helper methods --------------
+    /**
+     * Computes target column indexes for partial updates. If {@code 
specifiedColumns} is null or
+     * empty, returns null indicating full update. Validates that all primary 
key columns are
+     * included in the specified columns.
+     *
+     * @param allFieldNames the list of all field names in table row type order
+     * @param primaryKeyNames the list of primary key column names
+     * @param specifiedColumns the optional list of columns specified for 
partial update
+     * @return the indexes into {@code allFieldNames} corresponding to {@code 
specifiedColumns}, or
+     *     null for full update
+     * @throws IllegalArgumentException if a specified column does not exist 
or primary key coverage
+     *     is incomplete
+     */
+    static int[] computeTargetColumnIndexes(
+            List<String> allFieldNames,
+            List<String> primaryKeyNames,
+            List<String> specifiedColumns) {
+        if (specifiedColumns == null || specifiedColumns.isEmpty()) {
+            return null; // full update
+        }
+
+        // Map specified column names to indexes
+        int[] indexes = new int[specifiedColumns.size()];
+        for (int i = 0; i < specifiedColumns.size(); i++) {
+            String col = specifiedColumns.get(i);
+            int idx = allFieldNames.indexOf(col);
+            checkArgument(
+                    idx >= 0, "Column '%s' not found in table schema: %s", 
col, allFieldNames);
+            indexes[i] = idx;
+        }
+
+        // Validate that all primary key columns are covered
+        for (String pk : primaryKeyNames) {
+            checkArgument(
+                    specifiedColumns.contains(pk),
+                    "Partial updates must include all primary key columns. 
Missing primary key column: %s. Provided columns: %s",
+                    pk,
+                    specifiedColumns);
+        }
+
+        return indexes;
+    }
 }
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlussSinkBuilderTest.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlussSinkBuilderTest.java
index d70dd16cf..283612eb7 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlussSinkBuilderTest.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlussSinkBuilderTest.java
@@ -24,6 +24,7 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import java.lang.reflect.Field;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -171,12 +172,55 @@ class FlussSinkBuilderTest {
                         .setTable(tableName)
                         .setOption("key1", "value1")
                         .setOptions(new HashMap<>())
-                        .setShuffleByBucketId(false);
+                        .setShuffleByBucketId(false)
+                        .setPartialUpdateColumns("id", "price");
 
         // Verify the builder instance is returned
         assertThat(chainedBuilder).isInstanceOf(FlussSinkBuilder.class);
     }
 
+    @Test
+    void testComputeTargetColumnIndexesFullUpdate() {
+        int[] result =
+                FlussSinkBuilder.computeTargetColumnIndexes(
+                        Arrays.asList("id", "name", "price"), 
Arrays.asList("id"), null);
+        assertThat(result).isNull();
+    }
+
+    @Test
+    void testComputeTargetColumnIndexesValidPartialIncludesPk() {
+        int[] result =
+                FlussSinkBuilder.computeTargetColumnIndexes(
+                        Arrays.asList("id", "name", "price", "ts"),
+                        Arrays.asList("id"),
+                        Arrays.asList("id", "price"));
+        assertThat(result).containsExactly(0, 2);
+    }
+
+    @Test
+    void testComputeTargetColumnIndexesMissingPkThrows() {
+        assertThatThrownBy(
+                        () ->
+                                FlussSinkBuilder.computeTargetColumnIndexes(
+                                        Arrays.asList("id", "name", "price"),
+                                        Arrays.asList("id"),
+                                        Arrays.asList("name", "price")))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("Partial updates must include all 
primary key columns");
+    }
+
+    @Test
+    void testComputeTargetColumnIndexesUnknownColumnThrows() {
+        assertThatThrownBy(
+                        () ->
+                                FlussSinkBuilder.computeTargetColumnIndexes(
+                                        Arrays.asList("id", "name"),
+                                        Arrays.asList("id"),
+                                        Arrays.asList("id", "unknown")))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("not found in table schema");
+    }
+
     // Helper method to get private field values using reflection
     @SuppressWarnings("unchecked")
     private <T> T getFieldValue(Object object, String fieldName) throws 
Exception {
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlussSinkITCase.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlussSinkITCase.java
index b2d4e4708..1a5e98eeb 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlussSinkITCase.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlussSinkITCase.java
@@ -310,6 +310,346 @@ public class FlussSinkITCase extends FlinkTestBase {
         logScanner.close();
     }
 
+    @Test
+    public void testPartialUpdateWithTwoWriters() throws Exception {
+        createTable(TablePath.of(DEFAULT_DB, 
"partial_update_two_writers_test"), pkTableDescriptor);
+
+        // Initial inserts
+        ArrayList<TestOrder> initialOrders = new ArrayList<>();
+        initialOrders.add(new TestOrder(2001, 3001, -1, null, RowKind.INSERT));
+        initialOrders.add(new TestOrder(2002, 3002, -1, null, RowKind.INSERT));
+        initialOrders.add(new TestOrder(2003, 3003, -1, null, RowKind.INSERT));
+
+        DataStream<TestOrder> initialStream = env.fromData(initialOrders);
+
+        FlinkSink<TestOrder> initialSink =
+                FlussSink.<TestOrder>builder()
+                        .setBootstrapServers(bootstrapServers)
+                        .setDatabase(DEFAULT_DB)
+                        .setTable("partial_update_two_writers_test")
+                        .setPartialUpdateColumns("orderId", "itemId")
+                        .setSerializationSchema(new 
TestOrderSerializationSchema())
+                        .build();
+
+        initialStream.sinkTo(initialSink).name("Fluss Initial Data Sink");
+        env.execute("First Stream Updates");
+
+        ArrayList<TestOrder> itemIdUpdates = new ArrayList<>();
+        itemIdUpdates.add(new TestOrder(2001, -1, 100, "addr1", 
RowKind.UPDATE_AFTER));
+        itemIdUpdates.add(new TestOrder(2003, -1, 300, "addr3", 
RowKind.UPDATE_AFTER));
+
+        DataStream<TestOrder> updateStream = env.fromData(itemIdUpdates);
+
+        FlinkSink<TestOrder> updateSink =
+                FlussSink.<TestOrder>builder()
+                        .setBootstrapServers(bootstrapServers)
+                        .setDatabase(DEFAULT_DB)
+                        .setTable("partial_update_two_writers_test")
+                        .setPartialUpdateColumns("orderId", "amount", 
"address")
+                        .setSerializationSchema(new 
TestOrderSerializationSchema())
+                        .build();
+
+        updateStream.sinkTo(updateSink).name("Fluss Amount/Address Update 
Sink");
+        env.execute("Test Amount/Address Updates");
+
+        Table table = conn.getTable(new TablePath(DEFAULT_DB, 
"partial_update_two_writers_test"));
+        LogScanner logScanner = table.newScan().createLogScanner();
+
+        int numBuckets = table.getTableInfo().getNumBuckets();
+        for (int i = 0; i < numBuckets; i++) {
+            logScanner.subscribeFromBeginning(i);
+        }
+
+        // Build expected change log: 3 inserts, then before/after for 2001 
and 2003
+        List<TestOrder> expected = new ArrayList<>();
+        expected.add(new TestOrder(2001, 3001, -1, null, RowKind.INSERT));
+        expected.add(new TestOrder(2002, 3002, -1, null, RowKind.INSERT));
+        expected.add(new TestOrder(2003, 3003, -1, null, RowKind.INSERT));
+        // update for 2001: before and after (itemId stays, amount/address 
updated)
+        expected.add(new TestOrder(2001, 3001, -1, null, 
RowKind.UPDATE_BEFORE));
+        expected.add(new TestOrder(2001, 3001, 100, "addr1", 
RowKind.UPDATE_AFTER));
+        // update for 2003
+        expected.add(new TestOrder(2003, 3003, -1, null, 
RowKind.UPDATE_BEFORE));
+        expected.add(new TestOrder(2003, 3003, 300, "addr3", 
RowKind.UPDATE_AFTER));
+
+        // Poll actual changelog until we have all expected records or timeout
+        List<TestOrder> actual = new ArrayList<>();
+        int idlePolls = 0;
+        int maxIdlePolls = 60; // ~60s max wait
+        while (actual.size() < expected.size() && idlePolls < maxIdlePolls) {
+            int sizeBefore = actual.size();
+            ScanRecords scanRecords = logScanner.poll(Duration.ofSeconds(1));
+            for (TableBucket bucket : scanRecords.buckets()) {
+                for (ScanRecord record : scanRecords.records(bucket)) {
+                    InternalRow row = record.getRow();
+                    String address = row.getString(3) != null ? 
row.getString(3).toString() : null;
+                    TestOrder order =
+                            new TestOrder(
+                                    row.getLong(0),
+                                    row.getLong(1),
+                                    row.getInt(2),
+                                    address,
+                                    toFlinkRowKind(record.getChangeType()));
+                    actual.add(order);
+                }
+            }
+            idlePolls = (actual.size() == sizeBefore) ? idlePolls + 1 : 0;
+        }
+
+        assertThat(actual.size()).isEqualTo(expected.size());
+        assertThat(actual).containsAll(expected);
+
+        logScanner.close();
+    }
+
+    @Test
+    public void testPartialUpdateWithTwoWritersWithRD() throws Exception {
+        // Create PK table
+        createTable(TablePath.of(DEFAULT_DB, pkTableName), pkTableDescriptor);
+
+        // Helper converter for building RowData
+        FlussRowToFlinkRowConverter converter =
+                new FlussRowToFlinkRowConverter(pkSchema.getRowType());
+
+        // Initial inserts
+        List<RowData> inserts = new ArrayList<>();
+        RowData i1 = converter.toFlinkRowData(row(101L, 1001L, 10, "a1"));
+        i1.setRowKind(RowKind.INSERT);
+        RowData i2 = converter.toFlinkRowData(row(102L, 1002L, 20, "a2"));
+        i2.setRowKind(RowKind.INSERT);
+        RowData i3 = converter.toFlinkRowData(row(103L, 1003L, 30, "a3"));
+        i3.setRowKind(RowKind.INSERT);
+        RowData i4 = converter.toFlinkRowData(row(104L, 1004L, 40, "a4"));
+        i4.setRowKind(RowKind.INSERT);
+        inserts.add(i1);
+        inserts.add(i2);
+        inserts.add(i3);
+        inserts.add(i4);
+
+        // Writer 1: updates only address for keys 101, 102 (partial columns: 
orderId, address)
+        List<RowData> updatesWriter1 = new ArrayList<>();
+        // Set other columns to null to emphasize partial update behavior
+        RowData u1a = converter.toFlinkRowData(row(101L, null, null, 
"a1_new"));
+        u1a.setRowKind(RowKind.UPDATE_AFTER);
+        RowData u2a = converter.toFlinkRowData(row(102L, null, null, 
"a2_new"));
+        u2a.setRowKind(RowKind.UPDATE_AFTER);
+        updatesWriter1.add(u1a);
+        updatesWriter1.add(u2a);
+
+        // Writer 2: updates only amount for keys 103, 104 (partial columns: 
orderId, amount)
+        List<RowData> updatesWriter2 = new ArrayList<>();
+        RowData u3b = converter.toFlinkRowData(row(103L, null, 31, null));
+        u3b.setRowKind(RowKind.UPDATE_AFTER);
+        RowData u4b = converter.toFlinkRowData(row(104L, null, 41, null));
+        u4b.setRowKind(RowKind.UPDATE_AFTER);
+        updatesWriter2.add(u3b);
+        updatesWriter2.add(u4b);
+
+        // Execute initial inserts in a dedicated, synchronous Flink job to 
ensure they land first
+        StreamExecutionEnvironment insertEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        DataStream<RowData> insertStream = insertEnv.fromData(inserts);
+        RowDataSerializationSchema insertSerializationSchema =
+                new RowDataSerializationSchema(false, true);
+        FlinkSink<RowData> insertSink =
+                FlussSink.<RowData>builder()
+                        .setBootstrapServers(bootstrapServers)
+                        .setDatabase(DEFAULT_DB)
+                        .setTable(pkTableName)
+                        .setSerializationSchema(insertSerializationSchema)
+                        .build();
+
+        insertStream.sinkTo(insertSink).name("Fluss Insert Sink");
+        // finish inserts before starting partial update writers
+        insertEnv.execute("Test Inserts for Partial Updates Two Writers");
+
+        // Now start partial updates in a separate async job to avoid racing 
with inserts
+        StreamExecutionEnvironment updatesEnv =
+                StreamExecutionEnvironment.getExecutionEnvironment();
+        DataStream<RowData> streamWriter1 = 
updatesEnv.fromData(updatesWriter1);
+        DataStream<RowData> streamWriter2 = 
updatesEnv.fromData(updatesWriter2);
+
+        RowDataSerializationSchema updatesSerializationSchema =
+                new RowDataSerializationSchema(false, true);
+
+        // Partial update sink 1: orderId + address
+        FlinkSink<RowData> partialSink1 =
+                FlussSink.<RowData>builder()
+                        .setBootstrapServers(bootstrapServers)
+                        .setDatabase(DEFAULT_DB)
+                        .setTable(pkTableName)
+                        .setSerializationSchema(updatesSerializationSchema)
+                        .setPartialUpdateColumns("orderId", "address")
+                        .build();
+
+        // Partial update sink 2: orderId + amount
+        FlinkSink<RowData> partialSink2 =
+                FlussSink.<RowData>builder()
+                        .setBootstrapServers(bootstrapServers)
+                        .setDatabase(DEFAULT_DB)
+                        .setTable(pkTableName)
+                        .setSerializationSchema(updatesSerializationSchema)
+                        .setPartialUpdateColumns("orderId", "amount")
+                        .build();
+
+        streamWriter1.sinkTo(partialSink1).name("Fluss Partial Sink 1");
+        streamWriter2.sinkTo(partialSink2).name("Fluss Partial Sink 2");
+
+        updatesEnv.executeAsync("Test Partial Updates Two Writers");
+
+        // Consume change-log and assert contents
+        Table table = conn.getTable(new TablePath(DEFAULT_DB, pkTableName));
+        LogScanner logScanner = table.newScan().createLogScanner();
+        int numBuckets = table.getTableInfo().getNumBuckets();
+        for (int i = 0; i < numBuckets; i++) {
+            logScanner.subscribeFromBeginning(i);
+        }
+
+        List<RowData> expected = new ArrayList<>();
+        expected.addAll(inserts);
+        // Add UPDATE_BEFORE for each partial update (based on initial state 
of those rows)
+        RowData b1 = converter.toFlinkRowData(row(101L, 1001L, 10, "a1"));
+        b1.setRowKind(RowKind.UPDATE_BEFORE);
+        RowData b2 = converter.toFlinkRowData(row(102L, 1002L, 20, "a2"));
+        b2.setRowKind(RowKind.UPDATE_BEFORE);
+        RowData b3 = converter.toFlinkRowData(row(103L, 1003L, 30, "a3"));
+        b3.setRowKind(RowKind.UPDATE_BEFORE);
+        RowData b4 = converter.toFlinkRowData(row(104L, 1004L, 40, "a4"));
+        b4.setRowKind(RowKind.UPDATE_BEFORE);
+        expected.add(b1);
+        expected.add(b2);
+        expected.add(b3);
+        expected.add(b4);
+        // And the UPDATE_AFTER rows as provided in the update streams, but 
with full after-image
+        // Writer1 updates address only; amount/itemId should remain as before
+        RowData a1 = converter.toFlinkRowData(row(101L, 1001L, 10, "a1_new"));
+        a1.setRowKind(RowKind.UPDATE_AFTER);
+        RowData a2 = converter.toFlinkRowData(row(102L, 1002L, 20, "a2_new"));
+        a2.setRowKind(RowKind.UPDATE_AFTER);
+        // Writer2 updates amount only
+        RowData a3 = converter.toFlinkRowData(row(103L, 1003L, 31, "a3"));
+        a3.setRowKind(RowKind.UPDATE_AFTER);
+        RowData a4 = converter.toFlinkRowData(row(104L, 1004L, 41, "a4"));
+        a4.setRowKind(RowKind.UPDATE_AFTER);
+        expected.add(a1);
+        expected.add(a2);
+        expected.add(a3);
+        expected.add(a4);
+
+        List<RowData> actual = new ArrayList<>();
+        int idlePolls = 0;
+        int maxIdlePolls = 60; // ~60s max wait to avoid indefinite hang
+        while (actual.size() < expected.size() && idlePolls < maxIdlePolls) {
+            int sizeBefore = actual.size();
+            ScanRecords scanRecords = logScanner.poll(Duration.ofSeconds(1));
+            for (TableBucket bucket : scanRecords.buckets()) {
+                for (ScanRecord record : scanRecords.records(bucket)) {
+                    RowData row = converter.toFlinkRowData(record.getRow());
+                    row.setRowKind(toFlinkRowKind(record.getChangeType()));
+                    actual.add(row);
+                }
+            }
+            idlePolls = (actual.size() == sizeBefore) ? idlePolls + 1 : 0;
+        }
+
+        assertThat(actual.size()).isEqualTo(expected.size());
+        assertThat(actual).containsAll(expected);
+
+        logScanner.close();
+    }
+
+    @Test
+    public void testPartialUpdateSingleWriterNullRemainder() throws Exception {
+        // Create PK table
+        createTable(TablePath.of(DEFAULT_DB, pkTableName), pkTableDescriptor);
+
+        FlussRowToFlinkRowConverter converter =
+                new FlussRowToFlinkRowConverter(pkSchema.getRowType());
+
+        // Initial insert full row
+        List<RowData> inserts = new ArrayList<>();
+        RowData i1 = converter.toFlinkRowData(row(201L, 2001L, 100, "x1"));
+        i1.setRowKind(RowKind.INSERT);
+        inserts.add(i1);
+
+        // Partial update only address (orderId, address). Other columns are 
explicitly null in
+        // input
+        List<RowData> updates = new ArrayList<>();
+        RowData u1 = converter.toFlinkRowData(row(201L, null, null, "x1_new"));
+        u1.setRowKind(RowKind.UPDATE_AFTER);
+        updates.add(u1);
+
+        // Stage 1: run inserts in a dedicated synchronous job to avoid race 
with partial updates
+        StreamExecutionEnvironment insertEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        DataStream<RowData> insertStream = insertEnv.fromData(inserts);
+        RowDataSerializationSchema insertSerializationSchema =
+                new RowDataSerializationSchema(false, true);
+        FlinkSink<RowData> insertSink =
+                FlussSink.<RowData>builder()
+                        .setBootstrapServers(bootstrapServers)
+                        .setDatabase(DEFAULT_DB)
+                        .setTable(pkTableName)
+                        .setSerializationSchema(insertSerializationSchema)
+                        .build();
+        insertStream.sinkTo(insertSink).name("Fluss Insert Sink");
+        insertEnv.execute("Test Partial Update Single Writer - Inserts");
+
+        // Stage 2: start partial updates in a separate async job
+        StreamExecutionEnvironment updateEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        DataStream<RowData> updateStream = updateEnv.fromData(updates);
+        RowDataSerializationSchema updateSerializationSchema =
+                new RowDataSerializationSchema(false, true);
+        FlinkSink<RowData> partialSink =
+                FlussSink.<RowData>builder()
+                        .setBootstrapServers(bootstrapServers)
+                        .setDatabase(DEFAULT_DB)
+                        .setTable(pkTableName)
+                        .setSerializationSchema(updateSerializationSchema)
+                        .setPartialUpdateColumns("orderId", "address")
+                        .build();
+        updateStream.sinkTo(partialSink).name("Fluss Partial Sink");
+        updateEnv.executeAsync("Test Partial Update Single Writer - Updates");
+
+        // Read changelog
+        Table table = conn.getTable(new TablePath(DEFAULT_DB, pkTableName));
+        LogScanner logScanner = table.newScan().createLogScanner();
+        int numBuckets = table.getTableInfo().getNumBuckets();
+        for (int i = 0; i < numBuckets; i++) {
+            logScanner.subscribeFromBeginning(i);
+        }
+
+        List<RowData> expected = new ArrayList<>();
+        expected.addAll(inserts);
+        // Before image prior to the address update
+        RowData before = converter.toFlinkRowData(row(201L, 2001L, 100, "x1"));
+        before.setRowKind(RowKind.UPDATE_BEFORE);
+        expected.add(before);
+        // After image: only address changed; other fields remain unchanged 
(not null)
+        RowData after = converter.toFlinkRowData(row(201L, 2001L, 100, 
"x1_new"));
+        after.setRowKind(RowKind.UPDATE_AFTER);
+        expected.add(after);
+
+        List<RowData> actual = new ArrayList<>();
+        int idlePolls = 0;
+        int maxIdlePolls = 60; // ~60s max wait to avoid indefinite hang
+        while (actual.size() < expected.size() && idlePolls < maxIdlePolls) {
+            int sizeBefore = actual.size();
+            ScanRecords scanRecords = logScanner.poll(Duration.ofSeconds(1));
+            for (TableBucket bucket : scanRecords.buckets()) {
+                for (ScanRecord record : scanRecords.records(bucket)) {
+                    RowData row = converter.toFlinkRowData(record.getRow());
+                    row.setRowKind(toFlinkRowKind(record.getChangeType()));
+                    actual.add(row);
+                }
+            }
+            idlePolls = (actual.size() == sizeBefore) ? idlePolls + 1 : 0;
+        }
+
+        assertThat(actual.size()).isEqualTo(expected.size());
+        assertThat(actual).containsAll(expected);
+
+        logScanner.close();
+    }
+
     private static class TestOrder implements Serializable {
         private static final long serialVersionUID = 1L;
         private final long orderId;

Reply via email to