chamikaramj commented on code in PR #38346:
URL: https://github.com/apache/beam/pull/38346#discussion_r3198310496
##########
runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableProcessElementInvoker.java:
##########
@@ -50,12 +51,27 @@ public Result(
@Nullable RestrictionT residualRestriction,
DoFn.ProcessContinuation continuation,
@Nullable Instant futureOutputWatermark,
- @Nullable WatermarkEstimatorStateT futureWatermarkEstimatorState) {
+ @Nullable WatermarkEstimatorStateT futureWatermarkEstimatorState,
+ double backlogBytes) {
checkArgument(continuation != null, "continuation must not be null");
this.continuation = continuation;
this.residualRestriction = residualRestriction;
this.futureOutputWatermark = futureOutputWatermark;
this.futureWatermarkEstimatorState = futureWatermarkEstimatorState;
+ this.backlogBytes = backlogBytes;
+ }
+
+ public Result(
+ @Nullable RestrictionT residualRestriction,
+ DoFn.ProcessContinuation continuation,
+ @Nullable Instant futureOutputWatermark,
+ @Nullable WatermarkEstimatorStateT futureWatermarkEstimatorState) {
+ this(
+ residualRestriction,
+ continuation,
+ futureOutputWatermark,
+ futureWatermarkEstimatorState,
+ -1.0);
Review Comment:
Probably move this magic value (`-1.0`) to a named constant, or add a comment explaining what it means.
##########
runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java:
##########
@@ -95,6 +95,30 @@ public OffsetRange
getInitialRestriction(@SuppressWarnings("unused") @Element Vo
}
}
+ private static class GetSizeFn extends DoFn<Void, String> {
+ @ProcessElement
+ public ProcessContinuation process(
+ ProcessContext c, RestrictionTracker<OffsetRange, Long> tracker) {
+ for (long i = tracker.currentRestriction().getFrom();
tracker.tryClaim(i); ++i) {
+ c.output(String.valueOf(i));
+ if (i == 2) {
+ return resume();
+ }
+ }
+ return stop();
+ }
+
+ @GetInitialRestriction
+ public OffsetRange getInitialRestriction() {
+ return new OffsetRange(0, 10);
+ }
+
+ @GetSize
Review Comment:
Implementing `@GetSize` seems to be optional:
https://github.com/apache/beam/blob/3a66beeec82b11972476a4418607bc2971b0e8b9/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L1088
Have we considered the case where it is not implemented? Also, let's add
a test for that case.
##########
runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java:
##########
@@ -278,8 +278,109 @@ public FinishBundleContext
finishBundleContext(DoFn<InputT, OutputT> doFn) {
if (residual == null) {
return new Result(null, cont, null, null);
}
+ final KV<RestrictionT, KV<Instant, WatermarkEstimatorStateT>>
residualForGetSize = residual;
+ // For a list of all DoFnInvoker arguments, see DoFn.java.
+ double backlogBytes =
+ invoker.invokeGetSize(
+ new DoFnInvoker.BaseArgumentProvider<InputT, OutputT>() {
+ @Override
+ public String getErrorContext() {
+ return
OutputAndTimeBoundedSplittableProcessElementInvoker.class.getSimpleName()
+ + "/GetSize";
+ }
+
+ @Override
+ public InputT element(DoFn<InputT, OutputT> doFn) {
+ return element.getValue();
+ }
+
+ @Override
+ public Object restriction() {
+ return residualForGetSize.getKey();
+ }
+
+ @Override
+ public Instant timestamp(DoFn<InputT, OutputT> doFn) {
+ return element.getTimestamp();
+ }
+
+ @Override
+ public RestrictionTracker<?, ?> restrictionTracker() {
+ return invoker.invokeNewTracker(
+ new DoFnInvoker.BaseArgumentProvider<InputT, OutputT>() {
+ @Override
+ public String getErrorContext() {
+ return
OutputAndTimeBoundedSplittableProcessElementInvoker.class
+ .getSimpleName()
+ + "/NewTracker";
+ }
+
+ @Override
+ public InputT element(DoFn<InputT, OutputT> doFn) {
+ return element.getValue();
+ }
+
+ @Override
+ public Object restriction() {
+ return residualForGetSize.getKey();
+ }
+
+ @Override
+ public Instant timestamp(DoFn<InputT, OutputT> doFn) {
+ return element.getTimestamp();
+ }
+
+ @Override
+ public BoundedWindow window() {
+ throw new IllegalStateException(
+ "Attempting to access window outside of a windowed
context");
+ }
+
+ @Override
+ public PaneInfo paneInfo(DoFn<InputT, OutputT> doFn) {
+ throw new IllegalStateException(
+ "Attempting to access PaneInfo outside of a
windowed context");
+ }
+
+ @Override
+ public PipelineOptions pipelineOptions() {
+ return pipelineOptions;
+ }
+ });
+ }
+
+ @Override
+ public BoundedWindow window() {
+ throw new IllegalStateException(
+ "Attempting to access window outside of a windowed
context");
+ }
+
+ @Override
+ public PaneInfo paneInfo(DoFn<InputT, OutputT> doFn) {
+ throw new IllegalStateException(
+ "Attempting to access PaneInfo outside of a windowed
context");
+ }
+
+ @Override
+ public PipelineOptions pipelineOptions() {
+ return pipelineOptions;
+ }
+
+ @Override
+ public Object sideInput(String tagId) {
+ PCollectionView<?> view = sideInputMapping.get(tagId);
+ if (view == null) {
+ throw new IllegalArgumentException("calling getSideInput()
with unknown view");
+ }
+ return processContext.sideInput(view);
+ }
+ });
return new Result(
- residual.getKey(), cont, residual.getValue().getKey(),
residual.getValue().getValue());
+ residual.getKey(),
+ cont,
+ residual.getValue().getKey(),
+ residual.getValue().getValue(),
+ backlogBytes);
Review Comment:
Do we expect to get this information only during finishBundle?
##########
runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java:
##########
@@ -278,8 +278,109 @@ public FinishBundleContext
finishBundleContext(DoFn<InputT, OutputT> doFn) {
if (residual == null) {
return new Result(null, cont, null, null);
}
+ final KV<RestrictionT, KV<Instant, WatermarkEstimatorStateT>>
residualForGetSize = residual;
+ // For a list of all DoFnInvoker arguments, see DoFn.java.
+ double backlogBytes =
+ invoker.invokeGetSize(
+ new DoFnInvoker.BaseArgumentProvider<InputT, OutputT>() {
+ @Override
+ public String getErrorContext() {
+ return
OutputAndTimeBoundedSplittableProcessElementInvoker.class.getSimpleName()
+ + "/GetSize";
+ }
+
+ @Override
+ public InputT element(DoFn<InputT, OutputT> doFn) {
+ return element.getValue();
+ }
+
+ @Override
+ public Object restriction() {
+ return residualForGetSize.getKey();
+ }
+
+ @Override
+ public Instant timestamp(DoFn<InputT, OutputT> doFn) {
+ return element.getTimestamp();
+ }
+
+ @Override
+ public RestrictionTracker<?, ?> restrictionTracker() {
+ return invoker.invokeNewTracker(
+ new DoFnInvoker.BaseArgumentProvider<InputT, OutputT>() {
+ @Override
+ public String getErrorContext() {
+ return
OutputAndTimeBoundedSplittableProcessElementInvoker.class
+ .getSimpleName()
+ + "/NewTracker";
+ }
+
+ @Override
+ public InputT element(DoFn<InputT, OutputT> doFn) {
+ return element.getValue();
+ }
+
+ @Override
+ public Object restriction() {
+ return residualForGetSize.getKey();
+ }
+
+ @Override
+ public Instant timestamp(DoFn<InputT, OutputT> doFn) {
+ return element.getTimestamp();
+ }
+
+ @Override
+ public BoundedWindow window() {
+ throw new IllegalStateException(
+ "Attempting to access window outside of a windowed
context");
+ }
+
+ @Override
+ public PaneInfo paneInfo(DoFn<InputT, OutputT> doFn) {
+ throw new IllegalStateException(
+ "Attempting to access PaneInfo outside of a
windowed context");
+ }
+
+ @Override
+ public PipelineOptions pipelineOptions() {
+ return pipelineOptions;
+ }
+ });
+ }
+
+ @Override
+ public BoundedWindow window() {
+ throw new IllegalStateException(
+ "Attempting to access window outside of a windowed
context");
+ }
+
+ @Override
+ public PaneInfo paneInfo(DoFn<InputT, OutputT> doFn) {
+ throw new IllegalStateException(
+ "Attempting to access PaneInfo outside of a windowed
context");
+ }
+
+ @Override
+ public PipelineOptions pipelineOptions() {
+ return pipelineOptions;
+ }
+
+ @Override
+ public Object sideInput(String tagId) {
+ PCollectionView<?> view = sideInputMapping.get(tagId);
+ if (view == null) {
+ throw new IllegalArgumentException("calling getSideInput()
with unknown view");
+ }
+ return processContext.sideInput(view);
+ }
+ });
return new Result(
- residual.getKey(), cont, residual.getValue().getKey(),
residual.getValue().getValue());
+ residual.getKey(),
+ cont,
+ residual.getValue().getKey(),
+ residual.getValue().getValue(),
+ backlogBytes);
Review Comment:
Should we validate the value returned here before sending it to the
runner? For example, ignore it if it is negative or zero (that would indicate
a wrong implementation, but we probably don't want to pass it to the runner).
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java:
##########
@@ -528,6 +529,11 @@ public Map<Long, Pair<Instant, Runnable>> flushState() {
getWorkItem().getWorkToken(),
activeReader);
activeReader = null;
+ } else if (backlogBytes != UnboundedReader.BACKLOG_UNKNOWN && backlogBytes
!= 1L) {
Review Comment:
Good point :)
##########
runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java:
##########
@@ -278,8 +278,109 @@ public FinishBundleContext
finishBundleContext(DoFn<InputT, OutputT> doFn) {
if (residual == null) {
return new Result(null, cont, null, null);
}
+ final KV<RestrictionT, KV<Instant, WatermarkEstimatorStateT>>
residualForGetSize = residual;
+ // For a list of all DoFnInvoker arguments, see DoFn.java.
+ double backlogBytes =
+ invoker.invokeGetSize(
Review Comment:
How about creating a utility to get this information from the residual instead
of creating an inline class here? We should probably also refactor other
similar places, if any.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]