scwhittle commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1840157947


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/ResettableThrowingStreamObserver.java:
##########
@@ -121,7 +121,7 @@ public void onNext(T t) throws StreamClosedException, WindmillStreamShutdownExce
 
       try {
         delegate.onError(e);
-      } catch (RuntimeException ignored) {
+      } catch (IllegalStateException ignored) {

Review Comment:
   Add `ignored` to the exception too? In case we catch something unexpected, 
it would help with debugging.
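
   A minimal sketch of one way to read this, assuming the ignored exception is 
attached to the original error with `Throwable.addSuppressed`. The 
`notifyDelegate` helper and the `Consumer<Throwable>` delegate are hypothetical 
stand-ins, not the real observer types:

```java
import java.util.function.Consumer;

class SuppressedExceptionSketch {
  // Hypothetical helper: forwards an error to a delegate. If the delegate
  // rejects the call, the rejection is attached to the original error
  // instead of being dropped silently.
  static Throwable notifyDelegate(Consumer<Throwable> delegate, Throwable e) {
    try {
      delegate.accept(e);
    } catch (IllegalStateException ignored) {
      e.addSuppressed(ignored);
    }
    return e;
  }

  public static void main(String[] args) {
    Throwable result =
        notifyDelegate(
            t -> {
              throw new IllegalStateException("delegate already closed");
            },
            new RuntimeException("stream failed"));
    // The rejected call is still visible when the original error is logged.
    System.out.println(result.getSuppressed().length);
  }
}
```

   With this shape, whoever logs or rethrows `e` also sees what the delegate 
threw.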



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java:
##########
@@ -161,19 +162,27 @@ public void onNext(T value) throws StreamObserverCancelledException {
   public void onError(Throwable t) {
     isReadyNotifier.forceTermination();
     synchronized (lock) {
-      isClosed = true;
+      markClosedOrThrow();
       outboundObserver.onError(t);
     }
   }
 
   @Override
   public void onCompleted() {
+    isReadyNotifier.forceTermination();
     synchronized (lock) {
-      isClosed = true;
+      markClosedOrThrow();
       outboundObserver.onCompleted();
     }
   }
 
+  private void markClosedOrThrow() {
+    synchronized (lock) {
+      Preconditions.checkState(!isClosed);

Review Comment:
   This is going to throw if we have the sequence:
   
   T1: onNext()
   T2: terminate()
   T1: onError()
   
   It seems like that could happen if we're terminating from threads other than 
the one driving the observer. We could have a separate boolean tracking 
whether the user closed the observer, and base this exception on that, since a 
repeated user close is what actually uses the class wrong. Having a terminate 
before/during an onCompleted/onError doesn't necessarily seem like misuse, and 
I think we should avoid throwing an exception in that case.
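
   A rough sketch of the separate-flag idea, not the actual class. It assumes 
that `terminate()` marks the observer closed and that the delegate no longer 
needs notifying afterwards; the names `isUserClosed`, `closeByUser`, and 
`delegateCloses` are made up for illustration:

```java
class DirectObserverCloseSketch {
  private final Object lock = new Object();
  private boolean isClosed = false; // set by terminate() or by a user close
  private boolean isUserClosed = false; // set only by onError()/onCompleted()
  private int delegateCloses = 0; // stands in for calls to the outbound observer

  // May be called from any thread, for example during shutdown.
  void terminate() {
    synchronized (lock) {
      isClosed = true;
    }
  }

  void onError(Throwable t) {
    closeByUser();
  }

  void onCompleted() {
    closeByUser();
  }

  int delegateCloses() {
    synchronized (lock) {
      return delegateCloses;
    }
  }

  private void closeByUser() {
    synchronized (lock) {
      // Only a repeated user close counts as misuse.
      if (isUserClosed) {
        throw new IllegalStateException("onError/onCompleted called more than once.");
      }
      isUserClosed = true;
      // Assumption: after terminate() the delegate is already closed, so skip it.
      if (!isClosed) {
        isClosed = true;
        delegateCloses++;
      }
    }
  }

  public static void main(String[] args) {
    DirectObserverCloseSketch observer = new DirectObserverCloseSketch();
    observer.terminate(); // T2
    observer.onError(new RuntimeException("boom")); // T1, does not throw
    System.out.println(observer.delegateCloses());
  }
}
```

   With this split, the T1/T2/T1 sequence above completes quietly, while a 
second onError/onCompleted from the caller still fails loudly.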
   
   
   



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java:
##########
@@ -161,19 +162,27 @@ public void onNext(T value) throws StreamObserverCancelledException {
   public void onError(Throwable t) {
     isReadyNotifier.forceTermination();
     synchronized (lock) {
-      isClosed = true;
+      markClosedOrThrow();
       outboundObserver.onError(t);
     }
   }
 
   @Override
   public void onCompleted() {
+    isReadyNotifier.forceTermination();
     synchronized (lock) {
-      isClosed = true;
+      markClosedOrThrow();
       outboundObserver.onCompleted();
     }
   }
 
+  private void markClosedOrThrow() {

Review Comment:
   Just make the method synchronized (though with the suggestion on the 
checkState line, it's probably easier to just duplicate the check in 
onCompleted/onError and use the same if block).



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java:
##########
@@ -188,7 +188,9 @@ protected void onResponse(StreamingGetDataResponse chunk) {
 
     for (int i = 0; i < chunk.getRequestIdCount(); ++i) {
       AppendableInputStream responseStream = pending.get(chunk.getRequestId(i));
-      verify(responseStream != null, "No pending response stream");
+      synchronized (this) {

Review Comment:
   Avoid synchronizing if it isn't necessary. You also need to handle the null 
case, or you will just get an exception on the following line.
   
   if (responseStream == null) {
     synchronized (this) { verify(isShutdown); }
     continue;
   }
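
   The same pattern as a self-contained sketch. The class, the `StringBuilder` 
streams, and the `register`/`shutdown` methods are hypothetical stand-ins for 
the real stream types, and a plain `IllegalStateException` replaces `verify`:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class PendingResponsesSketch {
  private final Map<Long, StringBuilder> pending = new ConcurrentHashMap<>();
  private boolean isShutdown = false; // guarded by "this"

  void register(long requestId) {
    pending.put(requestId, new StringBuilder());
  }

  synchronized void shutdown() {
    isShutdown = true;
    pending.clear();
  }

  // Returns how many request ids had a pending stream to append to.
  int onResponse(List<Long> requestIds, String payload) {
    int delivered = 0;
    for (long requestId : requestIds) {
      StringBuilder responseStream = pending.get(requestId);
      if (responseStream == null) {
        // Take the lock only on the rare path: a missing stream is
        // acceptable only if shutdown already cleared the map.
        synchronized (this) {
          if (!isShutdown) {
            throw new IllegalStateException("No pending response stream");
          }
        }
        continue;
      }
      responseStream.append(payload);
      delivered++;
    }
    return delivered;
  }

  public static void main(String[] args) {
    PendingResponsesSketch sketch = new PendingResponsesSketch();
    sketch.register(1L);
    sketch.shutdown();
    System.out.println(sketch.onResponse(Arrays.asList(1L), "chunk"));
  }
}
```

   The common path (stream found) never touches the lock, and the null path 
skips the entry instead of dereferencing it.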



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStreamTest.java:
##########
@@ -131,13 +136,27 @@ public void testShutdown_abortsQueuedCommits() throws InterruptedException {
               commitProcessed.countDown();
             });
       }
+    } catch (StreamObserverCancelledException ignored) {
     }
 
     // Verify that we sent the commits above in a request + the initial header.
-    verify(requestObserver, times(2)).onNext(any(Windmill.StreamingCommitWorkRequest.class));
+    verify(requestObserver, times(2))
+        .onNext(
+            argThat(
+                request -> {
+                  if (request.getHeader().equals(TEST_JOB_HEADER)) {

Review Comment:
   nit: how about separate onNext verifications? As is, 2 headers would pass. 
May need some sequence object, not sure.
   
   .onNext(argThat(r -> r.getHeader().equals(TEST_JOB_HEADER)))
   .onNext(argThat(r -> !r.getCommitChunkList().isEmpty()))
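
   To illustrate why a single matcher applied twice is weaker than checking 
each call by position, here is a sketch that uses a hand-rolled recording 
observer instead of Mockito. `Request`, `RecordingObserver`, and 
`sentHeaderThenCommits` are hypothetical names, and `Request` only mimics the 
two fields that matter here:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class OrderedRequestCheckSketch {
  // Hypothetical stand-in for the commit work request.
  static final class Request {
    final String header; // empty when the request carries no header
    final List<String> commitChunks;

    Request(String header, List<String> commitChunks) {
      this.header = header;
      this.commitChunks = commitChunks;
    }
  }

  // Records every request in the order it was sent.
  static final class RecordingObserver {
    final List<Request> seen = new ArrayList<>();

    void onNext(Request request) {
      seen.add(request);
    }
  }

  // True only if exactly two requests were sent: the header first, commits second.
  static boolean sentHeaderThenCommits(List<Request> seen, String expectedHeader) {
    return seen.size() == 2
        && expectedHeader.equals(seen.get(0).header)
        && !seen.get(1).commitChunks.isEmpty();
  }

  public static void main(String[] args) {
    RecordingObserver observer = new RecordingObserver();
    observer.onNext(new Request("job-header", Collections.emptyList()));
    observer.onNext(new Request("", Arrays.asList("commit-1")));
    System.out.println(sentHeaderThenCommits(observer.seen, "job-header"));
  }
}
```

   Two header-only requests fail this check, which is the case the current 
either/or matcher lets through.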



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java:
##########
@@ -128,7 +128,9 @@ public void onNext(T value) throws StreamObserverCancelledException {
           // Phaser is terminated so don't use the outboundObserver. Since onError and onCompleted
           // are synchronized after terminating the phaser if we observe that the phaser is not
           // terminated the onNext calls below are guaranteed to not be called on a closed observer.
-          if (currentPhase < 0) return;
+          if (currentPhase < 0) {
+            throw new StreamObserverCancelledException("StreamObserver was terminated.");
+          }

Review Comment:
   assert(!isClosed);
   Since we terminate before closing, this seems guaranteed, but I think an 
assert makes it clearer to the reader.



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStreamTest.java:
##########
@@ -104,6 +106,10 @@ public void setMessageCompression(boolean b) {}
             () ->
                 assertThrows(WindmillStreamShutdownException.class, () -> testStream.testSend(1)));
     testStream.shutdown();
+
+    // Sleep a bit to give sendExecutor time to execute the send().
+    Uninterruptibles.sleepUninterruptibly(5, TimeUnit.SECONDS);

Review Comment:
   Sleep for less, maybe 100ms? Tests take long enough already.
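
   If a short fixed sleep turns out to be flaky, one alternative (only a 
sketch, and not necessarily applicable to how sendExecutor is wired in this 
test) is to wait on a latch with an upper bound, so the test continues as soon 
as the background work is done. `runAndAwait` is a hypothetical helper:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class BoundedWaitSketch {
  // Runs the task on a background thread and waits until it finishes,
  // up to timeoutMillis. Returns false on timeout or interruption.
  static boolean runAndAwait(Runnable task, long timeoutMillis) {
    CountDownLatch done = new CountDownLatch(1);
    ExecutorService executor = Executors.newSingleThreadExecutor();
    try {
      executor.execute(
          () -> {
            try {
              task.run();
            } finally {
              done.countDown();
            }
          });
      return done.await(timeoutMillis, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    } finally {
      executor.shutdownNow();
    }
  }

  public static void main(String[] args) {
    // Returns as soon as the task completes; the timeout is only a ceiling.
    System.out.println(runAndAwait(() -> {}, 5000));
  }
}
```

   The timeout can then stay generous without slowing down the passing case.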



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java:
##########
@@ -128,7 +128,9 @@ public void onNext(T value) throws StreamObserverCancelledException {
           // Phaser is terminated so don't use the outboundObserver. Since onError and onCompleted
           // are synchronized after terminating the phaser if we observe that the phaser is not
           // terminated the onNext calls below are guaranteed to not be called on a closed observer.
-          if (currentPhase < 0) return;
+          if (currentPhase < 0) {
+            throw new StreamObserverCancelledException("StreamObserver was terminated.");
+          }
           messagesSinceReady = 0;

Review Comment:
   assert(!isClosed);
   Since we terminate before closing, this seems guaranteed, but I think an 
assert makes it clearer to the reader.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java:
##########
@@ -83,16 +85,14 @@ public void onNext(T value) throws StreamObserverCancelledException {
           // Phaser is terminated so don't use the outboundObserver. Since onError and onCompleted
           // are synchronized after terminating the phaser if we observe that the phaser is not
           // terminated the onNext calls below are guaranteed to not be called on a closed observer.
-          if (currentPhase < 0) return;
+          if (currentPhase < 0) {
+            throw new StreamObserverCancelledException("StreamObserver was terminated.");
+          }

Review Comment:
   assert(!isClosed);
   Since we terminate before grabbing the lock to close, this is guaranteed, 
but I think an assert makes it clearer to the reader.



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStreamTest.java:
##########
@@ -102,6 +105,48 @@ private GrpcGetDataStream createGetDataStream(GetDataStreamTestStub testStub) {
     return getDataStream;
   }
 
+  @Test
+  public void testRequestKeyedData() {
+    GetDataStreamTestStub testStub =
+        new GetDataStreamTestStub(new TestGetDataStreamRequestObserver());
+    GrpcGetDataStream getDataStream = createGetDataStream(testStub);
+    // These will block until they are successfully sent.
+    CompletableFuture<Windmill.KeyedGetDataResponse> sendFuture =
+        CompletableFuture.supplyAsync(
+            () -> {
+              try {
+                return getDataStream.requestKeyedData(
+                    "computationId",
+                    Windmill.KeyedGetDataRequest.newBuilder()
+                        .setKey(ByteString.EMPTY)
+                        .setShardingKey(1)
+                        .setCacheToken(1)
+                        .setWorkToken(1)
+                        .build());
+              } catch (WindmillStreamShutdownException e) {
+                throw new RuntimeException(e);
+              }
+            });
+
+    // Sleep a bit to allow future to run.
+    Uninterruptibles.sleepUninterruptibly(5, TimeUnit.SECONDS);

Review Comment:
   Ditto, sleep for less.



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStreamRequestsTest.java:
##########
@@ -142,7 +144,8 @@ public void testQueuedBatch_notifyFailed_throwsWindmillStreamShutdownExceptionOn
                 assertThrows(
                     WindmillStreamShutdownException.class,
                     queuedBatch::waitForSendOrFailNotification));
-
+    // Wait a few seconds for the above future to get scheduled and run.
+    Uninterruptibles.sleepUninterruptibly(5, TimeUnit.SECONDS);

Review Comment:
   Ditto, how about 100ms?


