becketqin commented on a change in pull request #17991:
URL: https://github.com/apache/flink/pull/17991#discussion_r766356300



##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java
##########
@@ -452,82 +392,72 @@ private void maybeRegisterKafkaConsumerMetrics(
 
     // ---------------- private helper class ------------------------
 
-    private static class KafkaPartitionSplitRecords<T> implements 
RecordsWithSplitIds<T> {
-        private final Map<String, Collection<T>> recordsBySplits;
-        private final Set<String> finishedSplits;
-        private Iterator<Map.Entry<String, Collection<T>>> splitIterator;
-        private String currentSplitId;
-        private Iterator<T> recordIterator;
+    private static class KafkaPartitionSplitRecords
+            implements RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>> {
+
+        private final Set<String> finishedSplits = new HashSet<>();
+        private final Map<TopicPartition, Long> stoppingOffsets = new 
HashMap<>();
+        private final ConsumerRecords<byte[], byte[]> consumerRecords;
+        private final KafkaSourceReaderMetrics metrics;
+        private Iterator<TopicPartition> splitIterator;
+        private Iterator<ConsumerRecord<byte[], byte[]>> recordIterator;
+        private TopicPartition currentTopicPartition;
+
+        private KafkaPartitionSplitRecords(
+                ConsumerRecords<byte[], byte[]> consumerRecords, 
KafkaSourceReaderMetrics metrics) {
+            this.consumerRecords = consumerRecords;
+            this.metrics = metrics;
+        }
 
-        private KafkaPartitionSplitRecords() {
-            this.recordsBySplits = new HashMap<>();
-            this.finishedSplits = new HashSet<>();
+        private void prepareForRead() {
+            splitIterator = consumerRecords.partitions().iterator();
         }
 
-        private Collection<T> recordsForSplit(String splitId) {
-            return recordsBySplits.computeIfAbsent(splitId, id -> new 
ArrayList<>());
+        private void setPartitionStoppingOffset(
+                TopicPartition topicPartition, long stoppingOffset) {
+            stoppingOffsets.put(topicPartition, stoppingOffset);
         }
 
         private void addFinishedSplit(String splitId) {
             finishedSplits.add(splitId);
         }
 
-        private void prepareForRead() {
-            splitIterator = recordsBySplits.entrySet().iterator();
-        }
-
-        @Override
         @Nullable
+        @Override
         public String nextSplit() {
             if (splitIterator.hasNext()) {
-                Map.Entry<String, Collection<T>> entry = splitIterator.next();
-                currentSplitId = entry.getKey();
-                recordIterator = entry.getValue().iterator();
-                return currentSplitId;
+                currentTopicPartition = splitIterator.next();
+                recordIterator = 
consumerRecords.records(currentTopicPartition).iterator();
+                return currentTopicPartition.toString();
             } else {
-                currentSplitId = null;
+                currentTopicPartition = null;
                 recordIterator = null;
                 return null;
             }
         }
 
-        @Override
         @Nullable
-        public T nextRecordFromSplit() {
+        @Override
+        public ConsumerRecord<byte[], byte[]> nextRecordFromSplit() {
             Preconditions.checkNotNull(
-                    currentSplitId,
+                    currentTopicPartition,
                     "Make sure nextSplit() did not return null before "
                             + "iterate over the records split.");
             if (recordIterator.hasNext()) {
-                return recordIterator.next();
-            } else {
-                return null;
+                final ConsumerRecord<byte[], byte[]> record = 
recordIterator.next();
+                // Only emit records before stopping offset
+                if (record.offset()
+                        < stoppingOffsets.getOrDefault(currentTopicPartition, 
Long.MAX_VALUE)) {

Review comment:
       Can we avoid retrieving the stopping offsets for the current partition 
on each record? Ideally the stopping offsets can be cached and updated when the 
current split changes.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to