Jackie-Jiang commented on code in PR #12608:
URL: https://github.com/apache/pinot/pull/12608#discussion_r1518393375


##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java:
##########
@@ -899,6 +904,26 @@ public Map<String, PartitionLagState> getPartitionToLagState(
     return _partitionMetadataProvider.getCurrentPartitionLagState(consumerPartitionStateMap);
   }
 
+  /**
+   * Checks if the begin offset of the stream partition has been fast-forwarded.
+   * batchFirstOffset should be less than or equal to startOffset.
+   * If batchFirstOffset is greater, then some messages were not received.
+   *
+   * @param startOffset The offset of the first message desired, inclusive.
+   * @param batchFirstOffset The offset of the first message in the batch.
+   */
+  private void validateStartOffset(StreamPartitionMsgOffset startOffset, StreamPartitionMsgOffset batchFirstOffset) {
+    if (batchFirstOffset.compareTo(startOffset) > 0) {
+      _serverMetrics.addMeteredTableValue(_tableStreamName, ServerMeter.STREAM_DATA_LOSS, 1L);
+      String message = "startOffset(" + startOffset
+          + ") is older than topic's beginning offset(" + batchFirstOffset + ")";
+      _segmentLogger.error(message);
+      _realtimeTableDataManager.addSegmentError(_segmentNameStr,

Review Comment:
   (format) Let's reformat the change
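
For readers following the thread, the comparison described in the Javadoc of the hunk above can be sketched in isolation. This is only an illustration under stated assumptions, not the PR's code: `StartOffsetCheckSketch`, `LongOffset`, and `isFastForwarded` are hypothetical names, and `LongOffset` is a stand-in for an offset type that supports `compareTo`. Metrics, logging, and segment error reporting are left out.

```java
public class StartOffsetCheckSketch {

  /** Hypothetical stand-in for a stream offset; only its ordering matters here. */
  static final class LongOffset implements Comparable<LongOffset> {
    private final long _value;

    LongOffset(long value) {
      _value = value;
    }

    @Override
    public int compareTo(LongOffset other) {
      return Long.compare(_value, other._value);
    }

    @Override
    public String toString() {
      return Long.toString(_value);
    }
  }

  /**
   * Returns true when the first offset in the fetched batch is greater than the
   * requested start offset, which means some messages were not received.
   */
  static boolean isFastForwarded(LongOffset startOffset, LongOffset batchFirstOffset) {
    return batchFirstOffset.compareTo(startOffset) > 0;
  }

  public static void main(String[] args) {
    // Batch starts exactly at the requested offset: no gap.
    System.out.println(isFastForwarded(new LongOffset(100), new LongOffset(100)));
    // Batch starts later than requested: offsets 100..249 were skipped.
    System.out.println(isFastForwarded(new LongOffset(100), new LongOffset(250)));
  }
}
```

Keeping the comparison in a small predicate like this separates the decision from its side effects, which also makes the formatting of the calling code easier to review.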



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

