[FLINK-4877] Rename TimeServiceProvider to ProcessingTimeService The name is clashing with the soon-to-be-added TimerService/InternalTimerService which is meant as an interface for dealing with both processing time and event time.
TimeServiceProvider is renamed to ProcessingTimeService to reflect the fact that it is a low-level utility that only deals with "physical" processing-time trigger tasks. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e112a632 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e112a632 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e112a632 Branch: refs/heads/master Commit: e112a63208006b4e348d75f3df84d2fd4b091797 Parents: 71d2e3e Author: Aljoscha Krettek <[email protected]> Authored: Sun Sep 25 20:58:16 2016 +0200 Committer: Aljoscha Krettek <[email protected]> Committed: Fri Oct 21 19:03:04 2016 +0200 ---------------------------------------------------------------------- .../hdfstests/ContinuousFileMonitoringTest.java | 4 +- .../connectors/fs/bucketing/BucketingSink.java | 6 +- .../kafka/internals/AbstractFetcher.java | 8 +- .../AbstractFetcherTimestampsTest.java | 8 +- .../kafka/testutils/MockRuntimeContext.java | 8 +- .../source/ContinuousFileReaderOperator.java | 2 +- .../api/operators/AbstractStreamOperator.java | 17 +- .../streaming/api/operators/StreamSource.java | 2 +- .../api/operators/StreamSourceContexts.java | 14 +- .../api/operators/StreamingRuntimeContext.java | 6 +- .../operators/ExtractTimestampsOperator.java | 8 +- ...TimestampsAndPeriodicWatermarksOperator.java | 8 +- ...ractAlignedProcessingTimeWindowOperator.java | 6 +- .../windowing/EvictingWindowOperator.java | 2 +- .../operators/windowing/WindowOperator.java | 10 +- .../tasks/DefaultTimeServiceProvider.java | 262 ---------------- .../runtime/tasks/ProcessingTimeService.java | 83 +++++ .../streaming/runtime/tasks/StreamTask.java | 16 +- .../tasks/SystemProcessingTimeService.java | 262 ++++++++++++++++ .../tasks/TestProcessingTimeService.java | 172 ++++++++++ .../runtime/tasks/TestTimeServiceProvider.java | 172 ---------- .../runtime/tasks/TimeServiceProvider.java | 83 ----- .../operators/StreamSourceOperatorTest.java | 47 +-- .../runtime/operators/StreamTaskTimerTest.java | 6 +- .../TestProcessingTimeServiceTest.java | 113 +++++++ .../runtime/operators/TestTimeProviderTest.java | 113 ------- ...stampsAndPeriodicWatermarksOperatorTest.java | 13 +- ...AlignedProcessingTimeWindowOperatorTest.java | 49 ++- ...AlignedProcessingTimeWindowOperatorTest.java | 56 ++-- .../operators/windowing/NoOpTimerService.java | 4 +- .../operators/windowing/WindowOperatorTest.java | 14 +- .../tasks/DefaultTimeServiceProviderTest.java | 313 ------------------- .../runtime/tasks/StreamTaskTestHarness.java | 6 +- .../tasks/SystemProcessingTimeServiceTest.java | 313 +++++++++++++++++++ .../KeyedOneInputStreamOperatorTestHarness.java | 8 +- .../flink/streaming/util/MockContext.java | 23 -- .../util/OneInputStreamOperatorTestHarness.java | 43 ++- .../streaming/util/WindowingTestHarness.java | 6 +- .../runtime/StreamTaskTimerITCase.java | 10 +- 39 files changed, 1141 insertions(+), 1155 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java ---------------------------------------------------------------------- diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java index 36b5c5e..971d5f8 100644 --- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java +++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java @@ -34,7 +34,7 @@ import org.apache.flink.streaming.api.functions.source.FileProcessingMode; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider; +import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileUtil; @@ -127,7 +127,7 @@ public class ContinuousFileMonitoringTest { ContinuousFileReaderOperator<String, ?> reader = new ContinuousFileReaderOperator<>(format); reader.setOutputType(typeInfo, executionConfig); - final TestTimeServiceProvider timeServiceProvider = new TestTimeServiceProvider(); + final TestProcessingTimeService timeServiceProvider = new TestProcessingTimeService(); final OneInputStreamOperatorTestHarness<FileInputSplit, String> tester = new OneInputStreamOperatorTestHarness<>(reader, executionConfig, timeServiceProvider); tester.setTimeCharacteristic(TimeCharacteristic.IngestionTime); http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java index 5a5cade..6f8a739 100644 --- a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java +++ b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java @@ -32,7 +32,7 @@ import org.apache.flink.streaming.connectors.fs.SequenceFileWriter; import org.apache.flink.streaming.connectors.fs.StringWriter; import org.apache.flink.streaming.connectors.fs.Writer; import org.apache.flink.streaming.runtime.operators.Triggerable; -import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocatedFileStatus; @@ -285,7 +285,7 @@ public class BucketingSink<T> private transient Clock clock; - private transient TimeServiceProvider processingTimeService; + private transient ProcessingTimeService processingTimeService; /** * Creates a new {@code BucketingSink} that writes files to the given base directory. @@ -324,7 +324,7 @@ public class BucketingSink<T> refTruncate = reflectTruncate(fs); processingTimeService = - ((StreamingRuntimeContext) getRuntimeContext()).getTimeServiceProvider(); + ((StreamingRuntimeContext) getRuntimeContext()).getProcessingTimeService(); long currentProcessingTime = processingTimeService.getCurrentProcessingTime(); http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java index eb01b78..065b54f 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java @@ -26,7 +26,7 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceCont import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.operators.Triggerable; -import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; import org.apache.flink.util.SerializedValue; import java.io.IOException; @@ -118,7 +118,7 @@ public abstract class AbstractFetcher<T, KPH> { (KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[]) allPartitions; PeriodicWatermarkEmitter periodicEmitter = - new PeriodicWatermarkEmitter(parts, sourceContext, runtimeContext.getTimeServiceProvider(), runtimeContext.getExecutionConfig().getAutoWatermarkInterval()); + new PeriodicWatermarkEmitter(parts, sourceContext, runtimeContext.getProcessingTimeService(), runtimeContext.getExecutionConfig().getAutoWatermarkInterval()); periodicEmitter.start(); } } @@ -466,7 +466,7 @@ public abstract class AbstractFetcher<T, KPH> { private final SourceContext<?> emitter; - private final TimeServiceProvider timerService; + private final ProcessingTimeService timerService; private final long interval; @@ -477,7 +477,7 @@ public abstract class AbstractFetcher<T, KPH> { PeriodicWatermarkEmitter( KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[] allPartitions, SourceContext<?> emitter, - TimeServiceProvider timerService, + ProcessingTimeService timerService, long autoWatermarkInterval) { this.allPartitions = checkNotNull(allPartitions); http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java index 7db6ba4..0782cb9 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java @@ -25,10 +25,10 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceCont import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.connectors.kafka.testutils.MockRuntimeContext; -import org.apache.flink.streaming.runtime.operators.TestTimeProviderTest.ReferenceSettingExceptionHandler; +import org.apache.flink.streaming.runtime.operators.TestProcessingTimeServiceTest.ReferenceSettingExceptionHandler; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.runtime.tasks.DefaultTimeServiceProvider; -import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider; +import org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; import org.apache.flink.util.SerializedValue; import org.junit.Test; @@ -128,7 +128,7 @@ public class AbstractFetcherTimestampsTest { TestSourceContext<Long> sourceContext = new TestSourceContext<>(); final AtomicReference<Throwable> errorRef = new AtomicReference<>(); - final TimeServiceProvider timerService = new DefaultTimeServiceProvider( + final ProcessingTimeService timerService = new SystemProcessingTimeService( new ReferenceSettingExceptionHandler(errorRef), sourceContext.getCheckpointLock()); try { http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java index e1ec4cb..f16eacd 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java @@ -38,7 +38,7 @@ import org.apache.flink.runtime.operators.testutils.MockEnvironment; import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; -import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; import java.io.Serializable; import java.util.Collections; @@ -53,7 +53,7 @@ public class MockRuntimeContext extends StreamingRuntimeContext { private final ExecutionConfig execConfig; - private final TimeServiceProvider timeServiceProvider; + private final ProcessingTimeService timeServiceProvider; public MockRuntimeContext(int numberOfParallelSubtasks, int indexOfThisSubtask) { this(numberOfParallelSubtasks, indexOfThisSubtask, new ExecutionConfig()); @@ -70,7 +70,7 @@ public class MockRuntimeContext extends StreamingRuntimeContext { int numberOfParallelSubtasks, int indexOfThisSubtask, ExecutionConfig execConfig, - TimeServiceProvider timeServiceProvider) { + ProcessingTimeService timeServiceProvider) { super(new MockStreamOperator(), new MockEnvironment("no", 4 * MemoryManager.DEFAULT_PAGE_SIZE, null, 16), @@ -188,7 +188,7 @@ public class MockRuntimeContext extends StreamingRuntimeContext { } @Override - public TimeServiceProvider getTimeServiceProvider() { + public ProcessingTimeService getProcessingTimeService() { if (timeServiceProvider == null) { throw new UnsupportedOperationException(); } else { http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java index 769cb6f..be22677 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java @@ -107,7 +107,7 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A final TimeCharacteristic timeCharacteristic = getOperatorConfig().getTimeCharacteristic(); final long watermarkInterval = getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval(); this.readerContext = StreamSourceContexts.getSourceContext( - timeCharacteristic, getTimerService(), checkpointLock, output, watermarkInterval); + timeCharacteristic, getProcessingTimeService(), checkpointLock, output, watermarkInterval); // and initialize the split reading thread this.reader = new SplitReader<>(format, serializer, readerContext, checkpointLock, readerState); http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java index 9184e93..b789c95 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java @@ -51,11 +51,12 @@ import org.apache.flink.runtime.state.VoidNamespace; import org.apache.flink.runtime.state.VoidNamespaceSerializer; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; import org.apache.flink.streaming.runtime.tasks.StreamTask; -import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -118,7 +119,7 @@ public abstract class AbstractStreamOperator<OUT> /** Keyed state store view on the keyed backend */ private transient DefaultKeyedStateStore keyedStateStore; - + /** Operator state backend / store */ private transient OperatorStateBackend operatorStateBackend; @@ -246,7 +247,7 @@ public abstract class AbstractStreamOperator<OUT> keySerializer, container.getConfiguration().getNumberOfKeyGroups(getUserCodeClassloader()), subTaskKeyGroupRange); - + this.keyedStateStore = new DefaultKeyedStateStore(keyedStateBackend, getExecutionConfig()); } @@ -396,11 +397,11 @@ public abstract class AbstractStreamOperator<OUT> } /** - * Returns the {@link TimeServiceProvider} responsible for getting the current + * Returns the {@link ProcessingTimeService} responsible for getting the current * processing time and registering timers. */ - protected TimeServiceProvider getTimerService() { - return container.getTimerService(); + protected ProcessingTimeService getProcessingTimeService() { + return container.getProcessingTimeService(); } /** @@ -421,9 +422,9 @@ public abstract class AbstractStreamOperator<OUT> */ @SuppressWarnings("unchecked") protected <S extends State, N> S getPartitionedState( - N namespace, TypeSerializer<N> namespaceSerializer, + N namespace, TypeSerializer<N> namespaceSerializer, StateDescriptor<S, ?> stateDescriptor) throws Exception { - + if (keyedStateStore != null) { return keyedStateBackend.getPartitionedState(namespace, namespaceSerializer, stateDescriptor); } else { http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java index a07e6b7..5a16db0 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java @@ -69,7 +69,7 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>> final long watermarkInterval = getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval(); this.ctx = StreamSourceContexts.getSourceContext( - timeCharacteristic, getTimerService(), lockingObject, collector, watermarkInterval); + timeCharacteristic, getProcessingTimeService(), lockingObject, collector, watermarkInterval); try { userFunction.run(ctx); http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java index d0c4e15..01ae55c 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java @@ -22,7 +22,7 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.operators.Triggerable; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; import org.apache.flink.util.Preconditions; import java.util.concurrent.ScheduledFuture; @@ -42,7 +42,7 @@ public class StreamSourceContexts { * </ul> * */ public static <OUT> SourceFunction.SourceContext<OUT> getSourceContext( - TimeCharacteristic timeCharacteristic, TimeServiceProvider timeService, + TimeCharacteristic timeCharacteristic, ProcessingTimeService processingTimeService, Object checkpointLock, Output<StreamRecord<OUT>> output, long watermarkInterval) { final SourceFunction.SourceContext<OUT> ctx; @@ -51,7 +51,7 @@ public class StreamSourceContexts { ctx = new ManualWatermarkContext<>(checkpointLock, output); break; case IngestionTime: - ctx = new AutomaticWatermarkContext<>(timeService, checkpointLock, output, watermarkInterval); + ctx = new AutomaticWatermarkContext<>(processingTimeService, checkpointLock, output, watermarkInterval); break; case ProcessingTime: ctx = new NonTimestampContext<>(checkpointLock, output); @@ -111,7 +111,7 @@ public class StreamSourceContexts { */ private static class AutomaticWatermarkContext<T> implements SourceFunction.SourceContext<T> { - private final TimeServiceProvider timeService; + private final ProcessingTimeService timeService; private final Object lock; private final Output<StreamRecord<T>> output; private final StreamRecord<T> reuse; @@ -122,7 +122,7 @@ public class StreamSourceContexts { private volatile long nextWatermarkTime; private AutomaticWatermarkContext( - final TimeServiceProvider timeService, + final ProcessingTimeService timeService, final Object checkpointLock, final Output<StreamRecord<T>> output, final long watermarkInterval) { @@ -201,12 +201,12 @@ public class StreamSourceContexts { private class WatermarkEmittingTask implements Triggerable { - private final TimeServiceProvider timeService; + private final ProcessingTimeService timeService; private final Object lock; private final Output<StreamRecord<T>> output; private WatermarkEmittingTask( - TimeServiceProvider timeService, + ProcessingTimeService timeService, Object checkpointLock, Output<StreamRecord<T>> output) { this.timeService = timeService; http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java index cd0489f..fc9e39e 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java @@ -32,7 +32,7 @@ import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.graph.StreamConfig; -import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; import java.util.List; import java.util.Map; @@ -77,8 +77,8 @@ public class StreamingRuntimeContext extends AbstractRuntimeUDFContext { return taskEnvironment.getInputSplitProvider(); } - public TimeServiceProvider getTimeServiceProvider() { - return operator.getTimerService(); + public ProcessingTimeService getProcessingTimeService() { + return operator.getProcessingTimeService(); } // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java index c92ff34..0798ed4 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java @@ -54,8 +54,8 @@ public class ExtractTimestampsOperator<T> super.open(); watermarkInterval = getExecutionConfig().getAutoWatermarkInterval(); if (watermarkInterval > 0) { - long now = getTimerService().getCurrentProcessingTime(); - getTimerService().registerTimer(now + watermarkInterval, this); + long now = getProcessingTimeService().getCurrentProcessingTime(); + getProcessingTimeService().registerTimer(now + watermarkInterval, this); } currentWatermark = Long.MIN_VALUE; } @@ -81,8 +81,8 @@ public class ExtractTimestampsOperator<T> output.emitWatermark(new Watermark(currentWatermark)); } - long now = getTimerService().getCurrentProcessingTime(); - getTimerService().registerTimer(now + watermarkInterval, this); + long now = getProcessingTimeService().getCurrentProcessingTime(); + getProcessingTimeService().registerTimer(now + watermarkInterval, this); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperator.java index f791723..b1402ed 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperator.java @@ -54,8 +54,8 @@ public class TimestampsAndPeriodicWatermarksOperator<T> watermarkInterval = getExecutionConfig().getAutoWatermarkInterval(); if (watermarkInterval > 0) { - long now = getTimerService().getCurrentProcessingTime(); - getTimerService().registerTimer(now + watermarkInterval, this); + long now = getProcessingTimeService().getCurrentProcessingTime(); + getProcessingTimeService().registerTimer(now + watermarkInterval, this); } } @@ -77,8 +77,8 @@ public class TimestampsAndPeriodicWatermarksOperator<T> output.emitWatermark(newWatermark); } - long now = getTimerService().getCurrentProcessingTime(); - getTimerService().registerTimer(now + watermarkInterval, this); + long now = getProcessingTimeService().getCurrentProcessingTime(); + getProcessingTimeService().registerTimer(now + watermarkInterval, this); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java index b39b760..d331d4d 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java @@ -125,7 +125,7 @@ public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT, // decide when to first compute the window and when to slide it // the values should align with the start of time (that is, the UNIX epoch, not the big bang) - final long now = getTimerService().getCurrentProcessingTime(); + final long now = getProcessingTimeService().getCurrentProcessingTime(); nextEvaluationTime = now + windowSlide - (now % windowSlide); nextSlideTime = now + paneSize - (now % paneSize); @@ -166,7 +166,7 @@ public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT, } // make sure the first window happens - getTimerService().registerTimer(firstTriggerTime, this); + getProcessingTimeService().registerTimer(firstTriggerTime, this); } @Override @@ -230,7 +230,7 @@ public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT, } long nextTriggerTime = Math.min(nextEvaluationTime, nextSlideTime); - getTimerService().registerTimer(nextTriggerTime, this); + getProcessingTimeService().registerTimer(nextTriggerTime, this); } private void computeWindow(long timestamp) throws Exception { http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java index 6609e4d..141b5b8 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java @@ -307,7 +307,7 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window } if (timer != null) { - nextTimer = getTimerService().registerTimer(timer.timestamp, this); + nextTimer = getProcessingTimeService().registerTimer(timer.timestamp, this); } } http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java index 4d8f655..459c679 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java @@ -223,7 +223,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> windowAssignerContext = new WindowAssigner.WindowAssignerContext() { @Override public long getCurrentProcessingTime() { - return WindowOperator.this.getTimerService().getCurrentProcessingTime(); + return WindowOperator.this.getProcessingTimeService().getCurrentProcessingTime(); } }; @@ -233,7 +233,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> // re-register the restored timers (if any) if (processingTimeTimersQueue.size() > 0) { - nextTimer = getTimerService().registerTimer(processingTimeTimersQueue.peek().timestamp, this); + nextTimer = getProcessingTimeService().registerTimer(processingTimeTimersQueue.peek().timestamp, this); } } @@ -495,7 +495,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> } if (timer != null) { - nextTimer = getTimerService().registerTimer(timer.timestamp, this); + nextTimer = getProcessingTimeService().registerTimer(timer.timestamp, this); } } @@ -697,7 +697,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> @Override public long getCurrentProcessingTime() { - return WindowOperator.this.getTimerService().getCurrentProcessingTime(); + return WindowOperator.this.getProcessingTimeService().getCurrentProcessingTime(); } @Override @@ -717,7 +717,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> if (nextTimer != null) { nextTimer.cancel(false); } - nextTimer = getTimerService().registerTimer(time, WindowOperator.this); + nextTimer = getProcessingTimeService().registerTimer(time, WindowOperator.this); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java deleted file mode 100644 index d2c743f..0000000 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java +++ /dev/null @@ -1,262 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.runtime.tasks; - -import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.streaming.runtime.operators.Triggerable; - -import javax.annotation.Nonnull; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.CancellationException; -import java.util.concurrent.Delayed; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicInteger; - -import static org.apache.flink.util.Preconditions.checkNotNull; - -/** - * A {@link TimeServiceProvider} which assigns as current processing time the result of calling - * {@link System#currentTimeMillis()} and registers timers using a {@link ScheduledThreadPoolExecutor}. - */ -public class DefaultTimeServiceProvider extends TimeServiceProvider { - - private static final int STATUS_ALIVE = 0; - private static final int STATUS_QUIESCED = 1; - private static final int STATUS_SHUTDOWN = 2; - - // ------------------------------------------------------------------------ - - /** The containing task that owns this time service provider. */ - private final AsyncExceptionHandler task; - - /** The lock that timers acquire upon triggering */ - private final Object checkpointLock; - - /** The executor service that schedules and calls the triggers of this task*/ - private final ScheduledThreadPoolExecutor timerService; - - private final AtomicInteger status; - - - public DefaultTimeServiceProvider(AsyncExceptionHandler failureHandler, Object checkpointLock) { - this(failureHandler, checkpointLock, null); - } - - public DefaultTimeServiceProvider( - AsyncExceptionHandler task, - Object checkpointLock, - ThreadFactory threadFactory) { - - this.task = checkNotNull(task); - this.checkpointLock = checkNotNull(checkpointLock); - - this.status = new AtomicInteger(STATUS_ALIVE); - - if (threadFactory == null) { - this.timerService = new ScheduledThreadPoolExecutor(1); - } else { - this.timerService = new ScheduledThreadPoolExecutor(1, threadFactory); - } - - // tasks should be removed if the future is canceled - this.timerService.setRemoveOnCancelPolicy(true); - - // make sure shutdown removes all pending tasks - this.timerService.setContinueExistingPeriodicTasksAfterShutdownPolicy(false); - this.timerService.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); - } - - @Override - public long getCurrentProcessingTime() { - return System.currentTimeMillis(); - } - - @Override - public ScheduledFuture<?> registerTimer(long timestamp, Triggerable target) { - long delay = Math.max(timestamp - getCurrentProcessingTime(), 0); - - // we directly try to register the timer and only react to the status on exception - // that way we save unnecessary volatile accesses for each timer - try { - return timerService.schedule( - new TriggerTask(task, checkpointLock, target, timestamp), delay, TimeUnit.MILLISECONDS); - } - catch (RejectedExecutionException e) { - final int status = this.status.get(); - if (status == STATUS_QUIESCED) { - return new NeverCompleteFuture(delay); - } - else if (status == STATUS_SHUTDOWN) { - throw new IllegalStateException("Timer service is shut down"); - } - else { - // something else happened, so propagate the exception - throw e; - } - } - } - - @Override - public boolean isTerminated() { - return status.get() == STATUS_SHUTDOWN; - } - - @Override - public void quiesceAndAwaitPending() throws InterruptedException { - if (status.compareAndSet(STATUS_ALIVE, STATUS_QUIESCED)) { - timerService.shutdown(); - - // await forever (almost) - timerService.awaitTermination(365, TimeUnit.DAYS); - } - } - - @Override - public void shutdownService() { - if (status.compareAndSet(STATUS_ALIVE, STATUS_SHUTDOWN) || - status.compareAndSet(STATUS_QUIESCED, STATUS_SHUTDOWN)) - { - timerService.shutdownNow(); - } - } - - // safety net to destroy the thread pool - @Override - protected void finalize() throws Throwable { - super.finalize(); - timerService.shutdownNow(); - } - - @VisibleForTesting - int getNumTasksScheduled() { - BlockingQueue<?> queue = timerService.getQueue(); - if (queue == null) { - return 0; - } else { - return queue.size(); - } - } - - // ------------------------------------------------------------------------ - - /** - * Internal task that is invoked by the timer service and triggers the target. - */ - private static final class TriggerTask implements Runnable { - - private final Object lock; - private final Triggerable target; - private final long timestamp; - private final AsyncExceptionHandler exceptionHandler; - - TriggerTask(AsyncExceptionHandler exceptionHandler, final Object lock, Triggerable target, long timestamp) { - this.exceptionHandler = exceptionHandler; - this.lock = lock; - this.target = target; - this.timestamp = timestamp; - } - - @Override - public void run() { - synchronized (lock) { - try { - target.trigger(timestamp); - } catch (Throwable t) { - TimerException asyncException = new TimerException(t); - exceptionHandler.handleAsyncException("Caught exception while processing timer.", asyncException); - } - } - } - } - - // ------------------------------------------------------------------------ - - private static final class NeverCompleteFuture implements ScheduledFuture<Object> { - - private final Object lock = new Object(); - - private final long delayMillis; - - private volatile boolean canceled; - - - private NeverCompleteFuture(long delayMillis) { - this.delayMillis = delayMillis; - } - - @Override - public long getDelay(@Nonnull TimeUnit unit) { - return unit.convert(delayMillis, TimeUnit.MILLISECONDS); - } - - @Override - public int compareTo(@Nonnull Delayed o) { - long otherMillis = o.getDelay(TimeUnit.MILLISECONDS); - return Long.compare(this.delayMillis, otherMillis); - } - - @Override - public boolean cancel(boolean mayInterruptIfRunning) { - synchronized (lock) { - canceled = true; - lock.notifyAll(); - } - return true; - } - - @Override - public boolean isCancelled() { - return canceled; - } - - @Override - public boolean isDone() { - return false; - } - - @Override - public Object get() throws InterruptedException { - synchronized (lock) { - while (!canceled) { - lock.wait(); - } - } - throw new CancellationException(); - } - - @Override - public Object get(long timeout, @Nonnull TimeUnit unit) throws InterruptedException, TimeoutException { - synchronized (lock) { - while (!canceled) { - unit.timedWait(lock, timeout); - } - - if (canceled) { - throw new CancellationException(); - } else { - throw new TimeoutException(); - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java new file mode 100644 index 0000000..15c3ebb --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.tasks; + +import org.apache.flink.streaming.runtime.operators.Triggerable; + +import java.util.concurrent.ScheduledFuture; + +/** + * Defines the current processing time and handles all related actions, + * such as register timers for tasks to be executed in the future. + * + * <p>The access to the time via {@link #getCurrentProcessingTime()} is always available, regardless of + * whether the timer service has been shut down. + * + * <p>The registration of timers follows a life cycle of three phases: + * <ol> + * <li>In the initial state, it accepts timer registrations and triggers when the time is reached.</li> + * <li>After calling {@link #quiesceAndAwaitPending()}, further calls to + * {@link #registerTimer(long, Triggerable)} will not register any further timers, and will + * return a "dummy" future as a result. This is used for clean shutdown, where currently firing + * timers are waited for and no future timers can be scheduled, without causing hard exceptions.</li> + * <li>After a call to {@link #shutdownService()}, all calls to {@link #registerTimer(long, Triggerable)} + * will result in a hard exception.</li> + * </ol> + */ +public abstract class ProcessingTimeService { + + /** + * Returns the current processing time. + */ + public abstract long getCurrentProcessingTime(); + + /** + * Registers a task to be executed when (processing) time is {@code timestamp}. + * + * @param timestamp Time when the task is to be executed (in processing time) + * @param target The task to be executed + * + * @return The future that represents the scheduled task. This always returns some future, + * even if the timer was shut down + */ + public abstract ScheduledFuture<?> registerTimer(long timestamp, Triggerable target); + + /** + * Returns <tt>true</tt> if the service has been shut down, <tt>false</tt> otherwise. + */ + public abstract boolean isTerminated(); + + /** + * This method puts the service into a state where it does not register new timers, but + * returns for each call to {@link #registerTimer(long, Triggerable)} only a "mock" future. + * Furthermore, the method clears all not yet started timers, and awaits the completion + * of currently executing timers. + * + * <p>This method can be used to cleanly shut down the timer service. The using components + * will not notice that the service is shut down (as for example via exceptions when registering + * a new timer), but the service will simply not fire any timer any more. + */ + public abstract void quiesceAndAwaitPending() throws InterruptedException; + + /** + * Shuts down and clean up the timer service provider hard and immediately. This does not wait + * for any timer to complete. Any further call to {@link #registerTimer(long, Triggerable)} + * will result in a hard exception. + */ + public abstract void shutdownService(); +} http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index 77efc7b..905782b 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -146,11 +146,11 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> private AbstractKeyedStateBackend<?> keyedStateBackend; /** - * The internal {@link TimeServiceProvider} used to define the current + * The internal {@link ProcessingTimeService} used to define the current * processing time (default = {@code System.currentTimeMillis()}) and * register timers for tasks to be executed in the future. */ - private TimeServiceProvider timerService; + private ProcessingTimeService timerService; /** The map of user-defined accumulators of this task */ private Map<String, Accumulator<?, ?>> accumulatorMap; @@ -190,13 +190,13 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> // ------------------------------------------------------------------------ /** - * Allows the user to specify his own {@link TimeServiceProvider TimerServiceProvider}. - * By default a {@link DefaultTimeServiceProvider DefaultTimerService} is going to be provided. + * Allows the user to specify his own {@link ProcessingTimeService TimerServiceProvider}. + * By default a {@link SystemProcessingTimeService DefaultTimerService} is going to be provided. * Changing it can be useful for testing processing time functionality, such as * {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner WindowAssigners} * and {@link org.apache.flink.streaming.api.windowing.triggers.Trigger Triggers}. * */ - public void setTimeService(TimeServiceProvider timeProvider) { + public void setProcessingTimeService(ProcessingTimeService timeProvider) { if (timeProvider == null) { throw new RuntimeException("The timeProvider cannot be set to null."); } @@ -224,7 +224,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> ThreadFactory timerThreadFactory = new DispatcherThreadFactory(TRIGGER_THREAD_GROUP, "Time Trigger for " + getName()); - timerService = new DefaultTimeServiceProvider(this, getCheckpointLock(), timerThreadFactory); + timerService = new SystemProcessingTimeService(this, getCheckpointLock(), timerThreadFactory); } operatorChain = new OperatorChain<>(this, getEnvironment().getAccumulatorRegistry().getReadWriteReporter()); @@ -765,10 +765,10 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> } /** - * Returns the {@link TimeServiceProvider} responsible for telling the current + * Returns the {@link ProcessingTimeService} responsible for telling the current * processing time and registering timers. */ - public TimeServiceProvider getTimerService() { + public ProcessingTimeService getProcessingTimeService() { if (timerService == null) { throw new IllegalStateException("The timer service has not been initialized."); } http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java new file mode 100644 index 0000000..3fd4202 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java @@ -0,0 +1,262 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.tasks; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.streaming.runtime.operators.Triggerable; + +import javax.annotation.Nonnull; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CancellationException; +import java.util.concurrent.Delayed; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A {@link ProcessingTimeService} which assigns as current processing time the result of calling + * {@link System#currentTimeMillis()} and registers timers using a {@link ScheduledThreadPoolExecutor}. + */ +public class SystemProcessingTimeService extends ProcessingTimeService { + + private static final int STATUS_ALIVE = 0; + private static final int STATUS_QUIESCED = 1; + private static final int STATUS_SHUTDOWN = 2; + + // ------------------------------------------------------------------------ + + /** The containing task that owns this time service provider. */ + private final AsyncExceptionHandler task; + + /** The lock that timers acquire upon triggering */ + private final Object checkpointLock; + + /** The executor service that schedules and calls the triggers of this task*/ + private final ScheduledThreadPoolExecutor timerService; + + private final AtomicInteger status; + + + public SystemProcessingTimeService(AsyncExceptionHandler failureHandler, Object checkpointLock) { + this(failureHandler, checkpointLock, null); + } + + public SystemProcessingTimeService( + AsyncExceptionHandler task, + Object checkpointLock, + ThreadFactory threadFactory) { + + this.task = checkNotNull(task); + this.checkpointLock = checkNotNull(checkpointLock); + + this.status = new AtomicInteger(STATUS_ALIVE); + + if (threadFactory == null) { + this.timerService = new ScheduledThreadPoolExecutor(1); + } else { + this.timerService = new ScheduledThreadPoolExecutor(1, threadFactory); + } + + // tasks should be removed if the future is canceled + this.timerService.setRemoveOnCancelPolicy(true); + + // make sure shutdown removes all pending tasks + this.timerService.setContinueExistingPeriodicTasksAfterShutdownPolicy(false); + this.timerService.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); + } + + @Override + public long getCurrentProcessingTime() { + return System.currentTimeMillis(); + } + + @Override + public ScheduledFuture<?> registerTimer(long timestamp, Triggerable target) { + long delay = Math.max(timestamp - getCurrentProcessingTime(), 0); + + // we directly try to register the timer and only react to the status on exception + // that way we save unnecessary volatile accesses for each timer + try { + return timerService.schedule( + new TriggerTask(task, checkpointLock, target, timestamp), delay, TimeUnit.MILLISECONDS); + } + catch (RejectedExecutionException e) { + final int status = this.status.get(); + if (status == STATUS_QUIESCED) { + return new NeverCompleteFuture(delay); + } + else if (status == STATUS_SHUTDOWN) { + throw new IllegalStateException("Timer service is shut down"); + } + else { + // something else happened, so propagate the exception + throw e; + } + } + } + + @Override + public boolean isTerminated() { + return status.get() == STATUS_SHUTDOWN; + } + + @Override + public void quiesceAndAwaitPending() throws InterruptedException { + if (status.compareAndSet(STATUS_ALIVE, STATUS_QUIESCED)) { + timerService.shutdown(); + + // await forever (almost) + timerService.awaitTermination(365, TimeUnit.DAYS); + } + } + + @Override + public void shutdownService() { + if (status.compareAndSet(STATUS_ALIVE, STATUS_SHUTDOWN) || + status.compareAndSet(STATUS_QUIESCED, STATUS_SHUTDOWN)) + { + timerService.shutdownNow(); + } + } + + // safety net to destroy the thread pool + @Override + protected void finalize() throws Throwable { + super.finalize(); + timerService.shutdownNow(); + } + + @VisibleForTesting + int getNumTasksScheduled() { + BlockingQueue<?> queue = timerService.getQueue(); + if (queue == null) { + return 0; + } else { + return queue.size(); + } + } + + // ------------------------------------------------------------------------ + + /** + * Internal task that is invoked by the timer service and triggers the target. + */ + private static final class TriggerTask implements Runnable { + + private final Object lock; + private final Triggerable target; + private final long timestamp; + private final AsyncExceptionHandler exceptionHandler; + + TriggerTask(AsyncExceptionHandler exceptionHandler, final Object lock, Triggerable target, long timestamp) { + this.exceptionHandler = exceptionHandler; + this.lock = lock; + this.target = target; + this.timestamp = timestamp; + } + + @Override + public void run() { + synchronized (lock) { + try { + target.trigger(timestamp); + } catch (Throwable t) { + TimerException asyncException = new TimerException(t); + exceptionHandler.handleAsyncException("Caught exception while processing timer.", asyncException); + } + } + } + } + + // ------------------------------------------------------------------------ + + private static final class NeverCompleteFuture implements ScheduledFuture<Object> { + + private final Object lock = new Object(); + + private final long delayMillis; + + private volatile boolean canceled; + + + private NeverCompleteFuture(long delayMillis) { + this.delayMillis = delayMillis; + } + + @Override + public long getDelay(@Nonnull TimeUnit unit) { + return unit.convert(delayMillis, TimeUnit.MILLISECONDS); + } + + @Override + public int compareTo(@Nonnull Delayed o) { + long otherMillis = o.getDelay(TimeUnit.MILLISECONDS); + return Long.compare(this.delayMillis, otherMillis); + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + synchronized (lock) { + canceled = true; + lock.notifyAll(); + } + return true; + } + + @Override + public boolean isCancelled() { + return canceled; + } + + @Override + public boolean isDone() { + return false; + } + + @Override + public Object get() throws InterruptedException { + synchronized (lock) { + while (!canceled) { + lock.wait(); + } + } + throw new CancellationException(); + } + + @Override + public Object get(long timeout, @Nonnull TimeUnit unit) throws InterruptedException, TimeoutException { + synchronized (lock) { + while (!canceled) { + unit.timedWait(lock, timeout); + } + + if (canceled) { + throw new CancellationException(); + } else { + throw new TimeoutException(); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java new file mode 100644 index 0000000..d2bf133 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.tasks; + +import org.apache.flink.streaming.runtime.operators.Triggerable; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.Delayed; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * This is a {@link ProcessingTimeService} used <b>strictly for testing</b> the + * processing time functionality. + * */ +public class TestProcessingTimeService extends ProcessingTimeService { + + private volatile long currentTime = 0; + + private volatile boolean isTerminated; + private volatile boolean isQuiesced; + + // sorts the timers by timestamp so that they are processed in the correct order. + private final Map<Long, List<Triggerable>> registeredTasks = new TreeMap<>(); + + + public void setCurrentTime(long timestamp) throws Exception { + this.currentTime = timestamp; + + if (!isQuiesced) { + // decide which timers to fire and put them in a list + // we do not fire them here to be able to accommodate timers + // that register other timers. + + Iterator<Map.Entry<Long, List<Triggerable>>> it = registeredTasks.entrySet().iterator(); + List<Map.Entry<Long, List<Triggerable>>> toRun = new ArrayList<>(); + while (it.hasNext()) { + Map.Entry<Long, List<Triggerable>> t = it.next(); + if (t.getKey() <= this.currentTime) { + toRun.add(t); + it.remove(); + } + } + + // now do the actual firing. + for (Map.Entry<Long, List<Triggerable>> tasks: toRun) { + long now = tasks.getKey(); + for (Triggerable task: tasks.getValue()) { + task.trigger(now); + } + } + } + } + + @Override + public long getCurrentProcessingTime() { + return currentTime; + } + + @Override + public ScheduledFuture<?> registerTimer(long timestamp, Triggerable target) { + if (isTerminated) { + throw new IllegalStateException("terminated"); + } + if (isQuiesced) { + return new DummyFuture(); + } + + if (timestamp <= currentTime) { + try { + target.trigger(timestamp); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + List<Triggerable> tasks = registeredTasks.get(timestamp); + if (tasks == null) { + tasks = new ArrayList<>(); + registeredTasks.put(timestamp, tasks); + } + tasks.add(target); + + return new DummyFuture(); + } + + @Override + public boolean isTerminated() { + return isTerminated; + } + + @Override + public void quiesceAndAwaitPending() { + if (!isTerminated) { + isQuiesced = true; + registeredTasks.clear(); + } + } + + @Override + public void shutdownService() { + this.isTerminated = true; + } + + public int getNumRegisteredTimers() { + int count = 0; + for (List<Triggerable> tasks: registeredTasks.values()) { + count += tasks.size(); + } + return count; + } + + // ------------------------------------------------------------------------ + + private static class DummyFuture implements ScheduledFuture<Object> { + + @Override + public long getDelay(TimeUnit unit) { + throw new UnsupportedOperationException(); + } + + @Override + public int compareTo(Delayed o) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + return true; + } + + @Override + public boolean isCancelled() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isDone() { + throw new UnsupportedOperationException(); + } + + @Override + public Object get() throws InterruptedException, ExecutionException { + throw new UnsupportedOperationException(); + } + + @Override + public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + throw new UnsupportedOperationException(); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestTimeServiceProvider.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestTimeServiceProvider.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestTimeServiceProvider.java deleted file mode 100644 index 9eb6cd1..0000000 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestTimeServiceProvider.java +++ /dev/null @@ -1,172 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.runtime.tasks; - -import org.apache.flink.streaming.runtime.operators.Triggerable; - -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.TreeMap; -import java.util.concurrent.Delayed; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -/** - * This is a {@link TimeServiceProvider} used <b>strictly for testing</b> the - * processing time functionality. - * */ -public class TestTimeServiceProvider extends TimeServiceProvider { - - private volatile long currentTime = 0; - - private volatile boolean isTerminated; - private volatile boolean isQuiesced; - - // sorts the timers by timestamp so that they are processed in the correct order. - private final Map<Long, List<Triggerable>> registeredTasks = new TreeMap<>(); - - - public void setCurrentTime(long timestamp) throws Exception { - this.currentTime = timestamp; - - if (!isQuiesced) { - // decide which timers to fire and put them in a list - // we do not fire them here to be able to accommodate timers - // that register other timers. - - Iterator<Map.Entry<Long, List<Triggerable>>> it = registeredTasks.entrySet().iterator(); - List<Map.Entry<Long, List<Triggerable>>> toRun = new ArrayList<>(); - while (it.hasNext()) { - Map.Entry<Long, List<Triggerable>> t = it.next(); - if (t.getKey() <= this.currentTime) { - toRun.add(t); - it.remove(); - } - } - - // now do the actual firing. - for (Map.Entry<Long, List<Triggerable>> tasks: toRun) { - long now = tasks.getKey(); - for (Triggerable task: tasks.getValue()) { - task.trigger(now); - } - } - } - } - - @Override - public long getCurrentProcessingTime() { - return currentTime; - } - - @Override - public ScheduledFuture<?> registerTimer(long timestamp, Triggerable target) { - if (isTerminated) { - throw new IllegalStateException("terminated"); - } - if (isQuiesced) { - return new DummyFuture(); - } - - if (timestamp <= currentTime) { - try { - target.trigger(timestamp); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - List<Triggerable> tasks = registeredTasks.get(timestamp); - if (tasks == null) { - tasks = new ArrayList<>(); - registeredTasks.put(timestamp, tasks); - } - tasks.add(target); - - return new DummyFuture(); - } - - @Override - public boolean isTerminated() { - return isTerminated; - } - - @Override - public void quiesceAndAwaitPending() { - if (!isTerminated) { - isQuiesced = true; - registeredTasks.clear(); - } - } - - @Override - public void shutdownService() { - this.isTerminated = true; - } - - public int getNumRegisteredTimers() { - int count = 0; - for (List<Triggerable> tasks: registeredTasks.values()) { - count += tasks.size(); - } - return count; - } - - // ------------------------------------------------------------------------ - - private static class DummyFuture implements ScheduledFuture<Object> { - - @Override - public long getDelay(TimeUnit unit) { - throw new UnsupportedOperationException(); - } - - @Override - public int compareTo(Delayed o) { - throw new UnsupportedOperationException(); - } - - @Override - public boolean cancel(boolean mayInterruptIfRunning) { - return true; - } - - @Override - public boolean isCancelled() { - throw new UnsupportedOperationException(); - } - - @Override - public boolean isDone() { - throw new UnsupportedOperationException(); - } - - @Override - public Object get() throws InterruptedException, ExecutionException { - throw new UnsupportedOperationException(); - } - - @Override - public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { - throw new UnsupportedOperationException(); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TimeServiceProvider.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TimeServiceProvider.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TimeServiceProvider.java deleted file mode 100644 index afa6f35..0000000 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TimeServiceProvider.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.runtime.tasks; - -import org.apache.flink.streaming.runtime.operators.Triggerable; - -import java.util.concurrent.ScheduledFuture; - -/** - * Defines the current processing time and handles all related actions, - * such as register timers for tasks to be executed in the future. - * - * <p>The access to the time via {@link #getCurrentProcessingTime()} is always available, regardless of - * whether the timer service has been shut down. - * - * <p>The registration of timers follows a life cycle of three phases: - * <ol> - * <li>In the initial state, it accepts timer registrations and triggers when the time is reached.</li> - * <li>After calling {@link #quiesceAndAwaitPending()}, further calls to - * {@link #registerTimer(long, Triggerable)} will not register any further timers, and will - * return a "dummy" future as a result. This is used for clean shutdown, where currently firing - * timers are waited for and no future timers can be scheduled, without causing hard exceptions.</li> - * <li>After a call to {@link #shutdownService()}, all calls to {@link #registerTimer(long, Triggerable)} - * will result in a hard exception.</li> - * </ol> - */ -public abstract class TimeServiceProvider { - - /** - * Returns the current processing time. - */ - public abstract long getCurrentProcessingTime(); - - /** - * Registers a task to be executed when (processing) time is {@code timestamp}. - * - * @param timestamp Time when the task is to be executed (in processing time) - * @param target The task to be executed - * - * @return The future that represents the scheduled task. This always returns some future, - * even if the timer was shut down - */ - public abstract ScheduledFuture<?> registerTimer(long timestamp, Triggerable target); - - /** - * Returns <tt>true</tt> if the service has been shut down, <tt>false</tt> otherwise. - */ - public abstract boolean isTerminated(); - - /** - * This method puts the service into a state where it does not register new timers, but - * returns for each call to {@link #registerTimer(long, Triggerable)} only a "mock" future. - * Furthermore, the method clears all not yet started timers, and awaits the completion - * of currently executing timers. - * - * <p>This method can be used to cleanly shut down the timer service. The using components - * will not notice that the service is shut down (as for example via exceptions when registering - * a new timer), but the service will simply not fire any timer any more. - */ - public abstract void quiesceAndAwaitPending() throws InterruptedException; - - /** - * Shuts down and clean up the timer service provider hard and immediately. This does not wait - * for any timer to complete. Any further call to {@link #registerTimer(long, Triggerable)} - * will result in a hard exception. - */ - public abstract void shutdownService(); -} http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java index 42087b4..f87b5ef 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java @@ -38,8 +38,8 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamElement; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.StreamTask; -import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider; -import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider; +import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; import org.junit.Assert; import org.junit.Test; import org.mockito.invocation.InvocationOnMock; @@ -67,7 +67,7 @@ public class StreamSourceOperatorTest { final List<StreamElement> output = new ArrayList<>(); - setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 0, null); + setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 0); operator.run(new Object(), new CollectorOutput<String>(output)); assertEquals(1, output.size()); @@ -84,7 +84,7 @@ public class StreamSourceOperatorTest { new StreamSource<>(new InfiniteSource<String>()); - setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 0, null); + setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 0); operator.cancel(); // run and exit @@ -104,7 +104,7 @@ public class StreamSourceOperatorTest { new StreamSource<>(new InfiniteSource<String>()); - setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 0, null); + setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 0); // trigger an async cancel in a bit new Thread("canceler") { @@ -137,7 +137,7 @@ public class StreamSourceOperatorTest { new StoppableStreamSource<>(new InfiniteSource<String>()); - setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 0, null); + setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 0); operator.stop(); // run and stop @@ -156,7 +156,7 @@ public class StreamSourceOperatorTest { new StoppableStreamSource<>(new InfiniteSource<String>()); - setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 0, null); + setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 0); // trigger an async cancel in a bit new Thread("canceler") { @@ -189,7 +189,7 @@ public class StreamSourceOperatorTest { new StoppableStreamSource<>(new InfiniteSource<String>()); // emit latency marks every 10 milliseconds. - setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 10, null); + setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 10); // trigger an async cancel in a bit new Thread("canceler") { @@ -225,15 +225,15 @@ public class StreamSourceOperatorTest { new StoppableStreamSource<>(new InfiniteSource<String>()); long watermarkInterval = 10; - TestTimeServiceProvider timeProvider = new TestTimeServiceProvider(); - timeProvider.setCurrentTime(0); + TestProcessingTimeService processingTimeService = new TestProcessingTimeService(); + processingTimeService.setCurrentTime(0); - setupSourceOperator(operator, TimeCharacteristic.IngestionTime, watermarkInterval, 0, timeProvider); + setupSourceOperator(operator, TimeCharacteristic.IngestionTime, watermarkInterval, 0, processingTimeService); final List<StreamElement> output = new ArrayList<>(); StreamSourceContexts.getSourceContext(TimeCharacteristic.IngestionTime, - operator.getContainingTask().getTimerService(), + operator.getContainingTask().getProcessingTimeService(), operator.getContainingTask().getCheckpointLock(), new CollectorOutput<String>(output), operator.getExecutionConfig().getAutoWatermarkInterval()); @@ -243,7 +243,7 @@ public class StreamSourceOperatorTest { // going to be aligned with the watermark interval. for (long i = 1; i < 100; i += watermarkInterval) { - timeProvider.setCurrentTime(i); + processingTimeService.setCurrentTime(i); } assertTrue(output.size() == 9); @@ -257,13 +257,21 @@ public class StreamSourceOperatorTest { } // ------------------------------------------------------------------------ - + + @SuppressWarnings("unchecked") + private static <T> void setupSourceOperator(StreamSource<T, ?> operator, + TimeCharacteristic timeChar, + long watermarkInterval, + long latencyMarkInterval) { + setupSourceOperator(operator, timeChar, watermarkInterval, latencyMarkInterval, new TestProcessingTimeService()); + } + @SuppressWarnings("unchecked") private static <T> void setupSourceOperator(StreamSource<T, ?> operator, TimeCharacteristic timeChar, long watermarkInterval, long latencyMarkInterval, - final TimeServiceProvider timeProvider) { + final ProcessingTimeService timeProvider) { ExecutionConfig executionConfig = new ExecutionConfig(); executionConfig.setAutoWatermarkInterval(watermarkInterval); @@ -284,12 +292,15 @@ public class StreamSourceOperatorTest { when(mockTask.getExecutionConfig()).thenReturn(executionConfig); when(mockTask.getAccumulatorMap()).thenReturn(Collections.<String, Accumulator<?, ?>>emptyMap()); - doAnswer(new Answer<TimeServiceProvider>() { + doAnswer(new Answer<ProcessingTimeService>() { @Override - public TimeServiceProvider answer(InvocationOnMock invocation) throws Throwable { + public ProcessingTimeService answer(InvocationOnMock invocation) throws Throwable { + if (timeProvider == null) { + throw new RuntimeException("The time provider is null."); + } return timeProvider; } - }).when(mockTask).getTimerService(); + }).when(mockTask).getProcessingTimeService(); operator.setup(mockTask, cfg, (Output<StreamRecord<T>>) mock(Output.class)); } http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java index 98058e8..fb1fab5 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java @@ -26,7 +26,7 @@ import org.apache.flink.streaming.api.operators.StreamMap; import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask; import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness; import org.apache.flink.streaming.runtime.tasks.StreamTask; -import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; import org.junit.Test; import org.junit.runner.RunWith; @@ -64,7 +64,7 @@ public class StreamTaskTimerTest { testHarness.waitForTaskRunning(); // first one spawns thread - mapTask.getTimerService().registerTimer(System.currentTimeMillis(), new Triggerable() { + mapTask.getProcessingTimeService().registerTimer(System.currentTimeMillis(), new Triggerable() { @Override public void trigger(long timestamp) { } @@ -106,7 +106,7 @@ public class StreamTaskTimerTest { final long t3 = System.currentTimeMillis() + 100; final long t4 = System.currentTimeMillis() + 200; - TimeServiceProvider timeService = mapTask.getTimerService(); + ProcessingTimeService timeService = mapTask.getProcessingTimeService(); timeService.registerTimer(t1, new ValidatingTriggerable(errorRef, t1, 0)); timeService.registerTimer(t2, new ValidatingTriggerable(errorRef, t2, 1)); timeService.registerTimer(t3, new ValidatingTriggerable(errorRef, t3, 2)); http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java new file mode 100644 index 0000000..9c2cee3 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.operators; + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.operators.StreamMap; +import org.apache.flink.streaming.runtime.tasks.AsyncExceptionHandler; +import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask; +import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness; + +import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; +import org.junit.Test; +import org.junit.runner.RunWith; + +import org.powermock.core.classloader.annotations.PowerMockIgnore; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.Assert.assertEquals; + +@RunWith(PowerMockRunner.class) +@PrepareForTest({ResultPartitionWriter.class}) +@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"}) +public class TestProcessingTimeServiceTest { + + @Test + public void testCustomTimeServiceProvider() throws Throwable { + TestProcessingTimeService tp = new TestProcessingTimeService(); + + final OneInputStreamTask<String, String> mapTask = new OneInputStreamTask<>(); + mapTask.setProcessingTimeService(tp); + + final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>( + mapTask, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO); + + StreamConfig streamConfig = testHarness.getStreamConfig(); + + StreamMap<String, String> mapOperator = new StreamMap<>(new StreamTaskTimerTest.DummyMapFunction<String>()); + streamConfig.setStreamOperator(mapOperator); + + testHarness.invoke(); + + assertEquals(testHarness.getProcessingTimeService().getCurrentProcessingTime(), 0); + + tp.setCurrentTime(11); + assertEquals(testHarness.getProcessingTimeService().getCurrentProcessingTime(), 11); + + tp.setCurrentTime(15); + tp.setCurrentTime(16); + assertEquals(testHarness.getProcessingTimeService().getCurrentProcessingTime(), 16); + + // register 2 tasks + mapTask.getProcessingTimeService().registerTimer(30, new Triggerable() { + @Override + public void trigger(long timestamp) { + + } + }); + + mapTask.getProcessingTimeService().registerTimer(40, new Triggerable() { + @Override + public void trigger(long timestamp) { + + } + }); + + assertEquals(2, tp.getNumRegisteredTimers()); + + tp.setCurrentTime(35); + assertEquals(1, tp.getNumRegisteredTimers()); + + tp.setCurrentTime(40); + assertEquals(0, tp.getNumRegisteredTimers()); + + tp.shutdownService(); + } + + // ------------------------------------------------------------------------ + + public static class ReferenceSettingExceptionHandler implements AsyncExceptionHandler { + + private final AtomicReference<Throwable> errorReference; + + public ReferenceSettingExceptionHandler(AtomicReference<Throwable> errorReference) { + this.errorReference = errorReference; + } + + @Override + public void handleAsyncException(String message, Throwable exception) { + errorReference.compareAndSet(null, exception); + } + } +}
