This is an automated email from the ASF dual-hosted git repository. tzulitai pushed a commit to branch release-1.9 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 7d546b6321e08de7fb75f6104914c216b0522e95 Author: Seth Wiesman <sjwies...@gmail.com> AuthorDate: Wed Jul 10 19:33:29 2019 -0400 [FLINK-13094][state-processor-api] Support iterating over registered timers in the internal timer service Certain operations require querying the registered timers for a key but timers are stored TIMESTAMP -> KEY[] which would only support O(n) reads. Because the timer service sits in the per-record code path we opt to add forEach*RegisteredTimer methods so consumers can copy data into a more appropriate data structure to avoid any accidental performance degredations. --- .../api/operators/InternalTimerService.java | 13 ++++ .../api/operators/InternalTimerServiceImpl.java | 21 ++++++ .../operators/InternalTimerServiceImplTest.java | 79 ++++++++++++++++++++++ .../api/operators/TestInternalTimerService.java | 17 +++++ 4 files changed, 130 insertions(+) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerService.java index cb171fb..f736053 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerService.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerService.java @@ -19,6 +19,7 @@ package org.apache.flink.streaming.api.operators; import org.apache.flink.annotation.Internal; +import org.apache.flink.util.function.BiConsumerWithException; /** * Interface for working with time and timers. @@ -58,4 +59,16 @@ public interface InternalTimerService<N> { * Deletes the timer for the given key and namespace. */ void deleteEventTimeTimer(N namespace, long time); + + /** + * Performs an action for each registered timer. The timer service will + * set the key context for the timers key before invoking the action. + */ + void forEachEventTimeTimer(BiConsumerWithException<N, Long, Exception> consumer) throws Exception; + + /** + * Performs an action for each registered timer. The timer service will + * set the key context for the timers key before invoking the action. + */ + void forEachProcessingTimeTimer(BiConsumerWithException<N, Long, Exception> consumer) throws Exception; } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java index dd88bc6..08f3a1d 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java @@ -29,6 +29,7 @@ import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; import org.apache.flink.util.CloseableIterator; import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.Preconditions; +import org.apache.flink.util.function.BiConsumerWithException; import java.util.ArrayList; import java.util.Collections; @@ -226,6 +227,26 @@ public class InternalTimerServiceImpl<K, N> implements InternalTimerService<N>, } @Override + public void forEachEventTimeTimer(BiConsumerWithException<N, Long, Exception> consumer) throws Exception { + foreachTimer(consumer, eventTimeTimersQueue); + } + + @Override + public void forEachProcessingTimeTimer(BiConsumerWithException<N, Long, Exception> consumer) throws Exception { + foreachTimer(consumer, processingTimeTimersQueue); + } + + private void foreachTimer(BiConsumerWithException<N, Long, Exception> consumer, KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> queue) throws Exception { + try (final CloseableIterator<TimerHeapInternalTimer<K, N>> iterator = queue.iterator()) { + while (iterator.hasNext()) { + final TimerHeapInternalTimer<K, N> timer = iterator.next(); + keyContext.setCurrentKey(timer.getKey()); + consumer.accept(timer.getNamespace(), timer.getTimestamp()); + } + } + } + + @Override public void onProcessingTime(long time) throws Exception { // null out the timer in case the Triggerable calls registerProcessingTimeTimer() // inside the callback. diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImplTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImplTest.java index 99d2700..777ef2b 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImplTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImplTest.java @@ -21,6 +21,7 @@ package org.apache.flink.streaming.api.operators; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.runtime.state.KeyGroupRange; @@ -560,6 +561,84 @@ public class InternalTimerServiceImplTest { assertEquals(0, timerService.numEventTimeTimers()); } + /** + * This also verifies that we iterate over all timers and set the key context on each element. + */ + @Test + public void testForEachEventTimeTimers() throws Exception { + @SuppressWarnings("unchecked") + Triggerable<Integer, String> mockTriggerable = mock(Triggerable.class); + + TestKeyContext keyContext = new TestKeyContext(); + TestProcessingTimeService processingTimeService = new TestProcessingTimeService(); + InternalTimerServiceImpl<Integer, String> timerService = + createAndStartInternalTimerService(mockTriggerable, keyContext, processingTimeService, testKeyGroupRange, createQueueFactory()); + + // get two different keys + int key1 = getKeyInKeyGroupRange(testKeyGroupRange, maxParallelism); + int key2 = getKeyInKeyGroupRange(testKeyGroupRange, maxParallelism); + while (key2 == key1) { + key2 = getKeyInKeyGroupRange(testKeyGroupRange, maxParallelism); + } + + Set<Tuple3<Integer, String, Long>> timers = new HashSet<>(); + timers.add(Tuple3.of(key1, "ciao", 10L)); + timers.add(Tuple3.of(key1, "hello", 10L)); + timers.add(Tuple3.of(key2, "ciao", 10L)); + timers.add(Tuple3.of(key2, "hello", 10L)); + + for (Tuple3<Integer, String, Long> timer : timers) { + keyContext.setCurrentKey(timer.f0); + timerService.registerEventTimeTimer(timer.f1, timer.f2); + } + + Set<Tuple3<Integer, String, Long>> results = new HashSet<>(); + timerService.forEachEventTimeTimer((namespace, timer) -> { + results.add(Tuple3.of((Integer) keyContext.getCurrentKey(), namespace, timer)); + }); + + Assert.assertEquals(timers, results); + } + + /** + * This also verifies that we iterate over all timers and set the key context on each element. + */ + @Test + public void testForEachProcessingTimeTimers() throws Exception { + @SuppressWarnings("unchecked") + Triggerable<Integer, String> mockTriggerable = mock(Triggerable.class); + + TestKeyContext keyContext = new TestKeyContext(); + TestProcessingTimeService processingTimeService = new TestProcessingTimeService(); + InternalTimerServiceImpl<Integer, String> timerService = + createAndStartInternalTimerService(mockTriggerable, keyContext, processingTimeService, testKeyGroupRange, createQueueFactory()); + + // get two different keys + int key1 = getKeyInKeyGroupRange(testKeyGroupRange, maxParallelism); + int key2 = getKeyInKeyGroupRange(testKeyGroupRange, maxParallelism); + while (key2 == key1) { + key2 = getKeyInKeyGroupRange(testKeyGroupRange, maxParallelism); + } + + Set<Tuple3<Integer, String, Long>> timers = new HashSet<>(); + timers.add(Tuple3.of(key1, "ciao", 10L)); + timers.add(Tuple3.of(key1, "hello", 10L)); + timers.add(Tuple3.of(key2, "ciao", 10L)); + timers.add(Tuple3.of(key2, "hello", 10L)); + + for (Tuple3<Integer, String, Long> timer : timers) { + keyContext.setCurrentKey(timer.f0); + timerService.registerProcessingTimeTimer(timer.f1, timer.f2); + } + + Set<Tuple3<Integer, String, Long>> results = new HashSet<>(); + timerService.forEachProcessingTimeTimer((namespace, timer) -> { + results.add(Tuple3.of((Integer) keyContext.getCurrentKey(), namespace, timer)); + }); + + Assert.assertEquals(timers, results); + } + @Test public void testSnapshotAndRestore() throws Exception { testSnapshotAndRestore(InternalTimerServiceSerializationProxy.VERSION); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TestInternalTimerService.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TestInternalTimerService.java index f8b095c..777dcbb 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TestInternalTimerService.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TestInternalTimerService.java @@ -19,6 +19,7 @@ package org.apache.flink.streaming.api.operators; import org.apache.flink.annotation.Internal; +import org.apache.flink.util.function.BiConsumerWithException; import java.util.ArrayList; import java.util.Collection; @@ -108,6 +109,22 @@ public class TestInternalTimerService<K, N> implements InternalTimerService<N> { } } + @Override + public void forEachEventTimeTimer(BiConsumerWithException<N, Long, Exception> consumer) throws Exception { + for (Timer<K, N> timer : watermarkTimers) { + keyContext.setCurrentKey(timer.getKey()); + consumer.accept(timer.getNamespace(), timer.getTimestamp()); + } + } + + @Override + public void forEachProcessingTimeTimer(BiConsumerWithException<N, Long, Exception> consumer) throws Exception { + for (Timer<K, N> timer : processingTimeTimers) { + keyContext.setCurrentKey(timer.getKey()); + consumer.accept(timer.getNamespace(), timer.getTimestamp()); + } + } + public Collection<Timer<K, N>> advanceProcessingTime(long time) throws Exception { List<Timer<K, N>> result = new ArrayList<>();