[
https://issues.apache.org/jira/browse/GEODE-4239?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16323251#comment-16323251
]
ASF GitHub Bot commented on GEODE-4239:
---------------------------------------
kirklund closed pull request #1244: GEODE-4239: refactor tests to use new
ExecutorServiceRule
URL: https://github.com/apache/geode/pull/1244
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/geode-core/src/test/java/org/apache/geode/distributed/internal/locks/DLockServiceLeakTest.java
b/geode-core/src/test/java/org/apache/geode/distributed/internal/locks/DLockServiceLeakTest.java
index 57ec1492d8..a63ddba294 100644
---
a/geode-core/src/test/java/org/apache/geode/distributed/internal/locks/DLockServiceLeakTest.java
+++
b/geode-core/src/test/java/org/apache/geode/distributed/internal/locks/DLockServiceLeakTest.java
@@ -23,8 +23,6 @@
import java.util.LinkedList;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -32,6 +30,7 @@
import org.junit.After;
import org.junit.Before;
+import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -43,13 +42,18 @@
import org.apache.geode.cache.Scope;
import org.apache.geode.internal.cache.DistributedRegion;
import org.apache.geode.test.junit.categories.IntegrationTest;
+import org.apache.geode.test.junit.rules.ExecutorServiceRule;
@Category(IntegrationTest.class)
public class DLockServiceLeakTest {
+
private Cache cache;
- private ExecutorService executorService;
private DistributedRegion testRegion;
+ @Rule
+ public ExecutorServiceRule executorServiceRule =
+ ExecutorServiceRule.builder().threadCount(5).build();
+
@Before
public void setUp() {
Properties properties = new Properties();
@@ -61,14 +65,11 @@ public void setUp() {
.setEntryTimeToLive(new ExpirationAttributes(1,
ExpirationAction.DESTROY))
.create("testRegion");
testRegion.becomeLockGrantor();
-
- executorService = Executors.newFixedThreadPool(5);
}
@After
public void tearDown() {
cache.close();
- executorService.shutdownNow();
}
@Test
@@ -76,25 +77,25 @@ public void basicDLockUsage() throws InterruptedException,
ExecutionException, T
Lock lock = testRegion.getDistributedLock("testLockName");
lock.lockInterruptibly();
- Future<Boolean> future = executorService.submit(() -> lock.tryLock());
+ Future<Boolean> future = executorServiceRule.submit(() -> lock.tryLock());
assertFalse("should not be able to get lock from another thread",
future.get(5, TimeUnit.SECONDS));
assertTrue("Lock is reentrant", lock.tryLock());
// now locked twice.
- future = executorService.submit(() -> lock.tryLock());
+ future = executorServiceRule.submit(() -> lock.tryLock());
assertFalse("should not be able to get lock from another thread",
future.get(5, TimeUnit.SECONDS));
lock.unlock();
- future = executorService.submit(() -> lock.tryLock());
+ future = executorServiceRule.submit(() -> lock.tryLock());
assertFalse("should not be able to get lock from another thread",
future.get());
lock.unlock();
- future = executorService.submit(() -> {
+ future = executorServiceRule.submit(() -> {
boolean locked = lock.tryLock();
if (!locked) {
return false;
@@ -130,7 +131,7 @@ public void singleThreadWithCache() {
public void multipleThreadsWithCache() {
LinkedList<Future> futures = new LinkedList<>();
for (int i = 0; i < 5; i++) {
- futures.add(executorService.submit(this::putTestKey));
+ futures.add(executorServiceRule.submit(this::putTestKey));
}
futures.forEach(future -> {
diff --git
a/geode-core/src/test/java/org/apache/geode/distributed/internal/membership/gms/messenger/GMSEncryptJUnitTest.java
b/geode-core/src/test/java/org/apache/geode/distributed/internal/membership/gms/messenger/GMSEncryptJUnitTest.java
index ed26541f93..fa9fcd6688 100755
---
a/geode-core/src/test/java/org/apache/geode/distributed/internal/membership/gms/messenger/GMSEncryptJUnitTest.java
+++
b/geode-core/src/test/java/org/apache/geode/distributed/internal/membership/gms/messenger/GMSEncryptJUnitTest.java
@@ -14,7 +14,8 @@
*/
package org.apache.geode.distributed.internal.membership.gms.messenger;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import java.math.BigInteger;
import java.security.Key;
@@ -25,8 +26,6 @@
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import javax.crypto.KeyAgreement;
import javax.crypto.Mac;
@@ -34,6 +33,7 @@
import javax.crypto.spec.DHParameterSpec;
import org.junit.Assert;
+import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -47,16 +47,23 @@
import org.apache.geode.internal.admin.remote.RemoteTransportConfig;
import org.apache.geode.test.junit.categories.IntegrationTest;
import org.apache.geode.test.junit.categories.MembershipTest;
+import org.apache.geode.test.junit.rules.ExecutorServiceRule;
@Category({IntegrationTest.class, MembershipTest.class})
public class GMSEncryptJUnitTest {
+ private static final int THREAD_COUNT = 20;
+
Services services;
InternalDistributedMember mockMembers[];
NetView netView;
+ @Rule
+ public ExecutorServiceRule executorServiceRule =
+ ExecutorServiceRule.builder().threadCount(THREAD_COUNT).build();
+
private void initMocks() throws Exception {
initMocks("AES:128");
}
@@ -148,12 +155,10 @@ public void
testOneMemberCanDecryptAnothersMessageMultithreaded() throws Excepti
gmsEncrypt1.installView(netView, mockMembers[1]);
gmsEncrypt2.installView(netView, mockMembers[2]);
- int nthreads = 20;
- ExecutorService executorService = Executors.newFixedThreadPool(nthreads);
- final CountDownLatch countDownLatch = new CountDownLatch(nthreads);
+ final CountDownLatch countDownLatch = new CountDownLatch(THREAD_COUNT);
- for (int j = 0; j < nthreads; j++)
- executorService.execute(new Runnable() {
+ for (int j = 0; j < THREAD_COUNT; j++)
+ executorServiceRule.execute(new Runnable() {
public void run() {
// sender encrypts a message, so use receiver's public key
try {
@@ -196,9 +201,6 @@ public void run() {
});
countDownLatch.await();
- executorService.shutdown();
-
-
}
@Test
@@ -339,12 +341,10 @@ public void
testForClusterSecretKeyFromOtherMemberMultipleThreads() throws Excep
gmsEncrypt2.installView(netView, mockMembers[1]);
final int runs = 100000;
- int nthreads = 20;
- ExecutorService executorService = Executors.newFixedThreadPool(nthreads);
- final CountDownLatch countDownLatch = new CountDownLatch(nthreads);
+ final CountDownLatch countDownLatch = new CountDownLatch(THREAD_COUNT);
- for (int j = 0; j < nthreads; j++)
- executorService.execute(new Runnable() {
+ for (int j = 0; j < THREAD_COUNT; j++)
+ executorServiceRule.execute(new Runnable() {
public void run() {
// sender encrypts a message, so use receiver's public key
try {
@@ -388,7 +388,6 @@ public void run() {
});
countDownLatch.await();
- executorService.shutdown();
}
diff --git
a/geode-core/src/test/java/org/apache/geode/internal/cache/InterruptDiskJUnitTest.java
b/geode-core/src/test/java/org/apache/geode/internal/cache/InterruptDiskJUnitTest.java
index 377981a502..18a9076807 100644
---
a/geode-core/src/test/java/org/apache/geode/internal/cache/InterruptDiskJUnitTest.java
+++
b/geode-core/src/test/java/org/apache/geode/internal/cache/InterruptDiskJUnitTest.java
@@ -14,18 +14,26 @@
*/
package org.apache.geode.internal.cache;
-import static org.apache.geode.distributed.ConfigurationProperties.*;
+import static
org.apache.geode.distributed.ConfigurationProperties.ENABLE_TIME_STATISTICS;
+import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
+import static org.apache.geode.distributed.ConfigurationProperties.LOG_LEVEL;
+import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
+import static
org.apache.geode.distributed.ConfigurationProperties.STATISTIC_ARCHIVE_FILE;
+import static
org.apache.geode.distributed.ConfigurationProperties.STATISTIC_SAMPLING_ENABLED;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import java.io.File;
import java.util.Properties;
-import java.util.concurrent.*;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
+import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -35,6 +43,7 @@
import org.apache.geode.cache.RegionShortcut;
import org.apache.geode.distributed.DistributedSystem;
import org.apache.geode.test.junit.categories.IntegrationTest;
+import org.apache.geode.test.junit.rules.ExecutorServiceRule;
/**
* Test of interrupting threads doing disk writes to see the effect.
@@ -48,9 +57,11 @@
private DistributedSystem ds;
private Cache cache;
private Region<Object, Object> region;
- private ExecutorService ex;
private AtomicLong nextValue = new AtomicLong();
+ @Rule
+ public ExecutorServiceRule executorServiceRule = new ExecutorServiceRule();
+
@Test
@Ignore
public void testLoop() throws Throwable {
@@ -81,14 +92,12 @@ public void setUp() {
.create("store");
region = cache.createRegionFactory(RegionShortcut.REPLICATE_PERSISTENT)
.setDiskStoreName("store").create("region");
- ex = Executors.newSingleThreadExecutor();
}
@After
public void tearDown() {
ds.disconnect();
- ex.shutdownNow();
}
@@ -110,7 +119,7 @@ public Object call() {
}
};
- Future result = ex.submit(doPuts);
+ Future result = executorServiceRule.submit(doPuts);
Thread.sleep(50);
diff --git
a/geode-core/src/test/java/org/apache/geode/internal/cache/versions/RegionVersionVectorTest.java
b/geode-core/src/test/java/org/apache/geode/internal/cache/versions/RegionVersionVectorTest.java
index 0647d619ef..cb638b85ac 100644
---
a/geode-core/src/test/java/org/apache/geode/internal/cache/versions/RegionVersionVectorTest.java
+++
b/geode-core/src/test/java/org/apache/geode/internal/cache/versions/RegionVersionVectorTest.java
@@ -33,14 +33,9 @@
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.Future;
-import org.junit.After;
-import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -55,25 +50,17 @@
import org.apache.geode.internal.cache.persistence.DiskStoreID;
import org.apache.geode.test.dunit.NetworkUtils;
import org.apache.geode.test.junit.categories.UnitTest;
+import org.apache.geode.test.junit.rules.ExecutorServiceRule;
@Category(UnitTest.class)
public class RegionVersionVectorTest {
- private ExecutorService executorService;
+ @Rule
+ public ExecutorServiceRule executorServiceRule = new ExecutorServiceRule();
@Rule
public ExpectedException expectedException = ExpectedException.none();
- @Before
- public void setUp() throws Exception {
- executorService = Executors.newSingleThreadExecutor();
- }
-
- @After
- public void tearDown() throws Exception {
- assertThat(executorService.shutdownNow()).isEmpty();
- }
-
@Test
public void testExceptionsWithContains() {
DiskStoreID ownerId = new DiskStoreID(0, 0);
@@ -627,8 +614,7 @@ public void doesNotHangIfOtherThreadChangedVersion() throws
Exception {
long newVersion = 2;
RegionVersionVector rvv = new
VersionRaceConditionRegionVersionVector(ownerId, oldVersion);
- Future<Void> result =
- CompletableFuture.runAsync(() -> rvv.updateLocalVersion(newVersion),
executorService);
+ Future<Void> result = executorServiceRule.runAsync(() ->
rvv.updateLocalVersion(newVersion));
assertThatCode(() -> result.get(2, SECONDS)).doesNotThrowAnyException();
assertThat(rvv.getVersionForMember(ownerId)).isEqualTo(newVersion);
diff --git
a/geode-core/src/test/java/org/apache/geode/internal/process/BlockingProcessStreamReaderWindowsTest.java
b/geode-core/src/test/java/org/apache/geode/internal/process/BlockingProcessStreamReaderWindowsTest.java
index 60c94e4086..30e6df45c9 100644
---
a/geode-core/src/test/java/org/apache/geode/internal/process/BlockingProcessStreamReaderWindowsTest.java
+++
b/geode-core/src/test/java/org/apache/geode/internal/process/BlockingProcessStreamReaderWindowsTest.java
@@ -17,22 +17,20 @@
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.geode.internal.lang.SystemUtils.isWindows;
import static
org.apache.geode.internal.process.ProcessStreamReader.ReadingMode.BLOCKING;
-import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.Assume.assumeTrue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
-import org.junit.After;
import org.junit.Before;
+import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.geode.internal.process.ProcessStreamReader.ReadingMode;
import org.apache.geode.test.junit.categories.IntegrationTest;
+import org.apache.geode.test.junit.rules.ExecutorServiceRule;
/**
* Functional integration test {@link #hangsOnWindows} for
BlockingProcessStreamReader which
@@ -52,20 +50,12 @@
/** Timeout to confirm hang on Windows */
private static final int HANG_TIMEOUT_SECONDS = 10;
- private ExecutorService futures;
+ @Rule
+ public ExecutorServiceRule executorServiceRule = new ExecutorServiceRule();
@Before
public void setUp() throws Exception {
assumeTrue(isWindows());
-
- futures = Executors.newSingleThreadExecutor();
- }
-
- @After
- public void tearDown() throws Exception {
- if (futures != null) {
- assertThat(futures.shutdownNow()).isEmpty();
- }
}
@Test
@@ -74,7 +64,7 @@ public void hangsOnWindows() throws Exception {
givenRunningProcessWithStreamReaders(ProcessSleeps.class);
// act
- Future<Boolean> future = futures.submit(() -> {
+ Future<Boolean> future = executorServiceRule.submit(() -> {
process.getOutputStream().close();
process.getErrorStream().close();
process.getInputStream().close();
diff --git
a/geode-core/src/test/java/org/apache/geode/internal/process/FileProcessControllerIntegrationTest.java
b/geode-core/src/test/java/org/apache/geode/internal/process/FileProcessControllerIntegrationTest.java
index 80d267c69f..045c53a3ba 100755
---
a/geode-core/src/test/java/org/apache/geode/internal/process/FileProcessControllerIntegrationTest.java
+++
b/geode-core/src/test/java/org/apache/geode/internal/process/FileProcessControllerIntegrationTest.java
@@ -24,14 +24,11 @@
import static org.mockito.Mockito.when;
import java.io.File;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import org.awaitility.Awaitility;
import org.awaitility.core.ConditionFactory;
-import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule;
@@ -50,6 +47,7 @@
import org.apache.geode.internal.process.io.StringFileWriter;
import org.apache.geode.test.junit.categories.FlakyTest;
import org.apache.geode.test.junit.categories.IntegrationTest;
+import org.apache.geode.test.junit.rules.ExecutorServiceRule;
/**
* Integration tests for {@link FileProcessController}.
@@ -71,7 +69,9 @@
private File stopRequestFile;
private int pid;
private FileControllerParameters params;
- private ExecutorService executor;
+
+ @Rule
+ public ExecutorServiceRule executorServiceRule = new ExecutorServiceRule();
@Rule
public ErrorCollector errorCollector = new ErrorCollector();
@@ -97,13 +97,6 @@ public void setUp() throws Exception {
when(params.getProcessId()).thenReturn(pid);
when(params.getProcessType()).thenReturn(processType);
when(params.getDirectory()).thenReturn(temporaryFolder.getRoot());
-
- executor = Executors.newSingleThreadExecutor();
- }
-
- @After
- public void tearDown() throws Exception {
- assertThat(executor.shutdownNow()).isEmpty();
}
@Test
@@ -126,7 +119,7 @@ public void statusShouldReturnJsonFromStatusFile() throws
Exception {
FileProcessController controller = new FileProcessController(params, pid,
2, MINUTES);
// when: status is called in one thread
- executor.execute(() -> {
+ executorServiceRule.execute(() -> {
try {
statusRef.set(controller.status());
} catch (Exception e) {
@@ -154,7 +147,7 @@ public void emptyStatusFileCausesStatusToHang() throws
Exception {
FileProcessController controller = new FileProcessController(params, pid,
2, MINUTES);
// when: status is called in one thread
- executor.execute(() -> {
+ executorServiceRule.execute(() -> {
try {
statusRef.set(controller.status());
} catch (Exception e) {
@@ -203,7 +196,7 @@ public void
status_withStatusRequestFileExists_doesNotFail() throws Exception {
assertThat(statusRequestFile.createNewFile()).isTrue();
// act
- executor.execute(() -> {
+ executorServiceRule.execute(() -> {
try {
statusRef.set(controller.status());
} catch (Exception e) {
@@ -224,7 +217,7 @@ public void statusCreatesStatusRequestFile() throws
Exception {
FileProcessController controller = new FileProcessController(params, pid,
2, MINUTES);
// act
- executor.execute(() -> {
+ executorServiceRule.execute(() -> {
try {
assertThatThrownBy(() -> statusRef.set(controller.status()))
.isInstanceOf(InterruptedException.class);
diff --git
a/geode-core/src/test/java/org/apache/geode/internal/process/PidFileIntegrationTest.java
b/geode-core/src/test/java/org/apache/geode/internal/process/PidFileIntegrationTest.java
index 6377038b3f..c3992dc077 100755
---
a/geode-core/src/test/java/org/apache/geode/internal/process/PidFileIntegrationTest.java
+++
b/geode-core/src/test/java/org/apache/geode/internal/process/PidFileIntegrationTest.java
@@ -20,10 +20,7 @@
import java.io.File;
import java.io.FileNotFoundException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -45,7 +42,6 @@
private File directory;
private File pidFile;
private String pidFileName;
- private ExecutorService futures;
private int pid;
@Rule
@@ -56,15 +52,9 @@ public void before() throws Exception {
directory = temporaryFolder.getRoot();
pidFile = new File(directory, "pid.txt");
pidFileName = pidFile.getName();
- futures = Executors.newFixedThreadPool(2);
pid = identifyPid();
}
- @After
- public void after() {
- assertThat(this.futures.shutdownNow()).isEmpty();
- }
-
@Test
public void readsIntFromFile() throws Exception {
// arrange
diff --git
a/geode-core/src/test/java/org/apache/geode/internal/util/AbortableTaskServiceJUnitTest.java
b/geode-core/src/test/java/org/apache/geode/internal/util/AbortableTaskServiceJUnitTest.java
index 346c582714..ff489bb901 100644
---
a/geode-core/src/test/java/org/apache/geode/internal/util/AbortableTaskServiceJUnitTest.java
+++
b/geode-core/src/test/java/org/apache/geode/internal/util/AbortableTaskServiceJUnitTest.java
@@ -14,7 +14,8 @@
*/
package org.apache.geode.internal.util;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
@@ -22,7 +23,6 @@
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@@ -30,11 +30,13 @@
import org.junit.After;
import org.junit.Before;
+import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.geode.internal.util.AbortableTaskService.AbortableTask;
import org.apache.geode.test.junit.categories.UnitTest;
+import org.apache.geode.test.junit.rules.ExecutorServiceRule;
@Category(UnitTest.class)
public class AbortableTaskServiceJUnitTest {
@@ -42,14 +44,15 @@
private static final long TIMEOUT_SECONDS = 10;
private volatile CountDownLatch delay;
- private ExecutorService futures;
private AbortableTaskService tasks;
+ @Rule
+ public ExecutorServiceRule executorServiceRule = new ExecutorServiceRule();
+
@Before
public void setUp() {
this.delay = new CountDownLatch(1);
this.tasks = new AbortableTaskService(Executors.newSingleThreadExecutor());
- this.futures = Executors.newSingleThreadExecutor();
}
@After
@@ -58,7 +61,6 @@ public void tearDown() {
this.delay.countDown();
}
this.tasks.abortAll();
- assertTrue(this.futures.shutdownNow().isEmpty());
}
@Test
@@ -66,7 +68,7 @@ public void testExecute() throws Exception {
DelayedTask dt = new DelayedTask();
this.tasks.execute(dt);
- Future<Boolean> future = this.futures.submit(new Callable<Boolean>() {
+ Future<Boolean> future = executorServiceRule.submit(new
Callable<Boolean>() {
@Override
public Boolean call() {
tasks.waitForCompletion();
@@ -87,7 +89,7 @@ public void testAbortDuringExecute() throws Exception {
DelayedTask dt = new DelayedTask();
this.tasks.execute(dt);
- Future<Boolean> future = this.futures.submit(new Callable<Boolean>() {
+ Future<Boolean> future = executorServiceRule.submit(new
Callable<Boolean>() {
@Override
public Boolean call() {
tasks.waitForCompletion();
@@ -118,7 +120,7 @@ public void testAbortBeforeExecute() throws Exception {
this.tasks.execute(dt);
this.tasks.execute(dt2);
- Future<Boolean> future = this.futures.submit(new Callable<Boolean>() {
+ Future<Boolean> future = executorServiceRule.submit(new
Callable<Boolean>() {
@Override
public Boolean call() {
tasks.waitForCompletion();
diff --git
a/geode-core/src/test/java/org/apache/geode/test/process/MainLauncherJUnitTest.java
b/geode-core/src/test/java/org/apache/geode/test/process/MainLauncherJUnitTest.java
index 8ba3b28864..74994166d2 100755
---
a/geode-core/src/test/java/org/apache/geode/test/process/MainLauncherJUnitTest.java
+++
b/geode-core/src/test/java/org/apache/geode/test/process/MainLauncherJUnitTest.java
@@ -14,17 +14,14 @@
*/
package org.apache.geode.test.process;
-import static org.junit.Assert.*;
-import static org.junit.contrib.java.lang.system.TextFromStandardInputStream.*;
+import static org.junit.Assert.assertTrue;
+import static
org.junit.contrib.java.lang.system.TextFromStandardInputStream.emptyStandardInputStream;
import java.lang.reflect.Method;
import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
-import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -32,6 +29,7 @@
import org.junit.experimental.categories.Category;
import org.apache.geode.test.junit.categories.UnitTest;
+import org.apache.geode.test.junit.rules.ExecutorServiceRule;
/**
* Quick sanity tests to make sure MainLauncher is functional.
@@ -44,22 +42,16 @@
private static volatile boolean flag = false;
private final String launchedClass = getClass().getName();
- private ExecutorService futures;
@Rule
- public final TextFromStandardInputStream systemInMock =
emptyStandardInputStream();
+ public ExecutorServiceRule executorServiceRule = new ExecutorServiceRule();
+
+ @Rule
+ public TextFromStandardInputStream systemInMock = emptyStandardInputStream();
@Before
public void before() {
flag = false;
- this.futures = Executors.newSingleThreadExecutor();
- assertFalse(flag);
- }
-
- @After
- public void after() {
- flag = false;
- assertTrue(this.futures.shutdownNow().isEmpty());
}
@Test
@@ -100,14 +92,14 @@ public void testInvokeMainWithTwoArgs() throws Exception {
@Test
public void testInvokeMainWithMainLauncherWithNoArgs() throws Exception {
- Future<Boolean> future = this.futures.submit(new Callable<Boolean>() {
+ Future<Boolean> future = executorServiceRule.submit(new
Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
Class<?> clazz = MainLauncher.class;
Method mainMethod = clazz.getMethod("main", String[].class);
String[] args = new String[] {launchedClass};
- mainMethod.invoke(null, new Object[] {args}); // this will block until
"\n" is fed to
- // System.in
+ // this will block until "\n" is fed to System.in
+ mainMethod.invoke(null, new Object[] {args});
return true;
}
});
@@ -118,7 +110,7 @@ public Boolean call() throws Exception {
@Test
public void testInvokeMainWithMainLauncherWithOneArg() throws Exception {
- Future<Boolean> future = this.futures.submit(new Callable<Boolean>() {
+ Future<Boolean> future = executorServiceRule.submit(new
Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
Class<?> clazz = MainLauncher.class;
@@ -135,7 +127,7 @@ public Boolean call() throws Exception {
@Test
public void testInvokeMainWithMainLauncherWithTwoArgs() throws Exception {
- Future<Boolean> future = this.futures.submit(new Callable<Boolean>() {
+ Future<Boolean> future = executorServiceRule.submit(new
Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
Class<?> clazz = MainLauncher.class;
diff --git
a/geode-junit/src/main/java/org/apache/geode/test/junit/rules/ExecutorServiceRule.java
b/geode-junit/src/main/java/org/apache/geode/test/junit/rules/ExecutorServiceRule.java
new file mode 100644
index 0000000000..21b2a80c1e
--- /dev/null
+++
b/geode-junit/src/main/java/org/apache/geode/test/junit/rules/ExecutorServiceRule.java
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+package org.apache.geode.test.junit.rules;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+import
org.apache.geode.test.junit.rules.serializable.SerializableExternalResource;
+
+/**
+ * Provides a reusable mechanism for executing tasks asynchronously in tests.
This {@code Rule}
+ * creates an {@code ExecutorService} which is terminated after the scope of
the {@code Rule}. This
+ * {@code Rule} can be used in tests for hangs, deadlocks, and infinite loops.
+ *
+ * <p>
+ * By default, the {@code ExecutorService} is single-threaded. You can specify
the thread count by
+ * using {@link Builder#threadCount(int)} or {@link ExecutorServiceRule(int)}.
+ *
+ * <p>
+ * Example with default configuration (single-threaded and does not assert
that tasks are done):
+ *
+ * <pre>
+ * private CountDownLatch hangLatch = new CountDownLatch(1);
+ *
+ * {@literal @}Rule
+ * public AsynchronousRule asynchronousRule = new AsynchronousRule();
+ *
+ * {@literal @}Test
+ * public void doTest() throws Exception {
+ * Future<Void> result = asynchronousRule.runAsync(() -> {
+ * try {
+ * hangLatch.await();
+ * } catch (InterruptedException e) {
+ * throw new RuntimeException(e);
+ * }
+ * });
+ *
+ * assertThatThrownBy(() -> result.get(1,
MILLISECONDS)).isInstanceOf(TimeoutException.class);
+ * }
+ * </pre>
+ *
+ * <p>
+ * The {@code Rule} can be configured to await termination by specifying
+ * {@link Builder#awaitTermination(long, TimeUnit)}. If all tasks have not
terminated by the
+ * specified timeout, then {@code TimeoutException} will be thrown. This has
the potential to
+ * obscure any {@code Throwable}s thrown by the test itself.
+ *
+ * <p>
+ * Example with awaitTermination enabled. Awaits up to timeout for all
submitted tasks to terminate.
+ * This causes the {@code Rule} to invoke awaitTermination during its tear
down:
+ *
+ * <pre>
+ * private CountDownLatch hangLatch = new CountDownLatch(1);
+ *
+ * {@literal @}Rule
+ * public AsynchronousRule asynchronousRule =
AsynchronousRule.builder().threadCount(10).awaitTermination(10,
MILLISECONDS).build();
+ *
+ * {@literal @}Test
+ * public void doTest() throws Exception {
+ * for (int i = 0; i < 10; i++) {
+ * asynchronousRule.runAsync(() -> {
+ * try {
+ * hangLatch.await();
+ * } catch (InterruptedException e) {
+ * // do nothing
+ * }
+ * });
+ * }
+ * }
+ * </pre>
+ */
+@SuppressWarnings("unused")
+public class ExecutorServiceRule extends SerializableExternalResource {
+
+ protected final int threadCount;
+ protected final boolean enableAwaitTermination;
+ protected final long awaitTerminationTimeout;
+ protected final TimeUnit awaitTerminationTimeUnit;
+ protected final boolean awaitTerminationBeforeShutdown;
+ protected final boolean useShutdown;
+ protected final boolean useShutdownNow;
+
+ protected transient volatile ExecutorService executor;
+
+ /**
+ * Returns a {@code Builder} to configure a new {@code ExecutorServiceRule}.
+ */
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ protected ExecutorServiceRule(Builder builder) {
+ this.threadCount = builder.threadCount;
+ this.enableAwaitTermination = builder.enableAwaitTermination;
+ this.awaitTerminationTimeout = builder.awaitTerminationTimeout;
+ this.awaitTerminationTimeUnit = builder.awaitTerminationTimeUnit;
+ this.awaitTerminationBeforeShutdown =
builder.awaitTerminationBeforeShutdown;
+ this.useShutdown = builder.useShutdown;
+ this.useShutdownNow = builder.useShutdownNow;
+ }
+
+ /**
+ * Constructs a new single-threaded {@code ExecutorServiceRule} which invokes
+ * {@code ExecutorService.shutdownNow()} during {@code tearDown}.
+ */
+ public ExecutorServiceRule() {
+ this.threadCount = 1;
+ this.enableAwaitTermination = false;
+ this.awaitTerminationTimeout = 0;
+ this.awaitTerminationTimeUnit = TimeUnit.NANOSECONDS;
+ this.awaitTerminationBeforeShutdown = false;
+ this.useShutdown = false;
+ this.useShutdownNow = true;
+ }
+
+ /**
+ * Constructs a new multi-threaded {@code ExecutorServiceRule} which invokes
+ * {@code ExecutorService.shutdownNow()} during {@code tearDown}.
+ */
+ public ExecutorServiceRule(int threadCount) {
+ this.threadCount = threadCount;
+ this.enableAwaitTermination = false;
+ this.awaitTerminationTimeout = 0;
+ this.awaitTerminationTimeUnit = TimeUnit.NANOSECONDS;
+ this.awaitTerminationBeforeShutdown = false;
+ this.useShutdown = false;
+ this.useShutdownNow = true;
+ }
+
+ @Override
+ public void before() {
+ if (threadCount > 1) {
+ executor = Executors.newFixedThreadPool(threadCount);
+ } else {
+ executor = Executors.newSingleThreadExecutor();
+ }
+ }
+
+ @Override
+ public void after() {
+ if (awaitTerminationBeforeShutdown) {
+ enableAwaitTermination();
+ }
+ if (useShutdown) {
+ executor.shutdown();
+ } else if (useShutdownNow) {
+ executor.shutdownNow();
+ }
+ if (!awaitTerminationBeforeShutdown) {
+ enableAwaitTermination();
+ }
+ }
+
+ private void enableAwaitTermination() {
+ if (enableAwaitTermination) {
+ try {
+ executor.awaitTermination(awaitTerminationTimeout,
awaitTerminationTimeUnit);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ /**
+ * Returns a direct reference to the underlying {@code ExecutorService}.
+ */
+ public ExecutorService getExecutorService() {
+ return executor;
+ }
+
+ /**
+ * Executes the given command at some time in the future.
+ *
+ * @param command the runnable task
+ * @throws RejectedExecutionException if this task cannot be accepted for
execution
+ * @throws NullPointerException if command is null
+ */
+ public void execute(Runnable command) {
+ executor.execute(command);
+ }
+
+ /**
+ * Submits a value-returning task for execution and returns a Future
representing the pending
+ * results of the task. The Future's {@code get} method will return the
task's result upon
+ * successful completion.
+ *
+ * <p>
+ * If you would like to immediately block waiting for a task, you can use
constructions of the
+ * form {@code result = exec.submit(aCallable).get();}
+ *
+ * @param task the task to submit
+ * @param <T> the type of the task's result
+ * @return a Future representing pending completion of the task
+ * @throws RejectedExecutionException if the task cannot be scheduled for
execution
+ * @throws NullPointerException if the task is null
+ */
+ public <T> Future<T> submit(Callable<T> task) {
+ return executor.submit(task);
+ }
+
+ /**
+ * Submits a Runnable task for execution and returns a Future representing
that task. The Future's
+ * {@code get} method will return the given result upon successful
completion.
+ *
+ * @param task the task to submit
+ * @param result the result to return
+ * @param <T> the type of the result
+ * @return a Future representing pending completion of the task
+ * @throws RejectedExecutionException if the task cannot be scheduled for
execution
+ * @throws NullPointerException if the task is null
+ */
+ public <T> Future<T> submit(Runnable task, T result) {
+ return executor.submit(task, result);
+ }
+
+ /**
+ * Submits a Runnable task for execution and returns a Future representing
that task. The Future's
+ * {@code get} method will return {@code null} upon <em>successful</em>
completion.
+ *
+ * @param task the task to submit
+ * @return a Future representing pending completion of the task
+ * @throws RejectedExecutionException if the task cannot be scheduled for
execution
+ * @throws NullPointerException if the task is null
+ */
+ public Future<?> submit(Runnable task) {
+ return executor.submit(task);
+ }
+
+ /**
+ * Returns a new CompletableFuture that is asynchronously completed by a
task running in the
+ * dedicated executor after it runs the given action.
+ *
+ * @param runnable the action to run before completing the returned
CompletableFuture
+ * @return the new CompletableFuture
+ */
+ public CompletableFuture<Void> runAsync(Runnable runnable) {
+ return CompletableFuture.runAsync(runnable, executor);
+ }
+
+ /**
+ * Returns a new CompletableFuture that is asynchronously completed by a
task running in the
+ * dedicated executor with the value obtained by calling the given Supplier.
+ *
+ * @param supplier a function returning the value to be used to complete the
returned
+ * CompletableFuture
+ * @param <U> the function's return type
+ * @return the new CompletableFuture
+ */
+ public <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
+ return CompletableFuture.supplyAsync(supplier, executor);
+ }
+
+ public static class Builder {
+
+ protected int threadCount = 1;
+ protected boolean enableAwaitTermination = false;
+ protected long awaitTerminationTimeout = 0;
+ protected TimeUnit awaitTerminationTimeUnit = TimeUnit.NANOSECONDS;
+ protected boolean awaitTerminationBeforeShutdown = true;
+ protected boolean useShutdown = false;
+ protected boolean useShutdownNow = true;
+
+ protected Builder() {
+ // nothing
+ }
+
+ /**
+ * Configures the number of threads. Default is one thread.
+ *
+ * @param threadCount the number of threads in the pool
+ */
+ public Builder threadCount(int threadCount) {
+ this.threadCount = threadCount;
+ return this;
+ }
+
+ /**
+ * Enables invocation of {@code awaitTermination} during {@code tearDown}.
Default is disabled.
+ *
+ * @param timeout the maximum time to wait
+ * @param unit the time unit of the timeout argument
+ */
+ public Builder awaitTermination(long timeout, TimeUnit unit) {
+ this.enableAwaitTermination = true;
+ this.awaitTerminationTimeout = timeout;
+ this.awaitTerminationTimeUnit = unit;
+ return this;
+ }
+
+ /**
+ * Enables invocation of {@code shutdown} during {@code tearDown}. Default
is disabled.
+ */
+ public Builder useShutdown() {
+ this.useShutdown = true;
+ this.useShutdownNow = false;
+ return this;
+ }
+
+ /**
+ * Enables invocation of {@code shutdownNow} during {@code tearDown}.
Default is enabled.
+ *
+ */
+ public Builder useShutdownNow() {
+ this.useShutdown = false;
+ this.useShutdownNow = true;
+ return this;
+ }
+
+ /**
+ * Specifies invocation of {@code awaitTermination} before {@code
shutdown} or
+ * {@code shutdownNow}.
+ */
+ public Builder awaitTerminationBeforeShutdown() {
+ this.awaitTerminationBeforeShutdown = true;
+ return this;
+ }
+
+ /**
+ * Specifies invocation of {@code awaitTermination} after {@code shutdown}
or
+ * {@code shutdownNow}.
+ */
+ public Builder awaitTerminationAfterShutdown() {
+ this.awaitTerminationBeforeShutdown = false;
+ return this;
+ }
+
+ /**
+ * Builds the instance of {@code ExecutorServiceRule}.
+ */
+ public ExecutorServiceRule build() {
+ assertThat(threadCount).isGreaterThan(0);
+ return new ExecutorServiceRule(this);
+ }
+ }
+}
diff --git
a/geode-junit/src/test/java/org/apache/geode/test/junit/rules/ExecutorServiceRuleIntegrationTest.java
b/geode-junit/src/test/java/org/apache/geode/test/junit/rules/ExecutorServiceRuleIntegrationTest.java
new file mode 100644
index 0000000000..e7e917b6d7
--- /dev/null
+++
b/geode-junit/src/test/java/org/apache/geode/test/junit/rules/ExecutorServiceRuleIntegrationTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+package org.apache.geode.test.junit.rules;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.Result;
+import org.mockito.InOrder;
+
+import org.apache.geode.test.junit.categories.IntegrationTest;
+import org.apache.geode.test.junit.runners.TestRunner;
+
+@Category(IntegrationTest.class)
+public class ExecutorServiceRuleIntegrationTest {
+
+ static volatile CountDownLatch hangLatch;
+ static volatile CountDownLatch terminateLatch;
+ static volatile ExecutorService executorService;
+ static Awaits.Invocations invocations;
+
+ @Before
+ public void setUp() throws Exception {
+ hangLatch = new CountDownLatch(1);
+ terminateLatch = new CountDownLatch(1);
+ invocations = mock(Awaits.Invocations.class);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ invocations = null;
+
+ while (hangLatch != null && hangLatch.getCount() > 0) {
+ hangLatch.countDown();;
+ }
+
+ if (executorService != null) {
+ executorService.shutdownNow();
+ executorService = null;
+ }
+
+ hangLatch = null;
+ terminateLatch = null;
+ }
+
+ @Test
+ public void awaitTermination() throws Exception {
+ Result result = TestRunner.runTest(Awaits.class);
+ assertThat(result.wasSuccessful()).isTrue();
+
+ assertThat(isTestHung()).isTrue();
+ await().atMost(10, SECONDS).until(() ->
assertThat(executorService.isTerminated()).isTrue());
+ invocations.afterRule();
+
+ InOrder invocationOrder = inOrder(invocations);
+ invocationOrder.verify(invocations).afterTest();
+ invocationOrder.verify(invocations).afterHangLatch();
+ invocationOrder.verify(invocations).afterTerminateLatch();
+ invocationOrder.verify(invocations).afterRule();
+ invocationOrder.verifyNoMoreInteractions();
+ }
+
+ private static boolean isTestHung() {
+ return hangLatch.getCount() > 0;
+ }
+
+ public static class Awaits {
+
+ @Rule
+ public ExecutorServiceRule executorServiceRule =
+ ExecutorServiceRule.builder().awaitTermination(2, SECONDS).build();
+
+ @Before
+ public void setUp() throws Exception {
+ executorService = executorServiceRule.getExecutorService();
+ }
+
+ @After
+ public void after() throws Exception {
+ invocations.afterTest();
+ }
+
+ @Test
+ public void doTest() throws Exception {
+ executorServiceRule.runAsync(() -> {
+ try {
+ hangLatch.await(1, SECONDS);
+ invocations.afterHangLatch();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ } finally {
+ terminateLatch.countDown();
+ invocations.afterTerminateLatch();
+ }
+ });
+ }
+
+ interface Invocations {
+ void afterHangLatch();
+
+ void afterTerminateLatch();
+
+ void afterRule();
+
+ void afterTest();
+ }
+ }
+}
diff --git
a/geode-junit/src/test/java/org/apache/geode/test/junit/rules/ExecutorServiceRuleTest.java
b/geode-junit/src/test/java/org/apache/geode/test/junit/rules/ExecutorServiceRuleTest.java
new file mode 100644
index 0000000000..9641285215
--- /dev/null
+++
b/geode-junit/src/test/java/org/apache/geode/test/junit/rules/ExecutorServiceRuleTest.java
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+package org.apache.geode.test.junit.rules;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.Result;
+import org.junit.runner.notification.Failure;
+
+import org.apache.geode.test.junit.categories.UnitTest;
+import org.apache.geode.test.junit.runners.TestRunner;
+
+@Category(UnitTest.class)
+public class ExecutorServiceRuleTest {
+
+ static volatile AtomicIntegerWithMaxValueSeen concurrentTasks;
+ static volatile CountDownLatch hangLatch;
+ static volatile CountDownLatch terminateLatch;
+ static volatile ExecutorService executorService;
+
+ @Before
+ public void setUp() throws Exception {
+ concurrentTasks = new AtomicIntegerWithMaxValueSeen(0);
+ hangLatch = new CountDownLatch(1);
+ terminateLatch = new CountDownLatch(1);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ concurrentTasks = null;
+
+ while (hangLatch != null && hangLatch.getCount() > 0) {
+ hangLatch.countDown();;
+ }
+
+ if (executorService != null) {
+ executorService.shutdownNow();
+ executorService = null;
+ }
+
+ hangLatch = null;
+ terminateLatch = null;
+ }
+
+ @Test
+ public void providesExecutorService() throws Exception {
+ Result result = TestRunner.runTest(HasExecutorService.class);
+ assertThat(result.wasSuccessful()).isTrue();
+ assertThat(executorService).isInstanceOf(ExecutorService.class);
+ }
+
+ @Test
+ public void shutsDownAfterTest() throws Exception {
+ Result result = TestRunner.runTest(HasExecutorService.class);
+ assertThat(result.wasSuccessful()).isTrue();
+ assertThat(executorService.isShutdown()).isTrue();
+ }
+
+ @Test
+ public void terminatesAfterTest() throws Exception {
+ Result result = TestRunner.runTest(HasExecutorService.class);
+ assertThat(result.wasSuccessful()).isTrue();
+ assertThat(executorService.isTerminated()).isTrue();
+ }
+
+ @Test
+ public void shutsDownHungThread() throws Exception {
+ Result result = TestRunner.runTest(Hangs.class);
+ assertThat(result.wasSuccessful()).isTrue();
+ assertThat(isTestHung()).isTrue();
+ assertThat(executorService.isShutdown()).isTrue();
+ terminateLatch.await(10, SECONDS);
+ }
+
+ @Test
+ public void terminatesHungThread() throws Exception {
+ Result result = TestRunner.runTest(Hangs.class);
+ assertThat(result.wasSuccessful()).isTrue();
+ assertThat(isTestHung()).isTrue();
+ await().atMost(10, SECONDS).until(() ->
assertThat(executorService.isTerminated()).isTrue());
+ terminateLatch.await(1, SECONDS);
+ }
+
+ @Test
+ public void futureTimesOut() throws Exception {
+ Result result = TestRunner.runTest(TimesOut.class);
+ assertThat(result.wasSuccessful()).isFalse();
+ assertThat(result.getFailures()).hasSize(1);
+ Failure failure = result.getFailures().get(0);
+ assertThat(failure.getException()).isInstanceOf(TimeoutException.class);
+ }
+
+ @Test
+ public void singleThreadedByDefault() throws Exception {
+ terminateLatch = new CountDownLatch(2);
+ Result result = TestRunner.runTest(SingleThreaded.class);
+ assertThat(result.wasSuccessful()).as(result.toString()).isTrue();
+ assertThat(concurrentTasks.getMaxValueSeen()).isEqualTo(1);
+ }
+
+ @Test
+ public void threadCountTwoHasTwoThreads() throws Exception {
+ terminateLatch = new CountDownLatch(3);
+ Result result = TestRunner.runTest(ThreadCountTwo.class);
+ assertThat(result.wasSuccessful()).as(result.toString()).isTrue();
+ assertThat(concurrentTasks.getMaxValueSeen()).isEqualTo(2);
+ }
+
+ private static boolean isTestHung() {
+ return hangLatch.getCount() > 0;
+ }
+
+ public abstract static class HasAsynchronousRule {
+
+ @Rule
+ public ExecutorServiceRule executorServiceRule = new ExecutorServiceRule();
+
+ @Before
+ public void setUpHasAsynchronousRule() throws Exception {
+ executorService = executorServiceRule.getExecutorService();
+ }
+ }
+
+ public static class HasExecutorService extends HasAsynchronousRule {
+
+ @Test
+ public void doTest() throws Exception {
+ // nothing
+ }
+ }
+
+ public static class Hangs extends HasAsynchronousRule {
+
+ @Test
+ public void doTest() throws Exception {
+ executorServiceRule.runAsync(() -> {
+ try {
+ hangLatch.await();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ } finally {
+ terminateLatch.countDown();
+ }
+ });
+ }
+ }
+
+ public static class TimesOut extends HasAsynchronousRule {
+
+ @Test
+ public void doTest() throws Exception {
+ Future<?> future = executorServiceRule.runAsync(() -> {
+ try {
+ hangLatch.await();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ } finally {
+ terminateLatch.countDown();
+ }
+ });
+
+ future.get(1, MILLISECONDS);
+ }
+ }
+
+ public static class SingleThreaded extends HasAsynchronousRule {
+
+ private volatile CountDownLatch task1Latch;
+ private volatile CountDownLatch task2Latch;
+
+ private volatile CountDownLatch hang1Latch;
+ private volatile CountDownLatch hang2Latch;
+
+ @Before
+ public void setUp() throws Exception {
+ task1Latch = new CountDownLatch(1);
+ task2Latch = new CountDownLatch(1);
+
+ hang1Latch = new CountDownLatch(1);
+ hang2Latch = new CountDownLatch(1);
+
+ assertThat(terminateLatch.getCount()).isEqualTo(2);
+ }
+
+ @Test
+ public void doTest() throws Exception {
+ Future<Void> task1 = executorServiceRule.runAsync(() -> {
+ try {
+ task1Latch.countDown();
+ concurrentTasks.increment();
+ hang1Latch.await();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ } finally {
+ concurrentTasks.decrement();
+ terminateLatch.countDown();
+ }
+ });
+
+ // assert that task1 begins
+ task1Latch.await(30, SECONDS);
+
+ Future<Void> task2 = executorServiceRule.runAsync(() -> {
+ try {
+ task2Latch.countDown();
+ concurrentTasks.increment();
+ hang2Latch.await();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ } finally {
+ concurrentTasks.decrement();
+ terminateLatch.countDown();
+ }
+ });
+
+ // assert that there is only 1 thread in the default rule's
ExecutorService
+ assertThat(task1Latch.getCount()).isEqualTo(0);
+ assertThat(task2Latch.getCount()).isEqualTo(1);
+
+ // assert that task1 completes
+ hang1Latch.countDown();
+ task1.get(30, SECONDS);
+ assertThat(terminateLatch.getCount()).isEqualTo(1);
+
+ // assert that task2 begins
+ task2Latch.await(30, SECONDS);
+
+ // assert that task2 completes
+ hang2Latch.countDown();
+ task2.get(30, SECONDS);
+ assertThat(terminateLatch.getCount()).isEqualTo(0);
+ }
+ }
+
+ public static class ThreadCountTwo {
+
+ private volatile CountDownLatch task1Latch;
+ private volatile CountDownLatch task2Latch;
+ private volatile CountDownLatch task3Latch;
+
+ private volatile CountDownLatch hang1Latch;
+ private volatile CountDownLatch hang2Latch;
+ private volatile CountDownLatch hang3Latch;
+
+ @Rule
+ public ExecutorServiceRule executorServiceRule =
+ ExecutorServiceRule.builder().threadCount(2).build();
+
+ @Before
+ public void setUp() throws Exception {
+ task1Latch = new CountDownLatch(1);
+ task2Latch = new CountDownLatch(1);
+ task3Latch = new CountDownLatch(1);
+
+ hang1Latch = new CountDownLatch(1);
+ hang2Latch = new CountDownLatch(1);
+ hang3Latch = new CountDownLatch(1);
+
+ assertThat(terminateLatch.getCount()).isEqualTo(3);
+ }
+
+ @Test
+ public void doTest() throws Exception {
+ Future<Void> task1 = executorServiceRule.runAsync(() -> {
+ try {
+ task1Latch.countDown();
+ concurrentTasks.increment();
+ hang1Latch.await();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ } finally {
+ concurrentTasks.decrement();
+ terminateLatch.countDown();
+ }
+ });
+
+ // assert that task1 begins
+ task1Latch.await(30, SECONDS);
+
+ Future<Void> task2 = executorServiceRule.runAsync(() -> {
+ try {
+ task2Latch.countDown();
+ concurrentTasks.increment();
+ hang2Latch.await();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ } finally {
+ concurrentTasks.decrement();
+ terminateLatch.countDown();
+ }
+ });
+
+ // assert that task2 begins
+ task2Latch.await(30, SECONDS);
+
+ Future<Void> task3 = executorServiceRule.runAsync(() -> {
+ try {
+ task3Latch.countDown();
+ concurrentTasks.increment();
+ hang3Latch.await();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ } finally {
+ concurrentTasks.decrement();
+ terminateLatch.countDown();
+ }
+ });
+
+ // assert that there are 2 threads in the rule's ExecutorService
+ assertThat(task1Latch.getCount()).isEqualTo(0);
+ assertThat(task2Latch.getCount()).isEqualTo(0);
+ assertThat(task3Latch.getCount()).isEqualTo(1);
+
+ // assert that task1 completes
+ hang1Latch.countDown();
+ task1.get(30, SECONDS);
+ assertThat(terminateLatch.getCount()).isEqualTo(2);
+
+ // assert that task3 begins
+ task3Latch.await(30, SECONDS);
+
+ // assert that task2 completes
+ hang2Latch.countDown();
+ task2.get(30, SECONDS);
+ assertThat(terminateLatch.getCount()).isEqualTo(1);
+
+ // assert that task3 completes
+ hang3Latch.countDown();
+ task3.get(30, SECONDS);
+ assertThat(terminateLatch.getCount()).isEqualTo(0);
+ }
+ }
+
+ static class AtomicIntegerWithMaxValueSeen extends AtomicInteger {
+
+ private int maxValueSeen = 0;
+
+ AtomicIntegerWithMaxValueSeen(int initialValue) {
+ super(initialValue);
+ }
+
+ void increment() {
+ maxValueSeen = Integer.max(maxValueSeen, super.incrementAndGet());
+ }
+
+ void decrement() {
+ super.decrementAndGet();
+ }
+
+ int getMaxValueSeen() {
+ return maxValueSeen;
+ }
+ }
+}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> Create JUnit Rule to facilitate reuse of ExecutorService pattern for
> concurrent integration tests
> -------------------------------------------------------------------------------------------------
>
> Key: GEODE-4239
> URL: https://issues.apache.org/jira/browse/GEODE-4239
> Project: Geode
> Issue Type: Test
> Components: tests
> Reporter: Kirk Lund
> Assignee: Kirk Lund
>
> These tests all duplicate the same pattern of using a ExecutorService to
> provide a thread that can be deadlocked or live-locked for a concurrency bug:
> * DLockServiceLeakTest
> * GMSEncryptJUnitTest
> * InterruptDiskJUnitTest
> * RegionVersionVectorTest
> * BlockingProcessStreamReaderWindowsTest
> * FileProcessControllerIntegrationTest
> * AbortableTaskServiceJUnitTest
> * MainLauncherJUnitTest
> I'd like to refactor the above list of tests to use a JUnit Rule for this.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)