Removes ParDo.Unbound and UnboundMulti

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d0349eef
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d0349eef
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d0349eef

Branch: refs/heads/master
Commit: d0349eef08bfa3ac5992945b9453fa08e8b7d7e4
Parents: caba841
Author: Eugene Kirpichov <[email protected]>
Authored: Fri Mar 3 10:53:28 2017 -0800
Committer: Eugene Kirpichov <[email protected]>
Committed: Tue Mar 28 13:04:37 2017 -0700

----------------------------------------------------------------------
 .../apache/beam/examples/complete/TfIdf.java    |   3 +-
 .../beam/examples/cookbook/FilterExamples.java  |   3 +-
 .../beam/examples/complete/game/GameStats.java  |   8 +-
 .../apex/translation/ParDoTranslatorTest.java   |  11 +-
 .../StatefulParDoEvaluatorFactoryTest.java      |   3 +-
 .../beam/runners/flink/examples/TFIDF.java      |   3 +-
 .../BatchStatefulParDoOverridesTest.java        |   2 +-
 .../DataflowPipelineTranslatorTest.java         |   4 +-
 .../org/apache/beam/sdk/testing/PAssert.java    |   2 +-
 .../org/apache/beam/sdk/transforms/Combine.java |   4 +-
 .../org/apache/beam/sdk/transforms/ParDo.java   | 297 ++-----------------
 .../apache/beam/sdk/transforms/Partition.java   |   4 +-
 .../org/apache/beam/sdk/transforms/Sample.java  |   7 +-
 .../apache/beam/sdk/metrics/MetricsTest.java    |   5 +-
 .../apache/beam/sdk/transforms/FlattenTest.java |   4 +-
 .../apache/beam/sdk/transforms/ParDoTest.java   |  54 ++--
 .../apache/beam/sdk/transforms/ViewTest.java    | 152 +++++-----
 .../apache/beam/sdk/values/TypedPValueTest.java |   4 +-
 18 files changed, 154 insertions(+), 416 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/d0349eef/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
index 9de5617..f7904d3 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
@@ -323,7 +323,6 @@ public class TfIdf {
       // presented to each invocation of the DoFn.
       PCollection<KV<String, Double>> wordToDf = wordToDocCount
           .apply("ComputeDocFrequencies", ParDo
-              .withSideInputs(totalDocuments)
               .of(new DoFn<KV<String, Long>, KV<String, Double>>() {
                 @ProcessElement
                 public void processElement(ProcessContext c) {
@@ -335,7 +334,7 @@ public class TfIdf {
 
                   c.output(KV.of(word, documentFrequency));
                 }
-              }));
+              }).withSideInputs(totalDocuments));
 
       // Join the term frequency and document frequency
       // collections, each keyed on the word.

http://git-wip-us.apache.org/repos/asf/beam/blob/d0349eef/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java
index 815ac7b..fed9db7 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java
@@ -175,7 +175,6 @@ public class FilterExamples {
       // We'll only output readings with temperatures below this mean.
       PCollection<TableRow> filteredRows = monthFilteredRows
           .apply("ParseAndFilter", ParDo
-              .withSideInputs(globalMeanTemp)
               .of(new DoFn<TableRow, TableRow>() {
                 @ProcessElement
                 public void processElement(ProcessContext c) {
@@ -185,7 +184,7 @@ public class FilterExamples {
                     c.output(c.element());
                   }
                 }
-              }));
+              }).withSideInputs(globalMeanTemp));
 
       return filteredRows;
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/d0349eef/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
index c880061..93e8254 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
@@ -125,7 +125,6 @@ public class GameStats extends LeaderBoard {
       PCollection<KV<String, Integer>> filtered = sumScores
           .apply("ProcessAndFilter", ParDo
               // use the derived mean total score as a side input
-              .withSideInputs(globalMeanScore)
               .of(new DoFn<KV<String, Integer>, KV<String, Integer>>() {
                 private final Aggregator<Long, Long> numSpammerUsers =
                   createAggregator("SpammerUsers", Sum.ofLongs());
@@ -140,7 +139,7 @@ public class GameStats extends LeaderBoard {
                     c.output(c.element());
                   }
                 }
-              }));
+              }).withSideInputs(globalMeanScore));
       return filtered;
     }
   }
@@ -288,7 +287,6 @@ public class GameStats extends LeaderBoard {
           FixedWindows.of(Duration.standardMinutes(options.getFixedWindowDuration()))))
       // Filter out the detected spammer users, using the side input derived above.
       .apply("FilterOutSpammers", ParDo
-              .withSideInputs(spammersView)
               .of(new DoFn<GameActionInfo, GameActionInfo>() {
                 @ProcessElement
                 public void processElement(ProcessContext c) {
@@ -297,8 +295,8 @@ public class GameStats extends LeaderBoard {
                     c.output(c.element());
                   }
                 }
-              }))
-      // Extract and sum teamname/score pairs from the event data.
+              }).withSideInputs(spammersView))
+        // Extract and sum teamname/score pairs from the event data.
       .apply("ExtractTeamScore", new ExtractAndSumScore("team"))
       // [END DocInclude_FilterAndCalc]
       // Write the result to BigQuery

http://git-wip-us.apache.org/repos/asf/beam/blob/d0349eef/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoTranslatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoTranslatorTest.java
index 83e68f7..3bcba00 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoTranslatorTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoTranslatorTest.java
@@ -281,13 +281,14 @@ public class ParDoTranslatorTest {
 
     PCollectionTuple outputs = pipeline
         .apply(Create.of(inputs))
-        .apply(ParDo.withSideInputs(sideInput1)
-            .withSideInputs(sideInputUnread)
-            .withSideInputs(sideInput2)
-            .withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag))
+        .apply(ParDo
             .of(new TestMultiOutputWithSideInputsFn(
                 Arrays.asList(sideInput1, sideInput2),
-                Arrays.<TupleTag<String>>asList())));
+                Arrays.<TupleTag<String>>asList()))
+            .withSideInputs(sideInput1)
+            .withSideInputs(sideInputUnread)
+            .withSideInputs(sideInput2)
+            .withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag)));
 
     outputs.get(mainOutputTag).apply(ParDo.of(new EmbeddedCollector()));
     outputs.get(sideOutputTag).setCoder(VoidCoder.of());

http://git-wip-us.apache.org/repos/asf/beam/blob/d0349eef/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java
index 9bf6bc9..946cd69 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java
@@ -243,7 +243,7 @@ public class StatefulParDoEvaluatorFactoryTest implements Serializable {
         mainInput
             .apply(
                 new ParDoMultiOverrideFactory.GbkThenStatefulParDo<>(
-                    ParDo.withSideInputs(sideInput)
+                    ParDo
                         .of(
                             new DoFn<KV<String, Integer>, Integer>() {
                               @StateId(stateId)
@@ -253,6 +253,7 @@ public class StatefulParDoEvaluatorFactoryTest implements Serializable {
                               @ProcessElement
                               public void process(ProcessContext c) {}
                             })
+                        .withSideInputs(sideInput)
                         .withOutputTags(mainOutput, TupleTagList.empty())))
             .get(mainOutput)
             .setCoder(VarIntCoder.of());

http://git-wip-us.apache.org/repos/asf/beam/blob/d0349eef/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
index 89e261b..8e1df08 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
@@ -347,7 +347,6 @@ public class TFIDF {
       // presented to each invocation of the DoFn.
       PCollection<KV<String, Double>> wordToDf = wordToDocCount
           .apply("ComputeDocFrequencies", ParDo
-              .withSideInputs(totalDocuments)
               .of(new DoFn<KV<String, Long>, KV<String, Double>>() {
                 private static final long serialVersionUID = 0;
 
@@ -361,7 +360,7 @@ public class TFIDF {
 
                   c.output(KV.of(word, documentFrequency));
                 }
-              }));
+              }).withSideInputs(totalDocuments));
 
       // Join the term frequency and document frequency
       // collections, each keyed on the word.

