This is an automated email from the ASF dual-hosted git repository. aljoscha pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 6505fe43d1b0cc3cf8543caf3fbdb1deae9697c5 Author: Aljoscha Krettek <[email protected]> AuthorDate: Thu Jun 4 10:16:06 2020 +0200 [FLINK-17260] Make number of expected messages explicit in StreamingKafkaITCase Before, it could happen that we time out and return early, which would lead to a test failure. Now, we would fail at the source of the problem. --- .../org/apache/flink/tests/util/kafka/KafkaResource.java | 7 ++++--- .../tests/util/kafka/LocalStandaloneKafkaResource.java | 15 ++++++++++----- 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/KafkaResource.java b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/KafkaResource.java index 679d6c4..0157ad2 100644 --- a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/KafkaResource.java +++ b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/KafkaResource.java @@ -63,15 +63,16 @@ public interface KafkaResource extends ExternalResource { InetSocketAddress getZookeeperAddress(); /** - * Reads up to {@code maxNumMessages} from the given topic. + * Reads {@code expectedNumMessages} from the given topic. If we can't read the expected number + * of messages we throw an exception. * - * @param maxNumMessages maximum number of messages that should be read + * @param expectedNumMessages expected number of messages that should be read * @param groupId group id to identify consumer * @param topic topic name * @return read messages * @throws IOException */ - List<String> readMessage(int maxNumMessages, String groupId, String topic) throws IOException; + List<String> readMessage(int expectedNumMessages, String groupId, String topic) throws IOException; /** * Modifies the number of partitions for the given topic. 
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/LocalStandaloneKafkaResource.java b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/LocalStandaloneKafkaResource.java index 405690f..a651d12 100644 --- a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/LocalStandaloneKafkaResource.java +++ b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/LocalStandaloneKafkaResource.java @@ -260,8 +260,9 @@ public class LocalStandaloneKafkaResource implements KafkaResource { } @Override - public List<String> readMessage(int maxNumMessages, String groupId, String topic) throws IOException { - final List<String> messages = Collections.synchronizedList(new ArrayList<>(maxNumMessages)); + public List<String> readMessage(int expectedNumMessages, String groupId, String topic) throws IOException { + final List<String> messages = Collections.synchronizedList(new ArrayList<>( + expectedNumMessages)); try (final AutoClosableProcess kafka = AutoClosableProcess .create(kafkaDir.resolve(Paths.get("bin", "kafka-console-consumer.sh")).toString(), @@ -269,7 +270,7 @@ public class LocalStandaloneKafkaResource implements KafkaResource { KAFKA_ADDRESS, "--from-beginning", "--max-messages", - String.valueOf(maxNumMessages), + String.valueOf(expectedNumMessages), "--topic", topic, "--consumer-property", @@ -278,15 +279,19 @@ public class LocalStandaloneKafkaResource implements KafkaResource { .runNonBlocking()) { final Deadline deadline = Deadline.fromNow(Duration.ofSeconds(30)); - while (deadline.hasTimeLeft() && messages.size() < maxNumMessages) { + while (deadline.hasTimeLeft() && messages.size() < expectedNumMessages) { try { - LOG.info("Waiting for messages. Received {}/{}.", messages.size(), maxNumMessages); + LOG.info("Waiting for messages. Received {}/{}.", messages.size(), + expectedNumMessages); Thread.sleep(500); } catch (InterruptedException e) { Thread.currentThread().interrupt(); break; } } + if (messages.size() != expectedNumMessages) { + throw new IOException("Could not read expected number of messages."); + } return messages; } }
