This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7d546b6321e08de7fb75f6104914c216b0522e95
Author: Seth Wiesman <sjwies...@gmail.com>
AuthorDate: Wed Jul 10 19:33:29 2019 -0400

    [FLINK-13094][state-processor-api] Support iterating over registered timers 
in the internal timer service
    
    Certain operations require querying the registered timers for a key but
    timers are stored TIMESTAMP -> KEY[] which would only support O(n)
    reads. Because the timer service sits in the per-record code path we opt
    to add forEach*RegisteredTimer methods so consumers can copy data into a
    more appropriate data structure to avoid any accidental performance
    degredations.
---
 .../api/operators/InternalTimerService.java        | 13 ++++
 .../api/operators/InternalTimerServiceImpl.java    | 21 ++++++
 .../operators/InternalTimerServiceImplTest.java    | 79 ++++++++++++++++++++++
 .../api/operators/TestInternalTimerService.java    | 17 +++++
 4 files changed, 130 insertions(+)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerService.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerService.java
index cb171fb..f736053 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerService.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerService.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.function.BiConsumerWithException;
 
 /**
  * Interface for working with time and timers.
@@ -58,4 +59,16 @@ public interface InternalTimerService<N> {
         * Deletes the timer for the given key and namespace.
         */
        void deleteEventTimeTimer(N namespace, long time);
+
+       /**
+        * Performs an action for each registered timer. The timer service will
+        * set the key context for the timers key before invoking the action.
+        */
+       void forEachEventTimeTimer(BiConsumerWithException<N, Long, Exception> 
consumer) throws Exception;
+
+       /**
+        * Performs an action for each registered timer. The timer service will
+        * set the key context for the timers key before invoking the action.
+        */
+       void forEachProcessingTimeTimer(BiConsumerWithException<N, Long, 
Exception> consumer) throws Exception;
 }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java
index dd88bc6..08f3a1d 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java
@@ -29,6 +29,7 @@ import 
org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.util.CloseableIterator;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.BiConsumerWithException;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -226,6 +227,26 @@ public class InternalTimerServiceImpl<K, N> implements 
InternalTimerService<N>,
        }
 
        @Override
