m-trieu commented on code in PR #31902:
URL: https://github.com/apache/beam/pull/31902#discussion_r1795771933


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java:
##########
@@ -156,29 +162,47 @@ public void sendHealthCheck() {
   protected void onResponse(StreamingCommitResponse response) {
     commitWorkThrottleTimer.stop();
 
-    RuntimeException finalException = null;
+    @Nullable RuntimeException failure = null;
     for (int i = 0; i < response.getRequestIdCount(); ++i) {
       long requestId = response.getRequestId(i);
       if (requestId == HEARTBEAT_REQUEST_ID) {
         continue;
       }
-      PendingRequest done = pending.remove(requestId);
-      if (done == null) {
-        LOG.error("Got unknown commit request ID: {}", requestId);
+      PendingRequest pendingRequest = pending.remove(requestId);
+      if (pendingRequest == null) {
+        if (!isShutdown()) {
+          // Skip responses when the stream is shutdown since they are now invalid.
+          LOG.error("Got unknown commit request ID: {}", requestId);
+        }
       } else {
         try {
-          done.onDone.accept(
+          pendingRequest.completeWithStatus(
               (i < response.getStatusCount()) ? response.getStatus(i) : CommitStatus.OK);
         } catch (RuntimeException e) {
           // Catch possible exceptions to ensure that an exception for one commit does not prevent
-          // other commits from being processed.
+          // other commits from being processed. Aggregate all the failures to throw after
+          // processing the response if they exist.
           LOG.warn("Exception while processing commit response.", e);
-          finalException = e;
+          if (failure == null) {
+            failure = e;
+          } else {
+            failure.addSuppressed(e);

Review Comment:
   Is it important that we track each of these somehow? Maybe keep a count per error/exception type; it's possible that some indicate a real issue while others don't.
   
   Actually, maybe tracking a subset is good enough.
   
   This is an error in consuming the commit, since at this point Windmill is acknowledging that the commit either succeeded or failed. So maybe we can just record the status + failure, or keep a `Map<(status, exception type), count>`, and just log that.
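
A minimal sketch of the counting idea in plain Java, assuming nothing about Beam's or Windmill's actual classes: `CommitFailureTally`, its `record`/`count`/`summary` methods, and the `Status` enum (a stand-in for Windmill's `CommitStatus`, with assumed values) are all hypothetical. It keys a count on the (status, exception type) pair and renders one summary line per response.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

/**
 * Hypothetical sketch: counts failures thrown while consuming commit responses,
 * keyed by (commit status, exception type), so one summary can be logged per response.
 */
final class CommitFailureTally {
  /** Stand-in for Windmill's CommitStatus enum; these values are assumptions. */
  enum Status { OK, ABORTED, NOT_FOUND }

  // LinkedHashMap keeps first-seen order, so summary() output is deterministic.
  private final Map<Map.Entry<Status, Class<?>>, Integer> counts = new LinkedHashMap<>();

  private static Map.Entry<Status, Class<?>> key(Status status, Class<?> failureType) {
    // SimpleImmutableEntry implements equals/hashCode, so it works as a pair key.
    return new SimpleImmutableEntry<>(status, failureType);
  }

  /** Records one failure thrown while handling a commit acked with {@code status}. */
  void record(Status status, RuntimeException failure) {
    counts.merge(key(status, failure.getClass()), 1, Integer::sum);
  }

  /** Returns how many failures of {@code failureType} were seen for {@code status}. */
  int count(Status status, Class<?> failureType) {
    return counts.getOrDefault(key(status, failureType), 0);
  }

  boolean isEmpty() {
    return counts.isEmpty();
  }

  /** Renders entries like "OK/IllegalStateException x2", joined by ", ". */
  String summary() {
    StringJoiner joiner = new StringJoiner(", ");
    for (Map.Entry<Map.Entry<Status, Class<?>>, Integer> entry : counts.entrySet()) {
      Map.Entry<Status, Class<?>> k = entry.getKey();
      joiner.add(k.getKey() + "/" + k.getValue().getSimpleName() + " x" + entry.getValue());
    }
    return joiner.toString();
  }
}
```

In `onResponse`, the `catch` block would call `tally.record(status, e)` instead of `failure.addSuppressed(e)`, and after the loop a single warning could log `tally.summary()` when `!tally.isEmpty()`. The trade-off is that only the exception type survives, not each stack trace; keeping the first exception seen per key would retain one trace per failure kind.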


