Repository: beam Updated Branches: refs/heads/master 73305d631 -> 2d28ece8e
Cleanups in SimpleDoFnRunner and ParDoEvaluator - Makes it an error to output to an undeclared output tag, instead of effectively silently dropping this data. - Removes code in SimpleDoFnRunner that, IIRC, assigned windows to outputs from bundle methods - which is now obsolete because you have to specify the window when outputting from FinishBundle, and you can't output from StartBundle at all. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/281eaab3 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/281eaab3 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/281eaab3 Branch: refs/heads/master Commit: 281eaab3a0ac3810733b87159c9ea9e82a8480fd Parents: 73305d6 Author: Eugene Kirpichov <[email protected]> Authored: Fri May 19 16:09:13 2017 -0700 Committer: Eugene Kirpichov <[email protected]> Committed: Thu May 25 13:45:17 2017 -0700 ---------------------------------------------------------------------- .../beam/runners/core/SimpleDoFnRunner.java | 387 ++++--------------- .../beam/runners/direct/ParDoEvaluator.java | 18 +- .../spark/translation/MultiDoFnFunction.java | 8 +- .../spark/translation/TransformTranslator.java | 1 + .../streaming/StreamingTransformTranslator.java | 1 + .../apache/beam/sdk/transforms/ParDoTest.java | 76 +--- 6 files changed, 102 insertions(+), 389 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/281eaab3/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java index 97b0b33..7d7babd 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java @@ -20,11 +20,10 @@ package org.apache.beam.runners.core; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; +import com.google.common.collect.FluentIterable; import com.google.common.collect.Iterables; import com.google.common.collect.Sets; import java.util.Collection; -import java.util.Collections; -import java.util.Iterator; import java.util.List; import java.util.Set; import javax.annotation.Nullable; @@ -37,20 +36,13 @@ import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.state.Timer; import org.apache.beam.sdk.state.TimerSpec; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.DoFn.FinishBundleContext; -import org.apache.beam.sdk.transforms.DoFn.OnTimerContext; -import org.apache.beam.sdk.transforms.DoFn.ProcessContext; -import org.apache.beam.sdk.transforms.DoFn.StartBundleContext; import org.apache.beam.sdk.transforms.reflect.DoFnInvoker; import org.apache.beam.sdk.transforms.reflect.DoFnInvokers; import org.apache.beam.sdk.transforms.reflect.DoFnSignature; import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.GlobalWindow; -import org.apache.beam.sdk.transforms.windowing.GlobalWindows; import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.SystemDoFnInternal; import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.util.WindowedValue; @@ -73,18 +65,19 @@ import org.joda.time.format.PeriodFormat; */ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT> { + private final PipelineOptions options; /** The {@link DoFn} being run. */ private final DoFn<InputT, OutputT> fn; /** The {@link DoFnInvoker} being run. */ private final DoFnInvoker<InputT, OutputT> invoker; - /** The context used for running the {@link DoFn}. */ - private final DoFnContext<InputT, OutputT> context; - + private final SideInputReader sideInputReader; private final OutputManager outputManager; private final TupleTag<OutputT> mainOutputTag; + /** The set of known output tags. */ + private final Set<TupleTag<?>> outputTags; private final boolean observesWindow; @@ -106,12 +99,16 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out List<TupleTag<?>> additionalOutputTags, StepContext stepContext, WindowingStrategy<?, ?> windowingStrategy) { + this.options = options; this.fn = fn; this.signature = DoFnSignatures.getSignature(fn.getClass()); this.observesWindow = signature.processElement().observesWindow() || !sideInputReader.isEmpty(); this.invoker = DoFnInvokers.invokerFor(fn); + this.sideInputReader = sideInputReader; this.outputManager = outputManager; this.mainOutputTag = mainOutputTag; + this.outputTags = + Sets.newHashSet(FluentIterable.<TupleTag<?>>of(mainOutputTag).append(additionalOutputTags)); this.stepContext = stepContext; // This is a cast of an _invariant_ coder. But we are assured by pipeline validation @@ -121,26 +118,13 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out (Coder<BoundedWindow>) windowingStrategy.getWindowFn().windowCoder(); this.windowCoder = untypedCoder; this.allowedLateness = windowingStrategy.getAllowedLateness(); - - this.context = - new DoFnContext<>( - options, - fn, - sideInputReader, - outputManager, - mainOutputTag, - additionalOutputTags, - stepContext, - windowingStrategy.getWindowFn()); } @Override public void startBundle() { - DoFnStartBundleContext<InputT, OutputT> startBundleContext = - createStartBundleContext(fn, context); // This can contain user code. Wrap it in case it throws an exception. try { - invoker.invokeStartBundle(startBundleContext); + invoker.invokeStartBundle(new DoFnStartBundleContext()); } catch (Throwable t) { // Exception in user code. throw wrapUserCodeException(t); @@ -174,7 +158,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out case PROCESSING_TIME: case SYNCHRONIZED_PROCESSING_TIME: - effectiveTimestamp = context.stepContext.timerInternals().currentInputWatermarkTime(); + effectiveTimestamp = stepContext.timerInternals().currentInputWatermarkTime(); break; default: @@ -182,18 +166,15 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out String.format("Unknown time domain: %s", timeDomain)); } - OnTimerArgumentProvider<InputT, OutputT> argumentProvider = - new OnTimerArgumentProvider<>( - fn, context, window, allowedLateness, effectiveTimestamp, timeDomain); + OnTimerArgumentProvider argumentProvider = + new OnTimerArgumentProvider(window, effectiveTimestamp, timeDomain); invoker.invokeOnTimer(timerId, argumentProvider); } private void invokeProcessElement(WindowedValue<InputT> elem) { - final DoFnProcessContext<InputT, OutputT> processContext = createProcessContext(elem); - // This can contain user code. Wrap it in case it throws an exception. try { - invoker.invokeProcessElement(processContext); + invoker.invokeProcessElement(new DoFnProcessContext(elem)); } catch (Exception ex) { throw wrapUserCodeException(ex); } @@ -201,32 +182,15 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out @Override public void finishBundle() { - DoFnFinishBundleContext<InputT, OutputT> finishBundleContext = - createFinishBundleContext(fn, context); // This can contain user code. Wrap it in case it throws an exception. try { - invoker.invokeFinishBundle(finishBundleContext); + invoker.invokeFinishBundle(new DoFnFinishBundleContext()); } catch (Throwable t) { // Exception in user code. throw wrapUserCodeException(t); } } - private DoFnStartBundleContext<InputT, OutputT> createStartBundleContext( - DoFn<InputT, OutputT> fn, DoFnContext<InputT, OutputT> context) { - return new DoFnStartBundleContext<>(fn, context); - } - - private DoFnFinishBundleContext<InputT, OutputT> createFinishBundleContext( - DoFn<InputT, OutputT> fn, DoFnContext<InputT, OutputT> context) { - return new DoFnFinishBundleContext<>(fn, context); - } - - /** Returns a new {@link DoFn.ProcessContext} for the given element. */ - private DoFnProcessContext<InputT, OutputT> createProcessContext(WindowedValue<InputT> elem) { - return new DoFnProcessContext<InputT, OutputT>(fn, context, elem, allowedLateness); - } - private RuntimeException wrapUserCodeException(Throwable t) { throw UserCodeException.wrapIf(!isSystemDoFn(), t); } @@ -235,170 +199,31 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out return invoker.getClass().isAnnotationPresent(SystemDoFnInternal.class); } - /** - * A concrete implementation of {@code DoFn.Context} used for running a {@link DoFn}. - * - * @param <InputT> the type of the {@link DoFn} (main) input elements - * @param <OutputT> the type of the {@link DoFn} (main) output elements - */ - private static class DoFnContext<InputT, OutputT> { - private static final int MAX_SIDE_OUTPUTS = 1000; - - final PipelineOptions options; - final DoFn<InputT, OutputT> fn; - final SideInputReader sideInputReader; - final OutputManager outputManager; - final TupleTag<OutputT> mainOutputTag; - final StepContext stepContext; - final WindowFn<?, ?> windowFn; - - /** - * The set of known output tags, some of which may be undeclared, so we can throw an exception - * when it exceeds {@link #MAX_SIDE_OUTPUTS}. - */ - private Set<TupleTag<?>> outputTags; - - public DoFnContext( - PipelineOptions options, - DoFn<InputT, OutputT> fn, - SideInputReader sideInputReader, - OutputManager outputManager, - TupleTag<OutputT> mainOutputTag, - List<TupleTag<?>> additionalOutputTags, - StepContext stepContext, - WindowFn<?, ?> windowFn) { - this.options = options; - this.fn = fn; - this.sideInputReader = sideInputReader; - this.outputManager = outputManager; - this.mainOutputTag = mainOutputTag; - this.outputTags = Sets.newHashSet(); - - outputTags.add(mainOutputTag); - for (TupleTag<?> additionalOutputTag : additionalOutputTags) { - outputTags.add(additionalOutputTag); - } - - this.stepContext = stepContext; - this.windowFn = windowFn; - } - - ////////////////////////////////////////////////////////////////////////////// - - public PipelineOptions getPipelineOptions() { - return options; - } - - <T, W extends BoundedWindow> WindowedValue<T> makeWindowedValue( - T output, Instant timestamp, Collection<W> windows, PaneInfo pane) { - final Instant inputTimestamp = timestamp; - - if (timestamp == null) { - timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE; - } - - if (windows == null) { - try { - // The windowFn can never succeed at accessing the element, so its type does not - // matter here - @SuppressWarnings("unchecked") - WindowFn<Object, W> objectWindowFn = (WindowFn<Object, W>) windowFn; - windows = - objectWindowFn.assignWindows( - objectWindowFn.new AssignContext() { - @Override - public Object element() { - throw new UnsupportedOperationException( - "WindowFn attempted to access input element when none was available"); - } - - @Override - public Instant timestamp() { - if (inputTimestamp == null) { - throw new UnsupportedOperationException( - "WindowFn attempted to access input timestamp when none was available"); - } - return inputTimestamp; - } - - @Override - public W window() { - throw new UnsupportedOperationException( - "WindowFn attempted to access input windows when none were available"); - } - }); - } catch (Exception e) { - throw UserCodeException.wrap(e); - } - } - - return WindowedValue.of(output, timestamp, windows, pane); - } - - public <T> T sideInput(PCollectionView<T> view, BoundedWindow sideInputWindow) { - if (!sideInputReader.contains(view)) { - throw new IllegalArgumentException("calling sideInput() with unknown view"); - } - return sideInputReader.get(view, sideInputWindow); - } - - void outputWindowedValue( - OutputT output, - Instant timestamp, - Collection<? extends BoundedWindow> windows, - PaneInfo pane) { - outputWindowedValue(makeWindowedValue(output, timestamp, windows, pane)); - } - - void outputWindowedValue(WindowedValue<OutputT> windowedElem) { - outputManager.output(mainOutputTag, windowedElem); - } - - private <T> void outputWindowedValue( - TupleTag<T> tag, - T output, - Instant timestamp, - Collection<? extends BoundedWindow> windows, - PaneInfo pane) { - outputWindowedValue(tag, makeWindowedValue(output, timestamp, windows, pane)); - } - - private <T> void outputWindowedValue(TupleTag<T> tag, WindowedValue<T> windowedElem) { - if (!outputTags.contains(tag)) { - // This tag wasn't declared nor was it seen before during this execution. - // Thus, this must be a new, undeclared and unconsumed output. - // To prevent likely user errors, enforce the limit on the number of side - // outputs. - if (outputTags.size() >= MAX_SIDE_OUTPUTS) { - throw new IllegalArgumentException( - "the number of outputs has exceeded a limit of " + MAX_SIDE_OUTPUTS); - } - outputTags.add(tag); - } - - outputManager.output(tag, windowedElem); + private <T> T sideInput(PCollectionView<T> view, BoundedWindow sideInputWindow) { + if (!sideInputReader.contains(view)) { + throw new IllegalArgumentException("calling sideInput() with unknown view"); } + return sideInputReader.get(view, sideInputWindow); } + private <T> void outputWindowedValue(TupleTag<T> tag, WindowedValue<T> windowedElem) { + checkArgument(outputTags.contains(tag), "Unknown output tag %s", tag); + outputManager.output(tag, windowedElem); + } /** * A concrete implementation of {@link DoFn.StartBundleContext}. */ - private class DoFnStartBundleContext<InputT, OutputT> + private class DoFnStartBundleContext extends DoFn<InputT, OutputT>.StartBundleContext implements DoFnInvoker.ArgumentProvider<InputT, OutputT> { - private final DoFn<InputT, OutputT> fn; - private final DoFnContext<InputT, OutputT> context; - - private DoFnStartBundleContext(DoFn<InputT, OutputT> fn, DoFnContext<InputT, OutputT> context) { + private DoFnStartBundleContext() { fn.super(); - this.fn = fn; - this.context = context; } @Override public PipelineOptions getPipelineOptions() { - return context.getPipelineOptions(); + return options; } @Override @@ -408,24 +233,25 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out } @Override - public StartBundleContext startBundleContext(DoFn<InputT, OutputT> doFn) { + public DoFn<InputT, OutputT>.StartBundleContext startBundleContext(DoFn<InputT, OutputT> doFn) { return this; } @Override - public FinishBundleContext finishBundleContext(DoFn<InputT, OutputT> doFn) { + public DoFn<InputT, OutputT>.FinishBundleContext finishBundleContext( + DoFn<InputT, OutputT> doFn) { throw new UnsupportedOperationException( "Cannot access FinishBundleContext outside of @FinishBundle method."); } @Override - public ProcessContext processContext(DoFn<InputT, OutputT> doFn) { + public DoFn<InputT, OutputT>.ProcessContext processContext(DoFn<InputT, OutputT> doFn) { throw new UnsupportedOperationException( "Cannot access ProcessContext outside of @ProcessElement method."); } @Override - public OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) { + public DoFn<InputT, OutputT>.OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) { throw new UnsupportedOperationException( "Cannot access OnTimerContext outside of @OnTimer methods."); } @@ -453,20 +279,16 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out * B * A concrete implementation of {@link DoFn.FinishBundleContext}. */ - private class DoFnFinishBundleContext<InputT, OutputT> + private class DoFnFinishBundleContext extends DoFn<InputT, OutputT>.FinishBundleContext implements DoFnInvoker.ArgumentProvider<InputT, OutputT> { - private final DoFnContext<InputT, OutputT> context; - - private DoFnFinishBundleContext( - DoFn<InputT, OutputT> fn, DoFnContext<InputT, OutputT> context) { + private DoFnFinishBundleContext() { fn.super(); - this.context = context; } @Override public PipelineOptions getPipelineOptions() { - return context.getPipelineOptions(); + return options; } @Override @@ -476,24 +298,25 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out } @Override - public StartBundleContext startBundleContext(DoFn<InputT, OutputT> doFn) { + public DoFn<InputT, OutputT>.StartBundleContext startBundleContext(DoFn<InputT, OutputT> doFn) { throw new UnsupportedOperationException( "Cannot access StartBundleContext outside of @StartBundle method."); } @Override - public FinishBundleContext finishBundleContext(DoFn<InputT, OutputT> doFn) { + public DoFn<InputT, OutputT>.FinishBundleContext finishBundleContext( + DoFn<InputT, OutputT> doFn) { return this; } @Override - public ProcessContext processContext(DoFn<InputT, OutputT> doFn) { + public DoFn<InputT, OutputT>.ProcessContext processContext(DoFn<InputT, OutputT> doFn) { throw new UnsupportedOperationException( "Cannot access ProcessContext outside of @ProcessElement method."); } @Override - public OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) { + public DoFn<InputT, OutputT>.OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) { throw new UnsupportedOperationException( "Cannot access OnTimerContext outside of @OnTimer methods."); } @@ -518,30 +341,22 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out @Override public void output(OutputT output, Instant timestamp, BoundedWindow window) { - context.outputWindowedValue(WindowedValue.of(output, timestamp, window, PaneInfo.NO_FIRING)); + output(mainOutputTag, output, timestamp, window); } @Override public <T> void output(TupleTag<T> tag, T output, Instant timestamp, BoundedWindow window) { - context.outputWindowedValue( - tag, WindowedValue.of(output, timestamp, window, PaneInfo.NO_FIRING)); + outputWindowedValue(tag, WindowedValue.of(output, timestamp, window, PaneInfo.NO_FIRING)); } } /** * A concrete implementation of {@link DoFn.ProcessContext} used for running a {@link DoFn} over a * single element. - * - * @param <InputT> the type of the {@link DoFn} (main) input elements - * @param <OutputT> the type of the {@link DoFn} (main) output elements */ - private class DoFnProcessContext<InputT, OutputT> extends DoFn<InputT, OutputT>.ProcessContext + private class DoFnProcessContext extends DoFn<InputT, OutputT>.ProcessContext implements DoFnInvoker.ArgumentProvider<InputT, OutputT> { - - final DoFn<InputT, OutputT> fn; - final DoFnContext<InputT, OutputT> context; - final WindowedValue<InputT> windowedValue; - private final Duration allowedLateness; + final WindowedValue<InputT> elem; /** Lazily initialized; should only be accessed via {@link #getNamespace()}. */ @Nullable private StateNamespace namespace; @@ -549,7 +364,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out /** * The state namespace for this context. * - * <p>Any call to {@link #getNamespace()} when more than one window is present will crash; this + * <p>Any call to this method when more than one window is present will crash; this * represents a bug in the runner or the {@link DoFnSignature}, since values must be in exactly * one window when state or timers are relevant. */ @@ -561,55 +376,32 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out } private DoFnProcessContext( - DoFn<InputT, OutputT> fn, - DoFnContext<InputT, OutputT> context, - WindowedValue<InputT> windowedValue, - Duration allowedLateness) { + WindowedValue<InputT> elem) { fn.super(); - this.fn = fn; - this.context = context; - this.windowedValue = windowedValue; - this.allowedLateness = allowedLateness; + this.elem = elem; } @Override public PipelineOptions getPipelineOptions() { - return context.getPipelineOptions(); + return options; } @Override public InputT element() { - return windowedValue.getValue(); + return elem.getValue(); } @Override public <T> T sideInput(PCollectionView<T> view) { checkNotNull(view, "View passed to sideInput cannot be null"); - Iterator<? extends BoundedWindow> windowIter = windows().iterator(); - BoundedWindow window; - if (!windowIter.hasNext()) { - if (context.windowFn instanceof GlobalWindows) { - // TODO: Remove this once GroupByKeyOnly no longer outputs elements - // without windows - window = GlobalWindow.INSTANCE; - } else { - throw new IllegalStateException( - "sideInput called when main input element is not in any windows"); - } - } else { - window = windowIter.next(); - if (windowIter.hasNext()) { - throw new IllegalStateException( - "sideInput called when main input element is in multiple windows"); - } - } - return context.sideInput( + BoundedWindow window = Iterables.getOnlyElement(windows()); + return SimpleDoFnRunner.this.sideInput( view, view.getWindowMappingFn().getSideInputWindow(window)); } @Override public PaneInfo pane() { - return windowedValue.getPane(); + return elem.getPane(); } @Override @@ -619,37 +411,36 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out @Override public void output(OutputT output) { - context.outputWindowedValue(windowedValue.withValue(output)); + output(mainOutputTag, output); } @Override public void outputWithTimestamp(OutputT output, Instant timestamp) { checkTimestamp(timestamp); - context.outputWindowedValue( - output, timestamp, windowedValue.getWindows(), windowedValue.getPane()); + outputWithTimestamp(mainOutputTag, output, timestamp); } @Override public <T> void output(TupleTag<T> tag, T output) { checkNotNull(tag, "Tag passed to output cannot be null"); - context.outputWindowedValue(tag, windowedValue.withValue(output)); + outputWindowedValue(tag, elem.withValue(output)); } @Override public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) { checkNotNull(tag, "Tag passed to outputWithTimestamp cannot be null"); checkTimestamp(timestamp); - context.outputWindowedValue( - tag, output, timestamp, windowedValue.getWindows(), windowedValue.getPane()); + outputWindowedValue( + tag, WindowedValue.of(output, timestamp, elem.getWindows(), elem.getPane())); } @Override public Instant timestamp() { - return windowedValue.getTimestamp(); + return elem.getTimestamp(); } public Collection<? extends BoundedWindow> windows() { - return windowedValue.getWindows(); + return elem.getWindows(); } @SuppressWarnings("deprecation") // Allowed Skew is deprecated for users, but must be respected @@ -657,7 +448,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out // The documentation of getAllowedTimestampSkew explicitly permits Long.MAX_VALUE to be used // for infinite skew. Defend against underflow in that case for timestamps before the epoch if (fn.getAllowedTimestampSkew().getMillis() != Long.MAX_VALUE - && timestamp.isBefore(windowedValue.getTimestamp().minus(fn.getAllowedTimestampSkew()))) { + && timestamp.isBefore(elem.getTimestamp().minus(fn.getAllowedTimestampSkew()))) { throw new IllegalArgumentException( String.format( "Cannot output with timestamp %s. Output timestamps must be no earlier than the " @@ -665,23 +456,24 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out + "DoFn#getAllowedTimestampSkew() Javadoc for details on changing the allowed " + "skew.", timestamp, - windowedValue.getTimestamp(), + elem.getTimestamp(), PeriodFormat.getDefault().print(fn.getAllowedTimestampSkew().toPeriod()))); } } @Override public BoundedWindow window() { - return Iterables.getOnlyElement(windowedValue.getWindows()); + return Iterables.getOnlyElement(elem.getWindows()); } @Override - public StartBundleContext startBundleContext(DoFn<InputT, OutputT> doFn) { + public DoFn<InputT, OutputT>.StartBundleContext startBundleContext(DoFn<InputT, OutputT> doFn) { throw new UnsupportedOperationException("StartBundleContext parameters are not supported."); } @Override - public FinishBundleContext finishBundleContext(DoFn<InputT, OutputT> doFn) { + public DoFn<InputT, OutputT>.FinishBundleContext finishBundleContext( + DoFn<InputT, OutputT> doFn) { throw new UnsupportedOperationException("FinishBundleContext parameters are not supported."); } @@ -691,7 +483,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out } @Override - public OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) { + public DoFn<InputT, OutputT>.OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) { throw new UnsupportedOperationException( "Cannot access OnTimerContext outside of @OnTimer methods."); } @@ -720,7 +512,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out TimerSpec spec = (TimerSpec) signature.timerDeclarations().get(timerId).field().get(fn); return new TimerInternalsTimer( - window(), getNamespace(), allowedLateness, timerId, spec, stepContext.timerInternals()); + window(), getNamespace(), timerId, spec, stepContext.timerInternals()); } catch (IllegalAccessException e) { throw new RuntimeException(e); } @@ -730,20 +522,13 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out /** * A concrete implementation of {@link DoFnInvoker.ArgumentProvider} used for running a {@link * DoFn} on a timer. - * - * @param <InputT> the type of the {@link DoFn} (main) input elements - * @param <OutputT> the type of the {@link DoFn} (main) output elements */ - private class OnTimerArgumentProvider<InputT, OutputT> + private class OnTimerArgumentProvider extends DoFn<InputT, OutputT>.OnTimerContext implements DoFnInvoker.ArgumentProvider<InputT, OutputT> { - - final DoFn<InputT, OutputT> fn; - final DoFnContext<InputT, OutputT> context; private final BoundedWindow window; private final Instant timestamp; private final TimeDomain timeDomain; - private final Duration allowedLateness; /** Lazily initialized; should only be accessed via {@link #getNamespace()}. */ private StateNamespace namespace; @@ -751,7 +536,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out /** * The state namespace for this context. * - * <p>Any call to {@link #getNamespace()} when more than one window is present will crash; this + * <p>Any call to this method when more than one window is present will crash; this * represents a bug in the runner or the {@link DoFnSignature}, since values must be in exactly * one window when state or timers are relevant. */ @@ -763,17 +548,11 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out } private OnTimerArgumentProvider( - DoFn<InputT, OutputT> fn, - DoFnContext<InputT, OutputT> context, BoundedWindow window, - Duration allowedLateness, Instant timestamp, TimeDomain timeDomain) { fn.super(); - this.fn = fn; - this.context = context; this.window = window; - this.allowedLateness = allowedLateness; this.timestamp = timestamp; this.timeDomain = timeDomain; } @@ -789,12 +568,13 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out } @Override - public StartBundleContext startBundleContext(DoFn<InputT, OutputT> doFn) { + public DoFn<InputT, OutputT>.StartBundleContext startBundleContext(DoFn<InputT, OutputT> doFn) { throw new UnsupportedOperationException("StartBundleContext parameters are not supported."); } @Override - public FinishBundleContext finishBundleContext(DoFn<InputT, OutputT> doFn) { + public DoFn<InputT, OutputT>.FinishBundleContext finishBundleContext( + DoFn<InputT, OutputT> doFn) { throw new UnsupportedOperationException("FinishBundleContext parameters are not supported."); } @@ -805,12 +585,12 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out @Override - public ProcessContext processContext(DoFn<InputT, OutputT> doFn) { + public DoFn<InputT, OutputT>.ProcessContext processContext(DoFn<InputT, OutputT> doFn) { throw new UnsupportedOperationException("ProcessContext parameters are not supported."); } @Override - public OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) { + public DoFn<InputT, OutputT>.OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) { return this; } @@ -838,7 +618,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out TimerSpec spec = (TimerSpec) signature.timerDeclarations().get(timerId).field().get(fn); return new TimerInternalsTimer( - window, getNamespace(), allowedLateness, timerId, spec, stepContext.timerInternals()); + window, getNamespace(), timerId, spec, stepContext.timerInternals()); } catch (IllegalAccessException e) { throw new RuntimeException(e); } @@ -846,42 +626,37 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out @Override public PipelineOptions getPipelineOptions() { - return context.getPipelineOptions(); + return options; } @Override public void output(OutputT output) { - context.outputWindowedValue( - output, timestamp(), Collections.singleton(window()), PaneInfo.NO_FIRING); + output(mainOutputTag, output); } @Override public void outputWithTimestamp(OutputT output, Instant timestamp) { - context.outputWindowedValue( - output, timestamp, Collections.singleton(window()), PaneInfo.NO_FIRING); + outputWithTimestamp(mainOutputTag, output, timestamp); } @Override public <T> void output(TupleTag<T> tag, T output) { - context.outputWindowedValue( - tag, output, timestamp, Collections.singleton(window()), PaneInfo.NO_FIRING); + outputWindowedValue(tag, WindowedValue.of(output, timestamp, window(), PaneInfo.NO_FIRING)); } @Override public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) { - context.outputWindowedValue( - tag, output, timestamp, Collections.singleton(window()), PaneInfo.NO_FIRING); + outputWindowedValue(tag, WindowedValue.of(output, timestamp, window(), PaneInfo.NO_FIRING)); } } - private static class TimerInternalsTimer implements Timer { + private class TimerInternalsTimer implements Timer { private final TimerInternals timerInternals; // The window and the namespace represent the same thing, but the namespace is a cached // and specially encoded form. Since the namespace can be cached across timers, it is // passed in whole rather than being computed here. private final BoundedWindow window; - private final Duration allowedLateness; private final StateNamespace namespace; private final String timerId; private final TimerSpec spec; @@ -891,12 +666,10 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out public TimerInternalsTimer( BoundedWindow window, StateNamespace namespace, - Duration allowedLateness, String timerId, TimerSpec spec, TimerInternals timerInternals) { this.window = window; - this.allowedLateness = allowedLateness; this.namespace = namespace; this.timerId = timerId; this.spec = spec; http://git-wip-us.apache.org/repos/asf/beam/blob/281eaab3/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java index 28fc68d..26da6c6 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java @@ -17,8 +17,9 @@ */ package org.apache.beam.runners.direct; +import static com.google.common.base.Preconditions.checkArgument; + import com.google.common.collect.ImmutableList; -import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -218,7 +219,6 @@ class ParDoEvaluator<InputT> implements TransformEvaluator<InputT> { static class BundleOutputManager implements OutputManager { private final Map<TupleTag<?>, UncommittedBundle<?>> bundles; - private final Map<TupleTag<?>, List<?>> undeclaredOutputs; public static BundleOutputManager create(Map<TupleTag<?>, UncommittedBundle<?>> outputBundles) { return new BundleOutputManager(outputBundles); @@ -226,23 +226,13 @@ class ParDoEvaluator<InputT> implements TransformEvaluator<InputT> { private BundleOutputManager(Map<TupleTag<?>, UncommittedBundle<?>> bundles) { this.bundles = bundles; - undeclaredOutputs = new HashMap<>(); } @SuppressWarnings({"unchecked", "rawtypes"}) @Override public <T> void output(TupleTag<T> tag, WindowedValue<T> output) { - UncommittedBundle bundle = bundles.get(tag); - if (bundle == null) { - List<WindowedValue<T>> undeclaredContents = (List) undeclaredOutputs.get(tag); - if (undeclaredContents == null) { - undeclaredContents = new ArrayList<>(); - undeclaredOutputs.put(tag, undeclaredContents); - } - undeclaredContents.add(output); - } else { - bundle.add(output); - } + checkArgument(bundles.containsKey(tag), "Unknown output tag %s", tag); + ((UncommittedBundle) bundles.get(tag)).add(output); } } } http://git-wip-us.apache.org/repos/asf/beam/blob/281eaab3/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java index 4a66541..3274912 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java @@ -22,8 +22,8 @@ import com.google.common.base.Function; import com.google.common.collect.Iterators; import com.google.common.collect.LinkedListMultimap; import com.google.common.collect.Multimap; -import java.util.Collections; import java.util.Iterator; +import java.util.List; import java.util.Map; import org.apache.beam.runners.core.DoFnRunner; import org.apache.beam.runners.core.DoFnRunners; @@ -57,6 +57,7 @@ public class MultiDoFnFunction<InputT, OutputT> private final DoFn<InputT, OutputT> doFn; private final SparkRuntimeContext runtimeContext; private final TupleTag<OutputT> mainOutputTag; + private final List<TupleTag<?>> additionalOutputTags; private final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs; private final WindowingStrategy<?, ?> windowingStrategy; @@ -66,6 +67,7 @@ public class MultiDoFnFunction<InputT, OutputT> * @param doFn The {@link DoFn} to be wrapped. * @param runtimeContext The {@link SparkRuntimeContext}. * @param mainOutputTag The main output {@link TupleTag}. + * @param additionalOutputTags Additional {@link TupleTag output tags}. * @param sideInputs Side inputs used in this {@link DoFn}. * @param windowingStrategy Input {@link WindowingStrategy}. */ @@ -76,6 +78,7 @@ public class MultiDoFnFunction<InputT, OutputT> DoFn<InputT, OutputT> doFn, SparkRuntimeContext runtimeContext, TupleTag<OutputT> mainOutputTag, + List<TupleTag<?>> additionalOutputTags, Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs, WindowingStrategy<?, ?> windowingStrategy) { this.aggAccum = aggAccum; @@ -84,6 +87,7 @@ public class MultiDoFnFunction<InputT, OutputT> this.doFn = doFn; this.runtimeContext = runtimeContext; this.mainOutputTag = mainOutputTag; + this.additionalOutputTags = additionalOutputTags; this.sideInputs = sideInputs; this.windowingStrategy = windowingStrategy; } @@ -101,7 +105,7 @@ public class MultiDoFnFunction<InputT, OutputT> new SparkSideInputReader(sideInputs), outputManager, mainOutputTag, - Collections.<TupleTag<?>>emptyList(), + additionalOutputTags, new SparkProcessContext.NoOpStepContext(), windowingStrategy); http://git-wip-us.apache.org/repos/asf/beam/blob/281eaab3/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java index b2ed3a9..742ea83 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java @@ -369,6 +369,7 @@ public final class TransformTranslator { doFn, context.getRuntimeContext(), transform.getMainOutputTag(), + transform.getAdditionalOutputTags().getAll(), TranslationUtils.getSideInputs(transform.getSideInputs(), context), windowingStrategy)); Map<TupleTag<?>, PValue> outputs = context.getOutputs(transform); http://git-wip-us.apache.org/repos/asf/beam/blob/281eaab3/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java index acb4a02..43f4b75 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java @@ -411,6 +411,7 @@ public final class StreamingTransformTranslator { doFn, runtimeContext, transform.getMainOutputTag(), + transform.getAdditionalOutputTags().getAll(), sideInputs, windowingStrategy)); } http://git-wip-us.apache.org/repos/asf/beam/blob/281eaab3/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java index 35c02ba..c67cf2a 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java @@ -32,7 +32,6 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.not; import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder; -import static org.hamcrest.collection.IsIterableContainingInOrder.contains; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; @@ -515,58 +514,18 @@ public class ParDoTest implements Serializable { @Test @Category(NeedsRunner.class) public void testParDoWritingToUndeclaredTag() { - List<Integer> inputs = Arrays.asList(3, -42, 666); TupleTag<String> notOutputTag = new TupleTag<String>("additional"){}; - PCollection<String> output = pipeline + pipeline .apply(Create.of(inputs)) .apply(ParDo.of(new TestDoFn( Arrays.<PCollectionView<Integer>>asList(), - Arrays.asList(notOutputTag)))); - - PAssert.that(output) - .satisfies(ParDoTest.HasExpectedOutput.forInput(inputs)); + Arrays.asList(notOutputTag))) + /* No call to .withOutputTags - should cause error */); - pipeline.run(); - } - - @Test - // TODO: The exception thrown is runner-specific, even if the behavior is general - @Category(NeedsRunner.class) - public void testParDoUndeclaredTagLimit() { - - PCollection<Integer> input = pipeline.apply(Create.of(Arrays.asList(3))); - - // Success for a total of 1000 outputs. - input - .apply("Success1000", ParDo.of(new DoFn<Integer, String>() { - @ProcessElement - public void processElement(ProcessContext c) { - TupleTag<String> specialOutputTag = new TupleTag<String>(){}; - c.output(specialOutputTag, "special"); - c.output(specialOutputTag, "special"); - c.output(specialOutputTag, "special"); - - for (int i = 0; i < 998; i++) { - c.output(new TupleTag<String>(){}, "tag" + i); - } - }})); - pipeline.run(); - - // Failure for a total of 1001 outputs. - input - .apply("Failure1001", ParDo.of(new DoFn<Integer, String>() { - @ProcessElement - public void processElement(ProcessContext c) { - for (int i = 0; i < 1000; i++) { - c.output(new TupleTag<String>(){}, "output" + i); - } - }})); - - thrown.expect(RuntimeException.class); - thrown.expectMessage("the number of outputs has exceeded a limit"); + thrown.expectMessage("additional"); pipeline.run(); } @@ -1107,43 +1066,32 @@ public class ParDoTest implements Serializable { private final List<Integer> inputs; private final List<Integer> sideInputs; private final String additionalOutput; - private final boolean ordered; public static HasExpectedOutput forInput(List<Integer> inputs) { return new HasExpectedOutput( new ArrayList<Integer>(inputs), new ArrayList<Integer>(), - "", - false); + ""); } private HasExpectedOutput(List<Integer> inputs, List<Integer> sideInputs, - String additionalOutput, - boolean ordered) { + String additionalOutput) { this.inputs = inputs; this.sideInputs = sideInputs; this.additionalOutput = additionalOutput; - this.ordered = ordered; } public HasExpectedOutput andSideInputs(Integer... sideInputValues) { - List<Integer> sideInputs = new ArrayList<>(); - for (Integer sideInputValue : sideInputValues) { - sideInputs.add(sideInputValue); - } - return new HasExpectedOutput(inputs, sideInputs, additionalOutput, ordered); + return new HasExpectedOutput( + inputs, Arrays.asList(sideInputValues), additionalOutput); } public HasExpectedOutput fromOutput(TupleTag<String> outputTag) { return fromOutput(outputTag.getId()); } public HasExpectedOutput fromOutput(String outputId) { - return new HasExpectedOutput(inputs, sideInputs, outputId, ordered); - } - - public HasExpectedOutput inOrder() { - return new HasExpectedOutput(inputs, sideInputs, additionalOutput, true); + return new HasExpectedOutput(inputs, sideInputs, outputId); } @Override @@ -1179,11 +1127,7 @@ public class ParDoTest implements Serializable { } String[] expectedProcessedsArray = expectedProcesseds.toArray(new String[expectedProcesseds.size()]); - if (!ordered || expectedProcesseds.isEmpty()) { - assertThat(processeds, containsInAnyOrder(expectedProcessedsArray)); - } else { - assertThat(processeds, contains(expectedProcessedsArray)); - } + assertThat(processeds, containsInAnyOrder(expectedProcessedsArray)); for (String finished : finisheds) { assertEquals(additionalOutputPrefix + "finished", finished);
