This is an automated email from the ASF dual-hosted git repository.

lincoln pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 29edd60f85a [FLINK-36831][table] Introduce AppendOnlyTopNFunction in Rank with Async State API
29edd60f85a is described below

commit 29edd60f85a38a3286a8c6da2f6400affb04ce6f
Author: Xuyang <xyzhong...@163.com>
AuthorDate: Tue Jan 14 20:39:42 2025 +0800

    [FLINK-36831][table] Introduce AppendOnlyTopNFunction in Rank with Async State API
    
    This closes #25723
---
 .../plan/nodes/exec/stream/StreamExecRank.java     |  38 ++--
 .../planner/runtime/harness/RankHarnessTest.scala  | 165 +++++++++++++++--
 .../rank/AbstractSyncStateTopNFunction.java        |  32 ++++
 .../operators/rank/AbstractTopNFunction.java       |  91 ++++++----
 .../operators/rank/AppendOnlyTopNFunction.java     | 122 +++----------
 .../rank/async/AbstractAsyncStateTopNFunction.java |  63 ++++++-
 .../async/AsyncStateAppendOnlyTopNFunction.java    | 199 +++++++++++++++++++++
 .../operators/rank/utils/AppendOnlyTopNHelper.java | 171 ++++++++++++++++++
 .../operators/rank/utils/FastTop1Helper.java       |   8 +-
 .../operators/rank/AppendOnlyTopNFunctionTest.java |  38 ++--
 .../operators/rank/FastTop1FunctionTest.java       |   3 -
 .../operators/rank/TopNFunctionTestBase.java       |  13 --
 12 files changed, 754 insertions(+), 189 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecRank.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecRank.java
index 469270f0c5f..f2a0dd40e40 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecRank.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecRank.java
@@ -60,6 +60,7 @@ import org.apache.flink.table.runtime.operators.rank.RankType;
 import org.apache.flink.table.runtime.operators.rank.RetractableTopNFunction;
 import org.apache.flink.table.runtime.operators.rank.UpdatableTopNFunction;
 import 
org.apache.flink.table.runtime.operators.rank.async.AbstractAsyncStateTopNFunction;
+import 
org.apache.flink.table.runtime.operators.rank.async.AsyncStateAppendOnlyTopNFunction;
 import 
org.apache.flink.table.runtime.operators.rank.async.AsyncStateFastTop1Function;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.runtime.typeutils.TypeCheckUtils;
@@ -283,17 +284,31 @@ public class StreamExecRank extends ExecNodeBase<RowData>
                                     cacheSize);
                 }
             } else {
-                processFunction =
-                        new AppendOnlyTopNFunction(
-                                ttlConfig,
-                                inputRowTypeInfo,
-                                sortKeyComparator,
-                                sortKeySelector,
-                                rankType,
-                                rankRange,
-                                generateUpdateBefore,
-                                outputRankNumber,
-                                cacheSize);
+                if (isAsyncStateEnabled) {
+                    processFunction =
+                            new AsyncStateAppendOnlyTopNFunction(
+                                    ttlConfig,
+                                    inputRowTypeInfo,
+                                    sortKeyComparator,
+                                    sortKeySelector,
+                                    rankType,
+                                    rankRange,
+                                    generateUpdateBefore,
+                                    outputRankNumber,
+                                    cacheSize);
+                } else {
+                    processFunction =
+                            new AppendOnlyTopNFunction(
+                                    ttlConfig,
+                                    inputRowTypeInfo,
+                                    sortKeyComparator,
+                                    sortKeySelector,
+                                    rankType,
+                                    rankRange,
+                                    generateUpdateBefore,
+                                    outputRankNumber,
+                                    cacheSize);
+                }
             }
         } else if (rankStrategy instanceof 
RankProcessStrategy.UpdateFastStrategy) {
             if (RankUtil.isTop1(rankRange)) {
@@ -344,7 +359,6 @@ public class StreamExecRank extends ExecNodeBase<RowData>
                                 outputRankNumber,
                                 cacheSize);
             }
