http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/GroupAlsoByWindowTest.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/GroupAlsoByWindowTest.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/GroupAlsoByWindowTest.java index 01f9c32..fb240f4 100644 --- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/GroupAlsoByWindowTest.java +++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/GroupAlsoByWindowTest.java @@ -43,464 +43,464 @@ import java.util.concurrent.ConcurrentLinkedQueue; public class GroupAlsoByWindowTest { - private final Combine.CombineFn combiner = new Sum.SumIntegerFn(); - - private final WindowingStrategy slidingWindowWithAfterWatermarkTriggerStrategy = - WindowingStrategy.of(SlidingWindows.of(Duration.standardSeconds(10)).every(Duration.standardSeconds(5))) - .withTrigger(AfterWatermark.pastEndOfWindow()).withMode(WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES); - - private final WindowingStrategy sessionWindowingStrategy = - WindowingStrategy.of(Sessions.withGapDuration(Duration.standardSeconds(2))) - .withTrigger(Repeatedly.forever(AfterWatermark.pastEndOfWindow())) - .withMode(WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES) - .withAllowedLateness(Duration.standardSeconds(100)); - - private final WindowingStrategy fixedWindowingStrategy = - WindowingStrategy.of(FixedWindows.of(Duration.standardSeconds(10))); - - private final WindowingStrategy fixedWindowWithCountTriggerStrategy = - fixedWindowingStrategy.withTrigger(AfterPane.elementCountAtLeast(5)); - - private final WindowingStrategy fixedWindowWithAfterWatermarkTriggerStrategy = - fixedWindowingStrategy.withTrigger(AfterWatermark.pastEndOfWindow()); - - private final WindowingStrategy fixedWindowWithCompoundTriggerStrategy = - fixedWindowingStrategy.withTrigger( - AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(5)) - .withLateFirings(AfterPane.elementCountAtLeast(5)).buildTrigger()); - - /** - * The default accumulation mode is - * {@link com.google.cloud.dataflow.sdk.util.WindowingStrategy.AccumulationMode#DISCARDING_FIRED_PANES}. - * This strategy changes it to - * {@link com.google.cloud.dataflow.sdk.util.WindowingStrategy.AccumulationMode#ACCUMULATING_FIRED_PANES} - */ - private final WindowingStrategy fixedWindowWithCompoundTriggerStrategyAcc = - fixedWindowWithCompoundTriggerStrategy - .withMode(WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES); - - @Test - public void testWithLateness() throws Exception { - WindowingStrategy strategy = WindowingStrategy.of(FixedWindows.of(Duration.standardSeconds(2))) - .withMode(WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES) - .withAllowedLateness(Duration.millis(1000)); - long initialTime = 0L; - Pipeline pipeline = FlinkTestPipeline.createForStreaming(); - - KvCoder<String, Integer> inputCoder = KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()); - - FlinkGroupAlsoByWindowWrapper gbwOperaror = - FlinkGroupAlsoByWindowWrapper.createForTesting( - pipeline.getOptions(), - pipeline.getCoderRegistry(), - strategy, - inputCoder, - combiner.<String>asKeyedFn()); - - OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, WindowedValue<KV<String, Integer>>> testHarness = - new OneInputStreamOperatorTestHarness<>(gbwOperaror); - testHarness.open(); - - testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1), null, PaneInfo.NO_FIRING), initialTime + 20)); - testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1), null, PaneInfo.NO_FIRING), initialTime + 20)); - testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1000), null, PaneInfo.NO_FIRING), initialTime + 20)); - testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1200), null, PaneInfo.NO_FIRING), initialTime + 20)); - testHarness.processWatermark(new Watermark(initialTime + 2000)); - testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1200), null, PaneInfo.NO_FIRING), initialTime + 20)); - testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1200), null, PaneInfo.NO_FIRING), initialTime + 20)); - testHarness.processWatermark(new Watermark(initialTime + 4000)); - - ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); - expectedOutput.add(new StreamRecord<>( - WindowedValue.of(KV.of("key1", 4), - new Instant(initialTime + 1), - new IntervalWindow(new Instant(0), new Instant(2000)), - PaneInfo.createPane(true, false, PaneInfo.Timing.ON_TIME, 0, 0)) - , initialTime + 1)); - expectedOutput.add(new Watermark(initialTime + 2000)); - - expectedOutput.add(new StreamRecord<>( - WindowedValue.of(KV.of("key1", 5), - new Instant(initialTime + 1999), - new IntervalWindow(new Instant(0), new Instant(2000)), - PaneInfo.createPane(false, false, PaneInfo.Timing.LATE, 1, 1)) - , initialTime + 1999)); - - - expectedOutput.add(new StreamRecord<>( - WindowedValue.of(KV.of("key1", 6), - new Instant(initialTime + 1999), - new IntervalWindow(new Instant(0), new Instant(2000)), - PaneInfo.createPane(false, false, PaneInfo.Timing.LATE, 2, 2)) - , initialTime + 1999)); - expectedOutput.add(new Watermark(initialTime + 4000)); - - TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator()); - testHarness.close(); - } - - @Test - public void testSessionWindows() throws Exception { - WindowingStrategy strategy = sessionWindowingStrategy; - - long initialTime = 0L; - Pipeline pipeline = FlinkTestPipeline.createForStreaming(); - - KvCoder<String, Integer> inputCoder = KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()); - - FlinkGroupAlsoByWindowWrapper gbwOperaror = - FlinkGroupAlsoByWindowWrapper.createForTesting( - pipeline.getOptions(), - pipeline.getCoderRegistry(), - strategy, - inputCoder, - combiner.<String>asKeyedFn()); - - OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, WindowedValue<KV<String, Integer>>> testHarness = - new OneInputStreamOperatorTestHarness<>(gbwOperaror); - testHarness.open(); - - testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1), null, PaneInfo.NO_FIRING), initialTime + 20)); - testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1), null, PaneInfo.NO_FIRING), initialTime + 20)); - testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1000), null, PaneInfo.NO_FIRING), initialTime + 20)); - testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 3500), null, PaneInfo.NO_FIRING), initialTime + 20)); - testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 3700), null, PaneInfo.NO_FIRING), initialTime + 20)); - testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 2700), null, PaneInfo.NO_FIRING), initialTime + 20)); - testHarness.processWatermark(new Watermark(initialTime + 6000)); - - testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 6700), null, PaneInfo.NO_FIRING), initialTime + 20)); - testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 6800), null, PaneInfo.NO_FIRING), initialTime + 20)); - testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 8900), null, PaneInfo.NO_FIRING), initialTime + 20)); - testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 7600), null, PaneInfo.NO_FIRING), initialTime + 20)); - testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 5600), null, PaneInfo.NO_FIRING), initialTime + 20)); - - testHarness.processWatermark(new Watermark(initialTime + 12000)); - - ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); - expectedOutput.add(new StreamRecord<>( - WindowedValue.of(KV.of("key1", 6), - new Instant(initialTime + 1), - new IntervalWindow(new Instant(1), new Instant(5700)), - PaneInfo.createPane(true, false, PaneInfo.Timing.ON_TIME, 0, 0)) - , initialTime + 1)); - expectedOutput.add(new Watermark(initialTime + 6000)); - - expectedOutput.add(new StreamRecord<>( - WindowedValue.of(KV.of("key1", 11), - new Instant(initialTime + 6700), - new IntervalWindow(new Instant(1), new Instant(10900)), - PaneInfo.createPane(true, false, PaneInfo.Timing.ON_TIME, 0, 0)) - , initialTime + 6700)); - expectedOutput.add(new Watermark(initialTime + 12000)); - - TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator()); - testHarness.close(); - } - - @Test - public void testSlidingWindows() throws Exception { - WindowingStrategy strategy = slidingWindowWithAfterWatermarkTriggerStrategy; - long initialTime = 0L; - OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, WindowedValue<KV<String, Integer>>> testHarness = - createTestingOperatorAndState(strategy, initialTime); - ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); - testHarness.processWatermark(new Watermark(initialTime + 25000)); - - expectedOutput.add(new StreamRecord<>( - WindowedValue.of(KV.of("key1", 6), - new Instant(initialTime + 5000), - new IntervalWindow(new Instant(0), new Instant(10000)), - PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)) - , initialTime + 5000)); - expectedOutput.add(new StreamRecord<>( - WindowedValue.of(KV.of("key1", 6), - new Instant(initialTime + 1), - new IntervalWindow(new Instant(-5000), new Instant(5000)), - PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)) - , initialTime + 1)); - expectedOutput.add(new Watermark(initialTime + 10000)); - - expectedOutput.add(new StreamRecord<>( - WindowedValue.of(KV.of("key1", 11), - new Instant(initialTime + 15000), - new IntervalWindow(new Instant(10000), new Instant(20000)), - PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)) - , initialTime + 15000)); - expectedOutput.add(new StreamRecord<>( - WindowedValue.of(KV.of("key1", 3), - new Instant(initialTime + 10000), - new IntervalWindow(new Instant(5000), new Instant(15000)), - PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)) - , initialTime + 10000)); - expectedOutput.add(new StreamRecord<>( - WindowedValue.of(KV.of("key2", 1), - new Instant(initialTime + 19500), - new IntervalWindow(new Instant(10000), new Instant(20000)), - PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)) - , initialTime + 19500)); - expectedOutput.add(new Watermark(initialTime + 20000)); - - expectedOutput.add(new StreamRecord<>( - WindowedValue.of(KV.of("key2", 1), - new Instant(initialTime + 20000), - /** - * this is 20000 and not 19500 because of a convention in dataflow where - * timestamps of windowed values in a window cannot be smaller than the - * end of a previous window. Checkout the documentation of the - * {@link WindowFn#getOutputTime(Instant, BoundedWindow)} - */ - new IntervalWindow(new Instant(15000), new Instant(25000)), - PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)) - , initialTime + 20000)); - expectedOutput.add(new StreamRecord<>( - WindowedValue.of(KV.of("key1", 8), - new Instant(initialTime + 20000), - new IntervalWindow(new Instant(15000), new Instant(25000)), - PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)) - , initialTime + 20000)); - expectedOutput.add(new Watermark(initialTime + 25000)); - - TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator()); - testHarness.close(); - } - - @Test - public void testAfterWatermarkProgram() throws Exception { - WindowingStrategy strategy = fixedWindowWithAfterWatermarkTriggerStrategy; - long initialTime = 0L; - OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, WindowedValue<KV<String, Integer>>> testHarness = - createTestingOperatorAndState(strategy, initialTime); - ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); - - expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 6), - new Instant(initialTime + 1), null, PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)), initialTime + 1)); - expectedOutput.add(new Watermark(initialTime + 10000)); - - expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 11), - new Instant(initialTime + 10000), null, PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)), initialTime + 10000)); - expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key2", 1), - new Instant(initialTime + 19500), null, PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)), initialTime + 19500)); - expectedOutput.add(new Watermark(initialTime + 20000)); - - TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator()); - testHarness.close(); - } - - @Test - public void testAfterCountProgram() throws Exception { - WindowingStrategy strategy = fixedWindowWithCountTriggerStrategy; - - long initialTime = 0L; - OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, WindowedValue<KV<String, Integer>>> testHarness = - createTestingOperatorAndState(strategy, initialTime); - ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); - - expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 5), - new Instant(initialTime + 1), null, PaneInfo.createPane(true, true, PaneInfo.Timing.EARLY)), initialTime + 1)); - expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 5), - new Instant(initialTime + 10000), null, PaneInfo.createPane(true, true, PaneInfo.Timing.EARLY)), initialTime + 10000)); - expectedOutput.add(new Watermark(initialTime + 10000)); - - expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key2", 1), - new Instant(initialTime + 19500), null, PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME, 0, 0)), initialTime + 19500)); - expectedOutput.add(new Watermark(initialTime + 20000)); - TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator()); - - testHarness.close(); - } - - @Test - public void testCompoundProgram() throws Exception { - WindowingStrategy strategy = fixedWindowWithCompoundTriggerStrategy; - - long initialTime = 0L; - OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, WindowedValue<KV<String, Integer>>> testHarness = - createTestingOperatorAndState(strategy, initialTime); - ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); - - /** - * PaneInfo are: - * isFirst (pane in window), - * isLast, Timing (of triggering), - * index (of pane in the window), - * onTimeIndex (if it the 1st,2nd, ... pane that was fired on time) - * */ - - expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 5), - new Instant(initialTime + 1), null, PaneInfo.createPane(true, false, PaneInfo.Timing.EARLY)), initialTime + 1)); - expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 5), - new Instant(initialTime + 10000), null, PaneInfo.createPane(true, false, PaneInfo.Timing.EARLY)), initialTime + 10000)); - expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 5), - new Instant(initialTime + 19500), null, PaneInfo.createPane(false, false, PaneInfo.Timing.EARLY, 1, -1)), initialTime + 19500)); - - expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), - new Instant(initialTime + 1200), null, PaneInfo.createPane(false, true, PaneInfo.Timing.ON_TIME, 1, 0)), initialTime + 1200)); - - expectedOutput.add(new Watermark(initialTime + 10000)); - - expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), - new Instant(initialTime + 19500), null, PaneInfo.createPane(false, true, PaneInfo.Timing.ON_TIME, 2, 0)), initialTime + 19500)); - expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key2", 1), - new Instant(initialTime + 19500), null, PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)), initialTime + 19500)); - - expectedOutput.add(new Watermark(initialTime + 20000)); - TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator()); - - testHarness.close(); - } - - @Test - public void testCompoundAccumulatingPanesProgram() throws Exception { - WindowingStrategy strategy = fixedWindowWithCompoundTriggerStrategyAcc; - long initialTime = 0L; - OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, WindowedValue<KV<String, Integer>>> testHarness = - createTestingOperatorAndState(strategy, initialTime); - ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); - - expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 5), - new Instant(initialTime + 1), null, PaneInfo.createPane(true, false, PaneInfo.Timing.EARLY)), initialTime + 1)); - expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 5), - new Instant(initialTime + 10000), null, PaneInfo.createPane(true, false, PaneInfo.Timing.EARLY)), initialTime + 10000)); - expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 10), - new Instant(initialTime + 19500), null, PaneInfo.createPane(false, false, PaneInfo.Timing.EARLY, 1, -1)), initialTime + 19500)); - - expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 6), - new Instant(initialTime + 1200), null, PaneInfo.createPane(false, true, PaneInfo.Timing.ON_TIME, 1, 0)), initialTime + 1200)); - - expectedOutput.add(new Watermark(initialTime + 10000)); - - expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 11), - new Instant(initialTime + 19500), null, PaneInfo.createPane(false, true, PaneInfo.Timing.ON_TIME, 2, 0)), initialTime + 19500)); - expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key2", 1), - new Instant(initialTime + 19500), null, PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)), initialTime + 19500)); - - expectedOutput.add(new Watermark(initialTime + 20000)); - TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator()); - - testHarness.close(); - } - - private OneInputStreamOperatorTestHarness createTestingOperatorAndState(WindowingStrategy strategy, long initialTime) throws Exception { - Pipeline pipeline = FlinkTestPipeline.createForStreaming(); - - KvCoder<String, Integer> inputCoder = KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()); - - FlinkGroupAlsoByWindowWrapper gbwOperaror = - FlinkGroupAlsoByWindowWrapper.createForTesting( - pipeline.getOptions(), - pipeline.getCoderRegistry(), - strategy, - inputCoder, - combiner.<String>asKeyedFn()); - - OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, WindowedValue<KV<String, Integer>>> testHarness = - new OneInputStreamOperatorTestHarness<>(gbwOperaror); - testHarness.open(); - - testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1), null, PaneInfo.NO_FIRING), initialTime + 20)); - testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1), null, PaneInfo.NO_FIRING), initialTime + 20)); - testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1000), null, PaneInfo.NO_FIRING), initialTime + 20)); - testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1200), null, PaneInfo.NO_FIRING), initialTime + 20)); - testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1200), null, PaneInfo.NO_FIRING), initialTime + 20)); - testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1200), null, PaneInfo.NO_FIRING), initialTime + 20)); - - testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 10000), null, PaneInfo.NO_FIRING), initialTime + 20)); - testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 12100), null, PaneInfo.NO_FIRING), initialTime + 20)); - testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 14200), null, PaneInfo.NO_FIRING), initialTime + 20)); - testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 15300), null, PaneInfo.NO_FIRING), initialTime + 20)); - testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 16500), null, PaneInfo.NO_FIRING), initialTime + 20)); - testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 19500), null, PaneInfo.NO_FIRING), initialTime + 20)); - testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 19500), null, PaneInfo.NO_FIRING), initialTime + 20)); - testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 19500), null, PaneInfo.NO_FIRING), initialTime + 20)); - testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 19500), null, PaneInfo.NO_FIRING), initialTime + 20)); - testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 19500), null, PaneInfo.NO_FIRING), initialTime + 20)); - testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 19500), null, PaneInfo.NO_FIRING), initialTime + 20)); - - testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key2", 1), new Instant(initialTime + 19500), null, PaneInfo.NO_FIRING), initialTime + 20)); - - testHarness.processWatermark(new Watermark(initialTime + 10000)); - testHarness.processWatermark(new Watermark(initialTime + 20000)); - - return testHarness; - } - - private static class ResultSortComparator implements Comparator<Object> { - @Override - public int compare(Object o1, Object o2) { - if (o1 instanceof Watermark && o2 instanceof Watermark) { - Watermark w1 = (Watermark) o1; - Watermark w2 = (Watermark) o2; - return (int) (w1.getTimestamp() - w2.getTimestamp()); - } else { - StreamRecord<WindowedValue<KV<String, Integer>>> sr0 = (StreamRecord<WindowedValue<KV<String, Integer>>>) o1; - StreamRecord<WindowedValue<KV<String, Integer>>> sr1 = (StreamRecord<WindowedValue<KV<String, Integer>>>) o2; - - int comparison = (int) (sr0.getValue().getTimestamp().getMillis() - sr1.getValue().getTimestamp().getMillis()); - if (comparison != 0) { - return comparison; - } - - comparison = sr0.getValue().getValue().getKey().compareTo(sr1.getValue().getValue().getKey()); - if(comparison == 0) { - comparison = Integer.compare( - sr0.getValue().getValue().getValue(), - sr1.getValue().getValue().getValue()); - } - if(comparison == 0) { - Collection windowsA = sr0.getValue().getWindows(); - Collection windowsB = sr1.getValue().getWindows(); - - if(windowsA.size() != 1 || windowsB.size() != 1) { - throw new IllegalStateException("A value cannot belong to more than one windows after grouping."); - } - - BoundedWindow windowA = (BoundedWindow) windowsA.iterator().next(); - BoundedWindow windowB = (BoundedWindow) windowsB.iterator().next(); - comparison = Long.compare(windowA.maxTimestamp().getMillis(), windowB.maxTimestamp().getMillis()); - } - return comparison; - } - } - } - - private <T> WindowedValue<T> makeWindowedValue(WindowingStrategy strategy, - T output, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) { - final Instant inputTimestamp = timestamp; - final WindowFn windowFn = strategy.getWindowFn(); - - if (timestamp == null) { - timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE; - } - - if (windows == null) { - try { - windows = windowFn.assignWindows(windowFn.new AssignContext() { - @Override - public Object element() { - throw new UnsupportedOperationException( - "WindowFn attempted to access input element when none was available"); - } - - @Override - public Instant timestamp() { - if (inputTimestamp == null) { - throw new UnsupportedOperationException( - "WindowFn attempted to access input timestamp when none was available"); - } - return inputTimestamp; - } - - @Override - public Collection<? extends BoundedWindow> windows() { - throw new UnsupportedOperationException( - "WindowFn attempted to access input windows when none were available"); - } - }); - } catch (Exception e) { - throw UserCodeException.wrap(e); - } - } - - return WindowedValue.of(output, timestamp, windows, pane); - } + private final Combine.CombineFn combiner = new Sum.SumIntegerFn(); + + private final WindowingStrategy slidingWindowWithAfterWatermarkTriggerStrategy = + WindowingStrategy.of(SlidingWindows.of(Duration.standardSeconds(10)).every(Duration.standardSeconds(5))) + .withTrigger(AfterWatermark.pastEndOfWindow()).withMode(WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES); + + private final WindowingStrategy sessionWindowingStrategy = + WindowingStrategy.of(Sessions.withGapDuration(Duration.standardSeconds(2))) + .withTrigger(Repeatedly.forever(AfterWatermark.pastEndOfWindow())) + .withMode(WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES) + .withAllowedLateness(Duration.standardSeconds(100)); + + private final WindowingStrategy fixedWindowingStrategy = + WindowingStrategy.of(FixedWindows.of(Duration.standardSeconds(10))); + + private final WindowingStrategy fixedWindowWithCountTriggerStrategy = + fixedWindowingStrategy.withTrigger(AfterPane.elementCountAtLeast(5)); + + private final WindowingStrategy fixedWindowWithAfterWatermarkTriggerStrategy = + fixedWindowingStrategy.withTrigger(AfterWatermark.pastEndOfWindow()); + + private final WindowingStrategy fixedWindowWithCompoundTriggerStrategy = + fixedWindowingStrategy.withTrigger( + AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(5)) + .withLateFirings(AfterPane.elementCountAtLeast(5)).buildTrigger()); + + /** + * The default accumulation mode is + * {@link com.google.cloud.dataflow.sdk.util.WindowingStrategy.AccumulationMode#DISCARDING_FIRED_PANES}. + * This strategy changes it to + * {@link com.google.cloud.dataflow.sdk.util.WindowingStrategy.AccumulationMode#ACCUMULATING_FIRED_PANES} + */ + private final WindowingStrategy fixedWindowWithCompoundTriggerStrategyAcc = + fixedWindowWithCompoundTriggerStrategy + .withMode(WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES); + + @Test + public void testWithLateness() throws Exception { + WindowingStrategy strategy = WindowingStrategy.of(FixedWindows.of(Duration.standardSeconds(2))) + .withMode(WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES) + .withAllowedLateness(Duration.millis(1000)); + long initialTime = 0L; + Pipeline pipeline = FlinkTestPipeline.createForStreaming(); + + KvCoder<String, Integer> inputCoder = KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()); + + FlinkGroupAlsoByWindowWrapper gbwOperaror = + FlinkGroupAlsoByWindowWrapper.createForTesting( + pipeline.getOptions(), + pipeline.getCoderRegistry(), + strategy, + inputCoder, + combiner.<String>asKeyedFn()); + + OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, WindowedValue<KV<String, Integer>>> testHarness = + new OneInputStreamOperatorTestHarness<>(gbwOperaror); + testHarness.open(); + + testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1), null, PaneInfo.NO_FIRING), initialTime + 20)); + testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1), null, PaneInfo.NO_FIRING), initialTime + 20)); + testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1000), null, PaneInfo.NO_FIRING), initialTime + 20)); + testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1200), null, PaneInfo.NO_FIRING), initialTime + 20)); + testHarness.processWatermark(new Watermark(initialTime + 2000)); + testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1200), null, PaneInfo.NO_FIRING), initialTime + 20)); + testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1200), null, PaneInfo.NO_FIRING), initialTime + 20)); + testHarness.processWatermark(new Watermark(initialTime + 4000)); + + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); + expectedOutput.add(new StreamRecord<>( + WindowedValue.of(KV.of("key1", 4), + new Instant(initialTime + 1), + new IntervalWindow(new Instant(0), new Instant(2000)), + PaneInfo.createPane(true, false, PaneInfo.Timing.ON_TIME, 0, 0)) + , initialTime + 1)); + expectedOutput.add(new Watermark(initialTime + 2000)); + + expectedOutput.add(new StreamRecord<>( + WindowedValue.of(KV.of("key1", 5), + new Instant(initialTime + 1999), + new IntervalWindow(new Instant(0), new Instant(2000)), + PaneInfo.createPane(false, false, PaneInfo.Timing.LATE, 1, 1)) + , initialTime + 1999)); + + + expectedOutput.add(new StreamRecord<>( + WindowedValue.of(KV.of("key1", 6), + new Instant(initialTime + 1999), + new IntervalWindow(new Instant(0), new Instant(2000)), + PaneInfo.createPane(false, false, PaneInfo.Timing.LATE, 2, 2)) + , initialTime + 1999)); + expectedOutput.add(new Watermark(initialTime + 4000)); + + TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator()); + testHarness.close(); + } + + @Test + public void testSessionWindows() throws Exception { + WindowingStrategy strategy = sessionWindowingStrategy; + + long initialTime = 0L; + Pipeline pipeline = FlinkTestPipeline.createForStreaming(); + + KvCoder<String, Integer> inputCoder = KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()); + + FlinkGroupAlsoByWindowWrapper gbwOperaror = + FlinkGroupAlsoByWindowWrapper.createForTesting( + pipeline.getOptions(), + pipeline.getCoderRegistry(), + strategy, + inputCoder, + combiner.<String>asKeyedFn()); + + OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, WindowedValue<KV<String, Integer>>> testHarness = + new OneInputStreamOperatorTestHarness<>(gbwOperaror); + testHarness.open(); + + testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1), null, PaneInfo.NO_FIRING), initialTime + 20)); + testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1), null, PaneInfo.NO_FIRING), initialTime + 20)); + testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1000), null, PaneInfo.NO_FIRING), initialTime + 20)); + testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 3500), null, PaneInfo.NO_FIRING), initialTime + 20)); + testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 3700), null, PaneInfo.NO_FIRING), initialTime + 20)); + testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 2700), null, PaneInfo.NO_FIRING), initialTime + 20)); + testHarness.processWatermark(new Watermark(initialTime + 6000)); + + testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 6700), null, PaneInfo.NO_FIRING), initialTime + 20)); + testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 6800), null, PaneInfo.NO_FIRING), initialTime + 20)); + testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 8900), null, PaneInfo.NO_FIRING), initialTime + 20)); + testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 7600), null, PaneInfo.NO_FIRING), initialTime + 20)); + testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 5600), null, PaneInfo.NO_FIRING), initialTime + 20)); + + testHarness.processWatermark(new Watermark(initialTime + 12000)); + + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); + expectedOutput.add(new StreamRecord<>( + WindowedValue.of(KV.of("key1", 6), + new Instant(initialTime + 1), + new IntervalWindow(new Instant(1), new Instant(5700)), + PaneInfo.createPane(true, false, PaneInfo.Timing.ON_TIME, 0, 0)) + , initialTime + 1)); + expectedOutput.add(new Watermark(initialTime + 6000)); + + expectedOutput.add(new StreamRecord<>( + WindowedValue.of(KV.of("key1", 11), + new Instant(initialTime + 6700), + new IntervalWindow(new Instant(1), new Instant(10900)), + PaneInfo.createPane(true, false, PaneInfo.Timing.ON_TIME, 0, 0)) + , initialTime + 6700)); + expectedOutput.add(new Watermark(initialTime + 12000)); + + TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator()); + testHarness.close(); + } + + @Test + public void testSlidingWindows() throws Exception { + WindowingStrategy strategy = slidingWindowWithAfterWatermarkTriggerStrategy; + long initialTime = 0L; + OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, WindowedValue<KV<String, Integer>>> testHarness = + createTestingOperatorAndState(strategy, initialTime); + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); + testHarness.processWatermark(new Watermark(initialTime + 25000)); + + expectedOutput.add(new StreamRecord<>( + WindowedValue.of(KV.of("key1", 6), + new Instant(initialTime + 5000), + new IntervalWindow(new Instant(0), new Instant(10000)), + PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)) + , initialTime + 5000)); + expectedOutput.add(new StreamRecord<>( + WindowedValue.of(KV.of("key1", 6), + new Instant(initialTime + 1), + new IntervalWindow(new Instant(-5000), new Instant(5000)), + PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)) + , initialTime + 1)); + expectedOutput.add(new Watermark(initialTime + 10000)); + + expectedOutput.add(new StreamRecord<>( + WindowedValue.of(KV.of("key1", 11), + new Instant(initialTime + 15000), + new IntervalWindow(new Instant(10000), new Instant(20000)), + PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)) + , initialTime + 15000)); + expectedOutput.add(new StreamRecord<>( + WindowedValue.of(KV.of("key1", 3), + new Instant(initialTime + 10000), + new IntervalWindow(new Instant(5000), new Instant(15000)), + PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)) + , initialTime + 10000)); + expectedOutput.add(new StreamRecord<>( + WindowedValue.of(KV.of("key2", 1), + new Instant(initialTime + 19500), + new IntervalWindow(new Instant(10000), new Instant(20000)), + PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)) + , initialTime + 19500)); + expectedOutput.add(new Watermark(initialTime + 20000)); + + expectedOutput.add(new StreamRecord<>( + WindowedValue.of(KV.of("key2", 1), + new Instant(initialTime + 20000), + /** + * this is 20000 and not 19500 because of a convention in dataflow where + * timestamps of windowed values in a window cannot be smaller than the + * end of a previous window. Checkout the documentation of the + * {@link WindowFn#getOutputTime(Instant, BoundedWindow)} + */ + new IntervalWindow(new Instant(15000), new Instant(25000)), + PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)) + , initialTime + 20000)); + expectedOutput.add(new StreamRecord<>( + WindowedValue.of(KV.of("key1", 8), + new Instant(initialTime + 20000), + new IntervalWindow(new Instant(15000), new Instant(25000)), + PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)) + , initialTime + 20000)); + expectedOutput.add(new Watermark(initialTime + 25000)); + + TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator()); + testHarness.close(); + } + + @Test + public void testAfterWatermarkProgram() throws Exception { + WindowingStrategy strategy = fixedWindowWithAfterWatermarkTriggerStrategy; + long initialTime = 0L; + OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, WindowedValue<KV<String, Integer>>> testHarness = + createTestingOperatorAndState(strategy, initialTime); + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); + + expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 6), + new Instant(initialTime + 1), null, PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)), initialTime + 1)); + expectedOutput.add(new Watermark(initialTime + 10000)); + + expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 11), + new Instant(initialTime + 10000), null, PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)), initialTime + 10000)); + expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key2", 1), + new Instant(initialTime + 19500), null, PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)), initialTime + 19500)); + expectedOutput.add(new Watermark(initialTime + 20000)); + + TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator()); + testHarness.close(); + } + + @Test + public void testAfterCountProgram() throws Exception { + WindowingStrategy strategy = fixedWindowWithCountTriggerStrategy; + + long initialTime = 0L; + OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, WindowedValue<KV<String, Integer>>> testHarness = + createTestingOperatorAndState(strategy, initialTime); + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); + + expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 5), + new Instant(initialTime + 1), null, PaneInfo.createPane(true, true, PaneInfo.Timing.EARLY)), initialTime + 1)); + expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 5), + new Instant(initialTime + 10000), null, PaneInfo.createPane(true, true, PaneInfo.Timing.EARLY)), initialTime + 10000)); + expectedOutput.add(new Watermark(initialTime + 10000)); + + expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key2", 1), + new Instant(initialTime + 19500), null, PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME, 0, 0)), initialTime + 19500)); + expectedOutput.add(new Watermark(initialTime + 20000)); + TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator()); + + testHarness.close(); + } + + @Test + public void testCompoundProgram() throws Exception { + WindowingStrategy strategy = fixedWindowWithCompoundTriggerStrategy; + + long initialTime = 0L; + OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, WindowedValue<KV<String, Integer>>> testHarness = + createTestingOperatorAndState(strategy, initialTime); + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); + + /** + * PaneInfo are: + * isFirst (pane in window), + * isLast, Timing (of triggering), + * index (of pane in the window), + * onTimeIndex (if it the 1st,2nd, ... pane that was fired on time) + * */ + + expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 5), + new Instant(initialTime + 1), null, PaneInfo.createPane(true, false, PaneInfo.Timing.EARLY)), initialTime + 1)); + expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 5), + new Instant(initialTime + 10000), null, PaneInfo.createPane(true, false, PaneInfo.Timing.EARLY)), initialTime + 10000)); + expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 5), + new Instant(initialTime + 19500), null, PaneInfo.createPane(false, false, PaneInfo.Timing.EARLY, 1, -1)), initialTime + 19500)); + + expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), + new Instant(initialTime + 1200), null, PaneInfo.createPane(false, true, PaneInfo.Timing.ON_TIME, 1, 0)), initialTime + 1200)); + + expectedOutput.add(new Watermark(initialTime + 10000)); + + expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), + new Instant(initialTime + 19500), null, PaneInfo.createPane(false, true, PaneInfo.Timing.ON_TIME, 2, 0)), initialTime + 19500)); + expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key2", 1), + new Instant(initialTime + 19500), null, PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)), initialTime + 19500)); + + expectedOutput.add(new Watermark(initialTime + 20000)); + TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator()); + + testHarness.close(); + } + + @Test + public void testCompoundAccumulatingPanesProgram() throws Exception { + WindowingStrategy strategy = fixedWindowWithCompoundTriggerStrategyAcc; + long initialTime = 0L; + OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, WindowedValue<KV<String, Integer>>> testHarness = + createTestingOperatorAndState(strategy, initialTime); + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); + + expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 5), + new Instant(initialTime + 1), null, PaneInfo.createPane(true, false, PaneInfo.Timing.EARLY)), initialTime + 1)); + expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 5), + new Instant(initialTime + 10000), null, PaneInfo.createPane(true, false, PaneInfo.Timing.EARLY)), initialTime + 10000)); + expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 10), + new Instant(initialTime + 19500), null, PaneInfo.createPane(false, false, PaneInfo.Timing.EARLY, 1, -1)), initialTime + 19500)); + + expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 6), + new Instant(initialTime + 1200), null, PaneInfo.createPane(false, true, PaneInfo.Timing.ON_TIME, 1, 0)), initialTime + 1200)); + + expectedOutput.add(new Watermark(initialTime + 10000)); + + expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 11), + new Instant(initialTime + 19500), null, PaneInfo.createPane(false, true, PaneInfo.Timing.ON_TIME, 2, 0)), initialTime + 19500)); + expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key2", 1), + new Instant(initialTime + 19500), null, PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)), initialTime + 19500)); + + expectedOutput.add(new Watermark(initialTime + 20000)); + TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator()); + + testHarness.close(); + } + + private OneInputStreamOperatorTestHarness createTestingOperatorAndState(WindowingStrategy strategy, long initialTime) throws Exception { + Pipeline pipeline = FlinkTestPipeline.createForStreaming(); + + KvCoder<String, Integer> inputCoder = KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()); + + FlinkGroupAlsoByWindowWrapper gbwOperaror = + FlinkGroupAlsoByWindowWrapper.createForTesting( + pipeline.getOptions(), + pipeline.getCoderRegistry(), + strategy, + inputCoder, + combiner.<String>asKeyedFn()); + + OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, WindowedValue<KV<String, Integer>>> testHarness = + new OneInputStreamOperatorTestHarness<>(gbwOperaror); + testHarness.open(); + + testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1), null, PaneInfo.NO_FIRING), initialTime + 20)); + testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1), null, PaneInfo.NO_FIRING), initialTime + 20)); + testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1000), null, PaneInfo.NO_FIRING), initialTime + 20)); + testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1200), null, PaneInfo.NO_FIRING), initialTime + 20)); + testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1200), null, PaneInfo.NO_FIRING), initialTime + 20)); + testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1200), null, PaneInfo.NO_FIRING), initialTime + 20)); + + testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 10000), null, PaneInfo.NO_FIRING), initialTime + 20)); + testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 12100), null, PaneInfo.NO_FIRING), initialTime + 20)); + testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 14200), null, PaneInfo.NO_FIRING), initialTime + 20)); + testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 15300), null, PaneInfo.NO_FIRING), initialTime + 20)); + testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 16500), null, PaneInfo.NO_FIRING), initialTime + 20)); + testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 19500), null, PaneInfo.NO_FIRING), initialTime + 20)); + testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 19500), null, PaneInfo.NO_FIRING), initialTime + 20)); + testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 19500), null, PaneInfo.NO_FIRING), initialTime + 20)); + testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 19500), null, PaneInfo.NO_FIRING), initialTime + 20)); + testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 19500), null, PaneInfo.NO_FIRING), initialTime + 20)); + testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 19500), null, PaneInfo.NO_FIRING), initialTime + 20)); + + testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key2", 1), new Instant(initialTime + 19500), null, PaneInfo.NO_FIRING), initialTime + 20)); + + testHarness.processWatermark(new Watermark(initialTime + 10000)); + testHarness.processWatermark(new Watermark(initialTime + 20000)); + + return testHarness; + } + + private static class ResultSortComparator implements Comparator<Object> { + @Override + public int compare(Object o1, Object o2) { + if (o1 instanceof Watermark && o2 instanceof Watermark) { + Watermark w1 = (Watermark) o1; + Watermark w2 = (Watermark) o2; + return (int) (w1.getTimestamp() - w2.getTimestamp()); + } else { + StreamRecord<WindowedValue<KV<String, Integer>>> sr0 = (StreamRecord<WindowedValue<KV<String, Integer>>>) o1; + StreamRecord<WindowedValue<KV<String, Integer>>> sr1 = (StreamRecord<WindowedValue<KV<String, Integer>>>) o2; + + int comparison = (int) (sr0.getValue().getTimestamp().getMillis() - sr1.getValue().getTimestamp().getMillis()); + if (comparison != 0) { + return comparison; + } + + comparison = sr0.getValue().getValue().getKey().compareTo(sr1.getValue().getValue().getKey()); + if(comparison == 0) { + comparison = Integer.compare( + sr0.getValue().getValue().getValue(), + sr1.getValue().getValue().getValue()); + } + if(comparison == 0) { + Collection windowsA = sr0.getValue().getWindows(); + Collection windowsB = sr1.getValue().getWindows(); + + if(windowsA.size() != 1 || windowsB.size() != 1) { + throw new IllegalStateException("A value cannot belong to more than one windows after grouping."); + } + + BoundedWindow windowA = (BoundedWindow) windowsA.iterator().next(); + BoundedWindow windowB = (BoundedWindow) windowsB.iterator().next(); + comparison = Long.compare(windowA.maxTimestamp().getMillis(), windowB.maxTimestamp().getMillis()); + } + return comparison; + } + } + } + + private <T> WindowedValue<T> makeWindowedValue(WindowingStrategy strategy, + T output, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) { + final Instant inputTimestamp = timestamp; + final WindowFn windowFn = strategy.getWindowFn(); + + if (timestamp == null) { + timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE; + } + + if (windows == null) { + try { + windows = windowFn.assignWindows(windowFn.new AssignContext() { + @Override + public Object element() { + throw new UnsupportedOperationException( + "WindowFn attempted to access input element when none was available"); + } + + @Override + public Instant timestamp() { + if (inputTimestamp == null) { + throw new UnsupportedOperationException( + "WindowFn attempted to access input timestamp when none was available"); + } + return inputTimestamp; + } + + @Override + public Collection<? extends BoundedWindow> windows() { + throw new UnsupportedOperationException( + "WindowFn attempted to access input windows when none were available"); + } + }); + } catch (Exception e) { + throw UserCodeException.wrap(e); + } + } + + return WindowedValue.of(output, timestamp, windows, pane); + } }
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/GroupByNullKeyTest.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/GroupByNullKeyTest.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/GroupByNullKeyTest.java index 5a412aa..52e9e25 100644 --- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/GroupByNullKeyTest.java +++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/GroupByNullKeyTest.java @@ -39,83 +39,83 @@ import java.util.Arrays; public class GroupByNullKeyTest extends StreamingProgramTestBase implements Serializable { - protected String resultPath; + protected String resultPath; - static final String[] EXPECTED_RESULT = new String[] { - "k: null v: user1 user1 user1 user2 user2 user2 user2 user3" - }; + static final String[] EXPECTED_RESULT = new String[] { + "k: null v: user1 user1 user1 user2 user2 user2 user2 user3" + }; - public GroupByNullKeyTest(){ - } + public GroupByNullKeyTest(){ + } - @Override - protected void preSubmit() throws Exception { - resultPath = getTempDirPath("result"); - } + @Override + protected void preSubmit() throws Exception { + resultPath = getTempDirPath("result"); + } - @Override - protected void postSubmit() throws Exception { - compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath); - } + @Override + protected void postSubmit() throws Exception { + compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath); + } - public static class ExtractUserAndTimestamp extends DoFn<KV<Integer, String>, String> { - private static final long serialVersionUID = 0; + public static class ExtractUserAndTimestamp extends DoFn<KV<Integer, String>, String> { + private static final long serialVersionUID = 0; - @Override - public void processElement(ProcessContext c) { - KV<Integer, String> record = c.element(); - long now = System.currentTimeMillis(); - int timestamp = record.getKey(); - String userName = record.getValue(); - if (userName != null) { - // Sets the implicit timestamp field to be used in windowing. - c.outputWithTimestamp(userName, new Instant(timestamp + now)); - } - } - } + @Override + public void processElement(ProcessContext c) { + KV<Integer, String> record = c.element(); + long now = System.currentTimeMillis(); + int timestamp = record.getKey(); + String userName = record.getValue(); + if (userName != null) { + // Sets the implicit timestamp field to be used in windowing. + c.outputWithTimestamp(userName, new Instant(timestamp + now)); + } + } + } - @Override - protected void testProgram() throws Exception { + @Override + protected void testProgram() throws Exception { - Pipeline p = FlinkTestPipeline.createForStreaming(); + Pipeline p = FlinkTestPipeline.createForStreaming(); - PCollection<String> output = - p.apply(Create.of(Arrays.asList( - KV.<Integer, String>of(0, "user1"), - KV.<Integer, String>of(1, "user1"), - KV.<Integer, String>of(2, "user1"), - KV.<Integer, String>of(10, "user2"), - KV.<Integer, String>of(1, "user2"), - KV.<Integer, String>of(15000, "user2"), - KV.<Integer, String>of(12000, "user2"), - KV.<Integer, String>of(25000, "user3")))) - .apply(ParDo.of(new ExtractUserAndTimestamp())) - .apply(Window.<String>into(FixedWindows.of(Duration.standardHours(1))) - .triggering(AfterWatermark.pastEndOfWindow()) - .withAllowedLateness(Duration.ZERO) - .discardingFiredPanes()) + PCollection<String> output = + p.apply(Create.of(Arrays.asList( + KV.<Integer, String>of(0, "user1"), + KV.<Integer, String>of(1, "user1"), + KV.<Integer, String>of(2, "user1"), + KV.<Integer, String>of(10, "user2"), + KV.<Integer, String>of(1, "user2"), + KV.<Integer, String>of(15000, "user2"), + KV.<Integer, String>of(12000, "user2"), + KV.<Integer, String>of(25000, "user3")))) + .apply(ParDo.of(new ExtractUserAndTimestamp())) + .apply(Window.<String>into(FixedWindows.of(Duration.standardHours(1))) + .triggering(AfterWatermark.pastEndOfWindow()) + .withAllowedLateness(Duration.ZERO) + .discardingFiredPanes()) - .apply(ParDo.of(new DoFn<String, KV<Void, String>>() { - @Override - public void processElement(ProcessContext c) throws Exception { - String elem = c.element(); - c.output(KV.<Void, String>of((Void) null, elem)); - } - })) - .apply(GroupByKey.<Void, String>create()) - .apply(ParDo.of(new DoFn<KV<Void, Iterable<String>>, String>() { - @Override - public void processElement(ProcessContext c) throws Exception { - KV<Void, Iterable<String>> elem = c.element(); - StringBuilder str = new StringBuilder(); - str.append("k: " + elem.getKey() + " v:"); - for (String v : elem.getValue()) { - str.append(" " + v); - } - c.output(str.toString()); - } - })); - output.apply(TextIO.Write.to(resultPath)); - p.run(); - } + .apply(ParDo.of(new DoFn<String, KV<Void, String>>() { + @Override + public void processElement(ProcessContext c) throws Exception { + String elem = c.element(); + c.output(KV.<Void, String>of((Void) null, elem)); + } + })) + .apply(GroupByKey.<Void, String>create()) + .apply(ParDo.of(new DoFn<KV<Void, Iterable<String>>, String>() { + @Override + public void processElement(ProcessContext c) throws Exception { + KV<Void, Iterable<String>> elem = c.element(); + StringBuilder str = new StringBuilder(); + str.append("k: " + elem.getKey() + " v:"); + for (String v : elem.getValue()) { + str.append(" " + v); + } + c.output(str.toString()); + } + })); + output.apply(TextIO.Write.to(resultPath)); + p.run(); + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/StateSerializationTest.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/StateSerializationTest.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/StateSerializationTest.java index 7489fcc..d5b1043 100644 --- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/StateSerializationTest.java +++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/StateSerializationTest.java @@ -43,261 +43,261 @@ import static org.junit.Assert.assertEquals; public class StateSerializationTest { - private static final StateNamespace NAMESPACE_1 = StateNamespaces.global(); - private static final String KEY_PREFIX = "TEST_"; - - // TODO: This can be replaced with the standard Sum.SumIntererFn once the state no longer needs - // to create a StateTag at the point of restoring state. Currently StateTags are compared strictly - // by type and combiners always use KeyedCombineFnWithContext rather than KeyedCombineFn or CombineFn. - private static CombineWithContext.KeyedCombineFnWithContext<Object, Integer, int[], Integer> SUM_COMBINER = - new CombineWithContext.KeyedCombineFnWithContext<Object, Integer, int[], Integer>() { - @Override - public int[] createAccumulator(Object key, CombineWithContext.Context c) { - return new int[1]; - } - - @Override - public int[] addInput(Object key, int[] accumulator, Integer value, CombineWithContext.Context c) { - accumulator[0] += value; - return accumulator; - } - - @Override - public int[] mergeAccumulators(Object key, Iterable<int[]> accumulators, CombineWithContext.Context c) { - int[] r = new int[1]; - for (int[] a : accumulators) { - r[0] += a[0]; - } - return r; - } - - @Override - public Integer extractOutput(Object key, int[] accumulator, CombineWithContext.Context c) { - return accumulator[0]; - } - }; - - private static Coder<int[]> INT_ACCUM_CODER = DelegateCoder.of( - VarIntCoder.of(), - new DelegateCoder.CodingFunction<int[], Integer>() { - @Override - public Integer apply(int[] accumulator) { - return accumulator[0]; - } - }, - new DelegateCoder.CodingFunction<Integer, int[]>() { - @Override - public int[] apply(Integer value) { - int[] a = new int[1]; - a[0] = value; - return a; - } - }); - - private static final StateTag<Object, ValueState<String>> STRING_VALUE_ADDR = - StateTags.value("stringValue", StringUtf8Coder.of()); - private static final StateTag<Object, ValueState<Integer>> INT_VALUE_ADDR = - StateTags.value("stringValue", VarIntCoder.of()); - private static final StateTag<Object, AccumulatorCombiningState<Integer, int[], Integer>> SUM_INTEGER_ADDR = - StateTags.keyedCombiningValueWithContext("sumInteger", INT_ACCUM_CODER, SUM_COMBINER); - private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR = - StateTags.bag("stringBag", StringUtf8Coder.of()); - private static final StateTag<Object, WatermarkHoldState<BoundedWindow>> WATERMARK_BAG_ADDR = - StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtEarliestInputTimestamp()); - - private Map<String, FlinkStateInternals<String>> statePerKey = new HashMap<>(); - - private Map<String, Set<TimerInternals.TimerData>> activeTimers = new HashMap<>(); - - private void initializeStateAndTimers() throws CannotProvideCoderException { - for (int i = 0; i < 10; i++) { - String key = KEY_PREFIX + i; - - FlinkStateInternals state = initializeStateForKey(key); - Set<TimerInternals.TimerData> timers = new HashSet<>(); - for (int j = 0; j < 5; j++) { - TimerInternals.TimerData timer = TimerInternals - .TimerData.of(NAMESPACE_1, - new Instant(1000 + i + j), TimeDomain.values()[j % 3]); - timers.add(timer); - } - - statePerKey.put(key, state); - activeTimers.put(key, timers); - } - } - - private FlinkStateInternals<String> initializeStateForKey(String key) throws CannotProvideCoderException { - FlinkStateInternals<String> state = createState(key); - - ValueState<String> value = state.state(NAMESPACE_1, STRING_VALUE_ADDR); - value.write("test"); - - ValueState<Integer> value2 = state.state(NAMESPACE_1, INT_VALUE_ADDR); - value2.write(4); - value2.write(5); - - AccumulatorCombiningState<Integer, int[], Integer> combiningValue = state.state(NAMESPACE_1, SUM_INTEGER_ADDR); - combiningValue.add(1); - combiningValue.add(2); - - WatermarkHoldState<BoundedWindow> watermark = state.state(NAMESPACE_1, WATERMARK_BAG_ADDR); - watermark.add(new Instant(1000)); - - BagState<String> bag = state.state(NAMESPACE_1, STRING_BAG_ADDR); - bag.add("v1"); - bag.add("v2"); - bag.add("v3"); - bag.add("v4"); - return state; - } - - private boolean restoreAndTestState(DataInputView in) throws Exception { - StateCheckpointReader reader = new StateCheckpointReader(in); - final ClassLoader userClassloader = this.getClass().getClassLoader(); - Coder<? extends BoundedWindow> windowCoder = IntervalWindow.getCoder(); - Coder<String> keyCoder = StringUtf8Coder.of(); - - boolean comparisonRes = true; - - for (String key : statePerKey.keySet()) { - comparisonRes &= checkStateForKey(key); - } - - // restore the timers - Map<String, Set<TimerInternals.TimerData>> restoredTimersPerKey = StateCheckpointUtils.decodeTimers(reader, windowCoder, keyCoder); - if (activeTimers.size() != restoredTimersPerKey.size()) { - return false; - } - - for (String key : statePerKey.keySet()) { - Set<TimerInternals.TimerData> originalTimers = activeTimers.get(key); - Set<TimerInternals.TimerData> restoredTimers = restoredTimersPerKey.get(key); - comparisonRes &= checkTimersForKey(originalTimers, restoredTimers); - } - - // restore the state - Map<String, FlinkStateInternals<String>> restoredPerKeyState = - StateCheckpointUtils.decodeState(reader, OutputTimeFns.outputAtEarliestInputTimestamp(), keyCoder, windowCoder, userClassloader); - if (restoredPerKeyState.size() != statePerKey.size()) { - return false; - } - - for (String key : statePerKey.keySet()) { - FlinkStateInternals<String> originalState = statePerKey.get(key); - FlinkStateInternals<String> restoredState = restoredPerKeyState.get(key); - comparisonRes &= checkStateForKey(originalState, restoredState); - } - return comparisonRes; - } - - private boolean checkStateForKey(String key) throws CannotProvideCoderException { - FlinkStateInternals<String> state = statePerKey.get(key); - - ValueState<String> value = state.state(NAMESPACE_1, STRING_VALUE_ADDR); - boolean comp = value.read().equals("test"); - - ValueState<Integer> value2 = state.state(NAMESPACE_1, INT_VALUE_ADDR); - comp &= value2.read().equals(5); - - AccumulatorCombiningState<Integer, int[], Integer> combiningValue = state.state(NAMESPACE_1, SUM_INTEGER_ADDR); - comp &= combiningValue.read().equals(3); - - WatermarkHoldState<BoundedWindow> watermark = state.state(NAMESPACE_1, WATERMARK_BAG_ADDR); - comp &= watermark.read().equals(new Instant(1000)); - - BagState<String> bag = state.state(NAMESPACE_1, STRING_BAG_ADDR); - Iterator<String> it = bag.read().iterator(); - int i = 0; - while (it.hasNext()) { - comp &= it.next().equals("v" + (++i)); - } - return comp; - } - - private void storeState(AbstractStateBackend.CheckpointStateOutputView out) throws Exception { - StateCheckpointWriter checkpointBuilder = StateCheckpointWriter.create(out); - Coder<String> keyCoder = StringUtf8Coder.of(); - - // checkpoint the timers - StateCheckpointUtils.encodeTimers(activeTimers, checkpointBuilder, keyCoder); - - // checkpoint the state - StateCheckpointUtils.encodeState(statePerKey, checkpointBuilder, keyCoder); - } - - private boolean checkTimersForKey(Set<TimerInternals.TimerData> originalTimers, Set<TimerInternals.TimerData> restoredTimers) { - boolean comp = true; - if (restoredTimers == null) { - return false; - } - - if (originalTimers.size() != restoredTimers.size()) { - return false; - } - - for (TimerInternals.TimerData timer : originalTimers) { - comp &= restoredTimers.contains(timer); - } - return comp; - } - - private boolean checkStateForKey(FlinkStateInternals<String> originalState, FlinkStateInternals<String> restoredState) throws CannotProvideCoderException { - if (restoredState == null) { - return false; - } - - ValueState<String> orValue = originalState.state(NAMESPACE_1, STRING_VALUE_ADDR); - ValueState<String> resValue = restoredState.state(NAMESPACE_1, STRING_VALUE_ADDR); - boolean comp = orValue.read().equals(resValue.read()); - - ValueState<Integer> orIntValue = originalState.state(NAMESPACE_1, INT_VALUE_ADDR); - ValueState<Integer> resIntValue = restoredState.state(NAMESPACE_1, INT_VALUE_ADDR); - comp &= orIntValue.read().equals(resIntValue.read()); - - AccumulatorCombiningState<Integer, int[], Integer> combOrValue = originalState.state(NAMESPACE_1, SUM_INTEGER_ADDR); - AccumulatorCombiningState<Integer, int[], Integer> combResValue = restoredState.state(NAMESPACE_1, SUM_INTEGER_ADDR); - comp &= combOrValue.read().equals(combResValue.read()); - - WatermarkHoldState orWatermark = originalState.state(NAMESPACE_1, WATERMARK_BAG_ADDR); - WatermarkHoldState resWatermark = restoredState.state(NAMESPACE_1, WATERMARK_BAG_ADDR); - comp &= orWatermark.read().equals(resWatermark.read()); - - BagState<String> orBag = originalState.state(NAMESPACE_1, STRING_BAG_ADDR); - BagState<String> resBag = restoredState.state(NAMESPACE_1, STRING_BAG_ADDR); - - Iterator<String> orIt = orBag.read().iterator(); - Iterator<String> resIt = resBag.read().iterator(); - - while (orIt.hasNext() && resIt.hasNext()) { - comp &= orIt.next().equals(resIt.next()); - } - - return !((orIt.hasNext() && !resIt.hasNext()) || (!orIt.hasNext() && resIt.hasNext())) && comp; - } - - private FlinkStateInternals<String> createState(String key) throws CannotProvideCoderException { - return new FlinkStateInternals<>( - key, - StringUtf8Coder.of(), - IntervalWindow.getCoder(), - OutputTimeFns.outputAtEarliestInputTimestamp()); - } - - @Test - public void test() throws Exception { - StateSerializationTest test = new StateSerializationTest(); - test.initializeStateAndTimers(); + private static final StateNamespace NAMESPACE_1 = StateNamespaces.global(); + private static final String KEY_PREFIX = "TEST_"; + + // TODO: This can be replaced with the standard Sum.SumIntererFn once the state no longer needs + // to create a StateTag at the point of restoring state. Currently StateTags are compared strictly + // by type and combiners always use KeyedCombineFnWithContext rather than KeyedCombineFn or CombineFn. + private static CombineWithContext.KeyedCombineFnWithContext<Object, Integer, int[], Integer> SUM_COMBINER = + new CombineWithContext.KeyedCombineFnWithContext<Object, Integer, int[], Integer>() { + @Override + public int[] createAccumulator(Object key, CombineWithContext.Context c) { + return new int[1]; + } + + @Override + public int[] addInput(Object key, int[] accumulator, Integer value, CombineWithContext.Context c) { + accumulator[0] += value; + return accumulator; + } + + @Override + public int[] mergeAccumulators(Object key, Iterable<int[]> accumulators, CombineWithContext.Context c) { + int[] r = new int[1]; + for (int[] a : accumulators) { + r[0] += a[0]; + } + return r; + } + + @Override + public Integer extractOutput(Object key, int[] accumulator, CombineWithContext.Context c) { + return accumulator[0]; + } + }; + + private static Coder<int[]> INT_ACCUM_CODER = DelegateCoder.of( + VarIntCoder.of(), + new DelegateCoder.CodingFunction<int[], Integer>() { + @Override + public Integer apply(int[] accumulator) { + return accumulator[0]; + } + }, + new DelegateCoder.CodingFunction<Integer, int[]>() { + @Override + public int[] apply(Integer value) { + int[] a = new int[1]; + a[0] = value; + return a; + } + }); + + private static final StateTag<Object, ValueState<String>> STRING_VALUE_ADDR = + StateTags.value("stringValue", StringUtf8Coder.of()); + private static final StateTag<Object, ValueState<Integer>> INT_VALUE_ADDR = + StateTags.value("stringValue", VarIntCoder.of()); + private static final StateTag<Object, AccumulatorCombiningState<Integer, int[], Integer>> SUM_INTEGER_ADDR = + StateTags.keyedCombiningValueWithContext("sumInteger", INT_ACCUM_CODER, SUM_COMBINER); + private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR = + StateTags.bag("stringBag", StringUtf8Coder.of()); + private static final StateTag<Object, WatermarkHoldState<BoundedWindow>> WATERMARK_BAG_ADDR = + StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtEarliestInputTimestamp()); + + private Map<String, FlinkStateInternals<String>> statePerKey = new HashMap<>(); + + private Map<String, Set<TimerInternals.TimerData>> activeTimers = new HashMap<>(); + + private void initializeStateAndTimers() throws CannotProvideCoderException { + for (int i = 0; i < 10; i++) { + String key = KEY_PREFIX + i; + + FlinkStateInternals state = initializeStateForKey(key); + Set<TimerInternals.TimerData> timers = new HashSet<>(); + for (int j = 0; j < 5; j++) { + TimerInternals.TimerData timer = TimerInternals + .TimerData.of(NAMESPACE_1, + new Instant(1000 + i + j), TimeDomain.values()[j % 3]); + timers.add(timer); + } + + statePerKey.put(key, state); + activeTimers.put(key, timers); + } + } + + private FlinkStateInternals<String> initializeStateForKey(String key) throws CannotProvideCoderException { + FlinkStateInternals<String> state = createState(key); + + ValueState<String> value = state.state(NAMESPACE_1, STRING_VALUE_ADDR); + value.write("test"); + + ValueState<Integer> value2 = state.state(NAMESPACE_1, INT_VALUE_ADDR); + value2.write(4); + value2.write(5); + + AccumulatorCombiningState<Integer, int[], Integer> combiningValue = state.state(NAMESPACE_1, SUM_INTEGER_ADDR); + combiningValue.add(1); + combiningValue.add(2); + + WatermarkHoldState<BoundedWindow> watermark = state.state(NAMESPACE_1, WATERMARK_BAG_ADDR); + watermark.add(new Instant(1000)); + + BagState<String> bag = state.state(NAMESPACE_1, STRING_BAG_ADDR); + bag.add("v1"); + bag.add("v2"); + bag.add("v3"); + bag.add("v4"); + return state; + } + + private boolean restoreAndTestState(DataInputView in) throws Exception { + StateCheckpointReader reader = new StateCheckpointReader(in); + final ClassLoader userClassloader = this.getClass().getClassLoader(); + Coder<? extends BoundedWindow> windowCoder = IntervalWindow.getCoder(); + Coder<String> keyCoder = StringUtf8Coder.of(); + + boolean comparisonRes = true; + + for (String key : statePerKey.keySet()) { + comparisonRes &= checkStateForKey(key); + } + + // restore the timers + Map<String, Set<TimerInternals.TimerData>> restoredTimersPerKey = StateCheckpointUtils.decodeTimers(reader, windowCoder, keyCoder); + if (activeTimers.size() != restoredTimersPerKey.size()) { + return false; + } + + for (String key : statePerKey.keySet()) { + Set<TimerInternals.TimerData> originalTimers = activeTimers.get(key); + Set<TimerInternals.TimerData> restoredTimers = restoredTimersPerKey.get(key); + comparisonRes &= checkTimersForKey(originalTimers, restoredTimers); + } + + // restore the state + Map<String, FlinkStateInternals<String>> restoredPerKeyState = + StateCheckpointUtils.decodeState(reader, OutputTimeFns.outputAtEarliestInputTimestamp(), keyCoder, windowCoder, userClassloader); + if (restoredPerKeyState.size() != statePerKey.size()) { + return false; + } + + for (String key : statePerKey.keySet()) { + FlinkStateInternals<String> originalState = statePerKey.get(key); + FlinkStateInternals<String> restoredState = restoredPerKeyState.get(key); + comparisonRes &= checkStateForKey(originalState, restoredState); + } + return comparisonRes; + } + + private boolean checkStateForKey(String key) throws CannotProvideCoderException { + FlinkStateInternals<String> state = statePerKey.get(key); + + ValueState<String> value = state.state(NAMESPACE_1, STRING_VALUE_ADDR); + boolean comp = value.read().equals("test"); + + ValueState<Integer> value2 = state.state(NAMESPACE_1, INT_VALUE_ADDR); + comp &= value2.read().equals(5); + + AccumulatorCombiningState<Integer, int[], Integer> combiningValue = state.state(NAMESPACE_1, SUM_INTEGER_ADDR); + comp &= combiningValue.read().equals(3); + + WatermarkHoldState<BoundedWindow> watermark = state.state(NAMESPACE_1, WATERMARK_BAG_ADDR); + comp &= watermark.read().equals(new Instant(1000)); + + BagState<String> bag = state.state(NAMESPACE_1, STRING_BAG_ADDR); + Iterator<String> it = bag.read().iterator(); + int i = 0; + while (it.hasNext()) { + comp &= it.next().equals("v" + (++i)); + } + return comp; + } + + private void storeState(AbstractStateBackend.CheckpointStateOutputView out) throws Exception { + StateCheckpointWriter checkpointBuilder = StateCheckpointWriter.create(out); + Coder<String> keyCoder = StringUtf8Coder.of(); + + // checkpoint the timers + StateCheckpointUtils.encodeTimers(activeTimers, checkpointBuilder, keyCoder); + + // checkpoint the state + StateCheckpointUtils.encodeState(statePerKey, checkpointBuilder, keyCoder); + } + + private boolean checkTimersForKey(Set<TimerInternals.TimerData> originalTimers, Set<TimerInternals.TimerData> restoredTimers) { + boolean comp = true; + if (restoredTimers == null) { + return false; + } + + if (originalTimers.size() != restoredTimers.size()) { + return false; + } + + for (TimerInternals.TimerData timer : originalTimers) { + comp &= restoredTimers.contains(timer); + } + return comp; + } + + private boolean checkStateForKey(FlinkStateInternals<String> originalState, FlinkStateInternals<String> restoredState) throws CannotProvideCoderException { + if (restoredState == null) { + return false; + } + + ValueState<String> orValue = originalState.state(NAMESPACE_1, STRING_VALUE_ADDR); + ValueState<String> resValue = restoredState.state(NAMESPACE_1, STRING_VALUE_ADDR); + boolean comp = orValue.read().equals(resValue.read()); + + ValueState<Integer> orIntValue = originalState.state(NAMESPACE_1, INT_VALUE_ADDR); + ValueState<Integer> resIntValue = restoredState.state(NAMESPACE_1, INT_VALUE_ADDR); + comp &= orIntValue.read().equals(resIntValue.read()); + + AccumulatorCombiningState<Integer, int[], Integer> combOrValue = originalState.state(NAMESPACE_1, SUM_INTEGER_ADDR); + AccumulatorCombiningState<Integer, int[], Integer> combResValue = restoredState.state(NAMESPACE_1, SUM_INTEGER_ADDR); + comp &= combOrValue.read().equals(combResValue.read()); + + WatermarkHoldState orWatermark = originalState.state(NAMESPACE_1, WATERMARK_BAG_ADDR); + WatermarkHoldState resWatermark = restoredState.state(NAMESPACE_1, WATERMARK_BAG_ADDR); + comp &= orWatermark.read().equals(resWatermark.read()); + + BagState<String> orBag = originalState.state(NAMESPACE_1, STRING_BAG_ADDR); + BagState<String> resBag = restoredState.state(NAMESPACE_1, STRING_BAG_ADDR); + + Iterator<String> orIt = orBag.read().iterator(); + Iterator<String> resIt = resBag.read().iterator(); + + while (orIt.hasNext() && resIt.hasNext()) { + comp &= orIt.next().equals(resIt.next()); + } + + return !((orIt.hasNext() && !resIt.hasNext()) || (!orIt.hasNext() && resIt.hasNext())) && comp; + } + + private FlinkStateInternals<String> createState(String key) throws CannotProvideCoderException { + return new FlinkStateInternals<>( + key, + StringUtf8Coder.of(), + IntervalWindow.getCoder(), + OutputTimeFns.outputAtEarliestInputTimestamp()); + } + + @Test + public void test() throws Exception { + StateSerializationTest test = new StateSerializationTest(); + test.initializeStateAndTimers(); - MemoryStateBackend.MemoryCheckpointOutputStream memBackend = new MemoryStateBackend.MemoryCheckpointOutputStream(32048); - AbstractStateBackend.CheckpointStateOutputView out = new AbstractStateBackend.CheckpointStateOutputView(memBackend); - - test.storeState(out); + MemoryStateBackend.MemoryCheckpointOutputStream memBackend = new MemoryStateBackend.MemoryCheckpointOutputStream(32048); + AbstractStateBackend.CheckpointStateOutputView out = new AbstractStateBackend.CheckpointStateOutputView(memBackend); + + test.storeState(out); - byte[] contents = memBackend.closeAndGetBytes(); - DataInputView in = new DataInputDeserializer(contents, 0, contents.length); + byte[] contents = memBackend.closeAndGetBytes(); + DataInputView in = new DataInputDeserializer(contents, 0, contents.length); - assertEquals(test.restoreAndTestState(in), true); - } + assertEquals(test.restoreAndTestState(in), true); + } }
