acrites commented on code in PR #38346:
URL: https://github.com/apache/beam/pull/38346#discussion_r3211771676
##########
runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java:
##########
@@ -278,8 +278,109 @@ public FinishBundleContext
finishBundleContext(DoFn<InputT, OutputT> doFn) {
if (residual == null) {
return new Result(null, cont, null, null);
}
+ final KV<RestrictionT, KV<Instant, WatermarkEstimatorStateT>>
residualForGetSize = residual;
+ // For a list of all DoFnInvoker arguments, see DoFn.java.
+ double backlogBytes =
+ invoker.invokeGetSize(
+ new DoFnInvoker.BaseArgumentProvider<InputT, OutputT>() {
+ @Override
+ public String getErrorContext() {
+ return
OutputAndTimeBoundedSplittableProcessElementInvoker.class.getSimpleName()
+ + "/GetSize";
+ }
+
+ @Override
+ public InputT element(DoFn<InputT, OutputT> doFn) {
+ return element.getValue();
+ }
+
+ @Override
+ public Object restriction() {
+ return residualForGetSize.getKey();
+ }
+
+ @Override
+ public Instant timestamp(DoFn<InputT, OutputT> doFn) {
+ return element.getTimestamp();
+ }
+
+ @Override
+ public RestrictionTracker<?, ?> restrictionTracker() {
+ return invoker.invokeNewTracker(
+ new DoFnInvoker.BaseArgumentProvider<InputT, OutputT>() {
+ @Override
+ public String getErrorContext() {
+ return
OutputAndTimeBoundedSplittableProcessElementInvoker.class
+ .getSimpleName()
+ + "/NewTracker";
+ }
+
+ @Override
+ public InputT element(DoFn<InputT, OutputT> doFn) {
+ return element.getValue();
+ }
+
+ @Override
+ public Object restriction() {
+ return residualForGetSize.getKey();
+ }
+
+ @Override
+ public Instant timestamp(DoFn<InputT, OutputT> doFn) {
+ return element.getTimestamp();
+ }
+
+ @Override
+ public BoundedWindow window() {
+ throw new IllegalStateException(
+ "Attempting to access window outside of a windowed
context");
+ }
+
+ @Override
+ public PaneInfo paneInfo(DoFn<InputT, OutputT> doFn) {
+ throw new IllegalStateException(
+ "Attempting to access PaneInfo outside of a
windowed context");
+ }
+
+ @Override
+ public PipelineOptions pipelineOptions() {
+ return pipelineOptions;
+ }
+ });
+ }
+
+ @Override
+ public BoundedWindow window() {
+ throw new IllegalStateException(
+ "Attempting to access window outside of a windowed
context");
+ }
+
+ @Override
+ public PaneInfo paneInfo(DoFn<InputT, OutputT> doFn) {
+ throw new IllegalStateException(
+ "Attempting to access PaneInfo outside of a windowed
context");
+ }
+
+ @Override
+ public PipelineOptions pipelineOptions() {
+ return pipelineOptions;
+ }
+
+ @Override
+ public Object sideInput(String tagId) {
+ PCollectionView<?> view = sideInputMapping.get(tagId);
+ if (view == null) {
+ throw new IllegalArgumentException("calling getSideInput()
with unknown view");
+ }
+ return processContext.sideInput(view);
+ }
+ });
return new Result(
- residual.getKey(), cont, residual.getValue().getKey(),
residual.getValue().getValue());
+ residual.getKey(),
+ cont,
+ residual.getValue().getKey(),
+ residual.getValue().getValue(),
+ backlogBytes);
Review Comment:
This GetSize call happens not during finishBundle, but in processElement after
we've finished processing the restriction. The idea is that we want to get the
backlog (work remaining) of the residual restriction after we've finished
processing the bundle.
I don't think you can call tryClaim or otherwise change the restriction in
finishBundle (since that requires keyed state, among other things).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]