http://git-wip-us.apache.org/repos/asf/beam/blob/d0349eef/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverridesTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverridesTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverridesTest.java
index 899902a..f995ff3 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverridesTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverridesTest.java
@@ -87,7 +87,7 @@ public class BatchStatefulParDoOverridesTest implements Serializable {
     DummyStatefulDoFn fn = new DummyStatefulDoFn();
     pipeline
         .apply(Create.of(KV.of(1, 2)))
-        .apply(ParDo.withOutputTags(mainOutputTag, TupleTagList.empty()).of(fn));
+        .apply(ParDo.of(fn).withOutputTags(mainOutputTag, TupleTagList.empty()));
 
     DataflowRunner runner = DataflowRunner.fromOptions(options);
     runner.replaceTransforms(pipeline);

http://git-wip-us.apache.org/repos/asf/beam/blob/d0349eef/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index 8c8568e..2d63193 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -863,7 +863,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
     pipeline
         .apply(Create.of(KV.of(1, 1)))
         .apply(
-            ParDo.withOutputTags(mainOutputTag, TupleTagList.empty()).of(
+            ParDo.of(
                 new DoFn<KV<Integer, Integer>, Integer>() {
                   @StateId("unused")
                   final StateSpec<Object, ValueState<Integer>> stateSpec =
@@ -873,7 +873,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
                   public void process(ProcessContext c) {
                     // noop
                   }
-                }));
+                }).withOutputTags(mainOutputTag, TupleTagList.empty()));
 
     runner.replaceTransforms(pipeline);
 

http://git-wip-us.apache.org/repos/asf/beam/blob/d0349eef/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
index 412753c..56df449 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
@@ -1110,7 +1110,7 @@ public class PAssert {
           .apply("WindowToken", windowToken)
           .apply(
               "RunChecks",
-              ParDo.withSideInputs(actual).of(new SideInputCheckerDoFn<>(checkerFn, actual, site)));
+              ParDo.of(new SideInputCheckerDoFn<>(checkerFn, actual, site)).withSideInputs(actual));
 
       return PDone.in(input.getPipeline());
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/d0349eef/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
index 2c145b4..b403691 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
@@ -1503,7 +1503,7 @@ public class Combine {
       final OutputT defaultValue = fn.defaultValue();
       PCollection<OutputT> defaultIfEmpty = maybeEmpty.getPipeline()
           .apply("CreateVoid", Create.of((Void) null).withCoder(VoidCoder.of()))
-          .apply("ProduceDefault", ParDo.withSideInputs(maybeEmptyView).of(
+          .apply("ProduceDefault", ParDo.of(
               new DoFn<Void, OutputT>() {
                 @ProcessElement
                 public void processElement(ProcessContext c) {
@@ -1512,7 +1512,7 @@ public class Combine {
                     c.output(defaultValue);
                   }
                 }
-              }))
+              }).withSideInputs(maybeEmptyView))
           .setCoder(maybeEmpty.getCoder())
           .setWindowingStrategyInternal(maybeEmpty.getWindowingStrategy());
 

http://git-wip-us.apache.org/repos/asf/beam/blob/d0349eef/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
index 7446737..1e2e5b8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
@@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import java.io.Serializable;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import org.apache.beam.sdk.Pipeline;
@@ -153,7 +154,7 @@ import org.apache.beam.sdk.values.TypedPValue;
  * {@link PCollectionView PCollectionViews} express styles of accessing
  * {@link PCollection PCollections} computed by earlier pipeline operations,
  * passed in to the {@link ParDo} transform using
- * {@link #withSideInputs}, and their contents accessible to each of
+ * {@link ParDo.Bound#withSideInputs}, and their contents accessible to each of
  * the {@link DoFn} operations via {@link DoFn.ProcessContext#sideInput sideInput}.
  * For example:
  *
@@ -183,7 +184,7 @@ import org.apache.beam.sdk.values.TypedPValue;
  * {@link PCollection PCollections}, each keyed by a distinct {@link TupleTag},
  * and bundled in a {@link PCollectionTuple}. The {@link TupleTag TupleTags}
  * to be used for the output {@link PCollectionTuple} are specified by
- * invoking {@link #withOutputTags}. Unconsumed side outputs do not
+ * invoking {@link ParDo.Bound#withOutputTags}. Unconsumed side outputs do not
  * necessarily need to be explicitly specified, even if the {@link DoFn}
  * generates them. Within the {@link DoFn}, an element is added to the
  * main output {@link PCollection} as normal, using
@@ -206,11 +207,6 @@ import org.apache.beam.sdk.values.TypedPValue;
  * PCollectionTuple results =
  *     words.apply(
  *         ParDo
- *         // Specify the main and consumed side output tags of the
- *         // PCollectionTuple result:
- *         .withOutputTags(wordsBelowCutOffTag,
- *                         TupleTagList.of(wordLengthsAboveCutOffTag)
- *                                     .and(markedWordsTag))
  *         .of(new DoFn<String, String>() {
  *             // Create a tag for the unconsumed side output.
  *             final TupleTag<String> specialWordsTag =
@@ -233,7 +229,12 @@ import org.apache.beam.sdk.values.TypedPValue;
  *                 // Emit this word to the unconsumed side output.
  *                 c.sideOutput(specialWordsTag, word);
  *               }
- *             }}));
+ *             }})
+ *             // Specify the main and consumed side output tags of the
+ *             // PCollectionTuple result:
+ *         .withOutputTags(wordsBelowCutOffTag,
+ *             TupleTagList.of(wordLengthsAboveCutOffTag)
+ *                         .and(markedWordsTag)));
  * // Extract the PCollection results, by tag.
  * PCollection<String> wordsBelowCutOff =
  *     results.get(wordsBelowCutOffTag);
@@ -243,35 +244,6 @@ import org.apache.beam.sdk.values.TypedPValue;
  *     results.get(markedWordsTag);
  * }</pre>
  *
