richardstartin commented on a change in pull request #7927:
URL: https://github.com/apache/pinot/pull/7927#discussion_r771821781
##########
File path:
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
##########
@@ -423,12 +427,17 @@ protected boolean consumeLoop()
consecutiveIdleCount = 0;
// We consumed something. Update the highest stream offset as well as
partition-consuming metric.
// TODO Issue 5359 Need to find a way to bump metrics without getting
actual offset value.
-// _serverMetrics.setValueOfTableGauge(_metricKeyName,
ServerGauge.HIGHEST_KAFKA_OFFSET_CONSUMED,
-// _currentOffset.getOffset());
-// _serverMetrics.setValueOfTableGauge(_metricKeyName,
ServerGauge.HIGHEST_STREAM_OFFSET_CONSUMED,
-// _currentOffset.getOffset());
+ //_serverMetrics.setValueOfTableGauge(_metricKeyName,
ServerGauge.HIGHEST_KAFKA_OFFSET_CONSUMED,
+ //_currentOffset.getOffset());
+ //_serverMetrics.setValueOfTableGauge(_metricKeyName,
ServerGauge.HIGHEST_STREAM_OFFSET_CONSUMED,
+ //_currentOffset.getOffset());
_serverMetrics.setValueOfTableGauge(_metricKeyName,
ServerGauge.LLC_PARTITION_CONSUMING, 1);
lastUpdatedOffset =
_streamPartitionMsgOffsetFactory.create(_currentOffset);
+ } else if (messageBatch.getUnfilteredMessageCount() > 0) {
+ // we consumed something from the stream but filtered all the content
out,
+ // so we need to advance the offsets to avoid getting stuck
+ _currentOffset = messageBatch.getLastOffset();
+ lastUpdatedOffset =
_streamPartitionMsgOffsetFactory.create(_currentOffset);
Review comment:
This is the bug fix: it ensures that we advance the offset after consuming
a bad batch.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]