[BEAM-79] Upgrade to beam-0.5.0-incubating-SNAPSHOT
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/647034cf Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/647034cf Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/647034cf Branch: refs/heads/gearpump-runner Commit: 647034cfc6ee419548b6da222e6d134792366a26 Parents: c2fb7c0 Author: manuzhang <owenzhang1...@gmail.com> Authored: Wed Dec 21 09:32:35 2016 +0800 Committer: manuzhang <owenzhang1...@gmail.com> Committed: Wed Dec 21 09:32:35 2016 +0800 ---------------------------------------------------------------------- runners/gearpump/pom.xml | 2 +- .../runners/gearpump/examples/StreamingWordCount.java | 14 +++++++------- .../translators/ParDoBoundMultiTranslator.java | 2 +- .../gearpump/translators/ParDoBoundTranslator.java | 2 +- .../gearpump/translators/TranslationContext.java | 3 +-- .../gearpump/translators/utils/DoFnRunnerFactory.java | 2 +- .../translators/utils/NoOpAggregatorFactory.java | 2 +- 7 files changed, 13 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/647034cf/runners/gearpump/pom.xml ---------------------------------------------------------------------- diff --git a/runners/gearpump/pom.xml b/runners/gearpump/pom.xml index 9320561..bb35ad7 100644 --- a/runners/gearpump/pom.xml +++ b/runners/gearpump/pom.xml @@ -23,7 +23,7 @@ <parent> <groupId>org.apache.beam</groupId> <artifactId>beam-runners-parent</artifactId> - <version>0.4.0-incubating-SNAPSHOT</version> + <version>0.5.0-incubating-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/647034cf/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/StreamingWordCount.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/StreamingWordCount.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/StreamingWordCount.java index 1d85c25..b2d762a 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/StreamingWordCount.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/StreamingWordCount.java @@ -24,7 +24,7 @@ import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Count; -import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.Window; @@ -42,10 +42,10 @@ import org.slf4j.LoggerFactory; */ public class StreamingWordCount { - static class ExtractWordsFn extends OldDoFn<String, String> { + static class ExtractWordsFn extends DoFn<String, String> { - @Override - public void processElement(ProcessContext c) { + @ProcessElement + public void process(ProcessContext c) { // Split the line into words. String[] words = c.element().split("[^a-zA-Z']+"); @@ -58,11 +58,11 @@ public class StreamingWordCount { } } - static class FormatAsStringFn extends OldDoFn<KV<String, Long>, String> { + static class FormatAsStringFn extends DoFn<KV<String, Long>, String> { private static final Logger LOG = LoggerFactory.getLogger(FormatAsStringFn.class); - @Override - public void processElement(ProcessContext c) { + @ProcessElement + public void process(ProcessContext c) { String row = c.element().getKey() + " - " + c.element().getValue() + " @ " + c.timestamp().toString(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/647034cf/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java index 54f1c3f..24f9734 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java @@ -64,7 +64,7 @@ public class ParDoBoundMultiTranslator<InputT, OutputT> implements JavaStream<WindowedValue<KV<TupleTag<OutputT>, OutputT>>> outputStream = inputStream.flatMap( new DoFnMultiFunction<>( context.getPipelineOptions(), - transform.getNewFn(), + transform.getFn(), transform.getMainOutputTag(), transform.getSideOutputTags(), inputT.getWindowingStrategy(), http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/647034cf/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundTranslator.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundTranslator.java index a796c83..689bc08 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundTranslator.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundTranslator.java @@ -38,7 +38,7 @@ public class ParDoBoundTranslator<InputT, OutputT> implements @Override public void translate(ParDo.Bound<InputT, OutputT> transform, TranslationContext context) { - DoFn<InputT, OutputT> doFn = transform.getNewFn(); + DoFn<InputT, OutputT> doFn = transform.getFn(); PCollection<OutputT> output = context.getOutput(transform); WindowingStrategy<?, ?> windowingStrategy = output.getWindowingStrategy(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/647034cf/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java index d9d6a8e..63fb619 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java @@ -54,8 +54,7 @@ public class TranslationContext { } public void setCurrentTransform(TransformHierarchy.Node treeNode) { - this.currentTransform = AppliedPTransform.of(treeNode.getFullName(), - treeNode.getInput(), treeNode.getOutput(), (PTransform) treeNode.getTransform()); + this.currentTransform = treeNode.toAppliedPTransform(); } public GearpumpPipelineOptions getPipelineOptions() { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/647034cf/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java index 7119a87..7e1402f 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java @@ -21,12 +21,12 @@ package org.apache.beam.runners.gearpump.translators.utils; import java.io.Serializable; import java.util.List; +import org.apache.beam.runners.core.AggregatorFactory; import org.apache.beam.runners.core.DoFnRunner; import org.apache.beam.runners.core.DoFnRunners; import org.apache.beam.runners.core.SimpleDoFnRunner; import org.apache.beam.runners.gearpump.GearpumpPipelineOptions; import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.transforms.Aggregator.AggregatorFactory; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.util.ExecutionContext; import org.apache.beam.sdk.util.SideInputReader; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/647034cf/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpAggregatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpAggregatorFactory.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpAggregatorFactory.java index bfc73bf..22ffc4d 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpAggregatorFactory.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpAggregatorFactory.java @@ -20,8 +20,8 @@ package org.apache.beam.runners.gearpump.translators.utils; import java.io.Serializable; +import org.apache.beam.runners.core.AggregatorFactory; import org.apache.beam.sdk.transforms.Aggregator; -import org.apache.beam.sdk.transforms.Aggregator.AggregatorFactory; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.util.ExecutionContext;