JingsongLi opened a new pull request #13744.

URL: https://github.com/apache/flink/pull/13744
## What is the purpose of the change & Brief change log

### Introduce compaction operators

The compaction operator graph is:

- `TempFileWriter` (parallel) sends `InputFile` and `EndInputFile` messages to `CompactCoordinator`.
- `CompactCoordinator` (non-parallel) sends `CompactionUnit` and `EndCompaction` messages to `CompactOperator`.
- `CompactOperator` (parallel) sends `PartitionCommitInfo` messages to `PartitionCommitter`.
- `PartitionCommitter` (non-parallel) is the last operator in the graph.

Because an end message acts as a barrier that follows the record messages, messages on the link from the coordinator to the compact operator can only be transmitted by full broadcast.

### Introduce CompactCoordinator

This is the single (non-parallel) monitoring task that coordinates input files into compaction units.

- Receives in-flight input files within a checkpoint.
- Receives the end-input messages from all upstream tasks after the checkpoint completes successfully, then starts coordination.

NOTE: The coordination is a stable algorithm, which ensures that the downstream can perform compaction at any time without worrying about failover.

STATE: This operator stores input files in state. After the checkpoint completes successfully, the input files are taken out of the state for coordination.

### Introduce CompactOperator

Receives compaction units and performs the compaction. Sends partition commit information after the compaction finishes. Uses `BulkFormat` to read and `BucketWriter` to write.

STATE: This operator stores expired files in state. After the checkpoint completes successfully, we can be sure that these files will not be used again, so they can be deleted from the file system.
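To illustrate why the coordinator-to-operator link uses full broadcast, here is a minimal Java sketch, not code from this PR. It assumes, hypothetically, that each compaction unit carries a non-negative integer id and that a `CompactOperator` subtask owns a unit when `unitId % parallelism == subtaskIndex`. Because every subtask receives every message on the same ordered channel, the end marker reaches each subtask only after all the units that precede it. `BroadcastFilterSketch`, `Unit`, `Subtask`, and `broadcastRound` are illustrative names.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of broadcast-and-filter; not the PR's CompactOperator. */
final class BroadcastFilterSketch {

    private BroadcastFilterSketch() {}

    /** Illustrative compaction unit: an id plus the files it would merge. */
    static final class Unit {
        final int unitId;
        final List<String> files;

        Unit(int unitId, List<String> files) {
            this.unitId = unitId;
            this.files = files;
        }
    }

    /** One parallel compact-operator subtask. */
    static final class Subtask {
        private final int index;
        private final int parallelism;
        private final List<Unit> pending = new ArrayList<>();
        /** Ids of units this subtask has compacted, in arrival order. */
        final List<Integer> compacted = new ArrayList<>();

        Subtask(int index, int parallelism) {
            this.index = index;
            this.parallelism = parallelism;
        }

        /** Every subtask sees every unit; keep only those owned under the assumed modulo rule. */
        void onUnit(Unit unit) {
            if (unit.unitId % parallelism == index) {
                pending.add(unit);
            }
        }

        /** End marker: all units before it on this channel have arrived, so compact them now. */
        void onEnd() {
            for (Unit unit : pending) {
                // A real operator would read unit.files and write one merged file here.
                compacted.add(unit.unitId);
            }
            pending.clear();
        }
    }

    /** Simulates a full broadcast: each subtask receives all units, then the end marker. */
    static void broadcastRound(List<Subtask> subtasks, List<Unit> units) {
        for (Subtask subtask : subtasks) {
            for (Unit unit : units) {
                subtask.onUnit(unit);
            }
            subtask.onEnd();
        }
    }
}
```

With two subtasks and units `0` to `4`, subtask 0 compacts units `0, 2, 4` and subtask 1 compacts `1, 3`. If units were instead routed by key, a subtask that owned no units in a round would never receive the end marker, which is why the end message forces a broadcast.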
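The PR lists a `BinPackingTest`, which suggests that the `CompactCoordinator` packs input files into size-bounded compaction units. The following is a hypothetical sketch of such a stable packing step, not the PR's implementation. It assumes the files arrive in a deterministic order (for example, sorted by path) and closes a bin once adding the next file would exceed the target size, so the same input always yields the same units. `BinPackingSketch.pack` and its parameters are illustrative names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.ToLongFunction;

/** Hypothetical stable bin packing; not the PR's BinPacking class. */
final class BinPackingSketch {

    private BinPackingSketch() {}

    /**
     * Packs items into consecutive bins, preserving input order. A bin is closed
     * when adding the next item would push it past targetWeight; an item heavier
     * than the target still gets a bin of its own. The result depends only on the
     * input order and weights, so it can be recomputed identically after a failover.
     */
    static <T> List<List<T>> pack(List<T> items, ToLongFunction<T> weight, long targetWeight) {
        List<List<T>> bins = new ArrayList<>();
        List<T> current = new ArrayList<>();
        long currentWeight = 0L;
        for (T item : items) {
            long w = weight.applyAsLong(item);
            // Close the current bin if this item would push it past the target.
            if (!current.isEmpty() && currentWeight + w > targetWeight) {
                bins.add(current);
                current = new ArrayList<>();
                currentWeight = 0L;
            }
            current.add(item);
            currentWeight += w;
        }
        if (!current.isEmpty()) {
            bins.add(current);
        }
        return bins;
    }
}
```

For example, file sizes `40, 50, 30, 80, 10` with a target of `100` yield the bins `[40, 50]`, `[30]`, `[80, 10]`. Preserving input order trades some packing efficiency for determinism, which is what lets a restarted job recompute exactly the same units.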
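The `CompactOperator` state rule for expired files can be sketched as follows. This is a hypothetical simplification rather than the PR's code: expired files are grouped by the checkpoint at which they were recorded, and when a checkpoint completes, every group up to and including that checkpoint id is released. A real operator would keep this map in Flink operator state and delete the released files through the file system; here `notifyCheckpointComplete` (named after Flink's checkpoint-completion callback) only returns them. `ExpiredFilesSketch`, `markExpired`, and the simplified `snapshotState(long)` are illustrative names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical sketch of checkpoint-aligned cleanup; not the PR's CompactOperator. */
final class ExpiredFilesSketch {

    /** Files expired since the last snapshot (not yet tied to a checkpoint). */
    private final List<String> currentExpired = new ArrayList<>();

    /** Checkpoint id -> files recorded as expired when that checkpoint was taken. */
    private final NavigableMap<Long, List<String>> expiredByCheckpoint = new TreeMap<>();

    /** Records a file whose contents have been merged into a compacted file. */
    void markExpired(String path) {
        currentExpired.add(path);
    }

    /** Simplified snapshot hook: ties the pending expired files to this checkpoint. */
    void snapshotState(long checkpointId) {
        expiredByCheckpoint.put(checkpointId, new ArrayList<>(currentExpired));
        currentExpired.clear();
    }

    /**
     * Once checkpoint checkpointId completes, files recorded at or before it will
     * never be read again. Returns them (a real operator would delete them) and
     * drops them from the state.
     */
    List<String> notifyCheckpointComplete(long checkpointId) {
        NavigableMap<Long, List<String>> done = expiredByCheckpoint.headMap(checkpointId, true);
        List<String> deletable = new ArrayList<>();
        for (List<String> files : done.values()) {
            deletable.addAll(files);
        }
        done.clear(); // clearing the head-map view removes the entries from the backing map
        return deletable;
    }
}
```

Releasing everything up to the completed id, inclusive, also covers earlier checkpoints whose completion notification never arrived, since a completed checkpoint subsumes them.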
## Verifying this change

- `CompactOperatorsTest`
- `BinPackingTest`

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: no
- The S3 file system connector: no

## Documentation

- Does this pull request introduce a new feature? yes
- If yes, how is the feature documented? JavaDocs