boyuanzz commented on a change in pull request #12430:
URL: https://github.com/apache/beam/pull/12430#discussion_r465891961
##########
File path:
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -1029,7 +1040,27 @@ public double getProgress() {
private Progress getProgress() {
synchronized (splitLock) {
if (currentTracker instanceof RestrictionTracker.HasProgress) {
- return ((HasProgress) currentTracker).getProgress();
+ Progress progress = ((HasProgress) currentTracker).getProgress();
+ double totalWork = progress.getWorkCompleted() + progress.getWorkRemaining();
+ double completed =
+ totalWork * currentWindowIterator.previousIndex() + progress.getWorkCompleted();
+ double remaining =
+ totalWork * (currentElement.getWindows().size() - currentWindowIterator.nextIndex())
+ + progress.getWorkRemaining();
+ return Progress.from(completed, remaining);
+ }
+ }
+ return null;
+ }
+
+ private Progress getProgressFromWindowObservingTruncate(double elementCompleted) {
+ synchronized (splitLock) {
+ if (currentWindowIterator != null) {
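The window-aware scaling in `getProgress()` in the hunk can be tried out on its own. This is a minimal sketch, not the harness code. `Progress` here is a hypothetical record standing in for `RestrictionTracker.Progress`. The sketch assumes, as the diff's formula implies, that every window of the element carries the same total work as the current window, and that `next()` has already been called on the iterator for the window being processed.

```java
import java.util.List;
import java.util.ListIterator;

public class WindowObservingProgressSketch {
  // Hypothetical stand-in for RestrictionTracker.Progress: a (completed, remaining) pair.
  public record Progress(double workCompleted, double workRemaining) {}

  // Scales the current window's progress to the whole element, mirroring the diff:
  // finished windows count as fully completed, later windows as fully remaining.
  // Assumes next() was already called for the window currently being processed.
  public static Progress scaleToAllWindows(
      Progress current, ListIterator<?> windowIterator, int windowCount) {
    double totalWork = current.workCompleted() + current.workRemaining();
    double completed = totalWork * windowIterator.previousIndex() + current.workCompleted();
    double remaining =
        totalWork * (windowCount - windowIterator.nextIndex()) + current.workRemaining();
    return new Progress(completed, remaining);
  }

  public static void main(String[] args) {
    List<String> windows = List.of("w0", "w1", "w2");
    ListIterator<String> it = windows.listIterator();
    it.next(); // w0 finished
    it.next(); // now processing w1
    Progress p = scaleToAllWindows(new Progress(3.0, 7.0), it, windows.size());
    System.out.println(p.workCompleted() + " " + p.workRemaining()); // prints "13.0 17.0"
  }
}
```

With three windows of 10 units each, being 3 units into the second window yields 13 completed and 17 remaining, so the total still sums to 30.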
Review comment:
Thanks for the formula. If we let the runner do the computation, the
runner has to look up the graph to figure out which progress comes from
downstream. I'm thinking about another option: we can add a progress signal
specifically for splitting, reported by the root node of the SDK graph.
All of the computations you mentioned can then be done recursively on the SDK side.
The runner can decide whether to consult this signal based on whether
there is an SDF (or SDFs) in the SDK graph.
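The proposed split-specific signal could be computed recursively from the root node, roughly as in the sketch below. Everything here is hypothetical: `Node`, `Progress`, and `splitProgress` are illustrative names, not Beam APIs. The sketch only shows the recursive delegation to the nearest SDF downstream. The actual formula for combining a node's own progress with downstream progress is not spelled out in this thread and is left out. An empty result models the case where the graph has no SDF, which the runner could use to ignore the signal.

```java
import java.util.List;
import java.util.Optional;

public class SplitProgressSketch {
  // Hypothetical (completed, remaining) pair.
  public record Progress(double workCompleted, double workRemaining) {}

  // Hypothetical SDK graph node: may own an SDF tracker's progress, and has downstream consumers.
  public record Node(String name, Optional<Progress> sdfProgress, List<Node> consumers) {}

  // Returns the progress of the first SDF found in depth-first order from this node,
  // or empty if no SDF is reachable (the runner can then ignore the signal).
  public static Optional<Progress> splitProgress(Node node) {
    if (node.sdfProgress().isPresent()) {
      return node.sdfProgress();
    }
    for (Node consumer : node.consumers()) {
      Optional<Progress> downstream = splitProgress(consumer);
      if (downstream.isPresent()) {
        return downstream;
      }
    }
    return Optional.empty();
  }

  public static void main(String[] args) {
    Node sdf = new Node("sdf", Optional.of(new Progress(2.0, 8.0)), List.of());
    Node parDo = new Node("parDo", Optional.empty(), List.of(sdf));
    Node root = new Node("root", Optional.empty(), List.of(parDo));
    // The root reports the split signal on behalf of the whole graph.
    System.out.println(splitProgress(root).map(Progress::workRemaining).orElse(-1.0)); // prints "8.0"
  }
}
```

Keeping the recursion on the SDK side means the runner only needs one signal from the root, rather than walking the graph itself.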