This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git

commit 85bdc841e72e8d4ac41c1824139b3e9730a1ed2d
Author: Tzu-Li (Gordon) Tai <[email protected]>
AuthorDate: Wed Apr 5 14:58:32 2023 -0700

    [hotfix] Refactor test util method for collecting results of bounded query
---
 .../connectors/kafka/table/KafkaTableITCase.java       | 18 +++---------------
 .../connectors/kafka/table/KafkaTableTestUtils.java    | 17 +++++++++++++++++
 2 files changed, 20 insertions(+), 15 deletions(-)

diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java
index 515526f9..2674183f 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java
@@ -29,7 +29,6 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.utils.EncodingUtils;
 import org.apache.flink.test.util.SuccessException;
 import org.apache.flink.types.Row;
-import org.apache.flink.util.CloseableIterator;
 
 import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -58,6 +57,7 @@ import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 import static org.apache.flink.core.testutils.CommonTestUtils.waitUtil;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaTableTestUtils.collectAllRows;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaTableTestUtils.collectRows;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaTableTestUtils.readLines;
 import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_SOURCE_IDLE_TIMEOUT;
@@ -225,13 +225,7 @@ public class KafkaTableITCase extends KafkaTableTestBase {
 
         // ---------- Consume stream from Kafka -------------------
 
-        List<Row> results = new ArrayList<>();
-        try (CloseableIterator<Row> resultsItr =
-                tEnv.sqlQuery("SELECT * from kafka").execute().collect()) {
-            while (resultsItr.hasNext()) {
-                results.add(resultsItr.next());
-            }
-        }
+        List<Row> results = collectAllRows(tEnv.sqlQuery("SELECT * from kafka"));
 
         assertThat(results)
                 .containsExactly(Row.of(1, 1102, "behavior 1"), Row.of(2, 1103, "behavior 2"));
@@ -286,13 +280,7 @@ public class KafkaTableITCase extends KafkaTableTestBase {
 
         // ---------- Consume stream from Kafka -------------------
 
-        List<Row> results = new ArrayList<>();
-        try (CloseableIterator<Row> resultsItr =
-                tEnv.sqlQuery("SELECT * from kafka").execute().collect()) {
-            while (resultsItr.hasNext()) {
-                results.add(resultsItr.next());
-            }
-        }
+        List<Row> results = collectAllRows(tEnv.sqlQuery("SELECT * from kafka"));
 
         assertThat(results)
                 .containsExactly(
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestUtils.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestUtils.java
index c1aada46..793d8da7 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestUtils.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestUtils.java
@@ -68,6 +68,23 @@ public class KafkaTableTestUtils {
         return collectedRows;
     }
 
+    /**
+     * Variant of {@link #collectRows(Table, int)} for bounded queries. This should not run
+     * indefinitely if there is a bounded number of returned rows.
+     */
+    public static List<Row> collectAllRows(Table table) throws Exception {
+        final TableResult result = table.execute();
+
+        final List<Row> collectedRows = new ArrayList<>();
+        try (CloseableIterator<Row> iterator = result.collect()) {
+            while (iterator.hasNext()) {
+                collectedRows.add(iterator.next());
+            }
+        }
+
+        return collectedRows;
+    }
+
     public static List<String> readLines(String resource) throws IOException {
         final URL url = KafkaChangelogTableITCase.class.getClassLoader().getResource(resource);
         assertThat(url).isNotNull();

Reply via email to