xiangfu0 opened a new pull request, #18669:
URL: https://github.com/apache/pinot/pull/18669
## Summary
Fixes #18663.
This updates `KafkaPartitionLevelConsumer` for both Kafka 3.x and Kafka 4.x
so that a repeated caller offset skips the re-seek only when the previous
fetch was empty. If the previous Kafka poll returned records that
`RealtimeSegmentDataManager` did not process, the consumer now re-seeks to the
caller's start offset and replays that batch.
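The rule above can be sketched as a small piece of state. This is a minimal, hypothetical sketch: `SeekRuleSketch`, `shouldSeek`, and `onPollCompleted` are illustrative names, not the actual members of `KafkaPartitionLevelConsumer`. It assumes the consumer only needs to remember the previous start offset and whether the previous poll returned any records.

```java
/**
 * Hypothetical sketch of the re-seek rule; names are illustrative and are not
 * the real members of KafkaPartitionLevelConsumer.
 */
final class SeekRuleSketch {
  // Start offset passed on the previous fetch, or -1 before the first fetch.
  private long lastStartOffset = -1L;
  // Whether the previous poll returned zero records.
  private boolean lastPollEmpty = false;

  /** Returns true if the consumer should seek to startOffset before polling. */
  public boolean shouldSeek(long startOffset) {
    boolean repeatedOffset = startOffset == lastStartOffset;
    // Skip the seek only for a repeated offset after an empty poll. After a
    // non-empty poll, the Kafka position is already past records the caller
    // may not have processed, so seek back and replay them.
    return !(repeatedOffset && lastPollEmpty);
  }

  /** Records the outcome of the poll that was issued for startOffset. */
  public void onPollCompleted(long startOffset, int recordCount) {
    lastStartOffset = startOffset;
    lastPollEmpty = recordCount == 0;
  }

  public static void main(String[] args) {
    SeekRuleSketch rule = new SeekRuleSketch();
    System.out.println(rule.shouldSeek(100)); // true: first fetch
    rule.onPollCompleted(100, 0);
    System.out.println(rule.shouldSeek(100)); // false: repeat after an empty poll
    rule.onPollCompleted(100, 5);
    System.out.println(rule.shouldSeek(100)); // true: repeat after a non-empty poll
    System.out.println(rule.shouldSeek(105)); // true: new offset
  }
}
```

Keying the decision on whether the last poll was empty, rather than only on whether the offset repeated, keeps the `read_committed` progress behavior while making replay the default whenever a returned batch may not have been processed.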
## Root cause
#18337 intentionally skipped re-seek when Pinot called `fetchMessages()`
again with the same start offset, so `read_committed` consumers could preserve
progress through broker-filtered aborted records after an empty poll.
That same seek-skip is unsafe after a non-empty poll. When a realtime
segment hits an end criterion before processing the first message (index 0)
of the returned batch, Kafka has already advanced the consumer position, but
Pinot's `_currentOffset` has not. A later `CATCH_UP` call with the same start
offset then polls from the advanced Kafka position and skips the unprocessed
batch, which falsely surfaces as message loss or as consuming past the max
offset.
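Both cases above can be reproduced with a toy, in-memory model of one partition. This is a hypothetical simulation, not Pinot or Kafka code: `SeekReplaySimulation` and its fixed scan `window` are assumptions. Each poll scans a fixed window of offsets from the current position, returns only committed records, and advances the position past the whole window, which approximates how a `read_committed` poll can return nothing while still making progress.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model of one partition read under read_committed. Each poll
 * scans a fixed window of offsets, returns only committed records, and moves
 * the position past the whole window even if every record in it was aborted.
 */
final class SeekReplaySimulation {
  private final boolean[] aborted; // aborted[i] == true: offset i is in an aborted transaction
  private final int window;        // offsets scanned per poll (assumption)
  private final boolean fixedRule; // true: fixed rule; false: old "skip seek on any repeat" rule
  private long position = 0;
  private long lastStartOffset = -1;
  private boolean lastPollEmpty = false;

  SeekReplaySimulation(boolean[] aborted, int window, boolean fixedRule) {
    this.aborted = aborted;
    this.window = window;
    this.fixedRule = fixedRule;
  }

  /** Mimics one fetch for startOffset: optionally seek, then poll once. */
  List<Long> fetch(long startOffset) {
    boolean repeated = startOffset == lastStartOffset;
    boolean skipSeek = fixedRule ? (repeated && lastPollEmpty) : repeated;
    if (!skipSeek) {
      position = startOffset;
    }
    List<Long> records = new ArrayList<>();
    long end = Math.min(position + window, aborted.length);
    for (long offset = position; offset < end; offset++) {
      if (!aborted[(int) offset]) {
        records.add(offset); // the "broker" filters out aborted records
      }
    }
    position = end;
    lastStartOffset = startOffset;
    lastPollEmpty = records.isEmpty();
    return records;
  }

  public static void main(String[] args) {
    // Case 1: offsets 0-2 aborted, 3-5 committed. Not re-seeking after the
    // empty poll lets the repeated fetch(0) move past the aborted range.
    boolean[] abortedPrefix = {true, true, true, false, false, false};
    SeekReplaySimulation c1 = new SeekReplaySimulation(abortedPrefix, 3, true);
    System.out.println(c1.fetch(0)); // []
    System.out.println(c1.fetch(0)); // [3, 4, 5]

    // Case 2: all committed; the caller processes nothing from the first batch.
    boolean[] allCommitted = new boolean[6];
    SeekReplaySimulation oldRule = new SeekReplaySimulation(allCommitted, 3, false);
    oldRule.fetch(0);
    System.out.println(oldRule.fetch(0)); // [3, 4, 5]: offsets 0-2 skipped
    SeekReplaySimulation newRule = new SeekReplaySimulation(allCommitted, 3, true);
    newRule.fetch(0);
    System.out.println(newRule.fetch(0)); // [0, 1, 2]: batch replayed
  }
}
```

In this model, a consumer that re-seeked on every call would rescan offsets 0-2 on each `fetch(0)` and never leave the aborted range, which is the situation the empty-poll exception avoids.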
## User manual
No table config change is required. Existing realtime tables continue using
the same stream config. Tables using the default `read_uncommitted` isolation
and tables explicitly using `read_committed` both keep their existing behavior;
the fix only changes how the consumer repositions after a repeated caller
offset.
## Sample table config
For transactional Kafka topics where Pinot should ignore aborted records,
keep using `read_committed` as before:
```json
{
  "tableName": "events_REALTIME",
  "tableType": "REALTIME",
  "segmentsConfig": {
    "timeColumnName": "ts",
    "timeType": "MILLISECONDS",
    "replication": "1"
  },
  "streamConfigs": {
    "streamType": "kafka",
    "stream.kafka.topic.name": "events",
    "stream.kafka.broker.list": "localhost:9092",
    "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka30.KafkaConsumerFactory",
    "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder",
    "stream.kafka.isolation.level": "read_committed"
  }
}
```
## Review
Reviewed with the Pinot PR review checklist: production safety, backward
compatibility, correctness, state management, performance, architecture,
testing, naming, and process. No remaining findings.
## Tests
- `./mvnw -pl pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0,pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0 -Dtest=KafkaPartitionLevelConsumerSeekTest test`
- `./mvnw spotless:apply checkstyle:check license:format license:check -pl pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0,pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0`
- `git diff --check`

Note: the full `KafkaPartitionLevelConsumerTest` suite was also attempted. It
passed locally for Kafka 3.x; the Kafka 4.x run requires Docker/Testcontainers
and could not run because this environment has no Docker socket.