lukecwik commented on a change in pull request #12430:
URL: https://github.com/apache/beam/pull/12430#discussion_r464673014
##########
File path:
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -515,6 +515,9 @@
&& Iterables.get(mainOutputConsumers, 0) instanceof HandlesSplits) {
mainInputConsumer =
new SplittableFnDataReceiver() {
+ private final HandlesSplits splitDelegate =
+ (HandlesSplits) Iterables.get(mainOutputConsumers, 0);
Review comment:
nit: here and below around line 558
```suggestion
(HandlesSplits) Iterables.getOnlyElement(mainOutputConsumers);
```
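The suggestion matters because `Iterables.get(mainOutputConsumers, 0)` quietly takes the first consumer even if there are several, while Guava's `Iterables.getOnlyElement` is documented to throw `NoSuchElementException` for an empty iterable and `IllegalArgumentException` for one with more than one element. The plain-Java sketch below illustrates that difference without depending on Guava. `OnlyElementSketch`, `first`, and `onlyElement` are hypothetical names written for illustration; they only mirror the documented contracts and are not Guava's implementation.

```java
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical plain-Java stand-ins; not Guava's implementation.
final class OnlyElementSketch {
  private OnlyElementSketch() {}

  // Behaves like Iterables.get(iterable, 0): returns the first element and
  // silently ignores any others.
  static <T> T first(Iterable<T> iterable) {
    Iterator<T> it = iterable.iterator();
    if (!it.hasNext()) {
      throw new IndexOutOfBoundsException("iterable is empty");
    }
    return it.next();
  }

  // Mirrors the documented contract of Iterables.getOnlyElement: exactly one
  // element is required, otherwise it fails loudly.
  static <T> T onlyElement(Iterable<T> iterable) {
    Iterator<T> it = iterable.iterator();
    if (!it.hasNext()) {
      throw new NoSuchElementException("expected one element, found none");
    }
    T only = it.next();
    if (it.hasNext()) {
      throw new IllegalArgumentException("expected one element, found several");
    }
    return only;
  }

  public static void main(String[] args) {
    List<String> single = List.of("consumer");
    List<String> several = List.of("a", "b");
    System.out.println(first(single));       // consumer
    System.out.println(onlyElement(single)); // consumer
    System.out.println(first(several));      // a (second element silently ignored)
    try {
      onlyElement(several);
    } catch (IllegalArgumentException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```

With a single consumer both helpers return the same value; they only diverge when the single-consumer assumption is violated, which is exactly where failing loudly is preferable to silently picking the first consumer.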
##########
File path:
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -1029,7 +1040,27 @@ public double getProgress() {
private Progress getProgress() {
synchronized (splitLock) {
if (currentTracker instanceof RestrictionTracker.HasProgress) {
- return ((HasProgress) currentTracker).getProgress();
+ Progress progress = ((HasProgress) currentTracker).getProgress();
+ double totalWork = progress.getWorkCompleted() + progress.getWorkRemaining();
+ double completed =
+ totalWork * currentWindowIterator.previousIndex() + progress.getWorkCompleted();
+ double remaining =
+ totalWork * (currentElement.getWindows().size() - currentWindowIterator.nextIndex())
+ + progress.getWorkRemaining();
+ return Progress.from(completed, remaining);
+ }
+ }
+ return null;
+ }
+
+ private Progress getProgressFromWindowObservingTruncate(double elementCompleted) {
+ synchronized (splitLock) {
+ if (currentWindowIterator != null) {
Review comment:
Should we register this with `addProgressRequestCallback` so we generate
monitoring infos?
I'm not sure if truncate should be using the downstream progress as part of
its calculation when reporting it as a monitoring info. I know that this
differs from how we calculate the progress/split point for the
SplittableFnDataReceiver since the singular fraction needs to take into account
the downstream progress accurately.
I had always envisioned that work completed/work remaining in the
monitoring infos represents only local knowledge of work and doesn't take
into account any downstream/upstream knowledge. We can avoid this issue if we
merge this logic into the `getProgress` method around line 533.
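The `getProgress()` hunk above scales the current tracker's progress across all windows of the element: windows before the current one count as fully completed and windows after it as fully remaining. A minimal sketch of that arithmetic follows, assuming (as the hunk does implicitly) that every window's restriction carries the same total work as the current one. `WindowedProgressSketch`, `WindowProgress`, and `scaleAcrossWindows` are hypothetical stand-ins for `RestrictionTracker.Progress` and the window-iterator bookkeeping in `FnApiDoFnRunner` (the `previousIndex()`/`nextIndex()` calls suggest a `ListIterator`, with `nextIndex() == previousIndex() + 1` while a window is being processed).

```java
// Hypothetical sketch of the window-scaling arithmetic; not Beam code.
final class WindowedProgressSketch {
  private WindowedProgressSketch() {}

  // Minimal stand-in for a (work completed, work remaining) pair.
  static final class WindowProgress {
    final double completed;
    final double remaining;

    WindowProgress(double completed, double remaining) {
      this.completed = completed;
      this.remaining = remaining;
    }
  }

  // current: tracker progress within the window being processed.
  // currentWindowIndex: zero-based index of that window (previousIndex() in the hunk).
  // windowCount: number of windows the element belongs to.
  static WindowProgress scaleAcrossWindows(
      WindowProgress current, int currentWindowIndex, int windowCount) {
    // Assumes every window carries the same total work as the current one.
    double totalWork = current.completed + current.remaining;
    // Windows before the current one count as fully completed.
    double completed = totalWork * currentWindowIndex + current.completed;
    // Windows after the current one count as fully remaining;
    // nextIndex() in the hunk corresponds to currentWindowIndex + 1.
    double remaining =
        totalWork * (windowCount - (currentWindowIndex + 1)) + current.remaining;
    return new WindowProgress(completed, remaining);
  }

  public static void main(String[] args) {
    // Element in 3 windows; processing the second (index 1), 3 of 10 units done.
    WindowProgress p = scaleAcrossWindows(new WindowProgress(3, 7), 1, 3);
    System.out.println(p.completed + " " + p.remaining); // 13.0 17.0
  }
}
```

The sketch uses only the local tracker's numbers and folds in no downstream progress, which is the local-only notion of work completed/remaining that the comment argues monitoring infos should report.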
----------------------------------------------------------------
This is an automated message from the Apache Git Service.