lukecwik commented on a change in pull request #12016:
URL: https://github.com/apache/beam/pull/12016#discussion_r447133216
##########
File path:
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
##########
@@ -113,6 +113,8 @@
// SplittableParDoComponents
public static final String SPLITTABLE_PAIR_WITH_RESTRICTION_URN =
"beam:transform:sdf_pair_with_restriction:v1";
+ public static final String SPLITTABLE_TRUNCATE_SIZED_RESTRICTION_URN =
+ "beam:transform:sdf_truncate_sized_restrictions:v1";
Review comment:
Please add a checkState in the static block, like the others below.
##########
File path:
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
##########
@@ -306,6 +308,32 @@ public static void
invokeSplitRestriction(DoFnInvoker.ArgumentProvider argumentP
}
}
+ /**
+ * Default implementation of {@link DoFn.TruncateSizedRestriction}, for
delegation by bytebuddy.
+ */
+ public static class DefaultTruncateSizedRestriction {
+ private final boolean isBoundedPerElement;
+
+ DefaultTruncateSizedRestriction(DoFnSignature doFnSignature) {
+ this.isBoundedPerElement =
doFnSignature.isBoundedPerElement().equals(IsBounded.BOUNDED);
+ }
+ /** Return the current restriction if it's bounded.Otherwise, return null.
*/
+ @SuppressWarnings("unused")
+ public void invokeTruncateSizedRestriction(DoFnInvoker.ArgumentProvider
argumentProvider) {
+ boolean isBounded;
+ try {
+ isBounded = argumentProvider.restrictionTracker().isBounded();
+ } catch (NotImplementedException expectedException) {
+ isBounded = this.isBoundedPerElement;
+ }
+ if (isBounded) {
+
argumentProvider.outputReceiver(null).output(argumentProvider.restriction());
+ } else {
+ argumentProvider.outputReceiver(null).output(null);
+ }
+ }
+ }
Review comment:
```suggestion
/**
* Default implementation of {@link DoFn.TruncateRestriction}, for
delegation by bytebuddy.
*/
public static class DefaultTruncateRestriction {
private final boolean isBoundedPerElement;
DefaultTruncateRestriction(DoFnSignature doFnSignature) {
this.isBoundedPerElement =
doFnSignature.isBoundedPerElement().equals(IsBounded.BOUNDED);
}
/** Return the current restriction if it's bounded.Otherwise, return
null. */
@SuppressWarnings("unused")
public void invokeTruncateRestriction(DoFnInvoker.ArgumentProvider
argumentProvider) {
boolean isBounded = this.isBoundedPerElement;
try {
isBounded = argumentProvider.restrictionTracker().isBounded();
} catch (NotImplementedException expectedException) {
}
if (isBounded) {
argumentProvider.outputReceiver(null).output(argumentProvider.restriction());
}
}
}
```
##########
File path:
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/GrowableOffsetRangeTracker.java
##########
@@ -140,4 +140,9 @@ public Progress getProgress() {
totalWork.subtract(workRemaining,
MathContext.DECIMAL128).doubleValue(),
workRemaining.doubleValue());
}
+
+ @Override
+ public boolean isBounded() {
+ return false;
Review comment:
Shouldn't this be true if `range.getTo() != Long.MAX_VALUE`?
##########
File path:
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -574,6 +578,99 @@ public Instant timestamp(DoFn<InputT, OutputT> doFn) {
};
}
break;
+ case PTransformTranslation.SPLITTABLE_TRUNCATE_SIZED_RESTRICTION_URN:
+ if ((doFnSignature.truncateSizedRestriction() != null
+ && doFnSignature.truncateSizedRestriction().observesWindow())
+ || (doFnSignature.newTracker() != null &&
doFnSignature.newTracker().observesWindow())
+ || (doFnSignature.getSize() != null &&
doFnSignature.getSize().observesWindow())
+ || !sideInputMapping.isEmpty()) {
+ mainInputConsumer =
this::processElementForWindowObservingTruncateSizedRestriction;
+ // OutputT == RestrictionT
+ this.processContext =
+ new WindowObservingProcessBundleContext() {
+ @Override
+ public void outputWithTimestamp(OutputT output, Instant
timestamp) {
+ double size =
+ doFnInvoker.invokeGetSize(
+ new DelegatingArgumentProvider<InputT, OutputT>(
+ this,
+
PTransformTranslation.SPLITTABLE_TRUNCATE_SIZED_RESTRICTION_URN
+ + "/GetSize") {
Review comment:
```suggestion
+ "/TruncateRestriction") {
```
##########
File path: sdks/python/apache_beam/runners/worker/bundle_processor.py
##########
@@ -1428,6 +1428,36 @@ def process(self, element_restriction, *args, **kwargs):
return _create_sdf_operation(SplitAndSizeRestrictions, *args)
[email protected]_urn(
+ common_urns.sdf_components.TRUNCATE_SIZED_RESTRICTION.urn,
+ beam_runner_api_pb2.ParDoPayload)
+def create_truncate_sized_restriction(*args):
+ class TruncateAndSizeRestriction(beam.DoFn):
+ def __init__(self, fn, restriction_provider, watermark_estimator_provider):
+ self.restriction_provider = restriction_provider
+ self.dofn_signature = common.DoFnSignature(fn)
+
+ def process(self, element_restrictin, *args, **kwargs):
+ ((element, (restriction, estimator_state)), _) = element_restrictin
+ try:
+ truncated_restriciton = self.restriction_provider.truncate(
Review comment:
We should be returning the same as split_and_size, allowing for 0
restrictions to be returned.
##########
File path: sdks/python/apache_beam/runners/worker/bundle_processor.py
##########
@@ -1428,6 +1428,36 @@ def process(self, element_restriction, *args, **kwargs):
return _create_sdf_operation(SplitAndSizeRestrictions, *args)
[email protected]_urn(
+ common_urns.sdf_components.TRUNCATE_SIZED_RESTRICTION.urn,
+ beam_runner_api_pb2.ParDoPayload)
+def create_truncate_sized_restriction(*args):
+ class TruncateAndSizeRestriction(beam.DoFn):
+ def __init__(self, fn, restriction_provider, watermark_estimator_provider):
+ self.restriction_provider = restriction_provider
+ self.dofn_signature = common.DoFnSignature(fn)
+
+ def process(self, element_restrictin, *args, **kwargs):
+ ((element, (restriction, estimator_state)), _) = element_restrictin
+ try:
+ truncated_restriciton = self.restriction_provider.truncate(
Review comment:
truncated_restriciton -> truncated_restriction
##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
##########
@@ -500,6 +505,19 @@ public void splitRestriction(
return new UnboundedSourceAsSDFRestrictionTracker(restriction,
pipelineOptions);
}
+ @TruncateSizedRestriction
+ public void truncateSizedRestriction(
+ @Restriction UnboundedSourceRestriction<OutputT, CheckpointT>
restriction,
+ OutputReceiver<UnboundedSourceRestriction<OutputT, CheckpointT>>
receiver) {
+ try {
+ restriction.getCheckpoint().finalizeCheckpoint();
+ } catch (Exception e) {
+ LOG.warn("Failed to finalize CheckpointMark {}",
restriction.getWatermark());
+ } finally {
+ receiver.output(null);
+ }
Review comment:
I don't think we produce any outputs, and finalization is already handled
by the `bundleFinalizer.afterBundleCommit` callback in `@ProcessElement`.
```suggestion
```
##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
##########
@@ -1034,6 +1034,47 @@ public Duration getAllowedTimestampSkew() {
* <li>If one of its arguments is tagged with the {@link Timestamp}
annotation, then it will be
* passed the timestamp of the current element being processed; the
argument must be of type
* {@link Instant}.
+ * <li>If one of its arguments is a subtype of {@link BoundedWindow}, then
it will be passed the
+ * window of the current element. When applied by {@link ParDo} the
subtype of {@link
+ * BoundedWindow} must match the type of windows on the input {@link
PCollection}. If the
+ * window is not accessed a runner may perform additional
optimizations.
+ * <li>If one of its arguments is of type {@link PaneInfo}, then it will
be passed information
+ * about the current triggering pane.
+ * <li>If one of the parameters is of type {@link PipelineOptions}, then
it will be passed the
+ * options for the current pipeline.
+ * </ul>
+ */
+ @Documented
+ @Retention(RetentionPolicy.RUNTIME)
+ @Target(ElementType.METHOD)
+ @Experimental(Kind.SPLITTABLE_DO_FN)
+ public @interface SplitRestriction {}
+
+ /**
+ * Annotation for the method that truncates restriction of a <a
Review comment:
We should add a comment to the `@ProcessElement` javadoc stating
something like:
`It may define a TruncateRestriction method to override the default
implementation where the default is ...`
##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
##########
@@ -1034,6 +1034,47 @@ public Duration getAllowedTimestampSkew() {
* <li>If one of its arguments is tagged with the {@link Timestamp}
annotation, then it will be
* passed the timestamp of the current element being processed; the
argument must be of type
* {@link Instant}.
+ * <li>If one of its arguments is a subtype of {@link BoundedWindow}, then
it will be passed the
+ * window of the current element. When applied by {@link ParDo} the
subtype of {@link
+ * BoundedWindow} must match the type of windows on the input {@link
PCollection}. If the
+ * window is not accessed a runner may perform additional
optimizations.
+ * <li>If one of its arguments is of type {@link PaneInfo}, then it will
be passed information
+ * about the current triggering pane.
+ * <li>If one of the parameters is of type {@link PipelineOptions}, then
it will be passed the
+ * options for the current pipeline.
+ * </ul>
+ */
+ @Documented
+ @Retention(RetentionPolicy.RUNTIME)
+ @Target(ElementType.METHOD)
+ @Experimental(Kind.SPLITTABLE_DO_FN)
+ public @interface SplitRestriction {}
+
+ /**
+ * Annotation for the method that truncates restriction of a <a
+ * href="https://s.apache.org/splittable-do-fn">splittable</a> {@link DoFn}
into bounded one to be
+ * processed when pipeline starts to drain.
+ *
+ * <p>This method is used to perform truncating the restriction before
actual processing.
+ *
+ * <p>Signature: {@code void truncateSizedRestriction(<arguments>);}
+ *
+ * <p>This method must satisfy the following constraints:
+ *
+ * <ul>
+ * <li>If one of the arguments is of type {@link OutputReceiver}, then it
will be passed an
+ * output receiver for outputting the splits. All splits must be
output through this
+ * parameter.
+ * <li>If one of its arguments is tagged with the {@link Element}
annotation, then it will be
+ * passed the current element being processed; the argument must be of
type {@code InputT}.
+ * Note that automatic conversion of {@link Row}s and {@link
FieldAccess} parameters are
+ * currently unsupported.
+ * <li>If one of its arguments is tagged with the {@link Restriction}
annotation, then it will
+ * be passed the current restriction being processed; the argument
must be of type {@code
+ * RestrictionT}.
+ * <li>If one of its arguments is tagged with the {@link Timestamp}
annotation, then it will be
+ * passed the timestamp of the current element being processed; the
argument must be of type
+ * {@link Instant}.
Review comment:
```suggestion
* Annotation for the method that truncates the restriction of a <a
* href="https://s.apache.org/splittable-do-fn">splittable</a> {@link
DoFn} into a bounded one to be
* processed when pipeline starts to drain.
*
* <p>This method is used to perform truncation of the restriction while
it is not actively being processed.
*
* <p>Signature: {@code void truncateRestriction(<arguments>);}
*
* <p>This method must satisfy the following constraints:
*
* <ul>
* <li>If one of the arguments is of type {@link OutputReceiver}, then
it will be passed an
* output receiver for outputting the truncated restrictions. All
truncated restrictions must be output through this
* parameter.
* <li>If one of its arguments is tagged with the {@link Element}
annotation, then it will be
* passed the current element being processed; the argument must be
of type {@code InputT}.
* Note that automatic conversion of {@link Row}s and {@link
FieldAccess} parameters are
* currently unsupported.
* <li>If one of its arguments is tagged with the {@link Restriction}
annotation, then it will
* be passed the current restriction being processed; the argument
must be of type {@code
* RestrictionT}.
* <li>If one of its arguments is tagged with the {@link Timestamp}
annotation, then it will be
* passed the timestamp of the current element being processed; the
argument must be of type
* {@link Instant}.
```
##########
File path:
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java
##########
@@ -99,6 +100,10 @@
*/
public abstract void checkDone() throws IllegalStateException;
+ public boolean isBounded() throws NotImplementedException {
Review comment:
We should use the return type `IsBounded`.
##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
##########
@@ -1051,7 +1092,7 @@ public Duration getAllowedTimestampSkew() {
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@Experimental(Kind.SPLITTABLE_DO_FN)
- public @interface SplitRestriction {}
+ public @interface TruncateSizedRestriction {}
Review comment:
The DoFn doesn't need to know whether it is sized or not. How about
`TruncateRestriction`?
##########
File path:
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -574,6 +578,99 @@ public Instant timestamp(DoFn<InputT, OutputT> doFn) {
};
}
break;
+ case PTransformTranslation.SPLITTABLE_TRUNCATE_SIZED_RESTRICTION_URN:
+ if ((doFnSignature.truncateSizedRestriction() != null
+ && doFnSignature.truncateSizedRestriction().observesWindow())
+ || (doFnSignature.newTracker() != null &&
doFnSignature.newTracker().observesWindow())
+ || (doFnSignature.getSize() != null &&
doFnSignature.getSize().observesWindow())
+ || !sideInputMapping.isEmpty()) {
+ mainInputConsumer =
this::processElementForWindowObservingTruncateSizedRestriction;
+ // OutputT == RestrictionT
+ this.processContext =
+ new WindowObservingProcessBundleContext() {
+ @Override
+ public void outputWithTimestamp(OutputT output, Instant
timestamp) {
+ double size =
+ doFnInvoker.invokeGetSize(
Review comment:
invokeGetSize?
##########
File path:
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -574,6 +578,99 @@ public Instant timestamp(DoFn<InputT, OutputT> doFn) {
};
}
break;
+ case PTransformTranslation.SPLITTABLE_TRUNCATE_SIZED_RESTRICTION_URN:
+ if ((doFnSignature.truncateSizedRestriction() != null
+ && doFnSignature.truncateSizedRestriction().observesWindow())
+ || (doFnSignature.newTracker() != null &&
doFnSignature.newTracker().observesWindow())
+ || (doFnSignature.getSize() != null &&
doFnSignature.getSize().observesWindow())
+ || !sideInputMapping.isEmpty()) {
+ mainInputConsumer =
this::processElementForWindowObservingTruncateSizedRestriction;
+ // OutputT == RestrictionT
+ this.processContext =
Review comment:
Since this shares the same ProcessBundleContext as GetSize (except for
the error context string), can we make this a class called
SizedRestrictionWindowObservingProcessBundleContext that extends
WindowObservingProcessBundleContext and is used in both places?
Ditto for the non-window-observing one as well.
##########
File path:
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java
##########
@@ -99,6 +100,10 @@
*/
public abstract void checkDone() throws IllegalStateException;
+ public boolean isBounded() throws NotImplementedException {
+ throw new NotImplementedException("isBounded is not implemented.");
Review comment:
If we want to make this optional, we should follow the `HasProgress`
style and use an interface like `HasBoundedness`.
We could also make this a required method and update the changelog stating
that this is a breaking change. This would remove the "default" inference on
the `@UnboundedPerElement`/`@BoundedPerElement` annotations on the DoFn.
Depending on the default ordering we want, we'll want to update the
restriction trackers in PeriodicSequence and Watch as well (note that Watch
will have to be updated regardless since it has a tracker that converts
unbounded restrictions to bounded ones).
##########
File path:
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java
##########
@@ -99,6 +100,10 @@
*/
public abstract void checkDone() throws IllegalStateException;
+ public boolean isBounded() throws NotImplementedException {
+ throw new NotImplementedException("isBounded is not implemented.");
Review comment:
We should add a comment stating that unbounded restriction trackers can
only be used with `@UnboundedPerElement` SDFs. We should also add a comment on each
restriction tracker stating whether it could be unbounded.
##########
File path:
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -608,17 +705,6 @@ public void accept(WindowedValue input) throws Exception {
pCollectionConsumerRegistry.register(
pTransform.getInputsOrThrow(mainInput), pTransformId, (FnDataReceiver)
mainInputConsumer);
- switch (pTransform.getSpec().getUrn()) {
Review comment:
Thanks
##########
File path:
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -858,6 +1008,10 @@ public static WindowedSplitResult forRoots(
private void processElementForWindowObservingSizedElementAndRestriction(
WindowedValue<KV<KV<InputT, KV<RestrictionT, WatermarkEstimatorStateT>>,
Double>> elem) {
currentElement = elem.withValue(elem.getValue().getKey().getKey());
+ // No need to process if current restriction is null.
+ if (elem.getValue().getKey().getValue().getKey() == null) {
Review comment:
I don't think this is the right way to go. The output receiver should
produce 0 outputs instead of null.
##########
File path:
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -829,6 +917,68 @@ private void
processElementForWindowObservingSplitRestriction(
this.stateAccessor.finalizeState();
}
+ private void processElementForTruncateSizedRestriction(
+ WindowedValue<KV<KV<InputT, KV<RestrictionT, WatermarkEstimatorStateT>>,
Double>> elem) {
+ currentElement = elem.withValue(elem.getValue().getKey().getKey());
+ currentRestriction = elem.getValue().getKey().getValue().getKey();
+ currentWatermarkEstimatorState =
elem.getValue().getKey().getValue().getValue();
+ currentTracker =
Review comment:
It looks like currentTracker isn't being set within both
`processElementForSplitRestriction` methods. Could you fix that in this PR?
##########
File path: sdks/python/apache_beam/runners/worker/bundle_processor.py
##########
@@ -1428,6 +1428,36 @@ def process(self, element_restriction, *args, **kwargs):
return _create_sdf_operation(SplitAndSizeRestrictions, *args)
[email protected]_urn(
+ common_urns.sdf_components.TRUNCATE_SIZED_RESTRICTION.urn,
+ beam_runner_api_pb2.ParDoPayload)
+def create_truncate_sized_restriction(*args):
+ class TruncateAndSizeRestriction(beam.DoFn):
+ def __init__(self, fn, restriction_provider, watermark_estimator_provider):
+ self.restriction_provider = restriction_provider
+ self.dofn_signature = common.DoFnSignature(fn)
+
+ def process(self, element_restrictin, *args, **kwargs):
Review comment:
element_restrictin -> element_restriction
##########
File path: sdks/python/apache_beam/transforms/core.py
##########
@@ -320,6 +320,22 @@ def split_and_size(self, element, restriction):
for part in self.split(element, restriction):
yield part, self.restriction_size(element, part)
+ def truncate(self, element, restriction):
+ """Truncate the given restriction into finite amount of work when the
+ pipeline starts to drain.
+
+ By default, if the restriction is bounded, it will return the entire
current
+ restriction. If the restriction is unbounded, it will return None.
Review comment:
```suggestion
By default, if the restriction is bounded, it will return the entire
restriction. If the restriction is unbounded, it will not return
anything.
```
##########
File path: sdks/python/apache_beam/transforms/core.py
##########
@@ -320,6 +320,22 @@ def split_and_size(self, element, restriction):
for part in self.split(element, restriction):
yield part, self.restriction_size(element, part)
+ def truncate(self, element, restriction):
+ """Truncate the given restriction into finite amount of work when the
+ pipeline starts to drain.
+
+ By default, if the restriction is bounded, it will return the entire
current
+ restriction. If the restriction is unbounded, it will return None.
+
+ The method throws NotImplementError when RestrictionTracker.is_bounded() is
+ not implemented.
+
+ It's recommended to implement this API if more granularity is required.
+ """
+ restriction_tracker = self.create_tracker(restriction)
+ if restriction_tracker.is_bounded():
+ return restriction
Review comment:
```suggestion
yield restriction
```
##########
File path: sdks/python/apache_beam/io/iobase.py
##########
@@ -1243,6 +1243,17 @@ def try_claim(self, position):
"""
raise NotImplementedError
+ def is_bounded(self):
+ """Identify whether the output produced by the current restriction is
+ bounded.
+
+ The value is important for the default behavior of truncate when the
+ pipeline starts to drain in streaming. If the current restriction is
+ bounded, it will be processed completely by default. If the restriction is
+ unbounded, it will be truncated into null and finish processing
immediately.
+ """
+ raise NotImplementedError
Review comment:
ThreadsafeRestrictionTracker should call the delegate so that users of
it get the same behavior as if they were calling the real restriction tracker.
We might want to update the RestrictionTrackerView if we want to expose the
boundedness property during execution.
##########
File path: sdks/python/apache_beam/runners/common.py
##########
@@ -968,6 +973,9 @@ def process_with_sized_restriction(self, windowed_value):
(element, (restriction, estimator_state)), _ = windowed_value.value
watermark_estimator = (
self.do_fn_invoker.invoke_create_watermark_estimator(estimator_state))
+ if restriction is None:
Review comment:
Same issue as in Java: instead of allowing None restrictions, we should
produce 0 outputs.
##########
File path: model/pipeline/src/main/proto/beam_runner_api.proto
##########
@@ -383,6 +383,12 @@ message StandardPTransforms {
//
// Input: KV(KV(element, restriction), size); output: DoFn's output.
PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS = 2 [(beam_urn) =
"beam:transform:sdf_process_sized_element_and_restrictions:v1"];
+
+ // Truncates the restriction of each element/restriction pair and returns
the
+ // finite restriction which is going to be processed when drain starts.
+ // Input: KV(KV(element, restriction), size);
+ // Output: KV(KV(element, restriction), size).
Review comment:
Add a link to the drain design doc that Reuven shared with the Beam
community.
```suggestion
// Truncates the restriction of each element/restriction pair and
returns the
// finite restriction which will be processed when a pipeline is drained
<link>.
//
// Input: KV(KV(element, restriction), size);
// Output: KV(KV(element, restriction), size).
```
##########
File path: sdks/python/apache_beam/transforms/core.py
##########
@@ -320,6 +320,22 @@ def split_and_size(self, element, restriction):
for part in self.split(element, restriction):
yield part, self.restriction_size(element, part)
+ def truncate(self, element, restriction):
Review comment:
Should we also provide truncate_and_size?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]