Repository: incubator-beam Updated Branches: refs/heads/master c0b9fc660 -> 071e4dd67
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/src/test/java/org/apache/beam/runners/flink/WordCountJoin2ITCase.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/WordCountJoin2ITCase.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/WordCountJoin2ITCase.java deleted file mode 100644 index e73c456..0000000 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/WordCountJoin2ITCase.java +++ /dev/null @@ -1,138 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink; - -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.io.TextIO; -import com.google.cloud.dataflow.sdk.transforms.Count; -import com.google.cloud.dataflow.sdk.transforms.Create; -import com.google.cloud.dataflow.sdk.transforms.DoFn; -import com.google.cloud.dataflow.sdk.transforms.ParDo; -import com.google.cloud.dataflow.sdk.transforms.join.CoGbkResult; -import com.google.cloud.dataflow.sdk.transforms.join.CoGroupByKey; -import com.google.cloud.dataflow.sdk.transforms.join.KeyedPCollectionTuple; -import com.google.cloud.dataflow.sdk.values.KV; -import com.google.cloud.dataflow.sdk.values.PCollection; -import com.google.cloud.dataflow.sdk.values.TupleTag; -import com.google.common.base.Joiner; -import org.apache.flink.test.util.JavaProgramTestBase; - - -public class WordCountJoin2ITCase extends JavaProgramTestBase { - - static final String[] WORDS_1 = new String[] { - "hi there", "hi", "hi sue bob", - "hi sue", "", "bob hi"}; - - static final String[] WORDS_2 = new String[] { - "hi tim", "beauty", "hooray sue bob", - "hi there", "", "please say hi"}; - - static final String[] RESULTS = new String[] { - "beauty -> Tag1: Tag2: 1", - "bob -> Tag1: 2 Tag2: 1", - "hi -> Tag1: 5 Tag2: 3", - "hooray -> Tag1: Tag2: 1", - "please -> Tag1: Tag2: 1", - "say -> Tag1: Tag2: 1", - "sue -> Tag1: 2 Tag2: 1", - "there -> Tag1: 1 Tag2: 1", - "tim -> Tag1: Tag2: 1" - }; - - static final TupleTag<Long> tag1 = new TupleTag<>("Tag1"); - static final TupleTag<Long> tag2 = new TupleTag<>("Tag2"); - - protected String resultPath; - - @Override - protected void preSubmit() throws Exception { - resultPath = getTempDirPath("result"); - } - - @Override - protected void postSubmit() throws Exception { - compareResultsByLinesInMemory(Joiner.on('\n').join(RESULTS), resultPath); - } - - @Override - protected void testProgram() throws Exception { - Pipeline p = FlinkTestPipeline.createForBatch(); - - /* Create two PCollections and join them */ - PCollection<KV<String,Long>> occurences1 = p.apply(Create.of(WORDS_1)) - .apply(ParDo.of(new ExtractWordsFn())) - .apply(Count.<String>perElement()); - - PCollection<KV<String,Long>> occurences2 = p.apply(Create.of(WORDS_2)) - .apply(ParDo.of(new ExtractWordsFn())) - .apply(Count.<String>perElement()); - - /* CoGroup the two collections */ - PCollection<KV<String, CoGbkResult>> mergedOccurences = KeyedPCollectionTuple - .of(tag1, occurences1) - .and(tag2, occurences2) - .apply(CoGroupByKey.<String>create()); - - /* Format output */ - mergedOccurences.apply(ParDo.of(new FormatCountsFn())) - .apply(TextIO.Write.named("test").to(resultPath)); - - p.run(); - } - - - static class ExtractWordsFn extends DoFn<String, String> { - - @Override - public void startBundle(Context c) { - } - - @Override - public void processElement(ProcessContext c) { - // Split the line into words. - String[] words = c.element().split("[^a-zA-Z']+"); - - // Output each word encountered into the output PCollection. - for (String word : words) { - if (!word.isEmpty()) { - c.output(word); - } - } - } - } - - static class FormatCountsFn extends DoFn<KV<String, CoGbkResult>, String> { - @Override - public void processElement(ProcessContext c) { - CoGbkResult value = c.element().getValue(); - String key = c.element().getKey(); - String countTag1 = tag1.getId() + ": "; - String countTag2 = tag2.getId() + ": "; - for (Long count : value.getAll(tag1)) { - countTag1 += count + " "; - } - for (Long count : value.getAll(tag2)) { - countTag2 += count; - } - c.output(key + " -> " + countTag1 + countTag2); - } - } - - -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/src/test/java/org/apache/beam/runners/flink/WordCountJoin3ITCase.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/WordCountJoin3ITCase.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/WordCountJoin3ITCase.java deleted file mode 100644 index 6b57d77..0000000 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/WordCountJoin3ITCase.java +++ /dev/null @@ -1,156 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink; - -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.io.TextIO; -import com.google.cloud.dataflow.sdk.transforms.Count; -import com.google.cloud.dataflow.sdk.transforms.Create; -import com.google.cloud.dataflow.sdk.transforms.DoFn; -import com.google.cloud.dataflow.sdk.transforms.ParDo; -import com.google.cloud.dataflow.sdk.transforms.join.CoGbkResult; -import com.google.cloud.dataflow.sdk.transforms.join.CoGroupByKey; -import com.google.cloud.dataflow.sdk.transforms.join.KeyedPCollectionTuple; -import com.google.cloud.dataflow.sdk.values.KV; -import com.google.cloud.dataflow.sdk.values.PCollection; -import com.google.cloud.dataflow.sdk.values.TupleTag; -import com.google.common.base.Joiner; -import org.apache.flink.test.util.JavaProgramTestBase; - - -public class WordCountJoin3ITCase extends JavaProgramTestBase { - - static final String[] WORDS_1 = new String[] { - "hi there", "hi", "hi sue bob", - "hi sue", "", "bob hi"}; - - static final String[] WORDS_2 = new String[] { - "hi tim", "beauty", "hooray sue bob", - "hi there", "", "please say hi"}; - - static final String[] WORDS_3 = new String[] { - "hi stephan", "beauty", "hooray big fabian", - "hi yo", "", "please say hi"}; - - static final String[] RESULTS = new String[] { - "beauty -> Tag1: Tag2: 1 Tag3: 1", - "bob -> Tag1: 2 Tag2: 1 Tag3: ", - "hi -> Tag1: 5 Tag2: 3 Tag3: 3", - "hooray -> Tag1: Tag2: 1 Tag3: 1", - "please -> Tag1: Tag2: 1 Tag3: 1", - "say -> Tag1: Tag2: 1 Tag3: 1", - "sue -> Tag1: 2 Tag2: 1 Tag3: ", - "there -> Tag1: 1 Tag2: 1 Tag3: ", - "tim -> Tag1: Tag2: 1 Tag3: ", - "stephan -> Tag1: Tag2: Tag3: 1", - "yo -> Tag1: Tag2: Tag3: 1", - "fabian -> Tag1: Tag2: Tag3: 1", - "big -> Tag1: Tag2: Tag3: 1" - }; - - static final TupleTag<Long> tag1 = new TupleTag<>("Tag1"); - static final TupleTag<Long> tag2 = new TupleTag<>("Tag2"); - static final TupleTag<Long> tag3 = new TupleTag<>("Tag3"); - - protected String resultPath; - - @Override - protected void preSubmit() throws Exception { - resultPath = getTempDirPath("result"); - } - - @Override - protected void postSubmit() throws Exception { - compareResultsByLinesInMemory(Joiner.on('\n').join(RESULTS), resultPath); - } - - @Override - protected void testProgram() throws Exception { - - Pipeline p = FlinkTestPipeline.createForBatch(); - - /* Create two PCollections and join them */ - PCollection<KV<String,Long>> occurences1 = p.apply(Create.of(WORDS_1)) - .apply(ParDo.of(new ExtractWordsFn())) - .apply(Count.<String>perElement()); - - PCollection<KV<String,Long>> occurences2 = p.apply(Create.of(WORDS_2)) - .apply(ParDo.of(new ExtractWordsFn())) - .apply(Count.<String>perElement()); - - PCollection<KV<String,Long>> occurences3 = p.apply(Create.of(WORDS_3)) - .apply(ParDo.of(new ExtractWordsFn())) - .apply(Count.<String>perElement()); - - /* CoGroup the two collections */ - PCollection<KV<String, CoGbkResult>> mergedOccurences = KeyedPCollectionTuple - .of(tag1, occurences1) - .and(tag2, occurences2) - .and(tag3, occurences3) - .apply(CoGroupByKey.<String>create()); - - /* Format output */ - mergedOccurences.apply(ParDo.of(new FormatCountsFn())) - .apply(TextIO.Write.named("test").to(resultPath)); - - p.run(); - } - - - static class ExtractWordsFn extends DoFn<String, String> { - - @Override - public void startBundle(Context c) { - } - - @Override - public void processElement(ProcessContext c) { - // Split the line into words. - String[] words = c.element().split("[^a-zA-Z']+"); - - // Output each word encountered into the output PCollection. - for (String word : words) { - if (!word.isEmpty()) { - c.output(word); - } - } - } - } - - static class FormatCountsFn extends DoFn<KV<String, CoGbkResult>, String> { - @Override - public void processElement(ProcessContext c) { - CoGbkResult value = c.element().getValue(); - String key = c.element().getKey(); - String countTag1 = tag1.getId() + ": "; - String countTag2 = tag2.getId() + ": "; - String countTag3 = tag3.getId() + ": "; - for (Long count : value.getAll(tag1)) { - countTag1 += count + " "; - } - for (Long count : value.getAll(tag2)) { - countTag2 += count + " "; - } - for (Long count : value.getAll(tag3)) { - countTag3 += count; - } - c.output(key + " -> " + countTag1 + countTag2 + countTag3); - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java deleted file mode 100644 index dfa15ce..0000000 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java +++ /dev/null @@ -1,158 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.runners.flink; - -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.coders.Coder; -import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder; -import com.google.cloud.dataflow.sdk.io.Sink; -import com.google.cloud.dataflow.sdk.options.PipelineOptions; -import com.google.cloud.dataflow.sdk.transforms.Create; -import com.google.cloud.dataflow.sdk.transforms.Write; -import com.google.common.base.Joiner; -import org.apache.flink.core.fs.FileSystem; -import org.apache.flink.core.fs.Path; -import org.apache.flink.test.util.JavaProgramTestBase; - -import java.io.File; -import java.io.PrintWriter; -import java.net.URI; - -import static org.junit.Assert.*; - -/** - * Tests the translation of custom Write.Bound sinks. - */ -public class WriteSinkITCase extends JavaProgramTestBase { - - protected String resultPath; - - public WriteSinkITCase(){ - } - - static final String[] EXPECTED_RESULT = new String[] { - "Joe red 3", "Mary blue 4", "Max yellow 23"}; - - @Override - protected void preSubmit() throws Exception { - resultPath = getTempDirPath("result"); - } - - @Override - protected void postSubmit() throws Exception { - compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath); - } - - @Override - protected void testProgram() throws Exception { - runProgram(resultPath); - } - - private static void runProgram(String resultPath) { - Pipeline p = FlinkTestPipeline.createForBatch(); - - p.apply(Create.of(EXPECTED_RESULT)).setCoder(StringUtf8Coder.of()) - .apply("CustomSink", Write.to(new MyCustomSink(resultPath))); - - p.run(); - } - - /** - * Simple custom sink which writes to a file. - */ - private static class MyCustomSink extends Sink<String> { - - private final String resultPath; - - public MyCustomSink(String resultPath) { - this.resultPath = resultPath; - } - - @Override - public void validate(PipelineOptions options) { - assertNotNull(options); - } - - @Override - public WriteOperation<String, ?> createWriteOperation(PipelineOptions options) { - return new MyWriteOperation(); - } - - private class MyWriteOperation extends WriteOperation<String, String> { - - @Override - public Coder<String> getWriterResultCoder() { - return StringUtf8Coder.of(); - } - - @Override - public void initialize(PipelineOptions options) throws Exception { - - } - - @Override - public void finalize(Iterable<String> writerResults, PipelineOptions options) throws Exception { - - } - - @Override - public Writer<String, String> createWriter(PipelineOptions options) throws Exception { - return new MyWriter(); - } - - @Override - public Sink<String> getSink() { - return MyCustomSink.this; - } - - /** - * Simple Writer which writes to a file. - */ - private class MyWriter extends Writer<String, String> { - - private PrintWriter internalWriter; - - @Override - public void open(String uId) throws Exception { - Path path = new Path(resultPath + "/" + uId); - FileSystem.get(new URI("file:///")).create(path, false); - internalWriter = new PrintWriter(new File(path.toUri())); - } - - @Override - public void write(String value) throws Exception { - internalWriter.println(value); - } - - @Override - public String close() throws Exception { - internalWriter.close(); - return resultPath; - } - - @Override - public WriteOperation<String, String> getWriteOperation() { - return MyWriteOperation.this; - } - } - } - } - -} - http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupAlsoByWindowTest.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupAlsoByWindowTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupAlsoByWindowTest.java deleted file mode 100644 index 880da59..0000000 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupAlsoByWindowTest.java +++ /dev/null @@ -1,508 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.streaming; - -import org.apache.beam.runners.flink.FlinkTestPipeline; -import org.apache.beam.runners.flink.translation.wrappers.streaming.FlinkGroupAlsoByWindowWrapper; -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.coders.KvCoder; -import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder; -import com.google.cloud.dataflow.sdk.coders.VarIntCoder; -import com.google.cloud.dataflow.sdk.transforms.Combine; -import com.google.cloud.dataflow.sdk.transforms.Sum; -import com.google.cloud.dataflow.sdk.transforms.windowing.*; -import com.google.cloud.dataflow.sdk.util.UserCodeException; -import com.google.cloud.dataflow.sdk.util.WindowedValue; -import com.google.cloud.dataflow.sdk.util.WindowingStrategy; -import com.google.cloud.dataflow.sdk.values.KV; -import com.google.common.base.Throwables; -import org.apache.flink.streaming.api.watermark.Watermark; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; -import org.apache.flink.streaming.util.TestHarnessUtil; -import org.joda.time.Duration; -import org.joda.time.Instant; -import org.junit.Test; - -import java.util.Collection; -import java.util.Comparator; -import java.util.concurrent.ConcurrentLinkedQueue; - -public class GroupAlsoByWindowTest { - - private final Combine.CombineFn combiner = new Sum.SumIntegerFn(); - - private final WindowingStrategy slidingWindowWithAfterWatermarkTriggerStrategy = - WindowingStrategy.of(SlidingWindows.of(Duration.standardSeconds(10)).every(Duration.standardSeconds(5))) - .withTrigger(AfterWatermark.pastEndOfWindow()).withMode(WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES); - - private final WindowingStrategy sessionWindowingStrategy = - WindowingStrategy.of(Sessions.withGapDuration(Duration.standardSeconds(2))) - .withTrigger(Repeatedly.forever(AfterWatermark.pastEndOfWindow())) - .withMode(WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES) - .withAllowedLateness(Duration.standardSeconds(100)); - - private final WindowingStrategy fixedWindowingStrategy = - WindowingStrategy.of(FixedWindows.of(Duration.standardSeconds(10))); - - private final WindowingStrategy fixedWindowWithCountTriggerStrategy = - fixedWindowingStrategy.withTrigger(AfterPane.elementCountAtLeast(5)); - - private final WindowingStrategy fixedWindowWithAfterWatermarkTriggerStrategy = - fixedWindowingStrategy.withTrigger(AfterWatermark.pastEndOfWindow()); - - private final WindowingStrategy fixedWindowWithCompoundTriggerStrategy = - fixedWindowingStrategy.withTrigger( - AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(5)) - .withLateFirings(AfterPane.elementCountAtLeast(5)).buildTrigger()); - - /** - * The default accumulation mode is - * {@link com.google.cloud.dataflow.sdk.util.WindowingStrategy.AccumulationMode#DISCARDING_FIRED_PANES}. - * This strategy changes it to - * {@link com.google.cloud.dataflow.sdk.util.WindowingStrategy.AccumulationMode#ACCUMULATING_FIRED_PANES} - */ - private final WindowingStrategy fixedWindowWithCompoundTriggerStrategyAcc = - fixedWindowWithCompoundTriggerStrategy - .withMode(WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES); - - @Test - public void testWithLateness() throws Exception { - WindowingStrategy strategy = WindowingStrategy.of(FixedWindows.of(Duration.standardSeconds(2))) - .withMode(WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES) - .withAllowedLateness(Duration.millis(1000)); - long initialTime = 0L; - Pipeline pipeline = FlinkTestPipeline.createForStreaming(); - - KvCoder<String, Integer> inputCoder = KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()); - - FlinkGroupAlsoByWindowWrapper gbwOperaror = - FlinkGroupAlsoByWindowWrapper.createForTesting( - pipeline.getOptions(), - pipeline.getCoderRegistry(), - strategy, - inputCoder, - combiner.<String>asKeyedFn()); - - OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, WindowedValue<KV<String, Integer>>> testHarness = - new OneInputStreamOperatorTestHarness<>(gbwOperaror); - testHarness.open(); - - testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1), null, PaneInfo.NO_FIRING), initialTime + 20)); - testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1), null, PaneInfo.NO_FIRING), initialTime + 20)); - testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1000), null, PaneInfo.NO_FIRING), initialTime + 20)); - testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1200), null, PaneInfo.NO_FIRING), initialTime + 20)); - testHarness.processWatermark(new Watermark(initialTime + 2000)); - testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1200), null, PaneInfo.NO_FIRING), initialTime + 20)); - testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1200), null, PaneInfo.NO_FIRING), initialTime + 20)); - testHarness.processWatermark(new Watermark(initialTime + 4000)); - - ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); - expectedOutput.add(new StreamRecord<>( - WindowedValue.of(KV.of("key1", 4), - new Instant(initialTime + 1), - new IntervalWindow(new Instant(0), new Instant(2000)), - PaneInfo.createPane(true, false, PaneInfo.Timing.ON_TIME, 0, 0)) - , initialTime + 1)); - expectedOutput.add(new Watermark(initialTime + 2000)); - - expectedOutput.add(new StreamRecord<>( - WindowedValue.of(KV.of("key1", 5), - new Instant(initialTime + 1999), - new IntervalWindow(new Instant(0), new Instant(2000)), - PaneInfo.createPane(false, false, PaneInfo.Timing.LATE, 1, 1)) - , initialTime + 1999)); - - - expectedOutput.add(new StreamRecord<>( - WindowedValue.of(KV.of("key1", 6), - new Instant(initialTime + 1999), - new IntervalWindow(new Instant(0), new Instant(2000)), - PaneInfo.createPane(false, false, PaneInfo.Timing.LATE, 2, 2)) - , initialTime + 1999)); - expectedOutput.add(new Watermark(initialTime + 4000)); - - TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator()); - testHarness.close(); - } - - @Test - public void testSessionWindows() throws Exception { - WindowingStrategy strategy = sessionWindowingStrategy; - - long initialTime = 0L; - Pipeline pipeline = FlinkTestPipeline.createForStreaming(); - - KvCoder<String, Integer> inputCoder = KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()); - - FlinkGroupAlsoByWindowWrapper gbwOperaror = - FlinkGroupAlsoByWindowWrapper.createForTesting( - pipeline.getOptions(), - pipeline.getCoderRegistry(), - strategy, - inputCoder, - combiner.<String>asKeyedFn()); - - OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, WindowedValue<KV<String, Integer>>> testHarness = - new OneInputStreamOperatorTestHarness<>(gbwOperaror); - testHarness.open(); - - testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1), null, PaneInfo.NO_FIRING), initialTime + 20)); - testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1), null, PaneInfo.NO_FIRING), initialTime + 20)); - testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1000), null, PaneInfo.NO_FIRING), initialTime + 20)); - testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 3500), null, PaneInfo.NO_FIRING), initialTime + 20)); - testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 3700), null, PaneInfo.NO_FIRING), initialTime + 20)); - testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 2700), null, PaneInfo.NO_FIRING), initialTime + 20)); - testHarness.processWatermark(new Watermark(initialTime + 6000)); - - testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 6700), null, PaneInfo.NO_FIRING), initialTime + 20)); - testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 6800), null, PaneInfo.NO_FIRING), initialTime + 20)); - testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 8900), null, PaneInfo.NO_FIRING), initialTime + 20)); - testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 7600), null, PaneInfo.NO_FIRING), initialTime + 20)); - testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 5600), null, PaneInfo.NO_FIRING), initialTime + 20)); - - testHarness.processWatermark(new Watermark(initialTime + 12000)); - - ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); - expectedOutput.add(new StreamRecord<>( - WindowedValue.of(KV.of("key1", 6), - new Instant(initialTime + 1), - new IntervalWindow(new Instant(1), new Instant(5700)), - PaneInfo.createPane(true, false, PaneInfo.Timing.ON_TIME, 0, 0)) - , initialTime + 1)); - expectedOutput.add(new Watermark(initialTime + 6000)); - - expectedOutput.add(new StreamRecord<>( - WindowedValue.of(KV.of("key1", 11), - new Instant(initialTime + 6700), - new IntervalWindow(new Instant(1), new Instant(10900)), - PaneInfo.createPane(true, false, PaneInfo.Timing.ON_TIME, 0, 0)) - , initialTime + 6700)); - expectedOutput.add(new Watermark(initialTime + 12000)); - - TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator()); - testHarness.close(); - } - - @Test - public void testSlidingWindows() throws Exception { - WindowingStrategy strategy = slidingWindowWithAfterWatermarkTriggerStrategy; - long initialTime = 0L; - OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, WindowedValue<KV<String, Integer>>> testHarness = - createTestingOperatorAndState(strategy, initialTime); - ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); - testHarness.processWatermark(new Watermark(initialTime + 25000)); - - expectedOutput.add(new StreamRecord<>( - WindowedValue.of(KV.of("key1", 6), - new Instant(initialTime + 5000), - new IntervalWindow(new Instant(0), new Instant(10000)), - PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)) - , initialTime + 5000)); - expectedOutput.add(new StreamRecord<>( - WindowedValue.of(KV.of("key1", 6), - new Instant(initialTime + 1), - new IntervalWindow(new Instant(-5000), new Instant(5000)), - PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)) - , initialTime + 1)); - expectedOutput.add(new Watermark(initialTime + 10000)); - - expectedOutput.add(new StreamRecord<>( - WindowedValue.of(KV.of("key1", 11), - new Instant(initialTime + 15000), - new IntervalWindow(new Instant(10000), new Instant(20000)), - PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)) - , initialTime + 15000)); - expectedOutput.add(new StreamRecord<>( - WindowedValue.of(KV.of("key1", 3), - new Instant(initialTime + 10000), - new IntervalWindow(new Instant(5000), new Instant(15000)), - PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)) - , initialTime + 10000)); - expectedOutput.add(new StreamRecord<>( - WindowedValue.of(KV.of("key2", 1), - new Instant(initialTime + 19500), - new IntervalWindow(new Instant(10000), new Instant(20000)), - PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)) - , initialTime + 19500)); - expectedOutput.add(new Watermark(initialTime + 20000)); - - expectedOutput.add(new StreamRecord<>( - WindowedValue.of(KV.of("key2", 1), - new Instant(initialTime + 20000), - /** - * this is 20000 and not 19500 because of a convention in dataflow where - * timestamps of windowed values in a window cannot be smaller than the - * end of a previous window. Checkout the documentation of the - * {@link WindowFn#getOutputTime(Instant, BoundedWindow)} - */ - new IntervalWindow(new Instant(15000), new Instant(25000)), - PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)) - , initialTime + 20000)); - expectedOutput.add(new StreamRecord<>( - WindowedValue.of(KV.of("key1", 8), - new Instant(initialTime + 20000), - new IntervalWindow(new Instant(15000), new Instant(25000)), - PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)) - , initialTime + 20000)); - expectedOutput.add(new Watermark(initialTime + 25000)); - - TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator()); - testHarness.close(); - } - - @Test - public void testAfterWatermarkProgram() throws Exception { - WindowingStrategy strategy = fixedWindowWithAfterWatermarkTriggerStrategy; - long initialTime = 0L; - OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, WindowedValue<KV<String, Integer>>> testHarness = - createTestingOperatorAndState(strategy, initialTime); - ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); - - expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 6), - new Instant(initialTime + 1), null, PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)), initialTime + 1)); - expectedOutput.add(new Watermark(initialTime + 10000)); - - expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 11), - new Instant(initialTime + 10000), null, PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)), initialTime + 10000)); - expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key2", 1), - new Instant(initialTime + 19500), null, PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)), initialTime + 19500)); - expectedOutput.add(new Watermark(initialTime + 20000)); - - TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator()); - testHarness.close(); - } - - @Test - public void testAfterCountProgram() throws Exception { - WindowingStrategy strategy = fixedWindowWithCountTriggerStrategy; - - long initialTime = 0L; - OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, WindowedValue<KV<String, Integer>>> testHarness = - createTestingOperatorAndState(strategy, initialTime); - ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); - - expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 5), - new Instant(initialTime + 1), null, PaneInfo.createPane(true, true, PaneInfo.Timing.EARLY)), initialTime + 1)); - expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 5), - new Instant(initialTime + 10000), null, PaneInfo.createPane(true, true, PaneInfo.Timing.EARLY)), initialTime + 10000)); - expectedOutput.add(new Watermark(initialTime + 10000)); - - expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key2", 1), - new Instant(initialTime + 19500), null, PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME, 0, 0)), initialTime + 19500)); - expectedOutput.add(new Watermark(initialTime + 20000)); - TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator()); - - testHarness.close(); - } - - @Test - public void testCompoundProgram() throws Exception { - WindowingStrategy strategy = fixedWindowWithCompoundTriggerStrategy; - - long initialTime = 0L; - OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, WindowedValue<KV<String, Integer>>> testHarness = - createTestingOperatorAndState(strategy, initialTime); - ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); - - /** - * PaneInfo are: - * isFirst (pane in window), - * isLast, Timing (of triggering), - * index (of pane in the window), - * onTimeIndex (if it the 1st,2nd, ... pane that was fired on time) - * */ - - expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 5), - new Instant(initialTime + 1), null, PaneInfo.createPane(true, false, PaneInfo.Timing.EARLY)), initialTime + 1)); - expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 5), - new Instant(initialTime + 10000), null, PaneInfo.createPane(true, false, PaneInfo.Timing.EARLY)), initialTime + 10000)); - expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 5), - new Instant(initialTime + 19500), null, PaneInfo.createPane(false, false, PaneInfo.Timing.EARLY, 1, -1)), initialTime + 19500)); - - expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), - new Instant(initialTime + 1200), null, PaneInfo.createPane(false, true, PaneInfo.Timing.ON_TIME, 1, 0)), initialTime + 1200)); - - expectedOutput.add(new Watermark(initialTime + 10000)); - - expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), - new Instant(initialTime + 19500), null, PaneInfo.createPane(false, true, PaneInfo.Timing.ON_TIME, 2, 0)), initialTime + 19500)); - expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key2", 1), - new Instant(initialTime + 19500), null, PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)), initialTime + 19500)); - - expectedOutput.add(new Watermark(initialTime + 20000)); - TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator()); - - testHarness.close(); - } - - @Test - public void testCompoundAccumulatingPanesProgram() throws Exception { - WindowingStrategy strategy = fixedWindowWithCompoundTriggerStrategyAcc; - long initialTime = 0L; - OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, WindowedValue<KV<String, Integer>>> testHarness = - createTestingOperatorAndState(strategy, initialTime); - ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); - - expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 5), - new Instant(initialTime + 1), null, PaneInfo.createPane(true, false, PaneInfo.Timing.EARLY)), initialTime + 1)); - expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 5), - new Instant(initialTime + 10000), null, PaneInfo.createPane(true, false, PaneInfo.Timing.EARLY)), initialTime + 10000)); - expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 10), - new Instant(initialTime + 19500), null, PaneInfo.createPane(false, false, PaneInfo.Timing.EARLY, 1, -1)), initialTime + 19500)); - - expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 6), - new Instant(initialTime + 1200), null, PaneInfo.createPane(false, true, PaneInfo.Timing.ON_TIME, 1, 0)), initialTime + 1200)); - - expectedOutput.add(new Watermark(initialTime + 10000)); - - expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 11), - new Instant(initialTime + 19500), null, PaneInfo.createPane(false, true, PaneInfo.Timing.ON_TIME, 2, 0)), initialTime + 19500)); - expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key2", 1), - new Instant(initialTime + 19500), null, PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)), initialTime + 19500)); - - expectedOutput.add(new Watermark(initialTime + 20000)); - TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator()); - - testHarness.close(); - } - - private OneInputStreamOperatorTestHarness createTestingOperatorAndState(WindowingStrategy strategy, long initialTime) throws Exception { - Pipeline pipeline = FlinkTestPipeline.createForStreaming(); - - KvCoder<String, Integer> inputCoder = KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()); - - FlinkGroupAlsoByWindowWrapper gbwOperaror = - FlinkGroupAlsoByWindowWrapper.createForTesting( - pipeline.getOptions(), - pipeline.getCoderRegistry(), - strategy, - inputCoder, - combiner.<String>asKeyedFn()); - - OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, WindowedValue<KV<String, Integer>>> testHarness = - new OneInputStreamOperatorTestHarness<>(gbwOperaror); - testHarness.open(); - - testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1), null, PaneInfo.NO_FIRING), initialTime + 20)); - testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1), null, PaneInfo.NO_FIRING), initialTime + 20)); - testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1000), null, PaneInfo.NO_FIRING), initialTime + 20)); - testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1200), null, PaneInfo.NO_FIRING), initialTime + 20)); - testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1200), null, PaneInfo.NO_FIRING), initialTime + 20)); - testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1200), null, PaneInfo.NO_FIRING), initialTime + 20)); - - testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 10000), null, PaneInfo.NO_FIRING), initialTime + 20)); - testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 12100), null, PaneInfo.NO_FIRING), initialTime + 20)); - testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 14200), null, PaneInfo.NO_FIRING), initialTime + 20)); - testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 15300), null, PaneInfo.NO_FIRING), initialTime + 20)); - testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 16500), null, PaneInfo.NO_FIRING), initialTime + 20)); - testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 19500), null, PaneInfo.NO_FIRING), initialTime + 20)); - testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 19500), null, PaneInfo.NO_FIRING), initialTime + 20)); - testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 19500), null, PaneInfo.NO_FIRING), initialTime + 20)); - testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 19500), null, PaneInfo.NO_FIRING), initialTime + 20)); - testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 19500), null, PaneInfo.NO_FIRING), initialTime + 20)); - testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 19500), null, PaneInfo.NO_FIRING), initialTime + 20)); - - testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key2", 1), new Instant(initialTime + 19500), null, PaneInfo.NO_FIRING), initialTime + 20)); - - testHarness.processWatermark(new Watermark(initialTime + 10000)); - testHarness.processWatermark(new Watermark(initialTime + 20000)); - - return testHarness; - } - - private static class ResultSortComparator implements Comparator<Object> { - @Override - public int compare(Object o1, Object o2) { - if (o1 instanceof Watermark && o2 instanceof Watermark) { - Watermark w1 = (Watermark) o1; - Watermark w2 = (Watermark) o2; - return (int) (w1.getTimestamp() - w2.getTimestamp()); - } else { - StreamRecord<WindowedValue<KV<String, Integer>>> sr0 = (StreamRecord<WindowedValue<KV<String, Integer>>>) o1; - StreamRecord<WindowedValue<KV<String, Integer>>> sr1 = (StreamRecord<WindowedValue<KV<String, Integer>>>) o2; - - int comparison = (int) (sr0.getValue().getTimestamp().getMillis() - sr1.getValue().getTimestamp().getMillis()); - if (comparison != 0) { - return comparison; - } - - comparison = sr0.getValue().getValue().getKey().compareTo(sr1.getValue().getValue().getKey()); - if(comparison == 0) { - comparison = Integer.compare( - sr0.getValue().getValue().getValue(), - sr1.getValue().getValue().getValue()); - } - if(comparison == 0) { - Collection windowsA = sr0.getValue().getWindows(); - Collection windowsB = sr1.getValue().getWindows(); - - if(windowsA.size() != 1 || windowsB.size() != 1) { - throw new IllegalStateException("A value cannot belong to more than one windows after grouping."); - } - - BoundedWindow windowA = (BoundedWindow) windowsA.iterator().next(); - BoundedWindow windowB = (BoundedWindow) windowsB.iterator().next(); - comparison = Long.compare(windowA.maxTimestamp().getMillis(), windowB.maxTimestamp().getMillis()); - } - return comparison; - } - } - } - - private <T> WindowedValue<T> makeWindowedValue(WindowingStrategy strategy, - T output, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) { - final Instant inputTimestamp = timestamp; - final WindowFn windowFn = strategy.getWindowFn(); - - if (timestamp == null) { - timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE; - } - - if (windows == null) { - try { - windows = windowFn.assignWindows(windowFn.new AssignContext() { - @Override - public Object element() { - throw new UnsupportedOperationException( - "WindowFn attempted to access input element when none was available"); - } - - @Override - public Instant timestamp() { - if (inputTimestamp == null) { - throw new UnsupportedOperationException( - "WindowFn attempted to access input timestamp when none was available"); - } - return inputTimestamp; - } - - @Override - public Collection<? extends BoundedWindow> windows() { - throw new UnsupportedOperationException( - "WindowFn attempted to access input windows when none were available"); - } - }); - } catch (Exception e) { - throw UserCodeException.wrap(e); - } - } - - return WindowedValue.of(output, timestamp, windows, pane); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java deleted file mode 100644 index 63e0bcf..0000000 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java +++ /dev/null @@ -1,123 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.streaming; - -import org.apache.beam.runners.flink.FlinkTestPipeline; -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.io.TextIO; -import com.google.cloud.dataflow.sdk.transforms.Create; -import com.google.cloud.dataflow.sdk.transforms.DoFn; -import com.google.cloud.dataflow.sdk.transforms.GroupByKey; -import com.google.cloud.dataflow.sdk.transforms.ParDo; -import com.google.cloud.dataflow.sdk.transforms.windowing.AfterWatermark; -import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows; -import com.google.cloud.dataflow.sdk.transforms.windowing.Window; -import com.google.cloud.dataflow.sdk.values.KV; -import com.google.cloud.dataflow.sdk.values.PCollection; -import com.google.common.base.Joiner; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.streaming.util.StreamingProgramTestBase; -import org.joda.time.Duration; -import org.joda.time.Instant; - -import java.io.Serializable; -import java.util.Arrays; - -public class GroupByNullKeyTest extends StreamingProgramTestBase implements Serializable { - - - protected String resultPath; - - static final String[] EXPECTED_RESULT = new String[] { - "k: null v: user1 user1 user1 user2 user2 user2 user2 user3" - }; - - public GroupByNullKeyTest(){ - } - - @Override - protected void preSubmit() throws Exception { - resultPath = getTempDirPath("result"); - } - - @Override - protected void postSubmit() throws Exception { - compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath); - } - - public static class ExtractUserAndTimestamp extends DoFn<KV<Integer, String>, String> { - private static final long serialVersionUID = 0; - - @Override - public void processElement(ProcessContext c) { - KV<Integer, String> record = c.element(); - long now = System.currentTimeMillis(); - int timestamp = record.getKey(); - String userName = record.getValue(); - if (userName != null) { - // Sets the implicit timestamp field to be used in windowing. - c.outputWithTimestamp(userName, new Instant(timestamp + now)); - } - } - } - - @Override - protected void testProgram() throws Exception { - - Pipeline p = FlinkTestPipeline.createForStreaming(); - - PCollection<String> output = - p.apply(Create.of(Arrays.asList( - KV.<Integer, String>of(0, "user1"), - KV.<Integer, String>of(1, "user1"), - KV.<Integer, String>of(2, "user1"), - KV.<Integer, String>of(10, "user2"), - KV.<Integer, String>of(1, "user2"), - KV.<Integer, String>of(15000, "user2"), - KV.<Integer, String>of(12000, "user2"), - KV.<Integer, String>of(25000, "user3")))) - .apply(ParDo.of(new ExtractUserAndTimestamp())) - .apply(Window.<String>into(FixedWindows.of(Duration.standardHours(1))) - .triggering(AfterWatermark.pastEndOfWindow()) - .withAllowedLateness(Duration.ZERO) - .discardingFiredPanes()) - - .apply(ParDo.of(new DoFn<String, KV<Void, String>>() { - @Override - public void processElement(ProcessContext c) throws Exception { - String elem = c.element(); - c.output(KV.<Void, String>of((Void) null, elem)); - } - })) - .apply(GroupByKey.<Void, String>create()) - .apply(ParDo.of(new DoFn<KV<Void, Iterable<String>>, String>() { - @Override - public void processElement(ProcessContext c) throws Exception { - KV<Void, Iterable<String>> elem = c.element(); - StringBuilder str = new StringBuilder(); - str.append("k: " + elem.getKey() + " v:"); - for (String v : elem.getValue()) { - str.append(" " + v); - } - c.output(str.toString()); - } - })); - output.apply(TextIO.Write.to(resultPath)); - p.run(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/StateSerializationTest.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/StateSerializationTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/StateSerializationTest.java deleted file mode 100644 index 77a8de6..0000000 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/StateSerializationTest.java +++ /dev/null @@ -1,305 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.streaming; - -import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals; -import org.apache.beam.runners.flink.translation.wrappers.streaming.state.StateCheckpointReader; -import org.apache.beam.runners.flink.translation.wrappers.streaming.state.StateCheckpointUtils; -import org.apache.beam.runners.flink.translation.wrappers.streaming.state.StateCheckpointWriter; -import com.google.cloud.dataflow.sdk.coders.*; -import com.google.cloud.dataflow.sdk.transforms.Combine; -import com.google.cloud.dataflow.sdk.transforms.CombineWithContext; -import com.google.cloud.dataflow.sdk.transforms.Sum; -import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; -import com.google.cloud.dataflow.sdk.transforms.windowing.IntervalWindow; -import com.google.cloud.dataflow.sdk.transforms.windowing.OutputTimeFns; -import com.google.cloud.dataflow.sdk.util.TimeDomain; -import com.google.cloud.dataflow.sdk.util.TimerInternals; -import com.google.cloud.dataflow.sdk.util.state.*; -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.runtime.state.AbstractStateBackend; -import org.apache.flink.runtime.state.memory.MemoryStateBackend; -import org.apache.flink.runtime.util.DataInputDeserializer; -import org.joda.time.Instant; -import org.junit.Test; - -import java.nio.ByteBuffer; -import java.util.*; - -import static org.junit.Assert.assertEquals; - -public class StateSerializationTest { - - private static final StateNamespace NAMESPACE_1 = StateNamespaces.global(); - private static final String KEY_PREFIX = "TEST_"; - - // TODO: This can be replaced with the standard Sum.SumIntererFn once the state no longer needs - // to create a StateTag at the point of restoring state. Currently StateTags are compared strictly - // by type and combiners always use KeyedCombineFnWithContext rather than KeyedCombineFn or CombineFn. - private static CombineWithContext.KeyedCombineFnWithContext<Object, Integer, int[], Integer> SUM_COMBINER = - new CombineWithContext.KeyedCombineFnWithContext<Object, Integer, int[], Integer>() { - @Override - public int[] createAccumulator(Object key, CombineWithContext.Context c) { - return new int[1]; - } - - @Override - public int[] addInput(Object key, int[] accumulator, Integer value, CombineWithContext.Context c) { - accumulator[0] += value; - return accumulator; - } - - @Override - public int[] mergeAccumulators(Object key, Iterable<int[]> accumulators, CombineWithContext.Context c) { - int[] r = new int[1]; - for (int[] a : accumulators) { - r[0] += a[0]; - } - return r; - } - - @Override - public Integer extractOutput(Object key, int[] accumulator, CombineWithContext.Context c) { - return accumulator[0]; - } - }; - - private static Coder<int[]> INT_ACCUM_CODER = DelegateCoder.of( - VarIntCoder.of(), - new DelegateCoder.CodingFunction<int[], Integer>() { - @Override - public Integer apply(int[] accumulator) { - return accumulator[0]; - } - }, - new DelegateCoder.CodingFunction<Integer, int[]>() { - @Override - public int[] apply(Integer value) { - int[] a = new int[1]; - a[0] = value; - return a; - } - }); - - private static final StateTag<Object, ValueState<String>> STRING_VALUE_ADDR = - StateTags.value("stringValue", StringUtf8Coder.of()); - private static final StateTag<Object, ValueState<Integer>> INT_VALUE_ADDR = - StateTags.value("stringValue", VarIntCoder.of()); - private static final StateTag<Object, AccumulatorCombiningState<Integer, int[], Integer>> SUM_INTEGER_ADDR = - StateTags.keyedCombiningValueWithContext("sumInteger", INT_ACCUM_CODER, SUM_COMBINER); - private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR = - StateTags.bag("stringBag", StringUtf8Coder.of()); - private static final StateTag<Object, WatermarkHoldState<BoundedWindow>> WATERMARK_BAG_ADDR = - StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtEarliestInputTimestamp()); - - private Map<String, FlinkStateInternals<String>> statePerKey = new HashMap<>(); - - private Map<String, Set<TimerInternals.TimerData>> activeTimers = new HashMap<>(); - - private void initializeStateAndTimers() throws CannotProvideCoderException { - for (int i = 0; i < 10; i++) { - String key = KEY_PREFIX + i; - - FlinkStateInternals state = initializeStateForKey(key); - Set<TimerInternals.TimerData> timers = new HashSet<>(); - for (int j = 0; j < 5; j++) { - TimerInternals.TimerData timer = TimerInternals - .TimerData.of(NAMESPACE_1, - new Instant(1000 + i + j), TimeDomain.values()[j % 3]); - timers.add(timer); - } - - statePerKey.put(key, state); - activeTimers.put(key, timers); - } - } - - private FlinkStateInternals<String> initializeStateForKey(String key) throws CannotProvideCoderException { - FlinkStateInternals<String> state = createState(key); - - ValueState<String> value = state.state(NAMESPACE_1, STRING_VALUE_ADDR); - value.write("test"); - - ValueState<Integer> value2 = state.state(NAMESPACE_1, INT_VALUE_ADDR); - value2.write(4); - value2.write(5); - - AccumulatorCombiningState<Integer, int[], Integer> combiningValue = state.state(NAMESPACE_1, SUM_INTEGER_ADDR); - combiningValue.add(1); - combiningValue.add(2); - - WatermarkHoldState<BoundedWindow> watermark = state.state(NAMESPACE_1, WATERMARK_BAG_ADDR); - watermark.add(new Instant(1000)); - - BagState<String> bag = state.state(NAMESPACE_1, STRING_BAG_ADDR); - bag.add("v1"); - bag.add("v2"); - bag.add("v3"); - bag.add("v4"); - return state; - } - - private boolean restoreAndTestState(DataInputView in) throws Exception { - StateCheckpointReader reader = new StateCheckpointReader(in); - final ClassLoader userClassloader = this.getClass().getClassLoader(); - Coder<? extends BoundedWindow> windowCoder = IntervalWindow.getCoder(); - Coder<String> keyCoder = StringUtf8Coder.of(); - - boolean comparisonRes = true; - - for (String key : statePerKey.keySet()) { - comparisonRes &= checkStateForKey(key); - } - - // restore the timers - Map<String, Set<TimerInternals.TimerData>> restoredTimersPerKey = StateCheckpointUtils.decodeTimers(reader, windowCoder, keyCoder); - if (activeTimers.size() != restoredTimersPerKey.size()) { - return false; - } - - for (String key : statePerKey.keySet()) { - Set<TimerInternals.TimerData> originalTimers = activeTimers.get(key); - Set<TimerInternals.TimerData> restoredTimers = restoredTimersPerKey.get(key); - comparisonRes &= checkTimersForKey(originalTimers, restoredTimers); - } - - // restore the state - Map<String, FlinkStateInternals<String>> restoredPerKeyState = - StateCheckpointUtils.decodeState(reader, OutputTimeFns.outputAtEarliestInputTimestamp(), keyCoder, windowCoder, userClassloader); - if (restoredPerKeyState.size() != statePerKey.size()) { - return false; - } - - for (String key : statePerKey.keySet()) { - FlinkStateInternals<String> originalState = statePerKey.get(key); - FlinkStateInternals<String> restoredState = restoredPerKeyState.get(key); - comparisonRes &= checkStateForKey(originalState, restoredState); - } - return comparisonRes; - } - - private boolean checkStateForKey(String key) throws CannotProvideCoderException { - FlinkStateInternals<String> state = statePerKey.get(key); - - ValueState<String> value = state.state(NAMESPACE_1, STRING_VALUE_ADDR); - boolean comp = value.read().equals("test"); - - ValueState<Integer> value2 = state.state(NAMESPACE_1, INT_VALUE_ADDR); - comp &= value2.read().equals(5); - - AccumulatorCombiningState<Integer, int[], Integer> combiningValue = state.state(NAMESPACE_1, SUM_INTEGER_ADDR); - comp &= combiningValue.read().equals(3); - - WatermarkHoldState<BoundedWindow> watermark = state.state(NAMESPACE_1, WATERMARK_BAG_ADDR); - comp &= watermark.read().equals(new Instant(1000)); - - BagState<String> bag = state.state(NAMESPACE_1, STRING_BAG_ADDR); - Iterator<String> it = bag.read().iterator(); - int i = 0; - while (it.hasNext()) { - comp &= it.next().equals("v" + (++i)); - } - return comp; - } - - private void storeState(AbstractStateBackend.CheckpointStateOutputView out) throws Exception { - StateCheckpointWriter checkpointBuilder = StateCheckpointWriter.create(out); - Coder<String> keyCoder = StringUtf8Coder.of(); - - // checkpoint the timers - StateCheckpointUtils.encodeTimers(activeTimers, checkpointBuilder, keyCoder); - - // checkpoint the state - StateCheckpointUtils.encodeState(statePerKey, checkpointBuilder, keyCoder); - } - - private boolean checkTimersForKey(Set<TimerInternals.TimerData> originalTimers, Set<TimerInternals.TimerData> restoredTimers) { - boolean comp = true; - if (restoredTimers == null) { - return false; - } - - if (originalTimers.size() != restoredTimers.size()) { - return false; - } - - for (TimerInternals.TimerData timer : originalTimers) { - comp &= restoredTimers.contains(timer); - } - return comp; - } - - private boolean checkStateForKey(FlinkStateInternals<String> originalState, FlinkStateInternals<String> restoredState) throws CannotProvideCoderException { - if (restoredState == null) { - return false; - } - - ValueState<String> orValue = originalState.state(NAMESPACE_1, STRING_VALUE_ADDR); - ValueState<String> resValue = restoredState.state(NAMESPACE_1, STRING_VALUE_ADDR); - boolean comp = orValue.read().equals(resValue.read()); - - ValueState<Integer> orIntValue = originalState.state(NAMESPACE_1, INT_VALUE_ADDR); - ValueState<Integer> resIntValue = restoredState.state(NAMESPACE_1, INT_VALUE_ADDR); - comp &= orIntValue.read().equals(resIntValue.read()); - - AccumulatorCombiningState<Integer, int[], Integer> combOrValue = originalState.state(NAMESPACE_1, SUM_INTEGER_ADDR); - AccumulatorCombiningState<Integer, int[], Integer> combResValue = restoredState.state(NAMESPACE_1, SUM_INTEGER_ADDR); - comp &= combOrValue.read().equals(combResValue.read()); - - WatermarkHoldState orWatermark = originalState.state(NAMESPACE_1, WATERMARK_BAG_ADDR); - WatermarkHoldState resWatermark = restoredState.state(NAMESPACE_1, WATERMARK_BAG_ADDR); - comp &= orWatermark.read().equals(resWatermark.read()); - - BagState<String> orBag = originalState.state(NAMESPACE_1, STRING_BAG_ADDR); - BagState<String> resBag = restoredState.state(NAMESPACE_1, STRING_BAG_ADDR); - - Iterator<String> orIt = orBag.read().iterator(); - Iterator<String> resIt = resBag.read().iterator(); - - while (orIt.hasNext() && resIt.hasNext()) { - comp &= orIt.next().equals(resIt.next()); - } - - return !((orIt.hasNext() && !resIt.hasNext()) || (!orIt.hasNext() && resIt.hasNext())) && comp; - } - - private FlinkStateInternals<String> createState(String key) throws CannotProvideCoderException { - return new FlinkStateInternals<>( - key, - StringUtf8Coder.of(), - IntervalWindow.getCoder(), - OutputTimeFns.outputAtEarliestInputTimestamp()); - } - - @Test - public void test() throws Exception { - StateSerializationTest test = new StateSerializationTest(); - test.initializeStateAndTimers(); - - MemoryStateBackend.MemoryCheckpointOutputStream memBackend = new MemoryStateBackend.MemoryCheckpointOutputStream(32048); - AbstractStateBackend.CheckpointStateOutputView out = new AbstractStateBackend.CheckpointStateOutputView(memBackend); - - test.storeState(out); - - byte[] contents = memBackend.closeAndGetBytes(); - DataInputView in = new DataInputDeserializer(contents, 0, contents.length); - - assertEquals(test.restoreAndTestState(in), true); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.java deleted file mode 100644 index 83c1661..0000000 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.java +++ /dev/null @@ -1,134 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.streaming; - -import org.apache.beam.runners.flink.FlinkTestPipeline; -import com.google.api.services.bigquery.model.TableRow; -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.io.TextIO; -import com.google.cloud.dataflow.sdk.transforms.Count; -import com.google.cloud.dataflow.sdk.transforms.Create; -import com.google.cloud.dataflow.sdk.transforms.DoFn; -import com.google.cloud.dataflow.sdk.transforms.ParDo; -import com.google.cloud.dataflow.sdk.transforms.windowing.Sessions; -import com.google.cloud.dataflow.sdk.transforms.windowing.Window; -import com.google.cloud.dataflow.sdk.values.KV; -import com.google.cloud.dataflow.sdk.values.PCollection; -import com.google.common.base.Joiner; -import org.apache.flink.streaming.util.StreamingProgramTestBase; -import org.joda.time.Duration; -import org.joda.time.Instant; - -import java.io.Serializable; -import java.util.Arrays; - - -/** - * Session window test - */ -public class TopWikipediaSessionsITCase extends StreamingProgramTestBase implements Serializable { - protected String resultPath; - - public TopWikipediaSessionsITCase(){ - } - - static final String[] EXPECTED_RESULT = new String[] { - "user: user1 value:3", - "user: user1 value:1", - "user: user2 value:4", - "user: user2 value:6", - "user: user3 value:7", - "user: user3 value:2" - }; - - @Override - protected void preSubmit() throws Exception { - resultPath = getTempDirPath("result"); - } - - @Override - protected void postSubmit() throws Exception { - compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath); - } - - @Override - protected void testProgram() throws Exception { - - Pipeline p = FlinkTestPipeline.createForStreaming(); - - Long now = (System.currentTimeMillis() + 10000) / 1000; - - PCollection<KV<String, Long>> output = - p.apply(Create.of(Arrays.asList(new TableRow().set("timestamp", now).set - ("contributor_username", "user1"), new TableRow().set("timestamp", now + 10).set - ("contributor_username", "user3"), new TableRow().set("timestamp", now).set - ("contributor_username", "user2"), new TableRow().set("timestamp", now).set - ("contributor_username", "user1"), new TableRow().set("timestamp", now + 2).set - ("contributor_username", "user1"), new TableRow().set("timestamp", now).set - ("contributor_username", "user2"), new TableRow().set("timestamp", now + 1).set - ("contributor_username", "user2"), new TableRow().set("timestamp", now + 5).set - ("contributor_username", "user2"), new TableRow().set("timestamp", now + 7).set - ("contributor_username", "user2"), new TableRow().set("timestamp", now + 8).set - ("contributor_username", "user2"), new TableRow().set("timestamp", now + 200).set - ("contributor_username", "user2"), new TableRow().set("timestamp", now + 230).set - ("contributor_username", "user1"), new TableRow().set("timestamp", now + 230).set - ("contributor_username", "user2"), new TableRow().set("timestamp", now + 240).set - ("contributor_username", "user2"), new TableRow().set("timestamp", now + 245).set - ("contributor_username", "user3"), new TableRow().set("timestamp", now + 235).set - ("contributor_username", "user3"), new TableRow().set("timestamp", now + 236).set - ("contributor_username", "user3"), new TableRow().set("timestamp", now + 237).set - ("contributor_username", "user3"), new TableRow().set("timestamp", now + 238).set - ("contributor_username", "user3"), new TableRow().set("timestamp", now + 239).set - ("contributor_username", "user3"), new TableRow().set("timestamp", now + 240).set - ("contributor_username", "user3"), new TableRow().set("timestamp", now + 241).set - ("contributor_username", "user2"), new TableRow().set("timestamp", now) - .set("contributor_username", "user3")))) - - - - .apply(ParDo.of(new DoFn<TableRow, String>() { - @Override - public void processElement(ProcessContext c) throws Exception { - TableRow row = c.element(); - long timestamp = (Integer) row.get("timestamp"); - String userName = (String) row.get("contributor_username"); - if (userName != null) { - // Sets the timestamp field to be used in windowing. - c.outputWithTimestamp(userName, new Instant(timestamp * 1000L)); - } - } - })) - - .apply(Window.<String>into(Sessions.withGapDuration(Duration.standardMinutes(1)))) - - .apply(Count.<String>perElement()); - - PCollection<String> format = output.apply(ParDo.of(new DoFn<KV<String, Long>, String>() { - @Override - public void processElement(ProcessContext c) throws Exception { - KV<String, Long> el = c.element(); - String out = "user: " + el.getKey() + " value:" + el.getValue(); - c.output(out); - } - })); - - format.apply(TextIO.Write.to(resultPath)); - - p.run(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/src/test/java/org/apache/beam/runners/flink/util/JoinExamples.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/util/JoinExamples.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/util/JoinExamples.java deleted file mode 100644 index e850dd6..0000000 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/util/JoinExamples.java +++ /dev/null @@ -1,160 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.util; - -import com.google.api.services.bigquery.model.TableRow; -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.io.BigQueryIO; -import com.google.cloud.dataflow.sdk.io.TextIO; -import com.google.cloud.dataflow.sdk.options.Description; -import com.google.cloud.dataflow.sdk.options.PipelineOptions; -import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; -import com.google.cloud.dataflow.sdk.options.Validation; -import com.google.cloud.dataflow.sdk.transforms.DoFn; -import com.google.cloud.dataflow.sdk.transforms.ParDo; -import com.google.cloud.dataflow.sdk.transforms.join.CoGbkResult; -import com.google.cloud.dataflow.sdk.transforms.join.CoGroupByKey; -import com.google.cloud.dataflow.sdk.transforms.join.KeyedPCollectionTuple; -import com.google.cloud.dataflow.sdk.values.KV; -import com.google.cloud.dataflow.sdk.values.PCollection; -import com.google.cloud.dataflow.sdk.values.TupleTag; - -/** - * Copied from {@link com.google.cloud.dataflow.examples.JoinExamples} because the code - * is private there. - */ -public class JoinExamples { - - // A 1000-row sample of the GDELT data here: gdelt-bq:full.events. - private static final String GDELT_EVENTS_TABLE = - "clouddataflow-readonly:samples.gdelt_sample"; - // A table that maps country codes to country names. - private static final String COUNTRY_CODES = - "gdelt-bq:full.crosswalk_geocountrycodetohuman"; - - /** - * Join two collections, using country code as the key. - */ - public static PCollection<String> joinEvents(PCollection<TableRow> eventsTable, - PCollection<TableRow> countryCodes) throws Exception { - - final TupleTag<String> eventInfoTag = new TupleTag<>(); - final TupleTag<String> countryInfoTag = new TupleTag<>(); - - // transform both input collections to tuple collections, where the keys are country - // codes in both cases. - PCollection<KV<String, String>> eventInfo = eventsTable.apply( - ParDo.of(new ExtractEventDataFn())); - PCollection<KV<String, String>> countryInfo = countryCodes.apply( - ParDo.of(new ExtractCountryInfoFn())); - - // country code 'key' -> CGBKR (<event info>, <country name>) - PCollection<KV<String, CoGbkResult>> kvpCollection = KeyedPCollectionTuple - .of(eventInfoTag, eventInfo) - .and(countryInfoTag, countryInfo) - .apply(CoGroupByKey.<String>create()); - - // Process the CoGbkResult elements generated by the CoGroupByKey transform. - // country code 'key' -> string of <event info>, <country name> - PCollection<KV<String, String>> finalResultCollection = - kvpCollection.apply(ParDo.of(new DoFn<KV<String, CoGbkResult>, KV<String, String>>() { - @Override - public void processElement(ProcessContext c) { - KV<String, CoGbkResult> e = c.element(); - CoGbkResult val = e.getValue(); - String countryCode = e.getKey(); - String countryName; - countryName = e.getValue().getOnly(countryInfoTag, "Kostas"); - for (String eventInfo : c.element().getValue().getAll(eventInfoTag)) { - // Generate a string that combines information from both collection values - c.output(KV.of(countryCode, "Country name: " + countryName - + ", Event info: " + eventInfo)); - } - } - })); - - // write to GCS - return finalResultCollection - .apply(ParDo.of(new DoFn<KV<String, String>, String>() { - @Override - public void processElement(ProcessContext c) { - String outputstring = "Country code: " + c.element().getKey() - + ", " + c.element().getValue(); - c.output(outputstring); - } - })); - } - - /** - * Examines each row (event) in the input table. Output a KV with the key the country - * code of the event, and the value a string encoding event information. - */ - static class ExtractEventDataFn extends DoFn<TableRow, KV<String, String>> { - @Override - public void processElement(ProcessContext c) { - TableRow row = c.element(); - String countryCode = (String) row.get("ActionGeo_CountryCode"); - String sqlDate = (String) row.get("SQLDATE"); - String actor1Name = (String) row.get("Actor1Name"); - String sourceUrl = (String) row.get("SOURCEURL"); - String eventInfo = "Date: " + sqlDate + ", Actor1: " + actor1Name + ", url: " + sourceUrl; - c.output(KV.of(countryCode, eventInfo)); - } - } - - - /** - * Examines each row (country info) in the input table. Output a KV with the key the country - * code, and the value the country name. - */ - static class ExtractCountryInfoFn extends DoFn<TableRow, KV<String, String>> { - @Override - public void processElement(ProcessContext c) { - TableRow row = c.element(); - String countryCode = (String) row.get("FIPSCC"); - String countryName = (String) row.get("HumanName"); - c.output(KV.of(countryCode, countryName)); - } - } - - - /** - * Options supported by {@link JoinExamples}. - * <p> - * Inherits standard configuration options. - */ - private interface Options extends PipelineOptions { - @Description("Path of the file to write to") - @Validation.Required - String getOutput(); - void setOutput(String value); - } - - public static void main(String[] args) throws Exception { - Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); - Pipeline p = Pipeline.create(options); - // the following two 'applys' create multiple inputs to our pipeline, one for each - // of our two input sources. - PCollection<TableRow> eventsTable = p.apply(BigQueryIO.Read.from(GDELT_EVENTS_TABLE)); - PCollection<TableRow> countryCodes = p.apply(BigQueryIO.Read.from(COUNTRY_CODES)); - PCollection<String> formattedResults = joinEvents(eventsTable, countryCodes); - formattedResults.apply(TextIO.Write.to(options.getOutput())); - p.run(); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/src/test/resources/log4j-test.properties ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/resources/log4j-test.properties b/runners/flink/src/test/resources/log4j-test.properties deleted file mode 100644 index 4c74d85..0000000 --- a/runners/flink/src/test/resources/log4j-test.properties +++ /dev/null @@ -1,27 +0,0 @@ -################################################################################ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -################################################################################ - -# Set root logger level to OFF to not flood build logs -# set manually to INFO for debugging purposes -log4j.rootLogger=OFF, testlogger - -# A1 is set to be a ConsoleAppender. -log4j.appender.testlogger=org.apache.log4j.ConsoleAppender -log4j.appender.testlogger.target = System.err -log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout -log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/pom.xml ---------------------------------------------------------------------- diff --git a/runners/pom.xml b/runners/pom.xml index 980900b..4f07ceb 100644 --- a/runners/pom.xml +++ b/runners/pom.xml @@ -1,43 +1,43 @@ <?xml version="1.0" encoding="UTF-8"?> <!-- - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. --> <project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" - xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> + xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> - <modelVersion>4.0.0</modelVersion> + <modelVersion>4.0.0</modelVersion> - <parent> - <groupId>com.google.cloud.dataflow</groupId> - <artifactId>google-cloud-dataflow-java-sdk-parent</artifactId> - <version>1.6.0-SNAPSHOT</version> - </parent> - - <groupId>org.apache.beam</groupId> - <artifactId>runners</artifactId> + <parent> + <groupId>com.google.cloud.dataflow</groupId> + <artifactId>google-cloud-dataflow-java-sdk-parent</artifactId> <version>1.6.0-SNAPSHOT</version> + </parent> + + <groupId>org.apache.beam</groupId> + <artifactId>runners-parent</artifactId> + <version>1.6.0-SNAPSHOT</version> - <packaging>pom</packaging> + <packaging>pom</packaging> - <name>Beam Runners</name> + <name>Beam Runners</name> - <modules> - <module>flink</module> - </modules> + <modules> + <module>flink</module> + </modules> </project>
