Fixes Void handling

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a4e9b09f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a4e9b09f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a4e9b09f

Branch: refs/heads/master
Commit: a4e9b09fb4690b4e110afa6bc5744b3646980115
Parents: 067837f
Author: kl0u <[email protected]>
Authored: Mon Feb 29 16:26:12 2016 +0100
Committer: Davor Bonaci <[email protected]>
Committed: Fri Mar 4 10:04:23 2016 -0800

----------------------------------------------------------------------
 .../streaming/FlinkGroupByKeyWrapper.java       |   8 +-
 .../flink/dataflow/FlinkTestPipeline.java       |   4 +-
 .../dataflow/streaming/GroupByNullKeyTest.java  | 121 +++++++++++++++++++
 3 files changed, 128 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a4e9b09f/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkGroupByKeyWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkGroupByKeyWrapper.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkGroupByKeyWrapper.java
index b0d9e48..24f6d40 100644
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkGroupByKeyWrapper.java
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkGroupByKeyWrapper.java
@@ -16,9 +16,11 @@
 package com.dataartisans.flink.dataflow.translation.wrappers.streaming;
 
 import com.dataartisans.flink.dataflow.translation.types.CoderTypeInformation;
+import 
com.dataartisans.flink.dataflow.translation.types.VoidCoderTypeSerializer;
 import com.google.cloud.dataflow.sdk.coders.Coder;
 import com.google.cloud.dataflow.sdk.coders.KvCoder;
-import com.google.cloud.dataflow.sdk.util.*;
+import com.google.cloud.dataflow.sdk.coders.VoidCoder;
+import com.google.cloud.dataflow.sdk.util.WindowedValue;
 import com.google.cloud.dataflow.sdk.values.KV;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
