This is an automated email from the ASF dual-hosted git repository.

leiyanfei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 33de4ea7739 [FLINK-37140][runtime] Create async timer service when restore async operator (#25993) 33de4ea7739 is described below commit 33de4ea7739dcee4dd40f2217754bf2b17e3b589 Author: Yanfei Lei <fredia...@gmail.com> AuthorDate: Fri Jan 17 19:51:31 2025 +0800 [FLINK-37140][runtime] Create async timer service when restore async operator (#25993) --- .../AbstractAsyncStateStreamOperator.java | 17 +-- .../AbstractAsyncStateStreamOperatorV2.java | 17 +-- .../api/operators/AbstractStreamOperator.java | 7 +- .../api/operators/AbstractStreamOperatorV2.java | 7 +- .../api/operators/InternalTimeServiceManager.java | 16 --- .../operators/InternalTimeServiceManagerImpl.java | 89 ++++-------- .../operators/InternalTimerServiceAsyncImpl.java | 13 +- .../BatchExecutionInternalTimeServiceManager.java | 12 -- .../InternalTimerServiceAsyncImplTest.java | 155 +++++++++++++++++++-- .../operators/MailboxWatermarkProcessorTest.java | 11 -- 10 files changed, 211 insertions(+), 133 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java index 797cfbad483..80493086d35 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java @@ -40,7 +40,7 @@ import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.Input; import org.apache.flink.streaming.api.operators.InternalTimeServiceManager; import org.apache.flink.streaming.api.operators.InternalTimerService; -import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer; +import 
org.apache.flink.streaming.api.operators.InternalTimerServiceAsyncImpl; import org.apache.flink.streaming.api.operators.Triggerable; import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; import org.apache.flink.streaming.api.watermark.Watermark; @@ -90,9 +90,7 @@ public abstract class AbstractAsyncStateStreamOperator<OUT> extends AbstractStre /** Initialize necessary state components for {@link AbstractStreamOperator}. */ @Override - public void initializeState(StreamTaskStateInitializer streamTaskStateManager) - throws Exception { - super.initializeState(streamTaskStateManager); + public final void beforeInitializeStateHandler() { KeyedStateStore stateStore = stateHandler.getKeyedStateStore().orElse(null); if (stateStore instanceof DefaultKeyedStateStore) { ((DefaultKeyedStateStore) stateStore).setSupportKeyedStateApiSetV2(); @@ -293,12 +291,11 @@ public abstract class AbstractAsyncStateStreamOperator<OUT> extends AbstractStre checkState(keySerializer != null, "Timers can only be used on keyed operators."); // A {@link RecordContext} will be set as the current processing context to preserve record // order when the given {@link Triggerable} is invoked. 
- return keyedTimeServiceHandler.getAsyncInternalTimerService( - name, - keySerializer, - namespaceSerializer, - triggerable, - (AsyncExecutionController<K>) asyncExecutionController); + InternalTimerService<N> service = + keyedTimeServiceHandler.getInternalTimerService( + name, keySerializer, namespaceSerializer, triggerable); + ((InternalTimerServiceAsyncImpl<K, N>) service).setup(asyncExecutionController); + return service; } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java index 983d9f7794a..b1a80cebaa0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java @@ -36,8 +36,8 @@ import org.apache.flink.runtime.state.v2.adaptor.AsyncKeyedStateBackendAdaptor; import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2; import org.apache.flink.streaming.api.operators.InternalTimeServiceManager; import org.apache.flink.streaming.api.operators.InternalTimerService; +import org.apache.flink.streaming.api.operators.InternalTimerServiceAsyncImpl; import org.apache.flink.streaming.api.operators.StreamOperatorParameters; -import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer; import org.apache.flink.streaming.api.operators.Triggerable; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.operators.asyncprocessing.AsyncStateProcessing; @@ -92,9 +92,7 @@ public abstract class AbstractAsyncStateStreamOperatorV2<OUT> extends AbstractSt /** Initialize necessary state components for {@link AbstractStreamOperatorV2}. 
*/ @Override - public final void initializeState(StreamTaskStateInitializer streamTaskStateManager) - throws Exception { - super.initializeState(streamTaskStateManager); + public final void beforeInitializeStateHandler() { KeyedStateStore stateStore = stateHandler.getKeyedStateStore().orElse(null); if (stateStore instanceof DefaultKeyedStateStore) { ((DefaultKeyedStateStore) stateStore).setSupportKeyedStateApiSetV2(); @@ -258,12 +256,11 @@ public abstract class AbstractAsyncStateStreamOperatorV2<OUT> extends AbstractSt checkState(keySerializer != null, "Timers can only be used on keyed operators."); // A {@link RecordContext} will be set as the current processing context to preserve record // order when the given {@link Triggerable} is invoked. - return keyedTimeServiceHandler.getAsyncInternalTimerService( - name, - keySerializer, - namespaceSerializer, - triggerable, - (AsyncExecutionController<K>) asyncExecutionController); + InternalTimerService<N> service = + keyedTimeServiceHandler.getInternalTimerService( + name, keySerializer, namespaceSerializer, triggerable); + ((InternalTimerServiceAsyncImpl<K, N>) service).setup(asyncExecutionController); + return service; } // ------------------------------------------------------------------------ diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java index 72081a4ae32..7c9cbe908f1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java @@ -262,8 +262,11 @@ public abstract class AbstractStreamOperator<OUT> return metrics; } + /** Initialize necessary state components before initializing state components. 
*/ + protected void beforeInitializeStateHandler() {} + @Override - public void initializeState(StreamTaskStateInitializer streamTaskStateManager) + public final void initializeState(StreamTaskStateInitializer streamTaskStateManager) throws Exception { final TypeSerializer<?> keySerializer = @@ -296,6 +299,8 @@ public abstract class AbstractStreamOperator<OUT> isAsyncStateProcessingEnabled() ? context.asyncInternalTimerServiceManager() : context.internalTimerServiceManager(); + + beforeInitializeStateHandler(); stateHandler.initializeOperatorState(this); runtimeContext.setKeyedStateStore(stateHandler.getKeyedStateStore().orElse(null)); } diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java index 76b13cecc99..b892b89c4e3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java @@ -197,8 +197,11 @@ public abstract class AbstractStreamOperatorV2<OUT> return metrics; } + /** Initialize necessary state components before initializing state components. */ + protected void beforeInitializeStateHandler() {} + @Override - public void initializeState(StreamTaskStateInitializer streamTaskStateManager) + public final void initializeState(StreamTaskStateInitializer streamTaskStateManager) throws Exception { final TypeSerializer<?> keySerializer = config.getStateKeySerializer(getUserCodeClassloader()); @@ -225,6 +228,8 @@ public abstract class AbstractStreamOperatorV2<OUT> isAsyncStateProcessingEnabled() ? 
context.asyncInternalTimerServiceManager() : context.internalTimerServiceManager(); + + beforeInitializeStateHandler(); stateHandler.initializeOperatorState(this); if (useSplittableTimers() diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java index 782bbe7856b..e24e879afae 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java @@ -20,7 +20,6 @@ package org.apache.flink.streaming.api.operators; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController; import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider; @@ -65,21 +64,6 @@ public interface InternalTimeServiceManager<K> { TypeSerializer<N> namespaceSerializer, Triggerable<K, N> triggerable); - /** - * Creates an {@link InternalTimerServiceAsyncImpl} for handling a group of timers identified by - * the given {@code name}. The timers are scoped to a key and namespace. Mainly used by async - * operators. - * - * <p>Some essential order preservation will be added when the given {@link Triggerable} is - * invoked. - */ - <N> InternalTimerService<N> getAsyncInternalTimerService( - String name, - TypeSerializer<K> keySerializer, - TypeSerializer<N> namespaceSerializer, - Triggerable<K, N> triggerable, - AsyncExecutionController<K> asyncExecutionController); - /** * Advances the Watermark of all managed {@link InternalTimerService timer services}, * potentially firing event time timers. 
diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManagerImpl.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManagerImpl.java index 3b5f81697f8..243ae007f1e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManagerImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManagerImpl.java @@ -25,6 +25,7 @@ import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController; import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; +import org.apache.flink.runtime.state.AsyncKeyedStateBackend; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider; import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue; @@ -39,6 +40,8 @@ import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; + import java.io.IOException; import java.io.InputStream; import java.util.Collections; @@ -76,6 +79,8 @@ public class InternalTimeServiceManagerImpl<K> implements InternalTimeServiceMan private final Map<String, InternalTimerServiceImpl<K, ?>> timerServices; + @Nullable AsyncExecutionController<K> asyncExecutionController; + private InternalTimeServiceManagerImpl( TaskIOMetricGroup taskIOMetricGroup, KeyGroupRange localKeyGroupRange, @@ -162,65 +167,31 @@ public class InternalTimeServiceManagerImpl<K> implements InternalTimeServiceMan InternalTimerServiceImpl<K, N> timerService = (InternalTimerServiceImpl<K, N>) timerServices.get(name); if (timerService == null) { - - timerService = - new InternalTimerServiceImpl<>( - taskIOMetricGroup, - localKeyGroupRange, - keyContext, - processingTimeService, - 
createTimerPriorityQueue( - PROCESSING_TIMER_PREFIX + name, timerSerializer), - createTimerPriorityQueue(EVENT_TIMER_PREFIX + name, timerSerializer), - cancellationContext); - - timerServices.put(name, timerService); - } - return timerService; - } - - @Override - public <N> InternalTimerService<N> getAsyncInternalTimerService( - String name, - TypeSerializer<K> keySerializer, - TypeSerializer<N> namespaceSerializer, - Triggerable<K, N> triggerable, - AsyncExecutionController<K> asyncExecutionController) { - checkNotNull(keySerializer, "Timers can only be used on keyed operators."); - - // the following casting is to overcome type restrictions. - TimerSerializer<K, N> timerSerializer = - new TimerSerializer<>(keySerializer, namespaceSerializer); - - InternalTimerServiceAsyncImpl<K, N> timerService = - registerOrGetAsyncTimerService(name, timerSerializer, asyncExecutionController); - - timerService.startTimerService( - timerSerializer.getKeySerializer(), - timerSerializer.getNamespaceSerializer(), - triggerable); - - return timerService; - } - - <N> InternalTimerServiceAsyncImpl<K, N> registerOrGetAsyncTimerService( - String name, - TimerSerializer<K, N> timerSerializer, - AsyncExecutionController<K> asyncExecutionController) { - InternalTimerServiceAsyncImpl<K, N> timerService = - (InternalTimerServiceAsyncImpl<K, N>) timerServices.get(name); - if (timerService == null) { - timerService = - new InternalTimerServiceAsyncImpl<>( - taskIOMetricGroup, - localKeyGroupRange, - keyContext, - processingTimeService, - createTimerPriorityQueue( - PROCESSING_TIMER_PREFIX + name, timerSerializer), - createTimerPriorityQueue(EVENT_TIMER_PREFIX + name, timerSerializer), - cancellationContext, - asyncExecutionController); + if (priorityQueueSetFactory instanceof AsyncKeyedStateBackend) { + timerService = + new InternalTimerServiceAsyncImpl<>( + taskIOMetricGroup, + localKeyGroupRange, + keyContext, + processingTimeService, + createTimerPriorityQueue( + PROCESSING_TIMER_PREFIX + 
name, timerSerializer), + createTimerPriorityQueue( + EVENT_TIMER_PREFIX + name, timerSerializer), + cancellationContext); + } else { + timerService = + new InternalTimerServiceImpl<>( + taskIOMetricGroup, + localKeyGroupRange, + keyContext, + processingTimeService, + createTimerPriorityQueue( + PROCESSING_TIMER_PREFIX + name, timerSerializer), + createTimerPriorityQueue( + EVENT_TIMER_PREFIX + name, timerSerializer), + cancellationContext); + } timerServices.put(name, timerService); } diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceAsyncImpl.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceAsyncImpl.java index f756d2d40eb..682a3d8aeb2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceAsyncImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceAsyncImpl.java @@ -27,6 +27,7 @@ import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; import org.apache.flink.streaming.runtime.tasks.StreamTaskCancellationContext; +import org.apache.flink.util.Preconditions; import org.apache.flink.util.function.BiConsumerWithException; import org.apache.flink.util.function.ThrowingRunnable; @@ -54,8 +55,7 @@ public class InternalTimerServiceAsyncImpl<K, N> extends InternalTimerServiceImp ProcessingTimeService processingTimeService, KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> processingTimeTimersQueue, KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> eventTimeTimersQueue, - StreamTaskCancellationContext cancellationContext, - AsyncExecutionController<K> asyncExecutionController) { + StreamTaskCancellationContext cancellationContext) { super( taskIOMetricGroup, localKeyGroupRange, @@ -64,11 +64,17 @@ public class 
InternalTimerServiceAsyncImpl<K, N> extends InternalTimerServiceImp processingTimeTimersQueue, eventTimeTimersQueue, cancellationContext); - this.asyncExecutionController = asyncExecutionController; + } + + public void setup(AsyncExecutionController<K> asyncExecutionController) { + if (asyncExecutionController != null) { + this.asyncExecutionController = asyncExecutionController; + } } @Override void onProcessingTime(long time) throws Exception { + Preconditions.checkNotNull(asyncExecutionController); // null out the timer in case the Triggerable calls registerProcessingTimeTimer() // inside the callback. nextTimer = null; @@ -99,6 +105,7 @@ public class InternalTimerServiceAsyncImpl<K, N> extends InternalTimerServiceImp */ @Override public void advanceWatermark(long time) throws Exception { + Preconditions.checkNotNull(asyncExecutionController); currentWatermark = time; InternalTimer<K, N> timer; while ((timer = eventTimeTimersQueue.peek()) != null diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionInternalTimeServiceManager.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionInternalTimeServiceManager.java index 57f0b3f6f05..5d027671b43 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionInternalTimeServiceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionInternalTimeServiceManager.java @@ -19,7 +19,6 @@ package org.apache.flink.streaming.api.operators.sorted.state; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController; import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider; @@ -74,17 +73,6 @@ public class 
BatchExecutionInternalTimeServiceManager<K> return timerService; } - @Override - public <N> InternalTimerService<N> getAsyncInternalTimerService( - String name, - TypeSerializer<K> keySerializer, - TypeSerializer<N> namespaceSerializer, - Triggerable<K, N> triggerable, - AsyncExecutionController<K> asyncExecutionController) { - throw new UnsupportedOperationException( - "Async timer service is not supported in BATCH execution mode."); - } - @Override public void advanceWatermark(Watermark watermark) { if (watermark.getTimestamp() == Long.MAX_VALUE) { diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/InternalTimerServiceAsyncImplTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/InternalTimerServiceAsyncImplTest.java index 14cfb3b1621..d53197b5547 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/InternalTimerServiceAsyncImplTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/InternalTimerServiceAsyncImplTest.java @@ -21,6 +21,8 @@ package org.apache.flink.streaming.api.operators; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.core.state.StateFutureImpl.AsyncFrameworkExceptionHandler; import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController; import org.apache.flink.runtime.asyncprocessing.MockStateExecutor; @@ -31,6 +33,7 @@ import org.apache.flink.runtime.mailbox.SyncMailboxExecutor; import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.state.KeyGroupRange; +import 
org.apache.flink.runtime.state.KeyGroupRangeAssignment; import org.apache.flink.runtime.state.PriorityQueueSetFactory; import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; @@ -40,6 +43,12 @@ import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.util.HashMap; +import java.util.Map; +import java.util.Random; + import static org.assertj.core.api.Assertions.assertThat; /** Test for {@link InternalTimerServiceAsyncImpl}. */ @@ -48,6 +57,7 @@ class InternalTimerServiceAsyncImplTest { private TestKeyContext keyContext; private TestProcessingTimeService processingTimeService; private InternalTimerServiceAsyncImpl<Integer, String> service; + private KeyGroupRange testKeyGroupList; private AsyncFrameworkExceptionHandler exceptionHandler = new AsyncFrameworkExceptionHandler() { @@ -72,7 +82,7 @@ class InternalTimerServiceAsyncImplTest { null); // ensure arbitrary key is in the key group int totalKeyGroups = 128; - KeyGroupRange testKeyGroupList = new KeyGroupRange(0, totalKeyGroups - 1); + testKeyGroupList = new KeyGroupRange(0, totalKeyGroups - 1); keyContext = new TestKeyContext(); @@ -191,6 +201,119 @@ class InternalTimerServiceAsyncImplTest { assertThat(testTriggerable.eventTriggerCount).isEqualTo(3); } + @Test + void testSnapshotAndRestore() throws Exception { + service.startTimerService( + IntSerializer.INSTANCE, StringSerializer.INSTANCE, new TestTriggerable()); + keyContext.setCurrentKey("key-1"); + // get two different keys + int key1 = getKeyInKeyGroupRange(testKeyGroupList, testKeyGroupList.getNumberOfKeyGroups()); + int key2 = getKeyInKeyGroupRange(testKeyGroupList, testKeyGroupList.getNumberOfKeyGroups()); + while (key2 == key1) { + key2 = getKeyInKeyGroupRange(testKeyGroupList, 
testKeyGroupList.getNumberOfKeyGroups()); + } + + keyContext.setCurrentKey(key1); + + service.registerProcessingTimeTimer("ciao", 10); + service.registerEventTimeTimer("hello", 10); + + keyContext.setCurrentKey(key2); + + service.registerEventTimeTimer("ciao", 10); + service.registerProcessingTimeTimer("hello", 10); + + assertThat(service.numProcessingTimeTimers()).isEqualTo(2); + assertThat(service.numProcessingTimeTimers("hello")).isOne(); + assertThat(service.numProcessingTimeTimers("ciao")).isOne(); + assertThat(service.numEventTimeTimers()).isEqualTo(2); + assertThat(service.numEventTimeTimers("hello")).isOne(); + assertThat(service.numEventTimeTimers("ciao")).isOne(); + + Map<Integer, byte[]> snapshot = new HashMap<>(); + for (Integer keyGroupIndex : testKeyGroupList) { + try (ByteArrayOutputStream outStream = new ByteArrayOutputStream()) { + InternalTimersSnapshot<Integer, String> timersSnapshot = + service.snapshotTimersForKeyGroup(keyGroupIndex); + + InternalTimersSnapshotReaderWriters.getWriterForVersion( + InternalTimerServiceSerializationProxy.VERSION, + timersSnapshot, + service.getKeySerializer(), + service.getNamespaceSerializer()) + .writeTimersSnapshot(new DataOutputViewStreamWrapper(outStream)); + + snapshot.put(keyGroupIndex, outStream.toByteArray()); + } + } + + TestTriggerable testTriggerable = new TestTriggerable(); + testTriggerable.eventTriggerCount = 0; + testTriggerable.processingTriggerCount = 0; + + processingTimeService = new TestProcessingTimeService(); + + service = + restoreTimerService( + snapshot, + InternalTimerServiceSerializationProxy.VERSION, + testTriggerable, + keyContext, + processingTimeService); + + processingTimeService.setCurrentTime(10); + service.advanceWatermark(10); + + assertThat(testTriggerable.eventTriggerCount).isEqualTo(2); + assertThat(testTriggerable.processingTriggerCount).isEqualTo(2); + + assertThat(service.numEventTimeTimers()).isZero(); + } + + private InternalTimerServiceAsyncImpl<Integer, String> 
restoreTimerService( + Map<Integer, byte[]> state, + int snapshotVersion, + Triggerable<Integer, String> triggerable, + KeyContext keyContext, + ProcessingTimeService processingTimeService) + throws Exception { + + // create an empty service + InternalTimerServiceAsyncImpl<Integer, String> service = + createInternalTimerService( + UnregisteredMetricGroups.createUnregisteredTaskMetricGroup() + .getIOMetricGroup(), + testKeyGroupList, + keyContext, + processingTimeService, + IntSerializer.INSTANCE, + StringSerializer.INSTANCE, + new HeapPriorityQueueSetFactory( + testKeyGroupList, testKeyGroupList.getNumberOfKeyGroups(), 128), + asyncExecutionController); + + // restore the timers + for (Integer keyGroupIndex : testKeyGroupList) { + if (state.containsKey(keyGroupIndex)) { + try (ByteArrayInputStream inputStream = + new ByteArrayInputStream(state.get(keyGroupIndex))) { + InternalTimersSnapshot<?, ?> restoredTimersSnapshot = + InternalTimersSnapshotReaderWriters.getReaderForVersion( + snapshotVersion, + InternalTimerServiceImplTest.class.getClassLoader()) + .readTimersSnapshot( + new DataInputViewStreamWrapper(inputStream)); + + service.restoreTimersForKeyGroup(restoredTimersSnapshot, keyGroupIndex); + } + } + } + + // initialize the service + service.startTimerService(IntSerializer.INSTANCE, StringSerializer.INSTANCE, triggerable); + return service; + } + private static <K, N> InternalTimerServiceAsyncImpl<K, N> createInternalTimerService( TaskIOMetricGroup taskIOMetricGroup, KeyGroupRange keyGroupsList, @@ -204,15 +327,27 @@ class InternalTimerServiceAsyncImplTest { TimerSerializer<K, N> timerSerializer = new TimerSerializer<>(keySerializer, namespaceSerializer); - return new InternalTimerServiceAsyncImpl<>( - taskIOMetricGroup, - keyGroupsList, - keyContext, - processingTimeService, - priorityQueueSetFactory.create("__async_processing_timers", timerSerializer), - priorityQueueSetFactory.create("__async_event_timers", timerSerializer), - 
StreamTaskCancellationContext.alwaysRunning(), - asyncExecutionController); + InternalTimerServiceAsyncImpl serviceAsync = + new InternalTimerServiceAsyncImpl<>( + taskIOMetricGroup, + keyGroupsList, + keyContext, + processingTimeService, + priorityQueueSetFactory.create( + "__async_processing_timers", timerSerializer), + priorityQueueSetFactory.create("__async_event_timers", timerSerializer), + StreamTaskCancellationContext.alwaysRunning()); + serviceAsync.setup(asyncExecutionController); + return serviceAsync; + } + + private static int getKeyInKeyGroupRange(KeyGroupRange range, int maxParallelism) { + Random rand = new Random(System.currentTimeMillis()); + int result = rand.nextInt(); + while (!range.contains(KeyGroupRangeAssignment.assignToKeyGroup(result, maxParallelism))) { + result = rand.nextInt(); + } + return result; } private static class SameTimerTriggerable implements Triggerable<Integer, String> { diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/MailboxWatermarkProcessorTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/MailboxWatermarkProcessorTest.java index 87f19f4032d..e0b76c4123f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/MailboxWatermarkProcessorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/MailboxWatermarkProcessorTest.java @@ -19,7 +19,6 @@ package org.apache.flink.streaming.api.operators; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController; import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamElement; @@ -96,16 +95,6 @@ class MailboxWatermarkProcessorTest { throw new UnsupportedOperationException(); } - @Override - public <N> InternalTimerService<N> getAsyncInternalTimerService( - 
String name, - TypeSerializer<Object> keySerializer, - TypeSerializer<N> namespaceSerializer, - Triggerable<Object, N> triggerable, - AsyncExecutionController<Object> asyncExecutionController) { - throw new UnsupportedOperationException(); - } - @Override public void advanceWatermark(Watermark watermark) throws Exception { throw new UnsupportedOperationException();