This is an automated email from the ASF dual-hosted git repository.

lincoln pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 26d342601a5 [FLINK-36837][table] Introduce RowTimeDeduplicateFunction with Async State API
26d342601a5 is described below

commit 26d342601a545293b6d9f447a033c0559ba86d14
Author: Xuyang <xyzhong...@163.com>
AuthorDate: Fri Jan 17 20:30:25 2025 +0800

    [FLINK-36837][table] Introduce RowTimeDeduplicateFunction with Async State API
    
    This closes #25728
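    
    The async-state code path is opt-in. A minimal sketch of toggling it from the Table
    API follows; the option constant comes from the diff below, while the
    TableEnvironment setup is assumed boilerplate:
    
        import org.apache.flink.table.api.EnvironmentSettings;
        import org.apache.flink.table.api.TableEnvironment;
        import org.apache.flink.table.api.config.ExecutionConfigOptions;
    
        public class AsyncStateDedupExample {
            public static void main(String[] args) {
                TableEnvironment tEnv =
                        TableEnvironment.create(EnvironmentSettings.inStreamingMode());
                // Opt supported stateful operators (now including row-time
                // deduplication) into the async state API.
                tEnv.getConfig()
                        .set(ExecutionConfigOptions.TABLE_EXEC_ASYNC_STATE_ENABLED, true);
            }
        }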
---
 .../nodes/exec/stream/StreamExecDeduplicate.java   |  36 +++-
 .../runtime/stream/sql/DeduplicateITCase.scala     |  36 +++-
 .../deduplicate/DeduplicateFunctionBase.java       |  23 +--
 .../ProcTimeDeduplicateKeepFirstRowFunction.java   |   4 +-
 .../ProcTimeDeduplicateKeepLastRowFunction.java    |   6 +-
 ...meMiniBatchDeduplicateKeepFirstRowFunction.java |   2 +-
 ...imeMiniBatchDeduplicateKeepLastRowFunction.java |   4 +-
 .../deduplicate/RowTimeDeduplicateFunction.java    |  55 +++---
 .../RowTimeMiniBatchDeduplicateFunction.java       |   6 +-
 ...meMiniBatchLatestChangeDeduplicateFunction.java |   6 +-
 ....java => SyncStateDeduplicateFunctionBase.java} |  17 +-
 .../AsyncStateDeduplicateFunctionBase.java}        |  27 ++-
 .../AsyncStateRowTimeDeduplicateFunction.java      |  85 ++++++++++
 .../{ => utils}/DeduplicateFunctionHelper.java     |  19 ++-
 .../RowTimeDeduplicateFunctionHelper.java}         |  62 +++----
 .../RowTimeDeduplicateRecordsCombiner.java         |   2 +-
 .../RowTimeDeduplicateFunctionTest.java            | 186 ++++++++++++++-------
 .../RowTimeDeduplicateFunctionTestBase.java        |  22 +--
 18 files changed, 371 insertions(+), 227 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecDeduplicate.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecDeduplicate.java
index 812ab1b5cbf..e5ec2d3518a 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecDeduplicate.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecDeduplicate.java
@@ -22,6 +22,7 @@ import org.apache.flink.FlinkVersion;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.runtime.asyncprocessing.operators.AsyncKeyedProcessOperator;
 import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.transformations.OneInputTransformation;
@@ -51,6 +52,7 @@ import org.apache.flink.table.runtime.operators.deduplicate.ProcTimeMiniBatchDed
 import org.apache.flink.table.runtime.operators.deduplicate.RowTimeDeduplicateFunction;
 import org.apache.flink.table.runtime.operators.deduplicate.RowTimeMiniBatchDeduplicateFunction;
 import org.apache.flink.table.runtime.operators.deduplicate.RowTimeMiniBatchLatestChangeDeduplicateFunction;
+import org.apache.flink.table.runtime.operators.deduplicate.asyncprocessing.AsyncStateRowTimeDeduplicateFunction;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.runtime.typeutils.TypeCheckUtils;
 import org.apache.flink.table.types.logical.RowType;
@@ -256,6 +258,10 @@ public class StreamExecDeduplicate extends ExecNodeBase<RowData>
             return config.get(TABLE_EXEC_DEDUPLICATE_MINIBATCH_COMPACT_CHANGES_ENABLED);
         }
 
+        protected boolean isAsyncStateEnabled() {
+            return config.get(ExecutionConfigOptions.TABLE_EXEC_ASYNC_STATE_ENABLED);
+        }
+
         protected long getMiniBatchSize() {
             if (isMiniBatchEnabled()) {
                 long size = config.get(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE);
@@ -332,15 +338,27 @@ public class StreamExecDeduplicate extends ExecNodeBase<RowData>
                             trigger);
                 }
             } else {
-                RowTimeDeduplicateFunction processFunction =
-                        new RowTimeDeduplicateFunction(
-                                rowTypeInfo,
-                                stateRetentionTime,
-                                rowtimeIndex,
-                                generateUpdateBefore,
-                                generateInsert(),
-                                keepLastRow);
-                return new KeyedProcessOperator<>(processFunction);
+                if (isAsyncStateEnabled()) {
+                    AsyncStateRowTimeDeduplicateFunction processFunction =
+                            new AsyncStateRowTimeDeduplicateFunction(
+                                    rowTypeInfo,
+                                    stateRetentionTime,
+                                    rowtimeIndex,
+                                    generateUpdateBefore,
+                                    generateInsert(),
+                                    keepLastRow);
+                    return new AsyncKeyedProcessOperator<>(processFunction);
+                } else {
+                    RowTimeDeduplicateFunction processFunction =
+                            new RowTimeDeduplicateFunction(
+                                    rowTypeInfo,
+                                    stateRetentionTime,
+                                    rowtimeIndex,
+                                    generateUpdateBefore,
+                                    generateInsert(),
+                                    keepLastRow);
+                    return new KeyedProcessOperator<>(processFunction);
+                }
             }
         }
     }
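
StreamExecDeduplicate is the exec node the planner emits for the ROW_NUMBER-based
deduplication pattern, so a query of roughly the following shape now runs on the
async-state operator when the option above is enabled. A hedged sketch, reusing the
tEnv configured in the snippet near the top of this message; the Orders table, its
columns, and the event-time attribute order_time are illustrative:

    tEnv.executeSql(
            "SELECT order_id, product, order_time "
                    + "FROM ("
                    + "  SELECT *, ROW_NUMBER() OVER ("
                    + "      PARTITION BY order_id ORDER BY order_time ASC) AS row_num "
                    + "  FROM Orders) "
                    + "WHERE row_num = 1");
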
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/DeduplicateITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/DeduplicateITCase.scala
index dccaf33bcb1..450270c47df 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/DeduplicateITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/DeduplicateITCase.scala
@@ -17,29 +17,31 @@
  */
 package org.apache.flink.table.planner.runtime.stream.sql
 
-import org.apache.flink.api.common.eventtime.{AscendingTimestampsWatermarks, TimestampAssigner, TimestampAssignerSupplier, WatermarkGenerator, WatermarkGeneratorSupplier, WatermarkStrategy}
+import org.apache.flink.api.common.eventtime._
 import org.apache.flink.core.testutils.EachCallbackWrapper
 import org.apache.flink.table.api._
 import org.apache.flink.table.api.bridge.scala._
 import org.apache.flink.table.api.config.ExecutionConfigOptions
 import org.apache.flink.table.planner.factories.TestValuesTableFactory
 import org.apache.flink.table.planner.runtime.utils._
-import org.apache.flink.table.planner.runtime.utils.StreamingWithMiniBatchTestBase.{MiniBatchMode, MiniBatchOn}
-import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.StateBackendMode
+import org.apache.flink.table.planner.runtime.utils.StreamingWithMiniBatchTestBase.{MiniBatchMode, MiniBatchOff, MiniBatchOn}
+import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.{HEAP_BACKEND, ROCKSDB_BACKEND, StateBackendMode}
 import org.apache.flink.table.utils.LegacyRowExtension
-import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension
+import org.apache.flink.testutils.junit.extensions.parameterized.{ParameterizedTestExtension, Parameters}
 import org.apache.flink.types.Row
 
 import org.assertj.core.api.Assertions.assertThat
 import org.assertj.core.api.Assumptions.assumeThat
-import org.junit.jupiter.api.TestTemplate
+import org.junit.jupiter.api.{BeforeEach, TestTemplate}
 import org.junit.jupiter.api.extension.{ExtendWith, RegisterExtension}
 
+import java.util
+
 import scala.collection.JavaConversions._
 import scala.collection.mutable
 
 @ExtendWith(Array(classOf[ParameterizedTestExtension]))
-class DeduplicateITCase(miniBatch: MiniBatchMode, mode: StateBackendMode)
+class DeduplicateITCase(miniBatch: MiniBatchMode, mode: StateBackendMode, enableAsyncState: Boolean)
   extends StreamingWithMiniBatchTestBase(miniBatch, mode) {
 
   @RegisterExtension private val _: EachCallbackWrapper[LegacyRowExtension] =
@@ -55,6 +57,14 @@ class DeduplicateITCase(miniBatch: MiniBatchMode, mode: StateBackendMode)
   rowtimeTestData.+=((3, 4L, "Comment#2"))
   rowtimeTestData.+=((4, 4L, "Comment#3"))
 
+  @BeforeEach
+  override def before(): Unit = {
+    super.before()
+    tEnv.getConfig.set(
+      ExecutionConfigOptions.TABLE_EXEC_ASYNC_STATE_ENABLED,
+      Boolean.box(enableAsyncState))
+  }
+
   @TestTemplate
   def testFirstRowOnProctime(): Unit = {
     val t = failingDataSource(TestData.tupleData3)
@@ -425,3 +435,17 @@ class RowtimeExtractor extends WatermarkStrategy[(Int, Long, String)] {
     (e: (Int, Long, String), _: Long) => e._2
   }
 }
+
+object DeduplicateITCase {
+
+  @Parameters(name = "{0}, StateBackend={1}, EnableAsyncState={2}")
+  def parameters(): util.Collection[Array[java.lang.Object]] = {
+    Seq[Array[AnyRef]](
+      Array(MiniBatchOff, HEAP_BACKEND, Boolean.box(false)),
+      Array(MiniBatchOff, ROCKSDB_BACKEND, Boolean.box(false)),
+      Array(MiniBatchOn, HEAP_BACKEND, Boolean.box(false)),
+      Array(MiniBatchOn, ROCKSDB_BACKEND, Boolean.box(false)),
+      Array(MiniBatchOff, HEAP_BACKEND, Boolean.box(true))
+    )
+  }
+}
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateFunctionBase.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateFunctionBase.java
index b87a02e7d9c..d1eb00a5a56 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateFunctionBase.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateFunctionBase.java
@@ -18,16 +18,10 @@
 
 package org.apache.flink.table.runtime.operators.deduplicate;
 
-import org.apache.flink.api.common.functions.OpenContext;
-import org.apache.flink.api.common.state.StateTtlConfig;
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
 
-import static org.apache.flink.table.runtime.util.StateConfigUtil.createTtlConfig;
-
 /**
  * Base class for deduplicate function.
  *
@@ -36,7 +30,8 @@ import static org.apache.flink.table.runtime.util.StateConfigUtil.createTtlConfi
  * @param <IN> Type of the input elements.
  * @param <OUT> Type of the returned elements.
  */
-abstract class DeduplicateFunctionBase<T, K, IN, OUT> extends KeyedProcessFunction<K, IN, OUT> {
+public abstract class DeduplicateFunctionBase<T, K, IN, OUT>
+        extends KeyedProcessFunction<K, IN, OUT> {
 
     private static final long serialVersionUID = 1L;
 
@@ -44,8 +39,6 @@ abstract class DeduplicateFunctionBase<T, K, IN, OUT> extends KeyedProcessFuncti
     protected final TypeInformation<T> typeInfo;
     protected final long stateRetentionTime;
     protected final TypeSerializer<OUT> serializer;
-    // state stores previous message under the key.
-    protected ValueState<T> state;
 
     public DeduplicateFunctionBase(
             TypeInformation<T> typeInfo, TypeSerializer<OUT> serializer, long stateRetentionTime) {
@@ -53,16 +46,4 @@ abstract class DeduplicateFunctionBase<T, K, IN, OUT> extends KeyedProcessFuncti
         this.stateRetentionTime = stateRetentionTime;
         this.serializer = serializer;
     }
-
-    @Override
-    public void open(OpenContext openContext) throws Exception {
-        super.open(openContext);
-        ValueStateDescriptor<T> stateDesc =
-                new ValueStateDescriptor<>("deduplicate-state", typeInfo);
-        StateTtlConfig ttlConfig = createTtlConfig(stateRetentionTime);
-        if (ttlConfig.isEnabled()) {
-            stateDesc.enableTimeToLive(ttlConfig);
-        }
-        state = getRuntimeContext().getState(stateDesc);
-    }
 }
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeDeduplicateKeepFirstRowFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeDeduplicateKeepFirstRowFunction.java
index a01ae4a6886..d522b996540 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeDeduplicateKeepFirstRowFunction.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeDeduplicateKeepFirstRowFunction.java
@@ -22,11 +22,11 @@ import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.util.Collector;
 
-import static org.apache.flink.table.runtime.operators.deduplicate.DeduplicateFunctionHelper.processFirstRowOnProcTime;
+import static org.apache.flink.table.runtime.operators.deduplicate.utils.DeduplicateFunctionHelper.processFirstRowOnProcTime;
 
 /** This function is used to deduplicate on keys and keeps only first row. */
 public class ProcTimeDeduplicateKeepFirstRowFunction
-        extends DeduplicateFunctionBase<Boolean, RowData, RowData, RowData> {
+        extends SyncStateDeduplicateFunctionBase<Boolean, RowData, RowData, RowData> {
 
     private static final long serialVersionUID = 5865777137707602549L;
 
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeDeduplicateKeepLastRowFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeDeduplicateKeepLastRowFunction.java
index 5843f274d75..f6eaf03e584 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeDeduplicateKeepLastRowFunction.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeDeduplicateKeepLastRowFunction.java
@@ -25,12 +25,12 @@ import org.apache.flink.table.runtime.generated.RecordEqualiser;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.util.Collector;
 
-import static org.apache.flink.table.runtime.operators.deduplicate.DeduplicateFunctionHelper.processLastRowOnChangelog;
-import static org.apache.flink.table.runtime.operators.deduplicate.DeduplicateFunctionHelper.processLastRowOnProcTime;
+import static org.apache.flink.table.runtime.operators.deduplicate.utils.DeduplicateFunctionHelper.processLastRowOnChangelog;
+import static org.apache.flink.table.runtime.operators.deduplicate.utils.DeduplicateFunctionHelper.processLastRowOnProcTime;
 
 /** This function is used to deduplicate on keys and keeps only last row. */
 public class ProcTimeDeduplicateKeepLastRowFunction
-        extends DeduplicateFunctionBase<RowData, RowData, RowData, RowData> {
+        extends SyncStateDeduplicateFunctionBase<RowData, RowData, RowData, RowData> {
 
     private static final long serialVersionUID = -291348892087180350L;
     private final boolean generateUpdateBefore;
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeMiniBatchDeduplicateKeepFirstRowFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeMiniBatchDeduplicateKeepFirstRowFunction.java
index 70630ed9f04..dc6f018140e 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeMiniBatchDeduplicateKeepFirstRowFunction.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeMiniBatchDeduplicateKeepFirstRowFunction.java
@@ -27,7 +27,7 @@ import javax.annotation.Nullable;
 
 import java.util.Map;
 
-import static org.apache.flink.table.runtime.operators.deduplicate.DeduplicateFunctionHelper.processFirstRowOnProcTime;
+import static org.apache.flink.table.runtime.operators.deduplicate.utils.DeduplicateFunctionHelper.processFirstRowOnProcTime;
 
 /**
  * This function is used to get the first row for every key partition in miniBatch mode.
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeMiniBatchDeduplicateKeepLastRowFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeMiniBatchDeduplicateKeepLastRowFunction.java
index eb51008e08e..3b71620aedb 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeMiniBatchDeduplicateKeepLastRowFunction.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeMiniBatchDeduplicateKeepLastRowFunction.java
@@ -30,8 +30,8 @@ import javax.annotation.Nullable;
 
 import java.util.Map;
 
-import static org.apache.flink.table.runtime.operators.deduplicate.DeduplicateFunctionHelper.processLastRowOnChangelog;
-import static org.apache.flink.table.runtime.operators.deduplicate.DeduplicateFunctionHelper.processLastRowOnProcTime;
+import static org.apache.flink.table.runtime.operators.deduplicate.utils.DeduplicateFunctionHelper.processLastRowOnChangelog;
+import static org.apache.flink.table.runtime.operators.deduplicate.utils.DeduplicateFunctionHelper.processLastRowOnProcTime;
 
 /** This function is used to get the last row for every key partition in miniBatch mode. */
 public class ProcTimeMiniBatchDeduplicateKeepLastRowFunction
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/RowTimeDeduplicateFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/RowTimeDeduplicateFunction.java
index 2d53d023102..03f8d0238af 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/RowTimeDeduplicateFunction.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/RowTimeDeduplicateFunction.java
@@ -18,18 +18,15 @@
 
 package org.apache.flink.table.runtime.operators.deduplicate;
 
-import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.functions.OpenContext;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.operators.deduplicate.utils.RowTimeDeduplicateFunctionHelper;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.util.Collector;
 
-import static org.apache.flink.table.runtime.operators.deduplicate.DeduplicateFunctionHelper.checkInsertOnly;
-import static org.apache.flink.table.runtime.operators.deduplicate.DeduplicateFunctionHelper.isDuplicate;
-import static org.apache.flink.table.runtime.operators.deduplicate.DeduplicateFunctionHelper.updateDeduplicateResult;
-
 /** This function is used to deduplicate on keys and keeps only first or last row on row time. */
 public class RowTimeDeduplicateFunction
-        extends DeduplicateFunctionBase<RowData, RowData, RowData, RowData> {
+        extends SyncStateDeduplicateFunctionBase<RowData, RowData, RowData, RowData> {
 
     private static final long serialVersionUID = 1L;
 
@@ -38,6 +35,8 @@ public class RowTimeDeduplicateFunction
     private final int rowtimeIndex;
     private final boolean keepLastRow;
 
+    private transient SyncStateRowTimeDeduplicateFunctionHelper helper;
+
     public RowTimeDeduplicateFunction(
             InternalTypeInfo<RowData> typeInfo,
             long minRetentionTime,
@@ -52,39 +51,29 @@ public class RowTimeDeduplicateFunction
         this.keepLastRow = keepLastRow;
     }
 
+    @Override
+    public void open(OpenContext openContext) throws Exception {
+        super.open(openContext);
+
+        helper = new SyncStateRowTimeDeduplicateFunctionHelper();
+    }
+
     @Override
     public void processElement(RowData input, Context ctx, Collector<RowData> out)
             throws Exception {
-        deduplicateOnRowTime(
-                state, input, out, generateUpdateBefore, generateInsert, rowtimeIndex, keepLastRow);
+        RowData prevRow = state.value();
+        helper.deduplicateOnRowTime(input, prevRow, out);
     }
 
-    /**
-     * Processes element to deduplicate on keys with row time semantic, sends current element if it
-     * is last or first row, retracts previous element if needed.
-     *
-     * @param state state of function
-     * @param currentRow latest row received by deduplicate function
-     * @param out underlying collector
-     * @param generateUpdateBefore flag to generate UPDATE_BEFORE message or not
-     * @param generateInsert flag to gennerate INSERT message or not
-     * @param rowtimeIndex the index of rowtime field
-     * @param keepLastRow flag to keep last row or keep first row
-     */
-    public static void deduplicateOnRowTime(
-            ValueState<RowData> state,
-            RowData currentRow,
-            Collector<RowData> out,
-            boolean generateUpdateBefore,
-            boolean generateInsert,
-            int rowtimeIndex,
-            boolean keepLastRow)
-            throws Exception {
-        checkInsertOnly(currentRow);
-        RowData preRow = state.value();
+    private class SyncStateRowTimeDeduplicateFunctionHelper
+            extends RowTimeDeduplicateFunctionHelper {
+
+        public SyncStateRowTimeDeduplicateFunctionHelper() {
+            super(generateUpdateBefore, generateInsert, rowtimeIndex, keepLastRow);
+        }
 
-        if (isDuplicate(preRow, currentRow, rowtimeIndex, keepLastRow)) {
-            updateDeduplicateResult(generateUpdateBefore, generateInsert, preRow, currentRow, out);
+        @Override
+        protected void updateState(RowData currentRow) throws Exception {
             state.update(currentRow);
         }
     }
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/RowTimeMiniBatchDeduplicateFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/RowTimeMiniBatchDeduplicateFunction.java
index 73d8f28335a..6b342e8a9fc 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/RowTimeMiniBatchDeduplicateFunction.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/RowTimeMiniBatchDeduplicateFunction.java
@@ -30,9 +30,9 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
-import static org.apache.flink.table.runtime.operators.deduplicate.DeduplicateFunctionHelper.checkInsertOnly;
-import static org.apache.flink.table.runtime.operators.deduplicate.DeduplicateFunctionHelper.isDuplicate;
-import static org.apache.flink.table.runtime.operators.deduplicate.DeduplicateFunctionHelper.updateDeduplicateResult;
+import static org.apache.flink.table.runtime.operators.deduplicate.utils.DeduplicateFunctionHelper.checkInsertOnly;
+import static org.apache.flink.table.runtime.operators.deduplicate.utils.DeduplicateFunctionHelper.isDuplicate;
+import static org.apache.flink.table.runtime.operators.deduplicate.utils.DeduplicateFunctionHelper.updateDeduplicateResult;
 
 /** This function is used to get the first or last row for every key partition in miniBatch mode. */
 public class RowTimeMiniBatchDeduplicateFunction
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/RowTimeMiniBatchLatestChangeDeduplicateFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/RowTimeMiniBatchLatestChangeDeduplicateFunction.java
index a46aed4f66a..f8d8192b006 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/RowTimeMiniBatchLatestChangeDeduplicateFunction.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/RowTimeMiniBatchLatestChangeDeduplicateFunction.java
@@ -27,9 +27,9 @@ import javax.annotation.Nullable;
 
 import java.util.Map;
 
-import static org.apache.flink.table.runtime.operators.deduplicate.DeduplicateFunctionHelper.checkInsertOnly;
-import static org.apache.flink.table.runtime.operators.deduplicate.DeduplicateFunctionHelper.isDuplicate;
-import static org.apache.flink.table.runtime.operators.deduplicate.DeduplicateFunctionHelper.updateDeduplicateResult;
+import static org.apache.flink.table.runtime.operators.deduplicate.utils.DeduplicateFunctionHelper.checkInsertOnly;
+import static org.apache.flink.table.runtime.operators.deduplicate.utils.DeduplicateFunctionHelper.isDuplicate;
+import static org.apache.flink.table.runtime.operators.deduplicate.utils.DeduplicateFunctionHelper.updateDeduplicateResult;
 
 /**
  * This function is used to get the first or last row for every key partition in miniBatch mode. But
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateFunctionBase.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/SyncStateDeduplicateFunctionBase.java
similarity index 79%
copy from flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateFunctionBase.java
copy to flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/SyncStateDeduplicateFunctionBase.java
index b87a02e7d9c..a1d05512724 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateFunctionBase.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/SyncStateDeduplicateFunctionBase.java
@@ -24,39 +24,34 @@ import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
 
 import static org.apache.flink.table.runtime.util.StateConfigUtil.createTtlConfig;
 
 /**
- * Base class for deduplicate function.
+ * Base class for deduplicate function with the sync state API.
  *
  * @param <T> Type of the value in the state.
  * @param <K> Type of the key.
  * @param <IN> Type of the input elements.
  * @param <OUT> Type of the returned elements.
  */
-abstract class DeduplicateFunctionBase<T, K, IN, OUT> extends KeyedProcessFunction<K, IN, OUT> {
+abstract class SyncStateDeduplicateFunctionBase<T, K, IN, OUT>
+        extends DeduplicateFunctionBase<T, K, IN, OUT> {
 
     private static final long serialVersionUID = 1L;
 
-    // the TypeInformation of the values in the state.
-    protected final TypeInformation<T> typeInfo;
-    protected final long stateRetentionTime;
-    protected final TypeSerializer<OUT> serializer;
     // state stores previous message under the key.
     protected ValueState<T> state;
 
-    public DeduplicateFunctionBase(
+    public SyncStateDeduplicateFunctionBase(
             TypeInformation<T> typeInfo, TypeSerializer<OUT> serializer, long stateRetentionTime) {
-        this.typeInfo = typeInfo;
-        this.stateRetentionTime = stateRetentionTime;
-        this.serializer = serializer;
+        super(typeInfo, serializer, stateRetentionTime);
     }
 
     @Override
     public void open(OpenContext openContext) throws Exception {
         super.open(openContext);
+
         ValueStateDescriptor<T> stateDesc =
                 new ValueStateDescriptor<>("deduplicate-state", typeInfo);
         StateTtlConfig ttlConfig = createTtlConfig(stateRetentionTime);
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateFunctionBase.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/asyncprocessing/AsyncStateDeduplicateFunctionBase.java
similarity index 70%
copy from flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateFunctionBase.java
copy to flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/asyncprocessing/AsyncStateDeduplicateFunctionBase.java
index b87a02e7d9c..f3af0b883d4 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateFunctionBase.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/asyncprocessing/AsyncStateDeduplicateFunctionBase.java
@@ -16,53 +16,50 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.runtime.operators.deduplicate;
+package org.apache.flink.table.runtime.operators.deduplicate.asyncprocessing;
 
 import org.apache.flink.api.common.functions.OpenContext;
 import org.apache.flink.api.common.state.StateTtlConfig;
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.state.v2.ValueState;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.runtime.state.v2.ValueStateDescriptor;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.table.runtime.operators.deduplicate.DeduplicateFunctionBase;
 
 import static org.apache.flink.table.runtime.util.StateConfigUtil.createTtlConfig;
 
 /**
- * Base class for deduplicate function.
+ * Base class for deduplicate function with the async state API.
  *
  * @param <T> Type of the value in the state.
  * @param <K> Type of the key.
  * @param <IN> Type of the input elements.
  * @param <OUT> Type of the returned elements.
  */
-abstract class DeduplicateFunctionBase<T, K, IN, OUT> extends KeyedProcessFunction<K, IN, OUT> {
+abstract class AsyncStateDeduplicateFunctionBase<T, K, IN, OUT>
+        extends DeduplicateFunctionBase<T, K, IN, OUT> {
 
     private static final long serialVersionUID = 1L;
 
-    // the TypeInformation of the values in the state.
-    protected final TypeInformation<T> typeInfo;
-    protected final long stateRetentionTime;
-    protected final TypeSerializer<OUT> serializer;
     // state stores previous message under the key.
     protected ValueState<T> state;
 
-    public DeduplicateFunctionBase(
+    public AsyncStateDeduplicateFunctionBase(
             TypeInformation<T> typeInfo, TypeSerializer<OUT> serializer, long stateRetentionTime) {
-        this.typeInfo = typeInfo;
-        this.stateRetentionTime = stateRetentionTime;
-        this.serializer = serializer;
+        super(typeInfo, serializer, stateRetentionTime);
     }
 
     @Override
     public void open(OpenContext openContext) throws Exception {
         super.open(openContext);
+
         ValueStateDescriptor<T> stateDesc =
                 new ValueStateDescriptor<>("deduplicate-state", typeInfo);
         StateTtlConfig ttlConfig = createTtlConfig(stateRetentionTime);
         if (ttlConfig.isEnabled()) {
             stateDesc.enableTimeToLive(ttlConfig);
         }
-        state = getRuntimeContext().getState(stateDesc);
+        state = ((StreamingRuntimeContext) getRuntimeContext()).getValueState(stateDesc);
     }
 }
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/asyncprocessing/AsyncStateRowTimeDeduplicateFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/asyncprocessing/AsyncStateRowTimeDeduplicateFunction.java
new file mode 100644
index 00000000000..c1f1f8bdad4
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/asyncprocessing/AsyncStateRowTimeDeduplicateFunction.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.deduplicate.asyncprocessing;
+
+import org.apache.flink.api.common.functions.OpenContext;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.operators.deduplicate.RowTimeDeduplicateFunction;
+import org.apache.flink.table.runtime.operators.deduplicate.utils.RowTimeDeduplicateFunctionHelper;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.util.Collector;
+
+/**
+ * This function is used to deduplicate on keys and keeps only first or last row on row time.
+ *
+ * <p>Different from {@link RowTimeDeduplicateFunction}, this function is based on the async state API.
+ */
+public class AsyncStateRowTimeDeduplicateFunction
+        extends AsyncStateDeduplicateFunctionBase<RowData, RowData, RowData, RowData> {
+
+    private static final long serialVersionUID = 1L;
+
+    private final boolean generateUpdateBefore;
+    private final boolean generateInsert;
+    private final int rowtimeIndex;
+    private final boolean keepLastRow;
+
+    private transient RowTimeDeduplicateFunctionHelper helper;
+
+    public AsyncStateRowTimeDeduplicateFunction(
+            InternalTypeInfo<RowData> typeInfo,
+            long minRetentionTime,
+            int rowtimeIndex,
+            boolean generateUpdateBefore,
+            boolean generateInsert,
+            boolean keepLastRow) {
+        super(typeInfo, null, minRetentionTime);
+        this.generateUpdateBefore = generateUpdateBefore;
+        this.generateInsert = generateInsert;
+        this.rowtimeIndex = rowtimeIndex;
+        this.keepLastRow = keepLastRow;
+    }
+
+    @Override
+    public void open(OpenContext openContext) throws Exception {
+        super.open(openContext);
+
+        helper = new AsyncStateRowTimeDeduplicateFunctionHelper();
+    }
+
+    @Override
+    public void processElement(RowData input, Context ctx, Collector<RowData> out)
+            throws Exception {
+        state.asyncValue().thenAccept(prevRow -> helper.deduplicateOnRowTime(input, prevRow, out));
+    }
+
+    private class AsyncStateRowTimeDeduplicateFunctionHelper
+            extends RowTimeDeduplicateFunctionHelper {
+
+        public AsyncStateRowTimeDeduplicateFunctionHelper() {
+            super(generateUpdateBefore, generateInsert, rowtimeIndex, keepLastRow);
+        }
+
+        @Override
+        protected void updateState(RowData currentRow) throws Exception {
+            // no need to wait for this async request to complete
+            state.asyncUpdate(currentRow);
+        }
+    }
+}
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateFunctionHelper.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/utils/DeduplicateFunctionHelper.java
similarity index 94%
rename from flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateFunctionHelper.java
rename to flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/utils/DeduplicateFunctionHelper.java
index b1776c8ca74..e9c7f81b51e 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateFunctionHelper.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/utils/DeduplicateFunctionHelper.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.runtime.operators.deduplicate;
+package org.apache.flink.table.runtime.operators.deduplicate.utils;
 
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.table.data.RowData;
@@ -25,7 +25,12 @@ import org.apache.flink.types.RowKind;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.Preconditions;
 
-/** Utility for deduplicate function. */
+/**
+ * Utility for deduplicate function.
+ *
+ * <p>TODO utilize the respective helper classes that inherit from an abstract deduplicate function
+ * helper in each deduplicate function.
+ */
 public class DeduplicateFunctionHelper {
 
     /**
@@ -39,7 +44,7 @@ public class DeduplicateFunctionHelper {
      * @param isStateTtlEnabled whether state ttl is disabled
      * @param equaliser the record equaliser used to equal RowData.
      */
-    static void processLastRowOnProcTime(
+    public static void processLastRowOnProcTime(
             RowData currentRow,
             boolean generateUpdateBefore,
             boolean generateInsert,
@@ -95,7 +100,7 @@ public class DeduplicateFunctionHelper {
      * @param state state of function
      * @param out underlying collector
      */
-    static void processLastRowOnChangelog(
+    public static void processLastRowOnChangelog(
             RowData currentRow,
             boolean generateUpdateBefore,
             ValueState<RowData> state,
@@ -155,7 +160,7 @@ public class DeduplicateFunctionHelper {
      * @param state state of function
      * @param out underlying collector
      */
-    static void processFirstRowOnProcTime(
+    public static void processFirstRowOnProcTime(
             RowData currentRow, ValueState<Boolean> state, Collector<RowData> out)
             throws Exception {
 
@@ -178,7 +183,7 @@ public class DeduplicateFunctionHelper {
      * @param currentRow current row under the key which is the duplicate row
      * @param out underlying collector
      */
-    static void updateDeduplicateResult(
+    public static void updateDeduplicateResult(
             boolean generateUpdateBefore,
             boolean generateInsert,
             RowData preRow,
@@ -223,7 +228,7 @@ public class DeduplicateFunctionHelper {
     }
 
     /** check message should be insert only. */
-    static void checkInsertOnly(RowData currentRow) {
+    public static void checkInsertOnly(RowData currentRow) {
         Preconditions.checkArgument(currentRow.getRowKind() == RowKind.INSERT);
     }
 
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/RowTimeDeduplicateFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/utils/RowTimeDeduplicateFunctionHelper.java
similarity index 53%
copy from flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/RowTimeDeduplicateFunction.java
copy to flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/utils/RowTimeDeduplicateFunctionHelper.java
index 2d53d023102..56867fd4dc8 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/RowTimeDeduplicateFunction.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/utils/RowTimeDeduplicateFunctionHelper.java
@@ -16,76 +16,60 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.runtime.operators.deduplicate;
+package org.apache.flink.table.runtime.operators.deduplicate.utils;
 
-import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.runtime.operators.deduplicate.RowTimeDeduplicateFunction;
+import org.apache.flink.table.runtime.operators.deduplicate.asyncprocessing.AsyncStateRowTimeDeduplicateFunction;
 import org.apache.flink.util.Collector;
 
-import static org.apache.flink.table.runtime.operators.deduplicate.DeduplicateFunctionHelper.checkInsertOnly;
-import static org.apache.flink.table.runtime.operators.deduplicate.DeduplicateFunctionHelper.isDuplicate;
-import static org.apache.flink.table.runtime.operators.deduplicate.DeduplicateFunctionHelper.updateDeduplicateResult;
+import javax.annotation.Nullable;
 
-/** This function is used to deduplicate on keys and keeps only first or last row on row time. */
-public class RowTimeDeduplicateFunction
-        extends DeduplicateFunctionBase<RowData, RowData, RowData, RowData> {
+import static org.apache.flink.table.runtime.operators.deduplicate.utils.DeduplicateFunctionHelper.checkInsertOnly;
+import static org.apache.flink.table.runtime.operators.deduplicate.utils.DeduplicateFunctionHelper.isDuplicate;
+import static org.apache.flink.table.runtime.operators.deduplicate.utils.DeduplicateFunctionHelper.updateDeduplicateResult;
 
-    private static final long serialVersionUID = 1L;
+/**
+ * A helper to deduplicate data with row time in {@link RowTimeDeduplicateFunction} and {@link
+ * AsyncStateRowTimeDeduplicateFunction}.
+ */
+public abstract class RowTimeDeduplicateFunctionHelper {
 
     private final boolean generateUpdateBefore;
     private final boolean generateInsert;
     private final int rowtimeIndex;
     private final boolean keepLastRow;
 
-    public RowTimeDeduplicateFunction(
-            InternalTypeInfo<RowData> typeInfo,
-            long minRetentionTime,
-            int rowtimeIndex,
+    public RowTimeDeduplicateFunctionHelper(
             boolean generateUpdateBefore,
             boolean generateInsert,
+            int rowtimeIndex,
             boolean keepLastRow) {
-        super(typeInfo, null, minRetentionTime);
         this.generateUpdateBefore = generateUpdateBefore;
         this.generateInsert = generateInsert;
         this.rowtimeIndex = rowtimeIndex;
         this.keepLastRow = keepLastRow;
     }
 
-    @Override
-    public void processElement(RowData input, Context ctx, Collector<RowData> out)
-            throws Exception {
-        deduplicateOnRowTime(
-                state, input, out, generateUpdateBefore, generateInsert, rowtimeIndex, keepLastRow);
-    }
-
     /**
      * Processes element to deduplicate on keys with row time semantic, sends current element if it
      * is last or first row, retracts previous element if needed.
      *
-     * @param state state of function
      * @param currentRow latest row received by deduplicate function
+     * @param prevRow previous row received by deduplicate function. `null` if current row is the
+     *     first row
      * @param out underlying collector
-     * @param generateUpdateBefore flag to generate UPDATE_BEFORE message or not
-     * @param generateInsert flag to gennerate INSERT message or not
-     * @param rowtimeIndex the index of rowtime field
-     * @param keepLastRow flag to keep last row or keep first row
      */
-    public static void deduplicateOnRowTime(
-            ValueState<RowData> state,
-            RowData currentRow,
-            Collector<RowData> out,
-            boolean generateUpdateBefore,
-            boolean generateInsert,
-            int rowtimeIndex,
-            boolean keepLastRow)
+    public void deduplicateOnRowTime(
+            RowData currentRow, @Nullable RowData prevRow, Collector<RowData> out)
             throws Exception {
         checkInsertOnly(currentRow);
-        RowData preRow = state.value();
 
-        if (isDuplicate(preRow, currentRow, rowtimeIndex, keepLastRow)) {
-            updateDeduplicateResult(generateUpdateBefore, generateInsert, preRow, currentRow, out);
-            state.update(currentRow);
+        if (isDuplicate(prevRow, currentRow, rowtimeIndex, keepLastRow)) {
+            updateDeduplicateResult(generateUpdateBefore, generateInsert, prevRow, currentRow, out);
+            updateState(currentRow);
         }
     }
+
+    protected abstract void updateState(RowData currentRow) throws Exception;
 }
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/window/combines/RowTimeDeduplicateRecordsCombiner.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/window/combines/RowTimeDeduplicateRecordsCombiner.java
index ebb91688791..229c150cca2 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/window/combines/RowTimeDeduplicateRecordsCombiner.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/window/combines/RowTimeDeduplicateRecordsCombiner.java
@@ -32,7 +32,7 @@ import org.apache.flink.table.runtime.util.WindowKey;
 import java.util.Iterator;
 
 import static org.apache.flink.table.data.util.RowDataUtil.isAccumulateMsg;
-import static org.apache.flink.table.runtime.operators.deduplicate.DeduplicateFunctionHelper.isDuplicate;
+import static org.apache.flink.table.runtime.operators.deduplicate.utils.DeduplicateFunctionHelper.isDuplicate;
 
 /**
  * An implementation of {@link RecordsCombiner} that stores the first/last records of incremental
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/RowTimeDeduplicateFunctionTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/RowTimeDeduplicateFunctionTest.java
index fc56f5d5e8a..cefe67157b2 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/RowTimeDeduplicateFunctionTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/RowTimeDeduplicateFunctionTest.java
@@ -18,13 +18,16 @@
 
 package org.apache.flink.table.runtime.operators.deduplicate;
 
+import org.apache.flink.runtime.asyncprocessing.operators.AsyncKeyedProcessOperator;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.runtime.operators.bundle.KeyedMapBundleOperator;
 import org.apache.flink.table.runtime.operators.bundle.trigger.CountBundleTrigger;
+import org.apache.flink.table.runtime.operators.deduplicate.asyncprocessing.AsyncStateRowTimeDeduplicateFunction;
 import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
 import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
 import org.apache.flink.types.RowKind;
@@ -41,21 +44,26 @@ import static org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord
 import static org.apache.flink.table.runtime.util.StreamRecordUtils.record;
 
 /**
- * Harness tests for {@link RowTimeDeduplicateFunction} and {@link
- * RowTimeMiniBatchDeduplicateFunction}.
+ * Harness tests for {@link RowTimeDeduplicateFunction}, {@link
+ * AsyncStateRowTimeDeduplicateFunction} and {@link RowTimeMiniBatchDeduplicateFunction}.
  */
 @ExtendWith(ParameterizedTestExtension.class)
 class RowTimeDeduplicateFunctionTest extends RowTimeDeduplicateFunctionTestBase {
 
     private final boolean miniBatchEnable;
+    private final boolean enableAsyncState;
 
-    RowTimeDeduplicateFunctionTest(boolean miniBacthEnable) {
-        this.miniBatchEnable = miniBacthEnable;
+    RowTimeDeduplicateFunctionTest(boolean miniBatchEnable, boolean enableAsyncState) {
+        this.enableAsyncState = enableAsyncState;
+        this.miniBatchEnable = miniBatchEnable;
     }
 
-    @Parameters(name = "miniBatchEnable = {0}")
+    @Parameters(name = "miniBatchEnable = {0}, enableAsyncState = {1}")
     private static Collection<Boolean[]> runMode() {
-        return Arrays.asList(new Boolean[] {false}, new Boolean[] {true});
+        return Arrays.asList(
+                new Boolean[] {false, false},
+                new Boolean[] {false, true},
+                new Boolean[] {true, false});
     }
 
     @TestTemplate
@@ -66,8 +74,11 @@ class RowTimeDeduplicateFunctionTest extends RowTimeDeduplicateFunctionTestBase
         expectedOutput.add(new Watermark(102));
         expectedOutput.add(record(RowKind.INSERT, "key3", 5, 299L));
         expectedOutput.add(new Watermark(302));
-        expectedOutput.add(record(RowKind.INSERT, "key1", 12, 400L));
-        expectedOutput.add(record(RowKind.INSERT, "key2", 11, 401L));
+        // async state does not support setting TTL yet
+        if (!enableAsyncState) {
+            expectedOutput.add(record(RowKind.INSERT, "key1", 12, 400L));
+            expectedOutput.add(record(RowKind.INSERT, "key2", 11, 401L));
+        }
         expectedOutput.add(new Watermark(402));
 
         // generateUpdateBefore: true, generateInsert: true
@@ -86,8 +97,11 @@ class RowTimeDeduplicateFunctionTest extends RowTimeDeduplicateFunctionTestBase
         expectedOutput.add(new Watermark(102));
         expectedOutput.add(record(RowKind.UPDATE_AFTER, "key3", 5, 299L));
         expectedOutput.add(new Watermark(302));
-        expectedOutput.add(record(RowKind.UPDATE_AFTER, "key1", 12, 400L));
-        expectedOutput.add(record(RowKind.UPDATE_AFTER, "key2", 11, 401L));
+        // async state does not support setting TTL yet
+        if (!enableAsyncState) {
+            expectedOutput.add(record(RowKind.UPDATE_AFTER, "key1", 12, 400L));
+            expectedOutput.add(record(RowKind.UPDATE_AFTER, "key2", 11, 401L));
+        }
         expectedOutput.add(new Watermark(402));
         testRowTimeDeduplicateKeepFirstRow(false, false, expectedOutput);
     }
@@ -106,8 +120,17 @@ class RowTimeDeduplicateFunctionTest extends RowTimeDeduplicateFunctionTestBase
         expectedOutput.add(record(RowKind.UPDATE_AFTER, "key2", 11, 301L));
         expectedOutput.add(record(RowKind.INSERT, "key3", 5, 299L));
         expectedOutput.add(new Watermark(302));
-        expectedOutput.add(record(RowKind.INSERT, "key1", 12, 400L));
-        expectedOutput.add(record(RowKind.INSERT, "key2", 11, 401L));
+        if (enableAsyncState) {
+            // async state does not support setting TTL yet
+            expectedOutput.add(record(RowKind.UPDATE_BEFORE, "key1", 12, 300L));
+            expectedOutput.add(record(RowKind.UPDATE_AFTER, "key1", 12, 400L));
+            expectedOutput.add(record(RowKind.UPDATE_BEFORE, "key2", 11, 301L));
+            expectedOutput.add(record(RowKind.UPDATE_AFTER, "key2", 11, 401L));
+        } else {
+            expectedOutput.add(record(RowKind.INSERT, "key1", 12, 400L));
+            expectedOutput.add(record(RowKind.INSERT, "key2", 11, 401L));
+        }
+
         expectedOutput.add(new Watermark(402));
 
         // generateUpdateBefore: true, generateInsert: true
@@ -126,8 +149,14 @@ class RowTimeDeduplicateFunctionTest extends RowTimeDeduplicateFunctionTestBase
         expectedOutput.add(record(RowKind.UPDATE_AFTER, "key2", 11, 301L));
         expectedOutput.add(record(RowKind.INSERT, "key3", 5, 299L));
         expectedOutput.add(new Watermark(302));
-        expectedOutput.add(record(RowKind.INSERT, "key1", 12, 400L));
-        expectedOutput.add(record(RowKind.INSERT, "key2", 11, 401L));
+        if (enableAsyncState) {
+            // async state does not support setting TTL yet
+            expectedOutput.add(record(RowKind.UPDATE_AFTER, "key1", 12, 400L));
+            expectedOutput.add(record(RowKind.UPDATE_AFTER, "key2", 11, 401L));
+        } else {
+            expectedOutput.add(record(RowKind.INSERT, "key1", 12, 400L));
+            expectedOutput.add(record(RowKind.INSERT, "key2", 11, 401L));
+        }
         expectedOutput.add(new Watermark(402));
         testRowTimeDeduplicateKeepLastRow(false, true, expectedOutput);
 
@@ -153,30 +182,47 @@ class RowTimeDeduplicateFunctionTest extends RowTimeDeduplicateFunctionTestBase
         final boolean keepLastRow = false;
         OneInputStreamOperatorTestHarness<RowData, RowData> testHarness;
         KeyedMapBundleOperator<RowData, RowData, RowData, RowData> keyedMapBundleOperator = null;
-        KeyedProcessOperator keyedProcessOperator = null;
+        OneInputStreamOperator<RowData, RowData> keyedProcessOperator = null;
         if (miniBatchEnable) {
-            RowTimeMiniBatchDeduplicateFunction func =
-                    new RowTimeMiniBatchDeduplicateFunction(
-                            inputRowType,
-                            serializer,
-                            minTtlTime.toMillis(),
-                            rowTimeIndex,
-                            generateUpdateBefore,
-                            generateInsert,
-                            keepLastRow);
-            CountBundleTrigger trigger = new CountBundleTrigger<RowData>(miniBatchSize);
-            keyedMapBundleOperator = new KeyedMapBundleOperator(func, trigger);
-            testHarness = createTestHarness(keyedMapBundleOperator);
+            if (enableAsyncState) {
+                throw new UnsupportedOperationException(
+                        "Mini-batch deduplicate op does not support the async state API");
+            } else {
+                RowTimeMiniBatchDeduplicateFunction func =
+                        new RowTimeMiniBatchDeduplicateFunction(
+                                inputRowType,
+                                serializer,
+                                minTtlTime.toMillis(),
+                                rowTimeIndex,
+                                generateUpdateBefore,
+                                generateInsert,
+                                keepLastRow);
+                CountBundleTrigger<RowData> trigger = new CountBundleTrigger<>(miniBatchSize);
+                keyedMapBundleOperator = new KeyedMapBundleOperator(func, trigger);
+                testHarness = createTestHarness(keyedMapBundleOperator);
+            }
         } else {
-            RowTimeDeduplicateFunction func =
-                    new RowTimeDeduplicateFunction(
-                            inputRowType,
-                            minTtlTime.toMillis(),
-                            rowTimeIndex,
-                            generateUpdateBefore,
-                            generateInsert,
-                            keepLastRow);
-            keyedProcessOperator = new KeyedProcessOperator<>(func);
+            if (enableAsyncState) {
+                AsyncStateRowTimeDeduplicateFunction func =
+                        new AsyncStateRowTimeDeduplicateFunction(
+                                inputRowType,
+                                minTtlTime.toMillis(),
+                                rowTimeIndex,
+                                generateUpdateBefore,
+                                generateInsert,
+                                keepLastRow);
+                keyedProcessOperator = new AsyncKeyedProcessOperator<>(func);
+            } else {
+                RowTimeDeduplicateFunction func =
+                        new RowTimeDeduplicateFunction(
+                                inputRowType,
+                                minTtlTime.toMillis(),
+                                rowTimeIndex,
+                                generateUpdateBefore,
+                                generateInsert,
+                                keepLastRow);
+                keyedProcessOperator = new KeyedProcessOperator<>(func);
+            }
             testHarness = createTestHarness(keyedProcessOperator);
         }
 
@@ -233,30 +279,47 @@ class RowTimeDeduplicateFunctionTest extends RowTimeDeduplicateFunctionTestBase
         final boolean keepLastRow = true;
         OneInputStreamOperatorTestHarness<RowData, RowData> testHarness;
         KeyedMapBundleOperator<RowData, RowData, RowData, RowData> keyedMapBundleOperator = null;
-        KeyedProcessOperator keyedProcessOperator = null;
+        OneInputStreamOperator<RowData, RowData> keyedProcessOperator = null;
         if (miniBatchEnable) {
-            RowTimeMiniBatchDeduplicateFunction func =
-                    new RowTimeMiniBatchDeduplicateFunction(
-                            inputRowType,
-                            serializer,
-                            minTtlTime.toMillis(),
-                            rowTimeIndex,
-                            generateUpdateBefore,
-                            generateInsert,
-                            keepLastRow);
-            CountBundleTrigger trigger = new CountBundleTrigger<RowData>(miniBatchSize);
-            keyedMapBundleOperator = new KeyedMapBundleOperator(func, trigger);
-            testHarness = createTestHarness(keyedMapBundleOperator);
+            if (enableAsyncState) {
+                throw new UnsupportedOperationException(
+                        "Mini-batch deduplicate op is not supported async 
state api");
+            } else {
+                RowTimeMiniBatchDeduplicateFunction func =
+                        new RowTimeMiniBatchDeduplicateFunction(
+                                inputRowType,
+                                serializer,
+                                minTtlTime.toMillis(),
+                                rowTimeIndex,
+                                generateUpdateBefore,
+                                generateInsert,
+                                keepLastRow);
+                CountBundleTrigger<RowData> trigger = new CountBundleTrigger<>(miniBatchSize);
+                keyedMapBundleOperator = new KeyedMapBundleOperator(func, trigger);
+                testHarness = createTestHarness(keyedMapBundleOperator);
+            }
         } else {
-            RowTimeDeduplicateFunction func =
-                    new RowTimeDeduplicateFunction(
-                            inputRowType,
-                            minTtlTime.toMillis(),
-                            rowTimeIndex,
-                            generateUpdateBefore,
-                            generateInsert,
-                            true);
-            keyedProcessOperator = new KeyedProcessOperator<>(func);
+            if (enableAsyncState) {
+                AsyncStateRowTimeDeduplicateFunction func =
+                        new AsyncStateRowTimeDeduplicateFunction(
+                                inputRowType,
+                                minTtlTime.toMillis(),
+                                rowTimeIndex,
+                                generateUpdateBefore,
+                                generateInsert,
+                                true);
+                keyedProcessOperator = new AsyncKeyedProcessOperator<>(func);
+            } else {
+                RowTimeDeduplicateFunction func =
+                        new RowTimeDeduplicateFunction(
+                                inputRowType,
+                                minTtlTime.toMillis(),
+                                rowTimeIndex,
+                                generateUpdateBefore,
+                                generateInsert,
+                                true);
+                keyedProcessOperator = new KeyedProcessOperator<>(func);
+            }
             testHarness = createTestHarness(keyedProcessOperator);
         }
 
@@ -298,8 +361,11 @@ class RowTimeDeduplicateFunctionTest extends RowTimeDeduplicateFunctionTestBase
         testHarness.processElement(insertRecord("key2", 11, 401L));
         testHarness.processWatermark(402);
 
-        // all state has expired, so the record ("key1", 12, 400L), ("key2", 12, 401L) will be
-        // INSERT message
+        // with the sync state API all state has expired, so the records ("key1", 12, 400L) and
+        // ("key2", 11, 401L) will be emitted as INSERT messages;
+        // with the async state API the state does not expire, because the async state API does
+        // not support setting TTL, so the records ("key1", 12, 400L) and ("key2", 11, 401L) will
+        // be emitted as UPDATE_AFTER messages
         actualOutput.addAll(testHarness.getOutput());
 
         assertor.assertOutputEqualsSorted("output wrong.", expectedOutput, actualOutput);
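
For readers skimming the patch: the test above now picks the operator according to whether the
async state API is enabled. A minimal sketch of that wiring, using only names that appear in the
diff (fields such as inputRowType, minTtlTime, rowTimeIndex, generateUpdateBefore, generateInsert
and keepLastRow come from RowTimeDeduplicateFunctionTestBase and the test methods); illustrative
only, not part of the patch:

    // Sketch: choose the sync or async row-time deduplicate operator for the test harness.
    OneInputStreamOperator<RowData, RowData> operator;
    if (enableAsyncState) {
        // Async state API variant; note that the configured TTL is not applied here.
        operator =
                new AsyncKeyedProcessOperator<>(
                        new AsyncStateRowTimeDeduplicateFunction(
                                inputRowType,
                                minTtlTime.toMillis(),
                                rowTimeIndex,
                                generateUpdateBefore,
                                generateInsert,
                                keepLastRow));
    } else {
        operator =
                new KeyedProcessOperator<>(
                        new RowTimeDeduplicateFunction(
                                inputRowType,
                                minTtlTime.toMillis(),
                                rowTimeIndex,
                                generateUpdateBefore,
                                generateInsert,
                                keepLastRow));
    }
    OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = createTestHarness(operator);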
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/RowTimeDeduplicateFunctionTestBase.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/RowTimeDeduplicateFunctionTestBase.java
index 777c4269521..988019d4a99 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/RowTimeDeduplicateFunctionTestBase.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/RowTimeDeduplicateFunctionTestBase.java
@@ -19,12 +19,13 @@
 package org.apache.flink.table.runtime.operators.deduplicate;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.operators.asyncprocessing.AsyncStateProcessingOperator;
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.asyncprocessing.AsyncKeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
-import org.apache.flink.table.runtime.operators.bundle.KeyedMapBundleOperator;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.runtime.util.GenericRowRecordSortComparator;
 import org.apache.flink.table.runtime.util.RowDataHarnessAssertor;
@@ -55,14 +56,13 @@ abstract class RowTimeDeduplicateFunctionTestBase {
                             rowKeyIndex, inputRowType.toRowFieldTypes()[rowKeyIndex]));
 
     protected OneInputStreamOperatorTestHarness<RowData, RowData> createTestHarness(
-            KeyedProcessOperator<RowData, RowData, RowData> operator) throws Exception {
-        return new KeyedOneInputStreamOperatorTestHarness<>(
-                operator, rowKeySelector, rowKeySelector.getProducedType());
-    }
-
-    protected OneInputStreamOperatorTestHarness<RowData, RowData> createTestHarness(
-            KeyedMapBundleOperator<RowData, RowData, RowData, RowData> operator) throws Exception {
-        return new KeyedOneInputStreamOperatorTestHarness<>(
-                operator, rowKeySelector, rowKeySelector.getProducedType());
+            OneInputStreamOperator<RowData, RowData> operator) throws Exception {
+        if (operator instanceof AsyncStateProcessingOperator) {
+            return AsyncKeyedOneInputStreamOperatorTestHarness.create(
+                    operator, rowKeySelector, rowKeySelector.getProducedType());
+        } else {
+            return new KeyedOneInputStreamOperatorTestHarness<>(
+                    operator, rowKeySelector, rowKeySelector.getProducedType());
+        }
     }
 }
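
Usage note: the single createTestHarness(OneInputStreamOperator) helper above replaces the two
previous overloads and dispatches on whether the operator implements AsyncStateProcessingOperator.
A rough sketch of how a test drives either harness through it (names and calls taken from the
diffs above; illustrative only, not part of the patch):

    // Sketch: both operator flavors are exercised through the same harness API.
    OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
            createTestHarness(keyedProcessOperator); // sync or async operator
    testHarness.open();
    testHarness.processElement(insertRecord("key1", 12, 400L));
    testHarness.processWatermark(402);
    // collect the emitted records and compare them, as the test does
    List<Object> actualOutput = new ArrayList<>(testHarness.getOutput());
    assertor.assertOutputEqualsSorted("output wrong.", expectedOutput, actualOutput);
    testHarness.close();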
