m-trieu commented on code in PR #31902:
URL: https://github.com/apache/beam/pull/31902#discussion_r1795796743
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java:
##########
@@ -156,29 +162,47 @@ public void sendHealthCheck() {
protected void onResponse(StreamingCommitResponse response) {
commitWorkThrottleTimer.stop();
- RuntimeException finalException = null;
+ @Nullable RuntimeException failure = null;
for (int i = 0; i < response.getRequestIdCount(); ++i) {
long requestId = response.getRequestId(i);
if (requestId == HEARTBEAT_REQUEST_ID) {
continue;
}
- PendingRequest done = pending.remove(requestId);
- if (done == null) {
- LOG.error("Got unknown commit request ID: {}", requestId);
+ PendingRequest pendingRequest = pending.remove(requestId);
+ if (pendingRequest == null) {
+ if (!isShutdown()) {
+ // Skip responses when the stream is shutdown since they are now invalid.
+ LOG.error("Got unknown commit request ID: {}", requestId);
+ }
} else {
try {
- done.onDone.accept(
+ pendingRequest.completeWithStatus(
(i < response.getStatusCount()) ? response.getStatus(i) : CommitStatus.OK);
} catch (RuntimeException e) {
// Catch possible exceptions to ensure that an exception for one commit does not prevent
- // other commits from being processed.
+ // other commits from being processed. Aggregate all the failures to throw after
+ // processing the response if they exist.
LOG.warn("Exception while processing commit response.", e);
- finalException = e;
+ if (failure == null) {
+ failure = e;
+ } else {
+ failure.addSuppressed(e);
Review Comment:
Opted to record the last 10 errors in detail and keep a counter keyed by
<status, throwable.class>, so we can keep track of any unexpected behavior.
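
For illustration only, a minimal sketch of what such bookkeeping could look like. This is not the code in the PR: `CommitFailureTracker`, its methods, and the use of a plain `String` for the status are all hypothetical names chosen for this example (the real code would presumably key on `CommitStatus`).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical helper (not from the PR): keeps the most recent failures in
 * detail and a running count per (status, exception class) pair.
 */
final class CommitFailureTracker {
  private final int maxDetailed;
  private final Deque<RuntimeException> recent = new ArrayDeque<>();
  private final Map<String, Integer> counts = new HashMap<>();

  CommitFailureTracker(int maxDetailed) {
    if (maxDetailed <= 0) {
      throw new IllegalArgumentException("maxDetailed must be positive");
    }
    this.maxDetailed = maxDetailed;
  }

  /** Records one failure; the status is a String here purely as an assumption. */
  synchronized void record(String status, RuntimeException e) {
    // Evict the oldest detailed entry once the bound is reached.
    if (recent.size() == maxDetailed) {
      recent.removeFirst();
    }
    recent.addLast(e);
    // The counter is never evicted, so totals survive after details roll off.
    counts.merge(key(status, e.getClass()), 1, Integer::sum);
  }

  /** Returns a snapshot of the retained failures, oldest first. */
  synchronized List<RuntimeException> recentFailures() {
    return new ArrayList<>(recent);
  }

  /** Returns how many failures were recorded for the given status and exception type. */
  synchronized int count(String status, Class<? extends Throwable> type) {
    return counts.getOrDefault(key(status, type), 0);
  }

  private static String key(String status, Class<? extends Throwable> type) {
    return status + ":" + type.getName();
  }
}
```

Compared with chaining every exception through `addSuppressed`, a bounded buffer plus a counter keeps memory use fixed even if a large response produces many failures, while still showing which status and exception type combinations occur.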
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java:
##########
@@ -156,29 +162,47 @@ public void sendHealthCheck() {
protected void onResponse(StreamingCommitResponse response) {
commitWorkThrottleTimer.stop();
- RuntimeException finalException = null;
+ @Nullable RuntimeException failure = null;
Review Comment:
done