This is an automated email from the ASF dual-hosted git repository.

lincoln pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 8ada0ba123e [FLINK-36926][table] Introduce window join operator with 
async state api
8ada0ba123e is described below

commit 8ada0ba123e99a408e7bafc8bcf90057d38d20b7
Author: Xuyang <xyzhong...@163.com>
AuthorDate: Tue Jan 14 22:20:13 2025 +0800

    [FLINK-36926][table] Introduce window join operator with async state api
    
    This closes #25815
---
 .../v2/adaptor/AsyncKeyedStateBackendAdaptor.java  |   6 +
 .../KeyedTwoInputStreamOperatorTestHarness.java    |   5 +-
 ...syncKeyedTwoInputStreamOperatorTestHarness.java |  56 ++-
 .../nodes/exec/stream/StreamExecWindowJoin.java    |  13 +-
 .../runtime/stream/sql/WindowJoinITCase.scala      |  19 +-
 .../operators/AsyncStateTableStreamOperator.java   | 122 ++++++
 .../operators/join/window/WindowJoinOperator.java  | 440 ++-------------------
 .../join/window/WindowJoinOperatorBuilder.java     |  90 ++---
 .../AsyncStateWindowJoinOperator.java              | 277 +++++++++++++
 .../join/window/utils/WindowJoinHelper.java        | 399 +++++++++++++++++++
 .../asyncprocessing/state/WindowAsyncState.java    |  33 ++
 .../state/WindowListAsyncState.java                |  61 +++
 .../join/window/WindowJoinOperatorTest.java        |  35 +-
 13 files changed, 1062 insertions(+), 494 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/adaptor/AsyncKeyedStateBackendAdaptor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/adaptor/AsyncKeyedStateBackendAdaptor.java
index dae4a959d8d..87cffb259f7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/adaptor/AsyncKeyedStateBackendAdaptor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/adaptor/AsyncKeyedStateBackendAdaptor.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.state.v2.adaptor;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.state.CheckpointListener;
 import org.apache.flink.api.common.state.InternalCheckpointListener;
 import org.apache.flink.api.common.state.v2.State;
@@ -197,4 +198,9 @@ public class AsyncKeyedStateBackendAdaptor<K> implements 
AsyncKeyedStateBackend<
     public boolean isSafeToReuseKVState() {
         return keyedStateBackend.isSafeToReuseKVState();
     }
+
+    @VisibleForTesting
+    public CheckpointableKeyedStateBackend<K> getKeyedStateBackend() {
+        return keyedStateBackend;
+    }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java
 
b/flink-runtime/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java
index b2966f36179..04db4fc6b05 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java
@@ -69,7 +69,10 @@ public class KeyedTwoInputStreamOperatorTestHarness<K, IN1, 
IN2, OUT>
         if (keyedStateBackend instanceof HeapKeyedStateBackend) {
             return ((HeapKeyedStateBackend) 
keyedStateBackend).numKeyValueStateEntries();
         } else {
-            throw new UnsupportedOperationException();
+            throw new UnsupportedOperationException(
+                    String.format(
+                            "Unsupported keyed state backend: %s",
+                            keyedStateBackend.getClass().getCanonicalName()));
         }
     }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncKeyedTwoInputStreamOperatorTestHarness.java
 
b/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncKeyedTwoInputStreamOperatorTestHarness.java
index 687ac8dce07..17eacf983dd 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncKeyedTwoInputStreamOperatorTestHarness.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/streaming/util/asyncprocessing/AsyncKeyedTwoInputStreamOperatorTestHarness.java
@@ -18,10 +18,13 @@
 
 package org.apache.flink.streaming.util.asyncprocessing;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.api.java.functions.KeySelector;
+import 
org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateStreamOperator;
+import org.apache.flink.runtime.state.AsyncKeyedStateBackend;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
+import org.apache.flink.runtime.state.v2.adaptor.AsyncKeyedStateBackendAdaptor;
 import org.apache.flink.streaming.api.operators.BoundedMultiInput;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
@@ -31,7 +34,7 @@ import 
org.apache.flink.streaming.runtime.operators.asyncprocessing.AsyncStatePr
 import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
-import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.function.FunctionWithException;
 import org.apache.flink.util.function.RunnableWithException;
@@ -53,7 +56,7 @@ import static org.assertj.core.api.Assertions.fail;
  * async processing, please use methods of test harness instead of operator.
  */
 public class AsyncKeyedTwoInputStreamOperatorTestHarness<K, IN1, IN2, OUT>
