damccorm commented on code in PR #39076:
URL: https://github.com/apache/beam/pull/39076#discussion_r3481779280


##########
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AsyncWrapper.java:
##########
@@ -645,13 +645,14 @@ void commitFinishedItems(
         if (activeElements.containsKey(elementId)) {
           InFlightElement<OutputT> inFlight = activeElements.get(elementId);
           if (inFlight.future.isDone()) {
+            // Remove from local active map before checking the result
+            activeElements.remove(elementId);
             try {
               if (!inFlight.future.isCancelled()) {
                 toReturn.add(inFlight.future.get());
               }
-
+              // Only mark as finished if get() succeeded without exception
               finishedElementIds.add(elementId);

Review Comment:
   Looking at this - should we be adding to `finishedElementIds` if the future 
gets cancelled? Doesn't this mean that the task was not actually finished?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to