arunpandianp commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1812150816


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java:
##########
@@ -402,6 +420,11 @@ private Batcher() {
     @Override
     public boolean commitWorkItem(
         String computation, WorkItemCommitRequest commitRequest, 
Consumer<CommitStatus> onDone) {
+      if (isShutdown()) {
+        onDone.accept(CommitStatus.ABORTED);
+        return false;
+      }
+
       if (!canAccept(commitRequest.getSerializedSize() + computation.length()) 
|| isShutdown()) {

Review Comment:
   ```suggestion
         if (!canAccept(commitRequest.getSerializedSize() + 
computation.length())) {
   ```



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java:
##########
@@ -354,18 +358,24 @@ private <ResponseT> ResponseT issueRequest(QueuedRequest 
request, ParseFn<Respon
       }
     }
 
-    throw new WindmillStreamShutdownException(
-        "Cannot send request=[" + request + "] on closed stream.");
+    throw new org.apache.beam.runners.dataflow.worker.windmill.client
+        .WindmillStreamShutdownException("Cannot send request=[" + request + 
"] on closed stream.");
   }
 
   private void handleShutdown(QueuedRequest request, Throwable cause) {
-    if (cause instanceof WindmillStreamShutdownException) {
-      throw (WindmillStreamShutdownException) cause;
+    if (cause
+        instanceof
+        
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStreamShutdownException)
 {
+      throw (org.apache.beam.runners.dataflow.worker.windmill.client

Review Comment:
   This cast looks redundant; do we need it?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -278,21 +354,67 @@ public String backendWorkerToken() {
   }
 
   @Override
-  public void shutdown() {
-    if (isShutdown.compareAndSet(false, true)) {
-      requestObserver()
-          .onError(new WindmillStreamShutdownException("Explicit call to 
shutdown stream."));
+  public final void shutdown() {
+    // Don't lock on "this" as isShutdown checks are used in the stream to 
free blocked
+    // threads or as exit conditions to loops.
+    synchronized (shutdownLock) {
+      if (!isShutdown) {
+        isShutdown = true;
+        shutdownTime.set(DateTime.now());
+        requestObserver.onError(
+            new WindmillStreamShutdownException("Explicit call to shutdown 
stream."));
+        shutdownInternal();
+      }
     }
   }
 
-  private void setLastError(String error) {
-    lastError.set(error);
-    lastErrorTime.set(DateTime.now());
+  private void recordRestartReason(String error) {
+    lastRestartReason.set(error);
+    lastRestartTime.set(DateTime.now());
   }
 
-  public static class WindmillStreamShutdownException extends RuntimeException 
{
-    public WindmillStreamShutdownException(String message) {
-      super(message);
+  protected abstract void shutdownInternal();
+
+  /**
+   * Request observer that allows resetting its internal delegate using the 
given {@link
+   * #requestObserverSupplier}.
+   */
+  @ThreadSafe
+  private static class ResettableRequestObserver<RequestT> implements 
StreamObserver<RequestT> {
+
+    private final Supplier<StreamObserver<RequestT>> requestObserverSupplier;
+
+    @GuardedBy("this")
+    private @Nullable StreamObserver<RequestT> delegateRequestObserver;
+
+    private ResettableRequestObserver(Supplier<StreamObserver<RequestT>> 
requestObserverSupplier) {
+      this.requestObserverSupplier = requestObserverSupplier;
+      this.delegateRequestObserver = null;
+    }
+
+    private synchronized StreamObserver<RequestT> delegate() {
+      return Preconditions.checkNotNull(
+          delegateRequestObserver,
+          "requestObserver cannot be null. Missing a call to startStream() to 
initialize.");
+    }
+
+    private synchronized void reset() {
+      delegateRequestObserver = requestObserverSupplier.get();
+    }
+
+    @Override
+    public void onNext(RequestT requestT) {
+      delegate().onNext(requestT);

Review Comment:
   Do we need to guard the `delegate().method()` calls with `this`?
   
   Are delegates expected to be thread-safe? If so, please add a comment saying so.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStreamRequests.java:
##########
@@ -122,12 +132,62 @@ void addRequest(QueuedRequest request) {
       byteSize += request.byteSize();
     }
 
-    void countDown() {
+    /** Let waiting for threads know that the request has been successfully 
sent. */
+    synchronized void notifySent() {

Review Comment:
   I don't think we need `synchronized` on both of these methods.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java:
##########
@@ -188,20 +195,32 @@ private long uniqueId() {
 
   @Override
   public KeyedGetDataResponse requestKeyedData(String computation, 
KeyedGetDataRequest request) {
-    return issueRequest(
-        QueuedRequest.forComputation(uniqueId(), computation, request),
-        KeyedGetDataResponse::parseFrom);
+    try {
+      return issueRequest(
+          QueuedRequest.forComputation(uniqueId(), computation, request),
+          KeyedGetDataResponse::parseFrom);
+    } catch (
+        
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStreamShutdownException
 e) {
+      throw new WorkItemCancelledException(request.getShardingKey());
+    }
   }
 
   @Override
   public GlobalData requestGlobalData(GlobalDataRequest request) {
-    return issueRequest(QueuedRequest.global(uniqueId(), request), 
GlobalData::parseFrom);
+    try {
+      return issueRequest(QueuedRequest.global(uniqueId(), request), 
GlobalData::parseFrom);
+    } catch (
+        
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStreamShutdownException
 e) {
+      throw new WorkItemCancelledException(

Review Comment:
   This gets converted into a GetDataException upstream. I think we need to remove GetDataException, since it could lead to repeated retries. That can be done in a separate PR.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java:
##########
@@ -229,31 +252,38 @@ private void issueBatchedRequest(Map<Long, 
PendingRequest> requests) {
     for (Map.Entry<Long, PendingRequest> entry : requests.entrySet()) {
       PendingRequest request = entry.getValue();
       StreamingCommitRequestChunk.Builder chunkBuilder = 
requestBuilder.addCommitChunkBuilder();
-      if (lastComputation == null || 
!lastComputation.equals(request.computation)) {
-        chunkBuilder.setComputationId(request.computation);
-        lastComputation = request.computation;
+      if (lastComputation == null || 
!lastComputation.equals(request.computationId())) {
+        chunkBuilder.setComputationId(request.computationId());
+        lastComputation = request.computationId();
       }
-      chunkBuilder.setRequestId(entry.getKey());
-      chunkBuilder.setShardingKey(request.request.getShardingKey());
-      chunkBuilder.setSerializedWorkItemCommit(request.request.toByteString());
+      chunkBuilder
+          .setRequestId(entry.getKey())
+          .setShardingKey(request.shardingKey())
+          .setSerializedWorkItemCommit(request.serializedCommit());
     }
     StreamingCommitWorkRequest request = requestBuilder.build();
-    synchronized (this) {
-      pending.putAll(requests);
-      try {
-        send(request);
-      } catch (IllegalStateException e) {
-        // Stream was broken, request will be retried when stream is reopened.
-      }
+
+    if (shouldCancelRequest(requests)) {
+      requests.forEach((ignored, pendingRequest) -> pendingRequest.abort());
+      return;
+    }
+
+    try {
+      send(request);
+    } catch (IllegalStateException e) {
+      // Stream was broken, request will be retried when stream is reopened.
     }
   }
 
-  private void issueMultiChunkRequest(final long id, PendingRequest 
pendingRequest) {
-    checkNotNull(pendingRequest.computation);
-    final ByteString serializedCommit = pendingRequest.request.toByteString();
+  private void issueMultiChunkRequest(long id, PendingRequest pendingRequest) {
+    checkNotNull(pendingRequest.computationId());
+    ByteString serializedCommit = pendingRequest.serializedCommit();

Review Comment:
   nit: move this after the if block below.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java:
##########
@@ -302,38 +331,57 @@ public void appendSpecificHtml(PrintWriter writer) {
   }
 
   private <ResponseT> ResponseT issueRequest(QueuedRequest request, 
ParseFn<ResponseT> parseFn) {
-    while (true) {
+    while (!isShutdown()) {
       request.resetResponseStream();
       try {
         queueRequestAndWait(request);
         return parseFn.parse(request.getResponseStream());
-      } catch (CancellationException e) {
-        // Retry issuing the request since the response stream was cancelled.
-        continue;
+      } catch (AppendableInputStream.InvalidInputStreamStateException
+          | VerifyException

Review Comment:
   If we see a VerifyException, that means there is a logic bug, and we need to correct the code to fix it. 
   
   One case I see: on shutdown we call `batches.clear()`, which could happen in the middle of sending a batch. There is a `verify(batch == batches.peekFirst());` inside `send` that will throw after shutdown.
   
   from 
https://github.com/google/guava/wiki/ConditionalFailuresExplained#kinds-of-conditional-failures
   ```
   A verification check is used when you lack high confidence that an API you 
consume will meet its (real or implied) specification. It's easiest to 
understand this type of check as "like an assertion in almost every way, but 
we'll never want to disable them."
   ``` 
   
   Can we make sure Verify exceptions are not thrown unless there is a bug?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java:
##########
@@ -283,21 +313,97 @@ private void issueMultiChunkRequest(final long id, 
PendingRequest pendingRequest
     }
   }
 
-  private static class PendingRequest {
+  private boolean shouldCancelRequest(long id, PendingRequest request) {

Review Comment:
   Could we use `handleRequest` or some other name instead of `shouldCancelRequest`? The name `shouldCancelRequest` does not make it clear that the method mutates the `pending` map.
   
   Please add a comment explaining the meaning of the return value.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java:
##########
@@ -207,7 +209,8 @@ public KeyedGetDataResponse requestKeyedData(String 
computation, KeyedGetDataReq
   public GlobalData requestGlobalData(GlobalDataRequest request) {
     try {
       return issueRequest(QueuedRequest.global(uniqueId(), request), 
GlobalData::parseFrom);
-    } catch (WindmillStreamShutdownException e) {
+    } catch (
+        
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStreamShutdownException
 e) {

Review Comment:
   Could we import the class and refer to it by its simple name instead of the fully qualified name?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java:
##########
@@ -402,6 +420,11 @@ private Batcher() {
     @Override
     public boolean commitWorkItem(
         String computation, WorkItemCommitRequest commitRequest, 
Consumer<CommitStatus> onDone) {
+      if (isShutdown()) {
+        onDone.accept(CommitStatus.ABORTED);
+        return false;
+      }
+
       if (!canAccept(commitRequest.getSerializedSize() + computation.length()) 
|| isShutdown()) {

Review Comment:
   Having the shutdown check here could reject the first message.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStreamRequests.java:
##########
@@ -95,11 +108,8 @@ static class QueuedBatch {
     private final List<QueuedRequest> requests = new ArrayList<>();

Review Comment:
   Comments on which methods are thread-safe and which ones need external synchronization would help.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to