arunpandianp commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1809988877


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java:
##########
@@ -229,17 +255,24 @@ private void issueBatchedRequest(Map<Long, PendingRequest> requests) {
     for (Map.Entry<Long, PendingRequest> entry : requests.entrySet()) {
       PendingRequest request = entry.getValue();
       StreamingCommitRequestChunk.Builder chunkBuilder = requestBuilder.addCommitChunkBuilder();
-      if (lastComputation == null || !lastComputation.equals(request.computation)) {
-        chunkBuilder.setComputationId(request.computation);
-        lastComputation = request.computation;
+      if (lastComputation == null || !lastComputation.equals(request.computationId())) {
+        chunkBuilder.setComputationId(request.computationId());
+        lastComputation = request.computationId();
       }
-      chunkBuilder.setRequestId(entry.getKey());
-      chunkBuilder.setShardingKey(request.request.getShardingKey());
-      chunkBuilder.setSerializedWorkItemCommit(request.request.toByteString());
+      chunkBuilder
+          .setRequestId(entry.getKey())
+          .setShardingKey(request.shardingKey())
+          .setSerializedWorkItemCommit(request.serializedCommit());
     }
     StreamingCommitWorkRequest request = requestBuilder.build();
     synchronized (this) {
-      pending.putAll(requests);
+      synchronized (shutdownLock) {
+        if (!isShutdown()) {
+          pending.putAll(requests);
+        } else {
+          return;

Review Comment:
   Same here: should we complete the requests with an error status in the else case?
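
   A minimal sketch of what that could look like, assuming `PendingRequest` exposes the no-arg `abort()` helper referenced later in this review (i.e. something that completes the commit callback with an error status); the enclosing `synchronized (this)` block from the diff is elided here:

   ```java
   // Sketch only: abort the batched requests instead of silently dropping them
   // once the stream has been shut down. Assumes PendingRequest.abort() exists.
   synchronized (shutdownLock) {
     if (!isShutdown()) {
       pending.putAll(requests);
     } else {
       requests.values().forEach(PendingRequest::abort);
       return;
     }
   }
   ```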



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java:
##########
@@ -249,11 +282,16 @@ private void issueBatchedRequest(Map<Long, PendingRequest> requests) {
   }
 
   private void issueMultiChunkRequest(final long id, PendingRequest pendingRequest) {
-    checkNotNull(pendingRequest.computation);
-    final ByteString serializedCommit = pendingRequest.request.toByteString();
-
+    checkNotNull(pendingRequest.computationId());
+    final ByteString serializedCommit = pendingRequest.serializedCommit();
     synchronized (this) {
-      pending.put(id, pendingRequest);
+      synchronized (shutdownLock) {
+        if (!isShutdown()) {
+          pending.put(id, pendingRequest);
+        } else {
+          return;

Review Comment:
   Complete the requests with an error status in the else case?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -160,80 +179,125 @@ protected boolean isShutdown() {
   private StreamObserver<RequestT> requestObserver() {
     if (requestObserver == null) {
       throw new NullPointerException(
-          "requestObserver cannot be null. Missing a call to startStream() to 
initialize.");
+          "requestObserver cannot be null. Missing a call to start() to 
initialize stream.");

Review Comment:
   bump



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java:
##########
@@ -314,26 +402,38 @@ private Batcher() {
     @Override
     public boolean commitWorkItem(
         String computation, WorkItemCommitRequest commitRequest, Consumer<CommitStatus> onDone) {
-      if (!canAccept(commitRequest.getSerializedSize() + computation.length())) {
+      if (!canAccept(commitRequest.getSerializedSize() + computation.length()) || isShutdown()) {

Review Comment:
   Should we call onDone with an error status if shutdown is true?

   `StreamingEngineWorkCommitter` expects the first commit in a batch to queue successfully here.
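
   One hedged sketch of the variant this comment is asking about, assuming `CommitStatus.ABORTED` (used elsewhere in this PR) is the right status to report; whether returning `false` for the first item of a batch also needs a contract change on the `StreamingEngineWorkCommitter` side is left open:

   ```java
   // Sketch only: fail the commit explicitly on shutdown so the caller's
   // callback still fires, rather than just returning false.
   if (isShutdown()) {
     onDone.accept(CommitStatus.ABORTED);
     return false;
   }
   if (!canAccept(commitRequest.getSerializedSize() + computation.length())) {
     return false;
   }
   ```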



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java:
##########
@@ -204,17 +224,23 @@ private void flushInternal(Map<Long, PendingRequest> requests) {
     }
   }
 
-  private void issueSingleRequest(final long id, PendingRequest pendingRequest) {
+  private void issueSingleRequest(long id, PendingRequest pendingRequest) {
     StreamingCommitWorkRequest.Builder requestBuilder = StreamingCommitWorkRequest.newBuilder();
     requestBuilder
         .addCommitChunkBuilder()
-        .setComputationId(pendingRequest.computation)
+        .setComputationId(pendingRequest.computationId())
         .setRequestId(id)
-        .setShardingKey(pendingRequest.request.getShardingKey())
-        .setSerializedWorkItemCommit(pendingRequest.request.toByteString());
+        .setShardingKey(pendingRequest.shardingKey())
+        .setSerializedWorkItemCommit(pendingRequest.serializedCommit());
     StreamingCommitWorkRequest chunk = requestBuilder.build();
     synchronized (this) {
-      pending.put(id, pendingRequest);
+      synchronized (shutdownLock) {
+        if (!isShutdown()) {
+          pending.put(id, pendingRequest);

Review Comment:
   call `pendingRequest.abort()` in the else case?
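
   Spelled out, with the same caveat about `abort()` as in the batched sketch above:

   ```java
   // Sketch only: abort the single request instead of dropping it when the
   // stream is already shut down (enclosing synchronized (this) elided).
   synchronized (shutdownLock) {
     if (!isShutdown()) {
       pending.put(id, pendingRequest);
     } else {
       pendingRequest.abort();
       return;
     }
   }
   ```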



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java:
##########
@@ -314,26 +402,38 @@ private Batcher() {
     @Override
     public boolean commitWorkItem(
         String computation, WorkItemCommitRequest commitRequest, Consumer<CommitStatus> onDone) {
-      if (!canAccept(commitRequest.getSerializedSize() + computation.length())) {
+      if (!canAccept(commitRequest.getSerializedSize() + computation.length()) || isShutdown()) {
         return false;
       }
-      PendingRequest request = new PendingRequest(computation, commitRequest, onDone);
+
+      PendingRequest request = PendingRequest.create(computation, commitRequest, onDone);
       add(idGenerator.incrementAndGet(), request);
       return true;
     }
 
     /** Flushes any pending work items to the wire. */
     @Override
     public void flush() {
-      flushInternal(queue);
-      queuedBytes = 0;
-      queue.clear();
+      try {
+        if (!isShutdown()) {
+          flushInternal(queue);
+        } else {
+          queue.forEach((ignored, request) -> request.onDone().accept(CommitStatus.ABORTED));

Review Comment:
   ```suggestion
             queue.forEach((ignored, request) -> request.abort());
   ```
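
   The suggested `abort()` is presumably just a wrapper over the line it replaces; a sketch of that assumption, inside `PendingRequest`:

   ```java
   // Assumed shape of PendingRequest.abort(): complete the caller's callback
   // with an aborted status so the commit is not silently dropped.
   void abort() {
     onDone().accept(CommitStatus.ABORTED);
   }
   ```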



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java:
##########
@@ -204,17 +224,23 @@ private void flushInternal(Map<Long, PendingRequest> requests) {
     }
   }
 
-  private void issueSingleRequest(final long id, PendingRequest pendingRequest) {
+  private void issueSingleRequest(long id, PendingRequest pendingRequest) {
     StreamingCommitWorkRequest.Builder requestBuilder = StreamingCommitWorkRequest.newBuilder();
     requestBuilder
         .addCommitChunkBuilder()
-        .setComputationId(pendingRequest.computation)
+        .setComputationId(pendingRequest.computationId())
         .setRequestId(id)
-        .setShardingKey(pendingRequest.request.getShardingKey())
-        .setSerializedWorkItemCommit(pendingRequest.request.toByteString());
+        .setShardingKey(pendingRequest.shardingKey())
+        .setSerializedWorkItemCommit(pendingRequest.serializedCommit());
     StreamingCommitWorkRequest chunk = requestBuilder.build();
     synchronized (this) {
-      pending.put(id, pendingRequest);
+      synchronized (shutdownLock) {

Review Comment:
   In startStream, `shutdownLock` is locked before `this`; here, `this` is locked before `shutdownLock`. Grabbing the locks in different orders could cause a deadlock. Can we change this path to always acquire `this` after `shutdownLock`?
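
   For illustration, the requested ordering applied to the block shown in this hunk. If one thread holds `this` while waiting for `shutdownLock` and another holds `shutdownLock` while waiting for `this`, both block forever, so using a single acquisition order everywhere avoids that:

   ```java
   // Sketch only: acquire shutdownLock first, then this, matching the order
   // used in startStream. Whatever else the original synchronized (this) block
   // covers would move inside accordingly.
   synchronized (shutdownLock) {
     synchronized (this) {
       if (!isShutdown()) {
         pending.put(id, pendingRequest);
       } else {
         pendingRequest.abort();
         return;
       }
     }
   }
   ```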



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
