This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new b11d53cbc5 Detect expired messages in Kafka. Log and set a gauge. (#12608)
b11d53cbc5 is described below

commit b11d53cbc5966f1daaf7c6b025cbdc0c53446f61
Author: Rajat Venkatesh <[email protected]>
AuthorDate: Fri Mar 8 20:29:15 2024 -0800

    Detect expired messages in Kafka. Log and set a gauge. (#12608)
---
 .../apache/pinot/common/metrics/ServerMeter.java   |  3 ++-
 .../realtime/RealtimeSegmentDataManager.java       | 23 ++++++++++++++++++++++
 .../plugin/stream/kafka20/KafkaMessageBatch.java   | 12 +++++++++--
 .../kafka20/KafkaPartitionLevelConsumer.java       |  6 +++++-
 .../org/apache/pinot/spi/stream/MessageBatch.java  | 11 +++++++++++
 5 files changed, 51 insertions(+), 4 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
index c516740070..e8819ca945 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
@@ -112,7 +112,8 @@ public enum ServerMeter implements AbstractMetrics.Meter {
   NUM_SEGMENTS_PRUNED_BY_VALUE("numSegmentsPrunedByValue", false),
   LARGE_QUERY_RESPONSES_SENT("largeResponses", false),
   TOTAL_THREAD_CPU_TIME_MILLIS("millis", false),
-  LARGE_QUERY_RESPONSE_SIZE_EXCEPTIONS("exceptions", false);
+  LARGE_QUERY_RESPONSE_SIZE_EXCEPTIONS("exceptions", false),
+  STREAM_DATA_LOSS("streamDataLoss", false);
 
   private final String _meterName;
   private final String _unit;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index 2a5da62c2a..49cddb5574 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -456,6 +456,11 @@ public class RealtimeSegmentDataManager extends SegmentDataManager {
         throw t;
       }
 
+      StreamPartitionMsgOffset batchFirstOffset = messageBatch.getFirstMessageOffset();
+      if (batchFirstOffset != null) {
+        validateStartOffset(_currentOffset, batchFirstOffset);
+      }
+
       boolean endCriteriaReached = processStreamEvents(messageBatch, idlePipeSleepTimeMillis);
 
       if (_currentOffset.compareTo(lastUpdatedOffset) != 0) {
@@ -900,6 +905,24 @@ public class RealtimeSegmentDataManager extends SegmentDataManager {
     return _partitionMetadataProvider.getCurrentPartitionLagState(consumerPartitionStateMap);
   }
 
+  /**
+   * Checks if the begin offset of the stream partition has been fast-forwarded.
+   * batchFirstOffset should be less than or equal to startOffset.
+   * If batchFirstOffset is greater, then some messages were not received.
+   *
+   * @param startOffset The offset of the first message desired, inclusive.
+   * @param batchFirstOffset The offset of the first message in the batch.
+   */
+  private void validateStartOffset(StreamPartitionMsgOffset startOffset, StreamPartitionMsgOffset batchFirstOffset) {
+    if (batchFirstOffset.compareTo(startOffset) > 0) {
+      _serverMetrics.addMeteredTableValue(_tableStreamName, ServerMeter.STREAM_DATA_LOSS, 1L);
+      String message =
+          "startOffset(" + startOffset + ") is older than topic's beginning offset(" + batchFirstOffset + ")";
+      _segmentLogger.error(message);
+      _realtimeTableDataManager.addSegmentError(_segmentNameStr, new SegmentErrorInfo(now(), message, null));
+    }
+  }
+
   public StreamPartitionMsgOffset getCurrentOffset() {
     return _currentOffset;
   }
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaMessageBatch.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaMessageBatch.java
index dbc3e8d2a6..005b4c27b3 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaMessageBatch.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaMessageBatch.java
@@ -31,19 +31,22 @@ import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
 public class KafkaMessageBatch implements MessageBatch<StreamMessage<byte[]>> {
   private final List<StreamMessage<byte[]>> _messageList;
   private final int _unfilteredMessageCount;
+  private final long _firstOffset;
   private final long _lastOffset;
   private final StreamMessageMetadata _lastMessageMetadata;
 
   /**
    * @param unfilteredMessageCount how many messages were received from the topic before being filtered
+   * @param firstOffset the offset of the first message in the batch
    * @param lastOffset the offset of the last message in the batch
    * @param batch the messages, which may be smaller than {@see unfilteredMessageCount}
    * @param lastMessageMetadata metadata for last filtered message in the batch, useful for estimating ingestion delay
    *                            when a batch has all messages filtered.
    */
-  public KafkaMessageBatch(int unfilteredMessageCount, long lastOffset, List<StreamMessage<byte[]>> batch,
-      StreamMessageMetadata lastMessageMetadata) {
+  public KafkaMessageBatch(int unfilteredMessageCount, long firstOffset, long lastOffset,
+      List<StreamMessage<byte[]>> batch, StreamMessageMetadata lastMessageMetadata) {
     _messageList = batch;
+    _firstOffset = firstOffset;
     _lastOffset = lastOffset;
     _unfilteredMessageCount = unfilteredMessageCount;
     _lastMessageMetadata = lastMessageMetadata;
@@ -111,4 +114,9 @@ public class KafkaMessageBatch implements MessageBatch<StreamMessage<byte[]>> {
   public StreamMessage getStreamMessage(int index) {
     return _messageList.get(index);
   }
+
+  @Override
+  public StreamPartitionMsgOffset getFirstMessageOffset() {
+    return new LongMsgOffset(_firstOffset);
+  }
 }
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java
index ff90f4b1a3..df51d2fda9 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java
@@ -69,8 +69,12 @@ public class KafkaPartitionLevelConsumer extends KafkaPartitionLevelConnectionHa
     ConsumerRecords<String, Bytes> consumerRecords = _consumer.poll(Duration.ofMillis(timeoutMillis));
     List<ConsumerRecord<String, Bytes>> messageAndOffsets = consumerRecords.records(_topicPartition);
     List<StreamMessage<byte[]>> filtered = new ArrayList<>(messageAndOffsets.size());
+    long firstOffset = startOffset;
     long lastOffset = startOffset;
     StreamMessageMetadata rowMetadata = null;
+    if (!messageAndOffsets.isEmpty()) {
+      firstOffset = messageAndOffsets.get(0).offset();
+    }
     for (ConsumerRecord<String, Bytes> messageAndOffset : messageAndOffsets) {
       long offset = messageAndOffset.offset();
       _lastFetchedOffset = offset;
@@ -90,6 +94,6 @@ public class KafkaPartitionLevelConsumer extends KafkaPartitionLevelConnectionHa
             endOffset);
       }
     }
-    return new KafkaMessageBatch(messageAndOffsets.size(), lastOffset, filtered, rowMetadata);
+    return new KafkaMessageBatch(messageAndOffsets.size(), firstOffset, lastOffset, filtered, rowMetadata);
   }
 }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java
index 8bde04aed4..9a8f4e15fc 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java
@@ -118,6 +118,17 @@ public interface MessageBatch<T> {
     return false;
   }
 
+  /**
+   * Return the offset of the first message in the batch.
+   * The first offset of the batch is useful to determine if there were gaps in the stream.
+   *
+   * @return null by default
+   */
+  @Nullable
+  default public StreamPartitionMsgOffset getFirstMessageOffset() {
+    return null;
+  }
+
   /**
    * This is useful while determining ingestion delay for a message batch. Retaining metadata for last filtered message
    * in a batch can enable us to estimate the ingestion delay for the batch.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to