http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/test/java/org/apache/beam/runners/flink/MaybeEmptyTestITCase.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/MaybeEmptyTestITCase.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/MaybeEmptyTestITCase.java new file mode 100644 index 0000000..e39b81d --- /dev/null +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/MaybeEmptyTestITCase.java @@ -0,0 +1,63 @@ +/* + * Copyright 2015 Data Artisans GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.flink; + +import com.google.cloud.dataflow.sdk.Pipeline; +import com.google.cloud.dataflow.sdk.coders.VoidCoder; +import com.google.cloud.dataflow.sdk.io.TextIO; +import com.google.cloud.dataflow.sdk.transforms.Create; +import com.google.cloud.dataflow.sdk.transforms.DoFn; +import com.google.cloud.dataflow.sdk.transforms.ParDo; +import org.apache.flink.test.util.JavaProgramTestBase; + +import java.io.Serializable; +/** + * Test program that feeds a single {@code null} Void element through a ParDo which emits the + * {@link #expected} string, writes the ParDo output with TextIO and checks it against that string. + */ +public class MaybeEmptyTestITCase extends JavaProgramTestBase implements Serializable { + + protected String resultPath; + + protected final String expected = "test"; + + public MaybeEmptyTestITCase() { + } +/** Stores the path returned by {@code getTempDirPath} for the name result; the pipeline writes there. */ + @Override + protected void preSubmit() throws Exception { + resultPath = getTempDirPath("result"); + } +/** Hands {@code expected} and {@code resultPath} to {@code compareResultsByLinesInMemory}. */ + @Override + protected void postSubmit() throws Exception { + compareResultsByLinesInMemory(expected, resultPath); + } +/** Builds and runs the pipeline: Create of one null Void with VoidCoder, ParDo emitting {@code expected}, TextIO write. */ + @Override + protected void testProgram() throws Exception { + + Pipeline p = FlinkTestPipeline.createForBatch(); + + p.apply(Create.of((Void) null)).setCoder(VoidCoder.of()) + .apply(ParDo.of( + new DoFn<Void, String>() { + @Override + public void processElement(DoFn<Void, String>.ProcessContext c) { + c.output(expected); + } + })).apply(TextIO.Write.to(resultPath)); + p.run(); + } + +}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/test/java/org/apache/beam/runners/flink/ParDoMultiOutputITCase.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/ParDoMultiOutputITCase.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/ParDoMultiOutputITCase.java new file mode 100644 index 0000000..08e5323 --- /dev/null +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/ParDoMultiOutputITCase.java @@ -0,0 +1,98 @@ +/* + * Copyright 2015 Data Artisans GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.flink; + +import com.google.cloud.dataflow.sdk.Pipeline; +import com.google.cloud.dataflow.sdk.io.TextIO; +import com.google.cloud.dataflow.sdk.transforms.Create; +import com.google.cloud.dataflow.sdk.transforms.DoFn; +import com.google.cloud.dataflow.sdk.transforms.ParDo; +import com.google.cloud.dataflow.sdk.values.PCollection; +import com.google.cloud.dataflow.sdk.values.PCollectionTuple; +import com.google.cloud.dataflow.sdk.values.TupleTag; +import com.google.cloud.dataflow.sdk.values.TupleTagList; +import com.google.common.base.Joiner; +import org.apache.flink.test.util.JavaProgramTestBase; + +import java.io.Serializable; +/** + * Test program for a ParDo with one main output and several side outputs. Only the collection + * obtained for {@code markedWordsTag} is written with TextIO and compared with + * {@code expectedWords}; the other tagged outputs are produced but not checked. + */ +public class ParDoMultiOutputITCase extends JavaProgramTestBase implements Serializable { + + private String resultPath; + + private static final String[] expectedWords = {"MAAA", "MAAFOOO"}; +/** Stores the path returned by {@code getTempDirPath} for the name result; the pipeline writes there. */ + @Override + protected void preSubmit() throws Exception { + resultPath = getTempDirPath("result"); + } +/** Hands the newline-joined {@code expectedWords} and {@code resultPath} to {@code compareResultsByLinesInMemory}. */ + @Override + protected void postSubmit() throws Exception { + compareResultsByLinesInMemory(Joiner.on("\n").join(expectedWords), resultPath); + } +/** Builds and runs the multi-output pipeline and writes the words routed to {@code markedWordsTag}. */ + @Override + protected void testProgram() throws Exception { + Pipeline p = FlinkTestPipeline.createForBatch(); + + PCollection<String> words = p.apply(Create.of("Hello", "Whatupmyman", "hey", "SPECIALthere", "MAAA", "MAAFOOO")); + + // Select words whose length is at most the cut off, + // plus the lengths of words that are above the cut off. + // Also select words starting with "MAA". + final int wordLengthCutOff = 3; + // Create tags to use for the main and side outputs.
+ final TupleTag<String> wordsBelowCutOffTag = new TupleTag<String>(){}; + final TupleTag<Integer> wordLengthsAboveCutOffTag = new TupleTag<Integer>(){}; + final TupleTag<String> markedWordsTag = new TupleTag<String>(){}; + + PCollectionTuple results = + words.apply(ParDo + .withOutputTags(wordsBelowCutOffTag, TupleTagList.of(wordLengthsAboveCutOffTag) + .and(markedWordsTag)) + .of(new DoFn<String, String>() { + /* This tag is not passed to withOutputTags above. NOTE(review): presumably kept to exercise a side output to an undeclared tag - confirm. */ + final TupleTag<String> specialWordsTag = new TupleTag<String>() { + }; + + @Override + public void processElement(ProcessContext c) { + String word = c.element(); + if (word.length() <= wordLengthCutOff) { + c.output(word); + } else { + c.sideOutput(wordLengthsAboveCutOffTag, word.length()); + } + if (word.startsWith("MAA")) { + c.sideOutput(markedWordsTag, word); + } + + if (word.startsWith("SPECIAL")) { + c.sideOutput(specialWordsTag, word); + } + } + })); + + // Extract the PCollection results, by tag. + PCollection<String> wordsBelowCutOff = results.get(wordsBelowCutOffTag); + PCollection<Integer> wordLengthsAboveCutOff = results.get + (wordLengthsAboveCutOffTag); + PCollection<String> markedWords = results.get(markedWordsTag); + + markedWords.apply(TextIO.Write.to(resultPath)); + + p.run(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java new file mode 100644 index 0000000..7202417 --- /dev/null +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java @@ -0,0 +1,163 @@ +/* + * Copyright 2015 Data Artisans GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink; + +import com.google.cloud.dataflow.sdk.Pipeline; +import com.google.cloud.dataflow.sdk.coders.BigEndianIntegerCoder; +import com.google.cloud.dataflow.sdk.coders.Coder; +import com.google.cloud.dataflow.sdk.io.BoundedSource; +import com.google.cloud.dataflow.sdk.io.Read; +import com.google.cloud.dataflow.sdk.io.TextIO; +import com.google.cloud.dataflow.sdk.options.PipelineOptions; +import com.google.cloud.dataflow.sdk.transforms.DoFn; +import com.google.cloud.dataflow.sdk.transforms.ParDo; +import com.google.cloud.dataflow.sdk.values.PCollection; +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import org.apache.flink.test.util.JavaProgramTestBase; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** + * Test program that reads integers from the custom bounded {@code ReadSource} declared below, + * converts each one to its string form in a ParDo, writes the strings with TextIO and checks + * them against {@code EXPECTED_RESULT}. + */ +public class ReadSourceITCase extends JavaProgramTestBase { + + protected String resultPath; + + public ReadSourceITCase(){ + } + + static final String[] EXPECTED_RESULT = new String[] { + "1", "2", "3", "4", "5", "6", "7", "8", "9"}; +/** Stores the path returned by {@code getTempDirPath} for the name result; the pipeline writes there. */ + @Override + protected void preSubmit() throws Exception { + resultPath = getTempDirPath("result"); + } +/** Hands the newline-joined {@code EXPECTED_RESULT} and {@code resultPath} to {@code compareResultsByLinesInMemory}. */ + @Override + protected void postSubmit() throws Exception { + compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath); + } +/** Delegates to {@code runProgram} with the path set in {@code preSubmit}. */ + @Override + protected void testProgram() throws Exception { + runProgram(resultPath); + } +/** Builds and runs the pipeline: Read from {@code ReadSource(1, 10)}, ParDo calling toString, TextIO write to the given path. */ + private static void runProgram(String resultPath) { + + Pipeline p = FlinkTestPipeline.createForBatch(); + + PCollection<String> result = p +
.apply(Read.from(new ReadSource(1, 10))) + .apply(ParDo.of(new DoFn<Integer, String>() { + @Override + public void processElement(ProcessContext c) throws Exception { + c.output(c.element().toString()); + } + })); + + result.apply(TextIO.Write.to(resultPath)); + p.run(); + } + +/** + * Bounded source of Integer values described only by the two ints {@code from} and {@code to}. + * Its reader reports values while they are below {@code to}, so {@code to} acts as an exclusive bound. + */ + private static class ReadSource extends BoundedSource<Integer> { + final int from; + final int to; + + ReadSource(int from, int to) { + this.from = from; + this.to = to; + } +/** + * Cuts the range into as many consecutive sub-sources as {@code FlinkPipelineOptions.getParallelism()} + * reports (which must be positive). Sub-range bounds are {@code from} plus a multiple of a float step, + * rounded with {@code Math.round}. {@code desiredShardSizeBytes} is not used. + */ + @Override + public List<ReadSource> splitIntoBundles(long desiredShardSizeBytes, PipelineOptions options) + throws Exception { + List<ReadSource> res = new ArrayList<>(); + FlinkPipelineOptions flinkOptions = options.as(FlinkPipelineOptions.class); + int numWorkers = flinkOptions.getParallelism(); + Preconditions.checkArgument(numWorkers > 0, "Number of workers should be larger than 0."); + + float step = 1.0f * (to - from) / numWorkers; + for (int i = 0; i < numWorkers; ++i) { + res.add(new ReadSource(Math.round(from + i * step), Math.round(from + (i + 1) * step))); + } + return res; + } +/** Returns eight times the range length; the product is computed in int arithmetic before widening to long. */ + @Override + public long getEstimatedSizeBytes(PipelineOptions options) throws Exception { + return 8 * (to - from); + } + + @Override + public boolean producesSortedKeys(PipelineOptions options) throws Exception { + return true; + } +/** Creates a {@code RangeReader} bound to this source. */ + @Override + public BoundedReader<Integer> createReader(PipelineOptions options) throws IOException { + return new RangeReader(this); + } + + @Override + public void validate() {} + + @Override + public Coder<Integer> getDefaultOutputCoder() { + return BigEndianIntegerCoder.of(); + } +/** + * Reader that keeps a cursor initialised to {@code from - 1}. {@code start()} always returns true + * and does not move the cursor; {@code advance()} increments it and returns whether it is still + * below {@code to}. NOTE(review): a {@code getCurrent()} call made directly after {@code start()} + * would return {@code from - 1}; presumably the runner calls {@code advance()} before the first + * {@code getCurrent()}, which matches the expected output 1 to 9 - confirm against the caller. + */ + private class RangeReader extends BoundedReader<Integer> { + private int current; + + public RangeReader(ReadSource source) { + this.current = source.from - 1; + } + + @Override + public boolean start() throws IOException { + return true; + } + + @Override + public boolean advance() throws IOException { + current++; + return (current < to); + } + + @Override + public Integer getCurrent() { + return current; + }
+ + @Override + public void close() throws IOException { + // Nothing + } + + @Override + public BoundedSource<Integer> getCurrentSource() { + return ReadSource.this; + } + } + } +} + + http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/test/java/org/apache/beam/runners/flink/RemoveDuplicatesEmptyITCase.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/RemoveDuplicatesEmptyITCase.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/RemoveDuplicatesEmptyITCase.java new file mode 100644 index 0000000..dc82d7d --- /dev/null +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/RemoveDuplicatesEmptyITCase.java @@ -0,0 +1,68 @@ +/* + * Copyright 2015 Data Artisans GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.beam.runners.flink; + +import com.google.cloud.dataflow.sdk.Pipeline; +import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder; +import com.google.cloud.dataflow.sdk.io.TextIO; +import com.google.cloud.dataflow.sdk.transforms.Create; +import com.google.cloud.dataflow.sdk.transforms.RemoveDuplicates; +import com.google.cloud.dataflow.sdk.values.PCollection; +import com.google.common.base.Joiner; +import org.apache.flink.test.util.JavaProgramTestBase; + +import java.util.Collections; +import java.util.List; + +/** + * Test program that applies RemoveDuplicates to a collection created from an empty list and + * checks the written output against the empty {@code EXPECTED_RESULT} array. + */ +public class RemoveDuplicatesEmptyITCase extends JavaProgramTestBase { + + protected String resultPath; + + public RemoveDuplicatesEmptyITCase(){ + } + + static final String[] EXPECTED_RESULT = new String[] {}; +/** Stores the path returned by {@code getTempDirPath} for the name result; the pipeline writes there. */ + @Override + protected void preSubmit() throws Exception { + resultPath = getTempDirPath("result"); + } +/** Hands the newline-joined {@code EXPECTED_RESULT} (an empty string here) and {@code resultPath} to {@code compareResultsByLinesInMemory}. */ + @Override + protected void postSubmit() throws Exception { + compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath); + } +/** + * Builds and runs the pipeline: Create from an empty list with an explicit StringUtf8Coder, + * RemoveDuplicates, TextIO write. NOTE(review): the explicit coder is presumably required + * because nothing can be inferred from an empty list - confirm. + */ + @Override + protected void testProgram() throws Exception { + + List<String> strings = Collections.emptyList(); + + Pipeline p = FlinkTestPipeline.createForBatch(); + + PCollection<String> input = + p.apply(Create.of(strings)) + .setCoder(StringUtf8Coder.of()); + + PCollection<String> output = + input.apply(RemoveDuplicates.<String>create()); + + output.apply(TextIO.Write.to(resultPath)); + p.run(); + } +} + http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/test/java/org/apache/beam/runners/flink/RemoveDuplicatesITCase.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/RemoveDuplicatesITCase.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/RemoveDuplicatesITCase.java new file mode 100644 index 0000000..78b48b5 --- /dev/null +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/RemoveDuplicatesITCase.java @@ -0,0
+1,69 @@ +/* + * Copyright 2015 Data Artisans GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink; + +import com.google.cloud.dataflow.sdk.Pipeline; +import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder; +import com.google.cloud.dataflow.sdk.io.TextIO; +import com.google.cloud.dataflow.sdk.transforms.Create; +import com.google.cloud.dataflow.sdk.transforms.RemoveDuplicates; +import com.google.cloud.dataflow.sdk.values.PCollection; +import com.google.common.base.Joiner; +import org.apache.flink.test.util.JavaProgramTestBase; + +import java.util.Arrays; +import java.util.List; + +/** + * Test program that applies RemoveDuplicates to a list of seven strings containing repeats and + * checks the written output against the four distinct values in {@code EXPECTED_RESULT}. + */ +public class RemoveDuplicatesITCase extends JavaProgramTestBase { + + protected String resultPath; + + public RemoveDuplicatesITCase(){ + } + + static final String[] EXPECTED_RESULT = new String[] { + "k1", "k5", "k2", "k3"}; +/** Stores the path returned by {@code getTempDirPath} for the name result; the pipeline writes there. */ + @Override + protected void preSubmit() throws Exception { + resultPath = getTempDirPath("result"); + } +/** Hands the newline-joined {@code EXPECTED_RESULT} and {@code resultPath} to {@code compareResultsByLinesInMemory}. */ + @Override + protected void postSubmit() throws Exception { + compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath); + } +/** Builds and runs the pipeline: Create from the string list with StringUtf8Coder, RemoveDuplicates, TextIO write. */ + @Override + protected void testProgram() throws Exception { + + List<String> strings = Arrays.asList("k1", "k5", "k5", "k2", "k1", "k2", "k3"); + + Pipeline p = FlinkTestPipeline.createForBatch(); + + PCollection<String> input = + p.apply(Create.of(strings)) + .setCoder(StringUtf8Coder.of()); + + PCollection<String> output = +
input.apply(RemoveDuplicates.<String>create()); + + output.apply(TextIO.Write.to(resultPath)); + p.run(); + } +} + http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/test/java/org/apache/beam/runners/flink/SideInputITCase.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/SideInputITCase.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/SideInputITCase.java new file mode 100644 index 0000000..5cd7d78 --- /dev/null +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/SideInputITCase.java @@ -0,0 +1,67 @@ +/* + * Copyright 2015 Data Artisans GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.beam.runners.flink; + +import com.google.cloud.dataflow.sdk.Pipeline; +import com.google.cloud.dataflow.sdk.io.TextIO; +import com.google.cloud.dataflow.sdk.transforms.Create; +import com.google.cloud.dataflow.sdk.transforms.DoFn; +import com.google.cloud.dataflow.sdk.transforms.ParDo; +import com.google.cloud.dataflow.sdk.transforms.View; +import com.google.cloud.dataflow.sdk.values.PCollectionView; +import org.apache.flink.test.util.JavaProgramTestBase; + +import java.io.Serializable; +/** + * Test program in which a ParDo ignores its main input element and instead emits the value of a + * singleton side input built from {@code expected}; the written output is checked against + * {@code expected}. + */ +public class SideInputITCase extends JavaProgramTestBase implements Serializable { + + private static final String expected = "Hello!"; + + protected String resultPath; +/** Builds and runs the pipeline: singleton view of {@code expected}, ParDo over one dummy element reading that view, TextIO write. */ + @Override + protected void testProgram() throws Exception { + + + Pipeline p = FlinkTestPipeline.createForBatch(); + + + final PCollectionView<String> sidesInput = p + .apply(Create.of(expected)) + .apply(View.<String>asSingleton()); + + p.apply(Create.of("bli")) + .apply(ParDo.of(new DoFn<String, String>() { + @Override + public void processElement(ProcessContext c) throws Exception { + String s = c.sideInput(sidesInput); + c.output(s); + } + }).withSideInputs(sidesInput)).apply(TextIO.Write.to(resultPath)); + + p.run(); + } +/** Stores the path returned by {@code getTempDirPath} for the name result; the pipeline writes there. */ + @Override + protected void preSubmit() throws Exception { + resultPath = getTempDirPath("result"); + } +/** Hands {@code expected} and {@code resultPath} to {@code compareResultsByLinesInMemory}. */ + @Override + protected void postSubmit() throws Exception { + compareResultsByLinesInMemory(expected, resultPath); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/test/java/org/apache/beam/runners/flink/TfIdfITCase.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/TfIdfITCase.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/TfIdfITCase.java new file mode 100644 index 0000000..ceb0a3f --- /dev/null +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/TfIdfITCase.java @@ -0,0 +1,76 @@ +/* + *
Copyright 2015 Data Artisans GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink; + +import org.apache.beam.runners.flink.examples.TFIDF; +import com.google.cloud.dataflow.sdk.Pipeline; +import com.google.cloud.dataflow.sdk.coders.StringDelegateCoder; +import com.google.cloud.dataflow.sdk.io.TextIO; +import com.google.cloud.dataflow.sdk.transforms.Create; +import com.google.cloud.dataflow.sdk.transforms.Keys; +import com.google.cloud.dataflow.sdk.transforms.RemoveDuplicates; +import com.google.cloud.dataflow.sdk.values.KV; +import com.google.cloud.dataflow.sdk.values.PCollection; +import com.google.common.base.Joiner; +import org.apache.flink.test.util.JavaProgramTestBase; + +import java.net.URI; + +/** + * Test program that runs {@code TFIDF.ComputeTfIdf} over three small URI/text pairs and checks + * only the distinct keys of its result (not the computed scores) against {@code EXPECTED_RESULT}. + */ +public class TfIdfITCase extends JavaProgramTestBase { + + protected String resultPath; + + public TfIdfITCase(){ + } + + static final String[] EXPECTED_RESULT = new String[] { + "a", "m", "n", "b", "c", "d"}; +/** Stores the path returned by {@code getTempDirPath} for the name result; the pipeline writes there. */ + @Override + protected void preSubmit() throws Exception { + resultPath = getTempDirPath("result"); + } +/** Hands the newline-joined {@code EXPECTED_RESULT} and {@code resultPath} to {@code compareResultsByLinesInMemory}. */ + @Override + protected void postSubmit() throws Exception { + compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath); + } +/** + * Builds and runs the pipeline: registers a StringDelegateCoder for URI, creates the input pairs, + * applies ComputeTfIdf, keeps the keys, removes duplicates and writes them with TextIO. + */ + @Override + protected void testProgram() throws Exception { + + Pipeline pipeline = FlinkTestPipeline.createForBatch(); + + pipeline.getCoderRegistry().registerCoder(URI.class, StringDelegateCoder.of(URI.class)); + + PCollection<KV<String, KV<URI,
Double>>> wordToUriAndTfIdf = pipeline + .apply(Create.of( + KV.of(new URI("x"), "a b c d"), + KV.of(new URI("y"), "a b c"), + KV.of(new URI("z"), "a m n"))) + .apply(new TFIDF.ComputeTfIdf()); + + PCollection<String> words = wordToUriAndTfIdf + .apply(Keys.<String>create()) + .apply(RemoveDuplicates.<String>create()); + + words.apply(TextIO.Write.to(resultPath)); + + pipeline.run(); + } +} + http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/test/java/org/apache/beam/runners/flink/WordCountITCase.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/WordCountITCase.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/WordCountITCase.java new file mode 100644 index 0000000..c2b6fdd --- /dev/null +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/WordCountITCase.java @@ -0,0 +1,74 @@ +/* + * Copyright 2015 Data Artisans GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.beam.runners.flink; + +import org.apache.beam.runners.flink.examples.WordCount; +import com.google.cloud.dataflow.sdk.Pipeline; +import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder; +import com.google.cloud.dataflow.sdk.io.TextIO; +import com.google.cloud.dataflow.sdk.transforms.Create; +import com.google.cloud.dataflow.sdk.transforms.MapElements; +import com.google.cloud.dataflow.sdk.transforms.ParDo; +import com.google.cloud.dataflow.sdk.values.PCollection; +import com.google.common.base.Joiner; +import org.apache.flink.test.util.JavaProgramTestBase; + +import java.util.Arrays; +import java.util.List; + +/** + * Test program that runs {@code WordCount.CountWords} and {@code WordCount.FormatAsTextFn} over + * the lines in {@code WORDS} and checks the written output against {@code COUNTS_ARRAY}. + */ +public class WordCountITCase extends JavaProgramTestBase { + + protected String resultPath; + + public WordCountITCase(){ + } + + static final String[] WORDS_ARRAY = new String[] { + "hi there", "hi", "hi sue bob", + "hi sue", "", "bob hi"}; + + static final List<String> WORDS = Arrays.asList(WORDS_ARRAY); + + static final String[] COUNTS_ARRAY = new String[] { + "hi: 5", "there: 1", "sue: 2", "bob: 2"}; +/** Stores the path returned by {@code getTempDirPath} for the name result; the pipeline writes there. */ + @Override + protected void preSubmit() throws Exception { + resultPath = getTempDirPath("result"); + } +/** Hands the newline-joined {@code COUNTS_ARRAY} and {@code resultPath} to {@code compareResultsByLinesInMemory}. */ + @Override + protected void postSubmit() throws Exception { + compareResultsByLinesInMemory(Joiner.on('\n').join(COUNTS_ARRAY), resultPath); + } +/** Builds and runs the pipeline: Create from {@code WORDS} with StringUtf8Coder, CountWords, FormatAsTextFn, TextIO write. */ + @Override + protected void testProgram() throws Exception { + + Pipeline p = FlinkTestPipeline.createForBatch(); + + PCollection<String> input = p.apply(Create.of(WORDS)).setCoder(StringUtf8Coder.of()); + + input + .apply(new WordCount.CountWords()) + .apply(MapElements.via(new WordCount.FormatAsTextFn())) + .apply(TextIO.Write.to(resultPath)); + + p.run(); + } +} + http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/test/java/org/apache/beam/runners/flink/WordCountJoin2ITCase.java ---------------------------------------------------------------------- diff --git
a/runners/flink/src/test/java/org/apache/beam/runners/flink/WordCountJoin2ITCase.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/WordCountJoin2ITCase.java new file mode 100644 index 0000000..d78434b --- /dev/null +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/WordCountJoin2ITCase.java @@ -0,0 +1,136 @@ +/* + * Copyright 2015 Data Artisans GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink; + +import com.google.cloud.dataflow.sdk.Pipeline; +import com.google.cloud.dataflow.sdk.io.TextIO; +import com.google.cloud.dataflow.sdk.transforms.Count; +import com.google.cloud.dataflow.sdk.transforms.Create; +import com.google.cloud.dataflow.sdk.transforms.DoFn; +import com.google.cloud.dataflow.sdk.transforms.ParDo; +import com.google.cloud.dataflow.sdk.transforms.join.CoGbkResult; +import com.google.cloud.dataflow.sdk.transforms.join.CoGroupByKey; +import com.google.cloud.dataflow.sdk.transforms.join.KeyedPCollectionTuple; +import com.google.cloud.dataflow.sdk.values.KV; +import com.google.cloud.dataflow.sdk.values.PCollection; +import com.google.cloud.dataflow.sdk.values.TupleTag; +import com.google.common.base.Joiner; +import org.apache.flink.test.util.JavaProgramTestBase; + +/** + * Test program that counts the words of two inputs separately, co-groups the two count + * collections by word under {@code tag1} and {@code tag2}, formats one line per word and checks + * the written lines against {@code RESULTS}. + */ +public class WordCountJoin2ITCase extends JavaProgramTestBase { + + static final String[] WORDS_1 = new String[] { + "hi there", "hi", "hi sue bob", + "hi sue", "", "bob hi"}; + + static final String[] WORDS_2 =
new String[] { + "hi tim", "beauty", "hooray sue bob", + "hi there", "", "please say hi"}; + + static final String[] RESULTS = new String[] { + "beauty -> Tag1: Tag2: 1", + "bob -> Tag1: 2 Tag2: 1", + "hi -> Tag1: 5 Tag2: 3", + "hooray -> Tag1: Tag2: 1", + "please -> Tag1: Tag2: 1", + "say -> Tag1: Tag2: 1", + "sue -> Tag1: 2 Tag2: 1", + "there -> Tag1: 1 Tag2: 1", + "tim -> Tag1: Tag2: 1" + }; + + static final TupleTag<Long> tag1 = new TupleTag<>("Tag1"); + static final TupleTag<Long> tag2 = new TupleTag<>("Tag2"); + + protected String resultPath; +/** Stores the path returned by {@code getTempDirPath} for the name result; the pipeline writes there. */ + @Override + protected void preSubmit() throws Exception { + resultPath = getTempDirPath("result"); + } +/** Hands the newline-joined {@code RESULTS} and {@code resultPath} to {@code compareResultsByLinesInMemory}. */ + @Override + protected void postSubmit() throws Exception { + compareResultsByLinesInMemory(Joiner.on('\n').join(RESULTS), resultPath); + } +/** Builds and runs the pipeline: two word-count branches, CoGroupByKey over both, FormatCountsFn, TextIO write. */ + @Override + protected void testProgram() throws Exception { + Pipeline p = FlinkTestPipeline.createForBatch(); + + /* Create two PCollections and join them */ + PCollection<KV<String,Long>> occurences1 = p.apply(Create.of(WORDS_1)) + .apply(ParDo.of(new ExtractWordsFn())) + .apply(Count.<String>perElement()); + + PCollection<KV<String,Long>> occurences2 = p.apply(Create.of(WORDS_2)) + .apply(ParDo.of(new ExtractWordsFn())) + .apply(Count.<String>perElement()); + + /* CoGroup the two collections */ + PCollection<KV<String, CoGbkResult>> mergedOccurences = KeyedPCollectionTuple + .of(tag1, occurences1) + .and(tag2, occurences2) + .apply(CoGroupByKey.<String>create()); + + /* Format output */ + mergedOccurences.apply(ParDo.of(new FormatCountsFn())) + .apply(TextIO.Write.named("test").to(resultPath)); + + p.run(); + } + +/** Splits each input line on runs of characters other than ASCII letters and apostrophes and emits every non-empty piece. */ + static class ExtractWordsFn extends DoFn<String, String> { + + @Override + public void startBundle(Context c) { + } + + @Override + public void processElement(ProcessContext c) { + // Split the line into words. + String[] words = c.element().split("[^a-zA-Z']+"); + + // Output each word encountered into the output PCollection.
+ for (String word : words) { + if (!word.isEmpty()) { + c.output(word); + } + } + } + } +/** + * Formats one co-grouped entry as the key, an arrow, then the tag1 id with its counts and the + * tag2 id with its counts. Each tag1 count is followed by a space, tag2 counts are appended + * without a separator; a tag with no counts contributes only its id, a colon and a space. + */ + static class FormatCountsFn extends DoFn<KV<String, CoGbkResult>, String> { + @Override + public void processElement(ProcessContext c) { + CoGbkResult value = c.element().getValue(); + String key = c.element().getKey(); + String countTag1 = tag1.getId() + ": "; + String countTag2 = tag2.getId() + ": "; + for (Long count : value.getAll(tag1)) { + countTag1 += count + " "; + } + for (Long count : value.getAll(tag2)) { + countTag2 += count; + } + c.output(key + " -> " + countTag1 + countTag2); + } + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/test/java/org/apache/beam/runners/flink/WordCountJoin3ITCase.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/WordCountJoin3ITCase.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/WordCountJoin3ITCase.java new file mode 100644 index 0000000..0836279 --- /dev/null +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/WordCountJoin3ITCase.java @@ -0,0 +1,154 @@ +/* + * Copyright 2015 Data Artisans GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */
package org.apache.beam.runners.flink;

import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.io.TextIO;
import com.google.cloud.dataflow.sdk.transforms.Count;
import com.google.cloud.dataflow.sdk.transforms.Create;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.transforms.join.CoGbkResult;
import com.google.cloud.dataflow.sdk.transforms.join.CoGroupByKey;
import com.google.cloud.dataflow.sdk.transforms.join.KeyedPCollectionTuple;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.cloud.dataflow.sdk.values.TupleTag;
import com.google.common.base.Joiner;
import org.apache.flink.test.util.JavaProgramTestBase;

import java.util.regex.Pattern;


/**
 * Batch integration test that counts the words of three in-memory inputs,
 * co-groups the three per-word counts by word and writes one formatted line
 * per word to {@link #resultPath}. The written lines are compared with
 * {@link #RESULTS} in {@link #postSubmit()}.
 */
public class WordCountJoin3ITCase extends JavaProgramTestBase {

  static final String[] WORDS_1 = new String[] {
      "hi there", "hi", "hi sue bob",
      "hi sue", "", "bob hi"};

  static final String[] WORDS_2 = new String[] {
      "hi tim", "beauty", "hooray sue bob",
      "hi there", "", "please say hi"};

  static final String[] WORDS_3 = new String[] {
      "hi stephan", "beauty", "hooray big fabian",
      "hi yo", "", "please say hi"};

  /** Expected output lines, in the format produced by {@link FormatCountsFn}. */
  static final String[] RESULTS = new String[] {
      "beauty -> Tag1: Tag2: 1 Tag3: 1",
      "bob -> Tag1: 2 Tag2: 1 Tag3: ",
      "hi -> Tag1: 5 Tag2: 3 Tag3: 3",
      "hooray -> Tag1: Tag2: 1 Tag3: 1",
      "please -> Tag1: Tag2: 1 Tag3: 1",
      "say -> Tag1: Tag2: 1 Tag3: 1",
      "sue -> Tag1: 2 Tag2: 1 Tag3: ",
      "there -> Tag1: 1 Tag2: 1 Tag3: ",
      "tim -> Tag1: Tag2: 1 Tag3: ",
      "stephan -> Tag1: Tag2: Tag3: 1",
      "yo -> Tag1: Tag2: Tag3: 1",
      "fabian -> Tag1: Tag2: Tag3: 1",
      "big -> Tag1: Tag2: Tag3: 1"
  };

  static final TupleTag<Long> tag1 = new TupleTag<>("Tag1");
  static final TupleTag<Long> tag2 = new TupleTag<>("Tag2");
  static final TupleTag<Long> tag3 = new TupleTag<>("Tag3");

  protected String resultPath;

  @Override
  protected void preSubmit() throws Exception {
    resultPath = getTempDirPath("result");
  }

  @Override
  protected void postSubmit() throws Exception {
    compareResultsByLinesInMemory(Joiner.on('\n').join(RESULTS), resultPath);
  }

  @Override
  protected void testProgram() throws Exception {

    Pipeline p = FlinkTestPipeline.createForBatch();

    /* Create three PCollections of per-word counts */
    PCollection<KV<String, Long>> occurrences1 = countWords(p, WORDS_1);
    PCollection<KV<String, Long>> occurrences2 = countWords(p, WORDS_2);
    PCollection<KV<String, Long>> occurrences3 = countWords(p, WORDS_3);

    /* CoGroup the three collections by word */
    PCollection<KV<String, CoGbkResult>> mergedOccurrences = KeyedPCollectionTuple
        .of(tag1, occurrences1)
        .and(tag2, occurrences2)
        .and(tag3, occurrences3)
        .apply(CoGroupByKey.<String>create());

    /* Format output */
    mergedOccurrences.apply(ParDo.of(new FormatCountsFn()))
        .apply(TextIO.Write.named("test").to(resultPath));

    p.run();
  }

  /** Builds the "split lines into words, then count each word" branch for one input. */
  private static PCollection<KV<String, Long>> countWords(Pipeline p, String[] lines) {
    return p.apply(Create.of(lines))
        .apply(ParDo.of(new ExtractWordsFn()))
        .apply(Count.<String>perElement());
  }


  /** Splits each input line into words and emits every non-empty word. */
  static class ExtractWordsFn extends DoFn<String, String> {

    /** Any run of characters other than ASCII letters and apostrophes separates words. */
    private static final Pattern WORD_SEPARATOR = Pattern.compile("[^a-zA-Z']+");

    @Override
    public void startBundle(Context c) {
    }

    @Override
    public void processElement(ProcessContext c) {
      // Split the line into words.
      String[] words = WORD_SEPARATOR.split(c.element());

      // Output each word encountered into the output PCollection.
      for (String word : words) {
        if (!word.isEmpty()) {
          c.output(word);
        }
      }
    }
  }

  /**
   * Renders the co-grouped counts of one word as a single text line of the form
   * {@code <word> -> <tag1 id>: <counts> <tag2 id>: <counts> <tag3 id>: <counts>}.
   */
  static class FormatCountsFn extends DoFn<KV<String, CoGbkResult>, String> {
    @Override
    public void processElement(ProcessContext c) {
      KV<String, CoGbkResult> entry = c.element();
      CoGbkResult grouped = entry.getValue();

      StringBuilder line = new StringBuilder();
      line.append(entry.getKey()).append(" -> ");

      // Counts of the first two tags are each followed by a separating blank ...
      line.append(tag1.getId()).append(": ");
      for (Long count : grouped.getAll(tag1)) {
        line.append(count).append(" ");
      }
      line.append(tag2.getId()).append(": ");
      for (Long count : grouped.getAll(tag2)) {
        line.append(count).append(" ");
      }

      // ... while counts of the last tag are appended without one.
      line.append(tag3.getId()).append(": ");
      for (Long count : grouped.getAll(tag3)) {
        line.append(count);
      }

      c.output(line.toString());
    }
  }

}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java new file mode 100644 index 0000000..497a5bb --- /dev/null +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java @@ -0,0 +1,156 @@ +/* + * Copyright 2015 Data Artisans GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */

package org.apache.beam.runners.flink;

import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
import com.google.cloud.dataflow.sdk.io.Sink;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.transforms.Create;
import com.google.cloud.dataflow.sdk.transforms.Write;
import com.google.common.base.Joiner;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.test.util.JavaProgramTestBase;

import java.io.File;
import java.io.PrintWriter;
import java.net.URI;

import static org.junit.Assert.assertNotNull;

/**
 * Tests the translation of custom Write.Bound sinks.
 *
 * <p>The pipeline writes {@link #EXPECTED_RESULT} through {@link MyCustomSink} into
 * {@link #resultPath}; {@link #postSubmit()} compares the written lines with the same array.
 */
public class WriteSinkITCase extends JavaProgramTestBase {

  protected String resultPath;

  public WriteSinkITCase(){
  }

  static final String[] EXPECTED_RESULT = new String[] {
      "Joe red 3", "Mary blue 4", "Max yellow 23"};

  @Override
  protected void preSubmit() throws Exception {
    resultPath = getTempDirPath("result");
  }

  @Override
  protected void postSubmit() throws Exception {
    compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath);
  }

  @Override
  protected void testProgram() throws Exception {
    runProgram(resultPath);
  }

  /** Builds and runs the batch pipeline that writes {@link #EXPECTED_RESULT} via the custom sink. */
  private static void runProgram(String resultPath) {
    Pipeline p = FlinkTestPipeline.createForBatch();

    p.apply(Create.of(EXPECTED_RESULT)).setCoder(StringUtf8Coder.of())
        .apply("CustomSink", Write.to(new MyCustomSink(resultPath)));

    p.run();
  }

  /**
   * Simple custom sink which writes to a file.
   */
  private static class MyCustomSink extends Sink<String> {

    /** Directory below which every writer creates its own file. */
    private final String resultPath;

    public MyCustomSink(String resultPath) {
      this.resultPath = resultPath;
    }

    @Override
    public void validate(PipelineOptions options) {
      assertNotNull(options);
    }

    @Override
    public WriteOperation<String, ?> createWriteOperation(PipelineOptions options) {
      return new MyWriteOperation();
    }

    /** Write operation without any initialization or finalization work. */
    private class MyWriteOperation extends WriteOperation<String, String> {

      @Override
      public Coder<String> getWriterResultCoder() {
        return StringUtf8Coder.of();
      }

      @Override
      public void initialize(PipelineOptions options) throws Exception {

      }

      @Override
      public void finalize(Iterable<String> writerResults, PipelineOptions options) throws Exception {

      }

      @Override
      public Writer<String, String> createWriter(PipelineOptions options) throws Exception {
        return new MyWriter();
      }

      @Override
      public Sink<String> getSink() {
        return MyCustomSink.this;
      }

      /**
       * Simple Writer which writes to a file.
       */
      private class MyWriter extends Writer<String, String> {

        private PrintWriter internalWriter;

        /**
         * Creates the file {@code <resultPath>/<uId>} and opens a {@link PrintWriter} on it.
         *
         * @param uId identifier handed in by the caller; used as the file name
         */
        @Override
        public void open(String uId) throws Exception {
          Path path = new Path(resultPath + "/" + uId);
          // The stream returned by create(...) is only needed for the side effect of
          // creating the file (second argument presumably is Flink's "overwrite" flag --
          // confirm against the FileSystem API). Close it right away: previously it was
          // dropped while still open, leaking a handle on the file written below.
          FileSystem.get(new URI("file:///")).create(path, false).close();
          internalWriter = new PrintWriter(new File(path.toUri()));
        }

        @Override
        public void write(String value) throws Exception {
          internalWriter.println(value);
        }

        /**
         * Closes the underlying writer.
         *
         * @return the sink's result path, used as this writer's result
         */
        @Override
        public String close() throws Exception {
          internalWriter.close();
          return resultPath;
        }

        @Override
        public WriteOperation<String, String> getWriteOperation() {
          return MyWriteOperation.this;
        }
      }
    }
  }

}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupAlsoByWindowTest.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupAlsoByWindowTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupAlsoByWindowTest.java new file mode 100644 index 0000000..27ddc83 --- /dev/null +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupAlsoByWindowTest.java @@ -0,0 +1,506 @@ +/* + * Copyright 2015 Data Artisans GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package org.apache.beam.runners.flink.streaming;

import org.apache.beam.runners.flink.FlinkTestPipeline;
import org.apache.beam.runners.flink.translation.wrappers.streaming.FlinkGroupAlsoByWindowWrapper;
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.coders.KvCoder;
import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
import com.google.cloud.dataflow.sdk.coders.VarIntCoder;
import com.google.cloud.dataflow.sdk.transforms.Combine;
import com.google.cloud.dataflow.sdk.transforms.Sum;
import com.google.cloud.dataflow.sdk.transforms.windowing.*;
import com.google.cloud.dataflow.sdk.util.UserCodeException;
import com.google.cloud.dataflow.sdk.util.WindowedValue;
import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.common.base.Throwables;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Test;

import java.util.Collection;
import java.util.Comparator;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
 * Drives {@link FlinkGroupAlsoByWindowWrapper} through a
 * {@link OneInputStreamOperatorTestHarness} with hand-built elements and watermarks and
 * compares the emitted records and watermarks with the expected ones for several
 * windowing strategies and triggers.
 */
public class GroupAlsoByWindowTest {

  private final Combine.CombineFn combiner = new Sum.SumIntegerFn();

  private final WindowingStrategy slidingWindowWithAfterWatermarkTriggerStrategy =
      WindowingStrategy.of(SlidingWindows.of(Duration.standardSeconds(10)).every(Duration.standardSeconds(5)))
          .withTrigger(AfterWatermark.pastEndOfWindow())
          .withMode(WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES);

  private final WindowingStrategy sessionWindowingStrategy =
      WindowingStrategy.of(Sessions.withGapDuration(Duration.standardSeconds(2)))
          .withTrigger(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
          .withMode(WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES)
          .withAllowedLateness(Duration.standardSeconds(100));

  private final WindowingStrategy fixedWindowingStrategy =
      WindowingStrategy.of(FixedWindows.of(Duration.standardSeconds(10)));

  private final WindowingStrategy fixedWindowWithCountTriggerStrategy =
      fixedWindowingStrategy.withTrigger(AfterPane.elementCountAtLeast(5));

  private final WindowingStrategy fixedWindowWithAfterWatermarkTriggerStrategy =
      fixedWindowingStrategy.withTrigger(AfterWatermark.pastEndOfWindow());

  private final WindowingStrategy fixedWindowWithCompoundTriggerStrategy =
      fixedWindowingStrategy.withTrigger(
          AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(5))
              .withLateFirings(AfterPane.elementCountAtLeast(5)).buildTrigger());

  /**
   * The default accumulation mode is
   * {@link com.google.cloud.dataflow.sdk.util.WindowingStrategy.AccumulationMode#DISCARDING_FIRED_PANES}.
   * This strategy changes it to
   * {@link com.google.cloud.dataflow.sdk.util.WindowingStrategy.AccumulationMode#ACCUMULATING_FIRED_PANES}
   */
  private final WindowingStrategy fixedWindowWithCompoundTriggerStrategyAcc =
      fixedWindowWithCompoundTriggerStrategy
          .withMode(WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES);

  @Test
  public void testWithLateness() throws Exception {
    WindowingStrategy strategy = WindowingStrategy.of(FixedWindows.of(Duration.standardSeconds(2)))
        .withMode(WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES)
        .withAllowedLateness(Duration.millis(1000));
    long initialTime = 0L;
    OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, WindowedValue<KV<String, Integer>>> testHarness =
        openHarness(strategy);

    testHarness.processElement(input(strategy, "key1", initialTime, 1));
    testHarness.processElement(input(strategy, "key1", initialTime, 1));
    testHarness.processElement(input(strategy, "key1", initialTime, 1000));
    testHarness.processElement(input(strategy, "key1", initialTime, 1200));
    testHarness.processWatermark(new Watermark(initialTime + 2000));
    testHarness.processElement(input(strategy, "key1", initialTime, 1200));
    testHarness.processElement(input(strategy, "key1", initialTime, 1200));
    testHarness.processWatermark(new Watermark(initialTime + 4000));

    ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
    expectedOutput.add(expectedInWindow("key1", 4, initialTime + 1, 0, 2000,
        PaneInfo.createPane(true, false, PaneInfo.Timing.ON_TIME, 0, 0)));
    expectedOutput.add(new Watermark(initialTime + 2000));

    expectedOutput.add(expectedInWindow("key1", 5, initialTime + 1999, 0, 2000,
        PaneInfo.createPane(false, false, PaneInfo.Timing.LATE, 1, 1)));

    expectedOutput.add(expectedInWindow("key1", 6, initialTime + 1999, 0, 2000,
        PaneInfo.createPane(false, false, PaneInfo.Timing.LATE, 2, 2)));
    expectedOutput.add(new Watermark(initialTime + 4000));

    TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
    testHarness.close();
  }

  @Test
  public void testSessionWindows() throws Exception {
    WindowingStrategy strategy = sessionWindowingStrategy;

    long initialTime = 0L;
    OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, WindowedValue<KV<String, Integer>>> testHarness =
        openHarness(strategy);

    testHarness.processElement(input(strategy, "key1", initialTime, 1));
    testHarness.processElement(input(strategy, "key1", initialTime, 1));
    testHarness.processElement(input(strategy, "key1", initialTime, 1000));
    testHarness.processElement(input(strategy, "key1", initialTime, 3500));
    testHarness.processElement(input(strategy, "key1", initialTime, 3700));
    testHarness.processElement(input(strategy, "key1", initialTime, 2700));
    testHarness.processWatermark(new Watermark(initialTime + 6000));

    testHarness.processElement(input(strategy, "key1", initialTime, 6700));
    testHarness.processElement(input(strategy, "key1", initialTime, 6800));
    testHarness.processElement(input(strategy, "key1", initialTime, 8900));
    testHarness.processElement(input(strategy, "key1", initialTime, 7600));
    testHarness.processElement(input(strategy, "key1", initialTime, 5600));

    testHarness.processWatermark(new Watermark(initialTime + 12000));

    ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
    expectedOutput.add(expectedInWindow("key1", 6, initialTime + 1, 1, 5700,
        PaneInfo.createPane(true, false, PaneInfo.Timing.ON_TIME, 0, 0)));
    expectedOutput.add(new Watermark(initialTime + 6000));

    expectedOutput.add(expectedInWindow("key1", 11, initialTime + 6700, 1, 10900,
        PaneInfo.createPane(true, false, PaneInfo.Timing.ON_TIME, 0, 0)));
    expectedOutput.add(new Watermark(initialTime + 12000));

    TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
    testHarness.close();
  }

  @Test
  public void testSlidingWindows() throws Exception {
    WindowingStrategy strategy = slidingWindowWithAfterWatermarkTriggerStrategy;
    long initialTime = 0L;
    OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, WindowedValue<KV<String, Integer>>> testHarness =
        createTestingOperatorAndState(strategy, initialTime);
    ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
    testHarness.processWatermark(new Watermark(initialTime + 25000));

    expectedOutput.add(expectedInWindow("key1", 6, initialTime + 5000, 0, 10000,
        PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)));
    expectedOutput.add(expectedInWindow("key1", 6, initialTime + 1, -5000, 5000,
        PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)));
    expectedOutput.add(new Watermark(initialTime + 10000));

    expectedOutput.add(expectedInWindow("key1", 11, initialTime + 15000, 10000, 20000,
        PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)));
    expectedOutput.add(expectedInWindow("key1", 3, initialTime + 10000, 5000, 15000,
        PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)));
    expectedOutput.add(expectedInWindow("key2", 1, initialTime + 19500, 10000, 20000,
        PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)));
    expectedOutput.add(new Watermark(initialTime + 20000));

    /*
     * The timestamp below is 20000 and not 19500 because of a convention in dataflow where
     * timestamps of windowed values in a window cannot be smaller than the
     * end of a previous window. Checkout the documentation of the
     * {@link WindowFn#getOutputTime(Instant, BoundedWindow)}
     */
    expectedOutput.add(expectedInWindow("key2", 1, initialTime + 20000, 15000, 25000,
        PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)));
    expectedOutput.add(expectedInWindow("key1", 8, initialTime + 20000, 15000, 25000,
        PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)));
    expectedOutput.add(new Watermark(initialTime + 25000));

    TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
    testHarness.close();
  }

  @Test
  public void testAfterWatermarkProgram() throws Exception {
    WindowingStrategy strategy = fixedWindowWithAfterWatermarkTriggerStrategy;
    long initialTime = 0L;
    OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, WindowedValue<KV<String, Integer>>> testHarness =
        createTestingOperatorAndState(strategy, initialTime);
    ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();

    expectedOutput.add(expected(strategy, "key1", 6, initialTime + 1,
        PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)));
    expectedOutput.add(new Watermark(initialTime + 10000));

    expectedOutput.add(expected(strategy, "key1", 11, initialTime + 10000,
        PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)));
    expectedOutput.add(expected(strategy, "key2", 1, initialTime + 19500,
        PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)));
    expectedOutput.add(new Watermark(initialTime + 20000));

    TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
    testHarness.close();
  }

  @Test
  public void testAfterCountProgram() throws Exception {
    WindowingStrategy strategy = fixedWindowWithCountTriggerStrategy;

    long initialTime = 0L;
    OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, WindowedValue<KV<String, Integer>>> testHarness =
        createTestingOperatorAndState(strategy, initialTime);
    ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();

    expectedOutput.add(expected(strategy, "key1", 5, initialTime + 1,
        PaneInfo.createPane(true, true, PaneInfo.Timing.EARLY)));
    expectedOutput.add(expected(strategy, "key1", 5, initialTime + 10000,
        PaneInfo.createPane(true, true, PaneInfo.Timing.EARLY)));
    expectedOutput.add(new Watermark(initialTime + 10000));

    expectedOutput.add(expected(strategy, "key2", 1, initialTime + 19500,
        PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME, 0, 0)));
    expectedOutput.add(new Watermark(initialTime + 20000));
    TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());

    testHarness.close();
  }

  @Test
  public void testCompoundProgram() throws Exception {
    WindowingStrategy strategy = fixedWindowWithCompoundTriggerStrategy;

    long initialTime = 0L;
    OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, WindowedValue<KV<String, Integer>>> testHarness =
        createTestingOperatorAndState(strategy, initialTime);
    ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();

    /*
     * PaneInfo are:
     *     isFirst (pane in window),
     *     isLast, Timing (of triggering),
     *     index (of pane in the window),
     *     onTimeIndex (if it the 1st,2nd, ... pane that was fired on time)
     */

    expectedOutput.add(expected(strategy, "key1", 5, initialTime + 1,
        PaneInfo.createPane(true, false, PaneInfo.Timing.EARLY)));
    expectedOutput.add(expected(strategy, "key1", 5, initialTime + 10000,
        PaneInfo.createPane(true, false, PaneInfo.Timing.EARLY)));
    expectedOutput.add(expected(strategy, "key1", 5, initialTime + 19500,
        PaneInfo.createPane(false, false, PaneInfo.Timing.EARLY, 1, -1)));

    expectedOutput.add(expected(strategy, "key1", 1, initialTime + 1200,
        PaneInfo.createPane(false, true, PaneInfo.Timing.ON_TIME, 1, 0)));

    expectedOutput.add(new Watermark(initialTime + 10000));

    expectedOutput.add(expected(strategy, "key1", 1, initialTime + 19500,
        PaneInfo.createPane(false, true, PaneInfo.Timing.ON_TIME, 2, 0)));
    expectedOutput.add(expected(strategy, "key2", 1, initialTime + 19500,
        PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)));

    expectedOutput.add(new Watermark(initialTime + 20000));
    TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());

    testHarness.close();
  }

  @Test
  public void testCompoundAccumulatingPanesProgram() throws Exception {
    WindowingStrategy strategy = fixedWindowWithCompoundTriggerStrategyAcc;
    long initialTime = 0L;
    OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, WindowedValue<KV<String, Integer>>> testHarness =
        createTestingOperatorAndState(strategy, initialTime);
    ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();

    expectedOutput.add(expected(strategy, "key1", 5, initialTime + 1,
        PaneInfo.createPane(true, false, PaneInfo.Timing.EARLY)));
    expectedOutput.add(expected(strategy, "key1", 5, initialTime + 10000,
        PaneInfo.createPane(true, false, PaneInfo.Timing.EARLY)));
    expectedOutput.add(expected(strategy, "key1", 10, initialTime + 19500,
        PaneInfo.createPane(false, false, PaneInfo.Timing.EARLY, 1, -1)));

    expectedOutput.add(expected(strategy, "key1", 6, initialTime + 1200,
        PaneInfo.createPane(false, true, PaneInfo.Timing.ON_TIME, 1, 0)));

    expectedOutput.add(new Watermark(initialTime + 10000));

    expectedOutput.add(expected(strategy, "key1", 11, initialTime + 19500,
        PaneInfo.createPane(false, true, PaneInfo.Timing.ON_TIME, 2, 0)));
    expectedOutput.add(expected(strategy, "key2", 1, initialTime + 19500,
        PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)));

    expectedOutput.add(new Watermark(initialTime + 20000));
    TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());

    testHarness.close();
  }

  /**
   * Opens a harness for {@code strategy}, feeds it the shared input (seventeen "key1"
   * elements and one "key2" element, each with value 1) and advances the watermark to
   * {@code initialTime + 10000} and then {@code initialTime + 20000}.
   *
   * @param strategy windowing strategy under test
   * @param initialTime offset added to every event time and watermark
   * @return the opened harness, ready for output inspection or further input
   */
  private OneInputStreamOperatorTestHarness createTestingOperatorAndState(WindowingStrategy strategy, long initialTime) throws Exception {
    OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, WindowedValue<KV<String, Integer>>> testHarness =
        openHarness(strategy);

    // Event-time offsets of the "key1" elements, in processing order.
    final long[] key1EventTimes = {
        1, 1, 1000, 1200, 1200, 1200,
        10000, 12100, 14200, 15300, 16500,
        19500, 19500, 19500, 19500, 19500, 19500};
    for (long eventTime : key1EventTimes) {
      testHarness.processElement(input(strategy, "key1", initialTime, eventTime));
    }

    testHarness.processElement(input(strategy, "key2", initialTime, 19500));

    testHarness.processWatermark(new Watermark(initialTime + 10000));
    testHarness.processWatermark(new Watermark(initialTime + 20000));

    return testHarness;
  }

  /** Creates the operator under test for {@code strategy}, wraps it in a harness and opens it. */
  private OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, WindowedValue<KV<String, Integer>>> openHarness(
      WindowingStrategy strategy) throws Exception {
    Pipeline pipeline = FlinkTestPipeline.createForStreaming();

    KvCoder<String, Integer> inputCoder = KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of());

    FlinkGroupAlsoByWindowWrapper gbwOperator =
        FlinkGroupAlsoByWindowWrapper.createForTesting(
            pipeline.getOptions(),
            pipeline.getCoderRegistry(),
            strategy,
            inputCoder,
            combiner.<String>asKeyedFn());

    OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, WindowedValue<KV<String, Integer>>> testHarness =
        new OneInputStreamOperatorTestHarness<>(gbwOperator);
    testHarness.open();
    return testHarness;
  }

  /**
   * Builds one input record: {@code key} with value 1 at event time
   * {@code initialTime + eventTime}, windows assigned by the strategy, no pane fired, and
   * a stream-record timestamp of {@code initialTime + 20}.
   */
  private StreamRecord<WindowedValue<KV<String, Integer>>> input(
      WindowingStrategy strategy, String key, long initialTime, long eventTime) {
    return new StreamRecord<>(
        makeWindowedValue(strategy, KV.of(key, 1), new Instant(initialTime + eventTime), null, PaneInfo.NO_FIRING),
        initialTime + 20);
  }

  /**
   * Builds one expected output record whose windows are assigned by {@code strategy} from
   * {@code timestamp}; the stream-record timestamp equals the value timestamp.
   */
  private StreamRecord<WindowedValue<KV<String, Integer>>> expected(
      WindowingStrategy strategy, String key, int value, long timestamp, PaneInfo pane) {
    return new StreamRecord<>(
        makeWindowedValue(strategy, KV.of(key, value), new Instant(timestamp), null, pane),
        timestamp);
  }

  /**
   * Builds one expected output record in the explicit window
   * {@code [windowStart, windowEnd)}; the stream-record timestamp equals the value timestamp.
   */
  private static StreamRecord<WindowedValue<KV<String, Integer>>> expectedInWindow(
      String key, int value, long timestamp, long windowStart, long windowEnd, PaneInfo pane) {
    return new StreamRecord<>(
        WindowedValue.of(KV.of(key, value),
            new Instant(timestamp),
            new IntervalWindow(new Instant(windowStart), new Instant(windowEnd)),
            pane),
        timestamp);
  }

  /**
   * Orders harness output for comparison. Two watermarks are ordered by timestamp; any
   * other pair is treated as two stream records and ordered by value timestamp, then key,
   * then value, then the maximum timestamp of the single window each belongs to.
   */
  private static class ResultSortComparator implements Comparator<Object> {
    @Override
    @SuppressWarnings("unchecked") // every non-watermark pair is cast to stream records, as before
    public int compare(Object o1, Object o2) {
      if (o1 instanceof Watermark && o2 instanceof Watermark) {
        Watermark w1 = (Watermark) o1;
        Watermark w2 = (Watermark) o2;
        // Long.compare instead of casting the difference to int, which can truncate or overflow.
        return Long.compare(w1.getTimestamp(), w2.getTimestamp());
      } else {
        StreamRecord<WindowedValue<KV<String, Integer>>> sr0 = (StreamRecord<WindowedValue<KV<String, Integer>>>) o1;
        StreamRecord<WindowedValue<KV<String, Integer>>> sr1 = (StreamRecord<WindowedValue<KV<String, Integer>>>) o2;

        int comparison = Long.compare(
            sr0.getValue().getTimestamp().getMillis(),
            sr1.getValue().getTimestamp().getMillis());
        if (comparison != 0) {
          return comparison;
        }

        comparison = sr0.getValue().getValue().getKey().compareTo(sr1.getValue().getValue().getKey());
        if (comparison == 0) {
          comparison = Integer.compare(
              sr0.getValue().getValue().getValue(),
              sr1.getValue().getValue().getValue());
        }
        if (comparison == 0) {
          Collection windowsA = sr0.getValue().getWindows();
          Collection windowsB = sr1.getValue().getWindows();

          if (windowsA.size() != 1 || windowsB.size() != 1) {
            throw new IllegalStateException("A value cannot belong to more than one windows after grouping.");
          }

          BoundedWindow windowA = (BoundedWindow) windowsA.iterator().next();
          BoundedWindow windowB = (BoundedWindow) windowsB.iterator().next();
          comparison = Long.compare(windowA.maxTimestamp().getMillis(), windowB.maxTimestamp().getMillis());
        }
        return comparison;
      }
    }
  }

  /**
   * Wraps {@code output} into a {@link WindowedValue}.
   *
   * @param strategy strategy whose window function assigns windows when {@code windows} is null
   * @param output the value to wrap
   * @param timestamp value timestamp; {@code BoundedWindow.TIMESTAMP_MIN_VALUE} is used when null
   * @param windows explicit windows, or null to let the window function assign them
   * @param pane pane info to attach
   */
  private <T> WindowedValue<T> makeWindowedValue(WindowingStrategy strategy,
      T output, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) {
    // Keep the caller's (possibly null) timestamp: the assign context must report it
    // as unavailable rather than return the substituted minimum.
    final Instant inputTimestamp = timestamp;
    final WindowFn windowFn = strategy.getWindowFn();

    if (timestamp == null) {
      timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
    }

    if (windows == null) {
      try {
        windows = windowFn.assignWindows(windowFn.new AssignContext() {
          @Override
          public Object element() {
            throw new UnsupportedOperationException(
                "WindowFn attempted to access input element when none was available");
          }

          @Override
          public Instant timestamp() {
            if (inputTimestamp == null) {
              throw new UnsupportedOperationException(
                  "WindowFn attempted to access input timestamp when none was available");
            }
            return inputTimestamp;
          }

          @Override
          public Collection<? extends BoundedWindow> windows() {
            throw new UnsupportedOperationException(
                "WindowFn attempted to access input windows when none were available");
          }
        });
      } catch (Exception e) {
        throw UserCodeException.wrap(e);
      }
    }

    return WindowedValue.of(output, timestamp, windows, pane);
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java new file mode 100644 index 0000000..80d78b9 --- /dev/null +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java @@ -0,0 +1,121 @@ +/* + * Copyright 2015 Data Artisans GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.streaming; + +import org.apache.beam.runners.flink.FlinkTestPipeline; +import com.google.cloud.dataflow.sdk.Pipeline; +import com.google.cloud.dataflow.sdk.io.TextIO; +import com.google.cloud.dataflow.sdk.transforms.Create; +import com.google.cloud.dataflow.sdk.transforms.DoFn; +import com.google.cloud.dataflow.sdk.transforms.GroupByKey; +import com.google.cloud.dataflow.sdk.transforms.ParDo; +import com.google.cloud.dataflow.sdk.transforms.windowing.AfterWatermark; +import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows; +import com.google.cloud.dataflow.sdk.transforms.windowing.Window; +import com.google.cloud.dataflow.sdk.values.KV; +import com.google.cloud.dataflow.sdk.values.PCollection; +import com.google.common.base.Joiner; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.streaming.util.StreamingProgramTestBase; +import org.joda.time.Duration; +import org.joda.time.Instant; + +import java.io.Serializable; +import java.util.Arrays; + +public class GroupByNullKeyTest extends StreamingProgramTestBase implements Serializable { + + + protected String resultPath; + + static final String[] EXPECTED_RESULT = new String[] { + "k: null v: user1 user1 user1 user2 user2 user2 user2 user3" + }; + + public GroupByNullKeyTest(){ + } + + @Override + protected void preSubmit() throws Exception { + resultPath = getTempDirPath("result"); + } + + @Override + protected void postSubmit() throws Exception { + compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), 
resultPath); + } + + public static class ExtractUserAndTimestamp extends DoFn<KV<Integer, String>, String> { + private static final long serialVersionUID = 0; + + @Override + public void processElement(ProcessContext c) { + KV<Integer, String> record = c.element(); + long now = System.currentTimeMillis(); + int timestamp = record.getKey(); + String userName = record.getValue(); + if (userName != null) { + // Sets the implicit timestamp field to be used in windowing. + c.outputWithTimestamp(userName, new Instant(timestamp + now)); + } + } + } + + @Override + protected void testProgram() throws Exception { + + Pipeline p = FlinkTestPipeline.createForStreaming(); + + PCollection<String> output = + p.apply(Create.of(Arrays.asList( + KV.<Integer, String>of(0, "user1"), + KV.<Integer, String>of(1, "user1"), + KV.<Integer, String>of(2, "user1"), + KV.<Integer, String>of(10, "user2"), + KV.<Integer, String>of(1, "user2"), + KV.<Integer, String>of(15000, "user2"), + KV.<Integer, String>of(12000, "user2"), + KV.<Integer, String>of(25000, "user3")))) + .apply(ParDo.of(new ExtractUserAndTimestamp())) + .apply(Window.<String>into(FixedWindows.of(Duration.standardHours(1))) + .triggering(AfterWatermark.pastEndOfWindow()) + .withAllowedLateness(Duration.ZERO) + .discardingFiredPanes()) + + .apply(ParDo.of(new DoFn<String, KV<Void, String>>() { + @Override + public void processElement(ProcessContext c) throws Exception { + String elem = c.element(); + c.output(KV.<Void, String>of((Void) null, elem)); + } + })) + .apply(GroupByKey.<Void, String>create()) + .apply(ParDo.of(new DoFn<KV<Void, Iterable<String>>, String>() { + @Override + public void processElement(ProcessContext c) throws Exception { + KV<Void, Iterable<String>> elem = c.element(); + StringBuilder str = new StringBuilder(); + str.append("k: " + elem.getKey() + " v:"); + for (String v : elem.getValue()) { + str.append(" " + v); + } + c.output(str.toString()); + } + })); + output.apply(TextIO.Write.to(resultPath)); + 
p.run(); + } +}
