This is an automated email from the ASF dual-hosted git repository.

scott pushed a commit to branch release-2.10.0
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/release-2.10.0 by this push:
     new b352df3  [BEAM-6352] Revert PR#6467 to fix Watch transform from swegner/revert_pr6467
     new bfd6893  Merge pull request #7575: [BEAM-6352] Revert PR#6467 to fix Watch transform
b352df3 is described below

commit b352df3976c6a687213b611a9a47b6e6093a2dc5
Author: Kenn Knowles <k...@kennknowles.com>
AuthorDate: Fri Jan 18 13:33:16 2019 -0800

    [BEAM-6352] Revert PR#6467 to fix Watch transform from swegner/revert_pr6467
---
 .../runners/apex/translation/ParDoTranslator.java  |   8 +-
 .../construction/SplittableParDoNaiveBounded.java  |   5 +-
 .../core/construction/PTransformMatchersTest.java  |   3 +-
 .../core/construction/SplittableParDoTest.java     |   7 +-
 runners/core-java/build.gradle                     |   1 -
 ...TimeBoundedSplittableProcessElementInvoker.java |  30 ++--
 .../core/SplittableParDoViaKeyedWorkItems.java     |  17 +--
 .../core/SplittableProcessElementInvoker.java      |   7 +-
 ...BoundedSplittableProcessElementInvokerTest.java |  35 +++--
 .../runners/core/SplittableParDoProcessFnTest.java |  37 ++---
 .../SplittableProcessElementsEvaluatorFactory.java |  23 +--
 .../flink/FlinkStreamingTransformTranslators.java  |   7 +-
 .../wrappers/streaming/SplittableDoFnOperator.java |   4 +-
 .../dataflow/DataflowPipelineTranslatorTest.java   |   4 +-
 .../java/org/apache/beam/sdk/transforms/DoFn.java  |  16 ++-
 .../java/org/apache/beam/sdk/transforms/Watch.java |   2 +-
 .../beam/sdk/transforms/reflect/DoFnInvoker.java   |   2 +-
 .../sdk/transforms/reflect/DoFnSignatures.java     |  29 +++-
 .../splittabledofn/ByteKeyRangeTracker.java        |  12 +-
 .../splittabledofn/OffsetRangeTracker.java         |  10 +-
 .../splittabledofn/RestrictionTracker.java         |  51 ++++++-
 .../java/org/apache/beam/sdk/io/AvroIOTest.java    |   2 -
 .../java/org/apache/beam/sdk/io/FileIOTest.java    |   2 -
 .../org/apache/beam/sdk/io/TextIOReadTest.java     |   2 -
 .../beam/sdk/transforms/SplittableDoFnTest.java    |  22 ++-
 .../org/apache/beam/sdk/transforms/WatchTest.java  |   9 --
 .../sdk/transforms/reflect/DoFnInvokersTest.java   |  15 +-
 .../reflect/DoFnSignaturesSplittableDoFnTest.java  |  46 +++---
 .../sdk/fn/splittabledofn/RestrictionTrackers.java | 138 ------------------
 .../beam/sdk/fn/splittabledofn/package-info.java   |  28 ----
 .../fn/splittabledofn/RestrictionTrackersTest.java | 156 ---------------------
 .../harness/SplittableProcessElementsRunner.java   |  11 +-
 .../beam/sdk/io/hbase/HBaseReadSplittableDoFn.java |   4 +-
 33 files changed, 239 insertions(+), 506 deletions(-)

diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java
index ca1c7ff..c54ad98 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java
@@ -35,6 +35,7 @@ import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PValue;
@@ -125,12 +126,13 @@ class ParDoTranslator<InputT, OutputT>
     }
   }
 
