[FLINK-4877] Rename Triggerable to ProcessingTimeCallback

This more accurately describes what the interface is for.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/94a3f251 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/94a3f251 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/94a3f251 Branch: refs/heads/master Commit: 94a3f251cd3eed54c7d8220db119eecbfb11c3b9 Parents: 81b19e5 Author: Aljoscha Krettek <[email protected]> Authored: Tue Oct 18 11:08:58 2016 +0200 Committer: Aljoscha Krettek <[email protected]> Committed: Fri Oct 21 19:03:05 2016 +0200 ---------------------------------------------------------------------- .../connectors/fs/bucketing/BucketingSink.java | 4 +- .../kafka/internals/AbstractFetcher.java | 4 +- .../api/operators/HeapInternalTimerService.java | 4 +- .../api/operators/StreamSourceContexts.java | 4 +- .../operators/ExtractTimestampsOperator.java | 3 +- ...TimestampsAndPeriodicWatermarksOperator.java | 3 +- .../runtime/operators/Triggerable.java | 40 -------------------- ...ractAlignedProcessingTimeWindowOperator.java | 4 +- .../runtime/tasks/ProcessingTimeCallback.java | 40 ++++++++++++++++++++ .../runtime/tasks/ProcessingTimeService.java | 12 +++--- .../tasks/SystemProcessingTimeService.java | 7 ++-- .../tasks/TestProcessingTimeService.java | 16 ++++---- .../runtime/operators/StreamTaskTimerTest.java | 19 +++++----- .../TestProcessingTimeServiceTest.java | 5 ++- .../tasks/SystemProcessingTimeServiceTest.java | 19 +++++----- .../runtime/StreamTaskTimerITCase.java | 6 +-- 16 files changed, 94 insertions(+), 96 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/94a3f251/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java ---------------------------------------------------------------------- diff --git 
a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java index 6f8a739..66e704c 100644 --- a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java +++ b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java @@ -31,7 +31,7 @@ import org.apache.flink.streaming.connectors.fs.Clock; import org.apache.flink.streaming.connectors.fs.SequenceFileWriter; import org.apache.flink.streaming.connectors.fs.StringWriter; import org.apache.flink.streaming.connectors.fs.Writer; -import org.apache.flink.streaming.runtime.operators.Triggerable; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; @@ -138,7 +138,7 @@ import java.util.Iterator; */ public class BucketingSink<T> extends RichSinkFunction<T> - implements InputTypeConfigurable, Checkpointed<BucketingSink.State<T>>, CheckpointListener, Triggerable { + implements InputTypeConfigurable, Checkpointed<BucketingSink.State<T>>, CheckpointListener, ProcessingTimeCallback { private static final long serialVersionUID = 1L; private static Logger LOG = LoggerFactory.getLogger(BucketingSink.class); http://git-wip-us.apache.org/repos/asf/flink/blob/94a3f251/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java ---------------------------------------------------------------------- diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java index 321991a..58bca52 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java @@ -24,7 +24,7 @@ import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext; import org.apache.flink.streaming.api.watermark.Watermark; -import org.apache.flink.streaming.runtime.operators.Triggerable; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; import org.apache.flink.util.SerializedValue; @@ -461,7 +461,7 @@ public abstract class AbstractFetcher<T, KPH> { * The periodic watermark emitter. In its given interval, it checks all partitions for * the current event time watermark, and possibly emits the next watermark. 
*/ - private static class PeriodicWatermarkEmitter implements Triggerable { + private static class PeriodicWatermarkEmitter implements ProcessingTimeCallback { private final KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[] allPartitions; http://git-wip-us.apache.org/repos/asf/flink/blob/94a3f251/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java index c77b634..15258cf 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java @@ -20,7 +20,7 @@ package org.apache.flink.streaming.api.operators; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; -import org.apache.flink.streaming.runtime.operators.Triggerable; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; import org.apache.flink.util.InstantiationUtil; @@ -37,7 +37,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; /** * {@link InternalTimerService} that stores timers on the Java heap. 
*/ -public class HeapInternalTimerService<K, N> implements InternalTimerService<N>, Triggerable { +public class HeapInternalTimerService<K, N> implements InternalTimerService<N>, ProcessingTimeCallback { private final TypeSerializer<K> keySerializer; http://git-wip-us.apache.org/repos/asf/flink/blob/94a3f251/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java index 01ae55c..66d2ac2 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java @@ -20,7 +20,7 @@ package org.apache.flink.streaming.api.operators; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.watermark.Watermark; -import org.apache.flink.streaming.runtime.operators.Triggerable; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; import org.apache.flink.util.Preconditions; @@ -199,7 +199,7 @@ public class StreamSourceContexts { } } - private class WatermarkEmittingTask implements Triggerable { + private class WatermarkEmittingTask implements ProcessingTimeCallback { private final ProcessingTimeService timeService; private final Object lock; http://git-wip-us.apache.org/repos/asf/flink/blob/94a3f251/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java 
---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java index 0798ed4..5f5028a 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java @@ -23,6 +23,7 @@ import org.apache.flink.streaming.api.operators.ChainingStrategy; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback; /** * A {@link org.apache.flink.streaming.api.operators.StreamOperator} for extracting timestamps @@ -36,7 +37,7 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; @Deprecated public class ExtractTimestampsOperator<T> extends AbstractUdfStreamOperator<T, TimestampExtractor<T>> - implements OneInputStreamOperator<T, T>, Triggerable { + implements OneInputStreamOperator<T, T>, ProcessingTimeCallback { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/flink/blob/94a3f251/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperator.java index b1402ed..ba72659 100644 --- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperator.java @@ -23,6 +23,7 @@ import org.apache.flink.streaming.api.operators.ChainingStrategy; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback; /** * A stream operator that extracts timestamps from stream elements and @@ -32,7 +33,7 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; */ public class TimestampsAndPeriodicWatermarksOperator<T> extends AbstractUdfStreamOperator<T, AssignerWithPeriodicWatermarks<T>> - implements OneInputStreamOperator<T, T>, Triggerable { + implements OneInputStreamOperator<T, T>, ProcessingTimeCallback { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/flink/blob/94a3f251/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/Triggerable.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/Triggerable.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/Triggerable.java deleted file mode 100644 index 9ca3f33..0000000 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/Triggerable.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.runtime.operators; - -import org.apache.flink.annotation.Internal; - -/** - * This interface must be implemented by objects that are triggered by the timer service available - * to stream operators in {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment}. - */ -@Internal -public interface Triggerable { - - /** - * This method is invoked with the timestamp for which the trigger was scheduled. - * <p> - * If the triggering is delayed for whatever reason (trigger timer was blocked, JVM stalled due - * to a garbage collection), the timestamp supplied to this function will still be the original - * timestamp for which the trigger was scheduled. - * - * @param timestamp The timestamp for which the trigger event was scheduled. 
- */ - void trigger(long timestamp) throws Exception ; -} http://git-wip-us.apache.org/repos/asf/flink/blob/94a3f251/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java index 2a77c0a..80a317e 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java @@ -33,7 +33,7 @@ import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; import org.apache.flink.streaming.api.operators.TimestampedCollector; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; -import org.apache.flink.streaming.runtime.operators.Triggerable; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import static java.util.Objects.requireNonNull; @@ -41,7 +41,7 @@ import static java.util.Objects.requireNonNull; @Internal public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT, STATE, F extends Function> extends AbstractUdfStreamOperator<OUT, F> - implements OneInputStreamOperator<IN, OUT>, Triggerable { + implements OneInputStreamOperator<IN, OUT>, ProcessingTimeCallback { private static final long serialVersionUID = 3245500864882459867L; 
http://git-wip-us.apache.org/repos/asf/flink/blob/94a3f251/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeCallback.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeCallback.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeCallback.java new file mode 100644 index 0000000..aca1718 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeCallback.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.tasks; + +import org.apache.flink.annotation.Internal; + +/** + * Interface for processing-time callbacks that can be registered at a + * {@link ProcessingTimeService}. + */ +@Internal +public interface ProcessingTimeCallback { + + /** + * This method is invoked with the timestamp for which the trigger was scheduled. 
+ * <p> + * If the triggering is delayed for whatever reason (trigger timer was blocked, JVM stalled due + * to a garbage collection), the timestamp supplied to this function will still be the original + * timestamp for which the trigger was scheduled. + * + * @param timestamp The timestamp for which the trigger event was scheduled. + */ + void trigger(long timestamp) throws Exception ; +} http://git-wip-us.apache.org/repos/asf/flink/blob/94a3f251/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java index 15c3ebb..f64bead 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java @@ -17,8 +17,6 @@ package org.apache.flink.streaming.runtime.tasks; -import org.apache.flink.streaming.runtime.operators.Triggerable; - import java.util.concurrent.ScheduledFuture; /** @@ -32,10 +30,10 @@ import java.util.concurrent.ScheduledFuture; * <ol> * <li>In the initial state, it accepts timer registrations and triggers when the time is reached.</li> * <li>After calling {@link #quiesceAndAwaitPending()}, further calls to - * {@link #registerTimer(long, Triggerable)} will not register any further timers, and will + * {@link #registerTimer(long, ProcessingTimeCallback)} will not register any further timers, and will * return a "dummy" future as a result. 
This is used for clean shutdown, where currently firing * timers are waited for and no future timers can be scheduled, without causing hard exceptions.</li> - * <li>After a call to {@link #shutdownService()}, all calls to {@link #registerTimer(long, Triggerable)} + * <li>After a call to {@link #shutdownService()}, all calls to {@link #registerTimer(long, ProcessingTimeCallback)} * will result in a hard exception.</li> * </ol> */ @@ -55,7 +53,7 @@ public abstract class ProcessingTimeService { * @return The future that represents the scheduled task. This always returns some future, * even if the timer was shut down */ - public abstract ScheduledFuture<?> registerTimer(long timestamp, Triggerable target); + public abstract ScheduledFuture<?> registerTimer(long timestamp, ProcessingTimeCallback target); /** * Returns <tt>true</tt> if the service has been shut down, <tt>false</tt> otherwise. @@ -64,7 +62,7 @@ public abstract class ProcessingTimeService { /** * This method puts the service into a state where it does not register new timers, but - * returns for each call to {@link #registerTimer(long, Triggerable)} only a "mock" future. + * returns for each call to {@link #registerTimer(long, ProcessingTimeCallback)} only a "mock" future. * Furthermore, the method clears all not yet started timers, and awaits the completion * of currently executing timers. * @@ -76,7 +74,7 @@ public abstract class ProcessingTimeService { /** * Shuts down and clean up the timer service provider hard and immediately. This does not wait - * for any timer to complete. Any further call to {@link #registerTimer(long, Triggerable)} + * for any timer to complete. Any further call to {@link #registerTimer(long, ProcessingTimeCallback)} * will result in a hard exception. 
*/ public abstract void shutdownService(); http://git-wip-us.apache.org/repos/asf/flink/blob/94a3f251/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java index 3fd4202..b433f8d 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java @@ -18,7 +18,6 @@ package org.apache.flink.streaming.runtime.tasks; import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.streaming.runtime.operators.Triggerable; import javax.annotation.Nonnull; import java.util.concurrent.BlockingQueue; @@ -92,7 +91,7 @@ public class SystemProcessingTimeService extends ProcessingTimeService { } @Override - public ScheduledFuture<?> registerTimer(long timestamp, Triggerable target) { + public ScheduledFuture<?> registerTimer(long timestamp, ProcessingTimeCallback target) { long delay = Math.max(timestamp - getCurrentProcessingTime(), 0); // we directly try to register the timer and only react to the status on exception @@ -165,11 +164,11 @@ public class SystemProcessingTimeService extends ProcessingTimeService { private static final class TriggerTask implements Runnable { private final Object lock; - private final Triggerable target; + private final ProcessingTimeCallback target; private final long timestamp; private final AsyncExceptionHandler exceptionHandler; - TriggerTask(AsyncExceptionHandler exceptionHandler, final Object lock, Triggerable target, long timestamp) { + TriggerTask(AsyncExceptionHandler exceptionHandler, final Object lock, 
ProcessingTimeCallback target, long timestamp) { this.exceptionHandler = exceptionHandler; this.lock = lock; this.target = target; http://git-wip-us.apache.org/repos/asf/flink/blob/94a3f251/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java index d0a2ea9..3e6c273 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java @@ -17,8 +17,6 @@ package org.apache.flink.streaming.runtime.tasks; -import org.apache.flink.streaming.runtime.operators.Triggerable; - import java.util.ArrayList; import java.util.HashSet; import java.util.Iterator; @@ -69,7 +67,7 @@ public class TestProcessingTimeService extends ProcessingTimeService { for (Map.Entry<Long, List<ScheduledTimerFuture>> tasks: toRun) { long now = tasks.getKey(); for (ScheduledTimerFuture task: tasks.getValue()) { - task.getTriggerable().trigger(now); + task.getProcessingTimeCallback().trigger(now); } } } @@ -81,7 +79,7 @@ public class TestProcessingTimeService extends ProcessingTimeService { } @Override - public ScheduledFuture<?> registerTimer(long timestamp, Triggerable target) { + public ScheduledFuture<?> registerTimer(long timestamp, ProcessingTimeCallback target) { if (isTerminated) { throw new IllegalStateException("terminated"); } @@ -149,12 +147,12 @@ public class TestProcessingTimeService extends ProcessingTimeService { private class ScheduledTimerFuture implements ScheduledFuture<Object> { - private final Triggerable triggerable; + private final ProcessingTimeCallback 
processingTimeCallback; private final long timestamp; - public ScheduledTimerFuture(Triggerable triggerable, long timestamp) { - this.triggerable = triggerable; + public ScheduledTimerFuture(ProcessingTimeCallback processingTimeCallback, long timestamp) { + this.processingTimeCallback = processingTimeCallback; this.timestamp = timestamp; } @@ -197,8 +195,8 @@ public class TestProcessingTimeService extends ProcessingTimeService { throw new UnsupportedOperationException(); } - public Triggerable getTriggerable() { - return triggerable; + public ProcessingTimeCallback getProcessingTimeCallback() { + return processingTimeCallback; } public long getTimestamp() { http://git-wip-us.apache.org/repos/asf/flink/blob/94a3f251/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java index fb1fab5..87241dd 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java @@ -28,6 +28,7 @@ import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness; import org.apache.flink.streaming.runtime.tasks.StreamTask; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback; import org.junit.Test; import org.junit.runner.RunWith; @@ -64,7 +65,7 @@ public class StreamTaskTimerTest { testHarness.waitForTaskRunning(); // first one spawns thread - mapTask.getProcessingTimeService().registerTimer(System.currentTimeMillis(), new Triggerable() { + 
mapTask.getProcessingTimeService().registerTimer(System.currentTimeMillis(), new ProcessingTimeCallback() { @Override public void trigger(long timestamp) { } @@ -107,14 +108,14 @@ public class StreamTaskTimerTest { final long t4 = System.currentTimeMillis() + 200; ProcessingTimeService timeService = mapTask.getProcessingTimeService(); - timeService.registerTimer(t1, new ValidatingTriggerable(errorRef, t1, 0)); - timeService.registerTimer(t2, new ValidatingTriggerable(errorRef, t2, 1)); - timeService.registerTimer(t3, new ValidatingTriggerable(errorRef, t3, 2)); - timeService.registerTimer(t4, new ValidatingTriggerable(errorRef, t4, 3)); + timeService.registerTimer(t1, new ValidatingProcessingTimeCallback(errorRef, t1, 0)); + timeService.registerTimer(t2, new ValidatingProcessingTimeCallback(errorRef, t2, 1)); + timeService.registerTimer(t3, new ValidatingProcessingTimeCallback(errorRef, t3, 2)); + timeService.registerTimer(t4, new ValidatingProcessingTimeCallback(errorRef, t4, 3)); long deadline = System.currentTimeMillis() + 20000; while (errorRef.get() == null && - ValidatingTriggerable.numInSequence < 4 && + ValidatingProcessingTimeCallback.numInSequence < 4 && System.currentTimeMillis() < deadline) { Thread.sleep(100); @@ -126,7 +127,7 @@ public class StreamTaskTimerTest { fail(errorRef.get().getMessage()); } - assertEquals(4, ValidatingTriggerable.numInSequence); + assertEquals(4, ValidatingProcessingTimeCallback.numInSequence); testHarness.endInput(); testHarness.waitForTaskCompletion(); @@ -146,7 +147,7 @@ public class StreamTaskTimerTest { } } - private static class ValidatingTriggerable implements Triggerable { + private static class ValidatingProcessingTimeCallback implements ProcessingTimeCallback { static int numInSequence; @@ -155,7 +156,7 @@ public class StreamTaskTimerTest { private final long expectedTimestamp; private final int expectedInSequence; - private ValidatingTriggerable(AtomicReference<Throwable> errorRef, long expectedTimestamp, int 
expectedInSequence) { + private ValidatingProcessingTimeCallback(AtomicReference<Throwable> errorRef, long expectedTimestamp, int expectedInSequence) { this.errorRef = errorRef; this.expectedTimestamp = expectedTimestamp; this.expectedInSequence = expectedInSequence; http://git-wip-us.apache.org/repos/asf/flink/blob/94a3f251/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java index 9c2cee3..db56717 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java @@ -27,6 +27,7 @@ import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask; import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness; import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback; import org.junit.Test; import org.junit.runner.RunWith; @@ -70,14 +71,14 @@ public class TestProcessingTimeServiceTest { assertEquals(testHarness.getProcessingTimeService().getCurrentProcessingTime(), 16); // register 2 tasks - mapTask.getProcessingTimeService().registerTimer(30, new Triggerable() { + mapTask.getProcessingTimeService().registerTimer(30, new ProcessingTimeCallback() { @Override public void trigger(long timestamp) { } }); - mapTask.getProcessingTimeService().registerTimer(40, new Triggerable() { + mapTask.getProcessingTimeService().registerTimer(40, new ProcessingTimeCallback() { @Override public void trigger(long timestamp) { 
http://git-wip-us.apache.org/repos/asf/flink/blob/94a3f251/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java index e7944df..dc679ab 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java @@ -20,7 +20,6 @@ package org.apache.flink.streaming.runtime.tasks; import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.streaming.runtime.operators.TestProcessingTimeServiceTest.ReferenceSettingExceptionHandler; -import org.apache.flink.streaming.runtime.operators.Triggerable; import org.junit.Test; @@ -51,7 +50,7 @@ public class SystemProcessingTimeServiceTest { assertEquals(0, timer.getNumTasksScheduled()); // schedule something - ScheduledFuture<?> future = timer.registerTimer(System.currentTimeMillis(), new Triggerable() { + ScheduledFuture<?> future = timer.registerTimer(System.currentTimeMillis(), new ProcessingTimeCallback() { @Override public void trigger(long timestamp) { assertTrue(Thread.holdsLock(lock)); @@ -87,7 +86,7 @@ public class SystemProcessingTimeServiceTest { final OneShotLatch latch = new OneShotLatch(); // the task should trigger immediately and should block until terminated with interruption - timer.registerTimer(System.currentTimeMillis(), new Triggerable() { + timer.registerTimer(System.currentTimeMillis(), new ProcessingTimeCallback() { @Override public void trigger(long timestamp) throws Exception { latch.trigger(); @@ -105,7 +104,7 @@ public class 
SystemProcessingTimeServiceTest { } try { - timer.registerTimer(System.currentTimeMillis() + 1000, new Triggerable() { + timer.registerTimer(System.currentTimeMillis() + 1000, new ProcessingTimeCallback() { @Override public void trigger(long timestamp) {} }); @@ -141,7 +140,7 @@ public class SystemProcessingTimeServiceTest { final ReentrantLock scopeLock = new ReentrantLock(); - timer.registerTimer(System.currentTimeMillis() + 20, new Triggerable() { + timer.registerTimer(System.currentTimeMillis() + 20, new ProcessingTimeCallback() { @Override public void trigger(long timestamp) throws Exception { scopeLock.lock(); @@ -163,7 +162,7 @@ public class SystemProcessingTimeServiceTest { assertTrue(scopeLock.tryLock()); // should be able to schedule more tasks (that never get executed) - ScheduledFuture<?> future = timer.registerTimer(System.currentTimeMillis() - 5, new Triggerable() { + ScheduledFuture<?> future = timer.registerTimer(System.currentTimeMillis() - 5, new ProcessingTimeCallback() { @Override public void trigger(long timestamp) throws Exception { throw new Exception("test"); @@ -198,7 +197,7 @@ public class SystemProcessingTimeServiceTest { assertEquals(0, timer.getNumTasksScheduled()); // schedule something - ScheduledFuture<?> future = timer.registerTimer(System.currentTimeMillis() + 100000000, new Triggerable() { + ScheduledFuture<?> future = timer.registerTimer(System.currentTimeMillis() + 100000000, new ProcessingTimeCallback() { @Override public void trigger(long timestamp) {} }); @@ -233,7 +232,7 @@ public class SystemProcessingTimeServiceTest { } }, lock); - timeServiceProvider.registerTimer(System.currentTimeMillis(), new Triggerable() { + timeServiceProvider.registerTimer(System.currentTimeMillis(), new ProcessingTimeCallback() { @Override public void trigger(long timestamp) throws Exception { throw new Exception("Exception in Timer"); @@ -257,7 +256,7 @@ public class SystemProcessingTimeServiceTest { // we block the timer execution to make sure 
we have all the time // to register some additional timers out of order - timer.registerTimer(System.currentTimeMillis(), new Triggerable() { + timer.registerTimer(System.currentTimeMillis(), new ProcessingTimeCallback() { @Override public void trigger(long timestamp) throws Exception { sync.await(); @@ -272,7 +271,7 @@ public class SystemProcessingTimeServiceTest { final long time4 = now - 2; final ArrayBlockingQueue<Long> timestamps = new ArrayBlockingQueue<>(4); - Triggerable trigger = new Triggerable() { + ProcessingTimeCallback trigger = new ProcessingTimeCallback() { @Override public void trigger(long timestamp) { timestamps.add(timestamp); http://git-wip-us.apache.org/repos/asf/flink/blob/94a3f251/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskTimerITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskTimerITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskTimerITCase.java index e7f62fd..c0cd0be 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskTimerITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskTimerITCase.java @@ -29,7 +29,7 @@ import org.apache.flink.streaming.api.operators.ChainingStrategy; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; import org.apache.flink.streaming.api.watermark.Watermark; -import org.apache.flink.streaming.runtime.operators.Triggerable; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.TimerException; import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; @@ -171,7 +171,7 @@ public class StreamTaskTimerITCase 
extends StreamingMultipleProgramsTestBase { Assert.assertTrue(testSuccess); } - public static class TimerOperator extends AbstractStreamOperator<String> implements OneInputStreamOperator<String, String>, Triggerable { + public static class TimerOperator extends AbstractStreamOperator<String> implements OneInputStreamOperator<String, String>, ProcessingTimeCallback { private static final long serialVersionUID = 1L; int numTimers = 0; @@ -230,7 +230,7 @@ public class StreamTaskTimerITCase extends StreamingMultipleProgramsTestBase { } } - public static class TwoInputTimerOperator extends AbstractStreamOperator<String> implements TwoInputStreamOperator<String, String, String>, Triggerable { + public static class TwoInputTimerOperator extends AbstractStreamOperator<String> implements TwoInputStreamOperator<String, String, String>, ProcessingTimeCallback { private static final long serialVersionUID = 1L; int numTimers = 0;