+       public void forEachEventTimeTimer(BiConsumerWithException<N, Long, 
Exception> consumer) throws Exception {
+               foreachTimer(consumer, eventTimeTimersQueue);
+       }
+
+       @Override
+       public void forEachProcessingTimeTimer(BiConsumerWithException<N, Long, 
Exception> consumer) throws Exception {
+               foreachTimer(consumer, processingTimeTimersQueue);
+       }
+
+       private void foreachTimer(BiConsumerWithException<N, Long, Exception> 
consumer, KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> queue) 
throws Exception {
+               try (final CloseableIterator<TimerHeapInternalTimer<K, N>> 
iterator = queue.iterator()) {
+                       while (iterator.hasNext()) {
+                               final TimerHeapInternalTimer<K, N> timer = 
iterator.next();
+                               keyContext.setCurrentKey(timer.getKey());
+                               consumer.accept(timer.getNamespace(), 
timer.getTimestamp());
+                       }
+               }
+       }
+
+       @Override
        public void onProcessingTime(long time) throws Exception {
                // null out the timer in case the Triggerable calls 
registerProcessingTimeTimer()
                // inside the callback.
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImplTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImplTest.java
index 99d2700..777ef2b 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImplTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImplTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.api.operators;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.state.KeyGroupRange;
@@ -560,6 +561,84 @@ public class InternalTimerServiceImplTest {
                assertEquals(0, timerService.numEventTimeTimers());
        }
 
+       /**
+        * This also verifies that we iterate over all timers and set the key 
context on each element.
+        */
+       @Test
+       public void testForEachEventTimeTimers() throws Exception {
+               @SuppressWarnings("unchecked")
+               Triggerable<Integer, String> mockTriggerable = 
mock(Triggerable.class);
+
+               TestKeyContext keyContext = new TestKeyContext();
+               TestProcessingTimeService processingTimeService = new 
TestProcessingTimeService();
+               InternalTimerServiceImpl<Integer, String> timerService =
+                       createAndStartInternalTimerService(mockTriggerable, 
keyContext, processingTimeService, testKeyGroupRange, createQueueFactory());
+
+               // get two different keys
+               int key1 = getKeyInKeyGroupRange(testKeyGroupRange, 
maxParallelism);
+               int key2 = getKeyInKeyGroupRange(testKeyGroupRange, 
maxParallelism);
+               while (key2 == key1) {
+                       key2 = getKeyInKeyGroupRange(testKeyGroupRange, 
maxParallelism);
+               }
+
+               Set<Tuple3<Integer, String, Long>> timers = new HashSet<>();
+               timers.add(Tuple3.of(key1, "ciao", 10L));
+               timers.add(Tuple3.of(key1, "hello", 10L));
+               timers.add(Tuple3.of(key2, "ciao", 10L));
+               timers.add(Tuple3.of(key2, "hello", 10L));
+
+               for (Tuple3<Integer, String, Long> timer : timers) {
+                       keyContext.setCurrentKey(timer.f0);
+                       timerService.registerEventTimeTimer(timer.f1, timer.f2);
+               }
+
+               Set<Tuple3<Integer, String, Long>> results = new HashSet<>();
+               timerService.forEachEventTimeTimer((namespace, timer) -> {
+                       results.add(Tuple3.of((Integer) 
keyContext.getCurrentKey(), namespace, timer));
+               });
+
+               Assert.assertEquals(timers, results);
+       }
+
+       /**
+        * This also verifies that we iterate over all timers and set the key 
context on each element.
+        */
+       @Test
+       public void testForEachProcessingTimeTimers() throws Exception {
+               @SuppressWarnings("unchecked")
+               Triggerable<Integer, String> mockTriggerable = 
mock(Triggerable.class);
+
+               TestKeyContext keyContext = new TestKeyContext();
+               TestProcessingTimeService processingTimeService = new 
TestProcessingTimeService();
+               InternalTimerServiceImpl<Integer, String> timerService =
+                       createAndStartInternalTimerService(mockTriggerable, 
keyContext, processingTimeService, testKeyGroupRange, createQueueFactory());
+
+               // get two different keys
+               int key1 = getKeyInKeyGroupRange(testKeyGroupRange, 
maxParallelism);
+               int key2 = getKeyInKeyGroupRange(testKeyGroupRange, 
maxParallelism);
+               while (key2 == key1) {
+                       key2 = getKeyInKeyGroupRange(testKeyGroupRange, 
maxParallelism);
+               }
+
+               Set<Tuple3<Integer, String, Long>> timers = new HashSet<>();
+               timers.add(Tuple3.of(key1, "ciao", 10L));
+               timers.add(Tuple3.of(key1, "hello", 10L));
+               timers.add(Tuple3.of(key2, "ciao", 10L));
+               timers.add(Tuple3.of(key2, "hello", 10L));
+
+               for (Tuple3<Integer, String, Long> timer : timers) {
+                       keyContext.setCurrentKey(timer.f0);
+                       timerService.registerProcessingTimeTimer(timer.f1, 
timer.f2);
+               }
+
+               Set<Tuple3<Integer, String, Long>> results = new HashSet<>();
+               timerService.forEachProcessingTimeTimer((namespace, timer) -> {
+                       results.add(Tuple3.of((Integer) 
keyContext.getCurrentKey(), namespace, timer));
+               });
+
+               Assert.assertEquals(timers, results);
+       }
+
        @Test
        public void testSnapshotAndRestore() throws Exception {
                
testSnapshotAndRestore(InternalTimerServiceSerializationProxy.VERSION);
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TestInternalTimerService.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TestInternalTimerService.java
index f8b095c..777dcbb 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TestInternalTimerService.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TestInternalTimerService.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.function.BiConsumerWithException;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -108,6 +109,22 @@ public class TestInternalTimerService<K, N> implements 
InternalTimerService<N> {
                }
        }
 
+       @Override
+       public void forEachEventTimeTimer(BiConsumerWithException<N, Long, 
Exception> consumer) throws Exception {
+               for (Timer<K, N> timer : watermarkTimers) {
+                       keyContext.setCurrentKey(timer.getKey());
+                       consumer.accept(timer.getNamespace(), 
timer.getTimestamp());
+               }
+       }
+
+       @Override
+       public void forEachProcessingTimeTimer(BiConsumerWithException<N, Long, 
Exception> consumer) throws Exception {
+               for (Timer<K, N> timer : processingTimeTimers) {
+                       keyContext.setCurrentKey(timer.getKey());
+                       consumer.accept(timer.getNamespace(), 
timer.getTimestamp());
+               }
+       }
+
        public Collection<Timer<K, N>> advanceProcessingTime(long time) throws 
Exception {
                List<Timer<K, N>> result = new ArrayList<>();
 

Reply via email to