This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit a475f571777017805bac8872f19ebe8160ad4cd6 Author: Stephan Ewen <[email protected]> AuthorDate: Mon Nov 9 21:06:42 2020 +0100 [FLINK-20051][connector kafka] Ensure KafkaPartitionSplitRecords returned on consumer wakeup is properly initialized --- .../flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java | 1 + 1 file changed, 1 insertion(+) diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java index 3313a2a..c120055 100644 --- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java +++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java @@ -96,6 +96,7 @@ public class KafkaPartitionSplitReader<T> implements SplitReader<Tuple3<T, Long, try { consumerRecords = consumer.poll(Duration.ofMillis(POLL_TIMEOUT)); } catch (WakeupException we) { + recordsBySplits.prepareForRead(); return recordsBySplits; }
