[hotfix] Replace registerTimer/getTime by TimeServiceProvider in Context
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/51a5048b Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/51a5048b Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/51a5048b Branch: refs/heads/master Commit: 51a5048b24ffe7655e2197c04aa844239bf1af83 Parents: ffff299 Author: Aljoscha Krettek <aljoscha.kret...@gmail.com> Authored: Fri Sep 23 10:40:16 2016 +0200 Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com> Committed: Fri Sep 23 15:01:07 2016 +0200 ---------------------------------------------------------------------- .../connectors/fs/bucketing/BucketingSink.java | 23 +++++++------ .../kafka/internals/AbstractFetcher.java | 19 ++++++----- .../kafka/testutils/MockRuntimeContext.java | 11 ++---- .../api/operators/StreamingRuntimeContext.java | 7 ++-- .../runtime/tasks/AsyncExceptionHandler.java | 8 ++--- .../tasks/DefaultTimeServiceProvider.java | 35 +++++++++----------- .../streaming/runtime/tasks/StreamTask.java | 6 ++-- .../runtime/operators/TimeProviderTest.java | 13 ++++++-- .../util/OneInputStreamOperatorTestHarness.java | 2 +- 9 files changed, 64 insertions(+), 60 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/51a5048b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java index 1e05c0d..5a5cade 100644 --- a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java +++ b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java @@ -32,6 +32,7 @@ import org.apache.flink.streaming.connectors.fs.SequenceFileWriter; import org.apache.flink.streaming.connectors.fs.StringWriter; import org.apache.flink.streaming.connectors.fs.Writer; import org.apache.flink.streaming.runtime.operators.Triggerable; +import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocatedFileStatus; @@ -284,6 +285,8 @@ public class BucketingSink<T> private transient Clock clock; + private transient TimeServiceProvider processingTimeService; + /** * Creates a new {@code BucketingSink} that writes files to the given base directory. * @@ -320,18 +323,19 @@ public class BucketingSink<T> FileSystem fs = baseDirectory.getFileSystem(hadoopConf); refTruncate = reflectTruncate(fs); - long currentProcessingTime = - ((StreamingRuntimeContext) getRuntimeContext()).getCurrentProcessingTime(); + processingTimeService = + ((StreamingRuntimeContext) getRuntimeContext()).getTimeServiceProvider(); + + long currentProcessingTime = processingTimeService.getCurrentProcessingTime(); checkForInactiveBuckets(currentProcessingTime); - ((StreamingRuntimeContext) getRuntimeContext()).registerTimer( - currentProcessingTime + inactiveBucketCheckInterval, this); + processingTimeService.registerTimer(currentProcessingTime + inactiveBucketCheckInterval, this); this.clock = new Clock() { @Override public long currentTimeMillis() { - return ((StreamingRuntimeContext) getRuntimeContext()).getCurrentProcessingTime(); + return processingTimeService.getCurrentProcessingTime(); } }; @@ -376,8 +380,7 @@ public class BucketingSink<T> public void invoke(T value) throws Exception { Path bucketPath = bucketer.getBucketPath(clock, new Path(basePath), value); - long currentProcessingTime = - ((StreamingRuntimeContext) getRuntimeContext()).getCurrentProcessingTime(); + long currentProcessingTime = processingTimeService.getCurrentProcessingTime(); if (!state.hasBucketState(bucketPath)) { state.addBucketState(bucketPath, new BucketState<T>(currentProcessingTime)); @@ -420,13 +423,11 @@ public class BucketingSink<T> @Override public void trigger(long timestamp) throws Exception { - long currentProcessingTime = - ((StreamingRuntimeContext) getRuntimeContext()).getCurrentProcessingTime(); + long currentProcessingTime = processingTimeService.getCurrentProcessingTime(); checkForInactiveBuckets(currentProcessingTime); - ((StreamingRuntimeContext) getRuntimeContext()).registerTimer( - currentProcessingTime + inactiveBucketCheckInterval, this); + processingTimeService.registerTimer(currentProcessingTime + inactiveBucketCheckInterval, this); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/51a5048b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java index 8ec26cc..9255445 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java @@ -26,6 +26,7 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceCont import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.operators.Triggerable; +import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider; import org.apache.flink.util.SerializedValue; import java.io.IOException; @@ -80,7 +81,8 @@ public abstract class AbstractFetcher<T, KPH> { List<KafkaTopicPartition> assignedPartitions, SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic, SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated, - StreamingRuntimeContext runtimeContext, boolean useMetrics) throws Exception + StreamingRuntimeContext runtimeContext, + boolean useMetrics) throws Exception { this.sourceContext = checkNotNull(sourceContext); this.checkpointLock = sourceContext.getCheckpointLock(); @@ -116,7 +118,7 @@ public abstract class AbstractFetcher<T, KPH> { (KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[]) allPartitions; PeriodicWatermarkEmitter periodicEmitter = - new PeriodicWatermarkEmitter(parts, sourceContext, runtimeContext); + new PeriodicWatermarkEmitter(parts, sourceContext, runtimeContext.getTimeServiceProvider(), runtimeContext.getExecutionConfig().getAutoWatermarkInterval()); periodicEmitter.start(); } } @@ -458,7 +460,7 @@ public abstract class AbstractFetcher<T, KPH> { private final SourceContext<?> emitter; - private final StreamingRuntimeContext triggerContext; + private final TimeServiceProvider timerService; private final long interval; @@ -469,19 +471,20 @@ public abstract class AbstractFetcher<T, KPH> { PeriodicWatermarkEmitter( KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[] allPartitions, SourceContext<?> emitter, - StreamingRuntimeContext runtimeContext) + TimeServiceProvider timerService, + long autoWatermarkInterval) { this.allPartitions = checkNotNull(allPartitions); this.emitter = checkNotNull(emitter); - this.triggerContext = checkNotNull(runtimeContext); - this.interval = runtimeContext.getExecutionConfig().getAutoWatermarkInterval(); + this.timerService = checkNotNull(timerService); + this.interval = autoWatermarkInterval; this.lastWatermarkTimestamp = Long.MIN_VALUE; } //------------------------------------------------- public void start() { - triggerContext.registerTimer(triggerContext.getCurrentProcessingTime() + interval, this); + timerService.registerTimer(timerService.getCurrentProcessingTime() + interval, this); } @Override @@ -510,7 +513,7 @@ public abstract class AbstractFetcher<T, KPH> { } // schedule the next watermark - triggerContext.registerTimer(triggerContext.getCurrentProcessingTime() + interval, this); + timerService.registerTimer(timerService.getCurrentProcessingTime() + interval, this); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/51a5048b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java index 7a50569..da2c652 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java @@ -198,15 +198,8 @@ public class MockRuntimeContext extends StreamingRuntimeContext { } @Override - public long getCurrentProcessingTime() { - Preconditions.checkNotNull(timerService, "The processing time timer has not been initialized."); - return timerService.getCurrentProcessingTime(); - } - - @Override - public ScheduledFuture<?> registerTimer(final long time, final Triggerable target) { - Preconditions.checkNotNull(timerService, "The processing time timer has not been initialized."); - return timerService.registerTimer(time, target); + public TimeServiceProvider getTimeServiceProvider() { + return timerService; } // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/51a5048b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java index 961bd9d..4f85e3a 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java @@ -35,11 +35,10 @@ import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.graph.StreamConfig; -import org.apache.flink.streaming.runtime.operators.Triggerable; +import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider; import java.util.List; import java.util.Map; -import java.util.concurrent.ScheduledFuture; import static java.util.Objects.requireNonNull; @@ -83,6 +82,10 @@ public class StreamingRuntimeContext extends AbstractRuntimeUDFContext { return taskEnvironment.getInputSplitProvider(); } + public TimeServiceProvider getTimeServiceProvider() { + return operator.getTimerService(); + } + // ------------------------------------------------------------------------ // broadcast variables // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/51a5048b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncExceptionHandler.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncExceptionHandler.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncExceptionHandler.java index 85a4115..c7ec2ed 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncExceptionHandler.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncExceptionHandler.java @@ -18,14 +18,12 @@ package org.apache.flink.streaming.runtime.tasks; /** - * An interface marking a task as capable to register exceptions thrown by different - * threads, other than the one executing the taks itself. + * Interface for reporting exceptions that are thrown in (possibly) a different thread. */ public interface AsyncExceptionHandler { /** - * Registers to the main thread an exception that was thrown by another thread - * (e.g. a TriggerTask), other than the one executing the main task. + * Registers the given exception. */ - void registerAsyncException(String message, AsynchronousException exception); + void registerAsyncException(AsynchronousException exception); } http://git-wip-us.apache.org/repos/asf/flink/blob/51a5048b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java index c7339b3..ea2b07f 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java @@ -39,10 +39,11 @@ public class DefaultTimeServiceProvider extends TimeServiceProvider { /** The executor service that schedules and calls the triggers of this task*/ private final ScheduledExecutorService timerService; - public static DefaultTimeServiceProvider create(AsyncExceptionHandler task, - ScheduledExecutorService executor, - Object checkpointLock) { - return new DefaultTimeServiceProvider(task, executor, checkpointLock); + public static DefaultTimeServiceProvider create( + AsyncExceptionHandler exceptionHandler, + ScheduledExecutorService executor, + Object checkpointLock) { + return new DefaultTimeServiceProvider(exceptionHandler, executor, checkpointLock); } private DefaultTimeServiceProvider(AsyncExceptionHandler task, @@ -82,10 +83,10 @@ public class DefaultTimeServiceProvider extends TimeServiceProvider { private final Object lock; private final Triggerable target; private final long timestamp; - private final AsyncExceptionHandler task; + private final AsyncExceptionHandler exceptionHandler; - TriggerTask(AsyncExceptionHandler task, final Object lock, Triggerable target, long timestamp) { - this.task = task; + TriggerTask(AsyncExceptionHandler exceptionHandler, final Object lock, Triggerable target, long timestamp) { + this.exceptionHandler = exceptionHandler; this.lock = lock; this.target = target; this.timestamp = timestamp; @@ -97,17 +98,8 @@ public class DefaultTimeServiceProvider extends TimeServiceProvider { try { target.trigger(timestamp); } catch (Throwable t) { - - if (task != null) { - // registers the exception with the calling task - // so that it can be logged and (later) detected - TimerException asyncException = new TimerException(t); - task.registerAsyncException("Caught exception while processing timer.", asyncException); - } else { - // this is for when we are in testing mode and we - // want to have real processing time. - t.printStackTrace(); - } + TimerException asyncException = new TimerException(t); + exceptionHandler.registerAsyncException(asyncException); } } } @@ -115,6 +107,11 @@ public class DefaultTimeServiceProvider extends TimeServiceProvider { @VisibleForTesting public static DefaultTimeServiceProvider createForTesting(ScheduledExecutorService executor, Object checkpointLock) { - return new DefaultTimeServiceProvider(null, executor, checkpointLock); + return new DefaultTimeServiceProvider(new AsyncExceptionHandler() { + @Override + public void registerAsyncException(AsynchronousException exception) { + exception.printStackTrace(); + } + }, executor, checkpointLock); } } http://git-wip-us.apache.org/repos/asf/flink/blob/51a5048b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index faa9672..ff074b7 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -838,9 +838,9 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>> } @Override - public void registerAsyncException(String message, AsynchronousException exception) { + public void registerAsyncException(AsynchronousException exception) { if (isRunning) { - LOG.error(message, exception); + LOG.error("Asynchronous exception registered.", exception); } if (this.asyncException == null) { @@ -940,7 +940,7 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>> // registers the exception and tries to fail the whole task AsynchronousException asyncException = new AsynchronousException(e); - owner.registerAsyncException("Caught exception while materializing asynchronous checkpoints.", asyncException); + owner.registerAsyncException(asyncException); } finally { synchronized (cancelables) { http://git-wip-us.apache.org/repos/asf/flink/blob/51a5048b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimeProviderTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimeProviderTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimeProviderTest.java index 140e9e2..60850d8 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimeProviderTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimeProviderTest.java @@ -22,6 +22,8 @@ import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.StreamMap; +import org.apache.flink.streaming.runtime.tasks.AsyncExceptionHandler; +import org.apache.flink.streaming.runtime.tasks.AsynchronousException; import org.apache.flink.streaming.runtime.tasks.DefaultTimeServiceProvider; import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask; import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness; @@ -50,8 +52,15 @@ public class TimeProviderTest { final OneShotLatch latch = new OneShotLatch(); final Object lock = new Object(); - TimeServiceProvider timeServiceProvider = DefaultTimeServiceProvider - .createForTesting(Executors.newSingleThreadScheduledExecutor(), lock); + TimeServiceProvider timeServiceProvider = DefaultTimeServiceProvider.create( + new AsyncExceptionHandler() { + @Override + public void registerAsyncException(AsynchronousException exception) { + exception.printStackTrace(); + } + }, + Executors.newSingleThreadScheduledExecutor(), + lock); final List<Long> timestamps = new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/flink/blob/51a5048b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java index 9cdc783..acf046a 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java @@ -135,7 +135,7 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> { // do nothing return null; } - }).when(mockTask).registerAsyncException(any(String.class), any(AsynchronousException.class)); + }).when(mockTask).registerAsyncException(any(AsynchronousException.class)); try { doAnswer(new Answer<CheckpointStreamFactory>() {