Repository: beam
Updated Branches:
  refs/heads/master 73305d631 -> 2d28ece8e


Cleanups in SimpleDoFnRunner and ParDoEvaluator

- Makes it an error to output to an undeclared output tag,
  instead of effectively silently dropping this data.
- Removes code in SimpleDoFnRunner that (as far as I recall) assigned
  windows to outputs from bundle methods. That code is now obsolete:
  outputting from FinishBundle requires specifying the window explicitly,
  and StartBundle cannot output at all.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/281eaab3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/281eaab3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/281eaab3

Branch: refs/heads/master
Commit: 281eaab3a0ac3810733b87159c9ea9e82a8480fd
Parents: 73305d6
Author: Eugene Kirpichov <[email protected]>
Authored: Fri May 19 16:09:13 2017 -0700
Committer: Eugene Kirpichov <[email protected]>
Committed: Thu May 25 13:45:17 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/core/SimpleDoFnRunner.java     | 387 ++++---------------
 .../beam/runners/direct/ParDoEvaluator.java     |  18 +-
 .../spark/translation/MultiDoFnFunction.java    |   8 +-
 .../spark/translation/TransformTranslator.java  |   1 +
 .../streaming/StreamingTransformTranslator.java |   1 +
 .../apache/beam/sdk/transforms/ParDoTest.java   |  76 +---
 6 files changed, 102 insertions(+), 389 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/281eaab3/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index 97b0b33..7d7babd 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -20,11 +20,10 @@ package org.apache.beam.runners.core;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 
+import com.google.common.collect.FluentIterable;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
 import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 import javax.annotation.Nullable;
@@ -37,20 +36,13 @@ import org.apache.beam.sdk.state.TimeDomain;
 import org.apache.beam.sdk.state.Timer;
 import org.apache.beam.sdk.state.TimerSpec;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.FinishBundleContext;
-import org.apache.beam.sdk.transforms.DoFn.OnTimerContext;
-import org.apache.beam.sdk.transforms.DoFn.ProcessContext;
-import org.apache.beam.sdk.transforms.DoFn.StartBundleContext;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.SystemDoFnInternal;
 import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -73,18 +65,19 @@ import org.joda.time.format.PeriodFormat;
  */
 public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, 
OutputT> {
 
+  private final PipelineOptions options;
   /** The {@link DoFn} being run. */
   private final DoFn<InputT, OutputT> fn;
 
   /** The {@link DoFnInvoker} being run. */
   private final DoFnInvoker<InputT, OutputT> invoker;
 
-  /** The context used for running the {@link DoFn}. */
-  private final DoFnContext<InputT, OutputT> context;
-
+  private final SideInputReader sideInputReader;
   private final OutputManager outputManager;
 
   private final TupleTag<OutputT> mainOutputTag;
+  /** The set of known output tags. */
+  private final Set<TupleTag<?>> outputTags;
 
   private final boolean observesWindow;
 
@@ -106,12 +99,16 @@ public class SimpleDoFnRunner<InputT, OutputT> implements 
DoFnRunner<InputT, Out
       List<TupleTag<?>> additionalOutputTags,
       StepContext stepContext,
       WindowingStrategy<?, ?> windowingStrategy) {
+    this.options = options;
     this.fn = fn;
     this.signature = DoFnSignatures.getSignature(fn.getClass());
     this.observesWindow = signature.processElement().observesWindow() || 
!sideInputReader.isEmpty();
     this.invoker = DoFnInvokers.invokerFor(fn);
+    this.sideInputReader = sideInputReader;
     this.outputManager = outputManager;
     this.mainOutputTag = mainOutputTag;
+    this.outputTags =
+        
Sets.newHashSet(FluentIterable.<TupleTag<?>>of(mainOutputTag).append(additionalOutputTags));
     this.stepContext = stepContext;
 
     // This is a cast of an _invariant_ coder. But we are assured by pipeline 
validation
@@ -121,26 +118,13 @@ public class SimpleDoFnRunner<InputT, OutputT> implements 
DoFnRunner<InputT, Out
         (Coder<BoundedWindow>) windowingStrategy.getWindowFn().windowCoder();
     this.windowCoder = untypedCoder;
     this.allowedLateness = windowingStrategy.getAllowedLateness();
-
-    this.context =
-        new DoFnContext<>(
-            options,
-            fn,
-            sideInputReader,
-            outputManager,
-            mainOutputTag,
-            additionalOutputTags,
-            stepContext,
-            windowingStrategy.getWindowFn());
   }
 
   @Override
   public void startBundle() {
-    DoFnStartBundleContext<InputT, OutputT> startBundleContext =
-        createStartBundleContext(fn, context);
     // This can contain user code. Wrap it in case it throws an exception.
     try {
-      invoker.invokeStartBundle(startBundleContext);
+      invoker.invokeStartBundle(new DoFnStartBundleContext());
     } catch (Throwable t) {
       // Exception in user code.
       throw wrapUserCodeException(t);
@@ -174,7 +158,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements 
DoFnRunner<InputT, Out
 
       case PROCESSING_TIME:
       case SYNCHRONIZED_PROCESSING_TIME:
-        effectiveTimestamp = 
context.stepContext.timerInternals().currentInputWatermarkTime();
+        effectiveTimestamp = 
stepContext.timerInternals().currentInputWatermarkTime();
         break;
 
       default:
@@ -182,18 +166,15 @@ public class SimpleDoFnRunner<InputT, OutputT> implements 
DoFnRunner<InputT, Out
             String.format("Unknown time domain: %s", timeDomain));
     }
 
