[FLINK-4877] Rename Triggerable to ProcessingTimeCallback

This more accurately describes what the interface is for.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/94a3f251
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/94a3f251
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/94a3f251

Branch: refs/heads/master
Commit: 94a3f251cd3eed54c7d8220db119eecbfb11c3b9
Parents: 81b19e5
Author: Aljoscha Krettek <[email protected]>
Authored: Tue Oct 18 11:08:58 2016 +0200
Committer: Aljoscha Krettek <[email protected]>
Committed: Fri Oct 21 19:03:05 2016 +0200

----------------------------------------------------------------------
 .../connectors/fs/bucketing/BucketingSink.java  |  4 +-
 .../kafka/internals/AbstractFetcher.java        |  4 +-
 .../api/operators/HeapInternalTimerService.java |  4 +-
 .../api/operators/StreamSourceContexts.java     |  4 +-
 .../operators/ExtractTimestampsOperator.java    |  3 +-
 ...TimestampsAndPeriodicWatermarksOperator.java |  3 +-
 .../runtime/operators/Triggerable.java          | 40 --------------------
 ...ractAlignedProcessingTimeWindowOperator.java |  4 +-
 .../runtime/tasks/ProcessingTimeCallback.java   | 40 ++++++++++++++++++++
 .../runtime/tasks/ProcessingTimeService.java    | 12 +++---
 .../tasks/SystemProcessingTimeService.java      |  7 ++--
 .../tasks/TestProcessingTimeService.java        | 16 ++++----
 .../runtime/operators/StreamTaskTimerTest.java  | 19 +++++-----
 .../TestProcessingTimeServiceTest.java          |  5 ++-
 .../tasks/SystemProcessingTimeServiceTest.java  | 19 +++++-----
 .../runtime/StreamTaskTimerITCase.java          |  6 +--
 16 files changed, 94 insertions(+), 96 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/94a3f251/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
 
b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
index 6f8a739..66e704c 100644
--- 
a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
+++ 
b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
@@ -31,7 +31,7 @@ import org.apache.flink.streaming.connectors.fs.Clock;
 import org.apache.flink.streaming.connectors.fs.SequenceFileWriter;
 import org.apache.flink.streaming.connectors.fs.StringWriter;
 import org.apache.flink.streaming.connectors.fs.Writer;
-import org.apache.flink.streaming.runtime.operators.Triggerable;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -138,7 +138,7 @@ import java.util.Iterator;
  */
 public class BucketingSink<T>
                extends RichSinkFunction<T>
