This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 3a48f6dd2 [test/flink] Collect flink rows should always be with timeout (#1622)
3a48f6dd2 is described below

commit 3a48f6dd22bed4e6bf3ebe009f34750228aa0cd7
Author: yuxia Luo <luoyu...@alumni.sjtu.edu.cn>
AuthorDate: Thu Sep 4 10:47:32 2025 +0800

    [test/flink] Collect flink rows should always be with timeout (#1622)
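
    The change replaces open-ended collection loops (repeated rowIter.next()
    calls that block forever when a Flink job hangs) with a shared helper that
    enforces a deadline. A minimal sketch of the intended usage, based on the
    diff below (table name and expected rows are illustrative only):

        CloseableIterator<Row> rowIter =
                tEnv.executeSql("SELECT * FROM some_table").collect();
        // Returns the rows as strings, or fails with a descriptive
        // AssertionError after 60 seconds instead of hanging the build.
        List<String> actual = collectRowsWithTimeout(rowIter, expected.size());
        assertThat(actual).containsExactlyInAnyOrderElementsOf(expected);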
---
 .../fluss/flink/sink/FlinkTableSinkITCase.java     |   7 +-
 .../flink/source/FlinkTableSourceBatchITCase.java  |  15 +--
 .../fluss/flink/source/FlinkTableSourceITCase.java | 126 ++++++---------------
 .../source/testutils/FlinkRowAssertionsUtils.java  |  83 ++++++++++----
 .../testutils/FlinkRowAssertionsUtilsTest.java     |  76 +++++++++++++
 .../apache/fluss/flink/utils/FlinkTestBase.java    |  14 ---
 6 files changed, 179 insertions(+), 142 deletions(-)

diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java
index 048ba42bc..54fb906ae 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java
@@ -66,6 +66,7 @@ import java.util.stream.Stream;
 
 import static org.apache.fluss.flink.FlinkConnectorOptions.BOOTSTRAP_SERVERS;
 import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsIgnoreOrder;
+import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.collectRowsWithTimeout;
 import static org.apache.fluss.flink.utils.FlinkTestBase.waitUntilPartitions;
 import static org.apache.fluss.server.testutils.FlussClusterExtension.BUILTIN_DATABASE;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -210,11 +211,7 @@ abstract class FlinkTableSinkITCase extends AbstractTestBase {
         List<String> expectedRows =
                 expectedGroups.stream().flatMap(List::stream).collect(Collectors.toList());
 
-        List<String> actual = new ArrayList<>(expectedRows.size());
-        for (int i = 0; i < expectedRows.size(); i++) {
-            actual.add(rowIter.next().toString());
-        }
-        rowIter.close();
+        List<String> actual = collectRowsWithTimeout(rowIter, expectedRows.size());
         assertThat(actual).containsExactlyInAnyOrderElementsOf(expectedRows);
 
         // check data with the same bucket key should be read in sequence.
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceBatchITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceBatchITCase.java
index a8353491e..7f3894f96 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceBatchITCase.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceBatchITCase.java
@@ -45,6 +45,7 @@ import java.util.Map;
 
 import static org.apache.fluss.flink.FlinkConnectorOptions.BOOTSTRAP_SERVERS;
 import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsIgnoreOrder;
+import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.collectRowsWithTimeout;
 import static org.apache.fluss.server.testutils.FlussClusterExtension.BUILTIN_DATABASE;
 import static org.apache.fluss.testutils.DataTestUtils.row;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -189,7 +190,7 @@ abstract class FlinkTableSourceBatchITCase extends FlinkTestBase {
         // normal scan
         String query = String.format("SELECT * FROM %s limit 2", tableName);
         CloseableIterator<Row> iterRows = tEnv.executeSql(query).collect();
-        List<String> collected = assertAndCollectRecords(iterRows, 2);
+        List<String> collected = collectRowsWithTimeout(iterRows, 2);
         List<String> expected =
                 Arrays.asList(
                         "+I[1, address1, name1]",
@@ -203,14 +204,14 @@ abstract class FlinkTableSourceBatchITCase extends FlinkTestBase {
         // limit which is larger than all the data.
         query = String.format("SELECT * FROM %s limit 10", tableName);
         iterRows = tEnv.executeSql(query).collect();
-        collected = assertAndCollectRecords(iterRows, 5);
+        collected = collectRowsWithTimeout(iterRows, 5);
         assertThat(collected).isSubsetOf(expected);
         assertThat(collected).hasSize(5);
 
         // projection scan
         query = String.format("SELECT id, name FROM %s limit 3", tableName);
         iterRows = tEnv.executeSql(query).collect();
-        collected = assertAndCollectRecords(iterRows, 3);
+        collected = collectRowsWithTimeout(iterRows, 3);
         expected =
                 Arrays.asList(
                         "+I[1, name1]",
@@ -237,7 +238,7 @@ abstract class FlinkTableSourceBatchITCase extends FlinkTestBase {
         // normal scan
         String query = String.format("SELECT * FROM %s limit 2", tableName);
         CloseableIterator<Row> iterRows = tEnv.executeSql(query).collect();
-        List<String> collected = assertAndCollectRecords(iterRows, 2);
+        List<String> collected = collectRowsWithTimeout(iterRows, 2);
         List<String> expected =
                 Arrays.asList(
                         "+I[1, address1, name1]",
@@ -251,7 +252,7 @@ abstract class FlinkTableSourceBatchITCase extends FlinkTestBase {
         // projection scan
         query = String.format("SELECT id, name FROM %s limit 3", tableName);
         iterRows = tEnv.executeSql(query).collect();
-        collected = assertAndCollectRecords(iterRows, 3);
+        collected = collectRowsWithTimeout(iterRows, 3);
         expected =
                 Arrays.asList(
                         "+I[1, name1]",
@@ -266,7 +267,7 @@ abstract class FlinkTableSourceBatchITCase extends FlinkTestBase {
         String partitionTable = preparePartitionedLogTable();
         query = String.format("SELECT id, name FROM %s limit 3", partitionTable);
         iterRows = tEnv.executeSql(query).collect();
-        collected = assertAndCollectRecords(iterRows, 3);
+        collected = collectRowsWithTimeout(iterRows, 3);
         assertThat(collected).isSubsetOf(expected);
         assertThat(collected).hasSize(3);
     }
@@ -286,7 +287,7 @@ abstract class FlinkTableSourceBatchITCase extends FlinkTestBase {
                                         + "fields=[count1$0])",
                                 tableName));
         CloseableIterator<Row> iterRows = tEnv.executeSql(query).collect();
-        List<String> collected = assertAndCollectRecords(iterRows, 1);
+        List<String> collected = collectRowsWithTimeout(iterRows, 1);
         List<String> expected = Collections.singletonList(String.format("+I[%s]", expectedRows));
         assertThat(collected).isEqualTo(expected);
 
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java
index 24757cb05..c279c15b3 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java
@@ -70,7 +70,9 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static org.apache.fluss.flink.FlinkConnectorOptions.BOOTSTRAP_SERVERS;
+import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertQueryResultExactOrder;
 import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsIgnoreOrder;
+import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.collectRowsWithTimeout;
 import static org.apache.fluss.flink.utils.FlinkTestBase.waitUntilPartitions;
 import static org.apache.fluss.flink.utils.FlinkTestBase.writeRows;
 import static org.apache.fluss.flink.utils.FlinkTestBase.writeRowsToPartition;
@@ -205,16 +207,7 @@ abstract class FlinkTableSourceITCase extends AbstractTestBase {
         writeRows(conn, tablePath, rows, true);
 
         List<String> expected = Arrays.asList("+I[1, v1]", "+I[2, v2]", "+I[3, v3]");
-        try (org.apache.flink.util.CloseableIterator<Row> rowIter =
-                tEnv.executeSql("select * from non_pk_table_test").collect()) {
-            int expectRecords = expected.size();
-            List<String> actual = new ArrayList<>(expectRecords);
-            for (int i = 0; i < expectRecords; i++) {
-                String row = rowIter.next().toString();
-                actual.add(row);
-            }
-            assertThat(actual).containsExactlyElementsOf(expected);
-        }
+        assertQueryResultExactOrder(tEnv, "select * from non_pk_table_test", expected);
     }
 
     @ParameterizedTest
@@ -262,17 +255,7 @@ abstract class FlinkTableSourceITCase extends AbstractTestBase {
                         "+I[v8, 8000, 800]",
                         "+I[v9, 9000, 900]",
                         "+I[v10, 10000, 1000]");
-        try (org.apache.flink.util.CloseableIterator<Row> rowIter =
-                tEnv.executeSql(query).collect()) {
-            int expectRecords = expected.size();
-            List<String> actual = new ArrayList<>(expectRecords);
-            for (int i = 0; i < expectRecords; i++) {
-                Row r = rowIter.next();
-                String row = r.toString();
-                actual.add(row);
-            }
-            assertThat(actual).containsExactlyElementsOf(expected);
-        }
+        assertQueryResultExactOrder(tEnv, query, expected);
     }
 
     @ParameterizedTest
@@ -331,22 +314,15 @@ abstract class FlinkTableSourceITCase extends AbstractTestBase {
                         "+I[v8, 8, 800]",
                         "+I[v9, 9, 900]",
                         "+I[v10, 10, 1000]");
-        try (org.apache.flink.util.CloseableIterator<Row> rowIter =
-                tEnv.executeSql(query).collect()) {
-            int expectRecords = expected.size();
-            List<String> actual = new ArrayList<>(expectRecords);
-            if (testPkLog) {
-                // delay the write after collect job start,
-                // to make sure reading from log instead of snapshot
-                writeRows(conn, tablePath, rows, false);
-            }
-            for (int i = 0; i < expectRecords; i++) {
-                Row r = rowIter.next();
-                String row = r.toString();
-                actual.add(row);
-            }
-            assertThat(actual).containsExactlyElementsOf(expected);
+        org.apache.flink.util.CloseableIterator<Row> rowIter = tEnv.executeSql(query).collect();
+        if (testPkLog) {
+            // delay the write after collect job start,
+            // to make sure reading from log instead of snapshot
+            writeRows(conn, tablePath, rows, false);
         }
+        int expectRecords = expected.size();
+        List<String> actual = collectRowsWithTimeout(rowIter, expectRecords);
+        assertThat(actual).containsExactlyElementsOf(expected);
     }
 
     @Test
@@ -451,12 +427,12 @@ abstract class FlinkTableSourceITCase extends AbstractTestBase {
                         "+I[8, v8, 800, 8000]",
                         "+I[9, v9, 900, 9000]",
                         "+I[10, v10, 1000, 10000]");
-        assertQueryResult(query, expected);
+        assertQueryResultExactOrder(tEnv, query, expected);
 
         // 2. read kv table with scan.startup.mode='earliest'
         options = " /*+ OPTIONS('scan.startup.mode' = 'earliest') */";
         query = "select a, b, c, d from " + tableName + options;
-        assertQueryResult(query, expected);
+        assertQueryResultExactOrder(tEnv, query, expected);
 
         // 3. read log table with scan.startup.mode='timestamp'
         expected =
@@ -471,7 +447,7 @@ abstract class FlinkTableSourceITCase extends AbstractTestBase {
                         " /*+ OPTIONS('scan.startup.mode' = 'timestamp', 'scan.startup.timestamp' ='%d') */",
                         timestamp);
         query = "select a, b, c, d from " + tableName + options;
-        assertQueryResult(query, expected);
+        assertQueryResultExactOrder(tEnv, query, expected);
     }
 
     @Test
@@ -501,20 +477,13 @@ abstract class FlinkTableSourceITCase extends AbstractTestBase {
                         "-U[2, v2]",
                         "+U[2, v22]",
                         "+I[4, v4]");
-        try (org.apache.flink.util.CloseableIterator<Row> rowIter =
-                tEnv.executeSql(query).collect()) {
-            int expectRecords = 8;
-            List<String> actual = new ArrayList<>(expectRecords);
-            // delay to write after collect job start, to make sure reading from log instead of
-            // snapshot
-            writeRows(conn, tablePath, rows2, false);
-            for (int i = 0; i < expectRecords; i++) {
-                Row r = rowIter.next();
-                String row = r.toString();
-                actual.add(row);
-            }
-            assertThat(actual).containsExactlyElementsOf(expected);
-        }
+        org.apache.flink.util.CloseableIterator<Row> rowIter = tEnv.executeSql(query).collect();
+        int expectRecords = 8;
+        // delay to write after collect job start, to make sure reading from log instead of
+        // snapshot
+        writeRows(conn, tablePath, rows2, false);
+        List<String> actual = collectRowsWithTimeout(rowIter, expectRecords);
+        assertThat(actual).containsExactlyElementsOf(expected);
     }
 
     private static Stream<Arguments> readKvTableScanStartupModeArgs() {
@@ -595,17 +564,7 @@ abstract class FlinkTableSourceITCase extends AbstractTestBase {
                         "-U[2, v2]",
                         "+U[2, v22]",
                         "+I[4, v4]");
-        try (org.apache.flink.util.CloseableIterator<Row> rowIter =
-                tEnv.executeSql(query).collect()) {
-            int expectRecords = 10;
-            List<String> actual = new ArrayList<>(expectRecords);
-            for (int i = 0; i < expectRecords; i++) {
-                Row r = rowIter.next();
-                String row = r.toString();
-                actual.add(row);
-            }
-            assertThat(actual).containsExactlyElementsOf(expected);
-        }
+        assertQueryResultExactOrder(tEnv, query, expected);
     }
 
     @ParameterizedTest
@@ -687,25 +646,20 @@ abstract class FlinkTableSourceITCase extends AbstractTestBase {
                                 "the fetch timestamp %s is larger than the current timestamp",
                                 currentTimeMillis + Duration.ofMinutes(5).toMillis()));
 
-        try (org.apache.flink.util.CloseableIterator<Row> rowIter =
+        org.apache.flink.util.CloseableIterator<Row> rowIter =
                 tEnv.executeSql(
                                 String.format(
                                         "select * from timestamp_table /*+ 
OPTIONS('scan.startup.mode' = 'timestamp', 'scan.startup.timestamp' = '%s') */ 
",
                                         currentTimeMillis))
-                        .collect()) {
-            CLOCK.advanceTime(Duration.ofMillis(100L));
-            // write second batch record.
-            rows = Arrays.asList(row(4, "v4"), row(5, "v5"), row(6, "v6"));
-            writeRows(conn, tablePath, rows, true);
-            List<String> expected = Arrays.asList("+I[4, v4]", "+I[5, v5]", "+I[6, v6]");
-            int expectRecords = expected.size();
-            List<String> actual = new ArrayList<>(expectRecords);
-            for (int i = 0; i < expectRecords; i++) {
-                String row = rowIter.next().toString();
-                actual.add(row);
-            }
-            assertThat(actual).containsExactlyElementsOf(expected);
-        }
+                        .collect();
+        CLOCK.advanceTime(Duration.ofMillis(100L));
+        // write second batch record.
+        rows = Arrays.asList(row(4, "v4"), row(5, "v5"), row(6, "v6"));
+        writeRows(conn, tablePath, rows, true);
+        List<String> expected = Arrays.asList("+I[4, v4]", "+I[5, v5]", "+I[6, v6]");
+        int expectRecords = expected.size();
+        List<String> actual = collectRowsWithTimeout(rowIter, expectRecords);
+        assertThat(actual).containsExactlyElementsOf(expected);
     }
 
     // -------------------------------------------------------------------------------------
@@ -1319,20 +1273,6 @@ abstract class FlinkTableSourceITCase extends AbstractTestBase {
                 "Fail to wait until all bucket finish snapshot");
     }
 
-    private void assertQueryResult(String query, List<String> expected) throws Exception {
-        try (org.apache.flink.util.CloseableIterator<Row> rowIter =
-                tEnv.executeSql(query).collect()) {
-            int expectRecords = expected.size();
-            List<String> actual = new ArrayList<>(expectRecords);
-            for (int i = 0; i < expectRecords; i++) {
-                Row r = rowIter.next();
-                String row = r.toString();
-                actual.add(row);
-            }
-            assertThat(actual).containsExactlyElementsOf(expected);
-        }
-    }
-
     private GenericRow rowWithPartition(Object[] values, @Nullable String partition) {
         if (partition == null) {
             return row(values);
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/testutils/FlinkRowAssertionsUtils.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/testutils/FlinkRowAssertionsUtils.java
index 092220244..87c7e14c1 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/testutils/FlinkRowAssertionsUtils.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/testutils/FlinkRowAssertionsUtils.java
@@ -21,8 +21,12 @@ import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.CloseableIterator;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -62,29 +66,44 @@ public class FlinkRowAssertionsUtils {
         }
     }
 
-    private static List<String> collectRowsWithTimeout(
+    public static List<String> collectRowsWithTimeout(
+            CloseableIterator<Row> iterator, int expectedCount) {
+        return collectRowsWithTimeout(iterator, expectedCount, true);
+    }
+
+    public static List<String> collectRowsWithTimeout(
             CloseableIterator<Row> iterator, int expectedCount, boolean closeIterator) {
-        List<String> actual = new ArrayList<>();
-        long startTime = System.currentTimeMillis();
-        int maxWaitTime = 60000; // 60 seconds
+        if (expectedCount < 0) {
+            throw new IllegalArgumentException(
+                    "Expected count must be non-negative: " + expectedCount);
+        }
+        if (iterator == null) {
+            throw new IllegalArgumentException("Iterator cannot be null");
+        }
+        return collectRowsWithTimeout(
+                iterator,
+                expectedCount,
+                closeIterator,
+                // max wait 1 minute
+                Duration.ofMinutes(1));
+    }
 
+    protected static List<String> collectRowsWithTimeout(
+            CloseableIterator<Row> iterator,
+            int expectedCount,
+            boolean closeIterator,
+            Duration maxWaitTime) {
+        List<String> actual = new ArrayList<>();
+        long startTimeMs = System.currentTimeMillis();
+        long deadlineTimeMs = startTimeMs + maxWaitTime.toMillis();
         try {
             for (int i = 0; i < expectedCount; i++) {
                 // Wait for next record with timeout
-                while (!iterator.hasNext()) {
-                    long elapsedTime = System.currentTimeMillis() - startTime;
-                    if (elapsedTime > maxWaitTime) {
-                        // Timeout reached - provide detailed failure info
-                        throw new AssertionError(
-                                String.format(
-                                        "Timeout after waiting %d ms for Flink job results. "
-                                                + "Expected %d records but only received %d. "
-                                                + "This might indicate a job hang or insufficient data generation.",
-                                        elapsedTime, expectedCount, actual.size()));
-                    }
-                    Thread.sleep(10);
+                if (!waitForNextWithTimeout(
+                        iterator, deadlineTimeMs - System.currentTimeMillis())) {
+                    throw timeoutError(
+                            System.currentTimeMillis() - startTimeMs, expectedCount, actual.size());
                 }
-
                 if (iterator.hasNext()) {
                     actual.add(iterator.next().toString());
                 } else {
@@ -92,12 +111,7 @@ public class FlinkRowAssertionsUtils {
                     break;
                 }
             }
-
             return actual;
-
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new RuntimeException("Test interrupted while waiting for Flink job results", e);
         } catch (AssertionError e) {
             // Re-throw our timeout assertion errors
             throw e;
@@ -107,7 +121,7 @@ public class FlinkRowAssertionsUtils {
                 // Job completed normally - return what we have
                 return actual;
             } else {
-                long elapsedTime = System.currentTimeMillis() - startTime;
+                long elapsedTime = System.currentTimeMillis() - startTimeMs;
                 throw new RuntimeException(
                         String.format(
                                 "Unexpected error after waiting %d ms for 
Flink job results. "
@@ -126,6 +140,29 @@ public class FlinkRowAssertionsUtils {
         }
     }
 
+    private static AssertionError timeoutError(
+            long elapsedTime, int expectedCount, int actualCount) {
+        return new AssertionError(
+                String.format(
+                        "Timeout after waiting %d ms for Flink job results. "
+                                + "Expected %d records but only received %d. "
+                                + "This might indicate a job hang or 
insufficient data generation.",
+                        elapsedTime, expectedCount, actualCount));
+    }
+
+    private static boolean waitForNextWithTimeout(
+            CloseableIterator<Row> iterator, long maxWaitTime) {
+        CompletableFuture<Boolean> future = CompletableFuture.supplyAsync(iterator::hasNext);
+        try {
+            return future.get(maxWaitTime, TimeUnit.MILLISECONDS);
+        } catch (TimeoutException e) {
+            future.cancel(true);
+            return false;
+        } catch (Exception e) {
+            throw new RuntimeException("Error checking iterator.hasNext()", e);
+        }
+    }
+
     private static boolean isMiniClusterCompletionException(Exception e) {
         return e.getCause() instanceof IllegalStateException
                 && e.getMessage() != null
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/testutils/FlinkRowAssertionsUtilsTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/testutils/FlinkRowAssertionsUtilsTest.java
new file mode 100644
index 000000000..0086adeb0
--- /dev/null
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/testutils/FlinkRowAssertionsUtilsTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.source.testutils;
+
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+
+import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.collectRowsWithTimeout;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assertions.fail;
+
+/** Test for {@link FlinkRowAssertionsUtils}. */
+class FlinkRowAssertionsUtilsTest {
+
+    @Test
+    void testCollectRowsWithTimeout() {
+        // should throw AssertionError if wait rows timeout
+        assertThatThrownBy(
+                        () ->
+                                collectRowsWithTimeout(
+                                        createBlockingHasNextIterator(),
+                                        10,
+                                        true,
+                                        Duration.ofSeconds(1)))
+                .isInstanceOf(AssertionError.class)
+                .hasMessageContaining("Timeout after waiting")
+                .hasMessageContaining(
+                        "ms for Flink job results. Expected 10 records but 
only received 0.");
+    }
+
+    CloseableIterator<Row> createBlockingHasNextIterator() {
+        return new CloseableIterator<Row>() {
+            @Override
+            public void close() throws Exception {}
+
+            @SuppressWarnings("all")
+            @Override
+            public boolean hasNext() {
+                // to mock blocking
+                try {
+                    while (true) {
+                        Thread.sleep(1_000);
+                    }
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    fail("Thread sleeping for blocking hasNext() was 
interrupted.");
+                }
+                return true;
+            }
+
+            @Override
+            public Row next() {
+                return null;
+            }
+        };
+    }
+}
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkTestBase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkTestBase.java
index 914ad2239..d4ac97bf0 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkTestBase.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkTestBase.java
@@ -43,7 +43,6 @@ import org.apache.fluss.server.zk.data.TableAssignment;
 import org.apache.fluss.types.DataTypes;
 
 import org.apache.flink.test.util.AbstractTestBase;
-import org.apache.flink.types.Row;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
@@ -61,7 +60,6 @@ import java.util.Set;
 import static org.apache.fluss.server.utils.TableAssignmentUtils.generateAssignment;
 import static org.apache.fluss.testutils.DataTestUtils.row;
 import static org.apache.fluss.testutils.common.CommonTestUtils.waitValue;
-import static org.assertj.core.api.Assertions.assertThat;
 
 /** A base class for testing with Fluss cluster prepared. */
 public class FlinkTestBase extends AbstractTestBase {
@@ -182,18 +180,6 @@ public class FlinkTestBase extends AbstractTestBase {
         return admin.getTableInfo(tablePath).get().getTableId();
     }
 
-    public static List<String> assertAndCollectRecords(
-            org.apache.flink.util.CloseableIterator<Row> iterator, int expectedNum)
-            throws Exception {
-        List<String> actual = new ArrayList<>(expectedNum);
-        for (int i = 0; i < expectedNum; i++) {
-            actual.add(iterator.next().toString());
-        }
-        assertThat(iterator.hasNext()).isFalse();
-        iterator.close();
-        return actual;
-    }
-
     protected void waitUntilSnapshot(long tableId, long snapshotId) {
         for (int i = 0; i < DEFAULT_BUCKET_NUM; i++) {
             TableBucket tableBucket = new TableBucket(tableId, i);
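
The new timeout works by racing iterator.hasNext() against a deadline on a
separate thread (waitForNextWithTimeout in the diff above), since hasNext()
itself can block indefinitely on a streaming collect. A hypothetical
standalone sketch of that pattern, with illustrative names:

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;
    import java.util.function.Supplier;

    // Ask hasNext on another thread and give up after timeoutMs.
    static boolean awaitWithTimeout(Supplier<Boolean> hasNext, long timeoutMs) {
        CompletableFuture<Boolean> future = CompletableFuture.supplyAsync(hasNext);
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // best-effort: the async task may keep running
            return false;
        } catch (Exception e) {
            throw new RuntimeException("Error while polling hasNext()", e);
        }
    }

Worth noting: CompletableFuture.cancel(true) does not interrupt an
already-running task, so a timed-out hasNext() call may linger on its thread;
the helper reports the timeout via AssertionError rather than by stopping the
blocked thread, which is consistent with the blocking-iterator unit test above.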
