This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new feea2ba  [BEAM-2939] Implement interfaces and concrete watermark estimators
     new feefaca  Merge pull request #10992 from lukecwik/splittabledofn2
feea2ba is described below

commit feea2ba610aa35344d30bae23a76d18e7b6afe93
Author: Luke Cwik <[email protected]>
AuthorDate: Thu Feb 27 13:00:30 2020 -0800

    [BEAM-2939] Implement interfaces and concrete watermark estimators
    
    Plumb watermark estimator, watermark estimator state, and watermark
    estimator state coder methods through the ByteBuddy DoFnInvoker, adding
    method definitions to DoFnSignature and appropriate validation.
    
    This does not update any of the existing splittable DoFns to use these
    watermark estimators, nor any existing splittable DoFn "runners" (beyond
    adding methods that throw UnsupportedOperationException). That remaining
    work is tracked under BEAM-9430.
---
 .../construction/SplittableParDoNaiveBounded.java  |  12 +
 ...TimeBoundedSplittableProcessElementInvoker.java |   7 +-
 .../apache/beam/runners/core/SimpleDoFnRunner.java |  23 ++
 .../java/org/apache/beam/sdk/transforms/DoFn.java  | 143 +++++++-
 .../org/apache/beam/sdk/transforms/DoFnTester.java |  54 +--
 .../reflect/ByteBuddyDoFnInvokerFactory.java       | 114 +++++-
 .../beam/sdk/transforms/reflect/DoFnInvoker.java   |  50 +++
 .../beam/sdk/transforms/reflect/DoFnSignature.java | 162 ++++++++-
 .../sdk/transforms/reflect/DoFnSignatures.java     | 394 ++++++++++++++++++---
 .../HasDefaultWatermarkEstimator.java              |  34 ++
 .../splittabledofn/ManualWatermarkEstimator.java   |  46 +++
 .../TimestampObservingWatermarkEstimator.java      |  38 ++
 .../splittabledofn/WatermarkEstimator.java         |  47 +++
 .../splittabledofn/WatermarkEstimators.java        | 153 ++++++++
 .../sdk/transforms/reflect/DoFnInvokersTest.java   | 120 ++++++-
 .../reflect/DoFnSignaturesSplittableDoFnTest.java  | 321 ++++++++++++++++-
 .../splittabledofn/WatermarkEstimatorsTest.java    | 102 ++++++
 .../apache/beam/fn/harness/FnApiDoFnRunner.java    |  12 +
 18 files changed, 1706 insertions(+), 126 deletions(-)

diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDoNaiveBounded.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDoNaiveBounded.java
index 92a443a..17d435c 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDoNaiveBounded.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDoNaiveBounded.java
@@ -42,6 +42,7 @@ import 
org.apache.beam.sdk.transforms.reflect.DoFnInvoker.BaseArgumentProvider;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.values.KV;
@@ -416,6 +417,17 @@ public class SplittableParDoNaiveBounded {
         // Ignore watermark updates
       }
 
