Hello devs,

I would like to start a discussion about the Flink TopNFunction. We encountered the following issue while implementing a Top3Function.
In this scenario, if no incoming row has a timestamp smaller than those of the rows currently held by the Top3Function (that is, no new row ranks within the top 3), the operator emits no message for that product. As a result, the corresponding state in the downstream join operator is never updated, and when its retention time expires, that state is cleared. From the user's perspective, even though data about a particular product keeps flowing in, the join may fail to find a matching record.

The root cause is that the TopNFunction takes no action when an input row ranks beyond the specified N. This can happen with both retract streams and append-only streams.

By contrast, in the aggregation function, when a state retention time is configured, the result is still emitted even if the aggregation output is unchanged. This resets the state retention time in the downstream operator.

Proposed Solutions

To address this issue, we propose two solutions:

1. Emit the changed data on each incoming record: if we can confirm that the join key of the downstream join operator is consistent with the partition key of the TopN operator, we can emit only the modified RowData.
2. Emit the full data set on each incoming record: if the keys are inconsistent, we could emit the full data set each time a new record is received. However, this approach would place a heavier burden on the system.

Either method may prevent the downstream operator's state from expiring.

WDYT?

Best,
Yang Li

https://docs.google.com/document/d/1OQXE6Pf6pVLyX1cVmlwvZ1GnX5rvRpIyVttiEhy8BRs/edit?tab=t.edj7xjbz07e4
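To make the problem concrete, here is a minimal Python sketch. It is not Flink code: `SimpleTopN`, `TtlState`, `simulate`, and the `full_emission` switch are hypothetical names, and the model is deliberately simplified. It assumes the operator keeps the 3 rows with the smallest timestamp per product key and emits a changelog of `+I`/`-D`/`-U`/`+U` tuples, and that the downstream state is cleared after `ttl` time units without a write. Once the top 3 are filled with early rows, later rows produce no output, so the downstream state expires even though input keeps arriving. The `full_emission` switch, loosely modeled on Option 2, re-emits the current top rows as `-U`/`+U` pairs when an incoming row leaves them unchanged.

```python
from collections import Counter
from dataclasses import dataclass, field
from typing import Dict, List, Tuple

# Simplified, hypothetical model; this is NOT Flink's TopNFunction API.
Row = Tuple[str, int]      # (product key, event timestamp)
Change = Tuple[str, Row]   # (row kind, row), e.g. ("+I", ("p1", 5))


@dataclass
class SimpleTopN:
    """Keeps the n rows with the smallest timestamp per product key."""
    n: int
    # Hypothetical switch loosely modeled on "Option 2": re-emit the current
    # top rows even when the incoming row does not change them.
    full_emission: bool = False
    state: Dict[str, List[Row]] = field(default_factory=dict)

    def process(self, row: Row) -> List[Change]:
        key, _ts = row
        top = self.state.setdefault(key, [])
        candidate = sorted(top + [row], key=lambda r: r[1])[: self.n]
        if candidate == top:
            # The row ranks beyond n. Current behavior: emit nothing.
            if not self.full_emission:
                return []
            # Re-emit unchanged rows as update pairs so the downstream
            # state is written again.
            out: List[Change] = []
            for r in top:
                out += [("-U", r), ("+U", r)]
            return out
        # The top-n set changed: retract evicted rows, insert new ones.
        removed = list((Counter(top) - Counter(candidate)).elements())
        added = list((Counter(candidate) - Counter(top)).elements())
        self.state[key] = candidate
        return [("-D", r) for r in removed] + [("+I", r) for r in added]


@dataclass
class TtlState:
    """Downstream keyed state whose timer is reset on every write."""
    ttl: int
    last_write: Dict[str, int] = field(default_factory=dict)

    def on_change(self, key: str, now: int) -> None:
        self.last_write[key] = now

    def is_alive(self, key: str, now: int) -> bool:
        t = self.last_write.get(key)
        return t is not None and now - t < self.ttl


def simulate(full_emission: bool, ttl: int = 10) -> bool:
    op = SimpleTopN(n=3, full_emission=full_emission)
    downstream = TtlState(ttl=ttl)
    # (arrival time, event timestamp): the first three rows fill the top 3;
    # every later row has a larger timestamp and never enters it.
    events = [(0, 1), (1, 2), (2, 3)] + [(t, 100 + t) for t in range(5, 21)]
    for now, ts in events:
        for _kind, (key, _row_ts) in op.process(("p1", ts)):
            downstream.on_change(key, now)
    # Is the product's downstream state still present at time 20?
    return downstream.is_alive("p1", now=20)


print(simulate(full_emission=False))  # False: downstream state expired
print(simulate(full_emission=True))   # True: state kept alive
```

The cost mentioned under Option 2 is visible in this model: with N = 3, every row that does not enter the top 3 now produces 6 extra changelog records.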