[FLINK-4877] Rename ProcessingTimeCallback.trigger() to onProcessingTime()

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/770f2f83
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/770f2f83
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/770f2f83

Branch: refs/heads/master
Commit: 770f2f83a81b2810aff171b2f56390ef686f725a
Parents: 94a3f25
Author: Aljoscha Krettek <[email protected]>
Authored: Tue Oct 18 11:11:10 2016 +0200
Committer: Aljoscha Krettek <[email protected]>
Committed: Fri Oct 21 19:03:05 2016 +0200

----------------------------------------------------------------------
 .../connectors/fs/bucketing/BucketingSink.java    |  2 +-
 .../kafka/internals/AbstractFetcher.java          |  2 +-
 .../api/operators/HeapInternalTimerService.java   |  2 +-
 .../api/operators/StreamSourceContexts.java       |  2 +-
 .../operators/ExtractTimestampsOperator.java      |  2 +-
 .../TimestampsAndPeriodicWatermarksOperator.java  |  2 +-
 ...stractAlignedProcessingTimeWindowOperator.java |  2 +-
 .../runtime/tasks/ProcessingTimeCallback.java     |  2 +-
 .../tasks/SystemProcessingTimeService.java        |  2 +-
 .../runtime/tasks/TestProcessingTimeService.java  |  4 ++--
 .../runtime/operators/StreamTaskTimerTest.java    |  4 ++--
 .../operators/TestProcessingTimeServiceTest.java  |  4 ++--
 .../tasks/SystemProcessingTimeServiceTest.java    | 18 +++++++++---------
 .../streaming/runtime/StreamTaskTimerITCase.java  |  4 ++--
 14 files changed, 26 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/770f2f83/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
index 66e704c..52de00d 100644
--- a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
+++ b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
@@ -422,7 +422,7 @@ public class BucketingSink<T>
        }
 
        @Override
