http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/edff0785/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/GroupAlsoByWindowTest.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/GroupAlsoByWindowTest.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/GroupAlsoByWindowTest.java new file mode 100644 index 0000000..b667187 --- /dev/null +++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/GroupAlsoByWindowTest.java @@ -0,0 +1,507 @@ +/* + * Copyright 2015 Data Artisans GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/
package com.dataartisans.flink.dataflow.streaming;

import com.dataartisans.flink.dataflow.FlinkTestPipeline;
import com.dataartisans.flink.dataflow.translation.wrappers.streaming.FlinkGroupAlsoByWindowWrapper;
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.coders.KvCoder;
import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
import com.google.cloud.dataflow.sdk.coders.VarIntCoder;
import com.google.cloud.dataflow.sdk.transforms.Combine;
import com.google.cloud.dataflow.sdk.transforms.Sum;
import com.google.cloud.dataflow.sdk.transforms.windowing.*;
import com.google.cloud.dataflow.sdk.util.UserCodeException;
import com.google.cloud.dataflow.sdk.util.WindowedValue;
import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.common.base.Throwables;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Test;

import java.util.Collection;
import java.util.Comparator;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
 * Tests {@link FlinkGroupAlsoByWindowWrapper}, configured with an integer-sum combiner, under
 * several windowing strategies and triggers. Each test drives the operator through a
 * {@link OneInputStreamOperatorTestHarness} and compares the emitted records and watermarks
 * against an expected queue.
 */
public class GroupAlsoByWindowTest {

	private final Combine.CombineFn combiner = new Sum.SumIntegerFn();

	private final WindowingStrategy slidingWindowWithAfterWatermarkTriggerStrategy =
			WindowingStrategy.of(SlidingWindows.of(Duration.standardSeconds(10)).every(Duration.standardSeconds(5)))
					.withTrigger(AfterWatermark.pastEndOfWindow())
					.withMode(WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES);

	private final WindowingStrategy sessionWindowingStrategy =
			WindowingStrategy.of(Sessions.withGapDuration(Duration.standardSeconds(2)))
					.withTrigger(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
					.withMode(WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES)
					.withAllowedLateness(Duration.standardSeconds(100));

	private final WindowingStrategy fixedWindowingStrategy =
			WindowingStrategy.of(FixedWindows.of(Duration.standardSeconds(10)));

	private final WindowingStrategy fixedWindowWithCountTriggerStrategy =
			fixedWindowingStrategy.withTrigger(AfterPane.elementCountAtLeast(5));

	private final WindowingStrategy fixedWindowWithAfterWatermarkTriggerStrategy =
			fixedWindowingStrategy.withTrigger(AfterWatermark.pastEndOfWindow());

	private final WindowingStrategy fixedWindowWithCompoundTriggerStrategy =
			fixedWindowingStrategy.withTrigger(
					Repeatedly.forever(AfterFirst.of(
							AfterPane.elementCountAtLeast(5),
							AfterWatermark.pastEndOfWindow())));

	/**
	 * The default accumulation mode is
	 * {@link com.google.cloud.dataflow.sdk.util.WindowingStrategy.AccumulationMode#DISCARDING_FIRED_PANES}.
	 * This strategy changes it to
	 * {@link com.google.cloud.dataflow.sdk.util.WindowingStrategy.AccumulationMode#ACCUMULATING_FIRED_PANES}.
	 */
	private final WindowingStrategy fixedWindowWithCompoundTriggerStrategyAcc =
			fixedWindowWithCompoundTriggerStrategy
					.withMode(WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES);

	@Test
	public void testWithLateness() throws Exception {
		WindowingStrategy strategy = WindowingStrategy.of(FixedWindows.of(Duration.standardSeconds(2)))
				.withMode(WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES)
				.withAllowedLateness(Duration.millis(1000));
		long initialTime = 0L;

		OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, WindowedValue<KV<String, Integer>>> testHarness =
				createOpenedTestHarness(strategy);

		processInput(testHarness, strategy, "key1", initialTime + 1, initialTime + 20);
		processInput(testHarness, strategy, "key1", initialTime + 1, initialTime + 20);
		processInput(testHarness, strategy, "key1", initialTime + 1000, initialTime + 20);
		processInput(testHarness, strategy, "key1", initialTime + 1200, initialTime + 20);
		testHarness.processWatermark(new Watermark(initialTime + 2000));
		// These two arrive after the watermark has passed the end of [0, 2000) but within the
		// allowed lateness of 1000ms.
		processInput(testHarness, strategy, "key1", initialTime + 1200, initialTime + 20);
		processInput(testHarness, strategy, "key1", initialTime + 1200, initialTime + 20);
		testHarness.processWatermark(new Watermark(initialTime + 4000));

		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
		expectedOutput.add(recordInWindow("key1", 4, initialTime + 1, intervalWindow(0, 2000),
				PaneInfo.createPane(true, false, PaneInfo.Timing.ON_TIME, 0, 0), initialTime));
		expectedOutput.add(new Watermark(initialTime + 2000));

		expectedOutput.add(recordInWindow("key1", 6, initialTime + 1999, intervalWindow(0, 2000),
				PaneInfo.createPane(false, true, PaneInfo.Timing.LATE, 1, 1), initialTime));

		expectedOutput.add(recordInWindow("key1", 0, initialTime + 1999, intervalWindow(0, 2000),
				PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME, 0, 0), initialTime));
		expectedOutput.add(new Watermark(initialTime + 4000));

		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
		testHarness.close();
	}

