m-trieu commented on code in PR #34283:
URL: https://github.com/apache/beam/pull/34283#discussion_r2023477433


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -244,27 +244,28 @@ protected final void executeSafely(Runnable runnable) {
    *
    * @implNote This is sent asynchronously via an executor to minimize 
blocking. Messages are sent
    *     serially. If we recently sent a message before we attempt to schedule 
the health check, the
-   *     stream has been restarted/closed, there is an active health check 
that hasn't completed due
-   *     to flow control/pushback or there was a more recent send by the time 
we enter the
-   *     synchronized block, we skip the attempt to send scheduled the health 
check.
+   *     stream has been restarted/closed, there is a scheduled health check 
that hasn't completed
+   *     or there was a more recent send by the time we enter the synchronized 
block, we skip the
+   *     attempt to send scheduled the health check.
    */
-  public final void maybeSendHealthCheck(Instant lastSendThreshold) {
-    if (debugMetrics.getLastSendTimeMs() < lastSendThreshold.getMillis() && 
!isHealthCheckActive) {
+  public final void maybeScheduleHealthCheck(Instant lastSendThreshold) {
+    if (debugMetrics.getLastSendTimeMs() < lastSendThreshold.getMillis()
+        && isHealthCheckScheduled.compareAndSet(false, true)) {
       // Don't block other streams when sending health check.
       executeSafely(
           () -> {
             synchronized (this) {
-              if (!isHealthCheckActive
-                  && !clientClosed
+              if (!clientClosed
                   && debugMetrics.getLastSendTimeMs() < 
lastSendThreshold.getMillis()) {
-                isHealthCheckActive = true;
                 try {
                   sendHealthCheck();
                 } catch (Exception e) {
                   logger.debug("Received exception sending health check.", e);
                 }
-                isHealthCheckActive = false;
               }
+
+              // Ready to send another health check after we attempt the 
scheduled health check.

Review Comment:
   done



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -244,27 +244,28 @@ protected final void executeSafely(Runnable runnable) {
    *
    * @implNote This is sent asynchronously via an executor to minimize 
blocking. Messages are sent
    *     serially. If we recently sent a message before we attempt to schedule 
the health check, the
-   *     stream has been restarted/closed, there is an active health check 
that hasn't completed due
-   *     to flow control/pushback or there was a more recent send by the time 
we enter the
-   *     synchronized block, we skip the attempt to send scheduled the health 
check.
+   *     stream has been restarted/closed, there is a scheduled health check 
that hasn't completed
+   *     or there was a more recent send by the time we enter the synchronized 
block, we skip the
+   *     attempt to send scheduled the health check.
    */
-  public final void maybeSendHealthCheck(Instant lastSendThreshold) {
-    if (debugMetrics.getLastSendTimeMs() < lastSendThreshold.getMillis() && 
!isHealthCheckActive) {
+  public final void maybeScheduleHealthCheck(Instant lastSendThreshold) {
+    if (debugMetrics.getLastSendTimeMs() < lastSendThreshold.getMillis()
+        && isHealthCheckScheduled.compareAndSet(false, true)) {
       // Don't block other streams when sending health check.
       executeSafely(
           () -> {
             synchronized (this) {
-              if (!isHealthCheckActive
-                  && !clientClosed
+              if (!clientClosed
                   && debugMetrics.getLastSendTimeMs() < 
lastSendThreshold.getMillis()) {
-                isHealthCheckActive = true;
                 try {
                   sendHealthCheck();
                 } catch (Exception e) {
                   logger.debug("Received exception sending health check.", e);
                 }
-                isHealthCheckActive = false;
               }
+
+              // Ready to send another health check after we attempt the 
scheduled health check.
+              isHealthCheckScheduled.set(false);

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to