acrites commented on code in PR #38346:
URL: https://github.com/apache/beam/pull/38346#discussion_r3211715764


##########
runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java:
##########
@@ -278,8 +278,109 @@ public FinishBundleContext 
finishBundleContext(DoFn<InputT, OutputT> doFn) {
     if (residual == null) {
       return new Result(null, cont, null, null);
     }
+    final KV<RestrictionT, KV<Instant, WatermarkEstimatorStateT>> 
residualForGetSize = residual;
+    // For a list of all DoFnInvoker arguments, see DoFn.java.
+    double backlogBytes =
+        invoker.invokeGetSize(
+            new DoFnInvoker.BaseArgumentProvider<InputT, OutputT>() {
+              @Override
+              public String getErrorContext() {
+                return 
OutputAndTimeBoundedSplittableProcessElementInvoker.class.getSimpleName()
+                    + "/GetSize";
+              }
+
+              @Override
+              public InputT element(DoFn<InputT, OutputT> doFn) {
+                return element.getValue();
+              }
+
+              @Override
+              public Object restriction() {
+                return residualForGetSize.getKey();
+              }
+
+              @Override
+              public Instant timestamp(DoFn<InputT, OutputT> doFn) {
+                return element.getTimestamp();
+              }
+
+              @Override
+              public RestrictionTracker<?, ?> restrictionTracker() {
+                return invoker.invokeNewTracker(
+                    new DoFnInvoker.BaseArgumentProvider<InputT, OutputT>() {
+                      @Override
+                      public String getErrorContext() {
+                        return 
OutputAndTimeBoundedSplittableProcessElementInvoker.class
+                                .getSimpleName()
+                            + "/NewTracker";
+                      }
+
+                      @Override
+                      public InputT element(DoFn<InputT, OutputT> doFn) {
+                        return element.getValue();
+                      }
+
+                      @Override
+                      public Object restriction() {
+                        return residualForGetSize.getKey();
+                      }
+
+                      @Override
+                      public Instant timestamp(DoFn<InputT, OutputT> doFn) {
+                        return element.getTimestamp();
+                      }
+
+                      @Override
+                      public BoundedWindow window() {
+                        throw new IllegalStateException(
+                            "Attempting to access window outside of a windowed 
context");
+                      }
+
+                      @Override
+                      public PaneInfo paneInfo(DoFn<InputT, OutputT> doFn) {
+                        throw new IllegalStateException(
+                            "Attempting to access PaneInfo outside of a 
windowed context");
+                      }
+
+                      @Override
+                      public PipelineOptions pipelineOptions() {
+                        return pipelineOptions;
+                      }
+                    });
+              }
+
+              @Override
+              public BoundedWindow window() {
+                throw new IllegalStateException(
+                    "Attempting to access window outside of a windowed 
context");
+              }
+
+              @Override
+              public PaneInfo paneInfo(DoFn<InputT, OutputT> doFn) {
+                throw new IllegalStateException(
+                    "Attempting to access PaneInfo outside of a windowed 
context");
+              }
+
+              @Override
+              public PipelineOptions pipelineOptions() {
+                return pipelineOptions;
+              }
+
+              @Override
+              public Object sideInput(String tagId) {
+                PCollectionView<?> view = sideInputMapping.get(tagId);
+                if (view == null) {
+                  throw new IllegalArgumentException("calling getSideInput() 
with unknown view");
+                }
+                return processContext.sideInput(view);
+              }
+            });
     return new Result(
-        residual.getKey(), cont, residual.getValue().getKey(), 
residual.getValue().getValue());
+        residual.getKey(),
+        cont,
+        residual.getValue().getKey(),
+        residual.getValue().getValue(),
+        backlogBytes);

Review Comment:
   I decided to put the validation in the Dataflow execution context; that way, 
other runners could decide how they wanted to handle these edge cases. Thoughts?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to