Repository: beam
Updated Branches:
  refs/heads/master c9abd15e5 -> 89236e3b5


[BEAM-79] Support merging windows in GearpumpRunner


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7af64720
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7af64720
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7af64720

Branch: refs/heads/master
Commit: 7af6472082cbc7f3853e87831ed4bdc72978a3a3
Parents: 4001aeb
Author: manuzhang <owenzhang1...@gmail.com>
Authored: Tue Feb 7 22:14:18 2017 +0800
Committer: manuzhang <owenzhang1...@gmail.com>
Committed: Wed Feb 15 14:59:42 2017 +0800

----------------------------------------------------------------------
 runners/gearpump/pom.xml                        |   5 -
 .../gearpump/GearpumpPipelineResult.java        |   8 +-
 .../beam/runners/gearpump/GearpumpRunner.java   |  24 +---
 .../translators/GroupByKeyTranslator.java       | 133 +++++++++++--------
 .../translators/WindowBoundTranslator.java      |  53 +-------
 .../gearpump/translators/io/GearpumpSource.java |   6 +-
 .../translators/utils/DoFnRunnerFactory.java    |   1 +
 .../translators/utils/TranslatorUtils.java      |  20 +++
 .../translators/utils/TranslatorUtilsTest.java  |  75 +++++++++++
 9 files changed, 186 insertions(+), 139 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/7af64720/runners/gearpump/pom.xml
----------------------------------------------------------------------
diff --git a/runners/gearpump/pom.xml b/runners/gearpump/pom.xml
index 7c6fa76..6f91c50 100644
--- a/runners/gearpump/pom.xml
+++ b/runners/gearpump/pom.xml
@@ -93,11 +93,6 @@
                       org.apache.beam.sdk.transforms.ViewTest,
                       org.apache.beam.sdk.transforms.join.CoGroupByKeyTest
                     </exclude>
-                    <!-- merging windows is not supported in Gearpump -->
-                    <exclude>
-                      org.apache.beam.sdk.transforms.windowing.WindowingTest,
-                      org.apache.beam.sdk.util.ReshuffleTest
-                    </exclude>
                   </excludes>
                   <systemPropertyVariables>
                     <beamTestPipelineOptions>

http://git-wip-us.apache.org/repos/asf/beam/blob/7af64720/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java
----------------------------------------------------------------------
diff --git 
a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java
 
b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java
index 9e53517..a3740b7 100644
--- 
a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java
+++ 
b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java
@@ -27,7 +27,7 @@ import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.metrics.MetricResults;
 import org.apache.beam.sdk.transforms.Aggregator;
 
-import org.apache.gearpump.cluster.MasterToAppMaster;
+import org.apache.gearpump.cluster.ApplicationStatus;
 import org.apache.gearpump.cluster.MasterToAppMaster.AppMasterData;
 import org.apache.gearpump.cluster.client.ClientContext;
 import org.joda.time.Duration;
@@ -105,7 +105,7 @@ public class GearpumpPipelineResult implements 
PipelineResult {
   }
 
   private State getGearpumpState() {
-    String status = null;
+    ApplicationStatus status = null;
     List<AppMasterData> apps =
         JavaConverters.<AppMasterData>seqAsJavaListConverter(
             (Seq<AppMasterData>) client.listApps().appMasters()).asJava();
@@ -114,9 +114,9 @@ public class GearpumpPipelineResult implements 
PipelineResult {
         status = app.status();
       }
     }
