Jackie-Jiang commented on code in PR #12608:
URL: https://github.com/apache/pinot/pull/12608#discussion_r1518138970


##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java:
##########
@@ -456,6 +456,8 @@ protected boolean consumeLoop()
         throw t;
       }
 
+      validateStartOffset(_currentOffset, messageBatch.getFirstMessageOffset());

Review Comment:
   Consider moving the `null` check here:
   ```suggestion
         StreamPartitionMsgOffset batchFirstOffset = messageBatch.getFirstMessageOffset();
         if (batchFirstOffset != null) {
           validateStartOffset(_currentOffset, batchFirstOffset);
         }
   ```



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java:
##########
@@ -899,6 +901,29 @@ public Map<String, PartitionLagState> getPartitionToLagState(
     return _partitionMetadataProvider.getCurrentPartitionLagState(consumerPartitionStateMap);
   }
 
+  /**
+   * Checks if the begin offset of the stream partition has been fast-forwarded.
+   * batchFirstOffset should be less than or equal to startOffset.
+   * If batchFirstOffset is greater, then some messages were not received.
+   *
+   * @param startOffset The offset of the first message desired, inclusive.
+   * @param batchFirstOffset The offset of the first message in the batch.
+   */
+  private void validateStartOffset(StreamPartitionMsgOffset startOffset, StreamPartitionMsgOffset batchFirstOffset) {
+    if (batchFirstOffset != null && batchFirstOffset.compareTo(startOffset) > 0) {
+      _serverMetrics.addMeteredTableValue(_tableStreamName, ServerMeter.STREAM_DATA_LOSS, 1L);
+      String message = "startOffset(" + startOffset
+          + ") is older than topic's beginning offset(" + batchFirstOffset + ")";
+      _segmentLogger.error(message);
+      _realtimeTableDataManager.addSegmentError(_segmentNameStr,
+          new SegmentErrorInfo(now(), message, null)
+      );
+    } else {
+      // Record that this batch has no data loss.
+      _serverMetrics.addMeteredTableValue(_tableStreamName, ServerMeter.STREAM_DATA_LOSS, 0L);

Review Comment:
   We don't need to emit the meter when there is no loss.
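
A minimal, self-contained sketch of the shape this comment points toward: the meter is marked only when loss is detected, and the no-loss branch emits nothing. The class name `StartOffsetCheck`, the `AtomicLong` counter, and the use of plain `long` offsets are assumptions made for illustration; they stand in for `ServerMetrics` and `StreamPartitionMsgOffset` and are not Pinot APIs.

```java
import java.util.concurrent.atomic.AtomicLong;

public class StartOffsetCheck {
  // Hypothetical stand-in for the STREAM_DATA_LOSS meter; not Pinot's ServerMetrics API.
  public static final AtomicLong DATA_LOSS_METER = new AtomicLong();

  /**
   * Returns true if the batch starts after the requested offset, meaning some
   * messages were skipped. A null batchFirstOffset is treated as nothing to validate.
   */
  public static boolean validateStartOffset(long startOffset, Long batchFirstOffset) {
    if (batchFirstOffset != null && batchFirstOffset > startOffset) {
      // Loss detected: this is the only branch that touches the meter.
      DATA_LOSS_METER.incrementAndGet();
      return true;
    }
    // No loss: nothing is emitted.
    return false;
  }

  public static void main(String[] args) {
    System.out.println(validateStartOffset(100L, 100L));  // batch starts at the requested offset
    System.out.println(validateStartOffset(100L, 250L));  // batch starts later than requested
    System.out.println(validateStartOffset(100L, null));  // no first offset reported
    System.out.println(DATA_LOSS_METER.get());
  }
}
```

In this sketch the counter changes only on the loss path, so a dashboard built on it would show activity only when messages were actually skipped.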



##########
pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java:
##########
@@ -69,8 +69,12 @@ public synchronized MessageBatch<StreamMessage<byte[]>> fetchMessages(long start
     ConsumerRecords<String, Bytes> consumerRecords = _consumer.poll(Duration.ofMillis(timeoutMillis));
     List<ConsumerRecord<String, Bytes>> messageAndOffsets = consumerRecords.records(_topicPartition);
     List<StreamMessage<byte[]>> filtered = new ArrayList<>(messageAndOffsets.size());
+    long firstOffset = startOffset;
     long lastOffset = startOffset;
     StreamMessageMetadata rowMetadata = null;
+    if (!consumerRecords.isEmpty()) {
+      firstOffset = consumerRecords.iterator().next().offset();
+    }

Review Comment:
   There is also an underlying check for `offset >= startOffset`. Is it even possible?



##########
pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java:
##########
@@ -69,8 +69,12 @@ public synchronized MessageBatch<StreamMessage<byte[]>> fetchMessages(long start
     ConsumerRecords<String, Bytes> consumerRecords = _consumer.poll(Duration.ofMillis(timeoutMillis));
     List<ConsumerRecord<String, Bytes>> messageAndOffsets = consumerRecords.records(_topicPartition);
     List<StreamMessage<byte[]>> filtered = new ArrayList<>(messageAndOffsets.size());
+    long firstOffset = startOffset;
     long lastOffset = startOffset;
     StreamMessageMetadata rowMetadata = null;
+    if (!consumerRecords.isEmpty()) {
+      firstOffset = consumerRecords.iterator().next().offset();
+    }

Review Comment:
   Should we get the offset from the records under the topic partition?
   ```suggestion
       if (!messageAndOffsets.isEmpty()) {
         firstOffset = messageAndOffsets.get(0).offset();
       }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