-    OnTimerArgumentProvider<InputT, OutputT> argumentProvider =
-        new OnTimerArgumentProvider<>(
-            fn, context, window, allowedLateness, effectiveTimestamp, 
timeDomain);
+    OnTimerArgumentProvider argumentProvider =
+        new OnTimerArgumentProvider(window, effectiveTimestamp, timeDomain);
     invoker.invokeOnTimer(timerId, argumentProvider);
   }
 
   private void invokeProcessElement(WindowedValue<InputT> elem) {
-    final DoFnProcessContext<InputT, OutputT> processContext = 
createProcessContext(elem);
-
     // This can contain user code. Wrap it in case it throws an exception.
     try {
-      invoker.invokeProcessElement(processContext);
+      invoker.invokeProcessElement(new DoFnProcessContext(elem));
     } catch (Exception ex) {
       throw wrapUserCodeException(ex);
     }
@@ -201,32 +182,15 @@ public class SimpleDoFnRunner<InputT, OutputT> implements 
DoFnRunner<InputT, Out
 
   @Override
   public void finishBundle() {
-    DoFnFinishBundleContext<InputT, OutputT> finishBundleContext =
-        createFinishBundleContext(fn, context);
     // This can contain user code. Wrap it in case it throws an exception.
     try {
-      invoker.invokeFinishBundle(finishBundleContext);
+      invoker.invokeFinishBundle(new DoFnFinishBundleContext());
     } catch (Throwable t) {
       // Exception in user code.
       throw wrapUserCodeException(t);
     }
   }
 
-  private DoFnStartBundleContext<InputT, OutputT> createStartBundleContext(
-      DoFn<InputT, OutputT> fn, DoFnContext<InputT, OutputT> context) {
-    return new DoFnStartBundleContext<>(fn, context);
-  }
-
-  private DoFnFinishBundleContext<InputT, OutputT> createFinishBundleContext(
-      DoFn<InputT, OutputT> fn, DoFnContext<InputT, OutputT> context) {
-    return new DoFnFinishBundleContext<>(fn, context);
-  }
-
-  /** Returns a new {@link DoFn.ProcessContext} for the given element. */
-  private DoFnProcessContext<InputT, OutputT> 
createProcessContext(WindowedValue<InputT> elem) {
-    return new DoFnProcessContext<InputT, OutputT>(fn, context, elem, 
allowedLateness);
-  }
-
   private RuntimeException wrapUserCodeException(Throwable t) {
     throw UserCodeException.wrapIf(!isSystemDoFn(), t);
   }
@@ -235,170 +199,31 @@ public class SimpleDoFnRunner<InputT, OutputT> 
implements DoFnRunner<InputT, Out
     return invoker.getClass().isAnnotationPresent(SystemDoFnInternal.class);
   }
 
