ege-st commented on code in PR #12157:
URL: https://github.com/apache/pinot/pull/12157#discussion_r1428220886
##########
pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java:
##########
@@ -46,6 +50,20 @@ public KafkaPartitionLevelConsumer(String clientId, StreamConfig streamConfig, i
super(clientId, streamConfig, partition);
}
+ @Override
+ public void validateStreamState(StreamPartitionMsgOffset startMsgOffset) throws PermanentConsumerException {
+ final long startOffset = ((LongMsgOffset) startMsgOffset).getOffset();
+ Map<TopicPartition, Long> beginningOffsets =
+ _consumer.beginningOffsets(Collections.singletonList(_topicPartition));
+
+ final long beginningOffset = beginningOffsets.getOrDefault(_topicPartition, 0L);
+ if (startOffset < beginningOffset) {
Review Comment:
Is `startOffset > beginningOffset` an acceptable scenario?
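One way to frame this question: a start offset above the beginning offset would normally just be a resume point inside the retained log, and the case that looks suspicious is a start offset beyond the log end offset (for example, after the topic was deleted and recreated). Below is a minimal sketch of such a three-way check, assuming the two bounds come from Kafka's `KafkaConsumer.beginningOffsets` and `KafkaConsumer.endOffsets`. `OffsetPosition` and `StartOffsetClassifier` are hypothetical names, not part of Pinot or of this PR.

```java
/** Hypothetical: where a requested start offset falls relative to what the broker still retains. */
enum OffsetPosition {
  /** Below the earliest retained offset: those messages were already deleted (e.g. by retention). */
  BEFORE_EARLIEST,
  /** Within [earliest, end]: a normal resume point; equal to end just means "caught up". */
  IN_RANGE,
  /** Beyond the log end offset: e.g. the topic was recreated or its offsets were reset. */
  AFTER_LATEST
}

/** Hypothetical helper; not a Pinot API. */
final class StartOffsetClassifier {
  private StartOffsetClassifier() {
  }

  /**
   * @param startOffset     offset the segment wants to resume consuming from
   * @param beginningOffset earliest offset still retained (assumed to come from beginningOffsets)
   * @param endOffset       offset of the next message to be written (assumed to come from endOffsets)
   */
  static OffsetPosition classify(long startOffset, long beginningOffset, long endOffset) {
    if (startOffset < beginningOffset) {
      return OffsetPosition.BEFORE_EARLIEST;
    }
    if (startOffset > endOffset) {
      return OffsetPosition.AFTER_LATEST;
    }
    return OffsetPosition.IN_RANGE;
  }
}
```

With `beginningOffset = 10` and `endOffset = 20`, a start offset of `20` classifies as `IN_RANGE` (the consumer is simply caught up), while `25` classifies as `AFTER_LATEST`.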
##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java:
##########
@@ -1648,6 +1648,15 @@ private void makeStreamConsumer(String reason) {
_segmentLogger.info("Creating new stream consumer for topic partition {} , reason: {}", _clientId, reason);
_partitionGroupConsumer =
_streamConsumerFactory.createPartitionGroupConsumer(_clientId, _partitionGroupConsumptionStatus);
+ try {
+ _partitionGroupConsumer.validateStreamState(_currentOffset);
+ _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, 0,
Review Comment:
So it's good that we have a gauge per table (because otherwise, a table
which is healthy could reset the gauge from `1` to `0` if it executes this
check _after_ an unhealthy table set the gauge to `1`).
However, what happens if there are multiple partitions on a table and one partition's segment sets this gauge to `1`, then a different partition's segment executes this code and resets the gauge from `1` to `0`? Put another way: if a table has 3 partitions and 1 of them fails `validateStreamState` but the other two pass it, then it's very likely that the healthy partitions will mask the unhealthy partition.
We need to make sure that if a segment fails this validation, this gauge stays at `1` until that segment is fixed or there is some other manual intervention.
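A minimal sketch of one way to keep the gauge sticky: track the failing partitions per table and publish `1` while that set is non-empty, so a healthy partition can only clear its own entry and never another partition's. `InvalidStreamStateTracker` and its methods are hypothetical. Another option would be to report the gauge per partition instead of with the fixed partition id `0` that the diff appears to pass to `setValueOfPartitionGauge`.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Hypothetical tracker: remembers which partitions of each table currently fail stream-state
 * validation, so a healthy partition cannot reset the table-level gauge set by another one.
 */
final class InvalidStreamStateTracker {
  // table name -> ids of partitions that currently fail validation
  private final Map<String, Set<Integer>> _failedPartitions = new ConcurrentHashMap<>();

  /** Called when a partition's segment fails validation. */
  void markFailed(String table, int partition) {
    _failedPartitions.computeIfAbsent(table, t -> ConcurrentHashMap.newKeySet()).add(partition);
  }

  /** Called when a partition passes validation; clears only that partition's entry. */
  void markHealthy(String table, int partition) {
    Set<Integer> failed = _failedPartitions.get(table);
    if (failed != null) {
      failed.remove(partition);
    }
  }

  /** Value to publish on the table gauge: 1 while any partition is failing, otherwise 0. */
  long gaugeValue(String table) {
    Set<Integer> failed = _failedPartitions.get(table);
    return (failed == null || failed.isEmpty()) ? 0L : 1L;
  }
}
```

In the scenario from the comment (3 partitions, partition 1 failing), `markHealthy` calls for partitions 0 and 2 leave the gauge at `1` until partition 1 itself passes validation.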
##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java:
##########
@@ -1648,6 +1648,15 @@ private void makeStreamConsumer(String reason) {
_segmentLogger.info("Creating new stream consumer for topic partition {} , reason: {}", _clientId, reason);
_partitionGroupConsumer =
_streamConsumerFactory.createPartitionGroupConsumer(_clientId, _partitionGroupConsumptionStatus);
+ try {
+ _partitionGroupConsumer.validateStreamState(_currentOffset);
+ _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, 0,
+ ServerGauge.INVALID_REALTIME_STREAM_STATE_EXCEPTION, 0);
+ } catch (PermanentConsumerException pce) {
Review Comment:
Should we catch this exception here or bubble it up to the next level?
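One possible middle ground for this question is to record the gauge and then rethrow, so the metric is always updated but the caller still decides how to handle the failure (for example, stopping consumption for that segment). A minimal sketch follows. `PermanentStreamException`, `StreamStateValidator`, and `ValidateThenPropagate` are hypothetical stand-ins, not Pinot's `PermanentConsumerException` or its real consumer interfaces, and the `LongConsumer` gauge sink is assumed to be per-partition so that writing `0` cannot mask another partition.

```java
import java.util.function.LongConsumer;

/** Hypothetical stand-in for a permanent (non-retryable) consumer failure. */
class PermanentStreamException extends Exception {
  PermanentStreamException(String message) {
    super(message);
  }
}

/** Hypothetical validation step run when a stream consumer is created. */
interface StreamStateValidator {
  void validate(long startOffset) throws PermanentStreamException;
}

/** Hypothetical helper; not a Pinot API. */
final class ValidateThenPropagate {
  private ValidateThenPropagate() {
  }

  /**
   * Updates the (assumed per-partition) gauge with the validation outcome, then rethrows
   * on failure so the caller, not this method, decides what happens to the segment.
   */
  static void validate(StreamStateValidator validator, long startOffset, LongConsumer gauge)
      throws PermanentStreamException {
    try {
      validator.validate(startOffset);
      gauge.accept(0L);
    } catch (PermanentStreamException e) {
      gauge.accept(1L);
      throw e;
    }
  }
}
```

The trade-off is that every caller of such a method now has to handle the checked exception, whereas catching it locally (as the diff does) keeps the failure invisible to the code that could react to it.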
--
This is an automated message from the Apache Git Service.