This is an automated email from the ASF dual-hosted git repository.
lincoln pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new d9931c8af05 [FLINK-35988][table-runtime] Reduce the number of state
lookups in AppendOnlyFirstNFunction
d9931c8af05 is described below
commit d9931c8af05d0f1f721be9fe920690fe122507ad
Author: lexluo09 <[email protected]>
AuthorDate: Thu Aug 29 20:56:38 2024 +0800
[FLINK-35988][table-runtime] Reduce the number of state lookups in
AppendOnlyFirstNFunction
This closes #25159
---
.../runtime/operators/rank/AppendOnlyFirstNFunction.java | 15 +++++++++++----
1 file changed, 11 insertions(+), 4 deletions(-)
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/AppendOnlyFirstNFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/AppendOnlyFirstNFunction.java
index e93c3ea66a5..a597a61d85a 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/AppendOnlyFirstNFunction.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/AppendOnlyFirstNFunction.java
@@ -31,6 +31,8 @@ import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;
+import java.io.IOException;
+
/**
* A variant of {@link AppendOnlyTopNFunction} to handle first-n case.
*
@@ -79,14 +81,14 @@ public class AppendOnlyFirstNFunction extends AbstractTopNFunction {
throws Exception {
initRankEnd(input);
- // check message should be insert only.
+ // Ensure the message is an insert-only operation.
Preconditions.checkArgument(input.getRowKind() == RowKind.INSERT);
- int currentRank = state.value() == null ? 0 : state.value();
- // ignore record if it does not belong to the first-n rows
+ int currentRank = getCurrentRank();
+ // Ignore record if it does not belong to the first-n rows
if (currentRank >= rankEnd) {
return;
}
- currentRank += 1;
+ currentRank++;
state.update(currentRank);
if (outputRankNumber || hasOffset()) {
@@ -95,4 +97,9 @@ public class AppendOnlyFirstNFunction extends AbstractTopNFunction {
collectInsert(out, input);
}
}
+
+ private int getCurrentRank() throws IOException {
+ Integer value = state.value();
+ return value == null ? 0 : value;
+ }
}