-  /**
-   * A concrete implementation of {@code DoFn.Context} used for running a 
{@link DoFn}.
-   *
-   * @param <InputT> the type of the {@link DoFn} (main) input elements
-   * @param <OutputT> the type of the {@link DoFn} (main) output elements
-   */
-  private static class DoFnContext<InputT, OutputT> {
-    private static final int MAX_SIDE_OUTPUTS = 1000;
-
-    final PipelineOptions options;
-    final DoFn<InputT, OutputT> fn;
-    final SideInputReader sideInputReader;
-    final OutputManager outputManager;
-    final TupleTag<OutputT> mainOutputTag;
-    final StepContext stepContext;
-    final WindowFn<?, ?> windowFn;
-
-    /**
-     * The set of known output tags, some of which may be undeclared, so we 
can throw an exception
-     * when it exceeds {@link #MAX_SIDE_OUTPUTS}.
-     */
-    private Set<TupleTag<?>> outputTags;
-
-    public DoFnContext(
-        PipelineOptions options,
-        DoFn<InputT, OutputT> fn,
-        SideInputReader sideInputReader,
-        OutputManager outputManager,
-        TupleTag<OutputT> mainOutputTag,
-        List<TupleTag<?>> additionalOutputTags,
-        StepContext stepContext,
-        WindowFn<?, ?> windowFn) {
-      this.options = options;
-      this.fn = fn;
-      this.sideInputReader = sideInputReader;
-      this.outputManager = outputManager;
-      this.mainOutputTag = mainOutputTag;
-      this.outputTags = Sets.newHashSet();
-
-      outputTags.add(mainOutputTag);
-      for (TupleTag<?> additionalOutputTag : additionalOutputTags) {
-        outputTags.add(additionalOutputTag);
-      }
-
-      this.stepContext = stepContext;
-      this.windowFn = windowFn;
-    }
-
-    
//////////////////////////////////////////////////////////////////////////////
-
-    public PipelineOptions getPipelineOptions() {
-      return options;
-    }
-
-    <T, W extends BoundedWindow> WindowedValue<T> makeWindowedValue(
-        T output, Instant timestamp, Collection<W> windows, PaneInfo pane) {
-      final Instant inputTimestamp = timestamp;
-
-      if (timestamp == null) {
-        timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
-      }
-
-      if (windows == null) {
-        try {
-          // The windowFn can never succeed at accessing the element, so its 
type does not
-          // matter here
-          @SuppressWarnings("unchecked")
-          WindowFn<Object, W> objectWindowFn = (WindowFn<Object, W>) windowFn;
-          windows =
-              objectWindowFn.assignWindows(
-                  objectWindowFn.new AssignContext() {
-                    @Override
-                    public Object element() {
-                      throw new UnsupportedOperationException(
-                          "WindowFn attempted to access input element when 
none was available");
-                    }
-
-                    @Override
-                    public Instant timestamp() {
-                      if (inputTimestamp == null) {
-                        throw new UnsupportedOperationException(
-                            "WindowFn attempted to access input timestamp when 
none was available");
-                      }
-                      return inputTimestamp;
-                    }
-
-                    @Override
-                    public W window() {
-                      throw new UnsupportedOperationException(
-                          "WindowFn attempted to access input windows when 
none were available");
-                    }
-                  });
-        } catch (Exception e) {
-          throw UserCodeException.wrap(e);
-        }
-      }
-
-      return WindowedValue.of(output, timestamp, windows, pane);
-    }
-
-    public <T> T sideInput(PCollectionView<T> view, BoundedWindow 
sideInputWindow) {
-      if (!sideInputReader.contains(view)) {
-        throw new IllegalArgumentException("calling sideInput() with unknown 
view");
-      }
-      return sideInputReader.get(view, sideInputWindow);
-    }
-
-    void outputWindowedValue(
-        OutputT output,
-        Instant timestamp,
-        Collection<? extends BoundedWindow> windows,
-        PaneInfo pane) {
-      outputWindowedValue(makeWindowedValue(output, timestamp, windows, pane));
-    }
-
-    void outputWindowedValue(WindowedValue<OutputT> windowedElem) {
-      outputManager.output(mainOutputTag, windowedElem);
-    }
-
-    private <T> void outputWindowedValue(
-        TupleTag<T> tag,
-        T output,
-        Instant timestamp,
-        Collection<? extends BoundedWindow> windows,
-        PaneInfo pane) {
-      outputWindowedValue(tag, makeWindowedValue(output, timestamp, windows, 
pane));
-    }
-
-    private <T> void outputWindowedValue(TupleTag<T> tag, WindowedValue<T> 
windowedElem) {
-      if (!outputTags.contains(tag)) {
-        // This tag wasn't declared nor was it seen before during this 
execution.
-        // Thus, this must be a new, undeclared and unconsumed output.
-        // To prevent likely user errors, enforce the limit on the number of 
side
-        // outputs.
-        if (outputTags.size() >= MAX_SIDE_OUTPUTS) {
-          throw new IllegalArgumentException(
-              "the number of outputs has exceeded a limit of " + 
MAX_SIDE_OUTPUTS);
-        }
-        outputTags.add(tag);
-      }
-
-      outputManager.output(tag, windowedElem);
+  private <T> T sideInput(PCollectionView<T> view, BoundedWindow 
sideInputWindow) {
+    if (!sideInputReader.contains(view)) {
+      throw new IllegalArgumentException("calling sideInput() with unknown 
view");
     }
+    return sideInputReader.get(view, sideInputWindow);
   }
 
+  private <T> void outputWindowedValue(TupleTag<T> tag, WindowedValue<T> 
windowedElem) {
+    checkArgument(outputTags.contains(tag), "Unknown output tag %s", tag);
+    outputManager.output(tag, windowedElem);
+  }
 
   /**
    * A concrete implementation of {@link DoFn.StartBundleContext}.
    */
-  private class DoFnStartBundleContext<InputT, OutputT>
+  private class DoFnStartBundleContext
       extends DoFn<InputT, OutputT>.StartBundleContext
       implements DoFnInvoker.ArgumentProvider<InputT, OutputT> {
-    private final DoFn<InputT, OutputT> fn;
-    private final DoFnContext<InputT, OutputT> context;
-
-    private DoFnStartBundleContext(DoFn<InputT, OutputT> fn, 
DoFnContext<InputT, OutputT> context) {
+    private DoFnStartBundleContext() {
       fn.super();
-      this.fn = fn;
-      this.context = context;
     }
 
     @Override
     public PipelineOptions getPipelineOptions() {
-      return context.getPipelineOptions();
+      return options;
     }
 
     @Override
@@ -408,24 +233,25 @@ public class SimpleDoFnRunner<InputT, OutputT> implements 
DoFnRunner<InputT, Out
     }
 
     @Override
-    public StartBundleContext startBundleContext(DoFn<InputT, OutputT> doFn) {
+    public DoFn<InputT, OutputT>.StartBundleContext 
startBundleContext(DoFn<InputT, OutputT> doFn) {
       return this;
     }
 
     @Override
-    public FinishBundleContext finishBundleContext(DoFn<InputT, OutputT> doFn) 
{
+    public DoFn<InputT, OutputT>.FinishBundleContext finishBundleContext(
+        DoFn<InputT, OutputT> doFn) {
       throw new UnsupportedOperationException(
           "Cannot access FinishBundleContext outside of @FinishBundle 
method.");
     }
 
     @Override
-    public ProcessContext processContext(DoFn<InputT, OutputT> doFn) {
+    public DoFn<InputT, OutputT>.ProcessContext processContext(DoFn<InputT, 
OutputT> doFn) {
       throw new UnsupportedOperationException(
           "Cannot access ProcessContext outside of @ProcessElement method.");
     }
 
     @Override
-    public OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) {
+    public DoFn<InputT, OutputT>.OnTimerContext onTimerContext(DoFn<InputT, 
OutputT> doFn) {
       throw new UnsupportedOperationException(
           "Cannot access OnTimerContext outside of @OnTimer methods.");
     }
@@ -453,20 +279,16 @@ public class SimpleDoFnRunner<InputT, OutputT> implements 
DoFnRunner<InputT, Out
    * B
    * A concrete implementation of {@link DoFn.FinishBundleContext}.
    */
-  private class DoFnFinishBundleContext<InputT, OutputT>
+  private class DoFnFinishBundleContext
       extends DoFn<InputT, OutputT>.FinishBundleContext
       implements DoFnInvoker.ArgumentProvider<InputT, OutputT> {
-    private final DoFnContext<InputT, OutputT> context;
-
-    private DoFnFinishBundleContext(
-        DoFn<InputT, OutputT> fn, DoFnContext<InputT, OutputT> context) {
+    private DoFnFinishBundleContext() {
       fn.super();
-      this.context = context;
     }
 
     @Override
     public PipelineOptions getPipelineOptions() {
-      return context.getPipelineOptions();
+      return options;
     }
 
     @Override
@@ -476,24 +298,25 @@ public class SimpleDoFnRunner<InputT, OutputT> implements 
DoFnRunner<InputT, Out
     }
 
     @Override
-    public StartBundleContext startBundleContext(DoFn<InputT, OutputT> doFn) {
+    public DoFn<InputT, OutputT>.StartBundleContext 
startBundleContext(DoFn<InputT, OutputT> doFn) {
       throw new UnsupportedOperationException(
           "Cannot access StartBundleContext outside of @StartBundle method.");
     }
 
     @Override
-    public FinishBundleContext finishBundleContext(DoFn<InputT, OutputT> doFn) 
{
+    public DoFn<InputT, OutputT>.FinishBundleContext finishBundleContext(
+        DoFn<InputT, OutputT> doFn) {
       return this;
     }
 
     @Override
-    public ProcessContext processContext(DoFn<InputT, OutputT> doFn) {
+    public DoFn<InputT, OutputT>.ProcessContext processContext(DoFn<InputT, 
OutputT> doFn) {
       throw new UnsupportedOperationException(
           "Cannot access ProcessContext outside of @ProcessElement method.");
     }
 
     @Override
-    public OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) {
+    public DoFn<InputT, OutputT>.OnTimerContext onTimerContext(DoFn<InputT, 
OutputT> doFn) {
       throw new UnsupportedOperationException(
           "Cannot access OnTimerContext outside of @OnTimer methods.");
     }
@@ -518,30 +341,22 @@ public class SimpleDoFnRunner<InputT, OutputT> implements 
DoFnRunner<InputT, Out
 
     @Override
     public void output(OutputT output, Instant timestamp, BoundedWindow 
window) {
-      context.outputWindowedValue(WindowedValue.of(output, timestamp, window, 
PaneInfo.NO_FIRING));
+      output(mainOutputTag, output, timestamp, window);
     }
 
     @Override
     public <T> void output(TupleTag<T> tag, T output, Instant timestamp, 
BoundedWindow window) {
-      context.outputWindowedValue(
-          tag, WindowedValue.of(output, timestamp, window, 
PaneInfo.NO_FIRING));
+      outputWindowedValue(tag, WindowedValue.of(output, timestamp, window, 
PaneInfo.NO_FIRING));
     }
   }
 
   /**
    * A concrete implementation of {@link DoFn.ProcessContext} used for running 
a {@link DoFn} over a
    * single element.
-   *
-   * @param <InputT> the type of the {@link DoFn} (main) input elements
-   * @param <OutputT> the type of the {@link DoFn} (main) output elements
    */
-  private class DoFnProcessContext<InputT, OutputT> extends DoFn<InputT, 
OutputT>.ProcessContext
+  private class DoFnProcessContext extends DoFn<InputT, OutputT>.ProcessContext
       implements DoFnInvoker.ArgumentProvider<InputT, OutputT> {
-
-    final DoFn<InputT, OutputT> fn;
-    final DoFnContext<InputT, OutputT> context;
-    final WindowedValue<InputT> windowedValue;
-    private final Duration allowedLateness;
+    final WindowedValue<InputT> elem;
 
     /** Lazily initialized; should only be accessed via {@link 
#getNamespace()}. */
     @Nullable private StateNamespace namespace;
@@ -549,7 +364,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements 
DoFnRunner<InputT, Out
     /**
      * The state namespace for this context.
      *
-     * <p>Any call to {@link #getNamespace()} when more than one window is 
present will crash; this
+     * <p>Any call to this method when more than one window is present will 
crash; this
      * represents a bug in the runner or the {@link DoFnSignature}, since 
values must be in exactly
      * one window when state or timers are relevant.
      */
@@ -561,55 +376,32 @@ public class SimpleDoFnRunner<InputT, OutputT> implements 
DoFnRunner<InputT, Out
     }
 
     private DoFnProcessContext(
-        DoFn<InputT, OutputT> fn,
-        DoFnContext<InputT, OutputT> context,
-        WindowedValue<InputT> windowedValue,
-        Duration allowedLateness) {
+        WindowedValue<InputT> elem) {
       fn.super();
-      this.fn = fn;
-      this.context = context;
-      this.windowedValue = windowedValue;
-      this.allowedLateness = allowedLateness;
+      this.elem = elem;
     }
 
     @Override
     public PipelineOptions getPipelineOptions() {
-      return context.getPipelineOptions();
+      return options;
     }
 
     @Override
     public InputT element() {
-      return windowedValue.getValue();
+      return elem.getValue();
     }
 
     @Override
     public <T> T sideInput(PCollectionView<T> view) {
       checkNotNull(view, "View passed to sideInput cannot be null");
-      Iterator<? extends BoundedWindow> windowIter = windows().iterator();
-      BoundedWindow window;
-      if (!windowIter.hasNext()) {
-        if (context.windowFn instanceof GlobalWindows) {
-          // TODO: Remove this once GroupByKeyOnly no longer outputs elements
-          // without windows
-          window = GlobalWindow.INSTANCE;
-        } else {
-          throw new IllegalStateException(
-              "sideInput called when main input element is not in any 
windows");
-        }
-      } else {
-        window = windowIter.next();
-        if (windowIter.hasNext()) {
-          throw new IllegalStateException(
-              "sideInput called when main input element is in multiple 
windows");
-        }
-      }
-      return context.sideInput(
+      BoundedWindow window = Iterables.getOnlyElement(windows());
+      return SimpleDoFnRunner.this.sideInput(
           view, view.getWindowMappingFn().getSideInputWindow(window));
     }
 
     @Override
     public PaneInfo pane() {
-      return windowedValue.getPane();
+      return elem.getPane();
     }
 
     @Override
@@ -619,37 +411,36 @@ public class SimpleDoFnRunner<InputT, OutputT> implements 
DoFnRunner<InputT, Out
 
     @Override
     public void output(OutputT output) {
-      context.outputWindowedValue(windowedValue.withValue(output));
+      output(mainOutputTag, output);
     }
 
     @Override
     public void outputWithTimestamp(OutputT output, Instant timestamp) {
       checkTimestamp(timestamp);
-      context.outputWindowedValue(
-          output, timestamp, windowedValue.getWindows(), 
windowedValue.getPane());
+      outputWithTimestamp(mainOutputTag, output, timestamp);
     }
 
     @Override
     public <T> void output(TupleTag<T> tag, T output) {
       checkNotNull(tag, "Tag passed to output cannot be null");
-      context.outputWindowedValue(tag, windowedValue.withValue(output));
+      outputWindowedValue(tag, elem.withValue(output));
     }
 
     @Override
     public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant 
timestamp) {
       checkNotNull(tag, "Tag passed to outputWithTimestamp cannot be null");
       checkTimestamp(timestamp);
-      context.outputWindowedValue(
-          tag, output, timestamp, windowedValue.getWindows(), 
windowedValue.getPane());
+      outputWindowedValue(
+          tag, WindowedValue.of(output, timestamp, elem.getWindows(), 
elem.getPane()));
     }
 
     @Override
     public Instant timestamp() {
-      return windowedValue.getTimestamp();
+      return elem.getTimestamp();
     }
 
     public Collection<? extends BoundedWindow> windows() {
-      return windowedValue.getWindows();
+      return elem.getWindows();
     }
 
     @SuppressWarnings("deprecation") // Allowed Skew is deprecated for users, 
but must be respected
@@ -657,7 +448,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements 
DoFnRunner<InputT, Out
       // The documentation of getAllowedTimestampSkew explicitly permits 
Long.MAX_VALUE to be used
       // for infinite skew. Defend against underflow in that case for 
timestamps before the epoch
       if (fn.getAllowedTimestampSkew().getMillis() != Long.MAX_VALUE
-          && 
timestamp.isBefore(windowedValue.getTimestamp().minus(fn.getAllowedTimestampSkew())))
 {
+          && 
timestamp.isBefore(elem.getTimestamp().minus(fn.getAllowedTimestampSkew()))) {
         throw new IllegalArgumentException(
             String.format(
                 "Cannot output with timestamp %s. Output timestamps must be no 
earlier than the "
@@ -665,23 +456,24 @@ public class SimpleDoFnRunner<InputT, OutputT> implements 
DoFnRunner<InputT, Out
                     + "DoFn#getAllowedTimestampSkew() Javadoc for details on 
changing the allowed "
                     + "skew.",
                 timestamp,
-                windowedValue.getTimestamp(),
+                elem.getTimestamp(),
                 
PeriodFormat.getDefault().print(fn.getAllowedTimestampSkew().toPeriod())));
       }
     }
 
     @Override
     public BoundedWindow window() {
-      return Iterables.getOnlyElement(windowedValue.getWindows());
+      return Iterables.getOnlyElement(elem.getWindows());
     }
 
     @Override
-    public StartBundleContext startBundleContext(DoFn<InputT, OutputT> doFn) {
+    public DoFn<InputT, OutputT>.StartBundleContext 
startBundleContext(DoFn<InputT, OutputT> doFn) {
       throw new UnsupportedOperationException("StartBundleContext parameters 
are not supported.");
     }
 
     @Override
-    public FinishBundleContext finishBundleContext(DoFn<InputT, OutputT> doFn) 
{
+    public DoFn<InputT, OutputT>.FinishBundleContext finishBundleContext(
+        DoFn<InputT, OutputT> doFn) {
       throw new UnsupportedOperationException("FinishBundleContext parameters 
are not supported.");
     }
 
@@ -691,7 +483,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements 
DoFnRunner<InputT, Out
     }
 
     @Override
-    public OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) {
+    public DoFn<InputT, OutputT>.OnTimerContext onTimerContext(DoFn<InputT, 
OutputT> doFn) {
       throw new UnsupportedOperationException(
           "Cannot access OnTimerContext outside of @OnTimer methods.");
     }
@@ -720,7 +512,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements 
DoFnRunner<InputT, Out
         TimerSpec spec =
             (TimerSpec) 
signature.timerDeclarations().get(timerId).field().get(fn);
         return new TimerInternalsTimer(
-            window(), getNamespace(), allowedLateness, timerId, spec, 
stepContext.timerInternals());
+            window(), getNamespace(), timerId, spec, 
stepContext.timerInternals());
       } catch (IllegalAccessException e) {
         throw new RuntimeException(e);
       }
@@ -730,20 +522,13 @@ public class SimpleDoFnRunner<InputT, OutputT> implements 
DoFnRunner<InputT, Out
   /**
    * A concrete implementation of {@link DoFnInvoker.ArgumentProvider} used 
for running a {@link
    * DoFn} on a timer.
-   *
-   * @param <InputT> the type of the {@link DoFn} (main) input elements
-   * @param <OutputT> the type of the {@link DoFn} (main) output elements
    */
-  private class OnTimerArgumentProvider<InputT, OutputT>
+  private class OnTimerArgumentProvider
       extends DoFn<InputT, OutputT>.OnTimerContext
       implements DoFnInvoker.ArgumentProvider<InputT, OutputT> {
-
-    final DoFn<InputT, OutputT> fn;
-    final DoFnContext<InputT, OutputT> context;
     private final BoundedWindow window;
     private final Instant timestamp;
     private final TimeDomain timeDomain;
-    private final Duration allowedLateness;
 
     /** Lazily initialized; should only be accessed via {@link 
#getNamespace()}. */
     private StateNamespace namespace;
@@ -751,7 +536,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements 
DoFnRunner<InputT, Out
     /**
      * The state namespace for this context.
      *
-     * <p>Any call to {@link #getNamespace()} when more than one window is 
present will crash; this
+     * <p>Any call to this method when more than one window is present will 
crash; this
      * represents a bug in the runner or the {@link DoFnSignature}, since 
values must be in exactly
      * one window when state or timers are relevant.
      */
@@ -763,17 +548,11 @@ public class SimpleDoFnRunner<InputT, OutputT> implements 
DoFnRunner<InputT, Out
     }
 
     private OnTimerArgumentProvider(
-        DoFn<InputT, OutputT> fn,
-        DoFnContext<InputT, OutputT> context,
         BoundedWindow window,
-        Duration allowedLateness,
         Instant timestamp,
         TimeDomain timeDomain) {
       fn.super();
-      this.fn = fn;
-      this.context = context;
       this.window = window;
-      this.allowedLateness = allowedLateness;
       this.timestamp = timestamp;
       this.timeDomain = timeDomain;
     }
@@ -789,12 +568,13 @@ public class SimpleDoFnRunner<InputT, OutputT> implements 
DoFnRunner<InputT, Out
     }
 
     @Override
-    public StartBundleContext startBundleContext(DoFn<InputT, OutputT> doFn) {
+    public DoFn<InputT, OutputT>.StartBundleContext 
startBundleContext(DoFn<InputT, OutputT> doFn) {
       throw new UnsupportedOperationException("StartBundleContext parameters 
are not supported.");
     }
 
     @Override
-    public FinishBundleContext finishBundleContext(DoFn<InputT, OutputT> doFn) 
{
+    public DoFn<InputT, OutputT>.FinishBundleContext finishBundleContext(
+        DoFn<InputT, OutputT> doFn) {
       throw new UnsupportedOperationException("FinishBundleContext parameters 
are not supported.");
     }
 
@@ -805,12 +585,12 @@ public class SimpleDoFnRunner<InputT, OutputT> implements 
DoFnRunner<InputT, Out
 
 
     @Override
-    public ProcessContext processContext(DoFn<InputT, OutputT> doFn) {
+    public DoFn<InputT, OutputT>.ProcessContext processContext(DoFn<InputT, 
OutputT> doFn) {
       throw new UnsupportedOperationException("ProcessContext parameters are 
not supported.");
     }
 
     @Override
-    public OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) {
+    public DoFn<InputT, OutputT>.OnTimerContext onTimerContext(DoFn<InputT, 
OutputT> doFn) {
       return this;
     }
 
@@ -838,7 +618,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
         TimerSpec spec =
             (TimerSpec) signature.timerDeclarations().get(timerId).field().get(fn);
         return new TimerInternalsTimer(
-            window, getNamespace(), allowedLateness, timerId, spec, stepContext.timerInternals());
+            window, getNamespace(), timerId, spec, stepContext.timerInternals());
       } catch (IllegalAccessException e) {
         throw new RuntimeException(e);
       }
@@ -846,42 +626,37 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
 
     @Override
     public PipelineOptions getPipelineOptions() {
-      return context.getPipelineOptions();
+      return options;
     }
 
     @Override
     public void output(OutputT output) {
-      context.outputWindowedValue(
-          output, timestamp(), Collections.singleton(window()), PaneInfo.NO_FIRING);
+      output(mainOutputTag, output);
     }
 
     @Override
     public void outputWithTimestamp(OutputT output, Instant timestamp) {
-      context.outputWindowedValue(
-          output, timestamp, Collections.singleton(window()), PaneInfo.NO_FIRING);
+      outputWithTimestamp(mainOutputTag, output, timestamp);
     }
 
     @Override
     public <T> void output(TupleTag<T> tag, T output) {
-      context.outputWindowedValue(
-          tag, output, timestamp, Collections.singleton(window()), PaneInfo.NO_FIRING);
+      outputWindowedValue(tag, WindowedValue.of(output, timestamp, window(), PaneInfo.NO_FIRING));
     }
 
     @Override
     public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
-      context.outputWindowedValue(
-          tag, output, timestamp, Collections.singleton(window()), PaneInfo.NO_FIRING);
+      outputWindowedValue(tag, WindowedValue.of(output, timestamp, window(), PaneInfo.NO_FIRING));
     }
   }
 
-  private static class TimerInternalsTimer implements Timer {
+  private class TimerInternalsTimer implements Timer {
     private final TimerInternals timerInternals;
 
     // The window and the namespace represent the same thing, but the namespace is a cached
     // and specially encoded form. Since the namespace can be cached across timers, it is
     // passed in whole rather than being computed here.
     private final BoundedWindow window;
-    private final Duration allowedLateness;
     private final StateNamespace namespace;
     private final String timerId;
     private final TimerSpec spec;
@@ -891,12 +666,10 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
     public TimerInternalsTimer(
         BoundedWindow window,
         StateNamespace namespace,
-        Duration allowedLateness,
         String timerId,
         TimerSpec spec,
         TimerInternals timerInternals) {
       this.window = window;
-      this.allowedLateness = allowedLateness;
       this.namespace = namespace;
       this.timerId = timerId;
       this.spec = spec;

http://git-wip-us.apache.org/repos/asf/beam/blob/281eaab3/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
index 28fc68d..26da6c6 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
@@ -17,8 +17,9 @@
  */
 package org.apache.beam.runners.direct;
 
+import static com.google.common.base.Preconditions.checkArgument;
+
 import com.google.common.collect.ImmutableList;
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -218,7 +219,6 @@ class ParDoEvaluator<InputT> implements TransformEvaluator<InputT> {
 
   static class BundleOutputManager implements OutputManager {
     private final Map<TupleTag<?>, UncommittedBundle<?>> bundles;
-    private final Map<TupleTag<?>, List<?>> undeclaredOutputs;
 
     public static BundleOutputManager create(Map<TupleTag<?>, UncommittedBundle<?>> outputBundles) {
       return new BundleOutputManager(outputBundles);
@@ -226,23 +226,13 @@ class ParDoEvaluator<InputT> implements TransformEvaluator<InputT> {
 
     private BundleOutputManager(Map<TupleTag<?>, UncommittedBundle<?>> bundles) {
       this.bundles = bundles;
-      undeclaredOutputs = new HashMap<>();
     }
 
     @SuppressWarnings({"unchecked", "rawtypes"})
     @Override
     public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
-      UncommittedBundle bundle = bundles.get(tag);
-      if (bundle == null) {
-        List<WindowedValue<T>> undeclaredContents = (List) undeclaredOutputs.get(tag);
-        if (undeclaredContents == null) {
-          undeclaredContents = new ArrayList<>();
-          undeclaredOutputs.put(tag, undeclaredContents);
-        }
-        undeclaredContents.add(output);
-      } else {
-        bundle.add(output);
-      }
+      checkArgument(bundles.containsKey(tag), "Unknown output tag %s", tag);
+      ((UncommittedBundle) bundles.get(tag)).add(output);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/281eaab3/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
index 4a66541..3274912 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
@@ -22,8 +22,8 @@ import com.google.common.base.Function;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.LinkedListMultimap;
 import com.google.common.collect.Multimap;
-import java.util.Collections;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import org.apache.beam.runners.core.DoFnRunner;
 import org.apache.beam.runners.core.DoFnRunners;
@@ -57,6 +57,7 @@ public class MultiDoFnFunction<InputT, OutputT>
   private final DoFn<InputT, OutputT> doFn;
   private final SparkRuntimeContext runtimeContext;
   private final TupleTag<OutputT> mainOutputTag;
+  private final List<TupleTag<?>> additionalOutputTags;
   private final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs;
   private final WindowingStrategy<?, ?> windowingStrategy;
 
@@ -66,6 +67,7 @@ public class MultiDoFnFunction<InputT, OutputT>
    * @param doFn              The {@link DoFn} to be wrapped.
    * @param runtimeContext    The {@link SparkRuntimeContext}.
    * @param mainOutputTag     The main output {@link TupleTag}.
+   * @param additionalOutputTags Additional {@link TupleTag output tags}.
    * @param sideInputs        Side inputs used in this {@link DoFn}.
    * @param windowingStrategy Input {@link WindowingStrategy}.
    */
@@ -76,6 +78,7 @@ public class MultiDoFnFunction<InputT, OutputT>
       DoFn<InputT, OutputT> doFn,
       SparkRuntimeContext runtimeContext,
       TupleTag<OutputT> mainOutputTag,
+      List<TupleTag<?>> additionalOutputTags,
       Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, SideInputBroadcast<?>>> sideInputs,
       WindowingStrategy<?, ?> windowingStrategy) {
     this.aggAccum = aggAccum;
@@ -84,6 +87,7 @@ public class MultiDoFnFunction<InputT, OutputT>
     this.doFn = doFn;
     this.runtimeContext = runtimeContext;
     this.mainOutputTag = mainOutputTag;
+    this.additionalOutputTags = additionalOutputTags;
     this.sideInputs = sideInputs;
     this.windowingStrategy = windowingStrategy;
   }
@@ -101,7 +105,7 @@ public class MultiDoFnFunction<InputT, OutputT>
             new SparkSideInputReader(sideInputs),
             outputManager,
             mainOutputTag,
-            Collections.<TupleTag<?>>emptyList(),
+            additionalOutputTags,
             new SparkProcessContext.NoOpStepContext(),
             windowingStrategy);
 

http://git-wip-us.apache.org/repos/asf/beam/blob/281eaab3/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
index b2ed3a9..742ea83 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
@@ -369,6 +369,7 @@ public final class TransformTranslator {
                     doFn,
                     context.getRuntimeContext(),
                     transform.getMainOutputTag(),
+                    transform.getAdditionalOutputTags().getAll(),
                     TranslationUtils.getSideInputs(transform.getSideInputs(), context),
                     windowingStrategy));
         Map<TupleTag<?>, PValue> outputs = context.getOutputs(transform);

http://git-wip-us.apache.org/repos/asf/beam/blob/281eaab3/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
index acb4a02..43f4b75 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
@@ -411,6 +411,7 @@ public final class StreamingTransformTranslator {
                             doFn,
                             runtimeContext,
                             transform.getMainOutputTag(),
+                            transform.getAdditionalOutputTags().getAll(),
                             sideInputs,
                             windowingStrategy));
                   }

http://git-wip-us.apache.org/repos/asf/beam/blob/281eaab3/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index 35c02ba..c67cf2a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -32,7 +32,6 @@ import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasItem;
 import static org.hamcrest.Matchers.not;
 import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
-import static org.hamcrest.collection.IsIterableContainingInOrder.contains;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
@@ -515,58 +514,18 @@ public class ParDoTest implements Serializable {
   @Test
   @Category(NeedsRunner.class)
   public void testParDoWritingToUndeclaredTag() {
-
     List<Integer> inputs = Arrays.asList(3, -42, 666);
 
     TupleTag<String> notOutputTag = new TupleTag<String>("additional"){};
 
-    PCollection<String> output = pipeline
+    pipeline
         .apply(Create.of(inputs))
         .apply(ParDo.of(new TestDoFn(
             Arrays.<PCollectionView<Integer>>asList(),
-            Arrays.asList(notOutputTag))));
-
-    PAssert.that(output)
-        .satisfies(ParDoTest.HasExpectedOutput.forInput(inputs));
+            Arrays.asList(notOutputTag)))
+            /* No call to .withOutputTags - should cause error */);
 
-    pipeline.run();
-  }
-
-  @Test
-  // TODO: The exception thrown is runner-specific, even if the behavior is general
-  @Category(NeedsRunner.class)
-  public void testParDoUndeclaredTagLimit() {
-
-    PCollection<Integer> input = pipeline.apply(Create.of(Arrays.asList(3)));
-
-    // Success for a total of 1000 outputs.
-    input
-        .apply("Success1000", ParDo.of(new DoFn<Integer, String>() {
-            @ProcessElement
-            public void processElement(ProcessContext c) {
-              TupleTag<String> specialOutputTag = new TupleTag<String>(){};
-              c.output(specialOutputTag, "special");
-              c.output(specialOutputTag, "special");
-              c.output(specialOutputTag, "special");
-
-              for (int i = 0; i < 998; i++) {
-                c.output(new TupleTag<String>(){}, "tag" + i);
-              }
-            }}));
-    pipeline.run();
-
-    // Failure for a total of 1001 outputs.
-    input
-        .apply("Failure1001", ParDo.of(new DoFn<Integer, String>() {
-            @ProcessElement
-            public void processElement(ProcessContext c) {
-              for (int i = 0; i < 1000; i++) {
-                c.output(new TupleTag<String>(){}, "output" + i);
-              }
-            }}));
-
-    thrown.expect(RuntimeException.class);
-    thrown.expectMessage("the number of outputs has exceeded a limit");
+    thrown.expectMessage("additional");
     pipeline.run();
   }
 
@@ -1107,43 +1066,32 @@ public class ParDoTest implements Serializable {
     private final List<Integer> inputs;
     private final List<Integer> sideInputs;
     private final String additionalOutput;
-    private final boolean ordered;
 
     public static HasExpectedOutput forInput(List<Integer> inputs) {
       return new HasExpectedOutput(
           new ArrayList<Integer>(inputs),
           new ArrayList<Integer>(),
-          "",
-          false);
+          "");
     }
 
     private HasExpectedOutput(List<Integer> inputs,
                               List<Integer> sideInputs,
-                              String additionalOutput,
-                              boolean ordered) {
+                              String additionalOutput) {
       this.inputs = inputs;
       this.sideInputs = sideInputs;
       this.additionalOutput = additionalOutput;
-      this.ordered = ordered;
     }
 
     public HasExpectedOutput andSideInputs(Integer... sideInputValues) {
-      List<Integer> sideInputs = new ArrayList<>();
-      for (Integer sideInputValue : sideInputValues) {
-        sideInputs.add(sideInputValue);
-      }
-      return new HasExpectedOutput(inputs, sideInputs, additionalOutput, ordered);
+      return new HasExpectedOutput(
+          inputs, Arrays.asList(sideInputValues), additionalOutput);
     }
 
     public HasExpectedOutput fromOutput(TupleTag<String> outputTag) {
       return fromOutput(outputTag.getId());
     }
     public HasExpectedOutput fromOutput(String outputId) {
-      return new HasExpectedOutput(inputs, sideInputs, outputId, ordered);
-    }
-
-    public HasExpectedOutput inOrder() {
-      return new HasExpectedOutput(inputs, sideInputs, additionalOutput, true);
+      return new HasExpectedOutput(inputs, sideInputs, outputId);
     }
 
     @Override
@@ -1179,11 +1127,7 @@ public class ParDoTest implements Serializable {
       }
       String[] expectedProcessedsArray =
           expectedProcesseds.toArray(new String[expectedProcesseds.size()]);
-      if (!ordered || expectedProcesseds.isEmpty()) {
-        assertThat(processeds, containsInAnyOrder(expectedProcessedsArray));
-      } else {
-        assertThat(processeds, contains(expectedProcessedsArray));
-      }
+      assertThat(processeds, containsInAnyOrder(expectedProcessedsArray));
 
       for (String finished : finisheds) {
         assertEquals(additionalOutputPrefix + "finished", finished);

Reply via email to