-       public void trigger(long timestamp) throws Exception {
+       public void onProcessingTime(long timestamp) throws Exception {
                long currentProcessingTime = 
processingTimeService.getCurrentProcessingTime();
 
                checkForInactiveBuckets(currentProcessingTime);

http://git-wip-us.apache.org/repos/asf/flink/blob/770f2f83/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
index 58bca52..3350b06 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
@@ -495,7 +495,7 @@ public abstract class AbstractFetcher<T, KPH> {
                }
                
                @Override
-               public void trigger(long timestamp) throws Exception {
+               public void onProcessingTime(long timestamp) throws Exception {
 
                        long minAcrossAll = Long.MAX_VALUE;
                        for (KafkaTopicPartitionStateWithPeriodicWatermarks<?, 
?> state : allPartitions) {

http://git-wip-us.apache.org/repos/asf/flink/blob/770f2f83/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java
index 15258cf..8884c3d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java
@@ -170,7 +170,7 @@ public class HeapInternalTimerService<K, N> implements 
InternalTimerService<N>,
        }
 
        @Override
-       public void trigger(long time) throws Exception {
+       public void onProcessingTime(long time) throws Exception {
                // null out the timer in case the Triggerable calls 
registerProcessingTimeTimer()
                // inside the callback.
                nextTimer = null;

http://git-wip-us.apache.org/repos/asf/flink/blob/770f2f83/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
index 66d2ac2..a6a273f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
@@ -215,7 +215,7 @@ public class StreamSourceContexts {
                        }
 
                        @Override
-                       public void trigger(long timestamp) {
+                       public void onProcessingTime(long timestamp) {
                                final long currentTime = 
timeService.getCurrentProcessingTime();
 
                                if (currentTime > nextWatermarkTime) {

http://git-wip-us.apache.org/repos/asf/flink/blob/770f2f83/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java
index 5f5028a..a10e457 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java
@@ -73,7 +73,7 @@ public class ExtractTimestampsOperator<T>
        }
 
        @Override
-       public void trigger(long timestamp) throws Exception {
+       public void onProcessingTime(long timestamp) throws Exception {
                // register next timer
                long newWatermark = userFunction.getCurrentWatermark();
                if (newWatermark > currentWatermark) {

http://git-wip-us.apache.org/repos/asf/flink/blob/770f2f83/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperator.java
index ba72659..4defb96 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperator.java
@@ -69,7 +69,7 @@ public class TimestampsAndPeriodicWatermarksOperator<T>
        }
 
        @Override
-       public void trigger(long timestamp) throws Exception {
+       public void onProcessingTime(long timestamp) throws Exception {
                // register next timer
                Watermark newWatermark = userFunction.getCurrentWatermark();
                if (newWatermark != null && newWatermark.getTimestamp() > 
currentWatermark) {

http://git-wip-us.apache.org/repos/asf/flink/blob/770f2f83/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
index 80a317e..24fd0de 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
@@ -207,7 +207,7 @@ public abstract class 
AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT,
        }
 
        @Override
-       public void trigger(long timestamp) throws Exception {
+       public void onProcessingTime(long timestamp) throws Exception {
                // first we check if we actually trigger the window function
                if (timestamp == nextEvaluationTime) {
                        // compute and output the results

http://git-wip-us.apache.org/repos/asf/flink/blob/770f2f83/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeCallback.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeCallback.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeCallback.java
index aca1718..035939f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeCallback.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeCallback.java
@@ -36,5 +36,5 @@ public interface ProcessingTimeCallback {
         * 
         * @param timestamp The timestamp for which the trigger event was 
scheduled.
         */
-       void trigger(long timestamp) throws Exception ;
+       void onProcessingTime(long timestamp) throws Exception ;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/770f2f83/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
index b433f8d..153aedf 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
@@ -179,7 +179,7 @@ public class SystemProcessingTimeService extends 
ProcessingTimeService {
                public void run() {
                        synchronized (lock) {
                                try {
-                                       target.trigger(timestamp);
+                                       target.onProcessingTime(timestamp);
                                } catch (Throwable t) {
                                        TimerException asyncException = new 
TimerException(t);
                                        
exceptionHandler.handleAsyncException("Caught exception while processing 
timer.", asyncException);

http://git-wip-us.apache.org/repos/asf/flink/blob/770f2f83/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
index 3e6c273..2ca287a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
@@ -67,7 +67,7 @@ public class TestProcessingTimeService extends 
ProcessingTimeService {
                        for (Map.Entry<Long, List<ScheduledTimerFuture>> tasks: 
toRun) {
                                long now = tasks.getKey();
                                for (ScheduledTimerFuture task: 
tasks.getValue()) {
-                                       
task.getProcessingTimeCallback().trigger(now);
+                                       
task.getProcessingTimeCallback().onProcessingTime(now);
                                }
                        }
                }
@@ -89,7 +89,7 @@ public class TestProcessingTimeService extends 
ProcessingTimeService {
 
                if (timestamp <= currentTime) {
                        try {
-                               target.trigger(timestamp);
+                               target.onProcessingTime(timestamp);
                        } catch (Exception e) {
                                throw new RuntimeException(e);
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/770f2f83/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
index 87241dd..f23c6d2 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
@@ -67,7 +67,7 @@ public class StreamTaskTimerTest {
                // first one spawns thread
                
mapTask.getProcessingTimeService().registerTimer(System.currentTimeMillis(), 
new ProcessingTimeCallback() {
                        @Override
-                       public void trigger(long timestamp) {
+                       public void onProcessingTime(long timestamp) {
                        }
                });
 
@@ -163,7 +163,7 @@ public class StreamTaskTimerTest {
                }
 
                @Override
-               public void trigger(long timestamp) {
+               public void onProcessingTime(long timestamp) {
                        try {
                                assertEquals(expectedTimestamp, timestamp);
                                assertEquals(expectedInSequence, numInSequence);

http://git-wip-us.apache.org/repos/asf/flink/blob/770f2f83/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java
index db56717..a3b231b 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java
@@ -73,14 +73,14 @@ public class TestProcessingTimeServiceTest {
                // register 2 tasks
                mapTask.getProcessingTimeService().registerTimer(30, new 
ProcessingTimeCallback() {
                        @Override
-                       public void trigger(long timestamp) {
+                       public void onProcessingTime(long timestamp) {
 
                        }
                });
 
                mapTask.getProcessingTimeService().registerTimer(40, new 
ProcessingTimeCallback() {
                        @Override
-                       public void trigger(long timestamp) {
+                       public void onProcessingTime(long timestamp) {
 
                        }
                });

http://git-wip-us.apache.org/repos/asf/flink/blob/770f2f83/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
index dc679ab..797e18a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
@@ -52,7 +52,7 @@ public class SystemProcessingTimeServiceTest {
                        // schedule something
                        ScheduledFuture<?> future = 
timer.registerTimer(System.currentTimeMillis(), new ProcessingTimeCallback() {
                                @Override
-                               public void trigger(long timestamp) {
+                               public void onProcessingTime(long timestamp) {
                                        assertTrue(Thread.holdsLock(lock));
                                }
                        });
@@ -88,7 +88,7 @@ public class SystemProcessingTimeServiceTest {
                        // the task should trigger immediately and should block 
until terminated with interruption
                        timer.registerTimer(System.currentTimeMillis(), new 
ProcessingTimeCallback() {
                                @Override
-                               public void trigger(long timestamp) throws 
Exception {
+                               public void onProcessingTime(long timestamp) 
throws Exception {
                                        latch.trigger();
                                        Thread.sleep(100000000);
                                }
@@ -106,7 +106,7 @@ public class SystemProcessingTimeServiceTest {
                        try {
                                timer.registerTimer(System.currentTimeMillis() 
+ 1000, new ProcessingTimeCallback() {
                                        @Override
-                                       public void trigger(long timestamp) {}
+                                       public void onProcessingTime(long 
timestamp) {}
                                });
 
                                fail("should result in an exception");
@@ -142,7 +142,7 @@ public class SystemProcessingTimeServiceTest {
 
                        timer.registerTimer(System.currentTimeMillis() + 20, 
new ProcessingTimeCallback() {
                                @Override
-                               public void trigger(long timestamp) throws 
Exception {
+                               public void onProcessingTime(long timestamp) 
throws Exception {
                                        scopeLock.lock();
                                        try {
                                                latch.trigger();
@@ -164,7 +164,7 @@ public class SystemProcessingTimeServiceTest {
                        // should be able to schedule more tasks (that never 
get executed)
                        ScheduledFuture<?> future = 
timer.registerTimer(System.currentTimeMillis() - 5, new 
ProcessingTimeCallback() {
                                @Override
-                               public void trigger(long timestamp) throws 
Exception {
+                               public void onProcessingTime(long timestamp) 
throws Exception {
                                        throw new Exception("test");
                                }
                        });
@@ -199,7 +199,7 @@ public class SystemProcessingTimeServiceTest {
                        // schedule something
                        ScheduledFuture<?> future = 
timer.registerTimer(System.currentTimeMillis() + 100000000, new 
ProcessingTimeCallback() {
                                @Override
-                               public void trigger(long timestamp) {}
+                               public void onProcessingTime(long timestamp) {}
                        });
                        assertEquals(1, timer.getNumTasksScheduled());
 
@@ -234,7 +234,7 @@ public class SystemProcessingTimeServiceTest {
                
                timeServiceProvider.registerTimer(System.currentTimeMillis(), 
new ProcessingTimeCallback() {
                        @Override
-                       public void trigger(long timestamp) throws Exception {
+                       public void onProcessingTime(long timestamp) throws 
Exception {
                                throw new Exception("Exception in Timer");
                        }
                });
@@ -258,7 +258,7 @@ public class SystemProcessingTimeServiceTest {
                        // to register some additional timers out of order
                        timer.registerTimer(System.currentTimeMillis(), new 
ProcessingTimeCallback() {
                                @Override
-                               public void trigger(long timestamp) throws 
Exception {
+                               public void onProcessingTime(long timestamp) 
throws Exception {
                                        sync.await();
                                }
                        });
@@ -273,7 +273,7 @@ public class SystemProcessingTimeServiceTest {
                        final ArrayBlockingQueue<Long> timestamps = new 
ArrayBlockingQueue<>(4);
                        ProcessingTimeCallback trigger = new 
ProcessingTimeCallback() {
                                @Override
-                               public void trigger(long timestamp) {
+                               public void onProcessingTime(long timestamp) {
                                        timestamps.add(timestamp);
                                }
                        };

http://git-wip-us.apache.org/repos/asf/flink/blob/770f2f83/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskTimerITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskTimerITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskTimerITCase.java
index c0cd0be..48e6fae 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskTimerITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskTimerITCase.java
@@ -201,7 +201,7 @@ public class StreamTaskTimerITCase extends 
StreamingMultipleProgramsTestBase {
                }
 
                @Override
-               public void trigger(long time) throws Exception {
+               public void onProcessingTime(long time) throws Exception {
                        if (!semaphore.tryAcquire()) {
                                Assert.fail("Concurrent invocation of operator 
functions.");
                        }
@@ -276,7 +276,7 @@ public class StreamTaskTimerITCase extends 
StreamingMultipleProgramsTestBase {
 
 
                @Override
-               public void trigger(long time) throws Exception {
+               public void onProcessingTime(long time) throws Exception {
                        if (!semaphore.tryAcquire()) {
                                Assert.fail("Concurrent invocation of operator 
functions.");
                        }

Reply via email to