scwhittle commented on code in PR #34283:
URL: https://github.com/apache/beam/pull/34283#discussion_r2016118734


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -244,27 +244,28 @@ protected final void executeSafely(Runnable runnable) {
    *
    * @implNote This is sent asynchronously via an executor to minimize 
blocking. Messages are sent
    *     serially. If we recently sent a message before we attempt to schedule 
the health check, the
-   *     stream has been restarted/closed, there is an active health check 
that hasn't completed due
-   *     to flow control/pushback or there was a more recent send by the time 
we enter the
-   *     synchronized block, we skip the attempt to send scheduled the health 
check.
+   *     stream has been restarted/closed, there is a scheduled health check 
that hasn't completed
+   *     or there was a more recent send by the time we enter the synchronized 
block, we skip the
+   *     attempt to send scheduled the health check.
    */
-  public final void maybeSendHealthCheck(Instant lastSendThreshold) {
-    if (debugMetrics.getLastSendTimeMs() < lastSendThreshold.getMillis() && 
!isHealthCheckActive) {
+  public final void maybeScheduleHealthCheck(Instant lastSendThreshold) {
+    if (debugMetrics.getLastSendTimeMs() < lastSendThreshold.getMillis()
+        && isHealthCheckScheduled.compareAndSet(false, true)) {
       // Don't block other streams when sending health check.
       executeSafely(
           () -> {
             synchronized (this) {
-              if (!isHealthCheckActive
-                  && !clientClosed
+              if (!clientClosed
                   && debugMetrics.getLastSendTimeMs() < 
lastSendThreshold.getMillis()) {
-                isHealthCheckActive = true;
                 try {
                   sendHealthCheck();
                 } catch (Exception e) {
                   logger.debug("Received exception sending health check.", e);
                 }
-                isHealthCheckActive = false;
               }
+
+              // Ready to send another health check after we attempt the 
scheduled health check.
+              isHealthCheckScheduled.set(false);

Review Comment:
   how about showing if health check is scheduled in the appendSummaryHtml?



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStreamTest.java:
##########
@@ -84,68 +85,60 @@ public void testShutdown_notBlockedBySend() throws 
InterruptedException, Executi
   }
 
   @Test
-  public void testMaybeSendHealthCheck() throws InterruptedException, 
ExecutionException {
-    TestCallStreamObserver callStreamObserver = new TestCallStreamObserver();
+  public void testMaybeScheduleHealthCheck() {
+    TestCallStreamObserver callStreamObserver =
+        new TestCallStreamObserver(/* waitForSend= */ false);
     Function<StreamObserver<Integer>, StreamObserver<Integer>> clientFactory =
         ignored -> callStreamObserver;
 
     TestStream testStream = newStream(clientFactory);
     testStream.start();
-    ExecutorService sendExecutor = Executors.newSingleThreadExecutor();
     Instant reportingThreshold = Instant.now().minus(Duration.millis(1));
-    Future<?> sendFuture =
-        sendExecutor.submit(() -> 
testStream.maybeSendHealthCheck(reportingThreshold));
-
-    // Sleep a bit to give sendExecutor time to execute the send().
-    Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
-
-    callStreamObserver.unblockSend();
-
-    // Make sure the future completes.
-    sendFuture.get();
 
+    testStream.maybeScheduleHealthCheck(reportingThreshold);
+    testStream.waitForHealthChecks(1);
     assertThat(testStream.numHealthChecks.get()).isEqualTo(1);
     testStream.shutdown();
   }
 
   @Test
-  public void testMaybeSendHealthCheck_doesNotSendIfLastSendLessThanThreshold()
-      throws ExecutionException, InterruptedException {
-    TestCallStreamObserver callStreamObserver = new TestCallStreamObserver();
+  public void 
testMaybeSendHealthCheck_doesNotSendIfLastScheduleLessThanThreshold() {
+    TestCallStreamObserver callStreamObserver =
+        new TestCallStreamObserver(/* waitForSend= */ false);
     Function<StreamObserver<Integer>, StreamObserver<Integer>> clientFactory =
         ignored -> callStreamObserver;
 
     TestStream testStream = newStream(clientFactory);
     testStream.start();
-    ExecutorService sendExecutor = Executors.newSingleThreadExecutor();
-    Future<?> sendFuture =
-        sendExecutor.submit(
-            () -> {
-              try {
-                testStream.trySend(1);
-              } catch (WindmillStreamShutdownException e) {
-                throw new RuntimeException(e);
-              }
-
-              // Sleep a bit to give sendExecutor time to execute the send().
-              Uninterruptibles.sleepUninterruptibly(100, 
TimeUnit.MILLISECONDS);
-
-              // Set a really long reporting threshold.
-              Instant reportingThreshold = 
Instant.now().minus(Duration.standardHours(1));
-              // Should not send health checks since we just sent the above 
message.
-              testStream.maybeSendHealthCheck(reportingThreshold);
-              testStream.maybeSendHealthCheck(reportingThreshold);
-            });
 
-    callStreamObserver.unblockSend();
+    try {
+      testStream.trySend(1);
+    } catch (WindmillStreamShutdownException e) {
+      throw new RuntimeException(e);
+    }
+
+    // Sleep a bit to give sendExecutor time to execute the send().
+    Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+
+    // Set a really long reporting threshold.
+    Instant reportingThreshold = 
Instant.now().minus(Duration.standardHours(1));
+
+    // Should not send health checks since we just sent the above message.
+    testStream.maybeScheduleHealthCheck(reportingThreshold);
+    Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+    testStream.maybeScheduleHealthCheck(reportingThreshold);
+    Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+
+    callStreamObserver.waitForSend();
+    Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);

Review Comment:
   // Sleep just to ensure an async health check doesn't show up



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStreamTest.java:
##########
@@ -197,18 +202,37 @@ protected void shutdownInternal() {}
   }
 
   private static class TestCallStreamObserver extends 
CallStreamObserver<Integer> {
+    private static final Logger LOG = 
LoggerFactory.getLogger(AbstractWindmillStreamTest.class);
     private final CountDownLatch sendBlocker = new CountDownLatch(1);
 
+    private final boolean waitForSend;
+
+    private TestCallStreamObserver(boolean waitForSend) {
+      this.waitForSend = waitForSend;
+    }
+
     private void unblockSend() {
       sendBlocker.countDown();
     }
 
-    @Override
-    public void onNext(Integer integer) {
+    private void waitForSend() {
       try {
-        sendBlocker.await();
+        int waitedMillis = 0;
+        while (!sendBlocker.await(100, TimeUnit.MILLISECONDS)) {
+          waitedMillis += 100;
+          LOG.info("Waiting from send for {}ms", waitedMillis);

Review Comment:
   I think sendBlocker is being used two different ways and is a little 
confusing.
   
   How about renaming this method to waitForSendUnblocked and logging here that 
you are waiting for send to be unblocked.
   And then below in onNext, instead of decrementing sendBlocker, you could 
increment a count of the number of messages received, and add a new method to 
wait for N messages to be sent.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -244,27 +244,28 @@ protected final void executeSafely(Runnable runnable) {
    *
    * @implNote This is sent asynchronously via an executor to minimize 
blocking. Messages are sent
    *     serially. If we recently sent a message before we attempt to schedule 
the health check, the
-   *     stream has been restarted/closed, there is an active health check 
that hasn't completed due
-   *     to flow control/pushback or there was a more recent send by the time 
we enter the
-   *     synchronized block, we skip the attempt to send scheduled the health 
check.
+   *     stream has been restarted/closed, there is a scheduled health check 
that hasn't completed
+   *     or there was a more recent send by the time we enter the synchronized 
block, we skip the
+   *     attempt to send scheduled the health check.
    */
-  public final void maybeSendHealthCheck(Instant lastSendThreshold) {
-    if (debugMetrics.getLastSendTimeMs() < lastSendThreshold.getMillis() && 
!isHealthCheckActive) {
+  public final void maybeScheduleHealthCheck(Instant lastSendThreshold) {
+    if (debugMetrics.getLastSendTimeMs() < lastSendThreshold.getMillis()
+        && isHealthCheckScheduled.compareAndSet(false, true)) {
       // Don't block other streams when sending health check.
       executeSafely(
           () -> {
             synchronized (this) {
-              if (!isHealthCheckActive
-                  && !clientClosed
+              if (!clientClosed
                   && debugMetrics.getLastSendTimeMs() < 
lastSendThreshold.getMillis()) {
-                isHealthCheckActive = true;
                 try {
                   sendHealthCheck();
                 } catch (Exception e) {
                   logger.debug("Received exception sending health check.", e);
                 }
-                isHealthCheckActive = false;
               }
+
+              // Ready to send another health check after we attempt the 
scheduled health check.

Review Comment:
   seems like it would be a little safer to move the try to cover all of this 
lambda
   so that we are sure to set it back to false.
   
   try {
     synchronized () { ... maybe send }
   } catch {
     log
   } finally  {
     isHealthCheckScheduled.set(false);
   }



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStreamTest.java:
##########
@@ -84,68 +85,60 @@ public void testShutdown_notBlockedBySend() throws 
InterruptedException, Executi
   }
 
   @Test
-  public void testMaybeSendHealthCheck() throws InterruptedException, 
ExecutionException {
-    TestCallStreamObserver callStreamObserver = new TestCallStreamObserver();
+  public void testMaybeScheduleHealthCheck() {
+    TestCallStreamObserver callStreamObserver =
+        new TestCallStreamObserver(/* waitForSend= */ false);
     Function<StreamObserver<Integer>, StreamObserver<Integer>> clientFactory =
         ignored -> callStreamObserver;
 
     TestStream testStream = newStream(clientFactory);
     testStream.start();
-    ExecutorService sendExecutor = Executors.newSingleThreadExecutor();
     Instant reportingThreshold = Instant.now().minus(Duration.millis(1));
-    Future<?> sendFuture =
-        sendExecutor.submit(() -> 
testStream.maybeSendHealthCheck(reportingThreshold));
-
-    // Sleep a bit to give sendExecutor time to execute the send().
-    Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
-
-    callStreamObserver.unblockSend();
-
-    // Make sure the future completes.
-    sendFuture.get();
 
+    testStream.maybeScheduleHealthCheck(reportingThreshold);
+    testStream.waitForHealthChecks(1);
     assertThat(testStream.numHealthChecks.get()).isEqualTo(1);
     testStream.shutdown();
   }
 
   @Test
-  public void testMaybeSendHealthCheck_doesNotSendIfLastSendLessThanThreshold()
-      throws ExecutionException, InterruptedException {
-    TestCallStreamObserver callStreamObserver = new TestCallStreamObserver();
+  public void 
testMaybeSendHealthCheck_doesNotSendIfLastScheduleLessThanThreshold() {
+    TestCallStreamObserver callStreamObserver =
+        new TestCallStreamObserver(/* waitForSend= */ false);
     Function<StreamObserver<Integer>, StreamObserver<Integer>> clientFactory =
         ignored -> callStreamObserver;
 
     TestStream testStream = newStream(clientFactory);
     testStream.start();
-    ExecutorService sendExecutor = Executors.newSingleThreadExecutor();
-    Future<?> sendFuture =
-        sendExecutor.submit(
-            () -> {
-              try {
-                testStream.trySend(1);
-              } catch (WindmillStreamShutdownException e) {
-                throw new RuntimeException(e);
-              }
-
-              // Sleep a bit to give sendExecutor time to execute the send().
-              Uninterruptibles.sleepUninterruptibly(100, 
TimeUnit.MILLISECONDS);
-
-              // Set a really long reporting threshold.
-              Instant reportingThreshold = 
Instant.now().minus(Duration.standardHours(1));
-              // Should not send health checks since we just sent the above 
message.
-              testStream.maybeSendHealthCheck(reportingThreshold);
-              testStream.maybeSendHealthCheck(reportingThreshold);
-            });
 
-    callStreamObserver.unblockSend();
+    try {
+      testStream.trySend(1);
+    } catch (WindmillStreamShutdownException e) {
+      throw new RuntimeException(e);
+    }
+
+    // Sleep a bit to give sendExecutor time to execute the send().

Review Comment:
   doesn't trySend send inline? can this be removed?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to