[BEAM-79] fix gearpump runner build failure

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2afc0cd9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2afc0cd9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2afc0cd9

Branch: refs/heads/gearpump-runner
Commit: 2afc0cd99e33bc724345a2e5b0498820d05b460c
Parents: 86414c0
Author: manuzhang <owenzhang1...@gmail.com>
Authored: Tue Dec 6 11:28:24 2016 +0800
Committer: manuzhang <owenzhang1...@gmail.com>
Committed: Tue Dec 6 13:04:01 2016 +0800

----------------------------------------------------------------------
 runners/gearpump/pom.xml                        |  9 ++-
 .../gearpump/GearpumpPipelineTranslator.java    | 12 ++--
 .../translators/TranslationContext.java         |  4 +-
 .../gearpump/translators/io/ValuesSource.java   | 71 ++++++++++----------
 4 files changed, 47 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2afc0cd9/runners/gearpump/pom.xml
----------------------------------------------------------------------
diff --git a/runners/gearpump/pom.xml b/runners/gearpump/pom.xml
index abd135f..04bd724 100644
--- a/runners/gearpump/pom.xml
+++ b/runners/gearpump/pom.xml
@@ -170,11 +170,6 @@
       <artifactId>beam-runners-core-java</artifactId>
     </dependency>
     <dependency>
-      <groupId>com.google.code.findbugs</groupId>
-      <artifactId>annotations</artifactId>
-      <version>3.0.1</version>
-    </dependency>
-    <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-api</artifactId>
     </dependency>
@@ -199,6 +194,10 @@
       <artifactId>guava</artifactId>
     </dependency>
     <dependency>
+      <groupId>com.google.code.findbugs</groupId>
+      <artifactId>jsr305</artifactId>
+    </dependency>
+    <dependency>
       <groupId>org.apache.gearpump</groupId>
       <artifactId>gearpump-shaded-metrics-graphite_2.11</artifactId>
       <version>${gearpump.version}</version>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2afc0cd9/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java
----------------------------------------------------------------------
diff --git 
a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java
 
b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java
index 8588fff..84dfeec 100644
--- 
a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java
+++ 
b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java
@@ -31,7 +31,7 @@ import 
org.apache.beam.runners.gearpump.translators.TransformTranslator;
 import org.apache.beam.runners.gearpump.translators.TranslationContext;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.runners.TransformTreeNode;
+import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.GroupByKey;
@@ -48,7 +48,7 @@ import org.slf4j.LoggerFactory;
  * into Gearpump {@link Graph}.
  */
 @SuppressWarnings({"rawtypes", "unchecked"})
