lindong28 opened a new pull request #15161:
URL: https://github.com/apache/flink/pull/15161
## What is the purpose of the change
This PR fixes several bugs related to `KafkaSource` and makes a few usability improvements. See "Brief change log"
for the full list of changes made in this PR.
## Brief change log
Bug fixes:
- `KafkaSourceReader` should not commit offsets for partitions whose offsets
have not been initialized.
- `SourceCoordinatorContext` should not log the error and fail the job again if it
  receives an `InterruptedException` after it has been closed.
- `SourceOperatorStreamTask` should be closed synchronously to avoid a
  `ClassNotFoundException`.
- `PartitionOffsetsRetrieverImpl.committedOffsets()` should handle partitions
  that have no committed offsets.
- `SourceOperatorStreamTask` should check the committed offset first and only
  fall back to `OffsetResetStrategy` when there is none.
- Auto offset commit should be disabled by default.
- `SourceCoordinatorContext` should fail the job if it fails to send an event to
  subtasks.
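The offset-related fixes in the list above can be illustrated with a small, self-contained Java sketch. It is not the `KafkaSource` implementation: the class `OffsetHandlingSketch`, the `UNINITIALIZED` sentinel, the `ResetStrategy` enum, and the use of plain `String` partition names instead of Kafka's `TopicPartition` are all hypothetical. It only shows the intended behavior: skip partitions with uninitialized offsets when committing, treat a missing or `null` committed offset as absent, prefer a committed offset over the reset strategy, and default the Kafka consumer property `enable.auto.commit` to `false` unless the user sets it explicitly.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;

/**
 * Hypothetical sketch of the offset-handling rules described in the change log.
 * This is NOT the KafkaSource code; partitions are plain strings such as "topic-0".
 */
final class OffsetHandlingSketch {

    /** Hypothetical sentinel: a partition whose offset has not been initialized yet. */
    static final long UNINITIALIZED = -1L;

    /** Hypothetical stand-in for Kafka's OffsetResetStrategy. */
    enum ResetStrategy { EARLIEST, LATEST }

    private OffsetHandlingSketch() {}

    /** Keeps only partitions with an initialized (non-negative) offset for committing. */
    static Map<String, Long> offsetsToCommit(Map<String, Long> currentOffsets) {
        Map<String, Long> toCommit = new HashMap<>();
        for (Map.Entry<String, Long> entry : currentOffsets.entrySet()) {
            Long offset = entry.getValue();
            if (offset != null && offset >= 0) {
                toCommit.put(entry.getKey(), offset);
            }
        }
        return toCommit;
    }

    /** Treats both a missing key and a null value as "no committed offset". */
    static Optional<Long> committedOffset(Map<String, Long> committed, String partition) {
        return Optional.ofNullable(committed.get(partition));
    }

    /** Prefers the committed offset; falls back to the reset strategy only if there is none. */
    static long startingOffset(
            Optional<Long> committed, ResetStrategy strategy, long earliest, long latest) {
        if (committed.isPresent()) {
            return committed.get();
        }
        return strategy == ResetStrategy.EARLIEST ? earliest : latest;
    }

    /** Copies the user's properties and disables auto commit unless explicitly configured. */
    static Properties withDefaultConsumerProperties(Properties userProps) {
        Properties props = new Properties();
        props.putAll(userProps);
        props.putIfAbsent("enable.auto.commit", "false");
        return props;
    }
}
```

Keeping the "committed first, reset strategy second" decision in one place makes the fallback order explicit, which is the ordering the bug fix above is about.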
Usability improvements:
- Reduce the offset commit logging verbosity from INFO to DEBUG.
- `SourceOperatorStreamTask` should report the `numRecordsOutCount` metric.
- `KafkaSourceEnumerator` should close the admin client early if periodic
partition discovery is disabled.
- Remove the unused `close.timeout.ms` config.
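The admin-client change in the list above can be sketched in plain Java. The names `EnumeratorSketch` and `FakeAdminClient` are hypothetical, and treating a non-positive discovery interval as "periodic discovery disabled" is an assumption made for this sketch; the actual `KafkaSourceEnumerator` logic may differ.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

/**
 * Hypothetical sketch: release the admin client after the initial partition
 * discovery when periodic discovery is disabled. Not the KafkaSourceEnumerator code.
 */
final class EnumeratorSketch implements AutoCloseable {

    /** Hypothetical stand-in for a Kafka admin client that tracks whether it was closed. */
    static final class FakeAdminClient implements AutoCloseable {
        boolean closed = false;

        Set<String> listPartitions() {
            if (closed) {
                throw new IllegalStateException("admin client already closed");
            }
            return new HashSet<>(Arrays.asList("topic-0", "topic-1"));
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    /** Assumption: a non-positive interval means periodic discovery is disabled. */
    private final long discoveryIntervalMs;
    private FakeAdminClient adminClient;

    EnumeratorSketch(FakeAdminClient adminClient, long discoveryIntervalMs) {
        this.adminClient = adminClient;
        this.discoveryIntervalMs = discoveryIntervalMs;
    }

    /** Performs the initial discovery and closes the admin client if it is no longer needed. */
    Set<String> start() {
        Set<String> partitions = Collections.unmodifiableSet(adminClient.listPartitions());
        if (discoveryIntervalMs <= 0) {
            // No periodic discovery will run, so release the client right away.
            closeAdminClient();
        }
        return partitions;
    }

    private void closeAdminClient() {
        if (adminClient != null) {
            adminClient.close();
            adminClient = null;
        }
    }

    @Override
    public void close() {
        // Idempotent: safe to call even if start() already released the client.
        closeAdminClient();
    }
}
```

Making the close idempotent means the enumerator's own `close()` stays safe whether or not the client was already released during `start()`.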
Tests:
- Add IT cases for `KafkaSource` by migrating IT cases from `FlinkKafkaConsumer`.
## Verifying this change
- Added tests in `KafkaSourceITCase`.
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with
`@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its
components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (no)
- The S3 file system connector: (no)
## Documentation
- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not applicable)