Hi, Xuyang,

This issue is quite similar to FLINK-33936.

Since the inconsistency in state expiration between the rank operator and downstream operators only occurs in specific scenarios, we might need to discuss whether it's necessary to modify the default behavior.

Best,
Feng

On Mon, Nov 4, 2024 at 7:30 PM Xuyang <xyzhong...@163.com> wrote:
> Hi, Yang Li.
>
> IIUC, your issue is similar to the one described in this JIRA[1], right?
>
> [1] https://issues.apache.org/jira/browse/FLINK-33936
>
> --
>
> Best!
> Xuyang
>
> At 2024-10-29 15:16:14, "李阳" <liyangfromdez...@gmail.com> wrote:
> >Hello devs,
> >
> >I would like to start a discussion about the Flink TopNFunction. In our experience, we encountered the following issue while implementing a Top3Function.
> >
> >If no incoming record has a timestamp smaller than those of the RowData currently held by the Top3Function, the operator does not emit any message for that product. As a result, the corresponding state in the downstream join operator is never updated, and once its retention time elapses, that state is cleared. From the user's perspective, even though data about a particular product keeps flowing in, the join may fail to find a matching record.
> >
> >The issue arises because the TopNFunction takes no action when an incoming RowData falls outside the top N. This can happen with both retract streams and append-only streams.
> >
> >By contrast, in the aggregation function, if a state retention time is set, the result is still emitted even when the output is unchanged. That emission resets the state retention time in the downstream operator.
> >
> >Proposed Solutions
> >
> >To address this issue, we propose two solutions:
> >
> >  1. Emit the changed dataset on each incoming record: if we can confirm that the join key of the downstream join operator and the partition key of the TopN operator are consistent, we can emit only the modified RowData.
> >
> >  2. Emit the full dataset on each incoming record: if the keys are inconsistent, we could emit the full dataset each time a new record is received. However, this approach would place a heavier burden on the system.
> >
> >Either method may prevent the downstream operator's state from expiring.
> >
> >WDYT?
> >
> >Best,
> >
> >Yang Li
> >
> >https://docs.google.com/document/d/1OQXE6Pf6pVLyX1cVmlwvZ1GnX5rvRpIyVttiEhy8BRs/edit?tab=t.edj7xjbz07e4
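To make the scenario described in this thread concrete, here is a minimal, single-threaded Java simulation. It is not Flink code and does not use any Flink API: `TopNTtlDemo`, `TopN`, `Downstream`, the `emitOnEveryRecord` flag, the 10-second retention, and the one-record-per-second input are all hypothetical, chosen only for illustration. The sketch assumes a Top-3 ordered by ascending timestamp and a downstream operator whose per-key state is refreshed only when it receives a record for that key.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Toy simulation (NOT Flink code) of a per-key Top-N operator feeding a
 * downstream operator whose per-key state expires after a period of
 * inactivity. All class, method, and flag names are hypothetical.
 */
public class TopNTtlDemo {

    /** Keeps, per partition key, the N rows with the smallest timestamps. */
    static final class TopN {
        private final int n;
        // Hypothetical flag modelling the proposals: emit something downstream
        // for every incoming record, even when the top N does not change.
        private final boolean emitOnEveryRecord;
        private final Map<String, List<Long>> buffers = new HashMap<>();

        TopN(int n, boolean emitOnEveryRecord) {
            this.n = n;
            this.emitOnEveryRecord = emitOnEveryRecord;
        }

        /** Returns true if the operator emits anything downstream for this record. */
        boolean process(String key, long ts) {
            List<Long> buf = buffers.computeIfAbsent(key, k -> new ArrayList<>());
            if (buf.size() < n) {
                buf.add(ts); // top N not full yet: the row is inserted
                return true;
            }
            long largest = Collections.max(buf);
            if (ts < largest) {
                buf.remove(Long.valueOf(largest)); // old Nth row is retracted
                buf.add(ts);                       // new row is inserted
                return true;
            }
            // Record falls outside the top N: the current behaviour emits nothing.
            return emitOnEveryRecord;
        }
    }

    /** Downstream keyed state whose retention is refreshed only when a record arrives. */
    static final class Downstream {
        private final long ttlMillis;
        private final Map<String, Long> lastAccess = new HashMap<>();

        Downstream(long ttlMillis) {
            this.ttlMillis = ttlMillis;
        }

        void onRecord(String key, long now) {
            lastAccess.put(key, now);
        }

        boolean isAlive(String key, long now) {
            Long last = lastAccess.get(key);
            return last != null && now - last <= ttlMillis;
        }
    }

    /**
     * Sends one record per second for a single product with ascending timestamps
     * (so only the first 3 ever enter the Top-3) and reports whether the
     * downstream state for that product is still alive after the last record.
     */
    public static boolean downstreamStateAlive(boolean emitOnEveryRecord) {
        TopN top3 = new TopN(3, emitOnEveryRecord);
        Downstream join = new Downstream(10_000L); // 10 s retention
        long now = 0L;
        for (int i = 0; i < 60; i++) {
            now = i * 1_000L;
            if (top3.process("product-1", now)) {
                join.onRecord("product-1", now);
            }
        }
        return join.isAlive("product-1", now);
    }

    public static void main(String[] args) {
        System.out.println("current behaviour, state alive: " + downstreamStateAlive(false));
        System.out.println("emit on every record, state alive: " + downstreamStateAlive(true));
    }
}
```

With the flag off, the last record reaching the downstream operator arrives at t = 2 s, so by t = 59 s its state has expired even though input for the product never stopped; with the flag on, the state stays alive. The sketch only models *whether* a key is touched on each record; it deliberately does not model *which* rows are emitted, which is where the two proposals differ (only the changed RowData versus the full top-N set).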