This is an automated email from the ASF dual-hosted git repository.
zakelly pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 1b01b521153 [FLINK-38364][streaming-java] Implement async state
version of ProcessingTimeoutTrigger (#27027)
1b01b521153 is described below
commit 1b01b521153a7dd98daef9613bf120e6c71b7de4
Author: xia rui <[email protected]>
AuthorDate: Mon Oct 13 19:16:15 2025 +0800
[FLINK-38364][streaming-java] Implement async state version of
ProcessingTimeoutTrigger (#27027)
---
.../windowing/triggers/AsyncCountTrigger.java | 2 +-
.../operators/windowing/AsyncTriggerConverter.java | 202 ++++++++++++++
.../operators/windowing/WindowOperatorBuilder.java | 141 ----------
.../windowing/AsyncTriggerTestHarness.java | 308 +++++++++++++++++++++
.../operators/windowing/TriggerTestHarness.java | 8 +-
.../triggers/AsyncProcessingTimeoutTrigger.java | 237 ++++++++++++++++
.../triggers/ProcessingTimeoutTrigger.java | 16 +-
.../windowing/AsyncTriggerConverterTest.java | 115 ++++++++
.../windowing/ProcessingTimeoutTriggerTest.java | 100 +++++--
9 files changed, 954 insertions(+), 175 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/windowing/triggers/AsyncCountTrigger.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/windowing/triggers/AsyncCountTrigger.java
index 20105062075..0139af98344 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/windowing/triggers/AsyncCountTrigger.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/windowing/triggers/AsyncCountTrigger.java
@@ -97,7 +97,7 @@ public class AsyncCountTrigger<W extends Window> extends AsyncTrigger<Object, W>
@Override
public String toString() {
- return "CountTrigger(" + maxCount + ")";
+ return "AsyncCountTrigger(" + maxCount + ")";
}
/**
diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AsyncTriggerConverter.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AsyncTriggerConverter.java
new file mode 100644
index 00000000000..7e0e8955405
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AsyncTriggerConverter.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.v2.StateFuture;
+import org.apache.flink.core.state.StateFutureUtils;
+import org.apache.flink.metrics.MetricGroup;
+import
org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncCountTrigger;
+import
org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncEventTimeTrigger;
+import
org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncProcessingTimeTrigger;
+import
org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncPurgingTrigger;
+import
org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncTrigger;
+import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
+import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+
+import javax.annotation.Nonnull;
+
+/**
+ * A converter from {@code Trigger} to {@code AsyncTrigger}.
+ *
+ * <p>Basic triggers (e.g., {@code CountTrigger}) are directly converted to
their async version.
+ *
+ * <p>Async-support triggers which implement {@code AsyncTriggerConvertable}
(e.g., {@code
+ * ProcessingTimeoutTrigger}) will use self-defined async version.
+ *
+ * <p>Other triggers are wrapped as an {@code AsyncTrigger}, whose internal
functions are executed
+ * in sync mode.
+ */
+@Internal
+public interface AsyncTriggerConverter {
+
+ /**
+ * Convert to an {@code AsyncTrigger}. The default implementation is only
a wrapper of the
+ * trigger, whose behaviours are all sync.
+ *
+ * <p>TODO: Return {@code AsyncTrigger} if {@code AsyncTrigger} becomes
@PublicEvolving.
+ *
+ * @return The {@code AsyncTrigger} for async state processing.
+ */
+ @Nonnull
+ default Object convertToAsync() {
+ return UserDefinedAsyncTrigger.of((Trigger<?, ?>)
AsyncTriggerConverter.this);
+ }
+
+ @SuppressWarnings("unchecked")
+ static <T, W extends Window> AsyncTrigger<T, W> convertToAsync(Trigger<T,
W> trigger) {
+ if (trigger instanceof CountTrigger) {
+ return (AsyncTrigger<T, W>)
+ AsyncCountTrigger.of(((CountTrigger<?>)
trigger).getMaxCount());
+ } else if (trigger instanceof EventTimeTrigger) {
+ return (AsyncTrigger<T, W>) AsyncEventTimeTrigger.create();
+ } else if (trigger instanceof ProcessingTimeTrigger) {
+ return (AsyncTrigger<T, W>) AsyncProcessingTimeTrigger.create();
+ } else if (trigger instanceof PurgingTrigger) {
+ return (AsyncTrigger<T, W>)
+ AsyncPurgingTrigger.of(
+ convertToAsync(((PurgingTrigger<?, ?>)
trigger).getNestedTrigger()));
+ } else if (trigger instanceof AsyncTriggerConverter) {
+ return (AsyncTrigger<T, W>) ((AsyncTriggerConverter)
trigger).convertToAsync();
+ } else {
+ return UserDefinedAsyncTrigger.of(trigger);
+ }
+ }
+
+ /** Convert non-support user-defined trigger to {@code AsyncTrigger}. */
+ class UserDefinedAsyncTrigger<T, W extends Window> extends AsyncTrigger<T,
W> {
+ private final Trigger<T, W> userDefinedTrigger;
+
+ private UserDefinedAsyncTrigger(Trigger<T, W> userDefinedTrigger) {
+ this.userDefinedTrigger = userDefinedTrigger;
+ }
+
+ @Override
+ public StateFuture<TriggerResult> onElement(
+ T element, long timestamp, W window, TriggerContext ctx)
throws Exception {
+ return StateFutureUtils.completedFuture(
+ userDefinedTrigger.onElement(
+ element, timestamp, window,
AsyncTriggerContextConvertor.of(ctx)));
+ }
+
+ @Override
+ public StateFuture<TriggerResult> onProcessingTime(long time, W
window, TriggerContext ctx)
+ throws Exception {
+ return StateFutureUtils.completedFuture(
+ userDefinedTrigger.onProcessingTime(
+ time, window,
AsyncTriggerContextConvertor.of(ctx)));
+ }
+
+ @Override
+ public StateFuture<TriggerResult> onEventTime(long time, W window,
TriggerContext ctx)
+ throws Exception {
+ return StateFutureUtils.completedFuture(
+ userDefinedTrigger.onEventTime(
+ time, window,
AsyncTriggerContextConvertor.of(ctx)));
+ }
+
+ @Override
+ public StateFuture<Void> clear(W window, TriggerContext ctx) throws
Exception {
+ userDefinedTrigger.clear(window,
AsyncTriggerContextConvertor.of(ctx));
+ return StateFutureUtils.completedVoidFuture();
+ }
+
+ @Override
+ public boolean isEndOfStreamTrigger() {
+ return userDefinedTrigger instanceof
GlobalWindows.EndOfStreamTrigger;
+ }
+
+ public static <T, W extends Window> AsyncTrigger<T, W> of(
+ Trigger<T, W> userDefinedTrigger) {
+ return new UserDefinedAsyncTrigger<>(userDefinedTrigger);
+ }
+
+ /**
+ * A converter from {@link AsyncTrigger.TriggerContext} to {@link
Trigger.TriggerContext}.
+ */
+ private static class AsyncTriggerContextConvertor implements
Trigger.TriggerContext {
+
+ private final AsyncTrigger.TriggerContext asyncTriggerContext;
+
+ private AsyncTriggerContextConvertor(AsyncTrigger.TriggerContext
asyncTriggerContext) {
+ this.asyncTriggerContext = asyncTriggerContext;
+ }
+
+ @Override
+ public long getCurrentProcessingTime() {
+ return asyncTriggerContext.getCurrentProcessingTime();
+ }
+
+ @Override
+ public MetricGroup getMetricGroup() {
+ return asyncTriggerContext.getMetricGroup();
+ }
+
+ @Override
+ public long getCurrentWatermark() {
+ return asyncTriggerContext.getCurrentWatermark();
+ }
+
+ @Override
+ public void registerProcessingTimeTimer(long time) {
+ asyncTriggerContext.registerProcessingTimeTimer(time);
+ }
+
+ @Override
+ public void registerEventTimeTimer(long time) {
+ asyncTriggerContext.registerEventTimeTimer(time);
+ }
+
+ @Override
+ public void deleteProcessingTimeTimer(long time) {
+ asyncTriggerContext.deleteProcessingTimeTimer(time);
+ }
+
+ @Override
+ public void deleteEventTimeTimer(long time) {
+ asyncTriggerContext.deleteEventTimeTimer(time);
+ }
+
+ @Override
+ public <S extends State> S getPartitionedState(StateDescriptor<S,
?> stateDescriptor) {
+ throw new UnsupportedOperationException(
+ "Trigger is for state V1 APIs, window operator with
async state enabled only accept state V2 APIs.");
+ }
+
+ public static Trigger.TriggerContext of(
+ AsyncTrigger.TriggerContext asyncTriggerContext) {
+ return new AsyncTriggerContextConvertor(asyncTriggerContext);
+ }
+ }
+
+ @VisibleForTesting
+ public Trigger<T, W> getUserDefinedTrigger() {
+ return userDefinedTrigger;
+ }
+ }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorBuilder.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorBuilder.java
index ee1b64d07bd..cdfa7ad2172 100644
--- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorBuilder.java
@@ -28,15 +28,11 @@ import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.state.AppendingState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
-import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
-import org.apache.flink.api.common.state.v2.StateFuture;
import org.apache.flink.api.common.state.v2.StateIterator;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.core.state.StateFutureUtils;
-import org.apache.flink.metrics.MetricGroup;
import
org.apache.flink.runtime.asyncprocessing.operators.windowing.AsyncEvictingWindowOperator;
import
org.apache.flink.runtime.asyncprocessing.operators.windowing.AsyncWindowOperator;
import
org.apache.flink.runtime.asyncprocessing.operators.windowing.functions.InternalAggregateProcessAsyncWindowFunction;
@@ -45,10 +41,6 @@ import org.apache.flink.runtime.asyncprocessing.operators.windowing.functions.In
import
org.apache.flink.runtime.asyncprocessing.operators.windowing.functions.InternalIterableProcessAsyncWindowFunction;
import
org.apache.flink.runtime.asyncprocessing.operators.windowing.functions.InternalSingleValueAsyncWindowFunction;
import
org.apache.flink.runtime.asyncprocessing.operators.windowing.functions.InternalSingleValueProcessAsyncWindowFunction;
-import
org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncCountTrigger;
-import
org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncEventTimeTrigger;
-import
org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncProcessingTimeTrigger;
-import
org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncPurgingTrigger;
import
org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncTrigger;
import
org.apache.flink.streaming.api.functions.windowing.AggregateApplyWindowFunction;
import
org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
@@ -56,17 +48,10 @@ import org.apache.flink.streaming.api.functions.windowing.ReduceApplyProcessWind
import
org.apache.flink.streaming.api.functions.windowing.ReduceApplyWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import
org.apache.flink.streaming.api.windowing.assigners.GlobalWindows.EndOfStreamTrigger;
import
org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.evictors.Evictor;
-import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
-import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
-import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
-import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
-import
org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext;
-import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.Window;
import
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalAggregateProcessWindowFunction;
import
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction;
@@ -570,130 +555,4 @@ public class WindowOperatorBuilder<T, K, W extends Window> {
public long getAllowedLateness() {
return allowedLateness;
}
-
- private static class UserDefinedAsyncTrigger<T, W extends Window> extends
AsyncTrigger<T, W> {
- private final Trigger<T, W> userDefinedTrigger;
-
- private UserDefinedAsyncTrigger(Trigger<T, W> userDefinedTrigger) {
- this.userDefinedTrigger = userDefinedTrigger;
- }
-
- @Override
- public StateFuture<TriggerResult> onElement(
- T element, long timestamp, W window, TriggerContext ctx)
throws Exception {
- return StateFutureUtils.completedFuture(
- userDefinedTrigger.onElement(
- element, timestamp, window,
AsyncTriggerContextConvertor.of(ctx)));
- }
-
- @Override
- public StateFuture<TriggerResult> onProcessingTime(long time, W
window, TriggerContext ctx)
- throws Exception {
- return StateFutureUtils.completedFuture(
- userDefinedTrigger.onProcessingTime(
- time, window,
AsyncTriggerContextConvertor.of(ctx)));
- }
-
- @Override
- public StateFuture<TriggerResult> onEventTime(long time, W window,
TriggerContext ctx)
- throws Exception {
- return StateFutureUtils.completedFuture(
- userDefinedTrigger.onEventTime(
- time, window,
AsyncTriggerContextConvertor.of(ctx)));
- }
-
- @Override
- public StateFuture<Void> clear(W window, TriggerContext ctx) throws
Exception {
- userDefinedTrigger.clear(window,
AsyncTriggerContextConvertor.of(ctx));
- return StateFutureUtils.completedVoidFuture();
- }
-
- @Override
- public boolean isEndOfStreamTrigger() {
- return userDefinedTrigger instanceof EndOfStreamTrigger;
- }
-
- public static <T, W extends Window> AsyncTrigger<T, W> of(
- Trigger<T, W> userDefinedTrigger) {
- return new UserDefinedAsyncTrigger<>(userDefinedTrigger);
- }
- }
-
- private static class AsyncTriggerConverter {
-
- @SuppressWarnings("unchecked")
- public static <T, W extends Window> AsyncTrigger<T, W> convertToAsync(
- Trigger<T, W> trigger) {
- if (trigger instanceof CountTrigger) {
- return (AsyncTrigger<T, W>)
- AsyncCountTrigger.of(((CountTrigger<?>)
trigger).getMaxCount());
- } else if (trigger instanceof EventTimeTrigger) {
- return (AsyncTrigger<T, W>) AsyncEventTimeTrigger.create();
- } else if (trigger instanceof ProcessingTimeTrigger) {
- return (AsyncTrigger<T, W>)
AsyncProcessingTimeTrigger.create();
- } else if (trigger instanceof PurgingTrigger) {
- return (AsyncTrigger<T, W>)
- AsyncPurgingTrigger.of(
- convertToAsync(
- ((PurgingTrigger<?, ?>)
trigger).getNestedTrigger()));
- } else {
- return UserDefinedAsyncTrigger.of(trigger);
- }
- }
- }
-
- /** A converter from {@link AsyncTrigger.TriggerContext} to {@link
Trigger.TriggerContext}. */
- private static class AsyncTriggerContextConvertor implements
TriggerContext {
-
- private final AsyncTrigger.TriggerContext asyncTriggerContext;
-
- private AsyncTriggerContextConvertor(AsyncTrigger.TriggerContext
asyncTriggerContext) {
- this.asyncTriggerContext = asyncTriggerContext;
- }
-
- @Override
- public long getCurrentProcessingTime() {
- return asyncTriggerContext.getCurrentProcessingTime();
- }
-
- @Override
- public MetricGroup getMetricGroup() {
- return asyncTriggerContext.getMetricGroup();
- }
-
- @Override
- public long getCurrentWatermark() {
- return asyncTriggerContext.getCurrentWatermark();
- }
-
- @Override
- public void registerProcessingTimeTimer(long time) {
- asyncTriggerContext.registerProcessingTimeTimer(time);
- }
-
- @Override
- public void registerEventTimeTimer(long time) {
- asyncTriggerContext.registerEventTimeTimer(time);
- }
-
- @Override
- public void deleteProcessingTimeTimer(long time) {
- asyncTriggerContext.deleteProcessingTimeTimer(time);
- }
-
- @Override
- public void deleteEventTimeTimer(long time) {
- asyncTriggerContext.deleteEventTimeTimer(time);
- }
-
- @Override
- public <S extends State> S getPartitionedState(StateDescriptor<S, ?>
stateDescriptor) {
- throw new UnsupportedOperationException(
- "Trigger is for state V1 APIs, window operator with async
state enabled only accept state V2 APIs.");
- }
-
- public static TriggerContext of(AsyncTrigger.TriggerContext
asyncTriggerContext) {
- return new AsyncTriggerContextConvertor(asyncTriggerContext);
- }
- }
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AsyncTriggerTestHarness.java b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AsyncTriggerTestHarness.java
new file mode 100644
index 00000000000..dbeada9aa04
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AsyncTriggerTestHarness.java
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import org.apache.flink.api.common.state.v2.State;
+import org.apache.flink.api.common.state.v2.StateDescriptor;
+import org.apache.flink.api.common.state.v2.StateFuture;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.asyncprocessing.InternalAsyncFuture;
+import org.apache.flink.metrics.MetricGroup;
+import
org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncTrigger;
+import org.apache.flink.runtime.state.AsyncKeyedStateBackend;
+import org.apache.flink.runtime.state.v2.adaptor.AsyncKeyedStateBackendAdaptor;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.TestInternalTimerService;
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.stream.Collectors;
+
+/** Utility for testing {@link AsyncTrigger} behaviour. */
+public class AsyncTriggerTestHarness<T, W extends Window> extends
TriggerTestHarness<T, W> {
+
+ private final AsyncTrigger<T, W> trigger;
+
+ // Async adaptor to realStateBackend.
+ private final AsyncKeyedStateBackend<Integer> asyncStateBackend;
+
+ /**
+ * Initialize test harness for async trigger.
+ *
+ * <p>The state backend is heap, which does not support async state
operation. The tests use
+ * async state API, but all state operations execute in sync mode.
+ */
+ public AsyncTriggerTestHarness(AsyncTrigger<T, W> trigger,
TypeSerializer<W> windowSerializer)
+ throws Exception {
+ super(null, windowSerializer);
+ this.trigger = trigger;
+
+ this.asyncStateBackend = new
AsyncKeyedStateBackendAdaptor<>(stateBackend);
+ }
+
+ //
------------------------------------------------------------------------------
+ // Override TriggerTestHarness API
+ //
------------------------------------------------------------------------------
+
+ @Override
+ public TriggerResult processElement(StreamRecord<T> element, W window)
throws Exception {
+ return completeStateFuture(asyncProcessElement(element, window));
+ }
+
+ @Override
+ public TriggerResult advanceProcessingTime(long time, W window) throws
Exception {
+ return completeStateFuture(asyncAdvanceProcessingTime(time, window));
+ }
+
+ @Override
+ public TriggerResult advanceWatermark(long time, W window) throws
Exception {
+ return completeStateFuture(asyncAdvanceWatermark(time, window));
+ }
+
+ @Override
+ public Collection<Tuple2<W, TriggerResult>> advanceProcessingTime(long
time) throws Exception {
+ return asyncAdvanceProcessingTime(time).stream()
+ .map(f -> Tuple2.of(f.f0, completeStateFuture(f.f1)))
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public Collection<Tuple2<W, TriggerResult>> advanceWatermark(long time)
throws Exception {
+ return asyncAdvanceWatermark(time).stream()
+ .map(f -> Tuple2.of(f.f0, completeStateFuture(f.f1)))
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public TriggerResult invokeOnEventTime(long timestamp, W window) throws
Exception {
+ return completeStateFuture(asyncInvokeOnEventTime(timestamp, window));
+ }
+
+ @Override
+ public void mergeWindows(W targetWindow, Collection<W> mergedWindows)
throws Exception {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void clearTriggerState(W window) throws Exception {
+ completeStateFuture(asyncClearTriggerState(window));
+ }
+
+ //
------------------------------------------------------------------------------
+ // Using Async State API
+ //
------------------------------------------------------------------------------
+
+ StateFuture<TriggerResult> asyncProcessElement(StreamRecord<T> element, W
window)
+ throws Exception {
+ TestTriggerContext<Integer, W> triggerContext =
+ new TestTriggerContext<>(
+ KEY, window, internalTimerService, asyncStateBackend,
windowSerializer);
+ return trigger.onElement(
+ element.getValue(), element.getTimestamp(), window,
triggerContext);
+ }
+
+ StateFuture<TriggerResult> asyncAdvanceProcessingTime(long time, W window)
throws Exception {
+ Collection<Tuple2<W, StateFuture<TriggerResult>>> firings =
+ asyncAdvanceProcessingTime(time);
+
+ if (firings.size() != 1) {
+ throw new IllegalStateException(
+ "Must have exactly one timer firing. Fired timers: " +
firings);
+ }
+
+ Tuple2<W, StateFuture<TriggerResult>> firing =
firings.iterator().next();
+
+ if (!firing.f0.equals(window)) {
+ throw new IllegalStateException("Trigger fired for another
window.");
+ }
+
+ return firing.f1;
+ }
+
+ StateFuture<TriggerResult> asyncAdvanceWatermark(long time, W window)
throws Exception {
+ Collection<Tuple2<W, StateFuture<TriggerResult>>> firings =
asyncAdvanceWatermark(time);
+
+ if (firings.size() != 1) {
+ throw new IllegalStateException(
+ "Must have exactly one timer firing. Fired timers: " +
firings);
+ }
+
+ Tuple2<W, StateFuture<TriggerResult>> firing =
firings.iterator().next();
+
+ if (!firing.f0.equals(window)) {
+ throw new IllegalStateException("Trigger fired for another
window.");
+ }
+
+ return firing.f1;
+ }
+
+ Collection<Tuple2<W, StateFuture<TriggerResult>>>
asyncAdvanceProcessingTime(long time)
+ throws Exception {
+ Collection<TestInternalTimerService.Timer<Integer, W>> firedTimers =
+ internalTimerService.advanceProcessingTime(time);
+
+ Collection<Tuple2<W, StateFuture<TriggerResult>>> result = new
ArrayList<>();
+
+ for (TestInternalTimerService.Timer<Integer, W> timer : firedTimers) {
+ TestTriggerContext<Integer, W> triggerContext =
+ new TestTriggerContext<>(
+ KEY,
+ timer.getNamespace(),
+ internalTimerService,
+ asyncStateBackend,
+ windowSerializer);
+
+ StateFuture<TriggerResult> triggerResult =
+ trigger.onProcessingTime(
+ timer.getTimestamp(), timer.getNamespace(),
triggerContext);
+
+ result.add(new Tuple2<>(timer.getNamespace(), triggerResult));
+ }
+
+ return result;
+ }
+
+ Collection<Tuple2<W, StateFuture<TriggerResult>>>
asyncAdvanceWatermark(long time)
+ throws Exception {
+ Collection<TestInternalTimerService.Timer<Integer, W>> firedTimers =
+ internalTimerService.advanceWatermark(time);
+
+ Collection<Tuple2<W, StateFuture<TriggerResult>>> result = new
ArrayList<>();
+
+ for (TestInternalTimerService.Timer<Integer, W> timer : firedTimers) {
+ StateFuture<TriggerResult> triggerResult =
asyncInvokeOnEventTime(timer);
+ result.add(new Tuple2<>(timer.getNamespace(), triggerResult));
+ }
+
+ return result;
+ }
+
+ private StateFuture<TriggerResult> asyncInvokeOnEventTime(
+ TestInternalTimerService.Timer<Integer, W> timer) throws Exception
{
+ TestTriggerContext<Integer, W> triggerContext =
+ new TestTriggerContext<>(
+ KEY,
+ timer.getNamespace(),
+ internalTimerService,
+ asyncStateBackend,
+ windowSerializer);
+
+ return trigger.onEventTime(timer.getTimestamp(), timer.getNamespace(),
triggerContext);
+ }
+
+ StateFuture<TriggerResult> asyncInvokeOnEventTime(long timestamp, W
window) throws Exception {
+ TestInternalTimerService.Timer<Integer, W> timer =
+ new TestInternalTimerService.Timer<>(timestamp, KEY, window);
+
+ return asyncInvokeOnEventTime(timer);
+ }
+
+ StateFuture<Void> asyncClearTriggerState(W window) throws Exception {
+ TestTriggerContext<Integer, W> triggerContext =
+ new TestTriggerContext<>(
+ KEY, window, internalTimerService, asyncStateBackend,
windowSerializer);
+ return trigger.clear(window, triggerContext);
+ }
+
+ //
------------------------------------------------------------------------------
+ // Context
+ //
------------------------------------------------------------------------------
+
+ private static class TestTriggerContext<K, W extends Window>
+ implements AsyncTrigger.TriggerContext {
+
+ protected final InternalTimerService<W> timerService;
+ protected final AsyncKeyedStateBackend<Integer> stateBackend;
+ protected final K key;
+ protected final W window;
+ protected final TypeSerializer<W> windowSerializer;
+
+ TestTriggerContext(
+ K key,
+ W window,
+ InternalTimerService<W> timerService,
+ AsyncKeyedStateBackend<Integer> stateBackend,
+ TypeSerializer<W> windowSerializer) {
+ this.key = key;
+ this.window = window;
+ this.timerService = timerService;
+ this.stateBackend = stateBackend;
+ this.windowSerializer = windowSerializer;
+ }
+
+ @Override
+ public long getCurrentProcessingTime() {
+ return timerService.currentProcessingTime();
+ }
+
+ @Override
+ public MetricGroup getMetricGroup() {
+ return null;
+ }
+
+ @Override
+ public long getCurrentWatermark() {
+ return timerService.currentWatermark();
+ }
+
+ @Override
+ public void registerProcessingTimeTimer(long time) {
+ timerService.registerProcessingTimeTimer(window, time);
+ }
+
+ @Override
+ public void registerEventTimeTimer(long time) {
+ timerService.registerEventTimeTimer(window, time);
+ }
+
+ @Override
+ public void deleteProcessingTimeTimer(long time) {
+ timerService.deleteProcessingTimeTimer(window, time);
+ }
+
+ @Override
+ public void deleteEventTimeTimer(long time) {
+ timerService.deleteEventTimeTimer(window, time);
+ }
+
+ @Override
+ public <T, S extends State> S getPartitionedState(StateDescriptor<T>
stateDescriptor) {
+ try {
+ // single key (KEY), no need declaration.
+ return stateBackend.getOrCreateKeyedState(
+ window, windowSerializer, stateDescriptor);
+ } catch (Exception e) {
+ throw new RuntimeException("Could not retrieve state", e);
+ }
+ }
+ }
+
+ static <T> T completeStateFuture(StateFuture<T> future) {
+ InternalAsyncFuture<T> internalAsyncFuture = (InternalAsyncFuture<T>)
future;
+
+ // The harness executes state operations in sync mode, any StateFuture
should be completed.
+ Preconditions.checkArgument(internalAsyncFuture.isDone());
+ return internalAsyncFuture.get();
+ }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java
index 02454dd4d68..9642500bc48 100644
--- a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java
+++ b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java
@@ -54,13 +54,13 @@ import java.util.Collections;
/** Utility for testing {@link Trigger} behaviour. */
public class TriggerTestHarness<T, W extends Window> {
- private static final Integer KEY = 1;
+ protected static final Integer KEY = 1;
private final Trigger<T, W> trigger;
- private final TypeSerializer<W> windowSerializer;
+ protected final TypeSerializer<W> windowSerializer;
- private final HeapKeyedStateBackend<Integer> stateBackend;
- private final TestInternalTimerService<Integer, W> internalTimerService;
+ protected final HeapKeyedStateBackend<Integer> stateBackend;
+ protected final TestInternalTimerService<Integer, W> internalTimerService;
public TriggerTestHarness(Trigger<T, W> trigger, TypeSerializer<W>
windowSerializer)
throws Exception {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/AsyncProcessingTimeoutTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/AsyncProcessingTimeoutTrigger.java
new file mode 100644
index 00000000000..ea602cb172f
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/AsyncProcessingTimeoutTrigger.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.triggers;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.v2.StateFuture;
+import org.apache.flink.api.common.state.v2.ValueState;
+import org.apache.flink.api.common.state.v2.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.core.state.StateFutureUtils;
+import
org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncTrigger;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+
+import java.time.Duration;
+import java.util.Objects;
+
+/**
+ * An {@link AsyncTrigger} that can turn any {@link AsyncTrigger} into a
timeout {@code
+ * AsyncTrigger}.
+ *
+ * <p>On the first arriving element a configurable processing-time timeout
will be set. Using {@link
+ * #of(AsyncTrigger, Duration, boolean, boolean)}, you can also renew the
timer for each arriving
+ * element by specifying {@code resetTimerOnNewRecord} and you can specify
whether {@link
+ * AsyncTrigger#clear(Window, AsyncTrigger.TriggerContext)} should be called
on timeout via {@code
+ * shouldClearOnTimeout}.
+ *
+ * @param <T> The type of elements on which this trigger can operate.
+ * @param <W> The type of {@link Window} on which this trigger can operate.
+ */
+@Experimental
+public class AsyncProcessingTimeoutTrigger<T, W extends Window> extends
AsyncTrigger<T, W> {
+ private static final long serialVersionUID = 1L;
+
+ private final AsyncTrigger<T, W> nestedTrigger;
+ private final long interval;
+ private final boolean resetTimerOnNewRecord;
+ private final boolean shouldClearOnTimeout;
+
+ private final ValueStateDescriptor<Long> timeoutStateDesc;
+
+ public AsyncProcessingTimeoutTrigger(
+ AsyncTrigger<T, W> nestedTrigger,
+ long interval,
+ boolean resetTimerOnNewRecord,
+ boolean shouldClearOnTimeout) {
+ this.nestedTrigger = nestedTrigger;
+ this.interval = interval;
+ this.resetTimerOnNewRecord = resetTimerOnNewRecord;
+ this.shouldClearOnTimeout = shouldClearOnTimeout;
+
+ this.timeoutStateDesc = new ValueStateDescriptor<>("timeout",
LongSerializer.INSTANCE);
+ }
+
+ @Override
+ public StateFuture<TriggerResult> onElement(
+ T element, long timestamp, W window, TriggerContext ctx) throws
Exception {
+ return this.nestedTrigger
+ .onElement(element, timestamp, window, ctx)
+ .thenConditionallyCompose(
+ TriggerResult::isFire,
+ triggerResult -> this.clear(window,
ctx).thenApply(ignore -> triggerResult),
+ triggerResult -> {
+ ValueState<Long> timeoutState =
+
ctx.getPartitionedState(this.timeoutStateDesc);
+ long nextFireTimestamp =
ctx.getCurrentProcessingTime() + this.interval;
+
+ return timeoutState
+ .asyncValue()
+ .thenConditionallyCompose(
+ Objects::nonNull,
+ timeoutTimestamp -> {
+ if (resetTimerOnNewRecord) {
+
ctx.deleteProcessingTimeTimer(timeoutTimestamp);
+ return timeoutState
+ .asyncClear()
+ .thenApply(ignore
-> null);
+ } else {
+ return
StateFutureUtils.completedFuture(
+ timeoutTimestamp);
+ }
+ })
+ .thenConditionallyCompose(
+ tuple -> tuple.f1
/*timeoutTimestamp*/ == null,
+ ignore ->
+ timeoutState
+
.asyncUpdate(nextFireTimestamp)
+ .thenAccept(
+ ignore2 ->
+ ctx
+
.registerProcessingTimeTimer(
+
nextFireTimestamp)))
+ .thenApply(ignore -> triggerResult);
+ })
+ .thenApply(tuple -> (TriggerResult) tuple.f1);
+ }
+
+ @Override
+ public StateFuture<TriggerResult> onProcessingTime(long time, W window,
TriggerContext ctx)
+ throws Exception {
+ return this.nestedTrigger
+ .onProcessingTime(time, window, ctx)
+ .thenCompose(
+ triggerResult -> {
+ TriggerResult finalResult =
+ triggerResult.isPurge()
+ ? TriggerResult.FIRE_AND_PURGE
+ : TriggerResult.FIRE;
+ return shouldClearOnTimeout
+ ? this.clear(window, ctx).thenApply(ignore
-> finalResult)
+ :
StateFutureUtils.completedFuture(finalResult);
+ });
+ }
+
+ @Override
+ public StateFuture<TriggerResult> onEventTime(long time, W window,
TriggerContext ctx)
+ throws Exception {
+ return this.nestedTrigger
+ .onEventTime(time, window, ctx)
+ .thenCompose(
+ triggerResult -> {
+ TriggerResult finalResult =
+ triggerResult.isPurge()
+ ? TriggerResult.FIRE_AND_PURGE
+ : TriggerResult.FIRE;
+ return shouldClearOnTimeout
+ ? this.clear(window, ctx).thenApply(ignore
-> finalResult)
+ :
StateFutureUtils.completedFuture(finalResult);
+ });
+ }
+
+ @Override
+ public StateFuture<Void> clear(W window, TriggerContext ctx) throws
Exception {
+ ValueState<Long> timeoutTimestampState =
ctx.getPartitionedState(this.timeoutStateDesc);
+ return timeoutTimestampState
+ .asyncValue()
+ .thenConditionallyCompose(
+ Objects::nonNull,
+ timeoutTimestamp -> {
+ ctx.deleteProcessingTimeTimer(timeoutTimestamp);
+ return timeoutTimestampState.asyncClear();
+ })
+ .thenCompose(ignore -> this.nestedTrigger.clear(window, ctx));
+ }
+
+ @Override
+ public String toString() {
+ return "AsyncTimeoutTrigger(" + this.nestedTrigger.toString() + ")";
+ }
+
+ /**
+ * Creates a new {@link AsyncProcessingTimeoutTrigger} that fires when the
inner trigger is
+ * fired or when the timeout timer fires.
+ *
+ * <p>For example: {@code
AsyncProcessingTimeoutTrigger.of(AsyncCountTrigger.of(3), Duration.ofMillis(100))}, will
+ * create an AsyncCountTrigger with timeout of 100 millis. So, if the first
record arrives at
+ * time {@code t}, and the second record arrives at time {@code t+50},
the trigger will fire
+ * when the third record arrives or when the time is {@code t+100}
(timeout).
+ *
+ * @param nestedTrigger the nested {@link AsyncTrigger}
+ * @param timeout the timeout interval
+ * @return {@link AsyncProcessingTimeoutTrigger} with the above
configuration.
+ */
+ public static <T, W extends Window> AsyncProcessingTimeoutTrigger<T, W> of(
+ AsyncTrigger<T, W> nestedTrigger, Duration timeout) {
+ return new AsyncProcessingTimeoutTrigger<>(nestedTrigger,
timeout.toMillis(), false, true);
+ }
+
+ /**
+ * Creates a new {@link AsyncProcessingTimeoutTrigger} that fires when the
inner trigger is
+ * fired or when the timeout timer fires.
+ *
+ * <p>For example: {@code
AsyncProcessingTimeoutTrigger.of(AsyncCountTrigger.of(3), Duration.ofMillis(100), false,
+ * true)}, will create an AsyncCountTrigger with timeout of 100 millis. So,
if the first record
+ * arrives at time {@code t}, and the second record arrives at time {@code
t+50}, the trigger
+ * will fire when the third record arrives or when the time is {@code
t+100} (timeout).
+ *
+ * @param nestedTrigger the nested {@link AsyncTrigger}
+ * @param timeout the timeout interval
+ * @param resetTimerOnNewRecord whether to reset the timer and start a new
one each time a new
+ * element arrives
+ * @param shouldClearOnTimeout whether to call {@link
AsyncTrigger#clear(Window,
+ * AsyncTrigger.TriggerContext)} when the processing-time timer fires
+ * @param <T> The type of the element.
+ * @param <W> The type of {@link Window Windows} on which this trigger can
operate.
+ * @return {@link AsyncProcessingTimeoutTrigger} with the above
configuration.
+ */
+ public static <T, W extends Window> AsyncProcessingTimeoutTrigger<T, W> of(
+ AsyncTrigger<T, W> nestedTrigger,
+ Duration timeout,
+ boolean resetTimerOnNewRecord,
+ boolean shouldClearOnTimeout) {
+ return new AsyncProcessingTimeoutTrigger<>(
+ nestedTrigger, timeout.toMillis(), resetTimerOnNewRecord,
shouldClearOnTimeout);
+ }
+
+ @VisibleForTesting
+ public AsyncTrigger<T, W> getNestedTrigger() {
+ return nestedTrigger;
+ }
+
+ @VisibleForTesting
+ public long getInterval() {
+ return interval;
+ }
+
+ @VisibleForTesting
+ public boolean isResetTimerOnNewRecord() {
+ return resetTimerOnNewRecord;
+ }
+
+ @VisibleForTesting
+ public boolean isShouldClearOnTimeout() {
+ return shouldClearOnTimeout;
+ }
+
+ @VisibleForTesting
+ public ValueStateDescriptor<Long> getTimeoutStateDesc() {
+ return timeoutStateDesc;
+ }
+}
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeoutTrigger.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeoutTrigger.java
index 268edfbf14b..7d972e88643 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeoutTrigger.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeoutTrigger.java
@@ -23,6 +23,9 @@ import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.streaming.api.windowing.windows.Window;
+import
org.apache.flink.streaming.runtime.operators.windowing.AsyncTriggerConverter;
+
+import javax.annotation.Nonnull;
import java.time.Duration;
@@ -39,7 +42,8 @@ import java.time.Duration;
* @param <W> The type of {@link Window} on which this trigger can operate.
*/
@PublicEvolving
-public class ProcessingTimeoutTrigger<T, W extends Window> extends Trigger<T,
W> {
+public class ProcessingTimeoutTrigger<T, W extends Window> extends Trigger<T,
W>
+ implements AsyncTriggerConverter {
private static final long serialVersionUID = 1L;
@@ -169,4 +173,14 @@ public class ProcessingTimeoutTrigger<T, W extends Window>
extends Trigger<T, W>
return new ProcessingTimeoutTrigger<>(
nestedTrigger, timeout.toMillis(), resetTimerOnNewRecord,
shouldClearOnTimeout);
}
+
+ @Override
+ @Nonnull
+ public Object convertToAsync() {
+ return AsyncProcessingTimeoutTrigger.of(
+ AsyncTriggerConverter.convertToAsync(this.nestedTrigger),
+ Duration.ofMillis(interval),
+ resetTimerOnNewRecord,
+ shouldClearOnTimeout);
+ }
}
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AsyncTriggerConverterTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AsyncTriggerConverterTest.java
new file mode 100644
index 00000000000..cf1fe75400b
--- /dev/null
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AsyncTriggerConverterTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import org.apache.flink.api.common.state.v2.StateFuture;
+import
org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+import org.junit.jupiter.api.Test;
+
+import javax.annotation.Nonnull;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@code AsyncTriggerConverter}. */
+public class AsyncTriggerConverterTest {
+ private static class DummyTriggerWithoutAsyncConverter extends
Trigger<Object, TimeWindow>
+ implements AsyncTriggerConverter {
+ @Override
+ public TriggerResult onElement(
+ Object element, long timestamp, TimeWindow window,
TriggerContext ctx)
+ throws Exception {
+ return null;
+ }
+
+ @Override
+ public TriggerResult onProcessingTime(long time, TimeWindow window,
TriggerContext ctx)
+ throws Exception {
+ return null;
+ }
+
+ @Override
+ public TriggerResult onEventTime(long time, TimeWindow window,
TriggerContext ctx)
+ throws Exception {
+ return null;
+ }
+
+ @Override
+ public void clear(TimeWindow window, TriggerContext ctx) throws
Exception {}
+ }
+
+ private static class DummyTriggerWithAsyncConverter extends
DummyTriggerWithoutAsyncConverter {
+ @Override
+ @Nonnull
+ public Object convertToAsync() {
+ return new DummyAsyncTrigger();
+ }
+ }
+
+ private static class DummyAsyncTrigger extends AsyncTrigger<Object,
TimeWindow> {
+ @Override
+ public StateFuture<TriggerResult> onElement(
+ Object element, long timestamp, TimeWindow window,
TriggerContext ctx)
+ throws Exception {
+ return null;
+ }
+
+ @Override
+ public StateFuture<TriggerResult> onProcessingTime(
+ long time, TimeWindow window, TriggerContext ctx) throws
Exception {
+ return null;
+ }
+
+ @Override
+ public StateFuture<TriggerResult> onEventTime(
+ long time, TimeWindow window, TriggerContext ctx) throws
Exception {
+ return null;
+ }
+
+ @Override
+ public StateFuture<Void> clear(TimeWindow window, TriggerContext ctx)
throws Exception {
+ return null;
+ }
+ }
+
+ @Test
+ void testTriggerUseDefaultConvert() {
+ Trigger<Object, TimeWindow> syncTrigger = new
DummyTriggerWithoutAsyncConverter();
+ AsyncTrigger<Object, TimeWindow> asyncTrigger =
+ AsyncTriggerConverter.convertToAsync(syncTrigger);
+
+
assertThat(asyncTrigger).isInstanceOf(AsyncTriggerConverter.UserDefinedAsyncTrigger.class);
+ AsyncTriggerConverter.UserDefinedAsyncTrigger<Object, TimeWindow>
triggerWrapper =
+ (AsyncTriggerConverter.UserDefinedAsyncTrigger<Object,
TimeWindow>) asyncTrigger;
+
+
assertThat(triggerWrapper.getUserDefinedTrigger()).isSameAs(syncTrigger);
+ }
+
+ @Test
+ void testTriggerUseCustomizeConvert() {
+ Trigger<Object, TimeWindow> syncTrigger = new
DummyTriggerWithAsyncConverter();
+ AsyncTrigger<Object, TimeWindow> asyncTrigger =
+ AsyncTriggerConverter.convertToAsync(syncTrigger);
+
+ assertThat(asyncTrigger).isInstanceOf(DummyAsyncTrigger.class);
+ }
+}
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ProcessingTimeoutTriggerTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ProcessingTimeoutTriggerTest.java
index 99a2593ab78..02c24018cc6 100644
---
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ProcessingTimeoutTriggerTest.java
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ProcessingTimeoutTriggerTest.java
@@ -18,15 +18,21 @@
package org.apache.flink.streaming.runtime.operators.windowing;
+import
org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncCountTrigger;
+import
org.apache.flink.runtime.asyncprocessing.operators.windowing.triggers.AsyncTrigger;
+import
org.apache.flink.streaming.api.windowing.triggers.AsyncProcessingTimeoutTrigger;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
import
org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeoutTrigger;
import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import java.time.Duration;
@@ -35,14 +41,17 @@ import static org.assertj.core.api.Assertions.assertThat;
/** Tests for {@link ProcessingTimeoutTrigger}. */
class ProcessingTimeoutTriggerTest {
-
- @Test
- void testWindowFireWithoutResetTimer() throws Exception {
+ @ParameterizedTest(name = "Enable async state = {0}")
+ @ValueSource(booleans = {false, true})
+ void testWindowFireWithoutResetTimer(boolean enableAsyncState) throws
Exception {
+ Trigger<Object, TimeWindow> trigger =
+ ProcessingTimeoutTrigger.of(CountTrigger.of(3),
Duration.ofMillis(50), false, true);
TriggerTestHarness<Object, TimeWindow> testHarness =
- new TriggerTestHarness<>(
- ProcessingTimeoutTrigger.of(
- CountTrigger.of(3), Duration.ofMillis(50),
false, true),
- new TimeWindow.Serializer());
+ enableAsyncState
+ ? new AsyncTriggerTestHarness<>(
+ AsyncTriggerConverter.convertToAsync(trigger),
+ new TimeWindow.Serializer())
+ : new TriggerTestHarness<>(trigger, new
TimeWindow.Serializer());
assertThat(testHarness.processElement(new StreamRecord<>(1), new
TimeWindow(0, 2)))
.isEqualTo(TriggerResult.CONTINUE);
@@ -76,13 +85,17 @@ class ProcessingTimeoutTriggerTest {
.isEqualTo(TriggerResult.FIRE);
}
- @Test
- void testWindowFireWithResetTimer() throws Exception {
+ @ParameterizedTest(name = "Enable async state = {0}")
+ @ValueSource(booleans = {false, true})
+ void testWindowFireWithResetTimer(boolean enableAsyncState) throws
Exception {
+ Trigger<Object, TimeWindow> trigger =
+ ProcessingTimeoutTrigger.of(CountTrigger.of(3),
Duration.ofMillis(50), true, true);
TriggerTestHarness<Object, TimeWindow> testHarness =
- new TriggerTestHarness<>(
- ProcessingTimeoutTrigger.of(
- CountTrigger.of(3), Duration.ofMillis(50),
true, true),
- new TimeWindow.Serializer());
+ enableAsyncState
+ ? new AsyncTriggerTestHarness<>(
+ AsyncTriggerConverter.convertToAsync(trigger),
+ new TimeWindow.Serializer())
+ : new TriggerTestHarness<>(trigger, new
TimeWindow.Serializer());
assertThrows(
"Must have exactly one timer firing. Fired timers: []",
@@ -125,13 +138,18 @@ class ProcessingTimeoutTriggerTest {
.isEqualTo(TriggerResult.FIRE);
}
- @Test
- void testWindowFireWithoutClearOnTimeout() throws Exception {
+ @ParameterizedTest(name = "Enable async state = {0}")
+ @ValueSource(booleans = {false, true})
+ void testWindowFireWithoutClearOnTimeout(boolean enableAsyncState) throws
Exception {
+ Trigger<Object, TimeWindow> trigger =
+ ProcessingTimeoutTrigger.of(
+ CountTrigger.of(3), Duration.ofMillis(50), false,
false);
TriggerTestHarness<Object, TimeWindow> testHarness =
- new TriggerTestHarness<>(
- ProcessingTimeoutTrigger.of(
- CountTrigger.of(3), Duration.ofMillis(50),
false, false),
- new TimeWindow.Serializer());
+ enableAsyncState
+ ? new AsyncTriggerTestHarness<>(
+ AsyncTriggerConverter.convertToAsync(trigger),
+ new TimeWindow.Serializer())
+ : new TriggerTestHarness<>(trigger, new
TimeWindow.Serializer());
assertThat(testHarness.processElement(new StreamRecord<>(1), new
TimeWindow(0, 2)))
.isEqualTo(TriggerResult.CONTINUE);
@@ -153,16 +171,21 @@ class ProcessingTimeoutTriggerTest {
assertThat(testHarness.numEventTimeTimers()).isZero();
}
- @Test
- void testWindowPurgingWhenInnerTriggerIsPurging() throws Exception {
+ @ParameterizedTest(name = "Enable async state = {0}")
+ @ValueSource(booleans = {false, true})
+ void testWindowPurgingWhenInnerTriggerIsPurging(boolean enableAsyncState)
throws Exception {
+ Trigger<Object, TimeWindow> trigger =
+ ProcessingTimeoutTrigger.of(
+ PurgingTrigger.of(ProcessingTimeTrigger.create()),
+ Duration.ofMillis(50),
+ false,
+ false);
TriggerTestHarness<Object, TimeWindow> testHarness =
- new TriggerTestHarness<>(
- ProcessingTimeoutTrigger.of(
-
PurgingTrigger.of(ProcessingTimeTrigger.create()),
- Duration.ofMillis(50),
- false,
- false),
- new TimeWindow.Serializer());
+ enableAsyncState
+ ? new AsyncTriggerTestHarness<>(
+ AsyncTriggerConverter.convertToAsync(trigger),
+ new TimeWindow.Serializer())
+ : new TriggerTestHarness<>(trigger, new
TimeWindow.Serializer());
assertThat(testHarness.processElement(new StreamRecord<>(1), new
TimeWindow(0, 2)))
.isEqualTo(TriggerResult.CONTINUE);
@@ -185,4 +208,25 @@ class ProcessingTimeoutTriggerTest {
assertThat(testHarness.numProcessingTimeTimers()).isOne();
assertThat(testHarness.numEventTimeTimers()).isZero();
}
+
+ @Test
+ void testConvertToAsync() {
+ Trigger<Object, TimeWindow> syncTrigger =
+ ProcessingTimeoutTrigger.of(
+ CountTrigger.of(2333), Duration.ofMillis(233), false,
false);
+
+ AsyncTrigger<Object, TimeWindow> asyncTrigger =
+ AsyncTriggerConverter.convertToAsync(syncTrigger);
+
assertThat(asyncTrigger).isInstanceOf(AsyncProcessingTimeoutTrigger.class);
+ AsyncProcessingTimeoutTrigger<Object, TimeWindow>
asyncProcessingTimeoutTrigger =
+ (AsyncProcessingTimeoutTrigger<Object, TimeWindow>)
asyncTrigger;
+ assertThat(asyncProcessingTimeoutTrigger.getInterval()).isEqualTo(233);
+
assertThat(asyncProcessingTimeoutTrigger.isResetTimerOnNewRecord()).isFalse();
+
assertThat(asyncProcessingTimeoutTrigger.isShouldClearOnTimeout()).isFalse();
+
+ AsyncTrigger<Object, TimeWindow> nestedTrigger =
+ asyncProcessingTimeoutTrigger.getNestedTrigger();
+ assertThat(nestedTrigger).isInstanceOf(AsyncCountTrigger.class);
+
assertThat(nestedTrigger.toString()).isEqualTo("AsyncCountTrigger(2333)");
+ }
}