swaminathanmanish commented on code in PR #12697:
URL: https://github.com/apache/pinot/pull/12697#discussion_r1550612147


##########
pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java:
##########
@@ -47,53 +50,60 @@ public KafkaPartitionLevelConsumer(String clientId, StreamConfig streamConfig, i
   }
 
   @Override
-  public MessageBatch<StreamMessage<byte[]>> fetchMessages(StreamPartitionMsgOffset startMsgOffset,
-      StreamPartitionMsgOffset endMsgOffset, int timeoutMillis) {
-    final long startOffset = ((LongMsgOffset) startMsgOffset).getOffset();
-    final long endOffset = endMsgOffset == null ? Long.MAX_VALUE : ((LongMsgOffset) endMsgOffset).getOffset();
-    return fetchMessages(startOffset, endOffset, timeoutMillis);
-  }
-
-  public synchronized MessageBatch<StreamMessage<byte[]>> fetchMessages(long startOffset, long endOffset,
-      int timeoutMillis) {
+  public synchronized KafkaMessageBatch fetchMessages(StreamPartitionMsgOffset startMsgOffset, int timeoutMs) {
+    long startOffset = ((LongMsgOffset) startMsgOffset).getOffset();
     if (LOGGER.isDebugEnabled()) {
-      LOGGER.debug("Polling partition: {}, startOffset: {}, endOffset: {} timeout: {}ms", _topicPartition, startOffset,
-          endOffset, timeoutMillis);
+      LOGGER.debug("Polling partition: {}, startOffset: {}, timeout: {}ms", _topicPartition, startOffset, timeoutMs);
     }
     if (_lastFetchedOffset < 0 || _lastFetchedOffset != startOffset - 1) {
       if (LOGGER.isDebugEnabled()) {
         LOGGER.debug("Seeking to offset: {}", startOffset);
       }
       _consumer.seek(_topicPartition, startOffset);
     }
-    ConsumerRecords<String, Bytes> consumerRecords = _consumer.poll(Duration.ofMillis(timeoutMillis));
-    List<ConsumerRecord<String, Bytes>> messageAndOffsets = consumerRecords.records(_topicPartition);
-    List<StreamMessage<byte[]>> filtered = new ArrayList<>(messageAndOffsets.size());
-    long firstOffset = startOffset;
-    long lastOffset = startOffset;
-    StreamMessageMetadata rowMetadata = null;
-    if (!messageAndOffsets.isEmpty()) {
-      firstOffset = messageAndOffsets.get(0).offset();
-    }
-    for (ConsumerRecord<String, Bytes> messageAndOffset : messageAndOffsets) {
-      long offset = messageAndOffset.offset();
-      _lastFetchedOffset = offset;
-      if (offset >= startOffset && (endOffset > offset || endOffset < 0)) {
-        Bytes message = messageAndOffset.value();
-        rowMetadata = (StreamMessageMetadata) _kafkaMetadataExtractor.extract(messageAndOffset);
+    ConsumerRecords<String, Bytes> consumerRecords = _consumer.poll(Duration.ofMillis(timeoutMs));
+    List<ConsumerRecord<String, Bytes>> records = consumerRecords.records(_topicPartition);
+    List<BytesStreamMessage> filteredRecords = new ArrayList<>(records.size());
+    long firstOffset = -1;
+    long offsetOfNextBatch = startOffset;
+    StreamMessageMetadata lastMessageMetadata = null;
+    if (!records.isEmpty()) {
+      firstOffset = records.get(0).offset();
+      _lastFetchedOffset = records.get(records.size() - 1).offset();
+      offsetOfNextBatch = _lastFetchedOffset + 1;

Review Comment:
   This makes sense and simplifies the logic. Hope we can verify it comprehensively through integration tests.
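   As a rough illustration (not the actual Pinot code), the offset bookkeeping the new fetchMessages relies on boils down to: resume right after the last fetched offset, or at the requested start offset when the poll returns nothing. A self-contained sketch of that invariant, which an integration test could assert:

       import java.util.List;

       public class OffsetOfNextBatchSketch {
         // Hypothetical helper mirroring the bookkeeping inside KafkaPartitionLevelConsumer#fetchMessages.
         static long offsetOfNextBatch(long startOffset, List<Long> polledOffsets) {
           if (polledOffsets.isEmpty()) {
             return startOffset; // nothing consumed, resume from the requested offset
           }
           long lastFetchedOffset = polledOffsets.get(polledOffsets.size() - 1);
           return lastFetchedOffset + 1; // resume right after the last record we saw
         }

         public static void main(String[] args) {
           System.out.println(offsetOfNextBatch(100, List.of()));                 // 100
           System.out.println(offsetOfNextBatch(100, List.of(100L, 101L, 104L))); // 105
         }
       }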



##########
pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaMessageBatch.java:
##########
@@ -71,52 +61,22 @@ public int getUnfilteredMessageCount() {
   }
 
   @Override
-  public StreamMessage getMessageAtIndex(int index) {
-    return _messageList.get(index);
-  }
-
-  @Override
-  public int getMessageOffsetAtIndex(int index) {
-    return ByteBuffer.wrap(_messageList.get(index).getValue()).arrayOffset();
-  }
-
-  @Override
-  public int getMessageLengthAtIndex(int index) {
-    return _messageList.get(index).getValue().length;
-  }
-
-  @Override
-  public long getNextStreamMessageOffsetAtIndex(int index) {
-    throw new UnsupportedOperationException("This method is deprecated");
-  }
-
-  @Override
-  public StreamPartitionMsgOffset getNextStreamPartitionMsgOffsetAtIndex(int index) {
-    return new LongMsgOffset(((KafkaStreamMessage) _messageList.get(index)).getNextOffset());
+  public BytesStreamMessage getStreamMessage(int index) {
+    return _messages.get(index);
   }
 
   @Override
   public StreamPartitionMsgOffset getOffsetOfNextBatch() {
-    return new LongMsgOffset(_lastOffset + 1);
-  }
-
-  @Override
-  public RowMetadata getMetadataAtIndex(int index) {
-    return _messageList.get(index).getMetadata();
+    return new LongMsgOffset(_offsetOfNextBatch);
   }
 
   @Override
-  public byte[] getMessageBytesAtIndex(int index) {
-    return _messageList.get(index).getValue();
-  }
-
-  @Override
-  public StreamMessage getStreamMessage(int index) {
-    return _messageList.get(index);
+  public StreamPartitionMsgOffset getFirstMessageOffset() {

Review Comment:
   Is this backwards compatible, given that it can now return null?
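   For context, a minimal sketch (hypothetical interface names, not the real MessageBatch API) of how a caller could stay safe when the first-message offset is absent for an empty batch:

       public class FirstOffsetGuardSketch {
         // Hypothetical stand-in for the relevant slice of the batch API.
         interface BatchLike {
           Long getFirstMessageOffsetValue();   // may be null when no message was returned
           long getOffsetOfNextBatchValue();
         }

         static long firstOffsetOrNext(BatchLike batch) {
           Long first = batch.getFirstMessageOffsetValue();
           // Fall back to the next-batch offset so legacy callers that assumed non-null keep working.
           return first != null ? first : batch.getOffsetOfNextBatchValue();
         }

         public static void main(String[] args) {
           BatchLike empty = new BatchLike() {
             public Long getFirstMessageOffsetValue() { return null; }
             public long getOffsetOfNextBatchValue() { return 42L; }
           };
           System.out.println(firstOffsetOrNext(empty)); // 42
         }
       }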



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java:
##########
@@ -591,67 +600,52 @@ private boolean processStreamEvents(MessageBatch messagesAndOffsets, long idlePi
           _numRowsErrored++;
           // when exception happens we prefer abandoning the whole batch and not partially indexing some rows
           reusedResult.getTransformedRows().clear();
-          String errorMessage = String.format("Caught exception while transforming the record at offset: %s , row: %s",
-              messageOffset, decodedRow.getResult());
+          String errorMessage =
+              String.format("Caught exception while transforming the record at offset: %s , row: %s", offset,
+                  decodedRow.getResult());
           _segmentLogger.error(errorMessage, e);
           _realtimeTableDataManager.addSegmentError(_segmentNameStr, new SegmentErrorInfo(now(), errorMessage, e));
         }
         if (reusedResult.getSkippedRowCount() > 0) {
-          realtimeRowsDroppedMeter =
-              _serverMetrics.addMeteredTableValue(_clientId, ServerMeter.REALTIME_ROWS_FILTERED,
-                  reusedResult.getSkippedRowCount(), realtimeRowsDroppedMeter);
+          realtimeRowsDroppedMeter = _serverMetrics.addMeteredTableValue(_clientId, ServerMeter.REALTIME_ROWS_FILTERED,
+              reusedResult.getSkippedRowCount(), realtimeRowsDroppedMeter);
         }
         if (reusedResult.getIncompleteRowCount() > 0) {
           realtimeIncompleteRowsConsumedMeter =
               _serverMetrics.addMeteredTableValue(_clientId, ServerMeter.INCOMPLETE_REALTIME_ROWS_CONSUMED,
                   reusedResult.getIncompleteRowCount(), realtimeIncompleteRowsConsumedMeter);
         }
         List<GenericRow> transformedRows = reusedResult.getTransformedRows();
-        if (transformedRows.size() > 0) {
-          hasTransformedRows = true;
-        }
        for (GenericRow transformedRow : transformedRows) {
           try {
-            canTakeMore = _realtimeSegment.index(transformedRow, msgMetadata);
+            canTakeMore = _realtimeSegment.index(transformedRow, metadata);
             indexedMessageCount++;
-            _lastRowMetadata = msgMetadata;
+            _lastRowMetadata = metadata;
             _lastConsumedTimestampMs = System.currentTimeMillis();
             realtimeRowsConsumedMeter =
                 _serverMetrics.addMeteredTableValue(_clientId, ServerMeter.REALTIME_ROWS_CONSUMED, 1,
                     realtimeRowsConsumedMeter);
             _serverMetrics.addMeteredGlobalValue(ServerMeter.REALTIME_ROWS_CONSUMED, 1L);
           } catch (Exception e) {
             _numRowsErrored++;
-            String errorMessage = String.format("Caught exception while indexing the record at offset: %s , row: %s",
-                messageOffset, transformedRow);
+            String errorMessage =
+                String.format("Caught exception while indexing the record at offset: %s , row: %s", offset,
+                    transformedRow);
             _segmentLogger.error(errorMessage, e);
             _realtimeTableDataManager.addSegmentError(_segmentNameStr, new SegmentErrorInfo(now(), errorMessage, e));
           }
         }
       }
-      _currentOffset = messageOffset;
+      _currentOffset = nextOffset;
       _numRowsIndexed = _realtimeSegment.getNumDocsIndexed();
       _numRowsConsumed++;
       streamMessageCount++;
     }

Review Comment:
   Thanks for cleaning this up. Hope this can be verified:

       if (indexedMessageCount > 0) {
         // Record Ingestion delay for this partition with metadata for last message we processed
         updateIngestionDelay(_lastRowMetadata);
       } else if (!hasTransformedRows && (msgMetadata != null)) {
         // If all messages were filtered by transformation, we still attempt to update ingestion delay using
         // the metadata for the last message we processed if any.
         updateIngestionDelay(msgMetadata);
       }
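   To spell out the behavior that quoted block encodes (a sketch only, with hypothetical types): prefer the metadata of the last indexed row, and fall back to the last polled message's metadata when every row was filtered out.

       public class IngestionDelaySketch {
         // Hypothetical metadata carrier standing in for the stream message metadata.
         static class Metadata {
           final long recordTimestampMs;
           Metadata(long recordTimestampMs) { this.recordTimestampMs = recordTimestampMs; }
         }

         static Metadata metadataForDelayUpdate(int indexedMessageCount, boolean hasTransformedRows,
             Metadata lastRowMetadata, Metadata lastMessageMetadata) {
           if (indexedMessageCount > 0) {
             return lastRowMetadata;      // delay measured against the last row we actually indexed
           }
           if (!hasTransformedRows && lastMessageMetadata != null) {
             return lastMessageMetadata;  // everything filtered: still advance the delay using the last message
           }
           return null;                   // nothing consumed, no update
         }

         public static void main(String[] args) {
           // All rows filtered out: the last message's metadata is still used for the delay update.
           System.out.println(metadataForDelayUpdate(0, false, null, new Metadata(1700000000000L)) != null); // true
         }
       }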



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

