This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 102409d [Tests] Fix flaky test GracefulExecutorServicesShutdownTest (#10599)
102409d is described below
commit 102409d085c8133a88b75d3d5e840d4df18ba658
Author: Lari Hotari <[email protected]>
AuthorDate: Sun May 16 11:48:34 2021 +0300
[Tests] Fix flaky test GracefulExecutorServicesShutdownTest (#10599)
- fix race condition in test by adding a CountDownLatch
to verify that execution has entered the awaitTermination
method before the future is cancelled
---
.../service/GracefulExecutorServicesShutdownTest.java | 14 +++++++++-----
1 file changed, 9 insertions(+), 5 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/GracefulExecutorServicesShutdownTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/GracefulExecutorServicesShutdownTest.java
index 726f37d..a784608 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/GracefulExecutorServicesShutdownTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/GracefulExecutorServicesShutdownTest.java
@@ -29,6 +29,7 @@ import static org.mockito.Mockito.when;
import static org.testng.Assert.assertTrue;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
@@ -120,25 +121,28 @@ public class GracefulExecutorServicesShutdownTest {
@Test
- public void shouldTerminateWhenFutureIsCancelled() throws
InterruptedException {
+ public void shouldTerminateWhenFutureIsCancelled() throws
InterruptedException, ExecutionException {
// given
GracefulExecutorServicesShutdown shutdown =
GracefulExecutorServicesShutdown.initiate();
shutdown.timeout(Duration.ofMillis(15000));
ExecutorService executorService = mock(ExecutorService.class);
when(executorService.isShutdown()).thenReturn(true);
AtomicBoolean terminated = new AtomicBoolean();
- AtomicBoolean awaitTerminationInterrupted = new AtomicBoolean();
+ CompletableFuture<Boolean> awaitTerminationInterrupted = new
CompletableFuture<>();
when(executorService.isTerminated()).thenAnswer(invocation ->
terminated.get());
+ CountDownLatch awaitingTerminationEntered = new CountDownLatch(1);
when(executorService.awaitTermination(anyLong(),
any())).thenAnswer(invocation -> {
long timeout = invocation.getArgument(0);
TimeUnit unit = invocation.getArgument(1);
+ awaitingTerminationEntered.countDown();
try {
Thread.sleep(unit.toMillis(timeout));
} catch (InterruptedException e) {
- awaitTerminationInterrupted.set(true);
+ awaitTerminationInterrupted.complete(true);
Thread.currentThread().interrupt();
throw e;
}
+ awaitTerminationInterrupted.complete(false);
throw new IllegalStateException("Thread.sleep should have been
interrupted");
});
when(executorService.shutdownNow()).thenAnswer(invocation -> {
@@ -149,11 +153,11 @@ public class GracefulExecutorServicesShutdownTest {
// when
shutdown.shutdown(executorService);
CompletableFuture<Void> future = shutdown.handle();
+ awaitingTerminationEntered.await();
future.cancel(false);
// then
- Awaitility.await().untilAsserted(() ->
assertTrue(awaitTerminationInterrupted.get(),
- "awaitTermination should have been interrupted"));
+ assertTrue(awaitTerminationInterrupted.get(), "awaitTermination should
have been interrupted");
verify(executorService, times(1)).awaitTermination(anyLong(), any());
verify(executorService, times(1)).shutdownNow();
}