This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new cfb986bf4 [tests] Add integration test for WAL image mode change log
of Primary Key (#2217)
cfb986bf4 is described below
commit cfb986bf49eb5354ac9c6271de6692dfc0ccaf4e
Author: Yang Wang <[email protected]>
AuthorDate: Sun Dec 21 18:26:59 2025 +0800
[tests] Add integration test for WAL image mode change log of Primary Key
(#2217)
---
.../fluss/flink/sink/FlinkTableSinkITCase.java | 73 ++++++++++++++++++++++
.../source/testutils/FlinkRowAssertionsUtils.java | 42 ++++++++++---
2 files changed, 108 insertions(+), 7 deletions(-)
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java
index c8aeaaae8..68fa741d4 100644
---
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java
@@ -66,6 +66,7 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.fluss.flink.FlinkConnectorOptions.BOOTSTRAP_SERVERS;
+import static
org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertQueryResultExactOrder;
import static
org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsIgnoreOrder;
import static
org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.collectRowsWithTimeout;
import static org.apache.fluss.flink.utils.FlinkTestBase.waitUntilPartitions;
@@ -1384,4 +1385,76 @@ abstract class FlinkTableSinkITCase extends
AbstractTestBase {
assertResultsIgnoreOrder(rowIter, expectedRows, true);
}
}
+
+ @Test
+ void testWalModeWithDefaultMergeEngineAndAggregation() throws Exception {
+ // use single parallelism to make result ordering stable
+
tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM,
1);
+
+ String tableName = "wal_mode_pk_table";
+ // Create a table with WAL mode and default merge engine
+ tEnv.executeSql(
+ String.format(
+ "create table %s ("
+ + " id int not null,"
+ + " category string,"
+ + " amount bigint,"
+ + " primary key (id) not enforced"
+ + ") with ('table.changelog.image' = 'wal')",
+ tableName));
+
+ // Insert initial data
+ tEnv.executeSql(
+ String.format(
+ "INSERT INTO %s VALUES "
+ + "(1, 'A', 100), "
+ + "(2, 'B', 200), "
+ + "(3, 'A', 150), "
+ + "(4, 'B', 250)",
+ tableName))
+ .await();
+
+ // Use batch mode to update and delete records
+ tBatchEnv.executeSql("UPDATE " + tableName + " SET amount = 120 WHERE
id = 1").await();
+ tBatchEnv.executeSql("UPDATE " + tableName + " SET amount = 180 WHERE
id = 3").await();
+ tBatchEnv.executeSql("DELETE FROM " + tableName + " WHERE id =
4").await();
+
+ // Do aggregation on the table and verify ChangelogNormalize node is
generated
+ String aggQuery =
+ String.format(
+ "SELECT category, SUM(amount) as total_amount FROM %s
/*+ OPTIONS('scan.startup.mode' = 'earliest') */ GROUP BY category",
+ tableName);
+
+ // Explain the aggregation query to check for ChangelogNormalize
+ String aggPlan = tEnv.explainSql(aggQuery);
+ // ChangelogNormalize should be present to normalize the changelog for
aggregation
+ // In Flink, when the source produces changelog with primary key
semantics (I, UA, D),
+ // a ChangelogNormalize operator is inserted before aggregation
+ assertThat(aggPlan).contains("ChangelogNormalize");
+
+ // Expected aggregation results:
+ // Category A: 120 (id=1) + 180 (id=3) = 300
+        // Category B: 200 (id=2); id=4's 250 was removed by the DELETE, total = 200
+ List<String> expectedAggResults =
+ Arrays.asList(
+ "+I[A, 100]",
+ "+I[B, 200]",
+ "-U[A, 100]",
+ "+U[A, 250]",
+ "-U[B, 200]",
+ "+U[B, 450]",
+ "-U[A, 250]",
+ "+U[A, 150]",
+ "-U[A, 150]",
+ "+U[A, 270]",
+ "-U[A, 270]",
+ "+U[A, 120]",
+ "-U[A, 120]",
+ "+U[A, 300]",
+ "-U[B, 450]",
+ "+U[B, 200]");
+
+        // Run the aggregation and assert the changelog rows arrive in the exact expected order
+ assertQueryResultExactOrder(tEnv, aggQuery, expectedAggResults);
+ }
}
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/testutils/FlinkRowAssertionsUtils.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/testutils/FlinkRowAssertionsUtils.java
index 772201dab..00807497d 100644
---
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/testutils/FlinkRowAssertionsUtils.java
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/testutils/FlinkRowAssertionsUtils.java
@@ -25,6 +25,8 @@ import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
@@ -34,6 +36,15 @@ import static org.assertj.core.api.Assertions.assertThat;
/** Utility class providing assertion methods for Flink test results. */
public class FlinkRowAssertionsUtils {
+ // Use a daemon thread pool for hasNext checks to avoid blocking test
shutdown
+ private static final ExecutorService EXECUTOR =
+ Executors.newCachedThreadPool(
+ r -> {
+ Thread t = new Thread(r,
"FlinkRowAssertionsUtils-hasNext-checker");
+ t.setDaemon(true);
+ return t;
+ });
+
private FlinkRowAssertionsUtils() {}
public static void assertRowResultsIgnoreOrder(
@@ -109,11 +120,23 @@ public class FlinkRowAssertionsUtils {
long deadlineTimeMs = startTimeMs + maxWaitTime.toMillis();
try {
for (int i = 0; i < expectedCount; i++) {
+ long remainingTimeMs = deadlineTimeMs -
System.currentTimeMillis();
+ if (remainingTimeMs <= 0) {
+ // Deadline exceeded, throw timeout error immediately
+ throw timeoutError(
+ System.currentTimeMillis() - startTimeMs,
+ expectedCount,
+ actual.size(),
+ actual);
+ }
// Wait for next record with timeout
- if (!waitForNextWithTimeout(
- iterator, Math.max(deadlineTimeMs -
System.currentTimeMillis(), 1_000))) {
+ if (!waitForNextWithTimeout(iterator, remainingTimeMs)) {
+
throw timeoutError(
- System.currentTimeMillis() - startTimeMs,
expectedCount, actual.size());
+ System.currentTimeMillis() - startTimeMs,
+ expectedCount,
+ actual.size(),
+ actual);
}
if (iterator.hasNext()) {
actual.add(iterator.next().toString());
@@ -152,21 +175,26 @@ public class FlinkRowAssertionsUtils {
}
private static AssertionError timeoutError(
- long elapsedTime, int expectedCount, int actualCount) {
+ long elapsedTime, int expectedCount, int actualCount, List<String>
actualRecords) {
return new AssertionError(
String.format(
"Timeout after waiting %d ms for Flink job results. "
+ "Expected %d records but only received %d. "
- + "This might indicate a job hang or
insufficient data generation.",
- elapsedTime, expectedCount, actualCount));
+ + "This might indicate a job hang or
insufficient data generation.%n"
+ + "Actual records received: %s",
+ elapsedTime, expectedCount, actualCount,
actualRecords));
}
private static boolean waitForNextWithTimeout(
CloseableIterator<Row> iterator, long maxWaitTime) {
- CompletableFuture<Boolean> future =
CompletableFuture.supplyAsync(iterator::hasNext);
+ CompletableFuture<Boolean> future =
+ CompletableFuture.supplyAsync(iterator::hasNext, EXECUTOR);
try {
return future.get(maxWaitTime, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
+ // Timeout occurred - cancel the future and return false
+ // Note: The thread may still be blocked in hasNext(), but as a
daemon thread,
+ // it won't prevent JVM shutdown
future.cancel(true);
return false;
} catch (Exception e) {