Hi, Yang Li. IIUC, your issue is similar to the one described in this JIRA[1], right?
[1] https://issues.apache.org/jira/browse/FLINK-33936

--
Best!
Xuyang

At 2024-10-29 15:16:14, "李阳" <liyangfromdez...@gmail.com> wrote:
>Hello devs,
>
>I would like to initiate a discussion about the Flink TopNFunction. In our
>experience, we encountered the following issue while implementing the
>Top3Function.
>
>During this process, if no input has a timestamp smaller than the RowData
>currently held by the Top3Function, that operator will not emit any
>messages related to this product. As a result, some state in the join
>operator is not updated, and when the state expires, certain states are
>cleared. From the user's perspective, even though data about a particular
>product is continuously flowing in, the join may fail to find a matching
>record.
>
>The issue arises because the TopNFunction takes no action when it
>encounters an input RowData that ranks outside the specified N. This
>situation can occur in both retract streams and append-only streams.
>
>Consider that in the aggregation function, if the state retention time
>is set and the function output remains unchanged, the result will still be
>emitted. This can trigger a reset of the state retention time in the
>downstream operator.
>
>Proposed Solutions
>
>To address this issue, we propose a couple of solutions:
>
>   1. Emit Changed Dataset on Each Incoming Record: If we can confirm that
>   the join key of the downstream join operator and the partition key of the
>   TopN operator are consistent, we can emit only the modified RowData.
>
>   2. Full Emission on Each Incoming Record: If the keys are inconsistent,
>   we could emit the full dataset each time a new record is received.
>   However, this approach would place a heavier burden on the system.
>
>These two methods may prevent the downstream operator's state from expiring.
>
>WDYT?
>
>Best,
>
>Yang Li
>
>https://docs.google.com/document/d/1OQXE6Pf6pVLyX1cVmlwvZ1GnX5rvRpIyVttiEhy8BRs/edit?tab=t.edj7xjbz07e4
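
To make the behaviour discussed in this thread concrete, here is a minimal Java sketch that does not use Flink at all. It only models the idea: a Top-3 kept by smallest timestamp for one product, and a count of how many inputs produce any output that could refresh downstream join state. The class name TopNEmitSketch, the EmitPolicy enum, and both of its values are hypothetical names for illustration; they are not Flink classes or configuration options. Real Flink Top-N operators also emit retraction and update messages, which this sketch leaves out.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.TreeSet;

public class TopNEmitSketch {

    /** Hypothetical emit policies for illustration; not Flink options. */
    enum EmitPolicy { ON_CHANGE_ONLY, FULL_ON_EVERY_RECORD }

    /** Keeps the N smallest timestamps seen for a single partition key. */
    static final class TopN {
        private final int n;
        private final EmitPolicy policy;
        private final TreeSet<Long> top = new TreeSet<>();

        TopN(int n, EmitPolicy policy) {
            this.n = n;
            this.policy = policy;
        }

        /** Returns the rows sent downstream for this input (possibly none). */
        List<Long> process(long timestamp) {
            // add() returns false for a duplicate, so nothing changes then.
            boolean changed = top.add(timestamp);
            if (top.size() > n) {
                // Evict the largest timestamp; if that is the new record
                // itself, the Top-N content is unchanged.
                long evicted = top.pollLast();
                changed = evicted != timestamp;
            }
            if (changed || policy == EmitPolicy.FULL_ON_EVERY_RECORD) {
                // Simplification: emit the whole current Top-N as a snapshot.
                return new ArrayList<>(top);
            }
            return Collections.emptyList();
        }
    }

    /** Counts inputs that caused at least one row to be emitted downstream. */
    static int countEmissions(EmitPolicy policy, long[] timestamps) {
        TopN topN = new TopN(3, policy);
        int emissions = 0;
        for (long ts : timestamps) {
            if (!topN.process(ts).isEmpty()) {
                emissions++;
            }
        }
        return emissions;
    }

    public static void main(String[] args) {
        // Timestamps only increase, so after the first three records
        // no later record can enter the Top-3.
        long[] timestamps = {10, 20, 30, 40, 50, 60};
        System.out.println("on change only: "
                + countEmissions(EmitPolicy.ON_CHANGE_ONLY, timestamps));
        System.out.println("full on every record: "
                + countEmissions(EmitPolicy.FULL_ON_EVERY_RECORD, timestamps));
    }
}
```

With the six increasing timestamps above, the change-only policy emits for the first three records and then stays silent, while the full-emission policy emits for all six. The silent stretch is the window in which a downstream operator with a state retention time would see no updates for that product, and the extra emissions are the added load that the second proposal accepts in exchange.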