Update to latest Gearpump DSL function interface
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3bf82638 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3bf82638 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3bf82638 Branch: refs/heads/gearpump-runner Commit: 3bf82638096ae7aa91c7d3c862c2994772bee51b Parents: e63d42d Author: manuzhang <owenzhang1...@gmail.com> Authored: Sat Jan 14 13:36:07 2017 +0800 Committer: manuzhang <owenzhang1...@gmail.com> Committed: Sat Jan 14 21:40:18 2017 +0800 ---------------------------------------------------------------------- .../translators/GroupByKeyTranslator.java | 12 ++++---- .../translators/ParDoBoundMultiTranslator.java | 29 ++++++++++++++------ .../translators/WindowBoundTranslator.java | 4 +-- .../translators/functions/DoFnFunction.java | 21 +++++++++++--- 4 files changed, 46 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/3bf82638/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java index 8e3ffe3..4eaf755 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java @@ -36,15 +36,15 @@ import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; +import org.apache.gearpump.streaming.dsl.api.functions.MapFunction; +import org.apache.gearpump.streaming.dsl.api.functions.ReduceFunction; import 
org.apache.gearpump.streaming.dsl.javaapi.JavaStream; +import org.apache.gearpump.streaming.dsl.javaapi.functions.GroupByFunction; import org.apache.gearpump.streaming.dsl.window.api.Discarding$; import org.apache.gearpump.streaming.dsl.window.api.EventTimeTrigger$; import org.apache.gearpump.streaming.dsl.window.api.Window; import org.apache.gearpump.streaming.dsl.window.api.WindowFn; import org.apache.gearpump.streaming.dsl.window.impl.Bucket; -import org.apache.gearpump.streaming.javaapi.dsl.functions.GroupByFunction; -import org.apache.gearpump.streaming.javaapi.dsl.functions.MapFunction; -import org.apache.gearpump.streaming.javaapi.dsl.functions.ReduceFunction; import scala.collection.JavaConversions; @@ -122,7 +122,7 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe } } - private static class GroupByFn<K, V> implements + private static class GroupByFn<K, V> extends GroupByFunction<WindowedValue<KV<K, V>>, K> { @Override @@ -132,7 +132,7 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe } private static class ValueToIterable<K, V> - implements MapFunction<WindowedValue<KV<K, V>>, WindowedValue<KV<K, Iterable<V>>>> { + extends MapFunction<WindowedValue<KV<K, V>>, WindowedValue<KV<K, Iterable<V>>>> { @Override public WindowedValue<KV<K, Iterable<V>>> apply(WindowedValue<KV<K, V>> wv) { @@ -141,7 +141,7 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe } } - private static class MergeValue<K, V> implements + private static class MergeValue<K, V> extends ReduceFunction<WindowedValue<KV<K, Iterable<V>>>> { @Override http://git-wip-us.apache.org/repos/asf/beam/blob/3bf82638/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java ---------------------------------------------------------------------- diff --git 
a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java index 24f9734..0d5b8bc 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java @@ -33,6 +33,7 @@ import org.apache.beam.runners.gearpump.translators.utils.NoOpSideInputReader; import org.apache.beam.runners.gearpump.translators.utils.NoOpStepContext; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.reflect.DoFnInvokers; import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; @@ -41,10 +42,10 @@ import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; +import org.apache.gearpump.streaming.dsl.api.functions.FilterFunction; +import org.apache.gearpump.streaming.dsl.api.functions.MapFunction; import org.apache.gearpump.streaming.dsl.javaapi.JavaStream; -import org.apache.gearpump.streaming.javaapi.dsl.functions.FilterFunction; -import org.apache.gearpump.streaming.javaapi.dsl.functions.FlatMapFunction; -import org.apache.gearpump.streaming.javaapi.dsl.functions.MapFunction; +import org.apache.gearpump.streaming.dsl.javaapi.functions.FlatMapFunction; /** * {@link ParDo.BoundMulti} is translated to Gearpump flatMap function @@ -83,12 +84,13 @@ public class ParDoBoundMultiTranslator<InputT, OutputT> implements /** * Gearpump {@link FlatMapFunction} wrapper over Beam {@link DoFnMultiFunction}. 
*/ - private static class DoFnMultiFunction<InputT, OutputT> implements - FlatMapFunction<WindowedValue<InputT>, WindowedValue<KV<TupleTag<OutputT>, OutputT>>>, - DoFnRunners.OutputManager { + private static class DoFnMultiFunction<InputT, OutputT> + extends FlatMapFunction<WindowedValue<InputT>, WindowedValue<KV<TupleTag<OutputT>, OutputT>>> + implements DoFnRunners.OutputManager { private final DoFnRunnerFactory<InputT, OutputT> doFnRunnerFactory; private DoFnRunner<InputT, OutputT> doFnRunner; + private final DoFn<InputT, OutputT> doFn; private final List<WindowedValue<KV<TupleTag<OutputT>, OutputT>>> outputs = Lists .newArrayList(); @@ -99,6 +101,7 @@ public class ParDoBoundMultiTranslator<InputT, OutputT> implements TupleTagList sideOutputTags, WindowingStrategy<?, ?> windowingStrategy, SideInputReader sideInputReader) { + this.doFn = doFn; this.doFnRunnerFactory = new DoFnRunnerFactory<>( pipelineOptions, doFn, @@ -113,6 +116,16 @@ public class ParDoBoundMultiTranslator<InputT, OutputT> implements } @Override + public void setup() { + DoFnInvokers.invokerFor(doFn).invokeSetup(); + } + + @Override + public void teardown() { + DoFnInvokers.invokerFor(doFn).invokeTeardown(); + } + + @Override public Iterator<WindowedValue<KV<TupleTag<OutputT>, OutputT>>> apply(WindowedValue<InputT> wv) { if (null == doFnRunner) { doFnRunner = doFnRunnerFactory.createRunner(); @@ -133,7 +146,7 @@ public class ParDoBoundMultiTranslator<InputT, OutputT> implements } } - private static class FilterByOutputTag<OutputT> implements + private static class FilterByOutputTag<OutputT> extends FilterFunction<WindowedValue<KV<TupleTag<OutputT>, OutputT>>> { private final TupleTag<OutputT> tupleTag; @@ -148,7 +161,7 @@ public class ParDoBoundMultiTranslator<InputT, OutputT> implements } } - private static class ExtractOutput<OutputT> implements + private static class ExtractOutput<OutputT> extends MapFunction<WindowedValue<KV<TupleTag<OutputT>, OutputT>>, WindowedValue<OutputT>> { @Override 
http://git-wip-us.apache.org/repos/asf/beam/blob/3bf82638/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java index 32dd5de..d3c50a5 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java @@ -34,8 +34,8 @@ import org.apache.beam.sdk.values.PCollection; import org.apache.gearpump.Message; import org.apache.gearpump.cluster.UserConfig; import org.apache.gearpump.streaming.dsl.javaapi.JavaStream; +import org.apache.gearpump.streaming.dsl.javaapi.functions.FlatMapFunction; import org.apache.gearpump.streaming.javaapi.Task; -import org.apache.gearpump.streaming.javaapi.dsl.functions.FlatMapFunction; import org.apache.gearpump.streaming.task.TaskContext; import org.joda.time.Instant; @@ -61,7 +61,7 @@ public class WindowBoundTranslator<T> implements TransformTranslator<Window.Bou context.setOutputStream(context.getOutput(transform), outputStream); } - private static class AssignWindows<T> implements + private static class AssignWindows<T> extends FlatMapFunction<WindowedValue<T>, WindowedValue<T>> { private final WindowFn<T, BoundedWindow> fn; http://git-wip-us.apache.org/repos/asf/beam/blob/3bf82638/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java 
b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java index 42969fe..a66d3a4 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java @@ -30,30 +30,33 @@ import org.apache.beam.runners.gearpump.translators.utils.DoFnRunnerFactory; import org.apache.beam.runners.gearpump.translators.utils.NoOpAggregatorFactory; import org.apache.beam.runners.gearpump.translators.utils.NoOpStepContext; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.reflect.DoFnInvokers; import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; - -import org.apache.gearpump.streaming.javaapi.dsl.functions.FlatMapFunction; +import org.apache.gearpump.streaming.dsl.javaapi.functions.FlatMapFunction; /** * Gearpump {@link FlatMapFunction} wrapper over Beam {@link DoFn}. 
*/ -public class DoFnFunction<InputT, OutputT> implements - FlatMapFunction<WindowedValue<InputT>, WindowedValue<OutputT>>, DoFnRunners.OutputManager { +public class DoFnFunction<InputT, OutputT> extends + FlatMapFunction<WindowedValue<InputT>, WindowedValue<OutputT>> implements + DoFnRunners.OutputManager { private final TupleTag<OutputT> mainTag = new TupleTag<OutputT>() {}; private List<WindowedValue<OutputT>> outputs = Lists.newArrayList(); private final DoFnRunnerFactory<InputT, OutputT> doFnRunnerFactory; private DoFnRunner<InputT, OutputT> doFnRunner; + private final DoFn<InputT, OutputT> doFn; public DoFnFunction( GearpumpPipelineOptions pipelineOptions, DoFn<InputT, OutputT> doFn, WindowingStrategy<?, ?> windowingStrategy, SideInputReader sideInputReader) { + this.doFn = doFn; this.doFnRunnerFactory = new DoFnRunnerFactory<>( pipelineOptions, doFn, @@ -68,6 +71,16 @@ public class DoFnFunction<InputT, OutputT> implements } @Override + public void setup() { + DoFnInvokers.invokerFor(doFn).invokeSetup(); + } + + @Override + public void teardown() { + DoFnInvokers.invokerFor(doFn).invokeTeardown(); + } + + @Override public Iterator<WindowedValue<OutputT>> apply(WindowedValue<InputT> value) { outputs = Lists.newArrayList();