Rename DoFn to OldDoFn in Gearpump runner
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/bc1b3549 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/bc1b3549 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/bc1b3549 Branch: refs/heads/gearpump-runner Commit: bc1b354949416db3b52c4f37c66968bdb86f0813 Parents: 40be715 Author: manuzhang <[email protected]> Authored: Fri Aug 12 07:22:00 2016 +0800 Committer: Kenneth Knowles <[email protected]> Committed: Thu Aug 25 11:38:08 2016 -0700 ---------------------------------------------------------------------- .../gearpump/GearpumpPipelineResult.java | 23 ++++++++++++++++++-- .../gearpump/GearpumpPipelineRunner.java | 6 ++--- .../gearpump/examples/StreamingWordCount.java | 6 ++--- .../translators/ParDoBoundMultiTranslator.java | 3 ++- .../translators/ParDoBoundTranslator.java | 3 ++- .../translators/functions/DoFnFunction.java | 3 ++- .../translators/utils/GearpumpDoFnRunner.java | 23 ++++++++++---------- 7 files changed, 45 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc1b3549/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java index bc27147..6184bc3 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java @@ -17,11 +17,14 @@ */ package org.apache.beam.runners.gearpump; +import org.apache.beam.sdk.AggregatorRetrievalException; +import org.apache.beam.sdk.AggregatorValues; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; -import org.apache.beam.sdk.runners.AggregatorRetrievalException; -import org.apache.beam.sdk.runners.AggregatorValues; import org.apache.beam.sdk.transforms.Aggregator; +import org.joda.time.Duration; + +import java.io.IOException; /** * Result of executing a {@link Pipeline} with Gearpump. @@ -33,10 +36,26 @@ public class GearpumpPipelineResult implements PipelineResult { } @Override + public State cancel() throws IOException { + return null; + } + + @Override + public State waitUntilFinish(Duration duration) throws IOException, InterruptedException { + return null; + } + + @Override + public State waitUntilFinish() throws IOException, InterruptedException { + return null; + } + + @Override public <T> AggregatorValues<T> getAggregatorValues(Aggregator<?, T> aggregator) throws AggregatorRetrievalException { throw new AggregatorRetrievalException( "PipelineResult getAggregatorValues not supported in Gearpump pipeline", new UnsupportedOperationException()); } + } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc1b3549/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunner.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunner.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunner.java index 660d703..4182ee4 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunner.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunner.java @@ -23,8 +23,8 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsValidator; import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Flatten; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; @@ -151,7 +151,7 @@ public class GearpumpPipelineRunner extends PipelineRunner<GearpumpPipelineResul private final Window.Bound<T> wrapped; - public AssignWindowsAndSetStrategy(Window.Bound<T> wrapped) { + AssignWindowsAndSetStrategy(Window.Bound<T> wrapped) { this.wrapped = wrapped; } @@ -184,7 +184,7 @@ public class GearpumpPipelineRunner extends PipelineRunner<GearpumpPipelineResul } } - private static class IdentityFn<T> extends DoFn<T, T> { + private static class IdentityFn<T> extends OldDoFn<T, T> { @Override public void processElement(ProcessContext c) { c.output(c.element()); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc1b3549/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/StreamingWordCount.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/StreamingWordCount.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/StreamingWordCount.java index c51289d..5f35c6b 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/StreamingWordCount.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/StreamingWordCount.java @@ -25,7 +25,7 @@ import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Count; -import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.transforms.windowing.FixedWindows; @@ -44,7 +44,7 @@ import org.slf4j.LoggerFactory; */ public class StreamingWordCount { - static class ExtractWordsFn extends DoFn<String, String> { + static class ExtractWordsFn extends OldDoFn<String, String> { private final Aggregator<Long, Long> emptyLines = createAggregator("emptyLines", new Sum.SumLongFn()); @@ -66,7 +66,7 @@ public class StreamingWordCount { } } - static class FormatAsStringFn extends DoFn<KV<String, Long>, String> { + static class FormatAsStringFn extends OldDoFn<KV<String, Long>, String> { private static final Logger LOG = LoggerFactory.getLogger(FormatAsStringFn.class); @Override http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc1b3549/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java index af5bcbc..d5ed0d2 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java @@ -23,6 +23,7 @@ import org.apache.beam.runners.gearpump.translators.utils.GearpumpDoFnRunner; import org.apache.beam.runners.gearpump.translators.utils.NoOpSideInputReader; import org.apache.beam.runners.gearpump.translators.utils.NoOpStepContext; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.util.DoFnRunner; import org.apache.beam.sdk.util.DoFnRunners; @@ -92,7 +93,7 @@ public class ParDoBoundMultiTranslator<InputT, OutputT> implements public DoFnMultiFunction( GearpumpPipelineOptions pipelineOptions, - DoFn<InputT, OutputT> doFn, + OldDoFn<InputT, OutputT> doFn, TupleTag<OutputT> mainOutputTag, TupleTagList sideOutputTags, WindowingStrategy<?, ?> windowingStrategy, http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc1b3549/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundTranslator.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundTranslator.java index 689bc08..b97cbb4 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundTranslator.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundTranslator.java @@ -21,6 +21,7 @@ package org.apache.beam.runners.gearpump.translators; import org.apache.beam.runners.gearpump.translators.functions.DoFnFunction; import org.apache.beam.runners.gearpump.translators.utils.NoOpSideInputReader; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; @@ -38,7 +39,7 @@ public class ParDoBoundTranslator<InputT, OutputT> implements @Override public void translate(ParDo.Bound<InputT, OutputT> transform, TranslationContext context) { - DoFn<InputT, OutputT> doFn = transform.getFn(); + OldDoFn<InputT, OutputT> doFn = transform.getFn(); PCollection<OutputT> output = context.getOutput(transform); WindowingStrategy<?, ?> windowingStrategy = output.getWindowingStrategy(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc1b3549/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java index 088fc14..b1ebd2a 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java @@ -22,6 +22,7 @@ import org.apache.beam.runners.gearpump.GearpumpPipelineOptions; import org.apache.beam.runners.gearpump.translators.utils.GearpumpDoFnRunner; import org.apache.beam.runners.gearpump.translators.utils.NoOpStepContext; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.util.DoFnRunner; import org.apache.beam.sdk.util.DoFnRunners; import org.apache.beam.sdk.util.SideInputReader; @@ -50,7 +51,7 @@ public class DoFnFunction<InputT, OutputT> implements public DoFnFunction( GearpumpPipelineOptions pipelineOptions, - DoFn<InputT, OutputT> doFn, + OldDoFn<InputT, OutputT> doFn, WindowingStrategy<?, ?> windowingStrategy, SideInputReader sideInputReader) { this.doFnRunner = new GearpumpDoFnRunner<>( http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc1b3549/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/GearpumpDoFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/GearpumpDoFnRunner.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/GearpumpDoFnRunner.java index 608ad7c..be0d025 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/GearpumpDoFnRunner.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/GearpumpDoFnRunner.java @@ -25,6 +25,7 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindows; @@ -64,7 +65,7 @@ import java.util.Set; public class GearpumpDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT>, Serializable { - private final DoFn<InputT, OutputT> fn; + private final OldDoFn<InputT, OutputT> fn; private final transient PipelineOptions options; private final SideInputReader sideInputReader; private final DoFnRunners.OutputManager outputManager; @@ -76,7 +77,7 @@ public class GearpumpDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, O public GearpumpDoFnRunner( GearpumpPipelineOptions pipelineOptions, - DoFn<InputT, OutputT> doFn, + OldDoFn<InputT, OutputT> doFn, SideInputReader sideInputReader, DoFnRunners.OutputManager outputManager, TupleTag<OutputT> mainOutputTag, @@ -119,7 +120,7 @@ public class GearpumpDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, O @Override public void processElement(WindowedValue<InputT> elem) { if (elem.getWindows().size() <= 1 - || (!DoFn.RequiresWindowAccess.class.isAssignableFrom(fn.getClass()) + || (!OldDoFn.RequiresWindowAccess.class.isAssignableFrom(fn.getClass()) && context.sideInputReader.isEmpty())) { invokeProcessElement(elem); } else { @@ -144,7 +145,7 @@ public class GearpumpDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, O } private void invokeProcessElement(WindowedValue<InputT> elem) { - final DoFn<InputT, OutputT>.ProcessContext processContext = + final OldDoFn<InputT, OutputT>.ProcessContext processContext = new DoFnProcessContext<>(fn, context, elem); // This can contain user code. Wrap it in case it throws an exception. try { @@ -169,11 +170,11 @@ public class GearpumpDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, O * @param <OutputT> the type of the DoFn's (main) output elements */ private static class DoFnContext<InputT, OutputT> - extends DoFn<InputT, OutputT>.Context { + extends OldDoFn<InputT, OutputT>.Context { private static final int MAX_SIDE_OUTPUTS = 1000; final transient PipelineOptions options; - final DoFn<InputT, OutputT> fn; + final OldDoFn<InputT, OutputT> fn; final SideInputReader sideInputReader; final DoFnRunners.OutputManager outputManager; final TupleTag<OutputT> mainOutputTag; @@ -187,7 +188,7 @@ public class GearpumpDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, O private final Set<TupleTag<?>> outputTags; public DoFnContext(PipelineOptions options, - DoFn<InputT, OutputT> fn, + OldDoFn<InputT, OutputT> fn, SideInputReader sideInputReader, DoFnRunners.OutputManager outputManager, TupleTag<OutputT> mainOutputTag, @@ -357,14 +358,14 @@ public class GearpumpDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, O * @param <OutputT> the type of the DoFn's (main) output elements */ private static class DoFnProcessContext<InputT, OutputT> - extends DoFn<InputT, OutputT>.ProcessContext { + extends OldDoFn<InputT, OutputT>.ProcessContext { - final DoFn<InputT, OutputT> fn; + final OldDoFn<InputT, OutputT> fn; final DoFnContext<InputT, OutputT> context; final WindowedValue<InputT> windowedValue; - public DoFnProcessContext(DoFn<InputT, OutputT> fn, + public DoFnProcessContext(OldDoFn<InputT, OutputT> fn, DoFnContext<InputT, OutputT> context, WindowedValue<InputT> windowedValue) { fn.super(); @@ -409,7 +410,7 @@ public class GearpumpDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, O @Override public BoundedWindow window() { - if (!(fn instanceof DoFn.RequiresWindowAccess)) { + if (!(fn instanceof OldDoFn.RequiresWindowAccess)) { throw new UnsupportedOperationException( "window() is only available in the context of a DoFn marked as RequiresWindow."); }