- * <h2>Properties May Be Specified In Any Order</h2>
- *
- * <p>Several properties can be specified for a {@link ParDo}
- * {@link PTransform}, including side inputs, side output tags,
- * and {@link DoFn} to invoke. Only the {@link DoFn} is required; side inputs 
and side
- * output tags are only specified when they're needed. These
- * properties can be specified in any order, as long as they're
- * specified before the {@link ParDo} {@link PTransform} is applied.
- *
- * <p>The approach used to allow these properties to be specified in
- * any order, with some properties omitted, is to have each of the
- * property "setter" methods defined as static factory methods on
- * {@link ParDo} itself, which return an instance of either
- * {@link ParDo.Unbound} or
- * {@link ParDo.Bound} nested classes, each of which offer
- * property setter instance methods to enable setting additional
- * properties. {@link ParDo.Bound} is used for {@link ParDo}
- * transforms whose {@link DoFn} is specified and whose input and
- * output static types have been bound. {@link ParDo.Unbound ParDo.Unbound} is used
- * for {@link ParDo} transforms that have not yet had their
- * {@link DoFn} specified. Only {@link ParDo.Bound} instances can be
- * applied.
- *
- * <p>Another benefit of this approach is that it reduces the number
- * of type parameters that need to be specified manually. In
- * particular, the input and output types of the {@link ParDo}
- * {@link PTransform} are inferred automatically from the type
- * parameters of the {@link DoFn} argument passed to {@link ParDo#of}.
- *
  * <h2>Output Coders</h2>
  *
  * <p>By default, the {@link Coder Coder&lt;OutputT&gt;} for the
@@ -446,91 +418,16 @@ import org.apache.beam.sdk.values.TypedPValue;
 public class ParDo {
 
   /**
-   * Creates a {@link ParDo} {@link PTransform} with the given
-   * side inputs.
-   *
-   * <p>Side inputs are {@link PCollectionView PCollectionViews}, whose contents are
-   * computed during pipeline execution and then made accessible to
-   * {@link DoFn} code via {@link DoFn.ProcessContext#sideInput sideInput}. Each
-   * invocation of the {@link DoFn} receives the same values for these
-   * side inputs.
-   *
-   * <p>See the discussion of Side Inputs above for more explanation.
-   *
-   * <p>The resulting {@link PTransform} is incomplete, and its
-   * input/output types are not yet bound. Use
-   * {@link ParDo.Unbound#of} to specify the {@link DoFn} to
-   * invoke, which will also bind the input/output types of this
-   * {@link PTransform}.
-   */
-  public static Unbound withSideInputs(PCollectionView<?>... sideInputs) {
-    return new Unbound().withSideInputs(sideInputs);
-  }
-
-  /**
-    * Creates a {@link ParDo} with the given side inputs.
-    *
-   * <p>Side inputs are {@link PCollectionView}s, whose contents are
-   * computed during pipeline execution and then made accessible to
-   * {@link DoFn} code via {@link DoFn.ProcessContext#sideInput sideInput}.
-   *
-   * <p>See the discussion of Side Inputs above for more explanation.
-   *
-   * <p>The resulting {@link PTransform} is incomplete, and its
-   * input/output types are not yet bound. Use
-   * {@link ParDo.Unbound#of} to specify the {@link DoFn} to
-   * invoke, which will also bind the input/output types of this
-   * {@link PTransform}.
-   */
-  public static Unbound withSideInputs(
-      Iterable<? extends PCollectionView<?>> sideInputs) {
-    return new Unbound().withSideInputs(sideInputs);
-  }
-
-  /**
-   * Creates a multi-output {@link ParDo} {@link PTransform} whose
-   * output {@link PCollection}s will be referenced using the given main
-   * output and side output tags.
-   *
-   * <p>{@link TupleTag TupleTags} are used to name (with its static element
-   * type {@code T}) each main and side output {@code PCollection<T>}.
-   * This {@link PTransform PTransform's} {@link DoFn} emits elements to the main
-   * output {@link PCollection} as normal, using
-   * {@link DoFn.Context#output}. The {@link DoFn} emits elements to
-   * a side output {@code PCollection} using
-   * {@link DoFn.Context#sideOutput}, passing that side output's tag
-   * as an argument. The result of invoking this {@link PTransform}
-   * will be a {@link PCollectionTuple}, and any of the the main and
-   * side output {@code PCollection}s can be retrieved from it via
-   * {@link PCollectionTuple#get}, passing the output's tag as an
-   * argument.
-   *
-   * <p>See the discussion of Side Outputs above for more explanation.
-   *
-   * <p>The resulting {@link PTransform} is incomplete, and its input
-   * type is not yet bound. Use {@link ParDo.UnboundMulti#of}
-   * to specify the {@link DoFn} to invoke, which will also bind the
-   * input type of this {@link PTransform}.
-   */
-  public static <OutputT> UnboundMulti<OutputT> withOutputTags(
-      TupleTag<OutputT> mainOutputTag,
-      TupleTagList sideOutputTags) {
-    return new Unbound().withOutputTags(mainOutputTag, sideOutputTags);
-  }
-
-  /**
    * Creates a {@link ParDo} {@link PTransform} that will invoke the
    * given {@link DoFn} function.
    *
-   * <p>The resulting {@link PTransform PTransform's} types have been bound, with the
-   * input being a {@code PCollection<InputT>} and the output a
-   * {@code PCollection<OutputT>}, inferred from the types of the argument
-   * {@code DoFn<InputT, OutputT>}. It is ready to be applied, or further
+   * <p>The resulting {@link PTransform PTransform} is ready to be applied, or further
    * properties can be set on it first.
    */
   public static <InputT, OutputT> Bound<InputT, OutputT> of(DoFn<InputT, OutputT> fn) {
     validate(fn);
-    return new Unbound().of(fn, displayDataForFn(fn));
+    return new Bound<InputT, OutputT>(
+        fn, Collections.<PCollectionView<?>>emptyList(), displayDataForFn(fn));
   }
 
   private static <T> DisplayData.ItemSpec<? extends Class<?>> displayDataForFn(T fn) {
@@ -591,85 +488,6 @@ public class ParDo {
   }
 
   /**
-   * An incomplete {@link ParDo} transform, with unbound input/output types.
-   *
-   * <p>Before being applied, {@link ParDo.Unbound#of} must be
-   * invoked to specify the {@link DoFn} to invoke, which will also
-   * bind the input/output types of this {@link PTransform}.
-   */
-  public static class Unbound {
-    private final List<PCollectionView<?>> sideInputs;
-
-    Unbound() {
-      this(ImmutableList.<PCollectionView<?>>of());
-    }
-
-    Unbound(List<PCollectionView<?>> sideInputs) {
-      this.sideInputs = sideInputs;
-    }
-
-    /**
-     * Returns a new {@link ParDo} transform that's like this
-     * transform but with the specified additional side inputs.
-     * Does not modify this transform. The resulting transform is
-     * still incomplete.
-     *
-     * <p>See the discussion of Side Inputs above and on
-     * {@link ParDo#withSideInputs} for more explanation.
-     */
-    public Unbound withSideInputs(PCollectionView<?>... sideInputs) {
-      return withSideInputs(Arrays.asList(sideInputs));
-    }
-
-    /**
-     * Returns a new {@link ParDo} transform that is like this
-     * transform but with the specified additional side inputs. Does not modify
-     * this transform. The resulting transform is still incomplete.
-     *
-     * <p>See the discussion of Side Inputs above and on
-     * {@link ParDo#withSideInputs} for more explanation.
-     */
-    public Unbound withSideInputs(
-        Iterable<? extends PCollectionView<?>> sideInputs) {
-      ImmutableList.Builder<PCollectionView<?>> builder = ImmutableList.builder();
-      builder.addAll(this.sideInputs);
-      builder.addAll(sideInputs);
-      return new Unbound(builder.build());
-    }
-
-    /**
-     * Returns a new {@link ParDo} {@link PTransform} that's like this
-     * transform but which will invoke the given {@link DoFn}
-     * function, and which has its input and output types bound. Does
-     * not modify this transform. The resulting {@link PTransform} is
-     * sufficiently specified to be applied, but more properties can
-     * still be specified.
-     */
-    public <InputT, OutputT> Bound<InputT, OutputT> of(DoFn<InputT, OutputT> fn) {
-      validate(fn);
-      return of(fn, displayDataForFn(fn));
-    }
-
-    /**
-     * Returns a new multi-output {@link ParDo} transform that's like this transform but with the
-     * specified main and side output tags. Does not modify this transform. The resulting transform
-     * is still incomplete.
-     *
-     * <p>See the discussion of Side Outputs above and on {@link ParDo#withOutputTags} for more
-     * explanation.
-     */
-    public <OutputT> UnboundMulti<OutputT> withOutputTags(
-        TupleTag<OutputT> mainOutputTag, TupleTagList sideOutputTags) {
-      return new UnboundMulti<>(sideInputs, mainOutputTag, sideOutputTags);
-    }
-
-    private <InputT, OutputT> Bound<InputT, OutputT> of(
-        DoFn<InputT, OutputT> doFn, DisplayData.ItemSpec<? extends Class<?>> fnDisplayData) {
-      return new Bound<>(doFn, sideInputs, fnDisplayData);
-    }
-  }
-
-  /**
    * A {@link PTransform} that, when applied to a {@code PCollection<InputT>},
    * invokes a user-specified {@code DoFn<InputT, OutputT>} on all its elements,
    * with all its outputs collected into an output
@@ -701,8 +519,7 @@ public class ParDo {
      * {@link PTransform} but with the specified additional side inputs. Does not
      * modify this {@link PTransform}.
      *
-     * <p>See the discussion of Side Inputs above and on
-     * {@link ParDo#withSideInputs} for more explanation.
+     * <p>See the discussion of Side Inputs above for more explanation.
      */
     public Bound<InputT, OutputT> withSideInputs(PCollectionView<?>... sideInputs) {
       return withSideInputs(Arrays.asList(sideInputs));
@@ -713,8 +530,7 @@ public class ParDo {
      * {@link PTransform} but with the specified additional side inputs. Does not
      * modify this {@link PTransform}.
      *
-     * <p>See the discussion of Side Inputs above and on
-     * {@link ParDo#withSideInputs} for more explanation.
+     * <p>See the discussion of Side Inputs above for more explanation.
      */
     public Bound<InputT, OutputT> withSideInputs(
         Iterable<? extends PCollectionView<?>> sideInputs) {
@@ -732,8 +548,7 @@ public class ParDo {
      * PTransform} but with the specified main and side output tags. Does not modify this {@link
      * PTransform}.
      *
-     * <p>See the discussion of Side Outputs above and on {@link ParDo#withOutputTags} for more
-     * explanation.
+     * <p>See the discussion of Side Outputs above for more explanation.
      */
     public BoundMulti<InputT, OutputT> withOutputTags(
         TupleTag<OutputT> mainOutputTag, TupleTagList sideOutputTags) {
@@ -798,82 +613,6 @@ public class ParDo {
   }
 
   /**
-   * An incomplete multi-output {@link ParDo} transform, with unbound
-   * input type.
-   *
-   * <p>Before being applied, {@link ParDo.UnboundMulti#of} must be
-   * invoked to specify the {@link DoFn} to invoke, which will also
-   * bind the input type of this {@link PTransform}.
-   *
-   * @param <OutputT> the type of the main output {@code PCollection} elements
-   */
-  public static class UnboundMulti<OutputT> {
-    private final List<PCollectionView<?>> sideInputs;
-    private final TupleTag<OutputT> mainOutputTag;
-    private final TupleTagList sideOutputTags;
-
-    UnboundMulti(List<PCollectionView<?>> sideInputs,
-                 TupleTag<OutputT> mainOutputTag,
-                 TupleTagList sideOutputTags) {
-      this.sideInputs = sideInputs;
-      this.mainOutputTag = mainOutputTag;
-      this.sideOutputTags = sideOutputTags;
-    }
-
-    /**
-     * Returns a new multi-output {@link ParDo} transform that's like
-     * this transform but with the specified side inputs. Does not
-     * modify this transform. The resulting transform is still
-     * incomplete.
-     *
-     * <p>See the discussion of Side Inputs above and on
-     * {@link ParDo#withSideInputs} for more explanation.
-     */
-    public UnboundMulti<OutputT> withSideInputs(
-        PCollectionView<?>... sideInputs) {
-      return withSideInputs(Arrays.asList(sideInputs));
-    }
-
-    /**
-     * Returns a new multi-output {@link ParDo} transform that's like
-     * this transform but with the specified additional side inputs. Does not
-     * modify this transform. The resulting transform is still
-     * incomplete.
-     *
-     * <p>See the discussion of Side Inputs above and on
-     * {@link ParDo#withSideInputs} for more explanation.
-     */
-    public UnboundMulti<OutputT> withSideInputs(
-        Iterable<? extends PCollectionView<?>> sideInputs) {
-      return new UnboundMulti<>(
-          ImmutableList.<PCollectionView<?>>builder()
-              .addAll(this.sideInputs)
-              .addAll(sideInputs)
-              .build(),
-          mainOutputTag,
-          sideOutputTags);
-    }
-
-    /**
-     * Returns a new multi-output {@link ParDo} {@link PTransform}
-     * that's like this transform but which will invoke the given
-     * {@link DoFn} function, and which has its input type bound.
-     * Does not modify this transform. The resulting
-     * {@link PTransform} is sufficiently specified to be applied, but
-     * more properties can still be specified.
-     */
-    public <InputT> BoundMulti<InputT, OutputT> of(DoFn<InputT, OutputT> fn) {
-      validate(fn);
-      return of(fn, displayDataForFn(fn));
-    }
-
-    private <InputT> BoundMulti<InputT, OutputT> of(
-        DoFn<InputT, OutputT> fn, DisplayData.ItemSpec<? extends Class<?>> fnDisplayData) {
-      return new BoundMulti<>(fn, sideInputs, mainOutputTag, sideOutputTags, fnDisplayData);
-    }
-  }
-
-  /**
    * A {@link PTransform} that, when applied to a
    * {@code PCollection<InputT>}, invokes a user-specified
    * {@code DoFn<InputT, OutputT>} on all its elements, which can emit elements
@@ -910,8 +649,7 @@ public class ParDo {
      * that's like this {@link PTransform} but with the specified additional side
      * inputs. Does not modify this {@link PTransform}.
      *
-     * <p>See the discussion of Side Inputs above and on
-     * {@link ParDo#withSideInputs} for more explanation.
+     * <p>See the discussion of Side Inputs above for more explanation.
      */
     public BoundMulti<InputT, OutputT> withSideInputs(
         PCollectionView<?>... sideInputs) {
@@ -923,8 +661,7 @@ public class ParDo {
      * PTransform} but with the specified additional side inputs. Does not modify this {@link
      * PTransform}.
      *
-     * <p>See the discussion of Side Inputs above and on {@link ParDo#withSideInputs} for more
-     * explanation.
+     * <p>See the discussion of Side Inputs above for more explanation.
      */
     public BoundMulti<InputT, OutputT> withSideInputs(
         Iterable<? extends PCollectionView<?>> sideInputs) {

http://git-wip-us.apache.org/repos/asf/beam/blob/d0349eef/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java
index e0b2b61..2031bc9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java
@@ -105,8 +105,8 @@ public class Partition<T> extends PTransform<PCollection<T>, PCollectionList<T>>
 
     PCollectionTuple outputs = in.apply(
         ParDo
-        .withOutputTags(new TupleTag<Void>(){}, outputTags)
-        .of(partitionDoFn));
+        .of(partitionDoFn)
+        .withOutputTags(new TupleTag<Void>(){}, outputTags));
 
     PCollectionList<T> pcs = PCollectionList.empty(in.getPipeline());
     Coder<T> coder = in.getCoder();

http://git-wip-us.apache.org/repos/asf/beam/blob/d0349eef/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java
index 3734f7b..3d35c80 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java
@@ -146,12 +146,9 @@ public class Sample {
     @Override
     public PCollection<T> expand(PCollection<T> in) {
       PCollectionView<Iterable<T>> iterableView = 
in.apply(View.<T>asIterable());
-      return
-          in.getPipeline()
+      return in.getPipeline()
           .apply(Create.of((Void) null).withCoder(VoidCoder.of()))
-          .apply(ParDo
-                 .withSideInputs(iterableView)
-                 .of(new SampleAnyDoFn<>(limit, iterableView)))
+          .apply(ParDo.of(new SampleAnyDoFn<>(limit, iterableView)).withSideInputs(iterableView))
           .setCoder(in.getCoder());
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/d0349eef/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java
index 27e8411..3555db3 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java
@@ -209,7 +209,7 @@ public class MetricsTest implements Serializable {
             bundleDist.update(40L);
           }
         }))
-        .apply("MyStep2", ParDo.withOutputTags(output1, TupleTagList.of(output2))
+        .apply("MyStep2", ParDo
             .of(new DoFn<Integer, Integer>() {
               @SuppressWarnings("unused")
               @ProcessElement
@@ -223,7 +223,8 @@ public class MetricsTest implements Serializable {
                 c.output(element);
                 c.sideOutput(output2, element);
               }
-            }));
+            })
+            .withOutputTags(output1, TupleTagList.of(output2)));
     PipelineResult result = pipeline.run();
 
     result.waitUntilFinish();

http://git-wip-us.apache.org/repos/asf/beam/blob/d0349eef/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java
index 1753c49..a4f2545 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java
@@ -198,14 +198,14 @@ public class FlattenTest implements Serializable {
 
     PCollection<String> output = p
         .apply(Create.of((Void) null).withCoder(VoidCoder.of()))
-        .apply(ParDo.withSideInputs(view).of(new DoFn<Void, String>() {
+        .apply(ParDo.of(new DoFn<Void, String>() {
                   @ProcessElement
                   public void processElement(ProcessContext c) {
                     for (String side : c.sideInput(view)) {
                       c.output(side);
                     }
                   }
-                }));
+                }).withSideInputs(view));
 
     PAssert.that(output).empty();
     p.run();

http://git-wip-us.apache.org/repos/asf/beam/blob/d0349eef/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index 336f4c0..9a4fd15 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -497,12 +497,13 @@ public class ParDoTest implements Serializable {
 
     PCollectionTuple outputs = pipeline
         .apply(Create.of(inputs))
-        .apply(ParDo.withOutputTags(mainOutputTag, 
TupleTagList.of(sideOutputTag))
+        .apply(ParDo
             .of(new DoFn<Integer, Void>(){
                 @ProcessElement
                 public void processElement(ProcessContext c) {
                   c.sideOutput(sideOutputTag, c.element());
-                }}));
+                }})
+            .withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag)));
 
     PAssert.that(outputs.get(mainOutputTag)).empty();
     PAssert.that(outputs.get(sideOutputTag)).containsInAnyOrder(inputs);
@@ -586,10 +587,11 @@ public class ParDoTest implements Serializable {
 
     PCollection<String> output = pipeline
         .apply(Create.of(inputs))
-        .apply(ParDo.withSideInputs(sideInput1, sideInputUnread, sideInput2)
+        .apply(ParDo
             .of(new TestDoFn(
                 Arrays.asList(sideInput1, sideInput2),
-                Arrays.<TupleTag<String>>asList())));
+                Arrays.<TupleTag<String>>asList()))
+            .withSideInputs(sideInput1, sideInputUnread, sideInput2));
 
     PAssert.that(output)
         .satisfies(ParDoTest.HasExpectedOutput
@@ -617,12 +619,13 @@ public class ParDoTest implements Serializable {
 
     PCollection<String> output = pipeline
         .apply(Create.of(inputs))
-        .apply(ParDo.withSideInputs(sideInput1)
-            .withSideInputs(sideInputUnread)
-            .withSideInputs(sideInput2)
+        .apply(ParDo
             .of(new TestDoFn(
                 Arrays.asList(sideInput1, sideInput2),
-                Arrays.<TupleTag<String>>asList())));
+                Arrays.<TupleTag<String>>asList()))
+            .withSideInputs(sideInput1)
+            .withSideInputs(sideInputUnread)
+            .withSideInputs(sideInput2));
 
     PAssert.that(output)
         .satisfies(ParDoTest.HasExpectedOutput
@@ -653,13 +656,14 @@ public class ParDoTest implements Serializable {
 
     PCollectionTuple outputs = pipeline
         .apply(Create.of(inputs))
-        .apply(ParDo.withSideInputs(sideInput1)
-            .withSideInputs(sideInputUnread)
-            .withSideInputs(sideInput2)
-            .withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag))
+        .apply(ParDo
             .of(new TestDoFn(
                 Arrays.asList(sideInput1, sideInput2),
-                Arrays.<TupleTag<String>>asList())));
+                Arrays.<TupleTag<String>>asList()))
+            .withSideInputs(sideInput1)
+            .withSideInputs(sideInputUnread)
+            .withSideInputs(sideInput2)
+            .withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag)));
 
     PAssert.that(outputs.get(mainOutputTag))
         .satisfies(ParDoTest.HasExpectedOutput
@@ -690,13 +694,14 @@ public class ParDoTest implements Serializable {
 
     PCollectionTuple outputs = pipeline
         .apply(Create.of(inputs))
-        .apply(ParDo.withSideInputs(sideInput1)
-            .withSideInputs(sideInputUnread)
-            .withSideInputs(sideInput2)
-            .withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag))
+        .apply(ParDo
             .of(new TestDoFn(
                 Arrays.asList(sideInput1, sideInput2),
-                Arrays.<TupleTag<String>>asList())));
+                Arrays.<TupleTag<String>>asList()))
+            .withSideInputs(sideInput1)
+            .withSideInputs(sideInputUnread)
+            .withSideInputs(sideInput2)
+            .withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag)));
 
     PAssert.that(outputs.get(mainOutputTag))
         .satisfies(ParDoTest.HasExpectedOutput
@@ -1201,7 +1206,6 @@ public class ParDoTest implements Serializable {
         .apply(Create.of(new TestDummy())
             .withCoder(TestDummyCoder.of()))
         .apply(ParDo
-            .withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag))
             .of(
                 new DoFn<TestDummy, TestDummy>() {
                   @ProcessElement
@@ -1211,7 +1215,8 @@ public class ParDoTest implements Serializable {
                     context.sideOutput(sideOutputTag, element);
                   }
                 })
-    );
+            .withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag))
+        );
 
     // Before fix, tuple.get(mainOutputTag).apply(...) would indirectly trigger
     // tuple.get(sideOutputTag).finishSpecifyingOutput(), which would crash
