This is an automated email from the ASF dual-hosted git repository.
lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new feea2ba [BEAM-2939] Implement interfaces and concrete watermark
estimators
new feefaca Merge pull request #10992 from lukecwik/splittabledofn2
feea2ba is described below
commit feea2ba610aa35344d30bae23a76d18e7b6afe93
Author: Luke Cwik <[email protected]>
AuthorDate: Thu Feb 27 13:00:30 2020 -0800
[BEAM-2939] Implement interfaces and concrete watermark estimators
Plumb the watermark estimator, watermark estimator state, and watermark
estimator state coder methods through the ByteBuddy DoFnInvoker, adding the
method definitions to DoFnSignature along with appropriate validation.
This does not update any of the existing splittable DoFns to use these
watermark estimators, or any existing splittable DoFn "runners" (beyond making
them throw UnsupportedOperationException). That remaining work is tracked under
BEAM-9430.
---
.../construction/SplittableParDoNaiveBounded.java | 12 +
...TimeBoundedSplittableProcessElementInvoker.java | 7 +-
.../apache/beam/runners/core/SimpleDoFnRunner.java | 23 ++
.../java/org/apache/beam/sdk/transforms/DoFn.java | 143 +++++++-
.../org/apache/beam/sdk/transforms/DoFnTester.java | 54 +--
.../reflect/ByteBuddyDoFnInvokerFactory.java | 114 +++++-
.../beam/sdk/transforms/reflect/DoFnInvoker.java | 50 +++
.../beam/sdk/transforms/reflect/DoFnSignature.java | 162 ++++++++-
.../sdk/transforms/reflect/DoFnSignatures.java | 394 ++++++++++++++++++---
.../HasDefaultWatermarkEstimator.java | 34 ++
.../splittabledofn/ManualWatermarkEstimator.java | 46 +++
.../TimestampObservingWatermarkEstimator.java | 38 ++
.../splittabledofn/WatermarkEstimator.java | 47 +++
.../splittabledofn/WatermarkEstimators.java | 153 ++++++++
.../sdk/transforms/reflect/DoFnInvokersTest.java | 120 ++++++-
.../reflect/DoFnSignaturesSplittableDoFnTest.java | 321 ++++++++++++++++-
.../splittabledofn/WatermarkEstimatorsTest.java | 102 ++++++
.../apache/beam/fn/harness/FnApiDoFnRunner.java | 12 +
18 files changed, 1706 insertions(+), 126 deletions(-)
diff --git
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDoNaiveBounded.java
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDoNaiveBounded.java
index 92a443a..17d435c 100644
---
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDoNaiveBounded.java
+++
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDoNaiveBounded.java
@@ -42,6 +42,7 @@ import
org.apache.beam.sdk.transforms.reflect.DoFnInvoker.BaseArgumentProvider;
import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.values.KV;
@@ -416,6 +417,17 @@ public class SplittableParDoNaiveBounded {
// Ignore watermark updates
}
+ @Override
+ public Object watermarkEstimatorState() {
+ throw new UnsupportedOperationException(
+ "@WatermarkEstimatorState parameters are not supported.");
+ }
+
+ @Override
+ public WatermarkEstimator<?> watermarkEstimator() {
+ throw new UnsupportedOperationException("WatermarkEstimator parameters
are not supported.");
+ }
+
// ----------- Unsupported methods --------------------
@Override
diff --git
a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
index 010eb11..9c0e79a 100644
---
a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
+++
b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
@@ -111,7 +111,12 @@ public class
OutputAndTimeBoundedSplittableProcessElementInvoker<
DoFn.ProcessContinuation cont =
invoker.invokeProcessElement(
- new DoFnInvoker.ArgumentProvider<InputT, OutputT>() {
+ new DoFnInvoker.BaseArgumentProvider<InputT, OutputT>() {
+ @Override
+ public String getErrorContext() {
+ return
OutputAndTimeBoundedSplittableProcessElementInvoker.class.getSimpleName();
+ }
+
@Override
public DoFn<InputT, OutputT>.ProcessContext processContext(
DoFn<InputT, OutputT> doFn) {
diff --git
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index a37644a..aec8365 100644
---
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -49,6 +49,7 @@ import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.SystemDoFnInternal;
@@ -541,6 +542,17 @@ public class SimpleDoFnRunner<InputT, OutputT> implements
DoFnRunner<InputT, Out
}
@Override
+ public Object watermarkEstimatorState() {
+ throw new UnsupportedOperationException(
+ "@WatermarkEstimatorState parameters are not supported.");
+ }
+
+ @Override
+ public WatermarkEstimator<?> watermarkEstimator() {
+ throw new UnsupportedOperationException("WatermarkEstimator parameters
are not supported.");
+ }
+
+ @Override
public State state(String stateId, boolean alwaysFetched) {
try {
StateSpec<?> spec =
@@ -745,6 +757,17 @@ public class SimpleDoFnRunner<InputT, OutputT> implements
DoFnRunner<InputT, Out
}
@Override
+ public Object watermarkEstimatorState() {
+ throw new UnsupportedOperationException(
+ "@WatermarkEstimatorState parameters are not supported.");
+ }
+
+ @Override
+ public WatermarkEstimator<?> watermarkEstimator() {
+ throw new UnsupportedOperationException("WatermarkEstimator parameters
are not supported.");
+ }
+
+ @Override
public State state(String stateId, boolean alwaysFetched) {
try {
StateSpec<?> spec =
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
index d00950d..1790ecc 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@ -39,8 +39,12 @@ import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.HasDisplayData;
import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultTracker;
+import
org.apache.beam.sdk.transforms.splittabledofn.HasDefaultWatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.splittabledofn.Sizes;
+import
org.apache.beam.sdk.transforms.splittabledofn.TimestampObservingWatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.Window;
@@ -670,6 +674,14 @@ public abstract class DoFn<InputT, OutputT> implements
Serializable, HasDisplayD
* GetInitialRestriction}. This method is optional only if the
restriction type returned by
* {@link GetInitialRestriction} implements {@link HasDefaultTracker}.
* <li>It <i>may</i> define a {@link GetRestrictionCoder} method.
+ * <li>It <i>may</i> define a {@link GetInitialWatermarkEstimatorState}
method. If none is
+ * defined then the watermark estimator state is of type {@link Void}.
+ * <li>It <i>may</i> define a {@link GetWatermarkEstimatorStateCoder}
method.
+ * <li>It <i>may</i> define a {@link NewWatermarkEstimator} method
returning a subtype of {@code
+ * WatermarkEstimator<W>} where {@code W} is the watermark estimator
state type returned by
+ * {@link GetInitialWatermarkEstimatorState}. This method is optional
only if {@link
+ * GetInitialWatermarkEstimatorState} has not been defined or {@code
W} implements {@link
+ * HasDefaultWatermarkEstimator}.
* <li>The {@link DoFn} itself <i>may</i> be annotated with {@link
BoundedPerElement} or {@link
* UnboundedPerElement}, but not both at the same time. If it's not
annotated with either of
* these, it's assumed to be {@link BoundedPerElement} if its {@link
ProcessElement} method
@@ -692,6 +704,12 @@ public abstract class DoFn<InputT, OutputT> implements
Serializable, HasDisplayD
* <li>If one of its arguments is tagged with the {@link Timestamp}
annotation, then it will be
* passed the timestamp of the current element being processed; the
argument must be of type
* {@link Instant}.
+ * <li>If one of its arguments is of the type {@link WatermarkEstimator},
then it will be passed
+ * the watermark estimator.
+ * <li>If one of its arguments is of the type {@link
ManualWatermarkEstimator}, then it will be
+ * passed a watermark estimator that can be updated manually. This
parameter can only be
+ * supplied if the method annotated with {@link
NewWatermarkEstimator} returns a
+ * sub-type of {@link ManualWatermarkEstimator}.
* <li>If one of its arguments is a subtype of {@link BoundedWindow}, then
it will be passed the
* window of the current element. When applied by {@link ParDo} the
subtype of {@link
* BoundedWindow} must match the type of windows on the input {@link
PCollection}. If the
@@ -720,7 +738,8 @@ public abstract class DoFn<InputT, OutputT> implements
Serializable, HasDisplayD
/**
* Parameter annotation for the input element for {@link ProcessElement},
{@link
- * GetInitialRestriction}, {@link GetSize}, {@link SplitRestriction}, and
{@link NewTracker}
+ * GetInitialRestriction}, {@link GetSize}, {@link SplitRestriction}, {@link
+ * GetInitialWatermarkEstimatorState}, {@link NewWatermarkEstimator}, and
{@link NewTracker}
* methods.
*/
@Documented
@@ -729,8 +748,9 @@ public abstract class DoFn<InputT, OutputT> implements
Serializable, HasDisplayD
public @interface Element {}
/**
- * Parameter annotation for the restriction for {@link GetSize}, {@link
SplitRestriction}, and
- * {@link NewTracker} methods. Must match the return type used on the method
annotated with {@link
+ * Parameter annotation for the restriction for {@link GetSize}, {@link
SplitRestriction}, {@link
+ * GetInitialWatermarkEstimatorState}, {@link NewWatermarkEstimator}, and
{@link NewTracker}
+ * methods. Must match the return type used on the method annotated with
{@link
* GetInitialRestriction}.
*/
@Documented
@@ -740,7 +760,8 @@ public abstract class DoFn<InputT, OutputT> implements
Serializable, HasDisplayD
/**
* Parameter annotation for the input element timestamp for {@link
ProcessElement}, {@link
- * GetInitialRestriction}, {@link GetSize}, {@link SplitRestriction}, and
{@link NewTracker}
+ * GetInitialRestriction}, {@link GetSize}, {@link SplitRestriction}, {@link
+ * GetInitialWatermarkEstimatorState}, {@link NewWatermarkEstimator}, and
{@link NewTracker}
* methods.
*/
@Documented
@@ -1071,6 +1092,120 @@ public abstract class DoFn<InputT, OutputT> implements
Serializable, HasDisplayD
public @interface NewTracker {}
/**
+ * Annotation for the method that maps an element and restriction to initial
watermark estimator
+ * state for a <a
href="https://s.apache.org/splittable-do-fn">splittable</a> {@link DoFn}.
+ *
+ * <p>Signature: {@code WatermarkEstimatorStateT
getInitialWatermarkEstimatorState(<arguments>);}
+ *
+ * <p>This method must satisfy the following constraints:
+ *
+ * <ul>
+ * <li>The return type {@code WatermarkEstimatorStateT} defines the
watermark state type used
+ * within this splittable DoFn. All other methods that use a {@link
+ * WatermarkEstimatorState @WatermarkEstimatorState} parameter must
use the same type that
+ * is used here. It is suggested to use as narrow of a return type
definition as possible
+ * (for example prefer to use a square type over a shape type as a
square is a type of a
+ * shape).
+ * <li>If one of its arguments is tagged with the {@link Element}
annotation, then it will be
+ * passed the current element being processed; the argument must be of
type {@code InputT}.
+ * Note that automatic conversion of {@link Row}s and {@link
FieldAccess} parameters is
+ * currently unsupported.
+ * <li>If one of its arguments is tagged with the {@link Restriction}
annotation, then it will
+ * be passed the current restriction being processed; the argument
must be of type {@code
+ * RestrictionT}.
+ * <li>If one of its arguments is tagged with the {@link Timestamp}
annotation, then it will be
+ * passed the timestamp of the current element being processed; the
argument must be of type
+ * {@link Instant}.
+ * <li>If one of its arguments is a subtype of {@link BoundedWindow}, then
it will be passed the
+ * window of the current element. When applied by {@link ParDo} the
subtype of {@link
+ * BoundedWindow} must match the type of windows on the input {@link
PCollection}. If the
+ * window is not accessed a runner may perform additional
optimizations.
+ * <li>If one of its arguments is of type {@link PaneInfo}, then it will
be passed information
+ * about the current triggering pane.
+ * <li>If one of the parameters is of type {@link PipelineOptions}, then
it will be passed the
+ * options for the current pipeline.
+ * </ul>
+ */
+ @Documented
+ @Retention(RetentionPolicy.RUNTIME)
+ @Target(ElementType.METHOD)
+ @Experimental(Kind.SPLITTABLE_DO_FN)
+ public @interface GetInitialWatermarkEstimatorState {}
+
+ /**
+ * Annotation for the method that returns the coder to use for the watermark
estimator state of a
+ * <a href="https://s.apache.org/splittable-do-fn">splittable</a> {@link
DoFn}.
+ *
+ * <p>If not defined, a coder will be inferred using standard coder
inference rules and the
+ * pipeline's {@link Pipeline#getCoderRegistry coder registry}.
+ *
+ * <p>This method will be called only at pipeline construction time.
+ *
+ * <p>Signature: {@code Coder<WatermarkEstimatorStateT>
getWatermarkEstimatorStateCoder();}
+ */
+ @Documented
+ @Retention(RetentionPolicy.RUNTIME)
+ @Target(ElementType.METHOD)
+ @Experimental(Kind.SPLITTABLE_DO_FN)
+ public @interface GetWatermarkEstimatorStateCoder {}
+
+ /**
+ * Annotation for the method that creates a new {@link WatermarkEstimator}
for the watermark state
+ * of a <a href="https://s.apache.org/splittable-do-fn">splittable</a>
{@link DoFn}.
+ *
+ * <p>Signature: {@code MyWatermarkEstimator newWatermarkEstimator(<optional
arguments>);}
+ *
+ * <p>If the return type is a subtype of {@link
TimestampObservingWatermarkEstimator} then the
+ * timestamp of each element output from this DoFn is provided to the
watermark estimator.
+ *
+ * <p>This method must satisfy the following constraints:
+ *
+ * <ul>
+ * <li>The return type must be a subtype of {@code
+ * WatermarkEstimator<WatermarkEstimatorStateT>}. It is suggested to
use as narrow of a
+ * return type definition as possible (for example prefer to use a
square type over a shape
+ * type as a square is a type of a shape).
+ * <li>If one of its arguments is tagged with the {@link
WatermarkEstimatorState} annotation,
+ * then it will be passed the current watermark estimator state; the
argument must be of
+ * type {@code WatermarkEstimatorStateT}.
+ * <li>If one of its arguments is tagged with the {@link Element}
annotation, then it will be
+ * passed the current element being processed; the argument must be of
type {@code InputT}.
+ * Note that automatic conversion of {@link Row}s and {@link
FieldAccess} parameters is
+ * currently unsupported.
+ * <li>If one of its arguments is tagged with the {@link Restriction}
annotation, then it will
+ * be passed the current restriction being processed; the argument
must be of type {@code
+ * RestrictionT}.
+ * <li>If one of its arguments is tagged with the {@link Timestamp}
annotation, then it will be
+ * passed the timestamp of the current element being processed; the
argument must be of type
+ * {@link Instant}.
+ * <li>If one of its arguments is a subtype of {@link BoundedWindow}, then
it will be passed the
+ * window of the current element. When applied by {@link ParDo} the
subtype of {@link
+ * BoundedWindow} must match the type of windows on the input {@link
PCollection}. If the
+ * window is not accessed a runner may perform additional
optimizations.
+ * <li>If one of its arguments is of type {@link PaneInfo}, then it will
be passed information
+ * about the current triggering pane.
+ * <li>If one of the parameters is of type {@link PipelineOptions}, then
it will be passed the
+ * options for the current pipeline.
+ * </ul>
+ */
+ @Documented
+ @Retention(RetentionPolicy.RUNTIME)
+ @Target(ElementType.METHOD)
+ @Experimental(Kind.SPLITTABLE_DO_FN)
+ public @interface NewWatermarkEstimator {}
+
+ /**
+ * Parameter annotation for the watermark estimator state for the {@link
NewWatermarkEstimator}
+ * method. Must match the return type on the method annotated with {@link
+ * GetInitialWatermarkEstimatorState}.
+ */
+ @Documented
+ @Retention(RetentionPolicy.RUNTIME)
+ @Target(ElementType.PARAMETER)
+ @Experimental(Kind.SPLITTABLE_DO_FN)
+ public @interface WatermarkEstimatorState {}
+
+ /**
* Annotation on a <a
href="https://s.apache.org/splittable-do-fn">splittable</a> {@link DoFn}
* specifying that the {@link DoFn} performs a bounded amount of work per
input element, so
* applying it to a bounded {@link PCollection} will produce also a bounded
{@link PCollection}.
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
index 112046a..6c445ec 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
@@ -33,13 +33,9 @@ import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.state.State;
import org.apache.beam.sdk.state.TimeDomain;
-import org.apache.beam.sdk.state.Timer;
-import org.apache.beam.sdk.state.TimerMap;
import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.DoFn.BundleFinalizer;
import org.apache.beam.sdk.transforms.DoFn.FinishBundleContext;
import org.apache.beam.sdk.transforms.DoFn.MultiOutputReceiver;
-import org.apache.beam.sdk.transforms.DoFn.OnTimerContext;
import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.DoFn.StartBundleContext;
import org.apache.beam.sdk.transforms.Materializations.MultimapView;
@@ -55,7 +51,6 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.ValueInSingleWindow;
@@ -217,7 +212,13 @@ public class DoFnTester<InputT, OutputT> implements
AutoCloseable {
createProcessContext(
ValueInSingleWindow.of(element, timestamp, window,
PaneInfo.NO_FIRING));
fnInvoker.invokeProcessElement(
- new DoFnInvoker.ArgumentProvider<InputT, OutputT>() {
+ new DoFnInvoker.BaseArgumentProvider<InputT, OutputT>() {
+
+ @Override
+ public String getErrorContext() {
+ return "DoFnTester";
+ }
+
@Override
public BoundedWindow window() {
return window;
@@ -258,16 +259,6 @@ public class DoFnTester<InputT, OutputT> implements
AutoCloseable {
}
@Override
- public InputT sideInput(String sideInputTag) {
- throw new UnsupportedOperationException("SideInputs are not
supported by DoFnTester");
- }
-
- @Override
- public InputT schemaElement(int index) {
- throw new UnsupportedOperationException("Schemas are not
supported by DoFnTester");
- }
-
- @Override
public Instant timestamp(DoFn<InputT, OutputT> doFn) {
return processContext.timestamp();
}
@@ -290,21 +281,11 @@ public class DoFnTester<InputT, OutputT> implements
AutoCloseable {
}
@Override
- public OutputReceiver<Row> outputRowReceiver(DoFn<InputT, OutputT>
doFn) {
- throw new UnsupportedOperationException("Schemas are not
supported by DoFnTester");
- }
-
- @Override
public MultiOutputReceiver taggedOutputReceiver(DoFn<InputT,
OutputT> doFn) {
return DoFnOutputReceivers.windowedMultiReceiver(processContext,
null);
}
@Override
- public OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) {
- throw new UnsupportedOperationException("DoFnTester doesn't
support timers yet.");
- }
-
- @Override
public Object restriction() {
throw new UnsupportedOperationException(
"Not expected to access Restriction from a regular DoFn in
DoFnTester");
@@ -315,27 +296,6 @@ public class DoFnTester<InputT, OutputT> implements
AutoCloseable {
throw new UnsupportedOperationException(
"Not expected to access RestrictionTracker from a regular
DoFn in DoFnTester");
}
-
- @Override
- public org.apache.beam.sdk.state.State state(String stateId,
boolean alwaysFetched) {
- throw new UnsupportedOperationException("DoFnTester doesn't
support state yet");
- }
-
- @Override
- public Timer timer(String timerId) {
- throw new UnsupportedOperationException("DoFnTester doesn't
support timers yet");
- }
-
- @Override
- public TimerMap timerFamily(String tagId) {
- throw new UnsupportedOperationException("DoFnTester doesn't
support timerFamily yet");
- }
-
- @Override
- public BundleFinalizer bundleFinalizer() {
- throw new UnsupportedOperationException(
- "DoFnTester doesn't support bundleFinalizer yet");
- }
});
} catch (UserCodeException e) {
unwrapUserCodeException(e);
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
index f8aa338..014e6dd 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
@@ -55,13 +55,19 @@ import
org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.TimeDomain
import
org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.TimerFamilyParameter;
import
org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.TimerParameter;
import
org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.TimestampParameter;
+import
org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.WatermarkEstimatorParameter;
+import
org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.WatermarkEstimatorStateParameter;
import
org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.WindowParameter;
import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultTracker;
+import
org.apache.beam.sdk.transforms.splittabledofn.HasDefaultWatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.splittabledofn.Sizes;
import org.apache.beam.sdk.transforms.splittabledofn.Sizes.HasSize;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.ByteBuddy;
import
org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.description.field.FieldDescription;
import
org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.description.method.MethodDescription;
@@ -98,6 +104,7 @@ import
org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.jar.asm.Type;
import
org.apache.beam.vendor.bytebuddy.v1_10_8.net.bytebuddy.matcher.ElementMatchers;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.Primitives;
+import org.joda.time.Instant;
/** Dynamically generates a {@link DoFnInvoker} instances for invoking a
{@link DoFn}. */
class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory {
@@ -119,6 +126,8 @@ class ByteBuddyDoFnInvokerFactory implements
DoFnInvokerFactory {
public static final String PIPELINE_OPTIONS_PARAMETER_METHOD =
"pipelineOptions";
public static final String RESTRICTION_PARAMETER_METHOD = "restriction";
public static final String RESTRICTION_TRACKER_PARAMETER_METHOD =
"restrictionTracker";
+ public static final String WATERMARK_ESTIMATOR_PARAMETER_METHOD =
"watermarkEstimator";
+ public static final String WATERMARK_ESTIMATOR_STATE_PARAMETER_METHOD =
"watermarkEstimatorState";
public static final String STATE_PARAMETER_METHOD = "state";
public static final String TIMER_PARAMETER_METHOD = "timer";
public static final String SIDE_INPUT_PARAMETER_METHOD = "sideInput";
@@ -313,6 +322,51 @@ class ByteBuddyDoFnInvokerFactory implements
DoFnInvokerFactory {
}
}
+ /**
+ * Default implementation of {@link DoFn.GetWatermarkEstimatorStateCoder},
for delegation by
+ * bytebuddy.
+ */
+ public static class DefaultWatermarkEstimatorStateCoder {
+ private final TypeDescriptor<?> watermarkEstimatorStateType;
+
+ DefaultWatermarkEstimatorStateCoder(TypeDescriptor<?>
watermarkEstimatorStateType) {
+ this.watermarkEstimatorStateType = watermarkEstimatorStateType;
+ }
+
+ @SuppressWarnings({"unused", "unchecked"})
+ public <WatermarkEstimatorStateT>
+ Coder<WatermarkEstimatorStateT> invokeGetWatermarkEstimatorStateCoder(
+ CoderRegistry registry) throws CannotProvideCoderException {
+ return (Coder) registry.getCoder(watermarkEstimatorStateType);
+ }
+ }
+
+ /** Default implementation of {@link DoFn.NewWatermarkEstimator}, for
delegation by bytebuddy. */
+ public static class DefaultNewWatermarkEstimator {
+
+ /** Delegates to {@link HasDefaultWatermarkEstimator} when the state implements it, otherwise
returns an estimator that always reports the minimum watermark. */
+ @SuppressWarnings("unused")
+ public static <InputT, OutputT, WatermarkEstimatorStateT>
+ WatermarkEstimator<WatermarkEstimatorStateT> invokeNewTracker(
+ DoFnInvoker.ArgumentProvider<InputT, OutputT> argumentProvider) {
+ if (argumentProvider.watermarkEstimatorState() instanceof
HasDefaultWatermarkEstimator) {
+ return ((HasDefaultWatermarkEstimator)
argumentProvider.watermarkEstimatorState())
+ .newWatermarkEstimator();
+ }
+ return new WatermarkEstimator<WatermarkEstimatorStateT>() {
+ @Override
+ public Instant currentWatermark() {
+ return GlobalWindow.TIMESTAMP_MIN_VALUE;
+ }
+
+ @Override
+ public WatermarkEstimatorStateT getState() {
+ return null;
+ }
+ };
+ }
+ }
+
/** Default implementation of {@link DoFn.NewTracker}, for delegation by
bytebuddy. */
public static class DefaultNewTracker {
/** Uses {@link HasDefaultTracker} to produce the tracker. */
@@ -392,7 +446,17 @@ class ByteBuddyDoFnInvokerFactory implements
DoFnInvokerFactory {
.method(ElementMatchers.named("invokeNewTracker"))
.intercept(newTrackerDelegation(clazzDescription,
signature.newTracker()))
.method(ElementMatchers.named("invokeGetSize"))
- .intercept(getSizeDelegation(clazzDescription,
signature.getSize()));
+ .intercept(getSizeDelegation(clazzDescription,
signature.getSize()))
+
.method(ElementMatchers.named("invokeGetWatermarkEstimatorStateCoder"))
+
.intercept(getWatermarkEstimatorStateCoderDelegation(clazzDescription,
signature))
+
.method(ElementMatchers.named("invokeGetInitialWatermarkEstimatorState"))
+ .intercept(
+ delegateMethodWithExtraParametersOrNoop(
+ clazzDescription,
signature.getInitialWatermarkEstimatorState()))
+ .method(ElementMatchers.named("invokeNewWatermarkEstimator"))
+ .intercept(
+ newWatermarkEstimatorDelegation(
+ clazzDescription, signature.newWatermarkEstimator()));
DynamicType.Unloaded<?> unloaded = builder.make();
@SuppressWarnings("unchecked")
@@ -421,6 +485,24 @@ class ByteBuddyDoFnInvokerFactory implements
DoFnInvokerFactory {
}
}
+ private static Implementation getWatermarkEstimatorStateCoderDelegation(
+ TypeDescription doFnType, DoFnSignature signature) {
+ if (signature.processElement().isSplittable()) {
+ if (signature.getWatermarkEstimatorStateCoder() == null) {
+ return MethodDelegation.to(
+ new DefaultWatermarkEstimatorStateCoder(
+ signature.getInitialWatermarkEstimatorState() == null
+ ? TypeDescriptors.voids()
+ :
signature.getInitialWatermarkEstimatorState().watermarkEstimatorStateT()));
+ } else {
+ return new DowncastingParametersMethodDelegation(
+ doFnType,
signature.getWatermarkEstimatorStateCoder().targetMethod());
+ }
+ } else {
+ return ExceptionMethod.throwing(UnsupportedOperationException.class);
+ }
+ }
+
private static Implementation splitRestrictionDelegation(
TypeDescription doFnType, DoFnSignature.SplitRestrictionMethod
signature) {
if (signature == null) {
@@ -430,6 +512,15 @@ class ByteBuddyDoFnInvokerFactory implements
DoFnInvokerFactory {
}
}
+ private static Implementation newWatermarkEstimatorDelegation(
+ TypeDescription doFnType, @Nullable
DoFnSignature.NewWatermarkEstimatorMethod signature) {
+ if (signature == null) {
+ return MethodDelegation.to(DefaultNewWatermarkEstimator.class);
+ } else {
+ return new DoFnMethodWithExtraParametersDelegation(doFnType, signature);
+ }
+ }
+
private static Implementation newTrackerDelegation(
TypeDescription doFnType, @Nullable DoFnSignature.NewTrackerMethod
signature) {
if (signature == null) {
@@ -862,6 +953,27 @@ class ByteBuddyDoFnInvokerFactory implements
DoFnInvokerFactory {
}
@Override
+ public StackManipulation dispatch(WatermarkEstimatorParameter p) {
+ // DoFnInvoker.ArgumentProvider.watermarkEstimator() returns a
WatermarkEstimator,
+ // but the methods expect a concrete subtype of it.
+ // Insert a downcast.
+ return new StackManipulation.Compound(
+
simpleExtraContextParameter(WATERMARK_ESTIMATOR_PARAMETER_METHOD),
+ TypeCasting.to(new
TypeDescription.ForLoadedType(p.estimatorT().getRawType())));
+ }
+
+ @Override
+ public StackManipulation dispatch(WatermarkEstimatorStateParameter
p) {
+ // DoFnInvoker.ArgumentProvider.watermarkEstimatorState() returns
an Object,
+ // but the methods expect a concrete subtype of it.
+ // Insert a downcast.
+ return new StackManipulation.Compound(
+
simpleExtraContextParameter(WATERMARK_ESTIMATOR_STATE_PARAMETER_METHOD),
+ TypeCasting.to(
+ new
TypeDescription.ForLoadedType(p.estimatorStateT().getRawType())));
+ }
+
+ @Override
public StackManipulation dispatch(StateParameter p) {
return new StackManipulation.Compound(
new TextConstant(p.referent().id()),
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
index d52872e..412d5f7 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
@@ -36,6 +36,7 @@ import org.apache.beam.sdk.transforms.DoFn.StateId;
import org.apache.beam.sdk.transforms.DoFn.TimerId;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.splittabledofn.Sizes;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.values.Row;
@@ -104,6 +105,21 @@ public interface DoFnInvoker<InputT, OutputT> {
<RestrictionT, PositionT> RestrictionTracker<RestrictionT, PositionT>
invokeNewTracker(
ArgumentProvider<InputT, OutputT> arguments);
+ /**
+ * Invoke the {@link DoFn.NewWatermarkEstimator} method on the bound {@link DoFn}.
+ *
+ * @param arguments provider of the values for the extra parameters the annotated method declares
+ * @return the {@link WatermarkEstimator} produced by the annotated method
+ */
+ @SuppressWarnings("TypeParameterUnusedInFormals")
+ // The type parameter appears only in the return type, which the suppressed check would flag.
+ <WatermarkEstimatorStateT>
+ WatermarkEstimator<WatermarkEstimatorStateT> invokeNewWatermarkEstimator(
+ ArgumentProvider<InputT, OutputT> arguments);
+
+ /**
+ * Invoke the {@link DoFn.GetInitialWatermarkEstimatorState} method on the bound {@link DoFn}.
+ *
+ * @param arguments provider of the values for the extra parameters the annotated method declares
+ * @return the initial watermark estimator state, typed as {@link Object}; its actual type is
+ *     whatever the {@link DoFn}'s annotated method returns
+ */
+ Object invokeGetInitialWatermarkEstimatorState(ArgumentProvider<InputT,
OutputT> arguments);
+
+ /**
+ * Invoke the {@link DoFn.GetWatermarkEstimatorStateCoder} method on the bound {@link DoFn}.
+ * Called only during pipeline construction time.
+ *
+ * @param coderRegistry the {@link CoderRegistry} supplied for this invocation
+ * @return the coder for the watermark estimator state
+ */
+ Coder<?> invokeGetWatermarkEstimatorStateCoder(CoderRegistry coderRegistry);
+
/** Get the bound {@link DoFn}. */
DoFn<InputT, OutputT> getFn();
@@ -190,6 +206,18 @@ public interface DoFnInvoker<InputT, OutputT> {
*/
RestrictionTracker<?, ?> restrictionTracker();
+ /**
+ * If this is a splittable {@link DoFn}, returns the watermark estimator state associated with
+ * the current call.
+ *
+ * <p>The state is untyped here; the generated invoker downcasts it to the type the user's
+ * method declares for its watermark estimator state parameter.
+ */
+ Object watermarkEstimatorState();
+
+ /**
+ * If this is a splittable {@link DoFn}, returns the {@link WatermarkEstimator} associated with
+ * the current call.
+ *
+ * <p>The generated invoker downcasts the result to the estimator type the user's method
+ * declares.
+ */
+ WatermarkEstimator<?> watermarkEstimator();
+
/** Returns the state cell for the given {@link StateId}. */
State state(String stateId, boolean alwaysFetched);
@@ -338,6 +366,18 @@ public interface DoFnInvoker<InputT, OutputT> {
}
@Override
+ public Object watermarkEstimatorState() {
+   // This base provider does not supply splittable-only arguments.
+   String message = String.format("WatermarkEstimatorState unsupported in %s", getErrorContext());
+   throw new UnsupportedOperationException(message);
+ }
+
+ @Override
+ public WatermarkEstimator<?> watermarkEstimator() {
+   // This base provider does not supply splittable-only arguments.
+   String message = String.format("WatermarkEstimator unsupported in %s", getErrorContext());
+   throw new UnsupportedOperationException(message);
+ }
+
+ @Override
public BundleFinalizer bundleFinalizer() {
throw new UnsupportedOperationException(
String.format("BundleFinalizer unsupported in %s",
getErrorContext()));
@@ -449,6 +489,16 @@ public interface DoFnInvoker<InputT, OutputT> {
}
@Override
+ public Object watermarkEstimatorState() {
+ // Forward to the wrapped provider unchanged.
+ return delegate.watermarkEstimatorState();
+ }
+
+ @Override
+ public WatermarkEstimator<?> watermarkEstimator() {
+ // Forward to the wrapped provider unchanged.
+ return delegate.watermarkEstimator();
+ }
+
+ @Override
public State state(String stateId, boolean alwaysFetch) {
return delegate.state(stateId, alwaysFetch);
}
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
index b675596..1df70db 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
@@ -46,6 +46,7 @@ import
org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.StateParam
import
org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.TimerParameter;
import
org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.WindowParameter;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
@@ -114,6 +115,18 @@ public abstract class DoFnSignature {
@Nullable
public abstract GetRestrictionCoderMethod getRestrictionCoder();
+ /**
+ * Details about this {@link DoFn}'s {@link DoFn.GetWatermarkEstimatorStateCoder} method, or
+ * {@code null} if the {@link DoFn} does not declare one.
+ */
+ @Nullable
+ public abstract GetWatermarkEstimatorStateCoderMethod
getWatermarkEstimatorStateCoder();
+
+ /**
+ * Details about this {@link DoFn}'s {@link DoFn.GetInitialWatermarkEstimatorState} method, or
+ * {@code null} if the {@link DoFn} does not declare one.
+ */
+ @Nullable
+ public abstract GetInitialWatermarkEstimatorStateMethod
getInitialWatermarkEstimatorState();
+
+ /**
+ * Details about this {@link DoFn}'s {@link DoFn.NewWatermarkEstimator} method, or {@code null}
+ * if the {@link DoFn} does not declare one.
+ */
+ @Nullable
+ public abstract NewWatermarkEstimatorMethod newWatermarkEstimator();
+
/** Details about this {@link DoFn}'s {@link DoFn.NewTracker} method. */
@Nullable
public abstract NewTrackerMethod newTracker();
@@ -178,6 +191,14 @@ public abstract class DoFnSignature {
abstract Builder setGetSize(GetSizeMethod getSize);
+ // Watermark estimator methods are only recorded for splittable DoFns that declare them;
+ // non-splittable DoFns are rejected if they declare any of these methods.
+ abstract Builder setGetInitialWatermarkEstimatorState(
+ GetInitialWatermarkEstimatorStateMethod
getInitialWatermarkEstimatorState);
+
+ abstract Builder setNewWatermarkEstimator(NewWatermarkEstimatorMethod
newWatermarkEstimator);
+
+ abstract Builder setGetWatermarkEstimatorStateCoder(
+ GetWatermarkEstimatorStateCoderMethod getWatermarkEstimatorStateCoder);
+
abstract Builder setStateDeclarations(Map<String, StateDeclaration>
stateDeclarations);
abstract Builder setTimerDeclarations(Map<String, TimerDeclaration>
timerDeclarations);
@@ -247,6 +268,10 @@ public abstract class DoFnSignature {
return cases.dispatch((RestrictionParameter) this);
} else if (this instanceof RestrictionTrackerParameter) {
return cases.dispatch((RestrictionTrackerParameter) this);
+ } else if (this instanceof WatermarkEstimatorParameter) {
+ return cases.dispatch((WatermarkEstimatorParameter) this);
+ } else if (this instanceof WatermarkEstimatorStateParameter) {
+ return cases.dispatch((WatermarkEstimatorStateParameter) this);
} else if (this instanceof StateParameter) {
return cases.dispatch((StateParameter) this);
} else if (this instanceof TimerParameter) {
@@ -311,6 +336,10 @@ public abstract class DoFnSignature {
ResultT dispatch(RestrictionTrackerParameter p);
+ /** Handles a {@link WatermarkEstimatorParameter}. */
+ ResultT dispatch(WatermarkEstimatorParameter p);
+
+ /** Handles a {@link WatermarkEstimatorStateParameter}. */
+ ResultT dispatch(WatermarkEstimatorStateParameter p);
+
ResultT dispatch(StateParameter p);
ResultT dispatch(TimerParameter p);
@@ -406,6 +435,16 @@ public abstract class DoFnSignature {
}
@Override
+ public ResultT dispatch(WatermarkEstimatorParameter p) {
+ // No specialized handling; fall back to the default case.
+ return dispatchDefault(p);
+ }
+
+ @Override
+ public ResultT dispatch(WatermarkEstimatorStateParameter p) {
+ // No specialized handling; fall back to the default case.
+ return dispatchDefault(p);
+ }
+
+ @Override
public ResultT dispatch(BundleFinalizerParameter p) {
return dispatchDefault(p);
}
@@ -551,6 +590,19 @@ public abstract class DoFnSignature {
return new
AutoValue_DoFnSignature_Parameter_RestrictionTrackerParameter(trackerT);
}
+ /**
+ * Returns a {@link WatermarkEstimatorParameter} for a parameter declared with type {@code
+ * watermarkEstimatorT}.
+ */
+ public static WatermarkEstimatorParameter watermarkEstimator(
+ TypeDescriptor<?> watermarkEstimatorT) {
+ return new
AutoValue_DoFnSignature_Parameter_WatermarkEstimatorParameter(watermarkEstimatorT);
+ }
+
+ /**
+ * Returns a {@link WatermarkEstimatorStateParameter} for a parameter declared with type {@code
+ * watermarkEstimatorStateT}.
+ */
+ public static WatermarkEstimatorStateParameter watermarkEstimatorState(
+ TypeDescriptor<?> watermarkEstimatorStateT) {
+ return new
AutoValue_DoFnSignature_Parameter_WatermarkEstimatorStateParameter(
+ watermarkEstimatorStateT);
+ }
+
/** Returns a {@link StateParameter} referring to the given {@link
StateDeclaration}. */
public static StateParameter stateParameter(StateDeclaration decl, boolean
alwaysFetched) {
return new AutoValue_DoFnSignature_Parameter_StateParameter(decl,
alwaysFetched);
@@ -768,6 +820,32 @@ public abstract class DoFnSignature {
}
/**
+ * Descriptor for a {@link Parameter} annotated with {@link DoFn.WatermarkEstimatorState}.
+ *
+ * <p>Two descriptors are equal when they carry the same declared state type.
+ */
+ @AutoValue
+ public abstract static class WatermarkEstimatorStateParameter extends Parameter {
+   // Package visible for AutoValue
+   WatermarkEstimatorStateParameter() {}
+
+   /** The declared type of the watermark estimator state parameter. */
+   public abstract TypeDescriptor<?> estimatorStateT();
+ }
+
+ /**
+ * Descriptor for a {@link Parameter} whose type is a subtype of {@link WatermarkEstimator}.
+ *
+ * <p>Two descriptors are equal when they carry the same declared estimator type.
+ */
+ @AutoValue
+ public abstract static class WatermarkEstimatorParameter extends Parameter {
+   // Package visible for AutoValue
+   WatermarkEstimatorParameter() {}
+
+   /** The declared type of the watermark estimator parameter. */
+   public abstract TypeDescriptor<?> estimatorT();
+ }
+
+ /**
* Descriptor for a {@link Parameter} of a subclass of {@link
RestrictionTracker}.
*
* <p>All such descriptors are equal.
@@ -845,6 +923,10 @@ public abstract class DoFnSignature {
@Nullable
public abstract TypeDescriptor<?> trackerT();
+ /**
+ * Declared type of the {@link WatermarkEstimator} parameter, or {@code null} if the method has
+ * no such parameter. For splittable {@link DoFn}s this is validated to be {@link
+ * WatermarkEstimator} or {@code ManualWatermarkEstimator} itself, not a concrete implementation.
+ */
+ @Nullable
+ public abstract TypeDescriptor<?> watermarkEstimatorT();
+
/** The window type used by this method, if any. */
@Nullable
@Override
@@ -859,6 +941,7 @@ public abstract class DoFnSignature {
boolean requiresStableInput,
boolean requiresTimeSortedInput,
TypeDescriptor<?> trackerT,
+ TypeDescriptor<?> watermarkEstimatorT,
@Nullable TypeDescriptor<? extends BoundedWindow> windowT,
boolean hasReturnValue) {
return new AutoValue_DoFnSignature_ProcessElementMethod(
@@ -867,6 +950,7 @@ public abstract class DoFnSignature {
requiresStableInput,
requiresTimeSortedInput,
trackerT,
+ watermarkEstimatorT,
windowT,
hasReturnValue);
}
@@ -1228,7 +1312,7 @@ public abstract class DoFnSignature {
}
}
- /** Describes a {@link DoFn.NewTracker} method. */
+ /** Describes a {@link DoFn.GetSize} method. */
@AutoValue
public abstract static class GetSizeMethod implements
MethodWithExtraParameters {
/** The annotated method itself. */
@@ -1266,4 +1350,80 @@ public abstract class DoFnSignature {
return new
AutoValue_DoFnSignature_GetRestrictionCoderMethod(targetMethod, coderT);
}
}
+
+ /** Describes a {@link DoFn.GetInitialWatermarkEstimatorState} method. */
+ @AutoValue
+ public abstract static class GetInitialWatermarkEstimatorStateMethod
+     implements MethodWithExtraParameters {
+   /** The annotated method itself. */
+   @Override
+   public abstract Method targetMethod();
+
+   /** Type of the returned watermark estimator state. */
+   public abstract TypeDescriptor<?> watermarkEstimatorStateT();
+
+   /** The window type used by this method, if any. */
+   @Nullable
+   @Override
+   public abstract TypeDescriptor<? extends BoundedWindow> windowT();
+
+   /** Types of optional parameters of the annotated method, in the order they appear. */
+   @Override
+   public abstract List<Parameter> extraParameters();
+
+   // windowT is @Nullable to match the windowT() property: a method without a window has none.
+   static GetInitialWatermarkEstimatorStateMethod create(
+       Method targetMethod,
+       TypeDescriptor<?> watermarkEstimatorStateT,
+       @Nullable TypeDescriptor<? extends BoundedWindow> windowT,
+       List<Parameter> extraParameters) {
+     return new AutoValue_DoFnSignature_GetInitialWatermarkEstimatorStateMethod(
+         targetMethod, watermarkEstimatorStateT, windowT, extraParameters);
+   }
+ }
+
+ /** Describes a {@link DoFn.NewWatermarkEstimator} method. */
+ @AutoValue
+ public abstract static class NewWatermarkEstimatorMethod implements MethodWithExtraParameters {
+   /** The annotated method itself. */
+   @Override
+   public abstract Method targetMethod();
+
+   /** Type of the returned {@link WatermarkEstimator}. */
+   public abstract TypeDescriptor<?> watermarkEstimatorT();
+
+   /** The window type used by this method, if any. */
+   @Nullable
+   @Override
+   public abstract TypeDescriptor<? extends BoundedWindow> windowT();
+
+   /** Types of optional parameters of the annotated method, in the order they appear. */
+   @Override
+   public abstract List<Parameter> extraParameters();
+
+   // windowT is @Nullable to match the windowT() property: a method without a window has none.
+   static NewWatermarkEstimatorMethod create(
+       Method targetMethod,
+       TypeDescriptor<?> watermarkEstimatorT,
+       @Nullable TypeDescriptor<? extends BoundedWindow> windowT,
+       List<Parameter> extraParameters) {
+     return new AutoValue_DoFnSignature_NewWatermarkEstimatorMethod(
+         targetMethod, watermarkEstimatorT, windowT, extraParameters);
+   }
+ }
+
+ /** Describes a {@link DoFn.GetWatermarkEstimatorStateCoder} method. */
+ @AutoValue
+ public abstract static class GetWatermarkEstimatorStateCoderMethod implements DoFnMethod {
+   /** The annotated method itself. */
+   @Override
+   public abstract Method targetMethod();
+
+   /** Type of the returned {@link Coder} for the watermark estimator state. */
+   public abstract TypeDescriptor<?> coderT();
+
+   static GetWatermarkEstimatorStateCoderMethod create(
+       Method targetMethod, TypeDescriptor<?> coderT) {
+     return new AutoValue_DoFnSignature_GetWatermarkEstimatorStateCoderMethod(
+         targetMethod, coderT);
+   }
+ }
}
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
index fb76330..cea704b 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
@@ -37,6 +37,7 @@ import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.coders.Coder;
@@ -62,25 +63,33 @@ import org.apache.beam.sdk.transforms.DoFn.StateId;
import org.apache.beam.sdk.transforms.DoFn.TimerId;
import
org.apache.beam.sdk.transforms.reflect.DoFnSignature.FieldAccessDeclaration;
import
org.apache.beam.sdk.transforms.reflect.DoFnSignature.GetInitialRestrictionMethod;
+import
org.apache.beam.sdk.transforms.reflect.DoFnSignature.GetInitialWatermarkEstimatorStateMethod;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter;
+import
org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.PipelineOptionsParameter;
import
org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.RestrictionParameter;
import
org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.RestrictionTrackerParameter;
import
org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.SchemaElementParameter;
import
org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.StateParameter;
import
org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.TimerFamilyParameter;
import
org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.TimerParameter;
+import
org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.WatermarkEstimatorParameter;
+import
org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.WatermarkEstimatorStateParameter;
import
org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.WindowParameter;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.StateDeclaration;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.TimerDeclaration;
import
org.apache.beam.sdk.transforms.reflect.DoFnSignature.TimerFamilyDeclaration;
import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultTracker;
+import
org.apache.beam.sdk.transforms.splittabledofn.HasDefaultWatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.common.ReflectHelpers;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.sdk.values.TypeParameter;
import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Predicates;
@@ -127,6 +136,7 @@ public class DoFnSignatures {
Parameter.TaggedOutputReceiverParameter.class,
Parameter.ProcessContextParameter.class,
Parameter.RestrictionTrackerParameter.class,
+ Parameter.WatermarkEstimatorParameter.class,
Parameter.SideInputParameter.class,
Parameter.BundleFinalizerParameter.class);
@@ -213,12 +223,32 @@ public class DoFnSignatures {
ImmutableList.of(
Parameter.ElementParameter.class,
Parameter.RestrictionParameter.class,
- Parameter.RestrictionTrackerParameter.class,
Parameter.WindowParameter.class,
Parameter.TimestampParameter.class,
Parameter.PaneInfoParameter.class,
Parameter.PipelineOptionsParameter.class);
+ /** Parameter kinds allowed on a {@link DoFn.GetInitialWatermarkEstimatorState} method. */
+ private static final Collection<Class<? extends Parameter>>
+ ALLOWED_GET_INITIAL_WATERMARK_ESTIMATOR_STATE_PARAMETERS =
+ ImmutableList.of(
+ Parameter.ElementParameter.class,
+ Parameter.RestrictionParameter.class,
+ Parameter.WindowParameter.class,
+ Parameter.TimestampParameter.class,
+ Parameter.PaneInfoParameter.class,
+ Parameter.PipelineOptionsParameter.class);
+
+ /**
+ * Parameter kinds allowed on a {@link DoFn.NewWatermarkEstimator} method. Unlike the initial
+ * state method, this list also admits the watermark estimator state parameter.
+ */
+ private static final Collection<Class<? extends Parameter>>
+ ALLOWED_NEW_WATERMARK_ESTIMATOR_PARAMETERS =
+ ImmutableList.of(
+ Parameter.WatermarkEstimatorStateParameter.class,
+ Parameter.ElementParameter.class,
+ Parameter.RestrictionParameter.class,
+ Parameter.WindowParameter.class,
+ Parameter.TimestampParameter.class,
+ Parameter.PaneInfoParameter.class,
+ Parameter.PipelineOptionsParameter.class);
+
/** @return the {@link DoFnSignature} for the given {@link DoFn} instance. */
public static <FnT extends DoFn<?, ?>> DoFnSignature signatureForDoFn(FnT
fn) {
return getSignature(fn.getClass());
@@ -332,21 +362,34 @@ public class DoFnSignatures {
private MethodAnalysisContext() {}
- /** Indicates whether a {@link RestrictionTrackerParameter} is known in
this context. */
- public boolean hasRestrictionTrackerParameter() {
- return extraParameters.stream()
-
.anyMatch(Predicates.instanceOf(RestrictionTrackerParameter.class)::apply);
+ /**
+ * Indicates whether a {@link Parameter} that is an instance of the specified type is known in
+ * this context.
+ */
+ public boolean hasParameter(Class<? extends Parameter> type) {
+ return
extraParameters.stream().anyMatch(Predicates.instanceOf(type)::apply);
}
- /** Indicates whether a {@link WindowParameter} is known in this context.
*/
- public boolean hasWindowParameter() {
- return
extraParameters.stream().anyMatch(Predicates.instanceOf(WindowParameter.class)::apply);
+ /**
+  * Returns the {@link Parameter} of the specified type if it is known in this context, or an
+  * empty {@link Optional} otherwise. Never returns {@code null}.
+  *
+  * @throws IllegalStateException if more than one parameter of the specified type is known
+  */
+ public <T extends Parameter> Optional<T> findParameter(Class<T> type) {
+   List<T> parameters = findParameters(type);
+   switch (parameters.size()) {
+     case 0:
+       return Optional.empty();
+     case 1:
+       return Optional.of(parameters.get(0));
+     default:
+       throw new IllegalStateException(
+           String.format(
+               "Expected to have found at most one parameter of type %s but found %s.",
+               type, parameters));
+   }
}
- /** Indicates whether a {@link Parameter.PipelineOptionsParameter} is
known in this context. */
- public boolean hasPipelineOptionsParamter() {
- return extraParameters.stream()
-
.anyMatch(Predicates.instanceOf(Parameter.PipelineOptionsParameter.class)::apply);
+ /** Returns every {@link Parameter} of the specified type known in this context, in order. */
+ public <T extends Parameter> List<T> findParameters(Class<T> type) {
+   // Class::cast narrows each element with a checked conversion instead of an unchecked List cast.
+   return extraParameters.stream()
+       .filter(type::isInstance)
+       .map(type::cast)
+       .collect(Collectors.toList());
}
/** The window type, if any, used by this method. */
@@ -477,6 +520,12 @@ public class DoFnSignatures {
findAnnotatedMethod(errors, DoFn.GetRestrictionCoder.class, fnClass,
false);
Method newTrackerMethod = findAnnotatedMethod(errors,
DoFn.NewTracker.class, fnClass, false);
Method getSizeMethod = findAnnotatedMethod(errors, DoFn.GetSize.class,
fnClass, false);
+ Method getWatermarkEstimatorStateCoderMethod =
+ findAnnotatedMethod(errors,
DoFn.GetWatermarkEstimatorStateCoder.class, fnClass, false);
+ Method getInitialWatermarkEstimatorStateMethod =
+ findAnnotatedMethod(errors,
DoFn.GetInitialWatermarkEstimatorState.class, fnClass, false);
+ Method newWatermarkEstimatorMethod =
+ findAnnotatedMethod(errors, DoFn.NewWatermarkEstimator.class, fnClass,
false);
Collection<Method> onTimerMethods =
declaredMethodsWithAnnotation(DoFn.OnTimer.class, fnClass, DoFn.class);
@@ -601,7 +650,7 @@ public class DoFnSignatures {
"Splittable, but does not define the required @%s method.",
DoFnSignatures.format(DoFn.GetInitialRestriction.class));
- GetInitialRestrictionMethod method =
+ GetInitialRestrictionMethod initialRestrictionMethod =
analyzeGetInitialRestrictionMethod(
errors.forMethod(DoFn.GetInitialRestriction.class,
getInitialRestrictionMethod),
fnT,
@@ -610,8 +659,24 @@ public class DoFnSignatures {
outputT,
fnContext);
- signatureBuilder.setGetInitialRestriction(method);
- TypeDescriptor<?> restrictionT = method.restrictionT();
+ signatureBuilder.setGetInitialRestriction(initialRestrictionMethod);
+ TypeDescriptor<?> restrictionT = initialRestrictionMethod.restrictionT();
+
+ TypeDescriptor<?> watermarkEstimatorStateT = TypeDescriptors.voids();
+ if (getInitialWatermarkEstimatorStateMethod != null) {
+ GetInitialWatermarkEstimatorStateMethod
initialWatermarkEstimatorStateMethod =
+ analyzeGetInitialWatermarkEstimatorStateMethod(
+ errors.forMethod(
+ DoFn.GetInitialWatermarkEstimatorState.class,
+ getInitialWatermarkEstimatorStateMethod),
+ fnT,
+ getInitialWatermarkEstimatorStateMethod,
+ inputT,
+ outputT,
+ fnContext);
+ watermarkEstimatorStateT =
initialWatermarkEstimatorStateMethod.watermarkEstimatorStateT();
+
signatureBuilder.setGetInitialWatermarkEstimatorState(initialWatermarkEstimatorStateMethod);
+ }
if (newTrackerMethod != null) {
signatureBuilder.setNewTracker(
@@ -665,6 +730,39 @@ public class DoFnSignatures {
fnT,
getRestrictionCoderMethod));
}
+
+ if (getWatermarkEstimatorStateCoderMethod != null) {
+ signatureBuilder.setGetWatermarkEstimatorStateCoder(
+ analyzeGetWatermarkEstimatorStateCoderMethod(
+ errors.forMethod(
+ DoFn.GetWatermarkEstimatorStateCoder.class,
+ getWatermarkEstimatorStateCoderMethod),
+ fnT,
+ getWatermarkEstimatorStateCoderMethod));
+ }
+
+ if (newWatermarkEstimatorMethod != null) {
+ signatureBuilder.setNewWatermarkEstimator(
+ analyzeNewWatermarkEstimatorMethod(
+ errors.forMethod(DoFn.NewWatermarkEstimator.class,
newWatermarkEstimatorMethod),
+ fnT,
+ newWatermarkEstimatorMethod,
+ inputT,
+ outputT,
+ restrictionT,
+ watermarkEstimatorStateT,
+ fnContext));
+ } else if (getInitialWatermarkEstimatorStateMethod != null) {
+ errors
+ .forMethod(DoFn.NewWatermarkEstimator.class, null)
+ .checkArgument(
+ watermarkEstimatorStateT.isSubtypeOf(
+ TypeDescriptor.of(HasDefaultWatermarkEstimator.class)),
+ "Splittable, either @%s method must be defined or %s must
implement %s.",
+ format(DoFn.NewWatermarkEstimator.class),
+ format(watermarkEstimatorStateT),
+ format(HasDefaultWatermarkEstimator.class));
+ }
} else {
// Validate that none of the splittable DoFn only methods have been
declared.
List<String> forbiddenMethods = new ArrayList<>();
@@ -683,6 +781,15 @@ public class DoFnSignatures {
if (getSizeMethod != null) {
forbiddenMethods.add("@" + format(DoFn.GetSize.class));
}
+ if (getInitialWatermarkEstimatorStateMethod != null) {
+ forbiddenMethods.add("@" +
format(DoFn.GetInitialWatermarkEstimatorState.class));
+ }
+ if (getWatermarkEstimatorStateCoderMethod != null) {
+ forbiddenMethods.add("@" +
format(DoFn.GetWatermarkEstimatorStateCoder.class));
+ }
+ if (newWatermarkEstimatorMethod != null) {
+ forbiddenMethods.add("@" + format(DoFn.NewWatermarkEstimator.class));
+ }
errors.checkArgument(
forbiddenMethods.isEmpty(), "Non-splittable, but defines methods:
%s", forbiddenMethods);
}
@@ -785,11 +892,19 @@ public class DoFnSignatures {
signature.getInitialRestriction();
DoFnSignature.NewTrackerMethod newTracker = signature.newTracker();
DoFnSignature.GetRestrictionCoderMethod getRestrictionCoder =
signature.getRestrictionCoder();
+ DoFnSignature.GetInitialWatermarkEstimatorStateMethod
getInitialWatermarkEstimatorState =
+ signature.getInitialWatermarkEstimatorState();
+ DoFnSignature.GetWatermarkEstimatorStateCoderMethod
getWatermarkEstimatorStateCoder =
+ signature.getWatermarkEstimatorStateCoder();
ErrorReporter processElementErrors =
errors.forMethod(DoFn.ProcessElement.class,
processElement.targetMethod());
TypeDescriptor<?> restrictionT = getInitialRestriction.restrictionT();
+ TypeDescriptor<?> watermarkEstimatorStateT =
+ getInitialWatermarkEstimatorState == null
+ ? TypeDescriptors.voids()
+ : getInitialWatermarkEstimatorState.watermarkEstimatorStateT();
if (newTracker == null) {
ErrorReporter newTrackerErrors = errors.forMethod(DoFn.NewTracker.class,
null);
@@ -806,6 +921,17 @@ public class DoFnSignatures {
"Has tracker type %s, but the DoFn's tracker type must be of type
RestrictionTracker.",
format(processElement.trackerT()));
+ if (processElement.watermarkEstimatorT() != null) {
+ processElementErrors.checkArgument(
+
processElement.watermarkEstimatorT().getRawType().equals(WatermarkEstimator.class)
+ || processElement
+ .watermarkEstimatorT()
+ .getRawType()
+ .equals(ManualWatermarkEstimator.class),
+ "Has watermark estimator type %s, but the DoFn's watermark estimator
type must be one of [WatermarkEstimator, ManualWatermarkEstimator] types.",
+ format(processElement.watermarkEstimatorT()));
+ }
+
if (getRestrictionCoder != null) {
ErrorReporter getInitialRestrictionErrors =
errors.forMethod(DoFn.GetInitialRestriction.class,
getInitialRestriction.targetMethod());
@@ -819,6 +945,26 @@ public class DoFnSignatures {
format(getRestrictionCoder.coderT()),
format(coderTypeOf(restrictionT)));
}
+
+ if (getWatermarkEstimatorStateCoder != null) {
+   // Errors are attributed to @GetInitialWatermarkEstimatorState, which determines the state type.
+   ErrorReporter getInitialWatermarkEstimatorStateReporter =
+       errors.forMethod(
+           DoFn.GetInitialWatermarkEstimatorState.class,
+           getInitialWatermarkEstimatorState == null
+               ? null
+               : getInitialWatermarkEstimatorState.targetMethod());
+   // The "@%s method %s returns %s" fragment names the coder method, so it must be paired with
+   // the @GetWatermarkEstimatorStateCoder annotation (mirroring the restriction coder check).
+   getInitialWatermarkEstimatorStateReporter.checkArgument(
+       getWatermarkEstimatorStateCoder
+           .coderT()
+           .isSubtypeOf(coderTypeOf(watermarkEstimatorStateT)),
+       "Uses watermark estimator state type %s, but @%s method %s returns %s "
+           + "which is not a subtype of %s",
+       format(watermarkEstimatorStateT),
+       format(DoFn.GetWatermarkEstimatorStateCoder.class),
+       format(getWatermarkEstimatorStateCoder.targetMethod()),
+       format(getWatermarkEstimatorStateCoder.coderT()),
+       format(coderTypeOf(watermarkEstimatorStateT)));
+ }
}
/**
@@ -1022,11 +1168,9 @@ public class DoFnSignatures {
boolean requiresStableInput =
m.isAnnotationPresent(DoFn.RequiresStableInput.class);
boolean requiresTimeSortedInput =
m.isAnnotationPresent(DoFn.RequiresTimeSortedInput.class);
- Type[] params = m.getGenericParameterTypes();
-
- TypeDescriptor<?> trackerT = getTrackerType(fnClass, m);
TypeDescriptor<? extends BoundedWindow> windowT = getWindowType(fnClass,
m);
+ Type[] params = m.getGenericParameterTypes();
for (int i = 0; i < params.length; ++i) {
Parameter extraParam =
analyzeExtraParameter(
@@ -1055,8 +1199,19 @@ public class DoFnSignatures {
}
}
+ TypeDescriptor<?> trackerT =
+ methodContext
+ .findParameter(RestrictionTrackerParameter.class)
+ .map(p -> p.trackerT())
+ .orElse(null);
+ TypeDescriptor<?> watermarkEstimatorT =
+ methodContext
+ .findParameter(WatermarkEstimatorParameter.class)
+ .map(p -> p.estimatorT())
+ .orElse(null);
+
// The allowed parameters depend on whether this DoFn is splittable
- if (methodContext.hasRestrictionTrackerParameter()) {
+ if (trackerT != null) {
for (Parameter parameter : methodContext.getExtraParameters()) {
checkParameterOneOf(errors, parameter,
ALLOWED_SPLITTABLE_PROCESS_ELEMENT_PARAMETERS);
}
@@ -1072,6 +1227,7 @@ public class DoFnSignatures {
requiresStableInput,
requiresTimeSortedInput,
trackerT,
+ watermarkEstimatorT,
windowT,
DoFn.ProcessContinuation.class.equals(m.getReturnType()));
}
@@ -1113,20 +1269,22 @@ public class DoFnSignatures {
String fieldAccessString = getFieldAccessId(param.getAnnotations());
if (fieldAccessString != null) {
return Parameter.schemaElementParameter(paramT, fieldAccessString,
param.getIndex());
- } else if (hasElementAnnotation(param.getAnnotations())) {
+ } else if (hasAnnotation(DoFn.Element.class, param.getAnnotations())) {
return (paramT.equals(inputT))
? Parameter.elementParameter(paramT)
: Parameter.schemaElementParameter(paramT, null, param.getIndex());
- } else if (hasRestrictionAnnotation(param.getAnnotations())) {
+ } else if (hasAnnotation(DoFn.Restriction.class, param.getAnnotations())) {
return Parameter.restrictionParameter(paramT);
- } else if (hasTimestampAnnotation(param.getAnnotations())) {
+ } else if (hasAnnotation(DoFn.WatermarkEstimatorState.class,
param.getAnnotations())) {
+ return Parameter.watermarkEstimatorState(paramT);
+ } else if (hasAnnotation(DoFn.Timestamp.class, param.getAnnotations())) {
methodErrors.checkArgument(
rawType.equals(Instant.class),
"@Timestamp argument must have type org.joda.time.Instant.");
return Parameter.timestampParameter();
} else if (rawType.equals(TimeDomain.class)) {
return Parameter.timeDomainParameter();
- } else if (hasSideInputAnnotation(param.getAnnotations())) {
+ } else if (hasAnnotation(DoFn.SideInput.class, param.getAnnotations())) {
String sideInputId = getSideInputId(param.getAnnotations());
paramErrors.checkArgument(
sideInputId != null, "%s missing %s annotation",
format(SideInput.class));
@@ -1161,7 +1319,7 @@ public class DoFnSignatures {
return Parameter.onTimerContext();
} else if (BoundedWindow.class.isAssignableFrom(rawType)) {
methodErrors.checkArgument(
- !methodContext.hasWindowParameter(),
+ !methodContext.hasParameter(WindowParameter.class),
"Multiple %s parameters",
format(BoundedWindow.class));
return Parameter.boundedWindow((TypeDescriptor<? extends BoundedWindow>)
paramT);
@@ -1183,17 +1341,22 @@ public class DoFnSignatures {
return Parameter.taggedOutputReceiverParameter();
} else if (PipelineOptions.class.equals(rawType)) {
methodErrors.checkArgument(
- !methodContext.hasPipelineOptionsParamter(),
+ !methodContext.hasParameter(PipelineOptionsParameter.class),
"Multiple %s parameters",
format(PipelineOptions.class));
return Parameter.pipelineOptions();
} else if (RestrictionTracker.class.isAssignableFrom(rawType)) {
methodErrors.checkArgument(
- !methodContext.hasRestrictionTrackerParameter(),
+ !methodContext.hasParameter(RestrictionTrackerParameter.class),
"Multiple %s parameters",
format(RestrictionTracker.class));
return Parameter.restrictionTracker(paramT);
-
+ } else if (WatermarkEstimator.class.isAssignableFrom(rawType)) {
+ methodErrors.checkArgument(
+ !methodContext.hasParameter(WatermarkEstimatorParameter.class),
+ "Multiple %s parameters",
+ format(WatermarkEstimator.class));
+ return Parameter.watermarkEstimator(paramT);
} else if (rawType.equals(Timer.class)) {
// m.getParameters() is not available until Java 8
String id = getTimerId(param.getAnnotations());
@@ -1221,7 +1384,7 @@ public class DoFnSignatures {
return Parameter.timerParameter(timerDecl);
- } else if (hasTimerIdAnnotation(param.getAnnotations())) {
+ } else if (hasAnnotation(DoFn.TimerId.class, param.getAnnotations())) {
boolean isValidTimerIdForTimerFamily =
fnContext.getTimerFamilyDeclarations().size() > 0 &&
rawType.equals(String.class);
paramErrors.checkArgument(
@@ -1348,36 +1511,8 @@ public class DoFnSignatures {
return annotation.isPresent() ? (T) annotation.get() : null;
}
- private static boolean hasElementAnnotation(List<Annotation> annotations) {
- return annotations.stream().anyMatch(a ->
a.annotationType().equals(DoFn.Element.class));
- }
-
- private static boolean hasRestrictionAnnotation(List<Annotation>
annotations) {
- return annotations.stream().anyMatch(a ->
a.annotationType().equals(DoFn.Restriction.class));
- }
-
- private static boolean hasTimestampAnnotation(List<Annotation> annotations) {
- return annotations.stream().anyMatch(a ->
a.annotationType().equals(DoFn.Timestamp.class));
- }
-
- private static boolean hasSideInputAnnotation(List<Annotation> annotations) {
- return annotations.stream().anyMatch(a ->
a.annotationType().equals(DoFn.SideInput.class));
- }
-
- private static boolean hasTimerIdAnnotation(List<Annotation> annotations) {
- return annotations.stream().anyMatch(a ->
a.annotationType().equals(DoFn.TimerId.class));
- }
-
- @Nullable
- private static TypeDescriptor<?> getTrackerType(TypeDescriptor<?> fnClass,
Method method) {
- Type[] params = method.getGenericParameterTypes();
- for (Type param : params) {
- TypeDescriptor<?> paramT = fnClass.resolveType(param);
- if (RestrictionTracker.class.isAssignableFrom(paramT.getRawType())) {
- return paramT;
- }
- }
- return null;
+  /** Returns {@code true} if some element of {@code annotations} is of type {@code annotation}. */
+  private static boolean hasAnnotation(Class<?> annotation, List<Annotation> annotations) {
+    for (Annotation candidate : annotations) {
+      if (candidate.annotationType().equals(annotation)) {
+        return true;
+      }
+    }
+    return false;
   }
@Nullable
@@ -1509,6 +1644,53 @@ public class DoFnSignatures {
m, fnT.resolveType(m.getGenericReturnType()), windowT,
methodContext.extraParameters);
}
+  @VisibleForTesting
+  static DoFnSignature.GetInitialWatermarkEstimatorStateMethod
+      analyzeGetInitialWatermarkEstimatorStateMethod(
+          ErrorReporter errors,
+          TypeDescriptor<? extends DoFn<?, ?>> fnT,
+          Method m,
+          TypeDescriptor<?> inputT,
+          TypeDescriptor<?> outputT,
+          FnAnalysisContext fnContext) {
+    // Validates a method of the form:
+    //   @GetInitialWatermarkEstimatorState
+    //   WatermarkEstimatorStateT getInitialWatermarkEstimatorState(... parameters ...);
+    // The resolved return type is recorded in the returned method signature.
+    Type[] paramTypes = m.getGenericParameterTypes();
+    MethodAnalysisContext methodContext = MethodAnalysisContext.create();
+    TypeDescriptor<? extends BoundedWindow> windowT = getWindowType(fnT, m);
+
+    for (int index = 0; index < paramTypes.length; index++) {
+      ParameterDescription description =
+          ParameterDescription.of(
+              m,
+              index,
+              fnT.resolveType(paramTypes[index]),
+              Arrays.asList(m.getParameterAnnotations()[index]));
+      Parameter extraParam =
+          analyzeExtraParameter(
+              errors, fnContext, methodContext, fnT, description, inputT, outputT);
+      // Schema-converted @Element parameters are rejected; suggest the raw input type instead.
+      if (extraParam instanceof SchemaElementParameter) {
+        SchemaElementParameter schemaParam = (SchemaElementParameter) extraParam;
+        errors.throwIllegalArgument(
+            "Schema @%s are not supported for @%s method. Found %s, did you mean to use %s?",
+            format(DoFn.Element.class),
+            format(DoFn.GetInitialWatermarkEstimatorState.class),
+            format(schemaParam.elementT()),
+            format(inputT));
+      }
+      methodContext.addParameter(extraParam);
+    }
+
+    // Each collected parameter must be one this kind of method is allowed to accept.
+    for (Parameter extra : methodContext.getExtraParameters()) {
+      checkParameterOneOf(
+          errors, extra, ALLOWED_GET_INITIAL_WATERMARK_ESTIMATOR_STATE_PARAMETERS);
+    }
+
+    TypeDescriptor<?> stateT = fnT.resolveType(m.getGenericReturnType());
+    return DoFnSignature.GetInitialWatermarkEstimatorStateMethod.create(
+        m, stateT, windowT, methodContext.extraParameters);
+  }
+
/**
* Generates a {@link TypeDescriptor} for {@code
DoFn.OutputReceiver<OutputT>} given {@code
* OutputT}.
@@ -1683,8 +1865,21 @@ public class DoFnSignatures {
return DoFnSignature.GetRestrictionCoderMethod.create(m, resT);
}
+  /**
+   * Validates a {@code @GetWatermarkEstimatorStateCoder} method: it must take no arguments and
+   * return a {@link Coder}.
+   */
+  @VisibleForTesting
+  static DoFnSignature.GetWatermarkEstimatorStateCoderMethod
+      analyzeGetWatermarkEstimatorStateCoderMethod(
+          ErrorReporter errors, TypeDescriptor<? extends DoFn> fnT, Method m) {
+    int arity = m.getParameterTypes().length;
+    errors.checkArgument(arity == 0, "Must have zero arguments");
+    TypeDescriptor<?> coderT = fnT.resolveType(m.getGenericReturnType());
+    boolean returnsCoder = coderT.isSubtypeOf(TypeDescriptor.of(Coder.class));
+    errors.checkArgument(returnsCoder, "Must return a Coder, but returns %s", format(coderT));
+    return DoFnSignature.GetWatermarkEstimatorStateCoderMethod.create(m, coderT);
+  }
+
/**
- * Generates a {@link TypeDescriptor} for {@code
RestrictionTracker<RestrictionT>} given {@code
+ * Generates a {@link TypeDescriptor} for {@code
RestrictionTracker<RestrictionT, ?>} given {@code
* RestrictionT}.
*/
private static <RestrictionT>
@@ -1694,6 +1889,17 @@ public class DoFnSignatures {
new TypeParameter<RestrictionT>() {}, restrictionT);
}
+  /**
+   * Builds the {@link TypeDescriptor} of {@code WatermarkEstimator<WatermarkEstimatorStateT>} for
+   * the supplied {@code WatermarkEstimatorStateT}.
+   */
+  private static <WatermarkEstimatorStateT>
+      TypeDescriptor<WatermarkEstimator<WatermarkEstimatorStateT>> watermarkEstimatorTypeOf(
+          TypeDescriptor<WatermarkEstimatorStateT> watermarkEstimatorStateT) {
+    TypeDescriptor<WatermarkEstimator<WatermarkEstimatorStateT>> unresolved =
+        new TypeDescriptor<WatermarkEstimator<WatermarkEstimatorStateT>>() {};
+    TypeParameter<WatermarkEstimatorStateT> stateParameter =
+        new TypeParameter<WatermarkEstimatorStateT>() {};
+    return unresolved.where(stateParameter, watermarkEstimatorStateT);
+  }
+
@VisibleForTesting
static DoFnSignature.NewTrackerMethod analyzeNewTrackerMethod(
ErrorReporter errors,
@@ -1755,6 +1961,76 @@ public class DoFnSignatures {
}
@VisibleForTesting
+  static DoFnSignature.NewWatermarkEstimatorMethod analyzeNewWatermarkEstimatorMethod(
+      ErrorReporter errors,
+      TypeDescriptor<? extends DoFn<?, ?>> fnT,
+      Method m,
+      TypeDescriptor<?> inputT,
+      TypeDescriptor<?> outputT,
+      TypeDescriptor<?> restrictionT,
+      TypeDescriptor<?> watermarkEstimatorStateT,
+      FnAnalysisContext fnContext) {
+    // Method is of the form:
+    // @NewWatermarkEstimator
+    // WatermarkEstimatorT newWatermarkEstimator(... parameters ...);
+    //
+    // The return type must be a subtype of WatermarkEstimator<watermarkEstimatorStateT>, and any
+    // @Restriction / @WatermarkEstimatorState parameters must match restrictionT /
+    // watermarkEstimatorStateT respectively.
+    Type[] params = m.getGenericParameterTypes();
+    TypeDescriptor<?> watermarkEstimatorT = fnT.resolveType(m.getGenericReturnType());
+    TypeDescriptor<?> expectedWatermarkEstimatorT =
+        watermarkEstimatorTypeOf(watermarkEstimatorStateT);
+    errors.checkArgument(
+        watermarkEstimatorT.isSubtypeOf(expectedWatermarkEstimatorT),
+        "Returns %s, but must return a subtype of %s",
+        format(watermarkEstimatorT),
+        format(expectedWatermarkEstimatorT));
+
+    MethodAnalysisContext methodContext = MethodAnalysisContext.create();
+    TypeDescriptor<? extends BoundedWindow> windowT = getWindowType(fnT, m);
+    for (int i = 0; i < params.length; ++i) {
+      Parameter extraParam =
+          analyzeExtraParameter(
+              errors,
+              fnContext,
+              methodContext,
+              fnT,
+              ParameterDescription.of(
+                  m, i, fnT.resolveType(params[i]), Arrays.asList(m.getParameterAnnotations()[i])),
+              inputT,
+              outputT);
+      if (extraParam instanceof SchemaElementParameter) {
+        errors.throwIllegalArgument(
+            "Schema @%s are not supported for @%s method. Found %s, did you mean to use %s?",
+            format(DoFn.Element.class),
+            format(DoFn.NewWatermarkEstimator.class),
+            format(((SchemaElementParameter) extraParam).elementT()),
+            format(inputT));
+      } else if (extraParam instanceof RestrictionParameter) {
+        // NOTE(review): restrictionT is presumably the @GetInitialRestriction return type supplied
+        // by the caller, so that is the method named in the message. Confirm against the caller.
+        errors.checkArgument(
+            restrictionT.equals(((RestrictionParameter) extraParam).restrictionT()),
+            "Uses restriction type %s, but @%s method uses restriction type %s",
+            format(((RestrictionParameter) extraParam).restrictionT()),
+            format(DoFn.GetInitialRestriction.class),
+            format(restrictionT));
+      } else if (extraParam instanceof WatermarkEstimatorStateParameter) {
+        errors.checkArgument(
+            watermarkEstimatorStateT.equals(
+                ((WatermarkEstimatorStateParameter) extraParam).estimatorStateT()),
+            "Uses watermark estimator state type %s, but @%s method uses watermark estimator state type %s",
+            format(((WatermarkEstimatorStateParameter) extraParam).estimatorStateT()),
+            format(DoFn.GetInitialWatermarkEstimatorState.class),
+            format(watermarkEstimatorStateT));
+      }
+      methodContext.addParameter(extraParam);
+    }
+
+    for (Parameter parameter : methodContext.getExtraParameters()) {
+      checkParameterOneOf(errors, parameter, ALLOWED_NEW_WATERMARK_ESTIMATOR_PARAMETERS);
+    }
+
+    // Reuse the return type resolved above rather than resolving it a second time.
+    return DoFnSignature.NewWatermarkEstimatorMethod.create(
+        m, watermarkEstimatorT, windowT, methodContext.getExtraParameters());
+  }
+
+ @VisibleForTesting
static DoFnSignature.GetSizeMethod analyzeGetSizeMethod(
ErrorReporter errors,
TypeDescriptor<? extends DoFn<?, ?>> fnT,
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/HasDefaultWatermarkEstimator.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/HasDefaultWatermarkEstimator.java
new file mode 100644
index 0000000..fac64ad
--- /dev/null
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/HasDefaultWatermarkEstimator.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.splittabledofn;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.transforms.DoFn;
+
+/**
+ * Implemented by watermark estimator state types that can build their own {@link
+ * WatermarkEstimator}. This lets a splittable {@link DoFn} rely on a default {@link
+ * DoFn.NewWatermarkEstimator} that depends only on the watermark estimator state itself.
+ *
+ * @param <WatermarkEstimatorStateT> the watermark estimator state type
+ * @param <WatermarkEstimatorT> the type of estimator produced from that state
+ */
+@Experimental(Kind.SPLITTABLE_DO_FN)
+public interface HasDefaultWatermarkEstimator<
+    WatermarkEstimatorStateT,
+    WatermarkEstimatorT extends WatermarkEstimator<WatermarkEstimatorStateT>> {
+
+  /** Returns a new watermark estimator built from {@code this} state. */
+  WatermarkEstimatorT newWatermarkEstimator();
+}
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/ManualWatermarkEstimator.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/ManualWatermarkEstimator.java
new file mode 100644
index 0000000..a6e2797
--- /dev/null
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/ManualWatermarkEstimator.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.splittabledofn;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.joda.time.Instant;
+
+/**
+ * A {@link WatermarkEstimator} whose watermark is advanced explicitly by user code running inside a
+ * {@link DoFn}. The {@link DoFn} advances the watermark by calling {@link #setWatermark}.
+ */
+@Experimental(Kind.SPLITTABLE_DO_FN)
+public interface ManualWatermarkEstimator<WatermarkEstimatorStateT>
+    extends WatermarkEstimator<WatermarkEstimatorStateT> {
+
+  /**
+   * Records a timestamp that is at or before the timestamp of every element the associated DoFn
+   * will produce from now on.
+   *
+   * <p>The value may be approximate. Records output in violation of it are treated as late, which
+   * changes how they are processed. See <a
+   * href="https://beam.apache.org/documentation/programming-guide/#watermarks-and-late-data">watermarks
+   * and late data</a> for more on late data and how to handle it.
+   *
+   * <p>At the same time, the value should be as late as possible. Downstream windows may not be
+   * able to close until this watermark passes their end.
+   */
+  void setWatermark(Instant watermark);
+}
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/TimestampObservingWatermarkEstimator.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/TimestampObservingWatermarkEstimator.java
new file mode 100644
index 0000000..b217690
--- /dev/null
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/TimestampObservingWatermarkEstimator.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.splittabledofn;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.joda.time.Instant;
+
+/**
+ * A {@link WatermarkEstimator} that is shown the timestamp of every record output from a {@link
+ * DoFn}.
+ */
+@Experimental(Kind.SPLITTABLE_DO_FN)
+public interface TimestampObservingWatermarkEstimator<WatermarkEstimatorStateT>
+    extends WatermarkEstimator<WatermarkEstimatorStateT> {
+
+  /**
+   * Updates the watermark estimate with the latest output timestamp. Invoked with the timestamp of
+   * each element the DoFn outputs.
+   */
+  void observeTimestamp(Instant timestamp);
+}
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/WatermarkEstimator.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/WatermarkEstimator.java
new file mode 100644
index 0000000..343fea6
--- /dev/null
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/WatermarkEstimator.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.splittabledofn;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.joda.time.Instant;
+
+/**
+ * Estimates the output watermark of a splittable {@link DoFn}.
+ *
+ * @param <WatermarkEstimatorStateT> the type of state from which an estimator can be recreated
+ */
+@Experimental(Kind.SPLITTABLE_DO_FN)
+public interface WatermarkEstimator<WatermarkEstimatorStateT> {
+
+  /**
+   * Returns the estimated output watermark. Successive values must be monotonically increasing,
+   * including across instances that are constructed from prior state.
+   */
+  Instant currentWatermark();
+
+  /**
+   * Returns the current state of this {@link WatermarkEstimator}, which can be used to recreate the
+   * {@link WatermarkEstimator} when processing the restriction. See {@link
+   * DoFn.NewWatermarkEstimator} for additional details.
+   *
+   * <p>Calling this method must not mutate the estimator's internal state.
+   *
+   * <p>The returned state must not be mutated.
+   */
+  WatermarkEstimatorStateT getState();
+}
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/WatermarkEstimators.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/WatermarkEstimators.java
new file mode 100644
index 0000000..40992f1
--- /dev/null
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/WatermarkEstimators.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.splittabledofn;
+
+import static
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.joda.time.Instant;
+
+/**
+ * A set of {@link WatermarkEstimator}s that users can use to advance the output watermark for their
+ * associated {@link DoFn splittable DoFn}s.
+ *
+ * <p>Each estimator requires its initial watermark to be non-null and within the global window's
+ * timestamp bounds. None of them ever reports a watermark earlier than one it reported before.
+ */
+@Experimental(Kind.SPLITTABLE_DO_FN)
+public class WatermarkEstimators {
+  /** Concrete implementation of a {@link ManualWatermarkEstimator}. */
+  public static class Manual implements ManualWatermarkEstimator<Instant> {
+    private Instant watermark;
+
+    /**
+     * Creates an estimator whose watermark starts at {@code watermark}.
+     *
+     * @throws NullPointerException if {@code watermark} is null
+     * @throws IllegalArgumentException if {@code watermark} is outside the global window's bounds
+     */
+    public Manual(Instant watermark) {
+      this.watermark = checkWatermarkWithinBounds(watermark);
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * @throws NullPointerException if {@code watermark} is null
+     * @throws IllegalArgumentException if {@code watermark} is outside the global window's bounds
+     *     or earlier than the current watermark
+     */
+    @Override
+    public void setWatermark(Instant watermark) {
+      checkWatermarkWithinBounds(watermark);
+      if (watermark.isBefore(this.watermark)) {
+        throw new IllegalArgumentException(
+            String.format(
+                "Watermark must be monotonically increasing. Provided watermark %s is less than "
+                    + "current watermark %s.",
+                watermark, this.watermark));
+      }
+      this.watermark = watermark;
+    }
+
+    @Override
+    public Instant currentWatermark() {
+      return watermark;
+    }
+
+    @Override
+    public Instant getState() {
+      return watermark;
+    }
+  }
+
+  /**
+   * A watermark estimator that tracks wall time. Each call to {@link #currentWatermark()} advances
+   * the watermark to the current system time unless the watermark is already later.
+   */
+  public static class WallTime implements WatermarkEstimator<Instant> {
+    private Instant watermark;
+
+    /**
+     * Creates an estimator whose watermark starts at {@code watermark}.
+     *
+     * @throws NullPointerException if {@code watermark} is null
+     * @throws IllegalArgumentException if {@code watermark} is outside the global window's bounds
+     */
+    public WallTime(Instant watermark) {
+      this.watermark = checkWatermarkWithinBounds(watermark);
+    }
+
+    @Override
+    public Instant currentWatermark() {
+      Instant now = Instant.now();
+      // Keep the later of "now" and the previous value so the watermark never moves backwards.
+      this.watermark = now.isAfter(watermark) ? now : watermark;
+      return watermark;
+    }
+
+    /** Returns the watermark as of the last {@link #currentWatermark()} call, or the initial one. */
+    @Override
+    public Instant getState() {
+      return watermark;
+    }
+  }
+
+  /**
+   * A watermark estimator that observes the timestamps of records output from a DoFn, reporting the
+   * timestamp of the last element seen as the current watermark.
+   *
+   * <p>Note that this watermark estimator requires output timestamps in monotonically increasing
+   * order.
+   */
+  public static class MonotonicallyIncreasing
+      implements TimestampObservingWatermarkEstimator<Instant> {
+    private Instant watermark;
+
+    /**
+     * Creates an estimator whose watermark starts at {@code watermark}.
+     *
+     * @throws NullPointerException if {@code watermark} is null
+     * @throws IllegalArgumentException if {@code watermark} is outside the global window's bounds
+     */
+    public MonotonicallyIncreasing(Instant watermark) {
+      this.watermark = checkWatermarkWithinBounds(watermark);
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * @throws NullPointerException if {@code timestamp} is null
+     * @throws IllegalArgumentException if {@code timestamp} is earlier than the last observed one
+     */
+    @Override
+    public void observeTimestamp(Instant timestamp) {
+      checkNotNull(timestamp, "timestamp must not be null.");
+      // Beyond bounds error checking isn't important since the system is expected to perform output
+      // timestamp bounds checking already.
+      if (timestamp.isBefore(this.watermark)) {
+        throw new IllegalArgumentException(
+            String.format(
+                "Timestamp must be monotonically increasing. Provided timestamp %s is less than "
+                    + "previously provided timestamp %s.",
+                timestamp, this.watermark));
+      }
+      this.watermark = timestamp;
+    }
+
+    @Override
+    public Instant currentWatermark() {
+      return watermark;
+    }
+
+    @Override
+    public Instant getState() {
+      return watermark;
+    }
+  }
+
+  /**
+   * Checks that {@code watermark} is non-null and lies within [{@link
+   * GlobalWindow#TIMESTAMP_MIN_VALUE}, {@link GlobalWindow#TIMESTAMP_MAX_VALUE}].
+   *
+   * @return {@code watermark}, unchanged, so callers can assign the result directly
+   * @throws NullPointerException if {@code watermark} is null
+   * @throws IllegalArgumentException if {@code watermark} is outside those bounds
+   */
+  private static Instant checkWatermarkWithinBounds(Instant watermark) {
+    checkNotNull(watermark, "watermark must not be null.");
+    if (watermark.isBefore(GlobalWindow.TIMESTAMP_MIN_VALUE)
+        || watermark.isAfter(GlobalWindow.TIMESTAMP_MAX_VALUE)) {
+      throw new IllegalArgumentException(
+          String.format(
+              "Provided watermark %s must be within bounds [%s, %s].",
+              watermark, GlobalWindow.TIMESTAMP_MIN_VALUE, GlobalWindow.TIMESTAMP_MAX_VALUE));
+    }
+    return watermark;
+  }
+}
diff --git
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
index f2e4676..9dcc86a 100644
---
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
+++
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
@@ -34,14 +34,17 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.CoderProviders;
import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.InstantCoder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
@@ -57,8 +60,11 @@ import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.reflect.DoFnInvoker.FakeArgumentProvider;
import
org.apache.beam.sdk.transforms.reflect.testhelper.DoFnInvokersTestHelper;
import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultTracker;
+import
org.apache.beam.sdk.transforms.splittabledofn.HasDefaultWatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.util.UserCodeException;
@@ -402,7 +408,9 @@ public class DoFnInvokersTest {
public static class MockFn extends DoFn<String, String> {
@ProcessElement
public ProcessContinuation processElement(
- ProcessContext c, RestrictionTracker<SomeRestriction, Void> tracker) {
+ ProcessContext c,
+ RestrictionTracker<SomeRestriction, Void> tracker,
+ WatermarkEstimator<Instant> watermarkEstimator) {
return null;
}
@@ -426,6 +434,22 @@ public class DoFnInvokersTest {
public SomeRestrictionCoder getRestrictionCoder() {
return null;
}
+
+ @GetInitialWatermarkEstimatorState
+ public Instant getInitialWatermarkEstimatorState() {
+ return null;
+ }
+
+ @GetWatermarkEstimatorStateCoder
+ public InstantCoder getWatermarkEstimatorStateCoder() {
+ return null;
+ }
+
+ @NewWatermarkEstimator
+ public WatermarkEstimator<Instant> newWatermarkEstimator(
+ @WatermarkEstimatorState Instant watermarkEstimatorState) {
+ return null;
+ }
}
@Test
@@ -434,11 +458,16 @@ public class DoFnInvokersTest {
DoFnInvoker<String, String> invoker = DoFnInvokers.invokerFor(fn);
final SomeRestrictionTracker tracker = mock(SomeRestrictionTracker.class);
final SomeRestrictionCoder coder = mock(SomeRestrictionCoder.class);
+ final InstantCoder watermarkEstimatorStateCoder = InstantCoder.of();
+ final Instant watermarkEstimatorState = Instant.now();
+ final WatermarkEstimator<Instant> watermarkEstimator =
+ new WatermarkEstimators.Manual(watermarkEstimatorState);
SomeRestriction restriction = new SomeRestriction();
final SomeRestriction part1 = new SomeRestriction();
final SomeRestriction part2 = new SomeRestriction();
final SomeRestriction part3 = new SomeRestriction();
when(fn.getRestrictionCoder()).thenReturn(coder);
+
when(fn.getWatermarkEstimatorStateCoder()).thenReturn(watermarkEstimatorStateCoder);
when(fn.getInitialRestriction(mockElement)).thenReturn(restriction);
doAnswer(
AdditionalAnswers.delegatesTo(
@@ -456,11 +485,15 @@ public class DoFnInvokersTest {
}))
.when(fn)
.splitRestriction(eq(mockElement), same(restriction), any());
+
when(fn.getInitialWatermarkEstimatorState()).thenReturn(watermarkEstimatorState);
when(fn.newTracker(restriction)).thenReturn(tracker);
- when(fn.processElement(mockProcessContext, tracker)).thenReturn(resume());
+
when(fn.newWatermarkEstimator(watermarkEstimatorState)).thenReturn(watermarkEstimator);
+ when(fn.processElement(mockProcessContext, tracker,
watermarkEstimator)).thenReturn(resume());
assertEquals(coder,
invoker.invokeGetRestrictionCoder(CoderRegistry.createDefault()));
-
+ assertEquals(
+ watermarkEstimatorStateCoder,
+
invoker.invokeGetWatermarkEstimatorStateCoder(CoderRegistry.createDefault()));
assertEquals(
restriction,
invoker.invokeGetInitialRestriction(
@@ -501,6 +534,9 @@ public class DoFnInvokersTest {
assertEquals(Arrays.asList(part1, part2, part3), outputs);
assertEquals(
+ watermarkEstimatorState,
+ invoker.invokeGetInitialWatermarkEstimatorState(new
FakeArgumentProvider<>()));
+ assertEquals(
tracker,
invoker.invokeNewTracker(
new FakeArgumentProvider<String, String>() {
@@ -515,6 +551,15 @@ public class DoFnInvokersTest {
}
}));
assertEquals(
+ watermarkEstimator,
+ invoker.invokeNewWatermarkEstimator(
+ new FakeArgumentProvider<String, String>() {
+ @Override
+ public Object watermarkEstimatorState() {
+ return watermarkEstimatorState;
+ }
+ }));
+ assertEquals(
resume(),
invoker.invokeProcessElement(
new FakeArgumentProvider<String, String>() {
@@ -527,6 +572,11 @@ public class DoFnInvokersTest {
public RestrictionTracker<?, ?> restrictionTracker() {
return tracker;
}
+
+ @Override
+ public WatermarkEstimator<?> watermarkEstimator() {
+ return watermarkEstimator;
+ }
}));
}
@@ -573,17 +623,64 @@ public class DoFnInvokersTest {
}
}
+  /**
+   * Test watermark estimator state that supplies its own default estimator, so a DoFn using it
+   * needs no {@code @NewWatermarkEstimator} method.
+   */
+  private static class WatermarkEstimatorStateWithDefaultWatermarkEstimator
+      implements HasDefaultWatermarkEstimator<
+          WatermarkEstimatorStateWithDefaultWatermarkEstimator, DefaultWatermarkEstimator> {
+
+    @Override
+    public DefaultWatermarkEstimator newWatermarkEstimator() {
+      DefaultWatermarkEstimator estimator = new DefaultWatermarkEstimator();
+      return estimator;
+    }
+  }
+
+  /**
+   * Stub estimator returned by {@link WatermarkEstimatorStateWithDefaultWatermarkEstimator}. Both
+   * methods return {@code null}; the test only checks the estimator's type.
+   */
+  private static class DefaultWatermarkEstimator
+      implements WatermarkEstimator<WatermarkEstimatorStateWithDefaultWatermarkEstimator> {
+    @Override
+    public WatermarkEstimatorStateWithDefaultWatermarkEstimator getState() {
+      return null;
+    }
+
+    @Override
+    public Instant currentWatermark() {
+      return null;
+    }
+  }
+
+  /**
+   * Stub coder registered for {@link WatermarkEstimatorStateWithDefaultWatermarkEstimator}. It
+   * writes nothing and decodes to {@code null}; the test only checks which coder is selected.
+   */
+  private static class CoderForWatermarkEstimatorStateWithDefaultWatermarkEstimator
+      extends AtomicCoder<WatermarkEstimatorStateWithDefaultWatermarkEstimator> {
+
+    @Override
+    public WatermarkEstimatorStateWithDefaultWatermarkEstimator decode(InputStream inStream)
+        throws CoderException, IOException {
+      return null;
+    }
+
+    @Override
+    public void encode(
+        WatermarkEstimatorStateWithDefaultWatermarkEstimator value, OutputStream outStream)
+        throws CoderException, IOException {
+      // Intentionally empty: this stub coder never writes any bytes.
+    }
+  }
+
@Test
public void testSplittableDoFnDefaultMethods() throws Exception {
class MockFn extends DoFn<String, String> {
@ProcessElement
public void processElement(
- ProcessContext c, RestrictionTracker<RestrictionWithDefaultTracker,
Void> tracker) {}
+ ProcessContext c,
+ RestrictionTracker<RestrictionWithDefaultTracker, Void> tracker,
+
WatermarkEstimator<WatermarkEstimatorStateWithDefaultWatermarkEstimator>
+ watermarkEstimator) {}
@GetInitialRestriction
public RestrictionWithDefaultTracker getInitialRestriction(@Element
String element) {
return null;
}
+
+ @GetInitialWatermarkEstimatorState
+ public WatermarkEstimatorStateWithDefaultWatermarkEstimator
+ getInitialWatermarkEstimatorState() {
+ return null;
+ }
}
MockFn fn = mock(MockFn.class);
@@ -593,9 +690,15 @@ public class DoFnInvokersTest {
coderRegistry.registerCoderProvider(
CoderProviders.fromStaticMethods(
RestrictionWithDefaultTracker.class,
CoderForDefaultTracker.class));
+ coderRegistry.registerCoderForClass(
+ WatermarkEstimatorStateWithDefaultWatermarkEstimator.class,
+ new CoderForWatermarkEstimatorStateWithDefaultWatermarkEstimator());
assertThat(
invoker.<RestrictionWithDefaultTracker>invokeGetRestrictionCoder(coderRegistry),
instanceOf(CoderForDefaultTracker.class));
+ assertThat(
+ invoker.invokeGetWatermarkEstimatorStateCoder(coderRegistry),
+
instanceOf(CoderForWatermarkEstimatorStateWithDefaultWatermarkEstimator.class));
invoker.invokeSplitRestriction(
new FakeArgumentProvider<String, String>() {
@Override
@@ -639,6 +742,15 @@ public class DoFnInvokersTest {
}
}),
instanceOf(DefaultTracker.class));
+ assertThat(
+ invoker.invokeNewWatermarkEstimator(
+ new FakeArgumentProvider<String, String>() {
+ @Override
+ public Object watermarkEstimatorState() {
+ return new
WatermarkEstimatorStateWithDefaultWatermarkEstimator();
+ }
+ }),
+ instanceOf(DefaultWatermarkEstimator.class));
}
//
---------------------------------------------------------------------------------------
diff --git
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
index 2a2a08e..e030d99 100644
---
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
+++
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
@@ -25,6 +25,7 @@ import static org.junit.Assert.assertTrue;
import java.lang.reflect.Method;
import java.util.List;
+import org.apache.beam.sdk.coders.InstantCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.StructuredCoder;
@@ -36,15 +37,19 @@ import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.BoundedPerElement;
import org.apache.beam.sdk.transforms.DoFn.Element;
import org.apache.beam.sdk.transforms.DoFn.Restriction;
-import org.apache.beam.sdk.transforms.DoFn.StateId;
import org.apache.beam.sdk.transforms.DoFn.UnboundedPerElement;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter;
import
org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.RestrictionParameter;
+import
org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.WatermarkEstimatorStateParameter;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures.FnAnalysisContext;
import
org.apache.beam.sdk.transforms.reflect.DoFnSignaturesTestUtils.AnonymousMethod;
import org.apache.beam.sdk.transforms.reflect.DoFnSignaturesTestUtils.FakeDoFn;
import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultTracker;
+import
org.apache.beam.sdk.transforms.splittabledofn.HasDefaultWatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.values.PCollection;
@@ -272,7 +277,9 @@ public class DoFnSignaturesSplittableDoFnTest {
class GoodSplittableDoFn extends DoFn<Integer, String> {
@ProcessElement
public ProcessContinuation processElement(
- ProcessContext context, RestrictionTracker<SomeRestriction, Void>
tracker) {
+ ProcessContext context,
+ RestrictionTracker<SomeRestriction, Void> tracker,
+ ManualWatermarkEstimator<Instant> watermarkEstimator) {
return null;
}
@@ -312,7 +319,6 @@ public class DoFnSignaturesSplittableDoFnTest {
public double getSize(
@Element Integer element,
@Restriction SomeRestriction restriction,
- RestrictionTracker<SomeRestriction, Void> restrictionTracker,
PipelineOptions pipelineOptions,
BoundedWindow boundedWindow,
PaneInfo paneInfo,
@@ -324,10 +330,41 @@ public class DoFnSignaturesSplittableDoFnTest {
public SomeRestrictionCoder getRestrictionCoder() {
return null;
}
+
+ @GetInitialWatermarkEstimatorState
+ public Instant getInitialWatermarkEstimatorState(
+ @Element Integer element,
+ @Restriction SomeRestriction restriction,
+ PipelineOptions pipelineOptions,
+ BoundedWindow boundedWindow,
+ PaneInfo paneInfo,
+ @Timestamp Instant timestamp) {
+ return null;
+ }
+
+ @GetWatermarkEstimatorStateCoder
+ public InstantCoder getWatermarkEstimatorStateCoder() {
+ return null;
+ }
+
+ @NewWatermarkEstimator
+ public WatermarkEstimators.Manual newWatermarkEstimator(
+ @WatermarkEstimatorState Instant watermarkEstimatorState,
+ @Element Integer element,
+ @Restriction SomeRestriction restriction,
+ PipelineOptions pipelineOptions,
+ BoundedWindow boundedWindow,
+ PaneInfo paneInfo,
+ @Timestamp Instant timestamp) {
+ return null;
+ }
}
DoFnSignature signature =
DoFnSignatures.getSignature(GoodSplittableDoFn.class);
assertEquals(RestrictionTracker.class,
signature.processElement().trackerT().getRawType());
+ assertEquals(
+ ManualWatermarkEstimator.class,
+ signature.processElement().watermarkEstimatorT().getRawType());
assertTrue(signature.processElement().isSplittable());
assertTrue(signature.processElement().hasReturnValue());
assertEquals(
@@ -350,6 +387,18 @@ public class DoFnSignaturesSplittableDoFnTest {
getParameterOfType(signature.getSize().extraParameters(),
RestrictionParameter.class)
.restrictionT()
.getRawType());
+ assertEquals(
+ Instant.class,
+
signature.getInitialWatermarkEstimatorState().watermarkEstimatorStateT().getRawType());
+ assertEquals(
+ Instant.class,
+ getParameterOfType(
+ signature.newWatermarkEstimator().extraParameters(),
+ WatermarkEstimatorStateParameter.class)
+ .estimatorStateT()
+ .getRawType());
+ assertEquals(
+ InstantCoder.class,
signature.getWatermarkEstimatorStateCoder().coderT().getRawType());
}
/**
@@ -358,9 +407,17 @@ public class DoFnSignaturesSplittableDoFnTest {
*/
@Test
public void testSplittableWithAllFunctionsGeneric() throws Exception {
- class GoodGenericSplittableDoFn<RestrictionT, TrackerT, CoderT> extends
DoFn<Integer, String> {
+ class GoodGenericSplittableDoFn<
+ RestrictionT,
+ TrackerT,
+ RestrictionCoderT,
+ WatermarkEstimatorStateT,
+ WatermarkEstimatorStateCoderT,
+ WatermarkEstimatorT>
+ extends DoFn<Integer, String> {
@ProcessElement
- public ProcessContinuation processElement(ProcessContext context,
TrackerT tracker) {
+ public ProcessContinuation processElement(
+ ProcessContext context, TrackerT tracker, WatermarkEstimatorT
watermarkEstimatorT) {
return null;
}
@@ -379,14 +436,30 @@ public class DoFnSignaturesSplittableDoFnTest {
}
@GetRestrictionCoder
- public CoderT getRestrictionCoder() {
+ public RestrictionCoderT getRestrictionCoder() {
return null;
}
@GetSize
- public double getSize(@Restriction RestrictionT restriction, TrackerT
restrictionTracker) {
+ public double getSize(@Restriction RestrictionT restriction) {
return 1.0;
}
+
+ @GetInitialWatermarkEstimatorState
+ public WatermarkEstimatorStateT getInitialWatermarkEstimatorState() {
+ return null;
+ }
+
+ @GetWatermarkEstimatorStateCoder
+ public WatermarkEstimatorStateCoderT getWatermarkEstimatorStateCoder() {
+ return null;
+ }
+
+ @NewWatermarkEstimator
+ public WatermarkEstimatorT newWatermarkEstimator(
+ @WatermarkEstimatorState WatermarkEstimatorStateT
watermarkEstimatorState) {
+ return null;
+ }
}
DoFnSignature signature =
@@ -394,8 +467,14 @@ public class DoFnSignaturesSplittableDoFnTest {
new GoodGenericSplittableDoFn<
SomeRestriction,
RestrictionTracker<SomeRestriction, ?>,
- SomeRestrictionCoder>() {}.getClass());
+ SomeRestrictionCoder,
+ Instant,
+ InstantCoder,
+ ManualWatermarkEstimator<Instant>>() {}.getClass());
assertEquals(RestrictionTracker.class,
signature.processElement().trackerT().getRawType());
+ assertEquals(
+ ManualWatermarkEstimator.class,
+ signature.processElement().watermarkEstimatorT().getRawType());
assertTrue(signature.processElement().isSplittable());
assertTrue(signature.processElement().hasReturnValue());
assertEquals(
@@ -413,6 +492,18 @@ public class DoFnSignaturesSplittableDoFnTest {
.restrictionT()
.getRawType());
assertEquals(SomeRestrictionCoder.class,
signature.getRestrictionCoder().coderT().getRawType());
+ assertEquals(
+ Instant.class,
+
signature.getInitialWatermarkEstimatorState().watermarkEstimatorStateT().getRawType());
+ assertEquals(
+ Instant.class,
+ getParameterOfType(
+ signature.newWatermarkEstimator().extraParameters(),
+ WatermarkEstimatorStateParameter.class)
+ .estimatorStateT()
+ .getRawType());
+ assertEquals(
+ InstantCoder.class,
signature.getWatermarkEstimatorStateCoder().coderT().getRawType());
}
@Test
@@ -446,6 +537,68 @@ public class DoFnSignaturesSplittableDoFnTest {
}
@Test
+ public void
testGetInitialWatermarkEstimatorStateUnsupportedSchemaElementArgument()
+ throws Exception {
+ // The analyzed method declares its @Element parameter as String although the analysis is
+ // handed TypeDescriptor Integer; the expected error shows @GetInitialWatermarkEstimatorState
+ // methods reject such schema-style @Element parameters.
+ thrown.expectMessage(
+ "Schema @Element are not supported for
@GetInitialWatermarkEstimatorState method. Found String, did you mean to use
Integer?");
+ DoFnSignatures.analyzeGetInitialWatermarkEstimatorStateMethod(
+ errors(),
+ TypeDescriptor.of(FakeDoFn.class),
+ new AnonymousMethod() {
+ SomeRestriction method(@Element String element) {
+ return null;
+ }
+ }.getMethod(),
+ TypeDescriptor.of(Integer.class),
+ TypeDescriptor.of(String.class),
+ FnAnalysisContext.create());
+ }
+
+ @Test
+ public void testNewWatermarkEstimatorUnsupportedSchemaElementArgument()
throws Exception {
+ // The analyzed method declares its @Element parameter as String although the analysis is
+ // handed TypeDescriptor Integer; the expected error shows @NewWatermarkEstimator methods reject
+ // such schema-style @Element parameters. The SomeRestriction and Instant descriptors are
+ // presumably the restriction and watermark estimator state types (confirm against the analyzer).
+ thrown.expectMessage(
+ "Schema @Element are not supported for @NewWatermarkEstimator method.
Found String, did you mean to use Integer?");
+ DoFnSignatures.analyzeNewWatermarkEstimatorMethod(
+ errors(),
+ TypeDescriptor.of(FakeDoFn.class),
+ new AnonymousMethod() {
+ WatermarkEstimator<Instant> method(@Element String element) {
+ return null;
+ }
+ }.getMethod(),
+ TypeDescriptor.of(Integer.class),
+ TypeDescriptor.of(String.class),
+ TypeDescriptor.of(SomeRestriction.class),
+ TypeDescriptor.of(Instant.class),
+ FnAnalysisContext.create());
+ }
+
+ @Test
+ public void testMissingNewWatermarkEstimatorMethod() throws Exception {
+ // BadFn takes a ManualWatermarkEstimator in @ProcessElement and its initial watermark estimator
+ // state is an Instant, but it declares no @NewWatermarkEstimator method. Signature analysis must
+ // fail: the expected message requires either such a method or that Instant implement
+ // HasDefaultWatermarkEstimator.
+ class BadFn extends DoFn<Integer, String> {
+ @ProcessElement
+ public void process(
+ ProcessContext context,
+ RestrictionTracker<SomeRestriction, Void> tracker,
+ ManualWatermarkEstimator<Instant> watermarkEstimator) {}
+
+ @GetInitialRestriction
+ public SomeRestriction getInitialRestriction() {
+ return null;
+ }
+
+ @GetInitialWatermarkEstimatorState
+ public Instant getInitialWatermarkEstimatorState() {
+ return null;
+ }
+ }
+
+ thrown.expectMessage(
+ "Splittable, either @NewWatermarkEstimator method must be defined or
Instant must implement HasDefaultWatermarkEstimator.");
+ DoFnSignatures.getSignature(BadFn.class);
+ }
+
+ @Test
public void testSplittableMissingNewTrackerMethod() throws Exception {
class OtherRestriction {}
@@ -488,6 +641,40 @@ public class DoFnSignaturesSplittableDoFnTest {
assertEquals(RestrictionTracker.class,
signature.processElement().trackerT().getRawType());
}
+ /**
+ * Abstract watermark estimator over {@link WatermarkEstimatorStateWithDefaultWatermarkEstimator};
+ * named as that state type's default estimator below.
+ */
+ abstract static class SomeDefaultWatermarkEstimator
+ implements
WatermarkEstimator<WatermarkEstimatorStateWithDefaultWatermarkEstimator> {}
+
+ /**
+ * Watermark estimator state type that names {@link SomeDefaultWatermarkEstimator} as its default
+ * estimator via {@link HasDefaultWatermarkEstimator}, so splittable DoFns whose initial state is of
+ * this type can omit a {@code @NewWatermarkEstimator} method.
+ */
+ abstract static class WatermarkEstimatorStateWithDefaultWatermarkEstimator
+ implements HasDefaultWatermarkEstimator<
+ WatermarkEstimatorStateWithDefaultWatermarkEstimator,
SomeDefaultWatermarkEstimator> {}
+
+ @Test
+ public void testHasDefaultWatermarkEstimator() throws Exception {
+ // Fn declares no @NewWatermarkEstimator method. Because its watermark estimator state type
+ // implements HasDefaultWatermarkEstimator, signature analysis succeeds and records the
+ // WatermarkEstimator parameter type declared by @ProcessElement.
+ class Fn extends DoFn<Integer, String> {
+ @ProcessElement
+ public void process(
+ ProcessContext c,
+ RestrictionTracker<SomeRestriction, Void> tracker,
+
WatermarkEstimator<WatermarkEstimatorStateWithDefaultWatermarkEstimator>
+ watermarkEstimator) {}
+
+ @GetInitialRestriction
+ public SomeRestriction getInitialRestriction(@Element Integer element) {
+ return null;
+ }
+
+ @GetInitialWatermarkEstimatorState
+ public WatermarkEstimatorStateWithDefaultWatermarkEstimator
+ getInitialWatermarkEstimatorState() {
+ return null;
+ }
+ }
+
+ DoFnSignature signature = DoFnSignatures.getSignature(Fn.class);
+ assertEquals(
+ WatermarkEstimator.class,
signature.processElement().watermarkEstimatorT().getRawType());
+ }
+
@Test
public void testRestrictionHasDefaultTrackerProcessUsesWrongTracker() throws
Exception {
class Fn extends DoFn<Integer, String> {
@@ -507,6 +694,34 @@ public class DoFnSignaturesSplittableDoFnTest {
}
@Test
+  public void
+      testWatermarkEstimatorStateHasDefaultWatermarkEstimatorProcessUsesWrongWatermarkEstimator()
+          throws Exception {
+    // The state type's default estimator is SomeDefaultWatermarkEstimator, but @ProcessElement may
+    // only declare its estimator parameter as one of the types listed in the expected message.
+    class Fn extends DoFn<Integer, String> {
+      @ProcessElement
+      public void process(
+          ProcessContext c,
+          RestrictionTracker<SomeRestriction, Void> tracker,
+          SomeDefaultWatermarkEstimator watermarkEstimator) {}
+
+      @GetInitialRestriction
+      public SomeRestriction getInitialRestriction(@Element Integer element) {
+        return null;
+      }
+
+      @GetInitialWatermarkEstimatorState
+      public WatermarkEstimatorStateWithDefaultWatermarkEstimator
+          getInitialWatermarkEstimatorState() {
+        return null;
+      }
+    }
+
+    thrown.expectMessage(
+        "Has watermark estimator type SomeDefaultWatermarkEstimator, but the DoFn's watermark "
+            + "estimator type must be one of [WatermarkEstimator, ManualWatermarkEstimator] types.");
+    // Only the analysis failure is under test; the signature itself is intentionally discarded.
+    DoFnSignatures.getSignature(Fn.class);
+  }
+
+ @Test
public void testNewTrackerReturnsWrongType() throws Exception {
class BadFn extends DoFn<Integer, String> {
@ProcessElement
@@ -528,6 +743,26 @@ public class DoFnSignaturesSplittableDoFnTest {
}
@Test
+ public void testNewWatermarkEstimatorReturnsWrongType() throws Exception {
+ // @NewWatermarkEstimator returns void here. BadFn declares no @GetInitialWatermarkEstimatorState
+ // method, and the expected message requires a WatermarkEstimator<Void> subtype in that case.
+ class BadFn extends DoFn<Integer, String> {
+ @ProcessElement
+ public void process(
+ ProcessContext context, RestrictionTracker<SomeRestriction, Void>
tracker) {}
+
+ @GetInitialRestriction
+ public SomeRestriction getInitialRestriction(@Element Integer element) {
+ return null;
+ }
+
+ @NewWatermarkEstimator
+ public void newWatermarkEstimator() {}
+ }
+
+ thrown.expectMessage("Returns void, but must return a subtype of
WatermarkEstimator<Void>");
+ DoFnSignatures.getSignature(BadFn.class);
+ }
+
+ @Test
public void testGetInitialRestrictionMismatchesNewTracker() throws Exception
{
class BadFn extends DoFn<Integer, String> {
@ProcessElement
@@ -551,6 +786,36 @@ public class DoFnSignaturesSplittableDoFnTest {
}
@Test
+  public void testGetInitialWatermarkEstimatorStateMismatchesNewWatermarkEstimator()
+      throws Exception {
+    // The initial watermark estimator state is an Instant, so @NewWatermarkEstimator must return a
+    // WatermarkEstimator<Instant>; returning WatermarkEstimator<Void> must be rejected.
+    class BadFn extends DoFn<Integer, String> {
+      @ProcessElement
+      public void process(
+          ProcessContext context, RestrictionTracker<SomeRestriction, Void> tracker) {}
+
+      @GetInitialRestriction
+      public SomeRestriction getInitialRestriction(@Element Integer element) {
+        return null;
+      }
+
+      @GetInitialWatermarkEstimatorState
+      public Instant getInitialWatermarkEstimatorState() {
+        return null;
+      }
+
+      @NewWatermarkEstimator
+      public WatermarkEstimator<Void> newWatermarkEstimator(
+          @WatermarkEstimatorState Instant watermarkEstimatorState) {
+        return null;
+      }
+    }
+
+    thrown.expectMessage("but must return a subtype of WatermarkEstimator<Instant>");
+    thrown.expectMessage("newWatermarkEstimator(Instant): Returns WatermarkEstimator<Void>");
+    DoFnSignatures.getSignature(BadFn.class);
+  }
+
+ @Test
public void testGetRestrictionCoderReturnsWrongType() throws Exception {
class BadFn extends DoFn<Integer, String> {
@ProcessElement
@@ -579,6 +844,29 @@ public class DoFnSignaturesSplittableDoFnTest {
}
@Test
+ public void testGetWatermarkEstimatorStateCoderReturnsWrongType() throws
Exception {
+ // BadFn declares no @GetInitialWatermarkEstimatorState method. The expected message shows the
+ // state coder must then be a Coder<Void>, which the raw KvCoder return type is not.
+ class BadFn extends DoFn<Integer, String> {
+ @ProcessElement
+ public void process(
+ ProcessContext context, RestrictionTracker<SomeRestriction, Void>
tracker) {}
+
+ @GetInitialRestriction
+ public SomeRestriction getInitialRestriction(@Element Integer element) {
+ return null;
+ }
+
+ @GetWatermarkEstimatorStateCoder
+ public KvCoder getWatermarkEstimatorStateCoder() {
+ return null;
+ }
+ }
+
+ thrown.expectMessage(
+ "getWatermarkEstimatorStateCoder() returns KvCoder which is not a
subtype of Coder<Void>");
+ DoFnSignatures.getSignature(BadFn.class);
+ }
+
+ @Test
public void testSplitRestrictionReturnsWrongType() throws Exception {
thrown.expectMessage(
"OutputReceiver should be parameterized by "
@@ -696,11 +984,26 @@ public class DoFnSignaturesSplittableDoFnTest {
public double getSize() {
return 1.0;
}
+
+ @GetInitialWatermarkEstimatorState
+ public Instant getInitialWatermarkEstimatorState() {
+ return null;
+ }
+
+ @GetWatermarkEstimatorStateCoder
+ public InstantCoder getWatermarkEstimatorStateCoder() {
+ return null;
+ }
+
+ @NewWatermarkEstimator
+ public WatermarkEstimator<Instant> newWatermarkEstimator() {
+ return null;
+ }
}
thrown.expectMessage(
"Non-splittable, but defines methods: "
- + "[@GetInitialRestriction, @SplitRestriction, @NewTracker,
@GetRestrictionCoder, @GetSize]");
+ + "[@GetInitialRestriction, @SplitRestriction, @NewTracker,
@GetRestrictionCoder, @GetSize, @GetInitialWatermarkEstimatorState,
@GetWatermarkEstimatorStateCoder, @NewWatermarkEstimator]");
DoFnSignatures.getSignature(BadFn.class);
}
diff --git
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/WatermarkEstimatorsTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/WatermarkEstimatorsTest.java
new file mode 100644
index 0000000..3fd0dec
--- /dev/null
+++
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/WatermarkEstimatorsTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.splittabledofn;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.beam.sdk.testing.ResetDateTimeProvider;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.joda.time.DateTimeUtils;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.function.ThrowingRunnable;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class WatermarkEstimatorsTest {
+ public @Rule ResetDateTimeProvider resetDateTimeProvider = new
ResetDateTimeProvider();
+
+ @Test
+ public void testManualWatermarkEstimator() throws Exception {
+ ManualWatermarkEstimator<Instant> watermarkEstimator =
+ new WatermarkEstimators.Manual(GlobalWindow.TIMESTAMP_MIN_VALUE);
+ assertEquals(GlobalWindow.TIMESTAMP_MIN_VALUE,
watermarkEstimator.currentWatermark());
+ watermarkEstimator.setWatermark(GlobalWindow.TIMESTAMP_MIN_VALUE);
+ watermarkEstimator.setWatermark(
+ watermarkEstimator.currentWatermark().plus(Duration.standardHours(1)));
+ assertEquals(
+ GlobalWindow.TIMESTAMP_MIN_VALUE.plus(Duration.standardHours(1)),
+ watermarkEstimator.currentWatermark());
+ assertThrows(
+ "must be within bounds",
+ IllegalArgumentException.class,
+ () ->
watermarkEstimator.setWatermark(GlobalWindow.TIMESTAMP_MIN_VALUE.minus(1)));
+ assertThrows(
+ "must be within bounds",
+ IllegalArgumentException.class,
+ () ->
watermarkEstimator.setWatermark(GlobalWindow.TIMESTAMP_MAX_VALUE.plus(1)));
+ assertThrows(
+ "monotonically increasing",
+ IllegalArgumentException.class,
+ () ->
watermarkEstimator.setWatermark(GlobalWindow.TIMESTAMP_MIN_VALUE));
+ watermarkEstimator.setWatermark(GlobalWindow.TIMESTAMP_MAX_VALUE);
+ assertEquals(GlobalWindow.TIMESTAMP_MAX_VALUE,
watermarkEstimator.currentWatermark());
+ }
+
+ @Test
+ public void testWallTimeWatermarkEstimator() throws Exception {
+
DateTimeUtils.setCurrentMillisFixed(GlobalWindow.TIMESTAMP_MIN_VALUE.getMillis());
+ WatermarkEstimator<Instant> watermarkEstimator =
+ new WatermarkEstimators.WallTime(new Instant());
+
DateTimeUtils.setCurrentMillisFixed(GlobalWindow.TIMESTAMP_MIN_VALUE.plus(1).getMillis());
+ assertEquals(GlobalWindow.TIMESTAMP_MIN_VALUE.plus(1),
watermarkEstimator.currentWatermark());
+
+
DateTimeUtils.setCurrentMillisFixed(GlobalWindow.TIMESTAMP_MIN_VALUE.plus(2).getMillis());
+ // Make sure that we don't mutate state even if the clock advanced
+ assertEquals(GlobalWindow.TIMESTAMP_MIN_VALUE.plus(1),
watermarkEstimator.getState());
+ assertEquals(GlobalWindow.TIMESTAMP_MIN_VALUE.plus(2),
watermarkEstimator.currentWatermark());
+ assertEquals(GlobalWindow.TIMESTAMP_MIN_VALUE.plus(2),
watermarkEstimator.getState());
+
+ // Handle the case if the clock ever goes backwards. Could happen if we
resumed processing
+ // on a machine that had misconfigured clock or due to clock skew.
+
DateTimeUtils.setCurrentMillisFixed(GlobalWindow.TIMESTAMP_MIN_VALUE.plus(1).getMillis());
+ assertEquals(GlobalWindow.TIMESTAMP_MIN_VALUE.plus(2),
watermarkEstimator.currentWatermark());
+ }
+
+ @Test
+ public void testMonotonicallyIncreasingWatermarkEstimator() throws Exception
{
+ TimestampObservingWatermarkEstimator<Instant> watermarkEstimator =
+ new
WatermarkEstimators.MonotonicallyIncreasing(GlobalWindow.TIMESTAMP_MIN_VALUE);
+ assertEquals(GlobalWindow.TIMESTAMP_MIN_VALUE,
watermarkEstimator.currentWatermark());
+ watermarkEstimator.observeTimestamp(GlobalWindow.TIMESTAMP_MIN_VALUE);
+ watermarkEstimator.observeTimestamp(
+ watermarkEstimator.currentWatermark().plus(Duration.standardHours(1)));
+ assertEquals(
+ GlobalWindow.TIMESTAMP_MIN_VALUE.plus(Duration.standardHours(1)),
+ watermarkEstimator.currentWatermark());
+ assertThrows(
+ "monotonically increasing",
+ IllegalArgumentException.class,
+ () ->
watermarkEstimator.observeTimestamp(GlobalWindow.TIMESTAMP_MIN_VALUE));
+ watermarkEstimator.observeTimestamp(GlobalWindow.TIMESTAMP_MAX_VALUE);
+ assertEquals(GlobalWindow.TIMESTAMP_MAX_VALUE,
watermarkEstimator.currentWatermark());
+ }
+}
diff --git
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
index 9fb5d3e..681d041 100644
---
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
+++
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
@@ -81,6 +81,7 @@ import
org.apache.beam.sdk.transforms.reflect.DoFnSignature.TimerDeclaration;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
@@ -1242,6 +1243,17 @@ public class FnApiDoFnRunner<InputT, RestrictionT,
PositionT, OutputT> {
currentOutputWatermark);
currentOutputWatermark = watermark;
}
+
+ @Override
+ public Object watermarkEstimatorState() {
+ throw new UnsupportedOperationException(
+ "@WatermarkEstimatorState parameters are not supported.");
+ }
+
+ @Override
+ public WatermarkEstimator<?> watermarkEstimator() {
+ throw new UnsupportedOperationException("WatermarkEstimator parameters
are not supported.");
+ }
}
/** Provides arguments for a {@link DoFnInvoker} for {@link DoFn.OnTimer
@OnTimer}. */