http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
----------------------------------------------------------------------
diff --git 
a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
 
b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
index 87aa8c2..a7e64af 100644
--- 
a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
+++ 
b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
@@ -18,109 +18,350 @@
 package org.apache.beam.runners.apex.translation.utils;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
 
 import com.datatorrent.lib.util.KryoCloneUtils;
-import org.apache.beam.runners.core.StateInternals;
-import org.apache.beam.runners.core.StateInternalsTest;
+import java.util.Arrays;
+import 
org.apache.beam.runners.apex.translation.utils.ApexStateInternals.ApexStateBackend;
+import 
org.apache.beam.runners.apex.translation.utils.ApexStateInternals.ApexStateInternalsFactory;
+import org.apache.beam.runners.core.StateMerging;
 import org.apache.beam.runners.core.StateNamespace;
 import org.apache.beam.runners.core.StateNamespaceForTest;
 import org.apache.beam.runners.core.StateTag;
 import org.apache.beam.runners.core.StateTags;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.state.BagState;
+import org.apache.beam.sdk.state.CombiningState;
+import org.apache.beam.sdk.state.GroupingState;
+import org.apache.beam.sdk.state.ReadableState;
 import org.apache.beam.sdk.state.ValueState;
+import org.apache.beam.sdk.state.WatermarkHoldState;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 import org.hamcrest.Matchers;
-import org.junit.Ignore;
+import org.joda.time.Instant;
+import org.junit.Before;
 import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.junit.runners.Suite;
 
 /**
  * Tests for {@link ApexStateInternals}. This is based on the tests for
- * {@code StateInternalsTest}.
+ * {@code InMemoryStateInternals}.
  */
