lukecwik commented on a change in pull request #12430:
URL: https://github.com/apache/beam/pull/12430#discussion_r467178630



##########
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -1029,12 +1040,35 @@ public double getProgress() {
   private Progress getProgress() {
     synchronized (splitLock) {
       if (currentTracker instanceof RestrictionTracker.HasProgress) {
-        return ((HasProgress) currentTracker).getProgress();
+        return scaleProgress(
+            ((HasProgress) currentTracker).getProgress(),
+            currentWindowIterator.previousIndex(),
+            currentElement.getWindows().size());
       }
     }
     return null;
   }
 
+  private Progress getProgressFromWindowObservingTruncate(double 
elementCompleted) {
+    synchronized (splitLock) {
+      if (currentWindowIterator != null) {
+        return scaleProgress(
+            Progress.from(elementCompleted, 1 - elementCompleted),
+            currentWindowIterator.previousIndex(),
+            currentElement.getWindows().size());
+      }
+    }
+    return null;
+  }
+
+  private Progress scaleProgress(Progress progress, int completedWindowIndex, 
int windowCount) {

Review comment:
       For the future:
   ```suggestion
     private Progress scaleProgress(Progress progress, int currentWindowIndex, 
int stopWindowIndex) {
   ```
   
   Note that it is important that it isn't the number of completed windows but 
the current window index otherwise the math doesn't work out.

##########
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -1029,12 +1040,35 @@ public double getProgress() {
   private Progress getProgress() {
     synchronized (splitLock) {
       if (currentTracker instanceof RestrictionTracker.HasProgress) {
-        return ((HasProgress) currentTracker).getProgress();
+        return scaleProgress(
+            ((HasProgress) currentTracker).getProgress(),
+            currentWindowIterator.previousIndex(),
+            currentElement.getWindows().size());
       }
     }
     return null;
   }
 
+  private Progress getProgressFromWindowObservingTruncate(double 
elementCompleted) {
+    synchronized (splitLock) {
+      if (currentWindowIterator != null) {
+        return scaleProgress(
+            Progress.from(elementCompleted, 1 - elementCompleted),
+            currentWindowIterator.previousIndex(),
+            currentElement.getWindows().size());
+      }
+    }
+    return null;
+  }
+
+  private Progress scaleProgress(Progress progress, int completedWindowIndex, 
int windowCount) {
+    double totalWork = progress.getWorkCompleted() + 
progress.getWorkRemaining();

Review comment:
       `totalWork` -> `totalWorkPerWindow`

##########
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -1029,12 +1040,35 @@ public double getProgress() {
   private Progress getProgress() {
     synchronized (splitLock) {
       if (currentTracker instanceof RestrictionTracker.HasProgress) {
-        return ((HasProgress) currentTracker).getProgress();
+        return scaleProgress(
+            ((HasProgress) currentTracker).getProgress(),
+            currentWindowIterator.previousIndex(),
+            currentElement.getWindows().size());

Review comment:
       This should be the `stop index` since splitting limits the current 
window iterator so the progress will be wrong after a split happens.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to