This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new b11d53cbc5 Detect expired messages in Kafka. Log and set a gauge.
(#12608)
b11d53cbc5 is described below
commit b11d53cbc5966f1daaf7c6b025cbdc0c53446f61
Author: Rajat Venkatesh <[email protected]>
AuthorDate: Fri Mar 8 20:29:15 2024 -0800
Detect expired messages in Kafka. Log and set a gauge. (#12608)
---
.../apache/pinot/common/metrics/ServerMeter.java | 3 ++-
.../realtime/RealtimeSegmentDataManager.java | 23 ++++++++++++++++++++++
.../plugin/stream/kafka20/KafkaMessageBatch.java | 12 +++++++++--
.../kafka20/KafkaPartitionLevelConsumer.java | 6 +++++-
.../org/apache/pinot/spi/stream/MessageBatch.java | 11 +++++++++++
5 files changed, 51 insertions(+), 4 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
index c516740070..e8819ca945 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
@@ -112,7 +112,8 @@ public enum ServerMeter implements AbstractMetrics.Meter {
NUM_SEGMENTS_PRUNED_BY_VALUE("numSegmentsPrunedByValue", false),
LARGE_QUERY_RESPONSES_SENT("largeResponses", false),
TOTAL_THREAD_CPU_TIME_MILLIS("millis", false),
- LARGE_QUERY_RESPONSE_SIZE_EXCEPTIONS("exceptions", false);
+ LARGE_QUERY_RESPONSE_SIZE_EXCEPTIONS("exceptions", false),
+ STREAM_DATA_LOSS("streamDataLoss", false);
private final String _meterName;
private final String _unit;
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index 2a5da62c2a..49cddb5574 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -456,6 +456,11 @@ public class RealtimeSegmentDataManager extends
SegmentDataManager {
throw t;
}
+ StreamPartitionMsgOffset batchFirstOffset =
messageBatch.getFirstMessageOffset();
+ if (batchFirstOffset != null) {
+ validateStartOffset(_currentOffset, batchFirstOffset);
+ }
+
boolean endCriteriaReached = processStreamEvents(messageBatch,
idlePipeSleepTimeMillis);
if (_currentOffset.compareTo(lastUpdatedOffset) != 0) {
@@ -900,6 +905,24 @@ public class RealtimeSegmentDataManager extends
SegmentDataManager {
return
_partitionMetadataProvider.getCurrentPartitionLagState(consumerPartitionStateMap);
}
+ /**
+ * Checks if the beginning offset of the stream partition has been
fast-forwarded.
+ * batchFirstOffset should be less than or equal to startOffset.
+ * If batchFirstOffset is greater, then some messages were not received.
+ *
+ * @param startOffset The offset of the first message desired, inclusive.
+ * @param batchFirstOffset The offset of the first message in the batch.
+ */
+ private void validateStartOffset(StreamPartitionMsgOffset startOffset,
StreamPartitionMsgOffset batchFirstOffset) {
+ if (batchFirstOffset.compareTo(startOffset) > 0) {
+ _serverMetrics.addMeteredTableValue(_tableStreamName,
ServerMeter.STREAM_DATA_LOSS, 1L);
+ String message =
+ "startOffset(" + startOffset + ") is older than topic's beginning
offset(" + batchFirstOffset + ")";
+ _segmentLogger.error(message);
+ _realtimeTableDataManager.addSegmentError(_segmentNameStr, new
SegmentErrorInfo(now(), message, null));
+ }
+ }
+
public StreamPartitionMsgOffset getCurrentOffset() {
return _currentOffset;
}
diff --git
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaMessageBatch.java
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaMessageBatch.java
index dbc3e8d2a6..005b4c27b3 100644
---
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaMessageBatch.java
+++
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaMessageBatch.java
@@ -31,19 +31,22 @@ import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
public class KafkaMessageBatch implements MessageBatch<StreamMessage<byte[]>> {
private final List<StreamMessage<byte[]>> _messageList;
private final int _unfilteredMessageCount;
+ private final long _firstOffset;
private final long _lastOffset;
private final StreamMessageMetadata _lastMessageMetadata;
/**
* @param unfilteredMessageCount how many messages were received from the
topic before being filtered
+ * @param firstOffset the offset of the first message in the batch
* @param lastOffset the offset of the last message in the batch
* @param batch the messages, which may be smaller than {@see
unfilteredMessageCount}
* @param lastMessageMetadata metadata for last filtered message in the
batch, useful for estimating ingestion delay
* when a batch has all messages filtered.
*/
- public KafkaMessageBatch(int unfilteredMessageCount, long lastOffset,
List<StreamMessage<byte[]>> batch,
- StreamMessageMetadata lastMessageMetadata) {
+ public KafkaMessageBatch(int unfilteredMessageCount, long firstOffset, long
lastOffset,
+ List<StreamMessage<byte[]>> batch, StreamMessageMetadata
lastMessageMetadata) {
_messageList = batch;
+ _firstOffset = firstOffset;
_lastOffset = lastOffset;
_unfilteredMessageCount = unfilteredMessageCount;
_lastMessageMetadata = lastMessageMetadata;
@@ -111,4 +114,9 @@ public class KafkaMessageBatch implements
MessageBatch<StreamMessage<byte[]>> {
public StreamMessage getStreamMessage(int index) {
return _messageList.get(index);
}
+
+ @Override
+ public StreamPartitionMsgOffset getFirstMessageOffset() {
+ return new LongMsgOffset(_firstOffset);
+ }
}
diff --git
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java
index ff90f4b1a3..df51d2fda9 100644
---
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java
+++
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java
@@ -69,8 +69,12 @@ public class KafkaPartitionLevelConsumer extends
KafkaPartitionLevelConnectionHa
ConsumerRecords<String, Bytes> consumerRecords =
_consumer.poll(Duration.ofMillis(timeoutMillis));
List<ConsumerRecord<String, Bytes>> messageAndOffsets =
consumerRecords.records(_topicPartition);
List<StreamMessage<byte[]>> filtered = new
ArrayList<>(messageAndOffsets.size());
+ long firstOffset = startOffset;
long lastOffset = startOffset;
StreamMessageMetadata rowMetadata = null;
+ if (!messageAndOffsets.isEmpty()) {
+ firstOffset = messageAndOffsets.get(0).offset();
+ }
for (ConsumerRecord<String, Bytes> messageAndOffset : messageAndOffsets) {
long offset = messageAndOffset.offset();
_lastFetchedOffset = offset;
@@ -90,6 +94,6 @@ public class KafkaPartitionLevelConsumer extends
KafkaPartitionLevelConnectionHa
endOffset);
}
}
- return new KafkaMessageBatch(messageAndOffsets.size(), lastOffset,
filtered, rowMetadata);
+ return new KafkaMessageBatch(messageAndOffsets.size(), firstOffset,
lastOffset, filtered, rowMetadata);
}
}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java
index 8bde04aed4..9a8f4e15fc 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java
@@ -118,6 +118,17 @@ public interface MessageBatch<T> {
return false;
}
+ /**
+ * Return the offset of the first message in the batch.
+ * The first offset of the batch is useful to determine if there were gaps
in the stream.
+ *
+ * @return null by default
+ */
+ @Nullable
+ default public StreamPartitionMsgOffset getFirstMessageOffset() {
+ return null;
+ }
+
/**
* This is useful while determining ingestion delay for a message batch.
Retaining metadata for last filtered message
* in a batch can enable us to estimate the ingestion delay for the batch.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]