xiangfu0 opened a new pull request, #18669:
URL: https://github.com/apache/pinot/pull/18669

   ## Summary
   
   Fixes #18663.
   
   This updates `KafkaPartitionLevelConsumer` for both Kafka 3.x and Kafka 4.x so that a repeated caller offset skips the re-seek only after an empty fetch. If the previous Kafka poll returned records but `RealtimeSegmentDataManager` did not process them, the consumer now re-seeks to the caller's start offset and replays that batch.
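   
   A minimal sketch of that rule, assuming simplified bookkeeping. `SeekDecision`, `shouldSeek`, `onPollCompleted`, `_lastStartOffset` and `_lastPollWasEmpty` are hypothetical names, not the actual `KafkaPartitionLevelConsumer` code. The sketch models only the repeated-offset case; for any other start offset it simply seeks, which may be more conservative than the real consumer.
   
   ```java
   import java.util.List;
   
   /**
    * Hypothetical sketch of the seek rule: a repeated caller offset skips
    * the re-seek only if the previous poll returned no records.
    */
   final class SeekDecision {
     private long _lastStartOffset = -1; // start offset of the previous fetch; -1 means none yet
     private boolean _lastPollWasEmpty;  // whether the previous poll returned zero records
   
     /** Returns true if the consumer should seek to startOffset before polling. */
     boolean shouldSeek(long startOffset) {
       boolean repeatedOffset = startOffset == _lastStartOffset;
       // After an empty poll, keep the consumer's position (it may already be past
       // filtered records). After a non-empty poll, the caller may have dropped the
       // batch, so seek back and replay it.
       return !(repeatedOffset && _lastPollWasEmpty);
     }
   
     /** Records the outcome of a fetch so the next call can decide whether to seek. */
     void onPollCompleted(long startOffset, List<Long> returnedOffsets) {
       _lastStartOffset = startOffset;
       _lastPollWasEmpty = returnedOffsets.isEmpty();
     }
   
     public static void main(String[] args) {
       SeekDecision decision = new SeekDecision();
       System.out.println(decision.shouldSeek(100)); // true: first fetch
       decision.onPollCompleted(100, List.of());
       System.out.println(decision.shouldSeek(100)); // false: empty poll, keep Kafka's position
       decision.onPollCompleted(100, List.of(100L, 101L));
       System.out.println(decision.shouldSeek(100)); // true: non-empty poll, replay the batch
     }
   }
   ```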
   
   ## Root cause
   
   #18337 intentionally skipped the re-seek when Pinot called `fetchMessages()` again with the same start offset, so that `read_committed` consumers could preserve their progress through broker-filtered aborted records after an empty poll.
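   
   A toy model of why the empty-poll case needs the skip. It assumes, hypothetically, that each `read_committed` poll examines at most a fixed number of offsets, drops records from an aborted transaction, and advances the position past everything it examined. `ToyReadCommittedLog`, its scan budget, and the offset layout are illustrative only, not Kafka's real fetcher. With a re-seek after every empty poll, the consumer rescans the same aborted offsets forever; without it, successive polls cross the aborted run.
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   
   /** Hypothetical toy model of a read_committed partition; not Kafka's real fetcher. */
   final class ToyReadCommittedLog {
     private final boolean[] _aborted; // _aborted[i] == true: offset i belongs to an aborted transaction
     private final int _scanBudget;    // offsets examined per poll (stands in for the fetch size)
     private long _position;           // the consumer's internal position
   
     ToyReadCommittedLog(boolean[] aborted, int scanBudget) {
       _aborted = aborted;
       _scanBudget = scanBudget;
     }
   
     void seek(long offset) {
       _position = offset;
     }
   
     /** Examines up to _scanBudget offsets, returns only committed ones, advances past all examined. */
     List<Long> poll() {
       List<Long> committed = new ArrayList<>();
       long end = Math.min(_position + _scanBudget, _aborted.length);
       for (long offset = _position; offset < end; offset++) {
         if (!_aborted[(int) offset]) {
           committed.add(offset);
         }
       }
       _position = end;
       return committed;
     }
   
     /**
      * Offsets 0..9 are aborted, 10..19 committed. The caller keeps passing start offset 0
      * because nothing has been returned yet. Returns the first non-empty batch, or an
      * empty list if all maxCalls polls come back empty.
      */
     static List<Long> fetchWithRepeatedOffset(boolean reseekAfterEmptyPoll, int maxCalls) {
       boolean[] aborted = new boolean[20];
       for (int i = 0; i < 10; i++) {
         aborted[i] = true;
       }
       ToyReadCommittedLog log = new ToyReadCommittedLog(aborted, 4);
       long startOffset = 0;
       log.seek(startOffset);
       for (int call = 0; call < maxCalls; call++) {
         if (reseekAfterEmptyPoll && call > 0) {
           log.seek(startOffset); // rewinds to the same aborted records every time
         }
         List<Long> batch = log.poll();
         if (!batch.isEmpty()) {
           return batch;
         }
       }
       return List.of();
     }
   
     public static void main(String[] args) {
       System.out.println(fetchWithRepeatedOffset(true, 10));  // [] : stuck rescanning offsets 0..3
       System.out.println(fetchWithRepeatedOffset(false, 10)); // [10, 11] : crosses the aborted run
     }
   }
   ```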
   
   That same seek-skip is unsafe after a non-empty poll. When a realtime segment hits an end criterion before processing index 0 of the returned batch, Kafka has already advanced the consumer position, but Pinot's `_currentOffset` has not. A later `CATCH_UP` call with the same start offset then polls from the advanced Kafka position and skips the unprocessed batch, which falsely surfaces as message loss or as consuming past the max offset.
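   
   The failure mode can be reproduced with a toy consumer. This is a hypothetical sketch: `CatchUpReplaySketch`, `fetch`, and `catchUpAfterUnprocessedBatch` are illustrative names, the log is assumed to contain only committed records, and a boolean switches between the pre-fix rule (skip the seek on any repeated offset) and the fixed rule (skip only after an empty poll).
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   
   /** Hypothetical toy consumer over offsets 0..size-1, all committed. */
   final class CatchUpReplaySketch {
     private final int _size;
     private final int _maxPollRecords;
     private final boolean _skipOnlyAfterEmptyPoll; // false: pre-fix rule, true: fixed rule
     private long _position;
     private long _lastStartOffset = -1;
     private boolean _lastPollWasEmpty;
   
     CatchUpReplaySketch(int size, int maxPollRecords, boolean skipOnlyAfterEmptyPoll) {
       _size = size;
       _maxPollRecords = maxPollRecords;
       _skipOnlyAfterEmptyPoll = skipOnlyAfterEmptyPoll;
     }
   
     /** Stands in for a fetchMessages(startOffset) call; returns the offsets handed back. */
     List<Long> fetch(long startOffset) {
       boolean repeated = startOffset == _lastStartOffset;
       boolean skipSeek = repeated && (!_skipOnlyAfterEmptyPoll || _lastPollWasEmpty);
       if (!skipSeek) {
         _position = startOffset;
       }
       List<Long> batch = new ArrayList<>();
       while (batch.size() < _maxPollRecords && _position < _size) {
         batch.add(_position++);
       }
       _lastStartOffset = startOffset;
       _lastPollWasEmpty = batch.isEmpty();
       return batch;
     }
   
     /** First fetch returns a batch the segment never processes; CATCH_UP then repeats offset 0. */
     static List<Long> catchUpAfterUnprocessedBatch(boolean fixed) {
       CatchUpReplaySketch consumer = new CatchUpReplaySketch(10, 3, fixed);
       long currentOffset = 0;               // Pinot's offset does not advance
       consumer.fetch(currentOffset);        // returns [0, 1, 2], dropped at the end criterion
       return consumer.fetch(currentOffset); // CATCH_UP with the same start offset
     }
   
     public static void main(String[] args) {
       System.out.println("pre-fix: " + catchUpAfterUnprocessedBatch(false)); // [3, 4, 5]: 0..2 skipped
       System.out.println("fixed:   " + catchUpAfterUnprocessedBatch(true));  // [0, 1, 2]: batch replayed
     }
   }
   ```
   
   In the pre-fix run the first returned offset (3) is ahead of the requested offset (0); that gap is what falsely surfaces as message loss.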
   
   ## User manual
   
   No table config change is required. Existing realtime tables continue using 
the same stream config. Tables using the default `read_uncommitted` isolation 
and tables explicitly using `read_committed` both keep their existing behavior; 
the fix only changes how the consumer repositions after a repeated caller 
offset.
   
   ## Sample table config
   
   For transactional Kafka topics where Pinot should ignore aborted records, 
keep using `read_committed` as before:
   
   ```json
   {
     "tableName": "events_REALTIME",
     "tableType": "REALTIME",
     "segmentsConfig": {
       "timeColumnName": "ts",
       "timeType": "MILLISECONDS",
       "replication": "1"
     },
     "tenants": {},
     "tableIndexConfig": {
       "streamConfigs": {
         "streamType": "kafka",
         "stream.kafka.topic.name": "events",
         "stream.kafka.broker.list": "localhost:9092",
         "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka30.KafkaConsumerFactory",
         "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder",
         "stream.kafka.isolation.level": "read_committed"
       }
     }
   }
   ```
   
   ## Review
   
   Reviewed with the Pinot PR review checklist: production safety, backward 
compatibility, correctness, state management, performance, architecture, 
testing, naming, and process. No remaining findings.
   
   ## Tests
   
   - `./mvnw -pl pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0,pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0 -Dtest=KafkaPartitionLevelConsumerSeekTest test`
   - `./mvnw spotless:apply checkstyle:check license:format license:check -pl pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0,pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0`
   - `git diff --check`
   
   Note: the full `KafkaPartitionLevelConsumerTest` suite was also attempted. It passed locally for Kafka 3.x; the Kafka 4.x variant requires Docker (Testcontainers), and this environment has no Docker socket, so it could not be run.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
