arunpandianp commented on code in PR #35327: URL: https://github.com/apache/beam/pull/35327#discussion_r2176736555
########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java: ########## @@ -230,10 +271,13 @@ private void issueSingleRequest(long id, PendingRequest pendingRequest) .setSerializedWorkItemCommit(pendingRequest.serializedCommit()); StreamingCommitWorkRequest chunk = requestBuilder.build(); synchronized (this) { - if (!prepareForSend(id, pendingRequest)) { + if (isShutdown) { pendingRequest.abort(); Review Comment: should we return here? ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java: ########## @@ -139,62 +159,83 @@ public CommitWorkStream.RequestBatcher batcher() { } @Override - protected boolean hasPendingRequests() { - return !pending.isEmpty(); - } - - @Override - protected void sendHealthCheck() throws WindmillStreamShutdownException { - if (hasPendingRequests()) { + protected synchronized void sendHealthCheck() throws WindmillStreamShutdownException { + if (currentPhysicalStream != null && currentPhysicalStream.hasPendingRequests()) { StreamingCommitWorkRequest.Builder builder = StreamingCommitWorkRequest.newBuilder(); builder.addCommitChunkBuilder().setRequestId(HEARTBEAT_REQUEST_ID); trySend(builder.build()); } } - @Override - protected void onResponse(StreamingCommitResponse response) { - CommitCompletionFailureHandler failureHandler = new CommitCompletionFailureHandler(); - for (int i = 0; i < response.getRequestIdCount(); ++i) { - long requestId = response.getRequestId(i); - if (requestId == HEARTBEAT_REQUEST_ID) { - continue; - } + private class CommitWorkPhysicalStreamHandler extends PhysicalStreamHandler { + @Override + public void onResponse(StreamingCommitResponse response) { + CommitCompletionFailureHandler failureHandler = new CommitCompletionFailureHandler(); + for (int i = 0; i < response.getRequestIdCount(); ++i) { + long requestId = 
response.getRequestId(i); + if (requestId == HEARTBEAT_REQUEST_ID) { + continue; + } + + // From windmill.proto: Indices must line up with the request_id field, but trailing OKs + // may be omitted. + CommitStatus commitStatus = + i < response.getStatusCount() ? response.getStatus(i) : CommitStatus.OK; - // From windmill.proto: Indices must line up with the request_id field, but trailing OKs may - // be omitted. - CommitStatus commitStatus = - i < response.getStatusCount() ? response.getStatus(i) : CommitStatus.OK; - - @Nullable PendingRequest pendingRequest = pending.remove(requestId); - if (pendingRequest == null) { - synchronized (this) { - if (!isShutdown) { - // Missing responses are expected after shutdown() because it removes them. - LOG.error("Got unknown commit request ID: {}", requestId); - } + @Nullable StreamAndRequest entry = pending.remove(requestId); + if (entry == null) { + LOG.error("Got unknown commit request ID: {}", requestId); + continue; } - } else { + if (entry.handler != this) { + LOG.error("Got commit request id {} on unexpected stream", requestId); + } + PendingRequest pendingRequest = entry.request; try { pendingRequest.completeWithStatus(commitStatus); } catch (RuntimeException e) { - // Catch possible exceptions to ensure that an exception for one commit does not prevent + // Catch possible exceptions to ensure that an exception for one commit does not + // prevent Review Comment: need to fix format ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java: ########## @@ -284,8 +374,8 @@ public void onHeartbeatResponse(List<Windmill.ComputationHeartbeatResponse> resp } @Override - protected void sendHealthCheck() throws WindmillStreamShutdownException { - if (hasPendingRequests()) { + protected synchronized void sendHealthCheck() throws WindmillStreamShutdownException { + if (currentPhysicalStream != null && 
currentPhysicalStream.hasPendingRequests()) { trySend(HEALTH_CHECK_REQUEST); Review Comment: Not related to the new code: would it be better to send health checks even when there are no pending requests? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@beam.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org