Repository: incubator-beam Updated Branches: refs/heads/gearpump-runner 3933b5577 -> 323ec1188
[BEAM-79] Port Gearpump runner from OldDoFn to new DoFn Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/45570b9c Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/45570b9c Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/45570b9c Branch: refs/heads/gearpump-runner Commit: 45570b9c7ebb11080deca3346fc601c69796612a Parents: 3933b55 Author: manuzhang <owenzhang1...@gmail.com> Authored: Mon Oct 31 11:52:22 2016 +0800 Committer: Kenneth Knowles <k...@google.com> Committed: Thu Nov 3 09:38:41 2016 -0700 ---------------------------------------------------------------------- .../gearpump/GearpumpPipelineTranslator.java | 2 +- .../translators/ParDoBoundMultiTranslator.java | 17 +- .../translators/ParDoBoundTranslator.java | 3 +- .../translators/functions/DoFnFunction.java | 19 +- .../translators/utils/DoFnRunnerFactory.java | 77 +++ .../translators/utils/GearpumpDoFnRunner.java | 516 ------------------- .../utils/NoOpAggregatorFactory.java | 41 ++ 7 files changed, 143 insertions(+), 532 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/45570b9c/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java index 5045ae4..8588fff 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java @@ -108,7 +108,7 @@ public class GearpumpPipelineTranslator implements Pipeline.PipelineVisitor { @Override public void visitValue(PValue value, TransformTreeNode producer) { - LOG.info("visiting value {}", value); + LOG.debug("visiting value {}", value); } /** http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/45570b9c/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java index 2b49684..54f1c3f 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java @@ -27,11 +27,11 @@ import java.util.Map; import org.apache.beam.runners.core.DoFnRunner; import org.apache.beam.runners.core.DoFnRunners; import org.apache.beam.runners.gearpump.GearpumpPipelineOptions; -import org.apache.beam.runners.gearpump.translators.utils.GearpumpDoFnRunner; +import org.apache.beam.runners.gearpump.translators.utils.DoFnRunnerFactory; +import org.apache.beam.runners.gearpump.translators.utils.NoOpAggregatorFactory; import org.apache.beam.runners.gearpump.translators.utils.NoOpSideInputReader; import org.apache.beam.runners.gearpump.translators.utils.NoOpStepContext; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.WindowedValue; @@ -64,7 +64,7 @@ public class ParDoBoundMultiTranslator<InputT, OutputT> implements JavaStream<WindowedValue<KV<TupleTag<OutputT>, OutputT>>> outputStream = inputStream.flatMap( new DoFnMultiFunction<>( context.getPipelineOptions(), - transform.getFn(), + transform.getNewFn(), transform.getMainOutputTag(), transform.getSideOutputTags(), inputT.getWindowingStrategy(), @@ -87,18 +87,19 @@ public class ParDoBoundMultiTranslator<InputT, OutputT> implements FlatMapFunction<WindowedValue<InputT>, WindowedValue<KV<TupleTag<OutputT>, OutputT>>>, DoFnRunners.OutputManager { - private final DoFnRunner<InputT, OutputT> doFnRunner; + private final DoFnRunnerFactory<InputT, OutputT> doFnRunnerFactory; + private DoFnRunner<InputT, OutputT> doFnRunner; private final List<WindowedValue<KV<TupleTag<OutputT>, OutputT>>> outputs = Lists .newArrayList(); public DoFnMultiFunction( GearpumpPipelineOptions pipelineOptions, - OldDoFn<InputT, OutputT> doFn, + DoFn<InputT, OutputT> doFn, TupleTag<OutputT> mainOutputTag, TupleTagList sideOutputTags, WindowingStrategy<?, ?> windowingStrategy, SideInputReader sideInputReader) { - this.doFnRunner = new GearpumpDoFnRunner<>( + this.doFnRunnerFactory = new DoFnRunnerFactory<>( pipelineOptions, doFn, sideInputReader, @@ -106,12 +107,16 @@ public class ParDoBoundMultiTranslator<InputT, OutputT> implements mainOutputTag, sideOutputTags.getAll(), new NoOpStepContext(), + new NoOpAggregatorFactory(), windowingStrategy ); } @Override public Iterator<WindowedValue<KV<TupleTag<OutputT>, OutputT>>> apply(WindowedValue<InputT> wv) { + if (null == doFnRunner) { + doFnRunner = doFnRunnerFactory.createRunner(); + } doFnRunner.startBundle(); doFnRunner.processElement(wv); doFnRunner.finishBundle(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/45570b9c/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundTranslator.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundTranslator.java index b97cbb4..a796c83 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundTranslator.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundTranslator.java @@ -21,7 +21,6 @@ package org.apache.beam.runners.gearpump.translators; import org.apache.beam.runners.gearpump.translators.functions.DoFnFunction; import org.apache.beam.runners.gearpump.translators.utils.NoOpSideInputReader; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; @@ -39,7 +38,7 @@ public class ParDoBoundTranslator<InputT, OutputT> implements @Override public void translate(ParDo.Bound<InputT, OutputT> transform, TranslationContext context) { - OldDoFn<InputT, OutputT> doFn = transform.getFn(); + DoFn<InputT, OutputT> doFn = transform.getNewFn(); PCollection<OutputT> output = context.getOutput(transform); WindowingStrategy<?, ?> windowingStrategy = output.getWindowingStrategy(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/45570b9c/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java index 8d16356..42969fe 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java @@ -26,10 +26,10 @@ import java.util.List; import org.apache.beam.runners.core.DoFnRunner; import org.apache.beam.runners.core.DoFnRunners; import org.apache.beam.runners.gearpump.GearpumpPipelineOptions; -import org.apache.beam.runners.gearpump.translators.utils.GearpumpDoFnRunner; +import org.apache.beam.runners.gearpump.translators.utils.DoFnRunnerFactory; +import org.apache.beam.runners.gearpump.translators.utils.NoOpAggregatorFactory; import org.apache.beam.runners.gearpump.translators.utils.NoOpStepContext; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; @@ -44,17 +44,17 @@ import org.apache.gearpump.streaming.javaapi.dsl.functions.FlatMapFunction; public class DoFnFunction<InputT, OutputT> implements FlatMapFunction<WindowedValue<InputT>, WindowedValue<OutputT>>, DoFnRunners.OutputManager { - private final TupleTag<OutputT> mainTag = new TupleTag<OutputT>() { - }; - private final DoFnRunner<InputT, OutputT> doFnRunner; + private final TupleTag<OutputT> mainTag = new TupleTag<OutputT>() {}; private List<WindowedValue<OutputT>> outputs = Lists.newArrayList(); + private final DoFnRunnerFactory<InputT, OutputT> doFnRunnerFactory; + private DoFnRunner<InputT, OutputT> doFnRunner; public DoFnFunction( GearpumpPipelineOptions pipelineOptions, - OldDoFn<InputT, OutputT> doFn, + DoFn<InputT, OutputT> doFn, WindowingStrategy<?, ?> windowingStrategy, SideInputReader sideInputReader) { - this.doFnRunner = new GearpumpDoFnRunner<>( + this.doFnRunnerFactory = new DoFnRunnerFactory<>( pipelineOptions, doFn, sideInputReader, @@ -62,6 +62,7 @@ public class DoFnFunction<InputT, OutputT> implements mainTag, TupleTagList.empty().getAll(), new NoOpStepContext(), + new NoOpAggregatorFactory(), windowingStrategy ); } @@ -70,6 +71,10 @@ public class DoFnFunction<InputT, OutputT> implements public Iterator<WindowedValue<OutputT>> apply(WindowedValue<InputT> value) { outputs = Lists.newArrayList(); + if (null == doFnRunner) { + doFnRunner = doFnRunnerFactory.createRunner(); + } + doFnRunner.startBundle(); doFnRunner.processElement(value); doFnRunner.finishBundle(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/45570b9c/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java new file mode 100644 index 0000000..7119a87 --- /dev/null +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.gearpump.translators.utils; + +import java.io.Serializable; +import java.util.List; + +import org.apache.beam.runners.core.DoFnRunner; +import org.apache.beam.runners.core.DoFnRunners; +import org.apache.beam.runners.core.SimpleDoFnRunner; +import org.apache.beam.runners.gearpump.GearpumpPipelineOptions; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.Aggregator.AggregatorFactory; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.util.ExecutionContext; +import org.apache.beam.sdk.util.SideInputReader; +import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.values.TupleTag; + +/** + * a serializable {@link SimpleDoFnRunner}. + */ +public class DoFnRunnerFactory<InputT, OutputT> implements Serializable { + + private final DoFn<InputT, OutputT> fn; + private final transient PipelineOptions options; + private final SideInputReader sideInputReader; + private final DoFnRunners.OutputManager outputManager; + private final TupleTag<OutputT> mainOutputTag; + private final List<TupleTag<?>> sideOutputTags; + private final ExecutionContext.StepContext stepContext; + private final AggregatorFactory aggregatorFactory; + private final WindowingStrategy<?, ?> windowingStrategy; + + public DoFnRunnerFactory( + GearpumpPipelineOptions pipelineOptions, + DoFn<InputT, OutputT> doFn, + SideInputReader sideInputReader, + DoFnRunners.OutputManager outputManager, + TupleTag<OutputT> mainOutputTag, + List<TupleTag<?>> sideOutputTags, + ExecutionContext.StepContext stepContext, + AggregatorFactory aggregatorFactory, + WindowingStrategy<?, ?> windowingStrategy) { + this.fn = doFn; + this.options = pipelineOptions; + this.sideInputReader = sideInputReader; + this.outputManager = outputManager; + this.mainOutputTag = mainOutputTag; + this.sideOutputTags = sideOutputTags; + this.stepContext = stepContext; + this.aggregatorFactory = aggregatorFactory; + this.windowingStrategy = windowingStrategy; + } + + public DoFnRunner<InputT, OutputT> createRunner() { + return DoFnRunners.createDefault(options, fn, sideInputReader, outputManager, mainOutputTag, + sideOutputTags, stepContext, aggregatorFactory, windowingStrategy); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/45570b9c/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/GearpumpDoFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/GearpumpDoFnRunner.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/GearpumpDoFnRunner.java deleted file mode 100644 index ec86a8d..0000000 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/GearpumpDoFnRunner.java +++ /dev/null @@ -1,516 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.runners.gearpump.translators.utils; - -import static org.apache.beam.sdk.repackaged.com.google.common.base.Preconditions.checkNotNull; - -import com.google.common.collect.Iterables; -import com.google.common.collect.Sets; - -import java.io.IOException; -import java.io.Serializable; -import java.util.Collection; -import java.util.Iterator; -import java.util.List; -import java.util.Set; - -import org.apache.beam.runners.core.DoFnRunner; -import org.apache.beam.runners.core.DoFnRunners; -import org.apache.beam.runners.core.SimpleDoFnRunner; -import org.apache.beam.runners.gearpump.GearpumpPipelineOptions; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.IterableCoder; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.transforms.Aggregator; -import org.apache.beam.sdk.transforms.Combine; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.OldDoFn; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.GlobalWindow; -import org.apache.beam.sdk.transforms.windowing.GlobalWindows; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.transforms.windowing.WindowFn; -import org.apache.beam.sdk.util.ExecutionContext; -import org.apache.beam.sdk.util.SideInputReader; -import org.apache.beam.sdk.util.SystemDoFnInternal; -import org.apache.beam.sdk.util.TimerInternals; -import org.apache.beam.sdk.util.UserCodeException; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.WindowingInternals; -import org.apache.beam.sdk.util.WindowingStrategy; -import org.apache.beam.sdk.util.state.StateInternals; -import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.TupleTag; - -import org.joda.time.Instant; - - -/** - * a serializable {@link SimpleDoFnRunner}. - */ -public class GearpumpDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT>, - Serializable { - - private final OldDoFn<InputT, OutputT> fn; - private final transient PipelineOptions options; - private final SideInputReader sideInputReader; - private final DoFnRunners.OutputManager outputManager; - private final TupleTag<OutputT> mainOutputTag; - private final List<TupleTag<?>> sideOutputTags; - private final ExecutionContext.StepContext stepContext; - private final WindowFn<?, ?> windowFn; - private DoFnContext<InputT, OutputT> context; - - public GearpumpDoFnRunner( - GearpumpPipelineOptions pipelineOptions, - OldDoFn<InputT, OutputT> doFn, - SideInputReader sideInputReader, - DoFnRunners.OutputManager outputManager, - TupleTag<OutputT> mainOutputTag, - List<TupleTag<?>> sideOutputTags, - ExecutionContext.StepContext stepContext, - WindowingStrategy<?, ?> windowingStrategy) { - this.fn = doFn; - this.options = pipelineOptions; - this.sideInputReader = sideInputReader; - this.outputManager = outputManager; - this.mainOutputTag = mainOutputTag; - this.sideOutputTags = sideOutputTags; - this.stepContext = stepContext; - this.windowFn = windowingStrategy == null ? null : windowingStrategy.getWindowFn(); - } - - @Override - public void startBundle() { - // This can contain user code. Wrap it in case it throws an exception. - try { - if (null == context) { - this.context = new DoFnContext<>( - options, - fn, - sideInputReader, - outputManager, - mainOutputTag, - sideOutputTags, - stepContext, - windowFn - ); - } - fn.startBundle(context); - } catch (Throwable t) { - // Exception in user code. - throw wrapUserCodeException(t); - } - } - - @Override - public void processElement(WindowedValue<InputT> elem) { - if (elem.getWindows().size() <= 1 - || (!OldDoFn.RequiresWindowAccess.class.isAssignableFrom(fn.getClass()) - && context.sideInputReader.isEmpty())) { - invokeProcessElement(elem); - } else { - // We could modify the windowed value (and the processContext) to - // avoid repeated allocations, but this is more straightforward. - for (BoundedWindow window : elem.getWindows()) { - invokeProcessElement(WindowedValue.of( - elem.getValue(), elem.getTimestamp(), window, elem.getPane())); - } - } - } - - @Override - public void finishBundle() { - // This can contain user code. Wrap it in case it throws an exception. - try { - fn.finishBundle(context); - } catch (Throwable t) { - // Exception in user code. - throw wrapUserCodeException(t); - } - } - - private void invokeProcessElement(WindowedValue<InputT> elem) { - final OldDoFn<InputT, OutputT>.ProcessContext processContext = - new DoFnProcessContext<>(fn, context, elem); - // This can contain user code. Wrap it in case it throws an exception. - try { - fn.processElement(processContext); - } catch (Exception ex) { - throw wrapUserCodeException(ex); - } - } - - private RuntimeException wrapUserCodeException(Throwable t) { - throw UserCodeException.wrapIf(!isSystemDoFn(), t); - } - - private boolean isSystemDoFn() { - return fn.getClass().isAnnotationPresent(SystemDoFnInternal.class); - } - - /** - * A concrete implementation of {@code DoFn.Context} used for running a {@link DoFn}. - * - * @param <InputT> the type of the DoFn's (main) input elements - * @param <OutputT> the type of the DoFn's (main) output elements - */ - private static class DoFnContext<InputT, OutputT> - extends OldDoFn<InputT, OutputT>.Context { - private static final int MAX_SIDE_OUTPUTS = 1000; - - final transient PipelineOptions options; - final OldDoFn<InputT, OutputT> fn; - final SideInputReader sideInputReader; - final DoFnRunners.OutputManager outputManager; - final TupleTag<OutputT> mainOutputTag; - final ExecutionContext.StepContext stepContext; - final WindowFn<?, ?> windowFn; - - /** - * The set of known output tags, some of which may be undeclared, so we can throw an - * exception when it exceeds {@link #MAX_SIDE_OUTPUTS}. - */ - private final Set<TupleTag<?>> outputTags; - - public DoFnContext(PipelineOptions options, - OldDoFn<InputT, OutputT> fn, - SideInputReader sideInputReader, - DoFnRunners.OutputManager outputManager, - TupleTag<OutputT> mainOutputTag, - List<TupleTag<?>> sideOutputTags, - ExecutionContext.StepContext stepContext, - WindowFn<?, ?> windowFn) { - fn.super(); - this.options = options; - this.fn = fn; - this.sideInputReader = sideInputReader; - this.outputManager = outputManager; - this.mainOutputTag = mainOutputTag; - this.outputTags = Sets.newHashSet(); - - outputTags.add(mainOutputTag); - for (TupleTag<?> sideOutputTag : sideOutputTags) { - outputTags.add(sideOutputTag); - } - - this.stepContext = stepContext; - this.windowFn = windowFn; - super.setupDelegateAggregators(); - } - - ////////////////////////////////////////////////////////////////////////////// - - @Override - public PipelineOptions getPipelineOptions() { - return options; - } - - <T, W extends BoundedWindow> WindowedValue<T> makeWindowedValue( - T output, Instant timestamp, Collection<W> windows, PaneInfo pane) { - final Instant inputTimestamp = timestamp; - - if (timestamp == null) { - timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE; - } - - if (windows == null) { - try { - // The windowFn can never succeed at accessing the element, so its type does not - // matter here - @SuppressWarnings("unchecked") - WindowFn<Object, W> objectWindowFn = (WindowFn<Object, W>) windowFn; - windows = objectWindowFn.assignWindows(objectWindowFn.new AssignContext() { - @Override - public Object element() { - throw new UnsupportedOperationException( - "WindowFn attempted to access input element when none was available"); - } - - @Override - public Instant timestamp() { - if (inputTimestamp == null) { - throw new UnsupportedOperationException( - "WindowFn attempted to access input timestamp when none was available"); - } - return inputTimestamp; - } - - @Override - public BoundedWindow window() { - throw new UnsupportedOperationException( - "WindowFn attempted to access input windows when none were available"); - } - }); - } catch (Exception e) { - throw UserCodeException.wrap(e); - } - } - - return WindowedValue.of(output, timestamp, windows, pane); - } - - public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) { - if (!sideInputReader.contains(view)) { - throw new IllegalArgumentException("calling sideInput() with unknown view"); - } - BoundedWindow sideInputWindow = - view.getWindowingStrategyInternal().getWindowFn().getSideInputWindow(mainInputWindow); - return sideInputReader.get(view, sideInputWindow); - } - - void outputWindowedValue( - OutputT output, - Instant timestamp, - Collection<? extends BoundedWindow> windows, - PaneInfo pane) { - outputWindowedValue(makeWindowedValue(output, timestamp, windows, pane)); - } - - void outputWindowedValue(WindowedValue<OutputT> windowedElem) { - outputManager.output(mainOutputTag, windowedElem); - if (stepContext != null) { - stepContext.noteOutput(windowedElem); - } - } - - protected <T> void sideOutputWindowedValue(TupleTag<T> tag, - T output, - Instant timestamp, - Collection<? extends BoundedWindow> windows, - PaneInfo pane) { - sideOutputWindowedValue(tag, makeWindowedValue(output, timestamp, windows, pane)); - } - - protected <T> void sideOutputWindowedValue(TupleTag<T> tag, WindowedValue<T> windowedElem) { - if (!outputTags.contains(tag)) { - // This tag wasn't declared nor was it seen before during this execution. - // Thus, this must be a new, undeclared and unconsumed output. - // To prevent likely user errors, enforce the limit on the number of side - // outputs. - if (outputTags.size() >= MAX_SIDE_OUTPUTS) { - throw new IllegalArgumentException( - "the number of side outputs has exceeded a limit of " + MAX_SIDE_OUTPUTS); - } - outputTags.add(tag); - } - - outputManager.output(tag, windowedElem); - if (stepContext != null) { - stepContext.noteSideOutput(tag, windowedElem); - } - } - - // Following implementations of output, outputWithTimestamp, and sideOutput - // are only accessible in DoFn.startBundle and DoFn.finishBundle, and will be shadowed by - // ProcessContext's versions in DoFn.processElement. - @Override - public void output(OutputT output) { - outputWindowedValue(output, null, null, PaneInfo.NO_FIRING); - } - - @Override - public void outputWithTimestamp(OutputT output, Instant timestamp) { - outputWindowedValue(output, timestamp, null, PaneInfo.NO_FIRING); - } - - @Override - public <T> void sideOutput(TupleTag<T> tag, T output) { - checkNotNull(tag, "TupleTag passed to sideOutput cannot be null"); - sideOutputWindowedValue(tag, output, null, null, PaneInfo.NO_FIRING); - } - - @Override - public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) { - checkNotNull(tag, "TupleTag passed to sideOutputWithTimestamp cannot be null"); - sideOutputWindowedValue(tag, output, timestamp, null, PaneInfo.NO_FIRING); - } - - @Override - protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal( - String name, Combine.CombineFn<AggInputT, ?, AggOutputT> combiner) { - checkNotNull(combiner, - "Combiner passed to createAggregator cannot be null"); - throw new UnsupportedOperationException("aggregator not supported in Gearpump runner"); - } - } - - - /** - * A concrete implementation of {@code DoFn.ProcessContext} used for - * running a {@link DoFn} over a single element. - * - * @param <InputT> the type of the DoFn's (main) input elements - * @param <OutputT> the type of the DoFn's (main) output elements - */ - private static class DoFnProcessContext<InputT, OutputT> - extends OldDoFn<InputT, OutputT>.ProcessContext { - - - final OldDoFn<InputT, OutputT> fn; - final DoFnContext<InputT, OutputT> context; - final WindowedValue<InputT> windowedValue; - - public DoFnProcessContext(OldDoFn<InputT, OutputT> fn, - DoFnContext<InputT, OutputT> context, - WindowedValue<InputT> windowedValue) { - fn.super(); - this.fn = fn; - this.context = context; - this.windowedValue = windowedValue; - } - - @Override - public PipelineOptions getPipelineOptions() { - return context.getPipelineOptions(); - } - - @Override - public InputT element() { - return windowedValue.getValue(); - } - - @Override - public <T> T sideInput(PCollectionView<T> view) { - checkNotNull(view, "View passed to sideInput cannot be null"); - Iterator<? extends BoundedWindow> windowIter = windows().iterator(); - BoundedWindow window; - if (!windowIter.hasNext()) { - if (context.windowFn instanceof GlobalWindows) { - // TODO: Remove this once GroupByKeyOnly no longer outputs elements - // without windows - window = GlobalWindow.INSTANCE; - } else { - throw new IllegalStateException( - "sideInput called when main input element is not in any windows"); - } - } else { - window = windowIter.next(); - if (windowIter.hasNext()) { - throw new IllegalStateException( - "sideInput called when main input element is in multiple windows"); - } - } - return context.sideInput(view, window); - } - - @Override - public BoundedWindow window() { - if (!(fn instanceof OldDoFn.RequiresWindowAccess)) { - throw new UnsupportedOperationException( - "window() is only available in the context of a DoFn marked as RequiresWindow."); - } - return Iterables.getOnlyElement(windows()); - } - - @Override - public PaneInfo pane() { - return windowedValue.getPane(); - } - - @Override - public void output(OutputT output) { - context.outputWindowedValue(windowedValue.withValue(output)); - } - - @Override - public void outputWithTimestamp(OutputT output, Instant timestamp) { - context.outputWindowedValue(output, timestamp, - windowedValue.getWindows(), windowedValue.getPane()); - } - - @Override - public <T> void sideOutput(TupleTag<T> tag, T output) { - checkNotNull(tag, "Tag passed to sideOutput cannot be null"); - context.sideOutputWindowedValue(tag, windowedValue.withValue(output)); - } - - @Override - public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) { - checkNotNull(tag, "Tag passed to sideOutputWithTimestamp cannot be null"); - context.sideOutputWindowedValue( - tag, output, timestamp, windowedValue.getWindows(), windowedValue.getPane()); - } - - @Override - public Instant timestamp() { - return windowedValue.getTimestamp(); - } - - public Collection<? extends BoundedWindow> windows() { - return windowedValue.getWindows(); - } - - @Override - public WindowingInternals<InputT, OutputT> windowingInternals() { - return new WindowingInternals<InputT, OutputT>() { - @Override - public void outputWindowedValue(OutputT output, Instant timestamp, - Collection<? extends BoundedWindow> windows, PaneInfo pane) { - context.outputWindowedValue(output, timestamp, windows, pane); - } - - @Override - public Collection<? extends BoundedWindow> windows() { - return windowedValue.getWindows(); - } - - @Override - public PaneInfo pane() { - return windowedValue.getPane(); - } - - @Override - public TimerInternals timerInternals() { - return context.stepContext.timerInternals(); - } - - @Override - public <T> void writePCollectionViewData( - TupleTag<?> tag, - Iterable<WindowedValue<T>> data, - Coder<T> elemCoder) throws IOException { - @SuppressWarnings("unchecked") - Coder<BoundedWindow> windowCoder = (Coder<BoundedWindow>) context.windowFn.windowCoder(); - - context.stepContext.writePCollectionViewData( - tag, data, IterableCoder.of(WindowedValue.getFullCoder(elemCoder, windowCoder)), - window(), windowCoder); - } - - @Override - public StateInternals<?> stateInternals() { - return context.stepContext.stateInternals(); - } - - @Override - public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) { - return context.sideInput(view, mainInputWindow); - } - }; - } - - @Override - protected <AggregatorInputT, AggregatorOutputT> Aggregator<AggregatorInputT, AggregatorOutputT> - createAggregatorInternal( - String name, Combine.CombineFn<AggregatorInputT, ?, AggregatorOutputT> combiner) { - return context.createAggregatorInternal(name, combiner); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/45570b9c/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpAggregatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpAggregatorFactory.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpAggregatorFactory.java new file mode 100644 index 0000000..cd404a5 --- /dev/null +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpAggregatorFactory.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.gearpump.translators.utils; + +import java.io.Serializable; + +import org.apache.beam.sdk.transforms.Aggregator; +import org.apache.beam.sdk.transforms.Aggregator.AggregatorFactory; +import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.util.ExecutionContext; + +/** + * no-op aggregator factory. + */ +public class NoOpAggregatorFactory implements AggregatorFactory, Serializable { + + @Override + public <InputT, AccumT, OutputT> Aggregator<InputT, OutputT> createAggregatorForDoFn( + Class<?> fnClass, + ExecutionContext.StepContext stepContext, + String aggregatorName, + Combine.CombineFn<InputT, AccumT, OutputT> combine) { + return null; + } +}