[FLINK-4877] Refactor OperatorTestHarness to always use TestProcessingTimeService
Before, this would allow handing in a custom ProcessingTimeService but this was in reality always TestProcessingTimeService. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/30554758 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/30554758 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/30554758 Branch: refs/heads/master Commit: 30554758897842ad851dc9b6e1758d452f7d702f Parents: e112a63 Author: Aljoscha Krettek <[email protected]> Authored: Wed Sep 28 16:43:40 2016 +0200 Committer: Aljoscha Krettek <[email protected]> Committed: Fri Oct 21 19:03:04 2016 +0200 ---------------------------------------------------------------------- .../hdfstests/ContinuousFileMonitoringTest.java | 18 +- .../fs/bucketing/BucketingSinkTest.java | 69 ++- ...stampsAndPeriodicWatermarksOperatorTest.java | 8 +- ...AlignedProcessingTimeWindowOperatorTest.java | 355 ++++++-------- ...AlignedProcessingTimeWindowOperatorTest.java | 475 ++++++------------- .../operators/windowing/CollectingOutput.java | 86 ---- .../operators/windowing/NoOpTimerService.java | 52 -- .../operators/windowing/WindowOperatorTest.java | 106 +---- .../KeyedOneInputStreamOperatorTestHarness.java | 20 +- .../util/OneInputStreamOperatorTestHarness.java | 50 +- .../streaming/util/WindowingTestHarness.java | 10 +- 11 files changed, 384 insertions(+), 865 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/30554758/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java ---------------------------------------------------------------------- diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java index 971d5f8..56d8efc 100644 --- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java +++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java @@ -34,7 +34,6 @@ import org.apache.flink.streaming.api.functions.source.FileProcessingMode; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileUtil; @@ -127,20 +126,21 @@ public class ContinuousFileMonitoringTest { ContinuousFileReaderOperator<String, ?> reader = new ContinuousFileReaderOperator<>(format); reader.setOutputType(typeInfo, executionConfig); - final TestProcessingTimeService timeServiceProvider = new TestProcessingTimeService(); final OneInputStreamOperatorTestHarness<FileInputSplit, String> tester = - new OneInputStreamOperatorTestHarness<>(reader, executionConfig, timeServiceProvider); + new OneInputStreamOperatorTestHarness<>(reader, executionConfig); + tester.setTimeCharacteristic(TimeCharacteristic.IngestionTime); + tester.open(); Assert.assertEquals(TimeCharacteristic.IngestionTime, tester.getTimeCharacteristic()); // test that watermarks are correctly emitted - timeServiceProvider.setCurrentTime(201); - timeServiceProvider.setCurrentTime(301); - timeServiceProvider.setCurrentTime(401); - timeServiceProvider.setCurrentTime(501); + tester.setProcessingTime(201); + tester.setProcessingTime(301); + tester.setProcessingTime(401); + tester.setProcessingTime(501); int i = 0; for(Object line: tester.getOutput()) { @@ -170,8 +170,8 @@ public class ContinuousFileMonitoringTest { for(FileInputSplit split: splits) { // set the next "current processing time". - long nextTimestamp = timeServiceProvider.getCurrentProcessingTime() + watermarkInterval; - timeServiceProvider.setCurrentTime(nextTimestamp); + long nextTimestamp = tester.getProcessingTime() + watermarkInterval; + tester.setProcessingTime(nextTimestamp); // send the next split to be read and wait until it is fully read. tester.processElement(new StreamRecord<>(split)); http://git-wip-us.apache.org/repos/asf/flink/blob/30554758/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java index 0c0111c..f4b3cd7 100644 --- a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java +++ b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java @@ -35,8 +35,6 @@ import org.apache.flink.streaming.connectors.fs.Clock; import org.apache.flink.streaming.connectors.fs.SequenceFileWriter; import org.apache.flink.streaming.connectors.fs.StringWriter; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider; -import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.util.NetUtils; import org.apache.hadoop.conf.Configuration; @@ -70,7 +68,7 @@ public class BucketingSinkTest { private static org.apache.hadoop.fs.FileSystem dfs; private static String hdfsURI; - private OneInputStreamOperatorTestHarness<String, Object> createTestSink(File dataDir, TestTimeServiceProvider clock) { + private OneInputStreamOperatorTestHarness<String, Object> createTestSink(File dataDir) throws Exception { BucketingSink<String> sink = new BucketingSink<String>(dataDir.getAbsolutePath()) .setBucketer(new Bucketer<String>() { private static final long serialVersionUID = 1L; @@ -87,12 +85,12 @@ public class BucketingSinkTest { .setInactiveBucketThreshold(5*60*1000L) .setPendingSuffix(".pending"); - return createTestSink(sink, clock); + return createTestSink(sink); } - private <T> OneInputStreamOperatorTestHarness<T, Object> createTestSink(BucketingSink<T> sink, - TestTimeServiceProvider clock) { - return new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink), new ExecutionConfig(), clock); + private <T> OneInputStreamOperatorTestHarness<T, Object> createTestSink( + BucketingSink<T> sink) throws Exception { + return new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink), new ExecutionConfig()); } @BeforeClass @@ -121,10 +119,7 @@ public class BucketingSinkTest { public void testCheckpointWithoutNotify() throws Exception { File dataDir = tempFolder.newFolder(); - TestTimeServiceProvider clock = new TestTimeServiceProvider(); - clock.setCurrentTime(0L); - - OneInputStreamOperatorTestHarness<String, Object> testHarness = createTestSink(dataDir, clock); + OneInputStreamOperatorTestHarness<String, Object> testHarness = createTestSink(dataDir); testHarness.setup(); testHarness.open(); @@ -133,13 +128,13 @@ public class BucketingSinkTest { testHarness.processElement(new StreamRecord<>("Hello")); testHarness.processElement(new StreamRecord<>("Hello")); - clock.setCurrentTime(10000L); + testHarness.setProcessingTime(10000L); // snapshot but don't call notify to simulate a notify that never // arrives, the sink should move pending files in restore() in that case StreamStateHandle snapshot1 = testHarness.snapshotLegacy(0, 0); - testHarness = createTestSink(dataDir, clock); + testHarness = createTestSink(dataDir); testHarness.setup(); testHarness.restore(snapshot1); testHarness.open(); @@ -175,16 +170,15 @@ public class BucketingSinkTest { final int numElements = 20; - TestTimeServiceProvider clock = new TestTimeServiceProvider(); - clock.setCurrentTime(0L); - BucketingSink<String> sink = new BucketingSink<String>(outPath) .setBucketer(new BasePathBucketer<String>()) .setPartPrefix("part") .setPendingPrefix("") .setPendingSuffix(""); - OneInputStreamOperatorTestHarness<String, Object> testHarness = createTestSink(sink, clock); + OneInputStreamOperatorTestHarness<String, Object> testHarness = createTestSink(sink); + + testHarness.setProcessingTime(0L); testHarness.setup(); testHarness.open(); @@ -217,9 +211,6 @@ public class BucketingSinkTest { final int numElements = 20; - TestTimeServiceProvider clock = new TestTimeServiceProvider(); - clock.setCurrentTime(0L); - BucketingSink<Tuple2<IntWritable, Text>> sink = new BucketingSink<Tuple2<IntWritable, Text>>(outPath) .setWriter(new SequenceFileWriter<IntWritable, Text>()) .setBucketer(new BasePathBucketer<Tuple2<IntWritable, Text>>()) @@ -230,7 +221,9 @@ public class BucketingSinkTest { sink.setInputType(TypeInformation.of(new TypeHint<Tuple2<IntWritable, Text>>(){}), new ExecutionConfig()); OneInputStreamOperatorTestHarness<Tuple2<IntWritable, Text>, Object> testHarness = - createTestSink(sink, clock); + createTestSink(sink); + + testHarness.setProcessingTime(0L); testHarness.setup(); testHarness.open(); @@ -271,9 +264,6 @@ public class BucketingSinkTest { final int numElements = 20; - TestTimeServiceProvider clock = new TestTimeServiceProvider(); - clock.setCurrentTime(0L); - Map<String, String> properties = new HashMap<>(); Schema keySchema = Schema.create(Schema.Type.INT); Schema valueSchema = Schema.create(Schema.Type.STRING); @@ -290,7 +280,9 @@ public class BucketingSinkTest { .setPendingSuffix(""); OneInputStreamOperatorTestHarness<Tuple2<Integer, String>, Object> testHarness = - createTestSink(sink, clock); + createTestSink(sink); + + testHarness.setProcessingTime(0L); testHarness.setup(); testHarness.open(); @@ -325,8 +317,8 @@ public class BucketingSinkTest { /** * This uses {@link DateTimeBucketer} to - * produce rolling files. A custom {@link TimeServiceProvider} is set - * to simulate the advancing of time alongside the processing of elements. + * produce rolling files. We use {@link OneInputStreamOperatorTestHarness} to manually + * advance processing time. */ @Test public void testDateTimeRollingStringWriter() throws Exception { @@ -334,16 +326,15 @@ public class BucketingSinkTest { final String outPath = hdfsURI + "/rolling-out"; - TestTimeServiceProvider clock = new TestTimeServiceProvider(); - clock.setCurrentTime(0L); - BucketingSink<String> sink = new BucketingSink<String>(outPath) .setBucketer(new DateTimeBucketer<String>("ss")) .setPartPrefix("part") .setPendingPrefix("") .setPendingSuffix(""); - OneInputStreamOperatorTestHarness<String, Object> testHarness = createTestSink(sink, clock); + OneInputStreamOperatorTestHarness<String, Object> testHarness = createTestSink(sink); + + testHarness.setProcessingTime(0L); testHarness.setup(); testHarness.open(); @@ -351,7 +342,7 @@ public class BucketingSinkTest { for (int i = 0; i < numElements; i++) { // Every 5 elements, increase the clock time. We should end up with 5 elements per bucket. if (i % 5 == 0) { - clock.setCurrentTime(i * 1000L); + testHarness.setProcessingTime(i * 1000L); } testHarness.processElement(new StreamRecord<>("message #" + Integer.toString(i))); } @@ -427,10 +418,9 @@ public class BucketingSinkTest { final int numIds = 4; final int numElements = 20; - TestTimeServiceProvider clock = new TestTimeServiceProvider(); - clock.setCurrentTime(0L); + OneInputStreamOperatorTestHarness<String, Object> testHarness = createTestSink(dataDir); - OneInputStreamOperatorTestHarness<String, Object> testHarness = createTestSink(dataDir, clock); + testHarness.setProcessingTime(0L); testHarness.setup(); testHarness.open(); @@ -465,10 +455,9 @@ public class BucketingSinkTest { final int step2NumIds = 2; final int numElementsPerStep = 20; - TestTimeServiceProvider clock = new TestTimeServiceProvider(); - clock.setCurrentTime(0L); + OneInputStreamOperatorTestHarness<String, Object> testHarness = createTestSink(dataDir); - OneInputStreamOperatorTestHarness<String, Object> testHarness = createTestSink(dataDir, clock); + testHarness.setProcessingTime(0L); testHarness.setup(); testHarness.open(); @@ -477,13 +466,13 @@ public class BucketingSinkTest { testHarness.processElement(new StreamRecord<>(Integer.toString(i % step1NumIds))); } - clock.setCurrentTime(2*60*1000L); + testHarness.setProcessingTime(2*60*1000L); for (int i = 0; i < numElementsPerStep; i++) { testHarness.processElement(new StreamRecord<>(Integer.toString(i % step2NumIds))); } - clock.setCurrentTime(6*60*1000L); + testHarness.setProcessingTime(6*60*1000L); for (int i = 0; i < numElementsPerStep; i++) { testHarness.processElement(new StreamRecord<>(Integer.toString(i % step2NumIds))); http://git-wip-us.apache.org/repos/asf/flink/blob/30554758/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperatorTest.java index af99d0d..febfcde 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperatorTest.java @@ -43,10 +43,8 @@ public class TimestampsAndPeriodicWatermarksOperatorTest { final ExecutionConfig config = new ExecutionConfig(); config.setAutoWatermarkInterval(50); - TestProcessingTimeService processingTimeService = new TestProcessingTimeService(); - OneInputStreamOperatorTestHarness<Long, Long> testHarness = - new OneInputStreamOperatorTestHarness<Long, Long>(operator, config, processingTimeService); + new OneInputStreamOperatorTestHarness<Long, Long>(operator, config); long currentTime = 0; @@ -77,7 +75,7 @@ public class TimestampsAndPeriodicWatermarksOperatorTest { assertTrue(lastWatermark < nextElementValue); } else { currentTime = currentTime + 10; - processingTimeService.setCurrentTime(currentTime); + testHarness.setProcessingTime(currentTime); } } @@ -109,7 +107,7 @@ public class TimestampsAndPeriodicWatermarksOperatorTest { assertTrue(lastWatermark < nextElementValue); } else { currentTime = currentTime + 10; - processingTimeService.setCurrentTime(currentTime); + testHarness.setProcessingTime(currentTime); } } http://git-wip-us.apache.org/repos/asf/flink/blob/30554758/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java index 128c88b..720258e 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java @@ -36,15 +36,10 @@ import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo; import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction; import org.apache.flink.streaming.api.functions.windowing.WindowFunction; -import org.apache.flink.streaming.api.graph.StreamConfig; -import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; -import org.apache.flink.streaming.runtime.operators.TestProcessingTimeServiceTest.ReferenceSettingExceptionHandler; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; import org.apache.flink.streaming.runtime.tasks.StreamTask; -import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.util.Collector; @@ -59,7 +54,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicReference; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -68,7 +62,7 @@ import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -@SuppressWarnings({"serial", "SynchronizationOnLocalVariableOrMethodParameter"}) +@SuppressWarnings({"serial"}) public class AccumulatingAlignedProcessingTimeWindowOperatorTest { @SuppressWarnings("unchecked") @@ -183,45 +177,57 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { @Test public void testWindowTriggerTimeAlignment() throws Exception { + try { - @SuppressWarnings("unchecked") - final Output<StreamRecord<String>> mockOut = mock(Output.class); - final ProcessingTimeService timerService = new NoOpTimerService(); - final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService); + AccumulatingProcessingTimeWindowOperator<String, String, String> op = + new AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, + StringSerializer.INSTANCE, StringSerializer.INSTANCE, 5000, 1000); - AccumulatingProcessingTimeWindowOperator<String, String, String> op; + KeyedOneInputStreamOperatorTestHarness<String, String, String> testHarness = + new KeyedOneInputStreamOperatorTestHarness<>(op, mockKeySelector, BasicTypeInfo.STRING_TYPE_INFO); + + testHarness.open(); - op = new AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, - StringSerializer.INSTANCE, StringSerializer.INSTANCE, 5000, 1000); - op.setup(mockTask, new StreamConfig(new Configuration()), mockOut); - op.open(); assertTrue(op.getNextSlideTime() % 1000 == 0); assertTrue(op.getNextEvaluationTime() % 1000 == 0); - op.dispose(); + testHarness.close(); op = new AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, StringSerializer.INSTANCE, StringSerializer.INSTANCE, 1000, 1000); - op.setup(mockTask, new StreamConfig(new Configuration()), mockOut); - op.open(); + + testHarness = + new KeyedOneInputStreamOperatorTestHarness<>(op, mockKeySelector, BasicTypeInfo.STRING_TYPE_INFO); + + testHarness.open(); + assertTrue(op.getNextSlideTime() % 1000 == 0); assertTrue(op.getNextEvaluationTime() % 1000 == 0); - op.dispose(); + testHarness.close(); + op = new AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, StringSerializer.INSTANCE, StringSerializer.INSTANCE, 1500, 1000); - op.setup(mockTask, new StreamConfig(new Configuration()), mockOut); - op.open(); + + testHarness = + new KeyedOneInputStreamOperatorTestHarness<>(op, mockKeySelector, BasicTypeInfo.STRING_TYPE_INFO); + + testHarness.open(); + assertTrue(op.getNextSlideTime() % 500 == 0); assertTrue(op.getNextEvaluationTime() % 1000 == 0); - op.dispose(); + testHarness.close(); op = new AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, StringSerializer.INSTANCE, StringSerializer.INSTANCE, 1200, 1100); - op.setup(mockTask, new StreamConfig(new Configuration()), mockOut); - op.open(); - assertTrue(op.getNextSlideTime() % 100 == 0); - assertTrue(op.getNextEvaluationTime() % 1100 == 0); - op.dispose(); + + testHarness = + new KeyedOneInputStreamOperatorTestHarness<>(op, mockKeySelector, BasicTypeInfo.STRING_TYPE_INFO); + + testHarness.open(); + + assertEquals(0, op.getNextSlideTime() % 100); + assertEquals(0, op.getNextEvaluationTime() % 1100); + testHarness.close(); } catch (Exception e) { e.printStackTrace(); @@ -231,16 +237,8 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { @Test public void testTumblingWindow() throws Exception { - final Object lock = new Object(); - final AtomicReference<Throwable> error = new AtomicReference<>(); - - final ProcessingTimeService timerService = new SystemProcessingTimeService( - new ReferenceSettingExceptionHandler(error), lock); - try { final int windowSize = 50; - final CollectingOutput<Integer> out = new CollectingOutput<>(windowSize); - final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService); // tumbling window that triggers every 20 milliseconds AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> op = @@ -249,31 +247,23 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { IntSerializer.INSTANCE, IntSerializer.INSTANCE, windowSize, windowSize); - op.setup(mockTask, new StreamConfig(new Configuration()), out); - op.open(); - - final int numElements = 1000; + KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer> testHarness = + new KeyedOneInputStreamOperatorTestHarness<>(op, identitySelector, BasicTypeInfo.INT_TYPE_INFO); - for (int i = 0; i < numElements; i++) { - synchronized (lock) { - op.processElement(new StreamRecord<Integer>(i)); - } - Thread.sleep(1); - } + testHarness.open(); - // get and verify the result - out.waitForNElements(numElements, 60_000); + final int numElements = 1000; - timerService.quiesceAndAwaitPending(); + long currentTime = 0; - synchronized (lock) { - op.close(); + for (int i = 0; i < numElements; i++) { + testHarness.processElement(new StreamRecord<>(i)); + currentTime = currentTime + 10; + testHarness.setProcessingTime(currentTime); } - shutdownTimerServiceAndWait(timerService); - op.dispose(); - List<Integer> result = out.getElements(); + List<Integer> result = extractFromStreamRecords(testHarness.extractOutputStreamRecords()); assertEquals(numElements, result.size()); Collections.sort(result); @@ -281,102 +271,70 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { assertEquals(i, result.get(i).intValue()); } - if (error.get() != null) { - throw new Exception(error.get()); - } + testHarness.close(); } catch (Exception e) { e.printStackTrace(); fail(e.getMessage()); } - finally { - timerService.shutdownService(); - } } @Test public void testSlidingWindow() throws Exception { - final Object lock = new Object(); - final AtomicReference<Throwable> error = new AtomicReference<>(); - final ProcessingTimeService timerService = new SystemProcessingTimeService( - new ReferenceSettingExceptionHandler(error), lock); + // tumbling window that triggers every 20 milliseconds + AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> op = + new AccumulatingProcessingTimeWindowOperator<>( + validatingIdentityFunction, identitySelector, + IntSerializer.INSTANCE, IntSerializer.INSTANCE, 150, 50); - try { - final CollectingOutput<Integer> out = new CollectingOutput<>(50); - final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService); - - // tumbling window that triggers every 20 milliseconds - AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> op = - new AccumulatingProcessingTimeWindowOperator<>( - validatingIdentityFunction, identitySelector, - IntSerializer.INSTANCE, IntSerializer.INSTANCE, 150, 50); + KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer> testHarness = + new KeyedOneInputStreamOperatorTestHarness<>(op, identitySelector, BasicTypeInfo.INT_TYPE_INFO); - op.setup(mockTask, new StreamConfig(new Configuration()), out); - op.open(); + testHarness.open(); - final int numElements = 1000; - - for (int i = 0; i < numElements; i++) { - synchronized (lock) { - op.processElement(new StreamRecord<Integer>(i)); - } - Thread.sleep(1); - } + final int numElements = 1000; - timerService.quiesceAndAwaitPending(); + long currentTime = 0; - synchronized (lock) { - op.close(); - } + for (int i = 0; i < numElements; i++) { + testHarness.processElement(new StreamRecord<>(i)); + currentTime = currentTime + 10; + testHarness.setProcessingTime(currentTime); + } - shutdownTimerServiceAndWait(timerService); - op.dispose(); + // get and verify the result + List<Integer> result = extractFromStreamRecords(testHarness.extractOutputStreamRecords()); - // get and verify the result - List<Integer> result = out.getElements(); + // if we kept this running, each element would be in the result three times (for each slide). + // we are closing the window before the final panes are through three times, so we may have less + // elements. + if (result.size() < numElements || result.size() > 3 * numElements) { + fail("Wrong number of results: " + result.size()); + } - // if we kept this running, each element would be in the result three times (for each slide). - // we are closing the window before the final panes are through three times, so we may have less - // elements. - if (result.size() < numElements || result.size() > 3 * numElements) { - fail("Wrong number of results: " + result.size()); - } + Collections.sort(result); + int lastNum = -1; + int lastCount = -1; - Collections.sort(result); - int lastNum = -1; - int lastCount = -1; - - for (int num : result) { - if (num == lastNum) { - lastCount++; - assertTrue(lastCount <= 3); - } - else { - lastNum = num; - lastCount = 1; - } + for (int num : result) { + if (num == lastNum) { + lastCount++; + assertTrue(lastCount <= 3); } - - if (error.get() != null) { - throw new Exception(error.get()); + else { + lastNum = num; + lastCount = 1; } - } finally { - timerService.shutdownService(); } + + testHarness.close(); } @Test public void testTumblingWindowSingleElements() throws Exception { - final Object lock = new Object(); - final AtomicReference<Throwable> error = new AtomicReference<>(); - - final ProcessingTimeService timerService = new SystemProcessingTimeService( - new ReferenceSettingExceptionHandler(error), lock); try { - final CollectingOutput<Integer> out = new CollectingOutput<>(50); - final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService); // tumbling window that triggers every 20 milliseconds AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> op = @@ -384,66 +342,46 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { validatingIdentityFunction, identitySelector, IntSerializer.INSTANCE, IntSerializer.INSTANCE, 50, 50); - op.setup(mockTask, new StreamConfig(new Configuration()), out); - op.open(); + KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer> testHarness = + new KeyedOneInputStreamOperatorTestHarness<>(op, identitySelector, BasicTypeInfo.INT_TYPE_INFO); - synchronized (lock) { - op.processElement(new StreamRecord<Integer>(1)); - op.processElement(new StreamRecord<Integer>(2)); - } - out.waitForNElements(2, 60000); + testHarness.open(); - synchronized (lock) { - op.processElement(new StreamRecord<Integer>(3)); - op.processElement(new StreamRecord<Integer>(4)); - op.processElement(new StreamRecord<Integer>(5)); - } - out.waitForNElements(5, 60000); + testHarness.setProcessingTime(0); - synchronized (lock) { - op.processElement(new StreamRecord<Integer>(6)); - } - out.waitForNElements(6, 60000); - - List<Integer> result = out.getElements(); - assertEquals(6, result.size()); + testHarness.processElement(new StreamRecord<>(1)); + testHarness.processElement(new StreamRecord<>(2)); - Collections.sort(result); - assertEquals(Arrays.asList(1, 2, 3, 4, 5, 6), result); + testHarness.setProcessingTime(50); - timerService.quiesceAndAwaitPending(); + testHarness.processElement(new StreamRecord<>(3)); + testHarness.processElement(new StreamRecord<>(4)); + testHarness.processElement(new StreamRecord<>(5)); - synchronized (lock) { - op.close(); - } + testHarness.setProcessingTime(100); - shutdownTimerServiceAndWait(timerService); - op.dispose(); + testHarness.processElement(new StreamRecord<>(6)); - if (error.get() != null) { - throw new Exception(error.get()); - } + testHarness.setProcessingTime(200); + + + List<Integer> result = extractFromStreamRecords(testHarness.extractOutputStreamRecords()); + assertEquals(6, result.size()); + + Collections.sort(result); + assertEquals(Arrays.asList(1, 2, 3, 4, 5, 6), result); + + testHarness.close(); } catch (Exception e) { e.printStackTrace(); fail(e.getMessage()); } - finally { - timerService.shutdownService(); - } } @Test public void testSlidingWindowSingleElements() throws Exception { - final Object lock = new Object(); - final AtomicReference<Throwable> error = new AtomicReference<>(); - - final ProcessingTimeService timerService = new SystemProcessingTimeService( - new ReferenceSettingExceptionHandler(error), lock); - try { - final CollectingOutput<Integer> out = new CollectingOutput<>(50); - final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService); // tumbling window that triggers every 20 milliseconds AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> op = @@ -451,44 +389,33 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { validatingIdentityFunction, identitySelector, IntSerializer.INSTANCE, IntSerializer.INSTANCE, 150, 50); - op.setup(mockTask, new StreamConfig(new Configuration()), out); - op.open(); + KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer> testHarness = + new KeyedOneInputStreamOperatorTestHarness<>(op, identitySelector, BasicTypeInfo.INT_TYPE_INFO); - synchronized (lock) { - op.processElement(new StreamRecord<Integer>(1)); - op.processElement(new StreamRecord<Integer>(2)); - } + testHarness.setProcessingTime(0); + + testHarness.open(); + + testHarness.processElement(new StreamRecord<>(1)); + testHarness.processElement(new StreamRecord<>(2)); + + testHarness.setProcessingTime(50); + testHarness.setProcessingTime(100); + testHarness.setProcessingTime(150); + + List<Integer> result = extractFromStreamRecords(testHarness.extractOutputStreamRecords()); - // each element should end up in the output three times - // wait until the elements have arrived 6 times in the output - out.waitForNElements(6, 120000); - - List<Integer> result = out.getElements(); assertEquals(6, result.size()); Collections.sort(result); assertEquals(Arrays.asList(1, 1, 1, 2, 2, 2), result); - timerService.quiesceAndAwaitPending(); - - synchronized (lock) { - op.close(); - } - - shutdownTimerServiceAndWait(timerService); - op.dispose(); - - if (error.get() != null) { - throw new Exception(error.get()); - } + testHarness.close(); } catch (Exception e) { e.printStackTrace(); fail(e.getMessage()); } - finally { - timerService.shutdownService(); - } } @Test @@ -503,15 +430,13 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { IntSerializer.INSTANCE, IntSerializer.INSTANCE, windowSize, windowSize); - TestProcessingTimeService timerService = new TestProcessingTimeService(); - OneInputStreamOperatorTestHarness<Integer, Integer> testHarness = - new OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig(), timerService); + new OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig()); testHarness.setup(); testHarness.open(); - timerService.setCurrentTime(0); + testHarness.setProcessingTime(0); // inject some elements final int numElementsFirst = 700; @@ -542,8 +467,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { IntSerializer.INSTANCE, IntSerializer.INSTANCE, windowSize, windowSize); - timerService = new TestProcessingTimeService(); - testHarness = new OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig(), timerService); + testHarness = new OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig()); testHarness.setup(); testHarness.restore(state); @@ -554,7 +478,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { testHarness.processElement(new StreamRecord<>(i)); } - timerService.setCurrentTime(400); + testHarness.setProcessingTime(400); // get and verify the result List<Integer> finalResult = new ArrayList<>(); @@ -568,7 +492,6 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { assertEquals(i, finalResult.get(i).intValue()); } testHarness.close(); - op.dispose(); } catch (Exception e) { e.printStackTrace(); @@ -583,8 +506,6 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { final int windowSlide = 50; final int windowSize = factor * windowSlide; - TestProcessingTimeService timerService = new TestProcessingTimeService(); - // sliding window (200 msecs) every 50 msecs AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> op = new AccumulatingProcessingTimeWindowOperator<>( @@ -593,9 +514,9 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { windowSize, windowSlide); OneInputStreamOperatorTestHarness<Integer, Integer> testHarness = - new OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig(), timerService); + new OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig()); - timerService.setCurrentTime(0); + testHarness.setProcessingTime(0); testHarness.setup(); testHarness.open(); @@ -623,7 +544,6 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { } testHarness.close(); - op.dispose(); // re-create the operator and restore the state op = new AccumulatingProcessingTimeWindowOperator<>( @@ -631,8 +551,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { IntSerializer.INSTANCE, IntSerializer.INSTANCE, windowSize, windowSlide); - timerService = new TestProcessingTimeService(); - testHarness = new OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig(), timerService); + testHarness = new OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig()); testHarness.setup(); testHarness.restore(state); @@ -644,13 +563,13 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { testHarness.processElement(new StreamRecord<>(i)); } - timerService.setCurrentTime(50); - timerService.setCurrentTime(100); - timerService.setCurrentTime(150); - timerService.setCurrentTime(200); - timerService.setCurrentTime(250); - timerService.setCurrentTime(300); - timerService.setCurrentTime(350); + testHarness.setProcessingTime(50); + testHarness.setProcessingTime(100); + testHarness.setProcessingTime(150); + testHarness.setProcessingTime(200); + testHarness.setProcessingTime(250); + testHarness.setProcessingTime(300); + testHarness.setProcessingTime(350); // get and verify the result List<Integer> finalResult = new ArrayList<>(resultAtSnapshot); @@ -684,14 +603,12 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { new StatefulFunction(), identitySelector, IntSerializer.INSTANCE, IntSerializer.INSTANCE, 50, 50); - TestProcessingTimeService timerService = new TestProcessingTimeService(); - OneInputStreamOperatorTestHarness<Integer, Integer> testHarness = - new KeyedOneInputStreamOperatorTestHarness<>(op, new ExecutionConfig(), timerService, identitySelector, BasicTypeInfo.INT_TYPE_INFO); + new KeyedOneInputStreamOperatorTestHarness<>(op, new ExecutionConfig(), identitySelector, BasicTypeInfo.INT_TYPE_INFO); testHarness.open(); - timerService.setCurrentTime(0); + testHarness.setProcessingTime(0); testHarness.processElement(new StreamRecord<>(1)); testHarness.processElement(new StreamRecord<>(2)); @@ -703,7 +620,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { op.processElement(new StreamRecord<>(2)); op.processElement(new StreamRecord<>(2)); - timerService.setCurrentTime(1000); + testHarness.setProcessingTime(1000); List<Integer> result = extractFromStreamRecords(testHarness.getOutput()); assertEquals(8, result.size()); @@ -808,7 +725,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { } @SuppressWarnings({"unchecked", "rawtypes"}) - private <T> List<T> extractFromStreamRecords(Iterable<Object> input) { + private <T> List<T> extractFromStreamRecords(Iterable<?> input) { List<T> result = new ArrayList<>(); for (Object in : input) { if (in instanceof StreamRecord) { @@ -824,5 +741,5 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { while (!timers.isTerminated()) { Thread.sleep(2); } - } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/30554758/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java index bb64a08..7ca5753 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java @@ -19,7 +19,6 @@ package org.apache.flink.streaming.runtime.operators.windowing; import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.functions.RichReduceFunction; import org.apache.flink.api.common.state.ValueState; @@ -32,24 +31,13 @@ import org.apache.flink.api.java.ClosureCleaner; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.TupleTypeInfo; -import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.execution.Environment; -import org.apache.flink.runtime.operators.testutils.DummyEnvironment; import org.apache.flink.runtime.state.StreamStateHandle; -import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo; -import org.apache.flink.streaming.api.graph.StreamConfig; -import org.apache.flink.streaming.api.operators.Output; -import org.apache.flink.streaming.runtime.operators.TestProcessingTimeServiceTest.ReferenceSettingExceptionHandler; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService; -import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; import org.apache.flink.streaming.runtime.tasks.StreamTask; -import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; - import org.junit.After; import org.junit.Test; @@ -57,21 +45,18 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.Comparator; -import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicReference; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; -@SuppressWarnings({"serial", "SynchronizationOnLocalVariableOrMethodParameter"}) +@SuppressWarnings("serial") public class AggregatingAlignedProcessingTimeWindowOperatorTest { @SuppressWarnings("unchecked") @@ -79,23 +64,23 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { @SuppressWarnings("unchecked") private final KeySelector<String, String> mockKeySelector = mock(KeySelector.class); - - private final KeySelector<Tuple2<Integer, Integer>, Integer> fieldOneSelector = + + private final KeySelector<Tuple2<Integer, Integer>, Integer> fieldOneSelector = new KeySelector<Tuple2<Integer,Integer>, Integer>() { @Override public Integer getKey(Tuple2<Integer,Integer> value) { return value.f0; } }; - + private final ReduceFunction<Tuple2<Integer, Integer>> sumFunction = new ReduceFunction<Tuple2<Integer, Integer>>() { @Override public Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> value1, Tuple2<Integer, Integer> value2) { return new Tuple2<>(value1.f0, value1.f1 + value2.f1); } }; - - private final TypeSerializer<Tuple2<Integer, Integer>> tupleSerializer = + + private final TypeSerializer<Tuple2<Integer, Integer>> tupleSerializer = new TupleTypeInfo<Tuple2<Integer, Integer>>(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO) .createSerializer(new ExecutionConfig()); @@ -107,14 +92,14 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { return diff0 != 0 ? diff0 : diff1; } }; - + // ------------------------------------------------------------------------ public AggregatingAlignedProcessingTimeWindowOperatorTest() { ClosureCleaner.clean(fieldOneSelector, false); ClosureCleaner.clean(sumFunction, false); } - + // ------------------------------------------------------------------------ @After @@ -131,9 +116,9 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { assertTrue("Not all trigger threads where properly shut down", StreamTask.TRIGGER_THREAD_GROUP.activeCount() == 0); } - + // ------------------------------------------------------------------------ - + @Test public void testInvalidParameters() { try { @@ -141,7 +126,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { assertInvalidParameter(10000L, -1L); assertInvalidParameter(-1L, 1000L); assertInvalidParameter(1000L, 2000L); - + // actual internal slide is too low here: assertInvalidParameter(1000L, 999L); } @@ -150,12 +135,12 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { fail(e.getMessage()); } } - + @Test public void testWindowSizeAndSlide() { try { AggregatingProcessingTimeWindowOperator<String, String> op; - + op = new AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, StringSerializer.INSTANCE, StringSerializer.INSTANCE, 5000, 1000); assertEquals(5000, op.getWindowSize()); @@ -193,44 +178,51 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { @Test public void testWindowTriggerTimeAlignment() throws Exception { try { - @SuppressWarnings("unchecked") - final Output<StreamRecord<String>> mockOut = mock(Output.class); - final ProcessingTimeService timerService = new NoOpTimerService(); - final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService); - AggregatingProcessingTimeWindowOperator<String, String> op; + AggregatingProcessingTimeWindowOperator<String, String> op = + new AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, + StringSerializer.INSTANCE, StringSerializer.INSTANCE, 5000, 1000); + + KeyedOneInputStreamOperatorTestHarness<String, String, String> testHarness = + new KeyedOneInputStreamOperatorTestHarness<>(op, mockKeySelector, BasicTypeInfo.STRING_TYPE_INFO); + testHarness.open(); - op = new AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, - StringSerializer.INSTANCE, StringSerializer.INSTANCE, 5000, 1000); - op.setup(mockTask, createTaskConfig(mockKeySelector, StringSerializer.INSTANCE, 10), mockOut); - op.open(); assertTrue(op.getNextSlideTime() % 1000 == 0); assertTrue(op.getNextEvaluationTime() % 1000 == 0); - op.dispose(); + testHarness.close(); op = new AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, StringSerializer.INSTANCE, StringSerializer.INSTANCE, 1000, 1000); - op.setup(mockTask, new StreamConfig(new Configuration()), mockOut); - op.open(); + + testHarness = + new KeyedOneInputStreamOperatorTestHarness<>(op, mockKeySelector, BasicTypeInfo.STRING_TYPE_INFO); + testHarness.open(); + assertTrue(op.getNextSlideTime() % 1000 == 0); assertTrue(op.getNextEvaluationTime() % 1000 == 0); - op.dispose(); + testHarness.close(); op = new AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, StringSerializer.INSTANCE, StringSerializer.INSTANCE, 1500, 1000); - op.setup(mockTask, new StreamConfig(new Configuration()), mockOut); - op.open(); + + testHarness = + new KeyedOneInputStreamOperatorTestHarness<>(op, mockKeySelector, BasicTypeInfo.STRING_TYPE_INFO); + testHarness.open(); + assertTrue(op.getNextSlideTime() % 500 == 0); assertTrue(op.getNextEvaluationTime() % 1000 == 0); - op.dispose(); + testHarness.close(); op = new AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, StringSerializer.INSTANCE, StringSerializer.INSTANCE, 1200, 1100); - op.setup(mockTask, new StreamConfig(new Configuration()), mockOut); - op.open(); + + testHarness = + new KeyedOneInputStreamOperatorTestHarness<>(op, mockKeySelector, BasicTypeInfo.STRING_TYPE_INFO); + testHarness.open(); + assertTrue(op.getNextSlideTime() % 100 == 0); assertTrue(op.getNextEvaluationTime() % 1100 == 0); - op.dispose(); + testHarness.close(); } catch (Exception e) { e.printStackTrace(); @@ -240,85 +232,54 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { @Test public void testTumblingWindowUniqueElements() throws Exception { - final Object lock = new Object(); - final AtomicReference<Throwable> error = new AtomicReference<>(); - - final ProcessingTimeService timerService = new SystemProcessingTimeService( - new ReferenceSettingExceptionHandler(error), lock); try { final int windowSize = 50; - final CollectingOutput<Tuple2<Integer, Integer>> out = new CollectingOutput<>(windowSize); - + AggregatingProcessingTimeWindowOperator<Integer, Tuple2<Integer, Integer>> op = new AggregatingProcessingTimeWindowOperator<>( sumFunction, fieldOneSelector, IntSerializer.INSTANCE, tupleSerializer, windowSize, windowSize); - final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService); + KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> testHarness = + new KeyedOneInputStreamOperatorTestHarness<>(op, fieldOneSelector, BasicTypeInfo.INT_TYPE_INFO); - op.setup(mockTask, createTaskConfig(fieldOneSelector, IntSerializer.INSTANCE, 10), out); - op.open(); + testHarness.open(); final int numElements = 1000; + long currentTime = 0; + for (int i = 0; i < numElements; i++) { - synchronized (lock) { - StreamRecord<Tuple2<Integer, Integer>> next = new StreamRecord<>(new Tuple2<>(i, i)); - op.setKeyContextElement1(next); - op.processElement(next); - } - Thread.sleep(1); + StreamRecord<Tuple2<Integer, Integer>> next = new StreamRecord<>(new Tuple2<>(i, i)); + testHarness.processElement(next); + currentTime = currentTime + 10; + testHarness.setProcessingTime(currentTime); } - out.waitForNElements(numElements, 60_000); - // get and verify the result - List<Tuple2<Integer, Integer>> result = out.getElements(); + List<Tuple2<Integer, Integer>> result = extractFromStreamRecords(testHarness.extractOutputStreamRecords()); assertEquals(numElements, result.size()); - timerService.quiesceAndAwaitPending(); - - synchronized (lock) { - op.close(); - } - - shutdownTimerServiceAndWait(timerService); - op.dispose(); - + testHarness.close(); Collections.sort(result, tupleComparator); for (int i = 0; i < numElements; i++) { assertEquals(i, result.get(i).f0.intValue()); assertEquals(i, result.get(i).f1.intValue()); } - - if (error.get() != null) { - throw new Exception(error.get()); - } } catch (Exception e) { e.printStackTrace(); fail(e.getMessage()); } - finally { - timerService.shutdownService(); - } } @Test public void testTumblingWindowDuplicateElements() throws Exception { - final Object lock = new Object(); - final AtomicReference<Throwable> error = new AtomicReference<>(); - - final ProcessingTimeService timerService = new SystemProcessingTimeService( - new ReferenceSettingExceptionHandler(error), lock); try { final int windowSize = 50; - final CollectingOutput<Tuple2<Integer, Integer>> out = new CollectingOutput<>(windowSize); - - final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService); AggregatingProcessingTimeWindowOperator<Integer, Tuple2<Integer, Integer>> op = new AggregatingProcessingTimeWindowOperator<>( @@ -326,43 +287,39 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { IntSerializer.INSTANCE, tupleSerializer, windowSize, windowSize); - op.setup(mockTask, createTaskConfig(fieldOneSelector, IntSerializer.INSTANCE, 10), out); - op.open(); + KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> testHarness = + new KeyedOneInputStreamOperatorTestHarness<>(op, fieldOneSelector, BasicTypeInfo.INT_TYPE_INFO); + + testHarness.setProcessingTime(0); + testHarness.open(); final int numWindows = 10; long previousNextTime = 0; int window = 1; - - while (window <= numWindows) { - synchronized (lock) { - long nextTime = op.getNextEvaluationTime(); - int val = ((int) nextTime) ^ ((int) (nextTime >>> 32)); - - StreamRecord<Tuple2<Integer, Integer>> next = new StreamRecord<>(new Tuple2<>(val, val)); - op.setKeyContextElement1(next); - op.processElement(next); - - if (nextTime != previousNextTime) { - window++; - previousNextTime = nextTime; - } - } - Thread.sleep(1); - } - out.waitForNElements(numWindows, 60_000); + long currentTime = 0; - List<Tuple2<Integer, Integer>> result = out.getElements(); + while (window <= numWindows) { + long nextTime = op.getNextEvaluationTime(); + int val = ((int) nextTime) ^ ((int) (nextTime >>> 32)); - timerService.quiesceAndAwaitPending(); + StreamRecord<Tuple2<Integer, Integer>> next = new StreamRecord<>(new Tuple2<>(val, val)); + testHarness.processElement(next); - synchronized (lock) { - op.close(); + if (nextTime != previousNextTime) { + window++; + previousNextTime = nextTime; + } + currentTime = currentTime + 1; + testHarness.setProcessingTime(currentTime); } - shutdownTimerServiceAndWait(timerService); - op.dispose(); + testHarness.setProcessingTime(currentTime + 100); + + List<Tuple2<Integer, Integer>> result = extractFromStreamRecords(testHarness.extractOutputStreamRecords()); + + testHarness.close(); // we have ideally one element per window. we may have more, when we emitted a value into the // successive window (corner case), so we can have twice the number of elements, in the worst case. @@ -371,33 +328,16 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { // deduplicate for more accurate checks HashSet<Tuple2<Integer, Integer>> set = new HashSet<>(result); assertTrue(set.size() == 10); - - if (error.get() != null) { - throw new Exception(error.get()); - } } catch (Exception e) { e.printStackTrace(); fail(e.getMessage()); } - finally { - timerService.shutdownService(); - } } @Test public void testSlidingWindow() throws Exception { - final Object lock = new Object(); - final AtomicReference<Throwable> error = new AtomicReference<>(); - - final ProcessingTimeService timerService = new SystemProcessingTimeService( - new ReferenceSettingExceptionHandler(error), lock); - try { - final CollectingOutput<Tuple2<Integer, Integer>> out = new CollectingOutput<>(50); - - final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService); - // tumbling window that triggers every 20 milliseconds AggregatingProcessingTimeWindowOperator<Integer, Tuple2<Integer, Integer>> op = new AggregatingProcessingTimeWindowOperator<>( @@ -405,32 +345,27 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { IntSerializer.INSTANCE, tupleSerializer, 150, 50); - op.setup(mockTask, createTaskConfig(fieldOneSelector, IntSerializer.INSTANCE, 10), out); - op.open(); + KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> testHarness = + new KeyedOneInputStreamOperatorTestHarness<>(op, fieldOneSelector, BasicTypeInfo.INT_TYPE_INFO); + + testHarness.open(); final int numElements = 1000; + long currentTime = 0; + for (int i = 0; i < numElements; i++) { - synchronized (lock) { - StreamRecord<Tuple2<Integer, Integer>> next = new StreamRecord<>(new Tuple2<>(i, i)); - op.setKeyContextElement1(next); - op.processElement(next); - } - Thread.sleep(1); + StreamRecord<Tuple2<Integer, Integer>> next = new StreamRecord<>(new Tuple2<>(i, i)); + testHarness.processElement(next); + currentTime = currentTime + 1; + testHarness.setProcessingTime(currentTime); } - timerService.quiesceAndAwaitPending(); - - synchronized (lock) { - op.close(); - } + // get and verify the result + List<Tuple2<Integer, Integer>> result = extractFromStreamRecords(testHarness.extractOutputStreamRecords()); - shutdownTimerServiceAndWait(timerService); - op.dispose(); + testHarness.close(); - // get and verify the result - List<Tuple2<Integer, Integer>> result = out.getElements(); - // every element can occur between one and three times if (result.size() < numElements || result.size() > 3 * numElements) { System.out.println(result); @@ -440,10 +375,10 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { Collections.sort(result, tupleComparator); int lastNum = -1; int lastCount = -1; - + for (Tuple2<Integer, Integer> val : result) { assertEquals(val.f0, val.f1); - + if (val.f0 == lastNum) { lastCount++; assertTrue(lastCount <= 3); @@ -453,58 +388,42 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { lastCount = 1; } } - - if (error.get() != null) { - throw new Exception(error.get()); - } } catch (Exception e) { e.printStackTrace(); fail(e.getMessage()); } - finally { - timerService.shutdownService(); - } } @Test public void testSlidingWindowSingleElements() throws Exception { - final Object lock = new Object(); - final AtomicReference<Throwable> error = new AtomicReference<>(); - - final ProcessingTimeService timerService = new SystemProcessingTimeService( - new ReferenceSettingExceptionHandler(error), lock); - try { - final CollectingOutput<Tuple2<Integer, Integer>> out = new CollectingOutput<>(50); - final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService); - // tumbling window that triggers every 20 milliseconds AggregatingProcessingTimeWindowOperator<Integer, Tuple2<Integer, Integer>> op = new AggregatingProcessingTimeWindowOperator<>( sumFunction, fieldOneSelector, IntSerializer.INSTANCE, tupleSerializer, 150, 50); - op.setup(mockTask, createTaskConfig(fieldOneSelector, IntSerializer.INSTANCE, 10), out); - op.open(); - - synchronized (lock) { - StreamRecord<Tuple2<Integer, Integer>> next1 = new StreamRecord<>(new Tuple2<>(1, 1)); - op.setKeyContextElement1(next1); - op.processElement(next1); - - StreamRecord<Tuple2<Integer, Integer>> next2 = new StreamRecord<>(new Tuple2<>(2, 2)); - op.setKeyContextElement1(next2); - op.processElement(next2); - } + KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> testHarness = + new KeyedOneInputStreamOperatorTestHarness<>(op, fieldOneSelector, BasicTypeInfo.INT_TYPE_INFO); + + testHarness.open(); + + testHarness.setProcessingTime(0); + + StreamRecord<Tuple2<Integer, Integer>> next1 = new StreamRecord<>(new Tuple2<>(1, 1)); + testHarness.processElement(next1); - // each element should end up in the output three times - // wait until the elements have arrived 6 times in the output - out.waitForNElements(6, 120000); - - List<Tuple2<Integer, Integer>> result = out.getElements(); + StreamRecord<Tuple2<Integer, Integer>> next2 = new StreamRecord<>(new Tuple2<>(2, 2)); + testHarness.processElement(next2); + + testHarness.setProcessingTime(50); + testHarness.setProcessingTime(100); + testHarness.setProcessingTime(150); + + List<Tuple2<Integer, Integer>> result = extractFromStreamRecords(testHarness.extractOutputStreamRecords()); assertEquals(6, result.size()); - + Collections.sort(result, tupleComparator); assertEquals(Arrays.asList( new Tuple2<>(1, 1), @@ -515,40 +434,18 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { new Tuple2<>(2, 2) ), result); - timerService.quiesceAndAwaitPending(); - - synchronized (lock) { - op.close(); - } - - shutdownTimerServiceAndWait(timerService); - op.dispose(); - - if (error.get() != null) { - throw new Exception(error.get()); - } + testHarness.close(); } catch (Exception e) { e.printStackTrace(); fail(e.getMessage()); } - finally { - timerService.shutdownService(); - } } @Test public void testPropagateExceptionsFromProcessElement() throws Exception { - final Object lock = new Object(); - final AtomicReference<Throwable> error = new AtomicReference<>(); - - final ProcessingTimeService timerService = new SystemProcessingTimeService( - new ReferenceSettingExceptionHandler(error), lock); try { - final CollectingOutput<Tuple2<Integer, Integer>> out = new CollectingOutput<>(); - final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService); - ReduceFunction<Tuple2<Integer, Integer>> failingFunction = new FailingFunction(100); // the operator has a window time that is so long that it will not fire in this test @@ -559,46 +456,31 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { IntSerializer.INSTANCE, tupleSerializer, hundredYears, hundredYears); - op.setup(mockTask, createTaskConfig(fieldOneSelector, IntSerializer.INSTANCE, 10), out); - op.open(); + KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> testHarness = + new KeyedOneInputStreamOperatorTestHarness<>(op, fieldOneSelector, BasicTypeInfo.INT_TYPE_INFO); + + testHarness.open(); for (int i = 0; i < 100; i++) { - synchronized (lock) { - StreamRecord<Tuple2<Integer, Integer>> next = new StreamRecord<>(new Tuple2<>(1, 1)); - op.setKeyContextElement1(next); - op.processElement(next); - } + StreamRecord<Tuple2<Integer, Integer>> next = new StreamRecord<>(new Tuple2<>(1, 1)); + testHarness.processElement(next); } - + try { StreamRecord<Tuple2<Integer, Integer>> next = new StreamRecord<>(new Tuple2<>(1, 1)); - op.setKeyContextElement1(next); - op.processElement(next); + testHarness.processElement(next); fail("This fail with an exception"); } catch (Exception e) { assertTrue(e.getMessage().contains("Artificial Test Exception")); } - timerService.quiesceAndAwaitPending(); - synchronized (lock) { - op.close(); - } - - shutdownTimerServiceAndWait(timerService); op.dispose(); - - if (error.get() != null) { - throw new Exception(error.get()); - } } catch (Exception e) { e.printStackTrace(); fail(e.getMessage()); } - finally { - timerService.shutdownService(); - } } @Test @@ -606,8 +488,6 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { try { final int windowSize = 200; - TestProcessingTimeService timerService = new TestProcessingTimeService(); - // tumbling window that triggers every 50 milliseconds AggregatingProcessingTimeWindowOperator<Integer, Tuple2<Integer, Integer>> op = new AggregatingProcessingTimeWindowOperator<>( @@ -616,9 +496,9 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { windowSize, windowSize); OneInputStreamOperatorTestHarness<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> testHarness = - new OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig(), timerService); + new OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig()); - timerService.setCurrentTime(0); + testHarness.setProcessingTime(0); testHarness.setup(); testHarness.open(); @@ -626,7 +506,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { // inject some elements final int numElementsFirst = 700; final int numElements = 1000; - + for (int i = 0; i < numElementsFirst; i++) { StreamRecord<Tuple2<Integer, Integer>> next = new StreamRecord<>(new Tuple2<>(i, i)); testHarness.processElement(next); @@ -656,8 +536,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { IntSerializer.INSTANCE, tupleSerializer, windowSize, windowSize); - timerService = new TestProcessingTimeService(); - testHarness = new OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig(), timerService); + testHarness = new OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig()); testHarness.setup(); testHarness.restore(state); @@ -669,7 +548,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { testHarness.processElement(next); } - timerService.setCurrentTime(200); + testHarness.setProcessingTime(200); // get and verify the result List<Tuple2<Integer, Integer>> finalResult = new ArrayList<>(resultAtSnapshot); @@ -699,8 +578,6 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { final int windowSlide = 50; final int windowSize = factor * windowSlide; - TestProcessingTimeService timerService = new TestProcessingTimeService(); - // sliding window (200 msecs) every 50 msecs AggregatingProcessingTimeWindowOperator<Integer, Tuple2<Integer, Integer>> op = new AggregatingProcessingTimeWindowOperator<>( @@ -708,10 +585,11 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { IntSerializer.INSTANCE, tupleSerializer, windowSize, windowSlide); - timerService.setCurrentTime(0); OneInputStreamOperatorTestHarness<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> testHarness = - new OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig(), timerService); + new OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig()); + + testHarness.setProcessingTime(0); testHarness.setup(); testHarness.open(); @@ -749,8 +627,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { IntSerializer.INSTANCE, tupleSerializer, windowSize, windowSlide); - timerService = new TestProcessingTimeService(); - testHarness = new OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig(), timerService); + testHarness = new OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig()); testHarness.setup(); testHarness.restore(state); @@ -762,14 +639,14 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { testHarness.processElement(next); } - timerService.setCurrentTime(50); - timerService.setCurrentTime(100); - timerService.setCurrentTime(150); - timerService.setCurrentTime(200); - timerService.setCurrentTime(250); - timerService.setCurrentTime(300); - timerService.setCurrentTime(350); - timerService.setCurrentTime(400); + testHarness.setProcessingTime(50); + testHarness.setProcessingTime(100); + testHarness.setProcessingTime(150); + testHarness.setProcessingTime(200); + testHarness.setProcessingTime(250); + testHarness.setProcessingTime(300); + testHarness.setProcessingTime(350); + testHarness.setProcessingTime(400); // get and verify the result List<Tuple2<Integer, Integer>> finalResult = new ArrayList<>(resultAtSnapshot); @@ -796,11 +673,9 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { public void testKeyValueStateInWindowFunctionTumbling() { try { final long twoSeconds = 2000; - - TestProcessingTimeService timerService = new TestProcessingTimeService(); StatefulFunction.globalCounts.clear(); - + AggregatingProcessingTimeWindowOperator<Integer, Tuple2<Integer, Integer>> op = new AggregatingProcessingTimeWindowOperator<>( new StatefulFunction(), fieldOneSelector, @@ -809,16 +684,15 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> testHarness = new KeyedOneInputStreamOperatorTestHarness<>( op, new ExecutionConfig(), - timerService, fieldOneSelector, BasicTypeInfo.INT_TYPE_INFO); - timerService.setCurrentTime(0); + testHarness.setProcessingTime(0); testHarness.open(); // because the window interval is so large, everything should be in one window // and aggregate into one value per key - + for (int i = 0; i < 10; i++) { StreamRecord<Tuple2<Integer, Integer>> next1 = new StreamRecord<>(new Tuple2<>(1, i)); testHarness.processElement(next1); @@ -827,7 +701,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { testHarness.processElement(next2); } - timerService.setCurrentTime(1000); + testHarness.setProcessingTime(1000); int count1 = StatefulFunction.globalCounts.get(1); int count2 = StatefulFunction.globalCounts.get(2); @@ -851,32 +725,30 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { final int windowSlide = 50; final int windowSize = factor * windowSlide; - TestProcessingTimeService timerService = new TestProcessingTimeService(); - StatefulFunction.globalCounts.clear(); - + AggregatingProcessingTimeWindowOperator<Integer, Tuple2<Integer, Integer>> op = new AggregatingProcessingTimeWindowOperator<>( new StatefulFunction(), fieldOneSelector, IntSerializer.INSTANCE, tupleSerializer, windowSize, windowSlide); - timerService.setCurrentTime(0); KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> testHarness = new KeyedOneInputStreamOperatorTestHarness<>( op, new ExecutionConfig(), - timerService, fieldOneSelector, BasicTypeInfo.INT_TYPE_INFO); + testHarness.setProcessingTime(0); + testHarness.open(); // because the window interval is so large, everything should be in one window // and aggregate into one value per key final int numElements = 100; - + // because we do not release the lock here, these elements for (int i = 0; i < numElements; i++) { - + StreamRecord<Tuple2<Integer, Integer>> next1 = new StreamRecord<>(new Tuple2<>(1, i)); StreamRecord<Tuple2<Integer, Integer>> next2 = new StreamRecord<>(new Tuple2<>(2, i)); StreamRecord<Tuple2<Integer, Integer>> next3 = new StreamRecord<>(new Tuple2<>(1, i)); @@ -888,14 +760,14 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { testHarness.processElement(next4); } - timerService.setCurrentTime(50); - timerService.setCurrentTime(100); - timerService.setCurrentTime(150); - timerService.setCurrentTime(200); + testHarness.setProcessingTime(50); + testHarness.setProcessingTime(100); + testHarness.setProcessingTime(150); + testHarness.setProcessingTime(200); int count1 = StatefulFunction.globalCounts.get(1); int count2 = StatefulFunction.globalCounts.get(2); - + assertTrue(count1 >= 2 && count1 <= 2 * numElements); assertEquals(count1, count2); @@ -907,12 +779,12 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { fail(e.getMessage()); } } - + // ------------------------------------------------------------------------ - + private void assertInvalidParameter(long windowSize, long windowSlide) { try { - new AggregatingProcessingTimeWindowOperator<String, String>( + new AggregatingProcessingTimeWindowOperator<>( mockFunction, mockKeySelector, StringSerializer.INSTANCE, StringSerializer.INSTANCE, windowSize, windowSlide); @@ -927,11 +799,11 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { } // ------------------------------------------------------------------------ - + private static class FailingFunction implements ReduceFunction<Tuple2<Integer, Integer>> { private final int failAfterElements; - + private int numElements; FailingFunction(int failAfterElements) { @@ -945,7 +817,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { if (numElements >= failAfterElements) { throw new Exception("Artificial Test Exception"); } - + return new Tuple2<>(value1.f0, value1.f1 + value2.f1); } } @@ -961,7 +833,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { @Override public void open(Configuration parameters) { assertNotNull(getRuntimeContext()); - + // start with one, so the final count is correct and we test that we do not // initialize with 0 always by default state = getRuntimeContext().getState(new ValueStateDescriptor<>("totalCount", Integer.class, 1)); @@ -971,44 +843,15 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { public Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> value1, Tuple2<Integer, Integer> value2) throws Exception { state.update(state.value() + 1); globalCounts.put(value1.f0, state.value()); - + return new Tuple2<>(value1.f0, value1.f1 + value2.f1); } } // ------------------------------------------------------------------------ - - private static StreamTask<?, ?> createMockTask() { - Configuration configuration = new Configuration(); - configuration.setString(ConfigConstants.STATE_BACKEND, "jobmanager"); - - StreamTask<?, ?> task = mock(StreamTask.class); - when(task.getAccumulatorMap()).thenReturn(new HashMap<String, Accumulator<?, ?>>()); - when(task.getName()).thenReturn("Test task name"); - when(task.getExecutionConfig()).thenReturn(new ExecutionConfig()); - - final TaskManagerRuntimeInfo mockTaskManagerRuntimeInfo = mock(TaskManagerRuntimeInfo.class); - when(mockTaskManagerRuntimeInfo.getConfiguration()).thenReturn(configuration); - - final Environment env = new DummyEnvironment("Test task name", 1, 0); - when(task.getEnvironment()).thenReturn(env); - - return task; - } - - private static StreamTask<?, ?> createMockTaskWithTimer(final ProcessingTimeService timerService) - { - StreamTask<?, ?> mockTask = createMockTask(); - when(mockTask.getProcessingTimeService()).thenReturn(timerService); - return mockTask; - } - - private static StreamConfig createTaskConfig(KeySelector<?, ?> partitioner, TypeSerializer<?> keySerializer, int numberOfKeGroups) { - return new StreamConfig(new Configuration()); - } @SuppressWarnings({"unchecked", "rawtypes"}) - private <T> List<T> extractFromStreamRecords(Iterable<Object> input) { + private <T> List<T> extractFromStreamRecords(Iterable<?> input) { List<T> result = new ArrayList<>(); for (Object in : input) { if (in instanceof StreamRecord) { @@ -1017,12 +860,4 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { } return result; } - - private static void shutdownTimerServiceAndWait(ProcessingTimeService timers) throws Exception { - timers.shutdownService(); - - while (!timers.isTerminated()) { - Thread.sleep(2); - } - } } http://git-wip-us.apache.org/repos/asf/flink/blob/30554758/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CollectingOutput.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CollectingOutput.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CollectingOutput.java deleted file mode 100644 index 42be131..0000000 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CollectingOutput.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.runtime.operators.windowing; - -import org.apache.flink.streaming.api.operators.Output; -import org.apache.flink.streaming.api.watermark.Watermark; -import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; - -import java.util.ArrayList; -import java.util.List; - -public class CollectingOutput<T> implements Output<StreamRecord<T>> { - - private final List<T> elements = new ArrayList<>(); - - private final int timeStampModulus; - - - public CollectingOutput() { - this.timeStampModulus = 0; - } - - public CollectingOutput(int timeStampModulus) { - this.timeStampModulus = timeStampModulus; - } - - // ------------------------------------------------------------------------ - - public List<T> getElements() { - return elements; - } - - public void waitForNElements(int n, long timeout) throws InterruptedException { - long deadline = System.currentTimeMillis() + timeout; - synchronized (elements) { - long now; - while (elements.size() < n && (now = System.currentTimeMillis()) < deadline) { - elements.wait(deadline - now); - } - } - } - - // ------------------------------------------------------------------------ - - @Override - public void emitWatermark(Watermark mark) { - throw new UnsupportedOperationException("The output should not emit watermarks"); - } - - @Override - public void emitLatencyMarker(LatencyMarker latencyMarker) { - throw new UnsupportedOperationException("The output should not emit latency markers"); - } - - @Override - public void collect(StreamRecord<T> record) { - elements.add(record.getValue()); - - if (timeStampModulus != 0 && record.getTimestamp() % timeStampModulus != 0) { - throw new IllegalArgumentException("Invalid timestamp"); - } - synchronized (elements) { - elements.notifyAll(); - } - } - - @Override - public void close() {} -} http://git-wip-us.apache.org/repos/asf/flink/blob/30554758/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NoOpTimerService.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NoOpTimerService.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NoOpTimerService.java deleted file mode 100644 index a7a71cf..0000000 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NoOpTimerService.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.runtime.operators.windowing; - -import org.apache.flink.streaming.runtime.operators.Triggerable; -import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; - -import java.util.concurrent.ScheduledFuture; - -class NoOpTimerService extends ProcessingTimeService { - - private volatile boolean terminated; - - @Override - public long getCurrentProcessingTime() { - return System.currentTimeMillis(); - } - - @Override - public ScheduledFuture<?> registerTimer(long timestamp, Triggerable target) { - return null; - } - - @Override - public boolean isTerminated() { - return terminated; - } - - @Override - public void quiesceAndAwaitPending() {} - - @Override - public void shutdownService() { - terminated = true; - } -}
