Hi Fabian,

Thanks for drafting the FLIP!
## A few thoughts on the user requirements

1. Compacting files from multiple checkpoints
   This is something users need very much.

2. Whether the compaction blocks checkpointing
   - Some scenarios require this. For example, the user expects the output data to be consistent with the input data: once a checkpoint succeeds, they need to see exactly how much data has been output. Otherwise, when the user restarts a job and consumes from the same source offset, they may lose data.
   - But I think in most cases users are concerned about blocking the checkpoint, so we can instead delay when the data becomes visible.

3. The end-to-end latency of the data
   This also depends on the situation.
   - For some users, latency is very important. We'd better compact the data within the current checkpoint, even if that affects the checkpoint duration.
   - Other users think the latency doesn't matter (it is at the minute level). As long as the files get compacted, small files won't cause them trouble.

So I think flexibility is important.

## A few thoughts on option 2

Option 1 and option 2 seem to differ only in how the aggregator in the middle is realized: as a separate operator or as a coordinator. I would prefer option 2.

If I understand correctly, the whole process should be as follows:

1. SinkWriter ->
2. Aggregator (I think a parallelism of 1 is OK; why does it need multiple parallelism?) ->
3. LocalCommitter (does the compaction work) ->
4. GlobalCommitter

The Aggregator can control single-checkpoint or cross-checkpoint compaction, whether or not to block the current checkpoint, and the end-to-end latency of the data, i.e. how many checkpoints to wait for.
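To make the Aggregator's role a bit more concrete, here is a minimal sketch of the buffering policy I have in mind. This is plain Java for illustration only; `CompactionAggregator`, `onCheckpoint`, and the String-based committables are placeholder names I made up, not the FLIP's actual interfaces.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Sketch of the aggregation policy only. "String" stands in for whatever
 * committable type the sink writers emit; this is not the real FLIP API.
 */
public class CompactionAggregator {

    /** How many checkpoints to wait before the buffered files are handed to the committers. */
    private final int checkpointsToWait;

    private final List<String> buffered = new ArrayList<>();
    private long firstBufferedCheckpoint = -1;

    public CompactionAggregator(int checkpointsToWait) {
        this.checkpointsToWait = checkpointsToWait;
    }

    /** Called for every committable (pending file) coming from the sink writers. */
    public void add(String pendingFile) {
        buffered.add(pendingFile);
    }

    /**
     * Called on every checkpoint (and on endOfInput in batch mode). Returns the
     * files that should be compacted and committed now, or an empty list if we
     * keep buffering to compact across checkpoints.
     */
    public List<String> onCheckpoint(long checkpointId) {
        if (firstBufferedCheckpoint < 0) {
            firstBufferedCheckpoint = checkpointId;
        }
        boolean waitedLongEnough =
                checkpointId - firstBufferedCheckpoint + 1 >= checkpointsToWait;
        if (!waitedLongEnough) {
            // Cross-checkpoint compaction: keep buffering and delay visibility.
            return List.of();
        }
        List<String> toCompact = new ArrayList<>(buffered);
        buffered.clear();
        firstBufferedCheckpoint = -1;
        return toCompact;
    }
}
```

With `checkpointsToWait = 1` this degenerates to single-checkpoint compaction; a larger value trades end-to-end latency for larger compacted files, which is exactly the flexibility described above.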
Best,
Jingsong

On Wed, Nov 3, 2021 at 11:01 PM Fabian Paul <fabianp...@ververica.com> wrote:
>
> Hi David and Till,
>
> Thanks for your great feedback. One definitely confusing point in the FLIP is
> who is doing the actual compaction. The compaction will not be done by the
> CommittableAggregator operator but by the committers, so it should not affect
> the checkpointing duration or become a significant performance bottleneck,
> because the committers are executed in parallel (also in batch mode [1]).
>
> I will update the FLIP to clarify it.
>
> > From your description I would be in favour of option 2 for the following
> > reasons: Assuming that option 2 solves all our current problems, it seems
> > like the least invasive change and smallest in scope. Your main concern is
> > that it might not cover future use cases. Do you have some specific use
> > cases in mind?
>
> No, I do not have anything specific in mind. I just wanted to raise the point
> that adding more and more operators to the sink might complicate future
> development and make it harder to ensure that they can all be used together.
>
> > What I am missing a bit
> > from the description is how option 2 will behave wrt checkpoints and the
> > batch execution mode.
>
> My idea was to always invoke CommittableAggregate#aggregate on a checkpoint
> and on endOfInput. In the batch case the aggregation is only done once, on
> all committables.
>
> > Few thoughts on the option 2)
> >
> > File compaction is by definition quite a costly, IO-bound operation. If I
> > understand the proposal correctly, the aggregation itself would run during
> > the operator (aggregator) checkpoint. Would this significantly increase the
> > checkpoint duration?
> >
> > Compaction between different sub-tasks incurs additional network IO (to
> > fetch the raw non-compacted files from the remote filesystem), so this
> > could quickly become a bottleneck. Basically we're decreasing the sink
> > parallelism (possible throughput) to the parallelism of the aggregator.
>
> Hopefully these concerns are covered by the explanation at the beginning.
>
> > To be really effective here, compaction would ideally be able to compact
> > files from multiple checkpoints. However, there is a huge tradeoff between
> > latency and efficiency (especially with exactly once). Is this something
> > worth exploring?
>
> I agree with you: by enabling compaction across checkpoints the latency can
> increase because files might be committed several checkpoints later. I guess
> the best we can do is to let the user configure the behaviour. By configuring
> the checkpointing interval and the wanted file size the user can already
> affect the latency.
> Is this answering your questions? I am not fully sure what you are referring
> to with efficiency. @dvmk
>
> I hope that with option 2 we can support both use cases: single-task
> compaction as well as cross-task compaction if needed. Similarly for
> single-checkpoint compaction as well as cross-checkpoint compaction.
>
> Compaction across subtasks should be controllable by the parallelism of the
> CommittableAggregator operator, i.e. a parallelism of 2 can reduce the
> computational complexity but might not compute the best compaction.
>
> Best,
> Fabian
>
> [1] https://github.com/apache/flink/pull/17536

--
Best, Jingsong Lee