This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e8638b614c438e879435819d1a993ec65e39fd44
Author: Fabian Paul <[email protected]>
AuthorDate: Fri Oct 1 15:06:09 2021 +0200

    [FLINK-24405][tests] Harden kafka tests based on KafkaTestBase
---
 .../flink/connector/kafka/sink/KafkaUtil.java      | 40 +++++++++----------
 .../connectors/kafka/FlinkKafkaProducerITCase.java | 19 +++++----
 .../connectors/kafka/KafkaMigrationTestBase.java   |  2 +-
 .../connectors/kafka/KafkaProducerTestBase.java    |  3 +-
 .../streaming/connectors/kafka/KafkaTestBase.java  | 45 ++++++----------------
 .../connectors/kafka/KafkaTestEnvironment.java     |  2 +-
 .../connectors/kafka/KafkaTestEnvironmentImpl.java | 36 +++--------------
 .../kafka/shuffle/KafkaShuffleITCase.java          | 21 ++++++----
 8 files changed, 62 insertions(+), 106 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaUtil.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaUtil.java
index d460919..5aa5a67 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaUtil.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaUtil.java
@@ -22,7 +22,6 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.slf4j.Logger;
@@ -30,11 +29,10 @@ import org.slf4j.LoggerFactory;
 
 import java.time.Duration;
 import java.util.ArrayList;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
