scwhittle commented on code in PR #34283: URL: https://github.com/apache/beam/pull/34283#discussion_r2016118734
########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java: ########## @@ -244,27 +244,28 @@ protected final void executeSafely(Runnable runnable) { * * @implNote This is sent asynchronously via an executor to minimize blocking. Messages are sent * serially. If we recently sent a message before we attempt to schedule the health check, the - * stream has been restarted/closed, there is an active health check that hasn't completed due - * to flow control/pushback or there was a more recent send by the time we enter the - * synchronized block, we skip the attempt to send scheduled the health check. + * stream has been restarted/closed, there is a scheduled health check that hasn't completed + * or there was a more recent send by the time we enter the synchronized block, we skip the + * attempt to send scheduled the health check. */ - public final void maybeSendHealthCheck(Instant lastSendThreshold) { - if (debugMetrics.getLastSendTimeMs() < lastSendThreshold.getMillis() && !isHealthCheckActive) { + public final void maybeScheduleHealthCheck(Instant lastSendThreshold) { + if (debugMetrics.getLastSendTimeMs() < lastSendThreshold.getMillis() + && isHealthCheckScheduled.compareAndSet(false, true)) { // Don't block other streams when sending health check. executeSafely( () -> { synchronized (this) { - if (!isHealthCheckActive - && !clientClosed + if (!clientClosed && debugMetrics.getLastSendTimeMs() < lastSendThreshold.getMillis()) { - isHealthCheckActive = true; try { sendHealthCheck(); } catch (Exception e) { logger.debug("Received exception sending health check.", e); } - isHealthCheckActive = false; } + + // Ready to send another health check after we attempt the scheduled health check. + isHealthCheckScheduled.set(false); Review Comment: how about showing if health check is scheduled in the appendSummaryHtml? 
########## runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStreamTest.java: ########## @@ -84,68 +85,60 @@ public void testShutdown_notBlockedBySend() throws InterruptedException, Executi } @Test - public void testMaybeSendHealthCheck() throws InterruptedException, ExecutionException { - TestCallStreamObserver callStreamObserver = new TestCallStreamObserver(); + public void testMaybeScheduleHealthCheck() { + TestCallStreamObserver callStreamObserver = + new TestCallStreamObserver(/* waitForSend= */ false); Function<StreamObserver<Integer>, StreamObserver<Integer>> clientFactory = ignored -> callStreamObserver; TestStream testStream = newStream(clientFactory); testStream.start(); - ExecutorService sendExecutor = Executors.newSingleThreadExecutor(); Instant reportingThreshold = Instant.now().minus(Duration.millis(1)); - Future<?> sendFuture = - sendExecutor.submit(() -> testStream.maybeSendHealthCheck(reportingThreshold)); - - // Sleep a bit to give sendExecutor time to execute the send(). - Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); - - callStreamObserver.unblockSend(); - - // Make sure the future completes. 
- sendFuture.get(); + testStream.maybeScheduleHealthCheck(reportingThreshold); + testStream.waitForHealthChecks(1); assertThat(testStream.numHealthChecks.get()).isEqualTo(1); testStream.shutdown(); } @Test - public void testMaybeSendHealthCheck_doesNotSendIfLastSendLessThanThreshold() - throws ExecutionException, InterruptedException { - TestCallStreamObserver callStreamObserver = new TestCallStreamObserver(); + public void testMaybeSendHealthCheck_doesNotSendIfLastScheduleLessThanThreshold() { + TestCallStreamObserver callStreamObserver = + new TestCallStreamObserver(/* waitForSend= */ false); Function<StreamObserver<Integer>, StreamObserver<Integer>> clientFactory = ignored -> callStreamObserver; TestStream testStream = newStream(clientFactory); testStream.start(); - ExecutorService sendExecutor = Executors.newSingleThreadExecutor(); - Future<?> sendFuture = - sendExecutor.submit( - () -> { - try { - testStream.trySend(1); - } catch (WindmillStreamShutdownException e) { - throw new RuntimeException(e); - } - - // Sleep a bit to give sendExecutor time to execute the send(). - Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); - - // Set a really long reporting threshold. - Instant reportingThreshold = Instant.now().minus(Duration.standardHours(1)); - // Should not send health checks since we just sent the above message. - testStream.maybeSendHealthCheck(reportingThreshold); - testStream.maybeSendHealthCheck(reportingThreshold); - }); - callStreamObserver.unblockSend(); + try { + testStream.trySend(1); + } catch (WindmillStreamShutdownException e) { + throw new RuntimeException(e); + } + + // Sleep a bit to give sendExecutor time to execute the send(). + Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); + + // Set a really long reporting threshold. + Instant reportingThreshold = Instant.now().minus(Duration.standardHours(1)); + + // Should not send health checks since we just sent the above message. 
+ testStream.maybeScheduleHealthCheck(reportingThreshold); + Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); + testStream.maybeScheduleHealthCheck(reportingThreshold); + Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); + + callStreamObserver.waitForSend(); + Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); Review Comment: // Sleep just to ensure an async health check doesn't show up ########## runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStreamTest.java: ########## @@ -197,18 +202,37 @@ protected void shutdownInternal() {} } private static class TestCallStreamObserver extends CallStreamObserver<Integer> { + private static final Logger LOG = LoggerFactory.getLogger(AbstractWindmillStreamTest.class); private final CountDownLatch sendBlocker = new CountDownLatch(1); + private final boolean waitForSend; + + private TestCallStreamObserver(boolean waitForSend) { + this.waitForSend = waitForSend; + } + private void unblockSend() { sendBlocker.countDown(); } - @Override - public void onNext(Integer integer) { + private void waitForSend() { try { - sendBlocker.await(); + int waitedMillis = 0; + while (!sendBlocker.await(100, TimeUnit.MILLISECONDS)) { + waitedMillis += 100; + LOG.info("Waiting from send for {}ms", waitedMillis); Review Comment: I think sendBlocker is being used in two different ways, which is a little confusing. How about renaming this method to waitForSendUnblocked and logging here that you are waiting for send to be unblocked? Then, below in onNext, instead of decrementing sendBlocker, you increment a count of the number of messages received, and you can add a new method to wait for N messages to be sent. 
########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java: ########## @@ -244,27 +244,28 @@ protected final void executeSafely(Runnable runnable) { * * @implNote This is sent asynchronously via an executor to minimize blocking. Messages are sent * serially. If we recently sent a message before we attempt to schedule the health check, the - * stream has been restarted/closed, there is an active health check that hasn't completed due - * to flow control/pushback or there was a more recent send by the time we enter the - * synchronized block, we skip the attempt to send scheduled the health check. + * stream has been restarted/closed, there is a scheduled health check that hasn't completed + * or there was a more recent send by the time we enter the synchronized block, we skip the + * attempt to send scheduled the health check. */ - public final void maybeSendHealthCheck(Instant lastSendThreshold) { - if (debugMetrics.getLastSendTimeMs() < lastSendThreshold.getMillis() && !isHealthCheckActive) { + public final void maybeScheduleHealthCheck(Instant lastSendThreshold) { + if (debugMetrics.getLastSendTimeMs() < lastSendThreshold.getMillis() + && isHealthCheckScheduled.compareAndSet(false, true)) { // Don't block other streams when sending health check. executeSafely( () -> { synchronized (this) { - if (!isHealthCheckActive - && !clientClosed + if (!clientClosed && debugMetrics.getLastSendTimeMs() < lastSendThreshold.getMillis()) { - isHealthCheckActive = true; try { sendHealthCheck(); } catch (Exception e) { logger.debug("Received exception sending health check.", e); } - isHealthCheckActive = false; } + + // Ready to send another health check after we attempt the scheduled health check. Review Comment: seems like it would be a little safer to move the try to cover all of this lambda so that we are sure to set it back to false. try { synchronized () { ... 
maybe send } } catch { log } finally { isHealthCheckScheduled.set(false); } ########## runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStreamTest.java: ########## @@ -84,68 +85,60 @@ public void testShutdown_notBlockedBySend() throws InterruptedException, Executi } @Test - public void testMaybeSendHealthCheck() throws InterruptedException, ExecutionException { - TestCallStreamObserver callStreamObserver = new TestCallStreamObserver(); + public void testMaybeScheduleHealthCheck() { + TestCallStreamObserver callStreamObserver = + new TestCallStreamObserver(/* waitForSend= */ false); Function<StreamObserver<Integer>, StreamObserver<Integer>> clientFactory = ignored -> callStreamObserver; TestStream testStream = newStream(clientFactory); testStream.start(); - ExecutorService sendExecutor = Executors.newSingleThreadExecutor(); Instant reportingThreshold = Instant.now().minus(Duration.millis(1)); - Future<?> sendFuture = - sendExecutor.submit(() -> testStream.maybeSendHealthCheck(reportingThreshold)); - - // Sleep a bit to give sendExecutor time to execute the send(). - Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); - - callStreamObserver.unblockSend(); - - // Make sure the future completes. 
- sendFuture.get(); + testStream.maybeScheduleHealthCheck(reportingThreshold); + testStream.waitForHealthChecks(1); assertThat(testStream.numHealthChecks.get()).isEqualTo(1); testStream.shutdown(); } @Test - public void testMaybeSendHealthCheck_doesNotSendIfLastSendLessThanThreshold() - throws ExecutionException, InterruptedException { - TestCallStreamObserver callStreamObserver = new TestCallStreamObserver(); + public void testMaybeSendHealthCheck_doesNotSendIfLastScheduleLessThanThreshold() { + TestCallStreamObserver callStreamObserver = + new TestCallStreamObserver(/* waitForSend= */ false); Function<StreamObserver<Integer>, StreamObserver<Integer>> clientFactory = ignored -> callStreamObserver; TestStream testStream = newStream(clientFactory); testStream.start(); - ExecutorService sendExecutor = Executors.newSingleThreadExecutor(); - Future<?> sendFuture = - sendExecutor.submit( - () -> { - try { - testStream.trySend(1); - } catch (WindmillStreamShutdownException e) { - throw new RuntimeException(e); - } - - // Sleep a bit to give sendExecutor time to execute the send(). - Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); - - // Set a really long reporting threshold. - Instant reportingThreshold = Instant.now().minus(Duration.standardHours(1)); - // Should not send health checks since we just sent the above message. - testStream.maybeSendHealthCheck(reportingThreshold); - testStream.maybeSendHealthCheck(reportingThreshold); - }); - callStreamObserver.unblockSend(); + try { + testStream.trySend(1); + } catch (WindmillStreamShutdownException e) { + throw new RuntimeException(e); + } + + // Sleep a bit to give sendExecutor time to execute the send(). Review Comment: doesn't trySend send inline? can this be removed? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: github-unsubscr...@beam.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org