@@ -1263,14 +1268,15 @@ public class ParDoTest implements Serializable {
 
     PCollection<String> output =
         input
-        .apply(ParDo.withOutputTags(mainOutputTag, 
TupleTagList.of(sideOutputTag)).of(
+        .apply(ParDo.of(
             new DoFn<Integer, Integer>() {
               @ProcessElement
               public void processElement(ProcessContext c) {
                 c.sideOutputWithTimestamp(
                     sideOutputTag, c.element(), new 
Instant(c.element().longValue()));
               }
-            })).get(sideOutputTag)
+            }).withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag)))
+        .get(sideOutputTag)
         .apply(ParDo.of(new TestShiftTimestampDoFn<Integer>(Duration.ZERO, 
Duration.ZERO)))
         .apply(ParDo.of(new TestFormatTimestampDoFn<Integer>()));
 
@@ -2297,8 +2303,8 @@ public class ParDoTest implements Serializable {
     };
 
     ParDo.BoundMulti<String, String> parDo = ParDo
-            .withOutputTags(new TupleTag<String>(), TupleTagList.empty())
-            .of(fn);
+            .of(fn)
+            .withOutputTags(new TupleTag<String>(), TupleTagList.empty());
 
     DisplayData displayData = DisplayData.from(parDo);
     assertThat(displayData, includesDisplayDataFor("fn", fn));

