lukecwik commented on a change in pull request #12016:
URL: https://github.com/apache/beam/pull/12016#discussion_r447384216
##########
File path: model/pipeline/src/main/proto/beam_runner_api.proto
##########
@@ -383,6 +383,12 @@ message StandardPTransforms {
//
// Input: KV(KV(element, restriction), size); output: DoFn's output.
PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS = 2 [(beam_urn) =
"beam:transform:sdf_process_sized_element_and_restrictions:v1"];
+
+ // Truncates the restriction of each element/restriction pair and returns
the
+ // finite restriction which is going to be processed when drain starts.
+ // Input: KV(KV(element, restriction), size);
+ // Output: KV(KV(element, restriction), size).
Review comment:
please address
##########
File path:
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java
##########
@@ -99,6 +100,10 @@
*/
public abstract void checkDone() throws IllegalStateException;
+ public boolean isBounded() throws NotImplementedException {
+ throw new NotImplementedException("isBounded is not implemented.");
Review comment:
Not relevant anymore.
##########
File path:
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
##########
@@ -113,6 +113,8 @@
// SplittableParDoComponents
public static final String SPLITTABLE_PAIR_WITH_RESTRICTION_URN =
"beam:transform:sdf_pair_with_restriction:v1";
+ public static final String SPLITTABLE_TRUNCATE_SIZED_RESTRICTION_URN =
+ "beam:transform:sdf_truncate_sized_restrictions:v1";
Review comment:
please address
##########
File path:
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -1636,6 +1657,96 @@ public void outputWithTimestamp(OutputT output, Instant
timestamp) {
}
}
+ private class SizedRestrictionWindowObservingProcessBundleContext
Review comment:
Please add a class comment stating that this context outputs
`KV<KV<Element, Restriction>, KV<Size, WatermarkEstimatorStateT>>`.
##########
File path:
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java
##########
@@ -99,6 +100,10 @@
*/
public abstract void checkDone() throws IllegalStateException;
+ public boolean isBounded() throws NotImplementedException {
+ throw new NotImplementedException("isBounded is not implemented.");
Review comment:
As discussed offline, we'll make `RestrictionTracker#isBounded` a
required method and we will not use `@UnboundedPerElement`/`@BoundedPerElement`
as part of the default inference.
##########
File path:
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
##########
@@ -306,6 +309,30 @@ public static void
invokeSplitRestriction(DoFnInvoker.ArgumentProvider argumentP
}
}
+ /** Default implementation of {@link TruncateRestriction}, for delegation by
bytebuddy. */
+ public static class DefaultTruncateRestriction {
+
+ private final boolean isBoundedPerElement;
+
+ DefaultTruncateRestriction(DoFnSignature doFnSignature) {
+ this.isBoundedPerElement =
doFnSignature.isBoundedPerElement().equals(IsBounded.BOUNDED);
+ }
+
+ /** Return the current restriction if it's bounded.Otherwise, return null.
*/
+ @SuppressWarnings("unused")
+ public void invokeTruncateRestriction(DoFnInvoker.ArgumentProvider
argumentProvider) {
+ boolean isBounded = this.isBoundedPerElement;
+ try {
+ isBounded = argumentProvider.restrictionTracker().isBounded();
Review comment:
As discussed offline, this will be required to be implemented.
##########
File path:
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
##########
@@ -1794,6 +1812,60 @@ private static boolean hasAnnotation(Class<?>
annotation, List<Annotation> annot
m, windowT, methodContext.getExtraParameters());
}
+ @VisibleForTesting
+ static DoFnSignature.TruncateRestrictionMethod
analyzeTruncateRestrictionMethod(
+ ErrorReporter errors,
+ TypeDescriptor<? extends DoFn<?, ?>> fnT,
+ Method m,
+ TypeDescriptor<?> inputT,
+ TypeDescriptor<?> outputT,
+ TypeDescriptor<?> restrictionT,
+ FnAnalysisContext fnContext) {
+ // Method is of the form:
+ // @TruncateRestriction
+ // void truncateRestriction(... parameters ...);
+ errors.checkArgument(void.class.equals(m.getReturnType()), "Must return
void");
+
+ Type[] params = m.getGenericParameterTypes();
+ MethodAnalysisContext methodContext = MethodAnalysisContext.create();
+ TypeDescriptor<? extends BoundedWindow> windowT = getWindowType(fnT, m);
+ for (int i = 0; i < params.length; ++i) {
+ Parameter extraParam =
+ analyzeExtraParameter(
+ errors,
+ fnContext,
+ methodContext,
+ fnT,
+ ParameterDescription.of(
+ m, i, fnT.resolveType(params[i]),
Arrays.asList(m.getParameterAnnotations()[i])),
+ inputT,
+ restrictionT);
+ if (extraParam instanceof SchemaElementParameter) {
+ errors.throwIllegalArgument(
+ "Schema @%s are not supported for @%s method. Found %s, did you
mean to use %s?",
+ format(DoFn.Element.class),
+ format(TruncateRestriction.class),
+ format(((SchemaElementParameter) extraParam).elementT()),
+ format(inputT));
+ } else if (extraParam instanceof RestrictionParameter) {
+ errors.checkArgument(
+ restrictionT.equals(((RestrictionParameter)
extraParam).restrictionT()),
+ "Uses restriction type %s, but @%s method uses restriction type
%s",
+ format(((RestrictionParameter) extraParam).restrictionT()),
+ format(DoFn.GetInitialRestriction.class),
+ format(restrictionT));
+ }
+ methodContext.addParameter(extraParam);
+ }
+
+ for (Parameter parameter : methodContext.getExtraParameters()) {
+ checkParameterOneOf(errors, parameter,
ALLOWED_SPLIT_RESTRICTION_PARAMETERS);
Review comment:
```suggestion
checkParameterOneOf(errors, parameter,
ALLOWED_TRUNCATE_RESTRICTION_PARAMETERS);
```
##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
##########
@@ -1034,6 +1034,47 @@ public Duration getAllowedTimestampSkew() {
* <li>If one of its arguments is tagged with the {@link Timestamp}
annotation, then it will be
* passed the timestamp of the current element being processed; the
argument must be of type
* {@link Instant}.
+ * <li>If one of its arguments is a subtype of {@link BoundedWindow}, then
it will be passed the
+ * window of the current element. When applied by {@link ParDo} the
subtype of {@link
+ * BoundedWindow} must match the type of windows on the input {@link
PCollection}. If the
+ * window is not accessed a runner may perform additional
optimizations.
+ * <li>If one of its arguments is of type {@link PaneInfo}, then it will
be passed information
+ * about the current triggering pane.
+ * <li>If one of the parameters is of type {@link PipelineOptions}, then
it will be passed the
+ * options for the current pipeline.
+ * </ul>
+ */
+ @Documented
+ @Retention(RetentionPolicy.RUNTIME)
+ @Target(ElementType.METHOD)
+ @Experimental(Kind.SPLITTABLE_DO_FN)
+ public @interface SplitRestriction {}
+
+ /**
+ * Annotation for the method that truncates restriction of a <a
Review comment:
please address
##########
File path:
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -1636,6 +1657,96 @@ public void outputWithTimestamp(OutputT output, Instant
timestamp) {
}
}
+ private class SizedRestrictionWindowObservingProcessBundleContext
+ extends WindowObservingProcessBundleContext {
+ private final String errorContextPrefix;
+
+ SizedRestrictionWindowObservingProcessBundleContext(String
errorContextPrefix) {
+ this.errorContextPrefix = errorContextPrefix;
+ }
+
+ @Override
+ // OutputT == RestrictionT
+ public void outputWithTimestamp(OutputT output, Instant timestamp) {
+ double size =
+ doFnInvoker.invokeGetSize(
+ new DelegatingArgumentProvider<InputT, OutputT>(
+ this, this.errorContextPrefix + "/GetSize") {
+ @Override
+ public Object restriction() {
+ return output;
+ }
+
+ @Override
+ public Instant timestamp(DoFn<InputT, OutputT> doFn) {
+ return timestamp;
+ }
+
+ @Override
+ public RestrictionTracker<?, ?> restrictionTracker() {
+ return doFnInvoker.invokeNewTracker(this);
+ }
+ });
+
+ outputTo(
+ mainOutputConsumers,
+ (WindowedValue<OutputT>)
+ WindowedValue.of(
+ KV.of(
+ KV.of(
+ currentElement.getValue(), KV.of(output,
currentWatermarkEstimatorState)),
+ size),
+ timestamp,
+ currentWindow,
+ currentElement.getPane()));
+ }
+ }
+
+ private class SizedRestrictionNonWindowObservingProcessBundleContext
Review comment:
Please add a class comment stating that this context outputs
`KV<KV<Element, Restriction>, KV<Size, WatermarkEstimatorStateT>>`.
##########
File path: sdks/python/apache_beam/transforms/core.py
##########
@@ -622,6 +638,16 @@ class DoFn(WithTypeHints, HasDisplayData,
urns.RunnerApiFn):
def from_callable(fn):
return CallableWrapperDoFn(fn)
+ @staticmethod
+ def unbounded_per_element():
Review comment:
Sounds good.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]