AHeise commented on a change in pull request #17406:
URL: https://github.com/apache/flink/pull/17406#discussion_r722956800
##########
File path:
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
##########
@@ -279,32 +279,16 @@ public String getVersion() {
    }

    @Override
+    @SuppressWarnings("unchecked")
    public <K, V> Collection<ConsumerRecord<K, V>> getAllRecordsFromTopic(
-            Properties properties, String topic, int partition, long timeout) {
-        List<ConsumerRecord<K, V>> result = new ArrayList<>();
-
-        try (KafkaConsumer<K, V> consumer = new KafkaConsumer<>(properties)) {
-            consumer.assign(Arrays.asList(new TopicPartition(topic, partition)));
-
-            while (true) {
-                boolean processedAtLeastOneRecord = false;
-
-                // wait for new records with timeout and break the loop if we didn't get any
-                Iterator<ConsumerRecord<K, V>> iterator = consumer.poll(timeout).iterator();
-                while (iterator.hasNext()) {
-                    ConsumerRecord<K, V> record = iterator.next();
-                    result.add(record);
-                    processedAtLeastOneRecord = true;
-                }
-
-                if (!processedAtLeastOneRecord) {
-                    break;
-                }
-            }
-            consumer.commitSync();
-        }
-
-        return UnmodifiableList.decorate(result);
+            Properties properties, String topic) {
+        final boolean committed =
+                Objects.equals(
+                        properties.getProperty(
+                                ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_uncommitted"),
+                        "read_committed");
+        return UnmodifiableList.decorate(
+                KafkaUtil.drainAllRecordsFromTopic(topic, properties, committed));
Review comment:
It still feels hacky to pull committed out of the properties only to
effectively feed the properties back in alongside that same value. Maybe add
two overloads, where committed is optional and, when given, overrides the
properties?
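
For illustration, a minimal sketch of what such a pair of overloads could
look like. Only the KafkaUtil.drainAllRecordsFromTopic call and the
isolation-level lookup are taken from the diff above; the overload shape and
the defaulting logic are assumptions, not part of the actual change:

    @Override
    public <K, V> Collection<ConsumerRecord<K, V>> getAllRecordsFromTopic(
            Properties properties, String topic) {
        // No explicit flag given: derive it from the configured isolation level.
        final boolean committed =
                Objects.equals(
                        properties.getProperty(
                                ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_uncommitted"),
                        "read_committed");
        return getAllRecordsFromTopic(properties, topic, committed);
    }

    @SuppressWarnings("unchecked")
    public <K, V> Collection<ConsumerRecord<K, V>> getAllRecordsFromTopic(
            Properties properties, String topic, boolean committed) {
        // The explicit flag wins over whatever isolation level the properties carry.
        return UnmodifiableList.decorate(
                KafkaUtil.drainAllRecordsFromTopic(topic, properties, committed));
    }

Callers that care about the isolation level could then use the
three-argument variant directly and skip the property parsing altogether.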