This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git
commit 85bdc841e72e8d4ac41c1824139b3e9730a1ed2d
Author: Tzu-Li (Gordon) Tai <[email protected]>
AuthorDate: Wed Apr 5 14:58:32 2023 -0700

    [hotfix] Refactor test util method for collecting results of bounded query
---
 .../connectors/kafka/table/KafkaTableITCase.java    | 18 +++---------------
 .../connectors/kafka/table/KafkaTableTestUtils.java | 17 +++++++++++++++++
 2 files changed, 20 insertions(+), 15 deletions(-)

diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java
index 515526f9..2674183f 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java
@@ -29,7 +29,6 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.utils.EncodingUtils;
 import org.apache.flink.test.util.SuccessException;
 import org.apache.flink.types.Row;
-import org.apache.flink.util.CloseableIterator;
 
 import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -58,6 +57,7 @@ import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 import static org.apache.flink.core.testutils.CommonTestUtils.waitUtil;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaTableTestUtils.collectAllRows;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaTableTestUtils.collectRows;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaTableTestUtils.readLines;
 import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_SOURCE_IDLE_TIMEOUT;
@@ -225,13 +225,7 @@ public class KafkaTableITCase extends KafkaTableTestBase {
 
         // ---------- Consume stream from Kafka -------------------
 
-        List<Row> results = new ArrayList<>();
-        try (CloseableIterator<Row> resultsItr =
-                tEnv.sqlQuery("SELECT * from kafka").execute().collect()) {
-            while (resultsItr.hasNext()) {
-                results.add(resultsItr.next());
-            }
-        }
+        List<Row> results = collectAllRows(tEnv.sqlQuery("SELECT * from kafka"));
 
         assertThat(results)
                 .containsExactly(Row.of(1, 1102, "behavior 1"), Row.of(2, 1103, "behavior 2"));
@@ -286,13 +280,7 @@ public class KafkaTableITCase extends KafkaTableTestBase {
 
         // ---------- Consume stream from Kafka -------------------
 
-        List<Row> results = new ArrayList<>();
-        try (CloseableIterator<Row> resultsItr =
-                tEnv.sqlQuery("SELECT * from kafka").execute().collect()) {
-            while (resultsItr.hasNext()) {
-                results.add(resultsItr.next());
-            }
-        }
+        List<Row> results = collectAllRows(tEnv.sqlQuery("SELECT * from kafka"));
 
         assertThat(results)
                 .containsExactly(
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestUtils.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestUtils.java
index c1aada46..793d8da7 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestUtils.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestUtils.java
@@ -68,6 +68,23 @@ public class KafkaTableTestUtils {
         return collectedRows;
     }
 
+    /**
+     * Variant of {@link #collectRows(Table, int)} for bounded queries. This should not run
+     * indefinitely if there is a bounded number of returned rows.
+     */
+    public static List<Row> collectAllRows(Table table) throws Exception {
+        final TableResult result = table.execute();
+
+        final List<Row> collectedRows = new ArrayList<>();
+        try (CloseableIterator<Row> iterator = result.collect()) {
+            while (iterator.hasNext()) {
+                collectedRows.add(iterator.next());
+            }
+        }
+
+        return collectedRows;
+    }
+
     public static List<String> readLines(String resource) throws IOException {
         final URL url = KafkaChangelogTableITCase.class.getClassLoader().getResource(resource);
         assertThat(url).isNotNull();