-public class GearpumpPipelineTranslator implements Pipeline.PipelineVisitor {
+public class GearpumpPipelineTranslator extends 
Pipeline.PipelineVisitor.Defaults {
 
   private static final Logger LOG = LoggerFactory.getLogger(
       GearpumpPipelineTranslator.class);
@@ -83,18 +83,18 @@ public class GearpumpPipelineTranslator implements 
Pipeline.PipelineVisitor {
   }
 
   @Override
-  public CompositeBehavior enterCompositeTransform(TransformTreeNode node) {
+  public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node 
node) {
     LOG.debug("entering composite transform {}", node.getTransform());
     return CompositeBehavior.ENTER_TRANSFORM;
   }
 
   @Override
-  public void leaveCompositeTransform(TransformTreeNode node) {
+  public void leaveCompositeTransform(TransformHierarchy.Node node) {
     LOG.debug("leaving composite transform {}", node.getTransform());
   }
 
   @Override
-  public void visitPrimitiveTransform(TransformTreeNode node) {
+  public void visitPrimitiveTransform(TransformHierarchy.Node node) {
     LOG.debug("visiting transform {}", node.getTransform());
     PTransform transform = node.getTransform();
     TransformTranslator translator = 
getTransformTranslator(transform.getClass());
@@ -107,7 +107,7 @@ public class GearpumpPipelineTranslator implements 
Pipeline.PipelineVisitor {
   }
 
   @Override
-  public void visitValue(PValue value, TransformTreeNode producer) {
+  public void visitValue(PValue value, TransformHierarchy.Node producer) {
     LOG.debug("visiting value {}", value);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2afc0cd9/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java
----------------------------------------------------------------------
diff --git 
a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java
 
b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java
index d3bc75d..d9d6a8e 100644
--- 
a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java
+++ 
b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java
@@ -24,7 +24,7 @@ import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.beam.runners.gearpump.GearpumpPipelineOptions;
-import org.apache.beam.sdk.runners.TransformTreeNode;
+import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PInput;
@@ -53,7 +53,7 @@ public class TranslationContext {
 
   }
 
-  public void setCurrentTransform(TransformTreeNode treeNode) {
+  public void setCurrentTransform(TransformHierarchy.Node treeNode) {
     this.currentTransform = AppliedPTransform.of(treeNode.getFullName(),
         treeNode.getInput(), treeNode.getOutput(), (PTransform) 
treeNode.getTransform());
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2afc0cd9/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/ValuesSource.java
----------------------------------------------------------------------
diff --git 
a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/ValuesSource.java
 
b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/ValuesSource.java
index 9359e35..3b67f09 100644
--- 
a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/ValuesSource.java
+++ 
b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/ValuesSource.java
@@ -21,16 +21,14 @@ package org.apache.beam.runners.gearpump.translators.io;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.io.Serializable;
 import java.util.Collections;
 import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
 import java.util.NoSuchElementException;
 
 import javax.annotation.Nullable;
 
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.options.PipelineOptions;
 
@@ -41,26 +39,33 @@ import org.joda.time.Instant;
  */
 public class ValuesSource<T> extends UnboundedSource<T, 
UnboundedSource.CheckpointMark> {
 
-  private final Iterable<byte[]> values;
-  private final Coder<T> coder;
+  private final byte[] values;
+  private final IterableCoder<T> iterableCoder;
 
   public ValuesSource(Iterable<T> values, Coder<T> coder) {
-    this.values = encode(values, coder);
-    this.coder = coder;
+    this.iterableCoder = IterableCoder.of(coder);
+    this.values = encode(values, iterableCoder);
   }
 
-  private Iterable<byte[]> encode(Iterable<T> values, Coder<T> coder) {
-    List<byte[]> bytes = new LinkedList<>();
-    for (T t: values) {
-      try {
-        ByteArrayOutputStream stream = new ByteArrayOutputStream();
-        coder.encode(t, stream, Coder.Context.OUTER);
-        bytes.add(stream.toByteArray());
-      } catch (IOException e) {
-        throw new RuntimeException(e);
-      }
+  private byte[] encode(Iterable<T> values, IterableCoder<T> coder) {
+    ByteArrayOutputStream stream = new ByteArrayOutputStream();
+    try {
+      coder.encode(values, stream, Coder.Context.OUTER);
+    } catch (IOException ex) {
+      throw new RuntimeException(ex);
+    }
+    return stream.toByteArray();
+  }
+
+  private Iterable<T> decode(byte[] bytes) throws IOException{
+    ByteArrayInputStream inputStream = new ByteArrayInputStream(bytes);
+    try {
+      return iterableCoder.decode(inputStream, Coder.Context.OUTER);
+    } catch (IOException ex) {
+      throw new RuntimeException(ex);
+    } finally {
+      inputStream.close();
     }
-    return bytes;
   }
 
   @Override
@@ -72,7 +77,11 @@ public class ValuesSource<T> extends UnboundedSource<T, 
UnboundedSource.Checkpoi
   @Override
   public UnboundedReader<T> createReader(PipelineOptions options,
       @Nullable CheckpointMark checkpointMark) {
-    return new ValuesReader<>(values, coder, this);
+    try {
+      return new ValuesReader<>(decode(values), iterableCoder, this);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
   }
 
   @Nullable
@@ -87,32 +96,22 @@ public class ValuesSource<T> extends UnboundedSource<T, 
UnboundedSource.Checkpoi
 
   @Override
   public Coder<T> getDefaultOutputCoder() {
-    return coder;
+    return iterableCoder.getElemCoder();
   }
 
-  private static class ValuesReader<T> extends UnboundedReader<T> implements 
Serializable {
-
-    private final Iterable<byte[]> values;
-    private final Coder<T> coder;
+  private static class ValuesReader<T> extends UnboundedReader<T> {
     private final UnboundedSource<T, CheckpointMark> source;
-    private transient Iterator<byte[]> iterator;
+    private final Iterable<T> values;
+    private transient Iterator<T> iterator;
     private T current;
 
-    public ValuesReader(Iterable<byte[]> values, Coder<T> coder,
+    public ValuesReader(Iterable<T> values, IterableCoder<T> coder,
         UnboundedSource<T, CheckpointMark> source) {
       this.values = values;
-      this.coder = coder;
       this.source = source;
     }
 
-    private T decode(byte[] bytes) throws IOException {
-      ByteArrayInputStream inputStream = new ByteArrayInputStream(bytes);
-      try {
-        return coder.decode(inputStream, Coder.Context.OUTER);
-      } finally {
-        inputStream.close();
-      }
-    }
+
 
     @Override
     public boolean start() throws IOException {
@@ -125,7 +124,7 @@ public class ValuesSource<T> extends UnboundedSource<T, 
UnboundedSource.Checkpoi
     @Override
     public boolean advance() throws IOException {
       if (iterator.hasNext()) {
-        current = decode(iterator.next());
+        current = iterator.next();
         return true;
       } else {
         return false;

Reply via email to