-            // TODO Use UnaryUpdateTopNFunction after SortedMapState is merged
         } else if (rankStrategy instanceof 
RankProcessStrategy.RetractStrategy) {
             EqualiserCodeGenerator equaliserCodeGen =
                     new EqualiserCodeGenerator(
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/RankHarnessTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/RankHarnessTest.scala
index 032eba0d47a..4766668af92 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/RankHarnessTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/RankHarnessTest.scala
@@ -28,6 +28,7 @@ import 
org.apache.flink.table.planner.runtime.utils.{JavaUserDefinedTableFunctio
 import 
org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.{HEAP_BACKEND,
 ROCKSDB_BACKEND, StateBackendMode}
 import org.apache.flink.table.runtime.util.RowDataHarnessAssertor
 import org.apache.flink.table.runtime.util.StreamRecordUtils.binaryRecord
+import org.apache.flink.table.types.logical.LogicalType
 import 
org.apache.flink.testutils.junit.extensions.parameterized.{ParameterizedTestExtension,
 Parameters}
 import org.apache.flink.types.Row
 import org.apache.flink.types.RowKind._
@@ -468,7 +469,10 @@ class RankHarnessTest(mode: StateBackendMode, 
enableAsyncState: Boolean)
     testHarness.close()
   }
 
-  def prepareTop1Tester(query: String, operatorNameIdentifier: String)
+  def prepareRankTester(
+      query: String,
+      operatorNameIdentifier: String,
+      operatorOutputLogicalTypes: Array[LogicalType])
       : (KeyedOneInputStreamOperatorTestHarness[RowData, RowData, RowData], 
RowDataHarnessAssertor) = {
     val sourceDDL =
       s"""
@@ -486,11 +490,7 @@ class RankHarnessTest(mode: StateBackendMode, 
enableAsyncState: Boolean)
 
     val testHarness =
       createHarnessTester(t1.toRetractStream[Row], operatorNameIdentifier)
-    val assertor = new RowDataHarnessAssertor(
-      Array(
-        DataTypes.STRING().getLogicalType,
-        DataTypes.BIGINT().getLogicalType,
-        DataTypes.BIGINT().getLogicalType))
+    val assertor = new RowDataHarnessAssertor(operatorOutputLogicalTypes)
 
     (testHarness, assertor)
   }
@@ -500,7 +500,7 @@ class RankHarnessTest(mode: StateBackendMode, 
enableAsyncState: Boolean)
     tEnv.getConfig.setIdleStateRetention(Duration.ofSeconds(1))
     val query =
       """
-        |SELECT a, b, rn
+        |SELECT a, b
         |FROM
         |(
         |    SELECT a, b,
@@ -510,7 +510,11 @@ class RankHarnessTest(mode: StateBackendMode, 
enableAsyncState: Boolean)
         |WHERE rn <= 1
       """.stripMargin
     val (testHarness, assertor) =
-      prepareTop1Tester(query, "Rank(strategy=[AppendFastStrategy")
+      prepareRankTester(
+        query,
+        "Rank(strategy=[AppendFastStrategy",
+        Array(DataTypes.STRING().getLogicalType, 
DataTypes.BIGINT().getLogicalType)
+      )
 
     if (enableAsyncState) {
       assertThat(isAsyncStateOperator(testHarness)).isTrue
@@ -541,7 +545,7 @@ class RankHarnessTest(mode: StateBackendMode, 
enableAsyncState: Boolean)
     tEnv.getConfig.setIdleStateRetention(Duration.ofSeconds(1))
     val query =
       """
-        |SELECT a, b, rn
+        |SELECT a, b
         |FROM
         |(
         |    SELECT a, b,
@@ -553,7 +557,11 @@ class RankHarnessTest(mode: StateBackendMode, 
enableAsyncState: Boolean)
         |WHERE rn <= 1
       """.stripMargin
     val (testHarness, assertor) =
-      prepareTop1Tester(query, "Rank(strategy=[UpdateFastStrategy")
+      prepareRankTester(
+        query,
+        "Rank(strategy=[UpdateFastStrategy",
+        Array(DataTypes.STRING().getLogicalType, 
DataTypes.BIGINT().getLogicalType)
+      )
 
     if (enableAsyncState) {
       assertThat(isAsyncStateOperator(testHarness)).isTrue
@@ -580,6 +588,143 @@ class RankHarnessTest(mode: StateBackendMode, 
enableAsyncState: Boolean)
 
     testHarness.close()
   }
+
+  @TestTemplate
+  def testAppendOnlyTopNWithRowNumber(): Unit = {
+    tEnv.getConfig.setIdleStateRetention(Duration.ofSeconds(1))
+    val query =
+      """
+        |SELECT a, b, rn
+        |FROM
+        |(
+        |    SELECT a, b,
+        |        ROW_NUMBER() OVER (PARTITION BY a ORDER BY b DESC) AS rn
+        |    FROM T
+        |) t1
+        |WHERE rn <= 3
+      """.stripMargin
+    val (testHarness, assertor) =
+      prepareRankTester(
+        query,
+        "Rank(strategy=[AppendFastStrategy",
+        Array(
+          DataTypes.STRING().getLogicalType,
+          DataTypes.BIGINT().getLogicalType,
+          DataTypes.BIGINT().getLogicalType)
+      )
+
+    if (enableAsyncState) {
+      assertThat(isAsyncStateOperator(testHarness)).isTrue
+    } else {
+      assertThat(isAsyncStateOperator(testHarness)).isFalse
+    }
+
+    testHarness.open()
+
+    val expectedOutput = new ConcurrentLinkedQueue[Object]()
+
+    // a,2 - top1
+    testHarness.processElement(binaryRecord(INSERT, "a", 2L: JLong))
+    expectedOutput.add(binaryRecord(INSERT, "a", 2L: JLong, 1L: JLong))
+
+    // a,2 - top1
+    // a,1 - top2
+    testHarness.processElement(binaryRecord(INSERT, "a", 1L: JLong))
+    expectedOutput.add(binaryRecord(INSERT, "a", 1L: JLong, 2L: JLong))
+
+    // a,3 - top1
+    // a,2 - top2
+    // a,1 - top3
+    testHarness.processElement(binaryRecord(INSERT, "a", 3L: JLong))
+    expectedOutput.add(binaryRecord(UPDATE_BEFORE, "a", 2L: JLong, 1L: JLong))
+    expectedOutput.add(binaryRecord(UPDATE_AFTER, "a", 3L: JLong, 1L: JLong))
+    expectedOutput.add(binaryRecord(UPDATE_BEFORE, "a", 1L: JLong, 2L: JLong))
+    expectedOutput.add(binaryRecord(UPDATE_AFTER, "a", 2L: JLong, 2L: JLong))
+    expectedOutput.add(binaryRecord(INSERT, "a", 1L: JLong, 3L: JLong))
+
+    // a,3 - top1
+    // a,2 - top2
+    // a,1 - top3
+    testHarness.processElement(binaryRecord(INSERT, "a", 0L: JLong))
+
+    // a,3 - top1
+    // a,3 - top2
+    // a,2 - top3
+    testHarness.processElement(binaryRecord(INSERT, "a", 3L: JLong))
+    expectedOutput.add(binaryRecord(UPDATE_BEFORE, "a", 2L: JLong, 2L: JLong))
+    expectedOutput.add(binaryRecord(UPDATE_AFTER, "a", 3L: JLong, 2L: JLong))
+    expectedOutput.add(binaryRecord(UPDATE_BEFORE, "a", 1L: JLong, 3L: JLong))
+    expectedOutput.add(binaryRecord(UPDATE_AFTER, "a", 2L: JLong, 3L: JLong))
+
+    val result = dropWatermarks(testHarness.getOutput.toArray)
+    assertor.assertOutputEqualsSorted("result mismatch", expectedOutput, 
result)
+
+    testHarness.close()
+  }
+
+  @TestTemplate
+  def testAppendOnlyTopNWithoutRowNumber(): Unit = {
+    tEnv.getConfig.setIdleStateRetention(Duration.ofSeconds(1))
+    val query =
+      """
+        |SELECT a, b
+        |FROM
+        |(
+        |    SELECT a, b,
+        |        ROW_NUMBER() OVER (PARTITION BY a ORDER BY b DESC) AS rn
+        |    FROM T
+        |) t1
+        |WHERE rn <= 3
+      """.stripMargin
+    val (testHarness, assertor) =
+      prepareRankTester(
+        query,
+        "Rank(strategy=[AppendFastStrategy",
+        Array(DataTypes.STRING().getLogicalType, 
DataTypes.BIGINT().getLogicalType)
+      )
+
+    if (enableAsyncState) {
+      assertThat(isAsyncStateOperator(testHarness)).isTrue
+    } else {
+      assertThat(isAsyncStateOperator(testHarness)).isFalse
+    }
+
+    testHarness.open()
+
+    val expectedOutput = new ConcurrentLinkedQueue[Object]()
+
+    // a,2 - top1
+    testHarness.processElement(binaryRecord(INSERT, "a", 2L: JLong))
+    expectedOutput.add(binaryRecord(INSERT, "a", 2L: JLong))
+
+    // a,2 - top1
+    // a,1 - top2
+    testHarness.processElement(binaryRecord(INSERT, "a", 1L: JLong))
+    expectedOutput.add(binaryRecord(INSERT, "a", 1L: JLong))
+
+    // a,3 - top1
+    // a,2 - top2
+    // a,1 - top3
+    testHarness.processElement(binaryRecord(INSERT, "a", 3L: JLong))
+    expectedOutput.add(binaryRecord(INSERT, "a", 3L: JLong))
+
+    // a,3 - top1
+    // a,2 - top2
+    // a,1 - top3
+    testHarness.processElement(binaryRecord(INSERT, "a", 0L: JLong))
+
+    // a,3 - top1
+    // a,3 - top2
+    // a,2 - top3
+    testHarness.processElement(binaryRecord(INSERT, "a", 3L: JLong))
+    expectedOutput.add(binaryRecord(DELETE, "a", 1L: JLong))
+    expectedOutput.add(binaryRecord(INSERT, "a", 3L: JLong))
+
+    val result = dropWatermarks(testHarness.getOutput.toArray)
+    assertor.assertOutputEqualsSorted("result mismatch", expectedOutput, 
result)
+
+    testHarness.close()
+  }
 }
 
 object RankHarnessTest {
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/AbstractSyncStateTopNFunction.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/AbstractSyncStateTopNFunction.java
index a2cc38ca69b..a42b4c1e7d6 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/AbstractSyncStateTopNFunction.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/AbstractSyncStateTopNFunction.java
@@ -27,12 +27,17 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
 import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.util.Collector;
+
+import java.util.Objects;
 
 /** Base class for TopN Function with sync state api. */
 public abstract class AbstractSyncStateTopNFunction extends 
AbstractTopNFunction {
 
     private ValueState<Long> rankEndState;
 
+    protected long rankEnd;
+
     public AbstractSyncStateTopNFunction(
             StateTtlConfig ttlConfig,
             InternalTypeInfo<RowData> inputRowType,
@@ -76,6 +81,7 @@ public abstract class AbstractSyncStateTopNFunction extends 
AbstractTopNFunction
      */
     protected long initRankEnd(RowData row) throws Exception {
         if (isConstantRankEnd) {
+            rankEnd = Objects.requireNonNull(constantRankEnd);
             return rankEnd;
         } else {
             Long rankEndValue = rankEndState.value();
@@ -95,4 +101,30 @@ public abstract class AbstractSyncStateTopNFunction extends 
AbstractTopNFunction
             }
         }
     }
+
+    // ====== utility methods that use the rankEnd field instead of an explicit rank end ======
+
+    protected boolean isInRankEnd(long rank) {
+        return rank <= rankEnd;
+    }
+
+    protected boolean isInRankRange(long rank) {
+        return rank <= rankEnd && rank >= rankStart;
+    }
+
+    protected void collectInsert(Collector<RowData> out, RowData inputRow, 
long rank) {
+        collectInsert(out, inputRow, rank, rankEnd);
+    }
+
+    protected void collectDelete(Collector<RowData> out, RowData inputRow, 
long rank) {
+        collectDelete(out, inputRow, rank, rankEnd);
+    }
+
+    protected void collectUpdateAfter(Collector<RowData> out, RowData 
inputRow, long rank) {
+        collectUpdateAfter(out, inputRow, rank, rankEnd);
+    }
+
+    protected void collectUpdateBefore(Collector<RowData> out, RowData 
inputRow, long rank) {
+        collectUpdateBefore(out, inputRow, rank, rankEnd);
+    }
 }
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/AbstractTopNFunction.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/AbstractTopNFunction.java
index 9c0c62b058e..6be70356116 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/AbstractTopNFunction.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/AbstractTopNFunction.java
@@ -38,7 +38,10 @@ import org.apache.flink.util.Collector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.util.Comparator;
+import java.util.Objects;
 import java.util.function.Function;
 
 /**
@@ -65,20 +68,33 @@ public abstract class AbstractTopNFunction extends 
KeyedProcessFunction<RowData,
 
     protected final StateTtlConfig ttlConfig;
 
-    // The util to compare two sortKey equals to each other.
-    private GeneratedRecordComparator generatedSortKeyComparator;
-    protected Comparator<RowData> sortKeyComparator;
-
     private final boolean generateUpdateBefore;
+
     protected final boolean outputRankNumber;
+
     protected final InternalTypeInfo<RowData> inputRowType;
+
     protected final KeySelector<RowData, RowData> sortKeySelector;
 
-    protected KeyContext keyContext;
     protected final boolean isConstantRankEnd;
-    private final long rankStart;
+
+    protected final long rankStart;
+
+    // constant rank end
+    // if rank end is variable, this var is null
+    @Nullable protected final Long constantRankEnd;
+
+    // variable rank end index
     private final int rankEndIndex;
-    protected long rankEnd;
+
+    // The generated util to compare two sort keys with each other.
+    private GeneratedRecordComparator generatedSortKeyComparator;
+
+    protected Comparator<RowData> sortKeyComparator;
+
+    protected KeyContext keyContext;
+
+    // variable rank end fetcher
     protected transient Function<RowData, Long> rankEndFetcher;
 
     protected Counter invalidCounter;
@@ -118,15 +134,14 @@ public abstract class AbstractTopNFunction extends 
KeyedProcessFunction<RowData,
             ConstantRankRange constantRankRange = (ConstantRankRange) 
rankRange;
             isConstantRankEnd = true;
             rankStart = constantRankRange.getRankStart();
-            rankEnd = constantRankRange.getRankEnd();
             rankEndIndex = -1;
+            constantRankEnd = constantRankRange.getRankEnd();
         } else if (rankRange instanceof VariableRankRange) {
             VariableRankRange variableRankRange = (VariableRankRange) 
rankRange;
             rankEndIndex = variableRankRange.getRankEndIndex();
             isConstantRankEnd = false;
             rankStart = -1;
-            rankEnd = -1;
-
+            constantRankEnd = null;
         } else {
             LOG.error(WITHOUT_RANK_END_UNSUPPORTED_MSG);
             throw new 
UnsupportedOperationException(WITHOUT_RANK_END_UNSUPPORTED_MSG);
@@ -180,7 +195,7 @@ public abstract class AbstractTopNFunction extends 
KeyedProcessFunction<RowData,
      * @return default topN size
      */
     protected long getDefaultTopNSize() {
-        return isConstantRankEnd ? rankEnd : DEFAULT_TOPN_SIZE;
+        return isConstantRankEnd ? Objects.requireNonNull(constantRankEnd) : 
DEFAULT_TOPN_SIZE;
     }
 
     /**
@@ -213,8 +228,9 @@ public abstract class AbstractTopNFunction extends 
KeyedProcessFunction<RowData,
                 .<Long, Gauge<Long>>gauge("topn.cache.size", () -> heapSize);
     }
 
-    protected void collectInsert(Collector<RowData> out, RowData inputRow, 
long rank) {
-        if (isInRankRange(rank)) {
+    protected void collectInsert(
+            Collector<RowData> out, RowData inputRow, long rank, long rankEnd) 
{
+        if (isInRankRange(rank, rankEnd)) {
             out.collect(createOutputRow(inputRow, rank, RowKind.INSERT));
         }
     }
@@ -224,8 +240,9 @@ public abstract class AbstractTopNFunction extends 
KeyedProcessFunction<RowData,
         out.collect(inputRow);
     }
 
-    protected void collectDelete(Collector<RowData> out, RowData inputRow, 
long rank) {
-        if (isInRankRange(rank)) {
+    protected void collectDelete(
+            Collector<RowData> out, RowData inputRow, long rank, long rankEnd) 
{
+        if (isInRankRange(rank, rankEnd)) {
             out.collect(createOutputRow(inputRow, rank, RowKind.DELETE));
         }
     }
@@ -235,8 +252,9 @@ public abstract class AbstractTopNFunction extends 
KeyedProcessFunction<RowData,
         out.collect(inputRow);
     }
 
-    protected void collectUpdateAfter(Collector<RowData> out, RowData 
inputRow, long rank) {
-        if (isInRankRange(rank)) {
+    protected void collectUpdateAfter(
+            Collector<RowData> out, RowData inputRow, long rank, long rankEnd) 
{
+        if (isInRankRange(rank, rankEnd)) {
             out.collect(createOutputRow(inputRow, rank, RowKind.UPDATE_AFTER));
         }
     }
@@ -246,8 +264,9 @@ public abstract class AbstractTopNFunction extends 
KeyedProcessFunction<RowData,
         out.collect(inputRow);
     }
 
-    protected void collectUpdateBefore(Collector<RowData> out, RowData 
inputRow, long rank) {
-        if (generateUpdateBefore && isInRankRange(rank)) {
+    protected void collectUpdateBefore(
+            Collector<RowData> out, RowData inputRow, long rank, long rankEnd) 
{
+        if (generateUpdateBefore && isInRankRange(rank, rankEnd)) {
             out.collect(createOutputRow(inputRow, rank, 
RowKind.UPDATE_BEFORE));
         }
     }
@@ -259,11 +278,7 @@ public abstract class AbstractTopNFunction extends 
KeyedProcessFunction<RowData,
         }
     }
 
-    protected boolean isInRankEnd(long rank) {
-        return rank <= rankEnd;
-    }
-
-    protected boolean isInRankRange(long rank) {
+    protected boolean isInRankRange(long rank, long rankEnd) {
         return rank <= rankEnd && rank >= rankStart;
     }
 
@@ -323,30 +338,46 @@ public abstract class AbstractTopNFunction extends 
KeyedProcessFunction<RowData,
             this.keyContext = topNFunction.keyContext;
         }
 
-        protected void collectInsert(Collector<RowData> out, RowData inputRow, 
long rank) {
-            topNFunction.collectInsert(out, inputRow, rank);
+        protected void collectInsert(
+                Collector<RowData> out, RowData inputRow, long rank, long 
rankEnd) {
+            topNFunction.collectInsert(out, inputRow, rank, rankEnd);
         }
 
         protected void collectInsert(Collector<RowData> out, RowData inputRow) 
{
             topNFunction.collectInsert(out, inputRow);
         }
 
-        protected void collectUpdateAfter(Collector<RowData> out, RowData 
inputRow, long rank) {
-            topNFunction.collectUpdateAfter(out, inputRow, rank);
+        protected void collectDelete(
+                Collector<RowData> out, RowData inputRow, long rank, long 
rankEnd) {
+            topNFunction.collectDelete(out, inputRow, rank, rankEnd);
+        }
+
+        protected void collectDelete(Collector<RowData> out, RowData inputRow) 
{
+            topNFunction.collectDelete(out, inputRow);
+        }
+
+        protected void collectUpdateAfter(
+                Collector<RowData> out, RowData inputRow, long rank, long 
rankEnd) {
+            topNFunction.collectUpdateAfter(out, inputRow, rank, rankEnd);
         }
 
         protected void collectUpdateAfter(Collector<RowData> out, RowData 
inputRow) {
             topNFunction.collectUpdateAfter(out, inputRow);
         }
 
-        protected void collectUpdateBefore(Collector<RowData> out, RowData 
inputRow, long rank) {
-            topNFunction.collectUpdateBefore(out, inputRow, rank);
+        protected void collectUpdateBefore(
+                Collector<RowData> out, RowData inputRow, long rank, long 
rankEnd) {
+            topNFunction.collectUpdateBefore(out, inputRow, rank, rankEnd);
         }
 
         protected void collectUpdateBefore(Collector<RowData> out, RowData 
inputRow) {
             topNFunction.collectUpdateBefore(out, inputRow);
         }
 
+        protected boolean isInRankEnd(long rank, long rankEnd) {
+            return rank <= rankEnd;
+        }
+
         public void accRequestCount() {
             requestCount++;
         }
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/AppendOnlyTopNFunction.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/AppendOnlyTopNFunction.java
index abcaa881014..a5a031c16ea 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/AppendOnlyTopNFunction.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/AppendOnlyTopNFunction.java
@@ -28,21 +28,15 @@ import org.apache.flink.api.java.typeutils.ListTypeInfo;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
 import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
+import 
org.apache.flink.table.runtime.operators.rank.utils.AppendOnlyTopNHelper;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.util.Collector;
 
-import org.apache.flink.shaded.guava32.com.google.common.cache.Cache;
-import org.apache.flink.shaded.guava32.com.google.common.cache.CacheBuilder;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.TimeUnit;
 
 /**
  * A TopN function could handle insert-only stream.
@@ -53,8 +47,6 @@ public class AppendOnlyTopNFunction extends 
AbstractSyncStateTopNFunction {
 
     private static final long serialVersionUID = -4708453213104128011L;
 
-    private static final Logger LOG = 
LoggerFactory.getLogger(AppendOnlyTopNFunction.class);
-
     private final InternalTypeInfo<RowData> sortKeyType;
     private final TypeSerializer<RowData> inputRowSer;
     private final long cacheSize;
@@ -65,8 +57,7 @@ public class AppendOnlyTopNFunction extends 
AbstractSyncStateTopNFunction {
     // the buffer stores mapping from sort key to records list, a heap mirror 
to dataState
     private transient TopNBuffer buffer;
 
-    // the kvSortedMap stores mapping from partition key to it's buffer
-    private transient Cache<RowData, TopNBuffer> kvSortedMap;
+    private transient SyncStateAppendOnlyTopNHelper helper;
 
     public AppendOnlyTopNFunction(
             StateTtlConfig ttlConfig,
@@ -95,17 +86,6 @@ public class AppendOnlyTopNFunction extends 
AbstractSyncStateTopNFunction {
     @Override
     public void open(OpenContext openContext) throws Exception {
         super.open(openContext);
-        int lruCacheSize = Math.max(1, (int) (cacheSize / 
getDefaultTopNSize()));
-        CacheBuilder<Object, Object> cacheBuilder = CacheBuilder.newBuilder();
-        if (ttlConfig.isEnabled()) {
-            cacheBuilder.expireAfterWrite(
-                    ttlConfig.getTimeToLive().toMillis(), 
TimeUnit.MILLISECONDS);
-        }
-        kvSortedMap = cacheBuilder.maximumSize(lruCacheSize).build();
-        LOG.info(
-                "Top{} operator is using LRU caches key-size: {}",
-                getDefaultTopNSize(),
-                lruCacheSize);
 
         ListTypeInfo<RowData> valueTypeInfo = new ListTypeInfo<>(inputRowType);
         MapStateDescriptor<RowData, List<RowData>> mapStateDescriptor =
@@ -115,8 +95,9 @@ public class AppendOnlyTopNFunction extends 
AbstractSyncStateTopNFunction {
         }
         dataState = getRuntimeContext().getMapState(mapStateDescriptor);
 
-        // metrics
-        registerMetric(kvSortedMap.size() * getDefaultTopNSize());
+        helper = new SyncStateAppendOnlyTopNHelper();
+
+        helper.registerMetric();
     }
 
     @Override
@@ -139,20 +120,20 @@ public class AppendOnlyTopNFunction extends 
AbstractSyncStateTopNFunction {
             if (outputRankNumber || hasOffset()) {
                 // the without-number-algorithm can't handle topN with offset,
                 // so use the with-number-algorithm to handle offset
-                processElementWithRowNumber(sortKey, input, out);
+                helper.processElementWithRowNumber(buffer, sortKey, input, 
rankEnd, out);
             } else {
-                processElementWithoutRowNumber(input, out);
+                helper.processElementWithoutRowNumber(buffer, input, rankEnd, 
out);
             }
         }
     }
 
     private void initHeapStates() throws Exception {
-        requestCount += 1;
+        helper.accRequestCount();
         RowData currentKey = (RowData) keyContext.getCurrentKey();
-        buffer = kvSortedMap.getIfPresent(currentKey);
+        buffer = helper.getTopNBufferFromCache(currentKey);
         if (buffer == null) {
             buffer = new TopNBuffer(sortKeyComparator, ArrayList::new);
-            kvSortedMap.put(currentKey, buffer);
+            helper.saveTopNBufferToCache(currentKey, buffer);
             // restore buffer
             Iterator<Map.Entry<RowData, List<RowData>>> iter = 
dataState.iterator();
             if (iter != null) {
@@ -165,84 +146,27 @@ public class AppendOnlyTopNFunction extends 
AbstractSyncStateTopNFunction {
                 }
             }
         } else {
-            hitCount += 1;
+            helper.accHitCount();
         }
     }
 
-    private void processElementWithRowNumber(RowData sortKey, RowData input, 
Collector<RowData> out)
-            throws Exception {
-        Iterator<Map.Entry<RowData, Collection<RowData>>> iterator = 
buffer.entrySet().iterator();
-        long currentRank = 0L;
-        boolean findsSortKey = false;
-        RowData currentRow = null;
-        while (iterator.hasNext() && isInRankEnd(currentRank)) {
-            Map.Entry<RowData, Collection<RowData>> entry = iterator.next();
-            Collection<RowData> records = entry.getValue();
-            // meet its own sort key
-            if (!findsSortKey && entry.getKey().equals(sortKey)) {
-                currentRank += records.size();
-                currentRow = input;
-                findsSortKey = true;
-            } else if (findsSortKey) {
-                Iterator<RowData> recordsIter = records.iterator();
-                while (recordsIter.hasNext() && isInRankEnd(currentRank)) {
-                    RowData prevRow = recordsIter.next();
-                    collectUpdateBefore(out, prevRow, currentRank);
-                    collectUpdateAfter(out, currentRow, currentRank);
-                    currentRow = prevRow;
-                    currentRank += 1;
-                }
-            } else {
-                currentRank += records.size();
-            }
-        }
-        if (isInRankEnd(currentRank)) {
-            // there is no enough elements in Top-N, emit INSERT message for 
the new record.
-            collectInsert(out, currentRow, currentRank);
+    private class SyncStateAppendOnlyTopNHelper extends AppendOnlyTopNHelper {
+        // Mirrors TopN buffer removals/updates into the synchronous data state.
+        public SyncStateAppendOnlyTopNHelper() {
+            super(
+                    AppendOnlyTopNFunction.this,
+                    cacheSize,
+                    AppendOnlyTopNFunction.this.getDefaultTopNSize());
         }
 
-        // remove the records associated to the sort key which is out of topN
-        List<RowData> toDeleteSortKeys = new ArrayList<>();
-        while (iterator.hasNext()) {
-            Map.Entry<RowData, Collection<RowData>> entry = iterator.next();
-            RowData key = entry.getKey();
+        @Override
+        protected void removeFromState(RowData key) throws Exception {
             dataState.remove(key);
-            toDeleteSortKeys.add(key);
-        }
-        for (RowData toDeleteKey : toDeleteSortKeys) {
-            buffer.removeAll(toDeleteKey);
         }
-    }
 
-    private void processElementWithoutRowNumber(RowData input, 
Collector<RowData> out)
-            throws Exception {
-        // remove retired element
-        if (buffer.getCurrentTopNum() > rankEnd) {
-            Map.Entry<RowData, Collection<RowData>> lastEntry = 
buffer.lastEntry();
-            RowData lastKey = lastEntry.getKey();
-            Collection<RowData> lastList = lastEntry.getValue();
-            RowData lastElement = buffer.lastElement();
-            int size = lastList.size();
-            // remove last one
-            if (size <= 1) {
-                buffer.removeAll(lastKey);
-                dataState.remove(lastKey);
-            } else {
-                buffer.removeLast();
-                // last element has been removed from lastList, we have to 
copy a new collection
-                // for lastList to avoid mutating state values, see 
CopyOnWriteStateMap,
-                // otherwise, the result might be corrupt.
-                // don't need to perform a deep copy, because RowData elements 
will not be updated
-                dataState.put(lastKey, new ArrayList<>(lastList));
-            }
-            if (size == 0 || input.equals(lastElement)) {
-                return;
-            } else {
-                // lastElement shouldn't be null
-                collectDelete(out, lastElement);
-            }
+        @Override
+        protected void updateState(RowData key, List<RowData> value) throws 
Exception {
+            dataState.put(key, value);
         }
-        // it first appears in the TopN, send INSERT message
-        collectInsert(out, input);
     }
 }
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/async/AbstractAsyncStateTopNFunction.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/async/AbstractAsyncStateTopNFunction.java
index 4e8c884b5c2..3fb63d8a282 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/async/AbstractAsyncStateTopNFunction.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/async/AbstractAsyncStateTopNFunction.java
@@ -18,7 +18,14 @@
 
 package org.apache.flink.table.runtime.operators.rank.async;
 
+import org.apache.flink.api.common.functions.OpenContext;
 import org.apache.flink.api.common.state.StateTtlConfig;
+import org.apache.flink.api.common.state.v2.StateFuture;
+import org.apache.flink.api.common.state.v2.ValueState;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.core.state.StateFutureUtils;
+import org.apache.flink.runtime.state.v2.ValueStateDescriptor;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
 import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
@@ -27,13 +34,13 @@ import 
org.apache.flink.table.runtime.operators.rank.RankRange;
 import org.apache.flink.table.runtime.operators.rank.RankType;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 
-/**
- * Base class for TopN Function with async state api.
- *
- * <p>TODO FLINK-36831 support variable rank end in async state rank later.
- */
+import java.util.Objects;
+
+/** Base class for TopN Function with async state api. */
 public abstract class AbstractAsyncStateTopNFunction extends 
AbstractTopNFunction {
 
+    private ValueState<Long> rankEndState;
+
     public AbstractAsyncStateTopNFunction(
             StateTtlConfig ttlConfig,
             InternalTypeInfo<RowData> inputRowType,
@@ -52,9 +59,51 @@ public abstract class AbstractAsyncStateTopNFunction extends 
AbstractTopNFunctio
                 rankRange,
                 generateUpdateBefore,
                 outputRankNumber);
+    }
+
+    @Override
+    public void open(OpenContext openContext) throws Exception {
+        super.open(openContext);
+        // Only a variable rank end needs state: initRankEnd keeps it in the "rankEnd" value state.
         if (!isConstantRankEnd) {
-            throw new UnsupportedOperationException(
-                    "Variable rank end is not supported in rank with async 
state api.");
+            ValueStateDescriptor<Long> rankStateDesc =
+                    new ValueStateDescriptor<>("rankEnd", Types.LONG);
+            if (ttlConfig.isEnabled()) {
+                rankStateDesc.enableTimeToLive(ttlConfig);
+            }
+            rankEndState =
+                    ((StreamingRuntimeContext) 
getRuntimeContext()).getValueState(rankStateDesc);
+        }
+    }
+
+    /**
+     * Resolves the rank end for the current key.
+     *
+     * @param row input record; only read when the rank end is variable
+     * @return a completed future of the constant rank end, else the rank end kept in state
+     */
+    protected StateFuture<Long> initRankEnd(RowData row) {
+        if (isConstantRankEnd) {
+            return 
StateFutureUtils.completedFuture(Objects.requireNonNull(constantRankEnd));
+        } else {
+            return rankEndState
+                    .asyncValue()
+                    .thenApply(
+                            rankEndInState -> {
+                                long curRankEnd = rankEndFetcher.apply(row);
+                                if (rankEndInState == null) {
+                                    // nothing stored yet: persist it, no need to wait for this future
+                                    rankEndState.asyncUpdate(curRankEnd);
+                                    return curRankEnd;
+                                } else {
+                                    if (rankEndInState != curRankEnd) {
+                                        // increment the invalid counter when 
the current rank end
+                                        // differs from the rank end stored in state
+                                        invalidCounter.inc();
+                                    }
+                                    return rankEndInState;
+                                }
+                            });
         }
     }
 }
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/async/AsyncStateAppendOnlyTopNFunction.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/async/AsyncStateAppendOnlyTopNFunction.java
new file mode 100644
index 00000000000..962298bb989
--- /dev/null
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/async/AsyncStateAppendOnlyTopNFunction.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.rank.async;
+
+import org.apache.flink.api.common.functions.OpenContext;
+import org.apache.flink.api.common.serialization.SerializerConfigImpl;
+import org.apache.flink.api.common.state.StateTtlConfig;
+import org.apache.flink.api.common.state.v2.MapState;
+import org.apache.flink.api.common.state.v2.MapStateDescriptor;
+import org.apache.flink.api.common.state.v2.StateFuture;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.ListTypeInfo;
+import org.apache.flink.core.state.StateFutureUtils;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
+import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
+import org.apache.flink.table.runtime.operators.rank.AppendOnlyTopNFunction;
+import org.apache.flink.table.runtime.operators.rank.RankRange;
+import org.apache.flink.table.runtime.operators.rank.RankType;
+import org.apache.flink.table.runtime.operators.rank.TopNBuffer;
+import 
org.apache.flink.table.runtime.operators.rank.utils.AppendOnlyTopNHelper;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.util.Collector;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * A TopN function that handles insert-only streams.
+ *
+ * <p>The input stream should only contain INSERT messages.
+ *
+ * <p>Unlike {@link AppendOnlyTopNFunction}, this function is used 
with async state api.
+ */
+public class AsyncStateAppendOnlyTopNFunction extends 
AbstractAsyncStateTopNFunction {
+
+    private static final long serialVersionUID = 1L;
+
+    private final InternalTypeInfo<RowData> sortKeyType;
+    private final TypeSerializer<RowData> inputRowSer;
+    private final long cacheSize;
+
+    // a map state stores mapping from sort key to the list of records that are in 
topN
+    private transient MapState<RowData, List<RowData>> dataState;
+
+    private transient AsyncStateAppendOnlyTopNHelper helper;
+
+    public AsyncStateAppendOnlyTopNFunction(
+            StateTtlConfig ttlConfig,
+            InternalTypeInfo<RowData> inputRowType,
+            GeneratedRecordComparator sortKeyGeneratedRecordComparator,
+            RowDataKeySelector sortKeySelector,
+            RankType rankType,
+            RankRange rankRange,
+            boolean generateUpdateBefore,
+            boolean outputRankNumber,
+            long cacheSize) {
+        super(
+                ttlConfig,
+                inputRowType,
+                sortKeyGeneratedRecordComparator,
+                sortKeySelector,
+                rankType,
+                rankRange,
+                generateUpdateBefore,
+                outputRankNumber);
+        this.sortKeyType = sortKeySelector.getProducedType();
+        this.inputRowSer = inputRowType.createSerializer(new 
SerializerConfigImpl());
+        this.cacheSize = cacheSize;
+    }
+
+    @Override
+    public void open(OpenContext openContext) throws Exception {
+        super.open(openContext);
+
+        ListTypeInfo<RowData> valueTypeInfo = new ListTypeInfo<>(inputRowType);
+        MapStateDescriptor<RowData, List<RowData>> mapStateDescriptor =
+                new MapStateDescriptor<>("data-state-with-append", 
sortKeyType, valueTypeInfo);
+        if (ttlConfig.isEnabled()) {
+            mapStateDescriptor.enableTimeToLive(ttlConfig);
+        }
+        dataState = ((StreamingRuntimeContext) 
getRuntimeContext()).getMapState(mapStateDescriptor);
+        // the helper holds the LRU cache of TopN buffers and the shared TopN logic
+        helper = new AsyncStateAppendOnlyTopNHelper();
+
+        helper.registerMetric();
+    }
+
+    @Override
+    public void processElement(RowData input, Context context, 
Collector<RowData> out)
+            throws Exception {
+        StateFuture<TopNBuffer> topNBufferFuture = initHeapStates();
+        StateFuture<Long> rankEndFuture = initRankEnd(input);
+
+        RowData sortKey = sortKeySelector.getKey(input);
+        topNBufferFuture.thenCombine(
+                rankEndFuture,
+                (buffer, rankEnd) -> {
+                    if (checkSortKeyInBufferRange(sortKey, buffer)) {
+                        // insert a copy of the input into the buffer under its sort key
+                        buffer.put(sortKey, inputRowSer.copy(input));
+                        Collection<RowData> inputs = buffer.get(sortKey);
+
+                        // update data state
+                        // copy a new collection to avoid mutating state 
values, see
+                        // CopyOnWriteStateMap,
+                        // otherwise, the result might be corrupt.
+                        // don't need to perform a deep copy, because RowData 
elements will not be
+                        // updated
+                        dataState
+                                .asyncPut(sortKey, new ArrayList<>(inputs))
+                                .thenAccept(
+                                        VOID -> {
+                                            if (outputRankNumber || 
hasOffset()) {
+                                                // the 
without-number-algorithm can't handle topN
+                                                // with offset,
+                                                // so use the 
with-number-algorithm to handle offset
+                                                
helper.processElementWithRowNumber(
+                                                        buffer, sortKey, 
input, rankEnd, out);
+                                            } else {
+                                                
helper.processElementWithoutRowNumber(
+                                                        buffer, input, 
rankEnd, out);
+                                            }
+                                        });
+                    }
+
+                    return null;
+                });
+    }
+
+    private StateFuture<TopNBuffer> initHeapStates() {
+        helper.accRequestCount();
+        RowData currentKey = (RowData) keyContext.getCurrentKey();
+        TopNBuffer bufferFromCache = helper.getTopNBufferFromCache(currentKey);
+        if (bufferFromCache != null) {
+            helper.accHitCount();
+            return StateFutureUtils.completedFuture(bufferFromCache);
+        }
+
+        TopNBuffer bufferFromState = new TopNBuffer(sortKeyComparator, 
ArrayList::new);
+        // cache miss: restore the buffer from the data state, then cache it
+        return dataState
+                .asyncEntries()
+                .thenCompose(
+                        iter ->
+                                iter.onNext(
+                                        entry -> {
+                                            RowData sortKey = entry.getKey();
+                                            List<RowData> values = 
entry.getValue();
+                                            // the order is preserved
+                                            bufferFromState.putAll(sortKey, 
values);
+                                        }))
+                .thenApply(
+                        VOID -> {
+                            helper.saveTopNBufferToCache(currentKey, 
bufferFromState);
+                            return bufferFromState;
+                        });
+    }
+    // Mirrors TopN buffer removals/updates into the async data state; writes are not awaited.
+    private class AsyncStateAppendOnlyTopNHelper extends AppendOnlyTopNHelper {
+
+        public AsyncStateAppendOnlyTopNHelper() {
+            super(
+                    AsyncStateAppendOnlyTopNFunction.this,
+                    cacheSize,
+                    
AsyncStateAppendOnlyTopNFunction.this.getDefaultTopNSize());
+        }
+
+        @Override
+        protected void removeFromState(RowData key) throws Exception {
+            // no need to wait for this async request to finish
+            dataState.asyncRemove(key);
+        }
+
+        @Override
+        protected void updateState(RowData key, List<RowData> value) throws 
Exception {
+            // no need to wait for this async request to finish
+            dataState.asyncPut(key, value);
+        }
+    }
+}
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/utils/AppendOnlyTopNHelper.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/utils/AppendOnlyTopNHelper.java
new file mode 100644
index 00000000000..e05a9934fe5
--- /dev/null
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/utils/AppendOnlyTopNHelper.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.rank.utils;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.operators.rank.AbstractTopNFunction;
+import org.apache.flink.table.runtime.operators.rank.AppendOnlyTopNFunction;
+import org.apache.flink.table.runtime.operators.rank.TopNBuffer;
+import 
org.apache.flink.table.runtime.operators.rank.async.AsyncStateAppendOnlyTopNFunction;
+import org.apache.flink.util.Collector;
+
+import org.apache.flink.shaded.guava32.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava32.com.google.common.cache.CacheBuilder;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A helper that implements the shared Top-N logic for append-only streams in {@link 
AppendOnlyTopNFunction}
+ * and {@link AsyncStateAppendOnlyTopNFunction}.
+ */
+public abstract class AppendOnlyTopNHelper extends 
AbstractTopNFunction.AbstractTopNHelper {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(AppendOnlyTopNHelper.class);
+
+    // the kvSortedMap caches the TopN buffer of each partition key, bounded by lruCacheSize
+    private final Cache<RowData, TopNBuffer> kvSortedMap;
+
+    private final long topNSize;
+
+    public AppendOnlyTopNHelper(AbstractTopNFunction topNFunction, long 
cacheSize, long topNSize) {
+        super(topNFunction);
+
+        this.topNSize = topNSize;
+        // number of partition keys whose buffers fit in the cache (at least one)
+        int lruCacheSize = Math.max(1, (int) (cacheSize / topNSize));
+        CacheBuilder<Object, Object> cacheBuilder = CacheBuilder.newBuilder();
+        if (ttlConfig.isEnabled()) {
+            cacheBuilder.expireAfterWrite(
+                    ttlConfig.getTimeToLive().toMillis(), 
TimeUnit.MILLISECONDS);
+        }
+        kvSortedMap = cacheBuilder.maximumSize(lruCacheSize).build();
+
+        LOG.info("Top{} operator is using LRU caches key-size: {}", topNSize, 
lruCacheSize);
+    }
+
+    public void registerMetric() {
+        registerMetric(kvSortedMap.size() * topNSize);
+    }
+
+    @Nullable
+    public TopNBuffer getTopNBufferFromCache(RowData currentKey) {
+        return kvSortedMap.getIfPresent(currentKey);
+    }
+
+    public void saveTopNBufferToCache(RowData currentKey, TopNBuffer 
topNBuffer) {
+        kvSortedMap.put(currentKey, topNBuffer);
+    }
+
+    /**
+     * Handles {@code input}, already put into {@code buffer}, with the with-number-algorithm: it 
collects update-before/update-after pairs for rows whose rank shifts and an insert if a rank is still free,
+     * then drops sort keys beyond {@code rankEnd}; the without-number-algorithm can't handle offset.
+     */
+    public void processElementWithRowNumber(
+            TopNBuffer buffer, RowData sortKey, RowData input, long rankEnd, 
Collector<RowData> out)
+            throws Exception {
+        Iterator<Map.Entry<RowData, Collection<RowData>>> iterator = 
buffer.entrySet().iterator();
+        long currentRank = 0L;
+        boolean findsSortKey = false;
+        RowData currentRow = null;
+        while (iterator.hasNext() && isInRankEnd(currentRank, rankEnd)) {
+            Map.Entry<RowData, Collection<RowData>> entry = iterator.next();
+            Collection<RowData> records = entry.getValue();
+            // meet its own sort key
+            if (!findsSortKey && entry.getKey().equals(sortKey)) {
+                currentRank += records.size();
+                currentRow = input;
+                findsSortKey = true;
+            } else if (findsSortKey) {
+                Iterator<RowData> recordsIter = records.iterator();
+                while (recordsIter.hasNext() && isInRankEnd(currentRank, 
rankEnd)) {
+                    RowData prevRow = recordsIter.next();
+                    collectUpdateBefore(out, prevRow, currentRank, rankEnd);
+                    collectUpdateAfter(out, currentRow, currentRank, rankEnd);
+                    currentRow = prevRow;
+                    currentRank += 1;
+                }
+            } else {
+                currentRank += records.size();
+            }
+        }
+        if (isInRankEnd(currentRank, rankEnd)) {
+            // there are not enough elements in Top-N, emit INSERT message for 
the new record.
+            collectInsert(out, currentRow, currentRank, rankEnd);
+        }
+
+        // remove the records of every remaining sort key (out of topN) from state and buffer
+        List<RowData> toDeleteSortKeys = new ArrayList<>();
+        while (iterator.hasNext()) {
+            Map.Entry<RowData, Collection<RowData>> entry = iterator.next();
+            RowData key = entry.getKey();
+            removeFromState(key);
+            toDeleteSortKeys.add(key);
+        }
+        for (RowData toDeleteKey : toDeleteSortKeys) {
+            buffer.removeAll(toDeleteKey);
+        }
+    }
+    /** Handles {@code input} without rank numbers; evicts the last element once the buffer exceeds rankEnd. */
+    public void processElementWithoutRowNumber(
+            TopNBuffer buffer, RowData input, long rankEnd, Collector<RowData> 
out)
+            throws Exception {
+        // remove retired element
+        if (buffer.getCurrentTopNum() > rankEnd) {
+            Map.Entry<RowData, Collection<RowData>> lastEntry = 
buffer.lastEntry();
+            RowData lastKey = lastEntry.getKey();
+            Collection<RowData> lastList = lastEntry.getValue();
+            RowData lastElement = buffer.lastElement();
+            int size = lastList.size();
+            // remove last one
+            if (size <= 1) {
+                buffer.removeAll(lastKey);
+                removeFromState(lastKey);
+            } else {
+                buffer.removeLast();
+                // last element has been removed from lastList, we have to 
copy a new collection
+                // for lastList to avoid mutating state values, see 
CopyOnWriteStateMap,
+                // otherwise, the result might be corrupt.
+                // don't need to perform a deep copy, because RowData elements 
will not be updated
+                updateState(lastKey, new ArrayList<>(lastList));
+            }
+            if (size == 0 || input.equals(lastElement)) {
+                return;
+            } else {
+                // lastElement shouldn't be null
+                collectDelete(out, lastElement);
+            }
+        }
+        // it first appears in the TopN, send INSERT message
+        collectInsert(out, input);
+    }
+    // State hooks: implementations mirror the buffer removals/updates above into their state.
+    protected abstract void removeFromState(RowData key) throws Exception;
+
+    protected abstract void updateState(RowData key, List<RowData> value) 
throws Exception;
+}
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/utils/FastTop1Helper.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/utils/FastTop1Helper.java
index 3ee0a381952..0d8ce3154fb 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/utils/FastTop1Helper.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/utils/FastTop1Helper.java
@@ -88,7 +88,8 @@ public abstract class FastTop1Helper extends 
AbstractTopNFunction.AbstractTopNHe
     public void processAsFirstRow(RowData input, RowData currentKey, 
Collector<RowData> out) {
         kvCache.put(currentKey, inputRowSer.copy(input));
         if (outputRankNumber) {
-            collectInsert(out, input, 1);
+            // the rank end of top-1 is always 1L
+            collectInsert(out, input, 1, 1);
         } else {
             collectInsert(out, input);
         }
@@ -106,8 +107,9 @@ public abstract class FastTop1Helper extends 
AbstractTopNFunction.AbstractTopNHe
             // Note: partition key is unique key if only top-1 is desired,
             //  thus emitting UB and UA here
             if (outputRankNumber) {
-                collectUpdateBefore(out, prevRow, 1);
-                collectUpdateAfter(out, input, 1);
+                // the rank end of top-1 is always 1L
+                collectUpdateBefore(out, prevRow, 1, 1);
+                collectUpdateAfter(out, input, 1, 1);
             } else {
                 collectUpdateBefore(out, prevRow);
                 collectUpdateAfter(out, input);
diff --git 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/AppendOnlyTopNFunctionTest.java
 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/AppendOnlyTopNFunctionTest.java
index c6800fe6e94..f124e9700e5 100644
--- 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/AppendOnlyTopNFunctionTest.java
+++ 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/AppendOnlyTopNFunctionTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.runtime.operators.rank;
 
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.table.data.RowData;
+import 
org.apache.flink.table.runtime.operators.rank.async.AsyncStateAppendOnlyTopNFunction;
 
 import org.junit.jupiter.api.TestTemplate;
 
@@ -29,7 +30,7 @@ import java.util.List;
 import static 
org.apache.flink.table.runtime.util.StreamRecordUtils.deleteRecord;
 import static 
org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord;
 
-/** Tests for {@link AppendOnlyTopNFunction}. */
+/** Tests for {@link AppendOnlyTopNFunction} and {@link 
AsyncStateAppendOnlyTopNFunction}. */
 class AppendOnlyTopNFunctionTest extends TopNFunctionTestBase {
 
     @Override
@@ -39,21 +40,34 @@ class AppendOnlyTopNFunctionTest extends 
TopNFunctionTestBase {
             boolean generateUpdateBefore,
             boolean outputRankNumber,
             boolean enableAsyncState) {
-        return new AppendOnlyTopNFunction(
-                ttlConfig,
-                inputRowType,
-                generatedSortKeyComparator,
-                sortKeySelector,
-                rankType,
-                rankRange,
-                generateUpdateBefore,
-                outputRankNumber,
-                cacheSize);
+        if (enableAsyncState) {
+            return new AsyncStateAppendOnlyTopNFunction(
+                    ttlConfig,
+                    inputRowType,
+                    generatedSortKeyComparator,
+                    sortKeySelector,
+                    rankType,
+                    rankRange,
+                    generateUpdateBefore,
+                    outputRankNumber,
+                    cacheSize);
+        } else {
+            return new AppendOnlyTopNFunction(
+                    ttlConfig,
+                    inputRowType,
+                    generatedSortKeyComparator,
+                    sortKeySelector,
+                    rankType,
+                    rankRange,
+                    generateUpdateBefore,
+                    outputRankNumber,
+                    cacheSize);
+        }
     }
 
     @Override
     boolean supportedAsyncState() {
-        return false;
+        return true; // AsyncStateAppendOnlyTopNFunction covers the async state variant
     }
 
     @TestTemplate
diff --git 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/FastTop1FunctionTest.java
 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/FastTop1FunctionTest.java
index 31205dd3ec8..90784dcfd34 100644
--- 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/FastTop1FunctionTest.java
+++ 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/FastTop1FunctionTest.java
@@ -22,10 +22,8 @@ import 
org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.table.data.RowData;
 import 
org.apache.flink.table.runtime.operators.rank.async.AsyncStateFastTop1Function;
-import 
org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
 
 import org.junit.jupiter.api.TestTemplate;
-import org.junit.jupiter.api.extension.ExtendWith;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -35,7 +33,6 @@ import static 
org.apache.flink.table.runtime.util.StreamRecordUtils.updateAfterR
 import static 
org.apache.flink.table.runtime.util.StreamRecordUtils.updateBeforeRecord;
 
 /** Tests for {@link FastTop1Function} and {@link AsyncStateFastTop1Function}. 
*/
-@ExtendWith(ParameterizedTestExtension.class)
 public class FastTop1FunctionTest extends TopNFunctionTestBase {
 
     @Override
diff --git 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/TopNFunctionTestBase.java
 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/TopNFunctionTestBase.java
index b831709ec8f..2a444117c60 100644
--- 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/TopNFunctionTestBase.java
+++ 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/TopNFunctionTestBase.java
@@ -60,7 +60,6 @@ import static 
org.apache.flink.table.runtime.util.StreamRecordUtils.updateAfterR
 import static 
org.apache.flink.table.runtime.util.StreamRecordUtils.updateBeforeRecord;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.junit.jupiter.api.Assumptions.assumeFalse;
-import static org.junit.jupiter.api.Assumptions.assumeTrue;
 
 /** Base Tests for all subclass of {@link AbstractTopNFunction}. */
 @ExtendWith(ParameterizedTestExtension.class)
@@ -155,8 +154,6 @@ abstract class TopNFunctionTestBase {
     /** RankEnd column must be long, int or short type, but could not be 
string type yet. */
     @TestTemplate
     void testInvalidVariableRankRangeWithIntType() throws Exception {
-        // rank with async state does not support variable rank range yet
-        assumeFalse(enableAsyncState);
         AbstractTopNFunction func =
                 createFunction(RankType.ROW_NUMBER, new VariableRankRange(0), 
true, false);
         OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = 
createTestHarness(func);
@@ -393,16 +390,6 @@ abstract class TopNFunctionTestBase {
                 "output wrong.", expectedOutput, testHarness.getOutput());
     }
 
-    @TestTemplate
-    void testNotSupportConstantRankRangeWithAsyncState() {
-        assumeTrue(enableAsyncState);
-        assertThatThrownBy(
-                        () ->
-                                createFunction(
-                                        RankType.ROW_NUMBER, new 
VariableRankRange(0), true, false))
-                .isInstanceOf(UnsupportedOperationException.class);
-    }
-
     OneInputStreamOperatorTestHarness<RowData, RowData> createTestHarness(
             AbstractTopNFunction rankFunction) throws Exception {
         if (enableAsyncState) {

Reply via email to