This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new cfb986bf4 [tests] Add integration test for WAL image mode change log 
of Primary Key (#2217)
cfb986bf4 is described below

commit cfb986bf49eb5354ac9c6271de6692dfc0ccaf4e
Author: Yang Wang <[email protected]>
AuthorDate: Sun Dec 21 18:26:59 2025 +0800

    [tests] Add integration test for WAL image mode change log of Primary Key 
(#2217)
---
 .../fluss/flink/sink/FlinkTableSinkITCase.java     | 73 ++++++++++++++++++++++
 .../source/testutils/FlinkRowAssertionsUtils.java  | 42 ++++++++++---
 2 files changed, 108 insertions(+), 7 deletions(-)

diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java
index c8aeaaae8..68fa741d4 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java
@@ -66,6 +66,7 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static org.apache.fluss.flink.FlinkConnectorOptions.BOOTSTRAP_SERVERS;
+import static 
org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertQueryResultExactOrder;
 import static 
org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsIgnoreOrder;
 import static 
org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.collectRowsWithTimeout;
 import static org.apache.fluss.flink.utils.FlinkTestBase.waitUntilPartitions;
@@ -1384,4 +1385,76 @@ abstract class FlinkTableSinkITCase extends 
AbstractTestBase {
             assertResultsIgnoreOrder(rowIter, expectedRows, true);
         }
     }
+
+    @Test
+    void testWalModeWithDefaultMergeEngineAndAggregation() throws Exception {
+        // use single parallelism to make result ordering stable
+        
tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM,
 1);
+
+        String tableName = "wal_mode_pk_table";
+        // Create a table with WAL mode and default merge engine
+        tEnv.executeSql(
+                String.format(
+                        "create table %s ("
+                                + " id int not null,"
+                                + " category string,"
+                                + " amount bigint,"
+                                + " primary key (id) not enforced"
+                                + ") with ('table.changelog.image' = 'wal')",
+                        tableName));
+
+        // Insert initial data
+        tEnv.executeSql(
+                        String.format(
+                                "INSERT INTO %s VALUES "
+                                        + "(1, 'A', 100), "
+                                        + "(2, 'B', 200), "
+                                        + "(3, 'A', 150), "
+                                        + "(4, 'B', 250)",
+                                tableName))
+                .await();
+
+        // Use batch mode to update and delete records
+        tBatchEnv.executeSql("UPDATE " + tableName + " SET amount = 120 WHERE 
id = 1").await();
+        tBatchEnv.executeSql("UPDATE " + tableName + " SET amount = 180 WHERE 
id = 3").await();
+        tBatchEnv.executeSql("DELETE FROM " + tableName + " WHERE id = 
4").await();
+
+        // Do aggregation on the table and verify ChangelogNormalize node is 
generated
+        String aggQuery =
+                String.format(
+                        "SELECT category, SUM(amount) as total_amount FROM %s 
/*+ OPTIONS('scan.startup.mode' = 'earliest') */ GROUP BY category",
+                        tableName);
+
+        // Explain the aggregation query to check for ChangelogNormalize
+        String aggPlan = tEnv.explainSql(aggQuery);
+        // ChangelogNormalize should be present to normalize the changelog for 
aggregation
+        // In Flink, when the source produces changelog with primary key 
semantics (I, UA, D),
+        // a ChangelogNormalize operator is inserted before aggregation
+        assertThat(aggPlan).contains("ChangelogNormalize");
+
+        // Expected final state (the list below is the full changelog):
+        // Category A: 120 (id=1) + 180 (id=3) = 300
+        // Category B: 200 (id=2) = 200 (id=4 was deleted)
+        List<String> expectedAggResults =
+                Arrays.asList(
+                        "+I[A, 100]",
+                        "+I[B, 200]",
+                        "-U[A, 100]",
+                        "+U[A, 250]",
+                        "-U[B, 200]",
+                        "+U[B, 450]",
+                        "-U[A, 250]",
+                        "+U[A, 150]",
+                        "-U[A, 150]",
+                        "+U[A, 270]",
+                        "-U[A, 270]",
+                        "+U[A, 120]",
+                        "-U[A, 120]",
+                        "+U[A, 300]",
+                        "-U[B, 450]",
+                        "+U[B, 200]");
+
+        // Collect results with timeout and verify the exact changelog order
+        assertQueryResultExactOrder(tEnv, aggQuery, expectedAggResults);
+    }
 }
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/testutils/FlinkRowAssertionsUtils.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/testutils/FlinkRowAssertionsUtils.java
index 772201dab..00807497d 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/testutils/FlinkRowAssertionsUtils.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/testutils/FlinkRowAssertionsUtils.java
@@ -25,6 +25,8 @@ import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
@@ -34,6 +36,15 @@ import static org.assertj.core.api.Assertions.assertThat;
 /** Utility class providing assertion methods for Flink test results. */
 public class FlinkRowAssertionsUtils {
 
+    // Use a daemon thread pool for hasNext checks to avoid blocking test 
shutdown
+    private static final ExecutorService EXECUTOR =
+            Executors.newCachedThreadPool(
+                    r -> {
+                        Thread t = new Thread(r, 
"FlinkRowAssertionsUtils-hasNext-checker");
+                        t.setDaemon(true);
+                        return t;
+                    });
+
     private FlinkRowAssertionsUtils() {}
 
     public static void assertRowResultsIgnoreOrder(
@@ -109,11 +120,23 @@ public class FlinkRowAssertionsUtils {
         long deadlineTimeMs = startTimeMs + maxWaitTime.toMillis();
         try {
             for (int i = 0; i < expectedCount; i++) {
+                long remainingTimeMs = deadlineTimeMs - 
System.currentTimeMillis();
+                if (remainingTimeMs <= 0) {
+                    // Deadline exceeded, throw timeout error immediately
+                    throw timeoutError(
+                            System.currentTimeMillis() - startTimeMs,
+                            expectedCount,
+                            actual.size(),
+                            actual);
+                }
                 // Wait for next record with timeout
-                if (!waitForNextWithTimeout(
-                        iterator, Math.max(deadlineTimeMs - 
System.currentTimeMillis(), 1_000))) {
+                if (!waitForNextWithTimeout(iterator, remainingTimeMs)) {
+
                     throw timeoutError(
-                            System.currentTimeMillis() - startTimeMs, 
expectedCount, actual.size());
+                            System.currentTimeMillis() - startTimeMs,
+                            expectedCount,
+                            actual.size(),
+                            actual);
                 }
                 if (iterator.hasNext()) {
                     actual.add(iterator.next().toString());
@@ -152,21 +175,26 @@ public class FlinkRowAssertionsUtils {
     }
 
     private static AssertionError timeoutError(
-            long elapsedTime, int expectedCount, int actualCount) {
+            long elapsedTime, int expectedCount, int actualCount, List<String> 
actualRecords) {
         return new AssertionError(
                 String.format(
                         "Timeout after waiting %d ms for Flink job results. "
                                 + "Expected %d records but only received %d. "
-                                + "This might indicate a job hang or 
insufficient data generation.",
-                        elapsedTime, expectedCount, actualCount));
+                                + "This might indicate a job hang or 
insufficient data generation.%n"
+                                + "Actual records received: %s",
+                        elapsedTime, expectedCount, actualCount, 
actualRecords));
     }
 
     private static boolean waitForNextWithTimeout(
             CloseableIterator<Row> iterator, long maxWaitTime) {
-        CompletableFuture<Boolean> future = 
CompletableFuture.supplyAsync(iterator::hasNext);
+        CompletableFuture<Boolean> future =
+                CompletableFuture.supplyAsync(iterator::hasNext, EXECUTOR);
         try {
             return future.get(maxWaitTime, TimeUnit.MILLISECONDS);
         } catch (TimeoutException e) {
+            // Timeout occurred - cancel the future and return false
+            // Note: The thread may still be blocked in hasNext(), but as a 
daemon thread,
+            // it won't prevent JVM shutdown
             future.cancel(true);
             return false;
         } catch (Exception e) {

Reply via email to