[BEAM-79] fix gearpump runner build failure
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2afc0cd9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2afc0cd9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2afc0cd9 Branch: refs/heads/gearpump-runner Commit: 2afc0cd99e33bc724345a2e5b0498820d05b460c Parents: 86414c0 Author: manuzhang <owenzhang1...@gmail.com> Authored: Tue Dec 6 11:28:24 2016 +0800 Committer: manuzhang <owenzhang1...@gmail.com> Committed: Tue Dec 6 13:04:01 2016 +0800 ---------------------------------------------------------------------- runners/gearpump/pom.xml | 9 ++- .../gearpump/GearpumpPipelineTranslator.java | 12 ++-- .../translators/TranslationContext.java | 4 +- .../gearpump/translators/io/ValuesSource.java | 71 ++++++++++---------- 4 files changed, 47 insertions(+), 49 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2afc0cd9/runners/gearpump/pom.xml ---------------------------------------------------------------------- diff --git a/runners/gearpump/pom.xml b/runners/gearpump/pom.xml index abd135f..04bd724 100644 --- a/runners/gearpump/pom.xml +++ b/runners/gearpump/pom.xml @@ -170,11 +170,6 @@ <artifactId>beam-runners-core-java</artifactId> </dependency> <dependency> - <groupId>com.google.code.findbugs</groupId> - <artifactId>annotations</artifactId> - <version>3.0.1</version> - </dependency> - <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> </dependency> @@ -199,6 +194,10 @@ <artifactId>guava</artifactId> </dependency> <dependency> + <groupId>com.google.code.findbugs</groupId> + <artifactId>jsr305</artifactId> + </dependency> + <dependency> <groupId>org.apache.gearpump</groupId> <artifactId>gearpump-shaded-metrics-graphite_2.11</artifactId> <version>${gearpump.version}</version> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2afc0cd9/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java index 8588fff..84dfeec 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java @@ -31,7 +31,7 @@ import org.apache.beam.runners.gearpump.translators.TransformTranslator; import org.apache.beam.runners.gearpump.translators.TranslationContext; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.Read; -import org.apache.beam.sdk.runners.TransformTreeNode; +import org.apache.beam.sdk.runners.TransformHierarchy; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.GroupByKey; @@ -48,7 +48,7 @@ import org.slf4j.LoggerFactory; * into Gearpump {@link Graph}. */ @SuppressWarnings({"rawtypes", "unchecked"}) -public class GearpumpPipelineTranslator implements Pipeline.PipelineVisitor { +public class GearpumpPipelineTranslator extends Pipeline.PipelineVisitor.Defaults { private static final Logger LOG = LoggerFactory.getLogger( GearpumpPipelineTranslator.class); @@ -83,18 +83,18 @@ public class GearpumpPipelineTranslator implements Pipeline.PipelineVisitor { } @Override - public CompositeBehavior enterCompositeTransform(TransformTreeNode node) { + public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) { LOG.debug("entering composite transform {}", node.getTransform()); return CompositeBehavior.ENTER_TRANSFORM; } @Override - public void leaveCompositeTransform(TransformTreeNode node) { + public void leaveCompositeTransform(TransformHierarchy.Node node) { LOG.debug("leaving composite transform {}", node.getTransform()); } @Override - public void visitPrimitiveTransform(TransformTreeNode node) { + public void visitPrimitiveTransform(TransformHierarchy.Node node) { LOG.debug("visiting transform {}", node.getTransform()); PTransform transform = node.getTransform(); TransformTranslator translator = getTransformTranslator(transform.getClass()); @@ -107,7 +107,7 @@ public class GearpumpPipelineTranslator implements Pipeline.PipelineVisitor { } @Override - public void visitValue(PValue value, TransformTreeNode producer) { + public void visitValue(PValue value, TransformHierarchy.Node producer) { LOG.debug("visiting value {}", value); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2afc0cd9/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java index d3bc75d..d9d6a8e 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java @@ -24,7 +24,7 @@ import java.util.HashMap; import java.util.Map; import org.apache.beam.runners.gearpump.GearpumpPipelineOptions; -import org.apache.beam.sdk.runners.TransformTreeNode; +import org.apache.beam.sdk.runners.TransformHierarchy; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.PInput; @@ -53,7 +53,7 @@ public class TranslationContext { } - public void setCurrentTransform(TransformTreeNode treeNode) { + public void setCurrentTransform(TransformHierarchy.Node treeNode) { this.currentTransform = AppliedPTransform.of(treeNode.getFullName(), treeNode.getInput(), treeNode.getOutput(), (PTransform) treeNode.getTransform()); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2afc0cd9/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/ValuesSource.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/ValuesSource.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/ValuesSource.java index 9359e35..3b67f09 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/ValuesSource.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/ValuesSource.java @@ -21,16 +21,14 @@ package org.apache.beam.runners.gearpump.translators.io; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.io.Serializable; import java.util.Collections; import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; import java.util.NoSuchElementException; import javax.annotation.Nullable; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.options.PipelineOptions; @@ -41,26 +39,33 @@ import org.joda.time.Instant; */ public class ValuesSource<T> extends UnboundedSource<T, UnboundedSource.CheckpointMark> { - private final Iterable<byte[]> values; - private final Coder<T> coder; + private final byte[] values; + private final IterableCoder<T> iterableCoder; public ValuesSource(Iterable<T> values, Coder<T> coder) { - this.values = encode(values, coder); - this.coder = coder; + this.iterableCoder = IterableCoder.of(coder); + this.values = encode(values, iterableCoder); } - private Iterable<byte[]> encode(Iterable<T> values, Coder<T> coder) { - List<byte[]> bytes = new LinkedList<>(); - for (T t: values) { - try { - ByteArrayOutputStream stream = new ByteArrayOutputStream(); - coder.encode(t, stream, Coder.Context.OUTER); - bytes.add(stream.toByteArray()); - } catch (IOException e) { - throw new RuntimeException(e); - } + private byte[] encode(Iterable<T> values, IterableCoder<T> coder) { + ByteArrayOutputStream stream = new ByteArrayOutputStream(); + try { + coder.encode(values, stream, Coder.Context.OUTER); + } catch (IOException ex) { + throw new RuntimeException(ex); + } + return stream.toByteArray(); + } + + private Iterable<T> decode(byte[] bytes) throws IOException{ + ByteArrayInputStream inputStream = new ByteArrayInputStream(bytes); + try { + return iterableCoder.decode(inputStream, Coder.Context.OUTER); + } catch (IOException ex) { + throw new RuntimeException(ex); + } finally { + inputStream.close(); } - return bytes; } @Override @@ -72,7 +77,11 @@ public class ValuesSource<T> extends UnboundedSource<T, UnboundedSource.Checkpoi @Override public UnboundedReader<T> createReader(PipelineOptions options, @Nullable CheckpointMark checkpointMark) { - return new ValuesReader<>(values, coder, this); + try { + return new ValuesReader<>(decode(values), iterableCoder, this); + } catch (IOException e) { + throw new RuntimeException(e); + } } @Nullable @@ -87,32 +96,22 @@ public class ValuesSource<T> extends UnboundedSource<T, UnboundedSource.Checkpoi @Override public Coder<T> getDefaultOutputCoder() { - return coder; + return iterableCoder.getElemCoder(); } - private static class ValuesReader<T> extends UnboundedReader<T> implements Serializable { - - private final Iterable<byte[]> values; - private final Coder<T> coder; + private static class ValuesReader<T> extends UnboundedReader<T> { private final UnboundedSource<T, CheckpointMark> source; - private transient Iterator<byte[]> iterator; + private final Iterable<T> values; + private transient Iterator<T> iterator; private T current; - public ValuesReader(Iterable<byte[]> values, Coder<T> coder, + public ValuesReader(Iterable<T> values, IterableCoder<T> coder, UnboundedSource<T, CheckpointMark> source) { this.values = values; - this.coder = coder; this.source = source; } - private T decode(byte[] bytes) throws IOException { - ByteArrayInputStream inputStream = new ByteArrayInputStream(bytes); - try { - return coder.decode(inputStream, Coder.Context.OUTER); - } finally { - inputStream.close(); - } - } + @Override public boolean start() throws IOException { @@ -125,7 +124,7 @@ public class ValuesSource<T> extends UnboundedSource<T, UnboundedSource.Checkpoi @Override public boolean advance() throws IOException { if (iterator.hasNext()) { - current = decode(iterator.next()); + current = iterator.next(); return true; } else { return false;