This is an automated email from the ASF dual-hosted git repository.

leiyanfei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 4a0c2e40506 [FLINK-37521][runtime] Implement async state version of  
KeyedCoProcessOperator (#26328)
4a0c2e40506 is described below

commit 4a0c2e40506558e877b6b4188ab2ea5218c908a8
Author: Yanfei Lei <fredia...@gmail.com>
AuthorDate: Thu Apr 3 10:27:45 2025 +0800

    [FLINK-37521][runtime] Implement async state version of  
KeyedCoProcessOperator (#26328)
---
 .../DeclaringAsyncKeyedCoProcessFunction.java      | 130 ++++
 .../AbstractAsyncStateStreamOperator.java          |  18 +-
 .../{ => co}/AsyncIntervalJoinOperator.java        |   8 +-
 .../operators/co/AsyncKeyedCoProcessOperator.java  | 238 +++++++
 ...ncKeyedCoProcessOperatorWithWatermarkDelay.java |  48 ++
 .../streaming/api/datastream/ConnectedStreams.java |  38 +-
 .../streaming/api/datastream/KeyedStream.java      |   2 +-
 .../transformations/TwoInputTransformation.java    |  11 +
 .../AbstractAsyncStateStreamOperatorTest.java      |   7 +-
 .../operators/AsyncIntervalJoinOperatorTest.java   |   1 +
 .../operators/AsyncKeyedCoProcessOperatorTest.java | 746 +++++++++++++++++++++
 .../nodes/exec/stream/StreamExecIntervalJoin.java  |   4 +-
 12 files changed, 1236 insertions(+), 15 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/functions/DeclaringAsyncKeyedCoProcessFunction.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/functions/DeclaringAsyncKeyedCoProcessFunction.java
new file mode 100644
index 00000000000..ab27c0095d3
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/functions/DeclaringAsyncKeyedCoProcessFunction.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.asyncprocessing.functions;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.asyncprocessing.declare.DeclarationContext;
+import org.apache.flink.runtime.asyncprocessing.declare.DeclarationException;
+import org.apache.flink.streaming.api.TimeDomain;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
+import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.function.ThrowingConsumer;
+
+/**
+ * A function that processes elements of two keyed streams and produces a 
single output stream.
+ *
+ * <p>The function will be called for every element in the input streams and 
can produce zero or
+ * more output elements. Contrary to the {@link CoFlatMapFunction}, this 
function can also query the
+ * time (both event and processing) and set timers, through the provided 
{@link Context}. When
+ * reacting to the firing of timers the function can emit yet more elements.
+ *
+ * <p>An example use case for connected streams is the application of a set of 
rules that change
+ * over time ({@code stream A}) to the elements contained in another stream 
(stream {@code B}). The
+ * rules contained in {@code stream A} can be stored in the state and wait for 
new elements to
+ * arrive on {@code stream B}. Upon reception of a new element on {@code 
stream B}, the function can
+ * apply the previously stored rules to the element and emit a result, and/or 
register a timer that
+ * will trigger an action in the future.
+ *
+ * @param <K> Type of the key.
+ * @param <IN1> Type of the first input.
+ * @param <IN2> Type of the second input.
+ * @param <OUT> Output type.
+ */
+@Internal
+public abstract class DeclaringAsyncKeyedCoProcessFunction<K, IN1, IN2, OUT>
+        extends KeyedCoProcessFunction<K, IN1, IN2, OUT> {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Override and finalize this method. Please use {@link #declareProcess1} 
instead. */
+    @Override
+    public final void processElement1(IN1 value, Context ctx, Collector<OUT> 
out) throws Exception {
+        throw new IllegalAccessException("This method is replaced by 
declareProcess1.");
+    }
+
+    /** Override and finalize this method. Please use {@link #declareProcess2} 
instead. */
+    @Override
+    public final void processElement2(IN2 value, Context ctx, Collector<OUT> 
out) throws Exception {
+        throw new IllegalAccessException("This method is replaced by 
declareProcess2.");
+    }
+
+    /** Override and finalize this method. Please use {@link #declareOnTimer} 
instead. */
+    public final void onTimer(long timestamp, OnTimerContext ctx, 
Collector<OUT> out)
+            throws Exception {
+        throw new IllegalAccessException("This method is replaced by 
declareOnTimer.");
+    }
+
+    /**
+     * Declares variables before {@link #declareProcess1}, {@link 
#declareProcess2} and {@link
+     * #declareOnTimer} are invoked.
+     */
+    public void declareVariables(DeclarationContext context) {}
+
+    /**
+     * Declare a process for one element from the first of the connected 
streams.
+     *
+     * <p>This function can output zero or more elements using the {@link 
Collector} parameter and
+     * also update internal state or set timers using the {@link Context} 
parameter.
+     *
+     * @param context the context that provides useful methods to define named 
callbacks.
+     * @param ctx A {@link Context} that allows querying the timestamp of the 
element and getting a
+     *     {@link TimerService} for registering timers and querying the time. 
The context is only
+     *     valid during the invocation of this method, do not store it.
+     * @param out The collector for returning result values.
+     * @return the whole processing logic just like {@code processElement}.
+     */
+    public abstract ThrowingConsumer<IN1, Exception> declareProcess1(
+            DeclarationContext context, Context ctx, Collector<OUT> out)
+            throws DeclarationException;
+
+    /**
+     * Declare a process for one element from the second of the connected 
streams.
+     *
+     * <p>This function can output zero or more elements using the {@link 
Collector} parameter and
+     * also update internal state or set timers using the {@link Context} 
parameter.
+     *
+     * @param context the context that provides useful methods to define named 
callbacks.
+     * @param ctx A {@link Context} that allows querying the timestamp of the 
element and getting a
+     *     {@link TimerService} for registering timers and querying the time. 
The context is only
+     *     valid during the invocation of this method, do not store it.
+     * @param out The collector for returning result values.
+     * @return the whole processing logic just like {@code processElement}.
+     */
+    public abstract ThrowingConsumer<IN2, Exception> declareProcess2(
+            DeclarationContext context, Context ctx, Collector<OUT> out)
+            throws DeclarationException;
+
+    /**
+     * Declare a procedure which is called when a timer set using {@link 
TimerService} fires.
+     *
+     * @param context the context that provides useful methods to define named 
callbacks.
+     * @param ctx An {@link OnTimerContext} that allows querying the 
timestamp, the {@link
+     *     TimeDomain}, and the key of the firing timer and getting a {@link 
TimerService} for
+     *     registering timers and querying the time. The context is only valid 
during the invocation
+     *     of this method, do not store it.
+     * @param out The collector for returning result values.
+     */
+    public ThrowingConsumer<Long, Exception> declareOnTimer(
+            DeclarationContext context, OnTimerContext ctx, Collector<OUT> out)
+            throws DeclarationException {
+        return (t) -> {};
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java
index df79c512a1d..0c702ed483e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java
@@ -383,12 +383,18 @@ public abstract class 
AbstractAsyncStateStreamOperator<OUT> extends AbstractStre
     }
 
     /**
-     * A hook that will be invoked after finishing advancing the watermark. It 
is not recommended to
-     * perform async state here. Only some synchronous logic is suggested.
+     * A hook that will be invoked after finishing advancing the watermark and 
right before the
+     * watermark is emitted downstream. This is a chance to customize the 
emitted
+     * watermark. It is not recommended to access async state here. Only 
synchronous logic is
+     * suggested.
      *
      * @param watermark the advanced watermark.
+     * @return the watermark that should be emitted downstream, or {@code null} 
if no watermark
+     *     should be emitted.
      */
-    public void postProcessWatermark(Watermark watermark) throws Exception {}
+    public Watermark postProcessWatermark(Watermark watermark) throws 
Exception {
+        return watermark;
+    }
 
     /**
      * Process a watermark when receiving it. Do not override this method 
since the async processing
@@ -425,8 +431,10 @@ public abstract class 
AbstractAsyncStateStreamOperator<OUT> extends AbstractStre
                 },
                 () -> {
                     if (watermarkRef.get() != null) {
-                        output.emitWatermark(watermarkRef.get());
-                        postProcessWatermark(watermarkRef.get());
+                        Watermark postProcessWatermark = 
postProcessWatermark(watermarkRef.get());
+                        if (postProcessWatermark != null) {
+                            output.emitWatermark(postProcessWatermark);
+                        }
                     }
                 });
     }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AsyncIntervalJoinOperator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/co/AsyncIntervalJoinOperator.java
similarity index 97%
rename from 
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AsyncIntervalJoinOperator.java
rename to 
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/co/AsyncIntervalJoinOperator.java
index 1480fc0e4cd..9eb54a23a5a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AsyncIntervalJoinOperator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/co/AsyncIntervalJoinOperator.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.asyncprocessing.operators;
+package org.apache.flink.runtime.asyncprocessing.operators.co;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
@@ -28,6 +28,8 @@ import 
org.apache.flink.api.common.typeutils.base.ListSerializer;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.runtime.asyncprocessing.declare.DeclaredVariable;
+import 
org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateUdfStreamOperator;
+import 
org.apache.flink.runtime.asyncprocessing.operators.TimestampedCollectorWithDeclaredVariable;
 import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
 import org.apache.flink.streaming.api.operators.InternalTimer;
 import org.apache.flink.streaming.api.operators.InternalTimerService;
@@ -417,12 +419,12 @@ public class AsyncIntervalJoinOperator<K, T1, T2, OUT>
     }
 
     @VisibleForTesting
-    MapState<Long, List<IntervalJoinOperator.BufferEntry<T1>>> getLeftBuffer() 
{
+    public MapState<Long, List<IntervalJoinOperator.BufferEntry<T1>>> 
getLeftBuffer() {
         return leftBuffer;
     }
 
     @VisibleForTesting
-    MapState<Long, List<IntervalJoinOperator.BufferEntry<T2>>> 
getRightBuffer() {
+    public MapState<Long, List<IntervalJoinOperator.BufferEntry<T2>>> 
getRightBuffer() {
         return rightBuffer;
     }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/co/AsyncKeyedCoProcessOperator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/co/AsyncKeyedCoProcessOperator.java
new file mode 100644
index 00000000000..03454e22709
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/co/AsyncKeyedCoProcessOperator.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.asyncprocessing.operators.co;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.runtime.asyncprocessing.declare.DeclarationContext;
+import org.apache.flink.runtime.asyncprocessing.declare.DeclaredVariable;
+import 
org.apache.flink.runtime.asyncprocessing.functions.DeclaringAsyncKeyedCoProcessFunction;
+import 
org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateUdfStreamOperator;
+import 
org.apache.flink.runtime.asyncprocessing.operators.TimestampedCollectorWithDeclaredVariable;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.SimpleTimerService;
+import org.apache.flink.streaming.api.TimeDomain;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.OutputTag;
+import org.apache.flink.util.function.ThrowingConsumer;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A {@link org.apache.flink.streaming.api.operators.StreamOperator} for 
executing keyed {@link
+ * KeyedCoProcessFunction KeyedCoProcessFunction}.
+ */
+@Internal
+public class AsyncKeyedCoProcessOperator<K, IN1, IN2, OUT>
+        extends AbstractAsyncStateUdfStreamOperator<OUT, 
KeyedCoProcessFunction<K, IN1, IN2, OUT>>
+        implements TwoInputStreamOperator<IN1, IN2, OUT>, Triggerable<K, 
VoidNamespace> {
+
+    private static final long serialVersionUID = 1L;
+
+    // Shared timestamp variable for collector, context and onTimerContext.
+    private transient DeclaredVariable<Long> sharedTimestamp;
+
+    private transient TimestampedCollectorWithDeclaredVariable<OUT> collector;
+
+    private transient ContextImpl<K, IN1, IN2, OUT> context;
+
+    private transient OnTimerContextImpl<K, IN1, IN2, OUT> onTimerContext;
+
+    private transient ThrowingConsumer<IN1, Exception> processor1;
+    private transient ThrowingConsumer<IN2, Exception> processor2;
+    private transient ThrowingConsumer<Long, Exception> timerProcessor;
+
+    public AsyncKeyedCoProcessOperator(
+            KeyedCoProcessFunction<K, IN1, IN2, OUT> keyedCoProcessFunction) {
+        super(keyedCoProcessFunction);
+    }
+
+    @Override
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    public void open() throws Exception {
+        super.open();
+        sharedTimestamp =
+                declarationContext.declareVariable(
+                        LongSerializer.INSTANCE,
+                        "_AsyncCoKeyedProcessOperator$sharedTimestamp",
+                        null);
+
+        collector = new TimestampedCollectorWithDeclaredVariable<>(output, 
sharedTimestamp);
+
+        InternalTimerService<VoidNamespace> internalTimerService =
+                getInternalTimerService("user-timers", 
VoidNamespaceSerializer.INSTANCE, this);
+
+        TimerService timerService = new 
SimpleTimerService(internalTimerService);
+
+        context = new ContextImpl<>(userFunction, timerService, 
sharedTimestamp);
+        onTimerContext = new OnTimerContextImpl<>(userFunction, timerService, 
declarationContext);
+        if (userFunction instanceof DeclaringAsyncKeyedCoProcessFunction) {
+            DeclaringAsyncKeyedCoProcessFunction declaringFunction =
+                    (DeclaringAsyncKeyedCoProcessFunction) userFunction;
+            declaringFunction.declareVariables(declarationContext);
+            processor1 = declaringFunction.declareProcess1(declarationContext, 
context, collector);
+            processor2 = declaringFunction.declareProcess2(declarationContext, 
context, collector);
+            timerProcessor =
+                    declaringFunction.declareOnTimer(declarationContext, 
onTimerContext, collector);
+        } else {
+            processor1 = (in) -> userFunction.processElement1(in, context, 
collector);
+            processor2 = (in) -> userFunction.processElement2(in, context, 
collector);
+            timerProcessor = (in) -> userFunction.onTimer(in, onTimerContext, 
collector);
+        }
+    }
+
+    @Override
+    public void processElement1(StreamRecord<IN1> element) throws Exception {
+        collector.setTimestamp(element);
+        processor1.accept(element.getValue());
+    }
+
+    @Override
+    public void processElement2(StreamRecord<IN2> element) throws Exception {
+        collector.setTimestamp(element);
+        processor2.accept(element.getValue());
+    }
+
+    @Override
+    public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws 
Exception {
+        collector.setAbsoluteTimestamp(timer.getTimestamp());
+        invokeUserFunction(TimeDomain.EVENT_TIME, timer);
+    }
+
+    @Override
+    public void onProcessingTime(InternalTimer<K, VoidNamespace> timer) throws 
Exception {
+        collector.eraseTimestamp();
+        invokeUserFunction(TimeDomain.PROCESSING_TIME, timer);
+    }
+
+    private void invokeUserFunction(TimeDomain timeDomain, InternalTimer<K, 
VoidNamespace> timer)
+            throws Exception {
+        onTimerContext.setTime(timer.getTimestamp(), timeDomain);
+        timerProcessor.accept(timer.getTimestamp());
+    }
+
+    public class ContextImpl<K, IN1, IN2, OUT>
+            extends KeyedCoProcessFunction<K, IN1, IN2, OUT>.Context {
+
+        private final TimerService timerService;
+
+        private final DeclaredVariable<Long> timestamp;
+
+        ContextImpl(
+                KeyedCoProcessFunction<K, IN1, IN2, OUT> function,
+                TimerService timerService,
+                DeclaredVariable<Long> timestamp) {
+            function.super();
+            this.timerService = checkNotNull(timerService);
+            this.timestamp = timestamp;
+        }
+
+        @Override
+        public Long timestamp() {
+            return timestamp.get();
+        }
+
+        @Override
+        public TimerService timerService() {
+            return timerService;
+        }
+
+        @Override
+        public <X> void output(OutputTag<X> outputTag, X value) {
+            if (outputTag == null) {
+                throw new IllegalArgumentException("OutputTag must not be 
null.");
+            }
+
+            output.collect(outputTag, new StreamRecord<>(value, 
timestamp.get()));
+        }
+
+        @Override
+        public K getCurrentKey() {
+            return (K) AsyncKeyedCoProcessOperator.this.getCurrentKey();
+        }
+    }
+
+    private class OnTimerContextImpl<K, IN1, IN2, OUT>
+            extends KeyedCoProcessFunction<K, IN1, IN2, OUT>.OnTimerContext {
+
+        private final TimerService timerService;
+
+        private final DeclaredVariable<String> timeDomain;
+
+        private final DeclaredVariable<Long> timestamp;
+
+        OnTimerContextImpl(
+                KeyedCoProcessFunction<K, IN1, IN2, OUT> function,
+                TimerService timerService,
+                DeclarationContext declarationContext) {
+            function.super();
+            this.timerService = checkNotNull(timerService);
+            this.timeDomain =
+                    declarationContext.declareVariable(
+                            StringSerializer.INSTANCE, 
"_OnTimerContextImpl$timeDomain", null);
+            this.timestamp =
+                    declarationContext.declareVariable(
+                            LongSerializer.INSTANCE, 
"_OnTimerContextImpl$timestamp", null);
+        }
+
+        public void setTime(long time, TimeDomain one) {
+            timestamp.set(time);
+            timeDomain.set(one.name());
+        }
+
+        @Override
+        public Long timestamp() {
+            checkState(timestamp.get() != null);
+            return timestamp.get();
+        }
+
+        @Override
+        public TimerService timerService() {
+            return timerService;
+        }
+
+        @Override
+        public <X> void output(OutputTag<X> outputTag, X value) {
+            if (outputTag == null) {
+                throw new IllegalArgumentException("OutputTag must not be 
null.");
+            }
+
+            output.collect(outputTag, new StreamRecord<>(value, timestamp()));
+        }
+
+        @Override
+        public TimeDomain timeDomain() {
+            checkState(timeDomain.get() != null);
+            return TimeDomain.valueOf(timeDomain.get());
+        }
+
+        @Override
+        public K getCurrentKey() {
+            return (K) AsyncKeyedCoProcessOperator.this.getCurrentKey();
+        }
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/co/AsyncKeyedCoProcessOperatorWithWatermarkDelay.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/co/AsyncKeyedCoProcessOperatorWithWatermarkDelay.java
new file mode 100644
index 00000000000..ace1297a0a9
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/co/AsyncKeyedCoProcessOperatorWithWatermarkDelay.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.asyncprocessing.operators.co;
+
+import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
+import org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.util.Preconditions;
+
+/** Async version of {@link KeyedCoProcessOperator} that holds back watermarks 
by a static delay. */
+public class AsyncKeyedCoProcessOperatorWithWatermarkDelay<K, IN1, IN2, OUT>
+        extends AsyncKeyedCoProcessOperator<K, IN1, IN2, OUT> {
+    private static final long serialVersionUID = 1L;
+
+    private final long watermarkDelay;
+
+    public AsyncKeyedCoProcessOperatorWithWatermarkDelay(
+            KeyedCoProcessFunction<K, IN1, IN2, OUT> keyedCoProcessFunction, 
long watermarkDelay) {
+        super(keyedCoProcessFunction);
+        Preconditions.checkArgument(
+                watermarkDelay >= 0, "The watermark delay should be 
non-negative.");
+        this.watermarkDelay = watermarkDelay;
+    }
+
+    @Override
+    public Watermark postProcessWatermark(Watermark watermark) throws 
Exception {
+        if (watermarkDelay == 0) {
+            return watermark;
+        } else {
+            return new Watermark(watermark.getTimestamp() - watermarkDelay);
+        }
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
index 3c9347c9a25..45296ddfad8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.api.datastream;
 
+import org.apache.flink.annotation.Experimental;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.annotation.PublicEvolving;
@@ -26,6 +27,7 @@ import 
org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
+import 
org.apache.flink.runtime.asyncprocessing.operators.co.AsyncKeyedCoProcessOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
 import org.apache.flink.streaming.api.functions.co.CoMapFunction;
@@ -69,12 +71,20 @@ public class ConnectedStreams<IN1, IN2> {
     protected final StreamExecutionEnvironment environment;
     protected final DataStream<IN1> inputStream1;
     protected final DataStream<IN2> inputStream2;
+    protected boolean isEnableAsyncState;
 
     protected ConnectedStreams(
             StreamExecutionEnvironment env, DataStream<IN1> input1, 
DataStream<IN2> input2) {
         this.environment = requireNonNull(env);
         this.inputStream1 = requireNonNull(input1);
         this.inputStream2 = requireNonNull(input2);
+        if ((inputStream1 instanceof KeyedStream) && (inputStream2 instanceof 
KeyedStream)) {
+            this.isEnableAsyncState =
+                    ((KeyedStream) inputStream1).isEnableAsyncState()
+                            && ((KeyedStream) 
inputStream2).isEnableAsyncState();
+        } else {
+            this.isEnableAsyncState = false;
+        }
     }
 
     public StreamExecutionEnvironment getExecutionEnvironment() {
@@ -439,7 +449,12 @@ public class ConnectedStreams<IN1, IN2> {
         TwoInputStreamOperator<IN1, IN2, R> operator;
 
         if ((inputStream1 instanceof KeyedStream) && (inputStream2 instanceof 
KeyedStream)) {
-            operator = new 
KeyedCoProcessOperator<>(inputStream1.clean(keyedCoProcessFunction));
+            operator =
+                    isEnableAsyncState
+                            ? new AsyncKeyedCoProcessOperator<>(
+                                    inputStream1.clean(keyedCoProcessFunction))
+                            : new KeyedCoProcessOperator<>(
+                                    
inputStream1.clean(keyedCoProcessFunction));
         } else {
             throw new UnsupportedOperationException(
                     "KeyedCoProcessFunction can only be used "
@@ -523,4 +538,25 @@ public class ConnectedStreams<IN1, IN2> {
 
         return returnStream;
     }
+
+    /**
+     * Enables async state processing for the following keyed processing 
function on connected
+     * streams. This also requires that only State V2 APIs are used in the function.
+     *
+     * @return the configured ConnectedStreams itself.
+     */
+    @Experimental
+    public ConnectedStreams<IN1, IN2> enableAsyncState() {
+        if ((inputStream1 instanceof KeyedStream) && (inputStream2 instanceof 
KeyedStream)) {
+            ((KeyedStream<?, ?>) inputStream1).enableAsyncState();
+            ((KeyedStream<?, ?>) inputStream2).enableAsyncState();
+            this.isEnableAsyncState = true;
+        } else {
+            throw new UnsupportedOperationException(
+                    "The connected streams do not support async state, "
+                            + "please ensure that two input streams of your 
connected streams are "
+                            + "keyed stream(not behind a keyBy()).");
+        }
+        return this;
+    }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
index dd4818fe30f..f082f11b8e6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
@@ -35,8 +35,8 @@ import 
org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
 import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
-import 
org.apache.flink.runtime.asyncprocessing.operators.AsyncIntervalJoinOperator;
 import org.apache.flink.runtime.asyncprocessing.operators.AsyncStreamFlatMap;
+import 
org.apache.flink.runtime.asyncprocessing.operators.co.AsyncIntervalJoinOperator;
 import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
 import 
org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
 import 
org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/transformations/TwoInputTransformation.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/transformations/TwoInputTransformation.java
index 007f908c809..43566988b1e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/transformations/TwoInputTransformation.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/transformations/TwoInputTransformation.java
@@ -27,6 +27,7 @@ import 
org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
 import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import 
org.apache.flink.streaming.runtime.operators.asyncprocessing.AsyncStateProcessingOperator;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -239,4 +240,14 @@ public class TwoInputTransformation<IN1, IN2, OUT> extends 
PhysicalTransformatio
     public boolean isInternalSorterSupported() {
         return 
operatorFactory.getOperatorAttributes().isInternalSorterSupported();
     }
+
+    /**
+     * Enables async state for this transformation.
+     *
+     * <p>Nothing needs to be done when the operator held by the factory already implements {@link
+     * AsyncStateProcessingOperator}. In every other case, including factories that are not a
+     * {@link SimpleOperatorFactory} and therefore expose no operator instance to inspect, the
+     * decision is delegated to the parent implementation.
+     */
+    @Override
+    public void enableAsyncState() {
+        // Guard the cast: only a SimpleOperatorFactory can be asked for its operator. Casting
+        // any other factory blindly would fail with a ClassCastException before the parent
+        // implementation gets a chance to handle the request.
+        boolean alreadyAsync =
+                operatorFactory instanceof SimpleOperatorFactory
+                        && ((SimpleOperatorFactory<OUT>) operatorFactory).getOperator()
+                                instanceof AsyncStateProcessingOperator;
+        if (!alreadyAsync) {
+            super.enableAsyncState();
+        }
+    }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorTest.java
index bb234af0eef..e5452f51656 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorTest.java
@@ -371,8 +371,8 @@ public class AbstractAsyncStateStreamOperatorTest {
             expectedOutput.add(new StreamRecord<>(1002L));
             expectedOutput.add(new StreamRecord<>(1L));
             expectedOutput.add(new StreamRecord<>(3L));
-            expectedOutput.add(new Watermark(3L));
             expectedOutput.add(new StreamRecord<>(103L));
+            expectedOutput.add(new Watermark(3L));
             testHarness.processWatermark1(new Watermark(4L));
             testHarness.processWatermark2(new Watermark(4L));
             expectedOutput.add(new StreamRecord<>(1004L));
@@ -380,8 +380,8 @@ public class AbstractAsyncStateStreamOperatorTest {
             testHarness.processWatermark2(new Watermark(5L));
             expectedOutput.add(new StreamRecord<>(1005L));
             expectedOutput.add(new StreamRecord<>(4L));
-            expectedOutput.add(new Watermark(6L));
             expectedOutput.add(new StreamRecord<>(106L));
+            expectedOutput.add(new Watermark(6L));
 
             TestHarnessUtil.assertOutputEquals(
                     "Output was not correct", expectedOutput, 
testHarness.getOutput());
@@ -690,10 +690,11 @@ public class AbstractAsyncStateStreamOperatorTest {
         }
 
         @Override
-        public void postProcessWatermark(Watermark watermark) throws Exception 
{
+        public Watermark postProcessWatermark(Watermark watermark) throws 
Exception {
             if (postProcessFunction != null) {
                 postProcessFunction.accept(watermark);
             }
+            return watermark;
         }
 
         @Override
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/operators/AsyncIntervalJoinOperatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/operators/AsyncIntervalJoinOperatorTest.java
index dc08ed7f76c..2c250380969 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/operators/AsyncIntervalJoinOperatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/operators/AsyncIntervalJoinOperatorTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
+import 
org.apache.flink.runtime.asyncprocessing.operators.co.AsyncIntervalJoinOperator;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/asyncprocessing/operators/AsyncKeyedCoProcessOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/asyncprocessing/operators/AsyncKeyedCoProcessOperatorTest.java
new file mode 100644
index 00000000000..d7113471c35
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/asyncprocessing/operators/AsyncKeyedCoProcessOperatorTest.java
@@ -0,0 +1,746 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.asyncprocessing.operators;
+
+import org.apache.flink.api.common.state.v2.ValueState;
+import org.apache.flink.api.common.state.v2.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.core.state.StateFutureUtils;
+import org.apache.flink.runtime.asyncprocessing.declare.ContextVariable;
+import org.apache.flink.runtime.asyncprocessing.declare.DeclarationContext;
+import org.apache.flink.runtime.asyncprocessing.declare.DeclarationException;
+import 
org.apache.flink.runtime.asyncprocessing.functions.DeclaringAsyncKeyedCoProcessFunction;
+import 
org.apache.flink.runtime.asyncprocessing.operators.co.AsyncKeyedCoProcessOperator;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.streaming.api.TimeDomain;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.TestHarnessUtil;
+import 
org.apache.flink.streaming.util.asyncprocessing.AsyncKeyedTwoInputStreamOperatorTestHarness;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.function.ThrowingConsumer;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link AsyncKeyedCoProcessOperator}. */
+class AsyncKeyedCoProcessOperatorTest {
+
+    /**
+     * Runs the declared processing chains of both inputs and checks the emitted records as well
+     * as the counter that the two chains share.
+     */
+    @Test
+    void testDeclareProcessor() throws Exception {
+        TestChainDeclarationFunction function = new TestChainDeclarationFunction();
+        AsyncKeyedCoProcessOperator<String, Integer, String, String> operator =
+                new AsyncKeyedCoProcessOperator<>(function);
+
+        AsyncKeyedTwoInputStreamOperatorTestHarness<String, Integer, String, String> testHarness =
+                AsyncKeyedTwoInputStreamOperatorTestHarness.create(
+                        operator,
+                        new IntToStringKeySelector<>(),
+                        new IdentityKeySelector<>(),
+                        BasicTypeInfo.STRING_TYPE_INFO);
+
+        ArrayList<StreamRecord<String>> expectedOutput = new ArrayList<>();
+
+        testHarness.open();
+        // First input: 0 + 5, then +1, then +5 again -> 11 is emitted.
+        testHarness.processElement1(new StreamRecord<>(5));
+        expectedOutput.add(new StreamRecord<>("11"));
+        assertThat(function.value.get()).isEqualTo(11);
+        // Second input: the element is forwarded and added to the counter -> 17.
+        testHarness.processElement2(new StreamRecord<>("6"));
+        expectedOutput.add(new StreamRecord<>("6"));
+        assertThat(function.value.get()).isEqualTo(17);
+        assertThat(testHarness.getOutput()).containsExactly(expectedOutput.toArray());
+
+        // Release the harness, as the other tests in this class do.
+        testHarness.close();
+    }
+
+    /**
+     * Verifies that elements of both inputs can read the current watermark and their own
+     * timestamp from the context.
+     */
+    @Test
+    void testTimestampAndWatermarkQuerying() throws Exception {
+        AsyncKeyedCoProcessOperator<String, Integer, String, String> coProcessOperator =
+                new AsyncKeyedCoProcessOperator<>(new WatermarkQueryingProcessFunction());
+        AsyncKeyedTwoInputStreamOperatorTestHarness<String, Integer, String, String> harness =
+                AsyncKeyedTwoInputStreamOperatorTestHarness.create(
+                        coProcessOperator,
+                        new IntToStringKeySelector<>(),
+                        new IdentityKeySelector<>(),
+                        BasicTypeInfo.STRING_TYPE_INFO);
+        ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
+
+        harness.setup();
+        harness.open();
+
+        // Watermark 17 on both inputs, followed by an element on the first input.
+        harness.processWatermark1(new Watermark(17));
+        harness.processWatermark2(new Watermark(17));
+        expected.add(new Watermark(17L));
+        harness.processElement1(new StreamRecord<>(5, 12L));
+        expected.add(new StreamRecord<>("5WM:17 TS:12", 12L));
+
+        // Watermark 42 on both inputs, followed by an element on the second input.
+        harness.processWatermark1(new Watermark(42));
+        harness.processWatermark2(new Watermark(42));
+        expected.add(new Watermark(42L));
+        harness.processElement2(new StreamRecord<>("6", 13L));
+        expected.add(new StreamRecord<>("6WM:42 TS:13", 13L));
+
+        TestHarnessUtil.assertOutputEquals(
+                "Output was not correct.", expected, harness.getOutput());
+
+        harness.close();
+    }
+
+    /**
+     * Verifies that elements of both inputs can read the current processing time; the records
+     * carry no timestamp, so the reported timestamp is {@code null}.
+     */
+    @Test
+    void testTimestampAndProcessingTimeQuerying() throws Exception {
+        AsyncKeyedCoProcessOperator<String, Integer, String, String> coProcessOperator =
+                new AsyncKeyedCoProcessOperator<>(new ProcessingTimeQueryingProcessFunction());
+        AsyncKeyedTwoInputStreamOperatorTestHarness<String, Integer, String, String> harness =
+                AsyncKeyedTwoInputStreamOperatorTestHarness.create(
+                        coProcessOperator,
+                        new IntToStringKeySelector<>(),
+                        new IdentityKeySelector<>(),
+                        BasicTypeInfo.STRING_TYPE_INFO);
+        ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
+
+        harness.setup();
+        harness.open();
+
+        // First input at processing time 17.
+        harness.setProcessingTime(17);
+        harness.processElement1(new StreamRecord<>(5));
+        expected.add(new StreamRecord<>("5PT:17 TS:null"));
+
+        // Second input at processing time 42.
+        harness.setProcessingTime(42);
+        harness.processElement2(new StreamRecord<>("6"));
+        expected.add(new StreamRecord<>("6PT:42 TS:null"));
+
+        TestHarnessUtil.assertOutputEquals(
+                "Output was not correct.", expected, harness.getOutput());
+
+        harness.close();
+    }
+
+    /**
+     * Verifies that event-time timers registered from either input fire once the watermark of
+     * both inputs reaches them, and that the timer output precedes the forwarded watermark.
+     */
+    @Test
+    void testEventTimeTimers() throws Exception {
+        AsyncKeyedCoProcessOperator<String, Integer, String, String> coProcessOperator =
+                new AsyncKeyedCoProcessOperator<>(new EventTimeTriggeringProcessFunction());
+        AsyncKeyedTwoInputStreamOperatorTestHarness<String, Integer, String, String> harness =
+                AsyncKeyedTwoInputStreamOperatorTestHarness.create(
+                        coProcessOperator,
+                        new IntToStringKeySelector<>(),
+                        new IdentityKeySelector<>(),
+                        BasicTypeInfo.STRING_TYPE_INFO);
+        ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
+
+        harness.setup();
+        harness.open();
+
+        // One element per input; each registers its own event-time timer.
+        harness.processElement1(new StreamRecord<>(17, 42L));
+        expected.add(new StreamRecord<>("INPUT1:17", 42L));
+        harness.processElement2(new StreamRecord<>("18", 42L));
+        expected.add(new StreamRecord<>("INPUT2:18", 42L));
+
+        // Watermark 5 triggers the timer of key "17".
+        harness.processWatermark1(new Watermark(5));
+        harness.processWatermark2(new Watermark(5));
+        expected.add(new StreamRecord<>("17:1777", 5L));
+        expected.add(new Watermark(5L));
+
+        // Watermark 6 triggers the timer of key "18".
+        harness.processWatermark1(new Watermark(6));
+        harness.processWatermark2(new Watermark(6));
+        expected.add(new StreamRecord<>("18:1777", 6L));
+        expected.add(new Watermark(6L));
+
+        TestHarnessUtil.assertOutputEquals(
+                "Output was not correct.", expected, harness.getOutput());
+
+        harness.close();
+    }
+
+    /**
+     * Verifies that processing-time timers registered from either input fire when the
+     * processing time is advanced to them.
+     */
+    @Test
+    void testProcessingTimeTimers() throws Exception {
+        AsyncKeyedCoProcessOperator<String, Integer, String, String> coProcessOperator =
+                new AsyncKeyedCoProcessOperator<>(new ProcessingTimeTriggeringProcessFunction());
+        AsyncKeyedTwoInputStreamOperatorTestHarness<String, Integer, String, String> harness =
+                AsyncKeyedTwoInputStreamOperatorTestHarness.create(
+                        coProcessOperator,
+                        new IntToStringKeySelector<>(),
+                        new IdentityKeySelector<>(),
+                        BasicTypeInfo.STRING_TYPE_INFO);
+        ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
+
+        harness.setup();
+        harness.open();
+
+        // One element per input; each registers its own processing-time timer.
+        harness.processElement1(new StreamRecord<>(17));
+        expected.add(new StreamRecord<>("INPUT1:17"));
+        harness.processElement2(new StreamRecord<>("18"));
+        expected.add(new StreamRecord<>("INPUT2:18"));
+
+        // Each step of the clock is expected to fire one timer.
+        harness.setProcessingTime(5);
+        expected.add(new StreamRecord<>("1777"));
+        harness.setProcessingTime(6);
+        expected.add(new StreamRecord<>("1777"));
+
+        TestHarnessUtil.assertOutputEquals(
+                "Output was not correct.", expected, harness.getOutput());
+
+        harness.close();
+    }
+
+    /** Verifies that event-time timers and their state do not leak between different keys. */
+    @Test
+    void testEventTimeTimerWithState() throws Exception {
+        AsyncKeyedCoProcessOperator<String, Integer, String, String> coProcessOperator =
+                new AsyncKeyedCoProcessOperator<>(new EventTimeTriggeringStatefulProcessFunction());
+        AsyncKeyedTwoInputStreamOperatorTestHarness<String, Integer, String, String> harness =
+                AsyncKeyedTwoInputStreamOperatorTestHarness.create(
+                        coProcessOperator,
+                        new IntToStringKeySelector<>(),
+                        new IdentityKeySelector<>(),
+                        BasicTypeInfo.STRING_TYPE_INFO);
+        ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
+
+        harness.setup();
+        harness.open();
+
+        harness.processWatermark1(new Watermark(1));
+        harness.processWatermark2(new Watermark(1));
+        expected.add(new Watermark(1L));
+        // key "17": should set timer for 6
+        harness.processElement1(new StreamRecord<>(17, 0L));
+        expected.add(new StreamRecord<>("INPUT1:17", 0L));
+        // key "13": should set timer for 6
+        harness.processElement1(new StreamRecord<>(13, 0L));
+        expected.add(new StreamRecord<>("INPUT1:13", 0L));
+
+        harness.processWatermark1(new Watermark(2));
+        harness.processWatermark2(new Watermark(2));
+        expected.add(new Watermark(2L));
+        // key "13" again: should delete its timer, nothing is emitted
+        harness.processElement1(new StreamRecord<>(13, 1L));
+        // key "42": should set timer for 7
+        harness.processElement2(new StreamRecord<>("42", 1L));
+        expected.add(new StreamRecord<>("INPUT2:42", 1L));
+
+        // Only the timer of key "17" is left at 6.
+        harness.processWatermark1(new Watermark(6));
+        harness.processWatermark2(new Watermark(6));
+        expected.add(new StreamRecord<>("STATE:17", 6L));
+        expected.add(new Watermark(6L));
+
+        harness.processWatermark1(new Watermark(7));
+        harness.processWatermark2(new Watermark(7));
+        expected.add(new StreamRecord<>("STATE:42", 7L));
+        expected.add(new Watermark(7L));
+
+        TestHarnessUtil.assertOutputEquals(
+                "Output was not correct.", expected, harness.getOutput());
+
+        harness.close();
+    }
+
+    /** Verifies that processing-time timers and their state do not leak between different keys. */
+    @Test
+    void testProcessingTimeTimerWithState() throws Exception {
+        AsyncKeyedCoProcessOperator<String, Integer, String, String> coProcessOperator =
+                new AsyncKeyedCoProcessOperator<>(
+                        new ProcessingTimeTriggeringStatefulProcessFunction());
+        AsyncKeyedTwoInputStreamOperatorTestHarness<String, Integer, String, String> harness =
+                AsyncKeyedTwoInputStreamOperatorTestHarness.create(
+                        coProcessOperator,
+                        new IntToStringKeySelector<>(),
+                        new IdentityKeySelector<>(),
+                        BasicTypeInfo.STRING_TYPE_INFO);
+        ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
+
+        harness.setup();
+        harness.open();
+
+        harness.setProcessingTime(1);
+        // key "17": should set timer for 6
+        harness.processElement1(new StreamRecord<>(17));
+        expected.add(new StreamRecord<>("INPUT1:17"));
+        // key "13": should set timer for 6
+        harness.processElement1(new StreamRecord<>(13));
+        expected.add(new StreamRecord<>("INPUT1:13"));
+
+        harness.setProcessingTime(2);
+        // key "13" again: should delete its timer, nothing is emitted
+        harness.processElement1(new StreamRecord<>(13));
+        // key "42": should set timer for 7
+        harness.processElement2(new StreamRecord<>("42"));
+        expected.add(new StreamRecord<>("INPUT2:42"));
+
+        harness.setProcessingTime(6);
+        expected.add(new StreamRecord<>("STATE:17"));
+        harness.setProcessingTime(7);
+        expected.add(new StreamRecord<>("STATE:42"));
+
+        TestHarnessUtil.assertOutputEquals(
+                "Output was not correct.", expected, harness.getOutput());
+
+        harness.close();
+    }
+
+    /**
+     * Processes one element per input, snapshots the operator, and verifies that a fresh
+     * operator restored from that snapshot produces the expected processing-time and event-time
+     * output.
+     */
+    @Test
+    void testSnapshotAndRestore() throws Exception {
+        AsyncKeyedCoProcessOperator<String, Integer, String, String> originalOperator =
+                new AsyncKeyedCoProcessOperator<>(new BothTriggeringProcessFunction());
+        AsyncKeyedTwoInputStreamOperatorTestHarness<String, Integer, String, String> harness =
+                AsyncKeyedTwoInputStreamOperatorTestHarness.create(
+                        originalOperator,
+                        new IntToStringKeySelector<>(),
+                        new IdentityKeySelector<>(),
+                        BasicTypeInfo.STRING_TYPE_INFO);
+
+        harness.setup();
+        harness.open();
+
+        harness.processElement1(new StreamRecord<>(5, 12L));
+        harness.processElement2(new StreamRecord<>("5", 12L));
+
+        // Take the snapshot, then discard the first harness entirely.
+        OperatorSubtaskState snapshot = harness.snapshot(0, 0);
+        harness.close();
+
+        // Restore into a brand-new operator and harness.
+        AsyncKeyedCoProcessOperator<String, Integer, String, String> restoredOperator =
+                new AsyncKeyedCoProcessOperator<>(new BothTriggeringProcessFunction());
+        harness =
+                AsyncKeyedTwoInputStreamOperatorTestHarness.create(
+                        restoredOperator,
+                        new IntToStringKeySelector<>(),
+                        new IdentityKeySelector<>(),
+                        BasicTypeInfo.STRING_TYPE_INFO);
+
+        harness.setup();
+        harness.initializeState(snapshot);
+        harness.open();
+
+        ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
+
+        harness.setProcessingTime(5);
+        expected.add(new StreamRecord<>("PROC:1777"));
+
+        harness.processWatermark1(new Watermark(6));
+        harness.processWatermark2(new Watermark(6));
+        expected.add(new StreamRecord<>("EVENT:1777", 6L));
+        expected.add(new Watermark(6));
+
+        TestHarnessUtil.assertOutputEquals(
+                "Output was not correct.", expected, harness.getOutput());
+
+        harness.close();
+    }
+
+    /** Verifies that the context reports the key of the element currently being processed. */
+    @Test
+    void testGetCurrentKeyFromContext() throws Exception {
+        AsyncKeyedCoProcessOperator<String, Integer, String, String> coProcessOperator =
+                new AsyncKeyedCoProcessOperator<>(new AppendCurrentKeyProcessFunction());
+        AsyncKeyedTwoInputStreamOperatorTestHarness<String, Integer, String, String> harness =
+                AsyncKeyedTwoInputStreamOperatorTestHarness.create(
+                        coProcessOperator,
+                        new IntToStringKeySelector<>(),
+                        new IdentityKeySelector<>(),
+                        BasicTypeInfo.STRING_TYPE_INFO);
+        ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
+
+        harness.setup();
+        harness.open();
+
+        // First input: integer elements keyed by their string form.
+        harness.processElement1(new StreamRecord<>(5));
+        expected.add(new StreamRecord<>("5,5"));
+        harness.processElement1(new StreamRecord<>(6));
+        expected.add(new StreamRecord<>("6,6"));
+
+        // Second input: string elements keyed by themselves.
+        harness.processElement2(new StreamRecord<>("hello"));
+        expected.add(new StreamRecord<>("hello,hello"));
+        harness.processElement2(new StreamRecord<>("world"));
+        expected.add(new StreamRecord<>("world,world"));
+
+        TestHarnessUtil.assertOutputEquals(
+                "Output was not correct.", expected, harness.getOutput());
+
+        harness.close();
+    }
+
+    /**
+     * Key selector for the integer input: the key is the string form of the element.
+     *
+     * @param <T> not used by the selector itself; the tests instantiate it with the diamond
+     */
+    private static class IntToStringKeySelector<T> implements KeySelector<Integer, String> {
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public String getKey(Integer element) throws Exception {
+            return String.valueOf(element);
+        }
+    }
+
+    /**
+     * Key selector that uses the element itself as its key.
+     *
+     * @param <T> type of both the element and the key
+     */
+    private static class IdentityKeySelector<T> implements KeySelector<T, T> {
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public T getKey(T element) throws Exception {
+            return element;
+        }
+    }
+
+    /**
+     * Declaring function used by {@code testDeclareProcessor}. The chains of both inputs update
+     * the shared {@link #value} counter, which the test reads directly.
+     */
+    private static class TestChainDeclarationFunction
+            extends DeclaringAsyncKeyedCoProcessFunction<String, Integer, 
String, String> {
+
+        // Counter mutated by the chains of both inputs; starts at 0.
+        final AtomicInteger value = new AtomicInteger(0);
+
+        /**
+         * Declares a three-step chain for the first input: add the element to {@link #value},
+         * increment the counter, then add the remembered element once more and emit the
+         * resulting total as a string.
+         */
+        @Override
+        public ThrowingConsumer<Integer, Exception> declareProcess1(
+                DeclarationContext context,
+                KeyedCoProcessFunction<String, Integer, String, 
String>.Context ctx,
+                Collector<String> out)
+                throws DeclarationException {
+            // Context variable created via declareVariable(null); the first step stores the
+            // element in it while it is still null so that the last step can read it back.
+            ContextVariable<Integer> inputValue = 
context.declareVariable(null);
+            return context.<Integer>declareChain()
+                    .thenCompose(
+                            e -> {
+                                if (inputValue.get() == null) {
+                                    inputValue.set(e);
+                                }
+                                value.addAndGet(e);
+                                return StateFutureUtils.completedVoidFuture();
+                            })
+                    // Second step: bump the counter by one and pass the new value on.
+                    .thenCompose(v -> 
StateFutureUtils.completedFuture(value.incrementAndGet()))
+                    .withName("adder")
+                    // Last step: add the remembered element again and emit the counter.
+                    .thenAccept(
+                            (v) -> {
+                                value.addAndGet(inputValue.get());
+                                out.collect(String.valueOf(value.get()));
+                            })
+                    .withName("doubler")
+                    .finish();
+        }
+
+        /**
+         * Declares a single-step chain for the second input: forward the element unchanged and
+         * add its integer value to {@link #value}.
+         */
+        @Override
+        public ThrowingConsumer<String, Exception> declareProcess2(
+                DeclarationContext context,
+                KeyedCoProcessFunction<String, Integer, String, 
String>.Context ctx,
+                Collector<String> out)
+                throws DeclarationException {
+            return context.<String>declareChain()
+                    .thenAccept(
+                            v -> {
+                                out.collect(v);
+                                // The element must be parseable as an integer.
+                                value.addAndGet(Integer.valueOf(v));
+                            })
+                    .withName("pass")
+                    .finish();
+        }
+    }
+
+    /**
+     * Function that emits, for every element of either input, the element followed by the
+     * current watermark and the element timestamp. Timers are ignored.
+     */
+    private static class WatermarkQueryingProcessFunction
+            extends KeyedCoProcessFunction<String, Integer, String, String> {
+
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public void processElement1(Integer value, Context ctx, Collector<String> out)
+                throws Exception {
+            out.collect(describe(value, ctx));
+        }
+
+        @Override
+        public void processElement2(String value, Context ctx, Collector<String> out)
+                throws Exception {
+            out.collect(describe(value, ctx));
+        }
+
+        @Override
+        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out)
+                throws Exception {}
+
+        /** Builds the {@code <element>WM:<watermark> TS:<timestamp>} record for an element. */
+        private String describe(Object element, Context ctx) {
+            long watermark = ctx.timerService().currentWatermark();
+            Long elementTimestamp = ctx.timestamp();
+            return element + "WM:" + watermark + " TS:" + elementTimestamp;
+        }
+    }
+
+    /**
+     * Function that forwards every element with an input-specific prefix and registers a fixed
+     * event-time timer per input. A firing timer emits the current key followed by 1777.
+     */
+    private static class EventTimeTriggeringProcessFunction
+            extends KeyedCoProcessFunction<String, Integer, String, String> {
+
+        private static final long serialVersionUID = 1L;
+
+        /** Event-time timer registered for elements of the first input. */
+        private static final long FIRST_INPUT_TIMER = 5;
+
+        /** Event-time timer registered for elements of the second input. */
+        private static final long SECOND_INPUT_TIMER = 6;
+
+        @Override
+        public void processElement1(Integer value, Context ctx, Collector<String> out)
+                throws Exception {
+            out.collect("INPUT1:" + value);
+            TimerService timers = ctx.timerService();
+            timers.registerEventTimeTimer(FIRST_INPUT_TIMER);
+        }
+
+        @Override
+        public void processElement2(String value, Context ctx, Collector<String> out)
+                throws Exception {
+            out.collect("INPUT2:" + value);
+            TimerService timers = ctx.timerService();
+            timers.registerEventTimeTimer(SECOND_INPUT_TIMER);
+        }
+
+        @Override
+        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out)
+                throws Exception {
+            // Only event-time timers are registered, so no other domain may show up here.
+            assertThat(ctx.timeDomain()).isEqualTo(TimeDomain.EVENT_TIME);
+            String firedKey = ctx.getCurrentKey();
+            out.collect(firedKey + ":" + 1777);
+        }
+    }
+
+    /**
+     * Function that remembers an element in value state and couples it with event-time timers:
+     * an element arriving while the state is empty is stored, emitted and gets a timer; an
+     * element arriving while the state is set clears it and deletes a timer. A firing timer
+     * emits the stored state.
+     */
+    private static class EventTimeTriggeringStatefulProcessFunction
+            extends KeyedCoProcessFunction<String, Integer, String, String> {
+
+        private static final long serialVersionUID = 1L;
+
+        // Descriptor of the state holding the string form of the remembered element.
+        private final ValueStateDescriptor<String> state =
+                new ValueStateDescriptor<>("seen-element", 
StringSerializer.INSTANCE);
+
+        @Override
+        public void processElement1(Integer value, Context ctx, 
Collector<String> out)
+                throws Exception {
+            handleValue(value, out, ctx.timerService(), 1);
+        }
+
+        @Override
+        public void processElement2(String value, Context ctx, 
Collector<String> out)
+                throws Exception {
+            handleValue(value, out, ctx.timerService(), 2);
+        }
+
+        /**
+         * Shared element handling for both inputs.
+         *
+         * @param value the incoming element
+         * @param out collector receiving {@code INPUT<channel>:<value>} for stored elements
+         * @param timerService timer service used to register or delete the event-time timer
+         * @param channel number of the input the element arrived on (1 or 2)
+         */
+        private void handleValue(
+                Object value, Collector<String> out, TimerService 
timerService, int channel)
+                throws IOException {
+            // Local handle shadows the descriptor field of the same name.
+            final ValueState<String> state = 
getRuntimeContext().getState(this.state);
+            state.asyncValue()
+                    .thenAccept(
+                            v -> {
+                                if (v == null) {
+                                    // Nothing stored yet: remember the element, emit it once
+                                    // the update completes, and register a timer 5 past the
+                                    // current watermark.
+                                    state.asyncUpdate(String.valueOf(value))
+                                            .thenAccept(
+                                                    VO ->
+                                                            out.collect(
+                                                                    "INPUT" + 
channel + ":"
+                                                                            + 
value));
+                                    timerService.registerEventTimeTimer(
+                                            timerService.currentWatermark() + 
5);
+                                } else {
+                                    // Already stored: clear the state and delete the timer at
+                                    // current watermark + 4 (in the test the watermark has
+                                    // advanced by one since registration, so this is the same
+                                    // timestamp).
+                                    state.asyncClear();
+                                    timerService.deleteEventTimeTimer(
+                                            timerService.currentWatermark() + 
4);
+                                }
+                            });
+        }
+
+        /** Emits {@code STATE:<stored value>} after checking the timer is an event-time timer. */
+        @Override
+        public void onTimer(long timestamp, OnTimerContext ctx, 
Collector<String> out)
+                throws Exception {
+            assertThat(ctx.timeDomain()).isEqualTo(TimeDomain.EVENT_TIME);
+            getRuntimeContext()
+                    .getState(state)
+                    .asyncValue()
+                    .thenAccept(
+                            v -> {
+                                out.collect("STATE:" + v);
+                            });
+        }
+    }
+
+    /**
+     * Function that forwards every element with an input-specific prefix and registers a fixed
+     * processing-time timer per input. A firing timer emits 1777.
+     */
+    private static class ProcessingTimeTriggeringProcessFunction
+            extends KeyedCoProcessFunction<String, Integer, String, String> {
+
+        private static final long serialVersionUID = 1L;
+
+        /** Processing-time timer registered for elements of the first input. */
+        private static final long FIRST_INPUT_TIMER = 5;
+
+        /** Processing-time timer registered for elements of the second input. */
+        private static final long SECOND_INPUT_TIMER = 6;
+
+        @Override
+        public void processElement1(Integer value, Context ctx, Collector<String> out)
+                throws Exception {
+            out.collect("INPUT1:" + value);
+            TimerService timers = ctx.timerService();
+            timers.registerProcessingTimeTimer(FIRST_INPUT_TIMER);
+        }
+
+        @Override
+        public void processElement2(String value, Context ctx, Collector<String> out)
+                throws Exception {
+            out.collect("INPUT2:" + value);
+            TimerService timers = ctx.timerService();
+            timers.registerProcessingTimeTimer(SECOND_INPUT_TIMER);
+        }
+
+        @Override
+        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out)
+                throws Exception {
+            // Only processing-time timers are registered, so no other domain may show up here.
+            assertThat(ctx.timeDomain()).isEqualTo(TimeDomain.PROCESSING_TIME);
+            out.collect("" + 1777);
+        }
+    }
+
+    /**
+     * Function that emits, for every element of either input, the element followed by the
+     * current processing time and the element timestamp. Timers are ignored.
+     */
+    private static class ProcessingTimeQueryingProcessFunction
+            extends KeyedCoProcessFunction<String, Integer, String, String> {
+
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public void processElement1(Integer value, Context ctx, Collector<String> out)
+                throws Exception {
+            out.collect(describe(value, ctx));
+        }
+
+        @Override
+        public void processElement2(String value, Context ctx, Collector<String> out)
+                throws Exception {
+            out.collect(describe(value, ctx));
+        }
+
+        @Override
+        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out)
+                throws Exception {}
+
+        /** Builds the {@code <element>PT:<processing time> TS:<timestamp>} record. */
+        private String describe(Object element, Context ctx) {
+            long processingTime = ctx.timerService().currentProcessingTime();
+            Long elementTimestamp = ctx.timestamp();
+            return element + "PT:" + processingTime + " TS:" + elementTimestamp;
+        }
+    }
+
+    /**
+     * Co-process function that keeps a {@code "seen-element"} string in value state and reads it
+     * through the asynchronous state API. An element that finds the state empty is stored, echoed
+     * and followed by a processing-time timer registration; an element that finds a stored value
+     * clears the state and deletes a processing-time timer instead.
+     */
+    private static class ProcessingTimeTriggeringStatefulProcessFunction
+            extends KeyedCoProcessFunction<String, Integer, String, String> {
+
+        private static final long serialVersionUID = 1L;
+
+        private final ValueStateDescriptor<String> state =
+                new ValueStateDescriptor<>("seen-element", StringSerializer.INSTANCE);
+
+        @Override
+        public void processElement1(Integer value, Context ctx, Collector<String> out)
+                throws Exception {
+            handleValue(value, out, ctx.timerService(), 1);
+        }
+
+        @Override
+        public void processElement2(String value, Context ctx, Collector<String> out)
+                throws Exception {
+            handleValue(value, out, ctx.timerService(), 2);
+        }
+
+        /**
+         * Shared handling for elements of both inputs.
+         *
+         * @param value the incoming element
+         * @param out collector receiving {@code "INPUT" + channel + ":" + value}
+         * @param timerService service used to register or delete processing-time timers
+         * @param channel the input number (1 or 2) used in the emitted prefix
+         */
+        private void handleValue(
+                Object value, Collector<String> out, TimerService timerService, int channel)
+                throws IOException {
+            final ValueState<String> seenElement = getRuntimeContext().getState(this.state);
+            seenElement
+                    .asyncValue()
+                    .thenAccept(
+                            stored -> {
+                                if (stored != null) {
+                                    // A value is already stored: clear it and delete the timer
+                                    // at "current processing time + 4".
+                                    seenElement.asyncClear();
+                                    timerService.deleteProcessingTimeTimer(
+                                            timerService.currentProcessingTime() + 4);
+                                    return;
+                                }
+
+                                // Nothing stored yet: remember the element and echo it once the
+                                // update has completed.
+                                seenElement
+                                        .asyncUpdate(String.valueOf(value))
+                                        .thenAccept(
+                                                ignored ->
+                                                        out.collect(
+                                                                "INPUT" + channel + ":" + value));
+                                // The timer is registered right after the update is issued; it is
+                                // not chained on the update's completion.
+                                timerService.registerProcessingTimeTimer(
+                                        timerService.currentProcessingTime() + 5);
+                            });
+        }
+
+        /** Asserts a processing-time firing, then emits {@code "STATE:"} plus the stored value. */
+        @Override
+        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out)
+                throws Exception {
+            assertThat(ctx.timeDomain()).isEqualTo(TimeDomain.PROCESSING_TIME);
+            final ValueState<String> seenElement = getRuntimeContext().getState(state);
+            out.collect("STATE:" + seenElement.value());
+        }
+    }
+
+    /**
+     * Co-process function that registers timers in both time domains and deletes one of them again
+     * within the same call. Timer firings emit {@code "EVENT:1777"} or {@code "PROC:1777"}
+     * depending on the domain that fired.
+     */
+    private static class BothTriggeringProcessFunction
+            extends KeyedCoProcessFunction<String, Integer, String, String> {
+
+        private static final long serialVersionUID = 1L;
+
+        /** Registers processing-time timer 3 and event-time timer 6, then deletes the former. */
+        @Override
+        public void processElement1(Integer value, Context ctx, Collector<String> out)
+                throws Exception {
+            final TimerService timers = ctx.timerService();
+            timers.registerProcessingTimeTimer(3);
+            timers.registerEventTimeTimer(6);
+            timers.deleteProcessingTimeTimer(3);
+        }
+
+        /** Registers event-time timer 4 and processing-time timer 5, then deletes the former. */
+        @Override
+        public void processElement2(String value, Context ctx, Collector<String> out)
+                throws Exception {
+            final TimerService timers = ctx.timerService();
+            timers.registerEventTimeTimer(4);
+            timers.registerProcessingTimeTimer(5);
+            timers.deleteEventTimeTimer(4);
+        }
+
+        /** Emits a marker naming the time domain in which the timer fired. */
+        @Override
+        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out)
+                throws Exception {
+            final boolean firedOnEventTime = ctx.timeDomain() == TimeDomain.EVENT_TIME;
+            out.collect(firedOnEventTime ? "EVENT:1777" : "PROC:1777");
+        }
+    }
+
+    /**
+     * Co-process function that emits each element of either input followed by a comma and the key
+     * returned by {@code ctx.getCurrentKey()}.
+     */
+    private static class AppendCurrentKeyProcessFunction
+            extends KeyedCoProcessFunction<String, Integer, String, String> {
+
+        // Declared explicitly, matching the other process functions in this test class.
+        private static final long serialVersionUID = 1L;
+
+        /** Emits {@code value + "," + <current key>} for an element of the first input. */
+        @Override
+        public void processElement1(Integer value, Context ctx, Collector<String> out)
+                throws Exception {
+            out.collect(value + "," + ctx.getCurrentKey());
+        }
+
+        /** Emits {@code value + "," + <current key>} for an element of the second input. */
+        @Override
+        public void processElement2(String value, Context ctx, Collector<String> out)
+                throws Exception {
+            out.collect(value + "," + ctx.getCurrentKey());
+        }
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecIntervalJoin.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecIntervalJoin.java
index 3d89e414ae7..2ac0af781e0 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecIntervalJoin.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecIntervalJoin.java
@@ -367,7 +367,7 @@ public class StreamExecIntervalJoin extends 
ExecNodeBase<RowData>
                         leftTypeInfo,
                         rightTypeInfo,
                         joinFunction);
-
+        // TODO: add async version procJoinFunc to use AsyncKeyedCoProcessOperator
         return ExecNodeUtil.createTwoInputTransformation(
                 leftInputTransform,
                 rightInputTransform,
@@ -404,7 +404,7 @@ public class StreamExecIntervalJoin extends 
ExecNodeBase<RowData>
                         joinFunction,
                         windowBounds.getLeftTimeIdx(),
                         windowBounds.getRightTimeIdx());
-
+        // TODO: add async version rowJoinFunc to use AsyncKeyedCoProcessOperator
         return ExecNodeUtil.createTwoInputTransformation(
                 leftInputTransform,
                 rightInputTransform,

Reply via email to