richardstartin commented on a change in pull request #7927:
URL: https://github.com/apache/pinot/pull/7927#discussion_r771822032



##########
File path: 
pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java
##########
@@ -18,61 +18,53 @@
  */
 package org.apache.pinot.plugin.stream.kafka20;
 
-import com.google.common.collect.Iterables;
-import java.io.IOException;
 import java.time.Duration;
+import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.TimeoutException;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.pinot.plugin.stream.kafka.MessageAndOffset;
 import org.apache.pinot.spi.stream.LongMsgOffset;
 import org.apache.pinot.spi.stream.MessageBatch;
 import org.apache.pinot.spi.stream.PartitionLevelConsumer;
 import org.apache.pinot.spi.stream.StreamConfig;
 import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 
 public class KafkaPartitionLevelConsumer extends 
KafkaPartitionLevelConnectionHandler
     implements PartitionLevelConsumer {
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(KafkaPartitionLevelConsumer.class);
 
   public KafkaPartitionLevelConsumer(String clientId, StreamConfig 
streamConfig, int partition) {
     super(clientId, streamConfig, partition);
   }
 
   @Override
-  public MessageBatch fetchMessages(StreamPartitionMsgOffset startMsgOffset, 
StreamPartitionMsgOffset endMsgOffset,
-      int timeoutMillis)
-      throws TimeoutException {
+  public MessageBatch<byte[]> fetchMessages(StreamPartitionMsgOffset 
startMsgOffset, StreamPartitionMsgOffset endMsgOffset,
+      int timeoutMillis) {
     final long startOffset = ((LongMsgOffset) startMsgOffset).getOffset();
     final long endOffset = endMsgOffset == null ? Long.MAX_VALUE : 
((LongMsgOffset) endMsgOffset).getOffset();
     return fetchMessages(startOffset, endOffset, timeoutMillis);
   }
 
-  public MessageBatch fetchMessages(long startOffset, long endOffset, int 
timeoutMillis)
-      throws TimeoutException {
+  public MessageBatch<byte[]> fetchMessages(long startOffset, long endOffset, 
int timeoutMillis) {
     _consumer.seek(_topicPartition, startOffset);
     ConsumerRecords<String, Bytes> consumerRecords = 
_consumer.poll(Duration.ofMillis(timeoutMillis));
-    final Iterable<ConsumerRecord<String, Bytes>> messageAndOffsetIterable =
-        buildOffsetFilteringIterable(consumerRecords.records(_topicPartition), 
startOffset, endOffset);
-    return new KafkaMessageBatch(messageAndOffsetIterable);
-  }
-
-  private Iterable<ConsumerRecord<String, Bytes>> buildOffsetFilteringIterable(
-      final List<ConsumerRecord<String, Bytes>> messageAndOffsets, final long 
startOffset, final long endOffset) {
-    return Iterables.filter(messageAndOffsets, input -> {
-      // Filter messages that are either null or have an offset ∉ 
[startOffset, endOffset]
-      return input != null && input.value() != null && input.offset() >= 
startOffset && (endOffset > input.offset()
-          || endOffset == -1);
-    });
-  }
-
-  @Override
-  public void close()
-      throws IOException {
-    super.close();
+    List<ConsumerRecord<String, Bytes>> messageAndOffsets = 
consumerRecords.records(_topicPartition);
+    List<MessageAndOffset> filtered = new 
ArrayList<>(messageAndOffsets.size());

Review comment:
       Note that a list was already being materialised in `KafkaMessageBatch`; 
it is simply easier to build it here, because doing so also lets us capture the last offset. 
This is likely more efficient than using `Iterables.filter` anyway.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to