[FLINK-4877] Rename ProcessingTimeCallback.trigger() to onProcessingTime()
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/770f2f83 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/770f2f83 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/770f2f83 Branch: refs/heads/master Commit: 770f2f83a81b2810aff171b2f56390ef686f725a Parents: 94a3f25 Author: Aljoscha Krettek <[email protected]> Authored: Tue Oct 18 11:11:10 2016 +0200 Committer: Aljoscha Krettek <[email protected]> Committed: Fri Oct 21 19:03:05 2016 +0200 ---------------------------------------------------------------------- .../connectors/fs/bucketing/BucketingSink.java | 2 +- .../kafka/internals/AbstractFetcher.java | 2 +- .../api/operators/HeapInternalTimerService.java | 2 +- .../api/operators/StreamSourceContexts.java | 2 +- .../operators/ExtractTimestampsOperator.java | 2 +- .../TimestampsAndPeriodicWatermarksOperator.java | 2 +- ...stractAlignedProcessingTimeWindowOperator.java | 2 +- .../runtime/tasks/ProcessingTimeCallback.java | 2 +- .../tasks/SystemProcessingTimeService.java | 2 +- .../runtime/tasks/TestProcessingTimeService.java | 4 ++-- .../runtime/operators/StreamTaskTimerTest.java | 4 ++-- .../operators/TestProcessingTimeServiceTest.java | 4 ++-- .../tasks/SystemProcessingTimeServiceTest.java | 18 +++++++++--------- .../streaming/runtime/StreamTaskTimerITCase.java | 4 ++-- 14 files changed, 26 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/770f2f83/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java index 66e704c..52de00d 100644 --- a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java +++ b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java @@ -422,7 +422,7 @@ public class BucketingSink<T> } @Override - public void trigger(long timestamp) throws Exception { + public void onProcessingTime(long timestamp) throws Exception { long currentProcessingTime = processingTimeService.getCurrentProcessingTime(); checkForInactiveBuckets(currentProcessingTime); http://git-wip-us.apache.org/repos/asf/flink/blob/770f2f83/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java index 58bca52..3350b06 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java @@ -495,7 +495,7 @@ public abstract class AbstractFetcher<T, KPH> { } @Override - public void trigger(long timestamp) throws Exception { + public void onProcessingTime(long timestamp) throws Exception { long minAcrossAll = Long.MAX_VALUE; for (KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?> state : allPartitions) { http://git-wip-us.apache.org/repos/asf/flink/blob/770f2f83/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java index 15258cf..8884c3d 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java @@ -170,7 +170,7 @@ public class HeapInternalTimerService<K, N> implements InternalTimerService<N>, } @Override - public void trigger(long time) throws Exception { + public void onProcessingTime(long time) throws Exception { // null out the timer in case the Triggerable calls registerProcessingTimeTimer() // inside the callback. nextTimer = null; http://git-wip-us.apache.org/repos/asf/flink/blob/770f2f83/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java index 66d2ac2..a6a273f 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java @@ -215,7 +215,7 @@ public class StreamSourceContexts { } @Override - public void trigger(long timestamp) { + public void onProcessingTime(long timestamp) { final long currentTime = timeService.getCurrentProcessingTime(); if (currentTime > nextWatermarkTime) { http://git-wip-us.apache.org/repos/asf/flink/blob/770f2f83/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java index 5f5028a..a10e457 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java @@ -73,7 +73,7 @@ public class ExtractTimestampsOperator<T> } @Override - public void trigger(long timestamp) throws Exception { + public void onProcessingTime(long timestamp) throws Exception { // register next timer long newWatermark = userFunction.getCurrentWatermark(); if (newWatermark > currentWatermark) { http://git-wip-us.apache.org/repos/asf/flink/blob/770f2f83/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperator.java index ba72659..4defb96 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperator.java @@ -69,7 +69,7 @@ public class TimestampsAndPeriodicWatermarksOperator<T> } @Override - public void trigger(long timestamp) throws Exception { + public void onProcessingTime(long timestamp) throws Exception { // register next timer Watermark newWatermark = userFunction.getCurrentWatermark(); if (newWatermark != null && newWatermark.getTimestamp() > currentWatermark) { http://git-wip-us.apache.org/repos/asf/flink/blob/770f2f83/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java index 80a317e..24fd0de 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java @@ -207,7 +207,7 @@ public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT, } @Override - public void trigger(long timestamp) throws Exception { + public void onProcessingTime(long timestamp) throws Exception { // first we check if we actually trigger the window function if (timestamp == nextEvaluationTime) { // compute and output the results http://git-wip-us.apache.org/repos/asf/flink/blob/770f2f83/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeCallback.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeCallback.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeCallback.java index aca1718..035939f 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeCallback.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeCallback.java @@ -36,5 +36,5 @@ public interface ProcessingTimeCallback { * * @param timestamp The timestamp for which the trigger event was scheduled. */ - void trigger(long timestamp) throws Exception ; + void onProcessingTime(long timestamp) throws Exception ; } http://git-wip-us.apache.org/repos/asf/flink/blob/770f2f83/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java index b433f8d..153aedf 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java @@ -179,7 +179,7 @@ public class SystemProcessingTimeService extends ProcessingTimeService { public void run() { synchronized (lock) { try { - target.trigger(timestamp); + target.onProcessingTime(timestamp); } catch (Throwable t) { TimerException asyncException = new TimerException(t); exceptionHandler.handleAsyncException("Caught exception while processing timer.", asyncException); http://git-wip-us.apache.org/repos/asf/flink/blob/770f2f83/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java index 3e6c273..2ca287a 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java @@ -67,7 +67,7 @@ public class TestProcessingTimeService extends ProcessingTimeService { for (Map.Entry<Long, List<ScheduledTimerFuture>> tasks: toRun) { long now = tasks.getKey(); for (ScheduledTimerFuture task: tasks.getValue()) { - task.getProcessingTimeCallback().trigger(now); + task.getProcessingTimeCallback().onProcessingTime(now); } } } @@ -89,7 +89,7 @@ public class TestProcessingTimeService extends ProcessingTimeService { if (timestamp <= currentTime) { try { - target.trigger(timestamp); + target.onProcessingTime(timestamp); } catch (Exception e) { throw new RuntimeException(e); } http://git-wip-us.apache.org/repos/asf/flink/blob/770f2f83/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java index 87241dd..f23c6d2 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java @@ -67,7 +67,7 @@ public class StreamTaskTimerTest { // first one spawns thread mapTask.getProcessingTimeService().registerTimer(System.currentTimeMillis(), new ProcessingTimeCallback() { @Override - public void trigger(long timestamp) { + public void onProcessingTime(long timestamp) { } }); @@ -163,7 +163,7 @@ public class StreamTaskTimerTest { } @Override - public void trigger(long timestamp) { + public void onProcessingTime(long timestamp) { try { assertEquals(expectedTimestamp, timestamp); assertEquals(expectedInSequence, numInSequence); http://git-wip-us.apache.org/repos/asf/flink/blob/770f2f83/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java index db56717..a3b231b 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java @@ -73,14 +73,14 @@ public class TestProcessingTimeServiceTest { // register 2 tasks mapTask.getProcessingTimeService().registerTimer(30, new ProcessingTimeCallback() { @Override - public void trigger(long timestamp) { + public void onProcessingTime(long timestamp) { } }); mapTask.getProcessingTimeService().registerTimer(40, new ProcessingTimeCallback() { @Override - public void trigger(long timestamp) { + public void onProcessingTime(long timestamp) { } }); http://git-wip-us.apache.org/repos/asf/flink/blob/770f2f83/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java index dc679ab..797e18a 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java @@ -52,7 +52,7 @@ public class SystemProcessingTimeServiceTest { // schedule something ScheduledFuture<?> future = timer.registerTimer(System.currentTimeMillis(), new ProcessingTimeCallback() { @Override - public void trigger(long timestamp) { + public void onProcessingTime(long timestamp) { assertTrue(Thread.holdsLock(lock)); } }); @@ -88,7 +88,7 @@ public class SystemProcessingTimeServiceTest { // the task should trigger immediately and should block until terminated with interruption timer.registerTimer(System.currentTimeMillis(), new ProcessingTimeCallback() { @Override - public void trigger(long timestamp) throws Exception { + public void onProcessingTime(long timestamp) throws Exception { latch.trigger(); Thread.sleep(100000000); } @@ -106,7 +106,7 @@ public class SystemProcessingTimeServiceTest { try { timer.registerTimer(System.currentTimeMillis() + 1000, new ProcessingTimeCallback() { @Override - public void trigger(long timestamp) {} + public void onProcessingTime(long timestamp) {} }); fail("should result in an exception"); @@ -142,7 +142,7 @@ public class SystemProcessingTimeServiceTest { timer.registerTimer(System.currentTimeMillis() + 20, new ProcessingTimeCallback() { @Override - public void trigger(long timestamp) throws Exception { + public void onProcessingTime(long timestamp) throws Exception { scopeLock.lock(); try { latch.trigger(); @@ -164,7 +164,7 @@ public class SystemProcessingTimeServiceTest { // should be able to schedule more tasks (that never get executed) ScheduledFuture<?> future = timer.registerTimer(System.currentTimeMillis() - 5, new ProcessingTimeCallback() { @Override - public void trigger(long timestamp) throws Exception { + public void onProcessingTime(long timestamp) throws Exception { throw new Exception("test"); } }); @@ -199,7 +199,7 @@ public class SystemProcessingTimeServiceTest { // schedule something ScheduledFuture<?> future = timer.registerTimer(System.currentTimeMillis() + 100000000, new ProcessingTimeCallback() { @Override - public void trigger(long timestamp) {} + public void onProcessingTime(long timestamp) {} }); assertEquals(1, timer.getNumTasksScheduled()); @@ -234,7 +234,7 @@ public class SystemProcessingTimeServiceTest { timeServiceProvider.registerTimer(System.currentTimeMillis(), new ProcessingTimeCallback() { @Override - public void trigger(long timestamp) throws Exception { + public void onProcessingTime(long timestamp) throws Exception { throw new Exception("Exception in Timer"); } }); @@ -258,7 +258,7 @@ public class SystemProcessingTimeServiceTest { // to register some additional timers out of order timer.registerTimer(System.currentTimeMillis(), new ProcessingTimeCallback() { @Override - public void trigger(long timestamp) throws Exception { + public void onProcessingTime(long timestamp) throws Exception { sync.await(); } }); @@ -273,7 +273,7 @@ public class SystemProcessingTimeServiceTest { final ArrayBlockingQueue<Long> timestamps = new ArrayBlockingQueue<>(4); ProcessingTimeCallback trigger = new ProcessingTimeCallback() { @Override - public void trigger(long timestamp) { + public void onProcessingTime(long timestamp) { timestamps.add(timestamp); } }; http://git-wip-us.apache.org/repos/asf/flink/blob/770f2f83/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskTimerITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskTimerITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskTimerITCase.java index c0cd0be..48e6fae 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskTimerITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskTimerITCase.java @@ -201,7 +201,7 @@ public class StreamTaskTimerITCase extends StreamingMultipleProgramsTestBase { } @Override - public void trigger(long time) throws Exception { + public void onProcessingTime(long time) throws Exception { if (!semaphore.tryAcquire()) { Assert.fail("Concurrent invocation of operator functions."); } @@ -276,7 +276,7 @@ public class StreamTaskTimerITCase extends StreamingMultipleProgramsTestBase { @Override - public void trigger(long time) throws Exception { + public void onProcessingTime(long time) throws Exception { if (!semaphore.tryAcquire()) { Assert.fail("Concurrent invocation of operator functions."); }