+      @Override
+      public Object watermarkEstimatorState() {
+        throw new UnsupportedOperationException(
+            "@WatermarkEstimatorState parameters are not supported.");
+      }
+
+      @Override
+      public WatermarkEstimator<?> watermarkEstimator() {
+        throw new UnsupportedOperationException("WatermarkEstimator parameters 
are not supported.");
+      }
+
       // ----------- Unsupported methods --------------------
 
       @Override
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
index 010eb11..9c0e79a 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
@@ -111,7 +111,12 @@ public class 
OutputAndTimeBoundedSplittableProcessElementInvoker<
 
     DoFn.ProcessContinuation cont =
         invoker.invokeProcessElement(
-            new DoFnInvoker.ArgumentProvider<InputT, OutputT>() {
+            new DoFnInvoker.BaseArgumentProvider<InputT, OutputT>() {
+              @Override
+              public String getErrorContext() {
+                return 
OutputAndTimeBoundedSplittableProcessElementInvoker.class.getSimpleName();
+              }
+
               @Override
               public DoFn<InputT, OutputT>.ProcessContext processContext(
                   DoFn<InputT, OutputT> doFn) {
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index a37644a..aec8365 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -49,6 +49,7 @@ import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.SystemDoFnInternal;
@@ -541,6 +542,17 @@ public class SimpleDoFnRunner<InputT, OutputT> implements 
DoFnRunner<InputT, Out
     }
 
     @Override
+    public Object watermarkEstimatorState() {
+      throw new UnsupportedOperationException(
+          "@WatermarkEstimatorState parameters are not supported.");
+    }
+
+    @Override
+    public WatermarkEstimator<?> watermarkEstimator() {
+      throw new UnsupportedOperationException("WatermarkEstimator parameters 
are not supported.");
+    }
+
+    @Override
     public State state(String stateId, boolean alwaysFetched) {
       try {
         StateSpec<?> spec =
@@ -745,6 +757,17 @@ public class SimpleDoFnRunner<InputT, OutputT> implements 
DoFnRunner<InputT, Out
     }
 
     @Override
+    public Object watermarkEstimatorState() {
+      throw new UnsupportedOperationException(
+          "@WatermarkEstimatorState parameters are not supported.");
+    }
+
+    @Override
+    public WatermarkEstimator<?> watermarkEstimator() {
+      throw new UnsupportedOperationException("WatermarkEstimator parameters 
are not supported.");
+    }
+
+    @Override
     public State state(String stateId, boolean alwaysFetched) {
       try {
         StateSpec<?> spec =
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
index d00950d..1790ecc 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@ -39,8 +39,12 @@ import org.apache.beam.sdk.testing.TestStream;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.HasDisplayData;
 import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultTracker;
+import 
org.apache.beam.sdk.transforms.splittabledofn.HasDefaultWatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.splittabledofn.Sizes;
+import 
org.apache.beam.sdk.transforms.splittabledofn.TimestampObservingWatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.Window;
@@ -670,6 +674,14 @@ public abstract class DoFn<InputT, OutputT> implements 
Serializable, HasDisplayD
    *       GetInitialRestriction}. This method is optional only if the 
restriction type returned by
    *       {@link GetInitialRestriction} implements {@link HasDefaultTracker}.
    *   <li>It <i>may</i> define a {@link GetRestrictionCoder} method.
+   *   <li>It <i>may</i> define a {@link GetInitialWatermarkEstimatorState} 
method. If none is
+   *       defined then the watermark estimator state is of type {@link Void}.
+   *   <li>It <i>may</i> define a {@link GetWatermarkEstimatorStateCoder} 
method.
+   *   <li>It <i>may</i> define a {@link NewWatermarkEstimator} method 
returning a subtype of {@code
+   *       WatermarkEstimator<W>} where {@code W} is the watermark estimator 
state type returned by
+   *       {@link GetInitialWatermarkEstimatorState}. This method is optional 
only if {@link
+   *       GetInitialWatermarkEstimatorState} has not been defined or {@code 
W} implements {@link
+   *       HasDefaultWatermarkEstimator}.
    *   <li>The {@link DoFn} itself <i>may</i> be annotated with {@link 
BoundedPerElement} or {@link
    *       UnboundedPerElement}, but not both at the same time. If it's not 
annotated with either of
    *       these, it's assumed to be {@link BoundedPerElement} if its {@link 
ProcessElement} method
@@ -692,6 +704,12 @@ public abstract class DoFn<InputT, OutputT> implements 
Serializable, HasDisplayD
    *   <li>If one of its arguments is tagged with the {@link Timestamp} 
annotation, then it will be
    *       passed the timestamp of the current element being processed; the 
argument must be of type
    *       {@link Instant}.
+   *   <li>If one of its arguments is of the type {@link WatermarkEstimator}, 
then it will be passed
+   *       the watermark estimator.
+   *   <li>If one of its arguments is of the type {@link ManualWatermarkEstimator}, then it will be
+   *       passed a watermark estimator that can be updated manually. This parameter can only be
+   *       supplied if the method annotated with {@link NewWatermarkEstimator} returns a
+   *       sub-type of {@link ManualWatermarkEstimator}.
    *   <li>If one of its arguments is a subtype of {@link BoundedWindow}, then 
it will be passed the
    *       window of the current element. When applied by {@link ParDo} the 
subtype of {@link
    *       BoundedWindow} must match the type of windows on the input {@link 
PCollection}. If the
@@ -720,7 +738,8 @@ public abstract class DoFn<InputT, OutputT> implements 
Serializable, HasDisplayD
 
   /**
    * Parameter annotation for the input element for {@link ProcessElement}, 
{@link
-   * GetInitialRestriction}, {@link GetSize}, {@link SplitRestriction}, and 
{@link NewTracker}
+   * GetInitialRestriction}, {@link GetSize}, {@link SplitRestriction}, {@link
+   * GetInitialWatermarkEstimatorState}, {@link NewWatermarkEstimator}, and 
{@link NewTracker}
    * methods.
    */
   @Documented
@@ -729,8 +748,9 @@ public abstract class DoFn<InputT, OutputT> implements 
Serializable, HasDisplayD
   public @interface Element {}
 
   /**
-   * Parameter annotation for the restriction for {@link GetSize}, {@link 
SplitRestriction}, and
-   * {@link NewTracker} methods. Must match the return type used on the method 
annotated with {@link
+   * Parameter annotation for the restriction for {@link GetSize}, {@link 
SplitRestriction}, {@link
+   * GetInitialWatermarkEstimatorState}, {@link NewWatermarkEstimator}, and 
{@link NewTracker}
+   * methods. Must match the return type used on the method annotated with 
{@link
    * GetInitialRestriction}.
    */
   @Documented
@@ -740,7 +760,8 @@ public abstract class DoFn<InputT, OutputT> implements 
Serializable, HasDisplayD
 
   /**
    * Parameter annotation for the input element timestamp for {@link 
ProcessElement}, {@link
-   * GetInitialRestriction}, {@link GetSize}, {@link SplitRestriction}, and 
{@link NewTracker}
+   * GetInitialRestriction}, {@link GetSize}, {@link SplitRestriction}, {@link
+   * GetInitialWatermarkEstimatorState}, {@link NewWatermarkEstimator}, and 
{@link NewTracker}
    * methods.
    */
   @Documented
@@ -1071,6 +1092,120 @@ public abstract class DoFn<InputT, OutputT> implements 
Serializable, HasDisplayD
   public @interface NewTracker {}
 
   /**
+   * Annotation for the method that maps an element and restriction to initial 
watermark estimator
+   * state for a <a href="https://s.apache.org/splittable-do-fn">splittable</a> {@link DoFn}.
+   *
+   * <p>Signature: {@code WatermarkEstimatorStateT getInitialWatermarkEstimatorState(<arguments>);}
+   *
+   * <p>This method must satisfy the following constraints:
+   *
+   * <ul>
+   *   <li>The return type {@code WatermarkEstimatorStateT} defines the 
watermark state type used
+   *       within this splittable DoFn. All other methods that use a {@link
+   *       WatermarkEstimatorState @WatermarkEstimatorState} parameter must 
use the same type that
+   *       is used here. It is suggested to use as narrow of a return type 
definition as possible
+   *       (for example prefer to use a square type over a shape type as a 
square is a type of a
+   *       shape).
+   *   <li>If one of its arguments is tagged with the {@link Element} 
annotation, then it will be
+   *       passed the current element being processed; the argument must be of 
type {@code InputT}.
+   *       Note that automatic conversion of {@link Row}s and {@link 
FieldAccess} parameters are
+   *       currently unsupported.
+   *   <li>If one of its arguments is tagged with the {@link Restriction} 
annotation, then it will
+   *       be passed the current restriction being processed; the argument 
must be of type {@code
+   *       RestrictionT}.
+   *   <li>If one of its arguments is tagged with the {@link Timestamp} 
annotation, then it will be
+   *       passed the timestamp of the current element being processed; the 
argument must be of type
+   *       {@link Instant}.
+   *   <li>If one of its arguments is a subtype of {@link BoundedWindow}, then 
it will be passed the
+   *       window of the current element. When applied by {@link ParDo} the 
subtype of {@link
+   *       BoundedWindow} must match the type of windows on the input {@link 
PCollection}. If the
+   *       window is not accessed a runner may perform additional 
optimizations.
+   *   <li>If one of its arguments is of type {@link PaneInfo}, then it will 
be passed information
+   *       about the current triggering pane.
+   *   <li>If one of the parameters is of type {@link PipelineOptions}, then 
it will be passed the
+   *       options for the current pipeline.
+   * </ul>
+   */
+  @Documented
+  @Retention(RetentionPolicy.RUNTIME)
+  @Target(ElementType.METHOD)
+  @Experimental(Kind.SPLITTABLE_DO_FN)
+  public @interface GetInitialWatermarkEstimatorState {}
+
+  /**
+   * Annotation for the method that returns the coder to use for the watermark 
estimator state of a
+   * <a href="https://s.apache.org/splittable-do-fn">splittable</a> {@link DoFn}.
+   *
+   * <p>If not defined, a coder will be inferred using standard coder 
inference rules and the
+   * pipeline's {@link Pipeline#getCoderRegistry coder registry}.
+   *
+   * <p>This method will be called only at pipeline construction time.
+   *
+   * <p>Signature: {@code Coder<WatermarkEstimatorStateT> 
getWatermarkEstimatorStateCoder();}
+   */
+  @Documented
+  @Retention(RetentionPolicy.RUNTIME)
+  @Target(ElementType.METHOD)
+  @Experimental(Kind.SPLITTABLE_DO_FN)
+  public @interface GetWatermarkEstimatorStateCoder {}
+
+  /**
+   * Annotation for the method that creates a new {@link WatermarkEstimator} 
for the watermark state
+   * of a <a href="https://s.apache.org/splittable-do-fn">splittable</a> {@link DoFn}.
+   *
+   * <p>Signature: {@code MyWatermarkEstimator newWatermarkEstimator(<optional 
arguments>);}
+   *
+   * <p>If the return type is a subtype of {@link 
TimestampObservingWatermarkEstimator} then the
+   * timestamp of each element output from this DoFn is provided to the 
watermark estimator.
+   *
+   * <p>This method must satisfy the following constraints:
+   *
+   * <ul>
+   *   <li>The return type must be a subtype of {@code
+   *       WatermarkEstimator<WatermarkEstimatorStateT>}. It is suggested to 
use as narrow of a
+   *       return type definition as possible (for example prefer to use a 
square type over a shape
+   *       type as a square is a type of a shape).
+   *   <li>If one of its arguments is tagged with the {@link 
WatermarkEstimatorState} annotation,
+   *       then it will be passed the current watermark estimator state; the 
argument must be of
+   *       type {@code WatermarkEstimatorStateT}.
+   *   <li>If one of its arguments is tagged with the {@link Element} 
annotation, then it will be
+   *       passed the current element being processed; the argument must be of 
type {@code InputT}.
+   *       Note that automatic conversion of {@link Row}s and {@link 
FieldAccess} parameters are
+   *       currently unsupported.
+   *   <li>If one of its arguments is tagged with the {@link Restriction} 
annotation, then it will
+   *       be passed the current restriction being processed; the argument 
must be of type {@code
+   *       RestrictionT}.
+   *   <li>If one of its arguments is tagged with the {@link Timestamp} 
annotation, then it will be
+   *       passed the timestamp of the current element being processed; the 
argument must be of type
+   *       {@link Instant}.
+   *   <li>If one of its arguments is a subtype of {@link BoundedWindow}, then 
it will be passed the
+   *       window of the current element. When applied by {@link ParDo} the 
subtype of {@link
+   *       BoundedWindow} must match the type of windows on the input {@link 
PCollection}. If the
+   *       window is not accessed a runner may perform additional 
optimizations.
+   *   <li>If one of its arguments is of type {@link PaneInfo}, then it will 
be passed information
+   *       about the current triggering pane.
+   *   <li>If one of the parameters is of type {@link PipelineOptions}, then 
it will be passed the
+   *       options for the current pipeline.
+   * </ul>
+   */
+  @Documented
+  @Retention(RetentionPolicy.RUNTIME)
+  @Target(ElementType.METHOD)
+  @Experimental(Kind.SPLITTABLE_DO_FN)
+  public @interface NewWatermarkEstimator {}
+
+  /**
+   * Parameter annotation for the watermark estimator state for the {@link 
NewWatermarkEstimator}
+   * method. Must match the return type on the method annotated with {@link
+   * GetInitialWatermarkEstimatorState}.
+   */
+  @Documented
+  @Retention(RetentionPolicy.RUNTIME)
+  @Target(ElementType.PARAMETER)
+  @Experimental(Kind.SPLITTABLE_DO_FN)
+  public @interface WatermarkEstimatorState {}
+
+  /**
    * Annotation on a <a href="https://s.apache.org/splittable-do-fn">splittable</a> {@link DoFn}
    * specifying that the {@link DoFn} performs a bounded amount of work per 
input element, so
    * applying it to a bounded {@link PCollection} will produce also a bounded 
{@link PCollection}.
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
index 112046a..6c445ec 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
@@ -33,13 +33,9 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.state.State;
 import org.apache.beam.sdk.state.TimeDomain;
-import org.apache.beam.sdk.state.Timer;
-import org.apache.beam.sdk.state.TimerMap;
 import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.DoFn.BundleFinalizer;
 import org.apache.beam.sdk.transforms.DoFn.FinishBundleContext;
 import org.apache.beam.sdk.transforms.DoFn.MultiOutputReceiver;
-import org.apache.beam.sdk.transforms.DoFn.OnTimerContext;
 import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
 import org.apache.beam.sdk.transforms.DoFn.StartBundleContext;
 import org.apache.beam.sdk.transforms.Materializations.MultimapView;
@@ -55,7 +51,6 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.Row;
 import org.apache.beam.sdk.values.TimestampedValue;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.ValueInSingleWindow;
@@ -217,7 +212,13 @@ public class DoFnTester<InputT, OutputT> implements 
AutoCloseable {
           createProcessContext(
               ValueInSingleWindow.of(element, timestamp, window, 
PaneInfo.NO_FIRING));
       fnInvoker.invokeProcessElement(
-          new DoFnInvoker.ArgumentProvider<InputT, OutputT>() {
+          new DoFnInvoker.BaseArgumentProvider<InputT, OutputT>() {
+
+            @Override
+            public String getErrorContext() {
+              return "DoFnTester";
+            }
+
             @Override
             public BoundedWindow window() {
               return window;
@@ -258,16 +259,6 @@ public class DoFnTester<InputT, OutputT> implements 
AutoCloseable {
             }
 
             @Override
-            public InputT sideInput(String sideInputTag) {
-              throw new UnsupportedOperationException("SideInputs are not 
supported by DoFnTester");
-            }
-
-            @Override
-            public InputT schemaElement(int index) {
-              throw new UnsupportedOperationException("Schemas are not 
supported by DoFnTester");
-            }
-
-            @Override
             public Instant timestamp(DoFn<InputT, OutputT> doFn) {
               return processContext.timestamp();
             }
@@ -290,21 +281,11 @@ public class DoFnTester<InputT, OutputT> implements 
AutoCloseable {
             }
 
             @Override
-            public OutputReceiver<Row> outputRowReceiver(DoFn<InputT, OutputT> 
doFn) {
-              throw new UnsupportedOperationException("Schemas are not 
supported by DoFnTester");
-            }
-
-            @Override
             public MultiOutputReceiver taggedOutputReceiver(DoFn<InputT, 
OutputT> doFn) {
               return DoFnOutputReceivers.windowedMultiReceiver(processContext, 
null);
             }
 
             @Override
-            public OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) {
-              throw new UnsupportedOperationException("DoFnTester doesn't 
support timers yet.");
-            }
-
-            @Override
             public Object restriction() {
               throw new UnsupportedOperationException(
                   "Not expected to access Restriction from a regular DoFn in 
DoFnTester");
@@ -315,27 +296,6 @@ public class DoFnTester<InputT, OutputT> implements 
AutoCloseable {
               throw new UnsupportedOperationException(
                   "Not expected to access RestrictionTracker from a regular 
DoFn in DoFnTester");
             }
-
-            @Override
-            public org.apache.beam.sdk.state.State state(String stateId, 
boolean alwaysFetched) {
-              throw new UnsupportedOperationException("DoFnTester doesn't 
support state yet");
-            }
-
-            @Override
-            public Timer timer(String timerId) {
-              throw new UnsupportedOperationException("DoFnTester doesn't 
support timers yet");
-            }
-
-            @Override
-            public TimerMap timerFamily(String tagId) {
-              throw new UnsupportedOperationException("DoFnTester doesn't 
support timerFamily yet");
-            }
-
-            @Override
-            public BundleFinalizer bundleFinalizer() {
-              throw new UnsupportedOperationException(
-                  "DoFnTester doesn't support bundleFinalizer yet");
-            }
           });
     } catch (UserCodeException e) {
       unwrapUserCodeException(e);
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
index f8aa338..014e6dd 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
@@ -55,13 +55,19 @@ import 
org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.TimeDomain
 import 
org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.TimerFamilyParameter;
 import 
org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.TimerParameter;
 import 
org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.TimestampParameter;
+import 
org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.WatermarkEstimatorParameter;
+import 
org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.WatermarkEstimatorStateParameter;
 import 
org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.WindowParameter;
 import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultTracker;
+import 
org.apache.beam.sdk.transforms.splittabledofn.HasDefaultWatermarkEstimator;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.splittabledofn.Sizes;
 import org.apache.beam.sdk.transforms.splittabledofn.Sizes.HasSize;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.TypeDescriptors;
 import org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.ByteBuddy;
 import 
org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.description.field.FieldDescription;
 import 
org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.description.method.MethodDescription;
@@ -98,6 +104,7 @@ import 
org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.jar.asm.Type;
 import 
org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.matcher.ElementMatchers;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.Primitives;
+import org.joda.time.Instant;
 
 /** Dynamically generates a {@link DoFnInvoker} instances for invoking a 
{@link DoFn}. */
 class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory {
@@ -119,6 +126,8 @@ class ByteBuddyDoFnInvokerFactory implements 
DoFnInvokerFactory {
   public static final String PIPELINE_OPTIONS_PARAMETER_METHOD = 
"pipelineOptions";
   public static final String RESTRICTION_PARAMETER_METHOD = "restriction";
   public static final String RESTRICTION_TRACKER_PARAMETER_METHOD = 
"restrictionTracker";
+  public static final String WATERMARK_ESTIMATOR_PARAMETER_METHOD = 
"watermarkEstimator";
+  public static final String WATERMARK_ESTIMATOR_STATE_PARAMETER_METHOD = 
"watermarkEstimatorState";
   public static final String STATE_PARAMETER_METHOD = "state";
   public static final String TIMER_PARAMETER_METHOD = "timer";
   public static final String SIDE_INPUT_PARAMETER_METHOD = "sideInput";
@@ -313,6 +322,51 @@ class ByteBuddyDoFnInvokerFactory implements 
DoFnInvokerFactory {
     }
   }
 
+  /**
+   * Default implementation of {@link DoFn.GetWatermarkEstimatorStateCoder}, 
for delegation by
+   * bytebuddy.
+   */
+  public static class DefaultWatermarkEstimatorStateCoder {
+    private final TypeDescriptor<?> watermarkEstimatorStateType;
+
+    DefaultWatermarkEstimatorStateCoder(TypeDescriptor<?> 
watermarkEstimatorStateType) {
+      this.watermarkEstimatorStateType = watermarkEstimatorStateType;
+    }
+
+    @SuppressWarnings({"unused", "unchecked"})
+    public <WatermarkEstimatorStateT>
+        Coder<WatermarkEstimatorStateT> invokeGetWatermarkEstimatorStateCoder(
+            CoderRegistry registry) throws CannotProvideCoderException {
+      return (Coder) registry.getCoder(watermarkEstimatorStateType);
+    }
+  }
+
+  /** Default implementation of {@link DoFn.NewWatermarkEstimator}, for 
delegation by bytebuddy. */
+  public static class DefaultNewWatermarkEstimator {
+
+    /** Returns a watermark estimator that always reports the minimum 
watermark. */
+    @SuppressWarnings("unused")
+    public static <InputT, OutputT, WatermarkEstimatorStateT>
+        WatermarkEstimator<WatermarkEstimatorStateT> invokeNewTracker(
+            DoFnInvoker.ArgumentProvider<InputT, OutputT> argumentProvider) {
+      if (argumentProvider.watermarkEstimatorState() instanceof 
HasDefaultWatermarkEstimator) {
+        return ((HasDefaultWatermarkEstimator) 
argumentProvider.watermarkEstimatorState())
+            .newWatermarkEstimator();
+      }
+      return new WatermarkEstimator<WatermarkEstimatorStateT>() {
+        @Override
+        public Instant currentWatermark() {
+          return GlobalWindow.TIMESTAMP_MIN_VALUE;
+        }
+
+        @Override
+        public WatermarkEstimatorStateT getState() {
+          return null;
+        }
+      };
+    }
+  }
+
   /** Default implementation of {@link DoFn.NewTracker}, for delegation by 
bytebuddy. */
   public static class DefaultNewTracker {
     /** Uses {@link HasDefaultTracker} to produce the tracker. */
@@ -392,7 +446,17 @@ class ByteBuddyDoFnInvokerFactory implements 
DoFnInvokerFactory {
             .method(ElementMatchers.named("invokeNewTracker"))
             .intercept(newTrackerDelegation(clazzDescription, 
signature.newTracker()))
             .method(ElementMatchers.named("invokeGetSize"))
-            .intercept(getSizeDelegation(clazzDescription, 
signature.getSize()));
+            .intercept(getSizeDelegation(clazzDescription, 
signature.getSize()))
+            
.method(ElementMatchers.named("invokeGetWatermarkEstimatorStateCoder"))
+            
.intercept(getWatermarkEstimatorStateCoderDelegation(clazzDescription, 
signature))
+            
.method(ElementMatchers.named("invokeGetInitialWatermarkEstimatorState"))
+            .intercept(
+                delegateMethodWithExtraParametersOrNoop(
+                    clazzDescription, 
signature.getInitialWatermarkEstimatorState()))
+            .method(ElementMatchers.named("invokeNewWatermarkEstimator"))
+            .intercept(
+                newWatermarkEstimatorDelegation(
+                    clazzDescription, signature.newWatermarkEstimator()));
 
     DynamicType.Unloaded<?> unloaded = builder.make();
     @SuppressWarnings("unchecked")
@@ -421,6 +485,24 @@ class ByteBuddyDoFnInvokerFactory implements 
DoFnInvokerFactory {
     }
   }
 
+  private static Implementation getWatermarkEstimatorStateCoderDelegation(
+      TypeDescription doFnType, DoFnSignature signature) {
+    if (signature.processElement().isSplittable()) {
+      if (signature.getWatermarkEstimatorStateCoder() == null) {
+        return MethodDelegation.to(
+            new DefaultWatermarkEstimatorStateCoder(
+                signature.getInitialWatermarkEstimatorState() == null
+                    ? TypeDescriptors.voids()
+                    : 
signature.getInitialWatermarkEstimatorState().watermarkEstimatorStateT()));
+      } else {
+        return new DowncastingParametersMethodDelegation(
+            doFnType, 
signature.getWatermarkEstimatorStateCoder().targetMethod());
+      }
+    } else {
+      return ExceptionMethod.throwing(UnsupportedOperationException.class);
+    }
+  }
+
   private static Implementation splitRestrictionDelegation(
       TypeDescription doFnType, DoFnSignature.SplitRestrictionMethod 
signature) {
     if (signature == null) {
@@ -430,6 +512,15 @@ class ByteBuddyDoFnInvokerFactory implements 
DoFnInvokerFactory {
     }
   }
 
+  private static Implementation newWatermarkEstimatorDelegation(
+      TypeDescription doFnType, @Nullable 
DoFnSignature.NewWatermarkEstimatorMethod signature) {
+    if (signature == null) {
+      return MethodDelegation.to(DefaultNewWatermarkEstimator.class);
+    } else {
+      return new DoFnMethodWithExtraParametersDelegation(doFnType, signature);
+    }
+  }
+
   private static Implementation newTrackerDelegation(
       TypeDescription doFnType, @Nullable DoFnSignature.NewTrackerMethod 
signature) {
     if (signature == null) {
@@ -862,6 +953,27 @@ class ByteBuddyDoFnInvokerFactory implements 
DoFnInvokerFactory {
           }
 
           @Override
+          public StackManipulation dispatch(WatermarkEstimatorParameter p) {
+            // DoFnInvoker.ArgumentProvider.watermarkEstimator() returns a 
WatermarkEstimator,
+            // but the methods expect a concrete subtype of it.
+            // Insert a downcast.
+            return new StackManipulation.Compound(
+                
simpleExtraContextParameter(WATERMARK_ESTIMATOR_PARAMETER_METHOD),
+                TypeCasting.to(new 
TypeDescription.ForLoadedType(p.estimatorT().getRawType())));
+          }
+
+          @Override
+          public StackManipulation dispatch(WatermarkEstimatorStateParameter 
p) {
+            // DoFnInvoker.ArgumentProvider.watermarkEstimatorState() returns 
an Object,
+            // but the methods expect a concrete subtype of it.
+            // Insert a downcast.
+            return new StackManipulation.Compound(
+                
simpleExtraContextParameter(WATERMARK_ESTIMATOR_STATE_PARAMETER_METHOD),
+                TypeCasting.to(
+                    new 
TypeDescription.ForLoadedType(p.estimatorStateT().getRawType())));
+          }
+
+          @Override
           public StackManipulation dispatch(StateParameter p) {
             return new StackManipulation.Compound(
                 new TextConstant(p.referent().id()),
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
index d52872e..412d5f7 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
@@ -36,6 +36,7 @@ import org.apache.beam.sdk.transforms.DoFn.StateId;
 import org.apache.beam.sdk.transforms.DoFn.TimerId;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.splittabledofn.Sizes;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.values.Row;
@@ -104,6 +105,21 @@ public interface DoFnInvoker<InputT, OutputT> {
   <RestrictionT, PositionT> RestrictionTracker<RestrictionT, PositionT> 
invokeNewTracker(
       ArgumentProvider<InputT, OutputT> arguments);
 
+  /** Invoke the {@link DoFn.NewWatermarkEstimator} method on the bound {@link 
DoFn}. */
+  @SuppressWarnings("TypeParameterUnusedInFormals")
+  <WatermarkEstimatorStateT>
+      WatermarkEstimator<WatermarkEstimatorStateT> invokeNewWatermarkEstimator(
+          ArgumentProvider<InputT, OutputT> arguments);
+
+  /** Invoke the {@link DoFn.GetInitialWatermarkEstimatorState} method on the 
bound {@link DoFn}. */
+  Object invokeGetInitialWatermarkEstimatorState(ArgumentProvider<InputT, 
OutputT> arguments);
+
+  /**
+   * Invoke the {@link DoFn.GetWatermarkEstimatorStateCoder} method on the 
bound {@link DoFn}.
+   * Called only during pipeline construction time.
+   */
+  Coder<?> invokeGetWatermarkEstimatorStateCoder(CoderRegistry coderRegistry);
+
   /** Get the bound {@link DoFn}. */
   DoFn<InputT, OutputT> getFn();
 
@@ -190,6 +206,18 @@ public interface DoFnInvoker<InputT, OutputT> {
      */
     RestrictionTracker<?, ?> restrictionTracker();
 
+    /**
+     * If this is a splittable {@link DoFn}, returns the watermark estimator 
state associated with
+     * the current call.
+     */
+    Object watermarkEstimatorState();
+
+    /**
+     * If this is a splittable {@link DoFn}, returns the {@link 
WatermarkEstimator} associated
+     * with the current call.
+     */
+    WatermarkEstimator<?> watermarkEstimator();
+
     /** Returns the state cell for the given {@link StateId}. */
     State state(String stateId, boolean alwaysFetched);
 
@@ -338,6 +366,18 @@ public interface DoFnInvoker<InputT, OutputT> {
     }
 
     @Override
+    public Object watermarkEstimatorState() {
+      throw new UnsupportedOperationException(
+          String.format("WatermarkEstimatorState unsupported in %s", 
getErrorContext()));
+    }
+
+    @Override
+    public WatermarkEstimator<?> watermarkEstimator() {
+      throw new UnsupportedOperationException(
+          String.format("WatermarkEstimator unsupported in %s", 
getErrorContext()));
+    }
+
+    @Override
     public BundleFinalizer bundleFinalizer() {
       throw new UnsupportedOperationException(
           String.format("BundleFinalizer unsupported in %s", 
getErrorContext()));
@@ -449,6 +489,16 @@ public interface DoFnInvoker<InputT, OutputT> {
     }
 
     @Override
+    public Object watermarkEstimatorState() {
+      return delegate.watermarkEstimatorState();
+    }
+
+    @Override
+    public WatermarkEstimator<?> watermarkEstimator() {
+      return delegate.watermarkEstimator();
+    }
+
+    @Override
     public State state(String stateId, boolean alwaysFetch) {
       return delegate.state(stateId, alwaysFetch);
     }
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
index b675596..1df70db 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
@@ -46,6 +46,7 @@ import 
org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.StateParam
 import 
org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.TimerParameter;
 import 
org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.WindowParameter;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TypeDescriptor;
@@ -114,6 +115,18 @@ public abstract class DoFnSignature {
   @Nullable
   public abstract GetRestrictionCoderMethod getRestrictionCoder();
 
+  /** Details about this {@link DoFn}'s {@link 
DoFn.GetWatermarkEstimatorStateCoder} method. */
+  @Nullable
+  public abstract GetWatermarkEstimatorStateCoderMethod 
getWatermarkEstimatorStateCoder();
+
+  /** Details about this {@link DoFn}'s {@link 
DoFn.GetInitialWatermarkEstimatorState} method. */
+  @Nullable
+  public abstract GetInitialWatermarkEstimatorStateMethod 
getInitialWatermarkEstimatorState();
+
+  /** Details about this {@link DoFn}'s {@link DoFn.NewWatermarkEstimator} 
method. */
+  @Nullable
+  public abstract NewWatermarkEstimatorMethod newWatermarkEstimator();
+
   /** Details about this {@link DoFn}'s {@link DoFn.NewTracker} method. */
   @Nullable
   public abstract NewTrackerMethod newTracker();
@@ -178,6 +191,14 @@ public abstract class DoFnSignature {
 
     abstract Builder setGetSize(GetSizeMethod getSize);
 
+    abstract Builder setGetInitialWatermarkEstimatorState(
+        GetInitialWatermarkEstimatorStateMethod 
getInitialWatermarkEstimatorState);
+
+    abstract Builder setNewWatermarkEstimator(NewWatermarkEstimatorMethod 
newWatermarkEstimator);
+
+    abstract Builder setGetWatermarkEstimatorStateCoder(
+        GetWatermarkEstimatorStateCoderMethod getWatermarkEstimatorStateCoder);
+
     abstract Builder setStateDeclarations(Map<String, StateDeclaration> 
stateDeclarations);
 
     abstract Builder setTimerDeclarations(Map<String, TimerDeclaration> 
timerDeclarations);
@@ -247,6 +268,10 @@ public abstract class DoFnSignature {
         return cases.dispatch((RestrictionParameter) this);
       } else if (this instanceof RestrictionTrackerParameter) {
         return cases.dispatch((RestrictionTrackerParameter) this);
+      } else if (this instanceof WatermarkEstimatorParameter) {
+        return cases.dispatch((WatermarkEstimatorParameter) this);
+      } else if (this instanceof WatermarkEstimatorStateParameter) {
+        return cases.dispatch((WatermarkEstimatorStateParameter) this);
       } else if (this instanceof StateParameter) {
         return cases.dispatch((StateParameter) this);
       } else if (this instanceof TimerParameter) {
@@ -311,6 +336,10 @@ public abstract class DoFnSignature {
 
       ResultT dispatch(RestrictionTrackerParameter p);
 
+      ResultT dispatch(WatermarkEstimatorParameter p);
+
+      ResultT dispatch(WatermarkEstimatorStateParameter p);
+
       ResultT dispatch(StateParameter p);
 
       ResultT dispatch(TimerParameter p);
@@ -406,6 +435,16 @@ public abstract class DoFnSignature {
         }
 
         @Override
+        public ResultT dispatch(WatermarkEstimatorParameter p) {
+          return dispatchDefault(p);
+        }
+
+        @Override
+        public ResultT dispatch(WatermarkEstimatorStateParameter p) {
+          return dispatchDefault(p);
+        }
+
+        @Override
         public ResultT dispatch(BundleFinalizerParameter p) {
           return dispatchDefault(p);
         }
@@ -551,6 +590,19 @@ public abstract class DoFnSignature {
       return new 
AutoValue_DoFnSignature_Parameter_RestrictionTrackerParameter(trackerT);
     }
 
+    /** Returns a {@link WatermarkEstimatorParameter}. */
+    public static WatermarkEstimatorParameter watermarkEstimator(
+        TypeDescriptor<?> watermarkEstimatorT) {
+      return new 
AutoValue_DoFnSignature_Parameter_WatermarkEstimatorParameter(watermarkEstimatorT);
+    }
+
+    /** Returns a {@link WatermarkEstimatorStateParameter}. */
+    public static WatermarkEstimatorStateParameter watermarkEstimatorState(
+        TypeDescriptor<?> watermarkEstimatorStateT) {
+      return new 
AutoValue_DoFnSignature_Parameter_WatermarkEstimatorStateParameter(
+          watermarkEstimatorStateT);
+    }
+
     /** Returns a {@link StateParameter} referring to the given {@link 
StateDeclaration}. */
     public static StateParameter stateParameter(StateDeclaration decl, boolean 
alwaysFetched) {
       return new AutoValue_DoFnSignature_Parameter_StateParameter(decl, 
alwaysFetched);
@@ -768,6 +820,32 @@ public abstract class DoFnSignature {
     }
 
     /**
+     * Descriptor for a {@link Parameter} of type {@link 
DoFn.WatermarkEstimatorState}.
+     *
+     * <p>All such descriptors are equal.
+     */
+    @AutoValue
+    public abstract static class WatermarkEstimatorStateParameter extends 
Parameter {
+      // Package visible for AutoValue
+      WatermarkEstimatorStateParameter() {}
+
+      public abstract TypeDescriptor<?> estimatorStateT();
+    }
+
+    /**
+     * Descriptor for a {@link Parameter} of a subclass of {@link 
WatermarkEstimator}.
+     *
+     * <p>All such descriptors are equal.
+     */
+    @AutoValue
+    public abstract static class WatermarkEstimatorParameter extends Parameter 
{
+      // Package visible for AutoValue
+      WatermarkEstimatorParameter() {}
+
+      public abstract TypeDescriptor<?> estimatorT();
+    }
+
+    /**
      * Descriptor for a {@link Parameter} of a subclass of {@link 
RestrictionTracker}.
      *
      * <p>All such descriptors are equal.
@@ -845,6 +923,10 @@ public abstract class DoFnSignature {
     @Nullable
     public abstract TypeDescriptor<?> trackerT();
 
+    /** Concrete type of the {@link WatermarkEstimator} parameter, if present. 
*/
+    @Nullable
+    public abstract TypeDescriptor<?> watermarkEstimatorT();
+
     /** The window type used by this method, if any. */
     @Nullable
     @Override
@@ -859,6 +941,7 @@ public abstract class DoFnSignature {
         boolean requiresStableInput,
         boolean requiresTimeSortedInput,
         TypeDescriptor<?> trackerT,
+        TypeDescriptor<?> watermarkEstimatorT,
         @Nullable TypeDescriptor<? extends BoundedWindow> windowT,
         boolean hasReturnValue) {
       return new AutoValue_DoFnSignature_ProcessElementMethod(
@@ -867,6 +950,7 @@ public abstract class DoFnSignature {
           requiresStableInput,
           requiresTimeSortedInput,
           trackerT,
+          watermarkEstimatorT,
           windowT,
           hasReturnValue);
     }
@@ -1228,7 +1312,7 @@ public abstract class DoFnSignature {
     }
   }
 
-  /** Describes a {@link DoFn.NewTracker} method. */
+  /** Describes a {@link DoFn.GetSize} method. */
   @AutoValue
   public abstract static class GetSizeMethod implements 
MethodWithExtraParameters {
     /** The annotated method itself. */
@@ -1266,4 +1350,80 @@ public abstract class DoFnSignature {
       return new 
AutoValue_DoFnSignature_GetRestrictionCoderMethod(targetMethod, coderT);
     }
   }
+
+  /** Describes a {@link DoFn.GetInitialWatermarkEstimatorState} method. */
+  @AutoValue
+  public abstract static class GetInitialWatermarkEstimatorStateMethod
+      implements MethodWithExtraParameters {
+    /** The annotated method itself. */
+    @Override
+    public abstract Method targetMethod();
+
+    /** Type of the returned watermark estimator state. */
+    public abstract TypeDescriptor<?> watermarkEstimatorStateT();
+
+    /** The window type used by this method, if any. */
+    @Nullable
+    @Override
+    public abstract TypeDescriptor<? extends BoundedWindow> windowT();
+
+    /** Types of optional parameters of the annotated method, in the order 
they appear. */
+    @Override
+    public abstract List<Parameter> extraParameters();
+
+    static GetInitialWatermarkEstimatorStateMethod create(
+        Method targetMethod,
+        TypeDescriptor<?> watermarkEstimatorStateT,
+        TypeDescriptor<? extends BoundedWindow> windowT,
+        List<Parameter> extraParameters) {
+      return new 
AutoValue_DoFnSignature_GetInitialWatermarkEstimatorStateMethod(
+          targetMethod, watermarkEstimatorStateT, windowT, extraParameters);
+    }
+  }
+
+  /** Describes a {@link DoFn.NewWatermarkEstimator} method. */
+  @AutoValue
+  public abstract static class NewWatermarkEstimatorMethod implements 
MethodWithExtraParameters {
+    /** The annotated method itself. */
+    @Override
+    public abstract Method targetMethod();
+
+    /** Type of the returned {@link WatermarkEstimator}. */
+    public abstract TypeDescriptor<?> watermarkEstimatorT();
+
+    /** The window type used by this method, if any. */
+    @Nullable
+    @Override
+    public abstract TypeDescriptor<? extends BoundedWindow> windowT();
+
+    /** Types of optional parameters of the annotated method, in the order 
they appear. */
+    @Override
+    public abstract List<Parameter> extraParameters();
+
+    static NewWatermarkEstimatorMethod create(
+        Method targetMethod,
+        TypeDescriptor<?> watermarkEstimatorT,
+        TypeDescriptor<? extends BoundedWindow> windowT,
+        List<Parameter> extraParameters) {
+      return new AutoValue_DoFnSignature_NewWatermarkEstimatorMethod(
+          targetMethod, watermarkEstimatorT, windowT, extraParameters);
+    }
+  }
+
+  /** Describes a {@link DoFn.GetWatermarkEstimatorStateCoder} method. */
+  @AutoValue
+  public abstract static class GetWatermarkEstimatorStateCoderMethod 
implements DoFnMethod {
+    /** The annotated method itself. */
+    @Override
+    public abstract Method targetMethod();
+
+    /** Type of the returned {@link Coder}. */
+    public abstract TypeDescriptor<?> coderT();
+
+    static GetWatermarkEstimatorStateCoderMethod create(
+        Method targetMethod, TypeDescriptor<?> coderT) {
+      return new AutoValue_DoFnSignature_GetWatermarkEstimatorStateCoderMethod(
+          targetMethod, coderT);
+    }
+  }
 }
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
index fb76330..cea704b 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
@@ -37,6 +37,7 @@ import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.coders.Coder;
@@ -62,25 +63,33 @@ import org.apache.beam.sdk.transforms.DoFn.StateId;
 import org.apache.beam.sdk.transforms.DoFn.TimerId;
 import 
org.apache.beam.sdk.transforms.reflect.DoFnSignature.FieldAccessDeclaration;
 import 
org.apache.beam.sdk.transforms.reflect.DoFnSignature.GetInitialRestrictionMethod;
+import 
org.apache.beam.sdk.transforms.reflect.DoFnSignature.GetInitialWatermarkEstimatorStateMethod;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter;
+import 
org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.PipelineOptionsParameter;
 import 
org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.RestrictionParameter;
 import 
org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.RestrictionTrackerParameter;
 import 
org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.SchemaElementParameter;
 import 
org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.StateParameter;
 import 
org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.TimerFamilyParameter;
 import 
org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.TimerParameter;
+import 
org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.WatermarkEstimatorParameter;
+import 
org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.WatermarkEstimatorStateParameter;
 import 
org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.WindowParameter;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature.StateDeclaration;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature.TimerDeclaration;
 import 
org.apache.beam.sdk.transforms.reflect.DoFnSignature.TimerFamilyDeclaration;
 import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultTracker;
+import 
org.apache.beam.sdk.transforms.splittabledofn.HasDefaultWatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.common.ReflectHelpers;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.TypeDescriptors;
 import org.apache.beam.sdk.values.TypeParameter;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Predicates;
@@ -127,6 +136,7 @@ public class DoFnSignatures {
               Parameter.TaggedOutputReceiverParameter.class,
               Parameter.ProcessContextParameter.class,
               Parameter.RestrictionTrackerParameter.class,
+              Parameter.WatermarkEstimatorParameter.class,
               Parameter.SideInputParameter.class,
               Parameter.BundleFinalizerParameter.class);
 
@@ -213,12 +223,32 @@ public class DoFnSignatures {
       ImmutableList.of(
           Parameter.ElementParameter.class,
           Parameter.RestrictionParameter.class,
-          Parameter.RestrictionTrackerParameter.class,
           Parameter.WindowParameter.class,
           Parameter.TimestampParameter.class,
           Parameter.PaneInfoParameter.class,
           Parameter.PipelineOptionsParameter.class);
 
+  private static final Collection<Class<? extends Parameter>>
+      ALLOWED_GET_INITIAL_WATERMARK_ESTIMATOR_STATE_PARAMETERS =
+          ImmutableList.of(
+              Parameter.ElementParameter.class,
+              Parameter.RestrictionParameter.class,
+              Parameter.WindowParameter.class,
+              Parameter.TimestampParameter.class,
+              Parameter.PaneInfoParameter.class,
+              Parameter.PipelineOptionsParameter.class);
+
+  private static final Collection<Class<? extends Parameter>>
+      ALLOWED_NEW_WATERMARK_ESTIMATOR_PARAMETERS =
+          ImmutableList.of(
+              Parameter.WatermarkEstimatorStateParameter.class,
+              Parameter.ElementParameter.class,
+              Parameter.RestrictionParameter.class,
+              Parameter.WindowParameter.class,
+              Parameter.TimestampParameter.class,
+              Parameter.PaneInfoParameter.class,
+              Parameter.PipelineOptionsParameter.class);
+
   /** @return the {@link DoFnSignature} for the given {@link DoFn} instance. */
   public static <FnT extends DoFn<?, ?>> DoFnSignature signatureForDoFn(FnT 
fn) {
     return getSignature(fn.getClass());
@@ -332,21 +362,34 @@ public class DoFnSignatures {
 
     private MethodAnalysisContext() {}
 
-    /** Indicates whether a {@link RestrictionTrackerParameter} is known in 
this context. */
-    public boolean hasRestrictionTrackerParameter() {
-      return extraParameters.stream()
-          
.anyMatch(Predicates.instanceOf(RestrictionTrackerParameter.class)::apply);
+    /** Indicates whether the specified {@link Parameter} is known in this 
context. */
+    public boolean hasParameter(Class<? extends Parameter> type) {
+      return 
extraParameters.stream().anyMatch(Predicates.instanceOf(type)::apply);
     }
 
-    /** Indicates whether a {@link WindowParameter} is known in this context. 
*/
-    public boolean hasWindowParameter() {
-      return 
extraParameters.stream().anyMatch(Predicates.instanceOf(WindowParameter.class)::apply);
+    /**
+     * Returns the specified {@link Parameter} if it is known in this context. 
Throws {@link
+     * IllegalStateException} if there is more than one instance of the 
parameter.
+     */
+    @Nullable
+    public <T extends Parameter> Optional<T> findParameter(Class<T> type) {
+      List<T> parameters = findParameters(type);
+      switch (parameters.size()) {
+        case 0:
+          return Optional.empty();
+        case 1:
+          return Optional.of(parameters.get(0));
+        default:
+          throw new IllegalStateException(
+              String.format(
+                  "Expected to have found at most one parameter of type %s but 
found %s.",
+                  type, parameters));
+      }
     }
 
-    /** Indicates whether a {@link Parameter.PipelineOptionsParameter} is 
known in this context. */
-    public boolean hasPipelineOptionsParamter() {
-      return extraParameters.stream()
-          
.anyMatch(Predicates.instanceOf(Parameter.PipelineOptionsParameter.class)::apply);
+    public <T extends Parameter> List<T> findParameters(Class<T> type) {
+      return (List<T>)
+          
extraParameters.stream().filter(Predicates.instanceOf(type)).collect(Collectors.toList());
     }
 
     /** The window type, if any, used by this method. */
@@ -477,6 +520,12 @@ public class DoFnSignatures {
         findAnnotatedMethod(errors, DoFn.GetRestrictionCoder.class, fnClass, 
false);
     Method newTrackerMethod = findAnnotatedMethod(errors, 
DoFn.NewTracker.class, fnClass, false);
     Method getSizeMethod = findAnnotatedMethod(errors, DoFn.GetSize.class, 
fnClass, false);
+    Method getWatermarkEstimatorStateCoderMethod =
+        findAnnotatedMethod(errors, 
DoFn.GetWatermarkEstimatorStateCoder.class, fnClass, false);
+    Method getInitialWatermarkEstimatorStateMethod =
+        findAnnotatedMethod(errors, 
DoFn.GetInitialWatermarkEstimatorState.class, fnClass, false);
+    Method newWatermarkEstimatorMethod =
+        findAnnotatedMethod(errors, DoFn.NewWatermarkEstimator.class, fnClass, 
false);
 
     Collection<Method> onTimerMethods =
         declaredMethodsWithAnnotation(DoFn.OnTimer.class, fnClass, DoFn.class);
@@ -601,7 +650,7 @@ public class DoFnSignatures {
           "Splittable, but does not define the required @%s method.",
           DoFnSignatures.format(DoFn.GetInitialRestriction.class));
 
-      GetInitialRestrictionMethod method =
+      GetInitialRestrictionMethod initialRestrictionMethod =
           analyzeGetInitialRestrictionMethod(
               errors.forMethod(DoFn.GetInitialRestriction.class, 
getInitialRestrictionMethod),
               fnT,
@@ -610,8 +659,24 @@ public class DoFnSignatures {
               outputT,
               fnContext);
 
-      signatureBuilder.setGetInitialRestriction(method);
-      TypeDescriptor<?> restrictionT = method.restrictionT();
+      signatureBuilder.setGetInitialRestriction(initialRestrictionMethod);
+      TypeDescriptor<?> restrictionT = initialRestrictionMethod.restrictionT();
+
+      TypeDescriptor<?> watermarkEstimatorStateT = TypeDescriptors.voids();
+      if (getInitialWatermarkEstimatorStateMethod != null) {
+        GetInitialWatermarkEstimatorStateMethod 
initialWatermarkEstimatorStateMethod =
+            analyzeGetInitialWatermarkEstimatorStateMethod(
+                errors.forMethod(
+                    DoFn.GetInitialWatermarkEstimatorState.class,
+                    getInitialWatermarkEstimatorStateMethod),
+                fnT,
+                getInitialWatermarkEstimatorStateMethod,
+                inputT,
+                outputT,
+                fnContext);
+        watermarkEstimatorStateT = 
initialWatermarkEstimatorStateMethod.watermarkEstimatorStateT();
+        
signatureBuilder.setGetInitialWatermarkEstimatorState(initialWatermarkEstimatorStateMethod);
+      }
 
       if (newTrackerMethod != null) {
         signatureBuilder.setNewTracker(
@@ -665,6 +730,39 @@ public class DoFnSignatures {
                 fnT,
                 getRestrictionCoderMethod));
       }
+
+      if (getWatermarkEstimatorStateCoderMethod != null) {
+        signatureBuilder.setGetWatermarkEstimatorStateCoder(
+            analyzeGetWatermarkEstimatorStateCoderMethod(
+                errors.forMethod(
+                    DoFn.GetWatermarkEstimatorStateCoder.class,
+                    getWatermarkEstimatorStateCoderMethod),
+                fnT,
+                getWatermarkEstimatorStateCoderMethod));
+      }
+
+      if (newWatermarkEstimatorMethod != null) {
+        signatureBuilder.setNewWatermarkEstimator(
+            analyzeNewWatermarkEstimatorMethod(
+                errors.forMethod(DoFn.NewWatermarkEstimator.class, 
newWatermarkEstimatorMethod),
+                fnT,
+                newWatermarkEstimatorMethod,
+                inputT,
+                outputT,
+                restrictionT,
+                watermarkEstimatorStateT,
+                fnContext));
+      } else if (getInitialWatermarkEstimatorStateMethod != null) {
+        errors
+            .forMethod(DoFn.NewWatermarkEstimator.class, null)
+            .checkArgument(
+                watermarkEstimatorStateT.isSubtypeOf(
+                    TypeDescriptor.of(HasDefaultWatermarkEstimator.class)),
+                "Splittable, either @%s method must be defined or %s must 
implement %s.",
+                format(DoFn.NewWatermarkEstimator.class),
+                format(watermarkEstimatorStateT),
+                format(HasDefaultWatermarkEstimator.class));
+      }
     } else {
       // Validate that none of the splittable DoFn only methods have been 
declared.
       List<String> forbiddenMethods = new ArrayList<>();
@@ -683,6 +781,15 @@ public class DoFnSignatures {
       if (getSizeMethod != null) {
         forbiddenMethods.add("@" + format(DoFn.GetSize.class));
       }
+      if (getInitialWatermarkEstimatorStateMethod != null) {
+        forbiddenMethods.add("@" + 
format(DoFn.GetInitialWatermarkEstimatorState.class));
+      }
+      if (getWatermarkEstimatorStateCoderMethod != null) {
+        forbiddenMethods.add("@" + 
format(DoFn.GetWatermarkEstimatorStateCoder.class));
+      }
+      if (newWatermarkEstimatorMethod != null) {
+        forbiddenMethods.add("@" + format(DoFn.NewWatermarkEstimator.class));
+      }
       errors.checkArgument(
           forbiddenMethods.isEmpty(), "Non-splittable, but defines methods: 
%s", forbiddenMethods);
     }
@@ -785,11 +892,19 @@ public class DoFnSignatures {
         signature.getInitialRestriction();
     DoFnSignature.NewTrackerMethod newTracker = signature.newTracker();
     DoFnSignature.GetRestrictionCoderMethod getRestrictionCoder = 
signature.getRestrictionCoder();
+    DoFnSignature.GetInitialWatermarkEstimatorStateMethod 
getInitialWatermarkEstimatorState =
+        signature.getInitialWatermarkEstimatorState();
+    DoFnSignature.GetWatermarkEstimatorStateCoderMethod 
getWatermarkEstimatorStateCoder =
+        signature.getWatermarkEstimatorStateCoder();
 
     ErrorReporter processElementErrors =
         errors.forMethod(DoFn.ProcessElement.class, 
processElement.targetMethod());
 
     TypeDescriptor<?> restrictionT = getInitialRestriction.restrictionT();
+    TypeDescriptor<?> watermarkEstimatorStateT =
+        getInitialWatermarkEstimatorState == null
+            ? TypeDescriptors.voids()
+            : getInitialWatermarkEstimatorState.watermarkEstimatorStateT();
 
     if (newTracker == null) {
       ErrorReporter newTrackerErrors = errors.forMethod(DoFn.NewTracker.class, 
null);
@@ -806,6 +921,17 @@ public class DoFnSignatures {
         "Has tracker type %s, but the DoFn's tracker type must be of type 
RestrictionTracker.",
         format(processElement.trackerT()));
 
+    if (processElement.watermarkEstimatorT() != null) {
+      processElementErrors.checkArgument(
+          
processElement.watermarkEstimatorT().getRawType().equals(WatermarkEstimator.class)
+              || processElement
+                  .watermarkEstimatorT()
+                  .getRawType()
+                  .equals(ManualWatermarkEstimator.class),
+          "Has watermark estimator type %s, but the DoFn's watermark estimator 
type must be one of [WatermarkEstimator, ManualWatermarkEstimator] types.",
+          format(processElement.watermarkEstimatorT()));
+    }
+
     if (getRestrictionCoder != null) {
       ErrorReporter getInitialRestrictionErrors =
           errors.forMethod(DoFn.GetInitialRestriction.class, 
getInitialRestriction.targetMethod());
@@ -819,6 +945,26 @@ public class DoFnSignatures {
           format(getRestrictionCoder.coderT()),
           format(coderTypeOf(restrictionT)));
     }
+
+    if (getWatermarkEstimatorStateCoder != null) {
+      ErrorReporter getInitialWatermarkEstimatorStateReporter =
+          errors.forMethod(
+              DoFn.GetInitialWatermarkEstimatorState.class,
+              getInitialWatermarkEstimatorState == null
+                  ? null
+                  : getInitialWatermarkEstimatorState.targetMethod());
+      getInitialWatermarkEstimatorStateReporter.checkArgument(
+          getWatermarkEstimatorStateCoder
+              .coderT()
+              .isSubtypeOf(coderTypeOf(watermarkEstimatorStateT)),
+          "Uses watermark estimator state type %s, but @%s method %s returns 
%s "
+              + "which is not a subtype of %s",
+          format(watermarkEstimatorStateT),
+          format(DoFn.GetInitialWatermarkEstimatorState.class),
+          format(getWatermarkEstimatorStateCoder.targetMethod()),
+          format(getWatermarkEstimatorStateCoder.coderT()),
+          format(coderTypeOf(watermarkEstimatorStateT)));
+    }
   }
 
   /**
@@ -1022,11 +1168,9 @@ public class DoFnSignatures {
     boolean requiresStableInput = 
m.isAnnotationPresent(DoFn.RequiresStableInput.class);
     boolean requiresTimeSortedInput = 
m.isAnnotationPresent(DoFn.RequiresTimeSortedInput.class);
 
-    Type[] params = m.getGenericParameterTypes();
-
-    TypeDescriptor<?> trackerT = getTrackerType(fnClass, m);
     TypeDescriptor<? extends BoundedWindow> windowT = getWindowType(fnClass, 
m);
 
+    Type[] params = m.getGenericParameterTypes();
     for (int i = 0; i < params.length; ++i) {
       Parameter extraParam =
           analyzeExtraParameter(
@@ -1055,8 +1199,19 @@ public class DoFnSignatures {
       }
     }
 
+    TypeDescriptor<?> trackerT =
+        methodContext
+            .findParameter(RestrictionTrackerParameter.class)
+            .map(p -> p.trackerT())
+            .orElse(null);
+    TypeDescriptor<?> watermarkEstimatorT =
+        methodContext
+            .findParameter(WatermarkEstimatorParameter.class)
+            .map(p -> p.estimatorT())
+            .orElse(null);
+
     // The allowed parameters depend on whether this DoFn is splittable
-    if (methodContext.hasRestrictionTrackerParameter()) {
+    if (trackerT != null) {
       for (Parameter parameter : methodContext.getExtraParameters()) {
         checkParameterOneOf(errors, parameter, 
ALLOWED_SPLITTABLE_PROCESS_ELEMENT_PARAMETERS);
       }
@@ -1072,6 +1227,7 @@ public class DoFnSignatures {
         requiresStableInput,
         requiresTimeSortedInput,
         trackerT,
+        watermarkEstimatorT,
         windowT,
         DoFn.ProcessContinuation.class.equals(m.getReturnType()));
   }
@@ -1113,20 +1269,22 @@ public class DoFnSignatures {
     String fieldAccessString = getFieldAccessId(param.getAnnotations());
     if (fieldAccessString != null) {
       return Parameter.schemaElementParameter(paramT, fieldAccessString, 
param.getIndex());
-    } else if (hasElementAnnotation(param.getAnnotations())) {
+    } else if (hasAnnotation(DoFn.Element.class, param.getAnnotations())) {
       return (paramT.equals(inputT))
           ? Parameter.elementParameter(paramT)
           : Parameter.schemaElementParameter(paramT, null, param.getIndex());
-    } else if (hasRestrictionAnnotation(param.getAnnotations())) {
+    } else if (hasAnnotation(DoFn.Restriction.class, param.getAnnotations())) {
       return Parameter.restrictionParameter(paramT);
-    } else if (hasTimestampAnnotation(param.getAnnotations())) {
+    } else if (hasAnnotation(DoFn.WatermarkEstimatorState.class, 
param.getAnnotations())) {
+      return Parameter.watermarkEstimatorState(paramT);
+    } else if (hasAnnotation(DoFn.Timestamp.class, param.getAnnotations())) {
       methodErrors.checkArgument(
           rawType.equals(Instant.class),
           "@Timestamp argument must have type org.joda.time.Instant.");
       return Parameter.timestampParameter();
     } else if (rawType.equals(TimeDomain.class)) {
       return Parameter.timeDomainParameter();
-    } else if (hasSideInputAnnotation(param.getAnnotations())) {
+    } else if (hasAnnotation(DoFn.SideInput.class, param.getAnnotations())) {
       String sideInputId = getSideInputId(param.getAnnotations());
       paramErrors.checkArgument(
           sideInputId != null, "%s missing %s annotation", 
format(SideInput.class));
@@ -1161,7 +1319,7 @@ public class DoFnSignatures {
       return Parameter.onTimerContext();
     } else if (BoundedWindow.class.isAssignableFrom(rawType)) {
       methodErrors.checkArgument(
-          !methodContext.hasWindowParameter(),
+          !methodContext.hasParameter(WindowParameter.class),
           "Multiple %s parameters",
           format(BoundedWindow.class));
       return Parameter.boundedWindow((TypeDescriptor<? extends BoundedWindow>) 
paramT);
@@ -1183,17 +1341,22 @@ public class DoFnSignatures {
       return Parameter.taggedOutputReceiverParameter();
     } else if (PipelineOptions.class.equals(rawType)) {
       methodErrors.checkArgument(
-          !methodContext.hasPipelineOptionsParamter(),
+          !methodContext.hasParameter(PipelineOptionsParameter.class),
           "Multiple %s parameters",
           format(PipelineOptions.class));
       return Parameter.pipelineOptions();
     } else if (RestrictionTracker.class.isAssignableFrom(rawType)) {
       methodErrors.checkArgument(
-          !methodContext.hasRestrictionTrackerParameter(),
+          !methodContext.hasParameter(RestrictionTrackerParameter.class),
           "Multiple %s parameters",
           format(RestrictionTracker.class));
       return Parameter.restrictionTracker(paramT);
-
+    } else if (WatermarkEstimator.class.isAssignableFrom(rawType)) {
+      methodErrors.checkArgument(
+          !methodContext.hasParameter(WatermarkEstimatorParameter.class),
+          "Multiple %s parameters",
+          format(WatermarkEstimator.class));
+      return Parameter.watermarkEstimator(paramT);
     } else if (rawType.equals(Timer.class)) {
       // m.getParameters() is not available until Java 8
       String id = getTimerId(param.getAnnotations());
@@ -1221,7 +1384,7 @@ public class DoFnSignatures {
 
       return Parameter.timerParameter(timerDecl);
 
-    } else if (hasTimerIdAnnotation(param.getAnnotations())) {
+    } else if (hasAnnotation(DoFn.TimerId.class, param.getAnnotations())) {
       boolean isValidTimerIdForTimerFamily =
           fnContext.getTimerFamilyDeclarations().size() > 0 && 
rawType.equals(String.class);
       paramErrors.checkArgument(
@@ -1348,36 +1511,8 @@ public class DoFnSignatures {
     return annotation.isPresent() ? (T) annotation.get() : null;
   }
 
-  private static boolean hasElementAnnotation(List<Annotation> annotations) {
-    return annotations.stream().anyMatch(a -> 
a.annotationType().equals(DoFn.Element.class));
-  }
-
-  private static boolean hasRestrictionAnnotation(List<Annotation> 
annotations) {
-    return annotations.stream().anyMatch(a -> 
a.annotationType().equals(DoFn.Restriction.class));
-  }
-
-  private static boolean hasTimestampAnnotation(List<Annotation> annotations) {
-    return annotations.stream().anyMatch(a -> 
a.annotationType().equals(DoFn.Timestamp.class));
-  }
-
-  private static boolean hasSideInputAnnotation(List<Annotation> annotations) {
-    return annotations.stream().anyMatch(a -> 
a.annotationType().equals(DoFn.SideInput.class));
-  }
-
-  private static boolean hasTimerIdAnnotation(List<Annotation> annotations) {
-    return annotations.stream().anyMatch(a -> 
a.annotationType().equals(DoFn.TimerId.class));
-  }
-
-  @Nullable
-  private static TypeDescriptor<?> getTrackerType(TypeDescriptor<?> fnClass, 
Method method) {
-    Type[] params = method.getGenericParameterTypes();
-    for (Type param : params) {
-      TypeDescriptor<?> paramT = fnClass.resolveType(param);
-      if (RestrictionTracker.class.isAssignableFrom(paramT.getRawType())) {
-        return paramT;
-      }
-    }
-    return null;
+  /**
+   * Tells whether at least one element of {@code annotations} has an annotation type equal to
+   * {@code annotation}.
+   */
+  private static boolean hasAnnotation(Class<?> annotation, List<Annotation> annotations) {
+    for (Annotation candidate : annotations) {
+      if (candidate.annotationType().equals(annotation)) {
+        return true;
+      }
+    }
+    return false;
   }
 
   @Nullable
@@ -1509,6 +1644,53 @@ public class DoFnSignatures {
         m, fnT.resolveType(m.getGenericReturnType()), windowT, 
methodContext.extraParameters);
   }
 
+  /**
+   * Builds the signature for a method annotated with {@link
+   * DoFn.GetInitialWatermarkEstimatorState}.
+   *
+   * <p>Each parameter of {@code m} is analyzed as an extra parameter. A schema element parameter is
+   * rejected, and every parameter must be one of {@code
+   * ALLOWED_GET_INITIAL_WATERMARK_ESTIMATOR_STATE_PARAMETERS}. The return type of {@code m},
+   * resolved against {@code fnT}, is passed on to the created signature.
+   */
+  @VisibleForTesting
+  static DoFnSignature.GetInitialWatermarkEstimatorStateMethod
+      analyzeGetInitialWatermarkEstimatorStateMethod(
+          ErrorReporter errors,
+          TypeDescriptor<? extends DoFn<?, ?>> fnT,
+          Method m,
+          TypeDescriptor<?> inputT,
+          TypeDescriptor<?> outputT,
+          FnAnalysisContext fnContext) {
+    // Expected shape of the analyzed method:
+    // @GetInitialWatermarkEstimatorState
+    // WatermarkEstimatorStateT getInitialWatermarkEstimatorState(... parameters ...);
+
+    Type[] parameterTypes = m.getGenericParameterTypes();
+    MethodAnalysisContext methodContext = MethodAnalysisContext.create();
+    TypeDescriptor<? extends BoundedWindow> windowT = getWindowType(fnT, m);
+
+    for (int index = 0; index < parameterTypes.length; ++index) {
+      Parameter analyzed =
+          analyzeExtraParameter(
+              errors,
+              fnContext,
+              methodContext,
+              fnT,
+              ParameterDescription.of(
+                  m,
+                  index,
+                  fnT.resolveType(parameterTypes[index]),
+                  Arrays.asList(m.getParameterAnnotations()[index])),
+              inputT,
+              outputT);
+      if (analyzed instanceof SchemaElementParameter) {
+        SchemaElementParameter schemaElement = (SchemaElementParameter) analyzed;
+        errors.throwIllegalArgument(
+            "Schema @%s are not supported for @%s method. Found %s, did you mean to use %s?",
+            format(DoFn.Element.class),
+            format(DoFn.GetInitialWatermarkEstimatorState.class),
+            format(schemaElement.elementT()),
+            format(inputT));
+      }
+      methodContext.addParameter(analyzed);
+    }
+
+    // Validate only after every parameter has been analyzed and registered.
+    for (Parameter registered : methodContext.getExtraParameters()) {
+      checkParameterOneOf(
+          errors, registered, ALLOWED_GET_INITIAL_WATERMARK_ESTIMATOR_STATE_PARAMETERS);
+    }
+
+    TypeDescriptor<?> resultT = fnT.resolveType(m.getGenericReturnType());
+    return DoFnSignature.GetInitialWatermarkEstimatorStateMethod.create(
+        m, resultT, windowT, methodContext.extraParameters);
+  }
+
   /**
    * Generates a {@link TypeDescriptor} for {@code 
DoFn.OutputReceiver<OutputT>} given {@code
    * OutputT}.
@@ -1683,8 +1865,21 @@ public class DoFnSignatures {
     return DoFnSignature.GetRestrictionCoderMethod.create(m, resT);
   }
 
+  /**
+   * Builds the signature for a method that supplies the watermark estimator state coder.
+   *
+   * <p>Reports an error through {@code errors} unless {@code m} declares no parameters and its
+   * return type, resolved against {@code fnT}, is a subtype of {@link Coder}.
+   */
+  @VisibleForTesting
+  static DoFnSignature.GetWatermarkEstimatorStateCoderMethod
+      analyzeGetWatermarkEstimatorStateCoderMethod(
+          ErrorReporter errors, TypeDescriptor<? extends DoFn> fnT, Method m) {
+    boolean takesNoArguments = m.getParameterCount() == 0;
+    errors.checkArgument(takesNoArguments, "Must have zero arguments");
+
+    TypeDescriptor<?> coderT = fnT.resolveType(m.getGenericReturnType());
+    boolean returnsCoder = coderT.isSubtypeOf(TypeDescriptor.of(Coder.class));
+    errors.checkArgument(returnsCoder, "Must return a Coder, but returns %s", format(coderT));
+
+    return DoFnSignature.GetWatermarkEstimatorStateCoderMethod.create(m, coderT);
+  }
+
   /**
-   * Generates a {@link TypeDescriptor} for {@code 
RestrictionTracker<RestrictionT>} given {@code
+   * Generates a {@link TypeDescriptor} for {@code 
RestrictionTracker<RestrictionT, ?>} given {@code
    * RestrictionT}.
    */
   private static <RestrictionT>
@@ -1694,6 +1889,17 @@ public class DoFnSignatures {
         new TypeParameter<RestrictionT>() {}, restrictionT);
   }
 
+  /**
+   * Creates the {@link TypeDescriptor} of {@code WatermarkEstimator<WatermarkEstimatorStateT>} for
+   * the supplied {@code WatermarkEstimatorStateT} descriptor.
+   */
+  private static <WatermarkEstimatorStateT>
+      TypeDescriptor<WatermarkEstimator<WatermarkEstimatorStateT>> watermarkEstimatorTypeOf(
+          TypeDescriptor<WatermarkEstimatorStateT> watermarkEstimatorStateT) {
+    // Capture the generic estimator type and its type variable, then bind the variable to the
+    // given state type.
+    TypeDescriptor<WatermarkEstimator<WatermarkEstimatorStateT>> genericEstimatorT =
+        new TypeDescriptor<WatermarkEstimator<WatermarkEstimatorStateT>>() {};
+    TypeParameter<WatermarkEstimatorStateT> stateTypeVariable =
+        new TypeParameter<WatermarkEstimatorStateT>() {};
+    return genericEstimatorT.where(stateTypeVariable, watermarkEstimatorStateT);
+  }
+
   @VisibleForTesting
   static DoFnSignature.NewTrackerMethod analyzeNewTrackerMethod(
       ErrorReporter errors,
@@ -1755,6 +1961,76 @@ public class DoFnSignatures {
   }
 
   @VisibleForTesting
+  static DoFnSignature.NewWatermarkEstimatorMethod analyzeNewWatermarkEstimatorMethod(
+      ErrorReporter errors,
+      TypeDescriptor<? extends DoFn<?, ?>> fnT,
+      Method m,
+      TypeDescriptor<?> inputT,
+      TypeDescriptor<?> outputT,
+      TypeDescriptor<?> restrictionT,
+      TypeDescriptor<?> watermarkEstimatorStateT,
+      FnAnalysisContext fnContext) {
+    // Analyzes a method of the form:
+    // @NewWatermarkEstimator
+    // WatermarkEstimatorT newWatermarkEstimator(... parameters ...);
+    //
+    // Validation performed here:
+    //  - the resolved return type must be a subtype of
+    //    WatermarkEstimator<watermarkEstimatorStateT>;
+    //  - schema element parameters are rejected;
+    //  - a restriction parameter must use restrictionT;
+    //  - a watermark estimator state parameter must use watermarkEstimatorStateT;
+    //  - every parameter must be one of ALLOWED_NEW_WATERMARK_ESTIMATOR_PARAMETERS.
+    Type[] params = m.getGenericParameterTypes();
+    TypeDescriptor<?> watermarkEstimatorT = fnT.resolveType(m.getGenericReturnType());
+    TypeDescriptor<?> expectedWatermarkEstimatorT =
+        watermarkEstimatorTypeOf(watermarkEstimatorStateT);
+    errors.checkArgument(
+        watermarkEstimatorT.isSubtypeOf(expectedWatermarkEstimatorT),
+        "Returns %s, but must return a subtype of %s",
+        format(watermarkEstimatorT),
+        format(expectedWatermarkEstimatorT));
+
+    MethodAnalysisContext methodContext = MethodAnalysisContext.create();
+    TypeDescriptor<? extends BoundedWindow> windowT = getWindowType(fnT, m);
+    for (int i = 0; i < params.length; ++i) {
+      Parameter extraParam =
+          analyzeExtraParameter(
+              errors,
+              fnContext,
+              methodContext,
+              fnT,
+              ParameterDescription.of(
+                  m, i, fnT.resolveType(params[i]), Arrays.asList(m.getParameterAnnotations()[i])),
+              inputT,
+              outputT);
+      if (extraParam instanceof SchemaElementParameter) {
+        errors.throwIllegalArgument(
+            "Schema @%s are not supported for @%s method. Found %s, did you mean to use %s?",
+            format(DoFn.Element.class),
+            format(DoFn.NewWatermarkEstimator.class),
+            format(((SchemaElementParameter) extraParam).elementT()),
+            format(inputT));
+      } else if (extraParam instanceof RestrictionParameter) {
+        // restrictionT is the type produced by the @GetInitialRestriction method, so that is the
+        // annotation the mismatch message has to name.
+        errors.checkArgument(
+            restrictionT.equals(((RestrictionParameter) extraParam).restrictionT()),
+            "Uses restriction type %s, but @%s method uses restriction type %s",
+            format(((RestrictionParameter) extraParam).restrictionT()),
+            format(DoFn.GetInitialRestriction.class),
+            format(restrictionT));
+      } else if (extraParam instanceof WatermarkEstimatorStateParameter) {
+        errors.checkArgument(
+            watermarkEstimatorStateT.equals(
+                ((WatermarkEstimatorStateParameter) extraParam).estimatorStateT()),
+            "Uses watermark estimator state type %s, but @%s method uses watermark estimator state type %s",
+            format(((WatermarkEstimatorStateParameter) extraParam).estimatorStateT()),
+            format(DoFn.GetInitialWatermarkEstimatorState.class),
+            format(watermarkEstimatorStateT));
+      }
+      methodContext.addParameter(extraParam);
+    }
+
+    for (Parameter parameter : methodContext.getExtraParameters()) {
+      checkParameterOneOf(errors, parameter, ALLOWED_NEW_WATERMARK_ESTIMATOR_PARAMETERS);
+    }
+
+    // Reuse the return type resolved above instead of resolving it a second time.
+    return DoFnSignature.NewWatermarkEstimatorMethod.create(
+        m, watermarkEstimatorT, windowT, methodContext.getExtraParameters());
+  }
+
+  @VisibleForTesting
   static DoFnSignature.GetSizeMethod analyzeGetSizeMethod(
       ErrorReporter errors,
       TypeDescriptor<? extends DoFn<?, ?>> fnT,
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/HasDefaultWatermarkEstimator.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/HasDefaultWatermarkEstimator.java
new file mode 100644
index 0000000..fac64ad
--- /dev/null
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/HasDefaultWatermarkEstimator.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.splittabledofn;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.transforms.DoFn;
+
+/**
+ * Interface for watermark estimator state for which a default implementation of {@link
+ * DoFn.NewWatermarkEstimator} is available, depending only on the watermark estimator state itself.
+ *
+ * @param <WatermarkEstimatorStateT> the watermark estimator state type
+ * @param <WatermarkEstimatorT> the {@link WatermarkEstimator} type returned by {@link
+ *     #newWatermarkEstimator}
+ */
+@Experimental(Kind.SPLITTABLE_DO_FN)
+public interface HasDefaultWatermarkEstimator<
+    WatermarkEstimatorStateT,
+    WatermarkEstimatorT extends WatermarkEstimator<WatermarkEstimatorStateT>> {
+  /** Creates a new watermark estimator for {@code this}. */
+  WatermarkEstimatorT newWatermarkEstimator();
+}
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/ManualWatermarkEstimator.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/ManualWatermarkEstimator.java
new file mode 100644
index 0000000..a6e2797
--- /dev/null
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/ManualWatermarkEstimator.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.splittabledofn;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.joda.time.Instant;
+
+/**
+ * A {@link WatermarkEstimator} which is controlled manually from within a {@link DoFn}. The {@link
+ * DoFn} must invoke {@link #setWatermark} to advance the watermark.
+ *
+ * @param <WatermarkEstimatorStateT> the watermark estimator state type
+ */
+@Experimental(Kind.SPLITTABLE_DO_FN)
+public interface ManualWatermarkEstimator<WatermarkEstimatorStateT>
+    extends WatermarkEstimator<WatermarkEstimatorStateT> {
+
+  /**
+   * Sets a timestamp before or at the timestamps of all future elements produced by the associated
+   * DoFn.
+   *
+   * <p>This can be approximate. If records are output that violate this guarantee, they will be
+   * considered late, which will affect how they will be processed. See <a
+   * href="https://beam.apache.org/documentation/programming-guide/#watermarks-and-late-data">watermarks
+   * and late data</a> for more information on late data and how to handle it.
+   *
+   * <p>However, this value should be as late as possible. Downstream windows may not be able to
+   * close until this watermark passes their end.
+   *
+   * @param watermark the new watermark
+   */
+  void setWatermark(Instant watermark);
+}
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/TimestampObservingWatermarkEstimator.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/TimestampObservingWatermarkEstimator.java
new file mode 100644
index 0000000..b217690
--- /dev/null
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/TimestampObservingWatermarkEstimator.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.splittabledofn;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.joda.time.Instant;
+
+/**
+ * A {@link WatermarkEstimator} that observes the timestamps of all records output from a {@link
+ * DoFn}.
+ *
+ * @param <WatermarkEstimatorStateT> the watermark estimator state type
+ */
+@Experimental(Kind.SPLITTABLE_DO_FN)
+public interface TimestampObservingWatermarkEstimator<WatermarkEstimatorStateT>
+    extends WatermarkEstimator<WatermarkEstimatorStateT> {
+
+  /**
+   * Update watermark estimate with latest output timestamp. This is called with the timestamp of
+   * every element output from the DoFn.
+   *
+   * @param timestamp the timestamp of the element that was output
+   */
+  void observeTimestamp(Instant timestamp);
+}
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/WatermarkEstimator.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/WatermarkEstimator.java
new file mode 100644
index 0000000..343fea6
--- /dev/null
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/WatermarkEstimator.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.splittabledofn;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.joda.time.Instant;
+
+/**
+ * A {@link WatermarkEstimator} which is used for estimating output watermarks of a splittable
+ * {@link DoFn}.
+ *
+ * @param <WatermarkEstimatorStateT> the type of state returned by {@link #getState}
+ */
+@Experimental(Kind.SPLITTABLE_DO_FN)
+public interface WatermarkEstimator<WatermarkEstimatorStateT> {
+  /**
+   * Return estimated output watermark. This method must return monotonically increasing watermarks
+   * across instances that are constructed from prior state.
+   */
+  Instant currentWatermark();
+
+  /**
+   * Get current state of the {@link WatermarkEstimator} instance, which can be used to recreate the
+   * {@link WatermarkEstimator} when processing the restriction. See {@link
+   * DoFn.NewWatermarkEstimator} for additional details.
+   *
+   * <p>The internal state of the estimator must not be mutated by this method.
+   *
+   * <p>The state returned must not be mutated.
+   */
+  WatermarkEstimatorStateT getState();
+}
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/WatermarkEstimators.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/WatermarkEstimators.java
new file mode 100644
index 0000000..40992f1
--- /dev/null
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/WatermarkEstimators.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.splittabledofn;
+
+import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.joda.time.Instant;
+
+/**
+ * A set of {@link WatermarkEstimator}s that users can use to advance the output watermark for their
+ * associated {@link DoFn splittable DoFn}s.
+ */
+@Experimental(Kind.SPLITTABLE_DO_FN)
+public class WatermarkEstimators {
+  /**
+   * Validates a caller supplied watermark.
+   *
+   * @param watermark the watermark to validate
+   * @return {@code watermark}, unchanged
+   * @throws NullPointerException if {@code watermark} is null
+   * @throws IllegalArgumentException if {@code watermark} lies outside of {@code
+   *     [GlobalWindow.TIMESTAMP_MIN_VALUE, GlobalWindow.TIMESTAMP_MAX_VALUE]}
+   */
+  private static Instant checkWatermarkWithinBounds(Instant watermark) {
+    checkNotNull(watermark, "watermark must not be null.");
+    if (watermark.isBefore(GlobalWindow.TIMESTAMP_MIN_VALUE)
+        || watermark.isAfter(GlobalWindow.TIMESTAMP_MAX_VALUE)) {
+      throw new IllegalArgumentException(
+          String.format(
+              "Provided watermark %s must be within bounds [%s, %s].",
+              watermark, GlobalWindow.TIMESTAMP_MIN_VALUE, GlobalWindow.TIMESTAMP_MAX_VALUE));
+    }
+    return watermark;
+  }
+
+  /** Concrete implementation of a {@link ManualWatermarkEstimator}. */
+  public static class Manual implements ManualWatermarkEstimator<Instant> {
+    private Instant watermark;
+
+    /**
+     * Creates an estimator that starts at {@code watermark}.
+     *
+     * @throws NullPointerException if {@code watermark} is null
+     * @throws IllegalArgumentException if {@code watermark} is out of bounds
+     */
+    public Manual(Instant watermark) {
+      this.watermark = checkWatermarkWithinBounds(watermark);
+    }
+
+    /**
+     * Advances the watermark to {@code watermark}.
+     *
+     * @throws NullPointerException if {@code watermark} is null
+     * @throws IllegalArgumentException if {@code watermark} is out of bounds or is before the
+     *     current watermark
+     */
+    @Override
+    public void setWatermark(Instant watermark) {
+      checkWatermarkWithinBounds(watermark);
+      if (watermark.isBefore(this.watermark)) {
+        throw new IllegalArgumentException(
+            String.format(
+                "Watermark must be monotonically increasing. Provided watermark %s is less than "
+                    + "current watermark %s.",
+                watermark, this.watermark));
+      }
+      this.watermark = watermark;
+    }
+
+    @Override
+    public Instant currentWatermark() {
+      return watermark;
+    }
+
+    @Override
+    public Instant getState() {
+      return watermark;
+    }
+  }
+
+  /** A watermark estimator that tracks wall time. */
+  public static class WallTime implements WatermarkEstimator<Instant> {
+    private Instant watermark;
+
+    /**
+     * Creates an estimator that starts at {@code watermark}.
+     *
+     * @throws NullPointerException if {@code watermark} is null
+     * @throws IllegalArgumentException if {@code watermark} is out of bounds
+     */
+    public WallTime(Instant watermark) {
+      this.watermark = checkWatermarkWithinBounds(watermark);
+    }
+
+    /**
+     * Returns the later of {@link Instant#now()} and the previously stored watermark, and stores
+     * that value so that the reported watermark never moves backwards.
+     */
+    @Override
+    public Instant currentWatermark() {
+      Instant now = Instant.now();
+      this.watermark = now.isAfter(watermark) ? now : watermark;
+      return watermark;
+    }
+
+    /** Returns the stored watermark without consulting the clock. */
+    @Override
+    public Instant getState() {
+      return watermark;
+    }
+  }
+
+  /**
+   * A watermark estimator that observes the timestamps of records output from a DoFn, reporting the
+   * timestamp of the last element seen as the current watermark.
+   *
+   * <p>Note that this watermark estimator requires output timestamps in monotonically increasing
+   * order.
+   */
+  public static class MonotonicallyIncreasing
+      implements TimestampObservingWatermarkEstimator<Instant> {
+    private Instant watermark;
+
+    /**
+     * Creates an estimator that starts at {@code watermark}.
+     *
+     * @throws NullPointerException if {@code watermark} is null
+     * @throws IllegalArgumentException if {@code watermark} is out of bounds
+     */
+    public MonotonicallyIncreasing(Instant watermark) {
+      this.watermark = checkWatermarkWithinBounds(watermark);
+    }
+
+    /**
+     * Records {@code timestamp} as the new watermark.
+     *
+     * @throws NullPointerException if {@code timestamp} is null
+     * @throws IllegalArgumentException if {@code timestamp} is before the current watermark
+     */
+    @Override
+    public void observeTimestamp(Instant timestamp) {
+      checkNotNull(timestamp, "timestamp must not be null.");
+      // Beyond bounds error checking isn't important since the system is expected to perform output
+      // timestamp bounds checking already.
+      if (timestamp.isBefore(this.watermark)) {
+        throw new IllegalArgumentException(
+            String.format(
+                "Timestamp must be monotonically increasing. Provided timestamp %s is less than "
+                    + "previously provided timestamp %s.",
+                timestamp, this.watermark));
+      }
+      this.watermark = timestamp;
+    }
+
+    @Override
+    public Instant currentWatermark() {
+      return watermark;
+    }
+
+    @Override
+    public Instant getState() {
+      return watermark;
+    }
+  }
+}
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
index f2e4676..9dcc86a 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
@@ -34,14 +34,17 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.CoderProviders;
 import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.InstantCoder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.state.StateSpec;
 import org.apache.beam.sdk.state.StateSpecs;
@@ -57,8 +60,11 @@ import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvoker.FakeArgumentProvider;
 import 
org.apache.beam.sdk.transforms.reflect.testhelper.DoFnInvokersTestHelper;
 import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultTracker;
+import 
org.apache.beam.sdk.transforms.splittabledofn.HasDefaultWatermarkEstimator;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.util.UserCodeException;
@@ -402,7 +408,9 @@ public class DoFnInvokersTest {
   public static class MockFn extends DoFn<String, String> {
     @ProcessElement
     public ProcessContinuation processElement(
-        ProcessContext c, RestrictionTracker<SomeRestriction, Void> tracker) {
+        ProcessContext c,
+        RestrictionTracker<SomeRestriction, Void> tracker,
+        WatermarkEstimator<Instant> watermarkEstimator) {
       return null;
     }
 
@@ -426,6 +434,22 @@ public class DoFnInvokersTest {
     public SomeRestrictionCoder getRestrictionCoder() {
       return null;
     }
+
+    @GetInitialWatermarkEstimatorState
+    public Instant getInitialWatermarkEstimatorState() {
+      return null;
+    }
+
+    @GetWatermarkEstimatorStateCoder
+    public InstantCoder getWatermarkEstimatorStateCoder() {
+      return null;
+    }
+
+    @NewWatermarkEstimator
+    public WatermarkEstimator<Instant> newWatermarkEstimator(
+        @WatermarkEstimatorState Instant watermarkEstimatorState) {
+      return null;
+    }
   }
 
   @Test
@@ -434,11 +458,16 @@ public class DoFnInvokersTest {
     DoFnInvoker<String, String> invoker = DoFnInvokers.invokerFor(fn);
     final SomeRestrictionTracker tracker = mock(SomeRestrictionTracker.class);
     final SomeRestrictionCoder coder = mock(SomeRestrictionCoder.class);
+    final InstantCoder watermarkEstimatorStateCoder = InstantCoder.of();
+    final Instant watermarkEstimatorState = Instant.now();
+    final WatermarkEstimator<Instant> watermarkEstimator =
+        new WatermarkEstimators.Manual(watermarkEstimatorState);
     SomeRestriction restriction = new SomeRestriction();
     final SomeRestriction part1 = new SomeRestriction();
     final SomeRestriction part2 = new SomeRestriction();
     final SomeRestriction part3 = new SomeRestriction();
     when(fn.getRestrictionCoder()).thenReturn(coder);
+    
when(fn.getWatermarkEstimatorStateCoder()).thenReturn(watermarkEstimatorStateCoder);
     when(fn.getInitialRestriction(mockElement)).thenReturn(restriction);
     doAnswer(
             AdditionalAnswers.delegatesTo(
@@ -456,11 +485,15 @@ public class DoFnInvokersTest {
                 }))
         .when(fn)
         .splitRestriction(eq(mockElement), same(restriction), any());
+    
when(fn.getInitialWatermarkEstimatorState()).thenReturn(watermarkEstimatorState);
     when(fn.newTracker(restriction)).thenReturn(tracker);
-    when(fn.processElement(mockProcessContext, tracker)).thenReturn(resume());
+    
when(fn.newWatermarkEstimator(watermarkEstimatorState)).thenReturn(watermarkEstimator);
+    when(fn.processElement(mockProcessContext, tracker, 
watermarkEstimator)).thenReturn(resume());
 
     assertEquals(coder, 
invoker.invokeGetRestrictionCoder(CoderRegistry.createDefault()));
-
+    assertEquals(
+        watermarkEstimatorStateCoder,
+        
invoker.invokeGetWatermarkEstimatorStateCoder(CoderRegistry.createDefault()));
     assertEquals(
         restriction,
         invoker.invokeGetInitialRestriction(
@@ -501,6 +534,9 @@ public class DoFnInvokersTest {
 
     assertEquals(Arrays.asList(part1, part2, part3), outputs);
     assertEquals(
+        watermarkEstimatorState,
+        invoker.invokeGetInitialWatermarkEstimatorState(new 
FakeArgumentProvider<>()));
+    assertEquals(
         tracker,
         invoker.invokeNewTracker(
             new FakeArgumentProvider<String, String>() {
@@ -515,6 +551,15 @@ public class DoFnInvokersTest {
               }
             }));
     assertEquals(
+        watermarkEstimator,
+        invoker.invokeNewWatermarkEstimator(
+            new FakeArgumentProvider<String, String>() {
+              @Override
+              public Object watermarkEstimatorState() {
+                return watermarkEstimatorState;
+              }
+            }));
+    assertEquals(
         resume(),
         invoker.invokeProcessElement(
             new FakeArgumentProvider<String, String>() {
@@ -527,6 +572,11 @@ public class DoFnInvokersTest {
               public RestrictionTracker<?, ?> restrictionTracker() {
                 return tracker;
               }
+
+              @Override
+              public WatermarkEstimator<?> watermarkEstimator() {
+                return watermarkEstimator;
+              }
             }));
   }
 
@@ -573,17 +623,64 @@ public class DoFnInvokersTest {
     }
   }
 
+  private static class WatermarkEstimatorStateWithDefaultWatermarkEstimator
+      implements HasDefaultWatermarkEstimator<
+          WatermarkEstimatorStateWithDefaultWatermarkEstimator, 
DefaultWatermarkEstimator> {
+
+    @Override
+    public DefaultWatermarkEstimator newWatermarkEstimator() {
+      return new DefaultWatermarkEstimator();
+    }
+  }
+
+  private static class DefaultWatermarkEstimator
+      implements 
WatermarkEstimator<WatermarkEstimatorStateWithDefaultWatermarkEstimator> {
+    @Override
+    public Instant currentWatermark() {
+      return null;
+    }
+
+    @Override
+    public WatermarkEstimatorStateWithDefaultWatermarkEstimator getState() {
+      return null;
+    }
+  }
+
+  private static class 
CoderForWatermarkEstimatorStateWithDefaultWatermarkEstimator
+      extends 
AtomicCoder<WatermarkEstimatorStateWithDefaultWatermarkEstimator> {
+
+    @Override
+    public void encode(
+        WatermarkEstimatorStateWithDefaultWatermarkEstimator value, 
OutputStream outStream)
+        throws CoderException, IOException {}
+
+    @Override
+    public WatermarkEstimatorStateWithDefaultWatermarkEstimator 
decode(InputStream inStream)
+        throws CoderException, IOException {
+      return null;
+    }
+  }
+
   @Test
   public void testSplittableDoFnDefaultMethods() throws Exception {
     class MockFn extends DoFn<String, String> {
       @ProcessElement
       public void processElement(
-          ProcessContext c, RestrictionTracker<RestrictionWithDefaultTracker, 
Void> tracker) {}
+          ProcessContext c,
+          RestrictionTracker<RestrictionWithDefaultTracker, Void> tracker,
+          
WatermarkEstimator<WatermarkEstimatorStateWithDefaultWatermarkEstimator>
+              watermarkEstimator) {}
 
       @GetInitialRestriction
       public RestrictionWithDefaultTracker getInitialRestriction(@Element 
String element) {
         return null;
       }
+
+      @GetInitialWatermarkEstimatorState
+      public WatermarkEstimatorStateWithDefaultWatermarkEstimator
+          getInitialWatermarkEstimatorState() {
+        return null;
+      }
     }
 
     MockFn fn = mock(MockFn.class);
@@ -593,9 +690,15 @@ public class DoFnInvokersTest {
     coderRegistry.registerCoderProvider(
         CoderProviders.fromStaticMethods(
             RestrictionWithDefaultTracker.class, 
CoderForDefaultTracker.class));
+    coderRegistry.registerCoderForClass(
+        WatermarkEstimatorStateWithDefaultWatermarkEstimator.class,
+        new CoderForWatermarkEstimatorStateWithDefaultWatermarkEstimator());
     assertThat(
         
invoker.<RestrictionWithDefaultTracker>invokeGetRestrictionCoder(coderRegistry),
         instanceOf(CoderForDefaultTracker.class));
+    assertThat(
+        invoker.invokeGetWatermarkEstimatorStateCoder(coderRegistry),
+        
instanceOf(CoderForWatermarkEstimatorStateWithDefaultWatermarkEstimator.class));
     invoker.invokeSplitRestriction(
         new FakeArgumentProvider<String, String>() {
           @Override
@@ -639,6 +742,15 @@ public class DoFnInvokersTest {
               }
             }),
         instanceOf(DefaultTracker.class));
+    assertThat(
+        invoker.invokeNewWatermarkEstimator(
+            new FakeArgumentProvider<String, String>() {
+              @Override
+              public Object watermarkEstimatorState() {
+                return new 
WatermarkEstimatorStateWithDefaultWatermarkEstimator();
+              }
+            }),
+        instanceOf(DefaultWatermarkEstimator.class));
   }
 
   // 
---------------------------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
index 2a2a08e..e030d99 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
@@ -25,6 +25,7 @@ import static org.junit.Assert.assertTrue;
 
 import java.lang.reflect.Method;
 import java.util.List;
+import org.apache.beam.sdk.coders.InstantCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.StructuredCoder;
@@ -36,15 +37,19 @@ import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFn.BoundedPerElement;
 import org.apache.beam.sdk.transforms.DoFn.Element;
 import org.apache.beam.sdk.transforms.DoFn.Restriction;
-import org.apache.beam.sdk.transforms.DoFn.StateId;
 import org.apache.beam.sdk.transforms.DoFn.UnboundedPerElement;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter;
 import 
org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.RestrictionParameter;
+import 
org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.WatermarkEstimatorStateParameter;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignatures.FnAnalysisContext;
 import 
org.apache.beam.sdk.transforms.reflect.DoFnSignaturesTestUtils.AnonymousMethod;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignaturesTestUtils.FakeDoFn;
 import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultTracker;
+import 
org.apache.beam.sdk.transforms.splittabledofn.HasDefaultWatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.values.PCollection;
@@ -272,7 +277,9 @@ public class DoFnSignaturesSplittableDoFnTest {
     class GoodSplittableDoFn extends DoFn<Integer, String> {
       @ProcessElement
       public ProcessContinuation processElement(
-          ProcessContext context, RestrictionTracker<SomeRestriction, Void> 
tracker) {
+          ProcessContext context,
+          RestrictionTracker<SomeRestriction, Void> tracker,
+          ManualWatermarkEstimator<Instant> watermarkEstimator) {
         return null;
       }
 
@@ -312,7 +319,6 @@ public class DoFnSignaturesSplittableDoFnTest {
       public double getSize(
           @Element Integer element,
           @Restriction SomeRestriction restriction,
-          RestrictionTracker<SomeRestriction, Void> restrictionTracker,
           PipelineOptions pipelineOptions,
           BoundedWindow boundedWindow,
           PaneInfo paneInfo,
@@ -324,10 +330,41 @@ public class DoFnSignaturesSplittableDoFnTest {
       public SomeRestrictionCoder getRestrictionCoder() {
         return null;
       }
+
+      @GetInitialWatermarkEstimatorState
+      public Instant getInitialWatermarkEstimatorState(
+          @Element Integer element,
+          @Restriction SomeRestriction restriction,
+          PipelineOptions pipelineOptions,
+          BoundedWindow boundedWindow,
+          PaneInfo paneInfo,
+          @Timestamp Instant timestamp) {
+        return null;
+      }
+
+      @GetWatermarkEstimatorStateCoder
+      public InstantCoder getWatermarkEstimatorStateCoder() {
+        return null;
+      }
+
+      @NewWatermarkEstimator
+      public WatermarkEstimators.Manual newWatermarkEstimator(
+          @WatermarkEstimatorState Instant watermarkEstimatorState,
+          @Element Integer element,
+          @Restriction SomeRestriction restriction,
+          PipelineOptions pipelineOptions,
+          BoundedWindow boundedWindow,
+          PaneInfo paneInfo,
+          @Timestamp Instant timestamp) {
+        return null;
+      }
     }
 
     DoFnSignature signature = 
DoFnSignatures.getSignature(GoodSplittableDoFn.class);
     assertEquals(RestrictionTracker.class, 
signature.processElement().trackerT().getRawType());
+    assertEquals(
+        ManualWatermarkEstimator.class,
+        signature.processElement().watermarkEstimatorT().getRawType());
     assertTrue(signature.processElement().isSplittable());
     assertTrue(signature.processElement().hasReturnValue());
     assertEquals(
@@ -350,6 +387,18 @@ public class DoFnSignaturesSplittableDoFnTest {
         getParameterOfType(signature.getSize().extraParameters(), 
RestrictionParameter.class)
             .restrictionT()
             .getRawType());
+    assertEquals(
+        Instant.class,
+        
signature.getInitialWatermarkEstimatorState().watermarkEstimatorStateT().getRawType());
+    assertEquals(
+        Instant.class,
+        getParameterOfType(
+                signature.newWatermarkEstimator().extraParameters(),
+                WatermarkEstimatorStateParameter.class)
+            .estimatorStateT()
+            .getRawType());
+    assertEquals(
+        InstantCoder.class, 
signature.getWatermarkEstimatorStateCoder().coderT().getRawType());
   }
 
   /**
@@ -358,9 +407,17 @@ public class DoFnSignaturesSplittableDoFnTest {
    */
   @Test
   public void testSplittableWithAllFunctionsGeneric() throws Exception {
-    class GoodGenericSplittableDoFn<RestrictionT, TrackerT, CoderT> extends 
DoFn<Integer, String> {
+    class GoodGenericSplittableDoFn<
+            RestrictionT,
+            TrackerT,
+            RestrictionCoderT,
+            WatermarkEstimatorStateT,
+            WatermarkEstimatorStateCoderT,
+            WatermarkEstimatorT>
+        extends DoFn<Integer, String> {
       @ProcessElement
-      public ProcessContinuation processElement(ProcessContext context, 
TrackerT tracker) {
+      public ProcessContinuation processElement(
+          ProcessContext context, TrackerT tracker, WatermarkEstimatorT 
watermarkEstimatorT) {
         return null;
       }
 
@@ -379,14 +436,30 @@ public class DoFnSignaturesSplittableDoFnTest {
       }
 
       @GetRestrictionCoder
-      public CoderT getRestrictionCoder() {
+      public RestrictionCoderT getRestrictionCoder() {
         return null;
       }
 
       @GetSize
-      public double getSize(@Restriction RestrictionT restriction, TrackerT 
restrictionTracker) {
+      public double getSize(@Restriction RestrictionT restriction) {
         return 1.0;
       }
+
+      @GetInitialWatermarkEstimatorState
+      public WatermarkEstimatorStateT getInitialWatermarkEstimatorState() {
+        return null;
+      }
+
+      @GetWatermarkEstimatorStateCoder
+      public WatermarkEstimatorStateCoderT getWatermarkEstimatorStateCoder() {
+        return null;
+      }
+
+      @NewWatermarkEstimator
+      public WatermarkEstimatorT newWatermarkEstimator(
+          @WatermarkEstimatorState WatermarkEstimatorStateT 
watermarkEstimatorState) {
+        return null;
+      }
     }
 
     DoFnSignature signature =
@@ -394,8 +467,14 @@ public class DoFnSignaturesSplittableDoFnTest {
             new GoodGenericSplittableDoFn<
                 SomeRestriction,
                 RestrictionTracker<SomeRestriction, ?>,
-                SomeRestrictionCoder>() {}.getClass());
+                SomeRestrictionCoder,
+                Instant,
+                InstantCoder,
+                ManualWatermarkEstimator<Instant>>() {}.getClass());
     assertEquals(RestrictionTracker.class, 
signature.processElement().trackerT().getRawType());
+    assertEquals(
+        ManualWatermarkEstimator.class,
+        signature.processElement().watermarkEstimatorT().getRawType());
     assertTrue(signature.processElement().isSplittable());
     assertTrue(signature.processElement().hasReturnValue());
     assertEquals(
@@ -413,6 +492,18 @@ public class DoFnSignaturesSplittableDoFnTest {
             .restrictionT()
             .getRawType());
     assertEquals(SomeRestrictionCoder.class, 
signature.getRestrictionCoder().coderT().getRawType());
+    assertEquals(
+        Instant.class,
+        
signature.getInitialWatermarkEstimatorState().watermarkEstimatorStateT().getRawType());
+    assertEquals(
+        Instant.class,
+        getParameterOfType(
+                signature.newWatermarkEstimator().extraParameters(),
+                WatermarkEstimatorStateParameter.class)
+            .estimatorStateT()
+            .getRawType());
+    assertEquals(
+        InstantCoder.class, 
signature.getWatermarkEstimatorStateCoder().coderT().getRawType());
   }
 
   @Test
@@ -446,6 +537,68 @@ public class DoFnSignaturesSplittableDoFnTest {
   }
 
   @Test
+  public void 
testGetInitialWatermarkEstimatorStateUnsupportedSchemaElementArgument()
+      throws Exception {
+    thrown.expectMessage(
+        "Schema @Element are not supported for 
@GetInitialWatermarkEstimatorState method. Found String, did you mean to use 
Integer?");
+    DoFnSignatures.analyzeGetInitialWatermarkEstimatorStateMethod(
+        errors(),
+        TypeDescriptor.of(FakeDoFn.class),
+        new AnonymousMethod() {
+          SomeRestriction method(@Element String element) {
+            return null;
+          }
+        }.getMethod(),
+        TypeDescriptor.of(Integer.class),
+        TypeDescriptor.of(String.class),
+        FnAnalysisContext.create());
+  }
+
+  @Test
+  public void testNewWatermarkEstimatorUnsupportedSchemaElementArgument() 
throws Exception {
+    thrown.expectMessage(
+        "Schema @Element are not supported for @NewWatermarkEstimator method. 
Found String, did you mean to use Integer?");
+    DoFnSignatures.analyzeNewWatermarkEstimatorMethod(
+        errors(),
+        TypeDescriptor.of(FakeDoFn.class),
+        new AnonymousMethod() {
+          WatermarkEstimator<Instant> method(@Element String element) {
+            return null;
+          }
+        }.getMethod(),
+        TypeDescriptor.of(Integer.class),
+        TypeDescriptor.of(String.class),
+        TypeDescriptor.of(SomeRestriction.class),
+        TypeDescriptor.of(Instant.class),
+        FnAnalysisContext.create());
+  }
+
+  @Test
+  public void testMissingNewWatermarkEstimatorMethod() throws Exception {
+    class BadFn extends DoFn<Integer, String> {
+      @ProcessElement
+      public void process(
+          ProcessContext context,
+          RestrictionTracker<SomeRestriction, Void> tracker,
+          ManualWatermarkEstimator<Instant> watermarkEstimator) {}
+
+      @GetInitialRestriction
+      public SomeRestriction getInitialRestriction() {
+        return null;
+      }
+
+      @GetInitialWatermarkEstimatorState
+      public Instant getInitialWatermarkEstimatorState() {
+        return null;
+      }
+    }
+
+    thrown.expectMessage(
+        "Splittable, either @NewWatermarkEstimator method must be defined or 
Instant must implement HasDefaultWatermarkEstimator.");
+    DoFnSignatures.getSignature(BadFn.class);
+  }
+
+  @Test
   public void testSplittableMissingNewTrackerMethod() throws Exception {
     class OtherRestriction {}
 
@@ -488,6 +641,40 @@ public class DoFnSignaturesSplittableDoFnTest {
     assertEquals(RestrictionTracker.class, 
signature.processElement().trackerT().getRawType());
   }
 
+  abstract static class SomeDefaultWatermarkEstimator
+      implements 
WatermarkEstimator<WatermarkEstimatorStateWithDefaultWatermarkEstimator> {}
+
+  abstract static class WatermarkEstimatorStateWithDefaultWatermarkEstimator
+      implements HasDefaultWatermarkEstimator<
+          WatermarkEstimatorStateWithDefaultWatermarkEstimator, 
SomeDefaultWatermarkEstimator> {}
+
+  @Test
+  public void testHasDefaultWatermarkEstimator() throws Exception {
+    class Fn extends DoFn<Integer, String> {
+      @ProcessElement
+      public void process(
+          ProcessContext c,
+          RestrictionTracker<SomeRestriction, Void> tracker,
+          
WatermarkEstimator<WatermarkEstimatorStateWithDefaultWatermarkEstimator>
+              watermarkEstimator) {}
+
+      @GetInitialRestriction
+      public SomeRestriction getInitialRestriction(@Element Integer element) {
+        return null;
+      }
+
+      @GetInitialWatermarkEstimatorState
+      public WatermarkEstimatorStateWithDefaultWatermarkEstimator
+          getInitialWatermarkEstimatorState() {
+        return null;
+      }
+    }
+
+    DoFnSignature signature = DoFnSignatures.getSignature(Fn.class);
+    assertEquals(
+        WatermarkEstimator.class, 
signature.processElement().watermarkEstimatorT().getRawType());
+  }
+
   @Test
   public void testRestrictionHasDefaultTrackerProcessUsesWrongTracker() throws 
Exception {
     class Fn extends DoFn<Integer, String> {
@@ -507,6 +694,34 @@ public class DoFnSignaturesSplittableDoFnTest {
   }
 
   @Test
+  public void
+      
testWatermarkEstimatorStateHasDefaultWatermarkEstimatorProcessUsesWrongWatermarkEstimator()
+          throws Exception {
+    class Fn extends DoFn<Integer, String> {
+      @ProcessElement
+      public void process(
+          ProcessContext c,
+          RestrictionTracker<SomeRestriction, Void> tracker,
+          SomeDefaultWatermarkEstimator watermarkEstimator) {}
+
+      @GetInitialRestriction
+      public SomeRestriction getInitialRestriction(@Element Integer element) {
+        return null;
+      }
+
+      @GetInitialWatermarkEstimatorState
+      public WatermarkEstimatorStateWithDefaultWatermarkEstimator
+          getInitialWatermarkEstimatorState() {
+        return null;
+      }
+    }
+
+    thrown.expectMessage(
+        "Has watermark estimator type SomeDefaultWatermarkEstimator, but the 
DoFn's watermark estimator type must be one of [WatermarkEstimator, 
ManualWatermarkEstimator] types.");
+    DoFnSignature signature = DoFnSignatures.getSignature(Fn.class);
+  }
+
+  @Test
   public void testNewTrackerReturnsWrongType() throws Exception {
     class BadFn extends DoFn<Integer, String> {
       @ProcessElement
@@ -528,6 +743,26 @@ public class DoFnSignaturesSplittableDoFnTest {
   }
 
   @Test
+  public void testNewWatermarkEstimatorReturnsWrongType() throws Exception {
+    class BadFn extends DoFn<Integer, String> {
+      @ProcessElement
+      public void process(
+          ProcessContext context, RestrictionTracker<SomeRestriction, Void> 
tracker) {}
+
+      @GetInitialRestriction
+      public SomeRestriction getInitialRestriction(@Element Integer element) {
+        return null;
+      }
+
+      @NewWatermarkEstimator
+      public void newWatermarkEstimator() {}
+    }
+
+    thrown.expectMessage("Returns void, but must return a subtype of 
WatermarkEstimator<Void>");
+    DoFnSignatures.getSignature(BadFn.class);
+  }
+
+  @Test
   public void testGetInitialRestrictionMismatchesNewTracker() throws Exception 
{
     class BadFn extends DoFn<Integer, String> {
       @ProcessElement
@@ -551,6 +786,36 @@ public class DoFnSignaturesSplittableDoFnTest {
   }
 
   @Test
+  public void 
testGetInitialWatermarkEstimatorStateMismatchesNewWatermarkEstimator()
+      throws Exception {
+    class BadFn extends DoFn<Integer, String> {
+      @ProcessElement
+      public void process(
+          ProcessContext context, RestrictionTracker<SomeRestriction, Void> 
tracker) {}
+
+      @GetInitialRestriction
+      public SomeRestriction getInitialRestriction(@Element Integer element) {
+        return null;
+      }
+
+      @GetInitialWatermarkEstimatorState
+      public Instant getInitialWatermarkEstimatorState() {
+        return null;
+      }
+
+      @NewWatermarkEstimator
+      public WatermarkEstimator<Void> newWatermarkEstimator(
+          @WatermarkEstimatorState Instant watermarkEstimatorState) {
+        return null;
+      }
+    }
+
+    thrown.expectMessage("but must return a subtype of 
WatermarkEstimator<Instant>");
+    thrown.expectMessage("newWatermarkEstimator(Instant): Returns 
WatermarkEstimator<Void>");
+    DoFnSignatures.getSignature(BadFn.class);
+  }
+
+  @Test
   public void testGetRestrictionCoderReturnsWrongType() throws Exception {
     class BadFn extends DoFn<Integer, String> {
       @ProcessElement
@@ -579,6 +844,29 @@ public class DoFnSignaturesSplittableDoFnTest {
   }
 
   @Test
+  public void testGetWatermarkEstimatorStateCoderReturnsWrongType() throws 
Exception {
+    class BadFn extends DoFn<Integer, String> {
+      @ProcessElement
+      public void process(
+          ProcessContext context, RestrictionTracker<SomeRestriction, Void> 
tracker) {}
+
+      @GetInitialRestriction
+      public SomeRestriction getInitialRestriction(@Element Integer element) {
+        return null;
+      }
+
+      @GetWatermarkEstimatorStateCoder
+      public KvCoder getWatermarkEstimatorStateCoder() {
+        return null;
+      }
+    }
+
+    thrown.expectMessage(
+        "getWatermarkEstimatorStateCoder() returns KvCoder which is not a 
subtype of Coder<Void>");
+    DoFnSignatures.getSignature(BadFn.class);
+  }
+
+  @Test
   public void testSplitRestrictionReturnsWrongType() throws Exception {
     thrown.expectMessage(
         "OutputReceiver should be parameterized by "
@@ -696,11 +984,26 @@ public class DoFnSignaturesSplittableDoFnTest {
       public double getSize() {
         return 1.0;
       }
+
+      @GetInitialWatermarkEstimatorState
+      public Instant getInitialWatermarkEstimatorState() {
+        return null;
+      }
+
+      @GetWatermarkEstimatorStateCoder
+      public InstantCoder getWatermarkEstimatorStateCoder() {
+        return null;
+      }
+
+      @NewWatermarkEstimator
+      public WatermarkEstimator<Instant> newWatermarkEstimator() {
+        return null;
+      }
     }
 
     thrown.expectMessage(
         "Non-splittable, but defines methods: "
-            + "[@GetInitialRestriction, @SplitRestriction, @NewTracker, 
@GetRestrictionCoder, @GetSize]");
+            + "[@GetInitialRestriction, @SplitRestriction, @NewTracker, 
@GetRestrictionCoder, @GetSize, @GetInitialWatermarkEstimatorState, 
@GetWatermarkEstimatorStateCoder, @NewWatermarkEstimator]");
     DoFnSignatures.getSignature(BadFn.class);
   }
 
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/WatermarkEstimatorsTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/WatermarkEstimatorsTest.java
new file mode 100644
index 0000000..3fd0dec
--- /dev/null
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/WatermarkEstimatorsTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.splittabledofn;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+
+import org.apache.beam.sdk.testing.ResetDateTimeProvider;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.joda.time.DateTimeUtils;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class WatermarkEstimatorsTest {
+  public @Rule ResetDateTimeProvider resetDateTimeProvider = new 
ResetDateTimeProvider();
+
+  @Test
+  public void testManualWatermarkEstimator() throws Exception {
+    ManualWatermarkEstimator<Instant> watermarkEstimator =
+        new WatermarkEstimators.Manual(GlobalWindow.TIMESTAMP_MIN_VALUE);
+    assertEquals(GlobalWindow.TIMESTAMP_MIN_VALUE, 
watermarkEstimator.currentWatermark());
+    watermarkEstimator.setWatermark(GlobalWindow.TIMESTAMP_MIN_VALUE);
+    watermarkEstimator.setWatermark(
+        watermarkEstimator.currentWatermark().plus(Duration.standardHours(1)));
+    assertEquals(
+        GlobalWindow.TIMESTAMP_MIN_VALUE.plus(Duration.standardHours(1)),
+        watermarkEstimator.currentWatermark());
+    assertThrows(
+        "must be within bounds",
+        IllegalArgumentException.class,
+        () -> 
watermarkEstimator.setWatermark(GlobalWindow.TIMESTAMP_MIN_VALUE.minus(1)));
+    assertThrows(
+        "must be within bounds",
+        IllegalArgumentException.class,
+        () -> 
watermarkEstimator.setWatermark(GlobalWindow.TIMESTAMP_MAX_VALUE.plus(1)));
+    assertThrows(
+        "monotonically increasing",
+        IllegalArgumentException.class,
+        () -> 
watermarkEstimator.setWatermark(GlobalWindow.TIMESTAMP_MIN_VALUE));
+    watermarkEstimator.setWatermark(GlobalWindow.TIMESTAMP_MAX_VALUE);
+    assertEquals(GlobalWindow.TIMESTAMP_MAX_VALUE, 
watermarkEstimator.currentWatermark());
+  }
+
+  @Test
+  public void testWallTimeWatermarkEstimator() throws Exception {
+    
DateTimeUtils.setCurrentMillisFixed(GlobalWindow.TIMESTAMP_MIN_VALUE.getMillis());
+    WatermarkEstimator<Instant> watermarkEstimator =
+        new WatermarkEstimators.WallTime(new Instant());
+    
DateTimeUtils.setCurrentMillisFixed(GlobalWindow.TIMESTAMP_MIN_VALUE.plus(1).getMillis());
+    assertEquals(GlobalWindow.TIMESTAMP_MIN_VALUE.plus(1), 
watermarkEstimator.currentWatermark());
+
+    
DateTimeUtils.setCurrentMillisFixed(GlobalWindow.TIMESTAMP_MIN_VALUE.plus(2).getMillis());
+    // Make sure that we don't mutate state even if the clock advanced
+    assertEquals(GlobalWindow.TIMESTAMP_MIN_VALUE.plus(1), 
watermarkEstimator.getState());
+    assertEquals(GlobalWindow.TIMESTAMP_MIN_VALUE.plus(2), 
watermarkEstimator.currentWatermark());
+    assertEquals(GlobalWindow.TIMESTAMP_MIN_VALUE.plus(2), 
watermarkEstimator.getState());
+
+    // Handle the case if the clock ever goes backwards. Could happen if we 
resumed processing
+    // on a machine that had a misconfigured clock or due to clock skew.
+    
DateTimeUtils.setCurrentMillisFixed(GlobalWindow.TIMESTAMP_MIN_VALUE.plus(1).getMillis());
+    assertEquals(GlobalWindow.TIMESTAMP_MIN_VALUE.plus(2), 
watermarkEstimator.currentWatermark());
+  }
+
+  @Test
+  public void testMonotonicallyIncreasingWatermarkEstimator() throws Exception 
{
+    TimestampObservingWatermarkEstimator<Instant> watermarkEstimator =
+        new 
WatermarkEstimators.MonotonicallyIncreasing(GlobalWindow.TIMESTAMP_MIN_VALUE);
+    assertEquals(GlobalWindow.TIMESTAMP_MIN_VALUE, 
watermarkEstimator.currentWatermark());
+    watermarkEstimator.observeTimestamp(GlobalWindow.TIMESTAMP_MIN_VALUE);
+    watermarkEstimator.observeTimestamp(
+        watermarkEstimator.currentWatermark().plus(Duration.standardHours(1)));
+    assertEquals(
+        GlobalWindow.TIMESTAMP_MIN_VALUE.plus(Duration.standardHours(1)),
+        watermarkEstimator.currentWatermark());
+    assertThrows(
+        "monotonically increasing",
+        IllegalArgumentException.class,
+        () -> 
watermarkEstimator.observeTimestamp(GlobalWindow.TIMESTAMP_MIN_VALUE));
+    watermarkEstimator.observeTimestamp(GlobalWindow.TIMESTAMP_MAX_VALUE);
+    assertEquals(GlobalWindow.TIMESTAMP_MAX_VALUE, 
watermarkEstimator.currentWatermark());
+  }
+}
diff --git 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
index 9fb5d3e..681d041 100644
--- 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
+++ 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
@@ -81,6 +81,7 @@ import 
org.apache.beam.sdk.transforms.reflect.DoFnSignature.TimerDeclaration;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
@@ -1242,6 +1243,17 @@ public class FnApiDoFnRunner<InputT, RestrictionT, 
PositionT, OutputT> {
           currentOutputWatermark);
       currentOutputWatermark = watermark;
     }
+
+    @Override
+    public Object watermarkEstimatorState() {
+      throw new UnsupportedOperationException(
+          "@WatermarkEstimatorState parameters are not supported.");
+    }
+
+    @Override
+    public WatermarkEstimator<?> watermarkEstimator() {
+      throw new UnsupportedOperationException("WatermarkEstimator parameters 
are not supported.");
+    }
   }
 
   /** Provides arguments for a {@link DoFnInvoker} for {@link DoFn.OnTimer 
@OnTimer}. */

Reply via email to