lukecwik commented on a change in pull request #12430:
URL: https://github.com/apache/beam/pull/12430#discussion_r464673014



##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -515,6 +515,9 @@
               && Iterables.get(mainOutputConsumers, 0) instanceof HandlesSplits) {
             mainInputConsumer =
                 new SplittableFnDataReceiver() {
+                  private final HandlesSplits splitDelegate =
+                      (HandlesSplits) Iterables.get(mainOutputConsumers, 0);

Review comment:
       nit: here and below around line 558
   ```suggestion
                      (HandlesSplits) Iterables.getOnlyElement(mainOutputConsumers);
   ```
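For reference, Guava's `Iterables.getOnlyElement` throws `NoSuchElementException` when the iterable is empty and `IllegalArgumentException` when it holds more than one element, whereas `Iterables.get(iterable, 0)` simply returns the first element. The plain-Java sketch below mirrors that contract without pulling in Guava. `onlyElement` is a hypothetical helper for illustration, not a Beam or Guava API.

```java
import java.util.Iterator;
import java.util.List;

final class OnlyElementDemo {
  // Hypothetical helper mirroring Guava's Iterables.getOnlyElement contract:
  // empty -> NoSuchElementException, more than one element -> IllegalArgumentException.
  static <T> T onlyElement(Iterable<T> iterable) {
    Iterator<T> it = iterable.iterator();
    T first = it.next(); // Iterator.next() throws NoSuchElementException when empty
    if (it.hasNext()) {
      throw new IllegalArgumentException("expected one element but found more");
    }
    return first;
  }

  public static void main(String[] args) {
    System.out.println(onlyElement(List.of("consumer")));
    try {
      onlyElement(List.of("a", "b"));
    } catch (IllegalArgumentException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```

With `getOnlyElement`, a consumer list that unexpectedly holds more than one entry fails fast instead of silently delegating splits to whichever consumer happens to be first.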

##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -1029,7 +1040,27 @@ public double getProgress() {
   private Progress getProgress() {
     synchronized (splitLock) {
       if (currentTracker instanceof RestrictionTracker.HasProgress) {
-        return ((HasProgress) currentTracker).getProgress();
+        Progress progress = ((HasProgress) currentTracker).getProgress();
+        double totalWork = progress.getWorkCompleted() + progress.getWorkRemaining();
+        double completed =
+            totalWork * currentWindowIterator.previousIndex() + progress.getWorkCompleted();
+        double remaining =
+            totalWork * (currentElement.getWindows().size() - currentWindowIterator.nextIndex())
+                + progress.getWorkRemaining();
+        return Progress.from(completed, remaining);
+      }
+    }
+    return null;
+  }
+
+  private Progress getProgressFromWindowObservingTruncate(double elementCompleted) {
+    synchronized (splitLock) {
+      if (currentWindowIterator != null) {

Review comment:
       Should we register this with `addProgressRequestCallback` so that we generate monitoring infos?
   
   I'm not sure truncate should use the downstream progress in its calculation when reporting it as a monitoring info. I know this differs from how we calculate the progress/split point for the SplittableFnDataReceiver, where the single fraction needs to account accurately for the downstream progress.
   
   I had always envisioned that the work completed/work remaining reported in the monitoring infos represent only local knowledge of the work, without taking any downstream or upstream knowledge into account. We can avoid this issue if we merge this logic into the `getProgress` method around line 533.
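The window-scaling arithmetic in the revised `getProgress` can be checked in isolation. The sketch below restates it as a standalone method, assuming (as the diff does) that every window of the element carries the same total work as the window currently being processed. `Progress` and `scaleAcrossWindows` here are hypothetical stand-ins, not Beam's `RestrictionTracker.Progress`.

```java
final class WindowProgressDemo {
  // Hypothetical stand-in for a progress value; not Beam's RestrictionTracker.Progress.
  static final class Progress {
    final double workCompleted;
    final double workRemaining;

    Progress(double workCompleted, double workRemaining) {
      this.workCompleted = workCompleted;
      this.workRemaining = workRemaining;
    }
  }

  // Scales the current window's progress to the whole element, assuming every window
  // carries the same total work as the current one (the same assumption as the diff).
  // windowIndex: 0-based index of the current window (what previousIndex() returns
  // right after next()); windowCount: number of windows the element belongs to.
  static Progress scaleAcrossWindows(Progress current, int windowIndex, int windowCount) {
    double totalWork = current.workCompleted + current.workRemaining;
    // Windows before the current one count as fully completed.
    double completed = totalWork * windowIndex + current.workCompleted;
    // Windows after the current one (windowCount - nextIndex()) have not started.
    double remaining = totalWork * (windowCount - (windowIndex + 1)) + current.workRemaining;
    return new Progress(completed, remaining);
  }

  public static void main(String[] args) {
    // Element in 4 windows, 30% of the way through the second window (index 1).
    Progress p = scaleAcrossWindows(new Progress(3, 7), 1, 4);
    System.out.println("completed=" + p.workCompleted + ", remaining=" + p.workRemaining);
    // prints "completed=13.0, remaining=27.0"
  }
}
```

Only the local tracker's numbers feed into this scaling, so under this sketch's assumptions the result reflects local knowledge of the work, which is the property the comment argues monitoring infos should have.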




----------------------------------------------------------------
This is an automated message from the Apache Git Service.