dannycranmer opened a new pull request #13886:
URL: https://github.com/apache/flink/pull/13886
## What is the purpose of the change
* The Flink Kinesis EFO consumer has a SubscribeToShard retry policy that
terminates the job after a configured number of consecutive failed attempts.
Under high backpressure, the Netty HTTP client throws a ReadTimeoutException
when the consumer takes longer than 30s to process a batch. If this happens 10
times in a row (the default limit), the job terminates. There is no need to
terminate in this condition, and the restart makes the job fall further
behind.
* There is a queue used to pass events between the network client and
consumer application. When an error is thrown in the network thread, the queue
is cleared to make space for the error event. This means that records will be
thrown away to make space for errors (the records would be subsequently
reloaded from the shard).
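
The retry behaviour described in the first bullet can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the connector's code: the class `SubscribeRetrySketch`, its `excludeReadTimeouts` flag, and the nested `ReadTimeoutException` (a local stand-in for Netty's exception, so the sketch has no dependencies) are all hypothetical names.

```java
/** Hypothetical sketch of a consecutive-failure retry counter; not the connector's classes. */
final class SubscribeRetrySketch {

    /** Local stand-in for Netty's ReadTimeoutException (assumption, keeps the sketch dependency-free). */
    static final class ReadTimeoutException extends RuntimeException {}

    private final int maxConsecutiveFailures;
    private final boolean excludeReadTimeouts;
    private int consecutiveFailures = 0;

    SubscribeRetrySketch(int maxConsecutiveFailures, boolean excludeReadTimeouts) {
        this.maxConsecutiveFailures = maxConsecutiveFailures;
        this.excludeReadTimeouts = excludeReadTimeouts;
    }

    /** Returns true if the caller should resubscribe, false if the job should terminate. */
    boolean shouldRetry(Throwable error) {
        if (excludeReadTimeouts && error instanceof ReadTimeoutException) {
            // A timeout caused by a slow consumer is treated as recoverable:
            // it does not count towards the failure limit.
            return true;
        }
        consecutiveFailures++;
        return consecutiveFailures < maxConsecutiveFailures;
    }

    /** A successful subscription resets the counter, so only failures in a row count. */
    void onSuccess() {
        consecutiveFailures = 0;
    }
}
```

With `excludeReadTimeouts` set to `false` and a limit of 10, the tenth timeout in a row makes `shouldRetry` return `false`, which corresponds to the job termination described above. With the flag set to `true`, timeouts never exhaust the limit.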
## Brief change log
* Excluded the ReadTimeoutException from the SubscribeToShard retry policy,
so that the connector gracefully reconnects once the consumer has
processed the queued records.
* Added a new mechanism to pass exceptions between threads, meaning data
does not need to be discarded. When an error is thrown, the error event will be
processed by the consumer once all of the records have been processed.
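
One way to pass an error between threads without discarding queued records is to keep the error in a separate slot next to the queue. The sketch below illustrates that idea only; it is an assumption about the general shape, not the mechanism implemented in this PR, and `ErrorHandOffSketch` and its methods are hypothetical names. It also assumes the network thread stops enqueueing records once it has reported an error.

```java
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical sketch: records travel through a bounded queue, errors through a side slot. */
final class ErrorHandOffSketch<T> {

    private final BlockingQueue<T> records;
    private final AtomicReference<Throwable> error = new AtomicReference<>();

    ErrorHandOffSketch(int capacity) {
        this.records = new ArrayBlockingQueue<>(capacity);
    }

    /** Network thread: returns false when the queue is full, so the caller can apply backpressure. */
    boolean offerRecord(T record) {
        return records.offer(record);
    }

    /** Network thread: stores the first error and leaves the queued records untouched. */
    void reportError(Throwable t) {
        error.compareAndSet(null, t);
    }

    /**
     * Consumer thread: returns the next record, or empty if nothing is queued.
     * A stored error is rethrown only once the queue has been drained.
     */
    Optional<T> poll() {
        T next = records.poll();
        if (next != null) {
            return Optional.of(next);
        }
        Throwable t = error.get();
        if (t == null) {
            return Optional.empty();
        }
        // Records offered before the error was reported must be delivered first,
        // so check the queue once more after observing the error.
        next = records.poll();
        if (next != null) {
            return Optional.of(next);
        }
        error.set(null);
        throw new IllegalStateException("subscription failed", t);
    }
}
```

Because the error never occupies a queue slot, a full queue does not have to be cleared to make room for it, and the consumer sees the error only after every record that preceded it.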
## Verifying this change
* Existing unit tests continue to pass
* Added additional test coverage
* Deployed and tested on local Flink cluster
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with
`@Public(Evolving)`: no
- The serializers: no
- The runtime per-record code paths (performance sensitive): yes
- Anything that affects deployment or recovery: JobManager (and its
components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: no
- The S3 file system connector: no
## Documentation
- Does this pull request introduce a new feature? no
- If yes, how is the feature documented? not applicable