http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WordCountJoin2ITCase.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WordCountJoin2ITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WordCountJoin2ITCase.java new file mode 100644 index 0000000..e73c456 --- /dev/null +++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WordCountJoin2ITCase.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink; + +import com.google.cloud.dataflow.sdk.Pipeline; +import com.google.cloud.dataflow.sdk.io.TextIO; +import com.google.cloud.dataflow.sdk.transforms.Count; +import com.google.cloud.dataflow.sdk.transforms.Create; +import com.google.cloud.dataflow.sdk.transforms.DoFn; +import com.google.cloud.dataflow.sdk.transforms.ParDo; +import com.google.cloud.dataflow.sdk.transforms.join.CoGbkResult; +import com.google.cloud.dataflow.sdk.transforms.join.CoGroupByKey; +import com.google.cloud.dataflow.sdk.transforms.join.KeyedPCollectionTuple; +import com.google.cloud.dataflow.sdk.values.KV; +import com.google.cloud.dataflow.sdk.values.PCollection; +import com.google.cloud.dataflow.sdk.values.TupleTag; +import com.google.common.base.Joiner; +import org.apache.flink.test.util.JavaProgramTestBase; + + +public class WordCountJoin2ITCase extends JavaProgramTestBase { + + static final String[] WORDS_1 = new String[] { + "hi there", "hi", "hi sue bob", + "hi sue", "", "bob hi"}; + + static final String[] WORDS_2 = new String[] { + "hi tim", "beauty", "hooray sue bob", + "hi there", "", "please say hi"}; + + static final String[] RESULTS = new String[] { + "beauty -> Tag1: Tag2: 1", + "bob -> Tag1: 2 Tag2: 1", + "hi -> Tag1: 5 Tag2: 3", + "hooray -> Tag1: Tag2: 1", + "please -> Tag1: Tag2: 1", + "say -> Tag1: Tag2: 1", + "sue -> Tag1: 2 Tag2: 1", + "there -> Tag1: 1 Tag2: 1", + "tim -> Tag1: Tag2: 1" + }; + + static final TupleTag<Long> tag1 = new TupleTag<>("Tag1"); + static final TupleTag<Long> tag2 = new TupleTag<>("Tag2"); + + protected String resultPath; + + @Override + protected void preSubmit() throws Exception { + resultPath = getTempDirPath("result"); + } + + @Override + protected void postSubmit() throws Exception { + compareResultsByLinesInMemory(Joiner.on('\n').join(RESULTS), resultPath); + } + + @Override + protected void testProgram() throws Exception { + Pipeline p = FlinkTestPipeline.createForBatch(); + + /* Create two PCollections and join them */ + PCollection<KV<String,Long>> occurences1 = p.apply(Create.of(WORDS_1)) + .apply(ParDo.of(new ExtractWordsFn())) + .apply(Count.<String>perElement()); + + PCollection<KV<String,Long>> occurences2 = p.apply(Create.of(WORDS_2)) + .apply(ParDo.of(new ExtractWordsFn())) + .apply(Count.<String>perElement()); + + /* CoGroup the two collections */ + PCollection<KV<String, CoGbkResult>> mergedOccurences = KeyedPCollectionTuple + .of(tag1, occurences1) + .and(tag2, occurences2) + .apply(CoGroupByKey.<String>create()); + + /* Format output */ + mergedOccurences.apply(ParDo.of(new FormatCountsFn())) + .apply(TextIO.Write.named("test").to(resultPath)); + + p.run(); + } + + + static class ExtractWordsFn extends DoFn<String, String> { + + @Override + public void startBundle(Context c) { + } + + @Override + public void processElement(ProcessContext c) { + // Split the line into words. + String[] words = c.element().split("[^a-zA-Z']+"); + + // Output each word encountered into the output PCollection. + for (String word : words) { + if (!word.isEmpty()) { + c.output(word); + } + } + } + } + + static class FormatCountsFn extends DoFn<KV<String, CoGbkResult>, String> { + @Override + public void processElement(ProcessContext c) { + CoGbkResult value = c.element().getValue(); + String key = c.element().getKey(); + String countTag1 = tag1.getId() + ": "; + String countTag2 = tag2.getId() + ": "; + for (Long count : value.getAll(tag1)) { + countTag1 += count + " "; + } + for (Long count : value.getAll(tag2)) { + countTag2 += count; + } + c.output(key + " -> " + countTag1 + countTag2); + } + } + + +}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WordCountJoin3ITCase.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WordCountJoin3ITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WordCountJoin3ITCase.java new file mode 100644 index 0000000..6b57d77 --- /dev/null +++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WordCountJoin3ITCase.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink; + +import com.google.cloud.dataflow.sdk.Pipeline; +import com.google.cloud.dataflow.sdk.io.TextIO; +import com.google.cloud.dataflow.sdk.transforms.Count; +import com.google.cloud.dataflow.sdk.transforms.Create; +import com.google.cloud.dataflow.sdk.transforms.DoFn; +import com.google.cloud.dataflow.sdk.transforms.ParDo; +import com.google.cloud.dataflow.sdk.transforms.join.CoGbkResult; +import com.google.cloud.dataflow.sdk.transforms.join.CoGroupByKey; +import com.google.cloud.dataflow.sdk.transforms.join.KeyedPCollectionTuple; +import com.google.cloud.dataflow.sdk.values.KV; +import com.google.cloud.dataflow.sdk.values.PCollection; +import com.google.cloud.dataflow.sdk.values.TupleTag; +import com.google.common.base.Joiner; +import org.apache.flink.test.util.JavaProgramTestBase; + + +public class WordCountJoin3ITCase extends JavaProgramTestBase { + + static final String[] WORDS_1 = new String[] { + "hi there", "hi", "hi sue bob", + "hi sue", "", "bob hi"}; + + static final String[] WORDS_2 = new String[] { + "hi tim", "beauty", "hooray sue bob", + "hi there", "", "please say hi"}; + + static final String[] WORDS_3 = new String[] { + "hi stephan", "beauty", "hooray big fabian", + "hi yo", "", "please say hi"}; + + static final String[] RESULTS = new String[] { + "beauty -> Tag1: Tag2: 1 Tag3: 1", + "bob -> Tag1: 2 Tag2: 1 Tag3: ", + "hi -> Tag1: 5 Tag2: 3 Tag3: 3", + "hooray -> Tag1: Tag2: 1 Tag3: 1", + "please -> Tag1: Tag2: 1 Tag3: 1", + "say -> Tag1: Tag2: 1 Tag3: 1", + "sue -> Tag1: 2 Tag2: 1 Tag3: ", + "there -> Tag1: 1 Tag2: 1 Tag3: ", + "tim -> Tag1: Tag2: 1 Tag3: ", + "stephan -> Tag1: Tag2: Tag3: 1", + "yo -> Tag1: Tag2: Tag3: 1", + "fabian -> Tag1: Tag2: Tag3: 1", + "big -> Tag1: Tag2: Tag3: 1" + }; + + static final TupleTag<Long> tag1 = new TupleTag<>("Tag1"); + static final TupleTag<Long> tag2 = new TupleTag<>("Tag2"); + static final TupleTag<Long> tag3 = new TupleTag<>("Tag3"); + + protected String resultPath; + + @Override + protected void preSubmit() throws Exception { + resultPath = getTempDirPath("result"); + } + + @Override + protected void postSubmit() throws Exception { + compareResultsByLinesInMemory(Joiner.on('\n').join(RESULTS), resultPath); + } + + @Override + protected void testProgram() throws Exception { + + Pipeline p = FlinkTestPipeline.createForBatch(); + + /* Create two PCollections and join them */ + PCollection<KV<String,Long>> occurences1 = p.apply(Create.of(WORDS_1)) + .apply(ParDo.of(new ExtractWordsFn())) + .apply(Count.<String>perElement()); + + PCollection<KV<String,Long>> occurences2 = p.apply(Create.of(WORDS_2)) + .apply(ParDo.of(new ExtractWordsFn())) + .apply(Count.<String>perElement()); + + PCollection<KV<String,Long>> occurences3 = p.apply(Create.of(WORDS_3)) + .apply(ParDo.of(new ExtractWordsFn())) + .apply(Count.<String>perElement()); + + /* CoGroup the two collections */ + PCollection<KV<String, CoGbkResult>> mergedOccurences = KeyedPCollectionTuple + .of(tag1, occurences1) + .and(tag2, occurences2) + .and(tag3, occurences3) + .apply(CoGroupByKey.<String>create()); + + /* Format output */ + mergedOccurences.apply(ParDo.of(new FormatCountsFn())) + .apply(TextIO.Write.named("test").to(resultPath)); + + p.run(); + } + + + static class ExtractWordsFn extends DoFn<String, String> { + + @Override + public void startBundle(Context c) { + } + + @Override + public void processElement(ProcessContext c) { + // Split the line into words. + String[] words = c.element().split("[^a-zA-Z']+"); + + // Output each word encountered into the output PCollection. + for (String word : words) { + if (!word.isEmpty()) { + c.output(word); + } + } + } + } + + static class FormatCountsFn extends DoFn<KV<String, CoGbkResult>, String> { + @Override + public void processElement(ProcessContext c) { + CoGbkResult value = c.element().getValue(); + String key = c.element().getKey(); + String countTag1 = tag1.getId() + ": "; + String countTag2 = tag2.getId() + ": "; + String countTag3 = tag3.getId() + ": "; + for (Long count : value.getAll(tag1)) { + countTag1 += count + " "; + } + for (Long count : value.getAll(tag2)) { + countTag2 += count + " "; + } + for (Long count : value.getAll(tag3)) { + countTag3 += count; + } + c.output(key + " -> " + countTag1 + countTag2 + countTag3); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java new file mode 100644 index 0000000..dfa15ce --- /dev/null +++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.flink; + +import com.google.cloud.dataflow.sdk.Pipeline; +import com.google.cloud.dataflow.sdk.coders.Coder; +import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder; +import com.google.cloud.dataflow.sdk.io.Sink; +import com.google.cloud.dataflow.sdk.options.PipelineOptions; +import com.google.cloud.dataflow.sdk.transforms.Create; +import com.google.cloud.dataflow.sdk.transforms.Write; +import com.google.common.base.Joiner; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.test.util.JavaProgramTestBase; + +import java.io.File; +import java.io.PrintWriter; +import java.net.URI; + +import static org.junit.Assert.*; + +/** + * Tests the translation of custom Write.Bound sinks. + */ +public class WriteSinkITCase extends JavaProgramTestBase { + + protected String resultPath; + + public WriteSinkITCase(){ + } + + static final String[] EXPECTED_RESULT = new String[] { + "Joe red 3", "Mary blue 4", "Max yellow 23"}; + + @Override + protected void preSubmit() throws Exception { + resultPath = getTempDirPath("result"); + } + + @Override + protected void postSubmit() throws Exception { + compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath); + } + + @Override + protected void testProgram() throws Exception { + runProgram(resultPath); + } + + private static void runProgram(String resultPath) { + Pipeline p = FlinkTestPipeline.createForBatch(); + + p.apply(Create.of(EXPECTED_RESULT)).setCoder(StringUtf8Coder.of()) + .apply("CustomSink", Write.to(new MyCustomSink(resultPath))); + + p.run(); + } + + /** + * Simple custom sink which writes to a file. + */ + private static class MyCustomSink extends Sink<String> { + + private final String resultPath; + + public MyCustomSink(String resultPath) { + this.resultPath = resultPath; + } + + @Override + public void validate(PipelineOptions options) { + assertNotNull(options); + } + + @Override + public WriteOperation<String, ?> createWriteOperation(PipelineOptions options) { + return new MyWriteOperation(); + } + + private class MyWriteOperation extends WriteOperation<String, String> { + + @Override + public Coder<String> getWriterResultCoder() { + return StringUtf8Coder.of(); + } + + @Override + public void initialize(PipelineOptions options) throws Exception { + + } + + @Override + public void finalize(Iterable<String> writerResults, PipelineOptions options) throws Exception { + + } + + @Override + public Writer<String, String> createWriter(PipelineOptions options) throws Exception { + return new MyWriter(); + } + + @Override + public Sink<String> getSink() { + return MyCustomSink.this; + } + + /** + * Simple Writer which writes to a file. + */ + private class MyWriter extends Writer<String, String> { + + private PrintWriter internalWriter; + + @Override + public void open(String uId) throws Exception { + Path path = new Path(resultPath + "/" + uId); + FileSystem.get(new URI("file:///")).create(path, false); + internalWriter = new PrintWriter(new File(path.toUri())); + } + + @Override + public void write(String value) throws Exception { + internalWriter.println(value); + } + + @Override + public String close() throws Exception { + internalWriter.close(); + return resultPath; + } + + @Override + public WriteOperation<String, String> getWriteOperation() { + return MyWriteOperation.this; + } + } + } + } + +} + http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupAlsoByWindowTest.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupAlsoByWindowTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupAlsoByWindowTest.java new file mode 100644 index 0000000..880da59 --- /dev/null +++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupAlsoByWindowTest.java @@ -0,0 +1,508 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.streaming; + +import org.apache.beam.runners.flink.FlinkTestPipeline; +import org.apache.beam.runners.flink.translation.wrappers.streaming.FlinkGroupAlsoByWindowWrapper; +import com.google.cloud.dataflow.sdk.Pipeline; +import com.google.cloud.dataflow.sdk.coders.KvCoder; +import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder; +import com.google.cloud.dataflow.sdk.coders.VarIntCoder; +import com.google.cloud.dataflow.sdk.transforms.Combine; +import com.google.cloud.dataflow.sdk.transforms.Sum; +import com.google.cloud.dataflow.sdk.transforms.windowing.*; +import com.google.cloud.dataflow.sdk.util.UserCodeException; +import com.google.cloud.dataflow.sdk.util.WindowedValue; +import com.google.cloud.dataflow.sdk.util.WindowingStrategy; +import com.google.cloud.dataflow.sdk.values.KV; +import com.google.common.base.Throwables; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.TestHarnessUtil; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Test; + +import java.util.Collection; +import java.util.Comparator; +import java.util.concurrent.ConcurrentLinkedQueue; + +public class GroupAlsoByWindowTest { + + private final Combine.CombineFn combiner = new Sum.SumIntegerFn(); + + private final WindowingStrategy slidingWindowWithAfterWatermarkTriggerStrategy = + WindowingStrategy.of(SlidingWindows.of(Duration.standardSeconds(10)).every(Duration.standardSeconds(5))) + .withTrigger(AfterWatermark.pastEndOfWindow()).withMode(WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES); + + private final WindowingStrategy sessionWindowingStrategy = + WindowingStrategy.of(Sessions.withGapDuration(Duration.standardSeconds(2))) + .withTrigger(Repeatedly.forever(AfterWatermark.pastEndOfWindow())) + .withMode(WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES) + .withAllowedLateness(Duration.standardSeconds(100)); + + private final WindowingStrategy fixedWindowingStrategy = + WindowingStrategy.of(FixedWindows.of(Duration.standardSeconds(10))); + + private final WindowingStrategy fixedWindowWithCountTriggerStrategy = + fixedWindowingStrategy.withTrigger(AfterPane.elementCountAtLeast(5)); + + private final WindowingStrategy fixedWindowWithAfterWatermarkTriggerStrategy = + fixedWindowingStrategy.withTrigger(AfterWatermark.pastEndOfWindow()); + + private final WindowingStrategy fixedWindowWithCompoundTriggerStrategy = + fixedWindowingStrategy.withTrigger( + AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(5)) + .withLateFirings(AfterPane.elementCountAtLeast(5)).buildTrigger()); + + /** + * The default accumulation mode is + * {@link com.google.cloud.dataflow.sdk.util.WindowingStrategy.AccumulationMode#DISCARDING_FIRED_PANES}. + * This strategy changes it to + * {@link com.google.cloud.dataflow.sdk.util.WindowingStrategy.AccumulationMode#ACCUMULATING_FIRED_PANES} + */ + private final WindowingStrategy fixedWindowWithCompoundTriggerStrategyAcc = + fixedWindowWithCompoundTriggerStrategy + .withMode(WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES); + + @Test + public void testWithLateness() throws Exception { + WindowingStrategy strategy = WindowingStrategy.of(FixedWindows.of(Duration.standardSeconds(2))) + .withMode(WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES) + .withAllowedLateness(Duration.millis(1000)); + long initialTime = 0L; + Pipeline pipeline = FlinkTestPipeline.createForStreaming(); + + KvCoder<String, Integer> inputCoder = KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()); + + FlinkGroupAlsoByWindowWrapper gbwOperaror = + FlinkGroupAlsoByWindowWrapper.createForTesting( + pipeline.getOptions(), + pipeline.getCoderRegistry(), + strategy, + inputCoder, + combiner.<String>asKeyedFn()); + + OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, WindowedValue<KV<String, Integer>>> testHarness = + new OneInputStreamOperatorTestHarness<>(gbwOperaror); + testHarness.open(); + + testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1), null, PaneInfo.NO_FIRING), initialTime + 20)); + testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1), null, PaneInfo.NO_FIRING), initialTime + 20)); + testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1000), null, PaneInfo.NO_FIRING), initialTime + 20)); + testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1200), null, PaneInfo.NO_FIRING), initialTime + 20)); + testHarness.processWatermark(new Watermark(initialTime + 2000)); + testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1200), null, PaneInfo.NO_FIRING), initialTime + 20)); + testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1200), null, PaneInfo.NO_FIRING), initialTime + 20)); + testHarness.processWatermark(new Watermark(initialTime + 4000)); + + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); + expectedOutput.add(new StreamRecord<>( + WindowedValue.of(KV.of("key1", 4), + new Instant(initialTime + 1), + new IntervalWindow(new Instant(0), new Instant(2000)), + PaneInfo.createPane(true, false, PaneInfo.Timing.ON_TIME, 0, 0)) + , initialTime + 1)); + expectedOutput.add(new Watermark(initialTime + 2000)); + + expectedOutput.add(new StreamRecord<>( + WindowedValue.of(KV.of("key1", 5), + new Instant(initialTime + 1999), + new IntervalWindow(new Instant(0), new Instant(2000)), + PaneInfo.createPane(false, false, PaneInfo.Timing.LATE, 1, 1)) + , initialTime + 1999)); + + + expectedOutput.add(new StreamRecord<>( + WindowedValue.of(KV.of("key1", 6), + new Instant(initialTime + 1999), + new IntervalWindow(new Instant(0), new Instant(2000)), + PaneInfo.createPane(false, false, PaneInfo.Timing.LATE, 2, 2)) + , initialTime + 1999)); + expectedOutput.add(new Watermark(initialTime + 4000)); + + TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator()); + testHarness.close(); + } + + @Test + public void testSessionWindows() throws Exception { + WindowingStrategy strategy = sessionWindowingStrategy; + + long initialTime = 0L; + Pipeline pipeline = FlinkTestPipeline.createForStreaming(); + + KvCoder<String, Integer> inputCoder = KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()); + + FlinkGroupAlsoByWindowWrapper gbwOperaror = + FlinkGroupAlsoByWindowWrapper.createForTesting( + pipeline.getOptions(), + pipeline.getCoderRegistry(), + strategy, + inputCoder, + combiner.<String>asKeyedFn()); + + OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, WindowedValue<KV<String, Integer>>> testHarness = + new OneInputStreamOperatorTestHarness<>(gbwOperaror); + testHarness.open(); + + testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1), null, PaneInfo.NO_FIRING), initialTime + 20)); + testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1), null, PaneInfo.NO_FIRING), initialTime + 20)); + testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1000), null, PaneInfo.NO_FIRING), initialTime + 20)); + testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 3500), null, PaneInfo.NO_FIRING), initialTime + 20)); + testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 3700), null, PaneInfo.NO_FIRING), initialTime + 20)); + testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 2700), null, PaneInfo.NO_FIRING), initialTime + 20)); + testHarness.processWatermark(new Watermark(initialTime + 6000)); + + testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 6700), null, PaneInfo.NO_FIRING), initialTime + 20)); + testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 6800), null, PaneInfo.NO_FIRING), initialTime + 20)); + testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 8900), null, PaneInfo.NO_FIRING), initialTime + 20)); + testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 7600), null, PaneInfo.NO_FIRING), initialTime + 20)); + testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 5600), null, PaneInfo.NO_FIRING), initialTime + 20)); + + testHarness.processWatermark(new Watermark(initialTime + 12000)); + + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); + expectedOutput.add(new StreamRecord<>( + WindowedValue.of(KV.of("key1", 6), + new Instant(initialTime + 1), + new IntervalWindow(new Instant(1), new Instant(5700)), + PaneInfo.createPane(true, false, PaneInfo.Timing.ON_TIME, 0, 0)) + , initialTime + 1)); + expectedOutput.add(new Watermark(initialTime + 6000)); + + expectedOutput.add(new StreamRecord<>( + WindowedValue.of(KV.of("key1", 11), + new Instant(initialTime + 6700), + new IntervalWindow(new Instant(1), new Instant(10900)), + PaneInfo.createPane(true, false, PaneInfo.Timing.ON_TIME, 0, 0)) + , initialTime + 6700)); + expectedOutput.add(new Watermark(initialTime + 12000)); + + TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator()); + testHarness.close(); + } + + @Test + public void testSlidingWindows() throws Exception { + WindowingStrategy strategy = slidingWindowWithAfterWatermarkTriggerStrategy; + long initialTime = 0L; + OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, WindowedValue<KV<String, Integer>>> testHarness = + createTestingOperatorAndState(strategy, initialTime); + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); + testHarness.processWatermark(new Watermark(initialTime + 25000)); + + expectedOutput.add(new StreamRecord<>( + WindowedValue.of(KV.of("key1", 6), + new Instant(initialTime + 5000), + new IntervalWindow(new Instant(0), new Instant(10000)), + PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)) + , initialTime + 5000)); + expectedOutput.add(new StreamRecord<>( + WindowedValue.of(KV.of("key1", 6), + new Instant(initialTime + 1), + new IntervalWindow(new Instant(-5000), new Instant(5000)), + PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)) + , initialTime + 1)); + expectedOutput.add(new Watermark(initialTime + 10000)); + + expectedOutput.add(new StreamRecord<>( + WindowedValue.of(KV.of("key1", 11), + new Instant(initialTime + 15000), + new IntervalWindow(new Instant(10000), new Instant(20000)), + PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)) + , initialTime + 15000)); + expectedOutput.add(new StreamRecord<>( + WindowedValue.of(KV.of("key1", 3), + new Instant(initialTime + 10000), + new IntervalWindow(new Instant(5000), new Instant(15000)), + PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)) + , initialTime + 10000)); + expectedOutput.add(new StreamRecord<>( + WindowedValue.of(KV.of("key2", 1), + new Instant(initialTime + 19500), + new IntervalWindow(new Instant(10000), new Instant(20000)), + PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)) + , initialTime + 19500)); + expectedOutput.add(new Watermark(initialTime + 20000)); + + expectedOutput.add(new StreamRecord<>( + WindowedValue.of(KV.of("key2", 1), + new Instant(initialTime + 20000), + /** + * this is 20000 and not 19500 because of a convention in dataflow where + * timestamps of windowed values in a window cannot be smaller than the + * end of a previous window. Checkout the documentation of the + * {@link WindowFn#getOutputTime(Instant, BoundedWindow)} + */ + new IntervalWindow(new Instant(15000), new Instant(25000)), + PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)) + , initialTime + 20000)); + expectedOutput.add(new StreamRecord<>( + WindowedValue.of(KV.of("key1", 8), + new Instant(initialTime + 20000), + new IntervalWindow(new Instant(15000), new Instant(25000)), + PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)) + , initialTime + 20000)); + expectedOutput.add(new Watermark(initialTime + 25000)); + + TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator()); + testHarness.close(); + } + + @Test + public void testAfterWatermarkProgram() throws Exception { + WindowingStrategy strategy = fixedWindowWithAfterWatermarkTriggerStrategy; + long initialTime = 0L; + OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, WindowedValue<KV<String, Integer>>> testHarness = + createTestingOperatorAndState(strategy, initialTime); + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); + + expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 6), + new Instant(initialTime + 1), null, PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)), initialTime + 1)); + expectedOutput.add(new Watermark(initialTime + 10000)); + + expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 11), + new Instant(initialTime + 10000), null, PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)), initialTime + 10000)); + expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key2", 1), + new Instant(initialTime + 19500), null, PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)), initialTime + 19500)); + expectedOutput.add(new Watermark(initialTime + 20000)); + + TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator()); + testHarness.close(); + } + + @Test + public void testAfterCountProgram() throws Exception { + WindowingStrategy strategy = fixedWindowWithCountTriggerStrategy; + + long initialTime = 0L; + OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, WindowedValue<KV<String, Integer>>> testHarness = + createTestingOperatorAndState(strategy, initialTime); + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); + + expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 5), + new Instant(initialTime + 1), null, PaneInfo.createPane(true, true, PaneInfo.Timing.EARLY)), initialTime + 1)); + expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 5), + new Instant(initialTime + 10000), null, PaneInfo.createPane(true, true, PaneInfo.Timing.EARLY)), initialTime + 10000)); + expectedOutput.add(new Watermark(initialTime + 10000)); + + expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key2", 1), + new Instant(initialTime + 19500), null, PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME, 0, 0)), initialTime + 19500)); + expectedOutput.add(new Watermark(initialTime + 20000)); + TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator()); + + testHarness.close(); + } + + @Test + public void testCompoundProgram() throws Exception { + WindowingStrategy strategy = fixedWindowWithCompoundTriggerStrategy; + + long initialTime = 0L; + OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, WindowedValue<KV<String, Integer>>> testHarness = + createTestingOperatorAndState(strategy, initialTime); + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); + + /** + * PaneInfo are: + * isFirst (pane in window), + * isLast, Timing (of triggering), + * index (of pane in the window), + * onTimeIndex (if it the 1st,2nd, ... pane that was fired on time) + * */ + + expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 5), + new Instant(initialTime + 1), null, PaneInfo.createPane(true, false, PaneInfo.Timing.EARLY)), initialTime + 1)); + expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 5), + new Instant(initialTime + 10000), null, PaneInfo.createPane(true, false, PaneInfo.Timing.EARLY)), initialTime + 10000)); + expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 5), + new Instant(initialTime + 19500), null, PaneInfo.createPane(false, false, PaneInfo.Timing.EARLY, 1, -1)), initialTime + 19500)); + + expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), + new Instant(initialTime + 1200), null, PaneInfo.createPane(false, true, PaneInfo.Timing.ON_TIME, 1, 0)), initialTime + 1200)); + + expectedOutput.add(new Watermark(initialTime + 10000)); + + expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), + new Instant(initialTime + 19500), null, PaneInfo.createPane(false, true, PaneInfo.Timing.ON_TIME, 2, 0)), initialTime + 19500)); + expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key2", 1), + new Instant(initialTime + 19500), null, PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)), initialTime + 19500)); + + expectedOutput.add(new Watermark(initialTime + 20000)); + TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator()); + + testHarness.close(); + } + + @Test + public void testCompoundAccumulatingPanesProgram() throws Exception { + WindowingStrategy strategy = fixedWindowWithCompoundTriggerStrategyAcc; + long initialTime = 0L; + OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, WindowedValue<KV<String, Integer>>> testHarness = + createTestingOperatorAndState(strategy, initialTime); + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); + + expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 5), + new Instant(initialTime + 1), null, PaneInfo.createPane(true, false, PaneInfo.Timing.EARLY)), initialTime + 1)); + expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 5), + new Instant(initialTime + 10000), null, PaneInfo.createPane(true, false, PaneInfo.Timing.EARLY)), initialTime + 10000)); + expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 10), + new Instant(initialTime + 19500), null, PaneInfo.createPane(false, false, PaneInfo.Timing.EARLY, 1, -1)), initialTime + 19500)); + + expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 6), + new Instant(initialTime + 1200), null, PaneInfo.createPane(false, true, PaneInfo.Timing.ON_TIME, 1, 0)), initialTime + 1200)); + + expectedOutput.add(new Watermark(initialTime + 10000)); + + expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 11), + new Instant(initialTime + 19500), null, PaneInfo.createPane(false, true, PaneInfo.Timing.ON_TIME, 2, 0)), initialTime + 19500)); + expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key2", 1), + new Instant(initialTime + 19500), null, PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)), initialTime + 19500)); + + expectedOutput.add(new Watermark(initialTime + 20000)); + TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator()); + + testHarness.close(); + } + + private OneInputStreamOperatorTestHarness createTestingOperatorAndState(WindowingStrategy strategy, long initialTime) throws Exception { + Pipeline pipeline = FlinkTestPipeline.createForStreaming(); + + KvCoder<String, Integer> inputCoder = KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()); + + FlinkGroupAlsoByWindowWrapper gbwOperaror = + FlinkGroupAlsoByWindowWrapper.createForTesting( + pipeline.getOptions(), + pipeline.getCoderRegistry(), + strategy, + inputCoder, + combiner.<String>asKeyedFn()); + + OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, WindowedValue<KV<String, Integer>>> testHarness = + new OneInputStreamOperatorTestHarness<>(gbwOperaror); + testHarness.open(); + + testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1), null, PaneInfo.NO_FIRING), initialTime + 20)); + testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1), null, PaneInfo.NO_FIRING), initialTime + 20)); + testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1000), null, PaneInfo.NO_FIRING), initialTime + 20)); + testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1200), null, PaneInfo.NO_FIRING), initialTime + 20)); + testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1200), null, PaneInfo.NO_FIRING), initialTime + 20)); + testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1200), null, PaneInfo.NO_FIRING), initialTime + 20)); + + testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 10000), null, PaneInfo.NO_FIRING), initialTime + 20)); + testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 12100), null, PaneInfo.NO_FIRING), initialTime + 20)); + testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 14200), null, PaneInfo.NO_FIRING), initialTime + 20)); + testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 15300), null, PaneInfo.NO_FIRING), initialTime + 20)); + testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 16500), null, PaneInfo.NO_FIRING), initialTime + 20)); + testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 19500), null, PaneInfo.NO_FIRING), initialTime + 20)); + testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 19500), null, PaneInfo.NO_FIRING), initialTime + 20)); + testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 19500), null, PaneInfo.NO_FIRING), initialTime + 20)); + testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 19500), null, PaneInfo.NO_FIRING), initialTime + 20)); + testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 19500), null, PaneInfo.NO_FIRING), initialTime + 20)); + testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 19500), null, PaneInfo.NO_FIRING), initialTime + 20)); + + testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key2", 1), new Instant(initialTime + 19500), null, PaneInfo.NO_FIRING), initialTime + 20)); + + testHarness.processWatermark(new Watermark(initialTime + 10000)); + testHarness.processWatermark(new Watermark(initialTime + 20000)); + + return testHarness; + } + + private static class ResultSortComparator implements Comparator<Object> { + @Override + public int compare(Object o1, Object o2) { + if (o1 instanceof Watermark && o2 instanceof Watermark) { + Watermark w1 = (Watermark) o1; + Watermark w2 = (Watermark) o2; + return (int) (w1.getTimestamp() - w2.getTimestamp()); + } else { + StreamRecord<WindowedValue<KV<String, Integer>>> sr0 = (StreamRecord<WindowedValue<KV<String, Integer>>>) o1; + StreamRecord<WindowedValue<KV<String, Integer>>> sr1 = (StreamRecord<WindowedValue<KV<String, Integer>>>) o2; + + int comparison = (int) (sr0.getValue().getTimestamp().getMillis() - sr1.getValue().getTimestamp().getMillis()); + if (comparison != 0) { + return comparison; + } + + comparison = sr0.getValue().getValue().getKey().compareTo(sr1.getValue().getValue().getKey()); + if(comparison == 0) { + comparison = Integer.compare( + sr0.getValue().getValue().getValue(), + sr1.getValue().getValue().getValue()); + } + if(comparison == 0) { + Collection windowsA = sr0.getValue().getWindows(); + Collection windowsB = sr1.getValue().getWindows(); + + if(windowsA.size() != 1 || windowsB.size() != 1) { + throw new IllegalStateException("A value cannot belong to more than one windows after grouping."); + } + + BoundedWindow windowA = (BoundedWindow) windowsA.iterator().next(); + BoundedWindow windowB = (BoundedWindow) windowsB.iterator().next(); + comparison = Long.compare(windowA.maxTimestamp().getMillis(), windowB.maxTimestamp().getMillis()); + } + return comparison; + } + } + } + + private <T> WindowedValue<T> makeWindowedValue(WindowingStrategy strategy, + T output, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) { + final Instant inputTimestamp = timestamp; + final WindowFn windowFn = strategy.getWindowFn(); + + if (timestamp == null) { + timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE; + } + + if (windows == null) { + try { + windows = windowFn.assignWindows(windowFn.new AssignContext() { + @Override + public Object element() { + throw new UnsupportedOperationException( + "WindowFn attempted to access input element when none was available"); + } + + @Override + public Instant timestamp() { + if (inputTimestamp == null) { + throw new UnsupportedOperationException( + "WindowFn attempted to access input timestamp when none was available"); + } + return inputTimestamp; + } + + @Override + public Collection<? extends BoundedWindow> windows() { + throw new UnsupportedOperationException( + "WindowFn attempted to access input windows when none were available"); + } + }); + } catch (Exception e) { + throw UserCodeException.wrap(e); + } + } + + return WindowedValue.of(output, timestamp, windows, pane); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java new file mode 100644 index 0000000..63e0bcf --- /dev/null +++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.streaming; + +import org.apache.beam.runners.flink.FlinkTestPipeline; +import com.google.cloud.dataflow.sdk.Pipeline; +import com.google.cloud.dataflow.sdk.io.TextIO; +import com.google.cloud.dataflow.sdk.transforms.Create; +import com.google.cloud.dataflow.sdk.transforms.DoFn; +import com.google.cloud.dataflow.sdk.transforms.GroupByKey; +import com.google.cloud.dataflow.sdk.transforms.ParDo; +import com.google.cloud.dataflow.sdk.transforms.windowing.AfterWatermark; +import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows; +import com.google.cloud.dataflow.sdk.transforms.windowing.Window; +import com.google.cloud.dataflow.sdk.values.KV; +import com.google.cloud.dataflow.sdk.values.PCollection; +import com.google.common.base.Joiner; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.streaming.util.StreamingProgramTestBase; +import org.joda.time.Duration; +import org.joda.time.Instant; + +import java.io.Serializable; +import java.util.Arrays; + +public class GroupByNullKeyTest extends StreamingProgramTestBase implements Serializable { + + + protected String resultPath; + + static final String[] EXPECTED_RESULT = new String[] { + "k: null v: user1 user1 user1 user2 user2 user2 user2 user3" + }; + + public GroupByNullKeyTest(){ + } + + @Override + protected void preSubmit() throws Exception { + resultPath = getTempDirPath("result"); + } + + @Override + protected void postSubmit() throws Exception { + compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath); + } + + public static class ExtractUserAndTimestamp extends DoFn<KV<Integer, String>, String> { + private static final long serialVersionUID = 0; + + @Override + public void processElement(ProcessContext c) { + KV<Integer, String> record = c.element(); + long now = System.currentTimeMillis(); + int timestamp = record.getKey(); + String userName = record.getValue(); + if (userName != null) { + // Sets the implicit timestamp field to be used in windowing. + c.outputWithTimestamp(userName, new Instant(timestamp + now)); + } + } + } + + @Override + protected void testProgram() throws Exception { + + Pipeline p = FlinkTestPipeline.createForStreaming(); + + PCollection<String> output = + p.apply(Create.of(Arrays.asList( + KV.<Integer, String>of(0, "user1"), + KV.<Integer, String>of(1, "user1"), + KV.<Integer, String>of(2, "user1"), + KV.<Integer, String>of(10, "user2"), + KV.<Integer, String>of(1, "user2"), + KV.<Integer, String>of(15000, "user2"), + KV.<Integer, String>of(12000, "user2"), + KV.<Integer, String>of(25000, "user3")))) + .apply(ParDo.of(new ExtractUserAndTimestamp())) + .apply(Window.<String>into(FixedWindows.of(Duration.standardHours(1))) + .triggering(AfterWatermark.pastEndOfWindow()) + .withAllowedLateness(Duration.ZERO) + .discardingFiredPanes()) + + .apply(ParDo.of(new DoFn<String, KV<Void, String>>() { + @Override + public void processElement(ProcessContext c) throws Exception { + String elem = c.element(); + c.output(KV.<Void, String>of((Void) null, elem)); + } + })) + .apply(GroupByKey.<Void, String>create()) + .apply(ParDo.of(new DoFn<KV<Void, Iterable<String>>, String>() { + @Override + public void processElement(ProcessContext c) throws Exception { + KV<Void, Iterable<String>> elem = c.element(); + StringBuilder str = new StringBuilder(); + str.append("k: " + elem.getKey() + " v:"); + for (String v : elem.getValue()) { + str.append(" " + v); + } + c.output(str.toString()); + } + })); + output.apply(TextIO.Write.to(resultPath)); + p.run(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/StateSerializationTest.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/StateSerializationTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/StateSerializationTest.java new file mode 100644 index 0000000..77a8de6 --- /dev/null +++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/StateSerializationTest.java @@ -0,0 +1,305 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.streaming; + +import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals; +import org.apache.beam.runners.flink.translation.wrappers.streaming.state.StateCheckpointReader; +import org.apache.beam.runners.flink.translation.wrappers.streaming.state.StateCheckpointUtils; +import org.apache.beam.runners.flink.translation.wrappers.streaming.state.StateCheckpointWriter; +import com.google.cloud.dataflow.sdk.coders.*; +import com.google.cloud.dataflow.sdk.transforms.Combine; +import com.google.cloud.dataflow.sdk.transforms.CombineWithContext; +import com.google.cloud.dataflow.sdk.transforms.Sum; +import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; +import com.google.cloud.dataflow.sdk.transforms.windowing.IntervalWindow; +import com.google.cloud.dataflow.sdk.transforms.windowing.OutputTimeFns; +import com.google.cloud.dataflow.sdk.util.TimeDomain; +import com.google.cloud.dataflow.sdk.util.TimerInternals; +import com.google.cloud.dataflow.sdk.util.state.*; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.runtime.util.DataInputDeserializer; +import org.joda.time.Instant; +import org.junit.Test; + +import java.nio.ByteBuffer; +import java.util.*; + +import static org.junit.Assert.assertEquals; + +public class StateSerializationTest { + + private static final StateNamespace NAMESPACE_1 = StateNamespaces.global(); + private static final String KEY_PREFIX = "TEST_"; + + // TODO: This can be replaced with the standard Sum.SumIntererFn once the state no longer needs + // to create a StateTag at the point of restoring state. Currently StateTags are compared strictly + // by type and combiners always use KeyedCombineFnWithContext rather than KeyedCombineFn or CombineFn. + private static CombineWithContext.KeyedCombineFnWithContext<Object, Integer, int[], Integer> SUM_COMBINER = + new CombineWithContext.KeyedCombineFnWithContext<Object, Integer, int[], Integer>() { + @Override + public int[] createAccumulator(Object key, CombineWithContext.Context c) { + return new int[1]; + } + + @Override + public int[] addInput(Object key, int[] accumulator, Integer value, CombineWithContext.Context c) { + accumulator[0] += value; + return accumulator; + } + + @Override + public int[] mergeAccumulators(Object key, Iterable<int[]> accumulators, CombineWithContext.Context c) { + int[] r = new int[1]; + for (int[] a : accumulators) { + r[0] += a[0]; + } + return r; + } + + @Override + public Integer extractOutput(Object key, int[] accumulator, CombineWithContext.Context c) { + return accumulator[0]; + } + }; + + private static Coder<int[]> INT_ACCUM_CODER = DelegateCoder.of( + VarIntCoder.of(), + new DelegateCoder.CodingFunction<int[], Integer>() { + @Override + public Integer apply(int[] accumulator) { + return accumulator[0]; + } + }, + new DelegateCoder.CodingFunction<Integer, int[]>() { + @Override + public int[] apply(Integer value) { + int[] a = new int[1]; + a[0] = value; + return a; + } + }); + + private static final StateTag<Object, ValueState<String>> STRING_VALUE_ADDR = + StateTags.value("stringValue", StringUtf8Coder.of()); + private static final StateTag<Object, ValueState<Integer>> INT_VALUE_ADDR = + StateTags.value("stringValue", VarIntCoder.of()); + private static final StateTag<Object, AccumulatorCombiningState<Integer, int[], Integer>> SUM_INTEGER_ADDR = + StateTags.keyedCombiningValueWithContext("sumInteger", INT_ACCUM_CODER, SUM_COMBINER); + private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR = + StateTags.bag("stringBag", StringUtf8Coder.of()); + private static final StateTag<Object, WatermarkHoldState<BoundedWindow>> WATERMARK_BAG_ADDR = + StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtEarliestInputTimestamp()); + + private Map<String, FlinkStateInternals<String>> statePerKey = new HashMap<>(); + + private Map<String, Set<TimerInternals.TimerData>> activeTimers = new HashMap<>(); + + private void initializeStateAndTimers() throws CannotProvideCoderException { + for (int i = 0; i < 10; i++) { + String key = KEY_PREFIX + i; + + FlinkStateInternals state = initializeStateForKey(key); + Set<TimerInternals.TimerData> timers = new HashSet<>(); + for (int j = 0; j < 5; j++) { + TimerInternals.TimerData timer = TimerInternals + .TimerData.of(NAMESPACE_1, + new Instant(1000 + i + j), TimeDomain.values()[j % 3]); + timers.add(timer); + } + + statePerKey.put(key, state); + activeTimers.put(key, timers); + } + } + + private FlinkStateInternals<String> initializeStateForKey(String key) throws CannotProvideCoderException { + FlinkStateInternals<String> state = createState(key); + + ValueState<String> value = state.state(NAMESPACE_1, STRING_VALUE_ADDR); + value.write("test"); + + ValueState<Integer> value2 = state.state(NAMESPACE_1, INT_VALUE_ADDR); + value2.write(4); + value2.write(5); + + AccumulatorCombiningState<Integer, int[], Integer> combiningValue = state.state(NAMESPACE_1, SUM_INTEGER_ADDR); + combiningValue.add(1); + combiningValue.add(2); + + WatermarkHoldState<BoundedWindow> watermark = state.state(NAMESPACE_1, WATERMARK_BAG_ADDR); + watermark.add(new Instant(1000)); + + BagState<String> bag = state.state(NAMESPACE_1, STRING_BAG_ADDR); + bag.add("v1"); + bag.add("v2"); + bag.add("v3"); + bag.add("v4"); + return state; + } + + private boolean restoreAndTestState(DataInputView in) throws Exception { + StateCheckpointReader reader = new StateCheckpointReader(in); + final ClassLoader userClassloader = this.getClass().getClassLoader(); + Coder<? extends BoundedWindow> windowCoder = IntervalWindow.getCoder(); + Coder<String> keyCoder = StringUtf8Coder.of(); + + boolean comparisonRes = true; + + for (String key : statePerKey.keySet()) { + comparisonRes &= checkStateForKey(key); + } + + // restore the timers + Map<String, Set<TimerInternals.TimerData>> restoredTimersPerKey = StateCheckpointUtils.decodeTimers(reader, windowCoder, keyCoder); + if (activeTimers.size() != restoredTimersPerKey.size()) { + return false; + } + + for (String key : statePerKey.keySet()) { + Set<TimerInternals.TimerData> originalTimers = activeTimers.get(key); + Set<TimerInternals.TimerData> restoredTimers = restoredTimersPerKey.get(key); + comparisonRes &= checkTimersForKey(originalTimers, restoredTimers); + } + + // restore the state + Map<String, FlinkStateInternals<String>> restoredPerKeyState = + StateCheckpointUtils.decodeState(reader, OutputTimeFns.outputAtEarliestInputTimestamp(), keyCoder, windowCoder, userClassloader); + if (restoredPerKeyState.size() != statePerKey.size()) { + return false; + } + + for (String key : statePerKey.keySet()) { + FlinkStateInternals<String> originalState = statePerKey.get(key); + FlinkStateInternals<String> restoredState = restoredPerKeyState.get(key); + comparisonRes &= checkStateForKey(originalState, restoredState); + } + return comparisonRes; + } + + private boolean checkStateForKey(String key) throws CannotProvideCoderException { + FlinkStateInternals<String> state = statePerKey.get(key); + + ValueState<String> value = state.state(NAMESPACE_1, STRING_VALUE_ADDR); + boolean comp = value.read().equals("test"); + + ValueState<Integer> value2 = state.state(NAMESPACE_1, INT_VALUE_ADDR); + comp &= value2.read().equals(5); + + AccumulatorCombiningState<Integer, int[], Integer> combiningValue = state.state(NAMESPACE_1, SUM_INTEGER_ADDR); + comp &= combiningValue.read().equals(3); + + WatermarkHoldState<BoundedWindow> watermark = state.state(NAMESPACE_1, WATERMARK_BAG_ADDR); + comp &= watermark.read().equals(new Instant(1000)); + + BagState<String> bag = state.state(NAMESPACE_1, STRING_BAG_ADDR); + Iterator<String> it = bag.read().iterator(); + int i = 0; + while (it.hasNext()) { + comp &= it.next().equals("v" + (++i)); + } + return comp; + } + + private void storeState(AbstractStateBackend.CheckpointStateOutputView out) throws Exception { + StateCheckpointWriter checkpointBuilder = StateCheckpointWriter.create(out); + Coder<String> keyCoder = StringUtf8Coder.of(); + + // checkpoint the timers + StateCheckpointUtils.encodeTimers(activeTimers, checkpointBuilder, keyCoder); + + // checkpoint the state + StateCheckpointUtils.encodeState(statePerKey, checkpointBuilder, keyCoder); + } + + private boolean checkTimersForKey(Set<TimerInternals.TimerData> originalTimers, Set<TimerInternals.TimerData> restoredTimers) { + boolean comp = true; + if (restoredTimers == null) { + return false; + } + + if (originalTimers.size() != restoredTimers.size()) { + return false; + } + + for (TimerInternals.TimerData timer : originalTimers) { + comp &= restoredTimers.contains(timer); + } + return comp; + } + + private boolean checkStateForKey(FlinkStateInternals<String> originalState, FlinkStateInternals<String> restoredState) throws CannotProvideCoderException { + if (restoredState == null) { + return false; + } + + ValueState<String> orValue = originalState.state(NAMESPACE_1, STRING_VALUE_ADDR); + ValueState<String> resValue = restoredState.state(NAMESPACE_1, STRING_VALUE_ADDR); + boolean comp = orValue.read().equals(resValue.read()); + + ValueState<Integer> orIntValue = originalState.state(NAMESPACE_1, INT_VALUE_ADDR); + ValueState<Integer> resIntValue = restoredState.state(NAMESPACE_1, INT_VALUE_ADDR); + comp &= orIntValue.read().equals(resIntValue.read()); + + AccumulatorCombiningState<Integer, int[], Integer> combOrValue = originalState.state(NAMESPACE_1, SUM_INTEGER_ADDR); + AccumulatorCombiningState<Integer, int[], Integer> combResValue = restoredState.state(NAMESPACE_1, SUM_INTEGER_ADDR); + comp &= combOrValue.read().equals(combResValue.read()); + + WatermarkHoldState orWatermark = originalState.state(NAMESPACE_1, WATERMARK_BAG_ADDR); + WatermarkHoldState resWatermark = restoredState.state(NAMESPACE_1, WATERMARK_BAG_ADDR); + comp &= orWatermark.read().equals(resWatermark.read()); + + BagState<String> orBag = originalState.state(NAMESPACE_1, STRING_BAG_ADDR); + BagState<String> resBag = restoredState.state(NAMESPACE_1, STRING_BAG_ADDR); + + Iterator<String> orIt = orBag.read().iterator(); + Iterator<String> resIt = resBag.read().iterator(); + + while (orIt.hasNext() && resIt.hasNext()) { + comp &= orIt.next().equals(resIt.next()); + } + + return !((orIt.hasNext() && !resIt.hasNext()) || (!orIt.hasNext() && resIt.hasNext())) && comp; + } + + private FlinkStateInternals<String> createState(String key) throws CannotProvideCoderException { + return new FlinkStateInternals<>( + key, + StringUtf8Coder.of(), + IntervalWindow.getCoder(), + OutputTimeFns.outputAtEarliestInputTimestamp()); + } + + @Test + public void test() throws Exception { + StateSerializationTest test = new StateSerializationTest(); + test.initializeStateAndTimers(); + + MemoryStateBackend.MemoryCheckpointOutputStream memBackend = new MemoryStateBackend.MemoryCheckpointOutputStream(32048); + AbstractStateBackend.CheckpointStateOutputView out = new AbstractStateBackend.CheckpointStateOutputView(memBackend); + + test.storeState(out); + + byte[] contents = memBackend.closeAndGetBytes(); + DataInputView in = new DataInputDeserializer(contents, 0, contents.length); + + assertEquals(test.restoreAndTestState(in), true); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.java new file mode 100644 index 0000000..83c1661 --- /dev/null +++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.streaming; + +import org.apache.beam.runners.flink.FlinkTestPipeline; +import com.google.api.services.bigquery.model.TableRow; +import com.google.cloud.dataflow.sdk.Pipeline; +import com.google.cloud.dataflow.sdk.io.TextIO; +import com.google.cloud.dataflow.sdk.transforms.Count; +import com.google.cloud.dataflow.sdk.transforms.Create; +import com.google.cloud.dataflow.sdk.transforms.DoFn; +import com.google.cloud.dataflow.sdk.transforms.ParDo; +import com.google.cloud.dataflow.sdk.transforms.windowing.Sessions; +import com.google.cloud.dataflow.sdk.transforms.windowing.Window; +import com.google.cloud.dataflow.sdk.values.KV; +import com.google.cloud.dataflow.sdk.values.PCollection; +import com.google.common.base.Joiner; +import org.apache.flink.streaming.util.StreamingProgramTestBase; +import org.joda.time.Duration; +import org.joda.time.Instant; + +import java.io.Serializable; +import java.util.Arrays; + + +/** + * Session window test + */ +public class TopWikipediaSessionsITCase extends StreamingProgramTestBase implements Serializable { + protected String resultPath; + + public TopWikipediaSessionsITCase(){ + } + + static final String[] EXPECTED_RESULT = new String[] { + "user: user1 value:3", + "user: user1 value:1", + "user: user2 value:4", + "user: user2 value:6", + "user: user3 value:7", + "user: user3 value:2" + }; + + @Override + protected void preSubmit() throws Exception { + resultPath = getTempDirPath("result"); + } + + @Override + protected void postSubmit() throws Exception { + compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath); + } + + @Override + protected void testProgram() throws Exception { + + Pipeline p = FlinkTestPipeline.createForStreaming(); + + Long now = (System.currentTimeMillis() + 10000) / 1000; + + PCollection<KV<String, Long>> output = + p.apply(Create.of(Arrays.asList(new TableRow().set("timestamp", now).set + ("contributor_username", "user1"), new TableRow().set("timestamp", now + 10).set + ("contributor_username", "user3"), new TableRow().set("timestamp", now).set + ("contributor_username", "user2"), new TableRow().set("timestamp", now).set + ("contributor_username", "user1"), new TableRow().set("timestamp", now + 2).set + ("contributor_username", "user1"), new TableRow().set("timestamp", now).set + ("contributor_username", "user2"), new TableRow().set("timestamp", now + 1).set + ("contributor_username", "user2"), new TableRow().set("timestamp", now + 5).set + ("contributor_username", "user2"), new TableRow().set("timestamp", now + 7).set + ("contributor_username", "user2"), new TableRow().set("timestamp", now + 8).set + ("contributor_username", "user2"), new TableRow().set("timestamp", now + 200).set + ("contributor_username", "user2"), new TableRow().set("timestamp", now + 230).set + ("contributor_username", "user1"), new TableRow().set("timestamp", now + 230).set + ("contributor_username", "user2"), new TableRow().set("timestamp", now + 240).set + ("contributor_username", "user2"), new TableRow().set("timestamp", now + 245).set + ("contributor_username", "user3"), new TableRow().set("timestamp", now + 235).set + ("contributor_username", "user3"), new TableRow().set("timestamp", now + 236).set + ("contributor_username", "user3"), new TableRow().set("timestamp", now + 237).set + ("contributor_username", "user3"), new TableRow().set("timestamp", now + 238).set + ("contributor_username", "user3"), new TableRow().set("timestamp", now + 239).set + ("contributor_username", "user3"), new TableRow().set("timestamp", now + 240).set + ("contributor_username", "user3"), new TableRow().set("timestamp", now + 241).set + ("contributor_username", "user2"), new TableRow().set("timestamp", now) + .set("contributor_username", "user3")))) + + + + .apply(ParDo.of(new DoFn<TableRow, String>() { + @Override + public void processElement(ProcessContext c) throws Exception { + TableRow row = c.element(); + long timestamp = (Integer) row.get("timestamp"); + String userName = (String) row.get("contributor_username"); + if (userName != null) { + // Sets the timestamp field to be used in windowing. + c.outputWithTimestamp(userName, new Instant(timestamp * 1000L)); + } + } + })) + + .apply(Window.<String>into(Sessions.withGapDuration(Duration.standardMinutes(1)))) + + .apply(Count.<String>perElement()); + + PCollection<String> format = output.apply(ParDo.of(new DoFn<KV<String, Long>, String>() { + @Override + public void processElement(ProcessContext c) throws Exception { + KV<String, Long> el = c.element(); + String out = "user: " + el.getKey() + " value:" + el.getValue(); + c.output(out); + } + })); + + format.apply(TextIO.Write.to(resultPath)); + + p.run(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/util/JoinExamples.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/util/JoinExamples.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/util/JoinExamples.java new file mode 100644 index 0000000..e850dd6 --- /dev/null +++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/util/JoinExamples.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.util; + +import com.google.api.services.bigquery.model.TableRow; +import com.google.cloud.dataflow.sdk.Pipeline; +import com.google.cloud.dataflow.sdk.io.BigQueryIO; +import com.google.cloud.dataflow.sdk.io.TextIO; +import com.google.cloud.dataflow.sdk.options.Description; +import com.google.cloud.dataflow.sdk.options.PipelineOptions; +import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; +import com.google.cloud.dataflow.sdk.options.Validation; +import com.google.cloud.dataflow.sdk.transforms.DoFn; +import com.google.cloud.dataflow.sdk.transforms.ParDo; +import com.google.cloud.dataflow.sdk.transforms.join.CoGbkResult; +import com.google.cloud.dataflow.sdk.transforms.join.CoGroupByKey; +import com.google.cloud.dataflow.sdk.transforms.join.KeyedPCollectionTuple; +import com.google.cloud.dataflow.sdk.values.KV; +import com.google.cloud.dataflow.sdk.values.PCollection; +import com.google.cloud.dataflow.sdk.values.TupleTag; + +/** + * Copied from {@link com.google.cloud.dataflow.examples.JoinExamples} because the code + * is private there. + */ +public class JoinExamples { + + // A 1000-row sample of the GDELT data here: gdelt-bq:full.events. + private static final String GDELT_EVENTS_TABLE = + "clouddataflow-readonly:samples.gdelt_sample"; + // A table that maps country codes to country names. + private static final String COUNTRY_CODES = + "gdelt-bq:full.crosswalk_geocountrycodetohuman"; + + /** + * Join two collections, using country code as the key. + */ + public static PCollection<String> joinEvents(PCollection<TableRow> eventsTable, + PCollection<TableRow> countryCodes) throws Exception { + + final TupleTag<String> eventInfoTag = new TupleTag<>(); + final TupleTag<String> countryInfoTag = new TupleTag<>(); + + // transform both input collections to tuple collections, where the keys are country + // codes in both cases. + PCollection<KV<String, String>> eventInfo = eventsTable.apply( + ParDo.of(new ExtractEventDataFn())); + PCollection<KV<String, String>> countryInfo = countryCodes.apply( + ParDo.of(new ExtractCountryInfoFn())); + + // country code 'key' -> CGBKR (<event info>, <country name>) + PCollection<KV<String, CoGbkResult>> kvpCollection = KeyedPCollectionTuple + .of(eventInfoTag, eventInfo) + .and(countryInfoTag, countryInfo) + .apply(CoGroupByKey.<String>create()); + + // Process the CoGbkResult elements generated by the CoGroupByKey transform. + // country code 'key' -> string of <event info>, <country name> + PCollection<KV<String, String>> finalResultCollection = + kvpCollection.apply(ParDo.of(new DoFn<KV<String, CoGbkResult>, KV<String, String>>() { + @Override + public void processElement(ProcessContext c) { + KV<String, CoGbkResult> e = c.element(); + CoGbkResult val = e.getValue(); + String countryCode = e.getKey(); + String countryName; + countryName = e.getValue().getOnly(countryInfoTag, "Kostas"); + for (String eventInfo : c.element().getValue().getAll(eventInfoTag)) { + // Generate a string that combines information from both collection values + c.output(KV.of(countryCode, "Country name: " + countryName + + ", Event info: " + eventInfo)); + } + } + })); + + // write to GCS + return finalResultCollection + .apply(ParDo.of(new DoFn<KV<String, String>, String>() { + @Override + public void processElement(ProcessContext c) { + String outputstring = "Country code: " + c.element().getKey() + + ", " + c.element().getValue(); + c.output(outputstring); + } + })); + } + + /** + * Examines each row (event) in the input table. Output a KV with the key the country + * code of the event, and the value a string encoding event information. + */ + static class ExtractEventDataFn extends DoFn<TableRow, KV<String, String>> { + @Override + public void processElement(ProcessContext c) { + TableRow row = c.element(); + String countryCode = (String) row.get("ActionGeo_CountryCode"); + String sqlDate = (String) row.get("SQLDATE"); + String actor1Name = (String) row.get("Actor1Name"); + String sourceUrl = (String) row.get("SOURCEURL"); + String eventInfo = "Date: " + sqlDate + ", Actor1: " + actor1Name + ", url: " + sourceUrl; + c.output(KV.of(countryCode, eventInfo)); + } + } + + + /** + * Examines each row (country info) in the input table. Output a KV with the key the country + * code, and the value the country name. + */ + static class ExtractCountryInfoFn extends DoFn<TableRow, KV<String, String>> { + @Override + public void processElement(ProcessContext c) { + TableRow row = c.element(); + String countryCode = (String) row.get("FIPSCC"); + String countryName = (String) row.get("HumanName"); + c.output(KV.of(countryCode, countryName)); + } + } + + + /** + * Options supported by {@link JoinExamples}. + * <p> + * Inherits standard configuration options. + */ + private interface Options extends PipelineOptions { + @Description("Path of the file to write to") + @Validation.Required + String getOutput(); + void setOutput(String value); + } + + public static void main(String[] args) throws Exception { + Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); + Pipeline p = Pipeline.create(options); + // the following two 'applys' create multiple inputs to our pipeline, one for each + // of our two input sources. + PCollection<TableRow> eventsTable = p.apply(BigQueryIO.Read.from(GDELT_EVENTS_TABLE)); + PCollection<TableRow> countryCodes = p.apply(BigQueryIO.Read.from(COUNTRY_CODES)); + PCollection<String> formattedResults = joinEvents(eventsTable, countryCodes); + formattedResults.apply(TextIO.Write.to(options.getOutput())); + p.run(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/runner/src/test/resources/log4j-test.properties ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/test/resources/log4j-test.properties b/runners/flink/runner/src/test/resources/log4j-test.properties new file mode 100644 index 0000000..4c74d85 --- /dev/null +++ b/runners/flink/runner/src/test/resources/log4j-test.properties @@ -0,0 +1,27 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +# Set root logger level to OFF to not flood build logs +# set manually to INFO for debugging purposes +log4j.rootLogger=OFF, testlogger + +# A1 is set to be a ConsoleAppender. +log4j.appender.testlogger=org.apache.log4j.ConsoleAppender +log4j.appender.testlogger.target = System.err +log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout +log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
