scwhittle commented on code in PR #38920:
URL: https://github.com/apache/beam/pull/38920#discussion_r3413508642


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java:
##########
@@ -391,12 +391,18 @@ BoundedQueueExecutorWorkHandleImpl createBudgetHandle(int elements, long bytes)
     if (keyGroupWorkQueue == null) {
       return null;
     }
-    @Nullable QueuedWork queuedWork = keyGroupWorkQueue.pollWork(computationId, keyGroup);
-    if (queuedWork == null) {
-      return null;
+    while (true) {
+      @Nullable QueuedWork queuedWork = keyGroupWorkQueue.pollWork(computationId, keyGroup);
+      if (queuedWork == null) {
+        return null;
+      }
+      if (queuedWork.getWork().work().isFailed()) {
+        queuedWork.getHandle().close();

Review Comment:
   This only updates the counters; it does not update the record that this key 
was scheduled for processing. I think 
ActiveWorkState.completeWorkAndGetNextWorkForKey needs to be called somehow so 
that the key is marked as completed and other work for the key can be scheduled.
   
   A higher-level unit test might help verify that this behavior works 
properly.
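
To illustrate the concern, here is a minimal, self-contained sketch. It is not Beam code: `KeyedWorkSketch` and its methods (`activate`, `releaseCountersOnly`, `completeAndGetNext`, `skipFailed`) are hypothetical stand-ins, and the real `ActiveWorkState` API may look quite different. The sketch assumes one in-flight work item per key, with later items for that key waiting behind it.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Hypothetical model of per-key work tracking; NOT Beam's ActiveWorkState.
class KeyedWorkSketch {
  static final class Work {
    final String id;
    final boolean failed;

    Work(String id, boolean failed) {
      this.id = id;
      this.failed = failed;
    }
  }

  // The head of each queue is the work currently scheduled for that key.
  private final Map<String, Queue<Work>> workByKey = new HashMap<>();
  private int outstandingElements = 0;

  // Returns true if the key was idle, i.e. the caller should schedule this work now.
  boolean activate(String key, Work work) {
    Queue<Work> queue = workByKey.computeIfAbsent(key, k -> new ArrayDeque<>());
    queue.add(work);
    outstandingElements++;
    return queue.size() == 1;
  }

  // Models closing only the budget handle: counters drop, but the key stays occupied.
  void releaseCountersOnly() {
    outstandingElements--;
  }

  // Models completing the key's current work: counters drop AND the next work is returned.
  Work completeAndGetNext(String key) {
    Queue<Work> queue = workByKey.get(key);
    if (queue == null) {
      return null;
    }
    queue.poll();
    outstandingElements--;
    Work next = queue.peek();
    if (next == null) {
      workByKey.remove(key); // Key is idle again.
    }
    return next;
  }

  // Drops failed work at the head of the key; returns the first runnable work or null.
  Work skipFailed(String key) {
    Queue<Work> queue = workByKey.get(key);
    Work head = (queue == null) ? null : queue.peek();
    while (head != null && head.failed) {
      head = completeAndGetNext(key);
    }
    return head;
  }

  boolean isKeyOccupied(String key) {
    return workByKey.containsKey(key);
  }

  int outstandingElements() {
    return outstandingElements;
  }
}
```

In this model, calling only `releaseCountersOnly()` for a failed head item leaves the key occupied, so queued work behind it is never handed back for scheduling. `skipFailed` goes through `completeAndGetNext`, which frees the key and returns the next item. A higher-level test could assert exactly that difference.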
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