	@Test
	public void testSessionWindows() throws Exception {
		WindowingStrategy strategy = sessionWindowingStrategy;
		long initialTime = 0L;

		OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, WindowedValue<KV<String, Integer>>> testHarness =
				createOpenedTestHarness(strategy);

		// With a 2s gap, the late-arriving 2700 bridges {1, 1, 1000} and {3500, 3700} into one session.
		long[] firstBatch = {1, 1, 1000, 3500, 3700, 2700};
		for (long eventTime : firstBatch) {
			processInput(testHarness, strategy, "key1", initialTime + eventTime, initialTime + 20);
		}
		testHarness.processWatermark(new Watermark(initialTime + 6000));

		// The last element (5600) overlaps the first session, so everything merges into one session.
		long[] secondBatch = {6700, 6800, 8900, 7600, 5600};
		for (long eventTime : secondBatch) {
			processInput(testHarness, strategy, "key1", initialTime + eventTime, initialTime + 20);
		}
		testHarness.processWatermark(new Watermark(initialTime + 12000));

		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
		expectedOutput.add(recordInWindow("key1", 6, initialTime + 1, intervalWindow(1, 5700),
				PaneInfo.createPane(true, false, PaneInfo.Timing.ON_TIME, 0, 0), initialTime));
		expectedOutput.add(new Watermark(initialTime + 6000));

		expectedOutput.add(recordInWindow("key1", 11, initialTime + 6700, intervalWindow(1, 10900),
				PaneInfo.createPane(true, false, PaneInfo.Timing.ON_TIME, 0, 0), initialTime));
		expectedOutput.add(new Watermark(initialTime + 12000));

		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
		testHarness.close();
	}

	@Test
	public void testSlidingWindows() throws Exception {
		WindowingStrategy strategy = slidingWindowWithAfterWatermarkTriggerStrategy;
		long initialTime = 0L;
		OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, WindowedValue<KV<String, Integer>>> testHarness =
				createTestingOperatorAndState(strategy, initialTime);
		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
		testHarness.processWatermark(new Watermark(initialTime + 25000));

		PaneInfo onlyOnTimePane = PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME);

		expectedOutput.add(recordInWindow("key1", 6, initialTime + 5000, intervalWindow(0, 10000), onlyOnTimePane, initialTime));
		expectedOutput.add(recordInWindow("key1", 6, initialTime + 1, intervalWindow(-5000, 5000), onlyOnTimePane, initialTime));
		expectedOutput.add(new Watermark(initialTime + 10000));

		expectedOutput.add(recordInWindow("key1", 11, initialTime + 15000, intervalWindow(10000, 20000), onlyOnTimePane, initialTime));
		expectedOutput.add(recordInWindow("key1", 3, initialTime + 10000, intervalWindow(5000, 15000), onlyOnTimePane, initialTime));
		expectedOutput.add(recordInWindow("key2", 1, initialTime + 19500, intervalWindow(10000, 20000), onlyOnTimePane, initialTime));
		expectedOutput.add(new Watermark(initialTime + 20000));

