This is an automated email from the ASF dual-hosted git repository.
mehulbatra pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new c9370aec6 [Flink] Support Partial Updates to the Flink Sink (#2042)
c9370aec6 is described below
commit c9370aec6035caae43e3f8e5fcc5baa28f7ec627
Author: Giannis Polyzos <[email protected]>
AuthorDate: Thu Jan 15 10:58:56 2026 +0200
[Flink] Support Partial Updates to the Flink Sink (#2042)
* add partial updates to the datastream api
* refactor builder
* fix checkstyle violations
* update setter
* fix checkstyle
* update setter
---
.../apache/fluss/flink/sink/FlussSinkBuilder.java | 73 ++++-
.../fluss/flink/sink/FlussSinkBuilderTest.java | 46 ++-
.../apache/fluss/flink/sink/FlussSinkITCase.java | 340 +++++++++++++++++++++
3 files changed, 457 insertions(+), 2 deletions(-)
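
Before the diff, a minimal usage sketch of the new option, mirroring the integration tests added below; the bootstrap address, database, table, and column names here are hypothetical, not taken from the commit:

    FlinkSink<RowData> sink =
            FlussSink.<RowData>builder()
                    .setBootstrapServers("localhost:9123")   // hypothetical address
                    .setDatabase("my_db")                    // hypothetical database
                    .setTable("orders")                      // hypothetical PK table
                    // upsert will write only these columns; the list must
                    // include every primary key column ("orderId" here)
                    .setPartialUpdateColumns("orderId", "address")
                    .setSerializationSchema(new RowDataSerializationSchema(false, true))
                    .build();
    stream.sinkTo(sink);

Omitting setPartialUpdateColumns keeps the previous behavior: a full-row upsert.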
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlussSinkBuilder.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlussSinkBuilder.java
index 489eefedb..2fea5f86a 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlussSinkBuilder.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlussSinkBuilder.java
@@ -33,6 +33,7 @@ import org.apache.flink.table.types.logical.RowType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -73,6 +74,9 @@ public class FlussSinkBuilder<InputT> {
private final Map<String, String> configOptions = new HashMap<>();
private FlussSerializationSchema<InputT> serializationSchema;
private boolean shuffleByBucketId = true;
+ // Optional list of columns for partial update. When set, upsert will only update these columns.
+ // The primary key columns must be fully specified in this list.
+ private List<String> partialUpdateColumns;
/** Set the bootstrap server for the sink. */
public FlussSinkBuilder<InputT> setBootstrapServers(String bootstrapServers) {
@@ -98,6 +102,24 @@ public class FlussSinkBuilder<InputT> {
return this;
}
+ /**
+ * Enable partial update by specifying the column names to update for upsert tables. Primary key
+ * columns must be included in this list.
+ */
+ public FlussSinkBuilder<InputT> setPartialUpdateColumns(List<String> columns) {
+ this.partialUpdateColumns = columns;
+ return this;
+ }
+
+ /**
+ * Enable partial update by specifying the column names to update for upsert tables. Convenience
+ * varargs overload.
+ */
+ public FlussSinkBuilder<InputT> setPartialUpdateColumns(String... columns) {
+ this.partialUpdateColumns = Arrays.asList(columns);
+ return this;
+ }
+
/** Set a configuration option. */
public FlussSinkBuilder<InputT> setOption(String key, String value) {
configOptions.put(key, value);
@@ -153,12 +175,17 @@ public class FlussSinkBuilder<InputT> {
if (isUpsert) {
LOG.info("Initializing Fluss upsert sink writer ...");
+ int[] targetColumnIndexes =
+ computeTargetColumnIndexes(
+ tableRowType.getFieldNames(),
+ tableInfo.getPrimaryKeys(),
+ partialUpdateColumns);
writerBuilder =
new FlinkSink.UpsertSinkWriterBuilder<>(
tablePath,
flussConfig,
tableRowType,
- null, // not support partialUpdateColumns yet
+ targetColumnIndexes,
numBucket,
bucketKeys,
partitionKeys,
@@ -193,4 +220,48 @@ public class FlussSinkBuilder<InputT> {
checkNotNull(tableName, "Table name is required but not provided.");
checkArgument(!tableName.isEmpty(), "Table name cannot be empty.");
}
+
+ // -------------- Test-visible helper methods --------------
+ /**
+ * Computes target column indexes for partial updates. If {@code specifiedColumns} is null or
+ * empty, returns null indicating full update. Validates that all primary key columns are
+ * included in the specified columns.
+ *
+ * @param allFieldNames the list of all field names in table row type order
+ * @param primaryKeyNames the list of primary key column names
+ * @param specifiedColumns the optional list of columns specified for partial update
+ * @return the indexes into {@code allFieldNames} corresponding to {@code specifiedColumns}, or
+ * null for full update
+ * @throws IllegalArgumentException if a specified column does not exist or primary key coverage
+ * is incomplete
+ */
+ static int[] computeTargetColumnIndexes(
+ List<String> allFieldNames,
+ List<String> primaryKeyNames,
+ List<String> specifiedColumns) {
+ if (specifiedColumns == null || specifiedColumns.isEmpty()) {
+ return null; // full update
+ }
+
+ // Map specified column names to indexes
+ int[] indexes = new int[specifiedColumns.size()];
+ for (int i = 0; i < specifiedColumns.size(); i++) {
+ String col = specifiedColumns.get(i);
+ int idx = allFieldNames.indexOf(col);
+ checkArgument(
+ idx >= 0, "Column '%s' not found in table schema: %s", col, allFieldNames);
+ indexes[i] = idx;
+ }
+
+ // Validate that all primary key columns are covered
+ for (String pk : primaryKeyNames) {
+ checkArgument(
+ specifiedColumns.contains(pk),
+ "Partial updates must include all primary key columns.
Missing primary key column: %s. Provided columns: %s",
+ pk,
+ specifiedColumns);
+ }
+
+ return indexes;
+ }
}
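
To make the new helper's contract concrete, a worked call using the inputs and results exercised by the unit tests below:

    // fields [id, name, price, ts], primary key [id]
    int[] idx =
            FlussSinkBuilder.computeTargetColumnIndexes(
                    Arrays.asList("id", "name", "price", "ts"),
                    Arrays.asList("id"),
                    Arrays.asList("id", "price")); // -> {0, 2}
    // null/empty specifiedColumns -> null (full update);
    // a list missing "id", or naming an unknown column -> IllegalArgumentException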
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlussSinkBuilderTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlussSinkBuilderTest.java
index d70dd16cf..283612eb7 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlussSinkBuilderTest.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlussSinkBuilderTest.java
@@ -24,6 +24,7 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.lang.reflect.Field;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
@@ -171,12 +172,55 @@ class FlussSinkBuilderTest {
.setTable(tableName)
.setOption("key1", "value1")
.setOptions(new HashMap<>())
- .setShuffleByBucketId(false);
+ .setShuffleByBucketId(false)
+ .setPartialUpdateColumns("id", "price");
// Verify the builder instance is returned
assertThat(chainedBuilder).isInstanceOf(FlussSinkBuilder.class);
}
+ @Test
+ void testComputeTargetColumnIndexesFullUpdate() {
+ int[] result =
+ FlussSinkBuilder.computeTargetColumnIndexes(
+ Arrays.asList("id", "name", "price"), Arrays.asList("id"), null);
+ assertThat(result).isNull();
+ }
+
+ @Test
+ void testComputeTargetColumnIndexesValidPartialIncludesPk() {
+ int[] result =
+ FlussSinkBuilder.computeTargetColumnIndexes(
+ Arrays.asList("id", "name", "price", "ts"),
+ Arrays.asList("id"),
+ Arrays.asList("id", "price"));
+ assertThat(result).containsExactly(0, 2);
+ }
+
+ @Test
+ void testComputeTargetColumnIndexesMissingPkThrows() {
+ assertThatThrownBy(
+ () ->
+ FlussSinkBuilder.computeTargetColumnIndexes(
+ Arrays.asList("id", "name", "price"),
+ Arrays.asList("id"),
+ Arrays.asList("name", "price")))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Partial updates must include all primary key columns");
+ }
+
+ @Test
+ void testComputeTargetColumnIndexesUnknownColumnThrows() {
+ assertThatThrownBy(
+ () ->
+ FlussSinkBuilder.computeTargetColumnIndexes(
+ Arrays.asList("id", "name"),
+ Arrays.asList("id"),
+ Arrays.asList("id", "unknown")))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("not found in table schema");
+ }
+
// Helper method to get private field values using reflection
@SuppressWarnings("unchecked")
private <T> T getFieldValue(Object object, String fieldName) throws Exception {
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlussSinkITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlussSinkITCase.java
index b2d4e4708..1a5e98eeb 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlussSinkITCase.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlussSinkITCase.java
@@ -310,6 +310,346 @@ public class FlussSinkITCase extends FlinkTestBase {
logScanner.close();
}
+ @Test
+ public void testPartialUpdateWithTwoWriters() throws Exception {
+ createTable(TablePath.of(DEFAULT_DB, "partial_update_two_writers_test"), pkTableDescriptor);
+
+ // Initial inserts
+ ArrayList<TestOrder> initialOrders = new ArrayList<>();
+ initialOrders.add(new TestOrder(2001, 3001, -1, null, RowKind.INSERT));
+ initialOrders.add(new TestOrder(2002, 3002, -1, null, RowKind.INSERT));
+ initialOrders.add(new TestOrder(2003, 3003, -1, null, RowKind.INSERT));
+
+ DataStream<TestOrder> initialStream = env.fromData(initialOrders);
+
+ FlinkSink<TestOrder> initialSink =
+ FlussSink.<TestOrder>builder()
+ .setBootstrapServers(bootstrapServers)
+ .setDatabase(DEFAULT_DB)
+ .setTable("partial_update_two_writers_test")
+ .setPartialUpdateColumns("orderId", "itemId")
+ .setSerializationSchema(new TestOrderSerializationSchema())
+ .build();
+
+ initialStream.sinkTo(initialSink).name("Fluss Initial Data Sink");
+ env.execute("First Stream Updates");
+
+ ArrayList<TestOrder> itemIdUpdates = new ArrayList<>();
+ itemIdUpdates.add(new TestOrder(2001, -1, 100, "addr1", RowKind.UPDATE_AFTER));
+ itemIdUpdates.add(new TestOrder(2003, -1, 300, "addr3", RowKind.UPDATE_AFTER));
+
+ DataStream<TestOrder> updateStream = env.fromData(itemIdUpdates);
+
+ FlinkSink<TestOrder> updateSink =
+ FlussSink.<TestOrder>builder()
+ .setBootstrapServers(bootstrapServers)
+ .setDatabase(DEFAULT_DB)
+ .setTable("partial_update_two_writers_test")
+ .setPartialUpdateColumns("orderId", "amount",
"address")
+ .setSerializationSchema(new
TestOrderSerializationSchema())
+ .build();
+
+ updateStream.sinkTo(updateSink).name("Fluss Amount/Address Update Sink");
+ env.execute("Test Amount/Address Updates");
+
+ Table table = conn.getTable(new TablePath(DEFAULT_DB, "partial_update_two_writers_test"));
+ LogScanner logScanner = table.newScan().createLogScanner();
+
+ int numBuckets = table.getTableInfo().getNumBuckets();
+ for (int i = 0; i < numBuckets; i++) {
+ logScanner.subscribeFromBeginning(i);
+ }
+
+ // Build expected change log: 3 inserts, then before/after for 2001 and 2003
+ List<TestOrder> expected = new ArrayList<>();
+ expected.add(new TestOrder(2001, 3001, -1, null, RowKind.INSERT));
+ expected.add(new TestOrder(2002, 3002, -1, null, RowKind.INSERT));
+ expected.add(new TestOrder(2003, 3003, -1, null, RowKind.INSERT));
+ // update for 2001: before and after (itemId stays, amount/address updated)
+ expected.add(new TestOrder(2001, 3001, -1, null, RowKind.UPDATE_BEFORE));
+ expected.add(new TestOrder(2001, 3001, 100, "addr1", RowKind.UPDATE_AFTER));
+ // update for 2003
+ expected.add(new TestOrder(2003, 3003, -1, null, RowKind.UPDATE_BEFORE));
+ expected.add(new TestOrder(2003, 3003, 300, "addr3", RowKind.UPDATE_AFTER));
+
+ // Poll actual changelog until we have all expected records or timeout
+ List<TestOrder> actual = new ArrayList<>();
+ int idlePolls = 0;
+ int maxIdlePolls = 60; // ~60s max wait
+ while (actual.size() < expected.size() && idlePolls < maxIdlePolls) {
+ int sizeBefore = actual.size();
+ ScanRecords scanRecords = logScanner.poll(Duration.ofSeconds(1));
+ for (TableBucket bucket : scanRecords.buckets()) {
+ for (ScanRecord record : scanRecords.records(bucket)) {
+ InternalRow row = record.getRow();
+ String address = row.getString(3) != null ? row.getString(3).toString() : null;
+ TestOrder order =
+ new TestOrder(
+ row.getLong(0),
+ row.getLong(1),
+ row.getInt(2),
+ address,
+ toFlinkRowKind(record.getChangeType()));
+ actual.add(order);
+ }
+ }
+ idlePolls = (actual.size() == sizeBefore) ? idlePolls + 1 : 0;
+ }
+
+ assertThat(actual.size()).isEqualTo(expected.size());
+ assertThat(actual).containsAll(expected);
+
+ logScanner.close();
+ }
+
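
Aside: the assertions in the test above encode the partial-update merge rule, i.e. columns outside the writer's target list keep their previous values. A plain-Java sketch of that rule (illustrative only, not Fluss code), using the test's values:

    import java.util.Arrays;

    public class PartialUpdateSemantics {
        // Overwrite only the writer's target columns; all others keep their prior value.
        static Object[] merge(Object[] before, int[] targetCols, Object[] incoming) {
            Object[] after = Arrays.copyOf(before, before.length);
            for (int col : targetCols) {
                after[col] = incoming[col];
            }
            return after;
        }

        public static void main(String[] args) {
            Object[] before = {2001L, 3001L, -1, null};      // row after the initial insert
            int[] targets = {0, 2, 3};                       // orderId, amount, address
            Object[] incoming = {2001L, null, 100, "addr1"}; // second writer's record
            System.out.println(Arrays.toString(merge(before, targets, incoming)));
            // -> [2001, 3001, 100, addr1]: itemId (3001) survives, as asserted above
        }
    }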
+ @Test
+ public void testPartialUpdateWithTwoWritersWithRD() throws Exception {
+ // Create PK table
+ createTable(TablePath.of(DEFAULT_DB, pkTableName), pkTableDescriptor);
+
+ // Helper converter for building RowData
+ FlussRowToFlinkRowConverter converter =
+ new FlussRowToFlinkRowConverter(pkSchema.getRowType());
+
+ // Initial inserts
+ List<RowData> inserts = new ArrayList<>();
+ RowData i1 = converter.toFlinkRowData(row(101L, 1001L, 10, "a1"));
+ i1.setRowKind(RowKind.INSERT);
+ RowData i2 = converter.toFlinkRowData(row(102L, 1002L, 20, "a2"));
+ i2.setRowKind(RowKind.INSERT);
+ RowData i3 = converter.toFlinkRowData(row(103L, 1003L, 30, "a3"));
+ i3.setRowKind(RowKind.INSERT);
+ RowData i4 = converter.toFlinkRowData(row(104L, 1004L, 40, "a4"));
+ i4.setRowKind(RowKind.INSERT);
+ inserts.add(i1);
+ inserts.add(i2);
+ inserts.add(i3);
+ inserts.add(i4);
+
+ // Writer 1: updates only address for keys 101, 102 (partial columns: orderId, address)
+ List<RowData> updatesWriter1 = new ArrayList<>();
+ // Set other columns to null to emphasize partial update behavior
+ RowData u1a = converter.toFlinkRowData(row(101L, null, null, "a1_new"));
+ u1a.setRowKind(RowKind.UPDATE_AFTER);
+ RowData u2a = converter.toFlinkRowData(row(102L, null, null, "a2_new"));
+ u2a.setRowKind(RowKind.UPDATE_AFTER);
+ updatesWriter1.add(u1a);
+ updatesWriter1.add(u2a);
+
+ // Writer 2: updates only amount for keys 103, 104 (partial columns: orderId, amount)
+ List<RowData> updatesWriter2 = new ArrayList<>();
+ RowData u3b = converter.toFlinkRowData(row(103L, null, 31, null));
+ u3b.setRowKind(RowKind.UPDATE_AFTER);
+ RowData u4b = converter.toFlinkRowData(row(104L, null, 41, null));
+ u4b.setRowKind(RowKind.UPDATE_AFTER);
+ updatesWriter2.add(u3b);
+ updatesWriter2.add(u4b);
+
+ // Execute initial inserts in a dedicated, synchronous Flink job to ensure they land first
+ StreamExecutionEnvironment insertEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+ DataStream<RowData> insertStream = insertEnv.fromData(inserts);
+ RowDataSerializationSchema insertSerializationSchema =
+ new RowDataSerializationSchema(false, true);
+ FlinkSink<RowData> insertSink =
+ FlussSink.<RowData>builder()
+ .setBootstrapServers(bootstrapServers)
+ .setDatabase(DEFAULT_DB)
+ .setTable(pkTableName)
+ .setSerializationSchema(insertSerializationSchema)
+ .build();
+
+ insertStream.sinkTo(insertSink).name("Fluss Insert Sink");
+ // finish inserts before starting partial update writers
+ insertEnv.execute("Test Inserts for Partial Updates Two Writers");
+
+ // Now start partial updates in a separate async job to avoid racing with inserts
+ StreamExecutionEnvironment updatesEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+ DataStream<RowData> streamWriter1 = updatesEnv.fromData(updatesWriter1);
+ DataStream<RowData> streamWriter2 = updatesEnv.fromData(updatesWriter2);
+
+ RowDataSerializationSchema updatesSerializationSchema =
+ new RowDataSerializationSchema(false, true);
+
+ // Partial update sink 1: orderId + address
+ FlinkSink<RowData> partialSink1 =
+ FlussSink.<RowData>builder()
+ .setBootstrapServers(bootstrapServers)
+ .setDatabase(DEFAULT_DB)
+ .setTable(pkTableName)
+ .setSerializationSchema(updatesSerializationSchema)
+ .setPartialUpdateColumns("orderId", "address")
+ .build();
+
+ // Partial update sink 2: orderId + amount
+ FlinkSink<RowData> partialSink2 =
+ FlussSink.<RowData>builder()
+ .setBootstrapServers(bootstrapServers)
+ .setDatabase(DEFAULT_DB)
+ .setTable(pkTableName)
+ .setSerializationSchema(updatesSerializationSchema)
+ .setPartialUpdateColumns("orderId", "amount")
+ .build();
+
+ streamWriter1.sinkTo(partialSink1).name("Fluss Partial Sink 1");
+ streamWriter2.sinkTo(partialSink2).name("Fluss Partial Sink 2");
+
+ updatesEnv.executeAsync("Test Partial Updates Two Writers");
+
+ // Consume change-log and assert contents
+ Table table = conn.getTable(new TablePath(DEFAULT_DB, pkTableName));
+ LogScanner logScanner = table.newScan().createLogScanner();
+ int numBuckets = table.getTableInfo().getNumBuckets();
+ for (int i = 0; i < numBuckets; i++) {
+ logScanner.subscribeFromBeginning(i);
+ }
+
+ List<RowData> expected = new ArrayList<>();
+ expected.addAll(inserts);
+ // Add UPDATE_BEFORE for each partial update (based on initial state of those rows)
+ RowData b1 = converter.toFlinkRowData(row(101L, 1001L, 10, "a1"));
+ b1.setRowKind(RowKind.UPDATE_BEFORE);
+ RowData b2 = converter.toFlinkRowData(row(102L, 1002L, 20, "a2"));
+ b2.setRowKind(RowKind.UPDATE_BEFORE);
+ RowData b3 = converter.toFlinkRowData(row(103L, 1003L, 30, "a3"));
+ b3.setRowKind(RowKind.UPDATE_BEFORE);
+ RowData b4 = converter.toFlinkRowData(row(104L, 1004L, 40, "a4"));
+ b4.setRowKind(RowKind.UPDATE_BEFORE);
+ expected.add(b1);
+ expected.add(b2);
+ expected.add(b3);
+ expected.add(b4);
+ // And the UPDATE_AFTER rows as provided in the update streams, but with full after-image
+ // Writer1 updates address only; amount/itemId should remain as before
+ RowData a1 = converter.toFlinkRowData(row(101L, 1001L, 10, "a1_new"));
+ a1.setRowKind(RowKind.UPDATE_AFTER);
+ RowData a2 = converter.toFlinkRowData(row(102L, 1002L, 20, "a2_new"));
+ a2.setRowKind(RowKind.UPDATE_AFTER);
+ // Writer2 updates amount only
+ RowData a3 = converter.toFlinkRowData(row(103L, 1003L, 31, "a3"));
+ a3.setRowKind(RowKind.UPDATE_AFTER);
+ RowData a4 = converter.toFlinkRowData(row(104L, 1004L, 41, "a4"));
+ a4.setRowKind(RowKind.UPDATE_AFTER);
+ expected.add(a1);
+ expected.add(a2);
+ expected.add(a3);
+ expected.add(a4);
+
+ List<RowData> actual = new ArrayList<>();
+ int idlePolls = 0;
+ int maxIdlePolls = 60; // ~60s max wait to avoid indefinite hang
+ while (actual.size() < expected.size() && idlePolls < maxIdlePolls) {
+ int sizeBefore = actual.size();
+ ScanRecords scanRecords = logScanner.poll(Duration.ofSeconds(1));
+ for (TableBucket bucket : scanRecords.buckets()) {
+ for (ScanRecord record : scanRecords.records(bucket)) {
+ RowData row = converter.toFlinkRowData(record.getRow());
+ row.setRowKind(toFlinkRowKind(record.getChangeType()));
+ actual.add(row);
+ }
+ }
+ idlePolls = (actual.size() == sizeBefore) ? idlePolls + 1 : 0;
+ }
+
+ assertThat(actual.size()).isEqualTo(expected.size());
+ assertThat(actual).containsAll(expected);
+
+ logScanner.close();
+ }
+
+ @Test
+ public void testPartialUpdateSingleWriterNullRemainder() throws Exception {
+ // Create PK table
+ createTable(TablePath.of(DEFAULT_DB, pkTableName), pkTableDescriptor);
+
+ FlussRowToFlinkRowConverter converter =
+ new FlussRowToFlinkRowConverter(pkSchema.getRowType());
+
+ // Initial insert full row
+ List<RowData> inserts = new ArrayList<>();
+ RowData i1 = converter.toFlinkRowData(row(201L, 2001L, 100, "x1"));
+ i1.setRowKind(RowKind.INSERT);
+ inserts.add(i1);
+
+ // Partial update only address (orderId, address). Other columns are explicitly null in input
+ List<RowData> updates = new ArrayList<>();
+ RowData u1 = converter.toFlinkRowData(row(201L, null, null, "x1_new"));
+ u1.setRowKind(RowKind.UPDATE_AFTER);
+ updates.add(u1);
+
+ // Stage 1: run inserts in a dedicated synchronous job to avoid race with partial updates
+ StreamExecutionEnvironment insertEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+ DataStream<RowData> insertStream = insertEnv.fromData(inserts);
+ RowDataSerializationSchema insertSerializationSchema =
+ new RowDataSerializationSchema(false, true);
+ FlinkSink<RowData> insertSink =
+ FlussSink.<RowData>builder()
+ .setBootstrapServers(bootstrapServers)
+ .setDatabase(DEFAULT_DB)
+ .setTable(pkTableName)
+ .setSerializationSchema(insertSerializationSchema)
+ .build();
+ insertStream.sinkTo(insertSink).name("Fluss Insert Sink");
+ insertEnv.execute("Test Partial Update Single Writer - Inserts");
+
+ // Stage 2: start partial updates in a separate async job
+ StreamExecutionEnvironment updateEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+ DataStream<RowData> updateStream = updateEnv.fromData(updates);
+ RowDataSerializationSchema updateSerializationSchema =
+ new RowDataSerializationSchema(false, true);
+ FlinkSink<RowData> partialSink =
+ FlussSink.<RowData>builder()
+ .setBootstrapServers(bootstrapServers)
+ .setDatabase(DEFAULT_DB)
+ .setTable(pkTableName)
+ .setSerializationSchema(updateSerializationSchema)
+ .setPartialUpdateColumns("orderId", "address")
+ .build();
+ updateStream.sinkTo(partialSink).name("Fluss Partial Sink");
+ updateEnv.executeAsync("Test Partial Update Single Writer - Updates");
+
+ // Read changelog
+ Table table = conn.getTable(new TablePath(DEFAULT_DB, pkTableName));
+ LogScanner logScanner = table.newScan().createLogScanner();
+ int numBuckets = table.getTableInfo().getNumBuckets();
+ for (int i = 0; i < numBuckets; i++) {
+ logScanner.subscribeFromBeginning(i);
+ }
+
+ List<RowData> expected = new ArrayList<>();
+ expected.addAll(inserts);
+ // Before image prior to the address update
+ RowData before = converter.toFlinkRowData(row(201L, 2001L, 100, "x1"));
+ before.setRowKind(RowKind.UPDATE_BEFORE);
+ expected.add(before);
+ // After image: only address changed; other fields remain unchanged (not null)
+ RowData after = converter.toFlinkRowData(row(201L, 2001L, 100, "x1_new"));
+ after.setRowKind(RowKind.UPDATE_AFTER);
+ expected.add(after);
+
+ List<RowData> actual = new ArrayList<>();
+ int idlePolls = 0;
+ int maxIdlePolls = 60; // ~60s max wait to avoid indefinite hang
+ while (actual.size() < expected.size() && idlePolls < maxIdlePolls) {
+ int sizeBefore = actual.size();
+ ScanRecords scanRecords = logScanner.poll(Duration.ofSeconds(1));
+ for (TableBucket bucket : scanRecords.buckets()) {
+ for (ScanRecord record : scanRecords.records(bucket)) {
+ RowData row = converter.toFlinkRowData(record.getRow());
+ row.setRowKind(toFlinkRowKind(record.getChangeType()));
+ actual.add(row);
+ }
+ }
+ idlePolls = (actual.size() == sizeBefore) ? idlePolls + 1 : 0;
+ }
+
+ assertThat(actual.size()).isEqualTo(expected.size());
+ assertThat(actual).containsAll(expected);
+
+ logScanner.close();
+ }
+
private static class TestOrder implements Serializable {
private static final long serialVersionUID = 1L;
private final long orderId;