This is an automated email from the ASF dual-hosted git repository.

leiyanfei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 33de4ea7739 [FLINK-37140][runtime] Create async timer service when restore async operator (#25993)
33de4ea7739 is described below

commit 33de4ea7739dcee4dd40f2217754bf2b17e3b589
Author: Yanfei Lei <fredia...@gmail.com>
AuthorDate: Fri Jan 17 19:51:31 2025 +0800

    [FLINK-37140][runtime] Create async timer service when restore async operator (#25993)
---
 .../AbstractAsyncStateStreamOperator.java          |  17 +--
 .../AbstractAsyncStateStreamOperatorV2.java        |  17 +--
 .../api/operators/AbstractStreamOperator.java      |   7 +-
 .../api/operators/AbstractStreamOperatorV2.java    |   7 +-
 .../api/operators/InternalTimeServiceManager.java  |  16 ---
 .../operators/InternalTimeServiceManagerImpl.java  |  89 ++++--------
 .../operators/InternalTimerServiceAsyncImpl.java   |  13 +-
 .../BatchExecutionInternalTimeServiceManager.java  |  12 --
 .../InternalTimerServiceAsyncImplTest.java         | 155 +++++++++++++++++++--
 .../operators/MailboxWatermarkProcessorTest.java   |  11 --
 10 files changed, 211 insertions(+), 133 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java
index 797cfbad483..80493086d35 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java
@@ -40,7 +40,7 @@ import 
org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.Input;
 import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
 import org.apache.flink.streaming.api.operators.InternalTimerService;
-import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
+import org.apache.flink.streaming.api.operators.InternalTimerServiceAsyncImpl;
 import org.apache.flink.streaming.api.operators.Triggerable;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
@@ -90,9 +90,7 @@ public abstract class AbstractAsyncStateStreamOperator<OUT> 
extends AbstractStre
 
     /** Initialize necessary state components for {@link 
AbstractStreamOperator}. */
     @Override
-    public void initializeState(StreamTaskStateInitializer 
streamTaskStateManager)
-            throws Exception {
-        super.initializeState(streamTaskStateManager);
+    public final void beforeInitializeStateHandler() {
         KeyedStateStore stateStore = 
stateHandler.getKeyedStateStore().orElse(null);
         if (stateStore instanceof DefaultKeyedStateStore) {
             ((DefaultKeyedStateStore) 
stateStore).setSupportKeyedStateApiSetV2();
@@ -293,12 +291,11 @@ public abstract class 
AbstractAsyncStateStreamOperator<OUT> extends AbstractStre
         checkState(keySerializer != null, "Timers can only be used on keyed 
operators.");
         // A {@link RecordContext} will be set as the current processing 
context to preserve record
         // order when the given {@link Triggerable} is invoked.
-        return keyedTimeServiceHandler.getAsyncInternalTimerService(
-                name,
-                keySerializer,
-                namespaceSerializer,
-                triggerable,
-                (AsyncExecutionController<K>) asyncExecutionController);
+        InternalTimerService<N> service =
+                keyedTimeServiceHandler.getInternalTimerService(
+                        name, keySerializer, namespaceSerializer, triggerable);
+        ((InternalTimerServiceAsyncImpl<K, N>) 
service).setup(asyncExecutionController);
+        return service;
     }
 
     @Override
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java
index 983d9f7794a..b1a80cebaa0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java
@@ -36,8 +36,8 @@ import 
org.apache.flink.runtime.state.v2.adaptor.AsyncKeyedStateBackendAdaptor;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2;
 import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
 import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.InternalTimerServiceAsyncImpl;
 import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
-import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
 import org.apache.flink.streaming.api.operators.Triggerable;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import 
org.apache.flink.streaming.runtime.operators.asyncprocessing.AsyncStateProcessing;
@@ -92,9 +92,7 @@ public abstract class AbstractAsyncStateStreamOperatorV2<OUT> 
extends AbstractSt
 
     /** Initialize necessary state components for {@link 
AbstractStreamOperatorV2}. */
     @Override
-    public final void initializeState(StreamTaskStateInitializer 
streamTaskStateManager)
-            throws Exception {
-        super.initializeState(streamTaskStateManager);
+    public final void beforeInitializeStateHandler() {
         KeyedStateStore stateStore = 
stateHandler.getKeyedStateStore().orElse(null);
         if (stateStore instanceof DefaultKeyedStateStore) {
             ((DefaultKeyedStateStore) 
stateStore).setSupportKeyedStateApiSetV2();
@@ -258,12 +256,11 @@ public abstract class 
AbstractAsyncStateStreamOperatorV2<OUT> extends AbstractSt
         checkState(keySerializer != null, "Timers can only be used on keyed 
operators.");
         // A {@link RecordContext} will be set as the current processing 
context to preserve record
         // order when the given {@link Triggerable} is invoked.
-        return keyedTimeServiceHandler.getAsyncInternalTimerService(
-                name,
-                keySerializer,
-                namespaceSerializer,
-                triggerable,
-                (AsyncExecutionController<K>) asyncExecutionController);
+        InternalTimerService<N> service =
+                keyedTimeServiceHandler.getInternalTimerService(
+                        name, keySerializer, namespaceSerializer, triggerable);
+        ((InternalTimerServiceAsyncImpl<K, N>) 
service).setup(asyncExecutionController);
+        return service;
     }
 
     // ------------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 72081a4ae32..7c9cbe908f1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -262,8 +262,11 @@ public abstract class AbstractStreamOperator<OUT>
         return metrics;
     }
 
+    /** Initialize necessary state components before initializing state 
components. */
+    protected void beforeInitializeStateHandler() {}
+
     @Override
-    public void initializeState(StreamTaskStateInitializer 
streamTaskStateManager)
+    public final void initializeState(StreamTaskStateInitializer 
streamTaskStateManager)
             throws Exception {
 
         final TypeSerializer<?> keySerializer =
@@ -296,6 +299,8 @@ public abstract class AbstractStreamOperator<OUT>
                 isAsyncStateProcessingEnabled()
                         ? context.asyncInternalTimerServiceManager()
                         : context.internalTimerServiceManager();
+
+        beforeInitializeStateHandler();
         stateHandler.initializeOperatorState(this);
         
runtimeContext.setKeyedStateStore(stateHandler.getKeyedStateStore().orElse(null));
     }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java
index 76b13cecc99..b892b89c4e3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java
@@ -197,8 +197,11 @@ public abstract class AbstractStreamOperatorV2<OUT>
         return metrics;
     }
 
+    /** Initialize necessary state components before initializing state 
components. */
+    protected void beforeInitializeStateHandler() {}
+
     @Override
-    public void initializeState(StreamTaskStateInitializer 
streamTaskStateManager)
+    public final void initializeState(StreamTaskStateInitializer 
streamTaskStateManager)
             throws Exception {
         final TypeSerializer<?> keySerializer =
                 config.getStateKeySerializer(getUserCodeClassloader());
@@ -225,6 +228,8 @@ public abstract class AbstractStreamOperatorV2<OUT>
                 isAsyncStateProcessingEnabled()
                         ? context.asyncInternalTimerServiceManager()
                         : context.internalTimerServiceManager();
+
+        beforeInitializeStateHandler();
         stateHandler.initializeOperatorState(this);
 
         if (useSplittableTimers()
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java
index 782bbe7856b..e24e879afae 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java
@@ -20,7 +20,6 @@ package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
 import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
@@ -65,21 +64,6 @@ public interface InternalTimeServiceManager<K> {
             TypeSerializer<N> namespaceSerializer,
             Triggerable<K, N> triggerable);
 
-    /**
-     * Creates an {@link InternalTimerServiceAsyncImpl} for handling a group 
of timers identified by
-     * the given {@code name}. The timers are scoped to a key and namespace. 
Mainly used by async
-     * operators.
-     *
-     * <p>Some essential order preservation will be added when the given 
{@link Triggerable} is
-     * invoked.
-     */
-    <N> InternalTimerService<N> getAsyncInternalTimerService(
-            String name,
-            TypeSerializer<K> keySerializer,
-            TypeSerializer<N> namespaceSerializer,
-            Triggerable<K, N> triggerable,
-            AsyncExecutionController<K> asyncExecutionController);
-
     /**
      * Advances the Watermark of all managed {@link InternalTimerService timer 
services},
      * potentially firing event time timers.
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManagerImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManagerImpl.java
index 3b5f81697f8..243ae007f1e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManagerImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManagerImpl.java
@@ -25,6 +25,7 @@ import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
 import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
+import org.apache.flink.runtime.state.AsyncKeyedStateBackend;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
 import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
@@ -39,6 +40,8 @@ import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.Collections;
@@ -76,6 +79,8 @@ public class InternalTimeServiceManagerImpl<K> implements 
InternalTimeServiceMan
 
     private final Map<String, InternalTimerServiceImpl<K, ?>> timerServices;
 
+    @Nullable AsyncExecutionController<K> asyncExecutionController;
+
     private InternalTimeServiceManagerImpl(
             TaskIOMetricGroup taskIOMetricGroup,
             KeyGroupRange localKeyGroupRange,
@@ -162,65 +167,31 @@ public class InternalTimeServiceManagerImpl<K> implements 
InternalTimeServiceMan
         InternalTimerServiceImpl<K, N> timerService =
                 (InternalTimerServiceImpl<K, N>) timerServices.get(name);
         if (timerService == null) {
-
-            timerService =
-                    new InternalTimerServiceImpl<>(
-                            taskIOMetricGroup,
-                            localKeyGroupRange,
-                            keyContext,
-                            processingTimeService,
-                            createTimerPriorityQueue(
-                                    PROCESSING_TIMER_PREFIX + name, 
timerSerializer),
-                            createTimerPriorityQueue(EVENT_TIMER_PREFIX + 
name, timerSerializer),
-                            cancellationContext);
-
-            timerServices.put(name, timerService);
-        }
-        return timerService;
-    }
-
-    @Override
-    public <N> InternalTimerService<N> getAsyncInternalTimerService(
-            String name,
-            TypeSerializer<K> keySerializer,
-            TypeSerializer<N> namespaceSerializer,
-            Triggerable<K, N> triggerable,
-            AsyncExecutionController<K> asyncExecutionController) {
-        checkNotNull(keySerializer, "Timers can only be used on keyed 
operators.");
-
-        // the following casting is to overcome type restrictions.
-        TimerSerializer<K, N> timerSerializer =
-                new TimerSerializer<>(keySerializer, namespaceSerializer);
-
-        InternalTimerServiceAsyncImpl<K, N> timerService =
-                registerOrGetAsyncTimerService(name, timerSerializer, 
asyncExecutionController);
-
-        timerService.startTimerService(
-                timerSerializer.getKeySerializer(),
-                timerSerializer.getNamespaceSerializer(),
-                triggerable);
-
-        return timerService;
-    }
-
-    <N> InternalTimerServiceAsyncImpl<K, N> registerOrGetAsyncTimerService(
-            String name,
-            TimerSerializer<K, N> timerSerializer,
-            AsyncExecutionController<K> asyncExecutionController) {
-        InternalTimerServiceAsyncImpl<K, N> timerService =
-                (InternalTimerServiceAsyncImpl<K, N>) timerServices.get(name);
-        if (timerService == null) {
-            timerService =
-                    new InternalTimerServiceAsyncImpl<>(
-                            taskIOMetricGroup,
-                            localKeyGroupRange,
-                            keyContext,
-                            processingTimeService,
-                            createTimerPriorityQueue(
-                                    PROCESSING_TIMER_PREFIX + name, 
timerSerializer),
-                            createTimerPriorityQueue(EVENT_TIMER_PREFIX + 
name, timerSerializer),
-                            cancellationContext,
-                            asyncExecutionController);
+            if (priorityQueueSetFactory instanceof AsyncKeyedStateBackend) {
+                timerService =
+                        new InternalTimerServiceAsyncImpl<>(
+                                taskIOMetricGroup,
+                                localKeyGroupRange,
+                                keyContext,
+                                processingTimeService,
+                                createTimerPriorityQueue(
+                                        PROCESSING_TIMER_PREFIX + name, 
timerSerializer),
+                                createTimerPriorityQueue(
+                                        EVENT_TIMER_PREFIX + name, 
timerSerializer),
+                                cancellationContext);
+            } else {
+                timerService =
+                        new InternalTimerServiceImpl<>(
+                                taskIOMetricGroup,
+                                localKeyGroupRange,
+                                keyContext,
+                                processingTimeService,
+                                createTimerPriorityQueue(
+                                        PROCESSING_TIMER_PREFIX + name, 
timerSerializer),
+                                createTimerPriorityQueue(
+                                        EVENT_TIMER_PREFIX + name, 
timerSerializer),
+                                cancellationContext);
+            }
 
             timerServices.put(name, timerService);
         }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceAsyncImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceAsyncImpl.java
index f756d2d40eb..682a3d8aeb2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceAsyncImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceAsyncImpl.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.streaming.runtime.tasks.StreamTaskCancellationContext;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.function.BiConsumerWithException;
 import org.apache.flink.util.function.ThrowingRunnable;
 
@@ -54,8 +55,7 @@ public class InternalTimerServiceAsyncImpl<K, N> extends 
InternalTimerServiceImp
             ProcessingTimeService processingTimeService,
             KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> 
processingTimeTimersQueue,
             KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> 
eventTimeTimersQueue,
-            StreamTaskCancellationContext cancellationContext,
-            AsyncExecutionController<K> asyncExecutionController) {
+            StreamTaskCancellationContext cancellationContext) {
         super(
                 taskIOMetricGroup,
                 localKeyGroupRange,
@@ -64,11 +64,17 @@ public class InternalTimerServiceAsyncImpl<K, N> extends 
InternalTimerServiceImp
                 processingTimeTimersQueue,
                 eventTimeTimersQueue,
                 cancellationContext);
-        this.asyncExecutionController = asyncExecutionController;
+    }
+
+    public void setup(AsyncExecutionController<K> asyncExecutionController) {
+        if (asyncExecutionController != null) {
+            this.asyncExecutionController = asyncExecutionController;
+        }
     }
 
     @Override
     void onProcessingTime(long time) throws Exception {
+        Preconditions.checkNotNull(asyncExecutionController);
         // null out the timer in case the Triggerable calls 
registerProcessingTimeTimer()
         // inside the callback.
         nextTimer = null;
@@ -99,6 +105,7 @@ public class InternalTimerServiceAsyncImpl<K, N> extends 
InternalTimerServiceImp
      */
     @Override
     public void advanceWatermark(long time) throws Exception {
+        Preconditions.checkNotNull(asyncExecutionController);
         currentWatermark = time;
         InternalTimer<K, N> timer;
         while ((timer = eventTimeTimersQueue.peek()) != null
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionInternalTimeServiceManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionInternalTimeServiceManager.java
index 57f0b3f6f05..5d027671b43 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionInternalTimeServiceManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionInternalTimeServiceManager.java
@@ -19,7 +19,6 @@
 package org.apache.flink.streaming.api.operators.sorted.state;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
 import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
@@ -74,17 +73,6 @@ public class BatchExecutionInternalTimeServiceManager<K>
         return timerService;
     }
 
-    @Override
-    public <N> InternalTimerService<N> getAsyncInternalTimerService(
-            String name,
-            TypeSerializer<K> keySerializer,
-            TypeSerializer<N> namespaceSerializer,
-            Triggerable<K, N> triggerable,
-            AsyncExecutionController<K> asyncExecutionController) {
-        throw new UnsupportedOperationException(
-                "Async timer service is not supported in BATCH execution 
mode.");
-    }
-
     @Override
     public void advanceWatermark(Watermark watermark) {
         if (watermark.getTimestamp() == Long.MAX_VALUE) {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/InternalTimerServiceAsyncImplTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/InternalTimerServiceAsyncImplTest.java
index 14cfb3b1621..d53197b5547 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/InternalTimerServiceAsyncImplTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/InternalTimerServiceAsyncImplTest.java
@@ -21,6 +21,8 @@ package org.apache.flink.streaming.api.operators;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import 
org.apache.flink.core.state.StateFutureImpl.AsyncFrameworkExceptionHandler;
 import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
 import org.apache.flink.runtime.asyncprocessing.MockStateExecutor;
@@ -31,6 +33,7 @@ import org.apache.flink.runtime.mailbox.SyncMailboxExecutor;
 import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.runtime.state.PriorityQueueSetFactory;
 import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
@@ -40,6 +43,12 @@ import 
org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test for {@link InternalTimerServiceAsyncImpl}. */
@@ -48,6 +57,7 @@ class InternalTimerServiceAsyncImplTest {
     private TestKeyContext keyContext;
     private TestProcessingTimeService processingTimeService;
     private InternalTimerServiceAsyncImpl<Integer, String> service;
+    private KeyGroupRange testKeyGroupList;
 
     private AsyncFrameworkExceptionHandler exceptionHandler =
             new AsyncFrameworkExceptionHandler() {
@@ -72,7 +82,7 @@ class InternalTimerServiceAsyncImplTest {
                         null);
         // ensure arbitrary key is in the key group
         int totalKeyGroups = 128;
-        KeyGroupRange testKeyGroupList = new KeyGroupRange(0, totalKeyGroups - 
1);
+        testKeyGroupList = new KeyGroupRange(0, totalKeyGroups - 1);
 
         keyContext = new TestKeyContext();
 
@@ -191,6 +201,119 @@ class InternalTimerServiceAsyncImplTest {
         assertThat(testTriggerable.eventTriggerCount).isEqualTo(3);
     }
 
+    @Test
+    void testSnapshotAndRestore() throws Exception {
+        service.startTimerService(
+                IntSerializer.INSTANCE, StringSerializer.INSTANCE, new 
TestTriggerable());
+        keyContext.setCurrentKey("key-1");
+        // get two different keys
+        int key1 = getKeyInKeyGroupRange(testKeyGroupList, 
testKeyGroupList.getNumberOfKeyGroups());
+        int key2 = getKeyInKeyGroupRange(testKeyGroupList, 
testKeyGroupList.getNumberOfKeyGroups());
+        while (key2 == key1) {
+            key2 = getKeyInKeyGroupRange(testKeyGroupList, 
testKeyGroupList.getNumberOfKeyGroups());
+        }
+
+        keyContext.setCurrentKey(key1);
+
+        service.registerProcessingTimeTimer("ciao", 10);
+        service.registerEventTimeTimer("hello", 10);
+
+        keyContext.setCurrentKey(key2);
+
+        service.registerEventTimeTimer("ciao", 10);
+        service.registerProcessingTimeTimer("hello", 10);
+
+        assertThat(service.numProcessingTimeTimers()).isEqualTo(2);
+        assertThat(service.numProcessingTimeTimers("hello")).isOne();
+        assertThat(service.numProcessingTimeTimers("ciao")).isOne();
+        assertThat(service.numEventTimeTimers()).isEqualTo(2);
+        assertThat(service.numEventTimeTimers("hello")).isOne();
+        assertThat(service.numEventTimeTimers("ciao")).isOne();
+
+        Map<Integer, byte[]> snapshot = new HashMap<>();
+        for (Integer keyGroupIndex : testKeyGroupList) {
+            try (ByteArrayOutputStream outStream = new 
ByteArrayOutputStream()) {
+                InternalTimersSnapshot<Integer, String> timersSnapshot =
+                        service.snapshotTimersForKeyGroup(keyGroupIndex);
+
+                InternalTimersSnapshotReaderWriters.getWriterForVersion(
+                                InternalTimerServiceSerializationProxy.VERSION,
+                                timersSnapshot,
+                                service.getKeySerializer(),
+                                service.getNamespaceSerializer())
+                        .writeTimersSnapshot(new 
DataOutputViewStreamWrapper(outStream));
+
+                snapshot.put(keyGroupIndex, outStream.toByteArray());
+            }
+        }
+
+        TestTriggerable testTriggerable = new TestTriggerable();
+        testTriggerable.eventTriggerCount = 0;
+        testTriggerable.processingTriggerCount = 0;
+
+        processingTimeService = new TestProcessingTimeService();
+
+        service =
+                restoreTimerService(
+                        snapshot,
+                        InternalTimerServiceSerializationProxy.VERSION,
+                        testTriggerable,
+                        keyContext,
+                        processingTimeService);
+
+        processingTimeService.setCurrentTime(10);
+        service.advanceWatermark(10);
+
+        assertThat(testTriggerable.eventTriggerCount).isEqualTo(2);
+        assertThat(testTriggerable.processingTriggerCount).isEqualTo(2);
+
+        assertThat(service.numEventTimeTimers()).isZero();
+    }
+
+    private InternalTimerServiceAsyncImpl<Integer, String> restoreTimerService(
+            Map<Integer, byte[]> state,
+            int snapshotVersion,
+            Triggerable<Integer, String> triggerable,
+            KeyContext keyContext,
+            ProcessingTimeService processingTimeService)
+            throws Exception {
+
+        // create an empty service
+        InternalTimerServiceAsyncImpl<Integer, String> service =
+                createInternalTimerService(
+                        
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup()
+                                .getIOMetricGroup(),
+                        testKeyGroupList,
+                        keyContext,
+                        processingTimeService,
+                        IntSerializer.INSTANCE,
+                        StringSerializer.INSTANCE,
+                        new HeapPriorityQueueSetFactory(
+                                testKeyGroupList, 
testKeyGroupList.getNumberOfKeyGroups(), 128),
+                        asyncExecutionController);
+
+        // restore the timers
+        for (Integer keyGroupIndex : testKeyGroupList) {
+            if (state.containsKey(keyGroupIndex)) {
+                try (ByteArrayInputStream inputStream =
+                        new ByteArrayInputStream(state.get(keyGroupIndex))) {
+                    InternalTimersSnapshot<?, ?> restoredTimersSnapshot =
+                            
InternalTimersSnapshotReaderWriters.getReaderForVersion(
+                                            snapshotVersion,
+                                            
InternalTimerServiceImplTest.class.getClassLoader())
+                                    .readTimersSnapshot(
+                                            new 
DataInputViewStreamWrapper(inputStream));
+
+                    service.restoreTimersForKeyGroup(restoredTimersSnapshot, 
keyGroupIndex);
+                }
+            }
+        }
+
+        // initialize the service
+        service.startTimerService(IntSerializer.INSTANCE, 
StringSerializer.INSTANCE, triggerable);
+        return service;
+    }
+
     private static <K, N> InternalTimerServiceAsyncImpl<K, N> 
createInternalTimerService(
             TaskIOMetricGroup taskIOMetricGroup,
             KeyGroupRange keyGroupsList,
@@ -204,15 +327,27 @@ class InternalTimerServiceAsyncImplTest {
         TimerSerializer<K, N> timerSerializer =
                 new TimerSerializer<>(keySerializer, namespaceSerializer);
 
-        return new InternalTimerServiceAsyncImpl<>(
-                taskIOMetricGroup,
-                keyGroupsList,
-                keyContext,
-                processingTimeService,
-                priorityQueueSetFactory.create("__async_processing_timers", 
timerSerializer),
-                priorityQueueSetFactory.create("__async_event_timers", 
timerSerializer),
-                StreamTaskCancellationContext.alwaysRunning(),
-                asyncExecutionController);
+        InternalTimerServiceAsyncImpl serviceAsync =
+                new InternalTimerServiceAsyncImpl<>(
+                        taskIOMetricGroup,
+                        keyGroupsList,
+                        keyContext,
+                        processingTimeService,
+                        priorityQueueSetFactory.create(
+                                "__async_processing_timers", timerSerializer),
+                        priorityQueueSetFactory.create("__async_event_timers", 
timerSerializer),
+                        StreamTaskCancellationContext.alwaysRunning());
+        serviceAsync.setup(asyncExecutionController);
+        return serviceAsync;
+    }
+
+    private static int getKeyInKeyGroupRange(KeyGroupRange range, int 
maxParallelism) {
+        Random rand = new Random(System.currentTimeMillis());
+        int result = rand.nextInt();
+        while 
(!range.contains(KeyGroupRangeAssignment.assignToKeyGroup(result, 
maxParallelism))) {
+            result = rand.nextInt();
+        }
+        return result;
     }
 
     private static class SameTimerTriggerable implements Triggerable<Integer, 
String> {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/MailboxWatermarkProcessorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/MailboxWatermarkProcessorTest.java
index 87f19f4032d..e0b76c4123f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/MailboxWatermarkProcessorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/MailboxWatermarkProcessorTest.java
@@ -19,7 +19,6 @@
 package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
 import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
@@ -96,16 +95,6 @@ class MailboxWatermarkProcessorTest {
             throw new UnsupportedOperationException();
         }
 
-        @Override
-        public <N> InternalTimerService<N> getAsyncInternalTimerService(
-                String name,
-                TypeSerializer<Object> keySerializer,
-                TypeSerializer<N> namespaceSerializer,
-                Triggerable<Object, N> triggerable,
-                AsyncExecutionController<Object> asyncExecutionController) {
-            throw new UnsupportedOperationException();
-        }
-
         @Override
         public void advanceWatermark(Watermark watermark) throws Exception {
             throw new UnsupportedOperationException();

Reply via email to