http://git-wip-us.apache.org/repos/asf/beam/blob/d0349eef/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
index 740d808..867fe0a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
@@ -99,12 +99,12 @@ public class ViewTest implements Serializable {
     PCollection<Integer> output =
         pipeline.apply("Create123", Create.of(1, 2, 3))
             .apply("OutputSideInputs",
-                ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() {
+                ParDo.of(new DoFn<Integer, Integer>() {
                   @ProcessElement
                   public void processElement(ProcessContext c) {
                     c.output(c.sideInput(view));
                   }
-                }));
+                }).withSideInputs(view));
 
     PAssert.that(output).containsInAnyOrder(47, 47, 47);
 
@@ -129,12 +129,12 @@ public class ViewTest implements Serializable {
             TimestampedValue.of(3, new Instant(12))))
             .apply("MainWindowInto", 
Window.<Integer>into(FixedWindows.of(Duration.millis(10))))
             .apply("OutputSideInputs",
-                ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() {
+                ParDo.of(new DoFn<Integer, Integer>() {
                   @ProcessElement
                   public void processElement(ProcessContext c) {
                     c.output(c.sideInput(view));
                   }
-                }));
+                }).withSideInputs(view));
 
     PAssert.that(output).containsInAnyOrder(47, 47, 48);
 
@@ -150,12 +150,12 @@ public class ViewTest implements Serializable {
             .apply(View.<Integer>asSingleton());
 
     pipeline.apply("Create123", Create.of(1, 2, 3))
-        .apply("OutputSideInputs", ParDo.withSideInputs(view).of(new 
DoFn<Integer, Integer>() {
+        .apply("OutputSideInputs", ParDo.of(new DoFn<Integer, Integer>() {
           @ProcessElement
           public void processElement(ProcessContext c) {
             c.output(c.sideInput(view));
           }
-        }));
+        }).withSideInputs(view));
 
     thrown.expect(PipelineExecutionException.class);
     thrown.expectCause(isA(NoSuchElementException.class));
@@ -174,12 +174,12 @@ public class ViewTest implements Serializable {
     final PCollectionView<Integer> view = 
oneTwoThree.apply(View.<Integer>asSingleton());
 
     oneTwoThree.apply(
-        "OutputSideInputs", ParDo.withSideInputs(view).of(new DoFn<Integer, 
Integer>() {
+        "OutputSideInputs", ParDo.of(new DoFn<Integer, Integer>() {
           @ProcessElement
           public void processElement(ProcessContext c) {
             c.output(c.sideInput(view));
           }
-        }));
+        }).withSideInputs(view));
 
     thrown.expect(PipelineExecutionException.class);
     thrown.expectCause(isA(IllegalArgumentException.class));
@@ -200,7 +200,7 @@ public class ViewTest implements Serializable {
     PCollection<Integer> output =
         pipeline.apply("CreateMainInput", Create.of(29, 31))
             .apply("OutputSideInputs",
-                ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() {
+                ParDo.of(new DoFn<Integer, Integer>() {
                   @ProcessElement
                   public void processElement(ProcessContext c) {
                     checkArgument(c.sideInput(view).size() == 4);
@@ -209,7 +209,7 @@ public class ViewTest implements Serializable {
                       c.output(i);
                     }
                   }
-                }));
+                }).withSideInputs(view));
 
     PAssert.that(output).containsInAnyOrder(11, 13, 17, 23, 11, 13, 17, 23);
 
@@ -240,7 +240,7 @@ public class ViewTest implements Serializable {
             .apply("MainWindowInto", 
Window.<Integer>into(FixedWindows.of(Duration.millis(10))))
             .apply(
                 "OutputSideInputs",
-                ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() {
+                ParDo.of(new DoFn<Integer, Integer>() {
                   @ProcessElement
                   public void processElement(ProcessContext c) {
                     checkArgument(c.sideInput(view).size() == 4);
@@ -249,7 +249,7 @@ public class ViewTest implements Serializable {
                       c.output(i);
                     }
                   }
-                }));
+                }).withSideInputs(view));
 
     PAssert.that(output).containsInAnyOrder(11, 13, 17, 23, 31, 33, 37, 43);
 
@@ -267,14 +267,14 @@ public class ViewTest implements Serializable {
     PCollection<Integer> results =
         pipeline.apply("Create1", Create.of(1))
             .apply("OutputSideInputs",
-                ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() {
+                ParDo.of(new DoFn<Integer, Integer>() {
                   @ProcessElement
                   public void processElement(ProcessContext c) {
                     assertTrue(c.sideInput(view).isEmpty());
                     assertFalse(c.sideInput(view).iterator().hasNext());
                     c.output(1);
                   }
-                }));
+                }).withSideInputs(view));
 
     // Pass at least one value through to guarantee that DoFn executes.
     PAssert.that(results).containsInAnyOrder(1);