-@RunWith(Suite.class)
-@Suite.SuiteClasses({
-    ApexStateInternalsTest.StandardStateInternalsTests.class,
-    ApexStateInternalsTest.OtherTests.class
-})
 public class ApexStateInternalsTest {
+  private static final BoundedWindow WINDOW_1 = new IntervalWindow(new 
Instant(0), new Instant(10));
+  private static final StateNamespace NAMESPACE_1 = new 
StateNamespaceForTest("ns1");
+  private static final StateNamespace NAMESPACE_2 = new 
StateNamespaceForTest("ns2");
+  private static final StateNamespace NAMESPACE_3 = new 
StateNamespaceForTest("ns3");
 
-  private static StateInternals newStateInternals() {
-    return new ApexStateInternals.ApexStateBackend()
+  private static final StateTag<ValueState<String>> STRING_VALUE_ADDR =
+      StateTags.value("stringValue", StringUtf8Coder.of());
+  private static final StateTag<CombiningState<Integer, int[], Integer>>
+      SUM_INTEGER_ADDR = StateTags.combiningValueFromInputInternal(
+          "sumInteger", VarIntCoder.of(), Sum.ofIntegers());
+  private static final StateTag<BagState<String>> STRING_BAG_ADDR =
+      StateTags.bag("stringBag", StringUtf8Coder.of());
+  private static final StateTag<WatermarkHoldState>
+      WATERMARK_EARLIEST_ADDR =
+      StateTags.watermarkStateInternal("watermark", 
TimestampCombiner.EARLIEST);
+  private static final StateTag<WatermarkHoldState> WATERMARK_LATEST_ADDR =
+      StateTags.watermarkStateInternal("watermark", TimestampCombiner.LATEST);
+  private static final StateTag<WatermarkHoldState> WATERMARK_EOW_ADDR =
+      StateTags.watermarkStateInternal("watermark", 
TimestampCombiner.END_OF_WINDOW);
+
+  private ApexStateInternals<String> underTest;
+
+  @Before
+  public void initStateInternals() {
+    underTest = new ApexStateInternals.ApexStateBackend()
         .newStateInternalsFactory(StringUtf8Coder.of())
-        .stateInternalsForKey("dummyKey");
-  }
-
-  /**
-   * A standard StateInternals test. Ignore set and map tests.
-   */
-  @RunWith(JUnit4.class)
-  public static class StandardStateInternalsTests extends StateInternalsTest {
-    @Override
-    protected StateInternals createStateInternals() {
-      return newStateInternals();
-    }
-
-    @Override
-    @Ignore
-    public void testSet() {}
-
-    @Override
-    @Ignore
-    public void testSetIsEmpty() {}
-
-    @Override
-    @Ignore
-    public void testMergeSetIntoSource() {}
-
-    @Override
-    @Ignore
-    public void testMergeSetIntoNewNamespace() {}
-
-    @Override
-    @Ignore
-    public void testMap() {}
-
-    @Override
-    @Ignore
-    public void testSetReadable() {}
-
-    @Override
-    @Ignore
-    public void testMapReadable() {}
-  }
-
-  /**
-   * A specific test of ApexStateInternalsTest.
-   */
-  @RunWith(JUnit4.class)
-  public static class OtherTests {
-
-    private static final StateNamespace NAMESPACE = new 
StateNamespaceForTest("ns");
-    private static final StateTag<ValueState<String>> STRING_VALUE_ADDR =
-        StateTags.value("stringValue", StringUtf8Coder.of());
-
-    @Test
-    public void testSerialization() throws Exception {
-      ApexStateInternals.ApexStateInternalsFactory<String> sif =
-          new ApexStateInternals.ApexStateBackend().
-          newStateInternalsFactory(StringUtf8Coder.of());
-      ApexStateInternals<String> keyAndState = 
sif.stateInternalsForKey("dummy");
-
-      ValueState<String> value = keyAndState.state(NAMESPACE, 
STRING_VALUE_ADDR);
-      assertEquals(keyAndState.state(NAMESPACE, STRING_VALUE_ADDR), value);
-      value.write("hello");
-
-      ApexStateInternals.ApexStateInternalsFactory<String> cloned;
-      assertNotNull("Serialization", cloned = KryoCloneUtils.cloneObject(sif));
-      ApexStateInternals<String> clonedKeyAndState = 
cloned.stateInternalsForKey("dummy");
-
-      ValueState<String> clonedValue = clonedKeyAndState.state(NAMESPACE, 
STRING_VALUE_ADDR);
-      assertThat(clonedValue.read(), Matchers.equalTo("hello"));
-      assertEquals(clonedKeyAndState.state(NAMESPACE, STRING_VALUE_ADDR), 
value);
-    }
+        .stateInternalsForKey((String) null);
+  }
+
+  @Test
+  public void testBag() throws Exception {
+    BagState<String> value = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
+
+    assertEquals(value, underTest.state(NAMESPACE_1, STRING_BAG_ADDR));
+    assertFalse(value.equals(underTest.state(NAMESPACE_2, STRING_BAG_ADDR)));
+
+    assertThat(value.read(), Matchers.emptyIterable());
+    value.add("hello");
+    assertThat(value.read(), Matchers.containsInAnyOrder("hello"));
+
+    value.add("world");
+    assertThat(value.read(), Matchers.containsInAnyOrder("hello", "world"));
+
+    value.clear();
+    assertThat(value.read(), Matchers.emptyIterable());
+    assertEquals(underTest.state(NAMESPACE_1, STRING_BAG_ADDR), value);
+
+  }
+
+  @Test
+  public void testBagIsEmpty() throws Exception {
+    BagState<String> value = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
+
+    assertThat(value.isEmpty().read(), Matchers.is(true));
+    ReadableState<Boolean> readFuture = value.isEmpty();
+    value.add("hello");
+    assertThat(readFuture.read(), Matchers.is(false));
+
+    value.clear();
+    assertThat(readFuture.read(), Matchers.is(true));
+  }
+
+  @Test
+  public void testMergeBagIntoSource() throws Exception {
+    BagState<String> bag1 = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
+    BagState<String> bag2 = underTest.state(NAMESPACE_2, STRING_BAG_ADDR);
+
+    bag1.add("Hello");
+    bag2.add("World");
+    bag1.add("!");
+
+    StateMerging.mergeBags(Arrays.asList(bag1, bag2), bag1);
+
+    // Reading the merged bag gets both the contents
+    assertThat(bag1.read(), Matchers.containsInAnyOrder("Hello", "World", 
"!"));
+    assertThat(bag2.read(), Matchers.emptyIterable());
+  }
+
+  @Test
+  public void testMergeBagIntoNewNamespace() throws Exception {
+    BagState<String> bag1 = underTest.state(NAMESPACE_1, STRING_BAG_ADDR);
+    BagState<String> bag2 = underTest.state(NAMESPACE_2, STRING_BAG_ADDR);
+    BagState<String> bag3 = underTest.state(NAMESPACE_3, STRING_BAG_ADDR);
+
+    bag1.add("Hello");
+    bag2.add("World");
+    bag1.add("!");
+
+    StateMerging.mergeBags(Arrays.asList(bag1, bag2, bag3), bag3);
+
+    // Reading the merged bag gets both the contents
+    assertThat(bag3.read(), Matchers.containsInAnyOrder("Hello", "World", 
"!"));
+    assertThat(bag1.read(), Matchers.emptyIterable());
+    assertThat(bag2.read(), Matchers.emptyIterable());
   }
+
+  @Test
+  public void testCombiningValue() throws Exception {
+    GroupingState<Integer, Integer> value = underTest.state(NAMESPACE_1, 
SUM_INTEGER_ADDR);
+
+    // State instances are cached, but depend on the namespace.
+    assertEquals(value, underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR));
+    assertFalse(value.equals(underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR)));
+
+    assertThat(value.read(), Matchers.equalTo(0));
+    value.add(2);
+    assertThat(value.read(), Matchers.equalTo(2));
+
+    value.add(3);
+    assertThat(value.read(), Matchers.equalTo(5));
+
+    value.clear();
+    assertThat(value.read(), Matchers.equalTo(0));
+    assertEquals(underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR), value);
+  }
+
+  @Test
+  public void testCombiningIsEmpty() throws Exception {
+    GroupingState<Integer, Integer> value = underTest.state(NAMESPACE_1, 
SUM_INTEGER_ADDR);
+
+    assertThat(value.isEmpty().read(), Matchers.is(true));
+    ReadableState<Boolean> readFuture = value.isEmpty();
+    value.add(5);
+    assertThat(readFuture.read(), Matchers.is(false));
+
+    value.clear();
+    assertThat(readFuture.read(), Matchers.is(true));
+  }
+
+  @Test
+  public void testMergeCombiningValueIntoSource() throws Exception {
+    CombiningState<Integer, int[], Integer> value1 =
+        underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
+    CombiningState<Integer, int[], Integer> value2 =
+        underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR);
+
+    value1.add(5);
+    value2.add(10);
+    value1.add(6);
+
+    assertThat(value1.read(), Matchers.equalTo(11));
+    assertThat(value2.read(), Matchers.equalTo(10));
+
+    // Merging clears the old values and updates the result value.
+    StateMerging.mergeCombiningValues(Arrays.asList(value1, value2), value1);
+
+    assertThat(value1.read(), Matchers.equalTo(21));
+    assertThat(value2.read(), Matchers.equalTo(0));
+  }
+
+  @Test
+  public void testMergeCombiningValueIntoNewNamespace() throws Exception {
+    CombiningState<Integer, int[], Integer> value1 =
+        underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
+    CombiningState<Integer, int[], Integer> value2 =
+        underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR);
+    CombiningState<Integer, int[], Integer> value3 =
+        underTest.state(NAMESPACE_3, SUM_INTEGER_ADDR);
+
+    value1.add(5);
+    value2.add(10);
+    value1.add(6);
+
+    StateMerging.mergeCombiningValues(Arrays.asList(value1, value2), value3);
+
+    // Merging clears the old values and updates the result value.
+    assertThat(value1.read(), Matchers.equalTo(0));
+    assertThat(value2.read(), Matchers.equalTo(0));
+    assertThat(value3.read(), Matchers.equalTo(21));
+  }
+
+  @Test
+  public void testWatermarkEarliestState() throws Exception {
+    WatermarkHoldState value =
+        underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
+
+    // State instances are cached, but depend on the namespace.
+    assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR));
+    assertFalse(value.equals(underTest.state(NAMESPACE_2, 
WATERMARK_EARLIEST_ADDR)));
+
+    assertThat(value.read(), Matchers.nullValue());
+    value.add(new Instant(2000));
+    assertThat(value.read(), Matchers.equalTo(new Instant(2000)));
+
+    value.add(new Instant(3000));
+    assertThat(value.read(), Matchers.equalTo(new Instant(2000)));
+
+    value.add(new Instant(1000));
+    assertThat(value.read(), Matchers.equalTo(new Instant(1000)));
+
+    value.clear();
+    assertThat(value.read(), Matchers.equalTo(null));
+    assertEquals(underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR), value);
+  }
+
+  @Test
+  public void testWatermarkLatestState() throws Exception {
+    WatermarkHoldState value =
+        underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR);
+
+    // State instances are cached, but depend on the namespace.
+    assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR));
+    assertFalse(value.equals(underTest.state(NAMESPACE_2, 
WATERMARK_LATEST_ADDR)));
+
+    assertThat(value.read(), Matchers.nullValue());
+    value.add(new Instant(2000));
+    assertThat(value.read(), Matchers.equalTo(new Instant(2000)));
+
+    value.add(new Instant(3000));
+    assertThat(value.read(), Matchers.equalTo(new Instant(3000)));
+
+    value.add(new Instant(1000));
+    assertThat(value.read(), Matchers.equalTo(new Instant(3000)));
+
+    value.clear();
+    assertThat(value.read(), Matchers.equalTo(null));
+    assertEquals(underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR), value);
+  }
+
+  @Test
+  public void testWatermarkEndOfWindowState() throws Exception {
+    WatermarkHoldState value = underTest.state(NAMESPACE_1, 
WATERMARK_EOW_ADDR);
+
+    // State instances are cached, but depend on the namespace.
+    assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR));
+    assertFalse(value.equals(underTest.state(NAMESPACE_2, 
WATERMARK_EOW_ADDR)));
+
+    assertThat(value.read(), Matchers.nullValue());
+    value.add(new Instant(2000));
+    assertThat(value.read(), Matchers.equalTo(new Instant(2000)));
+
+    value.clear();
+    assertThat(value.read(), Matchers.equalTo(null));
+    assertEquals(underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR), value);
+  }
+
+  @Test
+  public void testWatermarkStateIsEmpty() throws Exception {
+    WatermarkHoldState value =
+        underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
+
+    assertThat(value.isEmpty().read(), Matchers.is(true));
+    ReadableState<Boolean> readFuture = value.isEmpty();
+    value.add(new Instant(1000));
+    assertThat(readFuture.read(), Matchers.is(false));
+
+    value.clear();
+    assertThat(readFuture.read(), Matchers.is(true));
+  }
+
+  @Test
+  public void testMergeEarliestWatermarkIntoSource() throws Exception {
+    WatermarkHoldState value1 =
+        underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
+    WatermarkHoldState value2 =
+        underTest.state(NAMESPACE_2, WATERMARK_EARLIEST_ADDR);
+
+    value1.add(new Instant(3000));
+    value2.add(new Instant(5000));
+    value1.add(new Instant(4000));
+    value2.add(new Instant(2000));
+
+    // Merging clears the old values and updates the merged value.
+    StateMerging.mergeWatermarks(Arrays.asList(value1, value2), value1, 
WINDOW_1);
+
+    assertThat(value1.read(), Matchers.equalTo(new Instant(2000)));
+    assertThat(value2.read(), Matchers.equalTo(null));
+  }
+
+  @Test
+  public void testMergeLatestWatermarkIntoSource() throws Exception {
+    WatermarkHoldState value1 =
+        underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR);
+    WatermarkHoldState value2 =
+        underTest.state(NAMESPACE_2, WATERMARK_LATEST_ADDR);
+    WatermarkHoldState value3 =
+        underTest.state(NAMESPACE_3, WATERMARK_LATEST_ADDR);
+
+    value1.add(new Instant(3000));
+    value2.add(new Instant(5000));
+    value1.add(new Instant(4000));
+    value2.add(new Instant(2000));
+
+    // Merging clears the old values and updates the result value.
+    StateMerging.mergeWatermarks(Arrays.asList(value1, value2), value3, 
WINDOW_1);
+
+    // Merging clears the old values and updates the result value.
+    assertThat(value3.read(), Matchers.equalTo(new Instant(5000)));
+    assertThat(value1.read(), Matchers.equalTo(null));
+    assertThat(value2.read(), Matchers.equalTo(null));
+  }
+
+  @Test
+  public void testSerialization() throws Exception {
+    ApexStateInternalsFactory<String> sif = new ApexStateBackend().
+        newStateInternalsFactory(StringUtf8Coder.of());
+    ApexStateInternals<String> keyAndState = sif.stateInternalsForKey("dummy");
+
+    ValueState<String> value = keyAndState.state(NAMESPACE_1, 
STRING_VALUE_ADDR);
+    assertEquals(keyAndState.state(NAMESPACE_1, STRING_VALUE_ADDR), value);
+    value.write("hello");
+
+    ApexStateInternalsFactory<String> cloned;
+    assertNotNull("Serialization", cloned = KryoCloneUtils.cloneObject(sif));
+    ApexStateInternals<String> clonedKeyAndState = 
cloned.stateInternalsForKey("dummy");
+
+    ValueState<String> clonedValue = clonedKeyAndState.state(NAMESPACE_1, 
STRING_VALUE_ADDR);
+    assertThat(clonedValue.read(), Matchers.equalTo("hello"));
+    assertEquals(clonedKeyAndState.state(NAMESPACE_1, STRING_VALUE_ADDR), 
value);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/core-construction-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/pom.xml 
b/runners/core-construction-java/pom.xml
index b85b5f5..67951e9 100644
--- a/runners/core-construction-java/pom.xml
+++ b/runners/core-construction-java/pom.xml
@@ -24,7 +24,7 @@
   <parent>
     <artifactId>beam-runners-parent</artifactId>
     <groupId>org.apache.beam</groupId>
-    <version>2.2.0-SNAPSHOT</version>
+    <version>2.1.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java
----------------------------------------------------------------------
diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java
index 8fc99b9..aa24909 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java
@@ -56,8 +56,8 @@ public class CreatePCollectionViewTranslation {
   @Deprecated
   public static <ElemT, ViewT> PCollectionView<ViewT> getView(
       AppliedPTransform<
-              PCollection<ElemT>, PCollection<ElemT>,
-              PTransform<PCollection<ElemT>, PCollection<ElemT>>>
+              PCollection<ElemT>, PCollectionView<ViewT>,
+              PTransform<PCollection<ElemT>, PCollectionView<ViewT>>>
           application)
       throws IOException {
 

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ElementAndRestriction.java
----------------------------------------------------------------------
diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ElementAndRestriction.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ElementAndRestriction.java
new file mode 100644
index 0000000..53a86b1
--- /dev/null
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ElementAndRestriction.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.construction;
+
+import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.transforms.DoFn;
+
+/**
+ * A tuple of an element and a restriction applied to processing it with a
+ * <a href="https://s.apache.org/splittable-do-fn">splittable</a> {@link DoFn}.
+ */
+@Experimental(Experimental.Kind.SPLITTABLE_DO_FN)
+@AutoValue
+public abstract class ElementAndRestriction<ElementT, RestrictionT> {
+  /** The element to process. */
+  public abstract ElementT element();
+
+  /** The restriction applied to processing the element. */
+  public abstract RestrictionT restriction();
+
+  /** Constructs the {@link ElementAndRestriction}. */
+  public static <InputT, RestrictionT> ElementAndRestriction<InputT, 
RestrictionT> of(
+      InputT element, RestrictionT restriction) {
+    return new AutoValue_ElementAndRestriction<>(element, restriction);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ElementAndRestrictionCoder.java
----------------------------------------------------------------------
diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ElementAndRestrictionCoder.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ElementAndRestrictionCoder.java
new file mode 100644
index 0000000..5ff0aae
--- /dev/null
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ElementAndRestrictionCoder.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.construction;
+
+import com.google.common.collect.ImmutableList;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.List;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.StructuredCoder;
+
+/** A {@link Coder} for {@link ElementAndRestriction}. */
+@Experimental(Experimental.Kind.SPLITTABLE_DO_FN)
+public class ElementAndRestrictionCoder<ElementT, RestrictionT>
+    extends StructuredCoder<ElementAndRestriction<ElementT, RestrictionT>> {
+  private final Coder<ElementT> elementCoder;
+  private final Coder<RestrictionT> restrictionCoder;
+
+  /**
+   * Creates an {@link ElementAndRestrictionCoder} from an element coder and a restriction coder.
+   */
+  public static <ElementT, RestrictionT> ElementAndRestrictionCoder<ElementT, 
RestrictionT> of(
+      Coder<ElementT> elementCoder, Coder<RestrictionT> restrictionCoder) {
+    return new ElementAndRestrictionCoder<>(elementCoder, restrictionCoder);
+  }
+
+  private ElementAndRestrictionCoder(
+      Coder<ElementT> elementCoder, Coder<RestrictionT> restrictionCoder) {
+    this.elementCoder = elementCoder;
+    this.restrictionCoder = restrictionCoder;
+  }
+
+  @Override
+  public void encode(
+      ElementAndRestriction<ElementT, RestrictionT> value, OutputStream 
outStream)
+      throws IOException {
+    if (value == null) {
+      throw new CoderException("cannot encode a null ElementAndRestriction");
+    }
+    elementCoder.encode(value.element(), outStream);
+    restrictionCoder.encode(value.restriction(), outStream);
+  }
+
+  @Override
+  public ElementAndRestriction<ElementT, RestrictionT> decode(InputStream 
inStream)
+      throws IOException {
+    ElementT key = elementCoder.decode(inStream);
+    RestrictionT value = restrictionCoder.decode(inStream);
+    return ElementAndRestriction.of(key, value);
+  }
+
+  @Override
+  public List<? extends Coder<?>> getCoderArguments() {
+    return ImmutableList.of(elementCoder, restrictionCoder);
+  }
+
+  @Override
+  public void verifyDeterministic() throws NonDeterministicException {
+    elementCoder.verifyDeterministic();
+    restrictionCoder.verifyDeterministic();
+  }
+
+  public Coder<ElementT> getElementCoder() {
+    return elementCoder;
+  }
+
+  public Coder<RestrictionT> getRestrictionCoder() {
+    return restrictionCoder;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java
----------------------------------------------------------------------
diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java
index 52526bb..968966f 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java
@@ -20,7 +20,6 @@ package org.apache.beam.runners.core.construction;
 
 import com.google.protobuf.InvalidProtocolBufferException;
 import java.io.IOException;
-import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi;
 import org.apache.beam.sdk.values.PCollection;
@@ -48,21 +47,6 @@ public class PCollectionTranslation {
         .build();
   }
 
-  public static PCollection<?> fromProto(
-      Pipeline pipeline, RunnerApi.PCollection pCollection, 
RunnerApi.Components components)
-      throws IOException {
-    return PCollection.createPrimitiveOutputInternal(
-            pipeline,
-            WindowingStrategyTranslation.fromProto(
-                
components.getWindowingStrategiesOrThrow(pCollection.getWindowingStrategyId()),
-                components),
-            fromProto(pCollection.getIsBounded()))
-        .setCoder(
-            (Coder)
-                CoderTranslation.fromProto(
-                    components.getCodersOrThrow(pCollection.getCoderId()), 
components));
-  }
-
   public static IsBounded isBounded(RunnerApi.PCollection pCollection) {
     return fromProto(pCollection.getIsBounded());
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
----------------------------------------------------------------------
diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
index 0d27241..bfe24a0 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
@@ -17,14 +17,12 @@
  */
 package org.apache.beam.runners.core.construction;
 
-import static 
org.apache.beam.runners.core.construction.PTransformTranslation.WRITE_FILES_TRANSFORM_URN;
-
 import com.google.common.base.MoreObjects;
-import java.io.IOException;
 import java.util.HashSet;
 import java.util.Set;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.io.WriteFiles;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.runners.PTransformMatcher;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -52,34 +50,6 @@ public class PTransformMatchers {
   private PTransformMatchers() {}
 
   /**
-   * Returns a {@link PTransformMatcher} that matches a {@link PTransform} if 
the URN of the
-   * {@link PTransform} is equal to the URN provided ot this matcher.
-   */
-  public static PTransformMatcher urnEqualTo(String urn) {
-    return new EqualUrnPTransformMatcher(urn);
-  }
-
-  private static class EqualUrnPTransformMatcher implements PTransformMatcher {
-    private final String urn;
-
-    private EqualUrnPTransformMatcher(String urn) {
-      this.urn = urn;
-    }
-
-    @Override
-    public boolean matches(AppliedPTransform<?, ?, ?> application) {
-      return 
urn.equals(PTransformTranslation.urnForTransformOrNull(application.getTransform()));
-    }
-
-    @Override
-    public String toString() {
-      return MoreObjects.toStringHelper(this)
-          .add("urn", urn)
-          .toString();
-    }
-  }
-
-  /**
    * Returns a {@link PTransformMatcher} that matches a {@link PTransform} if 
the class of the
    * {@link PTransform} is equal to the {@link Class} provided ot this matcher.
    */
@@ -181,68 +151,6 @@ public class PTransformMatchers {
   }
 
   /**
-   * A {@link PTransformMatcher} that matches a {@link ParDo} by URN if it has 
a splittable {@link
-   * DoFn}.
-   */
-  public static PTransformMatcher splittableParDo() {
-    return new PTransformMatcher() {
-      @Override
-      public boolean matches(AppliedPTransform<?, ?, ?> application) {
-        if (PTransformTranslation.PAR_DO_TRANSFORM_URN.equals(
-            
PTransformTranslation.urnForTransformOrNull(application.getTransform()))) {
-
-          try {
-            return ParDoTranslation.isSplittable(application);
-          } catch (IOException e) {
-            throw new RuntimeException(
-                String.format(
-                    "Transform with URN %s could not be translated",
-                    PTransformTranslation.PAR_DO_TRANSFORM_URN),
-                e);
-          }
-        }
-        return false;
-      }
-
-      @Override
-      public String toString() {
-        return 
MoreObjects.toStringHelper("SplittableParDoMultiMatcher").toString();
-      }
-    };
-  }
-
-  /**
-   * A {@link PTransformMatcher} that matches a {@link ParDo} transform by URN
-   * and whether it contains state or timers as specified by {@link 
ParDoTranslation}.
-   */
-  public static PTransformMatcher stateOrTimerParDo() {
-    return new PTransformMatcher() {
-      @Override
-      public boolean matches(AppliedPTransform<?, ?, ?> application) {
-        if (PTransformTranslation.PAR_DO_TRANSFORM_URN.equals(
-            
PTransformTranslation.urnForTransformOrNull(application.getTransform()))) {
-
-          try {
-            return ParDoTranslation.usesStateOrTimers(application);
-          } catch (IOException e) {
-            throw new RuntimeException(
-                String.format(
-                    "Transform with URN %s could not be translated",
-                    PTransformTranslation.PAR_DO_TRANSFORM_URN),
-                e);
-          }
-        }
-        return false;
-      }
-
-      @Override
-      public String toString() {
-        return 
MoreObjects.toStringHelper("StateOrTimerParDoMatcher").toString();
-      }
-    };
-  }
-
-  /**
    * A {@link PTransformMatcher} that matches a {@link ParDo.MultiOutput} 
containing a {@link DoFn}
    * that uses state or timers, as specified by {@link 
DoFnSignature#usesState()} and
    * {@link DoFnSignature#usesTimers()}.
@@ -360,18 +268,9 @@ public class PTransformMatchers {
     return new PTransformMatcher() {
       @Override
       public boolean matches(AppliedPTransform<?, ?, ?> application) {
-        if (WRITE_FILES_TRANSFORM_URN.equals(
-            
PTransformTranslation.urnForTransformOrNull(application.getTransform()))) {
-          try {
-            return WriteFilesTranslation.isRunnerDeterminedSharding(
-                (AppliedPTransform) application);
-          } catch (IOException exc) {
-            throw new RuntimeException(
-                String.format(
-                    "Transform with URN %s failed to parse: %s",
-                    WRITE_FILES_TRANSFORM_URN, application.getTransform()),
-                exc);
-          }
+        if (application.getTransform() instanceof WriteFiles) {
+          WriteFiles write = (WriteFiles) application.getTransform();
+          return write.getSharding() == null && write.getNumShards() == null;
         }
         return false;
       }

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
index bae7b05..32ecf43 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
@@ -179,12 +179,13 @@ public class PTransformTranslation {
    * Returns the URN for the transform if it is known, otherwise throws.
    */
   public static String urnForTransform(PTransform<?, ?> transform) {
-    String urn = urnForTransformOrNull(transform);
-    if (urn == null) {
+    TransformPayloadTranslator translator = 
KNOWN_PAYLOAD_TRANSLATORS.get(transform.getClass());
+    if (translator == null) {
       throw new IllegalStateException(
           String.format("No translator known for %s", 
transform.getClass().getName()));
     }
-    return urn;
+
+    return translator.getUrn(transform);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
index 03f29ff..34e0d86 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
@@ -19,7 +19,6 @@
 package org.apache.beam.runners.core.construction;
 
 import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 import static 
org.apache.beam.runners.core.construction.PTransformTranslation.PAR_DO_TRANSFORM_URN;
 
@@ -35,11 +34,9 @@ import com.google.protobuf.BytesValue;
 import com.google.protobuf.InvalidProtocolBufferException;
 import java.io.IOException;
 import java.io.Serializable;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import 
org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.IterableCoder;
@@ -75,10 +72,8 @@ import 
org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
 import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
-import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.TupleTagList;
 import org.apache.beam.sdk.values.WindowingStrategy;
 
 /**
@@ -220,67 +215,11 @@ public class ParDoTranslation {
     return doFnAndMainOutputTagFromProto(payload.getDoFn()).getDoFn();
   }
 
-  public static DoFn<?, ?> getDoFn(AppliedPTransform<?, ?, ?> application) 
throws IOException {
-    return getDoFn(getParDoPayload(application));
-  }
-
   public static TupleTag<?> getMainOutputTag(ParDoPayload payload)
       throws InvalidProtocolBufferException {
     return doFnAndMainOutputTagFromProto(payload.getDoFn()).getMainOutputTag();
   }
 
-  public static TupleTag<?> getMainOutputTag(AppliedPTransform<?, ?, ?> 
application)
-      throws IOException {
-    return getMainOutputTag(getParDoPayload(application));
-  }
-
-  public static TupleTagList getAdditionalOutputTags(AppliedPTransform<?, ?, 
?> application)
-      throws IOException {
-
-    RunnerApi.PTransform protoTransform =
-        PTransformTranslation.toProto(application, SdkComponents.create());
-
-    ParDoPayload payload = 
protoTransform.getSpec().getParameter().unpack(ParDoPayload.class);
-    TupleTag<?> mainOutputTag = getMainOutputTag(payload);
-    Set<String> outputTags =
-        Sets.difference(
-            protoTransform.getOutputsMap().keySet(), 
Collections.singleton(mainOutputTag.getId()));
-
-    ArrayList<TupleTag<?>> additionalOutputTags = new ArrayList<>();
-    for (String outputTag : outputTags) {
-      additionalOutputTags.add(new TupleTag<>(outputTag));
-    }
-    return TupleTagList.of(additionalOutputTags);
-  }
-
-  public static List<PCollectionView<?>> getSideInputs(AppliedPTransform<?, ?, 
?> application)
-      throws IOException {
-
-    SdkComponents sdkComponents = SdkComponents.create();
-    RunnerApi.PTransform parDoProto =
-        PTransformTranslation.toProto(application, sdkComponents);
-    ParDoPayload payload = 
parDoProto.getSpec().getParameter().unpack(ParDoPayload.class);
-
-    List<PCollectionView<?>> views = new ArrayList<>();
-    for (Map.Entry<String, SideInput> sideInputEntry : 
payload.getSideInputsMap().entrySet()) {
-      String sideInputTag = sideInputEntry.getKey();
-      RunnerApi.SideInput sideInput = sideInputEntry.getValue();
-      PCollection<?> originalPCollection =
-          checkNotNull(
-              (PCollection<?>) application.getInputs().get(new 
TupleTag<>(sideInputTag)),
-              "no input with tag %s",
-              sideInputTag);
-      views.add(
-          viewFromProto(
-              sideInput,
-              sideInputTag,
-              originalPCollection,
-              parDoProto,
-              sdkComponents.toComponents()));
-    }
-    return views;
-  }
-
   public static RunnerApi.PCollection getMainInput(
       RunnerApi.PTransform ptransform, Components components) throws 
IOException {
     checkArgument(
@@ -508,27 +447,15 @@ public class ParDoTranslation {
     return builder.build();
   }
 
-  /**
-   * Create a {@link PCollectionView} from a side input spec and an 
already-deserialized {@link
-   * PCollection} that should be wired up.
-   */
-  public static PCollectionView<?> viewFromProto(
-      SideInput sideInput,
-      String localName,
-      PCollection<?> pCollection,
-      RunnerApi.PTransform parDoTransform,
-      Components components)
+  public static PCollectionView<?> fromProto(
+      SideInput sideInput, String id, RunnerApi.PTransform parDoTransform, 
Components components)
       throws IOException {
-    checkArgument(
-        localName != null,
-        "%s.viewFromProto: localName must not be null",
-        ParDoTranslation.class.getSimpleName());
-    TupleTag<?> tag = new TupleTag<>(localName);
+    TupleTag<?> tag = new TupleTag<>(id);
     WindowMappingFn<?> windowMappingFn = 
windowMappingFnFromProto(sideInput.getWindowMappingFn());
     ViewFn<?, ?> viewFn = viewFnFromProto(sideInput.getViewFn());
 
     RunnerApi.PCollection inputCollection =
-        
components.getPcollectionsOrThrow(parDoTransform.getInputsOrThrow(localName));
+        components.getPcollectionsOrThrow(parDoTransform.getInputsOrThrow(id));
     WindowingStrategy<?, ?> windowingStrategy =
         WindowingStrategyTranslation.fromProto(
             
components.getWindowingStrategiesOrThrow(inputCollection.getWindowingStrategyId()),
@@ -548,7 +475,6 @@ public class ParDoTranslation {
 
     PCollectionView<?> view =
         new RunnerPCollectionView<>(
-            pCollection,
             (TupleTag<Iterable<WindowedValue<?>>>) tag,
             (ViewFn<Iterable<WindowedValue<?>>, ?>) viewFn,
             windowMappingFn,

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RunnerPCollectionView.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RunnerPCollectionView.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RunnerPCollectionView.java
index 85139e8..89e8784 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RunnerPCollectionView.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RunnerPCollectionView.java
@@ -18,8 +18,6 @@
 
 package org.apache.beam.runners.core.construction;
 
-import java.util.Map;
-import java.util.Objects;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi.SideInput;
@@ -28,7 +26,6 @@ import 
org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.PValueBase;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.WindowingStrategy;
@@ -40,19 +37,16 @@ class RunnerPCollectionView<T> extends PValueBase 
implements PCollectionView<T>
   private final WindowMappingFn<?> windowMappingFn;
   private final WindowingStrategy<?, ?> windowingStrategy;
   private final Coder<Iterable<WindowedValue<?>>> coder;
-  private final transient PCollection<?> pCollection;
 
   /**
    * Create a new {@link RunnerPCollectionView} from the provided components.
    */
   RunnerPCollectionView(
-      PCollection<?> pCollection,
       TupleTag<Iterable<WindowedValue<?>>> tag,
       ViewFn<Iterable<WindowedValue<?>>, T> viewFn,
       WindowMappingFn<?> windowMappingFn,
       @Nullable WindowingStrategy<?, ?> windowingStrategy,
       @Nullable Coder<Iterable<WindowedValue<?>>> coder) {
-    this.pCollection = pCollection;
     this.tag = tag;
     this.viewFn = viewFn;
     this.windowMappingFn = windowMappingFn;
@@ -60,9 +54,11 @@ class RunnerPCollectionView<T> extends PValueBase implements 
PCollectionView<T>
     this.coder = coder;
   }
 
+  @Nullable
   @Override
   public PCollection<?> getPCollection() {
-    return pCollection;
+    throw new IllegalStateException(
+        String.format("Cannot call getPCollection on a %s", 
getClass().getSimpleName()));
   }
 
   @Override
@@ -89,25 +85,4 @@ class RunnerPCollectionView<T> extends PValueBase implements 
PCollectionView<T>
   public Coder<Iterable<WindowedValue<?>>> getCoderInternal() {
     return coder;
   }
-
-  @Override
-  public Map<TupleTag<?>, PValue> expand() {
-    throw new UnsupportedOperationException(String.format(
-        "A %s cannot be expanded", 
RunnerPCollectionView.class.getSimpleName()));
-  }
-
-  @Override
-  public boolean equals(Object other) {
-    if (!(other instanceof PCollectionView)) {
-      return false;
-    }
-    @SuppressWarnings("unchecked")
-    PCollectionView<?> otherView = (PCollectionView<?>) other;
-    return tag.equals(otherView.getTagInternal());
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hash(tag);
-  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
index e71187b..665e39d 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
@@ -18,16 +18,13 @@
 package org.apache.beam.runners.core.construction;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
 
-import java.io.IOException;
 import java.util.List;
-import java.util.Map;
 import java.util.UUID;
 import 
org.apache.beam.runners.core.construction.PTransformTranslation.RawPTransform;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -42,8 +39,6 @@ import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PCollectionViews;
-import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
 import org.apache.beam.sdk.values.WindowingStrategy;
@@ -59,7 +54,7 @@ import org.apache.beam.sdk.values.WindowingStrategy;
  * <li>Explode windows, since splitting within each window has to happen 
independently
  * <li>Assign a unique key to each element/restriction pair
  * <li>Process the keyed element/restriction pairs in a runner-specific way 
with the splittable
- *     {@link DoFn}'s {@link DoFn.ProcessElement} method.
+ *   {@link DoFn}'s {@link DoFn.ProcessElement} method.
  * </ol>
  *
  * <p>This transform is intended as a helper for internal use by runners when 
implementing {@code
@@ -68,11 +63,7 @@ import org.apache.beam.sdk.values.WindowingStrategy;
 @Experimental(Experimental.Kind.SPLITTABLE_DO_FN)
 public class SplittableParDo<InputT, OutputT, RestrictionT>
     extends PTransform<PCollection<InputT>, PCollectionTuple> {
-
-  private final DoFn<InputT, OutputT> doFn;
-  private final List<PCollectionView<?>> sideInputs;
-  private final TupleTag<OutputT> mainOutputTag;
-  private final TupleTagList additionalOutputTags;
+  private final ParDo.MultiOutput<InputT, OutputT> parDo;
 
   public static final String SPLITTABLE_PROCESS_URN =
       "urn:beam:runners_core:transforms:splittable_process:v1";
@@ -83,97 +74,56 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
   public static final String SPLITTABLE_GBKIKWI_URN =
       "urn:beam:runners_core:transforms:splittable_gbkikwi:v1";
 
-  private SplittableParDo(
-      DoFn<InputT, OutputT> doFn,
-      TupleTag<OutputT> mainOutputTag,
-      List<PCollectionView<?>> sideInputs,
-      TupleTagList additionalOutputTags) {
-    checkArgument(
-        
DoFnSignatures.getSignature(doFn.getClass()).processElement().isSplittable(),
-        "fn must be a splittable DoFn");
-    this.doFn = doFn;
-    this.mainOutputTag = mainOutputTag;
-    this.sideInputs = sideInputs;
-    this.additionalOutputTags = additionalOutputTags;
-  }
-
   /**
-   * Creates a {@link SplittableParDo} from an original Java {@link ParDo}.
+   * Creates the transform for the given original multi-output {@link ParDo}.
    *
    * @param parDo The splittable {@link ParDo} transform.
    */
-  public static <InputT, OutputT> SplittableParDo<InputT, OutputT, ?> 
forJavaParDo(
-      ParDo.MultiOutput<InputT, OutputT> parDo) {
-    checkArgument(parDo != null, "parDo must not be null");
+  public SplittableParDo(ParDo.MultiOutput<InputT, OutputT> parDo) {
+    checkNotNull(parDo, "parDo must not be null");
+    this.parDo = parDo;
     checkArgument(
         
DoFnSignatures.getSignature(parDo.getFn().getClass()).processElement().isSplittable(),
         "fn must be a splittable DoFn");
-    return new SplittableParDo(
-        parDo.getFn(),
-        parDo.getMainOutputTag(),
-        parDo.getSideInputs(),
-        parDo.getAdditionalOutputTags());
-  }
-
-  /**
-   * Creates the transform for a {@link ParDo}-compatible {@link 
AppliedPTransform}.
-   *
-   * <p>The input may generally be a deserialized transform so it may not 
actually be a {@link
-   * ParDo}. Instead {@link ParDoTranslation} will be used to extract fields.
-   */
-  public static SplittableParDo<?, ?, ?> forAppliedParDo(AppliedPTransform<?, 
?, ?> parDo) {
-    checkArgument(parDo != null, "parDo must not be null");
-
-    try {
-      return new SplittableParDo<>(
-          ParDoTranslation.getDoFn(parDo),
-          (TupleTag) ParDoTranslation.getMainOutputTag(parDo),
-          ParDoTranslation.getSideInputs(parDo),
-          ParDoTranslation.getAdditionalOutputTags(parDo));
-    } catch (IOException exc) {
-      throw new RuntimeException(exc);
-    }
   }
 
   @Override
   public PCollectionTuple expand(PCollection<InputT> input) {
+    DoFn<InputT, OutputT> fn = parDo.getFn();
     Coder<RestrictionT> restrictionCoder =
-        DoFnInvokers.invokerFor(doFn)
+        DoFnInvokers.invokerFor(fn)
             .invokeGetRestrictionCoder(input.getPipeline().getCoderRegistry());
-    Coder<KV<InputT, RestrictionT>> splitCoder = KvCoder.of(input.getCoder(), 
restrictionCoder);
+    Coder<ElementAndRestriction<InputT, RestrictionT>> splitCoder =
+        ElementAndRestrictionCoder.of(input.getCoder(), restrictionCoder);
 
-    PCollection<KV<String, KV<InputT, RestrictionT>>> keyedRestrictions =
+    PCollection<KV<String, ElementAndRestriction<InputT, RestrictionT>>> 
keyedRestrictions =
         input
             .apply(
                 "Pair with initial restriction",
-                ParDo.of(new PairWithRestrictionFn<InputT, OutputT, 
RestrictionT>(doFn)))
+                ParDo.of(new PairWithRestrictionFn<InputT, OutputT, 
RestrictionT>(fn)))
             .setCoder(splitCoder)
-            .apply(
-                "Split restriction", ParDo.of(new SplitRestrictionFn<InputT, 
RestrictionT>(doFn)))
+            .apply("Split restriction", ParDo.of(new 
SplitRestrictionFn<InputT, RestrictionT>(fn)))
             .setCoder(splitCoder)
             // ProcessFn requires all input elements to be in a single window 
and have a single
             // element per work item. This must precede the unique keying so 
each key has a single
             // associated element.
-            .apply("Explode windows", ParDo.of(new ExplodeWindowsFn<KV<InputT, 
RestrictionT>>()))
+            .apply(
+                "Explode windows",
+                ParDo.of(new ExplodeWindowsFn<ElementAndRestriction<InputT, 
RestrictionT>>()))
             .apply(
                 "Assign unique key",
-                WithKeys.of(new RandomUniqueKeyFn<KV<InputT, 
RestrictionT>>()));
+                WithKeys.of(new 
RandomUniqueKeyFn<ElementAndRestriction<InputT, RestrictionT>>()));
 
     return keyedRestrictions.apply(
         "ProcessKeyedElements",
         new ProcessKeyedElements<>(
-            doFn,
+            fn,
             input.getCoder(),
             restrictionCoder,
             (WindowingStrategy<InputT, ?>) input.getWindowingStrategy(),
-            sideInputs,
-            mainOutputTag,
-            additionalOutputTags));
-  }
-
-  @Override
-  public Map<TupleTag<?>, PValue> getAdditionalInputs() {
-    return PCollectionViews.toAdditionalInputs(sideInputs);
+            parDo.getSideInputs(),
+            parDo.getMainOutputTag(),
+            parDo.getAdditionalOutputTags()));
   }
 
   /**
@@ -190,11 +140,12 @@ public class SplittableParDo<InputT, OutputT, 
RestrictionT>
 
   /**
    * Runner-specific primitive {@link PTransform} that invokes the {@link 
DoFn.ProcessElement}
-   * method for a splittable {@link DoFn} on each {@link KV} of the input 
{@link PCollection} of
-   * {@link KV KVs} keyed with arbitrary but globally unique keys.
+   * method for a splittable {@link DoFn} on each {@link 
ElementAndRestriction} of the input {@link
+   * PCollection} of {@link KV KVs} keyed with arbitrary but globally unique 
keys.
    */
   public static class ProcessKeyedElements<InputT, OutputT, RestrictionT>
-      extends RawPTransform<PCollection<KV<String, KV<InputT, RestrictionT>>>, 
PCollectionTuple> {
+      extends RawPTransform<
+          PCollection<KV<String, ElementAndRestriction<InputT, 
RestrictionT>>>, PCollectionTuple> {
     private final DoFn<InputT, OutputT> fn;
     private final Coder<InputT> elementCoder;
     private final Coder<RestrictionT> restrictionCoder;
@@ -257,7 +208,9 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
     }
 
     @Override
-    public PCollectionTuple expand(PCollection<KV<String, KV<InputT, 
RestrictionT>>> input) {
+    public PCollectionTuple expand(
+        PCollection<KV<String, ElementAndRestriction<InputT, RestrictionT>>>
+            input) {
       return createPrimitiveOutputFor(
           input, fn, mainOutputTag, additionalOutputTags, windowingStrategy);
     }
@@ -283,11 +236,6 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
     }
 
     @Override
-    public Map<TupleTag<?>, PValue> getAdditionalInputs() {
-      return PCollectionViews.toAdditionalInputs(sideInputs);
-    }
-
-    @Override
     public String getUrn() {
       return SPLITTABLE_PROCESS_KEYED_ELEMENTS_URN;
     }
@@ -309,7 +257,7 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
    * Pairs each input element with its initial restriction using the given 
splittable {@link DoFn}.
    */
   private static class PairWithRestrictionFn<InputT, OutputT, RestrictionT>
-      extends DoFn<InputT, KV<InputT, RestrictionT>> {
+      extends DoFn<InputT, ElementAndRestriction<InputT, RestrictionT>> {
     private DoFn<InputT, OutputT> fn;
     private transient DoFnInvoker<InputT, OutputT> invoker;
 
@@ -325,7 +273,7 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
     @ProcessElement
     public void processElement(ProcessContext context) {
       context.output(
-          KV.of(
+          ElementAndRestriction.of(
               context.element(),
               
invoker.<RestrictionT>invokeGetInitialRestriction(context.element())));
     }
@@ -333,7 +281,9 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
 
   /** Splits the restriction using the given {@link SplitRestriction} method. 
*/
   private static class SplitRestrictionFn<InputT, RestrictionT>
-      extends DoFn<KV<InputT, RestrictionT>, KV<InputT, RestrictionT>> {
+      extends DoFn<
+          ElementAndRestriction<InputT, RestrictionT>,
+          ElementAndRestriction<InputT, RestrictionT>> {
     private final DoFn<InputT, ?> splittableFn;
     private transient DoFnInvoker<InputT, ?> invoker;
 
@@ -348,14 +298,14 @@ public class SplittableParDo<InputT, OutputT, 
RestrictionT>
 
     @ProcessElement
     public void processElement(final ProcessContext c) {
-      final InputT element = c.element().getKey();
+      final InputT element = c.element().element();
       invoker.invokeSplitRestriction(
           element,
-          c.element().getValue(),
+          c.element().restriction(),
           new OutputReceiver<RestrictionT>() {
             @Override
             public void output(RestrictionT part) {
-              c.output(KV.of(element, part));
+              c.output(ElementAndRestriction.of(element, part));
             }
           });
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java
index 515de57..90e6304 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java
@@ -18,9 +18,6 @@
 
 package org.apache.beam.runners.core.construction;
 
-import static com.google.common.base.Preconditions.checkArgument;
-import static 
org.apache.beam.runners.core.construction.PTransformTranslation.TEST_STREAM_TRANSFORM_URN;
-
 import com.google.auto.service.AutoService;
 import com.google.protobuf.Any;
 import com.google.protobuf.ByteString;
@@ -36,8 +33,6 @@ import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.testing.TestStream;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.util.CoderUtils;
-import org.apache.beam.sdk.values.PBegin;
-import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TimestampedValue;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
@@ -62,48 +57,6 @@ public class TestStreamTranslation {
     return builder.build();
   }
 
-  private static TestStream<?> fromProto(
-      RunnerApi.TestStreamPayload testStreamPayload, RunnerApi.Components 
components)
-      throws IOException {
-
-    Coder<Object> coder =
-        (Coder<Object>)
-            CoderTranslation.fromProto(
-                components.getCodersOrThrow(testStreamPayload.getCoderId()), 
components);
-
-    List<TestStream.Event<Object>> events = new ArrayList<>();
-
-    for (RunnerApi.TestStreamPayload.Event event : 
testStreamPayload.getEventsList()) {
-      events.add(fromProto(event, coder));
-    }
-    return TestStream.fromRawEvents(coder, events);
-  }
-
-  /**
-   * Converts an {@link AppliedPTransform}, which may be a rehydrated 
transform or an original
-   * {@link TestStream}, to a {@link TestStream}.
-   */
-  public static <T> TestStream<T> getTestStream(
-      AppliedPTransform<PBegin, PCollection<T>, PTransform<PBegin, 
PCollection<T>>> application)
-      throws IOException {
-    // For robustness, we don't take this shortcut:
-    // if (application.getTransform() instanceof TestStream) {
-    //   return application.getTransform()
-    // }
-
-    SdkComponents sdkComponents = SdkComponents.create();
-    RunnerApi.PTransform transformProto = 
PTransformTranslation.toProto(application, sdkComponents);
-    checkArgument(
-        TEST_STREAM_TRANSFORM_URN.equals(transformProto.getSpec().getUrn()),
-        "Attempt to get %s from a transform with wrong URN %s",
-        TestStream.class.getSimpleName(),
-        transformProto.getSpec().getUrn());
-    RunnerApi.TestStreamPayload testStreamPayload =
-        
transformProto.getSpec().getParameter().unpack(RunnerApi.TestStreamPayload.class);
-
-    return (TestStream<T>) fromProto(testStreamPayload, 
sdkComponents.toComponents());
-  }
-
   static <T> RunnerApi.TestStreamPayload.Event toProto(TestStream.Event<T> 
event, Coder<T> coder)
       throws IOException {
     switch (event.getType()) {
@@ -177,7 +130,7 @@ public class TestStreamTranslation {
   static class TestStreamTranslator implements 
TransformPayloadTranslator<TestStream<?>> {
     @Override
     public String getUrn(TestStream<?> transform) {
-      return TEST_STREAM_TRANSFORM_URN;
+      return PTransformTranslation.TEST_STREAM_TRANSFORM_URN;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TransformInputs.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TransformInputs.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TransformInputs.java
deleted file mode 100644
index 2baf93a..0000000
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TransformInputs.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.core.construction;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.google.common.collect.ImmutableList;
-import java.util.Collection;
-import java.util.Map;
-import org.apache.beam.sdk.runners.AppliedPTransform;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TupleTag;
-
-/** Utilities for extracting subsets of inputs from an {@link 
AppliedPTransform}. */
-public class TransformInputs {
-  /**
-   * Gets all inputs of the {@link AppliedPTransform} that are not returned by 
{@link
-   * PTransform#getAdditionalInputs()}.
-   */
-  public static Collection<PValue> nonAdditionalInputs(AppliedPTransform<?, ?, 
?> application) {
-    ImmutableList.Builder<PValue> mainInputs = ImmutableList.builder();
-    PTransform<?, ?> transform = application.getTransform();
-    for (Map.Entry<TupleTag<?>, PValue> input : 
application.getInputs().entrySet()) {
-      if (!transform.getAdditionalInputs().containsKey(input.getKey())) {
-        mainInputs.add(input.getValue());
-      }
-    }
-    checkArgument(
-        !mainInputs.build().isEmpty() || application.getInputs().isEmpty(),
-        "Expected at least one main input if any inputs exist");
-    return mainInputs.build();
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java
index 1456a3f..718efe7 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java
@@ -38,7 +38,6 @@ import 
org.apache.beam.sdk.transforms.windowing.SlidingWindows;
 import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 import org.apache.beam.sdk.transforms.windowing.Trigger;
 import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
-import org.apache.beam.sdk.transforms.windowing.Window.OnTimeBehavior;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.values.WindowingStrategy;
@@ -120,27 +119,6 @@ public class WindowingStrategyTranslation implements 
Serializable {
     }
   }
 
-
-  public static OnTimeBehavior fromProto(RunnerApi.OnTimeBehavior proto) {
-    switch (proto) {
-      case FIRE_ALWAYS:
-        return OnTimeBehavior.FIRE_ALWAYS;
-      case FIRE_IF_NONEMPTY:
-        return OnTimeBehavior.FIRE_IF_NON_EMPTY;
-      case UNRECOGNIZED:
-      default:
-        // Whether or not it is proto that cannot recognize it (due to the version of the
-        // generated code we link to) or the switch hasn't been updated to handle it,
-        // the situation is the same: we don't know what this OutputTime means
-        throw new IllegalArgumentException(
-            String.format(
-                "Cannot convert unknown %s to %s: %s",
-                RunnerApi.OnTimeBehavior.class.getCanonicalName(),
-                OnTimeBehavior.class.getCanonicalName(),
-                proto));
-    }
-  }
-
   public static RunnerApi.OutputTime toProto(TimestampCombiner 
timestampCombiner) {
     switch(timestampCombiner) {
       case EARLIEST:
@@ -307,7 +285,6 @@ public class WindowingStrategyTranslation implements 
Serializable {
             
.setAllowedLateness(windowingStrategy.getAllowedLateness().getMillis())
             
.setTrigger(TriggerTranslation.toProto(windowingStrategy.getTrigger()))
             .setWindowFn(windowFnSpec)
-            
.setAssignsToOneWindow(windowingStrategy.getWindowFn().assignsToOneWindow())
             .setWindowCoderId(
                 
components.registerCoder(windowingStrategy.getWindowFn().windowCoder()));
 
@@ -346,15 +323,13 @@ public class WindowingStrategyTranslation implements 
Serializable {
     Trigger trigger = TriggerTranslation.fromProto(proto.getTrigger());
     ClosingBehavior closingBehavior = fromProto(proto.getClosingBehavior());
     Duration allowedLateness = Duration.millis(proto.getAllowedLateness());
-    OnTimeBehavior onTimeBehavior = fromProto(proto.getOnTimeBehavior());
 
     return WindowingStrategy.of(windowFn)
         .withAllowedLateness(allowedLateness)
         .withMode(accumulationMode)
         .withTrigger(trigger)
         .withTimestampCombiner(timestampCombiner)
-        .withClosingBehavior(closingBehavior)
-        .withOnTimeBehavior(onTimeBehavior);
+        .withClosingBehavior(closingBehavior);
   }
 
   public static WindowFn<?, ?> windowFnFromProto(SdkFunctionSpec windowFnSpec)

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java
----------------------------------------------------------------------
diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java
index b1d2da4..99b77ef 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java
@@ -26,7 +26,6 @@ import com.google.protobuf.Any;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.BytesValue;
 import java.io.IOException;
-import java.io.Serializable;
 import java.util.Collections;
 import java.util.Map;
 import 
org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator;
@@ -38,7 +37,6 @@ import org.apache.beam.sdk.io.FileBasedSink;
 import org.apache.beam.sdk.io.WriteFiles;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
@@ -53,45 +51,32 @@ public class WriteFilesTranslation {
   public static final String CUSTOM_JAVA_FILE_BASED_SINK_URN =
       "urn:beam:file_based_sink:javasdk:0.1";
 
-  public static final String CUSTOM_JAVA_FILE_BASED_SINK_FORMAT_FUNCTION_URN =
-      "urn:beam:file_based_sink_format_function:javasdk:0.1";
-
   @VisibleForTesting
-  static WriteFilesPayload toProto(WriteFiles<?, ?, ?> transform) {
+  static WriteFilesPayload toProto(WriteFiles<?> transform) {
     return WriteFilesPayload.newBuilder()
         .setSink(toProto(transform.getSink()))
-        .setFormatFunction(toProto(transform.getFormatFunction()))
         .setWindowedWrites(transform.isWindowedWrites())
         .setRunnerDeterminedSharding(
             transform.getNumShards() == null && transform.getSharding() == 
null)
         .build();
   }
 
-  private static SdkFunctionSpec toProto(FileBasedSink<?, ?> sink) {
-    return toProto(CUSTOM_JAVA_FILE_BASED_SINK_URN, sink);
-  }
-
-  private static SdkFunctionSpec toProto(SerializableFunction<?, ?> 
serializableFunction) {
-    return toProto(CUSTOM_JAVA_FILE_BASED_SINK_FORMAT_FUNCTION_URN, 
serializableFunction);
-  }
-
-  private static SdkFunctionSpec toProto(String urn, Serializable 
serializable) {
+  private static SdkFunctionSpec toProto(FileBasedSink<?> sink) {
     return SdkFunctionSpec.newBuilder()
         .setSpec(
             FunctionSpec.newBuilder()
-                .setUrn(urn)
+                .setUrn(CUSTOM_JAVA_FILE_BASED_SINK_URN)
                 .setParameter(
                     Any.pack(
                         BytesValue.newBuilder()
                             .setValue(
-                                ByteString.copyFrom(
-                                    
SerializableUtils.serializeToByteArray(serializable)))
+                                
ByteString.copyFrom(SerializableUtils.serializeToByteArray(sink)))
                             .build())))
         .build();
   }
 
   @VisibleForTesting
-  static FileBasedSink<?, ?> sinkFromProto(SdkFunctionSpec sinkProto) throws 
IOException {
+  static FileBasedSink<?> sinkFromProto(SdkFunctionSpec sinkProto) throws 
IOException {
     checkArgument(
         sinkProto.getSpec().getUrn().equals(CUSTOM_JAVA_FILE_BASED_SINK_URN),
         "Cannot extract %s instance from %s with URN %s",
@@ -102,44 +87,16 @@ public class WriteFilesTranslation {
     byte[] serializedSink =
         
sinkProto.getSpec().getParameter().unpack(BytesValue.class).getValue().toByteArray();
 
-    return (FileBasedSink<?, ?>)
+    return (FileBasedSink<?>)
         SerializableUtils.deserializeFromByteArray(
             serializedSink, FileBasedSink.class.getSimpleName());
   }
 
-  @VisibleForTesting
-  static <InputT, OutputT> SerializableFunction<InputT, OutputT> 
formatFunctionFromProto(
-      SdkFunctionSpec sinkProto) throws IOException {
-    checkArgument(
-        
sinkProto.getSpec().getUrn().equals(CUSTOM_JAVA_FILE_BASED_SINK_FORMAT_FUNCTION_URN),
-        "Cannot extract %s instance from %s with URN %s",
-        SerializableFunction.class.getSimpleName(),
-        FunctionSpec.class.getSimpleName(),
-        sinkProto.getSpec().getUrn());
-
-    byte[] serializedFunction =
-        
sinkProto.getSpec().getParameter().unpack(BytesValue.class).getValue().toByteArray();
-
-    return (SerializableFunction<InputT, OutputT>)
-        SerializableUtils.deserializeFromByteArray(
-            serializedFunction, FileBasedSink.class.getSimpleName());
-  }
-
-  public static <UserT, DestinationT, OutputT> FileBasedSink<OutputT, 
DestinationT> getSink(
-      AppliedPTransform<PCollection<UserT>, PDone, ? extends 
PTransform<PCollection<UserT>, PDone>>
-          transform)
-      throws IOException {
-    return (FileBasedSink<OutputT, DestinationT>)
-        sinkFromProto(getWriteFilesPayload(transform).getSink());
-  }
-
-  public static <InputT, OutputT> SerializableFunction<InputT, OutputT> 
getFormatFunction(
-      AppliedPTransform<
-              PCollection<InputT>, PDone, ? extends 
PTransform<PCollection<InputT>, PDone>>
+  public static <T> FileBasedSink<T> getSink(
+      AppliedPTransform<PCollection<T>, PDone, ? extends 
PTransform<PCollection<T>, PDone>>
           transform)
       throws IOException {
-    return formatFunctionFromProto(
-        getWriteFilesPayload(transform).<InputT, OutputT>getFormatFunction());
+    return (FileBasedSink<T>) 
sinkFromProto(getWriteFilesPayload(transform).getSink());
   }
 
   public static <T> boolean isWindowedWrites(
@@ -167,15 +124,15 @@ public class WriteFilesTranslation {
         .unpack(WriteFilesPayload.class);
   }
 
-  static class WriteFilesTranslator implements 
TransformPayloadTranslator<WriteFiles<?, ?, ?>> {
+  static class WriteFilesTranslator implements 
TransformPayloadTranslator<WriteFiles<?>> {
     @Override
-    public String getUrn(WriteFiles<?, ?, ?> transform) {
+    public String getUrn(WriteFiles<?> transform) {
       return PTransformTranslation.WRITE_FILES_TRANSFORM_URN;
     }
 
     @Override
     public FunctionSpec translate(
-        AppliedPTransform<?, ?, WriteFiles<?, ?, ?>> transform, SdkComponents 
components) {
+        AppliedPTransform<?, ?, WriteFiles<?>> transform, SdkComponents 
components) {
       return FunctionSpec.newBuilder()
           .setUrn(getUrn(transform.getTransform()))
           .setParameter(Any.pack(toProto(transform.getTransform())))

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ElementAndRestrictionCoderTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ElementAndRestrictionCoderTest.java
 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ElementAndRestrictionCoderTest.java
new file mode 100644
index 0000000..051cbaa
--- /dev/null
+++ 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ElementAndRestrictionCoderTest.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.construction;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.coders.BigEndianLongCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.testing.CoderProperties;
+import org.apache.beam.sdk.util.CoderUtils;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+
+/**
+ * Tests for {@link ElementAndRestrictionCoder}.
+ */
+@RunWith(Parameterized.class)
+public class ElementAndRestrictionCoderTest<K, V> {
+  private static class CoderAndData<T> {
+    Coder<T> coder;
+    List<T> data;
+  }
+
+  private static class AnyCoderAndData {
+    private CoderAndData<?> coderAndData;
+  }
+
+  private static <T> AnyCoderAndData coderAndData(Coder<T> coder, List<T> 
data) {
+    CoderAndData<T> coderAndData = new CoderAndData<>();
+    coderAndData.coder = coder;
+    coderAndData.data = data;
+    AnyCoderAndData res = new AnyCoderAndData();
+    res.coderAndData = coderAndData;
+    return res;
+  }
+
+  private static final List<AnyCoderAndData> TEST_DATA =
+      Arrays.asList(
+          coderAndData(
+              VarIntCoder.of(), Arrays.asList(-1, 0, 1, 13, Integer.MAX_VALUE, 
Integer.MIN_VALUE)),
+          coderAndData(
+              BigEndianLongCoder.of(),
+              Arrays.asList(-1L, 0L, 1L, 13L, Long.MAX_VALUE, Long.MIN_VALUE)),
+          coderAndData(StringUtf8Coder.of(), Arrays.asList("", "hello", 
"goodbye", "1")),
+          coderAndData(
+              ElementAndRestrictionCoder.of(StringUtf8Coder.of(), 
VarIntCoder.of()),
+              Arrays.asList(
+                  ElementAndRestriction.of("", -1),
+                  ElementAndRestriction.of("hello", 0),
+                  ElementAndRestriction.of("goodbye", Integer.MAX_VALUE))),
+          coderAndData(
+              ListCoder.of(VarLongCoder.of()),
+              Arrays.asList(Arrays.asList(1L, 2L, 3L), 
Collections.<Long>emptyList())));
+
+  @Parameterized.Parameters(name = "{index}: keyCoder={0} key={1} 
valueCoder={2} value={3}")
+  public static Collection<Object[]> data() {
+    List<Object[]> parameters = new ArrayList<>();
+    for (AnyCoderAndData keyCoderAndData : TEST_DATA) {
+      Coder keyCoder = keyCoderAndData.coderAndData.coder;
+      for (Object key : keyCoderAndData.coderAndData.data) {
+        for (AnyCoderAndData valueCoderAndData : TEST_DATA) {
+          Coder valueCoder = valueCoderAndData.coderAndData.coder;
+          for (Object value : valueCoderAndData.coderAndData.data) {
+            parameters.add(new Object[] {keyCoder, key, valueCoder, value});
+          }
+        }
+      }
+    }
+    return parameters;
+  }
+
+  @Parameter(0)
+  public Coder<K> keyCoder;
+  @Parameter(1)
+  public K key;
+  @Parameter(2)
+  public Coder<V> valueCoder;
+  @Parameter(3)
+  public V value;
+
+  @Test
+  @SuppressWarnings("rawtypes")
+  public void testDecodeEncodeEqual() throws Exception {
+    CoderProperties.coderDecodeEncodeEqual(
+        ElementAndRestrictionCoder.of(keyCoder, valueCoder),
+        ElementAndRestriction.of(key, value));
+  }
+
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
+  @Test
+  public void encodeNullThrowsCoderException() throws Exception {
+    thrown.expect(CoderException.class);
+    thrown.expectMessage("cannot encode a null ElementAndRestriction");
+
+    CoderUtils.encodeToBase64(
+        ElementAndRestrictionCoder.of(StringUtf8Coder.of(), VarIntCoder.of()), 
null);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionTranslationTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionTranslationTest.java
 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionTranslationTest.java
index 5c45487..3b94220 100644
--- 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionTranslationTest.java
+++ 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionTranslationTest.java
@@ -113,28 +113,6 @@ public class PCollectionTranslationTest {
 
   @Test
   public void testEncodeDecodeCycle() throws Exception {
-    // Encode
-    SdkComponents sdkComponents = SdkComponents.create();
-    RunnerApi.PCollection protoCollection =
-        PCollectionTranslation.toProto(testCollection, sdkComponents);
-    RunnerApi.Components protoComponents = sdkComponents.toComponents();
-
-    // Decode
-    Pipeline pipeline = Pipeline.create();
-    PCollection<?> decodedCollection =
-        PCollectionTranslation.fromProto(pipeline, protoCollection, 
protoComponents);
-
-    // Verify
-    assertThat(decodedCollection.getCoder(), 
Matchers.<Coder<?>>equalTo(testCollection.getCoder()));
-    assertThat(
-        decodedCollection.getWindowingStrategy(),
-        Matchers.<WindowingStrategy<?, ?>>equalTo(
-            testCollection.getWindowingStrategy().fixDefaults()));
-    assertThat(decodedCollection.isBounded(), 
equalTo(testCollection.isBounded()));
-  }
-
-  @Test
-  public void testEncodeDecodeFields() throws Exception {
     SdkComponents sdkComponents = SdkComponents.create();
     RunnerApi.PCollection protoCollection = PCollectionTranslation
         .toProto(testCollection, sdkComponents);

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
index 99d3dd1..2497598 100644
--- 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
+++ 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
@@ -27,12 +27,9 @@ import com.google.common.base.MoreObjects;
 import com.google.common.collect.ImmutableMap;
 import java.io.Serializable;
 import java.util.Collections;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.DefaultFilenamePolicy;
-import org.apache.beam.sdk.io.DynamicFileDestinations;
 import org.apache.beam.sdk.io.FileBasedSink;
 import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
 import org.apache.beam.sdk.io.LocalResources;
@@ -56,7 +53,6 @@ import org.apache.beam.sdk.transforms.Materialization;
 import org.apache.beam.sdk.transforms.Materializations;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.SerializableFunctions;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
@@ -99,14 +95,9 @@ public class PTransformMatchersTest implements Serializable {
     PCollection<KV<String, Integer>> input =
         PCollection.createPrimitiveOutputInternal(
             p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
-    input.setName("dummy input");
-    input.setCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()));
-
     PCollection<Integer> output =
         PCollection.createPrimitiveOutputInternal(
             p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
-    output.setName("dummy output");
-    output.setCoder(VarIntCoder.of());
 
     return AppliedPTransform.of("pardo", input.expand(), output.expand(), 
pardo, p);
   }
@@ -281,18 +272,6 @@ public class PTransformMatchersTest implements 
Serializable {
   }
 
   @Test
-  public void parDoSplittable() {
-    AppliedPTransform<?, ?, ?> parDoApplication =
-        getAppliedTransform(
-            ParDo.of(splittableDoFn).withOutputTags(new TupleTag<Integer>(), 
TupleTagList.empty()));
-    assertThat(PTransformMatchers.splittableParDo().matches(parDoApplication), 
is(true));
-
-    
assertThat(PTransformMatchers.stateOrTimerParDoMulti().matches(parDoApplication),
 is(false));
-    
assertThat(PTransformMatchers.splittableParDoSingle().matches(parDoApplication),
 is(false));
-    
assertThat(PTransformMatchers.stateOrTimerParDoSingle().matches(parDoApplication),
 is(false));
-  }
-
-  @Test
   public void parDoMultiWithState() {
     AppliedPTransform<?, ?, ?> parDoApplication =
         getAppliedTransform(
@@ -305,19 +284,6 @@ public class PTransformMatchersTest implements 
Serializable {
   }
 
   @Test
-  public void parDoWithState() {
-    AppliedPTransform<?, ?, ?> statefulApplication =
-        getAppliedTransform(
-            ParDo.of(doFnWithState).withOutputTags(new TupleTag<Integer>(), 
TupleTagList.empty()));
-    
assertThat(PTransformMatchers.stateOrTimerParDo().matches(statefulApplication), 
is(true));
-
-    AppliedPTransform<?, ?, ?> splittableApplication =
-        getAppliedTransform(
-            ParDo.of(splittableDoFn).withOutputTags(new TupleTag<Integer>(), 
TupleTagList.empty()));
-    
assertThat(PTransformMatchers.stateOrTimerParDo().matches(splittableApplication),
 is(false));
-  }
-
-  @Test
   public void parDoMultiWithTimers() {
     AppliedPTransform<?, ?, ?> parDoApplication =
         getAppliedTransform(
@@ -539,32 +505,30 @@ public class PTransformMatchersTest implements 
Serializable {
   public void writeWithRunnerDeterminedSharding() {
     ResourceId outputDirectory = LocalResources.fromString("/foo/bar", true /* 
isDirectory */);
     FilenamePolicy policy =
-        DefaultFilenamePolicy.fromStandardParameters(
+        DefaultFilenamePolicy.constructUsingStandardParameters(
             StaticValueProvider.of(outputDirectory),
             DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE,
             "",
             false);
-    WriteFiles<Integer, Void, Integer> write =
+    WriteFiles<Integer> write =
         WriteFiles.to(
-            new FileBasedSink<Integer, Void>(
-                StaticValueProvider.of(outputDirectory), 
DynamicFileDestinations.constant(null)) {
+            new 
FileBasedSink<Integer>(StaticValueProvider.of(outputDirectory), policy) {
               @Override
-              public WriteOperation<Integer, Void> createWriteOperation() {
+              public WriteOperation<Integer> createWriteOperation() {
                 return null;
               }
-            },
-            SerializableFunctions.<Integer>identity());
+            });
     assertThat(
         
PTransformMatchers.writeWithRunnerDeterminedSharding().matches(appliedWrite(write)),
         is(true));
 
-    WriteFiles<Integer, Void, Integer> withStaticSharding = 
write.withNumShards(3);
+    WriteFiles<Integer> withStaticSharding = write.withNumShards(3);
     assertThat(
         PTransformMatchers.writeWithRunnerDeterminedSharding()
             .matches(appliedWrite(withStaticSharding)),
         is(false));
 
-    WriteFiles<Integer, Void, Integer> withCustomSharding =
+    WriteFiles<Integer> withCustomSharding =
         write.withSharding(Sum.integersGlobally().asSingletonView());
     assertThat(
         PTransformMatchers.writeWithRunnerDeterminedSharding()
@@ -572,8 +536,8 @@ public class PTransformMatchersTest implements Serializable 
{
         is(false));
   }
 
-  private AppliedPTransform<?, ?, ?> appliedWrite(WriteFiles<Integer, Void, 
Integer> write) {
-    return AppliedPTransform.<PCollection<Integer>, PDone, WriteFiles<Integer, 
Void, Integer>>of(
+  private AppliedPTransform<?, ?, ?> appliedWrite(WriteFiles<Integer> write) {
+    return AppliedPTransform.<PCollection<Integer>, PDone, 
WriteFiles<Integer>>of(
         "WriteFiles",
         Collections.<TupleTag<?>, PValue>emptyMap(),
         Collections.<TupleTag<?>, PValue>emptyMap(),

Reply via email to