http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java
 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java
index a87a16d..a8490bf 100644
--- 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java
+++ 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java
@@ -23,9 +23,9 @@ import static org.hamcrest.Matchers.instanceOf;
 import static org.junit.Assert.assertThat;
 
 import com.google.common.collect.ImmutableList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
-import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
@@ -143,30 +143,22 @@ public class ParDoTranslationTest {
       inputs.putAll(parDo.getAdditionalInputs());
       PCollectionTuple output = mainInput.apply(parDo);
 
-      SdkComponents sdkComponents = SdkComponents.create();
-
-      // Encode
-      RunnerApi.PTransform protoTransform =
-          PTransformTranslation.toProto(
+      SdkComponents components = SdkComponents.create();
+      String transformId =
+          components.registerPTransform(
              AppliedPTransform.<PCollection<KV<Long, String>>, PCollection<Void>, MultiOutput>of(
                  "foo", inputs, output.expand(), parDo, p),
-              sdkComponents);
-      Components protoComponents = sdkComponents.toComponents();
-
-      // Decode
-      Pipeline rehydratedPipeline = Pipeline.create();
+              Collections.<AppliedPTransform<?, ?, ?>>emptyList());
 
+      Components protoComponents = components.toComponents();
+      RunnerApi.PTransform protoTransform = protoComponents.getTransformsOrThrow(transformId);
       ParDoPayload parDoPayload =
           protoTransform.getSpec().getParameter().unpack(ParDoPayload.class);
       for (PCollectionView<?> view : parDo.getSideInputs()) {
        SideInput sideInput = parDoPayload.getSideInputsOrThrow(view.getTagInternal().getId());
         PCollectionView<?> restoredView =
-            ParDoTranslation.viewFromProto(
-                sideInput,
-                view.getTagInternal().getId(),
-                view.getPCollection(),
-                protoTransform,
-                protoComponents);
+            ParDoTranslation.fromProto(
+                sideInput, view.getTagInternal().getId(), protoTransform, protoComponents);
        assertThat(restoredView.getTagInternal(), equalTo(view.getTagInternal()));
        assertThat(restoredView.getViewFn(), instanceOf(view.getViewFn().getClass()));
         assertThat(
@@ -177,7 +169,7 @@ public class ParDoTranslationTest {
                 view.getWindowingStrategyInternal().fixDefaults()));
        assertThat(restoredView.getCoderInternal(), equalTo(view.getCoderInternal()));
       }
-      String mainInputId = sdkComponents.registerPCollection(mainInput);
+      String mainInputId = components.registerPCollection(mainInput);
       assertThat(
           ParDoTranslation.getMainInput(protoTransform, protoComponents),
           equalTo(protoComponents.getPcollectionsOrThrow(mainInputId)));

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SplittableParDoTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SplittableParDoTest.java
 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SplittableParDoTest.java
index 267232c..6e4d6c4 100644
--- 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SplittableParDoTest.java
+++ 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SplittableParDoTest.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.runners.core.construction;
 
-import static org.apache.beam.sdk.transforms.DoFn.ProcessContinuation.stop;
 import static org.junit.Assert.assertEquals;
 
 import java.io.Serializable;
@@ -25,6 +24,8 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.BoundedPerElement;
+import org.apache.beam.sdk.transforms.DoFn.UnboundedPerElement;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultTracker;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
@@ -69,6 +70,7 @@ public class SplittableParDoTest {
     public void checkDone() {}
   }
 
+  @BoundedPerElement
   private static class BoundedFakeFn extends DoFn<Integer, String> {
     @ProcessElement
     public void processElement(ProcessContext context, SomeRestrictionTracker 
tracker) {}
@@ -79,12 +81,10 @@ public class SplittableParDoTest {
     }
   }
 
+  @UnboundedPerElement
   private static class UnboundedFakeFn extends DoFn<Integer, String> {
     @ProcessElement
-    public ProcessContinuation processElement(
-        ProcessContext context, SomeRestrictionTracker tracker) {
-      return stop();
-    }
+    public void processElement(ProcessContext context, SomeRestrictionTracker 
tracker) {}
 
     @GetInitialRestriction
     public SomeRestriction getInitialRestriction(Integer element) {
@@ -122,14 +122,14 @@ public class SplittableParDoTest {
         "Applying a bounded SDF to a bounded collection produces a bounded 
collection",
         PCollection.IsBounded.BOUNDED,
         makeBoundedCollection(pipeline)
-            .apply("bounded to bounded", SplittableParDo.forJavaParDo(makeParDo(boundedFn)))
+            .apply("bounded to bounded", new SplittableParDo<>(makeParDo(boundedFn)))
             .get(MAIN_OUTPUT_TAG)
             .isBounded());
     assertEquals(
         "Applying a bounded SDF to an unbounded collection produces an 
unbounded collection",
         PCollection.IsBounded.UNBOUNDED,
         makeUnboundedCollection(pipeline)
-            .apply("bounded to unbounded", 
SplittableParDo.forJavaParDo(makeParDo(boundedFn)))
+            .apply("bounded to unbounded", new 
SplittableParDo<>(makeParDo(boundedFn)))
             .get(MAIN_OUTPUT_TAG)
             .isBounded());
   }
@@ -143,14 +143,14 @@ public class SplittableParDoTest {
         "Applying an unbounded SDF to a bounded collection produces a bounded 
collection",
         PCollection.IsBounded.UNBOUNDED,
         makeBoundedCollection(pipeline)
-            .apply("unbounded to bounded", 
SplittableParDo.forJavaParDo(makeParDo(unboundedFn)))
+            .apply("unbounded to bounded", new 
SplittableParDo<>(makeParDo(unboundedFn)))
             .get(MAIN_OUTPUT_TAG)
             .isBounded());
     assertEquals(
         "Applying an unbounded SDF to an unbounded collection produces an 
unbounded collection",
         PCollection.IsBounded.UNBOUNDED,
         makeUnboundedCollection(pipeline)
-            .apply("unbounded to unbounded", 
SplittableParDo.forJavaParDo(makeParDo(unboundedFn)))
+            .apply("unbounded to unbounded", new 
SplittableParDo<>(makeParDo(unboundedFn)))
             .get(MAIN_OUTPUT_TAG)
             .isBounded());
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TransformInputsTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TransformInputsTest.java
 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TransformInputsTest.java
deleted file mode 100644
index f5b2c11..0000000
--- 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TransformInputsTest.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.core.construction;
-
-import static org.junit.Assert.assertThat;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.beam.sdk.coders.VoidCoder;
-import org.apache.beam.sdk.io.GenerateSequence;
-import org.apache.beam.sdk.runners.AppliedPTransform;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
-import org.apache.beam.sdk.values.PInput;
-import org.apache.beam.sdk.values.POutput;
-import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TupleTag;
-import org.hamcrest.Matchers;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/** Tests for {@link TransformInputs}. */
-@RunWith(JUnit4.class)
-public class TransformInputsTest {
-  @Rule public TestPipeline pipeline = 
TestPipeline.create().enableAbandonedNodeEnforcement(false);
-  @Rule public ExpectedException thrown = ExpectedException.none();
-
-  @Test
-  public void nonAdditionalInputsWithNoInputSucceeds() {
-    AppliedPTransform<PInput, POutput, TestTransform> transform =
-        AppliedPTransform.of(
-            "input-free",
-            Collections.<TupleTag<?>, PValue>emptyMap(),
-            Collections.<TupleTag<?>, PValue>emptyMap(),
-            new TestTransform(),
-            pipeline);
-
-    assertThat(TransformInputs.nonAdditionalInputs(transform), 
Matchers.<PValue>empty());
-  }
-
-  @Test
-  public void nonAdditionalInputsWithOneMainInputSucceeds() {
-    PCollection<Long> input = pipeline.apply(GenerateSequence.from(1L));
-    AppliedPTransform<PInput, POutput, TestTransform> transform =
-        AppliedPTransform.of(
-            "input-single",
-            Collections.<TupleTag<?>, PValue>singletonMap(new TupleTag<Long>() 
{}, input),
-            Collections.<TupleTag<?>, PValue>emptyMap(),
-            new TestTransform(),
-            pipeline);
-
-    assertThat(
-        TransformInputs.nonAdditionalInputs(transform), 
Matchers.<PValue>containsInAnyOrder(input));
-  }
-
-  @Test
-  public void nonAdditionalInputsWithMultipleNonAdditionalInputsSucceeds() {
-    Map<TupleTag<?>, PValue> allInputs = new HashMap<>();
-    PCollection<Integer> mainInts = pipeline.apply("MainInput", Create.of(12, 
3));
-    allInputs.put(new TupleTag<Integer>() {}, mainInts);
-    PCollection<Void> voids = pipeline.apply("VoidInput", 
Create.empty(VoidCoder.of()));
-    allInputs.put(new TupleTag<Void>() {}, voids);
-    AppliedPTransform<PInput, POutput, TestTransform> transform =
-        AppliedPTransform.of(
-            "additional-free",
-            allInputs,
-            Collections.<TupleTag<?>, PValue>emptyMap(),
-            new TestTransform(),
-            pipeline);
-
-    assertThat(
-        TransformInputs.nonAdditionalInputs(transform),
-        Matchers.<PValue>containsInAnyOrder(voids, mainInts));
-  }
-
-  @Test
-  public void nonAdditionalInputsWithAdditionalInputsSucceeds() {
-    Map<TupleTag<?>, PValue> additionalInputs = new HashMap<>();
-    additionalInputs.put(new TupleTag<String>() {}, 
pipeline.apply(Create.of("1, 2", "3")));
-    additionalInputs.put(new TupleTag<Long>() {}, 
pipeline.apply(GenerateSequence.from(3L)));
-
-    Map<TupleTag<?>, PValue> allInputs = new HashMap<>();
-    PCollection<Integer> mainInts = pipeline.apply("MainInput", Create.of(12, 
3));
-    allInputs.put(new TupleTag<Integer>() {}, mainInts);
-    PCollection<Void> voids = pipeline.apply("VoidInput", 
Create.empty(VoidCoder.of()));
-    allInputs.put(
-        new TupleTag<Void>() {}, voids);
-    allInputs.putAll(additionalInputs);
-
-    AppliedPTransform<PInput, POutput, TestTransform> transform =
-        AppliedPTransform.of(
-            "additional",
-            allInputs,
-            Collections.<TupleTag<?>, PValue>emptyMap(),
-            new TestTransform(additionalInputs),
-            pipeline);
-
-    assertThat(
-        TransformInputs.nonAdditionalInputs(transform),
-        Matchers.<PValue>containsInAnyOrder(mainInts, voids));
-  }
-
-  @Test
-  public void nonAdditionalInputsWithOnlyAdditionalInputsThrows() {
-    Map<TupleTag<?>, PValue> additionalInputs = new HashMap<>();
-    additionalInputs.put(new TupleTag<String>() {}, 
pipeline.apply(Create.of("1, 2", "3")));
-    additionalInputs.put(new TupleTag<Long>() {}, 
pipeline.apply(GenerateSequence.from(3L)));
-
-    AppliedPTransform<PInput, POutput, TestTransform> transform =
-        AppliedPTransform.of(
-            "additional-only",
-            additionalInputs,
-            Collections.<TupleTag<?>, PValue>emptyMap(),
-            new TestTransform(additionalInputs),
-            pipeline);
-
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("at least one");
-    TransformInputs.nonAdditionalInputs(transform);
-  }
-
-  private static class TestTransform extends PTransform<PInput, POutput> {
-    private final Map<TupleTag<?>, PValue> additionalInputs;
-
-    private TestTransform() {
-      this(Collections.<TupleTag<?>, PValue>emptyMap());
-    }
-
-    private TestTransform(Map<TupleTag<?>, PValue> additionalInputs) {
-      this.additionalInputs = additionalInputs;
-    }
-
-    @Override
-    public POutput expand(PInput input) {
-      return PDone.in(input.getPipeline());
-    }
-
-    @Override
-    public Map<TupleTag<?>, PValue> getAdditionalInputs() {
-      return additionalInputs;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslationTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslationTest.java
 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslationTest.java
index 7a57fd7..e406545 100644
--- 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslationTest.java
+++ 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslationTest.java
@@ -116,8 +116,5 @@ public class WindowingStrategyTranslationTest {
 
     protoComponents.getCodersOrThrow(
         
components.registerCoder(windowingStrategy.getWindowFn().windowCoder()));
-    assertThat(
-        proto.getAssignsToOneWindow(),
-        equalTo(windowingStrategy.getWindowFn().assignsToOneWindow()));
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java
 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java
index 283df16..739034c 100644
--- 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java
+++ 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java
@@ -26,10 +26,8 @@ import java.util.Objects;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi.ParDoPayload;
-import org.apache.beam.sdk.io.DynamicFileDestinations;
 import org.apache.beam.sdk.io.FileBasedSink;
 import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
-import org.apache.beam.sdk.io.FileBasedSink.OutputFileHints;
 import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.io.WriteFiles;
 import org.apache.beam.sdk.io.fs.ResourceId;
@@ -38,8 +36,6 @@ import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.transforms.SerializableFunctions;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
 import org.junit.Test;
@@ -60,17 +56,16 @@ public class WriteFilesTranslationTest {
   @RunWith(Parameterized.class)
   public static class TestWriteFilesPayloadTranslation {
     @Parameters(name = "{index}: {0}")
-    public static Iterable<WriteFiles<Object, Void, Object>> data() {
-      SerializableFunction<Object, Object> format = 
SerializableFunctions.constant(null);
-      return ImmutableList.of(
-          WriteFiles.to(new DummySink(), format),
-          WriteFiles.to(new DummySink(), format).withWindowedWrites(),
-          WriteFiles.to(new DummySink(), format).withNumShards(17),
-          WriteFiles.to(new DummySink(), 
format).withWindowedWrites().withNumShards(42));
+    public static Iterable<WriteFiles<?>> data() {
+      return ImmutableList.<WriteFiles<?>>of(
+          WriteFiles.to(new DummySink()),
+          WriteFiles.to(new DummySink()).withWindowedWrites(),
+          WriteFiles.to(new DummySink()).withNumShards(17),
+          WriteFiles.to(new 
DummySink()).withWindowedWrites().withNumShards(42));
     }
 
     @Parameter(0)
-    public WriteFiles<String, Void, String> writeFiles;
+    public WriteFiles<String> writeFiles;
 
     public static TestPipeline p = 
TestPipeline.create().enableAbandonedNodeEnforcement(false);
 
@@ -85,7 +80,7 @@ public class WriteFilesTranslationTest {
       assertThat(payload.getWindowedWrites(), 
equalTo(writeFiles.isWindowedWrites()));
 
       assertThat(
-          (FileBasedSink<String, Void>) 
WriteFilesTranslation.sinkFromProto(payload.getSink()),
+          (FileBasedSink<String>) 
WriteFilesTranslation.sinkFromProto(payload.getSink()),
           equalTo(writeFiles.getSink()));
     }
 
@@ -94,9 +89,9 @@ public class WriteFilesTranslationTest {
       PCollection<String> input = p.apply(Create.of("hello"));
       PDone output = input.apply(writeFiles);
 
-      AppliedPTransform<PCollection<String>, PDone, WriteFiles<String, Void, 
String>>
-          appliedPTransform =
-              AppliedPTransform.of("foo", input.expand(), output.expand(), 
writeFiles, p);
+      AppliedPTransform<PCollection<String>, PDone, WriteFiles<String>> 
appliedPTransform =
+          AppliedPTransform.<PCollection<String>, PDone, WriteFiles<String>>of(
+              "foo", input.expand(), output.expand(), writeFiles, p);
 
       assertThat(
           WriteFilesTranslation.isRunnerDeterminedSharding(appliedPTransform),
@@ -106,9 +101,7 @@ public class WriteFilesTranslationTest {
           WriteFilesTranslation.isWindowedWrites(appliedPTransform),
           equalTo(writeFiles.isWindowedWrites()));
 
-      assertThat(
-          WriteFilesTranslation.<String, Void, 
String>getSink(appliedPTransform),
-          equalTo(writeFiles.getSink()));
+      assertThat(WriteFilesTranslation.getSink(appliedPTransform), 
equalTo(writeFiles.getSink()));
     }
   }
 
@@ -116,16 +109,16 @@ public class WriteFilesTranslationTest {
    * A simple {@link FileBasedSink} for testing serialization/deserialization. 
Not mocked to avoid
    * any issues serializing mocks.
    */
-  private static class DummySink extends FileBasedSink<Object, Void> {
+  private static class DummySink extends FileBasedSink<String> {
 
     DummySink() {
       super(
           StaticValueProvider.of(FileSystems.matchNewResource("nowhere", 
false)),
-          DynamicFileDestinations.constant(new DummyFilenamePolicy()));
+          new DummyFilenamePolicy());
     }
 
     @Override
-    public WriteOperation<Object, Void> createWriteOperation() {
+    public WriteOperation<String> createWriteOperation() {
       return new DummyWriteOperation(this);
     }
 
@@ -137,39 +130,46 @@ public class WriteFilesTranslationTest {
 
       DummySink that = (DummySink) other;
 
-      return getTempDirectoryProvider().isAccessible()
-          && that.getTempDirectoryProvider().isAccessible()
-          && 
getTempDirectoryProvider().get().equals(that.getTempDirectoryProvider().get());
+      return getFilenamePolicy().equals(((DummySink) 
other).getFilenamePolicy())
+          && getBaseOutputDirectoryProvider().isAccessible()
+          && that.getBaseOutputDirectoryProvider().isAccessible()
+          && getBaseOutputDirectoryProvider()
+              .get()
+              .equals(that.getBaseOutputDirectoryProvider().get());
     }
 
     @Override
     public int hashCode() {
       return Objects.hash(
           DummySink.class,
-          getTempDirectoryProvider().isAccessible() ? 
getTempDirectoryProvider().get() : null);
+          getFilenamePolicy(),
+          getBaseOutputDirectoryProvider().isAccessible()
+              ? getBaseOutputDirectoryProvider().get()
+              : null);
     }
   }
 
-  private static class DummyWriteOperation extends 
FileBasedSink.WriteOperation<Object, Void> {
-    public DummyWriteOperation(FileBasedSink<Object, Void> sink) {
+  private static class DummyWriteOperation extends 
FileBasedSink.WriteOperation<String> {
+    public DummyWriteOperation(FileBasedSink<String> sink) {
       super(sink);
     }
 
     @Override
-    public FileBasedSink.Writer<Object, Void> createWriter() throws Exception {
+    public FileBasedSink.Writer<String> createWriter() throws Exception {
       throw new UnsupportedOperationException("Should never be called.");
     }
   }
 
   private static class DummyFilenamePolicy extends FilenamePolicy {
     @Override
-    public ResourceId windowedFilename(WindowedContext c, OutputFileHints 
outputFileHints) {
+    public ResourceId windowedFilename(
+        ResourceId outputDirectory, WindowedContext c, String extension) {
       throw new UnsupportedOperationException("Should never be called.");
     }
 
     @Nullable
     @Override
-    public ResourceId unwindowedFilename(Context c, OutputFileHints 
outputFileHints) {
+    public ResourceId unwindowedFilename(ResourceId outputDirectory, Context 
c, String extension) {
       throw new UnsupportedOperationException("Should never be called.");
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/core-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/core-java/pom.xml b/runners/core-java/pom.xml
index 8c8e599..c3a8d25 100644
--- a/runners/core-java/pom.xml
+++ b/runners/core-java/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-runners-parent</artifactId>
-    <version>2.2.0-SNAPSHOT</version>
+    <version>2.1.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
index 28938c1..1cf1509 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
@@ -134,27 +134,26 @@ public class LateDataDroppingDoFnRunner<K, InputT, 
OutputT, W extends BoundedWin
           // The element is too late for this window.
           droppedDueToLateness.inc();
           WindowTracing.debug(
-              "{}: Dropping element at {} for key:{}; window:{} "
-                  + "since too far behind inputWatermark:{}; 
outputWatermark:{}",
-              LateDataFilter.class.getSimpleName(),
-              input.getTimestamp(),
-              key,
-              window,
-              timerInternals.currentInputWatermarkTime(),
+              "ReduceFnRunner.processElement: Dropping element at {} for 
key:{}; window:{} "
+              + "since too far behind inputWatermark:{}; outputWatermark:{}",
+              input.getTimestamp(), key, window, 
timerInternals.currentInputWatermarkTime(),
               timerInternals.currentOutputWatermarkTime());
         }
       }
 
-      Iterable<WindowedValue<InputT>> nonLateElements =
-          Iterables.filter(
-              concatElements,
-              new Predicate<WindowedValue<InputT>>() {
-                @Override
-                public boolean apply(WindowedValue<InputT> input) {
-                  BoundedWindow window = 
Iterables.getOnlyElement(input.getWindows());
-                  return !canDropDueToExpiredWindow(window);
-                }
-              });
+      Iterable<WindowedValue<InputT>> nonLateElements = Iterables.filter(
+          concatElements,
+          new Predicate<WindowedValue<InputT>>() {
+            @Override
+            public boolean apply(WindowedValue<InputT> input) {
+              BoundedWindow window = 
Iterables.getOnlyElement(input.getWindows());
+              if (canDropDueToExpiredWindow(window)) {
+                return false;
+              } else {
+                return true;
+              }
+            }
+          });
       return nonLateElements;
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
index 0c956d5..2db6531 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
@@ -96,7 +96,7 @@ public class 
OutputAndTimeBoundedSplittableProcessElementInvoker<
       final WindowedValue<InputT> element,
       final TrackerT tracker) {
     final ProcessContext processContext = new ProcessContext(element, tracker);
-    DoFn.ProcessContinuation cont = invoker.invokeProcessElement(
+    invoker.invokeProcessElement(
         new DoFnInvoker.ArgumentProvider<InputT, OutputT>() {
           @Override
           public DoFn<InputT, OutputT>.ProcessContext processContext(
@@ -118,11 +118,6 @@ public class 
OutputAndTimeBoundedSplittableProcessElementInvoker<
           }
 
           @Override
-          public PipelineOptions pipelineOptions() {
-            return pipelineOptions;
-          }
-
-          @Override
           public StartBundleContext startBundleContext(DoFn<InputT, OutputT> 
doFn) {
             throw new IllegalStateException(
                 "Should not access startBundleContext() from @"
@@ -155,37 +150,10 @@ public class 
OutputAndTimeBoundedSplittableProcessElementInvoker<
                 "Access to timers not supported in Splittable DoFn");
           }
         });
-    // TODO: verify that if there was a failed tryClaim() call, then 
cont.shouldResume() is false.
-    // Currently we can't verify this because there are no hooks into 
tryClaim().
-    // See https://issues.apache.org/jira/browse/BEAM-2607
-    RestrictionT residual = processContext.extractCheckpoint();
-    if (cont.shouldResume()) {
-      if (residual == null) {
-        // No checkpoint had been taken by the runner while the ProcessElement 
call ran, however
-        // the call says that not the whole restriction has been processed. So 
we need to take
-        // a checkpoint now: checkpoint() guarantees that the primary 
restriction describes exactly
-        // the work that was done in the current ProcessElement call, and 
returns a residual
-        // restriction that describes exactly the work that wasn't done in the 
current call.
-        residual = tracker.checkpoint();
-      } else {
-        // A checkpoint was taken by the runner, and then the ProcessElement 
call returned resume()
-        // without making more tryClaim() calls (since no tryClaim() calls can 
succeed after
-        // checkpoint(), and since if it had made a failed tryClaim() call, it 
should have returned
-        // stop()).
-        // This means that the resulting primary restriction and the taken 
checkpoint already
-        // accurately describe respectively the work that was and wasn't done 
in the current
-        // ProcessElement call.
-        // In other words, if we took a checkpoint *after* ProcessElement 
completed (like in the
-        // branch above), it would have been equivalent to this one.
-      }
-    } else {
-      // The ProcessElement call returned stop() - that means the tracker's 
current restriction
-      // has been fully processed by the call. A checkpoint may or may not 
have been taken in
-      // "residual"; if it was, then we'll need to process it; if no, then we 
don't - nothing
-      // special needs to be done.
-    }
+
     tracker.checkDone();
-    return new Result(residual, cont, 
processContext.getLastReportedWatermark());
+    return new Result(
+        processContext.extractCheckpoint(), 
processContext.getLastReportedWatermark());
   }
 
   private class ProcessContext extends DoFn<InputT, OutputT>.ProcessContext {

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/core-java/src/main/java/org/apache/beam/runners/core/ProcessFnRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/ProcessFnRunner.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/ProcessFnRunner.java
index 88275d6..31e86bd 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/ProcessFnRunner.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/ProcessFnRunner.java
@@ -24,11 +24,11 @@ import java.util.Collection;
 import java.util.Collections;
 import org.apache.beam.runners.core.StateNamespaces.WindowNamespace;
 import org.apache.beam.runners.core.TimerInternals.TimerData;
+import org.apache.beam.runners.core.construction.ElementAndRestriction;
 import org.apache.beam.sdk.state.TimeDomain;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.joda.time.Instant;
 
@@ -38,13 +38,16 @@ import org.joda.time.Instant;
  */
 public class ProcessFnRunner<InputT, OutputT, RestrictionT>
     implements PushbackSideInputDoFnRunner<
-        KeyedWorkItem<String, KV<InputT, RestrictionT>>, OutputT> {
-  private final DoFnRunner<KeyedWorkItem<String, KV<InputT, RestrictionT>>, 
OutputT> underlying;
+        KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, 
OutputT> {
+  private final DoFnRunner<
+          KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, 
OutputT>
+      underlying;
   private final Collection<PCollectionView<?>> views;
   private final ReadyCheckingSideInputReader sideInputReader;
 
   ProcessFnRunner(
-      DoFnRunner<KeyedWorkItem<String, KV<InputT, RestrictionT>>, OutputT> 
underlying,
+      DoFnRunner<KeyedWorkItem<String, ElementAndRestriction<InputT, 
RestrictionT>>, OutputT>
+          underlying,
       Collection<PCollectionView<?>> views,
       ReadyCheckingSideInputReader sideInputReader) {
     this.underlying = underlying;
@@ -58,9 +61,10 @@ public class ProcessFnRunner<InputT, OutputT, RestrictionT>
   }
 
   @Override
-  public Iterable<WindowedValue<KeyedWorkItem<String, KV<InputT, 
RestrictionT>>>>
+  public Iterable<WindowedValue<KeyedWorkItem<String, 
ElementAndRestriction<InputT, RestrictionT>>>>
       processElementInReadyWindows(
-          WindowedValue<KeyedWorkItem<String, KV<InputT, RestrictionT>>> 
windowedKWI) {
+          WindowedValue<KeyedWorkItem<String, ElementAndRestriction<InputT, 
RestrictionT>>>
+              windowedKWI) {
     checkTrivialOuterWindows(windowedKWI);
     BoundedWindow window = getUnderlyingWindow(windowedKWI.getValue());
     if (!isReady(window)) {

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
index 634a2d1..62d519f 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
@@ -29,6 +29,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -50,7 +51,6 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
 import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
-import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.WindowTracing;
@@ -637,9 +637,11 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends 
BoundedWindow> {
   }
 
   /**
-   * A descriptor of the activation for a window based on a timer.
+   * Enriches TimerData with state necessary for processing a timer as well as
+   * common queries about a timer.
    */
-  private class WindowActivation {
+  private class EnrichedTimerData {
+    public final Instant timestamp;
     public final ReduceFn<K, InputT, OutputT, W>.Context directContext;
     public final ReduceFn<K, InputT, OutputT, W>.Context renamedContext;
     // If this is an end-of-window timer then we may need to set a garbage 
collection timer
@@ -650,34 +652,18 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends 
BoundedWindow> {
     // end-of-window time to be a signal to garbage collect.
     public final boolean isGarbageCollection;
 
-    WindowActivation(
+    EnrichedTimerData(
+        TimerData timer,
         ReduceFn<K, InputT, OutputT, W>.Context directContext,
         ReduceFn<K, InputT, OutputT, W>.Context renamedContext) {
+      this.timestamp = timer.getTimestamp();
       this.directContext = directContext;
       this.renamedContext = renamedContext;
       W window = directContext.window();
-
-      // The output watermark is before the end of the window if it is either 
unknown
-      // or it is known to be before it. If it is unknown, that means that 
there hasn't been
-      // enough data to advance it.
-      boolean outputWatermarkBeforeEOW =
-              timerInternals.currentOutputWatermarkTime() == null
-          || 
!timerInternals.currentOutputWatermarkTime().isAfter(window.maxTimestamp());
-
-      // The "end of the window" is reached when the local input watermark 
(for this key) surpasses
-      // it but the local output watermark (also for this key) has not. After 
data is emitted and
-      // the output watermark hold is released, the output watermark on this 
key will immediately
-      // exceed the end of the window (otherwise we could see multiple ON_TIME 
outputs)
-      this.isEndOfWindow =
-          
timerInternals.currentInputWatermarkTime().isAfter(window.maxTimestamp())
-              && outputWatermarkBeforeEOW;
-
-      // The "GC time" is reached when the input watermark surpasses the end 
of the window
-      // plus allowed lateness. After this, the window is expired and expunged.
-      this.isGarbageCollection =
-          timerInternals
-              .currentInputWatermarkTime()
-              .isAfter(LateDataUtils.garbageCollectionTime(window, 
windowingStrategy));
+      this.isEndOfWindow = TimeDomain.EVENT_TIME == timer.getDomain()
+          && timer.getTimestamp().equals(window.maxTimestamp());
+      Instant cleanupTime = LateDataUtils.garbageCollectionTime(window, 
windowingStrategy);
+      this.isGarbageCollection = !timer.getTimestamp().isBefore(cleanupTime);
     }
 
     // Has this window had its trigger finish?
@@ -696,47 +682,24 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends 
BoundedWindow> {
       return;
     }
 
-    // Create a reusable context for each window and begin prefetching 
necessary
+    // Create a reusable context for each timer and begin prefetching necessary
     // state.
-    Map<BoundedWindow, WindowActivation> windowActivations = new HashMap();
-
+    List<EnrichedTimerData> enrichedTimers = new LinkedList();
     for (TimerData timer : timers) {
       checkArgument(timer.getNamespace() instanceof WindowNamespace,
           "Expected timer to be in WindowNamespace, but was in %s", 
timer.getNamespace());
       @SuppressWarnings("unchecked")
         WindowNamespace<W> windowNamespace = (WindowNamespace<W>) 
timer.getNamespace();
       W window = windowNamespace.getWindow();
-
-      WindowTracing.debug("{}: Received timer key:{}; window:{}; data:{} with "
-              + "inputWatermark:{}; outputWatermark:{}",
-          ReduceFnRunner.class.getSimpleName(),
-          key, window, timer,
-          timerInternals.currentInputWatermarkTime(),
-          timerInternals.currentOutputWatermarkTime());
-
-      // Processing time timers for an expired window are ignored, just like 
elements
-      // that show up too late. Window GC is management by an event time timer
-      if (TimeDomain.EVENT_TIME != timer.getDomain() && 
windowIsExpired(window)) {
-        continue;
-      }
-
-      // How a window is processed is a function only of the current state, 
not the details
-      // of the timer. This makes us robust to large leaps in processing time 
and watermark
-      // time, where both EOW and GC timers come in together and we need to GC 
and emit
-      // the final pane.
-      if (windowActivations.containsKey(window)) {
-        continue;
-      }
-
       ReduceFn<K, InputT, OutputT, W>.Context directContext =
           contextFactory.base(window, StateStyle.DIRECT);
       ReduceFn<K, InputT, OutputT, W>.Context renamedContext =
           contextFactory.base(window, StateStyle.RENAMED);
-      WindowActivation windowActivation = new WindowActivation(directContext, 
renamedContext);
-      windowActivations.put(window, windowActivation);
+      EnrichedTimerData enrichedTimer = new EnrichedTimerData(timer, 
directContext, renamedContext);
+      enrichedTimers.add(enrichedTimer);
 
       // Perform prefetching of state to determine if the trigger should fire.
-      if (windowActivation.isGarbageCollection) {
+      if (enrichedTimer.isGarbageCollection) {
         triggerRunner.prefetchIsClosed(directContext.state());
       } else {
         triggerRunner.prefetchShouldFire(directContext.window(), 
directContext.state());
@@ -744,7 +707,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends 
BoundedWindow> {
     }
 
     // For those windows that are active and open, prefetch the triggering or 
emitting state.
-    for (WindowActivation timer : windowActivations.values()) {
+    for (EnrichedTimerData timer : enrichedTimers) {
       if (timer.windowIsActiveAndOpen()) {
         ReduceFn<K, InputT, OutputT, W>.Context directContext = 
timer.directContext;
         if (timer.isGarbageCollection) {
@@ -757,27 +720,25 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends 
BoundedWindow> {
     }
 
     // Perform processing now that everything is prefetched.
-    for (WindowActivation windowActivation : windowActivations.values()) {
-      ReduceFn<K, InputT, OutputT, W>.Context directContext = 
windowActivation.directContext;
-      ReduceFn<K, InputT, OutputT, W>.Context renamedContext = 
windowActivation.renamedContext;
+    for (EnrichedTimerData timer : enrichedTimers) {
+      ReduceFn<K, InputT, OutputT, W>.Context directContext = 
timer.directContext;
+      ReduceFn<K, InputT, OutputT, W>.Context renamedContext = 
timer.renamedContext;
 
-      if (windowActivation.isGarbageCollection) {
-        WindowTracing.debug(
-            "{}: Cleaning up for key:{}; window:{} with inputWatermark:{}; 
outputWatermark:{}",
-            ReduceFnRunner.class.getSimpleName(),
-            key,
-            directContext.window(),
+      if (timer.isGarbageCollection) {
+        WindowTracing.debug("ReduceFnRunner.onTimer: Cleaning up for key:{}; 
window:{} at {} with "
+                + "inputWatermark:{}; outputWatermark:{}",
+            key, directContext.window(), timer.timestamp,
             timerInternals.currentInputWatermarkTime(),
             timerInternals.currentOutputWatermarkTime());
 
-        boolean windowIsActiveAndOpen = 
windowActivation.windowIsActiveAndOpen();
+        boolean windowIsActiveAndOpen = timer.windowIsActiveAndOpen();
         if (windowIsActiveAndOpen) {
           // We need to call onTrigger to emit the final pane if required.
           // The final pane *may* be ON_TIME if no prior ON_TIME pane has been 
emitted,
           // and the watermark has passed the end of the window.
           @Nullable
           Instant newHold = onTrigger(
-              directContext, renamedContext, true /* isFinished */, 
windowActivation.isEndOfWindow);
+              directContext, renamedContext, true /* isFinished */, 
timer.isEndOfWindow);
           checkState(newHold == null, "Hold placed at %s despite isFinished 
being true.", newHold);
         }
 
@@ -785,20 +746,18 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends 
BoundedWindow> {
         // see elements for it again.
         clearAllState(directContext, renamedContext, windowIsActiveAndOpen);
       } else {
-        WindowTracing.debug(
-            "{}.onTimers: Triggering for key:{}; window:{} at {} with "
+        WindowTracing.debug("ReduceFnRunner.onTimer: Triggering for key:{}; 
window:{} at {} with "
                 + "inputWatermark:{}; outputWatermark:{}",
-            key,
-            directContext.window(),
+            key, directContext.window(), timer.timestamp,
             timerInternals.currentInputWatermarkTime(),
             timerInternals.currentOutputWatermarkTime());
-        if (windowActivation.windowIsActiveAndOpen()
+        if (timer.windowIsActiveAndOpen()
             && triggerRunner.shouldFire(
                    directContext.window(), directContext.timers(), 
directContext.state())) {
           emit(directContext, renamedContext);
         }
 
-        if (windowActivation.isEndOfWindow) {
+        if (timer.isEndOfWindow) {
           // If the window strategy trigger includes a watermark trigger then 
at this point
           // there should be no data holds, either because we'd already 
cleared them on an
           // earlier onTrigger, or because we just cleared them on the above 
emit.
@@ -960,9 +919,8 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends 
BoundedWindow> {
       // The pane has elements.
       return true;
     }
-    if (timing == Timing.ON_TIME
-        && windowingStrategy.getOnTimeBehavior() == 
Window.OnTimeBehavior.FIRE_ALWAYS) {
-      // This is an empty ON_TIME pane.
+    if (timing == Timing.ON_TIME) {
+      // This is the unique ON_TIME pane.
       return true;
     }
     if (isFinished && windowingStrategy.getClosingBehavior() == 
ClosingBehavior.FIRE_ALWAYS) {
@@ -990,8 +948,13 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends 
BoundedWindow> {
   private Instant onTrigger(
       final ReduceFn<K, InputT, OutputT, W>.Context directContext,
       ReduceFn<K, InputT, OutputT, W>.Context renamedContext,
-      final boolean isFinished, boolean isEndOfWindow)
+      boolean isFinished, boolean isEndOfWindow)
           throws Exception {
+    Instant inputWM = timerInternals.currentInputWatermarkTime();
+
+    // Calculate the pane info.
+    final PaneInfo pane = paneInfoTracker.getNextPaneInfo(directContext, 
isFinished).read();
+
     // Extract the window hold, and as a side effect clear it.
     final WatermarkHold.OldAndNewHolds pair =
         watermarkHold.extractAndRelease(renamedContext, isFinished).read();
@@ -1000,13 +963,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends 
BoundedWindow> {
     @Nullable Instant newHold = pair.newHold;
 
     final boolean isEmpty = 
nonEmptyPanes.isEmpty(renamedContext.state()).read();
-    if (isEmpty
-        && windowingStrategy.getClosingBehavior() == 
ClosingBehavior.FIRE_IF_NON_EMPTY
-        && windowingStrategy.getOnTimeBehavior() == 
Window.OnTimeBehavior.FIRE_IF_NON_EMPTY) {
-      return newHold;
-    }
 
-    Instant inputWM = timerInternals.currentInputWatermarkTime();
     if (newHold != null) {
       // We can't be finished yet.
       checkState(
@@ -1038,9 +995,6 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends 
BoundedWindow> {
       }
     }
 
-    // Calculate the pane info.
-    final PaneInfo pane = paneInfoTracker.getNextPaneInfo(directContext, 
isFinished).read();
-
     // Only emit a pane if it has data or empty panes are observable.
     if (needToEmit(isEmpty, isFinished, pane.getTiming())) {
       // Run reduceFn.onTrigger method.
@@ -1051,11 +1005,9 @@ public class ReduceFnRunner<K, InputT, OutputT, W 
extends BoundedWindow> {
                 @Override
                 public void output(OutputT toOutput) {
                   // We're going to output panes, so commit the (now used) 
PaneInfo.
-                  // This is unnecessary if the trigger isFinished since the 
saved
+                  // TODO: This is unnecessary if the trigger isFinished since 
the saved
                   // state will be immediately deleted.
-                  if (!isFinished) {
-                    paneInfoTracker.storeCurrentPaneInfo(directContext, pane);
-                  }
+                  paneInfoTracker.storeCurrentPaneInfo(directContext, pane);
 
                   // Output the actual value.
                   outputter.outputWindowedValue(
@@ -1129,9 +1081,4 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends 
BoundedWindow> {
     }
   }
 
-  private boolean windowIsExpired(BoundedWindow w) {
-    return timerInternals
-        .currentInputWatermarkTime()
-        
.isAfter(w.maxTimestamp().plus(windowingStrategy.getAllowedLateness()));
-  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index c3bfef6..7d7babd 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -233,11 +233,6 @@ public class SimpleDoFnRunner<InputT, OutputT> implements 
DoFnRunner<InputT, Out
     }
 
     @Override
-    public PipelineOptions pipelineOptions() {
-      return getPipelineOptions();
-    }
-
-    @Override
     public DoFn<InputT, OutputT>.StartBundleContext 
startBundleContext(DoFn<InputT, OutputT> doFn) {
       return this;
     }
@@ -303,11 +298,6 @@ public class SimpleDoFnRunner<InputT, OutputT> implements 
DoFnRunner<InputT, Out
     }
 
     @Override
-    public PipelineOptions pipelineOptions() {
-      return getPipelineOptions();
-    }
-
-    @Override
     public DoFn<InputT, OutputT>.StartBundleContext 
startBundleContext(DoFn<InputT, OutputT> doFn) {
       throw new UnsupportedOperationException(
           "Cannot access StartBundleContext outside of @StartBundle method.");
@@ -477,11 +467,6 @@ public class SimpleDoFnRunner<InputT, OutputT> implements 
DoFnRunner<InputT, Out
     }
 
     @Override
-    public PipelineOptions pipelineOptions() {
-      return getPipelineOptions();
-    }
-
-    @Override
     public DoFn<InputT, OutputT>.StartBundleContext 
startBundleContext(DoFn<InputT, OutputT> doFn) {
       throw new UnsupportedOperationException("StartBundleContext parameters 
are not supported.");
     }
@@ -583,11 +568,6 @@ public class SimpleDoFnRunner<InputT, OutputT> implements 
DoFnRunner<InputT, Out
     }
 
     @Override
-    public PipelineOptions pipelineOptions() {
-      return getPipelineOptions();
-    }
-
-    @Override
     public DoFn<InputT, OutputT>.StartBundleContext 
startBundleContext(DoFn<InputT, OutputT> doFn) {
       throw new UnsupportedOperationException("StartBundleContext parameters 
are not supported.");
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
index 6e97645..c4b086a 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
@@ -21,6 +21,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Iterables;
 import java.util.List;
 import java.util.Map;
+import org.apache.beam.runners.core.construction.ElementAndRestriction;
 import org.apache.beam.runners.core.construction.PTransformReplacements;
 import 
org.apache.beam.runners.core.construction.PTransformTranslation.RawPTransform;
 import org.apache.beam.runners.core.construction.ReplacementOutputs;
@@ -85,15 +86,15 @@ public class SplittableParDoViaKeyedWorkItems {
   /** Overrides a {@link ProcessKeyedElements} into {@link 
SplittableProcessViaKeyedWorkItems}. */
   public static class OverrideFactory<InputT, OutputT, RestrictionT>
       implements PTransformOverrideFactory<
-          PCollection<KV<String, KV<InputT, RestrictionT>>>, PCollectionTuple,
-          ProcessKeyedElements<InputT, OutputT, RestrictionT>> {
+          PCollection<KV<String, ElementAndRestriction<InputT, 
RestrictionT>>>, PCollectionTuple,
+      ProcessKeyedElements<InputT, OutputT, RestrictionT>> {
     @Override
     public PTransformReplacement<
-            PCollection<KV<String, KV<InputT, RestrictionT>>>, 
PCollectionTuple>
+            PCollection<KV<String, ElementAndRestriction<InputT, 
RestrictionT>>>, PCollectionTuple>
         getReplacementTransform(
             AppliedPTransform<
-                    PCollection<KV<String, KV<InputT, RestrictionT>>>, 
PCollectionTuple,
-                    ProcessKeyedElements<InputT, OutputT, RestrictionT>>
+                    PCollection<KV<String, ElementAndRestriction<InputT, 
RestrictionT>>>,
+                    PCollectionTuple, ProcessKeyedElements<InputT, OutputT, 
RestrictionT>>
                 transform) {
       return PTransformReplacement.of(
           PTransformReplacements.getSingletonMainInput(transform),
@@ -112,7 +113,8 @@ public class SplittableParDoViaKeyedWorkItems {
    * method for a splittable {@link DoFn}.
    */
   public static class SplittableProcessViaKeyedWorkItems<InputT, OutputT, 
RestrictionT>
-      extends PTransform<PCollection<KV<String, KV<InputT, RestrictionT>>>, 
PCollectionTuple> {
+      extends PTransform<
+          PCollection<KV<String, ElementAndRestriction<InputT, 
RestrictionT>>>, PCollectionTuple> {
     private final ProcessKeyedElements<InputT, OutputT, RestrictionT> original;
 
     public SplittableProcessViaKeyedWorkItems(
@@ -121,13 +123,15 @@ public class SplittableParDoViaKeyedWorkItems {
     }
 
     @Override
-    public PCollectionTuple expand(PCollection<KV<String, KV<InputT, 
RestrictionT>>> input) {
+    public PCollectionTuple expand(
+        PCollection<KV<String, ElementAndRestriction<InputT, RestrictionT>>> 
input) {
       return input
-          .apply(new GBKIntoKeyedWorkItems<String, KV<InputT, RestrictionT>>())
+          .apply(new GBKIntoKeyedWorkItems<String, 
ElementAndRestriction<InputT, RestrictionT>>())
           .setCoder(
               KeyedWorkItemCoder.of(
                   StringUtf8Coder.of(),
-                  ((KvCoder<String, KV<InputT, RestrictionT>>) 
input.getCoder()).getValueCoder(),
+                  ((KvCoder<String, ElementAndRestriction<InputT, 
RestrictionT>>) input.getCoder())
+                      .getValueCoder(),
                   input.getWindowingStrategy().getWindowFn().windowCoder()))
           .apply(new ProcessElements<>(original));
     }
@@ -137,7 +141,8 @@ public class SplittableParDoViaKeyedWorkItems {
   public static class ProcessElements<
           InputT, OutputT, RestrictionT, TrackerT extends 
RestrictionTracker<RestrictionT>>
       extends PTransform<
-          PCollection<KeyedWorkItem<String, KV<InputT, RestrictionT>>>, 
PCollectionTuple> {
+          PCollection<KeyedWorkItem<String, ElementAndRestriction<InputT, 
RestrictionT>>>,
+          PCollectionTuple> {
     private final ProcessKeyedElements<InputT, OutputT, RestrictionT> original;
 
     public ProcessElements(ProcessKeyedElements<InputT, OutputT, RestrictionT> 
original) {
@@ -171,7 +176,7 @@ public class SplittableParDoViaKeyedWorkItems {
 
     @Override
     public PCollectionTuple expand(
-        PCollection<KeyedWorkItem<String, KV<InputT, RestrictionT>>> input) {
+        PCollection<KeyedWorkItem<String, ElementAndRestriction<InputT, 
RestrictionT>>> input) {
       return ProcessKeyedElements.createPrimitiveOutputFor(
           input,
           original.getFn(),
@@ -196,12 +201,12 @@ public class SplittableParDoViaKeyedWorkItems {
   @VisibleForTesting
   public static class ProcessFn<
           InputT, OutputT, RestrictionT, TrackerT extends 
RestrictionTracker<RestrictionT>>
-      extends DoFn<KeyedWorkItem<String, KV<InputT, RestrictionT>>, OutputT> {
+      extends DoFn<KeyedWorkItem<String, ElementAndRestriction<InputT, 
RestrictionT>>, OutputT> {
     /**
      * The state cell containing a watermark hold for the output of this 
{@link DoFn}. The hold is
      * acquired during the first {@link DoFn.ProcessElement} call for each 
element and restriction,
-     * and is released when the {@link DoFn.ProcessElement} call returns {@link
-     * ProcessContinuation#stop()}.
+     * and is released when the {@link DoFn.ProcessElement} call returns and 
there is no residual
+     * restriction captured by the {@link SplittableProcessElementInvoker}.
      *
      * <p>A hold is needed to avoid letting the output watermark immediately 
progress together with
      * the input watermark when the first {@link DoFn.ProcessElement} call for 
this element
@@ -316,7 +321,7 @@ public class SplittableParDoViaKeyedWorkItems {
       boolean isSeedCall = (timer == null);
       StateNamespace stateNamespace;
       if (isSeedCall) {
-        WindowedValue<KV<InputT, RestrictionT>> windowedValue =
+        WindowedValue<ElementAndRestriction<InputT, RestrictionT>> 
windowedValue =
             Iterables.getOnlyElement(c.element().elementsIterable());
         BoundedWindow window = 
Iterables.getOnlyElement(windowedValue.getWindows());
         stateNamespace =
@@ -332,25 +337,27 @@ public class SplittableParDoViaKeyedWorkItems {
           stateInternals.state(stateNamespace, restrictionTag);
       WatermarkHoldState holdState = stateInternals.state(stateNamespace, 
watermarkHoldTag);
 
-      KV<WindowedValue<InputT>, RestrictionT> elementAndRestriction;
+      ElementAndRestriction<WindowedValue<InputT>, RestrictionT> 
elementAndRestriction;
       if (isSeedCall) {
-        WindowedValue<KV<InputT, RestrictionT>> windowedValue =
+        WindowedValue<ElementAndRestriction<InputT, RestrictionT>> 
windowedValue =
             Iterables.getOnlyElement(c.element().elementsIterable());
-        WindowedValue<InputT> element = 
windowedValue.withValue(windowedValue.getValue().getKey());
+        WindowedValue<InputT> element = 
windowedValue.withValue(windowedValue.getValue().element());
         elementState.write(element);
-        elementAndRestriction = KV.of(element, 
windowedValue.getValue().getValue());
+        elementAndRestriction =
+            ElementAndRestriction.of(element, 
windowedValue.getValue().restriction());
       } else {
         // This is not the first ProcessElement call for this 
element/restriction - rather,
         // this is a timer firing, so we need to fetch the element and 
restriction from state.
         elementState.readLater();
         restrictionState.readLater();
-        elementAndRestriction = KV.of(elementState.read(), 
restrictionState.read());
+        elementAndRestriction =
+            ElementAndRestriction.of(elementState.read(), 
restrictionState.read());
       }
 
-      final TrackerT tracker = 
invoker.invokeNewTracker(elementAndRestriction.getValue());
+      final TrackerT tracker = 
invoker.invokeNewTracker(elementAndRestriction.restriction());
       SplittableProcessElementInvoker<InputT, OutputT, RestrictionT, 
TrackerT>.Result result =
           processElementInvoker.invokeProcessElement(
-              invoker, elementAndRestriction.getKey(), tracker);
+              invoker, elementAndRestriction.element(), tracker);
 
       // Save state for resuming.
       if (result.getResidualRestriction() == null) {
@@ -363,14 +370,13 @@ public class SplittableParDoViaKeyedWorkItems {
       restrictionState.write(result.getResidualRestriction());
       Instant futureOutputWatermark = result.getFutureOutputWatermark();
       if (futureOutputWatermark == null) {
-        futureOutputWatermark = elementAndRestriction.getKey().getTimestamp();
+        futureOutputWatermark = elementAndRestriction.element().getTimestamp();
       }
-      Instant wakeupTime =
-          
timerInternals.currentProcessingTime().plus(result.getContinuation().resumeDelay());
       holdState.add(futureOutputWatermark);
       // Set a timer to continue processing this element.
       timerInternals.setTimer(
-          TimerInternals.TimerData.of(stateNamespace, wakeupTime, 
TimeDomain.PROCESSING_TIME));
+          TimerInternals.TimerData.of(
+              stateNamespace, timerInternals.currentProcessingTime(), 
TimeDomain.PROCESSING_TIME));
     }
 
     private DoFn<InputT, OutputT>.StartBundleContext wrapContextAsStartBundle(

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableProcessElementInvoker.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableProcessElementInvoker.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableProcessElementInvoker.java
index 7732df3..ced6c01 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableProcessElementInvoker.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableProcessElementInvoker.java
@@ -17,8 +17,6 @@
  */
 package org.apache.beam.runners.core;
 
-import static com.google.common.base.Preconditions.checkNotNull;
-
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
@@ -36,35 +34,20 @@ public abstract class SplittableProcessElementInvoker<
   public class Result {
     @Nullable
     private final RestrictionT residualRestriction;
-    private final DoFn.ProcessContinuation continuation;
     private final Instant futureOutputWatermark;
 
     public Result(
-        @Nullable RestrictionT residualRestriction,
-        DoFn.ProcessContinuation continuation,
-        Instant futureOutputWatermark) {
-      this.continuation = checkNotNull(continuation);
-      if (continuation.shouldResume()) {
-        checkNotNull(residualRestriction);
-      }
+        @Nullable RestrictionT residualRestriction, Instant 
futureOutputWatermark) {
       this.residualRestriction = residualRestriction;
       this.futureOutputWatermark = futureOutputWatermark;
     }
 
-    /**
-     * Can be {@code null} only if {@link #getContinuation} specifies the call 
should not resume.
-     * However, the converse is not true: this can be non-null even if {@link 
#getContinuation}
-     * is {@link DoFn.ProcessContinuation#stop()}.
-     */
+    /** If {@code null}, means the call should not resume. */
     @Nullable
     public RestrictionT getResidualRestriction() {
       return residualRestriction;
     }
 
-    public DoFn.ProcessContinuation getContinuation() {
-      return continuation;
-    }
-
     public Instant getFutureOutputWatermark() {
       return futureOutputWatermark;
     }
@@ -74,8 +57,8 @@ public abstract class SplittableProcessElementInvoker<
    * Invokes the {@link DoFn.ProcessElement} method using the given {@link 
DoFnInvoker} for the
    * original {@link DoFn}, on the given element and with the given {@link 
RestrictionTracker}.
    *
-   * @return Information on how to resume the call: residual restriction, a 
{@link
-   *     DoFn.ProcessContinuation}, and a future output watermark.
+   * @return Information on how to resume the call: residual restriction and a
+   * future output watermark.
    */
   public abstract Result invokeProcessElement(
       DoFnInvoker<InputT, OutputT> invoker, WindowedValue<InputT> element, 
TrackerT tracker);

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java
index 3144bd6..c189b0d 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java
@@ -18,7 +18,6 @@
 package org.apache.beam.runners.core;
 
 
-import com.google.common.annotations.VisibleForTesting;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.state.BagState;
 import org.apache.beam.sdk.state.CombiningState;
@@ -104,11 +103,6 @@ public abstract class SystemReduceFn<K, InputT, AccumT, 
OutputT, W extends Bound
     this.bufferTag = bufferTag;
   }
 
-  @VisibleForTesting
-  StateTag<? extends GroupingState<InputT, OutputT>> getBufferTag() {
-    return bufferTag;
-  }
-
   @Override
   public void processValue(ProcessValueContext c) throws Exception {
     c.state().access(bufferTag).add(c.value());

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterAllStateMachine.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterAllStateMachine.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterAllStateMachine.java
index 3530ed1..0f0c17c 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterAllStateMachine.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterAllStateMachine.java
@@ -23,6 +23,7 @@ import com.google.common.base.Joiner;
 import com.google.common.collect.ImmutableList;
 import java.util.Arrays;
 import java.util.List;
+import 
org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine;
 import org.apache.beam.sdk.annotations.Experimental;
 
 /**
@@ -30,7 +31,7 @@ import org.apache.beam.sdk.annotations.Experimental;
  * have fired.
  */
 @Experimental(Experimental.Kind.TRIGGER)
-public class AfterAllStateMachine extends TriggerStateMachine {
+public class AfterAllStateMachine extends OnceTriggerStateMachine {
 
   private AfterAllStateMachine(List<TriggerStateMachine> subTriggers) {
     super(subTriggers);
@@ -41,11 +42,11 @@ public class AfterAllStateMachine extends 
TriggerStateMachine {
    * Returns an {@code AfterAll} {@code Trigger} with the given subtriggers.
    */
   @SafeVarargs
-  public static TriggerStateMachine of(TriggerStateMachine... triggers) {
+  public static OnceTriggerStateMachine of(TriggerStateMachine... triggers) {
     return new 
AfterAllStateMachine(Arrays.<TriggerStateMachine>asList(triggers));
   }
 
-  public static TriggerStateMachine of(Iterable<? extends TriggerStateMachine> 
triggers) {
+  public static OnceTriggerStateMachine of(Iterable<? extends 
TriggerStateMachine> triggers) {
     return new AfterAllStateMachine(ImmutableList.copyOf(triggers));
   }
 
@@ -77,21 +78,24 @@ public class AfterAllStateMachine extends 
TriggerStateMachine {
    */
   @Override
   public boolean shouldFire(TriggerContext context) throws Exception {
-    for (ExecutableTriggerStateMachine subTrigger : 
context.trigger().subTriggers()) {
-      if (!context.forTrigger(subTrigger).trigger().isFinished()
-          && !subTrigger.invokeShouldFire(context)) {
+    for (ExecutableTriggerStateMachine subtrigger : 
context.trigger().subTriggers()) {
+      if (!context.forTrigger(subtrigger).trigger().isFinished()
+          && !subtrigger.invokeShouldFire(context)) {
         return false;
       }
     }
     return true;
   }
 
+  /**
+   * Invokes {@link #onFire} for all subtriggers, eliding redundant calls to 
{@link #shouldFire}
+   * because they all must be ready to fire.
+   */
   @Override
-  public void onFire(TriggerContext context) throws Exception {
-    for (ExecutableTriggerStateMachine subTrigger : 
context.trigger().subTriggers()) {
-      subTrigger.invokeOnFire(context);
+  public void onOnlyFiring(TriggerContext context) throws Exception {
+    for (ExecutableTriggerStateMachine subtrigger : 
context.trigger().subTriggers()) {
+      subtrigger.invokeOnFire(context);
     }
-    context.trigger().setFinished(true);
   }
 
   @Override
@@ -99,6 +103,7 @@ public class AfterAllStateMachine extends 
TriggerStateMachine {
     StringBuilder builder = new StringBuilder("AfterAll.of(");
     Joiner.on(", ").appendTo(builder, subTriggers);
     builder.append(")");
+
     return builder.toString();
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
index 06c2066..8d8d0de 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
@@ -27,6 +27,7 @@ import org.apache.beam.runners.core.StateAccessor;
 import org.apache.beam.runners.core.StateMerging;
 import org.apache.beam.runners.core.StateTag;
 import org.apache.beam.runners.core.StateTags;
+import 
org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.coders.InstantCoder;
 import org.apache.beam.sdk.state.CombiningState;
@@ -49,7 +50,7 @@ import org.joda.time.format.PeriodFormatter;
 // This class should be inlined to subclasses and deleted, simplifying them too
 // https://issues.apache.org/jira/browse/BEAM-1486
 @Experimental(Experimental.Kind.TRIGGER)
-public abstract class AfterDelayFromFirstElementStateMachine extends 
TriggerStateMachine {
+public abstract class AfterDelayFromFirstElementStateMachine extends 
OnceTriggerStateMachine {
 
   protected static final List<SerializableFunction<Instant, Instant>> IDENTITY 
=
       ImmutableList.<SerializableFunction<Instant, Instant>>of();
@@ -236,9 +237,8 @@ public abstract class 
AfterDelayFromFirstElementStateMachine extends TriggerStat
   }
 
   @Override
-  public final void onFire(TriggerContext context) throws Exception {
+  protected void onOnlyFiring(TriggerStateMachine.TriggerContext context) 
throws Exception {
     clear(context);
-    context.trigger().setFinished(true);
   }
 
   protected Instant computeTargetTimestamp(Instant time) {

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterFirstStateMachine.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterFirstStateMachine.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterFirstStateMachine.java
index 58c24c5..840a65c 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterFirstStateMachine.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterFirstStateMachine.java
@@ -23,6 +23,7 @@ import com.google.common.base.Joiner;
 import com.google.common.collect.ImmutableList;
 import java.util.Arrays;
 import java.util.List;
+import 
org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine;
 import org.apache.beam.sdk.annotations.Experimental;
 
 /**
@@ -30,7 +31,7 @@ import org.apache.beam.sdk.annotations.Experimental;
  * sub-triggers have fired.
  */
 @Experimental(Experimental.Kind.TRIGGER)
-public class AfterFirstStateMachine extends TriggerStateMachine {
+public class AfterFirstStateMachine extends OnceTriggerStateMachine {
 
   AfterFirstStateMachine(List<TriggerStateMachine> subTriggers) {
     super(subTriggers);
@@ -41,12 +42,12 @@ public class AfterFirstStateMachine extends 
TriggerStateMachine {
    * Returns an {@code AfterFirst} {@code Trigger} with the given subtriggers.
    */
   @SafeVarargs
-  public static TriggerStateMachine of(
+  public static OnceTriggerStateMachine of(
       TriggerStateMachine... triggers) {
     return new 
AfterFirstStateMachine(Arrays.<TriggerStateMachine>asList(triggers));
   }
 
-  public static TriggerStateMachine of(
+  public static OnceTriggerStateMachine of(
       Iterable<? extends TriggerStateMachine> triggers) {
     return new AfterFirstStateMachine(ImmutableList.copyOf(triggers));
   }
@@ -78,19 +79,18 @@ public class AfterFirstStateMachine extends 
TriggerStateMachine {
   }
 
   @Override
-  public void onFire(TriggerContext context) throws Exception {
-    for (ExecutableTriggerStateMachine subTrigger : 
context.trigger().subTriggers()) {
-      TriggerContext subContext = context.forTrigger(subTrigger);
-      if (subTrigger.invokeShouldFire(subContext)) {
+  protected void onOnlyFiring(TriggerContext context) throws Exception {
+    for (ExecutableTriggerStateMachine subtrigger : 
context.trigger().subTriggers()) {
+      TriggerContext subContext = context.forTrigger(subtrigger);
+      if (subtrigger.invokeShouldFire(subContext)) {
         // If the trigger is ready to fire, then do whatever it needs to do.
-        subTrigger.invokeOnFire(subContext);
+        subtrigger.invokeOnFire(subContext);
       } else {
         // If the trigger is not ready to fire, it is nonetheless true that 
whatever
         // pending pane it was tracking is now gone.
-        subTrigger.invokeClear(subContext);
+        subtrigger.invokeClear(subContext);
       }
     }
-    context.trigger().setFinished(true);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java
index 1ce035a..b9fbac3 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java
@@ -23,6 +23,7 @@ import org.apache.beam.runners.core.StateAccessor;
 import org.apache.beam.runners.core.StateMerging;
 import org.apache.beam.runners.core.StateTag;
 import org.apache.beam.runners.core.StateTags;
+import 
org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.state.CombiningState;
@@ -32,7 +33,7 @@ import org.apache.beam.sdk.transforms.Sum;
  * {@link TriggerStateMachine}s that fire based on properties of the elements 
in the current pane.
  */
 @Experimental(Experimental.Kind.TRIGGER)
-public class AfterPaneStateMachine extends TriggerStateMachine {
+public class AfterPaneStateMachine extends OnceTriggerStateMachine {
 
 private static final StateTag<CombiningState<Long, long[], Long>>
       ELEMENTS_IN_PANE_TAG =
@@ -129,8 +130,7 @@ private static final StateTag<CombiningState<Long, long[], 
Long>>
   }
 
   @Override
-  public void onFire(TriggerStateMachine.TriggerContext context) throws 
Exception {
+  protected void onOnlyFiring(TriggerStateMachine.TriggerContext context) 
throws Exception {
     clear(context);
-    context.trigger().setFinished(true);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachine.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachine.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachine.java
index 509c96b..c9eee15 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachine.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachine.java
@@ -22,6 +22,7 @@ import static 
com.google.common.base.Preconditions.checkNotNull;
 import com.google.common.collect.ImmutableList;
 import java.util.Objects;
 import javax.annotation.Nullable;
+import 
org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.state.TimeDomain;
 
@@ -241,7 +242,7 @@ public class AfterWatermarkStateMachine {
   /**
    * A watermark trigger targeted relative to the end of the window.
    */
-  public static class FromEndOfWindow extends TriggerStateMachine {
+  public static class FromEndOfWindow extends OnceTriggerStateMachine {
 
     private FromEndOfWindow() {
       super(null);
@@ -318,8 +319,6 @@ public class AfterWatermarkStateMachine {
     }
 
     @Override
-    public void onFire(TriggerStateMachine.TriggerContext context) throws 
Exception {
-      context.trigger().setFinished(true);
-    }
+    protected void onOnlyFiring(TriggerStateMachine.TriggerContext context) 
throws Exception { }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/ExecutableTriggerStateMachine.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/ExecutableTriggerStateMachine.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/ExecutableTriggerStateMachine.java
index cdcff64..c4d89c2 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/ExecutableTriggerStateMachine.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/ExecutableTriggerStateMachine.java
@@ -23,6 +23,7 @@ import static com.google.common.base.Preconditions.checkState;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
+import 
org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 
 /**
@@ -45,14 +46,17 @@ public class ExecutableTriggerStateMachine implements 
Serializable {
 
   private static <W extends BoundedWindow> ExecutableTriggerStateMachine 
create(
       TriggerStateMachine trigger, int nextUnusedIndex) {
-
+    if (trigger instanceof OnceTriggerStateMachine) {
+      return new ExecutableOnceTriggerStateMachine(
+          (OnceTriggerStateMachine) trigger, nextUnusedIndex);
+    } else {
       return new ExecutableTriggerStateMachine(trigger, nextUnusedIndex);
-
+    }
   }
 
   public static <W extends BoundedWindow> ExecutableTriggerStateMachine 
createForOnceTrigger(
-      TriggerStateMachine trigger, int nextUnusedIndex) {
-    return new ExecutableTriggerStateMachine(trigger, nextUnusedIndex);
+      OnceTriggerStateMachine trigger, int nextUnusedIndex) {
+    return new ExecutableOnceTriggerStateMachine(trigger, nextUnusedIndex);
   }
 
   private ExecutableTriggerStateMachine(TriggerStateMachine trigger, int 
nextUnusedIndex) {
@@ -142,4 +146,15 @@ public class ExecutableTriggerStateMachine implements 
Serializable {
   public void invokeClear(TriggerStateMachine.TriggerContext c) throws 
Exception {
     trigger.clear(c.forTrigger(this));
   }
+
+  /**
+   * {@link ExecutableTriggerStateMachine} that enforces the fact that the 
trigger should always
+   * FIRE_AND_FINISH and never just FIRE.
+   */
+  private static class ExecutableOnceTriggerStateMachine extends 
ExecutableTriggerStateMachine {
+
+    public ExecutableOnceTriggerStateMachine(OnceTriggerStateMachine trigger, 
int nextUnusedIndex) {
+      super(trigger, nextUnusedIndex);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/NeverStateMachine.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/NeverStateMachine.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/NeverStateMachine.java
index f8c5e8b..f32c7a8 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/NeverStateMachine.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/NeverStateMachine.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.runners.core.triggers;
 
+import 
org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 
@@ -26,7 +27,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
  * <p>Using this trigger will only produce output when the watermark passes 
the end of the
  * {@link BoundedWindow window} plus the allowed lateness.
  */
-public final class NeverStateMachine extends TriggerStateMachine {
+public final class NeverStateMachine extends OnceTriggerStateMachine {
   /**
    * Returns a trigger which never fires. Output will be produced from the 
using {@link GroupByKey}
    * when the {@link BoundedWindow} closes.
@@ -52,7 +53,7 @@ public final class NeverStateMachine extends 
TriggerStateMachine {
   }
 
   @Override
-  public void onFire(TriggerStateMachine.TriggerContext context) {
+  protected void onOnlyFiring(TriggerStateMachine.TriggerContext context) {
     throw new UnsupportedOperationException(
         String.format("%s should never fire", getClass().getSimpleName()));
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachine.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachine.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachine.java
index 880aa48..6a2cf0c 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachine.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachine.java
@@ -453,8 +453,35 @@ public abstract class TriggerStateMachine implements 
Serializable {
    * }
    * </pre>
    *
+   * <p>Note that if {@code t1} is {@link OnceTriggerStateMachine}, then 
{@code t1.orFinally(t2)} is
+   * the same as {@code AfterFirst.of(t1, t2)}.
    */
   public TriggerStateMachine orFinally(TriggerStateMachine until) {
     return new OrFinallyStateMachine(this, until);
   }
+
+  /**
+   * {@link TriggerStateMachine}s that are guaranteed to fire at most once 
should extend from this,
+   * rather than the general {@link TriggerStateMachine} class to indicate 
that behavior.
+   */
+  public abstract static class OnceTriggerStateMachine extends 
TriggerStateMachine {
+    protected OnceTriggerStateMachine(List<TriggerStateMachine> subTriggers) {
+      super(subTriggers);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public final void onFire(TriggerContext context) throws Exception {
+      onOnlyFiring(context);
+      context.trigger().setFinished(true);
+    }
+
+    /**
+     * Called exactly once by {@link #onFire} when the trigger is fired. By 
default,
+     * invokes {@link #onFire} on all subtriggers for which {@link 
#shouldFire} is {@code true}.
+     */
+    protected abstract void onOnlyFiring(TriggerContext context) throws 
Exception;
+  }
 }

Reply via email to