@@ -292,7 +292,7 @@ public class ViewTest implements Serializable {
     PCollection<Integer> output =
         pipeline.apply("CreateMainInput", Create.of(29))
             .apply("OutputSideInputs",
-                ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() {
+                ParDo.of(new DoFn<Integer, Integer>() {
                   @ProcessElement
                   public void processElement(ProcessContext c) {
                     try {
@@ -319,7 +319,7 @@ public class ViewTest implements Serializable {
                       c.output(i);
                     }
                   }
-                }));
+                }).withSideInputs(view));
 
     // Pass at least one value through to guarantee that DoFn executes.
     PAssert.that(output).containsInAnyOrder(11);
@@ -338,14 +338,14 @@ public class ViewTest implements Serializable {
     PCollection<Integer> output =
         pipeline.apply("CreateMainInput", Create.of(29, 31))
             .apply("OutputSideInputs",
-                ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() {
+                ParDo.of(new DoFn<Integer, Integer>() {
                   @ProcessElement
                   public void processElement(ProcessContext c) {
                     for (Integer i : c.sideInput(view)) {
                       c.output(i);
                     }
                   }
-                }));
+                }).withSideInputs(view));
 
     PAssert.that(output).containsInAnyOrder(11, 13, 17, 23, 11, 13, 17, 23);
 
@@ -377,14 +377,14 @@ public class ViewTest implements Serializable {
                     TimestampedValue.of(35, new Instant(11))))
             .apply("MainWindowInto", 
Window.<Integer>into(FixedWindows.of(Duration.millis(10))))
             .apply("OutputSideInputs",
-                ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() {
+                ParDo.of(new DoFn<Integer, Integer>() {
                   @ProcessElement
                   public void processElement(ProcessContext c) {
                     for (Integer i : c.sideInput(view)) {
                       c.output(i);
                     }
                   }
-                }));
+                }).withSideInputs(view));
 
     PAssert.that(output).containsInAnyOrder(11, 13, 17, 23, 31, 33, 37, 43);
 
@@ -402,13 +402,13 @@ public class ViewTest implements Serializable {
     PCollection<Integer> results =
         pipeline.apply("Create1", Create.of(1))
             .apply("OutputSideInputs",
-                ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() {
+                ParDo.of(new DoFn<Integer, Integer>() {
                   @ProcessElement
                   public void processElement(ProcessContext c) {
                     assertFalse(c.sideInput(view).iterator().hasNext());
                     c.output(1);
                   }
-                }));
+                }).withSideInputs(view));
 
     // Pass at least one value through to guarantee that DoFn executes.
     PAssert.that(results).containsInAnyOrder(1);
@@ -426,7 +426,7 @@ public class ViewTest implements Serializable {
     PCollection<Integer> output =
         pipeline.apply("CreateMainInput", Create.of(29))
             .apply("OutputSideInputs",
-                ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() {
+                ParDo.of(new DoFn<Integer, Integer>() {
                   @ProcessElement
                   public void processElement(ProcessContext c) {
                     Iterator<Integer> iterator = c.sideInput(view).iterator();
@@ -439,7 +439,7 @@ public class ViewTest implements Serializable {
                       c.output(iterator.next());
                     }
                   }
-                }));
+                }).withSideInputs(view));
 
     // Pass at least one value through to guarantee that DoFn executes.
     PAssert.that(output).containsInAnyOrder(11);
@@ -459,14 +459,14 @@ public class ViewTest implements Serializable {
         pipeline.apply("CreateMainInput", Create.of("apple", "banana", 
"blackberry"))
             .apply(
                 "OutputSideInputs",
-                ParDo.withSideInputs(view).of(new DoFn<String, KV<String, 
Integer>>() {
+                ParDo.of(new DoFn<String, KV<String, Integer>>() {
                   @ProcessElement
                   public void processElement(ProcessContext c) {
                     for (Integer v : 
c.sideInput(view).get(c.element().substring(0, 1))) {
                       c.output(of(c.element(), v));
                     }
                   }
-                }));
+                }).withSideInputs(view));
 
     PAssert.that(output).containsInAnyOrder(
         KV.of("apple", 1), KV.of("apple", 2), KV.of("banana", 3), 
KV.of("blackberry", 3));
@@ -486,7 +486,7 @@ public class ViewTest implements Serializable {
         pipeline.apply("CreateMainInput", Create.of(2 /* size */))
             .apply(
                 "OutputSideInputs",
-                ParDo.withSideInputs(view).of(new DoFn<Integer, KV<String, 
Integer>>() {
+                ParDo.of(new DoFn<Integer, KV<String, Integer>>() {
                   @ProcessElement
                   public void processElement(ProcessContext c) {
                     assertEquals((int) c.element(), c.sideInput(view).size());
@@ -497,7 +497,7 @@ public class ViewTest implements Serializable {
                       }
                     }
                   }
-                }));
+                }).withSideInputs(view));
 
     PAssert.that(output).containsInAnyOrder(
         KV.of("a", 1), KV.of("a", 2), KV.of("b", 3));
@@ -539,14 +539,14 @@ public class ViewTest implements Serializable {
         pipeline.apply("CreateMainInput", Create.of("apple", "banana", 
"blackberry"))
             .apply(
                 "OutputSideInputs",
-                ParDo.withSideInputs(view).of(new DoFn<String, KV<String, 
Integer>>() {
+                ParDo.of(new DoFn<String, KV<String, Integer>>() {
                   @ProcessElement
                   public void processElement(ProcessContext c) {
                     for (Integer v : 
c.sideInput(view).get(c.element().substring(0, 1))) {
                       c.output(of(c.element(), v));
                     }
                   }
-                }));
+                }).withSideInputs(view));
 
     PAssert.that(output).containsInAnyOrder(
         KV.of("apple", 1), KV.of("apple", 2), KV.of("banana", 3), 
KV.of("blackberry", 3));
@@ -574,7 +574,7 @@ public class ViewTest implements Serializable {
                                               TimestampedValue.of("banana", 
new Instant(13)),
                                               
TimestampedValue.of("blackberry", new Instant(16))))
             .apply("MainWindowInto", 
Window.<String>into(FixedWindows.of(Duration.millis(10))))
-            .apply("OutputSideInputs", ParDo.withSideInputs(view).of(
+            .apply("OutputSideInputs", ParDo.of(
                                            new DoFn<String, KV<String, 
Integer>>() {
                                              @ProcessElement
                                              public void 
processElement(ProcessContext c) {
@@ -584,7 +584,7 @@ public class ViewTest implements Serializable {
                                                  c.output(of(c.element(), v));
                                                }
                                              }
-                                           }));
+                                           }).withSideInputs(view));
 
     PAssert.that(output).containsInAnyOrder(
         KV.of("apple", 1), KV.of("apple", 2), KV.of("banana", 3), 
KV.of("blackberry", 3));
@@ -611,7 +611,7 @@ public class ViewTest implements Serializable {
                                               TimestampedValue.of(1 /* size 
*/, new Instant(5)),
                                               TimestampedValue.of(1 /* size 
*/, new Instant(16))))
             .apply("MainWindowInto", 
Window.<Integer>into(FixedWindows.of(Duration.millis(10))))
-            .apply("OutputSideInputs", ParDo.withSideInputs(view).of(
+            .apply("OutputSideInputs", ParDo.of(
                                            new DoFn<Integer, KV<String, 
Integer>>() {
                                              @ProcessElement
                                              public void 
processElement(ProcessContext c) {
@@ -626,7 +626,7 @@ public class ViewTest implements Serializable {
                                                  }
                                                }
                                              }
-                                           }));
+                                           }).withSideInputs(view));
 
     PAssert.that(output).containsInAnyOrder(
         KV.of("a", 1), KV.of("a", 2), KV.of("b", 3));
