m-trieu commented on code in PR #34283: URL: https://github.com/apache/beam/pull/34283#discussion_r2023477433
########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java: ########## @@ -244,27 +244,28 @@ protected final void executeSafely(Runnable runnable) { * * @implNote This is sent asynchronously via an executor to minimize blocking. Messages are sent * serially. If we recently sent a message before we attempt to schedule the health check, the - * stream has been restarted/closed, there is an active health check that hasn't completed due - * to flow control/pushback or there was a more recent send by the time we enter the - * synchronized block, we skip the attempt to send scheduled the health check. + * stream has been restarted/closed, there is a scheduled health check that hasn't completed + * or there was a more recent send by the time we enter the synchronized block, we skip the + * attempt to send scheduled the health check. */ - public final void maybeSendHealthCheck(Instant lastSendThreshold) { - if (debugMetrics.getLastSendTimeMs() < lastSendThreshold.getMillis() && !isHealthCheckActive) { + public final void maybeScheduleHealthCheck(Instant lastSendThreshold) { + if (debugMetrics.getLastSendTimeMs() < lastSendThreshold.getMillis() + && isHealthCheckScheduled.compareAndSet(false, true)) { // Don't block other streams when sending health check. executeSafely( () -> { synchronized (this) { - if (!isHealthCheckActive - && !clientClosed + if (!clientClosed && debugMetrics.getLastSendTimeMs() < lastSendThreshold.getMillis()) { - isHealthCheckActive = true; try { sendHealthCheck(); } catch (Exception e) { logger.debug("Received exception sending health check.", e); } - isHealthCheckActive = false; } + + // Ready to send another health check after we attempt the scheduled health check. Review Comment: done ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java: ########## @@ -244,27 +244,28 @@ protected final void executeSafely(Runnable runnable) { * * @implNote This is sent asynchronously via an executor to minimize blocking. Messages are sent * serially. If we recently sent a message before we attempt to schedule the health check, the - * stream has been restarted/closed, there is an active health check that hasn't completed due - * to flow control/pushback or there was a more recent send by the time we enter the - * synchronized block, we skip the attempt to send scheduled the health check. + * stream has been restarted/closed, there is a scheduled health check that hasn't completed + * or there was a more recent send by the time we enter the synchronized block, we skip the + * attempt to send scheduled the health check. */ - public final void maybeSendHealthCheck(Instant lastSendThreshold) { - if (debugMetrics.getLastSendTimeMs() < lastSendThreshold.getMillis() && !isHealthCheckActive) { + public final void maybeScheduleHealthCheck(Instant lastSendThreshold) { + if (debugMetrics.getLastSendTimeMs() < lastSendThreshold.getMillis() + && isHealthCheckScheduled.compareAndSet(false, true)) { // Don't block other streams when sending health check. executeSafely( () -> { synchronized (this) { - if (!isHealthCheckActive - && !clientClosed + if (!clientClosed && debugMetrics.getLastSendTimeMs() < lastSendThreshold.getMillis()) { - isHealthCheckActive = true; try { sendHealthCheck(); } catch (Exception e) { logger.debug("Received exception sending health check.", e); } - isHealthCheckActive = false; } + + // Ready to send another health check after we attempt the scheduled health check. + isHealthCheckScheduled.set(false); Review Comment: done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@beam.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org