This is an automated email from the ASF dual-hosted git repository.

lincoln pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new d9931c8af05 [FLINK-35988][table-runtime] Reduce the number of state lookups in AppendOnlyFirstNFunction
d9931c8af05 is described below

commit d9931c8af05d0f1f721be9fe920690fe122507ad
Author: lexluo09 <[email protected]>
AuthorDate: Thu Aug 29 20:56:38 2024 +0800

    [FLINK-35988][table-runtime] Reduce the number of state lookups in AppendOnlyFirstNFunction
    
    This closes #25159
---
 .../runtime/operators/rank/AppendOnlyFirstNFunction.java  | 15 +++++++++++----
 1 file changed, 11 insertions(+), 4 deletions(-)

diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/AppendOnlyFirstNFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/AppendOnlyFirstNFunction.java
index e93c3ea66a5..a597a61d85a 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/AppendOnlyFirstNFunction.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/AppendOnlyFirstNFunction.java
@@ -31,6 +31,8 @@ import org.apache.flink.types.RowKind;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.Preconditions;
 
+import java.io.IOException;
+
 /**
  * A variant of {@link AppendOnlyTopNFunction} to handle first-n case.
  *
@@ -79,14 +81,14 @@ public class AppendOnlyFirstNFunction extends AbstractTopNFunction {
             throws Exception {
         initRankEnd(input);
 
-        // check message should be insert only.
+        // Ensure the message is an insert-only operation.
         Preconditions.checkArgument(input.getRowKind() == RowKind.INSERT);
-        int currentRank = state.value() == null ? 0 : state.value();
-        // ignore record if it does not belong to the first-n rows
+        int currentRank = getCurrentRank();
+        // Ignore record if it does not belong to the first-n rows
         if (currentRank >= rankEnd) {
             return;
         }
-        currentRank += 1;
+        currentRank++;
         state.update(currentRank);
 
         if (outputRankNumber || hasOffset()) {
@@ -95,4 +97,9 @@ public class AppendOnlyFirstNFunction extends AbstractTopNFunction {
             collectInsert(out, input);
         }
     }
+
+    private int getCurrentRank() throws IOException {
+        Integer value = state.value();
+        return value == null ? 0 : value;
+    }
 }

Reply via email to