@@ -655,7 +655,7 @@ public class ViewTest implements Serializable {
                                               TimestampedValue.of("banana", 
new Instant(13)),
                                               
TimestampedValue.of("blackberry", new Instant(16))))
             .apply("MainWindowInto", 
Window.<String>into(FixedWindows.of(Duration.millis(10))))
-            .apply("OutputSideInputs", ParDo.withSideInputs(view).of(
+            .apply("OutputSideInputs", ParDo.of(
                                            new DoFn<String, KV<String, 
Integer>>() {
                                              @ProcessElement
                                              public void 
processElement(ProcessContext c) {
@@ -665,7 +665,7 @@ public class ViewTest implements Serializable {
                                                  c.output(of(c.element(), v));
                                                }
                                              }
-                                           }));
+                                           }).withSideInputs(view));
 
     PAssert.that(output).containsInAnyOrder(
         KV.of("apple", 1), KV.of("apple", 2), KV.of("banana", 3), 
KV.of("blackberry", 3));
@@ -685,7 +685,7 @@ public class ViewTest implements Serializable {
     PCollection<Integer> results =
         pipeline.apply("Create1", Create.of(1))
             .apply("OutputSideInputs",
-                ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() {
+                ParDo.of(new DoFn<Integer, Integer>() {
                   @ProcessElement
                   public void processElement(ProcessContext c) {
                     assertTrue(c.sideInput(view).isEmpty());
@@ -693,7 +693,7 @@ public class ViewTest implements Serializable {
                     
assertFalse(c.sideInput(view).entrySet().iterator().hasNext());
                     c.output(c.element());
                   }
-                }));
+                }).withSideInputs(view));
 
     // Pass at least one value through to guarantee that DoFn executes.
     PAssert.that(results).containsInAnyOrder(1);
@@ -715,7 +715,7 @@ public class ViewTest implements Serializable {
     PCollection<Integer> results =
         pipeline.apply("Create1", Create.of(1))
             .apply("OutputSideInputs",
-                ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() {
+                ParDo.of(new DoFn<Integer, Integer>() {
                   @ProcessElement
                   public void processElement(ProcessContext c) {
                     assertTrue(c.sideInput(view).isEmpty());
@@ -723,7 +723,7 @@ public class ViewTest implements Serializable {
                     
assertFalse(c.sideInput(view).entrySet().iterator().hasNext());
                     c.output(c.element());
                   }
-                }));
+                }).withSideInputs(view));
 
     // Pass at least one value through to guarantee that DoFn executes.
     PAssert.that(results).containsInAnyOrder(1);
@@ -743,7 +743,7 @@ public class ViewTest implements Serializable {
         pipeline.apply("CreateMainInput", Create.of("apple"))
             .apply(
                 "OutputSideInputs",
-                ParDo.withSideInputs(view).of(new DoFn<String, KV<String, 
Integer>>() {
+                ParDo.of(new DoFn<String, KV<String, Integer>>() {
                   @ProcessElement
                   public void processElement(ProcessContext c) {
                     try {
@@ -770,7 +770,7 @@ public class ViewTest implements Serializable {
                       c.output(KV.of(c.element(), v));
                     }
                   }
-                }));
+                }).withSideInputs(view));
 
     // Pass at least one value through to guarantee that DoFn executes.
     PAssert.that(output).containsInAnyOrder(KV.of("apple", 1));
@@ -790,13 +790,13 @@ public class ViewTest implements Serializable {
         pipeline.apply("CreateMainInput", Create.of("apple", "banana", 
"blackberry"))
             .apply(
                 "OutputSideInputs",
-                ParDo.withSideInputs(view).of(new DoFn<String, KV<String, 
Integer>>() {
+                ParDo.of(new DoFn<String, KV<String, Integer>>() {
                   @ProcessElement
                   public void processElement(ProcessContext c) {
                     c.output(
                         of(c.element(), 
c.sideInput(view).get(c.element().substring(0, 1))));
                   }
-                }));
+                }).withSideInputs(view));
 
     PAssert.that(output).containsInAnyOrder(
         KV.of("apple", 1), KV.of("banana", 3), KV.of("blackberry", 3));
@@ -816,7 +816,7 @@ public class ViewTest implements Serializable {
         pipeline.apply("CreateMainInput", Create.of(2 /* size */))
             .apply(
                 "OutputSideInputs",
-                ParDo.withSideInputs(view).of(new DoFn<Integer, KV<String, 
Integer>>() {
+                ParDo.of(new DoFn<Integer, KV<String, Integer>>() {
                   @ProcessElement
                   public void processElement(ProcessContext c) {
                     assertEquals((int) c.element(), c.sideInput(view).size());
@@ -825,7 +825,7 @@ public class ViewTest implements Serializable {
                       c.output(KV.of(entry.getKey(), entry.getValue()));
                     }
                   }
-                }));
+                }).withSideInputs(view));
 
     PAssert.that(output).containsInAnyOrder(
         KV.of("a", 1), KV.of("b", 3));
@@ -847,13 +847,13 @@ public class ViewTest implements Serializable {
         pipeline.apply("CreateMainInput", Create.of("apple", "banana", 
"blackberry"))
             .apply(
                 "OutputSideInputs",
-                ParDo.withSideInputs(view).of(new DoFn<String, KV<String, 
Integer>>() {
+                ParDo.of(new DoFn<String, KV<String, Integer>>() {
                   @ProcessElement
                   public void processElement(ProcessContext c) {
                     c.output(
                         of(c.element(), 
c.sideInput(view).get(c.element().substring(0, 1))));
                   }
-                }));
+                }).withSideInputs(view));
 
     PAssert.that(output).containsInAnyOrder(
         KV.of("apple", 1), KV.of("banana", 3), KV.of("blackberry", 3));
@@ -881,7 +881,7 @@ public class ViewTest implements Serializable {
                                               TimestampedValue.of("banana", 
new Instant(4)),
                                               
TimestampedValue.of("blackberry", new Instant(16))))
             .apply("MainWindowInto", 
Window.<String>into(FixedWindows.of(Duration.millis(10))))
-            .apply("OutputSideInputs", ParDo.withSideInputs(view).of(
+            .apply("OutputSideInputs", ParDo.of(
                                            new DoFn<String, KV<String, 
Integer>>() {
                                              @ProcessElement
                                              public void 
processElement(ProcessContext c) {
@@ -890,7 +890,7 @@ public class ViewTest implements Serializable {
                                                    c.sideInput(view).get(
                                                        
c.element().substring(0, 1))));
                                              }
-                                           }));
+                                           }).withSideInputs(view));
 
     PAssert.that(output).containsInAnyOrder(
         KV.of("apple", 1), KV.of("banana", 2), KV.of("blackberry", 3));
@@ -917,7 +917,7 @@ public class ViewTest implements Serializable {
                                               TimestampedValue.of(2 /* size 
*/, new Instant(5)),
                                               TimestampedValue.of(1 /* size 
*/, new Instant(16))))
             .apply("MainWindowInto", 
Window.<Integer>into(FixedWindows.of(Duration.millis(10))))
-            .apply("OutputSideInputs", ParDo.withSideInputs(view).of(
+            .apply("OutputSideInputs", ParDo.of(
                                            new DoFn<Integer, KV<String, 
Integer>>() {
                                              @ProcessElement
                                              public void 
processElement(ProcessContext c) {
@@ -930,7 +930,7 @@ public class ViewTest implements Serializable {
                                                  
c.output(KV.of(entry.getKey(), entry.getValue()));
                                                }
                                              }
-                                           }));
+                                           }).withSideInputs(view));
 
     PAssert.that(output).containsInAnyOrder(
         KV.of("a", 1), KV.of("b", 2), KV.of("b", 3));
@@ -961,7 +961,7 @@ public class ViewTest implements Serializable {
                                               TimestampedValue.of("banana", 
new Instant(4)),
                                               
TimestampedValue.of("blackberry", new Instant(16))))
             .apply("MainWindowInto", 
Window.<String>into(FixedWindows.of(Duration.millis(10))))
-            .apply("OutputSideInputs", ParDo.withSideInputs(view).of(
+            .apply("OutputSideInputs", ParDo.of(
                                            new DoFn<String, KV<String, 
Integer>>() {
                                              @ProcessElement
                                              public void 
processElement(ProcessContext c) {
@@ -970,7 +970,7 @@ public class ViewTest implements Serializable {
                                                    c.sideInput(view).get(
                                                        
c.element().substring(0, 1))));
                                              }
-                                           }));
+                                           }).withSideInputs(view));
 
     PAssert.that(output).containsInAnyOrder(
         KV.of("apple", 1), KV.of("banana", 2), KV.of("blackberry", 3));
@@ -991,7 +991,7 @@ public class ViewTest implements Serializable {
     PCollection<Integer> results =
         pipeline.apply("Create1", Create.of(1))
             .apply("OutputSideInputs",
-                ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() {
+                ParDo.of(new DoFn<Integer, Integer>() {
                   @ProcessElement
                   public void processElement(ProcessContext c) {
                     assertTrue(c.sideInput(view).isEmpty());
@@ -999,7 +999,7 @@ public class ViewTest implements Serializable {
                     
assertFalse(c.sideInput(view).entrySet().iterator().hasNext());
                     c.output(c.element());
                   }
-                }));
+                }).withSideInputs(view));
 
     // Pass at least one value through to guarantee that DoFn executes.
     PAssert.that(results).containsInAnyOrder(1);
