http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestTimeProviderTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestTimeProviderTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestTimeProviderTest.java deleted file mode 100644 index a8f2dc4..0000000 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestTimeProviderTest.java +++ /dev/null @@ -1,113 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.runtime.operators; - -import org.apache.flink.api.common.typeinfo.BasicTypeInfo; -import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; -import org.apache.flink.streaming.api.graph.StreamConfig; -import org.apache.flink.streaming.api.operators.StreamMap; -import org.apache.flink.streaming.runtime.tasks.AsyncExceptionHandler; -import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask; -import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness; -import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider; - -import org.junit.Test; -import org.junit.runner.RunWith; - -import org.powermock.core.classloader.annotations.PowerMockIgnore; -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.PowerMockRunner; - -import java.util.concurrent.atomic.AtomicReference; - -import static org.junit.Assert.assertEquals; - -@RunWith(PowerMockRunner.class) -@PrepareForTest({ResultPartitionWriter.class}) -@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"}) -public class TestTimeProviderTest { - - @Test - public void testCustomTimeServiceProvider() throws Throwable { - TestTimeServiceProvider tp = new TestTimeServiceProvider(); - - final OneInputStreamTask<String, String> mapTask = new OneInputStreamTask<>(); - mapTask.setTimeService(tp); - - final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>( - mapTask, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO); - - StreamConfig streamConfig = testHarness.getStreamConfig(); - - StreamMap<String, String> mapOperator = new StreamMap<>(new StreamTaskTimerTest.DummyMapFunction<String>()); - streamConfig.setStreamOperator(mapOperator); - - testHarness.invoke(); - - assertEquals(testHarness.getTimerService().getCurrentProcessingTime(), 0); - - tp.setCurrentTime(11); - assertEquals(testHarness.getTimerService().getCurrentProcessingTime(), 11); - - tp.setCurrentTime(15); - tp.setCurrentTime(16); - assertEquals(testHarness.getTimerService().getCurrentProcessingTime(), 16); - - // register 2 tasks - mapTask.getTimerService().registerTimer(30, new Triggerable() { - @Override - public void trigger(long timestamp) { - - } - }); - - mapTask.getTimerService().registerTimer(40, new Triggerable() { - @Override - public void trigger(long timestamp) { - - } - }); - - assertEquals(2, tp.getNumRegisteredTimers()); - - tp.setCurrentTime(35); - assertEquals(1, tp.getNumRegisteredTimers()); - - tp.setCurrentTime(40); - assertEquals(0, tp.getNumRegisteredTimers()); - - tp.shutdownService(); - } - - // ------------------------------------------------------------------------ - - public static class ReferenceSettingExceptionHandler implements AsyncExceptionHandler { - - private final AtomicReference<Throwable> errorReference; - - public ReferenceSettingExceptionHandler(AtomicReference<Throwable> errorReference) { - this.errorReference = errorReference; - } - - @Override - public void handleAsyncException(String message, Throwable exception) { - errorReference.compareAndSet(null, exception); - } - } -}
http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperatorTest.java index 4d5c881..af99d0d 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperatorTest.java @@ -23,6 +23,7 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.junit.Test; @@ -41,9 +42,13 @@ public class TimestampsAndPeriodicWatermarksOperatorTest { final ExecutionConfig config = new ExecutionConfig(); config.setAutoWatermarkInterval(50); + + TestProcessingTimeService processingTimeService = new TestProcessingTimeService(); OneInputStreamOperatorTestHarness<Long, Long> testHarness = - new OneInputStreamOperatorTestHarness<Long, Long>(operator, config); + new OneInputStreamOperatorTestHarness<Long, Long>(operator, config, processingTimeService); + + long currentTime = 0; testHarness.open(); @@ -71,7 +76,8 @@ public class TimestampsAndPeriodicWatermarksOperatorTest { // check the invariant assertTrue(lastWatermark < nextElementValue); } else { - Thread.sleep(10); + currentTime = currentTime + 10; + processingTimeService.setCurrentTime(currentTime); } } @@ -102,7 +108,8 @@ public class TimestampsAndPeriodicWatermarksOperatorTest { // check the invariant assertTrue(lastWatermark < nextElementValue); } else { - Thread.sleep(10); + currentTime = currentTime + 10; + processingTimeService.setCurrentTime(currentTime); } } http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java index e96109e..128c88b 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java @@ -39,12 +39,12 @@ import org.apache.flink.streaming.api.functions.windowing.WindowFunction; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; -import org.apache.flink.streaming.runtime.operators.TestTimeProviderTest.ReferenceSettingExceptionHandler; +import org.apache.flink.streaming.runtime.operators.TestProcessingTimeServiceTest.ReferenceSettingExceptionHandler; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.runtime.tasks.DefaultTimeServiceProvider; +import org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; import org.apache.flink.streaming.runtime.tasks.StreamTask; -import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider; -import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider; +import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.util.Collector; @@ -186,8 +186,8 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { try { @SuppressWarnings("unchecked") final Output<StreamRecord<String>> mockOut = mock(Output.class); - final TimeServiceProvider timerService = new NoOpTimerService(); - final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, new Object()); + final ProcessingTimeService timerService = new NoOpTimerService(); + final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService); AccumulatingProcessingTimeWindowOperator<String, String, String> op; @@ -234,13 +234,13 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { final Object lock = new Object(); final AtomicReference<Throwable> error = new AtomicReference<>(); - final TimeServiceProvider timerService = new DefaultTimeServiceProvider( + final ProcessingTimeService timerService = new SystemProcessingTimeService( new ReferenceSettingExceptionHandler(error), lock); try { final int windowSize = 50; final CollectingOutput<Integer> out = new CollectingOutput<>(windowSize); - final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock); + final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService); // tumbling window that triggers every 20 milliseconds AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> op = @@ -299,12 +299,12 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { final Object lock = new Object(); final AtomicReference<Throwable> error = new AtomicReference<>(); - final TimeServiceProvider timerService = new DefaultTimeServiceProvider( + final ProcessingTimeService timerService = new SystemProcessingTimeService( new ReferenceSettingExceptionHandler(error), lock); try { final CollectingOutput<Integer> out = new CollectingOutput<>(50); - final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock); + final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService); // tumbling window that triggers every 20 milliseconds AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> op = @@ -371,12 +371,12 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { final Object lock = new Object(); final AtomicReference<Throwable> error = new AtomicReference<>(); - final TimeServiceProvider timerService = new DefaultTimeServiceProvider( + final ProcessingTimeService timerService = new SystemProcessingTimeService( new ReferenceSettingExceptionHandler(error), lock); try { final CollectingOutput<Integer> out = new CollectingOutput<>(50); - final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock); + final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService); // tumbling window that triggers every 20 milliseconds AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> op = @@ -438,12 +438,12 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { final Object lock = new Object(); final AtomicReference<Throwable> error = new AtomicReference<>(); - final TimeServiceProvider timerService = new DefaultTimeServiceProvider( + final ProcessingTimeService timerService = new SystemProcessingTimeService( new ReferenceSettingExceptionHandler(error), lock); try { final CollectingOutput<Integer> out = new CollectingOutput<>(50); - final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock); + final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService); // tumbling window that triggers every 20 milliseconds AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> op = @@ -503,7 +503,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { IntSerializer.INSTANCE, IntSerializer.INSTANCE, windowSize, windowSize); - TestTimeServiceProvider timerService = new TestTimeServiceProvider(); + TestProcessingTimeService timerService = new TestProcessingTimeService(); OneInputStreamOperatorTestHarness<Integer, Integer> testHarness = new OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig(), timerService); @@ -542,7 +542,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { IntSerializer.INSTANCE, IntSerializer.INSTANCE, windowSize, windowSize); - timerService = new TestTimeServiceProvider(); + timerService = new TestProcessingTimeService(); testHarness = new OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig(), timerService); testHarness.setup(); @@ -583,7 +583,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { final int windowSlide = 50; final int windowSize = factor * windowSlide; - TestTimeServiceProvider timerService = new TestTimeServiceProvider(); + TestProcessingTimeService timerService = new TestProcessingTimeService(); // sliding window (200 msecs) every 50 msecs AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> op = @@ -631,7 +631,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { IntSerializer.INSTANCE, IntSerializer.INSTANCE, windowSize, windowSlide); - timerService = new TestTimeServiceProvider(); + timerService = new TestProcessingTimeService(); testHarness = new OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig(), timerService); testHarness.setup(); @@ -684,7 +684,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { new StatefulFunction(), identitySelector, IntSerializer.INSTANCE, IntSerializer.INSTANCE, 50, 50); - TestTimeServiceProvider timerService = new TestTimeServiceProvider(); + TestProcessingTimeService timerService = new TestProcessingTimeService(); OneInputStreamOperatorTestHarness<Integer, Integer> testHarness = new KeyedOneInputStreamOperatorTestHarness<>(op, new ExecutionConfig(), timerService, identitySelector, BasicTypeInfo.INT_TYPE_INFO); @@ -777,7 +777,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { // ------------------------------------------------------------------------ - private static StreamTask<?, ?> createMockTask(Object lock) { + private static StreamTask<?, ?> createMockTask() { Configuration configuration = new Configuration(); configuration.setString(ConfigConstants.STATE_BACKEND, "jobmanager"); @@ -785,7 +785,6 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { when(task.getAccumulatorMap()).thenReturn(new HashMap<String, Accumulator<?, ?>>()); when(task.getName()).thenReturn("Test task name"); when(task.getExecutionConfig()).thenReturn(new ExecutionConfig()); - when(task.getCheckpointLock()).thenReturn(lock); final TaskManagerRuntimeInfo mockTaskManagerRuntimeInfo = mock(TaskManagerRuntimeInfo.class); when(mockTaskManagerRuntimeInfo.getConfiguration()).thenReturn(configuration); @@ -801,10 +800,10 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { } private static StreamTask<?, ?> createMockTaskWithTimer( - final TimeServiceProvider timerService, final Object lock) + final ProcessingTimeService timerService) { - StreamTask<?, ?> mockTask = createMockTask(lock); - when(mockTask.getTimerService()).thenReturn(timerService); + StreamTask<?, ?> mockTask = createMockTask(); + when(mockTask.getProcessingTimeService()).thenReturn(timerService); return mockTask; } @@ -819,7 +818,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { return result; } - private static void shutdownTimerServiceAndWait(TimeServiceProvider timers) throws Exception { + private static void shutdownTimerServiceAndWait(ProcessingTimeService timers) throws Exception { timers.shutdownService(); while (!timers.isTerminated()) { http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java index 802329b..bb64a08 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java @@ -40,12 +40,13 @@ import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.Output; -import org.apache.flink.streaming.runtime.operators.TestTimeProviderTest.ReferenceSettingExceptionHandler; +import org.apache.flink.streaming.runtime.operators.TestProcessingTimeServiceTest.ReferenceSettingExceptionHandler; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.runtime.tasks.DefaultTimeServiceProvider; +import org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; import org.apache.flink.streaming.runtime.tasks.StreamTask; -import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider; -import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider; + +import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; @@ -194,8 +195,8 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { try { @SuppressWarnings("unchecked") final Output<StreamRecord<String>> mockOut = mock(Output.class); - final TimeServiceProvider timerService = new NoOpTimerService(); - final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, new Object()); + final ProcessingTimeService timerService = new NoOpTimerService(); + final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService); AggregatingProcessingTimeWindowOperator<String, String> op; @@ -242,7 +243,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { final Object lock = new Object(); final AtomicReference<Throwable> error = new AtomicReference<>(); - final TimeServiceProvider timerService = new DefaultTimeServiceProvider( + final ProcessingTimeService timerService = new SystemProcessingTimeService( new ReferenceSettingExceptionHandler(error), lock); try { @@ -255,7 +256,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { IntSerializer.INSTANCE, tupleSerializer, windowSize, windowSize); - final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock); + final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService); op.setup(mockTask, createTaskConfig(fieldOneSelector, IntSerializer.INSTANCE, 10), out); op.open(); @@ -311,13 +312,13 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { final Object lock = new Object(); final AtomicReference<Throwable> error = new AtomicReference<>(); - final TimeServiceProvider timerService = new DefaultTimeServiceProvider( + final ProcessingTimeService timerService = new SystemProcessingTimeService( new ReferenceSettingExceptionHandler(error), lock); try { final int windowSize = 50; final CollectingOutput<Tuple2<Integer, Integer>> out = new CollectingOutput<>(windowSize); - final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock); + final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService); AggregatingProcessingTimeWindowOperator<Integer, Tuple2<Integer, Integer>> op = new AggregatingProcessingTimeWindowOperator<>( @@ -389,13 +390,13 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { final Object lock = new Object(); final AtomicReference<Throwable> error = new AtomicReference<>(); - final TimeServiceProvider timerService = new DefaultTimeServiceProvider( + final ProcessingTimeService timerService = new SystemProcessingTimeService( new ReferenceSettingExceptionHandler(error), lock); try { final CollectingOutput<Tuple2<Integer, Integer>> out = new CollectingOutput<>(50); - final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock); + final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService); // tumbling window that triggers every 20 milliseconds AggregatingProcessingTimeWindowOperator<Integer, Tuple2<Integer, Integer>> op = @@ -471,12 +472,12 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { final Object lock = new Object(); final AtomicReference<Throwable> error = new AtomicReference<>(); - final TimeServiceProvider timerService = new DefaultTimeServiceProvider( + final ProcessingTimeService timerService = new SystemProcessingTimeService( new ReferenceSettingExceptionHandler(error), lock); try { final CollectingOutput<Tuple2<Integer, Integer>> out = new CollectingOutput<>(50); - final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock); + final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService); // tumbling window that triggers every 20 milliseconds AggregatingProcessingTimeWindowOperator<Integer, Tuple2<Integer, Integer>> op = @@ -541,12 +542,12 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { final Object lock = new Object(); final AtomicReference<Throwable> error = new AtomicReference<>(); - final TimeServiceProvider timerService = new DefaultTimeServiceProvider( + final ProcessingTimeService timerService = new SystemProcessingTimeService( new ReferenceSettingExceptionHandler(error), lock); try { final CollectingOutput<Tuple2<Integer, Integer>> out = new CollectingOutput<>(); - final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock); + final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService); ReduceFunction<Tuple2<Integer, Integer>> failingFunction = new FailingFunction(100); @@ -605,7 +606,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { try { final int windowSize = 200; - TestTimeServiceProvider timerService = new TestTimeServiceProvider(); + TestProcessingTimeService timerService = new TestProcessingTimeService(); // tumbling window that triggers every 50 milliseconds AggregatingProcessingTimeWindowOperator<Integer, Tuple2<Integer, Integer>> op = @@ -655,7 +656,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { IntSerializer.INSTANCE, tupleSerializer, windowSize, windowSize); - timerService = new TestTimeServiceProvider(); + timerService = new TestProcessingTimeService(); testHarness = new OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig(), timerService); testHarness.setup(); @@ -698,7 +699,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { final int windowSlide = 50; final int windowSize = factor * windowSlide; - TestTimeServiceProvider timerService = new TestTimeServiceProvider(); + TestProcessingTimeService timerService = new TestProcessingTimeService(); // sliding window (200 msecs) every 50 msecs AggregatingProcessingTimeWindowOperator<Integer, Tuple2<Integer, Integer>> op = @@ -748,7 +749,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { IntSerializer.INSTANCE, tupleSerializer, windowSize, windowSlide); - timerService = new TestTimeServiceProvider(); + timerService = new TestProcessingTimeService(); testHarness = new OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig(), timerService); testHarness.setup(); @@ -796,7 +797,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { try { final long twoSeconds = 2000; - TestTimeServiceProvider timerService = new TestTimeServiceProvider(); + TestProcessingTimeService timerService = new TestProcessingTimeService(); StatefulFunction.globalCounts.clear(); @@ -850,7 +851,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { final int windowSlide = 50; final int windowSize = factor * windowSlide; - TestTimeServiceProvider timerService = new TestTimeServiceProvider(); + TestProcessingTimeService timerService = new TestProcessingTimeService(); StatefulFunction.globalCounts.clear(); @@ -977,7 +978,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { // ------------------------------------------------------------------------ - private static StreamTask<?, ?> createMockTask(Object lock) { + private static StreamTask<?, ?> createMockTask() { Configuration configuration = new Configuration(); configuration.setString(ConfigConstants.STATE_BACKEND, "jobmanager"); @@ -985,7 +986,6 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { when(task.getAccumulatorMap()).thenReturn(new HashMap<String, Accumulator<?, ?>>()); when(task.getName()).thenReturn("Test task name"); when(task.getExecutionConfig()).thenReturn(new ExecutionConfig()); - when(task.getCheckpointLock()).thenReturn(lock); final TaskManagerRuntimeInfo mockTaskManagerRuntimeInfo = mock(TaskManagerRuntimeInfo.class); when(mockTaskManagerRuntimeInfo.getConfiguration()).thenReturn(configuration); @@ -996,10 +996,10 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { return task; } - private static StreamTask<?, ?> createMockTaskWithTimer(final TimeServiceProvider timerService, final Object lock) + private static StreamTask<?, ?> createMockTaskWithTimer(final ProcessingTimeService timerService) { - StreamTask<?, ?> mockTask = createMockTask(lock); - when(mockTask.getTimerService()).thenReturn(timerService); + StreamTask<?, ?> mockTask = createMockTask(); + when(mockTask.getProcessingTimeService()).thenReturn(timerService); return mockTask; } @@ -1018,7 +1018,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { return result; } - private static void shutdownTimerServiceAndWait(TimeServiceProvider timers) throws Exception { + private static void shutdownTimerServiceAndWait(ProcessingTimeService timers) throws Exception { timers.shutdownService(); while (!timers.isTerminated()) { http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NoOpTimerService.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NoOpTimerService.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NoOpTimerService.java index d0c5050..a7a71cf 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NoOpTimerService.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NoOpTimerService.java @@ -19,11 +19,11 @@ package org.apache.flink.streaming.runtime.operators.windowing; import org.apache.flink.streaming.runtime.operators.Triggerable; -import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; import java.util.concurrent.ScheduledFuture; -class NoOpTimerService extends TimeServiceProvider { +class NoOpTimerService extends ProcessingTimeService { private volatile boolean terminated; http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java index 2b0b915..38f0778 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java @@ -63,7 +63,7 @@ import org.apache.flink.streaming.api.windowing.windows.Window; import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction; import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider; +import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.TestHarnessUtil; @@ -850,8 +850,6 @@ public class WindowOperatorTest extends TestLogger { TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); - TestTimeServiceProvider timer = new TestTimeServiceProvider(); - ReducingStateDescriptor<Tuple2<String, Integer>> stateDesc = new ReducingStateDescriptor<>("window-contents", new SumReducer(), inputType.createSerializer(new ExecutionConfig())); @@ -869,7 +867,7 @@ public class WindowOperatorTest extends TestLogger { OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness = new KeyedOneInputStreamOperatorTestHarness<>( - operator, new ExecutionConfig(), timer, + operator, new ExecutionConfig(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); testHarness.open(); @@ -898,7 +896,7 @@ public class WindowOperatorTest extends TestLogger { OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> otherTestHarness = new KeyedOneInputStreamOperatorTestHarness<>( - otherOperator, new ExecutionConfig(), timer, + otherOperator, new ExecutionConfig(), new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); otherTestHarness.setup(); @@ -928,7 +926,7 @@ public class WindowOperatorTest extends TestLogger { new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()), ProcessingTimeTrigger.create(), 0); - TestTimeServiceProvider testTimeProvider = new TestTimeServiceProvider(); + TestProcessingTimeService testTimeProvider = new TestProcessingTimeService(); OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, new ExecutionConfig(), testTimeProvider, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); @@ -987,7 +985,7 @@ public class WindowOperatorTest extends TestLogger { new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()), ProcessingTimeTrigger.create(), 0); - TestTimeServiceProvider testTimeProvider = new TestTimeServiceProvider(); + TestProcessingTimeService testTimeProvider = new TestProcessingTimeService(); OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, new ExecutionConfig(), testTimeProvider, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); @@ -1059,7 +1057,7 @@ public class WindowOperatorTest extends TestLogger { new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()), ProcessingTimeTrigger.create(), 0); - TestTimeServiceProvider testTimeProvider = new TestTimeServiceProvider(); + TestProcessingTimeService testTimeProvider = new TestProcessingTimeService(); OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, new ExecutionConfig(), testTimeProvider, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProviderTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProviderTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProviderTest.java deleted file mode 100644 index 29e13ed..0000000 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProviderTest.java +++ /dev/null @@ -1,313 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.runtime.tasks; - -import org.apache.flink.core.testutils.OneShotLatch; -import org.apache.flink.streaming.runtime.operators.TestTimeProviderTest.ReferenceSettingExceptionHandler; -import org.apache.flink.streaming.runtime.operators.Triggerable; - -import org.junit.Test; - -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.ReentrantLock; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -public class DefaultTimeServiceProviderTest { - - @Test - public void testTriggerHoldsLock() throws Exception { - - final Object lock = new Object(); - final AtomicReference<Throwable> errorRef = new AtomicReference<>(); - - final DefaultTimeServiceProvider timer = new DefaultTimeServiceProvider( - new ReferenceSettingExceptionHandler(errorRef), lock); - - try { - assertEquals(0, timer.getNumTasksScheduled()); - - // schedule something - ScheduledFuture<?> future = timer.registerTimer(System.currentTimeMillis(), new Triggerable() { - @Override - public void trigger(long timestamp) { - assertTrue(Thread.holdsLock(lock)); - } - }); - - // wait until the execution is over - future.get(); - assertEquals(0, timer.getNumTasksScheduled()); - - // check that no asynchronous error was reported - if (errorRef.get() != null) { - throw new Exception(errorRef.get()); - } - } - finally { - timer.shutdownService(); - } - } - - @Test - public void testImmediateShutdown() throws Exception { - - final Object lock = new Object(); - final AtomicReference<Throwable> errorRef = new AtomicReference<>(); - - final DefaultTimeServiceProvider timer = new DefaultTimeServiceProvider( - new ReferenceSettingExceptionHandler(errorRef), lock); - - try { - assertFalse(timer.isTerminated()); - - final OneShotLatch latch = new OneShotLatch(); - - // the task should trigger immediately and should block until terminated with interruption - timer.registerTimer(System.currentTimeMillis(), new Triggerable() { - @Override - public void trigger(long timestamp) throws Exception { - latch.trigger(); - Thread.sleep(100000000); - } - }); - - latch.await(); - timer.shutdownService(); - - // can only enter this scope after the triggerable is interrupted - //noinspection SynchronizationOnLocalVariableOrMethodParameter - synchronized (lock) { - assertTrue(timer.isTerminated()); - } - - try { - timer.registerTimer(System.currentTimeMillis() + 1000, new Triggerable() { - @Override - public void trigger(long timestamp) {} - }); - - fail("should result in an exception"); - } - catch (IllegalStateException e) { - // expected - } - - // obviously, we have an asynchronous interrupted exception - assertNotNull(errorRef.get()); - assertTrue(errorRef.get().getCause() instanceof InterruptedException); - - assertEquals(0, timer.getNumTasksScheduled()); - } - finally { - timer.shutdownService(); - } - } - - @Test - public void testQuiescing() throws Exception { - - final Object lock = new Object(); - final AtomicReference<Throwable> errorRef = new AtomicReference<>(); - - final DefaultTimeServiceProvider timer = new DefaultTimeServiceProvider( - new ReferenceSettingExceptionHandler(errorRef), lock); - - try { - final OneShotLatch latch = new OneShotLatch(); - - final ReentrantLock scopeLock = new ReentrantLock(); - - timer.registerTimer(System.currentTimeMillis() + 20, new Triggerable() { - @Override - public void trigger(long timestamp) throws Exception { - scopeLock.lock(); - try { - latch.trigger(); - // delay a bit before leaving the method - Thread.sleep(5); - } finally { - scopeLock.unlock(); - } - } - }); - - // after the task triggered, shut the timer down cleanly, waiting for the task to finish - latch.await(); - timer.quiesceAndAwaitPending(); - - // should be able to immediately acquire the lock, since the task must have exited by now - assertTrue(scopeLock.tryLock()); - - // should be able to schedule more tasks (that never get executed) - ScheduledFuture<?> future = timer.registerTimer(System.currentTimeMillis() - 5, new Triggerable() { - @Override - public void trigger(long timestamp) throws Exception { - throw new Exception("test"); - } - }); - assertNotNull(future); - - // nothing should be scheduled right now - assertEquals(0, timer.getNumTasksScheduled()); - - // check that no asynchronous error was reported - that ensures that the newly scheduled - // triggerable did, in fact, not trigger - if (errorRef.get() != null) { - throw new Exception(errorRef.get()); - } - } - finally { - timer.shutdownService(); - } - } - - @Test - public void testFutureCancellation() throws Exception { - - final Object lock = new Object(); - final AtomicReference<Throwable> errorRef = new AtomicReference<>(); - - final DefaultTimeServiceProvider timer = new DefaultTimeServiceProvider( - new ReferenceSettingExceptionHandler(errorRef), lock); - - try { - assertEquals(0, timer.getNumTasksScheduled()); - - // schedule something - ScheduledFuture<?> future = timer.registerTimer(System.currentTimeMillis() + 100000000, new Triggerable() { - @Override - public void trigger(long timestamp) {} - }); - assertEquals(1, timer.getNumTasksScheduled()); - - future.cancel(false); - - assertEquals(0, timer.getNumTasksScheduled()); - - // check that no asynchronous error was reported - if (errorRef.get() != null) { - throw new Exception(errorRef.get()); - } - } - finally { - timer.shutdownService(); - } - } - - @Test - public void testExceptionReporting() throws InterruptedException { - final AtomicBoolean exceptionWasThrown = new AtomicBoolean(false); - final OneShotLatch latch = new OneShotLatch(); - final Object lock = new Object(); - - TimeServiceProvider timeServiceProvider = new DefaultTimeServiceProvider( - new AsyncExceptionHandler() { - @Override - public void handleAsyncException(String message, Throwable exception) { - exceptionWasThrown.set(true); - latch.trigger(); - } - }, lock); - - timeServiceProvider.registerTimer(System.currentTimeMillis(), new Triggerable() { - @Override - public void trigger(long timestamp) throws Exception { - throw new Exception("Exception in Timer"); - } - }); - - latch.await(); - assertTrue(exceptionWasThrown.get()); - } - - @Test - public void testTimerSorting() throws Exception { - final Object lock = new Object(); - final AtomicReference<Throwable> errorRef = new AtomicReference<>(); - - final DefaultTimeServiceProvider timer = new DefaultTimeServiceProvider( - new ReferenceSettingExceptionHandler(errorRef), lock); - - try { - final OneShotLatch sync = new OneShotLatch(); - - // we block the timer execution to make sure we have all the time - // to register some additional timers out of order - timer.registerTimer(System.currentTimeMillis(), new Triggerable() { - @Override - public void trigger(long timestamp) throws Exception { - sync.await(); - } - }); - - // schedule two timers out of order something - final long now = System.currentTimeMillis(); - final long time1 = now + 6; - final long time2 = now + 5; - final long time3 = now + 8; - final long time4 = now - 2; - - final ArrayBlockingQueue<Long> timestamps = new ArrayBlockingQueue<>(4); - Triggerable trigger = new Triggerable() { - @Override - public void trigger(long timestamp) { - timestamps.add(timestamp); - } - }; - - // schedule - ScheduledFuture<?> future1 = timer.registerTimer(time1, trigger); - ScheduledFuture<?> future2 = timer.registerTimer(time2, trigger); - ScheduledFuture<?> future3 = timer.registerTimer(time3, trigger); - ScheduledFuture<?> future4 = timer.registerTimer(time4, trigger); - - // now that everything is scheduled, unblock the timer service - sync.trigger(); - - // wait until both are complete - future1.get(); - future2.get(); - future3.get(); - future4.get(); - - // verify that the order is 4 - 2 - 1 - 3 - assertEquals(4, timestamps.size()); - assertEquals(time4, timestamps.take().longValue()); - assertEquals(time2, timestamps.take().longValue()); - assertEquals(time1, timestamps.take().longValue()); - assertEquals(time3, timestamps.take().longValue()); - - // check that no asynchronous error was reported - if (errorRef.get() != null) { - throw new Exception(errorRef.get()); - } - } - finally { - timer.shutdownService(); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java index ce62624..ab7bf69 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java @@ -113,11 +113,11 @@ public class StreamTaskTestHarness<OUT> { outputStreamRecordSerializer = new StreamElementSerializer<OUT>(outputSerializer); } - public TimeServiceProvider getTimerService() { + public ProcessingTimeService getProcessingTimeService() { if (!(task instanceof StreamTask)) { - throw new UnsupportedOperationException("getTimerService() only supported on StreamTasks."); + throw new UnsupportedOperationException("getProcessingTimeService() only supported on StreamTasks."); } - return ((StreamTask) task).getTimerService(); + return ((StreamTask) task).getProcessingTimeService(); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java new file mode 100644 index 0000000..e7944df --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java @@ -0,0 +1,313 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.tasks; + +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.streaming.runtime.operators.TestProcessingTimeServiceTest.ReferenceSettingExceptionHandler; +import org.apache.flink.streaming.runtime.operators.Triggerable; + +import org.junit.Test; + +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantLock; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class SystemProcessingTimeServiceTest { + + @Test + public void testTriggerHoldsLock() throws Exception { + + final Object lock = new Object(); + final AtomicReference<Throwable> errorRef = new AtomicReference<>(); + + final SystemProcessingTimeService timer = new SystemProcessingTimeService( + new ReferenceSettingExceptionHandler(errorRef), lock); + + try { + assertEquals(0, timer.getNumTasksScheduled()); + + // schedule something + ScheduledFuture<?> future = timer.registerTimer(System.currentTimeMillis(), new Triggerable() { + @Override + public void trigger(long timestamp) { + assertTrue(Thread.holdsLock(lock)); + } + }); + + // wait until the execution is over + future.get(); + assertEquals(0, timer.getNumTasksScheduled()); + + // check that no asynchronous error was reported + if (errorRef.get() != null) { + throw new Exception(errorRef.get()); + } + } + finally { + timer.shutdownService(); + } + } + + @Test + public void testImmediateShutdown() throws Exception { + + final Object lock = new Object(); + final AtomicReference<Throwable> errorRef = new AtomicReference<>(); + + final SystemProcessingTimeService timer = new SystemProcessingTimeService( + new ReferenceSettingExceptionHandler(errorRef), lock); + + try { + assertFalse(timer.isTerminated()); + + final OneShotLatch latch = new OneShotLatch(); + + // the task should trigger immediately and should block until terminated with interruption + timer.registerTimer(System.currentTimeMillis(), new Triggerable() { + @Override + public void trigger(long timestamp) throws Exception { + latch.trigger(); + Thread.sleep(100000000); + } + }); + + latch.await(); + timer.shutdownService(); + + // can only enter this scope after the triggerable is interrupted + //noinspection SynchronizationOnLocalVariableOrMethodParameter + synchronized (lock) { + assertTrue(timer.isTerminated()); + } + + try { + timer.registerTimer(System.currentTimeMillis() + 1000, new Triggerable() { + @Override + public void trigger(long timestamp) {} + }); + + fail("should result in an exception"); + } + catch (IllegalStateException e) { + // expected + } + + // obviously, we have an asynchronous interrupted exception + assertNotNull(errorRef.get()); + assertTrue(errorRef.get().getCause() instanceof InterruptedException); + + assertEquals(0, timer.getNumTasksScheduled()); + } + finally { + timer.shutdownService(); + } + } + + @Test + public void testQuiescing() throws Exception { + + final Object lock = new Object(); + final AtomicReference<Throwable> errorRef = new AtomicReference<>(); + + final SystemProcessingTimeService timer = new SystemProcessingTimeService( + new ReferenceSettingExceptionHandler(errorRef), lock); + + try { + final OneShotLatch latch = new OneShotLatch(); + + final ReentrantLock scopeLock = new ReentrantLock(); + + timer.registerTimer(System.currentTimeMillis() + 20, new Triggerable() { + @Override + public void trigger(long timestamp) throws Exception { + scopeLock.lock(); + try { + latch.trigger(); + // delay a bit before leaving the method + Thread.sleep(5); + } finally { + scopeLock.unlock(); + } + } + }); + + // after the task triggered, shut the timer down cleanly, waiting for the task to finish + latch.await(); + timer.quiesceAndAwaitPending(); + + // should be able to immediately acquire the lock, since the task must have exited by now + assertTrue(scopeLock.tryLock()); + + // should be able to schedule more tasks (that never get executed) + ScheduledFuture<?> future = timer.registerTimer(System.currentTimeMillis() - 5, new Triggerable() { + @Override + public void trigger(long timestamp) throws Exception { + throw new Exception("test"); + } + }); + assertNotNull(future); + + // nothing should be scheduled right now + assertEquals(0, timer.getNumTasksScheduled()); + + // check that no asynchronous error was reported - that ensures that the newly scheduled + // triggerable did, in fact, not trigger + if (errorRef.get() != null) { + throw new Exception(errorRef.get()); + } + } + finally { + timer.shutdownService(); + } + } + + @Test + public void testFutureCancellation() throws Exception { + + final Object lock = new Object(); + final AtomicReference<Throwable> errorRef = new AtomicReference<>(); + + final SystemProcessingTimeService timer = new SystemProcessingTimeService( + new ReferenceSettingExceptionHandler(errorRef), lock); + + try { + assertEquals(0, timer.getNumTasksScheduled()); + + // schedule something + ScheduledFuture<?> future = timer.registerTimer(System.currentTimeMillis() + 100000000, new Triggerable() { + @Override + public void trigger(long timestamp) {} + }); + assertEquals(1, timer.getNumTasksScheduled()); + + future.cancel(false); + + assertEquals(0, timer.getNumTasksScheduled()); + + // check that no asynchronous error was reported + if (errorRef.get() != null) { + throw new Exception(errorRef.get()); + } + } + finally { + timer.shutdownService(); + } + } + + @Test + public void testExceptionReporting() throws InterruptedException { + final AtomicBoolean exceptionWasThrown = new AtomicBoolean(false); + final OneShotLatch latch = new OneShotLatch(); + final Object lock = new Object(); + + ProcessingTimeService timeServiceProvider = new SystemProcessingTimeService( + new AsyncExceptionHandler() { + @Override + public void handleAsyncException(String message, Throwable exception) { + exceptionWasThrown.set(true); + latch.trigger(); + } + }, lock); + + timeServiceProvider.registerTimer(System.currentTimeMillis(), new Triggerable() { + @Override + public void trigger(long timestamp) throws Exception { + throw new Exception("Exception in Timer"); + } + }); + + latch.await(); + assertTrue(exceptionWasThrown.get()); + } + + @Test + public void testTimerSorting() throws Exception { + final Object lock = new Object(); + final AtomicReference<Throwable> errorRef = new AtomicReference<>(); + + final SystemProcessingTimeService timer = new SystemProcessingTimeService( + new ReferenceSettingExceptionHandler(errorRef), lock); + + try { + final OneShotLatch sync = new OneShotLatch(); + + // we block the timer execution to make sure we have all the time + // to register some additional timers out of order + timer.registerTimer(System.currentTimeMillis(), new Triggerable() { + @Override + public void trigger(long timestamp) throws Exception { + sync.await(); + } + }); + + // schedule two timers out of order something + final long now = System.currentTimeMillis(); + final long time1 = now + 6; + final long time2 = now + 5; + final long time3 = now + 8; + final long time4 = now - 2; + + final ArrayBlockingQueue<Long> timestamps = new ArrayBlockingQueue<>(4); + Triggerable trigger = new Triggerable() { + @Override + public void trigger(long timestamp) { + timestamps.add(timestamp); + } + }; + + // schedule + ScheduledFuture<?> future1 = timer.registerTimer(time1, trigger); + ScheduledFuture<?> future2 = timer.registerTimer(time2, trigger); + ScheduledFuture<?> future3 = timer.registerTimer(time3, trigger); + ScheduledFuture<?> future4 = timer.registerTimer(time4, trigger); + + // now that everything is scheduled, unblock the timer service + sync.trigger(); + + // wait until both are complete + future1.get(); + future2.get(); + future3.get(); + future4.get(); + + // verify that the order is 4 - 2 - 1 - 3 + assertEquals(4, timestamps.size()); + assertEquals(time4, timestamps.take().longValue()); + assertEquals(time2, timestamps.take().longValue()); + assertEquals(time1, timestamps.take().longValue()); + assertEquals(time3, timestamps.take().longValue()); + + // check that no asynchronous error was reported + if (errorRef.get() != null) { + throw new Exception(errorRef.get()); + } + } + finally { + timer.shutdownService(); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java index 41968e6..6ad684b 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java @@ -32,9 +32,9 @@ import org.apache.flink.runtime.state.KeyedStateBackend; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator; import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; -import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -50,7 +50,7 @@ import static org.mockito.Mockito.doAnswer; /** * Extension of {@link OneInputStreamOperatorTestHarness} that allows the operator to get - * a {@link AbstractKeyedStateBackend}. + * a {@link KeyedStateBackend}. * */ public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT> @@ -94,7 +94,7 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT> public KeyedOneInputStreamOperatorTestHarness(OneInputStreamOperator<IN, OUT> operator, ExecutionConfig executionConfig, - TestTimeServiceProvider testTimeProvider, + ProcessingTimeService testTimeProvider, KeySelector<IN, K> keySelector, TypeInformation<K> keyType) { super(operator, executionConfig, testTimeProvider); @@ -187,7 +187,7 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT> } /** - * + * */ @Override public void restore(StreamStateHandle snapshot) throws Exception { http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockContext.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockContext.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockContext.java index 2dd2163..c2763d8 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockContext.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockContext.java @@ -19,24 +19,14 @@ package org.apache.flink.streaming.util; import java.util.ArrayList; import java.util.Collection; -import java.util.HashMap; import java.util.List; import java.util.concurrent.ConcurrentLinkedQueue; -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.runtime.operators.testutils.MockEnvironment; -import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.runtime.tasks.StreamTask; -import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider; - -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; public class MockContext<IN, OUT> { @@ -95,17 +85,4 @@ public class MockContext<IN, OUT> { return result; } - - private static StreamTask<?, ?> createMockTaskWithTimer( - final TimeServiceProvider timerService, final Object lock) - { - StreamTask<?, ?> task = mock(StreamTask.class); - when(task.getAccumulatorMap()).thenReturn(new HashMap<String, Accumulator<?, ?>>()); - when(task.getName()).thenReturn("Test task name"); - when(task.getExecutionConfig()).thenReturn(new ExecutionConfig()); - when(task.getEnvironment()).thenReturn(new MockEnvironment("MockTask", 3 * 1024 * 1024, new MockInputSplitProvider(), 1024)); - when(task.getCheckpointLock()).thenReturn(lock); - when(task.getTimerService()).thenReturn(timerService); - return task; - } } http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java index 9f8d223..4104049 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java @@ -43,11 +43,11 @@ import org.apache.flink.streaming.api.operators.StreamOperator; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.runtime.tasks.DefaultTimeServiceProvider; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; import org.apache.flink.streaming.runtime.tasks.StreamTask; -import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider; -import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider; +import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; +import org.apache.flink.util.Preconditions; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -79,9 +79,7 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> { final ExecutionConfig executionConfig; - final Object checkpointLock; - - final TimeServiceProvider timeServiceProvider; + final ProcessingTimeService processingTimeService; StreamTask<?, ?> mockTask; @@ -105,36 +103,36 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> { public OneInputStreamOperatorTestHarness( OneInputStreamOperator<IN, OUT> operator, ExecutionConfig executionConfig) { - this(operator, executionConfig, null); + this(operator, executionConfig, new TestProcessingTimeService()); } public OneInputStreamOperatorTestHarness( OneInputStreamOperator<IN, OUT> operator, ExecutionConfig executionConfig, - TestTimeServiceProvider testTimeProvider) { - this(operator, executionConfig, new Object(), testTimeProvider); + ProcessingTimeService processingTimeService) { + this(operator, executionConfig, new Object(), processingTimeService); } public OneInputStreamOperatorTestHarness( OneInputStreamOperator<IN, OUT> operator, ExecutionConfig executionConfig, Object checkpointLock, - TimeServiceProvider testTimeProvider) { + ProcessingTimeService processingTimeService) { + this.processingTimeService = Preconditions.checkNotNull(processingTimeService); this.operator = operator; - this.outputList = new ConcurrentLinkedQueue<Object>(); + this.outputList = new ConcurrentLinkedQueue<>(); Configuration underlyingConfig = new Configuration(); this.config = new StreamConfig(underlyingConfig); this.config.setCheckpointingEnabled(true); this.executionConfig = executionConfig; - this.checkpointLock = checkpointLock; this.closableRegistry = new ClosableRegistry(); final Environment env = new MockEnvironment("MockTwoInputTask", 3 * 1024 * 1024, new MockInputSplitProvider(), 1024, underlyingConfig, executionConfig, MAX_PARALLELISM, 1, 0); mockTask = mock(StreamTask.class); when(mockTask.getName()).thenReturn("Mock Task"); - when(mockTask.getCheckpointLock()).thenReturn(this.checkpointLock); + when(mockTask.getCheckpointLock()).thenReturn(checkpointLock); when(mockTask.getConfiguration()).thenReturn(config); when(mockTask.getTaskConfiguration()).thenReturn(underlyingConfig); when(mockTask.getEnvironment()).thenReturn(env); @@ -183,15 +181,12 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> { throw new RuntimeException(e.getMessage(), e); } - timeServiceProvider = testTimeProvider != null ? testTimeProvider : - new DefaultTimeServiceProvider(mockTask, this.checkpointLock); - - doAnswer(new Answer<TimeServiceProvider>() { + doAnswer(new Answer<ProcessingTimeService>() { @Override - public TimeServiceProvider answer(InvocationOnMock invocation) throws Throwable { - return timeServiceProvider; + public ProcessingTimeService answer(InvocationOnMock invocation) throws Throwable { + return OneInputStreamOperatorTestHarness.this.processingTimeService; } - }).when(mockTask).getTimerService(); + }).when(mockTask).getProcessingTimeService(); } public void setTimeCharacteristic(TimeCharacteristic timeCharacteristic) { @@ -219,9 +214,7 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> { } /** - * Get all the output from the task. This contains StreamRecords and Events interleaved. Use - * {@link org.apache.flink.streaming.util.TestHarnessUtil#getStreamRecordsFromOutput(java.util.List)} - * to extract only the StreamRecords. + * Get all the output from the task. This contains StreamRecords and Events interleaved. */ public ConcurrentLinkedQueue<Object> getOutput() { return outputList; @@ -316,8 +309,8 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> { public void close() throws Exception { operator.close(); operator.dispose(); - if (timeServiceProvider != null) { - timeServiceProvider.shutdownService(); + if (processingTimeService != null) { + processingTimeService.shutdownService(); } setupCalled = false; } http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java index a4e26f0..ed9a7cd 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java @@ -31,7 +31,7 @@ import org.apache.flink.streaming.api.windowing.windows.Window; import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator; import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider; +import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; import org.apache.flink.util.Collector; import org.apache.flink.util.Preconditions; @@ -50,7 +50,7 @@ import java.util.concurrent.ConcurrentLinkedQueue; */ public class WindowingTestHarness<K, IN, W extends Window> { - private final TestTimeServiceProvider timeServiceProvider; + private final TestProcessingTimeService timeServiceProvider; private final OneInputStreamOperatorTestHarness<IN, IN> testHarness; @@ -80,7 +80,7 @@ public class WindowingTestHarness<K, IN, W extends Window> { trigger, allowedLateness); - timeServiceProvider = new TestTimeServiceProvider(); + timeServiceProvider = new TestProcessingTimeService(); testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, executionConfig, timeServiceProvider, keySelector, keyType); } http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskTimerITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskTimerITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskTimerITCase.java index 707ce0f..e7f62fd 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskTimerITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskTimerITCase.java @@ -192,7 +192,7 @@ public class StreamTaskTimerITCase extends StreamingMultipleProgramsTestBase { } if (first) { - getTimerService().registerTimer(System.currentTimeMillis() + 100, this); + getProcessingTimeService().registerTimer(System.currentTimeMillis() + 100, this); first = false; } numElements++; @@ -209,7 +209,7 @@ public class StreamTaskTimerITCase extends StreamingMultipleProgramsTestBase { try { numTimers++; throwIfDone(); - getTimerService().registerTimer(System.currentTimeMillis() + 1, this); + getProcessingTimeService().registerTimer(System.currentTimeMillis() + 1, this); } finally { semaphore.release(); } @@ -251,7 +251,7 @@ public class StreamTaskTimerITCase extends StreamingMultipleProgramsTestBase { } if (first) { - getTimerService().registerTimer(System.currentTimeMillis() + 100, this); + getProcessingTimeService().registerTimer(System.currentTimeMillis() + 100, this); first = false; } numElements++; @@ -266,7 +266,7 @@ public class StreamTaskTimerITCase extends StreamingMultipleProgramsTestBase { } if (first) { - getTimerService().registerTimer(System.currentTimeMillis() + 100, this); + getProcessingTimeService().registerTimer(System.currentTimeMillis() + 100, this); first = false; } numElements++; @@ -284,7 +284,7 @@ public class StreamTaskTimerITCase extends StreamingMultipleProgramsTestBase { try { numTimers++; throwIfDone(); - getTimerService().registerTimer(System.currentTimeMillis() + 1, this); + getProcessingTimeService().registerTimer(System.currentTimeMillis() + 1, this); } finally { semaphore.release(); }
