This is an automated email from the ASF dual-hosted git repository.

zakelly pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 1b01b521153 [FLINK-38364][streaming-java] Implement async state version of ProcessingTimeoutTrigger (#27027)
1b01b521153 is described below

commit 1b01b521153a7dd98daef9613bf120e6c71b7de4
Author: xia rui <[email protected]>
AuthorDate: Mon Oct 13 19:16:15 2025 +0800

    [FLINK-38364][streaming-java] Implement async state version of ProcessingTimeoutTrigger (#27027)
---
 .../windowing/triggers/AsyncCountTrigger.java      |   2 +-
 .../operators/windowing/AsyncTriggerConverter.java | 202 ++++++++++++++
 .../operators/windowing/WindowOperatorBuilder.java | 141 ----------
 .../windowing/AsyncTriggerTestHarness.java         | 308 +++++++++++++++++++++
 .../operators/windowing/TriggerTestHarness.java    |   8 +-
 .../triggers/AsyncProcessingTimeoutTrigger.java    | 237 ++++++++++++++++
 .../triggers/ProcessingTimeoutTrigger.java         |  16 +-
 .../windowing/AsyncTriggerConverterTest.java       | 115 ++++++++
 .../windowing/ProcessingTimeoutTriggerTest.java    | 100 +++++--
 9 files changed, 954 insertions(+), 175 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/windowing/triggers/AsyncCountTrigger.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/windowing/triggers/AsyncCountTrigger.java
index 20105062075..0139af98344 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/windowing/triggers/AsyncCountTrigger.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/windowing/triggers/AsyncCountTrigger.java
@@ -97,7 +97,7 @@ public class AsyncCountTrigger<W extends Window> extends AsyncTrigger<Object, W>
 
     @Override
     public String toString() {
-        return "CountTrigger(" + maxCount + ")";
+        return "AsyncCountTrigger(" + maxCount + ")";
     }
 
     /**
diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AsyncTriggerConverter.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AsyncTriggerConverter.java
new file mode 100644
index 00000000000..7e0e8955405
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AsyncTriggerConverter.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.v2.StateFuture;
+import org.apache.flink.core.state.StateFutureUtils;
+import org.apache.flink.metrics.MetricGroup;
+import 
org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncCountTrigger;
+import 
org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncEventTimeTrigger;
+import 
org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncProcessingTimeTrigger;
+import 
org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncPurgingTrigger;
+import 
org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncTrigger;
+import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
+import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+
+import javax.annotation.Nonnull;
+
+/**
+ * A converter from {@code Trigger} to {@code AsyncTrigger}.
+ *
+ * <p>Basic triggers (e.g., {@code CountTrigger}) are directly converted to 
their async version.
+ *
+ * <p>Async-support triggers which implement {@code AsyncTriggerConvertable} 
(e.g., {@code
+ * ProcessingTimeoutTrigger}) will use self-defined async version.
+ *
+ * <p>Other triggers are wrapped as an {@code AsyncTrigger}, whose internal 
functions are executed
+ * in sync mode.
+ */
+@Internal
+public interface AsyncTriggerConverter {
+
+    /**
+     * Convert to an {@code AsyncTrigger}. The default implementation is only 
a wrapper of the
+     * trigger, whose behaviours are all sync.
+     *
+     * <p>TODO: Return {@code AsyncTrigger} if {@code AsyncTrigger} becomes 
@PublicEvolving.
+     *
+     * @return The {@code AsyncTrigger} for async state processing.
+     */
+    @Nonnull
+    default Object convertToAsync() {
+        return UserDefinedAsyncTrigger.of((Trigger<?, ?>) 
AsyncTriggerConverter.this);
+    }
+
+    @SuppressWarnings("unchecked")
+    static <T, W extends Window> AsyncTrigger<T, W> convertToAsync(Trigger<T, 
W> trigger) {
+        if (trigger instanceof CountTrigger) {
+            return (AsyncTrigger<T, W>)
+                    AsyncCountTrigger.of(((CountTrigger<?>) 
trigger).getMaxCount());
+        } else if (trigger instanceof EventTimeTrigger) {
+            return (AsyncTrigger<T, W>) AsyncEventTimeTrigger.create();
+        } else if (trigger instanceof ProcessingTimeTrigger) {
+            return (AsyncTrigger<T, W>) AsyncProcessingTimeTrigger.create();
+        } else if (trigger instanceof PurgingTrigger) {
+            return (AsyncTrigger<T, W>)
+                    AsyncPurgingTrigger.of(
+                            convertToAsync(((PurgingTrigger<?, ?>) 
trigger).getNestedTrigger()));
+        } else if (trigger instanceof AsyncTriggerConverter) {
+            return (AsyncTrigger<T, W>) ((AsyncTriggerConverter) 
trigger).convertToAsync();
+        } else {
+            return UserDefinedAsyncTrigger.of(trigger);
+        }
+    }
+
+    /** Convert non-support user-defined trigger to {@code AsyncTrigger}. */
+    class UserDefinedAsyncTrigger<T, W extends Window> extends AsyncTrigger<T, 
W> {
+        private final Trigger<T, W> userDefinedTrigger;
+
+        private UserDefinedAsyncTrigger(Trigger<T, W> userDefinedTrigger) {
+            this.userDefinedTrigger = userDefinedTrigger;
+        }
+
+        @Override
+        public StateFuture<TriggerResult> onElement(
+                T element, long timestamp, W window, TriggerContext ctx) 
throws Exception {
+            return StateFutureUtils.completedFuture(
+                    userDefinedTrigger.onElement(
+                            element, timestamp, window, 
AsyncTriggerContextConvertor.of(ctx)));
+        }
+
+        @Override
+        public StateFuture<TriggerResult> onProcessingTime(long time, W 
window, TriggerContext ctx)
+                throws Exception {
+            return StateFutureUtils.completedFuture(
+                    userDefinedTrigger.onProcessingTime(
+                            time, window, 
AsyncTriggerContextConvertor.of(ctx)));
+        }
+
+        @Override
+        public StateFuture<TriggerResult> onEventTime(long time, W window, 
TriggerContext ctx)
+                throws Exception {
+            return StateFutureUtils.completedFuture(
+                    userDefinedTrigger.onEventTime(
+                            time, window, 
AsyncTriggerContextConvertor.of(ctx)));
+        }
+
+        @Override
+        public StateFuture<Void> clear(W window, TriggerContext ctx) throws 
Exception {
+            userDefinedTrigger.clear(window, 
AsyncTriggerContextConvertor.of(ctx));
+            return StateFutureUtils.completedVoidFuture();
+        }
+
+        @Override
+        public boolean isEndOfStreamTrigger() {
+            return userDefinedTrigger instanceof 
GlobalWindows.EndOfStreamTrigger;
+        }
+
+        public static <T, W extends Window> AsyncTrigger<T, W> of(
+                Trigger<T, W> userDefinedTrigger) {
+            return new UserDefinedAsyncTrigger<>(userDefinedTrigger);
+        }
+
+        /**
+         * A converter from {@link AsyncTrigger.TriggerContext} to {@link 
Trigger.TriggerContext}.
+         */
+        private static class AsyncTriggerContextConvertor implements 
Trigger.TriggerContext {
+
+            private final AsyncTrigger.TriggerContext asyncTriggerContext;
+
+            private AsyncTriggerContextConvertor(AsyncTrigger.TriggerContext 
asyncTriggerContext) {
+                this.asyncTriggerContext = asyncTriggerContext;
+            }
+
+            @Override
+            public long getCurrentProcessingTime() {
+                return asyncTriggerContext.getCurrentProcessingTime();
+            }
+
+            @Override
+            public MetricGroup getMetricGroup() {
+                return asyncTriggerContext.getMetricGroup();
+            }
+
+            @Override
+            public long getCurrentWatermark() {
+                return asyncTriggerContext.getCurrentWatermark();
+            }
+
+            @Override
+            public void registerProcessingTimeTimer(long time) {
+                asyncTriggerContext.registerProcessingTimeTimer(time);
+            }
+
+            @Override
+            public void registerEventTimeTimer(long time) {
+                asyncTriggerContext.registerEventTimeTimer(time);
+            }
+
+            @Override
+            public void deleteProcessingTimeTimer(long time) {
+                asyncTriggerContext.deleteProcessingTimeTimer(time);
+            }
+
+            @Override
+            public void deleteEventTimeTimer(long time) {
+                asyncTriggerContext.deleteEventTimeTimer(time);
+            }
+
+            @Override
+            public <S extends State> S getPartitionedState(StateDescriptor<S, 
?> stateDescriptor) {
+                throw new UnsupportedOperationException(
+                        "Trigger is for state V1 APIs, window operator with 
async state enabled only accept state V2 APIs.");
+            }
+
+            public static Trigger.TriggerContext of(
+                    AsyncTrigger.TriggerContext asyncTriggerContext) {
+                return new AsyncTriggerContextConvertor(asyncTriggerContext);
+            }
+        }
+
+        @VisibleForTesting
+        public Trigger<T, W> getUserDefinedTrigger() {
+            return userDefinedTrigger;
+        }
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorBuilder.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorBuilder.java
index ee1b64d07bd..cdfa7ad2172 100644
--- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorBuilder.java
@@ -28,15 +28,11 @@ import org.apache.flink.api.common.state.AggregatingStateDescriptor;
 import org.apache.flink.api.common.state.AppendingState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.ReducingStateDescriptor;
-import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.state.StateDescriptor;
-import org.apache.flink.api.common.state.v2.StateFuture;
 import org.apache.flink.api.common.state.v2.StateIterator;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.core.state.StateFutureUtils;
-import org.apache.flink.metrics.MetricGroup;
 import 
org.apache.flink.runtime.asyncprocessing.operators.windowing.AsyncEvictingWindowOperator;
 import 
org.apache.flink.runtime.asyncprocessing.operators.windowing.AsyncWindowOperator;
 import 
org.apache.flink.runtime.asyncprocessing.operators.windowing.functions.InternalAggregateProcessAsyncWindowFunction;
@@ -45,10 +41,6 @@ import org.apache.flink.runtime.asyncprocessing.operators.windowing.functions.In
 import 
org.apache.flink.runtime.asyncprocessing.operators.windowing.functions.InternalIterableProcessAsyncWindowFunction;
 import 
org.apache.flink.runtime.asyncprocessing.operators.windowing.functions.InternalSingleValueAsyncWindowFunction;
 import 
org.apache.flink.runtime.asyncprocessing.operators.windowing.functions.InternalSingleValueProcessAsyncWindowFunction;
-import 
org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncCountTrigger;
-import 
org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncEventTimeTrigger;
-import 
org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncProcessingTimeTrigger;
-import 
org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncPurgingTrigger;
 import 
org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncTrigger;
 import 
org.apache.flink.streaming.api.functions.windowing.AggregateApplyWindowFunction;
 import 
org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
@@ -56,17 +48,10 @@ import org.apache.flink.streaming.api.functions.windowing.ReduceApplyProcessWind
 import 
org.apache.flink.streaming.api.functions.windowing.ReduceApplyWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import 
org.apache.flink.streaming.api.windowing.assigners.GlobalWindows.EndOfStreamTrigger;
 import 
org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
 import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
 import org.apache.flink.streaming.api.windowing.evictors.Evictor;
-import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
-import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
-import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
-import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.Trigger;
-import 
org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext;
-import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalAggregateProcessWindowFunction;
 import 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction;
@@ -570,130 +555,4 @@ public class WindowOperatorBuilder<T, K, W extends Window> {
     public long getAllowedLateness() {
         return allowedLateness;
     }
-
-    private static class UserDefinedAsyncTrigger<T, W extends Window> extends 
AsyncTrigger<T, W> {
-        private final Trigger<T, W> userDefinedTrigger;
-
-        private UserDefinedAsyncTrigger(Trigger<T, W> userDefinedTrigger) {
-            this.userDefinedTrigger = userDefinedTrigger;
-        }
-
-        @Override
-        public StateFuture<TriggerResult> onElement(
-                T element, long timestamp, W window, TriggerContext ctx) 
throws Exception {
-            return StateFutureUtils.completedFuture(
-                    userDefinedTrigger.onElement(
-                            element, timestamp, window, 
AsyncTriggerContextConvertor.of(ctx)));
-        }
-
-        @Override
-        public StateFuture<TriggerResult> onProcessingTime(long time, W 
window, TriggerContext ctx)
-                throws Exception {
-            return StateFutureUtils.completedFuture(
-                    userDefinedTrigger.onProcessingTime(
-                            time, window, 
AsyncTriggerContextConvertor.of(ctx)));
-        }
-
-        @Override
-        public StateFuture<TriggerResult> onEventTime(long time, W window, 
TriggerContext ctx)
-                throws Exception {
-            return StateFutureUtils.completedFuture(
-                    userDefinedTrigger.onEventTime(
-                            time, window, 
AsyncTriggerContextConvertor.of(ctx)));
-        }
-
-        @Override
-        public StateFuture<Void> clear(W window, TriggerContext ctx) throws 
Exception {
-            userDefinedTrigger.clear(window, 
AsyncTriggerContextConvertor.of(ctx));
-            return StateFutureUtils.completedVoidFuture();
-        }
-
-        @Override
-        public boolean isEndOfStreamTrigger() {
-            return userDefinedTrigger instanceof EndOfStreamTrigger;
-        }
-
-        public static <T, W extends Window> AsyncTrigger<T, W> of(
-                Trigger<T, W> userDefinedTrigger) {
-            return new UserDefinedAsyncTrigger<>(userDefinedTrigger);
-        }
-    }
-
-    private static class AsyncTriggerConverter {
-
-        @SuppressWarnings("unchecked")
-        public static <T, W extends Window> AsyncTrigger<T, W> convertToAsync(
-                Trigger<T, W> trigger) {
-            if (trigger instanceof CountTrigger) {
-                return (AsyncTrigger<T, W>)
-                        AsyncCountTrigger.of(((CountTrigger<?>) 
trigger).getMaxCount());
-            } else if (trigger instanceof EventTimeTrigger) {
-                return (AsyncTrigger<T, W>) AsyncEventTimeTrigger.create();
-            } else if (trigger instanceof ProcessingTimeTrigger) {
-                return (AsyncTrigger<T, W>) 
AsyncProcessingTimeTrigger.create();
-            } else if (trigger instanceof PurgingTrigger) {
-                return (AsyncTrigger<T, W>)
-                        AsyncPurgingTrigger.of(
-                                convertToAsync(
-                                        ((PurgingTrigger<?, ?>) 
trigger).getNestedTrigger()));
-            } else {
-                return UserDefinedAsyncTrigger.of(trigger);
-            }
-        }
-    }
-
-    /** A converter from {@link AsyncTrigger.TriggerContext} to {@link 
Trigger.TriggerContext}. */
-    private static class AsyncTriggerContextConvertor implements 
TriggerContext {
-
-        private final AsyncTrigger.TriggerContext asyncTriggerContext;
-
-        private AsyncTriggerContextConvertor(AsyncTrigger.TriggerContext 
asyncTriggerContext) {
-            this.asyncTriggerContext = asyncTriggerContext;
-        }
-
-        @Override
-        public long getCurrentProcessingTime() {
-            return asyncTriggerContext.getCurrentProcessingTime();
-        }
-
-        @Override
-        public MetricGroup getMetricGroup() {
-            return asyncTriggerContext.getMetricGroup();
-        }
-
-        @Override
-        public long getCurrentWatermark() {
-            return asyncTriggerContext.getCurrentWatermark();
-        }
-
-        @Override
-        public void registerProcessingTimeTimer(long time) {
-            asyncTriggerContext.registerProcessingTimeTimer(time);
-        }
-
-        @Override
-        public void registerEventTimeTimer(long time) {
-            asyncTriggerContext.registerEventTimeTimer(time);
-        }
-
-        @Override
-        public void deleteProcessingTimeTimer(long time) {
-            asyncTriggerContext.deleteProcessingTimeTimer(time);
-        }
-
-        @Override
-        public void deleteEventTimeTimer(long time) {
-            asyncTriggerContext.deleteEventTimeTimer(time);
-        }
-
-        @Override
-        public <S extends State> S getPartitionedState(StateDescriptor<S, ?> 
stateDescriptor) {
-            throw new UnsupportedOperationException(
-                    "Trigger is for state V1 APIs, window operator with async 
state enabled only accept state V2 APIs.");
-        }
-
-        public static TriggerContext of(AsyncTrigger.TriggerContext 
asyncTriggerContext) {
-            return new AsyncTriggerContextConvertor(asyncTriggerContext);
-        }
-    }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AsyncTriggerTestHarness.java b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AsyncTriggerTestHarness.java
new file mode 100644
index 00000000000..dbeada9aa04
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AsyncTriggerTestHarness.java
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import org.apache.flink.api.common.state.v2.State;
+import org.apache.flink.api.common.state.v2.StateDescriptor;
+import org.apache.flink.api.common.state.v2.StateFuture;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.asyncprocessing.InternalAsyncFuture;
+import org.apache.flink.metrics.MetricGroup;
+import 
org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncTrigger;
+import org.apache.flink.runtime.state.AsyncKeyedStateBackend;
+import org.apache.flink.runtime.state.v2.adaptor.AsyncKeyedStateBackendAdaptor;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.TestInternalTimerService;
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.stream.Collectors;
+
+/** Utility for testing {@link AsyncTrigger} behaviour. */
+public class AsyncTriggerTestHarness<T, W extends Window> extends 
TriggerTestHarness<T, W> {
+
+    private final AsyncTrigger<T, W> trigger;
+
+    // Async adaptor to realStateBackend.
+    private final AsyncKeyedStateBackend<Integer> asyncStateBackend;
+
+    /**
+     * Initialize test harness for async trigger.
+     *
+     * <p>The state backend is heap, which does not support async state 
operation. The tests use
+     * async state API, but all state operations execute in sync mode.
+     */
+    public AsyncTriggerTestHarness(AsyncTrigger<T, W> trigger, 
TypeSerializer<W> windowSerializer)
+            throws Exception {
+        super(null, windowSerializer);
+        this.trigger = trigger;
+
+        this.asyncStateBackend = new 
AsyncKeyedStateBackendAdaptor<>(stateBackend);
+    }
+
+    // 
------------------------------------------------------------------------------
+    // Override TriggerTestHarness API
+    // 
------------------------------------------------------------------------------
+
+    @Override
+    public TriggerResult processElement(StreamRecord<T> element, W window) 
throws Exception {
+        return completeStateFuture(asyncProcessElement(element, window));
+    }
+
+    @Override
+    public TriggerResult advanceProcessingTime(long time, W window) throws 
Exception {
+        return completeStateFuture(asyncAdvanceProcessingTime(time, window));
+    }
+
+    @Override
+    public TriggerResult advanceWatermark(long time, W window) throws 
Exception {
+        return completeStateFuture(asyncAdvanceWatermark(time, window));
+    }
+
+    @Override
+    public Collection<Tuple2<W, TriggerResult>> advanceProcessingTime(long 
time) throws Exception {
+        return asyncAdvanceProcessingTime(time).stream()
+                .map(f -> Tuple2.of(f.f0, completeStateFuture(f.f1)))
+                .collect(Collectors.toList());
+    }
+
+    @Override
+    public Collection<Tuple2<W, TriggerResult>> advanceWatermark(long time) 
throws Exception {
+        return asyncAdvanceWatermark(time).stream()
+                .map(f -> Tuple2.of(f.f0, completeStateFuture(f.f1)))
+                .collect(Collectors.toList());
+    }
+
+    @Override
+    public TriggerResult invokeOnEventTime(long timestamp, W window) throws 
Exception {
+        return completeStateFuture(asyncInvokeOnEventTime(timestamp, window));
+    }
+
+    @Override
+    public void mergeWindows(W targetWindow, Collection<W> mergedWindows) 
throws Exception {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void clearTriggerState(W window) throws Exception {
+        completeStateFuture(asyncClearTriggerState(window));
+    }
+
+    // 
------------------------------------------------------------------------------
+    // Using Async State API
+    // 
------------------------------------------------------------------------------
+
+    StateFuture<TriggerResult> asyncProcessElement(StreamRecord<T> element, W 
window)
+            throws Exception {
+        TestTriggerContext<Integer, W> triggerContext =
+                new TestTriggerContext<>(
+                        KEY, window, internalTimerService, asyncStateBackend, 
windowSerializer);
+        return trigger.onElement(
+                element.getValue(), element.getTimestamp(), window, 
triggerContext);
+    }
+
+    StateFuture<TriggerResult> asyncAdvanceProcessingTime(long time, W window) 
throws Exception {
+        Collection<Tuple2<W, StateFuture<TriggerResult>>> firings =
+                asyncAdvanceProcessingTime(time);
+
+        if (firings.size() != 1) {
+            throw new IllegalStateException(
+                    "Must have exactly one timer firing. Fired timers: " + 
firings);
+        }
+
+        Tuple2<W, StateFuture<TriggerResult>> firing = 
firings.iterator().next();
+
+        if (!firing.f0.equals(window)) {
+            throw new IllegalStateException("Trigger fired for another 
window.");
+        }
+
+        return firing.f1;
+    }
+
+    StateFuture<TriggerResult> asyncAdvanceWatermark(long time, W window) 
throws Exception {
+        Collection<Tuple2<W, StateFuture<TriggerResult>>> firings = 
asyncAdvanceWatermark(time);
+
+        if (firings.size() != 1) {
+            throw new IllegalStateException(
+                    "Must have exactly one timer firing. Fired timers: " + 
firings);
+        }
+
+        Tuple2<W, StateFuture<TriggerResult>> firing = 
firings.iterator().next();
+
+        if (!firing.f0.equals(window)) {
+            throw new IllegalStateException("Trigger fired for another 
window.");
+        }
+
+        return firing.f1;
+    }
+
+    Collection<Tuple2<W, StateFuture<TriggerResult>>> 
asyncAdvanceProcessingTime(long time)
+            throws Exception {
+        Collection<TestInternalTimerService.Timer<Integer, W>> firedTimers =
+                internalTimerService.advanceProcessingTime(time);
+
+        Collection<Tuple2<W, StateFuture<TriggerResult>>> result = new 
ArrayList<>();
+
+        for (TestInternalTimerService.Timer<Integer, W> timer : firedTimers) {
+            TestTriggerContext<Integer, W> triggerContext =
+                    new TestTriggerContext<>(
+                            KEY,
+                            timer.getNamespace(),
+                            internalTimerService,
+                            asyncStateBackend,
+                            windowSerializer);
+
+            StateFuture<TriggerResult> triggerResult =
+                    trigger.onProcessingTime(
+                            timer.getTimestamp(), timer.getNamespace(), 
triggerContext);
+
+            result.add(new Tuple2<>(timer.getNamespace(), triggerResult));
+        }
+
+        return result;
+    }
+
+    Collection<Tuple2<W, StateFuture<TriggerResult>>> 
asyncAdvanceWatermark(long time)
+            throws Exception {
+        Collection<TestInternalTimerService.Timer<Integer, W>> firedTimers =
+                internalTimerService.advanceWatermark(time);
+
+        Collection<Tuple2<W, StateFuture<TriggerResult>>> result = new 
ArrayList<>();
+
+        for (TestInternalTimerService.Timer<Integer, W> timer : firedTimers) {
+            StateFuture<TriggerResult> triggerResult = 
asyncInvokeOnEventTime(timer);
+            result.add(new Tuple2<>(timer.getNamespace(), triggerResult));
+        }
+
+        return result;
+    }
+
+    private StateFuture<TriggerResult> asyncInvokeOnEventTime(
+            TestInternalTimerService.Timer<Integer, W> timer) throws Exception 
{
+        TestTriggerContext<Integer, W> triggerContext =
+                new TestTriggerContext<>(
+                        KEY,
+                        timer.getNamespace(),
+                        internalTimerService,
+                        asyncStateBackend,
+                        windowSerializer);
+
+        return trigger.onEventTime(timer.getTimestamp(), timer.getNamespace(), 
triggerContext);
+    }
+
+    StateFuture<TriggerResult> asyncInvokeOnEventTime(long timestamp, W 
window) throws Exception {
+        TestInternalTimerService.Timer<Integer, W> timer =
+                new TestInternalTimerService.Timer<>(timestamp, KEY, window);
+
+        return asyncInvokeOnEventTime(timer);
+    }
+
+    StateFuture<Void> asyncClearTriggerState(W window) throws Exception {
+        TestTriggerContext<Integer, W> triggerContext =
+                new TestTriggerContext<>(
+                        KEY, window, internalTimerService, asyncStateBackend, 
windowSerializer);
+        return trigger.clear(window, triggerContext);
+    }
+
+    // 
------------------------------------------------------------------------------
+    // Context
+    // 
------------------------------------------------------------------------------
+
+    private static class TestTriggerContext<K, W extends Window>
+            implements AsyncTrigger.TriggerContext {
+
+        protected final InternalTimerService<W> timerService;
+        protected final AsyncKeyedStateBackend<Integer> stateBackend;
+        protected final K key;
+        protected final W window;
+        protected final TypeSerializer<W> windowSerializer;
+
+        TestTriggerContext(
+                K key,
+                W window,
+                InternalTimerService<W> timerService,
+                AsyncKeyedStateBackend<Integer> stateBackend,
+                TypeSerializer<W> windowSerializer) {
+            this.key = key;
+            this.window = window;
+            this.timerService = timerService;
+            this.stateBackend = stateBackend;
+            this.windowSerializer = windowSerializer;
+        }
+
+        @Override
+        public long getCurrentProcessingTime() {
+            return timerService.currentProcessingTime();
+        }
+
+        @Override
+        public MetricGroup getMetricGroup() {
+            return null;
+        }
+
+        @Override
+        public long getCurrentWatermark() {
+            return timerService.currentWatermark();
+        }
+
+        @Override
+        public void registerProcessingTimeTimer(long time) {
+            timerService.registerProcessingTimeTimer(window, time);
+        }
+
+        @Override
+        public void registerEventTimeTimer(long time) {
+            timerService.registerEventTimeTimer(window, time);
+        }
+
+        @Override
+        public void deleteProcessingTimeTimer(long time) {
+            timerService.deleteProcessingTimeTimer(window, time);
+        }
+
+        @Override
+        public void deleteEventTimeTimer(long time) {
+            timerService.deleteEventTimeTimer(window, time);
+        }
+
+        @Override
+        public <T, S extends State> S getPartitionedState(StateDescriptor<T> 
stateDescriptor) {
+            try {
+                // single key (KEY), no need declaration.
+                return stateBackend.getOrCreateKeyedState(
+                        window, windowSerializer, stateDescriptor);
+            } catch (Exception e) {
+                throw new RuntimeException("Could not retrieve state", e);
+            }
+        }
+    }
+
+    static <T> T completeStateFuture(StateFuture<T> future) {
+        InternalAsyncFuture<T> internalAsyncFuture = (InternalAsyncFuture<T>) 
future;
+
+        // The harness executes state operations in sync mode, any StateFuture 
should be completed.
+        Preconditions.checkArgument(internalAsyncFuture.isDone());
+        return internalAsyncFuture.get();
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java
index 02454dd4d68..9642500bc48 100644
--- a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java
+++ b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java
@@ -54,13 +54,13 @@ import java.util.Collections;
 /** Utility for testing {@link Trigger} behaviour. */
 public class TriggerTestHarness<T, W extends Window> {
 
-    private static final Integer KEY = 1;
+    protected static final Integer KEY = 1;
 
     private final Trigger<T, W> trigger;
-    private final TypeSerializer<W> windowSerializer;
+    protected final TypeSerializer<W> windowSerializer;
 
-    private final HeapKeyedStateBackend<Integer> stateBackend;
-    private final TestInternalTimerService<Integer, W> internalTimerService;
+    protected final HeapKeyedStateBackend<Integer> stateBackend;
+    protected final TestInternalTimerService<Integer, W> internalTimerService;
 
     public TriggerTestHarness(Trigger<T, W> trigger, TypeSerializer<W> 
windowSerializer)
             throws Exception {
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/AsyncProcessingTimeoutTrigger.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/AsyncProcessingTimeoutTrigger.java
new file mode 100644
index 00000000000..ea602cb172f
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/AsyncProcessingTimeoutTrigger.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.triggers;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.v2.StateFuture;
+import org.apache.flink.api.common.state.v2.ValueState;
+import org.apache.flink.api.common.state.v2.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.core.state.StateFutureUtils;
+import 
org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncTrigger;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+
+import java.time.Duration;
+import java.util.Objects;
+
+/**
+ * An {@link AsyncTrigger} that can turn any {@link AsyncTrigger} into a 
timeout {@code
+ * AsyncTrigger}.
+ *
+ * <p>On the first arriving element a configurable processing-time timeout 
will be set. Using {@link
+ * #of(AsyncTrigger, Duration, boolean, boolean)}, you can also renew the 
timer for each arriving
+ * element by specifying {@code resetTimerOnNewRecord} and you can specify 
whether {@link
+ * AsyncTrigger#clear(Window, AsyncTrigger.TriggerContext)} should be called 
on timeout via {@code
+ * shouldClearOnTimeout}.
+ *
+ * @param <T> The type of elements on which this trigger can operate.
+ * @param <W> The type of {@link Window} on which this trigger can operate.
+ */
+@Experimental
+public class AsyncProcessingTimeoutTrigger<T, W extends Window> extends 
AsyncTrigger<T, W> {
+    private static final long serialVersionUID = 1L;
+
+    private final AsyncTrigger<T, W> nestedTrigger;
+    private final long interval;
+    private final boolean resetTimerOnNewRecord;
+    private final boolean shouldClearOnTimeout;
+
+    private final ValueStateDescriptor<Long> timeoutStateDesc;
+
+    public AsyncProcessingTimeoutTrigger(
+            AsyncTrigger<T, W> nestedTrigger,
+            long interval,
+            boolean resetTimerOnNewRecord,
+            boolean shouldClearOnTimeout) {
+        this.nestedTrigger = nestedTrigger;
+        this.interval = interval;
+        this.resetTimerOnNewRecord = resetTimerOnNewRecord;
+        this.shouldClearOnTimeout = shouldClearOnTimeout;
+
+        this.timeoutStateDesc = new ValueStateDescriptor<>("timeout", 
LongSerializer.INSTANCE);
+    }
+
+    @Override
+    public StateFuture<TriggerResult> onElement(
+            T element, long timestamp, W window, TriggerContext ctx) throws 
Exception {
+        return this.nestedTrigger
+                .onElement(element, timestamp, window, ctx)
+                .thenConditionallyCompose(
+                        TriggerResult::isFire,
+                        triggerResult -> this.clear(window, 
ctx).thenApply(ignore -> triggerResult),
+                        triggerResult -> {
+                            ValueState<Long> timeoutState =
+                                    
ctx.getPartitionedState(this.timeoutStateDesc);
+                            long nextFireTimestamp = 
ctx.getCurrentProcessingTime() + this.interval;
+
+                            return timeoutState
+                                    .asyncValue()
+                                    .thenConditionallyCompose(
+                                            Objects::nonNull,
+                                            timeoutTimestamp -> {
+                                                if (resetTimerOnNewRecord) {
+                                                    
ctx.deleteProcessingTimeTimer(timeoutTimestamp);
+                                                    return timeoutState
+                                                            .asyncClear()
+                                                            .thenApply(ignore 
-> null);
+                                                } else {
+                                                    return 
StateFutureUtils.completedFuture(
+                                                            timeoutTimestamp);
+                                                }
+                                            })
+                                    .thenConditionallyCompose(
+                                            tuple -> tuple.f1 
/*timeoutTimestamp*/ == null,
+                                            ignore ->
+                                                    timeoutState
+                                                            
.asyncUpdate(nextFireTimestamp)
+                                                            .thenAccept(
+                                                                    ignore2 ->
+                                                                            ctx
+                                                                               
     .registerProcessingTimeTimer(
+                                                                               
             nextFireTimestamp)))
+                                    .thenApply(ignore -> triggerResult);
+                        })
+                .thenApply(tuple -> (TriggerResult) tuple.f1);
+    }
+
+    @Override
+    public StateFuture<TriggerResult> onProcessingTime(long time, W window, 
TriggerContext ctx)
+            throws Exception {
+        return this.nestedTrigger
+                .onProcessingTime(time, window, ctx)
+                .thenCompose(
+                        triggerResult -> {
+                            TriggerResult finalResult =
+                                    triggerResult.isPurge()
+                                            ? TriggerResult.FIRE_AND_PURGE
+                                            : TriggerResult.FIRE;
+                            return shouldClearOnTimeout
+                                    ? this.clear(window, ctx).thenApply(ignore 
-> finalResult)
+                                    : 
StateFutureUtils.completedFuture(finalResult);
+                        });
+    }
+
+    @Override
+    public StateFuture<TriggerResult> onEventTime(long time, W window, 
TriggerContext ctx)
+            throws Exception {
+        return this.nestedTrigger
+                .onEventTime(time, window, ctx)
+                .thenCompose(
+                        triggerResult -> {
+                            TriggerResult finalResult =
+                                    triggerResult.isPurge()
+                                            ? TriggerResult.FIRE_AND_PURGE
+                                            : TriggerResult.FIRE;
+                            return shouldClearOnTimeout
+                                    ? this.clear(window, ctx).thenApply(ignore 
-> finalResult)
+                                    : 
StateFutureUtils.completedFuture(finalResult);
+                        });
+    }
+
+    @Override
+    public StateFuture<Void> clear(W window, TriggerContext ctx) throws 
Exception {
+        ValueState<Long> timeoutTimestampState = 
ctx.getPartitionedState(this.timeoutStateDesc);
+        return timeoutTimestampState
+                .asyncValue()
+                .thenConditionallyCompose(
+                        Objects::nonNull,
+                        timeoutTimestamp -> {
+                            ctx.deleteProcessingTimeTimer(timeoutTimestamp);
+                            return timeoutTimestampState.asyncClear();
+                        })
+                .thenCompose(ignore -> this.nestedTrigger.clear(window, ctx));
+    }
+
+    @Override
+    public String toString() {
+        return "AsyncTimeoutTrigger(" + this.nestedTrigger.toString() + ")";
+    }
+
+    /**
+     * Creates a new {@link AsyncProcessingTimeoutTrigger} that fires when the 
inner trigger is
+     * fired or when the timeout timer fires.
+     *
+     * <p>For example: {@code 
AsyncProcessingTimeoutTrigger.of(AsyncCountTrigger.of(3), Duration.ofMillis(100))}, will
+     * create an AsyncCountTrigger with timeout of 100 millis. So, if the first 
record arrives at
+     * time {@code t}, and the second record arrives at time {@code t+50}, 
the trigger will fire
+     * when the third record arrives or when the time is {@code t+100} 
(timeout).
+     *
+     * @param nestedTrigger the nested {@link AsyncTrigger}
+     * @param timeout the timeout interval
+     * @return {@link AsyncProcessingTimeoutTrigger} with the above 
configuration.
+     */
+    public static <T, W extends Window> AsyncProcessingTimeoutTrigger<T, W> of(
+            AsyncTrigger<T, W> nestedTrigger, Duration timeout) {
+        return new AsyncProcessingTimeoutTrigger<>(nestedTrigger, 
timeout.toMillis(), false, true);
+    }
+
+    /**
+     * Creates a new {@link AsyncProcessingTimeoutTrigger} that fires when the 
inner trigger is
+     * fired or when the timeout timer fires.
+     *
+     * <p>For example: {@code 
AsyncProcessingTimeoutTrigger.of(AsyncCountTrigger.of(3), Duration.ofMillis(100), false,
+     * true)}, will create an AsyncCountTrigger with timeout of 100 millis. So, 
if the first record
+     * arrives at time {@code t}, and the second record arrives at time {@code 
t+50}, the trigger
+     * will fire when the third record arrives or when the time is {@code 
t+100} (timeout).
+     *
+     * @param nestedTrigger the nested {@link AsyncTrigger}
+     * @param timeout the timeout interval
+     * @param resetTimerOnNewRecord each time a new element arrives, reset the 
timer and start a new
+     *     one
+     * @param shouldClearOnTimeout whether to call {@link 
AsyncTrigger#clear(Window,
+     *     AsyncTrigger.TriggerContext)} when the processing-time timer fires
+     * @param <T> The type of the element.
+     * @param <W> The type of {@link Window Windows} on which this trigger can 
operate.
+     * @return {@link AsyncProcessingTimeoutTrigger} with the above 
configuration.
+     */
+    public static <T, W extends Window> AsyncProcessingTimeoutTrigger<T, W> of(
+            AsyncTrigger<T, W> nestedTrigger,
+            Duration timeout,
+            boolean resetTimerOnNewRecord,
+            boolean shouldClearOnTimeout) {
+        return new AsyncProcessingTimeoutTrigger<>(
+                nestedTrigger, timeout.toMillis(), resetTimerOnNewRecord, 
shouldClearOnTimeout);
+    }
+
+    @VisibleForTesting
+    public AsyncTrigger<T, W> getNestedTrigger() {
+        return nestedTrigger;
+    }
+
+    @VisibleForTesting
+    public long getInterval() {
+        return interval;
+    }
+
+    @VisibleForTesting
+    public boolean isResetTimerOnNewRecord() {
+        return resetTimerOnNewRecord;
+    }
+
+    @VisibleForTesting
+    public boolean isShouldClearOnTimeout() {
+        return shouldClearOnTimeout;
+    }
+
+    @VisibleForTesting
+    public ValueStateDescriptor<Long> getTimeoutStateDesc() {
+        return timeoutStateDesc;
+    }
+}
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeoutTrigger.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeoutTrigger.java
index 268edfbf14b..7d972e88643 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeoutTrigger.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeoutTrigger.java
@@ -23,6 +23,9 @@ import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.streaming.api.windowing.windows.Window;
+import 
org.apache.flink.streaming.runtime.operators.windowing.AsyncTriggerConverter;
+
+import javax.annotation.Nonnull;
 
 import java.time.Duration;
 
@@ -39,7 +42,8 @@ import java.time.Duration;
  * @param <W> The type of {@link Window} on which this trigger can operate.
  */
 @PublicEvolving
-public class ProcessingTimeoutTrigger<T, W extends Window> extends Trigger<T, 
W> {
+public class ProcessingTimeoutTrigger<T, W extends Window> extends Trigger<T, 
W>
+        implements AsyncTriggerConverter {
 
     private static final long serialVersionUID = 1L;
 
@@ -169,4 +173,14 @@ public class ProcessingTimeoutTrigger<T, W extends Window> 
extends Trigger<T, W>
         return new ProcessingTimeoutTrigger<>(
                 nestedTrigger, timeout.toMillis(), resetTimerOnNewRecord, 
shouldClearOnTimeout);
     }
+
+    @Override
+    @Nonnull
+    public Object convertToAsync() {
+        return AsyncProcessingTimeoutTrigger.of(
+                AsyncTriggerConverter.convertToAsync(this.nestedTrigger),
+                Duration.ofMillis(interval),
+                resetTimerOnNewRecord,
+                shouldClearOnTimeout);
+    }
 }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AsyncTriggerConverterTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AsyncTriggerConverterTest.java
new file mode 100644
index 00000000000..cf1fe75400b
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AsyncTriggerConverterTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import org.apache.flink.api.common.state.v2.StateFuture;
+import 
org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+import org.junit.jupiter.api.Test;
+
+import javax.annotation.Nonnull;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@code AsyncTriggerConverter}. */
+public class AsyncTriggerConverterTest {
+    private static class DummyTriggerWithoutAsyncConverter extends 
Trigger<Object, TimeWindow>
+            implements AsyncTriggerConverter {
+        @Override
+        public TriggerResult onElement(
+                Object element, long timestamp, TimeWindow window, 
TriggerContext ctx)
+                throws Exception {
+            return null;
+        }
+
+        @Override
+        public TriggerResult onProcessingTime(long time, TimeWindow window, 
TriggerContext ctx)
+                throws Exception {
+            return null;
+        }
+
+        @Override
+        public TriggerResult onEventTime(long time, TimeWindow window, 
TriggerContext ctx)
+                throws Exception {
+            return null;
+        }
+
+        @Override
+        public void clear(TimeWindow window, TriggerContext ctx) throws 
Exception {}
+    }
+
+    private static class DummyTriggerWithAsyncConverter extends 
DummyTriggerWithoutAsyncConverter {
+        @Override
+        @Nonnull
+        public Object convertToAsync() {
+            return new DummyAsyncTrigger();
+        }
+    }
+
+    private static class DummyAsyncTrigger extends AsyncTrigger<Object, 
TimeWindow> {
+        @Override
+        public StateFuture<TriggerResult> onElement(
+                Object element, long timestamp, TimeWindow window, 
TriggerContext ctx)
+                throws Exception {
+            return null;
+        }
+
+        @Override
+        public StateFuture<TriggerResult> onProcessingTime(
+                long time, TimeWindow window, TriggerContext ctx) throws 
Exception {
+            return null;
+        }
+
+        @Override
+        public StateFuture<TriggerResult> onEventTime(
+                long time, TimeWindow window, TriggerContext ctx) throws 
Exception {
+            return null;
+        }
+
+        @Override
+        public StateFuture<Void> clear(TimeWindow window, TriggerContext ctx) 
throws Exception {
+            return null;
+        }
+    }
+
+    @Test
+    void testTriggerUseDefaultConvert() {
+        Trigger<Object, TimeWindow> syncTrigger = new 
DummyTriggerWithoutAsyncConverter();
+        AsyncTrigger<Object, TimeWindow> asyncTrigger =
+                AsyncTriggerConverter.convertToAsync(syncTrigger);
+
+        
assertThat(asyncTrigger).isInstanceOf(AsyncTriggerConverter.UserDefinedAsyncTrigger.class);
+        AsyncTriggerConverter.UserDefinedAsyncTrigger<Object, TimeWindow> 
triggerWrapper =
+                (AsyncTriggerConverter.UserDefinedAsyncTrigger<Object, 
TimeWindow>) asyncTrigger;
+
+        
assertThat(triggerWrapper.getUserDefinedTrigger()).isSameAs(syncTrigger);
+    }
+
+    @Test
+    void testTriggerUseCustomizeConvert() {
+        Trigger<Object, TimeWindow> syncTrigger = new 
DummyTriggerWithAsyncConverter();
+        AsyncTrigger<Object, TimeWindow> asyncTrigger =
+                AsyncTriggerConverter.convertToAsync(syncTrigger);
+
+        assertThat(asyncTrigger).isInstanceOf(DummyAsyncTrigger.class);
+    }
+}
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ProcessingTimeoutTriggerTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ProcessingTimeoutTriggerTest.java
index 99a2593ab78..02c24018cc6 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ProcessingTimeoutTriggerTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ProcessingTimeoutTriggerTest.java
@@ -18,15 +18,21 @@
 
 package org.apache.flink.streaming.runtime.operators.windowing;
 
+import 
org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncCountTrigger;
+import 
org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncTrigger;
+import 
org.apache.flink.streaming.api.windowing.triggers.AsyncProcessingTimeoutTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
 import 
org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeoutTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
 import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.time.Duration;
 
@@ -35,14 +41,17 @@ import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for {@link ProcessingTimeoutTrigger}. */
 class ProcessingTimeoutTriggerTest {
-
-    @Test
-    void testWindowFireWithoutResetTimer() throws Exception {
+    @ParameterizedTest(name = "Enable async state = {0}")
+    @ValueSource(booleans = {false, true})
+    void testWindowFireWithoutResetTimer(boolean enableAsyncState) throws 
Exception {
+        Trigger<Object, TimeWindow> trigger =
+                ProcessingTimeoutTrigger.of(CountTrigger.of(3), 
Duration.ofMillis(50), false, true);
         TriggerTestHarness<Object, TimeWindow> testHarness =
-                new TriggerTestHarness<>(
-                        ProcessingTimeoutTrigger.of(
-                                CountTrigger.of(3), Duration.ofMillis(50), 
false, true),
-                        new TimeWindow.Serializer());
+                enableAsyncState
+                        ? new AsyncTriggerTestHarness<>(
+                                AsyncTriggerConverter.convertToAsync(trigger),
+                                new TimeWindow.Serializer())
+                        : new TriggerTestHarness<>(trigger, new 
TimeWindow.Serializer());
 
         assertThat(testHarness.processElement(new StreamRecord<>(1), new 
TimeWindow(0, 2)))
                 .isEqualTo(TriggerResult.CONTINUE);
@@ -76,13 +85,17 @@ class ProcessingTimeoutTriggerTest {
                 .isEqualTo(TriggerResult.FIRE);
     }
 
-    @Test
-    void testWindowFireWithResetTimer() throws Exception {
+    @ParameterizedTest(name = "Enable async state = {0}")
+    @ValueSource(booleans = {false, true})
+    void testWindowFireWithResetTimer(boolean enableAsyncState) throws 
Exception {
+        Trigger<Object, TimeWindow> trigger =
+                ProcessingTimeoutTrigger.of(CountTrigger.of(3), 
Duration.ofMillis(50), true, true);
         TriggerTestHarness<Object, TimeWindow> testHarness =
-                new TriggerTestHarness<>(
-                        ProcessingTimeoutTrigger.of(
-                                CountTrigger.of(3), Duration.ofMillis(50), 
true, true),
-                        new TimeWindow.Serializer());
+                enableAsyncState
+                        ? new AsyncTriggerTestHarness<>(
+                                AsyncTriggerConverter.convertToAsync(trigger),
+                                new TimeWindow.Serializer())
+                        : new TriggerTestHarness<>(trigger, new 
TimeWindow.Serializer());
 
         assertThrows(
                 "Must have exactly one timer firing. Fired timers: []",
@@ -125,13 +138,18 @@ class ProcessingTimeoutTriggerTest {
                 .isEqualTo(TriggerResult.FIRE);
     }
 
-    @Test
-    void testWindowFireWithoutClearOnTimeout() throws Exception {
+    @ParameterizedTest(name = "Enable async state = {0}")
+    @ValueSource(booleans = {false, true})
+    void testWindowFireWithoutClearOnTimeout(boolean enableAsyncState) throws 
Exception {
+        Trigger<Object, TimeWindow> trigger =
+                ProcessingTimeoutTrigger.of(
+                        CountTrigger.of(3), Duration.ofMillis(50), false, 
false);
         TriggerTestHarness<Object, TimeWindow> testHarness =
-                new TriggerTestHarness<>(
-                        ProcessingTimeoutTrigger.of(
-                                CountTrigger.of(3), Duration.ofMillis(50), 
false, false),
-                        new TimeWindow.Serializer());
+                enableAsyncState
+                        ? new AsyncTriggerTestHarness<>(
+                                AsyncTriggerConverter.convertToAsync(trigger),
+                                new TimeWindow.Serializer())
+                        : new TriggerTestHarness<>(trigger, new 
TimeWindow.Serializer());
 
         assertThat(testHarness.processElement(new StreamRecord<>(1), new 
TimeWindow(0, 2)))
                 .isEqualTo(TriggerResult.CONTINUE);
@@ -153,16 +171,21 @@ class ProcessingTimeoutTriggerTest {
         assertThat(testHarness.numEventTimeTimers()).isZero();
     }
 
-    @Test
-    void testWindowPurgingWhenInnerTriggerIsPurging() throws Exception {
+    @ParameterizedTest(name = "Enable async state = {0}")
+    @ValueSource(booleans = {false, true})
+    void testWindowPurgingWhenInnerTriggerIsPurging(boolean enableAsyncState) 
throws Exception {
+        Trigger<Object, TimeWindow> trigger =
+                ProcessingTimeoutTrigger.of(
+                        PurgingTrigger.of(ProcessingTimeTrigger.create()),
+                        Duration.ofMillis(50),
+                        false,
+                        false);
         TriggerTestHarness<Object, TimeWindow> testHarness =
-                new TriggerTestHarness<>(
-                        ProcessingTimeoutTrigger.of(
-                                
PurgingTrigger.of(ProcessingTimeTrigger.create()),
-                                Duration.ofMillis(50),
-                                false,
-                                false),
-                        new TimeWindow.Serializer());
+                enableAsyncState
+                        ? new AsyncTriggerTestHarness<>(
+                                AsyncTriggerConverter.convertToAsync(trigger),
+                                new TimeWindow.Serializer())
+                        : new TriggerTestHarness<>(trigger, new 
TimeWindow.Serializer());
 
         assertThat(testHarness.processElement(new StreamRecord<>(1), new 
TimeWindow(0, 2)))
                 .isEqualTo(TriggerResult.CONTINUE);
@@ -185,4 +208,25 @@ class ProcessingTimeoutTriggerTest {
         assertThat(testHarness.numProcessingTimeTimers()).isOne();
         assertThat(testHarness.numEventTimeTimers()).isZero();
     }
+
+    @Test
+    void testConvertToAsync() {
+        Trigger<Object, TimeWindow> syncTrigger =
+                ProcessingTimeoutTrigger.of(
+                        CountTrigger.of(2333), Duration.ofMillis(233), false, 
false);
+
+        AsyncTrigger<Object, TimeWindow> asyncTrigger =
+                AsyncTriggerConverter.convertToAsync(syncTrigger);
+        
assertThat(asyncTrigger).isInstanceOf(AsyncProcessingTimeoutTrigger.class);
+        AsyncProcessingTimeoutTrigger<Object, TimeWindow> 
asyncProcessingTimeoutTrigger =
+                (AsyncProcessingTimeoutTrigger<Object, TimeWindow>) 
asyncTrigger;
+        assertThat(asyncProcessingTimeoutTrigger.getInterval()).isEqualTo(233);
+        
assertThat(asyncProcessingTimeoutTrigger.isResetTimerOnNewRecord()).isFalse();
+        
assertThat(asyncProcessingTimeoutTrigger.isShouldClearOnTimeout()).isFalse();
+
+        AsyncTrigger<Object, TimeWindow> nestedTrigger =
+                asyncProcessingTimeoutTrigger.getNestedTrigger();
+        assertThat(nestedTrigger).isInstanceOf(AsyncCountTrigger.class);
+        
assertThat(nestedTrigger.toString()).isEqualTo("AsyncCountTrigger(2333)");
+    }
 }

Reply via email to