[FLINK-4748] [streaming api] Make timers in Ingestion Time source context properly cancelable.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dd3416fd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dd3416fd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dd3416fd

Branch: refs/heads/master
Commit: dd3416fde7b934584d4e18bc54d79ed7858556c2
Parents: beb31fc
Author: Stephan Ewen <[email protected]>
Authored: Tue Oct 4 22:29:28 2016 +0200
Committer: Stephan Ewen <[email protected]>
Committed: Wed Oct 5 19:36:14 2016 +0200

----------------------------------------------------------------------
 .../api/operators/StreamSourceContexts.java     | 31 ++++++++++++--------
 1 file changed, 19 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/dd3416fd/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
index a290deb..d0c4e15 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
@@ -14,6 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.streaming.api.TimeCharacteristic;
@@ -115,9 +116,9 @@ public class StreamSourceContexts {
 		private final Output<StreamRecord<T>> output;
 		private final StreamRecord<T> reuse;
 
-		private final ScheduledFuture<?> watermarkTimer;
 		private final long watermarkInterval;
 
+		private volatile ScheduledFuture<?> nextWatermarkTimer;
 		private volatile long nextWatermarkTime;
 
 		private AutomaticWatermarkContext(
@@ -130,13 +131,13 @@ public class StreamSourceContexts {
 			this.lock = Preconditions.checkNotNull(checkpointLock, "The checkpoint lock cannot be null.");
 			this.output = Preconditions.checkNotNull(output, "The output cannot be null.");
 
-			Preconditions.checkArgument(watermarkInterval > 1L, "The watermark interval cannot be smaller than 1 ms.");
+			Preconditions.checkArgument(watermarkInterval >= 1L, "The watermark interval cannot be smaller than 1 ms.");
 			this.watermarkInterval = watermarkInterval;
 
 			this.reuse = new StreamRecord<>(null);
 
 			long now = this.timeService.getCurrentProcessingTime();
-			this.watermarkTimer = this.timeService.registerTimer(now + watermarkInterval,
+			this.nextWatermarkTimer = this.timeService.registerTimer(now + watermarkInterval,
 				new WatermarkEmittingTask(this.timeService, lock, output));
 		}
 
@@ -178,8 +179,9 @@ public class StreamSourceContexts {
 				}
 
 				// we can shutdown the timer now, no watermarks will be needed any more
-				if (watermarkTimer != null) {
-					watermarkTimer.cancel(true);
+				final ScheduledFuture<?> nextWatermarkTimer = this.nextWatermarkTimer;
+				if (nextWatermarkTimer != null) {
+					nextWatermarkTimer.cancel(true);
 				}
 			}
 		}
@@ -191,8 +193,9 @@ public class StreamSourceContexts {
 
 		@Override
 		public void close() {
-			if (watermarkTimer != null) {
-				watermarkTimer.cancel(true);
+			final ScheduledFuture<?> nextWatermarkTimer = this.nextWatermarkTimer;
+			if (nextWatermarkTimer != null) {
+				nextWatermarkTimer.cancel(true);
 			}
 		}
 
@@ -202,10 +205,13 @@ public class StreamSourceContexts {
 			private final Object lock;
 			private final Output<StreamRecord<T>> output;
 
-			private WatermarkEmittingTask(TimeServiceProvider timeService, Object checkpointLock, Output<StreamRecord<T>> output) {
-				this.timeService = Preconditions.checkNotNull(timeService, "Time Service cannot be null.");
-				this.lock = Preconditions.checkNotNull(checkpointLock, "The checkpoint lock cannot be null.");
-				this.output = Preconditions.checkNotNull(output, "The output cannot be null.");
+			private WatermarkEmittingTask(
+					TimeServiceProvider timeService,
+					Object checkpointLock,
+					Output<StreamRecord<T>> output) {
+				this.timeService = timeService;
+				this.lock = checkpointLock;
+				this.output = output;
 			}
 
 			@Override
@@ -227,7 +233,8 @@ public class StreamSourceContexts {
 				}
 
 				long nextWatermark = currentTime + watermarkInterval;
-				this.timeService.registerTimer(nextWatermark, new WatermarkEmittingTask(this.timeService, lock, output));
+				nextWatermarkTimer = this.timeService.registerTimer(
+						nextWatermark, new WatermarkEmittingTask(this.timeService, lock, output));
 			}
 		}
 	}