-import java.util.function.Function;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 /** Collection of methods to interact with a Kafka cluster. */
@@ -88,34 +86,33 @@ public class KafkaUtil {
         consumerConfig.put("key.deserializer", 
ByteArrayDeserializer.class.getName());
         consumerConfig.put("value.deserializer", 
ByteArrayDeserializer.class.getName());
         try (KafkaConsumer<byte[], byte[]> consumer = new 
KafkaConsumer<>(consumerConfig)) {
-            Map<Integer, TopicPartition> assignments = 
getAllPartitions(consumer, topic);
-            Map<TopicPartition, Long> endOffsets = 
consumer.endOffsets(assignments.values());
-            consumer.assign(assignments.values());
-            consumer.seekToBeginning(assignments.values());
+            Set<TopicPartition> topicPartitions = getAllPartitions(consumer, 
topic);
+            Map<TopicPartition, Long> endOffsets = 
consumer.endOffsets(topicPartitions);
+            consumer.assign(topicPartitions);
+            consumer.seekToBeginning(topicPartitions);
 
             final List<ConsumerRecord<byte[], byte[]>> consumerRecords = new ArrayList<>();
-            while (!assignments.isEmpty()) {
-                consumer.assign(assignments.values());
+            while (!topicPartitions.isEmpty()) {
                 ConsumerRecords<byte[], byte[]> records = consumer.poll(CONSUMER_POLL_DURATION);
-                LOG.info("Fetched {} records from topic {}.", records.count(), topic);
+                LOG.debug("Fetched {} records from topic {}.", records.count(), topic);
 
                 // Remove partitions from polling which have reached its end.
-                final Iterator<Map.Entry<Integer, TopicPartition>> assignmentIterator =
-                        assignments.entrySet().iterator();
-                while (assignmentIterator.hasNext()) {
-                    final Map.Entry<Integer, TopicPartition> assignment = assignmentIterator.next();
-                    final TopicPartition topicPartition = assignment.getValue();
+                final List<TopicPartition> finishedPartitions = new ArrayList<>();
+                for (final TopicPartition topicPartition : topicPartitions) {
                     final long position = consumer.position(topicPartition);
                     final long endOffset = endOffsets.get(topicPartition);
-                    LOG.info(
+                    LOG.debug(
                             "Endoffset {} and current position {} for 
partition {}",
                             endOffset,
                             position,
-                            assignment.getKey());
+                            topicPartition.partition());
                     if (endOffset - position > 0) {
                         continue;
                     }
-                    assignmentIterator.remove();
+                    finishedPartitions.add(topicPartition);
+                }
+                if (topicPartitions.removeAll(finishedPartitions)) {
+                    consumer.assign(topicPartitions);
                 }
                 for (ConsumerRecord<byte[], byte[]> r : records) {
                     consumerRecords.add(r);
@@ -125,11 +122,10 @@ public class KafkaUtil {
         }
     }
 
-    private static Map<Integer, TopicPartition> getAllPartitions(
+    private static Set<TopicPartition> getAllPartitions(
             KafkaConsumer<byte[], byte[]> consumer, String topic) {
-        final List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
-        return partitionInfos.stream()
+        return consumer.partitionsFor(topic).stream()
                 .map(info -> new TopicPartition(info.topic(), info.partition()))
-                .collect(Collectors.toMap(TopicPartition::partition, Function.identity()));
+                .collect(Collectors.toSet());
     }
 }
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerITCase.java
index 022fbe2..3194d49 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerITCase.java
@@ -156,7 +156,7 @@ public class FlinkKafkaProducerITCase extends KafkaTestBase {
                 testHarness2.open();
             }
 
-            assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(42));
+            assertExactlyOnceForTopic(createProperties(), topic, Arrays.asList(42));
             deleteTestTopic(topic);
         } catch (Exception ex) {
             // testHarness1 will be fenced off after creating and closing testHarness2
@@ -202,7 +202,7 @@ public class FlinkKafkaProducerITCase extends KafkaTestBase {
         testHarness.initializeState(snapshot);
         testHarness.close();
 
-        assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(42, 43));
+        assertExactlyOnceForTopic(createProperties(), topic, Arrays.asList(42, 43));
 
         deleteTestTopic(topic);
         checkProducerLeak();
@@ -250,7 +250,7 @@ public class FlinkKafkaProducerITCase extends KafkaTestBase {
         // - aborted transactions with records 44 and 45
         // - committed transaction with record 46
         // - pending transaction with record 47
-        assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(42, 43, 46));
+        assertExactlyOnceForTopic(createProperties(), topic, Arrays.asList(42, 43, 46));
 
         try {
             testHarness1.close();
@@ -313,7 +313,7 @@ public class FlinkKafkaProducerITCase extends KafkaTestBase {
         // now we should have:
         // - records 42 and 43 in committed transactions
         // - aborted transactions with records 44 and 45
-        assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(42, 43));
+        assertExactlyOnceForTopic(createProperties(), topic, Arrays.asList(42, 43));
         deleteTestTopic(topic);
         checkProducerLeak();
     }
@@ -369,7 +369,7 @@ public class FlinkKafkaProducerITCase extends KafkaTestBase {
         // - records 42, 43, 44 and 45 in aborted transactions
         // - committed transaction with record 46
         // - pending transaction with record 47
-        assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(46));
+        assertExactlyOnceForTopic(createProperties(), topic, Arrays.asList(46));
 
         postScaleDownOperator1.close();
         // ignore ProducerFencedExceptions, because postScaleDownOperator1 could reuse transactional
@@ -454,7 +454,6 @@ public class FlinkKafkaProducerITCase extends KafkaTestBase 
{
         assertExactlyOnceForTopic(
                 createProperties(),
                 topic,
-                0,
                 IntStream.range(0, parallelism1 + parallelism2 + parallelism3)
                         .boxed()
                         .collect(Collectors.toList()));
@@ -548,7 +547,7 @@ public class FlinkKafkaProducerITCase extends KafkaTestBase {
                 checkpoint0); // recover state 0 - producerA recover and commit txn 0
         testHarness.close();
 
-        assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(42));
+        assertExactlyOnceForTopic(createProperties(), topic, Arrays.asList(42));
 
         deleteTestTopic(topic);
         checkProducerLeak();
@@ -584,7 +583,7 @@ public class FlinkKafkaProducerITCase extends KafkaTestBase {
                 topic,
                 FlinkKafkaProducer.Semantic.AT_LEAST_ONCE,
                 FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
-        assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(42, 43, 44, 45));
+        assertExactlyOnceForTopic(createProperties(), topic, Arrays.asList(42, 43, 44, 45));
         deleteTestTopic(topic);
     }
 
@@ -595,7 +594,7 @@ public class FlinkKafkaProducerITCase extends KafkaTestBase {
                 topic,
                 FlinkKafkaProducer.Semantic.EXACTLY_ONCE,
                 FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
-        assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(42, 43, 45, 46, 47));
+        assertExactlyOnceForTopic(createProperties(), topic, Arrays.asList(42, 43, 45, 46, 47));
         deleteTestTopic(topic);
     }
 
@@ -717,7 +716,7 @@ public class FlinkKafkaProducerITCase extends KafkaTestBase {
             testHarness2.processElement(46, 6);
         }
 
-        assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(42, 44));
+        assertExactlyOnceForTopic(createProperties(), topic, Arrays.asList(42, 44));
         checkProducerLeak();
     }
 
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaMigrationTestBase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaMigrationTestBase.java
index aa52b95..9549b3d 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaMigrationTestBase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaMigrationTestBase.java
@@ -151,7 +151,7 @@ public abstract class KafkaMigrationTestBase extends KafkaTestBase {
                 // - transaction 43 aborted
                 // - committed transaction 44
                 // - transaction 45 pending
-                assertExactlyOnceForTopic(createProperties(), TOPIC, 0, Arrays.asList(42, 44));
+                assertExactlyOnceForTopic(createProperties(), TOPIC, Arrays.asList(42, 44));
             }
         } finally {
             shutdownClusters();
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
index 2267cc3..155ea7e 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
@@ -418,8 +418,7 @@ public abstract class KafkaProducerTestBase extends KafkaTestBaseWithFlink {
 
         for (int i = 0; i < sinksCount; i++) {
             // assert that before failure we successfully snapshot/flushed all expected elements
-            assertExactlyOnceForTopic(
-                    properties, topic + i, partition, expectedElements, KAFKA_READ_TIMEOUT);
+            assertExactlyOnceForTopic(properties, topic + i, expectedElements);
             deleteTestTopic(topic + i);
         }
     }
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
index da41553..9a38990 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
@@ -42,6 +42,7 @@ import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
@@ -280,7 +281,7 @@ public abstract class KafkaTestBase extends TestLogger {
 
             // query kafka for new records ...
             Collection<ConsumerRecord<Integer, Integer>> records =
-                    kafkaServer.getAllRecordsFromTopic(properties, topic, partition, 100);
+                    kafkaServer.getAllRecordsFromTopic(properties, topic);
 
             for (ConsumerRecord<Integer, Integer> record : records) {
                 actualElements.add(record.value());
@@ -299,23 +300,8 @@ public abstract class KafkaTestBase extends TestLogger {
     }
 
     public void assertExactlyOnceForTopic(
-            Properties properties, String topic, int partition, List<Integer> expectedElements) {
-        assertExactlyOnceForTopic(properties, topic, partition, expectedElements, 30_000L);
-    }
-
-    /**
-     * We manually handle the timeout instead of using JUnit's timeout to return failure instead of
-     * timeout error. After timeout we assume that there are missing records and there is a bug, not
-     * that the test has run out of time.
-     */
-    public void assertExactlyOnceForTopic(
-            Properties properties,
-            String topic,
-            int partition,
-            List<Integer> expectedElements,
-            long timeoutMillis) {
+            Properties properties, String topic, List<Integer> expectedElements) {
 
-        long startMillis = System.currentTimeMillis();
         List<Integer> actualElements = new ArrayList<>();
 
         Properties consumerProperties = new Properties();
@@ -326,24 +312,17 @@ public abstract class KafkaTestBase extends TestLogger {
                 "value.deserializer", 
"org.apache.kafka.common.serialization.IntegerDeserializer");
         consumerProperties.put("isolation.level", "read_committed");
 
-        // until we timeout...
-        while (System.currentTimeMillis() < startMillis + timeoutMillis) {
-            // query kafka for new records ...
-            Collection<ConsumerRecord<Integer, Integer>> records =
-                    kafkaServer.getAllRecordsFromTopic(consumerProperties, topic, partition, 1000);
+        // query kafka for new records ...
+        Collection<ConsumerRecord<byte[], byte[]>> records =
+                kafkaServer.getAllRecordsFromTopic(consumerProperties, topic);
 
-            for (ConsumerRecord<Integer, Integer> record : records) {
-                actualElements.add(record.value());
-            }
+        for (ConsumerRecord<byte[], byte[]> record : records) {
+            actualElements.add(ByteBuffer.wrap(record.value()).getInt());
+        }
 
-            // succeed if we got all expectedElements
-            if (actualElements.equals(expectedElements)) {
-                return;
-            }
-            // fail early if we already have too many elements
-            if (actualElements.size() > expectedElements.size()) {
-                break;
-            }
+        // succeed if we got all expectedElements
+        if (actualElements.equals(expectedElements)) {
+            return;
         }
 
         fail(
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
index e4e3c6b..c6fc932 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
@@ -170,7 +170,7 @@ public abstract class KafkaTestEnvironment {
             List<String> topics, KafkaDeserializationSchema<T> readSchema, Properties props);
 
     public abstract <K, V> Collection<ConsumerRecord<K, V>> getAllRecordsFromTopic(
-            Properties properties, String topic, int partition, long timeout);
+            Properties properties, String topic);
 
     public abstract <T> StreamSink<T> getProducerSink(
             String topic,
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index bdfa662..95e3241 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.connector.kafka.sink.KafkaUtil;
 import org.apache.flink.connector.kafka.source.KafkaSource;
 import org.apache.flink.connector.kafka.source.KafkaSourceBuilder;
 import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
@@ -54,11 +55,9 @@ import java.io.File;
 import java.net.BindException;
 import java.time.Duration;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -108,11 +107,10 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
         this.config = config;
 
         File tempDir = new File(System.getProperty("java.io.tmpdir"));
-        tmpZkDir = new File(tempDir, "kafkaITcase-zk-dir-" + (UUID.randomUUID().toString()));
+        tmpZkDir = new File(tempDir, "kafkaITcase-zk-dir-" + (UUID.randomUUID()));
         assertTrue("cannot create zookeeper temp dir", tmpZkDir.mkdirs());
 
-        tmpKafkaParent =
-                new File(tempDir, "kafkaITcase-kafka-dir-" + (UUID.randomUUID().toString()));
+        tmpKafkaParent = new File(tempDir, "kafkaITcase-kafka-dir-" + (UUID.randomUUID()));
         assertTrue("cannot create kafka temp dir", tmpKafkaParent.mkdirs());
 
         tmpKafkaDirs = new ArrayList<>(config.getKafkaServersNumber());
@@ -279,32 +277,10 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
     }
 
     @Override
+    @SuppressWarnings("unchecked")
     public <K, V> Collection<ConsumerRecord<K, V>> getAllRecordsFromTopic(
-            Properties properties, String topic, int partition, long timeout) {
-        List<ConsumerRecord<K, V>> result = new ArrayList<>();
-
-        try (KafkaConsumer<K, V> consumer = new KafkaConsumer<>(properties)) {
-            consumer.assign(Arrays.asList(new TopicPartition(topic, partition)));
-
-            while (true) {
-                boolean processedAtLeastOneRecord = false;
-
-                // wait for new records with timeout and break the loop if we didn't get any
-                Iterator<ConsumerRecord<K, V>> iterator = consumer.poll(timeout).iterator();
-                while (iterator.hasNext()) {
-                    ConsumerRecord<K, V> record = iterator.next();
-                    result.add(record);
-                    processedAtLeastOneRecord = true;
-                }
-
-                if (!processedAtLeastOneRecord) {
-                    break;
-                }
-            }
-            consumer.commitSync();
-        }
-
-        return UnmodifiableList.decorate(result);
+            Properties properties, String topic) {
+        return UnmodifiableList.decorate(KafkaUtil.drainAllRecordsFromTopic(topic, properties));
     }
 
     @Override
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/shuffle/KafkaShuffleITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/shuffle/KafkaShuffleITCase.java
index ce9249d..31b34ea 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/shuffle/KafkaShuffleITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/shuffle/KafkaShuffleITCase.java
@@ -34,8 +34,8 @@ import org.apache.flink.streaming.connectors.kafka.internals.KafkaShuffleFetcher
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaShuffleFetcher.KafkaShuffleWatermark;
 import org.apache.flink.util.PropertiesUtil;
 
-import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableMap;
 import org.apache.flink.shaded.guava30.com.google.common.collect.Iterables;
+import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.junit.Rule;
@@ -507,16 +507,23 @@ public class KafkaShuffleITCase extends KafkaShuffleTestBase {
         FlinkKafkaShuffle.writeKeyBy(input, topic, kafkaProperties, 0);
 
         env.execute("Write to " + topic);
-        ImmutableMap.Builder<Integer, Collection<ConsumerRecord<byte[], byte[]>>> results =
-                ImmutableMap.builder();
 
-        for (int p = 0; p < numberOfPartitions; p++) {
-            results.put(p, kafkaServer.getAllRecordsFromTopic(kafkaProperties, topic, p, 5000));
-        }
+        Map<Integer, Collection<ConsumerRecord<byte[], byte[]>>> results = new HashMap<>();
+
+        kafkaServer
+                .<byte[], byte[]>getAllRecordsFromTopic(kafkaProperties, topic)
+                .forEach(
+                        r -> {
+                            final int partition = r.partition();
+                            if (!results.containsKey(partition)) {
+                                results.put(partition, Lists.newArrayList());
+                            }
+                            results.get(partition).add(r);
+                        });
 
         deleteTestTopic(topic);
 
-        return results.build();
+        return results;
     }
 
     private StreamExecutionEnvironment createEnvironment(