@@ -42,13 +44,15 @@ public class FlinkGroupByKeyWrapper {
        public static <K, V> KeyedStream<WindowedValue<KV<K, V>>, K> 
groupStreamByKey(DataStream<WindowedValue<KV<K, V>>> inputDataStream, 
KvCoder<K, V> inputKvCoder) {
                final Coder<K> keyCoder = inputKvCoder.getKeyCoder();
                final TypeInformation<K> keyTypeInfo = new 
CoderTypeInformation<>(keyCoder);
+               final boolean isKeyVoid = keyCoder instanceof VoidCoder;
 
                return inputDataStream.keyBy(
                                new KeySelectorWithQueryableResultType<K, V>() {
 
                                        @Override
                                        public K getKey(WindowedValue<KV<K, V>> 
value) throws Exception {
-                                               return 
value.getValue().getKey();
+                                               return isKeyVoid ? (K) 
VoidCoderTypeSerializer.VoidValue.INSTANCE :
+                                                               
value.getValue().getKey();
                                        }
 
                                        @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a4e9b09f/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlinkTestPipeline.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlinkTestPipeline.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlinkTestPipeline.java
index 56af3f1..59c3b69 100644
--- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlinkTestPipeline.java
+++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlinkTestPipeline.java
@@ -59,9 +59,7 @@ public class FlinkTestPipeline extends Pipeline {
         */
        private static FlinkTestPipeline create(boolean streaming) {
                FlinkPipelineRunner flinkRunner = 
FlinkPipelineRunner.createForTest(streaming);
-               FlinkPipelineOptions pipelineOptions = 
flinkRunner.getPipelineOptions();
-               pipelineOptions.setStreaming(streaming);
-               return new FlinkTestPipeline(flinkRunner, pipelineOptions);
+               return new FlinkTestPipeline(flinkRunner, 
flinkRunner.getPipelineOptions());
        }
 
        private FlinkTestPipeline(PipelineRunner<? extends PipelineResult> 
runner,

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a4e9b09f/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/GroupByNullKeyTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/GroupByNullKeyTest.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/GroupByNullKeyTest.java
new file mode 100644
index 0000000..5a412aa
--- /dev/null
+++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/GroupByNullKeyTest.java
@@ -0,0 +1,121 @@
+/*
+ * Copyright 2015 Data Artisans GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.dataartisans.flink.dataflow.streaming;
+
+import com.dataartisans.flink.dataflow.FlinkTestPipeline;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.io.TextIO;
+import com.google.cloud.dataflow.sdk.transforms.Create;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.transforms.windowing.AfterWatermark;
+import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.common.base.Joiner;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+import java.io.Serializable;
+import java.util.Arrays;
+
+public class GroupByNullKeyTest extends StreamingProgramTestBase implements 
Serializable {
+
+
+       protected String resultPath;
+
+       static final String[] EXPECTED_RESULT = new String[] {
+                       "k: null v: user1 user1 user1 user2 user2 user2 user2 
user3"
+       };
+
+       public GroupByNullKeyTest(){
+       }
+
+       @Override
+       protected void preSubmit() throws Exception {
+               resultPath = getTempDirPath("result");
+       }
+
+       @Override
+       protected void postSubmit() throws Exception {
+               
compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), 
resultPath);
+       }
+
+       public static class ExtractUserAndTimestamp extends DoFn<KV<Integer, 
String>, String> {
+               private static final long serialVersionUID = 0;
+
+               @Override
+               public void processElement(ProcessContext c) {
+                       KV<Integer, String> record = c.element();
+                       long now = System.currentTimeMillis();
+                       int timestamp = record.getKey();
+                       String userName = record.getValue();
+                       if (userName != null) {
+                               // Sets the implicit timestamp field to be used in windowing.
+                               c.outputWithTimestamp(userName, new 
Instant(timestamp + now));
+                       }
+               }
+       }
+
+       @Override
+       protected void testProgram() throws Exception {
+
+               Pipeline p = FlinkTestPipeline.createForStreaming();
+
+               PCollection<String> output =
+                       p.apply(Create.of(Arrays.asList(
+                                       KV.<Integer, String>of(0, "user1"),
+                                       KV.<Integer, String>of(1, "user1"),
+                                       KV.<Integer, String>of(2, "user1"),
+                                       KV.<Integer, String>of(10, "user2"),
+                                       KV.<Integer, String>of(1, "user2"),
+                                       KV.<Integer, String>of(15000, "user2"),
+                                       KV.<Integer, String>of(12000, "user2"),
+                                       KV.<Integer, String>of(25000, 
"user3"))))
+                                       .apply(ParDo.of(new 
ExtractUserAndTimestamp()))
+                                       
.apply(Window.<String>into(FixedWindows.of(Duration.standardHours(1)))
+                                                       
.triggering(AfterWatermark.pastEndOfWindow())
+                                                       
.withAllowedLateness(Duration.ZERO)
+                                                       .discardingFiredPanes())
+
+                                       .apply(ParDo.of(new DoFn<String, 
KV<Void, String>>() {
+                                               @Override
+                                               public void 
processElement(ProcessContext c) throws Exception {
+                                                       String elem = 
c.element();
+                                                       c.output(KV.<Void, 
String>of((Void) null, elem));
+                                               }
+                                       }))
+                                       .apply(GroupByKey.<Void, 
String>create())
+                                       .apply(ParDo.of(new DoFn<KV<Void, 
Iterable<String>>, String>() {
+                                               @Override
+                                               public void 
processElement(ProcessContext c) throws Exception {
+                                                       KV<Void, 
Iterable<String>> elem = c.element();
+                                                       StringBuilder str = new 
StringBuilder();
+                                                       str.append("k: " + 
elem.getKey() + " v:");
+                                                       for (String v : 
elem.getValue()) {
+                                                               str.append(" " 
+ v);
+                                                       }
+                                                       
c.output(str.toString());
+                                               }
+                                       }));
+               output.apply(TextIO.Write.to(resultPath));
+               p.run();
+       }
+}

Reply via email to