Hi Zakelly,

Thanks for your comments on this FLIP. Please let me attempt to clarify these points.
1. Yes, this FLIP proposes to buffer the outputs in the state backend. Since only the latest element of each type of StreamElement needs to be buffered, a ValueState in a keyed context or a ListState in a non-keyed context is enough to hold each type of StreamElement. The value stored in the ValueState/ListState would be the original StreamRecord/Watermark/WatermarkStatus/LatencyMarker. Besides, the KeyedStateBackend#applyToAllKeys method makes it possible to access the states of all keys from within one keyed context, which is how the operator would retrieve the buffered data during `flush()` (a rough sketch is attached in the P.S. below).

2.1 The buffered intermediate results need to be included in the next checkpoint to preserve exactly-once semantics during failover. The buffer would be cleared in the `flush()` operation, but `flush()` need not be triggered before checkpoints. I agree that saving buffered results to state increases the state-access workload, but given that the state buffer would be enabled on aggregation operators that already involve state, the additional buffered results would not increase the time complexity of state accesses or the memory (state) complexity. If we can trade one state read/write operation plus the space of one ValueState for all the computations that downstream operators would otherwise spend on one intermediate result, I believe the throughput improvement is worth the trade-off in state.

2.2 Checkpoints aside, it is still worth discussing the alternative ways you suggested for storing the buffered results at runtime. At least for keyed streams, I'm concerned that keeping all buffered results in memory could easily cause OOM problems, since there is no bound on the number of keyed entries accumulated within one flush interval. I'm also wondering whether a file-based map would actually outperform the state backends, and why Flink hasn't introduced a FileSystemStateBackend if a file-based map could be better. Could you please elaborate on the pros & cons of state backends vs. memory/filesystem?

Best regards,
Yunfeng
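P.S. To make point 1 more concrete, below is a rough sketch of how an operator could keep only the latest result per key in a ValueState and emit it via KeyedStateBackend#applyToAllKeys during `flush()`. This is only my illustration rather than the exact FLIP implementation: the operator name, its constructor wiring and the `flush()` hook are made up, and it buffers the record value instead of the full StreamElement to keep the example short.

```java
// Rough sketch only: `BufferingOperator` and its `flush()` hook are hypothetical;
// the FLIP would buffer the full StreamElement rather than just the record value.
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

public class BufferingOperator<T> extends AbstractStreamOperator<T>
        implements OneInputStreamOperator<T, T> {

    private final ValueStateDescriptor<T> bufferDescriptor;
    private transient ValueState<T> buffer;

    public BufferingOperator(ValueStateDescriptor<T> bufferDescriptor) {
        this.bufferDescriptor = bufferDescriptor;
    }

    @Override
    public void open() throws Exception {
        super.open();
        // One ValueState per key, holding only the latest result for that key.
        // This requires the operator to run in a keyed context.
        buffer = getPartitionedState(bufferDescriptor);
    }

    @Override
    public void processElement(StreamRecord<T> element) throws Exception {
        // Instead of emitting immediately, overwrite the buffered value of the
        // current key; earlier intermediate results for this key are discarded.
        buffer.update(element.getValue());
    }

    /** Hypothetical hook that the framework would call once per flush interval. */
    public void flush() throws Exception {
        // applyToAllKeys visits the buffered state of every key, even though
        // keyed state is normally only accessible under the currently set key.
        getKeyedStateBackend()
                .applyToAllKeys(
                        VoidNamespace.INSTANCE,
                        VoidNamespaceSerializer.INSTANCE,
                        bufferDescriptor,
                        (key, state) -> {
                            T value = state.value();
                            if (value != null) {
                                output.collect(new StreamRecord<>(value));
                                state.clear();
                            }
                        });
    }
}
```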
On Thu, Sep 21, 2023 at 4:10 PM Zakelly Lan <zakelly....@gmail.com> wrote:
>
> Hi Yunfeng and Dong,
>
> Thanks for this FLIP. I have reviewed it briefly and have a few questions:
>
> 1. Is this FLIP proposing to buffer the output in the state backend? If so, what is the data format of this buffer (what type of state does it use and what is the value)? Additionally, how does the operator retrieve all the buffer data from the state backend during the `flush()` operation (while the keyed states can only be accessed under a keyed context)?
> 2. Are the buffered intermediate results required to be included in the next checkpoint? Or are they deleted and subsumed in the original states during the `flush()` operation before triggering the checkpoint? I'm asking because if they are not included in the checkpoint, it may be more efficient to avoid using keyed states for buffering. In this scenario, a simple heap-based or even file-based map could be more efficient. Frequent writes and clears can lead to increased space usage and read amplification for RocksDB, and it also requires more CPU resources for checkpointing and compaction.
>
> Looking forward to your thoughts.
>
> Best,
> Zakelly
>
> On Mon, Sep 11, 2023 at 1:39 PM Yunfeng Zhou <flink.zhouyunf...@gmail.com> wrote:
> >
> > Hi all,
> >
> > Dong (cc'ed) and I are opening this thread to discuss our proposal to support buffering & flushing the output of operators with idempotent semantics, which has been documented in FLIP-365 <https://cwiki.apache.org/confluence/display/FLINK/FLIP-365%3A+Introduce+flush+interval+to+adjust+the+interval+of+emitting+results+with+idempotent+semantics>.
> >
> > In the pursuit of unifying batch and stream processing, it has been discovered that the batch execution mode provides a significant advantage by allowing only the final result from "rolling" operations such as reduce() or sum() to be emitted, thus reducing the amount of time and resources required by downstream applications. Inspired by this advantage, the proposed solution supports buffering the output of operators in the streaming mode and periodically emitting the final results, much like in batch processing. This approach is designed to help improve the throughput of jobs by reducing the need for downstream applications to process intermediate results in a stream, at the cost of increased latency and state size.
> >
> > Please refer to the FLIP document for more details about the proposed design and implementation. We welcome any feedback and opinions on this proposal.
> >
> > Best regards.
> > Dong and Yunfeng
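To make the difference described in the quoted proposal concrete, here is a minimal runnable sketch (the class name and sample data are made up for illustration; the STREAMING/BATCH behavior shown is standard DataStream API behavior): a keyed rolling sum in STREAMING mode emits one updated result per input record, whereas the same pipeline in BATCH mode emits only the final sum per key. The FLIP aims to bring a similar reduction of intermediate results to streaming jobs, traded against latency and state size.

```java
// Minimal illustration (not part of the FLIP): the class name and sample data
// are made up; the STREAMING/BATCH contrast is standard DataStream API behavior.
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RollingSumExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // In STREAMING mode the rolling sum prints (a,1), (a,3), (a,6): every
        // intermediate result travels downstream. Switching to
        // RuntimeExecutionMode.BATCH makes the job emit only the final (a,6).
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);

        env.fromElements(Tuple2.of("a", 1), Tuple2.of("a", 2), Tuple2.of("a", 3))
                .keyBy(value -> value.f0)
                .sum(1)
                .print();

        env.execute("rolling-sum-example");
    }
}
```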