-  static class SplittableProcessElementsTranslator<InputT, OutputT, RestrictionT, PositionT>
-      implements TransformTranslator<ProcessElements<InputT, OutputT, RestrictionT, PositionT>> {
+  static class SplittableProcessElementsTranslator<
+          InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker<RestrictionT, ?>>
+      implements TransformTranslator<ProcessElements<InputT, OutputT, RestrictionT, TrackerT>> {
 
     @Override
     public void translate(
-        ProcessElements<InputT, OutputT, RestrictionT, PositionT> transform,
+        ProcessElements<InputT, OutputT, RestrictionT, TrackerT> transform,
         TranslationContext context) {
 
       Map<TupleTag<?>, PValue> outputs = context.getOutputs();
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDoNaiveBounded.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDoNaiveBounded.java
index 3197c69..2239743 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDoNaiveBounded.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDoNaiveBounded.java
@@ -109,7 +109,8 @@ public class SplittableParDoNaiveBounded {
     }
   }
 
-  static class NaiveProcessFn<InputT, OutputT, RestrictionT, PositionT>
+  static class NaiveProcessFn<
+          InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker<RestrictionT, ?>>
       extends DoFn<KV<InputT, RestrictionT>, OutputT> {
     private final DoFn<InputT, OutputT> fn;
 
@@ -141,7 +142,7 @@ public class SplittableParDoNaiveBounded {
       InputT element = c.element().getKey();
       RestrictionT restriction = c.element().getValue();
       while (true) {
-        RestrictionTracker<RestrictionT, PositionT> tracker = invoker.invokeNewTracker(restriction);
+        TrackerT tracker = invoker.invokeNewTracker(restriction);
         ProcessContinuation continuation =
             invoker.invokeProcessElement(new NestedProcessContext<>(fn, c, element, w, tracker));
         if (continuation.shouldResume()) {
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
index 7f4ebda..618a12e 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
@@ -163,8 +163,7 @@ public class PTransformMatchersTest implements Serializable {
   private DoFn<KV<String, Integer>, Integer> splittableDoFn =
       new DoFn<KV<String, Integer>, Integer>() {
         @ProcessElement
-        public void processElement(
-            ProcessContext context, RestrictionTracker<Void, Void> tracker) {}
+        public void processElement(ProcessContext context, SomeTracker tracker) {}
 
         @GetInitialRestriction
         public Void getInitialRestriction(KV<String, Integer> element) {
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SplittableParDoTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SplittableParDoTest.java
index 959120c..68365c8 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SplittableParDoTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SplittableParDoTest.java
@@ -58,7 +58,7 @@ public class SplittableParDoTest {
     }
 
     @Override
-    public boolean tryClaim(Void position) {
+    protected boolean tryClaimImpl(Void position) {
       return false;
     }
 
@@ -78,8 +78,7 @@ public class SplittableParDoTest {
 
   private static class BoundedFakeFn extends DoFn<Integer, String> {
     @ProcessElement
-    public void processElement(
-        ProcessContext context, RestrictionTracker<SomeRestriction, Void> tracker) {}
+    public void processElement(ProcessContext context, SomeRestrictionTracker tracker) {}
 
     @GetInitialRestriction
     public SomeRestriction getInitialRestriction(Integer element) {
@@ -90,7 +89,7 @@ public class SplittableParDoTest {
   private static class UnboundedFakeFn extends DoFn<Integer, String> {
     @ProcessElement
     public ProcessContinuation processElement(
-        ProcessContext context, RestrictionTracker<SomeRestriction, Void> tracker) {
+        ProcessContext context, SomeRestrictionTracker tracker) {
       return stop();
     }
 
diff --git a/runners/core-java/build.gradle b/runners/core-java/build.gradle
index 6483167..3468c01 100644
--- a/runners/core-java/build.gradle
+++ b/runners/core-java/build.gradle
@@ -35,7 +35,6 @@ dependencies {
   shadow project(path: ":beam-sdks-java-core", configuration: "shadow")
   shadow project(path: ":beam-model-fn-execution", configuration: "shadow")
   shadow project(path: ":beam-runners-core-construction-java", configuration: "shadow")
-  shadow project(path: ":beam-sdks-java-fn-execution", configuration: "shadow")
   shadow library.java.vendored_guava_20_0
   shadow library.java.joda_time
   shadowTest project(path: ":beam-sdks-java-core", configuration: "shadowTest")
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
index e8c57b5..d1ccd69 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
@@ -25,7 +25,6 @@ import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
-import org.apache.beam.sdk.fn.splittabledofn.RestrictionTrackers;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.state.State;
 import org.apache.beam.sdk.state.TimeDomain;
@@ -56,8 +55,12 @@ import org.joda.time.Instant;
  * outputs), or runs for the given duration.
  */
 public class OutputAndTimeBoundedSplittableProcessElementInvoker<
-        InputT, OutputT, RestrictionT, PositionT>
-    extends SplittableProcessElementInvoker<InputT, OutputT, RestrictionT, PositionT> {
+        InputT,
+        OutputT,
+        RestrictionT,
+        PositionT,
+        TrackerT extends RestrictionTracker<RestrictionT, PositionT>>
+    extends SplittableProcessElementInvoker<InputT, OutputT, RestrictionT, TrackerT> {
   private final DoFn<InputT, OutputT> fn;
   private final PipelineOptions pipelineOptions;
   private final OutputWindowedValue<OutputT> output;
@@ -103,9 +106,9 @@ public class OutputAndTimeBoundedSplittableProcessElementInvoker<
   public Result invokeProcessElement(
       DoFnInvoker<InputT, OutputT> invoker,
       final WindowedValue<InputT> element,
-      final RestrictionTracker<RestrictionT, PositionT> tracker) {
+      final TrackerT tracker) {
     final ProcessContext processContext = new ProcessContext(element, tracker);
-
+    tracker.setClaimObserver(processContext);
     DoFn.ProcessContinuation cont =
         invoker.invokeProcessElement(
             new DoFnInvoker.ArgumentProvider<InputT, OutputT>() {
@@ -153,7 +156,7 @@ public class OutputAndTimeBoundedSplittableProcessElementInvoker<
 
               @Override
               public RestrictionTracker<?, ?> restrictionTracker() {
-                return processContext.tracker;
+                return tracker;
               }
 
               // Unsupported methods below.
@@ -223,7 +226,7 @@ public class OutputAndTimeBoundedSplittableProcessElementInvoker<
         // restriction that describes exactly the work that wasn't done in the current call.
         if (processContext.numClaimedBlocks > 0) {
           residual = checkNotNull(processContext.takeCheckpointNow());
-          processContext.tracker.checkDone();
+          tracker.checkDone();
         } else {
           // The call returned resume() without trying to claim any blocks, i.e. it is unaware
           // of any work to be done at the moment, but more might emerge later. This is a valid
@@ -251,14 +254,14 @@ public class OutputAndTimeBoundedSplittableProcessElementInvoker<
         // ProcessElement call.
         // In other words, if we took a checkpoint *after* ProcessElement completed (like in the
         // branch above), it would have been equivalent to this one.
-        processContext.tracker.checkDone();
+        tracker.checkDone();
       }
     } else {
       // The ProcessElement call returned stop() - that means the tracker's current restriction
       // has been fully processed by the call. A checkpoint may or may not have been taken in
       // "residual"; if it was, then we'll need to process it; if no, then we don't - nothing
-      processContext.tracker.checkDone();
+      tracker.checkDone();
     }
     if (residual == null) {
       // Can only be true if cont.shouldResume() is false and no checkpoint was taken.
@@ -270,9 +273,9 @@ public class OutputAndTimeBoundedSplittableProcessElementInvoker<
   }
 
   private class ProcessContext extends DoFn<InputT, OutputT>.ProcessContext
-      implements RestrictionTrackers.ClaimObserver<PositionT> {
+      implements RestrictionTracker.ClaimObserver<PositionT> {
     private final WindowedValue<InputT> element;
-    private final RestrictionTracker<RestrictionT, PositionT> tracker;
+    private final TrackerT tracker;
     private int numClaimedBlocks;
     private boolean hasClaimFailed;
 
@@ -290,11 +293,10 @@ public class OutputAndTimeBoundedSplittableProcessElementInvoker<
     private @Nullable Future<?> scheduledCheckpoint;
     private @Nullable Instant lastReportedWatermark;
 
-    public ProcessContext(
-        WindowedValue<InputT> element, RestrictionTracker<RestrictionT, PositionT> tracker) {
+    public ProcessContext(WindowedValue<InputT> element, TrackerT tracker) {
       fn.super();
       this.element = element;
-      this.tracker = RestrictionTrackers.observe(tracker, this);
+      this.tracker = tracker;
     }
 
     @Override
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
index 45c847e..3454e75 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
@@ -153,7 +153,8 @@ public class SplittableParDoViaKeyedWorkItems {
   }
 
   /** A primitive transform wrapping around {@link ProcessFn}. */
-  public static class ProcessElements<InputT, OutputT, RestrictionT, PositionT>
+  public static class ProcessElements<
+          InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker<RestrictionT, ?>>
       extends PTransform<
           PCollection<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>>, PCollectionTuple> {
     private final ProcessKeyedElements<InputT, OutputT, RestrictionT> original;
@@ -162,7 +163,7 @@ public class SplittableParDoViaKeyedWorkItems {
       this.original = original;
     }
 
-    public ProcessFn<InputT, OutputT, RestrictionT, PositionT> newProcessFn(
+    public ProcessFn<InputT, OutputT, RestrictionT, TrackerT> newProcessFn(
         DoFn<InputT, OutputT> fn) {
       return new ProcessFn<>(
           fn,
@@ -213,7 +214,8 @@ public class SplittableParDoViaKeyedWorkItems {
    * <p>See also: https://issues.apache.org/jira/browse/BEAM-1983
    */
   @VisibleForTesting
-  public static class ProcessFn<InputT, OutputT, RestrictionT, PositionT>
+  public static class ProcessFn<
+          InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker<RestrictionT, ?>>
       extends DoFn<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>, OutputT> {
     /**
      * The state cell containing a watermark hold for the output of this {@link DoFn}. The hold is
@@ -250,7 +252,7 @@ public class SplittableParDoViaKeyedWorkItems {
     private transient @Nullable StateInternalsFactory<byte[]> stateInternalsFactory;
     private transient @Nullable TimerInternalsFactory<byte[]> timerInternalsFactory;
     private transient @Nullable SplittableProcessElementInvoker<
-            InputT, OutputT, RestrictionT, PositionT>
+            InputT, OutputT, RestrictionT, TrackerT>
         processElementInvoker;
 
     private transient @Nullable DoFnInvoker<InputT, OutputT> invoker;
@@ -281,7 +283,7 @@ public class SplittableParDoViaKeyedWorkItems {
     }
 
     public void setProcessElementInvoker(
-        SplittableProcessElementInvoker<InputT, OutputT, RestrictionT, PositionT> invoker) {
+        SplittableProcessElementInvoker<InputT, OutputT, RestrictionT, TrackerT> invoker) {
       this.processElementInvoker = invoker;
     }
 
@@ -366,9 +368,8 @@ public class SplittableParDoViaKeyedWorkItems {
         elementAndRestriction = KV.of(elementState.read(), restrictionState.read());
       }
 
-      final RestrictionTracker<RestrictionT, PositionT> tracker =
-          invoker.invokeNewTracker(elementAndRestriction.getValue());
-      SplittableProcessElementInvoker<InputT, OutputT, RestrictionT, PositionT>.Result result =
+      final TrackerT tracker = invoker.invokeNewTracker(elementAndRestriction.getValue());
+      SplittableProcessElementInvoker<InputT, OutputT, RestrictionT, TrackerT>.Result result =
           processElementInvoker.invokeProcessElement(
               invoker, elementAndRestriction.getKey(), tracker);
 
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableProcessElementInvoker.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableProcessElementInvoker.java
index 07aa0ba..81f0bd4 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableProcessElementInvoker.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableProcessElementInvoker.java
@@ -30,7 +30,8 @@ import org.joda.time.Instant;
  * A runner-specific hook for invoking a {@link DoFn.ProcessElement} method for a splittable {@link
  * DoFn}, in particular, allowing the runner to access the {@link RestrictionTracker}.
  */
-public abstract class SplittableProcessElementInvoker<InputT, OutputT, RestrictionT, PositionT> {
+public abstract class SplittableProcessElementInvoker<
+    InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker<RestrictionT, ?>> {
   /** Specifies how to resume a splittable {@link DoFn.ProcessElement} call. */
   public class Result {
     @Nullable private final RestrictionT residualRestriction;
@@ -76,7 +77,5 @@ public abstract class SplittableProcessElementInvoker<InputT, OutputT, Restricti
    *     DoFn.ProcessContinuation}, and a future output watermark.
    */
   public abstract Result invokeProcessElement(
-      DoFnInvoker<InputT, OutputT> invoker,
-      WindowedValue<InputT> element,
-      RestrictionTracker<RestrictionT, PositionT> tracker);
+      DoFnInvoker<InputT, OutputT> invoker, WindowedValue<InputT> element, TrackerT tracker);
 }
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java
index a05aa8d..c54080c 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java
@@ -35,7 +35,6 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
 import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
-import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
@@ -67,8 +66,7 @@ public class 
OutputAndTimeBoundedSplittableProcessElementInvokerTest {
     }
 
     @ProcessElement
-    public ProcessContinuation process(
-        ProcessContext context, RestrictionTracker<OffsetRange, Long> tracker) 
{
+    public ProcessContinuation process(ProcessContext context, 
OffsetRangeTracker tracker) {
       Uninterruptibles.sleepUninterruptibly(
           sleepBeforeFirstClaim.getMillis(), TimeUnit.MILLISECONDS);
       for (long i = tracker.currentRestriction().getFrom(), numIterations = 1;
@@ -90,19 +88,20 @@ public class 
OutputAndTimeBoundedSplittableProcessElementInvokerTest {
     }
   }
 
-  private SplittableProcessElementInvoker<Void, String, OffsetRange, 
Long>.Result runTest(
-      int totalNumOutputs,
-      Duration sleepBeforeFirstClaim,
-      int numOutputsPerProcessCall,
-      Duration sleepBeforeEachOutput) {
+  private SplittableProcessElementInvoker<Void, String, OffsetRange, 
OffsetRangeTracker>.Result
+      runTest(
+          int totalNumOutputs,
+          Duration sleepBeforeFirstClaim,
+          int numOutputsPerProcessCall,
+          Duration sleepBeforeEachOutput) {
     SomeFn fn = new SomeFn(sleepBeforeFirstClaim, numOutputsPerProcessCall, 
sleepBeforeEachOutput);
     OffsetRange initialRestriction = new OffsetRange(0, totalNumOutputs);
     return runTest(fn, initialRestriction);
   }
 
-  private SplittableProcessElementInvoker<Void, String, OffsetRange, 
Long>.Result runTest(
-      DoFn<Void, String> fn, OffsetRange initialRestriction) {
-    SplittableProcessElementInvoker<Void, String, OffsetRange, Long> invoker =
+  private SplittableProcessElementInvoker<Void, String, OffsetRange, 
OffsetRangeTracker>.Result
+      runTest(DoFn<Void, String> fn, OffsetRange initialRestriction) {
+    SplittableProcessElementInvoker<Void, String, OffsetRange, 
OffsetRangeTracker> invoker =
         new OutputAndTimeBoundedSplittableProcessElementInvoker<>(
             fn,
             PipelineOptionsFactory.create(),
@@ -135,7 +134,7 @@ public class 
OutputAndTimeBoundedSplittableProcessElementInvokerTest {
 
   @Test
   public void testInvokeProcessElementOutputBounded() throws Exception {
-    SplittableProcessElementInvoker<Void, String, OffsetRange, Long>.Result 
res =
+    SplittableProcessElementInvoker<Void, String, OffsetRange, 
OffsetRangeTracker>.Result res =
         runTest(10000, Duration.ZERO, Integer.MAX_VALUE, Duration.ZERO);
     assertFalse(res.getContinuation().shouldResume());
     OffsetRange residualRange = res.getResidualRestriction();
@@ -146,7 +145,7 @@ public class 
OutputAndTimeBoundedSplittableProcessElementInvokerTest {
 
   @Test
   public void testInvokeProcessElementTimeBounded() throws Exception {
-    SplittableProcessElementInvoker<Void, String, OffsetRange, Long>.Result 
res =
+    SplittableProcessElementInvoker<Void, String, OffsetRange, 
OffsetRangeTracker>.Result res =
         runTest(10000, Duration.ZERO, Integer.MAX_VALUE, Duration.millis(100));
     assertFalse(res.getContinuation().shouldResume());
     OffsetRange residualRange = res.getResidualRestriction();
@@ -159,7 +158,7 @@ public class 
OutputAndTimeBoundedSplittableProcessElementInvokerTest {
 
   @Test
   public void testInvokeProcessElementTimeBoundedWithStartupDelay() throws 
Exception {
-    SplittableProcessElementInvoker<Void, String, OffsetRange, Long>.Result 
res =
+    SplittableProcessElementInvoker<Void, String, OffsetRange, 
OffsetRangeTracker>.Result res =
         runTest(10000, Duration.standardSeconds(3), Integer.MAX_VALUE, 
Duration.millis(100));
     assertFalse(res.getContinuation().shouldResume());
     OffsetRange residualRange = res.getResidualRestriction();
@@ -171,7 +170,7 @@ public class 
OutputAndTimeBoundedSplittableProcessElementInvokerTest {
 
   @Test
   public void testInvokeProcessElementVoluntaryReturnStop() throws Exception {
-    SplittableProcessElementInvoker<Void, String, OffsetRange, Long>.Result 
res =
+    SplittableProcessElementInvoker<Void, String, OffsetRange, 
OffsetRangeTracker>.Result res =
         runTest(5, Duration.ZERO, Integer.MAX_VALUE, Duration.millis(100));
     assertFalse(res.getContinuation().shouldResume());
     assertNull(res.getResidualRestriction());
@@ -179,7 +178,7 @@ public class 
OutputAndTimeBoundedSplittableProcessElementInvokerTest {
 
   @Test
   public void testInvokeProcessElementVoluntaryReturnResume() throws Exception 
{
-    SplittableProcessElementInvoker<Void, String, OffsetRange, Long>.Result 
res =
+    SplittableProcessElementInvoker<Void, String, OffsetRange, 
OffsetRangeTracker>.Result res =
         runTest(10, Duration.ZERO, 5, Duration.millis(100));
     assertTrue(res.getContinuation().shouldResume());
     assertEquals(new OffsetRange(5, 10), res.getResidualRestriction());
@@ -190,7 +189,7 @@ public class 
OutputAndTimeBoundedSplittableProcessElementInvokerTest {
     DoFn<Void, String> brokenFn =
         new DoFn<Void, String>() {
           @ProcessElement
-          public void process(ProcessContext c, 
RestrictionTracker<OffsetRange, Long> tracker) {
+          public void process(ProcessContext c, OffsetRangeTracker tracker) {
             c.output("foo");
           }
 
@@ -208,7 +207,7 @@ public class 
OutputAndTimeBoundedSplittableProcessElementInvokerTest {
     DoFn<Void, String> brokenFn =
         new DoFn<Void, String>() {
           @ProcessElement
-          public void process(ProcessContext c, 
RestrictionTracker<OffsetRange, Long> tracker) {
+          public void process(ProcessContext c, OffsetRangeTracker tracker) {
             assertFalse(tracker.tryClaim(6L));
             c.output("foo");
           }
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java
index d312234..4fcd1df 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java
@@ -95,7 +95,7 @@ public class SplittableParDoProcessFnTest {
     }
 
     @Override
-    public boolean tryClaim(Void position) {
+    protected boolean tryClaimImpl(Void position) {
       return true;
     }
 
@@ -119,7 +119,12 @@ public class SplittableParDoProcessFnTest {
    * A helper for testing {@link ProcessFn} on 1 element (but possibly over 
multiple {@link
    * DoFn.ProcessElement} calls).
    */
-  private static class ProcessFnTester<InputT, OutputT, RestrictionT, 
PositionT>
+  private static class ProcessFnTester<
+          InputT,
+          OutputT,
+          RestrictionT,
+          PositionT,
+          TrackerT extends RestrictionTracker<RestrictionT, PositionT>>
       implements AutoCloseable {
     private final DoFnTester<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>, 
OutputT> tester;
     private Instant currentProcessingTime;
@@ -139,7 +144,7 @@ public class SplittableParDoProcessFnTest {
       // encode IntervalWindow's because that's what all tests here use.
       WindowingStrategy<InputT, BoundedWindow> windowingStrategy =
           (WindowingStrategy) 
WindowingStrategy.of(FixedWindows.of(Duration.standardSeconds(1)));
-      final ProcessFn<InputT, OutputT, RestrictionT, PositionT> processFn =
+      final ProcessFn<InputT, OutputT, RestrictionT, TrackerT> processFn =
           new ProcessFn<>(fn, inputCoder, restrictionCoder, windowingStrategy);
       this.tester = DoFnTester.of(processFn);
       this.timerInternals = new InMemoryTimerInternals();
@@ -267,7 +272,7 @@ public class SplittableParDoProcessFnTest {
   /** A simple splittable {@link DoFn} that's actually monolithic. */
   private static class ToStringFn extends DoFn<Integer, String> {
     @ProcessElement
-    public void process(ProcessContext c, RestrictionTracker<SomeRestriction, 
Void> tracker) {
+    public void process(ProcessContext c, SomeRestrictionTracker tracker) {
       checkState(tracker.tryClaim(null));
       c.output(c.element().toString() + "a");
       c.output(c.element().toString() + "b");
@@ -293,7 +298,7 @@ public class SplittableParDoProcessFnTest {
         new IntervalWindow(
             base.minus(Duration.standardMinutes(1)), 
base.plus(Duration.standardMinutes(1)));
 
-    ProcessFnTester<Integer, String, SomeRestriction, Void> tester =
+    ProcessFnTester<Integer, String, SomeRestriction, Void, 
SomeRestrictionTracker> tester =
         new ProcessFnTester<>(
             base,
             fn,
@@ -318,7 +323,7 @@ public class SplittableParDoProcessFnTest {
 
   private static class WatermarkUpdateFn extends DoFn<Instant, String> {
     @ProcessElement
-    public void process(ProcessContext c, RestrictionTracker<OffsetRange, 
Long> tracker) {
+    public void process(ProcessContext c, OffsetRangeTracker tracker) {
       for (long i = tracker.currentRestriction().getFrom(); 
tracker.tryClaim(i); ++i) {
         c.updateWatermark(c.element().plus(Duration.standardSeconds(i)));
         c.output(String.valueOf(i));
@@ -342,7 +347,7 @@ public class SplittableParDoProcessFnTest {
     Instant base = Instant.now();
     dateTimeProvider.setDateTimeFixed(base.getMillis());
 
-    ProcessFnTester<Instant, String, OffsetRange, Long> tester =
+    ProcessFnTester<Instant, String, OffsetRange, Long, OffsetRangeTracker> 
tester =
         new ProcessFnTester<>(
             base,
             fn,
@@ -369,8 +374,7 @@ public class SplittableParDoProcessFnTest {
   /** A simple splittable {@link DoFn} that outputs the given element every 5 
seconds forever. */
   private static class SelfInitiatedResumeFn extends DoFn<Integer, String> {
     @ProcessElement
-    public ProcessContinuation process(
-        ProcessContext c, RestrictionTracker<SomeRestriction, Void> tracker) {
+    public ProcessContinuation process(ProcessContext c, 
SomeRestrictionTracker tracker) {
       checkState(tracker.tryClaim(null));
       c.output(c.element().toString());
       return resume().withResumeDelay(Duration.standardSeconds(5));
@@ -387,7 +391,7 @@ public class SplittableParDoProcessFnTest {
     DoFn<Integer, String> fn = new SelfInitiatedResumeFn();
     Instant base = Instant.now();
     dateTimeProvider.setDateTimeFixed(base.getMillis());
-    ProcessFnTester<Integer, String, SomeRestriction, Void> tester =
+    ProcessFnTester<Integer, String, SomeRestriction, Void, 
SomeRestrictionTracker> tester =
         new ProcessFnTester<>(
             base,
             fn,
@@ -429,8 +433,7 @@ public class SplittableParDoProcessFnTest {
     }
 
     @ProcessElement
-    public ProcessContinuation process(
-        ProcessContext c, RestrictionTracker<OffsetRange, Long> tracker) {
+    public ProcessContinuation process(ProcessContext c, OffsetRangeTracker 
tracker) {
       for (long i = tracker.currentRestriction().getFrom(), numIterations = 0;
           tracker.tryClaim(i);
           ++i, ++numIterations) {
@@ -453,7 +456,7 @@ public class SplittableParDoProcessFnTest {
     DoFn<Integer, String> fn = new CounterFn(1);
     Instant base = Instant.now();
     dateTimeProvider.setDateTimeFixed(base.getMillis());
-    ProcessFnTester<Integer, String, OffsetRange, Long> tester =
+    ProcessFnTester<Integer, String, OffsetRange, Long, OffsetRangeTracker> 
tester =
         new ProcessFnTester<>(
             base,
             fn,
@@ -487,7 +490,7 @@ public class SplittableParDoProcessFnTest {
     dateTimeProvider.setDateTimeFixed(base.getMillis());
     int baseIndex = 42;
 
-    ProcessFnTester<Integer, String, OffsetRange, Long> tester =
+    ProcessFnTester<Integer, String, OffsetRange, Long, OffsetRangeTracker> 
tester =
         new ProcessFnTester<>(
             base,
             fn,
@@ -535,7 +538,7 @@ public class SplittableParDoProcessFnTest {
     Instant base = Instant.now();
     int baseIndex = 42;
 
-    ProcessFnTester<Integer, String, OffsetRange, Long> tester =
+    ProcessFnTester<Integer, String, OffsetRange, Long, OffsetRangeTracker> 
tester =
         new ProcessFnTester<>(
             base,
             fn,
@@ -567,7 +570,7 @@ public class SplittableParDoProcessFnTest {
     private State state = State.BEFORE_SETUP;
 
     @ProcessElement
-    public void process(ProcessContext c, RestrictionTracker<SomeRestriction, 
Void> tracker) {
+    public void process(ProcessContext c, SomeRestrictionTracker tracker) {
       assertEquals(State.INSIDE_BUNDLE, state);
     }
 
@@ -604,7 +607,7 @@ public class SplittableParDoProcessFnTest {
   @Test
   public void testInvokesLifecycleMethods() throws Exception {
     DoFn<Integer, String> fn = new LifecycleVerifyingFn();
-    try (ProcessFnTester<Integer, String, SomeRestriction, Void> tester =
+    try (ProcessFnTester<Integer, String, SomeRestriction, Void, 
SomeRestrictionTracker> tester =
         new ProcessFnTester<>(
             Instant.now(),
             fn,
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
index 737098f..bdafd95 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
@@ -32,6 +32,7 @@ import 
org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems.ProcessElem
 import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems.ProcessFn;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -45,7 +46,12 @@ import 
org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.Thre
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 
-class SplittableProcessElementsEvaluatorFactory<InputT, OutputT, RestrictionT, 
PositionT>
+class SplittableProcessElementsEvaluatorFactory<
+        InputT,
+        OutputT,
+        RestrictionT,
+        PositionT,
+        TrackerT extends RestrictionTracker<RestrictionT, PositionT>>
     implements TransformEvaluatorFactory {
   private final ParDoEvaluatorFactory<KeyedWorkItem<byte[], KV<InputT, 
RestrictionT>>, OutputT>
       delegateFactory;
@@ -68,8 +74,8 @@ class SplittableProcessElementsEvaluatorFactory<InputT, 
OutputT, RestrictionT, P
                 checkArgument(
                     
ProcessElements.class.isInstance(application.getTransform()),
                     "No know extraction of the fn from " + application);
-                final ProcessElements<InputT, OutputT, RestrictionT, 
PositionT> transform =
-                    (ProcessElements<InputT, OutputT, RestrictionT, PositionT>)
+                final ProcessElements<InputT, OutputT, RestrictionT, TrackerT> 
transform =
+                    (ProcessElements<InputT, OutputT, RestrictionT, TrackerT>)
                         application.getTransform();
                 return 
DoFnLifecycleManager.of(transform.newProcessFn(transform.getFn()));
               }
@@ -103,12 +109,13 @@ class SplittableProcessElementsEvaluatorFactory<InputT, 
OutputT, RestrictionT, P
   @SuppressWarnings({"unchecked", "rawtypes"})
   private TransformEvaluator<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>> 
createEvaluator(
       AppliedPTransform<
-              PCollection<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>>, 
PCollectionTuple,
-              ProcessElements<InputT, OutputT, RestrictionT, PositionT>>
+              PCollection<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>>,
+              PCollectionTuple,
+              ProcessElements<InputT, OutputT, RestrictionT, TrackerT>>
           application,
       CommittedBundle<InputT> inputBundle)
       throws Exception {
-    final ProcessElements<InputT, OutputT, RestrictionT, PositionT> transform =
+    final ProcessElements<InputT, OutputT, RestrictionT, TrackerT> transform =
         application.getTransform();
 
     final DoFnLifecycleManagerRemovingTransformEvaluator<
@@ -124,8 +131,8 @@ class SplittableProcessElementsEvaluatorFactory<InputT, 
OutputT, RestrictionT, P
                 application.getTransform().getAdditionalOutputTags().getAll());
     final ParDoEvaluator<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>> pde =
         evaluator.getParDoEvaluator();
-    final ProcessFn<InputT, OutputT, RestrictionT, PositionT> processFn =
-        (ProcessFn<InputT, OutputT, RestrictionT, PositionT>)
+    final ProcessFn<InputT, OutputT, RestrictionT, TrackerT> processFn =
+        (ProcessFn<InputT, OutputT, RestrictionT, TrackerT>)
             ProcessFnRunner.class.cast(pde.getFnRunner()).getFn();
 
     final DirectExecutionContext.DirectStepContext stepContext = 
pde.getStepContext();
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
index c770cdf..1f87438 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
@@ -65,6 +65,7 @@ import org.apache.beam.sdk.transforms.join.RawUnionValue;
 import org.apache.beam.sdk.transforms.join.UnionCoder;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
@@ -661,14 +662,14 @@ class FlinkStreamingTransformTranslators {
   }
 
   private static class SplittableProcessElementsStreamingTranslator<
-          InputT, OutputT, RestrictionT, PositionT>
+          InputT, OutputT, RestrictionT, TrackerT extends 
RestrictionTracker<RestrictionT, ?>>
       extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
           SplittableParDoViaKeyedWorkItems.ProcessElements<
-              InputT, OutputT, RestrictionT, PositionT>> {
+              InputT, OutputT, RestrictionT, TrackerT>> {
 
     @Override
     public void translateNode(
-        SplittableParDoViaKeyedWorkItems.ProcessElements<InputT, OutputT, 
RestrictionT, PositionT>
+        SplittableParDoViaKeyedWorkItems.ProcessElements<InputT, OutputT, 
RestrictionT, TrackerT>
             transform,
         FlinkStreamingTranslationContext context) {
 
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
index 413777f..7f276b6 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
@@ -40,6 +40,7 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.state.TimeDomain;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -57,7 +58,8 @@ import org.joda.time.Instant;
  * Flink operator for executing splittable {@link DoFn DoFns}. Specifically, 
for executing the
  * {@code @ProcessElement} method of a splittable {@link DoFn}.
  */
-public class SplittableDoFnOperator<InputT, OutputT, RestrictionT>
+public class SplittableDoFnOperator<
+        InputT, OutputT, RestrictionT, TrackerT extends 
RestrictionTracker<RestrictionT, ?>>
     extends DoFnOperator<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>, 
OutputT> {
 
   private transient ScheduledExecutorService executorService;
diff --git 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index 0bc1b2e..a087232 100644
--- 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -84,7 +84,7 @@ import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
@@ -975,7 +975,7 @@ public class DataflowPipelineTranslatorTest implements 
Serializable {
 
   private static class TestSplittableFn extends DoFn<String, Integer> {
     @ProcessElement
-    public void process(ProcessContext c, RestrictionTracker<OffsetRange, 
Long> tracker) {
+    public void process(ProcessContext c, OffsetRangeTracker tracker) {
       // noop
     }
 
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
index 73aa429..fd79cb8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@ -540,7 +540,7 @@ public abstract class DoFn<InputT, OutputT> implements 
Serializable, HasDisplayD
    * <p>The signature of this method must satisfy the following constraints:
    *
    * <ul>
-   *   <li>If one of its arguments is a {@link RestrictionTracker}, then it is 
a <a
+   *   <li>If one of its arguments is a subtype of {@link RestrictionTracker}, 
then it is a <a
    *       href="https://s.apache.org/splittable-do-fn">splittable</a> {@link 
DoFn} subject to the
    *       separate requirements described below. Items below are assuming 
this is not a splittable
    *       {@link DoFn}.
@@ -573,8 +573,8 @@ public abstract class DoFn<InputT, OutputT> implements 
Serializable, HasDisplayD
    * <h2>Splittable DoFn's</h2>
    *
    * <p>A {@link DoFn} is <i>splittable</i> if its {@link ProcessElement} 
method has a parameter
-   * whose type is of {@link RestrictionTracker}. This is an advanced feature 
and an overwhelming
-   * majority of users will never need to write a splittable {@link DoFn}.
+   * whose type is a subtype of {@link RestrictionTracker}. This is an 
advanced feature and an
+   * overwhelming majority of users will never need to write a splittable 
{@link DoFn}.
    *
    * <p>Not all runners support Splittable DoFn. See the <a
    * 
href="https://beam.apache.org/documentation/runners/capability-matrix/">capability
 matrix</a>.
@@ -587,10 +587,12 @@ public abstract class DoFn<InputT, OutputT> implements 
Serializable, HasDisplayD
    * <ul>
    *   <li>It <i>must</i> define a {@link GetInitialRestriction} method.
    *   <li>It <i>may</i> define a {@link SplitRestriction} method.
-   *   <li>It <i>may</i> define a {@link NewTracker} method returning a 
subtype of {@code
-   *       RestrictionTracker<R>} where {@code R} is the restriction type 
returned by {@link
-   *       GetInitialRestriction}. This method is optional in case the 
restriction type returned by
-   *       {@link GetInitialRestriction} implements {@link HasDefaultTracker}.
+   *   <li>It <i>may</i> define a {@link NewTracker} method returning the same 
type as the type of
+   *       the {@link RestrictionTracker} argument of {@link ProcessElement}, 
which in turn must be
+   *       a subtype of {@code RestrictionTracker<R>} where {@code R} is the 
restriction type
+   *       returned by {@link GetInitialRestriction}. This method is optional 
in case the
+   *       restriction type returned by {@link GetInitialRestriction} 
implements {@link
+   *       HasDefaultTracker}.
    *   <li>It <i>may</i> define a {@link GetRestrictionCoder} method.
    *   <li>The type of restrictions used by all of these methods must be the 
same.
    *   <li>Its {@link ProcessElement} method <i>may</i> return a {@link 
ProcessContinuation} to
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java
index fd52a81..3dc24d9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java
@@ -990,7 +990,7 @@ public class Watch {
     }
 
     @Override
-    public synchronized boolean tryClaim(HashCode hash) {
+    protected synchronized boolean tryClaimImpl(HashCode hash) {
       if (shouldStop) {
         return false;
       }
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
index 45cf9f4..239f4d5 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
@@ -92,7 +92,7 @@ public interface DoFnInvoker<InputT, OutputT> {
 
   /** Invoke the {@link DoFn.NewTracker} method on the bound {@link DoFn}. */
   @SuppressWarnings("TypeParameterUnusedInFormals")
-  <RestrictionT, PositionT> RestrictionTracker<RestrictionT, PositionT> 
invokeNewTracker(
+  <RestrictionT, TrackerT extends RestrictionTracker<RestrictionT, ?>> 
TrackerT invokeNewTracker(
       RestrictionT restriction);
 
   /** Get the bound {@link DoFn}. */
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
index 61089df..0900e27 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
@@ -556,6 +556,9 @@ public class DoFnSignatures {
     ErrorReporter processElementErrors =
         errors.forMethod(DoFn.ProcessElement.class, 
processElement.targetMethod());
 
+    final TypeDescriptor<?> trackerT;
+    final String originOfTrackerT;
+
     List<String> missingRequiredMethods = new ArrayList<>();
     if (getInitialRestriction == null) {
       missingRequiredMethods.add("@" + 
DoFn.GetInitialRestriction.class.getSimpleName());
@@ -565,11 +568,27 @@ public class DoFnSignatures {
           && getInitialRestriction
               .restrictionT()
               .isSubtypeOf(TypeDescriptor.of(HasDefaultTracker.class))) {
-        // no-op we are using the annotation @HasDefaultTracker
+        trackerT =
+            getInitialRestriction
+                .restrictionT()
+                .resolveType(HasDefaultTracker.class.getTypeParameters()[1]);
+        originOfTrackerT =
+            String.format(
+                "restriction type %s of @%s method %s",
+                formatType(getInitialRestriction.restrictionT()),
+                DoFn.GetInitialRestriction.class.getSimpleName(),
+                format(getInitialRestriction.targetMethod()));
       } else {
         missingRequiredMethods.add("@" + 
DoFn.NewTracker.class.getSimpleName());
+        trackerT = null;
+        originOfTrackerT = null;
       }
     } else {
+      trackerT = newTracker.trackerT();
+      originOfTrackerT =
+          String.format(
+              "%s method %s",
+              DoFn.NewTracker.class.getSimpleName(), 
format(newTracker.targetMethod()));
       ErrorReporter getInitialRestrictionErrors =
           errors.forMethod(DoFn.GetInitialRestriction.class, 
getInitialRestriction.targetMethod());
       TypeDescriptor<?> restrictionT = getInitialRestriction.restrictionT();
@@ -592,9 +611,11 @@ public class DoFnSignatures {
         errors.forMethod(DoFn.GetInitialRestriction.class, 
getInitialRestriction.targetMethod());
     TypeDescriptor<?> restrictionT = getInitialRestriction.restrictionT();
     processElementErrors.checkArgument(
-        
processElement.trackerT().getRawType().equals(RestrictionTracker.class),
-        "Has tracker type %s, but the DoFn's tracker type must be of type 
RestrictionTracker.",
-        formatType(processElement.trackerT()));
+        processElement.trackerT().equals(trackerT),
+        "Has tracker type %s, but the DoFn's tracker type was inferred as %s 
from %s",
+        formatType(processElement.trackerT()),
+        trackerT,
+        originOfTrackerT);
 
     if (getRestrictionCoder != null) {
       getInitialRestrictionErrors.checkArgument(
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/ByteKeyRangeTracker.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/ByteKeyRangeTracker.java
index 44f2f0b..6f72d84 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/ByteKeyRangeTracker.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/ByteKeyRangeTracker.java
@@ -56,12 +56,12 @@ public class ByteKeyRangeTracker extends 
RestrictionTracker<ByteKeyRange, ByteKe
   }
 
   @Override
-  public ByteKeyRange currentRestriction() {
+  public synchronized ByteKeyRange currentRestriction() {
     return range;
   }
 
   @Override
-  public ByteKeyRange checkpoint() {
+  public synchronized ByteKeyRange checkpoint() {
     // If we haven't done any work, we should return the original range we 
were processing
     // as the checkpoint.
     if (lastAttemptedKey == null) {
@@ -99,7 +99,7 @@ public class ByteKeyRangeTracker extends 
RestrictionTracker<ByteKeyRange, ByteKe
    *     current {@link ByteKeyRange} of this tracker.
    */
   @Override
-  public boolean tryClaim(ByteKey key) {
+  protected synchronized boolean tryClaimImpl(ByteKey key) {
     // Handle claiming the end of range EMPTY key
     if (key.isEmpty()) {
       checkArgument(
@@ -132,7 +132,7 @@ public class ByteKeyRangeTracker extends 
RestrictionTracker<ByteKeyRange, ByteKe
   }
 
   @Override
-  public void checkDone() throws IllegalStateException {
+  public synchronized void checkDone() throws IllegalStateException {
     // Handle checking the empty range which is implicitly done.
     // This case can occur if the range tracker is checkpointed before any 
keys have been claimed
     // or if the range tracker is checkpointed once the range is done.
@@ -162,7 +162,7 @@ public class ByteKeyRangeTracker extends 
RestrictionTracker<ByteKeyRange, ByteKe
   }
 
   @Override
-  public String toString() {
+  public synchronized String toString() {
     return MoreObjects.toStringHelper(this)
         .add("range", range)
         .add("lastClaimedKey", lastClaimedKey)
@@ -184,7 +184,7 @@ public class ByteKeyRangeTracker extends 
RestrictionTracker<ByteKeyRange, ByteKe
   private static final byte[] ZERO_BYTE_ARRAY = new byte[] {0};
 
   @Override
-  public Backlog getBacklog() {
+  public synchronized Backlog getBacklog() {
     // Return 0 for the empty range which is implicitly done.
     // This case can occur if the range tracker is checkpointed before any 
keys have been claimed
     // or if the range tracker is checkpointed once the range is done.
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java
index 9d90c69..549aa9b 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java
@@ -41,12 +41,12 @@ public class OffsetRangeTracker extends 
RestrictionTracker<OffsetRange, Long>
   }
 
   @Override
-  public OffsetRange currentRestriction() {
+  public synchronized OffsetRange currentRestriction() {
     return range;
   }
 
   @Override
-  public OffsetRange checkpoint() {
+  public synchronized OffsetRange checkpoint() {
     checkState(
         lastClaimedOffset != null, "Can't checkpoint before any offset was 
successfully claimed");
     OffsetRange res = new OffsetRange(lastClaimedOffset + 1, range.getTo());
@@ -63,7 +63,7 @@ public class OffsetRangeTracker extends 
RestrictionTracker<OffsetRange, Long>
    *     current {@link OffsetRange} of this tracker (in that case this 
operation is a no-op).
    */
   @Override
-  public boolean tryClaim(Long i) {
+  protected synchronized boolean tryClaimImpl(Long i) {
     checkArgument(
         lastAttemptedOffset == null || i > lastAttemptedOffset,
         "Trying to claim offset %s while last attempted was %s",
@@ -81,7 +81,7 @@ public class OffsetRangeTracker extends 
RestrictionTracker<OffsetRange, Long>
   }
 
   @Override
-  public void checkDone() throws IllegalStateException {
+  public synchronized void checkDone() throws IllegalStateException {
     checkState(
         lastAttemptedOffset >= range.getTo() - 1,
         "Last attempted offset was %s in range %s, claiming work in [%s, %s) 
was not attempted",
@@ -101,7 +101,7 @@ public class OffsetRangeTracker extends 
RestrictionTracker<OffsetRange, Long>
   }
 
   @Override
-  public Backlog getBacklog() {
+  public synchronized Backlog getBacklog() {
     // If we have never attempted an offset, we return the length of the 
entire range.
     if (lastAttemptedOffset == null) {
       return Backlog.of(BigDecimal.valueOf(range.getTo() - range.getFrom()));
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java
index 55e150f..7bf807c 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java
@@ -17,10 +17,15 @@
  */
 package org.apache.beam.sdk.transforms.splittabledofn;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.transforms.DoFn;
 
 /**
- * Manages access to the restriction and keeps track of its claimed part for a 
<a
+ * Manages concurrent access to the restriction and keeps track of its claimed 
part for a <a
  * href="https://s.apache.org/splittable-do-fn">splittable</a> {@link DoFn}.
  *
  * <p>Restriction trackers which can provide an estimate for the known amount 
of outstanding work
@@ -31,6 +36,29 @@ import org.apache.beam.sdk.transforms.DoFn;
  * &amp; Splitting</a> for further details.
  */
 public abstract class RestrictionTracker<RestrictionT, PositionT> {
+  /** Internal interface allowing a runner to observe the calls to {@link 
#tryClaim}. */
+  @Internal
+  public interface ClaimObserver<PositionT> {
+    /** Called when {@link #tryClaim} returns true. */
+    void onClaimed(PositionT position);
+
+    /** Called when {@link #tryClaim} returns false. */
+    void onClaimFailed(PositionT position);
+  }
+
+  @Nullable private ClaimObserver<PositionT> claimObserver;
+
+  /**
+   * Sets a {@link ClaimObserver} to be invoked on every call to {@link 
#tryClaim}. Internal:
+   * intended only for runner authors.
+   */
+  @Internal
+  public void setClaimObserver(ClaimObserver<PositionT> claimObserver) {
+    checkNotNull(claimObserver, "claimObserver");
+    checkState(this.claimObserver == null, "A claim observer has already been 
set");
+    this.claimObserver = claimObserver;
+  }
+
   /**
    * Attempts to claim the block of work in the current restriction identified 
by the given
    * position.
@@ -44,8 +72,27 @@ public abstract class RestrictionTracker<RestrictionT, 
PositionT> {
    *       call to this method).
    *   <li>{@link RestrictionTracker#checkDone} MUST succeed.
    * </ul>
+   *
+   * <p>Under the hood, calls {@link #tryClaimImpl} and notifies {@link 
ClaimObserver} of the
+   * result.
    */
-  public abstract boolean tryClaim(PositionT position);
+  public final boolean tryClaim(PositionT position) {
+    if (tryClaimImpl(position)) {
+      if (claimObserver != null) {
+        claimObserver.onClaimed(position);
+      }
+      return true;
+    } else {
+      if (claimObserver != null) {
+        claimObserver.onClaimFailed(position);
+      }
+      return false;
+    }
+  }
+
+  /** Tracker-specific implementation of {@link #tryClaim}. */
+  @Internal
+  protected abstract boolean tryClaimImpl(PositionT position);
 
   /**
    * Returns a restriction accurately describing the full range of work the 
current {@link
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
index 3e59344..8d70686 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
@@ -99,7 +99,6 @@ import 
org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Maps;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Multimap;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
-import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -513,7 +512,6 @@ public class AvroIOTest implements Serializable {
     }
 
     @Test
-    @Ignore("https://issues.apache.org/jira/browse/BEAM-6352")
     @Category(NeedsRunner.class)
     public void testContinuouslyWriteAndReadMultipleFilepatterns() throws 
Throwable {
       SimpleFunction<Long, GenericClass> mapFn = new CreateGenericClass();
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java
index a282acf..223a9e2 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java
@@ -54,7 +54,6 @@ import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Charsets;
 import org.joda.time.Duration;
-import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -192,7 +191,6 @@ public class FileIOTest implements Serializable {
   }
 
   @Test
-  @Ignore("https://issues.apache.org/jira/browse/BEAM-6352")
   @Category(NeedsRunner.class)
   public void testMatchWatchForNewFiles() throws IOException, 
InterruptedException {
     final Path basePath = tmpFolder.getRoot().toPath().resolve("watch");
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java
index 31fc273..0e6e992 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java
@@ -86,7 +86,6 @@ import 
org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists;
 import 
org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream;
 import 
org.apache.commons.compress.compressors.deflate.DeflateCompressorOutputStream;
 import org.joda.time.Duration;
-import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -825,7 +824,6 @@ public class TextIOReadTest {
     }
 
     @Test
-    @Ignore("https://issues.apache.org/jira/browse/BEAM-6352")
     @Category(NeedsRunner.class)
     public void testReadWatchForNewFiles() throws IOException, 
InterruptedException {
       final Path basePath = tempFolder.getRoot().toPath().resolve("readWatch");
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
index c617b18..6a102b7 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
@@ -49,7 +49,7 @@ import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.DoFn.BoundedPerElement;
 import org.apache.beam.sdk.transforms.DoFn.UnboundedPerElement;
 import org.apache.beam.sdk.transforms.splittabledofn.Backlog;
-import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.Never;
@@ -82,8 +82,7 @@ public class SplittableDoFnTest implements Serializable {
 
   static class PairStringWithIndexToLengthBase extends DoFn<String, KV<String, 
Integer>> {
     @ProcessElement
-    public ProcessContinuation process(
-        ProcessContext c, RestrictionTracker<OffsetRange, Long> tracker) {
+    public ProcessContinuation process(ProcessContext c, OffsetRangeTracker 
tracker) {
       for (long i = tracker.currentRestriction().getFrom(), numIterations = 0;
           tracker.tryClaim(i);
           ++i, ++numIterations) {
@@ -245,8 +244,7 @@ public class SplittableDoFnTest implements Serializable {
     }
 
     @ProcessElement
-    public ProcessContinuation processElement(
-        ProcessContext c, RestrictionTracker<OffsetRange, Long> tracker) {
+    public ProcessContinuation processElement(ProcessContext c, 
OffsetRangeTracker tracker) {
       int[] blockStarts = {-1, 0, 12, 123, 1234, 12345, 34567, MAX_INDEX};
       int trueStart = snapToNextBlock((int) 
tracker.currentRestriction().getFrom(), blockStarts);
       for (int i = trueStart, numIterations = 1;
@@ -325,7 +323,7 @@ public class SplittableDoFnTest implements Serializable {
     }
 
     @ProcessElement
-    public void process(ProcessContext c, RestrictionTracker<OffsetRange, 
Long> tracker) {
+    public void process(ProcessContext c, OffsetRangeTracker tracker) {
       checkState(tracker.tryClaim(tracker.currentRestriction().getFrom()));
       String side = c.sideInput(sideInput);
       c.output(side + ":" + c.element());
@@ -457,8 +455,7 @@ public class SplittableDoFnTest implements Serializable {
     }
 
     @ProcessElement
-    public ProcessContinuation processElement(
-        ProcessContext c, RestrictionTracker<OffsetRange, Long> tracker) {
+    public ProcessContinuation processElement(ProcessContext c, 
OffsetRangeTracker tracker) {
       int[] blockStarts = {-1, 0, 12, 123, 1234, 12345, 34567, MAX_INDEX};
       int trueStart = snapToNextBlock((int) 
tracker.currentRestriction().getFrom(), blockStarts);
       for (int i = trueStart, numIterations = 1;
@@ -580,7 +577,7 @@ public class SplittableDoFnTest implements Serializable {
     }
 
     @ProcessElement
-    public void process(ProcessContext c, RestrictionTracker<OffsetRange, 
Long> tracker) {
+    public void process(ProcessContext c, OffsetRangeTracker tracker) {
       checkState(tracker.tryClaim(tracker.currentRestriction().getFrom()));
       c.output("main:" + c.element());
       c.output(additionalOutput, "additional:" + c.element());
@@ -721,7 +718,7 @@ public class SplittableDoFnTest implements Serializable {
     }
 
     @ProcessElement
-    public void processElement(ProcessContext c, 
RestrictionTracker<OffsetRange, Long> tracker) {
+    public void processElement(ProcessContext c, OffsetRangeTracker tracker) {
       assertEquals(State.INSIDE_BUNDLE, state);
       assertTrue(tracker.tryClaim(0L));
       c.output(c.element());
@@ -783,8 +780,7 @@ public class SplittableDoFnTest implements Serializable {
               ParDo.of(
                   new DoFn<String, String>() {
                     @ProcessElement
-                    public void process(
-                        @Element String element, 
RestrictionTracker<OffsetRange, Long> tracker) {
+                    public void process(@Element String element, 
OffsetRangeTracker tracker) {
                       // Doesn't matter
                     }
 
@@ -802,7 +798,7 @@ public class SplittableDoFnTest implements Serializable {
                   new DoFn<String, String>() {
                     @ProcessElement
                     public ProcessContinuation process(
-                        @Element String element, 
RestrictionTracker<OffsetRange, Long> tracker) {
+                        @Element String element, OffsetRangeTracker tracker) {
                       return stop();
                     }
 
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WatchTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WatchTest.java
index 8f1615d..b762161 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WatchTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WatchTest.java
@@ -63,7 +63,6 @@ import 
org.apache.beam.vendor.guava.v20_0.com.google.common.hash.HashCode;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.joda.time.ReadableDuration;
-import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -76,7 +75,6 @@ public class WatchTest implements Serializable {
   @Rule public transient TestPipeline p = TestPipeline.create();
 
   @Test
-  @Ignore("https://issues.apache.org/jira/browse/BEAM-6352")
   @Category(NeedsRunner.class)
   public void testSinglePollMultipleInputs() {
     PCollection<KV<String, String>> res =
@@ -103,7 +101,6 @@ public class WatchTest implements Serializable {
   }
 
   @Test
-  @Ignore("https://issues.apache.org/jira/browse/BEAM-6352")
   @Category(NeedsRunner.class)
   public void testSinglePollMultipleInputsWithSideInput() {
     final PCollectionView<String> moo =
@@ -134,14 +131,12 @@ public class WatchTest implements Serializable {
   }
 
   @Test
-  @Ignore("https://issues.apache.org/jira/browse/BEAM-6352")
   @Category(NeedsRunner.class)
   public void testMultiplePollsWithTerminationBecauseOutputIsFinal() {
     testMultiplePolls(false);
   }
 
   @Test
-  @Ignore("https://issues.apache.org/jira/browse/BEAM-6352")
   @Category(NeedsRunner.class)
   public void testMultiplePollsWithTerminationDueToTerminationCondition() {
     testMultiplePolls(true);
@@ -179,7 +174,6 @@ public class WatchTest implements Serializable {
   }
 
   @Test
-  @Ignore("https://issues.apache.org/jira/browse/BEAM-6352")
   @Category(NeedsRunner.class)
   public void testMultiplePollsWithKeyExtractor() {
     List<KV<Integer, String>> polls =
@@ -229,7 +223,6 @@ public class WatchTest implements Serializable {
   }
 
   @Test
-  @Ignore("https://issues.apache.org/jira/browse/BEAM-6352")
   @Category(NeedsRunner.class)
   public void testMultiplePollsStopAfterTimeSinceNewOutput() {
     List<Integer> all = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
@@ -256,7 +249,6 @@ public class WatchTest implements Serializable {
   }
 
   @Test
-  @Ignore("https://issues.apache.org/jira/browse/BEAM-6352")
   @Category(NeedsRunner.class)
   public void testSinglePollWithManyResults() {
     // More than the default 100 elements per checkpoint for direct runner.
@@ -303,7 +295,6 @@ public class WatchTest implements Serializable {
   }
 
   @Test
-  @Ignore("https://issues.apache.org/jira/browse/BEAM-6352")
   @Category(NeedsRunner.class)
   public void testMultiplePollsWithManyResults() {
     final long numResults = 3000;
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
index 5e6a040..292ecf5 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
@@ -323,8 +323,8 @@ public class DoFnInvokersTest {
 
     class MockFn extends DoFn<String, String> {
       @DoFn.ProcessElement
-      public ProcessContinuation processElement(
-          ProcessContext c, RestrictionTracker<SomeRestriction, Void> tracker) 
throws Exception {
+      public ProcessContinuation processElement(ProcessContext c, 
SomeRestrictionTracker tracker)
+          throws Exception {
         return null;
       }
 
@@ -400,8 +400,7 @@ public class DoFnInvokersTest {
   /** Public so Mockito can do "delegatesTo()" in the test below. */
   public static class MockFn extends DoFn<String, String> {
     @ProcessElement
-    public ProcessContinuation processElement(
-        ProcessContext c, RestrictionTracker<SomeRestriction, Void> tracker) {
+    public ProcessContinuation processElement(ProcessContext c, 
SomeRestrictionTracker tracker) {
       return null;
     }
 
@@ -511,7 +510,7 @@ public class DoFnInvokersTest {
   private static class DefaultTracker
       extends RestrictionTracker<RestrictionWithDefaultTracker, Void> {
     @Override
-    public boolean tryClaim(Void position) {
+    protected boolean tryClaimImpl(Void position) {
       throw new UnsupportedOperationException();
     }
 
@@ -547,8 +546,7 @@ public class DoFnInvokersTest {
   public void testSplittableDoFnDefaultMethods() throws Exception {
     class MockFn extends DoFn<String, String> {
       @ProcessElement
-      public void processElement(
-          ProcessContext c, RestrictionTracker<RestrictionWithDefaultTracker, 
Void> tracker) {}
+      public void processElement(ProcessContext c, DefaultTracker tracker) {}
 
       @GetInitialRestriction
       public RestrictionWithDefaultTracker getInitialRestriction(String 
element) {
@@ -758,8 +756,7 @@ public class DoFnInvokersTest {
             new DoFn<Integer, Integer>() {
               @ProcessElement
               public ProcessContinuation processElement(
-                  @SuppressWarnings("unused") ProcessContext c,
-                  RestrictionTracker<SomeRestriction, Void> tracker) {
+                  @SuppressWarnings("unused") ProcessContext c, 
SomeRestrictionTracker tracker) {
                 throw new IllegalArgumentException("bogus");
               }
 
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
index b1d00e6..af4281d 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
@@ -114,8 +114,7 @@ public class DoFnSignaturesSplittableDoFnTest {
   public void testInfersBoundednessFromAnnotation() throws Exception {
     class BaseSplittableFn extends DoFn<Integer, String> {
       @ProcessElement
-      public void processElement(
-          ProcessContext context, RestrictionTracker<SomeRestriction, Void> 
tracker) {}
+      public void processElement(ProcessContext context, 
SomeRestrictionTracker tracker) {}
 
       @GetInitialRestriction
       public SomeRestriction getInitialRestriction(Integer element) {
@@ -142,8 +141,7 @@ public class DoFnSignaturesSplittableDoFnTest {
 
   private static class BaseFnWithoutContinuation extends DoFn<Integer, String> 
{
     @ProcessElement
-    public void processElement(
-        ProcessContext context, RestrictionTracker<SomeRestriction, Void> 
tracker) {}
+    public void processElement(ProcessContext context, SomeRestrictionTracker 
tracker) {}
 
     @GetInitialRestriction
     public SomeRestriction getInitialRestriction(Integer element) {
@@ -154,7 +152,7 @@ public class DoFnSignaturesSplittableDoFnTest {
   private static class BaseFnWithContinuation extends DoFn<Integer, String> {
     @ProcessElement
     public ProcessContinuation processElement(
-        ProcessContext context, RestrictionTracker<SomeRestriction, Void> 
tracker) {
+        ProcessContext context, SomeRestrictionTracker tracker) {
       return null;
     }
 
@@ -233,7 +231,7 @@ public class DoFnSignaturesSplittableDoFnTest {
     class GoodSplittableDoFn extends DoFn<Integer, String> {
       @ProcessElement
       public ProcessContinuation processElement(
-          ProcessContext context, RestrictionTracker<SomeRestriction, Void> 
tracker) {
+          ProcessContext context, SomeRestrictionTracker tracker) {
         return null;
       }
 
@@ -261,7 +259,7 @@ public class DoFnSignaturesSplittableDoFnTest {
     }
 
     DoFnSignature signature = 
DoFnSignatures.getSignature(GoodSplittableDoFn.class);
-    assertEquals(RestrictionTracker.class, 
signature.processElement().trackerT().getRawType());
+    assertEquals(SomeRestrictionTracker.class, 
signature.processElement().trackerT().getRawType());
     assertTrue(signature.processElement().isSplittable());
     assertTrue(signature.processElement().hasReturnValue());
     assertEquals(
@@ -310,15 +308,14 @@ public class DoFnSignaturesSplittableDoFnTest {
     DoFnSignature signature =
         DoFnSignatures.getSignature(
             new GoodGenericSplittableDoFn<
-                SomeRestriction, RestrictionTracker<SomeRestriction, ?>,
-                SomeRestrictionCoder>() {}.getClass());
-    assertEquals(RestrictionTracker.class, 
signature.processElement().trackerT().getRawType());
+                SomeRestriction, SomeRestrictionTracker, 
SomeRestrictionCoder>() {}.getClass());
+    assertEquals(SomeRestrictionTracker.class, 
signature.processElement().trackerT().getRawType());
     assertTrue(signature.processElement().isSplittable());
     assertTrue(signature.processElement().hasReturnValue());
     assertEquals(
         SomeRestriction.class, 
signature.getInitialRestriction().restrictionT().getRawType());
     assertEquals(SomeRestriction.class, 
signature.splitRestriction().restrictionT().getRawType());
-    assertEquals(RestrictionTracker.class, 
signature.newTracker().trackerT().getRawType());
+    assertEquals(SomeRestrictionTracker.class, 
signature.newTracker().trackerT().getRawType());
     assertEquals(SomeRestriction.class, 
signature.newTracker().restrictionT().getRawType());
     assertEquals(SomeRestrictionCoder.class, 
signature.getRestrictionCoder().coderT().getRawType());
   }
@@ -327,8 +324,7 @@ public class DoFnSignaturesSplittableDoFnTest {
   public void testSplittableMissingRequiredMethods() throws Exception {
     class BadFn extends DoFn<Integer, String> {
       @ProcessElement
-      public void process(
-          ProcessContext context, RestrictionTracker<SomeRestriction, Void> 
tracker) {}
+      public void process(ProcessContext context, SomeRestrictionTracker 
tracker) {}
     }
 
     thrown.expectMessage(
@@ -347,8 +343,7 @@ public class DoFnSignaturesSplittableDoFnTest {
   public void testHasDefaultTracker() throws Exception {
     class Fn extends DoFn<Integer, String> {
       @ProcessElement
-      public void process(
-          ProcessContext c, RestrictionTracker<RestrictionWithDefaultTracker, 
Void> tracker) {}
+      public void process(ProcessContext c, SomeDefaultTracker tracker) {}
 
       @GetInitialRestriction
       public RestrictionWithDefaultTracker getInitialRestriction(Integer 
element) {
@@ -357,7 +352,7 @@ public class DoFnSignaturesSplittableDoFnTest {
     }
 
     DoFnSignature signature = DoFnSignatures.getSignature(Fn.class);
-    assertEquals(RestrictionTracker.class, 
signature.processElement().trackerT().getRawType());
+    assertEquals(SomeDefaultTracker.class, 
signature.processElement().trackerT().getRawType());
   }
 
   @Test
@@ -373,8 +368,11 @@ public class DoFnSignaturesSplittableDoFnTest {
     }
 
     thrown.expectMessage(
-        "Has tracker type SomeRestrictionTracker, "
-            + "but the DoFn's tracker type must be of type RestrictionTracker.");
+        "Has tracker type SomeRestrictionTracker, but the DoFn's tracker type was inferred as ");
+    thrown.expectMessage("SomeDefaultTracker");
+    thrown.expectMessage(
+        "from restriction type RestrictionWithDefaultTracker "
+            + "of @GetInitialRestriction method getInitialRestriction(Integer)");
     DoFnSignatures.getSignature(Fn.class);
   }
 
@@ -382,8 +380,7 @@ public class DoFnSignaturesSplittableDoFnTest {
   public void testNewTrackerReturnsWrongType() throws Exception {
     class BadFn extends DoFn<Integer, String> {
       @ProcessElement
-      public void process(
-          ProcessContext context, RestrictionTracker<SomeRestriction, Void> 
tracker) {}
+      public void process(ProcessContext context, SomeRestrictionTracker 
tracker) {}
 
       @NewTracker
       public void newTracker(SomeRestriction restriction) {}
@@ -403,8 +400,7 @@ public class DoFnSignaturesSplittableDoFnTest {
   public void testGetInitialRestrictionMismatchesNewTracker() throws Exception 
{
     class BadFn extends DoFn<Integer, String> {
       @ProcessElement
-      public void process(
-          ProcessContext context, RestrictionTracker<SomeRestriction, Void> 
tracker) {}
+      public void process(ProcessContext context, SomeRestrictionTracker 
tracker) {}
 
       @NewTracker
       public SomeRestrictionTracker newTracker(SomeRestriction restriction) {
@@ -427,8 +423,7 @@ public class DoFnSignaturesSplittableDoFnTest {
   public void testGetRestrictionCoderReturnsWrongType() throws Exception {
     class BadFn extends DoFn<Integer, String> {
       @ProcessElement
-      public void process(
-          ProcessContext context, RestrictionTracker<SomeRestriction, Void> 
tracker) {}
+      public void process(ProcessContext context, SomeRestrictionTracker 
tracker) {}
 
       @NewTracker
       public SomeRestrictionTracker newTracker(SomeRestriction restriction) {
@@ -536,8 +531,7 @@ public class DoFnSignaturesSplittableDoFnTest {
 
     class BadFn extends DoFn<Integer, String> {
       @ProcessElement
-      public void process(
-          ProcessContext context, RestrictionTracker<SomeRestriction, Void> 
tracker) {}
+      public void process(ProcessContext context, SomeRestrictionTracker 
tracker) {}
 
       @NewTracker
       public SomeRestrictionTracker newTracker(SomeRestriction restriction) {
diff --git 
a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/splittabledofn/RestrictionTrackers.java
 
b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/splittabledofn/RestrictionTrackers.java
deleted file mode 100644
index addeb68..0000000
--- 
a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/splittabledofn/RestrictionTrackers.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.fn.splittabledofn;
-
-import javax.annotation.concurrent.ThreadSafe;
-import org.apache.beam.sdk.transforms.splittabledofn.Backlog;
-import org.apache.beam.sdk.transforms.splittabledofn.Backlogs;
-import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
-
-/** Support utilities for interacting with {@link RestrictionTracker 
RestrictionTrackers}. */
-public class RestrictionTrackers {
-
-  /** Interface allowing a runner to observe the calls to {@link 
RestrictionTracker#tryClaim}. */
-  public interface ClaimObserver<PositionT> {
-    /** Called when {@link RestrictionTracker#tryClaim} returns true. */
-    void onClaimed(PositionT position);
-
-    /** Called when {@link RestrictionTracker#tryClaim} returns false. */
-    void onClaimFailed(PositionT position);
-  }
-
-  /**
-   * A {@link RestrictionTracker} which forwards all calls to the delegate 
{@link
-   * RestrictionTracker}.
-   */
-  @ThreadSafe
-  private static class RestrictionTrackerObserver<RestrictionT, PositionT>
-      extends RestrictionTracker<RestrictionT, PositionT> {
-    protected final RestrictionTracker<RestrictionT, PositionT> delegate;
-    private final ClaimObserver<PositionT> claimObserver;
-
-    protected RestrictionTrackerObserver(
-        RestrictionTracker<RestrictionT, PositionT> delegate,
-        ClaimObserver<PositionT> claimObserver) {
-      this.delegate = delegate;
-      this.claimObserver = claimObserver;
-    }
-
-    @Override
-    public synchronized boolean tryClaim(PositionT position) {
-      if (delegate.tryClaim(position)) {
-        claimObserver.onClaimed(position);
-        return true;
-      } else {
-        claimObserver.onClaimFailed(position);
-        return false;
-      }
-    }
-
-    @Override
-    public synchronized RestrictionT currentRestriction() {
-      return delegate.currentRestriction();
-    }
-
-    @Override
-    public synchronized RestrictionT checkpoint() {
-      return delegate.checkpoint();
-    }
-
-    @Override
-    public synchronized void checkDone() throws IllegalStateException {
-      delegate.checkDone();
-    }
-  }
-
-  /**
-   * A {@link RestrictionTracker} which forwards all calls to the delegate 
backlog reporting {@link
-   * RestrictionTracker}.
-   */
-  @ThreadSafe
-  private static class RestrictionTrackerObserverWithBacklog<RestrictionT, 
PositionT>
-      extends RestrictionTrackerObserver<RestrictionT, PositionT> implements 
Backlogs.HasBacklog {
-
-    protected RestrictionTrackerObserverWithBacklog(
-        RestrictionTracker<RestrictionT, PositionT> delegate,
-        ClaimObserver<PositionT> claimObserver) {
-      super(delegate, claimObserver);
-    }
-
-    @Override
-    public synchronized Backlog getBacklog() {
-      return ((Backlogs.HasBacklog) delegate).getBacklog();
-    }
-  }
-
-  /**
-   * A {@link RestrictionTracker} which forwards all calls to the delegate 
partitioned backlog
-   * reporting {@link RestrictionTracker}.
-   */
-  @ThreadSafe
-  private static class 
RestrictionTrackerObserverWithPartitionedBacklog<RestrictionT, PositionT>
-      extends RestrictionTrackerObserverWithBacklog<RestrictionT, PositionT>
-      implements Backlogs.HasPartitionedBacklog {
-
-    protected RestrictionTrackerObserverWithPartitionedBacklog(
-        RestrictionTracker<RestrictionT, PositionT> delegate,
-        ClaimObserver<PositionT> claimObserver) {
-      super(delegate, claimObserver);
-    }
-
-    @Override
-    public synchronized byte[] getBacklogPartition() {
-      return ((Backlogs.HasPartitionedBacklog) delegate).getBacklogPartition();
-    }
-  }
-
-  /**
-   * Returns a thread safe {@link RestrictionTracker} which reports all claim 
attempts to the
-   * specified {@link ClaimObserver}.
-   */
-  public static <RestrictionT, PositionT> RestrictionTracker<RestrictionT, 
PositionT> observe(
-      RestrictionTracker<RestrictionT, PositionT> restrictionTracker,
-      ClaimObserver<PositionT> claimObserver) {
-    if (restrictionTracker instanceof Backlogs.HasPartitionedBacklog) {
-      return new RestrictionTrackerObserverWithPartitionedBacklog<>(
-          restrictionTracker, claimObserver);
-    } else if (restrictionTracker instanceof Backlogs.HasBacklog) {
-      return new RestrictionTrackerObserverWithBacklog<>(restrictionTracker, 
claimObserver);
-    } else {
-      return new RestrictionTrackerObserver<>(restrictionTracker, 
claimObserver);
-    }
-  }
-}
diff --git 
a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/splittabledofn/package-info.java
 
b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/splittabledofn/package-info.java
deleted file mode 100644
index 0f2cbd9..0000000
--- 
a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/splittabledofn/package-info.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Defines utilities related to executing <a
- * href="https://s.apache.org/splittable-do-fn">splittable</a> {@link
- * org.apache.beam.sdk.transforms.DoFn}.
- */
-@DefaultAnnotation(NonNull.class)
-package org.apache.beam.sdk.fn.splittabledofn;
-
-import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
-import edu.umd.cs.findbugs.annotations.NonNull;
diff --git 
a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/splittabledofn/RestrictionTrackersTest.java
 
b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/splittabledofn/RestrictionTrackersTest.java
deleted file mode 100644
index c3bb289..0000000
--- 
a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/splittabledofn/RestrictionTrackersTest.java
+++ /dev/null
@@ -1,156 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.fn.splittabledofn;
-
-import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.instanceOf;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
-
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.beam.sdk.fn.splittabledofn.RestrictionTrackers.ClaimObserver;
-import org.apache.beam.sdk.transforms.splittabledofn.Backlog;
-import org.apache.beam.sdk.transforms.splittabledofn.Backlogs;
-import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/** Tests for {@link RestrictionTrackers}. */
-@RunWith(JUnit4.class)
-public class RestrictionTrackersTest {
-  @Test
-  public void testObservingClaims() {
-    RestrictionTracker<String, String> observedTracker =
-        new RestrictionTracker() {
-
-          @Override
-          public boolean tryClaim(Object position) {
-            return "goodClaim".equals(position);
-          }
-
-          @Override
-          public Object currentRestriction() {
-            throw new UnsupportedOperationException();
-          }
-
-          @Override
-          public Object checkpoint() {
-            throw new UnsupportedOperationException();
-          }
-
-          @Override
-          public void checkDone() throws IllegalStateException {
-            throw new UnsupportedOperationException();
-          }
-        };
-
-    List<String> positionsObserved = new ArrayList<>();
-    ClaimObserver<String> observer =
-        new ClaimObserver<String>() {
-
-          @Override
-          public void onClaimed(String position) {
-            positionsObserved.add(position);
-            assertEquals("goodClaim", position);
-          }
-
-          @Override
-          public void onClaimFailed(String position) {
-            positionsObserved.add(position);
-          }
-        };
-
-    RestrictionTracker<String, String> observingTracker =
-        RestrictionTrackers.observe(observedTracker, observer);
-    observingTracker.tryClaim("goodClaim");
-    observingTracker.tryClaim("badClaim");
-
-    assertThat(positionsObserved, contains("goodClaim", "badClaim"));
-  }
-
-  private static class RestrictionTrackerWithBacklog extends 
RestrictionTracker<Object, Object>
-      implements Backlogs.HasBacklog {
-
-    @Override
-    public Backlog getBacklog() {
-      return null;
-    }
-
-    @Override
-    public boolean tryClaim(Object position) {
-      return false;
-    }
-
-    @Override
-    public Object currentRestriction() {
-      return null;
-    }
-
-    @Override
-    public Object checkpoint() {
-      return null;
-    }
-
-    @Override
-    public void checkDone() throws IllegalStateException {}
-  }
-
-  private static class RestrictionTrackerWithBacklogPartitionedBacklog
-      extends RestrictionTracker<Object, Object> implements 
Backlogs.HasPartitionedBacklog {
-
-    @Override
-    public Backlog getBacklog() {
-      return null;
-    }
-
-    @Override
-    public boolean tryClaim(Object position) {
-      return false;
-    }
-
-    @Override
-    public Object currentRestriction() {
-      return null;
-    }
-
-    @Override
-    public Object checkpoint() {
-      return null;
-    }
-
-    @Override
-    public void checkDone() throws IllegalStateException {}
-
-    @Override
-    public byte[] getBacklogPartition() {
-      return null;
-    }
-  }
-
-  @Test
-  public void testClaimObserversMaintainBacklogInterfaces() {
-    RestrictionTracker hasBacklog =
-        RestrictionTrackers.observe(new RestrictionTrackerWithBacklog(), null);
-    assertThat(hasBacklog, instanceOf(Backlogs.HasBacklog.class));
-    RestrictionTracker hasPartitionedBacklog =
-        RestrictionTrackers.observe(new 
RestrictionTrackerWithBacklogPartitionedBacklog(), null);
-    assertThat(hasPartitionedBacklog, 
instanceOf(Backlogs.HasPartitionedBacklog.class));
-  }
-}
diff --git 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/SplittableProcessElementsRunner.java
 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/SplittableProcessElementsRunner.java
index ef5c40e..ab47ca7 100644
--- 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/SplittableProcessElementsRunner.java
+++ 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/SplittableProcessElementsRunner.java
@@ -154,7 +154,8 @@ public class SplittableProcessElementsRunner<InputT, 
RestrictionT, OutputT>
     processElementTyped(elem);
   }
 
-  private <PositionT> void processElementTyped(WindowedValue<KV<InputT, 
RestrictionT>> elem) {
+  private <PositionT, TrackerT extends RestrictionTracker<RestrictionT, 
PositionT>>
+      void processElementTyped(WindowedValue<KV<InputT, RestrictionT>> elem) {
     checkArgument(
         elem.getWindows().size() == 1,
         "SPLITTABLE_PROCESS_ELEMENTS expects its input to be in 1 window, but 
got %s windows",
@@ -172,9 +173,9 @@ public class SplittableProcessElementsRunner<InputT, 
RestrictionT, OutputT>
             (Coder<BoundedWindow>) context.windowCoder,
             () -> elem,
             () -> window);
-    RestrictionTracker<RestrictionT, PositionT> tracker =
-        doFnInvoker.invokeNewTracker(elem.getValue().getValue());
-    OutputAndTimeBoundedSplittableProcessElementInvoker<InputT, OutputT, 
RestrictionT, PositionT>
+    TrackerT tracker = 
doFnInvoker.invokeNewTracker(elem.getValue().getValue());
+    OutputAndTimeBoundedSplittableProcessElementInvoker<
+            InputT, OutputT, RestrictionT, PositionT, TrackerT>
         processElementInvoker =
             new OutputAndTimeBoundedSplittableProcessElementInvoker<>(
                 context.doFn,
@@ -210,7 +211,7 @@ public class SplittableProcessElementsRunner<InputT, 
RestrictionT, OutputT>
                 executor,
                 10000,
                 Duration.standardSeconds(10));
-    SplittableProcessElementInvoker<InputT, OutputT, RestrictionT, 
PositionT>.Result result =
+    SplittableProcessElementInvoker<InputT, OutputT, RestrictionT, 
TrackerT>.Result result =
         processElementInvoker.invokeProcessElement(doFnInvoker, element, 
tracker);
     this.stateAccessor = null;
 
diff --git 
a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseReadSplittableDoFn.java
 
b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseReadSplittableDoFn.java
index 801a190..aace05e 100644
--- 
a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseReadSplittableDoFn.java
+++ 
b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseReadSplittableDoFn.java
@@ -26,7 +26,6 @@ import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFn.BoundedPerElement;
 import org.apache.beam.sdk.transforms.splittabledofn.Backlog;
 import org.apache.beam.sdk.transforms.splittabledofn.ByteKeyRangeTracker;
-import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Connection;
@@ -59,8 +58,7 @@ class HBaseReadSplittableDoFn extends DoFn<HBaseQuery, 
Result> {
   }
 
   @ProcessElement
-  public void processElement(ProcessContext c, 
RestrictionTracker<ByteKeyRange, ByteKey> tracker)
-      throws Exception {
+  public void processElement(ProcessContext c, ByteKeyRangeTracker tracker) 
throws Exception {
     final HBaseQuery query = c.element();
     TableName tableName = TableName.valueOf(query.getTableId());
     Table table = connection.getTable(tableName);

Reply via email to