Fixes handling of Void-typed keys in the streaming GroupByKey wrapper
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a4e9b09f Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a4e9b09f Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a4e9b09f Branch: refs/heads/master Commit: a4e9b09fb4690b4e110afa6bc5744b3646980115 Parents: 067837f Author: kl0u <[email protected]> Authored: Mon Feb 29 16:26:12 2016 +0100 Committer: Davor Bonaci <[email protected]> Committed: Fri Mar 4 10:04:23 2016 -0800 ---------------------------------------------------------------------- .../streaming/FlinkGroupByKeyWrapper.java | 8 +- .../flink/dataflow/FlinkTestPipeline.java | 4 +- .../dataflow/streaming/GroupByNullKeyTest.java | 121 +++++++++++++++++++ 3 files changed, 128 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a4e9b09f/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkGroupByKeyWrapper.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkGroupByKeyWrapper.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkGroupByKeyWrapper.java index b0d9e48..24f6d40 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkGroupByKeyWrapper.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkGroupByKeyWrapper.java @@ -16,9 +16,11 @@ package com.dataartisans.flink.dataflow.translation.wrappers.streaming; import com.dataartisans.flink.dataflow.translation.types.CoderTypeInformation; +import com.dataartisans.flink.dataflow.translation.types.VoidCoderTypeSerializer; import 
com.google.cloud.dataflow.sdk.coders.Coder; import com.google.cloud.dataflow.sdk.coders.KvCoder; -import com.google.cloud.dataflow.sdk.util.*; +import com.google.cloud.dataflow.sdk.coders.VoidCoder; +import com.google.cloud.dataflow.sdk.util.WindowedValue; import com.google.cloud.dataflow.sdk.values.KV; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.functions.KeySelector; @@ -42,13 +44,15 @@ public class FlinkGroupByKeyWrapper { public static <K, V> KeyedStream<WindowedValue<KV<K, V>>, K> groupStreamByKey(DataStream<WindowedValue<KV<K, V>>> inputDataStream, KvCoder<K, V> inputKvCoder) { final Coder<K> keyCoder = inputKvCoder.getKeyCoder(); final TypeInformation<K> keyTypeInfo = new CoderTypeInformation<>(keyCoder); + final boolean isKeyVoid = keyCoder instanceof VoidCoder; return inputDataStream.keyBy( new KeySelectorWithQueryableResultType<K, V>() { @Override public K getKey(WindowedValue<KV<K, V>> value) throws Exception { - return value.getValue().getKey(); + return isKeyVoid ? 
(K) VoidCoderTypeSerializer.VoidValue.INSTANCE : + value.getValue().getKey(); } @Override http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a4e9b09f/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlinkTestPipeline.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlinkTestPipeline.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlinkTestPipeline.java index 56af3f1..59c3b69 100644 --- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlinkTestPipeline.java +++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlinkTestPipeline.java @@ -59,9 +59,7 @@ public class FlinkTestPipeline extends Pipeline { */ private static FlinkTestPipeline create(boolean streaming) { FlinkPipelineRunner flinkRunner = FlinkPipelineRunner.createForTest(streaming); - FlinkPipelineOptions pipelineOptions = flinkRunner.getPipelineOptions(); - pipelineOptions.setStreaming(streaming); - return new FlinkTestPipeline(flinkRunner, pipelineOptions); + return new FlinkTestPipeline(flinkRunner, flinkRunner.getPipelineOptions()); } private FlinkTestPipeline(PipelineRunner<? 
extends PipelineResult> runner, http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a4e9b09f/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/GroupByNullKeyTest.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/GroupByNullKeyTest.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/GroupByNullKeyTest.java new file mode 100644 index 0000000..5a412aa --- /dev/null +++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/GroupByNullKeyTest.java @@ -0,0 +1,121 @@ +/* + * Copyright 2015 Data Artisans GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.dataartisans.flink.dataflow.streaming; + +import com.dataartisans.flink.dataflow.FlinkTestPipeline; +import com.google.cloud.dataflow.sdk.Pipeline; +import com.google.cloud.dataflow.sdk.io.TextIO; +import com.google.cloud.dataflow.sdk.transforms.Create; +import com.google.cloud.dataflow.sdk.transforms.DoFn; +import com.google.cloud.dataflow.sdk.transforms.GroupByKey; +import com.google.cloud.dataflow.sdk.transforms.ParDo; +import com.google.cloud.dataflow.sdk.transforms.windowing.AfterWatermark; +import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows; +import com.google.cloud.dataflow.sdk.transforms.windowing.Window; +import com.google.cloud.dataflow.sdk.values.KV; +import com.google.cloud.dataflow.sdk.values.PCollection; +import com.google.common.base.Joiner; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.streaming.util.StreamingProgramTestBase; +import org.joda.time.Duration; +import org.joda.time.Instant; + +import java.io.Serializable; +import java.util.Arrays; + +public class GroupByNullKeyTest extends StreamingProgramTestBase implements Serializable { + + + protected String resultPath; + + static final String[] EXPECTED_RESULT = new String[] { + "k: null v: user1 user1 user1 user2 user2 user2 user2 user3" + }; + + public GroupByNullKeyTest(){ + } + + @Override + protected void preSubmit() throws Exception { + resultPath = getTempDirPath("result"); + } + + @Override + protected void postSubmit() throws Exception { + compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath); + } + + public static class ExtractUserAndTimestamp extends DoFn<KV<Integer, String>, String> { + private static final long serialVersionUID = 0; + + @Override + public void processElement(ProcessContext c) { + KV<Integer, String> record = c.element(); + long now = System.currentTimeMillis(); + int timestamp = record.getKey(); + String userName = record.getValue(); + if (userName != null) { + // Sets the 
implicit timestamp field to be used in windowing. + c.outputWithTimestamp(userName, new Instant(timestamp + now)); + } + } + } + + @Override + protected void testProgram() throws Exception { + + Pipeline p = FlinkTestPipeline.createForStreaming(); + + PCollection<String> output = + p.apply(Create.of(Arrays.asList( + KV.<Integer, String>of(0, "user1"), + KV.<Integer, String>of(1, "user1"), + KV.<Integer, String>of(2, "user1"), + KV.<Integer, String>of(10, "user2"), + KV.<Integer, String>of(1, "user2"), + KV.<Integer, String>of(15000, "user2"), + KV.<Integer, String>of(12000, "user2"), + KV.<Integer, String>of(25000, "user3")))) + .apply(ParDo.of(new ExtractUserAndTimestamp())) + .apply(Window.<String>into(FixedWindows.of(Duration.standardHours(1))) + .triggering(AfterWatermark.pastEndOfWindow()) + .withAllowedLateness(Duration.ZERO) + .discardingFiredPanes()) + + .apply(ParDo.of(new DoFn<String, KV<Void, String>>() { + @Override + public void processElement(ProcessContext c) throws Exception { + String elem = c.element(); + c.output(KV.<Void, String>of((Void) null, elem)); + } + })) + .apply(GroupByKey.<Void, String>create()) + .apply(ParDo.of(new DoFn<KV<Void, Iterable<String>>, String>() { + @Override + public void processElement(ProcessContext c) throws Exception { + KV<Void, Iterable<String>> elem = c.element(); + StringBuilder str = new StringBuilder(); + str.append("k: " + elem.getKey() + " v:"); + for (String v : elem.getValue()) { + str.append(" " + v); + } + c.output(str.toString()); + } + })); + output.apply(TextIO.Write.to(resultPath)); + p.run(); + } +}