@@ -1019,7 +1019,7 @@ public class ViewTest implements Serializable {
     PCollection<Integer> results =
         pipeline.apply("Create1", Create.of(1))
             .apply("OutputSideInputs",
-                ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() {
+                ParDo.of(new DoFn<Integer, Integer>() {
                   @ProcessElement
                   public void processElement(ProcessContext c) {
                     assertTrue(c.sideInput(view).isEmpty());
@@ -1027,7 +1027,7 @@ public class ViewTest implements Serializable {
                     
assertFalse(c.sideInput(view).entrySet().iterator().hasNext());
                     c.output(c.element());
                   }
-                }));
+                }).withSideInputs(view));
 
     // Pass at least one value through to guarantee that DoFn executes.
     PAssert.that(results).containsInAnyOrder(1);
@@ -1052,13 +1052,13 @@ public class ViewTest implements Serializable {
         pipeline.apply("CreateMainInput", Create.of("apple", "banana", 
"blackberry"))
             .apply(
                 "OutputSideInputs",
-                ParDo.withSideInputs(view).of(new DoFn<String, KV<String, 
Integer>>() {
+                ParDo.of(new DoFn<String, KV<String, Integer>>() {
                   @ProcessElement
                   public void processElement(ProcessContext c) {
                     c.output(
                         KV.of(c.element(), 
c.sideInput(view).get(c.element().substring(0, 1))));
                   }
-                }));
+                }).withSideInputs(view));
 
     PAssert.that(output).containsInAnyOrder(
         KV.of("apple", 1), KV.of("banana", 3), KV.of("blackberry", 3));
@@ -1082,7 +1082,7 @@ public class ViewTest implements Serializable {
         pipeline.apply("CreateMainInput", Create.of("apple"))
             .apply(
                 "OutputSideInputs",
-                ParDo.withSideInputs(view).of(new DoFn<String, KV<String, 
Integer>>() {
+                ParDo.of(new DoFn<String, KV<String, Integer>>() {
                   @ProcessElement
                   public void processElement(ProcessContext c) {
                     try {
@@ -1108,7 +1108,7 @@ public class ViewTest implements Serializable {
                     c.output(
                         KV.of(c.element(), 
c.sideInput(view).get(c.element().substring(0, 1))));
                   }
-                }));
+                }).withSideInputs(view));
 
     // Pass at least one value through to guarantee that DoFn executes.
     PAssert.that(output).containsInAnyOrder(KV.of("apple", 1));
@@ -1128,13 +1128,13 @@ public class ViewTest implements Serializable {
     PCollection<KV<String, Integer>> output =
         pipeline.apply("CreateMainInput", Create.of("apple", "banana", 
"blackberry"))
             .apply("Output",
-                ParDo.withSideInputs(view).of(new DoFn<String, KV<String, 
Integer>>() {
+                ParDo.of(new DoFn<String, KV<String, Integer>>() {
                   @ProcessElement
                   public void processElement(ProcessContext c) {
                     c.output(KV
                         .of(c.element(), 
c.sideInput(view).get(c.element().substring(0, 1))));
                   }
-                }));
+                }).withSideInputs(view));
 
     PAssert.that(output).containsInAnyOrder(
         KV.of("apple", 21), KV.of("banana", 3), KV.of("blackberry", 3));
@@ -1161,13 +1161,13 @@ public class ViewTest implements Serializable {
                                        TimestampedValue.of("B", new 
Instant(15)),
                                        TimestampedValue.of("C", new 
Instant(7))))
             .apply("WindowMainInput", 
Window.<String>into(FixedWindows.of(Duration.millis(10))))
-            .apply("OutputMainAndSideInputs", ParDo.withSideInputs(view).of(
+            .apply("OutputMainAndSideInputs", ParDo.of(
                                                   new DoFn<String, String>() {
                                                     @ProcessElement
                                                     public void 
processElement(ProcessContext c) {
                                                       c.output(c.element() + 
c.sideInput(view));
                                                     }
-                                                  }));
+                                                  }).withSideInputs(view));
 
     PAssert.that(output).containsInAnyOrder("A1", "B5", "C1");
 
@@ -1193,13 +1193,13 @@ public class ViewTest implements Serializable {
                                        TimestampedValue.of("B", new 
Instant(15)),
                                        TimestampedValue.of("C", new 
Instant(7))))
             .apply("WindowMainInput", 
Window.<String>into(FixedWindows.of(Duration.millis(10))))
-            .apply("OutputMainAndSideInputs", ParDo.withSideInputs(view).of(
+            .apply("OutputMainAndSideInputs", ParDo.of(
                                                   new DoFn<String, String>() {
                                                     @ProcessElement
                                                     public void 
processElement(ProcessContext c) {
                                                       c.output(c.element() + 
c.sideInput(view));
                                                     }
-                                                  }));
+                                                  }).withSideInputs(view));
 
     PAssert.that(output).containsInAnyOrder("A6", "B6", "C6");
 
@@ -1223,13 +1223,13 @@ public class ViewTest implements Serializable {
                                        TimestampedValue.of("B", new 
Instant(15)),
                                        TimestampedValue.of("C", new 
Instant(7))))
             .apply("WindowMainInput", 
Window.<String>into(FixedWindows.of(Duration.millis(10))))
-            .apply("OutputMainAndSideInputs", ParDo.withSideInputs(view).of(
+            .apply("OutputMainAndSideInputs", ParDo.of(
                                                   new DoFn<String, String>() {
                                                     @ProcessElement
                                                     public void 
processElement(ProcessContext c) {
                                                       c.output(c.element() + 
c.sideInput(view));
                                                     }
-                                                  }));
+                                                  }).withSideInputs(view));
 
     PAssert.that(output).containsInAnyOrder("A0", "B5", "C0");
 
@@ -1253,12 +1253,12 @@ public class ViewTest implements Serializable {
         pipeline.apply("CreateMainInput", Create.of(""))
             .apply(
                 "OutputMainAndSideInputs",
-                ParDo.withSideInputs(view).of(new DoFn<String, String>() {
+                ParDo.of(new DoFn<String, String>() {
                   @ProcessElement
                   public void processElement(ProcessContext c) {
                     c.output(c.element() + c.sideInput(view));
                   }
-                }));
+                }).withSideInputs(view));
 
     PAssert.that(output).containsInAnyOrder("null");
 
@@ -1282,18 +1282,18 @@ public class ViewTest implements Serializable {
         pipeline.apply("CreateVoid2", Create.of((Void) 
null).withCoder(VoidCoder.of()))
             .apply(
                 "OutputSideInput",
-                ParDo.withSideInputs(view1).of(new DoFn<Void, 
Iterable<Integer>>() {
+                ParDo.of(new DoFn<Void, Iterable<Integer>>() {
                   @ProcessElement
                   public void processElement(ProcessContext c) {
                     c.output(c.sideInput(view1));
                   }
-                }))
+                }).withSideInputs(view1))
             .apply("View2", View.<Iterable<Integer>>asIterable());
 
     PCollection<Integer> output =
         pipeline.apply("CreateVoid3", Create.of((Void) 
null).withCoder(VoidCoder.of()))
             .apply("ReadIterableSideInput",
-                ParDo.withSideInputs(view2).of(new DoFn<Void, Integer>() {
+                ParDo.of(new DoFn<Void, Integer>() {
                   @ProcessElement
                   public void processElement(ProcessContext c) {
                     for (Iterable<Integer> input : c.sideInput(view2)) {
@@ -1302,7 +1302,7 @@ public class ViewTest implements Serializable {
                       }
                     }
                   }
-                }));
+                }).withSideInputs(view2));
 
     PAssert.that(output).containsInAnyOrder(17);
 

http://git-wip-us.apache.org/repos/asf/beam/blob/d0349eef/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java
index 5e7cc7d..18d550c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java
@@ -59,8 +59,8 @@ public class TypedPValueTest {
     PCollection<Integer> input = p.apply(Create.of(1, 2, 3));
     PCollectionTuple tuple = input.apply(
         ParDo
-        .withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag))
-        .of(new IdentityDoFn()));
+        .of(new IdentityDoFn())
+        .withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag)));
     return tuple;
   }
 

Reply via email to