-        extends TwoInputStreamOperatorTestHarness<IN1, IN2, OUT> {
+        extends KeyedTwoInputStreamOperatorTestHarness<K, IN1, IN2, OUT> {
 
     private final TwoInputStreamOperator<IN1, IN2, OUT> twoInputOperator;
 
@@ -128,15 +131,14 @@ public class 
AsyncKeyedTwoInputStreamOperatorTestHarness<K, IN1, IN2, OUT>
             int numSubtasks,
             int subtaskIndex)
             throws Exception {
-        super(operator, maxParallelism, numSubtasks, subtaskIndex);
-
-        ClosureCleaner.clean(keySelector1, 
ExecutionConfig.ClosureCleanerLevel.RECURSIVE, false);
-        ClosureCleaner.clean(keySelector2, 
ExecutionConfig.ClosureCleanerLevel.RECURSIVE, false);
-        config.setStatePartitioner(0, keySelector1);
-        config.setStatePartitioner(1, keySelector2);
-        config.setStateKeySerializer(
-                
keyType.createSerializer(executionConfig.getSerializerConfig()));
-        config.serializeAllConfigs();
+        super(
+                operator,
+                keySelector1,
+                keySelector2,
+                keyType,
+                maxParallelism,
+                numSubtasks,
+                subtaskIndex);
 
         Preconditions.checkState(
                 operator instanceof AsyncStateProcessingOperator,
@@ -239,6 +241,34 @@ public class 
AsyncKeyedTwoInputStreamOperatorTestHarness<K, IN1, IN2, OUT>
         executor.shutdown();
     }
 
+    @Override
+    public int numKeyedStateEntries() {
+        AbstractAsyncStateStreamOperator<OUT> asyncOp =
+                (AbstractAsyncStateStreamOperator<OUT>) operator;
+        AsyncKeyedStateBackend<Object> asyncKeyedStateBackend = 
asyncOp.getAsyncKeyedStateBackend();
+        KeyedStateBackend<?> keyedStateBackend;
+        if (asyncKeyedStateBackend instanceof AsyncKeyedStateBackendAdaptor) {
+            keyedStateBackend =
+                    ((AsyncKeyedStateBackendAdaptor<?>) asyncKeyedStateBackend)
+                            .getKeyedStateBackend();
+
+        } else {
+            throw new UnsupportedOperationException(
+                    String.format(
+                            "Unsupported async keyed state backend: %s",
+                            
asyncKeyedStateBackend.getClass().getCanonicalName()));
+        }
+
+        if (keyedStateBackend instanceof HeapKeyedStateBackend) {
+            return ((HeapKeyedStateBackend) 
keyedStateBackend).numKeyValueStateEntries();
+        } else {
+            throw new UnsupportedOperationException(
+                    String.format(
+                            "Unsupported keyed state backend: %s",
+                            keyedStateBackend.getClass().getCanonicalName()));
+        }
+    }
+
     private void executeAndGet(RunnableWithException runnable) throws 
Exception {
         try {
             execute(
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowJoin.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowJoin.java
index 3f0bc5be8ea..3f4b94a18a2 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowJoin.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowJoin.java
@@ -22,8 +22,10 @@ package 
org.apache.flink.table.planner.plan.nodes.exec.stream;
 import org.apache.flink.FlinkVersion;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
 import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.planner.delegation.PlannerBase;
 import 
org.apache.flink.table.planner.plan.logical.WindowAttachedWindowingStrategy;
@@ -42,7 +44,6 @@ import 
org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
 import org.apache.flink.table.planner.utils.TableConfigUtils;
 import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
 import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
-import org.apache.flink.table.runtime.operators.join.window.WindowJoinOperator;
 import 
org.apache.flink.table.runtime.operators.join.window.WindowJoinOperatorBuilder;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.runtime.util.TimeWindowUtil;
@@ -172,7 +173,7 @@ public class StreamExecWindowJoin extends 
ExecNodeBase<RowData>
                 TimeWindowUtil.getShiftTimeZone(
                         leftWindowing.getTimeAttributeType(),
                         TableConfigUtils.getLocalTimeZone(config));
-        WindowJoinOperator operator =
+        WindowJoinOperatorBuilder operatorBuilder =
                 WindowJoinOperatorBuilder.builder()
                         .leftSerializer(leftTypeInfo.toRowSerializer())
                         .rightSerializer(rightTypeInfo.toRowSerializer())
@@ -181,8 +182,12 @@ public class StreamExecWindowJoin extends 
ExecNodeBase<RowData>
                         .rightWindowEndIndex(rightWindowEndIndex)
                         .filterNullKeys(joinSpec.getFilterNulls())
                         .joinType(joinSpec.getJoinType())
-                        .withShiftTimezone(shiftTimeZone)
-                        .build();
+                        .withShiftTimezone(shiftTimeZone);
+        if (config.get(ExecutionConfigOptions.TABLE_EXEC_ASYNC_STATE_ENABLED)) 
{
+            operatorBuilder.enableAsyncState();
+        }
+
+        TwoInputStreamOperator<RowData, RowData, RowData> operator = 
operatorBuilder.build();
 
         final RowType returnType = (RowType) getOutputType();
         final TwoInputTransformation<RowData, RowData, RowData> transform =
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowJoinITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowJoinITCase.scala
index 163fd343fee..416da176395 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowJoinITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowJoinITCase.scala
@@ -20,6 +20,7 @@ package org.apache.flink.table.planner.runtime.stream.sql
 import org.apache.flink.configuration.{Configuration, RestartStrategyOptions}
 import org.apache.flink.core.execution.CheckpointingMode
 import org.apache.flink.table.api.bridge.scala._
+import org.apache.flink.table.api.config.ExecutionConfigOptions
 import org.apache.flink.table.planner.factories.TestValuesTableFactory
 import org.apache.flink.table.planner.runtime.utils.{FailingCollectionSource, 
StreamingWithStateTestBase, TestData, TestingAppendSink}
 import 
org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.{HEAP_BACKEND,
 ROCKSDB_BACKEND, StateBackendMode}
@@ -35,7 +36,7 @@ import java.util
 import scala.collection.JavaConversions._
 
 @ExtendWith(Array(classOf[ParameterizedTestExtension]))
-class WindowJoinITCase(mode: StateBackendMode, useTimestampLtz: Boolean)
+class WindowJoinITCase(mode: StateBackendMode, useTimestampLtz: Boolean, 
enableAsyncState: Boolean)
   extends StreamingWithStateTestBase(mode) {
 
   val SHANGHAI_ZONE = ZoneId.of("Asia/Shanghai")
@@ -55,6 +56,10 @@ class WindowJoinITCase(mode: StateBackendMode, 
useTimestampLtz: Boolean)
     env.configure(configuration, Thread.currentThread.getContextClassLoader)
     FailingCollectionSource.reset()
 
+    tEnv.getConfig.set(
+      ExecutionConfigOptions.TABLE_EXEC_ASYNC_STATE_ENABLED,
+      Boolean.box(enableAsyncState))
+
     val dataId1 = 
TestValuesTableFactory.registerData(TestData.windowDataWithTimestamp)
     val dataIdWithLtz = 
TestValuesTableFactory.registerData(TestData.windowDataWithLtzInShanghai)
     tEnv.executeSql(
@@ -1159,13 +1164,15 @@ class WindowJoinITCase(mode: StateBackendMode, 
useTimestampLtz: Boolean)
 
 object WindowJoinITCase {
 
-  @Parameters(name = "StateBackend={0}, UseTimestampLtz = {1}")
+  @Parameters(name = "StateBackend={0}, UseTimestampLtz = {1}, 
EnableAsyncState = {2}")
   def parameters(): util.Collection[Array[java.lang.Object]] = {
     Seq[Array[AnyRef]](
-      Array(HEAP_BACKEND, java.lang.Boolean.TRUE),
-      Array(HEAP_BACKEND, java.lang.Boolean.FALSE),
-      Array(ROCKSDB_BACKEND, java.lang.Boolean.TRUE),
-      Array(ROCKSDB_BACKEND, java.lang.Boolean.FALSE)
+      Array(HEAP_BACKEND, java.lang.Boolean.TRUE, Boolean.box(false)),
+      Array(HEAP_BACKEND, java.lang.Boolean.FALSE, Boolean.box(false)),
+      Array(HEAP_BACKEND, java.lang.Boolean.TRUE, Boolean.box(true)),
+      Array(HEAP_BACKEND, java.lang.Boolean.FALSE, Boolean.box(true)),
+      Array(ROCKSDB_BACKEND, java.lang.Boolean.TRUE, Boolean.box(false)),
+      Array(ROCKSDB_BACKEND, java.lang.Boolean.FALSE, Boolean.box(false))
     )
   }
 }
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/AsyncStateTableStreamOperator.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/AsyncStateTableStreamOperator.java
new file mode 100644
index 00000000000..9807352470f
--- /dev/null
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/AsyncStateTableStreamOperator.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators;
+
+import 
org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateStreamOperator;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Table operator to invoke close always. This is a base class for both batch 
and stream operators
+ * without key.
+ *
+ * <p>This class is nearly identical with {@link TableStreamOperator}, but 
extending from {@link
+ * AbstractAsyncStateStreamOperator} to integrate with asynchronous state 
access.
+ */
+public abstract class AsyncStateTableStreamOperator<OUT>
+        extends AbstractAsyncStateStreamOperator<OUT> {
+
+    /** We listen to this ourselves because we don't have an {@link 
InternalTimerService}. */
+    protected long currentWatermark = Long.MIN_VALUE;
+
+    protected transient ContextImpl ctx;
+
+    @Override
+    public void open() throws Exception {
+        super.open();
+        this.ctx = new ContextImpl(getProcessingTimeService());
+    }
+
+    @Override
+    public boolean useSplittableTimers() {
+        return true;
+    }
+
+    @Override
+    public Watermark preProcessWatermark(Watermark mark) throws Exception {
+        currentWatermark = mark.getTimestamp();
+        return super.preProcessWatermark(mark);
+    }
+
+    /** Information available in an invocation of processElement. */
+    protected class ContextImpl implements TimerService {
+
+        protected final ProcessingTimeService timerService;
+
+        public StreamRecord<?> element;
+
+        ContextImpl(ProcessingTimeService timerService) {
+            this.timerService = checkNotNull(timerService);
+        }
+
+        public Long timestamp() {
+            checkState(element != null);
+
+            if (element.hasTimestamp()) {
+                return element.getTimestamp();
+            } else {
+                return null;
+            }
+        }
+
+        @Override
+        public long currentProcessingTime() {
+            return timerService.getCurrentProcessingTime();
+        }
+
+        @Override
+        public long currentWatermark() {
+            return currentWatermark;
+        }
+
+        @Override
+        public void registerProcessingTimeTimer(long time) {
+            throw new UnsupportedOperationException(
+                    "Setting timers is only supported on a keyed streams.");
+        }
+
+        @Override
+        public void registerEventTimeTimer(long time) {
+            throw new UnsupportedOperationException(
+                    "Setting timers is only supported on a keyed streams.");
+        }
+
+        @Override
+        public void deleteProcessingTimeTimer(long time) {
+            throw new UnsupportedOperationException(
+                    "Delete timers is only supported on a keyed streams.");
+        }
+
+        @Override
+        public void deleteEventTimeTimer(long time) {
+            throw new UnsupportedOperationException(
+                    "Delete timers is only supported on a keyed streams.");
+        }
+
+        public TimerService timerService() {
+            return this;
+        }
+    }
+}
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperator.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperator.java
index 710feb54a54..a087cfdada7 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperator.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperator.java
@@ -23,10 +23,6 @@ import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
-import org.apache.flink.metrics.Counter;
-import org.apache.flink.metrics.Gauge;
-import org.apache.flink.metrics.Meter;
-import org.apache.flink.metrics.MeterView;
 import org.apache.flink.runtime.state.internal.InternalListState;
 import org.apache.flink.streaming.api.operators.InternalTimer;
 import org.apache.flink.streaming.api.operators.InternalTimerService;
@@ -35,55 +31,41 @@ import 
org.apache.flink.streaming.api.operators.TimestampedCollector;
 import org.apache.flink.streaming.api.operators.Triggerable;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.util.RowDataUtil;
-import org.apache.flink.table.data.utils.JoinedRowData;
 import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
 import org.apache.flink.table.runtime.generated.JoinCondition;
 import org.apache.flink.table.runtime.operators.TableStreamOperator;
+import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
 import 
org.apache.flink.table.runtime.operators.join.JoinConditionWithNullFilters;
+import 
org.apache.flink.table.runtime.operators.join.window.utils.WindowJoinHelper;
 import 
org.apache.flink.table.runtime.operators.window.tvf.common.WindowTimerService;
 import 
org.apache.flink.table.runtime.operators.window.tvf.slicing.SlicingWindowTimerServiceImpl;
 import 
org.apache.flink.table.runtime.operators.window.tvf.state.WindowListState;
 import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
-import org.apache.flink.types.RowKind;
 
 import java.time.ZoneId;
-import java.util.IdentityHashMap;
 import java.util.List;
 
-import static org.apache.flink.table.runtime.util.TimeWindowUtil.isWindowFired;
-
 /**
- * Streaming window join operator.
+ * A streaming window join operator implemented by sync state api.
  *
- * <p>Note: currently, {@link WindowJoinOperator} doesn't support early-fire 
and late-arrival. Thus
+ * <p>Note: currently, {@link WindowJoinOperator} doesn't support early-fire 
and late-arrival. Thus,
  * late elements (elements belong to emitted windows) will be simply dropped.
  *
  * <p>Note: currently, {@link WindowJoinOperator} doesn't support DELETE or 
UPDATE_BEFORE input row.
  */
-public abstract class WindowJoinOperator extends TableStreamOperator<RowData>
+public class WindowJoinOperator extends TableStreamOperator<RowData>
         implements TwoInputStreamOperator<RowData, RowData, RowData>,
                 Triggerable<RowData, Long>,
                 KeyContext {
 
     private static final long serialVersionUID = 1L;
 
-    private static final String LEFT_LATE_ELEMENTS_DROPPED_METRIC_NAME =
-            "leftNumLateRecordsDropped";
-    private static final String LEFT_LATE_ELEMENTS_DROPPED_RATE_METRIC_NAME =
-            "leftLateRecordsDroppedRate";
-    private static final String RIGHT_LATE_ELEMENTS_DROPPED_METRIC_NAME =
-            "rightNumLateRecordsDropped";
-    private static final String RIGHT_LATE_ELEMENTS_DROPPED_RATE_METRIC_NAME =
-            "rightLateRecordsDroppedRate";
-    private static final String WATERMARK_LATENCY_METRIC_NAME = 
"watermarkLatency";
     private static final String LEFT_RECORDS_STATE_NAME = "left-records";
     private static final String RIGHT_RECORDS_STATE_NAME = "right-records";
 
-    protected final RowDataSerializer leftSerializer;
-    protected final RowDataSerializer rightSerializer;
+    private final RowDataSerializer leftSerializer;
+    private final RowDataSerializer rightSerializer;
     private final GeneratedJoinCondition generatedJoinCondition;
 
     private final int leftWindowEndIndex;
@@ -92,26 +74,19 @@ public abstract class WindowJoinOperator extends 
TableStreamOperator<RowData>
     private final boolean[] filterNullKeys;
     private final ZoneId shiftTimeZone;
 
+    private final FlinkJoinType joinType;
+
     private transient WindowTimerService<Long> windowTimerService;
 
-    // ------------------------------------------------------------------------
-    protected transient JoinConditionWithNullFilters joinCondition;
+    private transient JoinConditionWithNullFilters joinCondition;
 
     /** This is used for emitting elements with a given timestamp. */
-    protected transient TimestampedCollector<RowData> collector;
+    private transient TimestampedCollector<RowData> collector;
 
     private transient WindowListState<Long> leftWindowState;
     private transient WindowListState<Long> rightWindowState;
 
-    // ------------------------------------------------------------------------
-    // Metrics
-    // ------------------------------------------------------------------------
-
-    private transient Counter leftNumLateRecordsDropped;
-    private transient Meter leftLateRecordsDroppedRate;
-    private transient Counter rightNumLateRecordsDropped;
-    private transient Meter rightLateRecordsDroppedRate;
-    private transient Gauge<Long> watermarkLatency;
+    private transient WindowJoinHelper helper;
 
     WindowJoinOperator(
             TypeSerializer<RowData> leftSerializer,
@@ -120,7 +95,8 @@ public abstract class WindowJoinOperator extends 
TableStreamOperator<RowData>
             int leftWindowEndIndex,
             int rightWindowEndIndex,
             boolean[] filterNullKeys,
-            ZoneId shiftTimeZone) {
+            ZoneId shiftTimeZone,
+            FlinkJoinType joinType) {
         this.leftSerializer = (RowDataSerializer) leftSerializer;
         this.rightSerializer = (RowDataSerializer) rightSerializer;
         this.generatedJoinCondition = generatedJoinCondition;
@@ -128,6 +104,7 @@ public abstract class WindowJoinOperator extends 
TableStreamOperator<RowData>
         this.rightWindowEndIndex = rightWindowEndIndex;
         this.filterNullKeys = filterNullKeys;
         this.shiftTimeZone = shiftTimeZone;
+        this.joinType = joinType;
     }
 
     @Override
@@ -166,28 +143,8 @@ public abstract class WindowJoinOperator extends 
TableStreamOperator<RowData>
         this.rightWindowState =
                 new WindowListState<>((InternalListState<RowData, Long, 
RowData>) rightListState);
 
-        // metrics
-        this.leftNumLateRecordsDropped = 
metrics.counter(LEFT_LATE_ELEMENTS_DROPPED_METRIC_NAME);
-        this.leftLateRecordsDroppedRate =
-                metrics.meter(
-                        LEFT_LATE_ELEMENTS_DROPPED_RATE_METRIC_NAME,
-                        new MeterView(leftNumLateRecordsDropped));
-        this.rightNumLateRecordsDropped = 
metrics.counter(RIGHT_LATE_ELEMENTS_DROPPED_METRIC_NAME);
-        this.rightLateRecordsDroppedRate =
-                metrics.meter(
-                        RIGHT_LATE_ELEMENTS_DROPPED_RATE_METRIC_NAME,
-                        new MeterView(rightNumLateRecordsDropped));
-        this.watermarkLatency =
-                metrics.gauge(
-                        WATERMARK_LATENCY_METRIC_NAME,
-                        () -> {
-                            long watermark = 
windowTimerService.currentWatermark();
-                            if (watermark < 0) {
-                                return 0L;
-                            } else {
-                                return 
windowTimerService.currentProcessingTime() - watermark;
-                            }
-                        });
+        this.helper = new SyncStateWindowJoinHelper();
+        this.helper.registerMetric(getRuntimeContext().getMetricGroup());
     }
 
     @Override
@@ -201,36 +158,20 @@ public abstract class WindowJoinOperator extends 
TableStreamOperator<RowData>
 
     @Override
     public void processElement1(StreamRecord<RowData> element) throws 
Exception {
-        processElement(element, leftWindowEndIndex, 
leftLateRecordsDroppedRate, leftWindowState);
+        helper.processElement(
+                element,
+                leftWindowEndIndex,
+                helper.getLeftLateRecordsDroppedRate(),
+                (windowEnd, rowData) -> leftWindowState.add(windowEnd, 
rowData));
     }
 
     @Override
     public void processElement2(StreamRecord<RowData> element) throws 
Exception {
-        processElement(element, rightWindowEndIndex, 
rightLateRecordsDroppedRate, rightWindowState);
-    }
-
-    private void processElement(
-            StreamRecord<RowData> element,
-            int windowEndIndex,
-            Meter lateRecordsDroppedRate,
-            WindowListState<Long> recordState)
-            throws Exception {
-        RowData inputRow = element.getValue();
-        long windowEnd = inputRow.getLong(windowEndIndex);
-        if (isWindowFired(windowEnd, windowTimerService.currentWatermark(), 
shiftTimeZone)) {
-            // element is late and should be dropped
-            lateRecordsDroppedRate.markEvent();
-            return;
-        }
-        if (RowDataUtil.isAccumulateMsg(inputRow)) {
-            recordState.add(windowEnd, inputRow);
-        } else {
-            // Window join could not handle retraction input stream
-            throw new UnsupportedOperationException(
-                    "This is a bug and should not happen. Please file an 
issue.");
-        }
-        // always register time for every element
-        windowTimerService.registerEventTimeWindowTimer(windowEnd);
+        helper.processElement(
+                element,
+                rightWindowEndIndex,
+                helper.getRightLateRecordsDroppedRate(),
+                (windowEnd, rowData) -> rightWindowState.add(windowEnd, 
rowData));
     }
 
     @Override
@@ -247,329 +188,28 @@ public abstract class WindowJoinOperator extends 
TableStreamOperator<RowData>
         // join left records and right records
         List<RowData> leftData = leftWindowState.get(window);
         List<RowData> rightData = rightWindowState.get(window);
-        join(leftData, rightData);
-        // clear state
-        if (leftData != null) {
-            leftWindowState.clear(window);
-        }
-        if (rightData != null) {
-            rightWindowState.clear(window);
-        }
+        helper.joinAndClear(window, leftData, rightData);
     }
 
-    public abstract void join(Iterable<RowData> leftRecords, Iterable<RowData> 
rightRecords);
-
-    static class SemiAntiJoinOperator extends WindowJoinOperator {
-
-        private final boolean isAntiJoin;
-
-        SemiAntiJoinOperator(
-                TypeSerializer<RowData> leftSerializer,
-                TypeSerializer<RowData> rightSerializer,
-                GeneratedJoinCondition generatedJoinCondition,
-                int leftWindowEndIndex,
-                int rightWindowEndIndex,
-                boolean[] filterNullKeys,
-                boolean isAntiJoin,
-                ZoneId shiftTimeZone) {
-            super(
-                    leftSerializer,
-                    rightSerializer,
-                    generatedJoinCondition,
-                    leftWindowEndIndex,
-                    rightWindowEndIndex,
-                    filterNullKeys,
-                    shiftTimeZone);
-            this.isAntiJoin = isAntiJoin;
-        }
-
-        @Override
-        public void join(Iterable<RowData> leftRecords, Iterable<RowData> 
rightRecords) {
-            if (leftRecords == null) {
-                return;
-            }
-            if (rightRecords == null) {
-                if (isAntiJoin) {
-                    for (RowData leftRecord : leftRecords) {
-                        collector.collect(leftRecord);
-                    }
-                }
-                return;
-            }
-            for (RowData leftRecord : leftRecords) {
-                boolean matches = false;
-                for (RowData rightRecord : rightRecords) {
-                    if (joinCondition.apply(leftRecord, rightRecord)) {
-                        matches = true;
-                        break;
-                    }
-                }
-                if (matches) {
-                    if (!isAntiJoin) {
-                        // emit left record if there are matched rows on the 
other side
-                        collector.collect(leftRecord);
-                    }
-                } else {
-                    if (isAntiJoin) {
-                        // emit left record if there is no matched row on the 
other side
-                        collector.collect(leftRecord);
-                    }
-                }
-            }
-        }
-    }
+    private class SyncStateWindowJoinHelper extends WindowJoinHelper {
 
-    static class InnerJoinOperator extends WindowJoinOperator {
-        private transient JoinedRowData outRow;
-
-        InnerJoinOperator(
-                TypeSerializer<RowData> leftSerializer,
-                TypeSerializer<RowData> rightSerializer,
-                GeneratedJoinCondition generatedJoinCondition,
-                int leftWindowEndIndex,
-                int rightWindowEndIndex,
-                boolean[] filterNullKeys,
-                ZoneId shiftTimeZone) {
+        public SyncStateWindowJoinHelper() {
             super(
-                    leftSerializer,
-                    rightSerializer,
-                    generatedJoinCondition,
-                    leftWindowEndIndex,
-                    rightWindowEndIndex,
-                    filterNullKeys,
-                    shiftTimeZone);
+                    WindowJoinOperator.this.leftSerializer,
+                    WindowJoinOperator.this.rightSerializer,
+                    WindowJoinOperator.this.shiftTimeZone,
+                    WindowJoinOperator.this.windowTimerService,
+                    WindowJoinOperator.this.joinCondition,
+                    WindowJoinOperator.this.collector,
+                    WindowJoinOperator.this.joinType);
         }
 
         @Override
-        public void open() throws Exception {
-            super.open();
-            outRow = new JoinedRowData();
-        }
-
-        @Override
-        public void join(Iterable<RowData> leftRecords, Iterable<RowData> 
rightRecords) {
-            if (leftRecords == null || rightRecords == null) {
-                return;
-            }
-            for (RowData leftRecord : leftRecords) {
-                for (RowData rightRecord : rightRecords) {
-                    if (joinCondition.apply(leftRecord, rightRecord)) {
-                        outRow.setRowKind(RowKind.INSERT);
-                        outRow.replace(leftRecord, rightRecord);
-                        collector.collect(outRow);
-                    }
-                }
-            }
-        }
-    }
-
-    private abstract static class AbstractOuterJoinOperator extends 
WindowJoinOperator {
-
-        private static final long serialVersionUID = 1L;
-
-        private transient RowData leftNullRow;
-        private transient RowData rightNullRow;
-        private transient JoinedRowData outRow;
-
-        AbstractOuterJoinOperator(
-                TypeSerializer<RowData> leftSerializer,
-                TypeSerializer<RowData> rightSerializer,
-                GeneratedJoinCondition generatedJoinCondition,
-                int leftWindowEndIndex,
-                int rightWindowEndIndex,
-                boolean[] filterNullKeys,
-                ZoneId shiftTimeZone) {
-            super(
-                    leftSerializer,
-                    rightSerializer,
-                    generatedJoinCondition,
-                    leftWindowEndIndex,
-                    rightWindowEndIndex,
-                    filterNullKeys,
-                    shiftTimeZone);
-        }
-
-        @Override
-        public void open() throws Exception {
-            super.open();
-            leftNullRow = new GenericRowData(leftSerializer.getArity());
-            rightNullRow = new GenericRowData(rightSerializer.getArity());
-            outRow = new JoinedRowData();
-        }
-
-        protected void outputNullPadding(RowData row, boolean isLeft) {
+        public void clearState(long windowEnd, boolean isLeft) {
             if (isLeft) {
-                outRow.replace(row, rightNullRow);
-            } else {
-                outRow.replace(leftNullRow, row);
-            }
-            outRow.setRowKind(RowKind.INSERT);
-            collector.collect(outRow);
-        }
-
-        protected void outputNullPadding(Iterable<RowData> rows, boolean 
isLeft) {
-            for (RowData row : rows) {
-                outputNullPadding(row, isLeft);
-            }
-        }
-
-        protected void output(RowData inputRow, RowData otherRow, boolean 
inputIsLeft) {
-            if (inputIsLeft) {
-                outRow.replace(inputRow, otherRow);
-            } else {
-                outRow.replace(otherRow, inputRow);
-            }
-            outRow.setRowKind(RowKind.INSERT);
-            collector.collect(outRow);
-        }
-    }
-
-    static class LeftOuterJoinOperator extends AbstractOuterJoinOperator {
-
-        private static final long serialVersionUID = 1L;
-
-        LeftOuterJoinOperator(
-                TypeSerializer<RowData> leftSerializer,
-                TypeSerializer<RowData> rightSerializer,
-                GeneratedJoinCondition generatedJoinCondition,
-                int leftWindowEndIndex,
-                int rightWindowEndIndex,
-                boolean[] filterNullKeys,
-                ZoneId shiftTimeZone) {
-            super(
-                    leftSerializer,
-                    rightSerializer,
-                    generatedJoinCondition,
-                    leftWindowEndIndex,
-                    rightWindowEndIndex,
-                    filterNullKeys,
-                    shiftTimeZone);
-        }
-
-        @Override
-        public void join(Iterable<RowData> leftRecords, Iterable<RowData> 
rightRecords) {
-            if (leftRecords == null) {
-                return;
-            }
-            if (rightRecords == null) {
-                outputNullPadding(leftRecords, true);
-            } else {
-                for (RowData leftRecord : leftRecords) {
-                    boolean matches = false;
-                    for (RowData rightRecord : rightRecords) {
-                        if (joinCondition.apply(leftRecord, rightRecord)) {
-                            output(leftRecord, rightRecord, true);
-                            matches = true;
-                        }
-                    }
-                    if (!matches) {
-                        // padding null for left side
-                        outputNullPadding(leftRecord, true);
-                    }
-                }
-            }
-        }
-    }
-
-    static class RightOuterJoinOperator extends AbstractOuterJoinOperator {
-
-        private static final long serialVersionUID = 1L;
-
-        RightOuterJoinOperator(
-                TypeSerializer<RowData> leftSerializer,
-                TypeSerializer<RowData> rightSerializer,
-                GeneratedJoinCondition generatedJoinCondition,
-                int leftWindowEndIndex,
-                int rightWindowEndIndex,
-                boolean[] filterNullKeys,
-                ZoneId shiftTimeZone) {
-            super(
-                    leftSerializer,
-                    rightSerializer,
-                    generatedJoinCondition,
-                    leftWindowEndIndex,
-                    rightWindowEndIndex,
-                    filterNullKeys,
-                    shiftTimeZone);
-        }
-
-        @Override
-        public void join(Iterable<RowData> leftRecords, Iterable<RowData> 
rightRecords) {
-            if (rightRecords == null) {
-                return;
-            }
-            if (leftRecords == null) {
-                outputNullPadding(rightRecords, false);
-            } else {
-                for (RowData rightRecord : rightRecords) {
-                    boolean matches = false;
-                    for (RowData leftRecord : leftRecords) {
-                        if (joinCondition.apply(leftRecord, rightRecord)) {
-                            output(leftRecord, rightRecord, true);
-                            matches = true;
-                        }
-                    }
-                    if (!matches) {
-                        outputNullPadding(rightRecord, false);
-                    }
-                }
-            }
-        }
-    }
-
-    static class FullOuterJoinOperator extends AbstractOuterJoinOperator {
-
-        private static final long serialVersionUID = 1L;
-
-        FullOuterJoinOperator(
-                TypeSerializer<RowData> leftSerializer,
-                TypeSerializer<RowData> rightSerializer,
-                GeneratedJoinCondition generatedJoinCondition,
-                int leftWindowEndIndex,
-                int rightWindowEndIndex,
-                boolean[] filterNullKeys,
-                ZoneId shiftTimeZone) {
-            super(
-                    leftSerializer,
-                    rightSerializer,
-                    generatedJoinCondition,
-                    leftWindowEndIndex,
-                    rightWindowEndIndex,
-                    filterNullKeys,
-                    shiftTimeZone);
-        }
-
-        @Override
-        public void join(Iterable<RowData> leftRecords, Iterable<RowData> 
rightRecords) {
-            if (leftRecords == null && rightRecords == null) {
-                return;
-            }
-            if (rightRecords == null) {
-                outputNullPadding(leftRecords, true);
-            } else if (leftRecords == null) {
-                outputNullPadding(rightRecords, false);
+                leftWindowState.clear(windowEnd);
             } else {
-                IdentityHashMap<RowData, Boolean> emittedRightRecords = new 
IdentityHashMap<>();
-                for (RowData leftRecord : leftRecords) {
-                    boolean matches = false;
-                    for (RowData rightRecord : rightRecords) {
-                        if (joinCondition.apply(leftRecord, rightRecord)) {
-                            output(leftRecord, rightRecord, true);
-                            matches = true;
-                            emittedRightRecords.put(rightRecord, Boolean.TRUE);
-                        }
-                    }
-                    // padding null for left side
-                    if (!matches) {
-                        outputNullPadding(leftRecord, true);
-                    }
-                }
-                // padding null for never emitted right side
-                for (RowData rightRecord : rightRecords) {
-                    if (!emittedRightRecords.containsKey(rightRecord)) {
-                        outputNullPadding(rightRecord, false);
-                    }
-                }
+                rightWindowState.clear(windowEnd);
             }
         }
     }
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperatorBuilder.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperatorBuilder.java
index c0e566555e7..051abfc18b2 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperatorBuilder.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperatorBuilder.java
@@ -19,9 +19,11 @@
 package org.apache.flink.table.runtime.operators.join.window;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
 import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
+import 
org.apache.flink.table.runtime.operators.join.window.asyncprocessing.AsyncStateWindowJoinOperator;
 
 import java.time.ZoneId;
 
@@ -41,6 +43,7 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  *   .rightWindowEndIndex(rightWindowEndIndex)
  *   .filterNullKeys(filterNullKeys)
  *   .joinType(joinType)
+ *   .enableAsyncState()
  *   .build();
  * </pre>
  */
@@ -58,6 +61,7 @@ public class WindowJoinOperatorBuilder {
     private boolean[] filterNullKeys;
     private FlinkJoinType joinType;
     private ZoneId shiftTimeZone = ZoneId.of("UTC");
+    private boolean enableAsyncState = false;
 
     public WindowJoinOperatorBuilder leftSerializer(TypeSerializer<RowData> 
leftSerializer) {
         this.leftSerializer = leftSerializer;
@@ -105,7 +109,12 @@ public class WindowJoinOperatorBuilder {
         return this;
     }
 
-    public WindowJoinOperator build() {
+    public WindowJoinOperatorBuilder enableAsyncState() {
+        this.enableAsyncState = true;
+        return this;
+    }
+
+    public TwoInputStreamOperator<RowData, RowData, RowData> build() {
         checkNotNull(leftSerializer);
         checkNotNull(rightSerializer);
         checkNotNull(generatedJoinCondition);
@@ -123,65 +132,26 @@ public class WindowJoinOperatorBuilder {
                         "Illegal window end index %s, it should not be 
negative!",
                         rightWindowEndIndex));
 
-        switch (joinType) {
-            case INNER:
-                return new WindowJoinOperator.InnerJoinOperator(
-                        leftSerializer,
-                        rightSerializer,
-                        generatedJoinCondition,
-                        leftWindowEndIndex,
-                        rightWindowEndIndex,
-                        filterNullKeys,
-                        shiftTimeZone);
-            case SEMI:
-                return new WindowJoinOperator.SemiAntiJoinOperator(
-                        leftSerializer,
-                        rightSerializer,
-                        generatedJoinCondition,
-                        leftWindowEndIndex,
-                        rightWindowEndIndex,
-                        filterNullKeys,
-                        false,
-                        shiftTimeZone);
-            case ANTI:
-                return new WindowJoinOperator.SemiAntiJoinOperator(
-                        leftSerializer,
-                        rightSerializer,
-                        generatedJoinCondition,
-                        leftWindowEndIndex,
-                        rightWindowEndIndex,
-                        filterNullKeys,
-                        true,
-                        shiftTimeZone);
-            case LEFT:
-                return new WindowJoinOperator.LeftOuterJoinOperator(
-                        leftSerializer,
-                        rightSerializer,
-                        generatedJoinCondition,
-                        leftWindowEndIndex,
-                        rightWindowEndIndex,
-                        filterNullKeys,
-                        shiftTimeZone);
-            case RIGHT:
-                return new WindowJoinOperator.RightOuterJoinOperator(
-                        leftSerializer,
-                        rightSerializer,
-                        generatedJoinCondition,
-                        leftWindowEndIndex,
-                        rightWindowEndIndex,
-                        filterNullKeys,
-                        shiftTimeZone);
-            case FULL:
-                return new WindowJoinOperator.FullOuterJoinOperator(
-                        leftSerializer,
-                        rightSerializer,
-                        generatedJoinCondition,
-                        leftWindowEndIndex,
-                        rightWindowEndIndex,
-                        filterNullKeys,
-                        shiftTimeZone);
-            default:
-                throw new IllegalArgumentException("Invalid join type: " + 
joinType);
+        if (enableAsyncState) {
+            return new AsyncStateWindowJoinOperator(
+                    leftSerializer,
+                    rightSerializer,
+                    generatedJoinCondition,
+                    leftWindowEndIndex,
+                    rightWindowEndIndex,
+                    filterNullKeys,
+                    shiftTimeZone,
+                    joinType);
+        } else {
+            return new WindowJoinOperator(
+                    leftSerializer,
+                    rightSerializer,
+                    generatedJoinCondition,
+                    leftWindowEndIndex,
+                    rightWindowEndIndex,
+                    filterNullKeys,
+                    shiftTimeZone,
+                    joinType);
         }
     }
 }
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/window/asyncprocessing/AsyncStateWindowJoinOperator.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/window/asyncprocessing/AsyncStateWindowJoinOperator.java
new file mode 100644
index 00000000000..b0c5e037fb9
--- /dev/null
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/window/asyncprocessing/AsyncStateWindowJoinOperator.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.join.window.asyncprocessing;
+
+import org.apache.flink.api.common.functions.DefaultOpenContext;
+import org.apache.flink.api.common.state.v2.ListState;
+import org.apache.flink.api.common.state.v2.ListStateDescriptor;
+import org.apache.flink.api.common.state.v2.StateFuture;
+import org.apache.flink.api.common.state.v2.StateIterator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.core.state.StateFutureUtils;
+import 
org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateStreamOperator;
+import org.apache.flink.runtime.state.v2.internal.InternalListState;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.KeyContext;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
+import org.apache.flink.table.runtime.generated.JoinCondition;
+import org.apache.flink.table.runtime.operators.AsyncStateTableStreamOperator;
+import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
+import 
org.apache.flink.table.runtime.operators.join.JoinConditionWithNullFilters;
+import org.apache.flink.table.runtime.operators.join.window.WindowJoinOperator;
+import 
org.apache.flink.table.runtime.operators.join.window.utils.WindowJoinHelper;
+import 
org.apache.flink.table.runtime.operators.window.tvf.asyncprocessing.state.WindowListAsyncState;
+import 
org.apache.flink.table.runtime.operators.window.tvf.common.WindowTimerService;
+import 
org.apache.flink.table.runtime.operators.window.tvf.slicing.SlicingWindowTimerServiceImpl;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * A {@link AsyncStateWindowJoinOperator} implemented by async state api.
+ *
+ * <p>This class is nearly identical with {@link WindowJoinOperator}, but 
extending from {@link
+ * AbstractAsyncStateStreamOperator} to integrate with asynchronous state 
access.
+ *
+ * <p>Note: currently, {@link AsyncStateWindowJoinOperator} doesn't support 
early-fire and
+ * late-arrival. Thus, late elements (elements belong to emitted windows) will 
be simply dropped.
+ *
+ * <p>Note: currently, {@link AsyncStateWindowJoinOperator} doesn't support 
DELETE or UPDATE_BEFORE
+ * input row.
+ */
+public class AsyncStateWindowJoinOperator extends 
AsyncStateTableStreamOperator<RowData>
+        implements TwoInputStreamOperator<RowData, RowData, RowData>,
+                Triggerable<RowData, Long>,
+                KeyContext {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final String LEFT_RECORDS_STATE_NAME = "left-records";
+    private static final String RIGHT_RECORDS_STATE_NAME = "right-records";
+
+    private final RowDataSerializer leftSerializer;
+    private final RowDataSerializer rightSerializer;
+    private final GeneratedJoinCondition generatedJoinCondition;
+
+    private final int leftWindowEndIndex;
+    private final int rightWindowEndIndex;
+
+    private final boolean[] filterNullKeys;
+    private final ZoneId shiftTimeZone;
+
+    private final FlinkJoinType joinType;
+
+    private transient WindowTimerService<Long> windowTimerService;
+
+    private transient JoinConditionWithNullFilters joinCondition;
+
+    /** This is used for emitting elements with a given timestamp. */
+    private transient TimestampedCollector<RowData> collector;
+
+    private transient WindowListAsyncState<Long> leftWindowState;
+    private transient WindowListAsyncState<Long> rightWindowState;
+
+    private transient WindowJoinHelper helper;
+
+    public AsyncStateWindowJoinOperator(
+            TypeSerializer<RowData> leftSerializer,
+            TypeSerializer<RowData> rightSerializer,
+            GeneratedJoinCondition generatedJoinCondition,
+            int leftWindowEndIndex,
+            int rightWindowEndIndex,
+            boolean[] filterNullKeys,
+            ZoneId shiftTimeZone,
+            FlinkJoinType joinType) {
+        this.leftSerializer = (RowDataSerializer) leftSerializer;
+        this.rightSerializer = (RowDataSerializer) rightSerializer;
+        this.generatedJoinCondition = generatedJoinCondition;
+        this.leftWindowEndIndex = leftWindowEndIndex;
+        this.rightWindowEndIndex = rightWindowEndIndex;
+        this.filterNullKeys = filterNullKeys;
+        this.shiftTimeZone = shiftTimeZone;
+        this.joinType = joinType;
+    }
+
+    @Override
+    public void open() throws Exception {
+        super.open();
+
+        this.collector = new TimestampedCollector<>(output);
+        collector.eraseTimestamp();
+
+        final LongSerializer windowSerializer = LongSerializer.INSTANCE;
+
+        InternalTimerService<Long> internalTimerService =
+                getInternalTimerService("window-timers", windowSerializer, 
this);
+        this.windowTimerService =
+                new SlicingWindowTimerServiceImpl(internalTimerService, 
shiftTimeZone);
+
+        // init join condition
+        JoinCondition condition =
+                
generatedJoinCondition.newInstance(getRuntimeContext().getUserCodeClassLoader());
+        this.joinCondition = new JoinConditionWithNullFilters(condition, 
filterNullKeys, this);
+        this.joinCondition.setRuntimeContext(getRuntimeContext());
+        this.joinCondition.open(DefaultOpenContext.INSTANCE);
+
+        // init state
+        ListStateDescriptor<RowData> leftRecordStateDesc =
+                new ListStateDescriptor<>(LEFT_RECORDS_STATE_NAME, 
leftSerializer);
+        ListState<RowData> leftListState =
+                getOrCreateKeyedState(Long.MIN_VALUE, windowSerializer, 
leftRecordStateDesc);
+        this.leftWindowState =
+                new WindowListAsyncState<>(
+                        (InternalListState<RowData, Long, RowData>) 
leftListState);
+
+        ListStateDescriptor<RowData> rightRecordStateDesc =
+                new ListStateDescriptor<>(RIGHT_RECORDS_STATE_NAME, 
rightSerializer);
+        ListState<RowData> rightListState =
+                getOrCreateKeyedState(Long.MIN_VALUE, windowSerializer, 
rightRecordStateDesc);
+        this.rightWindowState =
+                new WindowListAsyncState<>(
+                        (InternalListState<RowData, Long, RowData>) 
rightListState);
+
+        this.helper = new AsyncStateWindowJoinHelper();
+        this.helper.registerMetric(getRuntimeContext().getMetricGroup());
+    }
+
+    @Override
+    public void close() throws Exception {
+        super.close();
+        collector = null;
+        if (joinCondition != null) {
+            joinCondition.close();
+        }
+    }
+
+    @Override
+    public void processElement1(StreamRecord<RowData> element) throws 
Exception {
+        helper.processElement(
+                element,
+                leftWindowEndIndex,
+                helper.getLeftLateRecordsDroppedRate(),
+                (windowEnd, rowData) -> leftWindowState.asyncAdd(windowEnd, 
rowData));
+    }
+
+    @Override
+    public void processElement2(StreamRecord<RowData> element) throws 
Exception {
+        helper.processElement(
+                element,
+                rightWindowEndIndex,
+                helper.getRightLateRecordsDroppedRate(),
+                (windowEnd, rowData) -> rightWindowState.asyncAdd(windowEnd, 
rowData));
+    }
+
+    @Override
+    public void onProcessingTime(InternalTimer<RowData, Long> timer) throws 
Exception {
+        // Window join only support event-time now
+        throw new UnsupportedOperationException(
+                "This is a bug and should not happen. Please file an issue.");
+    }
+
+    @Override
+    public void onEventTime(InternalTimer<RowData, Long> timer) throws 
Exception {
+        asyncProcessWithKey(timer.getKey(), () -> 
triggerJoin(timer.getNamespace()));
+    }
+
+    /**
+     * Currently, similar to the {@link WindowJoinOperator#onEventTime} that 
uses the sync state
+     * api, we directly load the list data from the state into memory to 
perform join operations.
+     *
+     * <p>Note: The order of data in the left and right side lists must be 
preserved to ensure the
+     * output data sequence is maintained.
+     */
+    private void triggerJoin(long window) {
+        StateFuture<StateIterator<RowData>> leftDataFuture = 
leftWindowState.asyncGet(window);
+        StateFuture<StateIterator<RowData>> rightDataFuture = 
rightWindowState.asyncGet(window);
+
+        // join left records and right records
+        AtomicReference<List<RowData>> leftDataRef = new AtomicReference<>();
+        AtomicReference<List<RowData>> rightDataRef = new AtomicReference<>();
+        leftDataFuture.thenCombine(
+                rightDataFuture,
+                (leftDataIterator, rightDataIterator) -> {
+                    StateFuture<Void> leftLoadToMemFuture;
+                    if (leftDataIterator == null) {
+                        leftDataRef.set(null);
+                        leftLoadToMemFuture = 
StateFutureUtils.completedVoidFuture();
+                    } else {
+                        leftDataRef.set(new ArrayList<>());
+                        leftLoadToMemFuture =
+                                leftDataIterator.onNext(
+                                        data -> {
+                                            leftDataRef.get().add(data);
+                                        });
+                    }
+
+                    StateFuture<Void> rightLoadToMemFuture;
+                    if (rightDataIterator == null) {
+                        rightDataRef.set(null);
+                        rightLoadToMemFuture = 
StateFutureUtils.completedVoidFuture();
+                    } else {
+                        rightDataRef.set(new ArrayList<>());
+                        rightLoadToMemFuture =
+                                rightDataIterator.onNext(
+                                        data -> {
+                                            rightDataRef.get().add(data);
+                                        });
+                    }
+
+                    return leftLoadToMemFuture.thenCombine(
+                            rightLoadToMemFuture,
+                            (VOID1, VOID2) -> {
+                                helper.joinAndClear(window, leftDataRef.get(), 
rightDataRef.get());
+                                return null;
+                            });
+                });
+    }
+
+    private class AsyncStateWindowJoinHelper extends WindowJoinHelper {
+
+        public AsyncStateWindowJoinHelper() {
+            super(
+                    AsyncStateWindowJoinOperator.this.leftSerializer,
+                    AsyncStateWindowJoinOperator.this.rightSerializer,
+                    AsyncStateWindowJoinOperator.this.shiftTimeZone,
+                    AsyncStateWindowJoinOperator.this.windowTimerService,
+                    AsyncStateWindowJoinOperator.this.joinCondition,
+                    AsyncStateWindowJoinOperator.this.collector,
+                    AsyncStateWindowJoinOperator.this.joinType);
+        }
+
+        @Override
+        public void clearState(long windowEnd, boolean isLeft) {
+            // no need to wait these async requests to end
+            if (isLeft) {
+                leftWindowState.asyncClear(windowEnd);
+            } else {
+                rightWindowState.asyncClear(windowEnd);
+            }
+        }
+    }
+}
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/window/utils/WindowJoinHelper.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/window/utils/WindowJoinHelper.java
new file mode 100644
index 00000000000..27dc5c4dbb2
--- /dev/null
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/window/utils/WindowJoinHelper.java
@@ -0,0 +1,399 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.join.window.utils;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.MeterView;
+import org.apache.flink.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.util.RowDataUtil;
+import org.apache.flink.table.data.utils.JoinedRowData;
+import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
+import 
org.apache.flink.table.runtime.operators.join.JoinConditionWithNullFilters;
+import org.apache.flink.table.runtime.operators.join.window.WindowJoinOperator;
+import 
org.apache.flink.table.runtime.operators.join.window.asyncprocessing.AsyncStateWindowJoinOperator;
+import 
org.apache.flink.table.runtime.operators.window.tvf.common.WindowTimerService;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.function.BiConsumerWithException;
+
+import javax.annotation.Nullable;
+
+import java.time.ZoneId;
+import java.util.IdentityHashMap;
+
+import static org.apache.flink.table.runtime.util.TimeWindowUtil.isWindowFired;
+
+/**
+ * A helper to do the window join operations for {@link WindowJoinOperator} 
and {@link
+ * AsyncStateWindowJoinOperator}.
+ */
+public abstract class WindowJoinHelper {
+
+    private static final String LEFT_LATE_ELEMENTS_DROPPED_METRIC_NAME =
+            "leftNumLateRecordsDropped";
+    private static final String LEFT_LATE_ELEMENTS_DROPPED_RATE_METRIC_NAME =
+            "leftLateRecordsDroppedRate";
+    private static final String RIGHT_LATE_ELEMENTS_DROPPED_METRIC_NAME =
+            "rightNumLateRecordsDropped";
+    private static final String RIGHT_LATE_ELEMENTS_DROPPED_RATE_METRIC_NAME =
+            "rightLateRecordsDroppedRate";
+    private static final String WATERMARK_LATENCY_METRIC_NAME = 
"watermarkLatency";
+
+    private final ZoneId shiftTimeZone;
+
+    private final WindowTimerService<Long> windowTimerService;
+
+    protected final RowDataSerializer leftSerializer;
+
+    protected final RowDataSerializer rightSerializer;
+
+    protected final JoinConditionWithNullFilters joinCondition;
+
+    /** This is used for emitting elements with a given timestamp. */
+    protected final TimestampedCollector<RowData> collector;
+
+    private final WindowJoinProcessor windowJoinProcessor;
+
+    // ------------------------------------------------------------------------
+    // Metrics
+    // ------------------------------------------------------------------------
+
+    private Meter leftLateRecordsDroppedRate;
+    private Meter rightLateRecordsDroppedRate;
+
+    public WindowJoinHelper(
+            RowDataSerializer leftSerializer,
+            RowDataSerializer rightSerializer,
+            ZoneId shiftTimeZone,
+            WindowTimerService<Long> windowTimerService,
+            JoinConditionWithNullFilters joinCondition,
+            TimestampedCollector<RowData> collector,
+            FlinkJoinType joinType) {
+        this.leftSerializer = leftSerializer;
+        this.rightSerializer = rightSerializer;
+        this.shiftTimeZone = shiftTimeZone;
+        this.windowTimerService = windowTimerService;
+        this.joinCondition = joinCondition;
+        this.collector = collector;
+
+        this.windowJoinProcessor = getWindowJoinProcessor(joinType);
+    }
+
+    public void registerMetric(OperatorMetricGroup metrics) {
+        // register metrics
+        Counter leftNumLateRecordsDropped = 
metrics.counter(LEFT_LATE_ELEMENTS_DROPPED_METRIC_NAME);
+        this.leftLateRecordsDroppedRate =
+                metrics.meter(
+                        LEFT_LATE_ELEMENTS_DROPPED_RATE_METRIC_NAME,
+                        new MeterView(leftNumLateRecordsDropped));
+        Counter rightNumLateRecordsDropped =
+                metrics.counter(RIGHT_LATE_ELEMENTS_DROPPED_METRIC_NAME);
+        this.rightLateRecordsDroppedRate =
+                metrics.meter(
+                        RIGHT_LATE_ELEMENTS_DROPPED_RATE_METRIC_NAME,
+                        new MeterView(rightNumLateRecordsDropped));
+        metrics.gauge(
+                WATERMARK_LATENCY_METRIC_NAME,
+                () -> {
+                    long watermark = windowTimerService.currentWatermark();
+                    if (watermark < 0) {
+                        return 0L;
+                    } else {
+                        return windowTimerService.currentProcessingTime() - 
watermark;
+                    }
+                });
+    }
+
+    public void processElement(
+            StreamRecord<RowData> element,
+            int windowEndIndex,
+            Meter lateRecordsDroppedRate,
+            BiConsumerWithException<Long, RowData, Exception> accStateConsumer)
+            throws Exception {
+        RowData inputRow = element.getValue();
+        long windowEnd = inputRow.getLong(windowEndIndex);
+        if (isWindowFired(windowEnd, windowTimerService.currentWatermark(), 
shiftTimeZone)) {
+            // element is late and should be dropped
+            lateRecordsDroppedRate.markEvent();
+            return;
+        }
+        if (RowDataUtil.isAccumulateMsg(inputRow)) {
+            accStateConsumer.accept(windowEnd, inputRow);
+        } else {
+            // Window join could not handle retraction input stream
+            throw new UnsupportedOperationException(
+                    "This is a bug and should not happen. Please file an 
issue.");
+        }
+        // always register time for every element
+        windowTimerService.registerEventTimeWindowTimer(windowEnd);
+    }
+
+    public void joinAndClear(
+            long windowEnd,
+            @Nullable Iterable<RowData> leftRecords,
+            @Nullable Iterable<RowData> rightRecords)
+            throws Exception {
+        windowJoinProcessor.doJoin(leftRecords, rightRecords);
+        // clear state
+        if (leftRecords != null) {
+            clearState(windowEnd, true);
+        }
+        if (rightRecords != null) {
+            clearState(windowEnd, false);
+        }
+    }
+
+    public Meter getLeftLateRecordsDroppedRate() {
+        return leftLateRecordsDroppedRate;
+    }
+
+    public Meter getRightLateRecordsDroppedRate() {
+        return rightLateRecordsDroppedRate;
+    }
+
+    public abstract void clearState(long windowEnd, boolean isLeft) throws 
Exception;
+
+    private WindowJoinProcessor getWindowJoinProcessor(FlinkJoinType joinType) 
{
+        switch (joinType) {
+            case INNER:
+                return new InnerWindowJoinProcessor();
+            case SEMI:
+                return new SemiAntiWindowJoinProcessor(false);
+            case ANTI:
+                return new SemiAntiWindowJoinProcessor(true);
+            case LEFT:
+                return new LeftOuterWindowJoinProcessor();
+            case RIGHT:
+                return new RightOuterWindowJoinProcessor();
+            case FULL:
+                return new FullOuterWindowJoinProcessor();
+            default:
+                throw new IllegalArgumentException("Invalid join type: " + 
joinType);
+        }
+    }
+
+    /**
+     * A processor to do the different logic for different {@link 
FlinkJoinType}.
+     *
+     * <p>TODO FLINK-37106 consider extracting common methods in different 
WindowJoinProcessor
+     */
+    private interface WindowJoinProcessor {
+
+        void doJoin(
+                @Nullable Iterable<RowData> leftRecords, @Nullable 
Iterable<RowData> rightRecords);
+    }
+
+    private class SemiAntiWindowJoinProcessor implements WindowJoinProcessor {
+
+        private final boolean isAntiJoin;
+
+        public SemiAntiWindowJoinProcessor(boolean isAntiJoin) {
+            this.isAntiJoin = isAntiJoin;
+        }
+
+        @Override
+        public void doJoin(
+                @Nullable Iterable<RowData> leftRecords, @Nullable 
Iterable<RowData> rightRecords) {
+            if (leftRecords == null) {
+                return;
+            }
+            if (rightRecords == null) {
+                if (isAntiJoin) {
+                    for (RowData leftRecord : leftRecords) {
+                        collector.collect(leftRecord);
+                    }
+                }
+                return;
+            }
+            for (RowData leftRecord : leftRecords) {
+                boolean matches = false;
+                for (RowData rightRecord : rightRecords) {
+                    if (joinCondition.apply(leftRecord, rightRecord)) {
+                        matches = true;
+                        break;
+                    }
+                }
+                if (matches) {
+                    if (!isAntiJoin) {
+                        // emit left record if there are matched rows on the 
other side
+                        collector.collect(leftRecord);
+                    }
+                } else {
+                    if (isAntiJoin) {
+                        // emit left record if there is no matched row on the 
other side
+                        collector.collect(leftRecord);
+                    }
+                }
+            }
+        }
+    }
+
+    private class InnerWindowJoinProcessor implements WindowJoinProcessor {
+
+        private final JoinedRowData outRow = new JoinedRowData();
+
+        @Override
+        public void doJoin(
+                @Nullable Iterable<RowData> leftRecords, @Nullable 
Iterable<RowData> rightRecords) {
+            if (leftRecords == null || rightRecords == null) {
+                return;
+            }
+            for (RowData leftRecord : leftRecords) {
+                for (RowData rightRecord : rightRecords) {
+                    if (joinCondition.apply(leftRecord, rightRecord)) {
+                        outRow.setRowKind(RowKind.INSERT);
+                        outRow.replace(leftRecord, rightRecord);
+                        collector.collect(outRow);
+                    }
+                }
+            }
+        }
+    }
+
+    private abstract class AbstractOuterWindowJoinProcessor implements 
WindowJoinProcessor {
+
+        private final RowData leftNullRow = new 
GenericRowData(leftSerializer.getArity());
+        private final RowData rightNullRow = new 
GenericRowData(rightSerializer.getArity());
+        private final JoinedRowData outRow = new JoinedRowData();
+
+        protected void outputNullPadding(RowData row, boolean isLeft) {
+            if (isLeft) {
+                outRow.replace(row, rightNullRow);
+            } else {
+                outRow.replace(leftNullRow, row);
+            }
+            outRow.setRowKind(RowKind.INSERT);
+            collector.collect(outRow);
+        }
+
+        protected void outputNullPadding(Iterable<RowData> rows, boolean 
isLeft) {
+            for (RowData row : rows) {
+                outputNullPadding(row, isLeft);
+            }
+        }
+
+        protected void output(RowData inputRow, RowData otherRow, boolean 
inputIsLeft) {
+            if (inputIsLeft) {
+                outRow.replace(inputRow, otherRow);
+            } else {
+                outRow.replace(otherRow, inputRow);
+            }
+            outRow.setRowKind(RowKind.INSERT);
+            collector.collect(outRow);
+        }
+    }
+
+    private class LeftOuterWindowJoinProcessor extends 
AbstractOuterWindowJoinProcessor {
+
+        @Override
+        public void doJoin(
+                @Nullable Iterable<RowData> leftRecords, @Nullable 
Iterable<RowData> rightRecords) {
+            if (leftRecords == null) {
+                return;
+            }
+            if (rightRecords == null) {
+                outputNullPadding(leftRecords, true);
+            } else {
+                for (RowData leftRecord : leftRecords) {
+                    boolean matches = false;
+                    for (RowData rightRecord : rightRecords) {
+                        if (joinCondition.apply(leftRecord, rightRecord)) {
+                            output(leftRecord, rightRecord, true);
+                            matches = true;
+                        }
+                    }
+                    if (!matches) {
+                        // padding null for left side
+                        outputNullPadding(leftRecord, true);
+                    }
+                }
+            }
+        }
+    }
+
+    private class RightOuterWindowJoinProcessor extends 
AbstractOuterWindowJoinProcessor {
+
+        @Override
+        public void doJoin(
+                @Nullable Iterable<RowData> leftRecords, @Nullable 
Iterable<RowData> rightRecords) {
+            if (rightRecords == null) {
+                return;
+            }
+            if (leftRecords == null) {
+                outputNullPadding(rightRecords, false);
+            } else {
+                for (RowData rightRecord : rightRecords) {
+                    boolean matches = false;
+                    for (RowData leftRecord : leftRecords) {
+                        if (joinCondition.apply(leftRecord, rightRecord)) {
+                            output(leftRecord, rightRecord, true);
+                            matches = true;
+                        }
+                    }
+                    if (!matches) {
+                        outputNullPadding(rightRecord, false);
+                    }
+                }
+            }
+        }
+    }
+
+    private class FullOuterWindowJoinProcessor extends 
AbstractOuterWindowJoinProcessor {
+
+        @Override
+        public void doJoin(
+                @Nullable Iterable<RowData> leftRecords, @Nullable 
Iterable<RowData> rightRecords) {
+            if (leftRecords == null && rightRecords == null) {
+                return;
+            }
+            if (rightRecords == null) {
+                outputNullPadding(leftRecords, true);
+            } else if (leftRecords == null) {
+                outputNullPadding(rightRecords, false);
+            } else {
+                IdentityHashMap<RowData, Boolean> emittedRightRecords = new 
IdentityHashMap<>();
+                for (RowData leftRecord : leftRecords) {
+                    boolean matches = false;
+                    for (RowData rightRecord : rightRecords) {
+                        if (joinCondition.apply(leftRecord, rightRecord)) {
+                            output(leftRecord, rightRecord, true);
+                            matches = true;
+                            emittedRightRecords.put(rightRecord, Boolean.TRUE);
+                        }
+                    }
+                    // padding null for left side
+                    if (!matches) {
+                        outputNullPadding(leftRecord, true);
+                    }
+                }
+                // padding null for never emitted right side
+                for (RowData rightRecord : rightRecords) {
+                    if (!emittedRightRecords.containsKey(rightRecord)) {
+                        outputNullPadding(rightRecord, false);
+                    }
+                }
+            }
+        }
+    }
+}
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/asyncprocessing/state/WindowAsyncState.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/asyncprocessing/state/WindowAsyncState.java
new file mode 100644
index 00000000000..0ee9ded008a
--- /dev/null
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/asyncprocessing/state/WindowAsyncState.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.flink.table.runtime.operators.window.tvf.asyncprocessing.state;
+
+import org.apache.flink.api.common.state.v2.StateFuture;
+import org.apache.flink.table.runtime.operators.window.tvf.state.WindowState;
+
+/**
+ * A base interface for manipulate state with window namespace.
+ *
+ * <p>Different with {@link WindowState}, this interface is based on async 
state api.
+ */
+public interface WindowAsyncState<W> {
+
+    /** Removes the value mapped under current key and the given window. */
+    StateFuture<Void> asyncClear(W window);
+}
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/asyncprocessing/state/WindowListAsyncState.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/asyncprocessing/state/WindowListAsyncState.java
new file mode 100644
index 00000000000..233c68c6782
--- /dev/null
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/asyncprocessing/state/WindowListAsyncState.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.flink.table.runtime.operators.window.tvf.asyncprocessing.state;
+
+import org.apache.flink.api.common.state.v2.ListState;
+import org.apache.flink.api.common.state.v2.StateFuture;
+import org.apache.flink.api.common.state.v2.StateIterator;
+import org.apache.flink.runtime.state.v2.internal.InternalListState;
+import org.apache.flink.table.data.RowData;
+
+/** A wrapper of {@link ListState} which is easier to update based on window 
namespace. */
+public final class WindowListAsyncState<W> implements WindowAsyncState<W> {
+
+    private final InternalListState<RowData, W, RowData> windowState;
+
+    public WindowListAsyncState(InternalListState<RowData, W, RowData> 
windowState) {
+        this.windowState = windowState;
+    }
+
+    @Override
+    public StateFuture<Void> asyncClear(W window) {
+        windowState.setCurrentNamespace(window);
+        return windowState.asyncClear();
+    }
+
+    public StateFuture<StateIterator<RowData>> asyncGet(W window) {
+        windowState.setCurrentNamespace(window);
+        return windowState.asyncGet();
+    }
+
+    /**
+     * Updates the operator state accessible by {@link #asyncGet(W)} by adding 
the given value to
+     * the list of values. The next time {@link #asyncGet(W)} is called (for 
the same state
+     * partition) the returned state will represent the updated list.
+     *
+     * <p>If null is passed in, the state value will remain unchanged.
+     *
+     * @param window The namespace for the state.
+     * @param value The new value for the state.
+     */
+    public StateFuture<Void> asyncAdd(W window, RowData value) {
+        windowState.setCurrentNamespace(window);
+        return windowState.asyncAdd(value);
+    }
+}
diff --git 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperatorTest.java
 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperatorTest.java
index 9071f80d3b2..b1fe275087c 100644
--- 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperatorTest.java
+++ 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperatorTest.java
@@ -19,8 +19,10 @@
 package org.apache.flink.table.runtime.operators.join.window;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness;
+import 
org.apache.flink.streaming.util.asyncprocessing.AsyncKeyedTwoInputStreamOperatorTestHarness;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
 import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
@@ -72,13 +74,20 @@ class WindowJoinOperatorTest {
 
     private final ZoneId shiftTimeZone;
 
-    WindowJoinOperatorTest(ZoneId shiftTimeZone) {
+    private final boolean enableAsyncState;
+
+    WindowJoinOperatorTest(ZoneId shiftTimeZone, boolean enableAsyncState) {
         this.shiftTimeZone = shiftTimeZone;
+        this.enableAsyncState = enableAsyncState;
     }
 
-    @Parameters(name = "TimeZone = {0}")
+    @Parameters(name = "TimeZone = {0}, EnableAsyncState = {1}")
     private static Collection<Object[]> runMode() {
-        return Arrays.asList(new Object[] {UTC_ZONE_ID}, new Object[] 
{SHANGHAI_ZONE_ID});
+        return Arrays.asList(
+                new Object[] {UTC_ZONE_ID, false},
+                new Object[] {UTC_ZONE_ID, true},
+                new Object[] {SHANGHAI_ZONE_ID, false},
+                new Object[] {SHANGHAI_ZONE_ID, true});
     }
 
     @TestTemplate
@@ -516,7 +525,8 @@ class WindowJoinOperatorTest {
                 HandwrittenSelectorUtil.getRowDataSelector(
                         new int[] {keyIdx}, INPUT_ROW_TYPE.toRowFieldTypes());
         TypeInformation<RowData> keyType = InternalTypeInfo.ofFields();
-        WindowJoinOperator operator =
+
+        WindowJoinOperatorBuilder operatorBuilder =
                 WindowJoinOperatorBuilder.builder()
                         .leftSerializer(INPUT_ROW_TYPE.toRowSerializer())
                         .rightSerializer(INPUT_ROW_TYPE.toRowSerializer())
@@ -525,11 +535,16 @@ class WindowJoinOperatorTest {
                         .rightWindowEndIndex(0)
                         .filterNullKeys(new boolean[] {true})
                         .joinType(joinType)
-                        .withShiftTimezone(shiftTimeZone)
-                        .build();
-        KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, 
RowData> testHarness =
-                new KeyedTwoInputStreamOperatorTestHarness<>(
-                        operator, keySelector, keySelector, keyType);
-        return testHarness;
+                        .withShiftTimezone(shiftTimeZone);
+        if (enableAsyncState) {
+            operatorBuilder.enableAsyncState();
+            TwoInputStreamOperator<RowData, RowData, RowData> operator = 
operatorBuilder.build();
+            return AsyncKeyedTwoInputStreamOperatorTestHarness.create(
+                    operator, keySelector, keySelector, keyType, 1, 1, 0);
+        } else {
+            TwoInputStreamOperator<RowData, RowData, RowData> operator = 
operatorBuilder.build();
+            return new KeyedTwoInputStreamOperatorTestHarness<>(
+                    operator, keySelector, keySelector, keyType);
+        }
     }
 }


Reply via email to