Add Window.Bound translator

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/85d54ab2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/85d54ab2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/85d54ab2

Branch: refs/heads/gearpump-runner
Commit: 85d54ab20f21297da25059ed7b4c8ed02e93bb74
Parents: 46d3563
Author: manuzhang <owenzhang1...@gmail.com>
Authored: Fri Dec 16 16:49:06 2016 +0800
Committer: manuzhang <owenzhang1...@gmail.com>
Committed: Fri Dec 16 16:49:06 2016 +0800

----------------------------------------------------------------------
 .../gearpump/GearpumpPipelineTranslator.java    |  3 +
 .../translators/WindowBoundTranslator.java      | 97 ++++++++++++++++++++
 2 files changed, 100 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/85d54ab2/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java
----------------------------------------------------------------------
diff --git 
a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java
 
b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java
index 84dfeec..20624ed 100644
--- 
a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java
+++ 
b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java
@@ -29,6 +29,7 @@ import 
org.apache.beam.runners.gearpump.translators.ReadBoundedTranslator;
 import org.apache.beam.runners.gearpump.translators.ReadUnboundedTranslator;
 import org.apache.beam.runners.gearpump.translators.TransformTranslator;
 import org.apache.beam.runners.gearpump.translators.TranslationContext;
+import org.apache.beam.runners.gearpump.translators.WindowBoundTranslator;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.runners.TransformHierarchy;
@@ -37,6 +38,7 @@ import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.PValue;
 
 import org.apache.gearpump.util.Graph;
@@ -71,6 +73,7 @@ public class GearpumpPipelineTranslator extends 
Pipeline.PipelineVisitor.Default
     registerTransformTranslator(Flatten.FlattenPCollectionList.class,
         new FlattenPCollectionTranslator());
     registerTransformTranslator(ParDo.BoundMulti.class, new 
ParDoBoundMultiTranslator());
+    registerTransformTranslator(Window.Bound.class, new 
WindowBoundTranslator());
     registerTransformTranslator(Create.Values.class, new 
CreateValuesTranslator());
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/85d54ab2/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java
----------------------------------------------------------------------
diff --git 
a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java
 
b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java
new file mode 100644
index 0000000..11f30fc
--- /dev/null
+++ 
b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.gearpump.translators;
+
+import com.google.common.collect.Iterables;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
+import org.apache.gearpump.streaming.javaapi.dsl.functions.FlatMapFunction;
+import org.joda.time.Instant;
+
+/**
+ * {@link Window.Bound} is translated to Gearpump flatMap function.
+ */
+@SuppressWarnings("unchecked")
+public class WindowBoundTranslator<T> implements  
TransformTranslator<Window.Bound<T>> {
+
+  @Override
+  public void translate(Window.Bound<T> transform, TranslationContext context) 
{
+    PCollection<T> input = context.getInput(transform);
+    JavaStream<WindowedValue<T>> inputStream = context.getInputStream(input);
+    WindowingStrategy<?, ?> outputStrategy =
+        transform.getOutputStrategyInternal(input.getWindowingStrategy());
+    WindowFn<T, BoundedWindow> windowFn =
+        (WindowFn<T, BoundedWindow>) outputStrategy.getWindowFn();
+    JavaStream<WindowedValue<T>> outputStream =
+        inputStream.flatMap(new AssignWindows(windowFn), "assign_windows");
+    context.setOutputStream(context.getOutput(transform), outputStream);
+  }
+
+
+  private static class AssignWindows<T> implements
+      FlatMapFunction<WindowedValue<T>, WindowedValue<T>> {
+
+    private final WindowFn<T, BoundedWindow> fn;
+
+    AssignWindows(WindowFn<T, BoundedWindow> fn) {
+      this.fn = fn;
+    }
+
+    @Override
+    public Iterator<WindowedValue<T>> apply(final WindowedValue<T> value) {
+      List<WindowedValue<T>>  ret = new LinkedList<>();
+      try {
+        Collection<BoundedWindow> windows = fn.assignWindows(fn.new 
AssignContext() {
+          @Override
+          public T element() {
+            return value.getValue();
+          }
+
+          @Override
+          public Instant timestamp() {
+            return value.getTimestamp();
+          }
+
+          @Override
+          public BoundedWindow window() {
+            return Iterables.getOnlyElement(value.getWindows());
+          }
+        });
+        for (BoundedWindow window: windows) {
+          ret.add(WindowedValue.of(
+              value.getValue(), value.getTimestamp(), window, 
value.getPane()));
+        }
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+      return ret.iterator();
+    }
+  }
+}

Reply via email to