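[FLINK-4748] [streaming api] Make timers in the Ingestion Time source context properly cancelable.

Previously only the watermark timer registered in the constructor was kept
in a field; the timers that WatermarkEmittingTask re-registered on each run
were not tracked, so cancelling could only affect that first timer. The most
recently registered timer is now stored in a volatile field
(nextWatermarkTimer), and that is the one cancelled.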


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dd3416fd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dd3416fd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dd3416fd

Branch: refs/heads/master
Commit: dd3416fde7b934584d4e18bc54d79ed7858556c2
Parents: beb31fc
Author: Stephan Ewen <[email protected]>
Authored: Tue Oct 4 22:29:28 2016 +0200
Committer: Stephan Ewen <[email protected]>
Committed: Wed Oct 5 19:36:14 2016 +0200

----------------------------------------------------------------------
 .../api/operators/StreamSourceContexts.java     | 31 ++++++++++++--------
 1 file changed, 19 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/dd3416fd/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
index a290deb..d0c4e15 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
@@ -14,6 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.streaming.api.TimeCharacteristic;
@@ -115,9 +116,9 @@ public class StreamSourceContexts {
                private final Output<StreamRecord<T>> output;
                private final StreamRecord<T> reuse;
 
-               private final ScheduledFuture<?> watermarkTimer;
                private final long watermarkInterval;
 
+               private volatile ScheduledFuture<?> nextWatermarkTimer;
                private volatile long nextWatermarkTime;
 
                private AutomaticWatermarkContext(
@@ -130,13 +131,13 @@ public class StreamSourceContexts {
                        this.lock = Preconditions.checkNotNull(checkpointLock, 
"The checkpoint lock cannot be null.");
                        this.output = Preconditions.checkNotNull(output, "The 
output cannot be null.");
 
-                       Preconditions.checkArgument(watermarkInterval > 1L, 
"The watermark interval cannot be smaller than 1 ms.");
+                       Preconditions.checkArgument(watermarkInterval >= 1L, 
"The watermark interval cannot be smaller than 1 ms.");
                        this.watermarkInterval = watermarkInterval;
 
                        this.reuse = new StreamRecord<>(null);
 
                        long now = this.timeService.getCurrentProcessingTime();
-                       this.watermarkTimer = 
this.timeService.registerTimer(now + watermarkInterval,
+                       this.nextWatermarkTimer = 
this.timeService.registerTimer(now + watermarkInterval,
                                new WatermarkEmittingTask(this.timeService, 
lock, output));
                }
 
@@ -178,8 +179,9 @@ public class StreamSourceContexts {
                                }
 
                                // we can shutdown the timer now, no watermarks 
will be needed any more
-                               if (watermarkTimer != null) {
-                                       watermarkTimer.cancel(true);
+                               final ScheduledFuture<?> nextWatermarkTimer = 
this.nextWatermarkTimer;
+                               if (nextWatermarkTimer != null) {
+                                       nextWatermarkTimer.cancel(true);
                                }
                        }
                }
@@ -191,8 +193,9 @@ public class StreamSourceContexts {
 
                @Override
                public void close() {
-                       if (watermarkTimer != null) {
-                               watermarkTimer.cancel(true);
+                       final ScheduledFuture<?> nextWatermarkTimer = 
this.nextWatermarkTimer;
+                       if (nextWatermarkTimer != null) {
+                               nextWatermarkTimer.cancel(true);
                        }
                }
 
@@ -202,10 +205,13 @@ public class StreamSourceContexts {
                        private final Object lock;
                        private final Output<StreamRecord<T>> output;
 
-                       private WatermarkEmittingTask(TimeServiceProvider 
timeService, Object checkpointLock, Output<StreamRecord<T>> output) {
-                               this.timeService = 
Preconditions.checkNotNull(timeService, "Time Service cannot be null.");
-                               this.lock = 
Preconditions.checkNotNull(checkpointLock, "The checkpoint lock cannot be 
null.");
-                               this.output = 
Preconditions.checkNotNull(output, "The output cannot be null.");
+                       private WatermarkEmittingTask(
+                                       TimeServiceProvider timeService,
+                                       Object checkpointLock,
+                                       Output<StreamRecord<T>> output) {
+                               this.timeService = timeService;
+                               this.lock = checkpointLock;
+                               this.output = output;
                        }
 
                        @Override
@@ -227,7 +233,8 @@ public class StreamSourceContexts {
                                }
 
                                long nextWatermark = currentTime + 
watermarkInterval;
-                               this.timeService.registerTimer(nextWatermark, 
new WatermarkEmittingTask(this.timeService, lock, output));
+                               nextWatermarkTimer = 
this.timeService.registerTimer(
+                                               nextWatermark, new 
WatermarkEmittingTask(this.timeService, lock, output));
                        }
                }
        }

Reply via email to