-    if (null == status || 
status.equals(MasterToAppMaster.AppMasterNonExist())) {
+    if (null == status || status instanceof ApplicationStatus.NONEXIST$) {
       return State.UNKNOWN;
-    } else if (status.equals(MasterToAppMaster.AppMasterActive())) {
+    } else if (status instanceof ApplicationStatus.ACTIVE$) {
       return State.RUNNING;
     } else {
       return State.STOPPED;

http://git-wip-us.apache.org/repos/asf/beam/blob/7af64720/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java
 
b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java
index 01fdb3b..9ca1eb2 100644
--- 
a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java
+++ 
b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java
@@ -29,13 +29,8 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsValidator;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.util.IdentityWindowFn;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionList;
@@ -74,11 +69,7 @@ public class GearpumpRunner extends 
PipelineRunner<GearpumpPipelineResult> {
 
   public <OutputT extends POutput, InputT extends PInput> OutputT apply(
       PTransform<InputT, OutputT> transform, InputT input) {
-    if (Window.Bound.class.equals(transform.getClass())
-        && isNullOrIdentityWindowFn(((Window.Bound) transform).getWindowFn())) 
{
-      return (OutputT) super.apply(
-              ParDo.of(new IdentityFn()), input);
-    } else if 
(Flatten.FlattenPCollectionList.class.equals(transform.getClass())
+    if (Flatten.FlattenPCollectionList.class.equals(transform.getClass())
             && ((PCollectionList<?>) input).size() == 0) {
       return (OutputT) Pipeline.applyTransform(input.getPipeline().begin(), 
Create.of());
     } else if (Create.Values.class.equals(transform.getClass())) {
@@ -108,7 +99,7 @@ public class GearpumpRunner extends 
PipelineRunner<GearpumpPipelineResult> {
     TranslationContext translationContext = new TranslationContext(streamApp, 
options);
     GearpumpPipelineTranslator translator = new 
GearpumpPipelineTranslator(translationContext);
     translator.translate(pipeline);
-    int appId = streamApp.submit();
+    int appId = streamApp.submit().appId();
 
     return new GearpumpPipelineResult(clientContext, appId);
   }
@@ -140,15 +131,4 @@ public class GearpumpRunner extends 
PipelineRunner<GearpumpPipelineResult> {
     return config.withValue(GEARPUMP_SERIALIZERS, 
ConfigValueFactory.fromMap(serializers));
   }
 
-  private static class IdentityFn<T> extends DoFn<T, T> {
-
-    @ProcessElement
-    public void process(ProcessContext c) {
-      c.output(c.element());
-    }
-  }
-
-  private boolean isNullOrIdentityWindowFn(WindowFn windowFn) {
-    return windowFn == null || 
windowFn.getClass().equals(IdentityWindowFn.class);
-  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7af64720/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
----------------------------------------------------------------------
diff --git 
a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
 
b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
index ac8e218..69a1d11 100644
--- 
a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
+++ 
b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
@@ -23,9 +23,8 @@ import com.google.common.collect.Lists;
 
 import java.io.Serializable;
 import java.nio.ByteBuffer;
-import java.time.Instant;
+import java.util.ArrayList;
 import java.util.Collection;
-import java.util.LinkedList;
 import java.util.List;
 
 import org.apache.beam.runners.gearpump.translators.utils.TranslatorUtils;
@@ -34,9 +33,8 @@ import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
@@ -48,15 +46,14 @@ import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
 import org.apache.gearpump.streaming.dsl.javaapi.functions.GroupByFunction;
 import org.apache.gearpump.streaming.dsl.window.api.Discarding$;
 import org.apache.gearpump.streaming.dsl.window.api.EventTimeTrigger$;
-import org.apache.gearpump.streaming.dsl.window.api.Window;
-import org.apache.gearpump.streaming.dsl.window.api.WindowFn;
-import org.apache.gearpump.streaming.dsl.window.impl.Bucket;
-import scala.collection.JavaConversions;
-
+import org.apache.gearpump.streaming.dsl.window.api.WindowFunction;
+import org.apache.gearpump.streaming.dsl.window.api.Windows;
+import org.apache.gearpump.streaming.dsl.window.impl.Window;
 
 /**
  * {@link GroupByKey} is translated to Gearpump groupBy function.
  */
+@SuppressWarnings({"rawtypes", "unchecked"})
 public class GroupByKeyTranslator<K, V> implements 
TransformTranslator<GroupByKey<K, V>> {
   @Override
   public void translate(GroupByKey<K, V> transform, TranslationContext 
context) {
@@ -67,69 +64,51 @@ public class GroupByKeyTranslator<K, V> implements 
TransformTranslator<GroupByKe
     int parallelism = context.getPipelineOptions().getParallelism();
     OutputTimeFn<? super BoundedWindow> outputTimeFn = (OutputTimeFn<? super 
BoundedWindow>)
         input.getWindowingStrategy().getOutputTimeFn();
+    WindowFn<KV<K, V>, BoundedWindow> windowFn = (WindowFn<KV<K, V>, 
BoundedWindow>)
+        input.getWindowingStrategy().getWindowFn();
     JavaStream<WindowedValue<KV<K, Iterable<V>>>> outputStream = inputStream
-        .window(Window.apply(new 
GearpumpWindowFn(input.getWindowingStrategy().getWindowFn()),
+        .window(Windows.apply(
+            new GearpumpWindowFn(windowFn.isNonMerging()),
             EventTimeTrigger$.MODULE$, Discarding$.MODULE$), "assign_window")
         .groupBy(new GroupByFn<K, V>(inputKeyCoder), parallelism, 
"group_by_Key_and_Window")
         .map(new ValueToIterable<K, V>(), "map_value_to_iterable")
-        .map(new KeyedByTimestamp<K, V>(), "keyed_by_timestamp")
-        .reduce(new Merge<K, V>(outputTimeFn), "merge")
+        .map(new KeyedByTimestamp<K, V>((OutputTimeFn<? super BoundedWindow>)
+            input.getWindowingStrategy().getOutputTimeFn()), 
"keyed_by_timestamp")
+        .reduce(new Merge<>(windowFn, outputTimeFn), "merge")
         .map(new Values<K, V>(), "values");
 
     context.setOutputStream(context.getOutput(transform), outputStream);
   }
 
-  private static class GearpumpWindowFn<T, W extends BoundedWindow> implements 
WindowFn,
-      Serializable {
+  private static class GearpumpWindowFn<T, W extends BoundedWindow>
+      implements WindowFunction<WindowedValue<T>>, Serializable {
 
-    private org.apache.beam.sdk.transforms.windowing.WindowFn<T, W> windowFn;
+    private final boolean isNonMerging;
 
-    GearpumpWindowFn(org.apache.beam.sdk.transforms.windowing.WindowFn<T, W> 
windowFn) {
-      this.windowFn = windowFn;
+    public GearpumpWindowFn(boolean isNonMerging) {
+      this.isNonMerging = isNonMerging;
     }
 
     @Override
-    public scala.collection.immutable.List<Bucket> apply(final Instant 
timestamp) {
+    public Window[] apply(Context<WindowedValue<T>> context) {
       try {
-        Collection<W> windows = windowFn.assignWindows(windowFn.new 
AssignContext() {
-          @Override
-          public T element() {
-            throw new UnsupportedOperationException();
-          }
-
-          @Override
-          public org.joda.time.Instant timestamp() {
-            return TranslatorUtils.java8TimeToJodaTime(timestamp);
-          }
-
-          @Override
-          public W window() {
-            throw new UnsupportedOperationException();
-          }
-        });
-
-        List<Bucket> buckets = new LinkedList<>();
-        for (BoundedWindow window : windows) {
-          buckets.add(getBucket(window));
-        }
-        return JavaConversions.asScalaBuffer(buckets).toList();
+        return toGearpumpWindows(context.element().getWindows().toArray(new 
BoundedWindow[0]));
       } catch (Exception e) {
         throw new RuntimeException(e);
       }
     }
 
-    private Bucket getBucket(BoundedWindow window) {
-      if (window instanceof IntervalWindow) {
-        IntervalWindow intervalWindow = (IntervalWindow) window;
-        Instant start = 
TranslatorUtils.jodaTimeToJava8Time(intervalWindow.start());
-        Instant end = 
TranslatorUtils.jodaTimeToJava8Time(intervalWindow.end());
-        return new Bucket(start, end);
-      } else if (window instanceof GlobalWindow) {
-        Instant end = 
TranslatorUtils.jodaTimeToJava8Time(window.maxTimestamp());
-        return new Bucket(Instant.MIN, end);
-      } else {
-        throw new RuntimeException("unknown window " + 
window.getClass().getName());
+    @Override
+    public boolean isNonMerging() {
+      return isNonMerging;
+    }
+
+    private Window[] toGearpumpWindows(BoundedWindow[] windows) {
+      Window[] gwins = new Window[windows.length];
+      for (int i = 0; i < windows.length; i++) {
+        gwins[i] = TranslatorUtils.boundedWindowToGearpumpWindow(windows[i]);
       }
+      return gwins;
     }
   }
 
@@ -166,19 +145,30 @@ public class GroupByKeyTranslator<K, V> implements 
TransformTranslator<GroupByKe
       extends MapFunction<WindowedValue<KV<K, Iterable<V>>>,
       KV<org.joda.time.Instant, WindowedValue<KV<K, Iterable<V>>>>> {
 
+    private final OutputTimeFn<? super BoundedWindow> outputTimeFn;
+
+    public KeyedByTimestamp(OutputTimeFn<? super BoundedWindow> outputTimeFn) {
+      this.outputTimeFn = outputTimeFn;
+    }
+
     @Override
     public KV<org.joda.time.Instant, WindowedValue<KV<K, Iterable<V>>>> apply(
         WindowedValue<KV<K, Iterable<V>>> wv) {
-      return KV.of(wv.getTimestamp(), wv);
+      org.joda.time.Instant timestamp = 
outputTimeFn.assignOutputTime(wv.getTimestamp(),
+          Iterables.getOnlyElement(wv.getWindows()));
+      return KV.of(timestamp, wv);
     }
   }
 
   private static class Merge<K, V> extends
       ReduceFunction<KV<org.joda.time.Instant, WindowedValue<KV<K, 
Iterable<V>>>>> {
 
+    private final WindowFn<KV<K, V>, BoundedWindow> windowFn;
     private final OutputTimeFn<? super BoundedWindow> outputTimeFn;
 
-    Merge(OutputTimeFn<? super BoundedWindow> outputTimeFn) {
+    Merge(WindowFn<KV<K, V>, BoundedWindow> windowFn,
+        OutputTimeFn<? super BoundedWindow> outputTimeFn) {
+      this.windowFn = windowFn;
       this.outputTimeFn = outputTimeFn;
     }
 
@@ -189,13 +179,40 @@ public class GroupByKeyTranslator<K, V> implements 
TransformTranslator<GroupByKe
       org.joda.time.Instant t1 = kv1.getKey();
       org.joda.time.Instant t2 = kv2.getKey();
 
-      WindowedValue<KV<K, Iterable<V>>> wv1 = kv1.getValue();
-      WindowedValue<KV<K, Iterable<V>>> wv2 = kv2.getValue();
+      final WindowedValue<KV<K, Iterable<V>>> wv1 = kv1.getValue();
+      final WindowedValue<KV<K, Iterable<V>>> wv2 = kv2.getValue();
+
+      final List<BoundedWindow> mergedWindows = new ArrayList<>();
+      if (!windowFn.isNonMerging()) {
+        try {
+          windowFn.mergeWindows(windowFn.new MergeContext() {
+
+            @Override
+            public Collection<BoundedWindow> windows() {
+              ArrayList<BoundedWindow> windows = new ArrayList<>();
+              windows.addAll(wv1.getWindows());
+              windows.addAll(wv2.getWindows());
+              return windows;
+            }
+
+            @Override
+            public void merge(Collection<BoundedWindow> toBeMerged,
+                BoundedWindow mergeResult) throws Exception {
+              mergedWindows.add(mergeResult);
+            }
+          });
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+      } else {
+        mergedWindows.addAll(wv1.getWindows());
+      }
 
-      return KV.of(outputTimeFn.combine(t1, t2),
+      org.joda.time.Instant timestamp = outputTimeFn.combine(t1, t2);
+      return KV.of(timestamp,
           WindowedValue.of(KV.of(wv1.getValue().getKey(),
-              Iterables.concat(wv1.getValue().getValue(), 
wv2.getValue().getValue())),
-              wv1.getTimestamp(), wv1.getWindows(), wv1.getPane()));
+              Iterables.concat(wv1.getValue().getValue(), 
wv2.getValue().getValue())), timestamp,
+              mergedWindows, wv1.getPane()));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7af64720/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java
----------------------------------------------------------------------
diff --git 
a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java
 
b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java
index 9bf1936..c0de2df 100644
--- 
a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java
+++ 
b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java
@@ -21,23 +21,15 @@ package org.apache.beam.runners.gearpump.translators;
 import com.google.common.collect.Iterables;
 
 import java.util.Collection;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
 
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.gearpump.Message;
-import org.apache.gearpump.cluster.UserConfig;
+import org.apache.gearpump.streaming.dsl.api.functions.MapFunction;
 import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
-import org.apache.gearpump.streaming.dsl.javaapi.functions.FlatMapFunction;
-import org.apache.gearpump.streaming.javaapi.Task;
-import org.apache.gearpump.streaming.task.TaskContext;
 import org.joda.time.Instant;
 
 /**
@@ -52,34 +44,25 @@ public class WindowBoundTranslator<T> implements  
TransformTranslator<Window.Bou
     JavaStream<WindowedValue<T>> inputStream = context.getInputStream(input);
     WindowingStrategy<?, ?> outputStrategy =
         transform.getOutputStrategyInternal(input.getWindowingStrategy());
-    WindowFn<T, BoundedWindow> windowFn =
-        (WindowFn<T, BoundedWindow>) outputStrategy.getWindowFn();
-    OutputTimeFn<? super BoundedWindow> outputTimeFn = (OutputTimeFn<? super 
BoundedWindow>)
-        outputStrategy.getOutputTimeFn();
+    WindowFn<T, BoundedWindow> windowFn = (WindowFn<T, BoundedWindow>) 
outputStrategy.getWindowFn();
     JavaStream<WindowedValue<T>> outputStream =
         inputStream
-            .flatMap(new AssignWindows(windowFn, outputTimeFn), 
"assign_windows")
-            .process(AssignTimestampTask.class, 1, UserConfig.empty(), 
"assign_timestamp");
+            .map(new AssignWindows(windowFn), "assign_windows");
 
     context.setOutputStream(context.getOutput(transform), outputStream);
   }
 
   private static class AssignWindows<T> extends
-      FlatMapFunction<WindowedValue<T>, WindowedValue<T>> {
+      MapFunction<WindowedValue<T>, WindowedValue<T>> {
 
     private final WindowFn<T, BoundedWindow> windowFn;
-    private final OutputTimeFn<? super BoundedWindow> outputTimeFn;
 
-    AssignWindows(
-        WindowFn<T, BoundedWindow> windowFn,
-        OutputTimeFn<? super BoundedWindow> outputTimeFn) {
+    AssignWindows(WindowFn<T, BoundedWindow> windowFn) {
       this.windowFn = windowFn;
-      this.outputTimeFn = outputTimeFn;
     }
 
     @Override
-    public Iterator<WindowedValue<T>> apply(final WindowedValue<T> value) {
-      List<WindowedValue<T>>  ret = new LinkedList<>();
+    public WindowedValue<T> apply(final WindowedValue<T> value) {
       try {
         Collection<BoundedWindow> windows = 
windowFn.assignWindows(windowFn.new AssignContext() {
           @Override
@@ -97,32 +80,10 @@ public class WindowBoundTranslator<T> implements  
TransformTranslator<Window.Bou
             return Iterables.getOnlyElement(value.getWindows());
           }
         });
-        for (BoundedWindow window: windows) {
-          Instant timestamp = 
outputTimeFn.assignOutputTime(value.getTimestamp(), window);
-          ret.add(WindowedValue.of(
-              value.getValue(), timestamp, window, value.getPane()));
-        }
+        return WindowedValue.of(value.getValue(), value.getTimestamp(), 
windows, value.getPane());
       } catch (Exception e) {
         throw new RuntimeException(e);
       }
-      return ret.iterator();
-    }
-  }
-
-  /**
-   * Assign WindowedValue timestamp to Gearpump message.
-   * @param <T> element type of WindowedValue
-   */
-  public static class AssignTimestampTask<T> extends Task {
-
-    public AssignTimestampTask(TaskContext taskContext, UserConfig userConfig) 
{
-      super(taskContext, userConfig);
-    }
-
-    @Override
-    public void onNext(Message message) {
-      final WindowedValue<T> value = (WindowedValue<T>) message.msg();
-      context.output(Message.apply(value, value.getTimestamp().getMillis()));
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7af64720/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
----------------------------------------------------------------------
diff --git 
a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
 
b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
index 6e5b2de..3d0d7c8 100644
--- 
a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
+++ 
b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
@@ -62,9 +62,8 @@ public abstract class GearpumpSource<T> implements DataSource 
{
       this.reader = createReader(options);
       this.available = reader.start();
     } catch (Exception e) {
-      throw new RuntimeException(e);
-    } finally {
       close();
+      throw new RuntimeException(e);
     }
   }
 
@@ -81,9 +80,8 @@ public abstract class GearpumpSource<T> implements DataSource 
{
             timestamp.getMillis());
       }
     } catch (Exception e) {
-      throw new RuntimeException(e);
-    } finally {
       close();
+      throw new RuntimeException(e);
     }
     return message;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/7af64720/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java
----------------------------------------------------------------------
diff --git 
a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java
 
b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java
index 7e1402f..aaefb88 100644
--- 
a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java
+++ 
b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java
@@ -38,6 +38,7 @@ import org.apache.beam.sdk.values.TupleTag;
  */
 public class DoFnRunnerFactory<InputT, OutputT> implements Serializable {
 
+  private static final long serialVersionUID = 1083167395296383469L;
   private final DoFn<InputT, OutputT> fn;
   private final transient PipelineOptions options;
   private final SideInputReader sideInputReader;

http://git-wip-us.apache.org/repos/asf/beam/blob/7af64720/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java
----------------------------------------------------------------------
diff --git 
a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java
 
b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java
index 9b72275..656fc6a 100644
--- 
a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java
+++ 
b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java
@@ -20,6 +20,12 @@ package org.apache.beam.runners.gearpump.translators.utils;
 
 import java.time.Instant;
 
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.gearpump.streaming.dsl.window.impl.Window;
+
+
 /**
  * Utility methods for translators.
  */
@@ -32,4 +38,18 @@ public class TranslatorUtils {
   public static org.joda.time.Instant java8TimeToJodaTime(Instant time) {
     return new org.joda.time.Instant(time.toEpochMilli());
   }
+
+  public static Window boundedWindowToGearpumpWindow(BoundedWindow window) {
+    Instant end = 
TranslatorUtils.jodaTimeToJava8Time(window.maxTimestamp().plus(1L));
+    if (window instanceof IntervalWindow) {
+      IntervalWindow intervalWindow = (IntervalWindow) window;
+      Instant start = 
TranslatorUtils.jodaTimeToJava8Time(intervalWindow.start());
+      return new Window(start, end);
+    } else if (window instanceof GlobalWindow) {
+      return new 
Window(TranslatorUtils.jodaTimeToJava8Time(BoundedWindow.TIMESTAMP_MIN_VALUE),
+          end);
+    } else {
+      throw new RuntimeException("unknown window " + 
window.getClass().getName());
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7af64720/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtilsTest.java
----------------------------------------------------------------------
diff --git 
a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtilsTest.java
 
b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtilsTest.java
new file mode 100644
index 0000000..10976e8
--- /dev/null
+++ 
b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtilsTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.gearpump.translators.utils;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+import com.google.common.collect.Lists;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.List;
+
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.values.KV;
+import org.apache.gearpump.streaming.dsl.window.impl.Window;
+import org.junit.Test;
+
+
+/**
+ * Tests for {@link TranslatorUtils}.
+ */
+public class TranslatorUtilsTest {
+
+  private static final List<KV<org.joda.time.Instant, Instant>> TEST_VALUES = 
Lists.newArrayList(
+      KV.of(new org.joda.time.Instant(0), Instant.EPOCH),
+      KV.of(new org.joda.time.Instant(42), Instant.ofEpochMilli(42)),
+      KV.of(new org.joda.time.Instant(Long.MIN_VALUE), 
Instant.ofEpochMilli(Long.MIN_VALUE)),
+      KV.of(new org.joda.time.Instant(Long.MAX_VALUE), 
Instant.ofEpochMilli(Long.MAX_VALUE)));
+
+  @Test
+  public void testJodaTimeAndJava8TimeConversion() {
+    for (KV<org.joda.time.Instant, Instant> kv: TEST_VALUES) {
+      assertThat(TranslatorUtils.jodaTimeToJava8Time(kv.getKey()),
+          equalTo(kv.getValue()));
+      assertThat(TranslatorUtils.java8TimeToJodaTime(kv.getValue()),
+          equalTo(kv.getKey()));
+    }
+  }
+
+  @Test
+  public void testBoundedWindowToGearpumpWindow() {
+    assertThat(TranslatorUtils.boundedWindowToGearpumpWindow(
+        new IntervalWindow(new org.joda.time.Instant(0),
+            new org.joda.time.Instant(Long.MAX_VALUE))),
+        equalTo(Window.apply(Instant.EPOCH, 
Instant.ofEpochMilli(Long.MAX_VALUE))));
+    assertThat(TranslatorUtils.boundedWindowToGearpumpWindow(
+        new IntervalWindow(new org.joda.time.Instant(Long.MIN_VALUE),
+            new org.joda.time.Instant(Long.MAX_VALUE))),
+        equalTo(Window.apply(Instant.ofEpochMilli(Long.MIN_VALUE),
+            Instant.ofEpochMilli(Long.MAX_VALUE))));
+    BoundedWindow globalWindow = GlobalWindow.INSTANCE;
+    assertThat(TranslatorUtils.boundedWindowToGearpumpWindow(globalWindow),
+        equalTo(Window.apply(Instant.ofEpochMilli(Long.MIN_VALUE / 1000),
+            Instant.ofEpochMilli(Long.MAX_VALUE / 
1000).minus(Duration.ofDays(1)).plusMillis(1))));
+  }
+}

Reply via email to