This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6505fe43d1b0cc3cf8543caf3fbdb1deae9697c5
Author: Aljoscha Krettek <[email protected]>
AuthorDate: Thu Jun 4 10:16:06 2020 +0200

    [FLINK-17260] Make number of expected messages explicit in StreamingKafkaITCase
    
    Before, reading messages could time out and return early with fewer
    messages than expected, which would lead to a test failure later on.
    Now we fail at the source of the problem.
---
 .../org/apache/flink/tests/util/kafka/KafkaResource.java  |  7 ++++---
 .../tests/util/kafka/LocalStandaloneKafkaResource.java    | 15 ++++++++++-----
 2 files changed, 14 insertions(+), 8 deletions(-)

diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/KafkaResource.java b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/KafkaResource.java
index 679d6c4..0157ad2 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/KafkaResource.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/KafkaResource.java
@@ -63,15 +63,16 @@ public interface KafkaResource extends ExternalResource {
        InetSocketAddress getZookeeperAddress();
 
        /**
-        * Reads up to {@code maxNumMessages} from the given topic.
+        * Reads {@code expectedNumMessages} from the given topic. If we can't 
read the expected number
+        * of messages we throw an exception.
         *
-        * @param maxNumMessages maximum number of messages that should be read
+        * @param expectedNumMessages expected number of messages that should 
be read
         * @param groupId group id to identify consumer
         * @param topic topic name
         * @return read messages
         * @throws IOException
         */
-       List<String> readMessage(int maxNumMessages, String groupId, String 
topic) throws IOException;
+       List<String> readMessage(int expectedNumMessages, String groupId, 
String topic) throws IOException;
 
        /**
         * Modifies the number of partitions for the given topic.
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/LocalStandaloneKafkaResource.java b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/LocalStandaloneKafkaResource.java
index 405690f..a651d12 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/LocalStandaloneKafkaResource.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/LocalStandaloneKafkaResource.java
@@ -260,8 +260,9 @@ public class LocalStandaloneKafkaResource implements 
KafkaResource {
        }
 
        @Override
-       public List<String> readMessage(int maxNumMessages, String groupId, 
String topic) throws IOException {
-               final List<String> messages = Collections.synchronizedList(new 
ArrayList<>(maxNumMessages));
+       public List<String> readMessage(int expectedNumMessages, String 
groupId, String topic) throws IOException {
+               final List<String> messages = Collections.synchronizedList(new 
ArrayList<>(
+                               expectedNumMessages));
 
                try (final AutoClosableProcess kafka = AutoClosableProcess
                        .create(kafkaDir.resolve(Paths.get("bin", 
"kafka-console-consumer.sh")).toString(),
@@ -269,7 +270,7 @@ public class LocalStandaloneKafkaResource implements 
KafkaResource {
                                KAFKA_ADDRESS,
                                "--from-beginning",
                                "--max-messages",
-                               String.valueOf(maxNumMessages),
+                               String.valueOf(expectedNumMessages),
                                "--topic",
                                topic,
                                "--consumer-property",
@@ -278,15 +279,19 @@ public class LocalStandaloneKafkaResource implements 
KafkaResource {
                        .runNonBlocking()) {
 
                        final Deadline deadline = 
Deadline.fromNow(Duration.ofSeconds(30));
-                       while (deadline.hasTimeLeft() && messages.size() < 
maxNumMessages) {
+                       while (deadline.hasTimeLeft() && messages.size() < 
expectedNumMessages) {
                                try {
-                                       LOG.info("Waiting for messages. 
Received {}/{}.", messages.size(), maxNumMessages);
+                                       LOG.info("Waiting for messages. 
Received {}/{}.", messages.size(),
+                                                       expectedNumMessages);
                                        Thread.sleep(500);
                                } catch (InterruptedException e) {
                                        Thread.currentThread().interrupt();
                                        break;
                                }
                        }
+                       if (messages.size() != expectedNumMessages) {
+                               throw new IOException("Could not read expected 
number of messages.");
+                       }
                        return messages;
                }
        }

Reply via email to