fapaul commented on a change in pull request #17406:
URL: https://github.com/apache/flink/pull/17406#discussion_r722243031



##########
File path: flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
##########
@@ -279,32 +279,16 @@ public String getVersion() {
     }
 
     @Override
+    @SuppressWarnings("unchecked")
     public <K, V> Collection<ConsumerRecord<K, V>> getAllRecordsFromTopic(
-            Properties properties, String topic, int partition, long timeout) {
-        List<ConsumerRecord<K, V>> result = new ArrayList<>();
-
-        try (KafkaConsumer<K, V> consumer = new KafkaConsumer<>(properties)) {
-            consumer.assign(Arrays.asList(new TopicPartition(topic, partition)));
-
-            while (true) {
-                boolean processedAtLeastOneRecord = false;
-
-                // wait for new records with timeout and break the loop if we didn't get any
-                Iterator<ConsumerRecord<K, V>> iterator = consumer.poll(timeout).iterator();
-                while (iterator.hasNext()) {
-                    ConsumerRecord<K, V> record = iterator.next();
-                    result.add(record);
-                    processedAtLeastOneRecord = true;
-                }
-
-                if (!processedAtLeastOneRecord) {
-                    break;
-                }
-            }
-            consumer.commitSync();
-        }
-
-        return UnmodifiableList.decorate(result);
+            Properties properties, String topic) {
+        final boolean committed =
+                Objects.equals(
+                        properties.getProperty(
+                                ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_uncommitted"),
+                        "read_committed");
+        return UnmodifiableList.decorate(
+                KafkaUtil.drainAllRecordsFromTopic(topic, properties, committed));

Review comment:
       Hmm, I would like to leave it as it is rather than expose all 
configuration options, because I currently cannot really reason about the 
different things people might want to set.
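
       For reference, the isolation-level check in the hunk above can be 
tried on its own. This is only a minimal sketch using `java.util.Properties`: 
the class and method names are hypothetical, and the key is spelled out as the 
literal `"isolation.level"` (the value of 
`ConsumerConfig.ISOLATION_LEVEL_CONFIG`) so that the snippet does not need the 
Kafka client on the classpath.

```java
import java.util.Objects;
import java.util.Properties;

// Hypothetical helper; the name is illustrative and not part of the PR.
class IsolationLevelCheck {
    // Assumption: literal value of ConsumerConfig.ISOLATION_LEVEL_CONFIG.
    static final String ISOLATION_LEVEL_KEY = "isolation.level";

    // True only if the properties explicitly request read_committed.
    // An unset key falls back to "read_uncommitted", as in the diff.
    static boolean isReadCommitted(Properties properties) {
        return Objects.equals(
                properties.getProperty(ISOLATION_LEVEL_KEY, "read_uncommitted"),
                "read_committed");
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        // Key not set: treated as read_uncommitted.
        System.out.println(isReadCommitted(props));

        props.setProperty(ISOLATION_LEVEL_KEY, "read_committed");
        // Key explicitly set to read_committed.
        System.out.println(isReadCommitted(props));
    }
}
```

       The point of deriving the flag from the existing `Properties` is that 
callers keep passing the same object and no extra parameter is needed for the 
isolation level.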





