[ https://issues.apache.org/jira/browse/FLINK-35988?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
ASF GitHub Bot updated FLINK-35988:
-----------------------------------
Labels: pull-request-available (was: )
> Reduce the number of state queries in the AppendOnlyFirstNFunction.
> -------------------------------------------------------------------
>
> Key: FLINK-35988
> URL: https://issues.apache.org/jira/browse/FLINK-35988
> Project: Flink
> Issue Type: New Feature
> Components: Table SQL / Runtime
> Affects Versions: 1.14.0
> Reporter: luolei
> Priority: Minor
> Labels: pull-request-available
>
> In AppendOnlyFirstNFunction, a single processElement call can read the state
> twice: state.value() is invoked once for the null check and again to fetch the
> current rank. The redundant read has a significant impact on performance,
> especially when the state size is large.
> {code:java}
> public void processElement(RowData input, Context context, Collector<RowData> out)
>         throws Exception {
>     initRankEnd(input);
>     // check message should be insert only.
>     Preconditions.checkArgument(input.getRowKind() == RowKind.INSERT);
>     int currentRank = state.value() == null ? 0 : state.value();
>     // ignore record if it does not belong to the first-n rows
>     if (currentRank >= rankEnd) {
>         return;
>     }
>     currentRank += 1;
>     state.update(currentRank);
>     if (outputRankNumber || hasOffset()) {
>         collectInsert(out, input, currentRank);
>     } else {
>         collectInsert(out, input);
>     }
> }{code}
>
> Proposed fix: read the state once per processElement call and reuse the
> result, which removes the redundant state query.
> {code:java}
> @Override
> public void processElement(RowData input, Context context, Collector<RowData> out)
>         throws Exception {
>     initRankEnd(input);
>     // Ensure the message is an insert-only operation.
>     Preconditions.checkArgument(input.getRowKind() == RowKind.INSERT);
>     int currentRank = getCurrentRank();
>     // Ignore record if it does not belong to the first-n rows
>     if (currentRank >= rankEnd) {
>         return;
>     }
>     currentRank++;
>     state.update(currentRank);
>     if (outputRankNumber || hasOffset()) {
>         collectInsert(out, input, currentRank);
>     } else {
>         collectInsert(out, input);
>     }
> }
>
> private int getCurrentRank() throws IOException {
>     Integer value = state.value();
>     return value == null ? 0 : value;
> } {code}
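
The difference between the two read patterns can be illustrated without Flink. The sketch below is only an illustration under stated assumptions: CountingState is a hypothetical stand-in for the keyed state (it is not a Flink class) that counts how often value() is called, and StateReadDemo, rankWithDoubleRead, rankWithSingleRead, and countReads are names invented for this example. It does not measure real state backend cost, only the number of reads issued.

```java
/** Hypothetical stand-in for the keyed state; it counts calls to value(). */
final class CountingState {
    private Integer value; // null until the first update, like an empty state
    private int reads;

    Integer value() {
        reads++;
        return value;
    }

    void update(Integer newValue) {
        value = newValue;
    }

    int reads() {
        return reads;
    }
}

final class StateReadDemo {
    // Original pattern: value() is called for the null check and, when a
    // value is present, a second time to fetch it.
    static int rankWithDoubleRead(CountingState state) {
        return state.value() == null ? 0 : state.value();
    }

    // Proposed pattern: read once into a local variable and reuse it.
    static int rankWithSingleRead(CountingState state) {
        Integer value = state.value();
        return value == null ? 0 : value;
    }

    /** Processes the given number of records and returns the reads issued. */
    static int countReads(int records, boolean singleRead) {
        CountingState state = new CountingState();
        for (int i = 0; i < records; i++) {
            int rank = singleRead
                    ? rankWithSingleRead(state)
                    : rankWithDoubleRead(state);
            state.update(rank + 1);
        }
        return state.reads();
    }

    public static void main(String[] args) {
        System.out.println("double read: " + countReads(5, false));
        System.out.println("single read: " + countReads(5, true));
    }
}
```

In this sketch the double-read pattern issues one read for the first record (the state is still empty) and two for every later record, while the single-read pattern always issues exactly one read per record.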
--
This message was sent by Atlassian Jira
(v8.20.10#820010)