-               implements InputTypeConfigurable, 
Checkpointed<BucketingSink.State<T>>, CheckpointListener, Triggerable {
+               implements InputTypeConfigurable, 
Checkpointed<BucketingSink.State<T>>, CheckpointListener, 
ProcessingTimeCallback {
        private static final long serialVersionUID = 1L;
 
        private static Logger LOG = 
LoggerFactory.getLogger(BucketingSink.class);

http://git-wip-us.apache.org/repos/asf/flink/blob/94a3f251/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
index 321991a..58bca52 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
@@ -24,7 +24,7 @@ import 
org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
 import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.operators.Triggerable;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.util.SerializedValue;
 
@@ -461,7 +461,7 @@ public abstract class AbstractFetcher<T, KPH> {
         * The periodic watermark emitter. In its given interval, it checks all 
partitions for
         * the current event time watermark, and possibly emits the next 
watermark.
         */
-       private static class PeriodicWatermarkEmitter implements Triggerable {
+       private static class PeriodicWatermarkEmitter implements 
ProcessingTimeCallback {
 
                private final KafkaTopicPartitionStateWithPeriodicWatermarks<?, 
?>[] allPartitions;
                

http://git-wip-us.apache.org/repos/asf/flink/blob/94a3f251/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java
index c77b634..15258cf 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.api.operators;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.streaming.runtime.operators.Triggerable;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.util.InstantiationUtil;
 
@@ -37,7 +37,7 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
 /**
  * {@link InternalTimerService} that stores timers on the Java heap.
  */
-public class HeapInternalTimerService<K, N> implements 
InternalTimerService<N>, Triggerable {
+public class HeapInternalTimerService<K, N> implements 
InternalTimerService<N>, ProcessingTimeCallback {
 
        private final TypeSerializer<K> keySerializer;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/94a3f251/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
index 01ae55c..66d2ac2 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.api.operators;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.operators.Triggerable;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.util.Preconditions;
@@ -199,7 +199,7 @@ public class StreamSourceContexts {
                        }
                }
 
-               private class WatermarkEmittingTask implements Triggerable {
+               private class WatermarkEmittingTask implements 
ProcessingTimeCallback {
 
                        private final ProcessingTimeService timeService;
                        private final Object lock;

http://git-wip-us.apache.org/repos/asf/flink/blob/94a3f251/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java
index 0798ed4..5f5028a 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java
@@ -23,6 +23,7 @@ import 
org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
 
 /**
  * A {@link org.apache.flink.streaming.api.operators.StreamOperator} for 
extracting timestamps
@@ -36,7 +37,7 @@ import 
org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 @Deprecated
 public class ExtractTimestampsOperator<T>
                extends AbstractUdfStreamOperator<T, TimestampExtractor<T>>
-               implements OneInputStreamOperator<T, T>, Triggerable {
+               implements OneInputStreamOperator<T, T>, ProcessingTimeCallback 
{
 
        private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/94a3f251/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperator.java
index b1402ed..ba72659 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperator.java
@@ -23,6 +23,7 @@ import 
org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
 
 /**
  * A stream operator that extracts timestamps from stream elements and
@@ -32,7 +33,7 @@ import 
org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
  */
 public class TimestampsAndPeriodicWatermarksOperator<T>
                extends AbstractUdfStreamOperator<T, 
AssignerWithPeriodicWatermarks<T>>
-               implements OneInputStreamOperator<T, T>, Triggerable {
+               implements OneInputStreamOperator<T, T>, ProcessingTimeCallback 
{
 
        private static final long serialVersionUID = 1L;
        

http://git-wip-us.apache.org/repos/asf/flink/blob/94a3f251/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/Triggerable.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/Triggerable.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/Triggerable.java
deleted file mode 100644
index 9ca3f33..0000000
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/Triggerable.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.operators;
-
-import org.apache.flink.annotation.Internal;
-
-/**
- * This interface must be implemented by objects that are triggered by the 
timer service available
- * to stream operators in {@link 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment}.
- */
-@Internal
-public interface Triggerable {
-
-       /**
-        * This method is invoked with the timestamp for which the trigger was 
scheduled.
-        * <p>
-        * If the triggering is delayed for whatever reason (trigger timer was 
blocked, JVM stalled due
-        * to a garbage collection), the timestamp supplied to this function 
will still be the original
-        * timestamp for which the trigger was scheduled.
-        * 
-        * @param timestamp The timestamp for which the trigger event was 
scheduled.
-        */
-       void trigger(long timestamp) throws Exception ;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/94a3f251/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
index 2a77c0a..80a317e 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
@@ -33,7 +33,7 @@ import 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
 import org.apache.flink.streaming.api.operators.TimestampedCollector;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.streaming.runtime.operators.Triggerable;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 import static java.util.Objects.requireNonNull;
@@ -41,7 +41,7 @@ import static java.util.Objects.requireNonNull;
 @Internal
 public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, 
OUT, STATE, F extends Function> 
                extends AbstractUdfStreamOperator<OUT, F> 
-               implements OneInputStreamOperator<IN, OUT>, Triggerable {
+               implements OneInputStreamOperator<IN, OUT>, 
ProcessingTimeCallback {
        
        private static final long serialVersionUID = 3245500864882459867L;
        

http://git-wip-us.apache.org/repos/asf/flink/blob/94a3f251/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeCallback.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeCallback.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeCallback.java
new file mode 100644
index 0000000..aca1718
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeCallback.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * Interface for processing-time callbacks that can be registered at a
+ * {@link ProcessingTimeService}.
+ */
+@Internal
+public interface ProcessingTimeCallback {
+
+       /**
+        * This method is invoked with the timestamp for which the trigger was 
scheduled.
+        * <p>
+        * If the triggering is delayed for whatever reason (trigger timer was 
blocked, JVM stalled due
+        * to a garbage collection), the timestamp supplied to this function 
will still be the original
+        * timestamp for which the trigger was scheduled.
+        * 
+        * @param timestamp The timestamp for which the trigger event was 
scheduled.
+        */
+       void trigger(long timestamp) throws Exception ;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/94a3f251/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java
index 15c3ebb..f64bead 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java
@@ -17,8 +17,6 @@
 
 package org.apache.flink.streaming.runtime.tasks;
 
-import org.apache.flink.streaming.runtime.operators.Triggerable;
-
 import java.util.concurrent.ScheduledFuture;
 
 /**
@@ -32,10 +30,10 @@ import java.util.concurrent.ScheduledFuture;
  * <ol>
  *     <li>In the initial state, it accepts timer registrations and triggers 
when the time is reached.</li>
  *     <li>After calling {@link #quiesceAndAwaitPending()}, further calls to
- *         {@link #registerTimer(long, Triggerable)} will not register any 
further timers, and will
+ *         {@link #registerTimer(long, ProcessingTimeCallback)} will not 
register any further timers, and will
  *         return a "dummy" future as a result. This is used for clean 
shutdown, where currently firing
  *         timers are waited for and no future timers can be scheduled, 
without causing hard exceptions.</li>
- *     <li>After a call to {@link #shutdownService()}, all calls to {@link 
#registerTimer(long, Triggerable)}
+ *     <li>After a call to {@link #shutdownService()}, all calls to {@link 
#registerTimer(long, ProcessingTimeCallback)}
  *         will result in a hard exception.</li>
  * </ol>
  */
@@ -55,7 +53,7 @@ public abstract class ProcessingTimeService {
         * @return The future that represents the scheduled task. This always 
returns some future,
         *         even if the timer was shut down
         */
-       public abstract ScheduledFuture<?> registerTimer(long timestamp, 
Triggerable target);
+       public abstract ScheduledFuture<?> registerTimer(long timestamp, 
ProcessingTimeCallback target);
 
        /**
         * Returns <tt>true</tt> if the service has been shut down, 
<tt>false</tt> otherwise.
@@ -64,7 +62,7 @@ public abstract class ProcessingTimeService {
 
        /**
         * This method puts the service into a state where it does not register 
new timers, but
-        * returns for each call to {@link #registerTimer(long, Triggerable)} 
only a "mock" future.
+        * returns for each call to {@link #registerTimer(long, 
ProcessingTimeCallback)} only a "mock" future.
         * Furthermore, the method clears all not yet started timers, and 
awaits the completion
         * of currently executing timers.
         * 
@@ -76,7 +74,7 @@ public abstract class ProcessingTimeService {
 
        /**
         * Shuts down and clean up the timer service provider hard and 
immediately. This does not wait
-        * for any timer to complete. Any further call to {@link 
#registerTimer(long, Triggerable)}
+        * for any timer to complete. Any further call to {@link 
#registerTimer(long, ProcessingTimeCallback)}
         * will result in a hard exception.
         */
        public abstract void shutdownService();

http://git-wip-us.apache.org/repos/asf/flink/blob/94a3f251/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
index 3fd4202..b433f8d 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
@@ -18,7 +18,6 @@
 package org.apache.flink.streaming.runtime.tasks;
 
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.streaming.runtime.operators.Triggerable;
 
 import javax.annotation.Nonnull;
 import java.util.concurrent.BlockingQueue;
@@ -92,7 +91,7 @@ public class SystemProcessingTimeService extends 
ProcessingTimeService {
        }
 
        @Override
-       public ScheduledFuture<?> registerTimer(long timestamp, Triggerable 
target) {
+       public ScheduledFuture<?> registerTimer(long timestamp, 
ProcessingTimeCallback target) {
                long delay = Math.max(timestamp - getCurrentProcessingTime(), 
0);
 
                // we directly try to register the timer and only react to the 
status on exception
@@ -165,11 +164,11 @@ public class SystemProcessingTimeService extends 
ProcessingTimeService {
        private static final class TriggerTask implements Runnable {
 
                private final Object lock;
-               private final Triggerable target;
+               private final ProcessingTimeCallback target;
                private final long timestamp;
                private final AsyncExceptionHandler exceptionHandler;
 
-               TriggerTask(AsyncExceptionHandler exceptionHandler, final 
Object lock, Triggerable target, long timestamp) {
+               TriggerTask(AsyncExceptionHandler exceptionHandler, final 
Object lock, ProcessingTimeCallback target, long timestamp) {
                        this.exceptionHandler = exceptionHandler;
                        this.lock = lock;
                        this.target = target;

http://git-wip-us.apache.org/repos/asf/flink/blob/94a3f251/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
index d0a2ea9..3e6c273 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
@@ -17,8 +17,6 @@
 
 package org.apache.flink.streaming.runtime.tasks;
 
-import org.apache.flink.streaming.runtime.operators.Triggerable;
-
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -69,7 +67,7 @@ public class TestProcessingTimeService extends 
ProcessingTimeService {
                        for (Map.Entry<Long, List<ScheduledTimerFuture>> tasks: 
toRun) {
                                long now = tasks.getKey();
                                for (ScheduledTimerFuture task: 
tasks.getValue()) {
-                                       task.getTriggerable().trigger(now);
+                                       
task.getProcessingTimeCallback().trigger(now);
                                }
                        }
                }
@@ -81,7 +79,7 @@ public class TestProcessingTimeService extends 
ProcessingTimeService {
        }
 
        @Override
-       public ScheduledFuture<?> registerTimer(long timestamp, Triggerable 
target) {
+       public ScheduledFuture<?> registerTimer(long timestamp, 
ProcessingTimeCallback target) {
                if (isTerminated) {
                        throw new IllegalStateException("terminated");
                }
@@ -149,12 +147,12 @@ public class TestProcessingTimeService extends 
ProcessingTimeService {
 
        private class ScheduledTimerFuture implements ScheduledFuture<Object> {
 
-               private final Triggerable triggerable;
+               private final ProcessingTimeCallback processingTimeCallback;
 
                private final long timestamp;
 
-               public ScheduledTimerFuture(Triggerable triggerable, long 
timestamp) {
-                       this.triggerable = triggerable;
+               public ScheduledTimerFuture(ProcessingTimeCallback 
processingTimeCallback, long timestamp) {
+                       this.processingTimeCallback = processingTimeCallback;
                        this.timestamp = timestamp;
                }
 
@@ -197,8 +195,8 @@ public class TestProcessingTimeService extends 
ProcessingTimeService {
                        throw new UnsupportedOperationException();
                }
 
-               public Triggerable getTriggerable() {
-                       return triggerable;
+               public ProcessingTimeCallback getProcessingTimeCallback() {
+                       return processingTimeCallback;
                }
 
                public long getTimestamp() {

http://git-wip-us.apache.org/repos/asf/flink/blob/94a3f251/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
index fb1fab5..87241dd 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
@@ -28,6 +28,7 @@ import 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 
@@ -64,7 +65,7 @@ public class StreamTaskTimerTest {
                testHarness.waitForTaskRunning();
 
                // first one spawns thread
-               
mapTask.getProcessingTimeService().registerTimer(System.currentTimeMillis(), 
new Triggerable() {
+               
mapTask.getProcessingTimeService().registerTimer(System.currentTimeMillis(), 
new ProcessingTimeCallback() {
                        @Override
                        public void trigger(long timestamp) {
                        }
@@ -107,14 +108,14 @@ public class StreamTaskTimerTest {
                        final long t4 = System.currentTimeMillis() + 200;
 
                        ProcessingTimeService timeService = 
mapTask.getProcessingTimeService();
-                       timeService.registerTimer(t1, new 
ValidatingTriggerable(errorRef, t1, 0));
-                       timeService.registerTimer(t2, new 
ValidatingTriggerable(errorRef, t2, 1));
-                       timeService.registerTimer(t3, new 
ValidatingTriggerable(errorRef, t3, 2));
-                       timeService.registerTimer(t4, new 
ValidatingTriggerable(errorRef, t4, 3));
+                       timeService.registerTimer(t1, new 
ValidatingProcessingTimeCallback(errorRef, t1, 0));
+                       timeService.registerTimer(t2, new 
ValidatingProcessingTimeCallback(errorRef, t2, 1));
+                       timeService.registerTimer(t3, new 
ValidatingProcessingTimeCallback(errorRef, t3, 2));
+                       timeService.registerTimer(t4, new 
ValidatingProcessingTimeCallback(errorRef, t4, 3));
 
                        long deadline = System.currentTimeMillis() + 20000;
                        while (errorRef.get() == null &&
-                                       ValidatingTriggerable.numInSequence < 4 
&&
+                                       
ValidatingProcessingTimeCallback.numInSequence < 4 &&
                                        System.currentTimeMillis() < deadline)
                        {
                                Thread.sleep(100);
@@ -126,7 +127,7 @@ public class StreamTaskTimerTest {
                                fail(errorRef.get().getMessage());
                        }
 
-                       assertEquals(4, ValidatingTriggerable.numInSequence);
+                       assertEquals(4, 
ValidatingProcessingTimeCallback.numInSequence);
 
                        testHarness.endInput();
                        testHarness.waitForTaskCompletion();
@@ -146,7 +147,7 @@ public class StreamTaskTimerTest {
                }
        }
 
-       private static class ValidatingTriggerable implements Triggerable {
+       private static class ValidatingProcessingTimeCallback implements 
ProcessingTimeCallback {
                
                static int numInSequence;
                
@@ -155,7 +156,7 @@ public class StreamTaskTimerTest {
                private final long expectedTimestamp;
                private final int expectedInSequence;
 
-               private ValidatingTriggerable(AtomicReference<Throwable> 
errorRef, long expectedTimestamp, int expectedInSequence) {
+               private 
ValidatingProcessingTimeCallback(AtomicReference<Throwable> errorRef, long 
expectedTimestamp, int expectedInSequence) {
                        this.errorRef = errorRef;
                        this.expectedTimestamp = expectedTimestamp;
                        this.expectedInSequence = expectedInSequence;

http://git-wip-us.apache.org/repos/asf/flink/blob/94a3f251/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java
index 9c2cee3..db56717 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java
@@ -27,6 +27,7 @@ import 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
 import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
 
 import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 
@@ -70,14 +71,14 @@ public class TestProcessingTimeServiceTest {
                
assertEquals(testHarness.getProcessingTimeService().getCurrentProcessingTime(), 
16);
 
                // register 2 tasks
-               mapTask.getProcessingTimeService().registerTimer(30, new 
Triggerable() {
+               mapTask.getProcessingTimeService().registerTimer(30, new 
ProcessingTimeCallback() {
                        @Override
                        public void trigger(long timestamp) {
 
                        }
                });
 
-               mapTask.getProcessingTimeService().registerTimer(40, new 
Triggerable() {
+               mapTask.getProcessingTimeService().registerTimer(40, new 
ProcessingTimeCallback() {
                        @Override
                        public void trigger(long timestamp) {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/94a3f251/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
index e7944df..dc679ab 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.streaming.runtime.tasks;
 
 import org.apache.flink.core.testutils.OneShotLatch;
 import 
org.apache.flink.streaming.runtime.operators.TestProcessingTimeServiceTest.ReferenceSettingExceptionHandler;
-import org.apache.flink.streaming.runtime.operators.Triggerable;
 
 import org.junit.Test;
 
@@ -51,7 +50,7 @@ public class SystemProcessingTimeServiceTest {
                        assertEquals(0, timer.getNumTasksScheduled());
 
                        // schedule something
-                       ScheduledFuture<?> future = 
timer.registerTimer(System.currentTimeMillis(), new Triggerable() {
+                       ScheduledFuture<?> future = 
timer.registerTimer(System.currentTimeMillis(), new ProcessingTimeCallback() {
                                @Override
                                public void trigger(long timestamp) {
                                        assertTrue(Thread.holdsLock(lock));
@@ -87,7 +86,7 @@ public class SystemProcessingTimeServiceTest {
                        final OneShotLatch latch = new OneShotLatch();
 
                        // the task should trigger immediately and should block 
until terminated with interruption
-                       timer.registerTimer(System.currentTimeMillis(), new 
Triggerable() {
+                       timer.registerTimer(System.currentTimeMillis(), new 
ProcessingTimeCallback() {
                                @Override
                                public void trigger(long timestamp) throws 
Exception {
                                        latch.trigger();
@@ -105,7 +104,7 @@ public class SystemProcessingTimeServiceTest {
                        }
 
                        try {
-                               timer.registerTimer(System.currentTimeMillis() 
+ 1000, new Triggerable() {
+                               timer.registerTimer(System.currentTimeMillis() 
+ 1000, new ProcessingTimeCallback() {
                                        @Override
                                        public void trigger(long timestamp) {}
                                });
@@ -141,7 +140,7 @@ public class SystemProcessingTimeServiceTest {
 
                        final ReentrantLock scopeLock = new ReentrantLock();
 
-                       timer.registerTimer(System.currentTimeMillis() + 20, 
new Triggerable() {
+                       timer.registerTimer(System.currentTimeMillis() + 20, 
new ProcessingTimeCallback() {
                                @Override
                                public void trigger(long timestamp) throws 
Exception {
                                        scopeLock.lock();
@@ -163,7 +162,7 @@ public class SystemProcessingTimeServiceTest {
                        assertTrue(scopeLock.tryLock());
 
                        // should be able to schedule more tasks (that never 
get executed)
-                       ScheduledFuture<?> future = 
timer.registerTimer(System.currentTimeMillis() - 5, new Triggerable() {
+                       ScheduledFuture<?> future = 
timer.registerTimer(System.currentTimeMillis() - 5, new 
ProcessingTimeCallback() {
                                @Override
                                public void trigger(long timestamp) throws 
Exception {
                                        throw new Exception("test");
@@ -198,7 +197,7 @@ public class SystemProcessingTimeServiceTest {
                        assertEquals(0, timer.getNumTasksScheduled());
 
                        // schedule something
-                       ScheduledFuture<?> future = 
timer.registerTimer(System.currentTimeMillis() + 100000000, new Triggerable() {
+                       ScheduledFuture<?> future = 
timer.registerTimer(System.currentTimeMillis() + 100000000, new 
ProcessingTimeCallback() {
                                @Override
                                public void trigger(long timestamp) {}
                        });
@@ -233,7 +232,7 @@ public class SystemProcessingTimeServiceTest {
                                        }
                                }, lock);
                
-               timeServiceProvider.registerTimer(System.currentTimeMillis(), 
new Triggerable() {
+               timeServiceProvider.registerTimer(System.currentTimeMillis(), 
new ProcessingTimeCallback() {
                        @Override
                        public void trigger(long timestamp) throws Exception {
                                throw new Exception("Exception in Timer");
@@ -257,7 +256,7 @@ public class SystemProcessingTimeServiceTest {
 
                        // we block the timer execution to make sure we have 
all the time
                        // to register some additional timers out of order
-                       timer.registerTimer(System.currentTimeMillis(), new 
Triggerable() {
+                       timer.registerTimer(System.currentTimeMillis(), new 
ProcessingTimeCallback() {
                                @Override
                                public void trigger(long timestamp) throws 
Exception {
                                        sync.await();
@@ -272,7 +271,7 @@ public class SystemProcessingTimeServiceTest {
                        final long time4 = now - 2;
 
                        final ArrayBlockingQueue<Long> timestamps = new 
ArrayBlockingQueue<>(4);
-                       Triggerable trigger = new Triggerable() {
+                       ProcessingTimeCallback trigger = new 
ProcessingTimeCallback() {
                                @Override
                                public void trigger(long timestamp) {
                                        timestamps.add(timestamp);

http://git-wip-us.apache.org/repos/asf/flink/blob/94a3f251/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskTimerITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskTimerITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskTimerITCase.java
index e7f62fd..c0cd0be 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskTimerITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskTimerITCase.java
@@ -29,7 +29,7 @@ import 
org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.operators.Triggerable;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.TimerException;
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
@@ -171,7 +171,7 @@ public class StreamTaskTimerITCase extends 
StreamingMultipleProgramsTestBase {
                Assert.assertTrue(testSuccess);
        }
 
-       public static class TimerOperator extends 
AbstractStreamOperator<String> implements OneInputStreamOperator<String, 
String>, Triggerable {
+       public static class TimerOperator extends 
AbstractStreamOperator<String> implements OneInputStreamOperator<String, 
String>, ProcessingTimeCallback {
                private static final long serialVersionUID = 1L;
 
                int numTimers = 0;
@@ -230,7 +230,7 @@ public class StreamTaskTimerITCase extends 
StreamingMultipleProgramsTestBase {
                }
        }
 
-       public static class TwoInputTimerOperator extends 
AbstractStreamOperator<String> implements TwoInputStreamOperator<String, 
String, String>, Triggerable {
+       public static class TwoInputTimerOperator extends 
AbstractStreamOperator<String> implements TwoInputStreamOperator<String, 
String, String>, ProcessingTimeCallback {
                private static final long serialVersionUID = 1L;
 
                int numTimers = 0;

Reply via email to