gaoyunhaii opened a new pull request #14831:
URL: https://github.com/apache/flink/pull/14831
## What is the purpose of the change
This PR makes `CheckpointBarrierHandler` insert barriers for channels that
have received `EndOfPartition` but have not yet processed it.
When an input channel receives `EndOfPartition`, it inserts a prioritized
`FinalizeBarrier` at the head of its queue. This event tells
`CheckpointBarrierHandler` that the channel has received `EndOfPartition` and
what its next expected barrier id is.
`CheckpointBarrierHandler` gains a new component,
`FinalizeBarrierComplementProcessor`, which handles `FinalizedBarrier` and
inserts barriers:
1. When a checkpoint starts, either because a barrier arrived from another
channel or because of an RPC trigger, it inserts barriers for all input
channels that have received `EndOfPartition`.
2. When it receives a `FinalizeBarrier` from an input channel, it inserts
barriers for all pending checkpoints on that channel.
It also records the next expected barrier id for each channel, so that it
does not insert stale barriers.
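
The two insertion cases and the stale-barrier check can be illustrated with a minimal, self-contained sketch. It is not the code in this PR: the class `BarrierComplementSketch`, its method names, and the string-based log of inserted barriers are all hypothetical and only model the bookkeeping described above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

/** Hypothetical model of the barrier-complement bookkeeping; not Flink's actual classes. */
final class BarrierComplementSketch {
    // Channels that have reported EndOfPartition, mapped to their next expected barrier id.
    private final Map<Integer, Long> nextExpectedBarrierId = new TreeMap<>();
    // Checkpoints that have started but not yet finished, in ascending id order.
    private final TreeSet<Long> pendingCheckpoints = new TreeSet<>();
    // Log of inserted barriers, formatted as "channel:checkpointId" (illustration only).
    final List<String> inserted = new ArrayList<>();

    /** Case 1: a checkpoint starts (barrier from another channel or an RPC trigger). */
    void onCheckpointStarted(long checkpointId) {
        pendingCheckpoints.add(checkpointId);
        // Iterate over a copy of the keys because insertIfNotStale updates the map.
        for (int channel : new ArrayList<>(nextExpectedBarrierId.keySet())) {
            insertIfNotStale(channel, checkpointId);
        }
    }

    /** Case 2: a channel announces that it has received EndOfPartition. */
    void onFinalizeMarker(int channel, long nextExpectedId) {
        nextExpectedBarrierId.put(channel, nextExpectedId);
        for (long checkpointId : new ArrayList<>(pendingCheckpoints)) {
            insertIfNotStale(channel, checkpointId);
        }
    }

    /** Removes a completed or aborted checkpoint from the pending set. */
    void onCheckpointFinished(long checkpointId) {
        pendingCheckpoints.remove(checkpointId);
    }

    private void insertIfNotStale(int channel, long checkpointId) {
        long expected = nextExpectedBarrierId.get(channel);
        if (checkpointId < expected) {
            return; // The channel already delivered this barrier, so inserting it would be stale.
        }
        inserted.add(channel + ":" + checkpointId);
        nextExpectedBarrierId.put(channel, checkpointId + 1);
    }
}
```

In this model, a channel that finished after already delivering the barrier for a pending checkpoint gets no duplicate for it, while later checkpoints are still complemented on that channel.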
**LocalInputChannel**
Previously, `LocalInputChannel` pulled buffers directly from
`ResultPartition` and did not cache them. With inserted barriers, however,
`LocalInputChannel` has to support caching some buffers. We would like to
use a FIFO queue for this cache instead of a prioritized queue, which
simplifies notification to the InputGates (e.g., as before, there is no need
to call notifyPrioritizedEvent for the `LocalInputChannel`).
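
One possible shape for such a cache is sketched below. This is an assumption-laden illustration, not the PR's implementation: `LocalChannelCacheSketch`, `prefetch`, and `insertBarrierBeforeEnd` are hypothetical names, buffers and events are modelled as plain strings, and an `Iterator` stands in for the view onto `ResultPartition`.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;

/** Hypothetical local channel that caches elements in a plain FIFO queue. */
final class LocalChannelCacheSketch {
    static final String END_OF_PARTITION = "EndOfPartition";

    private final Iterator<String> upstream;                // stands in for the partition view
    private final Deque<String> cache = new ArrayDeque<>(); // read strictly from the head

    LocalChannelCacheSketch(Iterator<String> upstream) {
        this.upstream = upstream;
    }

    /** Pulls one element from upstream into the cache without handing it to the reader. */
    void prefetch() {
        if (upstream.hasNext()) {
            cache.addLast(upstream.next());
        }
    }

    /** Places a barrier just before a cached EndOfPartition; returns false if none is cached last. */
    boolean insertBarrierBeforeEnd(long checkpointId) {
        if (!END_OF_PARTITION.equals(cache.peekLast())) {
            return false;
        }
        cache.pollLast();
        cache.addLast("Barrier(" + checkpointId + ")");
        cache.addLast(END_OF_PARTITION);
        return true;
    }

    /** Returns cached elements first, then pulls from upstream; null when nothing is left. */
    String getNext() {
        if (!cache.isEmpty()) {
            return cache.pollFirst();
        }
        return upstream.hasNext() ? upstream.next() : null;
    }
}
```

Because the reader only ever polls from the head, no priority notification is modelled here; the inserted barrier simply takes its place ahead of the cached end marker.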
## Brief change log
- 7b5bcb2aea349560a6fac8d07e879f6e135f2e67 introduces the `FinalizedBarrier`
event.
- 6f78a9a3c18c8ed8c43cfe6213e80c793e84f5b3 makes InputChannels insert
`FinalizedBarrier` before EndOfPartition and adds support for inserting
barriers before EndOfPartition.
- 1240dbb9879bedee17ac69d69ab8fd514589febd modifies the
`CheckpointBarrierHandler` to insert barriers when needed and to support
alignment with EndOfPartition.
## Verifying this change
This change adds tests and can be verified via the new unit tests for input
channels and checkpoint barrier handlers.
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): **no**
- The public API, i.e., is any changed class annotated with
`@Public(Evolving)`: **no**
- The serializers: **no**
- The runtime per-record code paths (performance sensitive): **no**
- Anything that affects deployment or recovery: JobManager (and its
components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: **no**
- The S3 file system connector: **no**
## Documentation
- Does this pull request introduce a new feature? **no**
- If yes, how is the feature documented? **not applicable**