This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 102409d [Tests] Fix flaky test GracefulExecutorServicesShutdownTest (#10599) 102409d is described below commit 102409d085c8133a88b75d3d5e840d4df18ba658 Author: Lari Hotari <lhot...@users.noreply.github.com> AuthorDate: Sun May 16 11:48:34 2021 +0300 [Tests] Fix flaky test GracefulExecutorServicesShutdownTest (#10599) - fix race condition in test by adding a CountDownLatch to verify that execution has entered the awaitTermination method before the future is cancelled --- .../service/GracefulExecutorServicesShutdownTest.java | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/GracefulExecutorServicesShutdownTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/GracefulExecutorServicesShutdownTest.java index 726f37d..a784608 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/GracefulExecutorServicesShutdownTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/GracefulExecutorServicesShutdownTest.java @@ -29,6 +29,7 @@ import static org.mockito.Mockito.when; import static org.testng.Assert.assertTrue; import java.time.Duration; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; @@ -120,25 +121,28 @@ public class GracefulExecutorServicesShutdownTest { @Test - public void shouldTerminateWhenFutureIsCancelled() throws InterruptedException { + public void shouldTerminateWhenFutureIsCancelled() throws InterruptedException, ExecutionException { // given GracefulExecutorServicesShutdown shutdown = GracefulExecutorServicesShutdown.initiate(); shutdown.timeout(Duration.ofMillis(15000)); ExecutorService executorService = mock(ExecutorService.class); when(executorService.isShutdown()).thenReturn(true); AtomicBoolean terminated = new AtomicBoolean(); - AtomicBoolean awaitTerminationInterrupted = new AtomicBoolean(); + CompletableFuture<Boolean> awaitTerminationInterrupted = new CompletableFuture<>(); when(executorService.isTerminated()).thenAnswer(invocation -> terminated.get()); + CountDownLatch awaitingTerminationEntered = new CountDownLatch(1); when(executorService.awaitTermination(anyLong(), any())).thenAnswer(invocation -> { long timeout = invocation.getArgument(0); TimeUnit unit = invocation.getArgument(1); + awaitingTerminationEntered.countDown(); try { Thread.sleep(unit.toMillis(timeout)); } catch (InterruptedException e) { - awaitTerminationInterrupted.set(true); + awaitTerminationInterrupted.complete(true); Thread.currentThread().interrupt(); throw e; } + awaitTerminationInterrupted.complete(false); throw new IllegalStateException("Thread.sleep should have been interrupted"); }); when(executorService.shutdownNow()).thenAnswer(invocation -> { @@ -149,11 +153,11 @@ public class GracefulExecutorServicesShutdownTest { // when shutdown.shutdown(executorService); CompletableFuture<Void> future = shutdown.handle(); + awaitingTerminationEntered.await(); future.cancel(false); // then - Awaitility.await().untilAsserted(() -> assertTrue(awaitTerminationInterrupted.get(), - "awaitTermination should have been interrupted")); + assertTrue(awaitTerminationInterrupted.get(), "awaitTermination should have been interrupted"); verify(executorService, times(1)).awaitTermination(anyLong(), any()); verify(executorService, times(1)).shutdownNow(); }