		// The output timestamp below is 20000 and not 19500 because of a Dataflow convention: the
		// timestamp of a windowed value in a window cannot be smaller than the end of a previous
		// window. See WindowFn#getOutputTime(Instant, BoundedWindow).
		expectedOutput.add(recordInWindow("key2", 1, initialTime + 20000, intervalWindow(15000, 25000), onlyOnTimePane, initialTime));
		expectedOutput.add(recordInWindow("key1", 8, initialTime + 20000, intervalWindow(15000, 25000), onlyOnTimePane, initialTime));
		expectedOutput.add(new Watermark(initialTime + 25000));

		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
		testHarness.close();
	}

	@Test
	public void testAfterWatermarkProgram() throws Exception {
		WindowingStrategy strategy = fixedWindowWithAfterWatermarkTriggerStrategy;
		long initialTime = 0L;
		OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, WindowedValue<KV<String, Integer>>> testHarness =
				createTestingOperatorAndState(strategy, initialTime);
		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();

		expectedOutput.add(recordInAssignedWindows(strategy, "key1", 6, initialTime + 1,
				PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME), initialTime));
		expectedOutput.add(new Watermark(initialTime + 10000));

		expectedOutput.add(recordInAssignedWindows(strategy, "key1", 11, initialTime + 10000,
				PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME), initialTime));
		expectedOutput.add(recordInAssignedWindows(strategy, "key2", 1, initialTime + 19500,
				PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME), initialTime));
		expectedOutput.add(new Watermark(initialTime + 20000));

		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
		testHarness.close();
	}

	@Test
	public void testAfterCountProgram() throws Exception {
		WindowingStrategy strategy = fixedWindowWithCountTriggerStrategy;
		long initialTime = 0L;
		OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, WindowedValue<KV<String, Integer>>> testHarness =
				createTestingOperatorAndState(strategy, initialTime);
		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();

		expectedOutput.add(recordInAssignedWindows(strategy, "key1", 5, initialTime + 1,
				PaneInfo.createPane(true, true, PaneInfo.Timing.EARLY), initialTime));
		expectedOutput.add(recordInAssignedWindows(strategy, "key1", 5, initialTime + 10000,
				PaneInfo.createPane(true, true, PaneInfo.Timing.EARLY), initialTime));
		expectedOutput.add(new Watermark(initialTime + 10000));

		expectedOutput.add(recordInAssignedWindows(strategy, "key2", 1, initialTime + 19500,
				PaneInfo.createPane(true, true, PaneInfo.Timing.LATE, 0, 0), initialTime));
		expectedOutput.add(new Watermark(initialTime + 20000));

		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
		testHarness.close();
	}

	@Test
	public void testCompoundProgram() throws Exception {
		WindowingStrategy strategy = fixedWindowWithCompoundTriggerStrategy;
		long initialTime = 0L;
		OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, WindowedValue<KV<String, Integer>>> testHarness =
				createTestingOperatorAndState(strategy, initialTime);
		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();

		// PaneInfo.createPane arguments are:
		//   isFirst     - whether this is the first pane fired for the window,
		//   isLast      - whether this is the last pane fired for the window,
		//   timing      - why the pane fired (EARLY, ON_TIME, LATE),
		//   index       - index of the pane within the window,
		//   onTimeIndex - index among the panes fired on time (-1 for early panes here).

		expectedOutput.add(recordInAssignedWindows(strategy, "key1", 5, initialTime + 1,
				PaneInfo.createPane(true, false, PaneInfo.Timing.EARLY), initialTime));
		expectedOutput.add(recordInAssignedWindows(strategy, "key1", 5, initialTime + 10000,
				PaneInfo.createPane(true, false, PaneInfo.Timing.EARLY), initialTime));
		expectedOutput.add(recordInAssignedWindows(strategy, "key1", 5, initialTime + 19500,
				PaneInfo.createPane(false, false, PaneInfo.Timing.EARLY, 1, -1), initialTime));

		expectedOutput.add(recordInAssignedWindows(strategy, "key1", 1, initialTime + 1200,
				PaneInfo.createPane(false, true, PaneInfo.Timing.ON_TIME, 1, 0), initialTime));

		expectedOutput.add(new Watermark(initialTime + 10000));

		expectedOutput.add(recordInAssignedWindows(strategy, "key1", 1, initialTime + 19500,
				PaneInfo.createPane(false, true, PaneInfo.Timing.ON_TIME, 2, 0), initialTime));
		expectedOutput.add(recordInAssignedWindows(strategy, "key2", 1, initialTime + 19500,
				PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME), initialTime));

		expectedOutput.add(new Watermark(initialTime + 20000));

		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
		testHarness.close();
	}

	@Test
	public void testCompoundAccumulatingPanesProgram() throws Exception {
		WindowingStrategy strategy = fixedWindowWithCompoundTriggerStrategyAcc;
		long initialTime = 0L;
		OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, WindowedValue<KV<String, Integer>>> testHarness =
				createTestingOperatorAndState(strategy, initialTime);
		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();

		expectedOutput.add(recordInAssignedWindows(strategy, "key1", 5, initialTime + 1,
				PaneInfo.createPane(true, false, PaneInfo.Timing.EARLY), initialTime));
		expectedOutput.add(recordInAssignedWindows(strategy, "key1", 5, initialTime + 10000,
				PaneInfo.createPane(true, false, PaneInfo.Timing.EARLY), initialTime));
		expectedOutput.add(recordInAssignedWindows(strategy, "key1", 10, initialTime + 19500,
				PaneInfo.createPane(false, false, PaneInfo.Timing.EARLY, 1, -1), initialTime));

		expectedOutput.add(recordInAssignedWindows(strategy, "key1", 6, initialTime + 1200,
				PaneInfo.createPane(false, true, PaneInfo.Timing.ON_TIME, 1, 0), initialTime));

		expectedOutput.add(new Watermark(initialTime + 10000));

		expectedOutput.add(recordInAssignedWindows(strategy, "key1", 11, initialTime + 19500,
				PaneInfo.createPane(false, true, PaneInfo.Timing.ON_TIME, 2, 0), initialTime));
		expectedOutput.add(recordInAssignedWindows(strategy, "key2", 1, initialTime + 19500,
				PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME), initialTime));

		expectedOutput.add(new Watermark(initialTime + 20000));

		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
		testHarness.close();
	}

	/**
	 * Creates an opened harness for {@code strategy} and feeds it the shared fixture.
	 *
	 * <p>The fixture has seventeen {@code KV("key1", 1)} elements and one {@code KV("key2", 1)}
	 * element at 19500. Watermarks at {@code initialTime + 10000} and {@code initialTime + 20000}
	 * follow. With 10s fixed windows, key1 has six elements in [0, 10000) and eleven in
	 * [10000, 20000).
	 */
	private OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, WindowedValue<KV<String, Integer>>> createTestingOperatorAndState(
			WindowingStrategy strategy, long initialTime) throws Exception {
		OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, WindowedValue<KV<String, Integer>>> testHarness =
				createOpenedTestHarness(strategy);

		long[] key1EventTimes = {
				1, 1, 1000, 1200, 1200, 1200,
				10000, 12100, 14200, 15300, 16500,
				19500, 19500, 19500, 19500, 19500, 19500};
		for (long eventTime : key1EventTimes) {
			processInput(testHarness, strategy, "key1", initialTime + eventTime, initialTime + 20);
		}
		processInput(testHarness, strategy, "key2", initialTime + 19500, initialTime + 20);

		testHarness.processWatermark(new Watermark(initialTime + 10000));
		testHarness.processWatermark(new Watermark(initialTime + 20000));

		return testHarness;
	}

	/**
	 * Creates a {@link FlinkGroupAlsoByWindowWrapper} for {@code strategy} that sums integer
	 * values per key, wraps it in a test harness, and opens the harness.
	 */
	private OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, WindowedValue<KV<String, Integer>>> createOpenedTestHarness(
			WindowingStrategy strategy) throws Exception {
		Pipeline pipeline = FlinkTestPipeline.create();
		KvCoder<String, Integer> inputCoder = KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of());

		FlinkGroupAlsoByWindowWrapper gbwOperator =
				FlinkGroupAlsoByWindowWrapper.createForTesting(
						pipeline.getOptions(),
						pipeline.getCoderRegistry(),
						strategy,
						inputCoder,
						combiner.<String>asKeyedFn());

		OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, WindowedValue<KV<String, Integer>>> testHarness =
				new OneInputStreamOperatorTestHarness<>(gbwOperator);
		testHarness.open();
		return testHarness;
	}

	/**
	 * Feeds one {@code KV(key, 1)} element into the harness.
	 *
	 * <p>The element's windows are assigned by {@code strategy}'s WindowFn from
	 * {@code eventTimeMillis}. The enclosing StreamRecord carries {@code recordTimestamp}.
	 */
	private void processInput(
			OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, WindowedValue<KV<String, Integer>>> testHarness,
			WindowingStrategy strategy,
			String key,
			long eventTimeMillis,
			long recordTimestamp) throws Exception {
		testHarness.processElement(new StreamRecord<>(
				makeWindowedValue(strategy, KV.of(key, 1), new Instant(eventTimeMillis), null, PaneInfo.NO_FIRING),
				recordTimestamp));
	}

	/** Builds an expected output record whose windows are assigned by {@code strategy}'s WindowFn. */
	private StreamRecord<WindowedValue<KV<String, Integer>>> recordInAssignedWindows(
			WindowingStrategy strategy, String key, int sum, long eventTimeMillis, PaneInfo pane, long recordTimestamp) {
		return new StreamRecord<>(
				makeWindowedValue(strategy, KV.of(key, sum), new Instant(eventTimeMillis), null, pane),
				recordTimestamp);
	}

	/** Builds an expected output record in the explicitly given {@code window}. */
	private static StreamRecord<WindowedValue<KV<String, Integer>>> recordInWindow(
			String key, int sum, long eventTimeMillis, IntervalWindow window, PaneInfo pane, long recordTimestamp) {
		return new StreamRecord<>(
				WindowedValue.of(KV.of(key, sum), new Instant(eventTimeMillis), window, pane),
				recordTimestamp);
	}

	/** Shorthand for an {@link IntervalWindow} spanning [startMillis, endMillis). */
	private static IntervalWindow intervalWindow(long startMillis, long endMillis) {
		return new IntervalWindow(new Instant(startMillis), new Instant(endMillis));
	}

	/**
	 * Comparator passed to {@code TestHarnessUtil.assertOutputEqualsSorted} so that expected and
	 * actual output are compared in a canonical order.
	 *
	 * <p>Watermarks are ordered by timestamp. Records are ordered by record timestamp, then key,
	 * then value, then the max timestamp of their single window.
	 */
	private static class ResultSortComparator implements Comparator<Object> {
		@Override
		@SuppressWarnings("unchecked")
		public int compare(Object o1, Object o2) {
			if (o1 instanceof Watermark && o2 instanceof Watermark) {
				// Long.compare instead of (int) (a - b): the subtraction can overflow/truncate and flip sign.
				return Long.compare(((Watermark) o1).getTimestamp(), ((Watermark) o2).getTimestamp());
			}

			// Anything other than a pair of watermarks is expected to be a pair of output records.
			StreamRecord<WindowedValue<KV<String, Integer>>> sr0 = (StreamRecord<WindowedValue<KV<String, Integer>>>) o1;
			StreamRecord<WindowedValue<KV<String, Integer>>> sr1 = (StreamRecord<WindowedValue<KV<String, Integer>>>) o2;

			if (sr0.getTimestamp() != sr1.getTimestamp()) {
				return Long.compare(sr0.getTimestamp(), sr1.getTimestamp());
			}

			int comparison = sr0.getValue().getValue().getKey().compareTo(sr1.getValue().getValue().getKey());
			if (comparison == 0) {
				comparison = Integer.compare(
						sr0.getValue().getValue().getValue(),
						sr1.getValue().getValue().getValue());
			}
			if (comparison == 0) {
				Collection<? extends BoundedWindow> windowsA = sr0.getValue().getWindows();
				Collection<? extends BoundedWindow> windowsB = sr1.getValue().getWindows();

				if (windowsA.size() != 1 || windowsB.size() != 1) {
					throw new IllegalStateException(
							"Expected each grouped value to belong to exactly one window, but got "
									+ windowsA.size() + " and " + windowsB.size() + " windows.");
				}

				BoundedWindow windowA = windowsA.iterator().next();
				BoundedWindow windowB = windowsB.iterator().next();
				comparison = Long.compare(windowA.maxTimestamp().getMillis(), windowB.maxTimestamp().getMillis());
			}
			return comparison;
		}
	}

	/**
	 * Builds a {@link WindowedValue} for {@code output}.
	 *
	 * <p>If {@code windows} is null, the windows are assigned by {@code strategy}'s WindowFn from
	 * {@code timestamp}. Only the timestamp is exposed to the WindowFn. If {@code timestamp} is
	 * null, the value is stamped with {@link BoundedWindow#TIMESTAMP_MIN_VALUE}. Any failure
	 * during window assignment is rethrown as a {@link UserCodeException}.
	 */
	private <T> WindowedValue<T> makeWindowedValue(WindowingStrategy strategy,
			T output, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) {
		final Instant inputTimestamp = timestamp;
		final WindowFn windowFn = strategy.getWindowFn();

		if (timestamp == null) {
			timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
		}

		if (windows == null) {
			try {
				windows = windowFn.assignWindows(windowFn.new AssignContext() {
					@Override
					public Object element() {
						// No input element is available in this helper; it relies on the WindowFn
						// assigning windows from the timestamp alone.
						throw new UnsupportedOperationException(
								"WindowFn attempted to access input element when none was available");
					}

					@Override
					public Instant timestamp() {
						if (inputTimestamp == null) {
							throw new UnsupportedOperationException(
									"WindowFn attempted to access input timestamp when none was available");
						}
						return inputTimestamp;
					}

					@Override
					public Collection<? extends BoundedWindow> windows() {
						throw new UnsupportedOperationException(
								"WindowFn attempted to access input windows when none were available");
					}
				});
			} catch (Exception e) {
				Throwables.propagateIfInstanceOf(e, UserCodeException.class);
				throw new UserCodeException(e);
			}
		}

		return WindowedValue.of(output, timestamp, windows, pane);
	}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/edff0785/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/StateSerializationTest.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/StateSerializationTest.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/StateSerializationTest.java new file mode 100644 index 0000000..084ada2 --- /dev/null +++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/StateSerializationTest.java @@ -0,0 +1,257 @@ +/* + * Copyright 2015 Data Artisans GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.dataartisans.flink.dataflow.streaming; + +import com.dataartisans.flink.dataflow.translation.wrappers.streaming.state.FlinkStateInternals; +import com.dataartisans.flink.dataflow.translation.wrappers.streaming.state.StateCheckpointReader; +import com.dataartisans.flink.dataflow.translation.wrappers.streaming.state.StateCheckpointUtils; +import com.dataartisans.flink.dataflow.translation.wrappers.streaming.state.StateCheckpointWriter; +import com.google.cloud.dataflow.sdk.coders.CannotProvideCoderException; +import com.google.cloud.dataflow.sdk.coders.Coder; +import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder; +import com.google.cloud.dataflow.sdk.coders.VarIntCoder; +import com.google.cloud.dataflow.sdk.transforms.Combine; +import com.google.cloud.dataflow.sdk.transforms.Sum; +import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; +import com.google.cloud.dataflow.sdk.transforms.windowing.IntervalWindow; +import com.google.cloud.dataflow.sdk.util.TimeDomain; +import com.google.cloud.dataflow.sdk.util.TimerInternals; +import com.google.cloud.dataflow.sdk.util.state.*; +import org.apache.flink.api.java.typeutils.runtime.ByteArrayInputView; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.joda.time.Instant; +import org.junit.Test; + +import java.util.*; + +import static org.junit.Assert.assertEquals; + +public class StateSerializationTest { + + private static final StateNamespace NAMESPACE_1 = StateNamespaces.global(); + private static final String KEY_PREFIX = "TEST_"; + + private static final StateTag<ValueState<String>> STRING_VALUE_ADDR = + StateTags.value("stringValue", StringUtf8Coder.of()); + private static final StateTag<ValueState<Integer>> INT_VALUE_ADDR = + StateTags.value("stringValue", VarIntCoder.of()); + private static final StateTag<CombiningValueState<Integer, Integer>> 
SUM_INTEGER_ADDR =
+      StateTags.combiningValueFromInputInternal(
+          "sumInteger", VarIntCoder.of(), new Sum.SumIntegerFn());
+  private static final StateTag<BagState<String>> STRING_BAG_ADDR =
+      StateTags.bag("stringBag", StringUtf8Coder.of());
+  private static final StateTag<WatermarkStateInternal> WATERMARK_BAG_ADDR =
+      StateTags.watermarkStateInternal("watermark");
+
+  /**
+   * Summing combiner; passed (as a keyed fn) to {@code FlinkStateInternals} both when state is
+   * created and when it is decoded from the checkpoint.
+   */
+  private final Combine.CombineFn combiner = new Sum.SumIntegerFn();
+
+  /** Original (pre-checkpoint) state, per key. */
+  private final Map<String, FlinkStateInternals<String>> statePerKey = new HashMap<>();
+
+  /** Original (pre-checkpoint) timers, per key. */
+  private final Map<String, Set<TimerInternals.TimerData>> activeTimers = new HashMap<>();
+
+  /**
+   * Fills {@link #statePerKey} and {@link #activeTimers} for ten keys ({@code KEY_PREFIX + 0..9}).
+   * Each key gets the fixed state written by {@link #initializeStateForKey} and five timers in
+   * {@code NAMESPACE_1} whose time domains cycle through the first three {@code TimeDomain} values.
+   */
+  private void initializeStateAndTimers() throws CannotProvideCoderException {
+    for (int i = 0; i < 10; i++) {
+      String key = KEY_PREFIX + i;
+
+      FlinkStateInternals<String> state = initializeStateForKey(key);
+      Set<TimerInternals.TimerData> timers = new HashSet<>();
+      for (int j = 0; j < 5; j++) {
+        // Timestamps differ per j, so the five timers of a key are distinct set entries.
+        TimerInternals.TimerData timer = TimerInternals.TimerData.of(
+            NAMESPACE_1, new Instant(1000 + i + j), TimeDomain.values()[j % 3]);
+        timers.add(timer);
+      }
+
+      statePerKey.put(key, state);
+      activeTimers.put(key, timers);
+    }
+  }
+
+  /**
+   * Creates state for {@code key} and writes a known value into every tagged cell in
+   * {@code NAMESPACE_1}: the string "test", the int 5 (overwriting 4), the sum of 1 and 2, the
+   * watermark 1000 and the bag values v1..v4. {@link #checkStateForKey(String)} verifies exactly
+   * these values.
+   */
+  private FlinkStateInternals<String> initializeStateForKey(String key) throws CannotProvideCoderException {
+    FlinkStateInternals<String> state = createState(key);
+
+    ValueState<String> value = state.state(NAMESPACE_1, STRING_VALUE_ADDR);
+    value.set("test");
+
+    ValueState<Integer> value2 = state.state(NAMESPACE_1, INT_VALUE_ADDR);
+    value2.set(4);
+    value2.set(5);
+
+    CombiningValueState<Integer, Integer> combiningValue = state.state(NAMESPACE_1, SUM_INTEGER_ADDR);
+    combiningValue.add(1);
+    combiningValue.add(2);
+
+    WatermarkStateInternal watermark = state.state(NAMESPACE_1, WATERMARK_BAG_ADDR);
+    watermark.add(new Instant(1000));
+
+    BagState<String> bag = state.state(NAMESPACE_1, STRING_BAG_ADDR);
+    bag.add("v1");
+    bag.add("v2");
+    bag.add("v3");
+    bag.add("v4");
+    return state;
+  }
+
+  /**
+   * Decodes the checkpoint written by {@link #storeState} and compares it with the originals.
+   *
+   * <p>Timers are read before state, the same order in which {@code storeState} writes them.
+   *
+   * @param in view over the checkpoint bytes
+   * @return {@code true} iff the original state holds the expected values and the restored
+   *     timers and state match the originals for every key
+   */
+  private boolean restoreAndTestState(DataInputView in) throws Exception {
+    StateCheckpointReader reader = new StateCheckpointReader(in);
+    final ClassLoader userClassloader = this.getClass().getClassLoader();
+    Coder<? extends BoundedWindow> windowCoder = IntervalWindow.getCoder();
+    Coder<String> keyCoder = StringUtf8Coder.of();
+
+    boolean comparisonRes = true;
+
+    for (String key : statePerKey.keySet()) {
+      comparisonRes &= checkStateForKey(key);
+    }
+
+    // restore the timers
+    Map<String, Set<TimerInternals.TimerData>> restoredTimersPerKey =
+        StateCheckpointUtils.decodeTimers(reader, windowCoder, keyCoder);
+    if (activeTimers.size() != restoredTimersPerKey.size()) {
+      return false;
+    }
+
+    for (String key : statePerKey.keySet()) {
+      Set<TimerInternals.TimerData> originalTimers = activeTimers.get(key);
+      Set<TimerInternals.TimerData> restoredTimers = restoredTimersPerKey.get(key);
+      comparisonRes &= checkTimersForKey(originalTimers, restoredTimers);
+    }
+
+    // restore the state
+    Map<String, FlinkStateInternals<String>> restoredPerKeyState = StateCheckpointUtils.decodeState(
+        reader, combiner.asKeyedFn(), keyCoder, windowCoder, userClassloader);
+    if (restoredPerKeyState.size() != statePerKey.size()) {
+      return false;
+    }
+
+    for (String key : statePerKey.keySet()) {
+      FlinkStateInternals<String> originalState = statePerKey.get(key);
+      FlinkStateInternals<String> restoredState = restoredPerKeyState.get(key);
+      comparisonRes &= checkStateForKey(originalState, restoredState);
+    }
+    return comparisonRes;
+  }
+
+  /**
+   * Checks that the original state of {@code key} holds exactly the values written by
+   * {@link #initializeStateForKey}.
+   */
+  private boolean checkStateForKey(String key) throws CannotProvideCoderException {
+    FlinkStateInternals<String> state = statePerKey.get(key);
+
+    ValueState<String> value = state.state(NAMESPACE_1, STRING_VALUE_ADDR);
+    boolean comp = value.get().read().equals("test");
+
+    ValueState<Integer> value2 = state.state(NAMESPACE_1, INT_VALUE_ADDR);
+    comp &= value2.get().read().equals(5);
+
+    CombiningValueState<Integer, Integer> combiningValue = state.state(NAMESPACE_1, SUM_INTEGER_ADDR);
+    comp &= combiningValue.get().read().equals(3);
+
+    WatermarkStateInternal watermark = state.state(NAMESPACE_1, WATERMARK_BAG_ADDR);
+    comp &= watermark.get().read().equals(new Instant(1000));
+
+    BagState<String> bag = state.state(NAMESPACE_1, STRING_BAG_ADDR);
+    Iterator<String> it = bag.get().read().iterator();
+    int i = 0;
+    while (it.hasNext()) {
+      comp &= it.next().equals("v" + (++i));
+    }
+    // The bag must hold exactly the four values v1..v4; without this check an empty or
+    // truncated bag would pass.
+    comp &= i == 4;
+    return comp;
+  }
+
+  /** Writes all timers and then all state into {@code out}. */
+  private void storeState(StateBackend.CheckpointStateOutputView out) throws Exception {
+    StateCheckpointWriter checkpointBuilder = StateCheckpointWriter.create(out);
+    Coder<String> keyCoder = StringUtf8Coder.of();
+
+    // checkpoint the timers
+    StateCheckpointUtils.encodeTimers(activeTimers, checkpointBuilder, keyCoder);
+
+    // checkpoint the state
+    StateCheckpointUtils.encodeState(statePerKey, checkpointBuilder, keyCoder);
+  }
+
+  /**
+   * Returns {@code true} iff {@code restoredTimers} is non-null and contains exactly the same
+   * timers as {@code originalTimers}.
+   */
+  private boolean checkTimersForKey(
+      Set<TimerInternals.TimerData> originalTimers, Set<TimerInternals.TimerData> restoredTimers) {
+    // Set.equals compares size and membership, which is precisely the required check.
+    return restoredTimers != null && originalTimers.equals(restoredTimers);
+  }
+
+  /**
+   * Returns {@code true} iff {@code restoredState} is non-null and every tagged cell in
+   * {@code NAMESPACE_1} reads the same as in {@code originalState}; bags must match element by
+   * element and in length.
+   */
+  private boolean checkStateForKey(
+      FlinkStateInternals<String> originalState, FlinkStateInternals<String> restoredState)
+      throws CannotProvideCoderException {
+    if (restoredState == null) {
+      return false;
+    }
+
+    ValueState<String> orValue = originalState.state(NAMESPACE_1, STRING_VALUE_ADDR);
+    ValueState<String> resValue = restoredState.state(NAMESPACE_1, STRING_VALUE_ADDR);
+    boolean comp = orValue.get().read().equals(resValue.get().read());
+
+    ValueState<Integer> orIntValue = originalState.state(NAMESPACE_1, INT_VALUE_ADDR);
+    ValueState<Integer> resIntValue = restoredState.state(NAMESPACE_1, INT_VALUE_ADDR);
+    comp &= orIntValue.get().read().equals(resIntValue.get().read());
+
+    CombiningValueState<Integer, Integer> combOrValue = originalState.state(NAMESPACE_1, SUM_INTEGER_ADDR);
+    CombiningValueState<Integer, Integer> combResValue = restoredState.state(NAMESPACE_1, SUM_INTEGER_ADDR);
+    comp &= combOrValue.get().read().equals(combResValue.get().read());
+
+    WatermarkStateInternal orWatermark = originalState.state(NAMESPACE_1, WATERMARK_BAG_ADDR);
+    WatermarkStateInternal resWatermark = restoredState.state(NAMESPACE_1, WATERMARK_BAG_ADDR);
+    comp &= orWatermark.get().read().equals(resWatermark.get().read());
+
+    BagState<String> orBag = originalState.state(NAMESPACE_1, STRING_BAG_ADDR);
+    BagState<String> resBag = restoredState.state(NAMESPACE_1, STRING_BAG_ADDR);
+
+    Iterator<String> orIt = orBag.get().read().iterator();
+    Iterator<String> resIt = resBag.get().read().iterator();
+
+    while (orIt.hasNext() && resIt.hasNext()) {
+      comp &= orIt.next().equals(resIt.next());
+    }
+
+    // After the loop at least one iterator is exhausted; the bags have equal length only if both are.
+    return !orIt.hasNext() && !resIt.hasNext() && comp;
+  }
+
+  /** Creates a new {@code FlinkStateInternals} for {@code key} with String keys and interval windows. */
+  private FlinkStateInternals<String> createState(String key) throws CannotProvideCoderException {
+    return new FlinkStateInternals<>(
+        key,
+        StringUtf8Coder.of(),
+        IntervalWindow.getCoder(),
+        combiner.asKeyedFn());
+  }
+
+  /**
+   * Round-trips state and timers through an in-memory checkpoint and asserts that they come back
+   * unchanged.
+   */
+  @Test
+  public void test() throws Exception {
+    StateSerializationTest test = new StateSerializationTest();
+    test.initializeStateAndTimers();
+
+    MemoryStateBackend.MemoryCheckpointOutputStream memBackend =
+        new MemoryStateBackend.MemoryCheckpointOutputStream(25728);
+    StateBackend.CheckpointStateOutputView out = new StateBackend.CheckpointStateOutputView(memBackend);
+
+    test.storeState(out);
+
+    byte[] contents = memBackend.closeAndGetBytes();
+    ByteArrayInputView in = new ByteArrayInputView(contents);
+
+    // assertEquals takes (expected, actual).
+    assertEquals(true, test.restoreAndTestState(in));
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/edff0785/runners/flink/src/test/java/com/dataartisans/flink/dataflow/util/JoinExamples.java ---------------------------------------------------------------------- diff
--git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/util/JoinExamples.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/util/JoinExamples.java index cbf5d77..74f754b 100644 --- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/util/JoinExamples.java +++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/util/JoinExamples.java @@ -76,8 +76,8 @@ public class JoinExamples { KV<String, CoGbkResult> e = c.element(); CoGbkResult val = e.getValue(); String countryCode = e.getKey(); - String countryName; - countryName = e.getValue().getOnly(countryInfoTag); + String countryName = "none"; + countryName = e.getValue().getOnly(countryInfoTag, "Kostas"); for (String eventInfo : c.element().getValue().getAll(eventInfoTag)) { // Generate a string that combines information from both collection values c.output(KV.of(countryCode, "Country name: " + countryName
