This is an automated email from the ASF dual-hosted git repository.
mayanks pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 652cc98 realtime debug logging (#7946)
652cc98 is described below
commit 652cc9824d260a46226d4733b5972a6b1e7d65e1
Author: Richard Startin <[email protected]>
AuthorDate: Wed Dec 22 18:02:39 2021 +0000
realtime debug logging (#7946)
---
.../realtime/LLRealtimeSegmentDataManager.java | 25 ++++++++++++++++++----
.../kafka20/KafkaPartitionLevelConsumer.java | 12 +++++++++++
2 files changed, 33 insertions(+), 4 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index f463e0b..b56a794 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -397,6 +397,11 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
try {
messageBatch = _partitionGroupConsumer
.fetchMessages(_currentOffset, null,
_partitionLevelStreamConfig.getFetchTimeoutMillis());
+ if (_segmentLogger.isDebugEnabled()) {
+ _segmentLogger.debug("message batch received. filter={}
unfiltered={} endOfPartitionGroup={}",
+ messageBatch.getUnfilteredMessageCount(),
messageBatch.getMessageCount(),
+ messageBatch.isEndOfPartitionGroup());
+ }
_endOfPartitionGroup = messageBatch.isEndOfPartitionGroup();
_consecutiveErrorCount = 0;
} catch (PermanentConsumerException e) {
@@ -426,8 +431,12 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
} else if (messageBatch.getUnfilteredMessageCount() > 0) {
// we consumed something from the stream but filtered all the content
out,
// so we need to advance the offsets to avoid getting stuck
- _currentOffset = messageBatch.getOffsetOfNextBatch();
- lastUpdatedOffset =
_streamPartitionMsgOffsetFactory.create(_currentOffset);
+ StreamPartitionMsgOffset nextOffset =
messageBatch.getOffsetOfNextBatch();
+ if (_segmentLogger.isDebugEnabled()) {
+ _segmentLogger.debug("Skipped empty batch. Advancing from {} to {}",
_currentOffset, nextOffset);
+ }
+ _currentOffset = nextOffset;
+ lastUpdatedOffset =
_streamPartitionMsgOffsetFactory.create(nextOffset);
} else {
// We did not consume any rows. Update the partition-consuming metric
only if we have been idling for a long
// time.
@@ -459,6 +468,9 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
GenericRow reuse = new GenericRow();
for (int index = 0; index < messagesAndOffsets.getMessageCount(); index++)
{
if (_shouldStop || endCriteriaReached()) {
+ if (_segmentLogger.isDebugEnabled()) {
+ _segmentLogger.debug("stop processing message batch early
shouldStop: {}", _shouldStop);
+ }
break;
}
if (!canTakeMore) {
@@ -556,9 +568,14 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
}
updateCurrentDocumentCountMetrics();
if (streamMessageCount != 0) {
- _segmentLogger.debug("Indexed {} messages ({} messages read from stream)
current offset {}", indexedMessageCount,
- streamMessageCount, _currentOffset);
+ if (_segmentLogger.isDebugEnabled()) {
+ _segmentLogger.debug("Indexed {} messages ({} messages read from
stream) current offset {}",
+ indexedMessageCount, streamMessageCount, _currentOffset);
+ }
} else if (messagesAndOffsets.getUnfilteredMessageCount() == 0) {
+ if (_segmentLogger.isDebugEnabled()) {
+ _segmentLogger.debug("empty batch received - sleeping for {}ms",
idlePipeSleepTimeMillis);
+ }
// If there were no messages to be fetched from stream, wait for a
little bit as to avoid hammering the stream
Uninterruptibles.sleepUninterruptibly(idlePipeSleepTimeMillis,
TimeUnit.MILLISECONDS);
}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java
index 68bbc9e..a1e0cc8 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java
@@ -30,11 +30,15 @@ import org.apache.pinot.spi.stream.MessageBatch;
import org.apache.pinot.spi.stream.PartitionLevelConsumer;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class KafkaPartitionLevelConsumer extends
KafkaPartitionLevelConnectionHandler
implements PartitionLevelConsumer {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(KafkaPartitionLevelConsumer.class);
+
public KafkaPartitionLevelConsumer(String clientId, StreamConfig
streamConfig, int partition) {
super(clientId, streamConfig, partition);
}
@@ -48,6 +52,10 @@ public class KafkaPartitionLevelConsumer extends KafkaPartitionLevelConnectionHa
}
public MessageBatch<byte[]> fetchMessages(long startOffset, long endOffset,
int timeoutMillis) {
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("poll consumer: {}, startOffset: {}, endOffset:{} timeout:
{}ms", _topicPartition, startOffset,
+ endOffset, timeoutMillis);
+ }
_consumer.seek(_topicPartition, startOffset);
ConsumerRecords<String, Bytes> consumerRecords =
_consumer.poll(Duration.ofMillis(timeoutMillis));
List<ConsumerRecord<String, Bytes>> messageAndOffsets =
consumerRecords.records(_topicPartition);
@@ -59,8 +67,12 @@ public class KafkaPartitionLevelConsumer extends KafkaPartitionLevelConnectionHa
if (offset >= startOffset & (endOffset > offset | endOffset == -1)) {
if (message != null) {
filtered.add(new MessageAndOffset(message.get(), offset));
+ } else if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("tombstone message at offset {}", offset);
}
lastOffset = offset;
+ } else if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("filter message at offset {} (outside of offset range {}
{})", offset, startOffset, endOffset);
}
}
return new KafkaMessageBatch(messageAndOffsets.size(), lastOffset,
filtered);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]