Hello devs,

I would like to start a discussion about Flink's TopNFunction. We ran into
the following issue while implementing the Top3Function.

If no incoming record for a product has a timestamp smaller than those of
the RowData currently held by the Top3Function, the operator emits no
messages for that product. As a result, the corresponding state in the
downstream join operator is never updated, and once its state retention time
elapses, that state is cleared. From the user's perspective, even though data
for the product keeps flowing in, the join may fail to find a matching
record.

The issue arises because the TopNFunction takes no action when an incoming
RowData falls outside the top N. This can happen with both retract streams
and append-only streams.
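
To make this concrete, here is a minimal Java sketch of a simplified per-key
Top-N operator. It is a hypothetical model (the class SimpleTopN and its
string change messages are made up for illustration), not Flink's actual
TopNFunction code, and it assumes rows are ranked by ascending timestamp with
unique timestamps per key. A record that does not enter the top N produces no
output at all, which is the case where the downstream join sees no traffic
for the key.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical, simplified model of a per-key Top-N operator (NOT Flink's
// TopNFunction). Rows are ranked by ascending timestamp; only changes to the
// top N are emitted as "+I" (insert) and "-D" (delete) messages.
class SimpleTopN {
    private final int n;
    // Sorted state: timestamp -> row payload (assumes unique timestamps).
    private final TreeMap<Long, String> state = new TreeMap<>();

    SimpleTopN(int n) {
        this.n = n;
    }

    /** Processes one record and returns the change messages it would emit. */
    List<String> process(long ts, String row) {
        List<String> out = new ArrayList<>();
        if (state.size() >= n && ts >= state.lastKey()) {
            // The record does not enter the top N: nothing is emitted, so
            // downstream operators see no message for this key.
            return out;
        }
        state.put(ts, row);
        out.add("+I " + row);
        if (state.size() > n) {
            // The previous N-th row is pushed out of the top N.
            out.add("-D " + state.remove(state.lastKey()));
        }
        return out;
    }
}
```

For example, after feeding timestamps 10, 20 and 30 into a Top-3 instance, a
record with timestamp 40 returns an empty list, while a record with timestamp
5 returns "+I e" followed by "-D c".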

By contrast, when a state retention time is configured, the aggregation
function still emits its result even if the output is unchanged. This resets
the state retention timer of the downstream operator.
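
The aggregation-side rule described above can be sketched as follows. This
is a hypothetical helper (AggEmitRule, onNewResult and stateRetentionMs are
illustrative names, not Flink API): an unchanged result is suppressed only
when no state retention time is configured; otherwise it is re-emitted as an
UPDATE_BEFORE/UPDATE_AFTER pair so that the downstream operator touches its
state again.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the emission rule described above (not Flink source).
class AggEmitRule {
    /**
     * @param previous         last emitted result, or null if none was emitted yet
     * @param current          newly computed result
     * @param stateRetentionMs configured state retention time; <= 0 means disabled
     */
    static List<String> onNewResult(String previous, String current, long stateRetentionMs) {
        List<String> out = new ArrayList<>();
        if (previous == null) {
            // First result for this key.
            out.add("+I " + current);
        } else if (!previous.equals(current) || stateRetentionMs > 0) {
            // Changed result, or unchanged but retention is enabled: emit an
            // update pair so downstream state for this key is refreshed.
            out.add("-U " + previous);
            out.add("+U " + current);
        }
        // Unchanged result with retention disabled: emit nothing.
        return out;
    }
}
```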

Proposed Solutions

To address this issue, we propose two solutions:

   1. Emit Changed Dataset on Each Incoming Record: If we can confirm that the
      join key of the downstream join operator matches the partition key of
      the TopN operator, we can emit only the modified RowData.
   2. Full Emission on Each Incoming Record: If the keys differ, we could emit
      the full dataset each time a new record is received. However, this
      approach would place a heavier burden on the system.
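
A rough sketch of option 2, using the same kind of simplified per-key Top-N
model as before. FullEmitTopN is a hypothetical name, not Flink code, and
rows are assumed to be ranked by ascending, unique timestamps. It covers only
the full-emission variant: when an incoming record leaves the top N
unchanged, every held row is re-emitted as a -U/+U pair.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical sketch of option 2 ("full emission"), not Flink code.
class FullEmitTopN {
    private final int n;
    // Sorted state: timestamp -> row payload (assumes unique timestamps).
    private final TreeMap<Long, String> state = new TreeMap<>();

    FullEmitTopN(int n) {
        this.n = n;
    }

    List<String> process(long ts, String row) {
        List<String> out = new ArrayList<>();
        if (state.size() >= n && ts >= state.lastKey()) {
            // Top N unchanged: re-emit all current rows (2 * N messages) so
            // downstream state for each of them is touched.
            for (String held : state.values()) {
                out.add("-U " + held);
                out.add("+U " + held);
            }
            return out;
        }
        state.put(ts, row);
        out.add("+I " + row);
        if (state.size() > n) {
            // The previous N-th row leaves the top N.
            out.add("-D " + state.remove(state.lastKey()));
        }
        return out;
    }
}
```

With N = 3, each record that previously produced no output now produces six
messages, which illustrates the extra load mentioned in option 2.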

Either approach should prevent the downstream operator's state from expiring
while data for the key is still arriving.

WDYT?

Best,

Yang Li

https://docs.google.com/document/d/1OQXE6Pf6pVLyX1cVmlwvZ1GnX5rvRpIyVttiEhy8BRs/edit?tab=t.edj7xjbz07e4
