Repository: geode Updated Branches: refs/heads/feature/GEODE-2632-17 5cf271d22 -> 41e2eccd1
Revert BlockingHARegionJUnitTest Project: http://git-wip-us.apache.org/repos/asf/geode/repo Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/41e2eccd Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/41e2eccd Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/41e2eccd Branch: refs/heads/feature/GEODE-2632-17 Commit: 41e2eccd1b92f85b47eb99471b528b2021c4733c Parents: 5cf271d Author: Kirk Lund <kl...@apache.org> Authored: Wed May 24 19:13:53 2017 -0700 Committer: Kirk Lund <kl...@apache.org> Committed: Wed May 24 19:13:53 2017 -0700 ---------------------------------------------------------------------- .../cache/ha/BlockingHARegionJUnitTest.java | 494 ++++++++++--------- 1 file changed, 270 insertions(+), 224 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/geode/blob/41e2eccd/geode-core/src/test/java/org/apache/geode/internal/cache/ha/BlockingHARegionJUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/ha/BlockingHARegionJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/ha/BlockingHARegionJUnitTest.java index 1534192..d0f5793 100755 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/ha/BlockingHARegionJUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/ha/BlockingHARegionJUnitTest.java @@ -14,116 +14,76 @@ */ package org.apache.geode.internal.cache.ha; -import static java.util.concurrent.TimeUnit.*; import static org.apache.geode.distributed.ConfigurationProperties.*; -import static org.apache.geode.internal.cache.ha.HARegionQueue.*; import static org.junit.Assert.*; -import java.util.ArrayList; -import java.util.List; import java.util.Properties; -import org.awaitility.Awaitility; -import org.awaitility.core.ConditionFactory; -import org.junit.After; +import org.apache.geode.internal.cache.InternalCache; +import org.apache.geode.test.junit.categories.ClientSubscriptionTest; import org.junit.Before; import org.junit.Ignore; -import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.apache.geode.cache.Cache; import org.apache.geode.cache.CacheFactory; import org.apache.geode.distributed.DistributedSystem; import org.apache.geode.internal.cache.EventID; -import org.apache.geode.internal.cache.InternalCache; import org.apache.geode.test.dunit.ThreadUtils; -import org.apache.geode.test.junit.categories.ClientSubscriptionTest; +import org.apache.geode.test.dunit.Wait; +import org.apache.geode.test.dunit.WaitCriterion; import org.apache.geode.test.junit.categories.IntegrationTest; -import org.apache.geode.test.junit.rules.serializable.SerializableErrorCollector; -/** - * Integration tests for Blocking HARegionQueue. - * - * <p> - * #40314: Filled up queue causes all publishers to block - * - * <p> - * #37627: In case of out of order messages, (sequence Id violation), in spite of HARQ not full, the - * capacity (putPermits) of the HARQ exhausted. - */ @Category({IntegrationTest.class, ClientSubscriptionTest.class}) public class BlockingHARegionJUnitTest { - public static final String REGION = "BlockingHARegionJUnitTest_Region"; - private static final long THREAD_TIMEOUT = 2 * 60 * 1000; - - private final Object numberForThreadsLock = new Object(); - private int numberForDoPuts; - private int numberForDoTakes; - - volatile boolean stopThreads; + private static InternalCache cache = null; - private InternalCache cache; - private HARegionQueueAttributes queueAttributes; - private List<Thread> threads; - private ThreadGroup threadGroup; - - @Rule - public SerializableErrorCollector errorCollector = new SerializableErrorCollector(); + /** boolean to record an exception occurence in another thread **/ + private static volatile boolean exceptionOccurred = false; + /** StringBuffer to store the exception **/ + private static StringBuffer exceptionString = new StringBuffer(); + /** boolen to quit the for loop **/ + private static volatile boolean quitForLoop = false; @Before public void setUp() throws Exception { - synchronized (this.numberForThreadsLock) { - this.numberForDoPuts = 0; - this.numberForDoTakes = 0; - } - - this.stopThreads = false; - this.threads = new ArrayList<>(); - this.threadGroup = new ThreadGroup(getClass().getSimpleName()) { - @Override - public void uncaughtException(Thread t, Throwable e) { - errorCollector.addError(e); - } - }; - - this.queueAttributes = new HARegionQueueAttributes(); - - Properties config = new Properties(); - config.setProperty(MCAST_PORT, "0"); - - this.cache = (InternalCache) CacheFactory.create(DistributedSystem.connect(config)); - } - - @After - public void tearDown() throws Exception { - try { - this.stopThreads = true; - for (Thread thread : this.threads) { - thread.interrupt(); - ThreadUtils.join(thread, THREAD_TIMEOUT); - } - } finally { - if (this.cache != null) { - this.cache.close(); - } + Properties props = new Properties(); + props.setProperty(MCAST_PORT, "0"); + if (cache != null) { + cache.close(); // fault tolerance } + cache = (InternalCache) CacheFactory.create(DistributedSystem.connect(props)); } /** - * This test has a scenario where the HARegionQueue capacity is just 1. There will be two thread. + * This test has a scenario where the HAReqionQueue capacity is just 1. There will be two thread. * One doing a 1000 puts and the other doing a 1000 takes. The validation for this test is that it * should not encounter any exceptions */ @Test public void testBoundedPuts() throws Exception { - this.queueAttributes.setBlockingQueueCapacity(1); - HARegionQueue hrq = getHARegionQueueInstance(REGION, this.cache, this.queueAttributes, - BLOCKING_HA_QUEUE, false); - hrq.setPrimary(true); // fix for 40314 - capacity constraint is checked for primary only + exceptionOccurred = false; + HARegionQueueAttributes harqa = new HARegionQueueAttributes(); + harqa.setBlockingQueueCapacity(1); + HARegionQueue hrq = HARegionQueue.getHARegionQueueInstance("BlockingHARegionJUnitTest_Region", + cache, harqa, HARegionQueue.BLOCKING_HA_QUEUE, false); + hrq.setPrimary(true);// fix for 40314 - capacity constraint is checked for primary only. + Thread thread1 = new DoPuts(hrq, 1000); + Thread thread2 = new DoTake(hrq, 1000); + + thread1.start(); + thread2.start(); + + ThreadUtils.join(thread1, 30 * 1000); + ThreadUtils.join(thread2, 30 * 1000); + + if (exceptionOccurred) { + fail(" Test failed due to " + exceptionString); + } - startDoPuts(hrq, 1000); - startDoTakes(hrq, 1000); + cache.close(); } /** @@ -136,23 +96,62 @@ public class BlockingHARegionJUnitTest { */ @Test public void testPutBeingBlocked() throws Exception { - this.queueAttributes.setBlockingQueueCapacity(1); - HARegionQueue hrq = getHARegionQueueInstance(REGION, this.cache, this.queueAttributes, - BLOCKING_HA_QUEUE, false); - hrq.setPrimary(true); // fix for 40314 - capacity constraint is checked for primary only + exceptionOccurred = false; + quitForLoop = false; + HARegionQueueAttributes harqa = new HARegionQueueAttributes(); + harqa.setBlockingQueueCapacity(1); + final HARegionQueue hrq = HARegionQueue.getHARegionQueueInstance( + "BlockingHARegionJUnitTest_Region", cache, harqa, HARegionQueue.BLOCKING_HA_QUEUE, false); + hrq.setPrimary(true);// fix for 40314 - capacity constraint is checked for primary only. + final Thread thread1 = new DoPuts(hrq, 2); + thread1.start(); + WaitCriterion ev = new WaitCriterion() { + public boolean done() { + return hrq.region.size() == 2; + } - Thread doPuts = startDoPuts(hrq, 2); + public String description() { + return null; + } + }; + Wait.waitForCriterion(ev, 1000, 200, true); + assertTrue(thread1.isAlive()); // thread should still be alive (in wait state) + + Thread thread2 = new DoTake(hrq, 1); + thread2.start(); // start take thread + ev = new WaitCriterion() { + public boolean done() { + return hrq.region.size() == 3; + } - await().until(() -> assertTrue(hrq.region.size() == 2)); + public String description() { + return null; + } + }; + // sleep. take will proceed and so will sleeping put + Wait.waitForCriterion(ev, 3 * 1000, 200, true); - // thread should still be alive (in wait state) - assertTrue(doPuts.isAlive()); + // thread should have died since put should have proceeded + ev = new WaitCriterion() { + public boolean done() { + return !thread1.isAlive(); + } - startDoTakes(hrq, 1); + public String description() { + return "thread1 still alive"; + } + }; + Wait.waitForCriterion(ev, 30 * 1000, 1000, true); - await().until(() -> assertTrue(hrq.region.size() == 3)); + ThreadUtils.join(thread1, 30 * 1000); // for completeness + ThreadUtils.join(thread2, 30 * 1000); + if (exceptionOccurred) { + fail(" Test failed due to " + exceptionString); + } + cache.close(); } + /** * This test tests that the region capacity is never exceeded even in highly concurrent * environments. The region capacity is set to 10000. Then 5 threads start doing put @@ -162,26 +161,62 @@ public class BlockingHARegionJUnitTest { */ @Test public void testConcurrentPutsNotExceedingLimit() throws Exception { - this.queueAttributes.setBlockingQueueCapacity(10000); - HARegionQueue hrq = getHARegionQueueInstance(REGION, this.cache, this.queueAttributes, - BLOCKING_HA_QUEUE, false); - hrq.setPrimary(true); // fix for 40314 - capacity constraint is checked for primary only - - Thread doPuts1 = startDoPuts(hrq, 20000, 1); - Thread doPuts2 = startDoPuts(hrq, 20000, 2); - Thread doPuts3 = startDoPuts(hrq, 20000, 3); - Thread doPuts4 = startDoPuts(hrq, 20000, 4); - Thread doPuts5 = startDoPuts(hrq, 20000, 5); + exceptionOccurred = false; + quitForLoop = false; + HARegionQueueAttributes harqa = new HARegionQueueAttributes(); + harqa.setBlockingQueueCapacity(10000); + final HARegionQueue hrq = HARegionQueue.getHARegionQueueInstance( + "BlockingHARegionJUnitTest_Region", cache, harqa, HARegionQueue.BLOCKING_HA_QUEUE, false); + hrq.setPrimary(true);// fix for 40314 - capacity constraint is checked for primary only. + Thread thread1 = new DoPuts(hrq, 20000, 1); + Thread thread2 = new DoPuts(hrq, 20000, 2); + Thread thread3 = new DoPuts(hrq, 20000, 3); + Thread thread4 = new DoPuts(hrq, 20000, 4); + Thread thread5 = new DoPuts(hrq, 20000, 5); + + thread1.start(); + thread2.start(); + thread3.start(); + thread4.start(); + thread5.start(); + + WaitCriterion ev = new WaitCriterion() { + public boolean done() { + return hrq.region.size() == 20000; + } - await().until(() -> assertTrue(hrq.region.size() == 20000)); + public String description() { + return null; + } + }; + Wait.waitForCriterion(ev, 30 * 1000, 200, true); - assertTrue(doPuts1.isAlive()); - assertTrue(doPuts2.isAlive()); - assertTrue(doPuts3.isAlive()); - assertTrue(doPuts4.isAlive()); - assertTrue(doPuts5.isAlive()); + assertTrue(thread1.isAlive()); + assertTrue(thread2.isAlive()); + assertTrue(thread3.isAlive()); + assertTrue(thread4.isAlive()); + assertTrue(thread5.isAlive()); assertTrue(hrq.region.size() == 20000); + + quitForLoop = true; + Thread.sleep(20000); + + thread1.interrupt(); + thread2.interrupt(); + thread3.interrupt(); + thread4.interrupt(); + thread5.interrupt(); + + Thread.sleep(2000); + + ThreadUtils.join(thread1, 5 * 60 * 1000); + ThreadUtils.join(thread2, 5 * 60 * 1000); + ThreadUtils.join(thread3, 5 * 60 * 1000); + ThreadUtils.join(thread4, 5 * 60 * 1000); + ThreadUtils.join(thread5, 5 * 60 * 1000); + + cache.close(); } /** @@ -191,41 +226,84 @@ public class BlockingHARegionJUnitTest { * state. the region size would be verified to be 20000 (10000 puts and 10000 DACE objects). then * the threads are interrupted and made to quit the loop */ - @Ignore("Test is disabled until/if blocking queue capacity becomes a hard limit") + @Ignore("TODO: test is disabled") @Test public void testConcurrentPutsTakesNotExceedingLimit() throws Exception { - this.queueAttributes.setBlockingQueueCapacity(10000); - HARegionQueue hrq = getHARegionQueueInstance(REGION, this.cache, this.queueAttributes, - BLOCKING_HA_QUEUE, false); - hrq.setPrimary(true); // fix for 40314 - capacity constraint is checked for primary only - - Thread doPuts1 = startDoPuts(hrq, 40000, 1); - Thread doPuts2 = startDoPuts(hrq, 40000, 2); - Thread doPuts3 = startDoPuts(hrq, 40000, 3); - Thread doPuts4 = startDoPuts(hrq, 40000, 4); - Thread doPuts5 = startDoPuts(hrq, 40000, 5); - - Thread doTakes1 = startDoTakes(hrq, 5000); - Thread doTakes2 = startDoTakes(hrq, 5000); - Thread doTakes3 = startDoTakes(hrq, 5000); - Thread doTakes4 = startDoTakes(hrq, 5000); - Thread doTakes5 = startDoTakes(hrq, 5000); - - ThreadUtils.join(doTakes1, 30 * 1000); - ThreadUtils.join(doTakes2, 30 * 1000); - ThreadUtils.join(doTakes3, 30 * 1000); - ThreadUtils.join(doTakes4, 30 * 1000); - ThreadUtils.join(doTakes5, 30 * 1000); - - await().until(() -> assertTrue(hrq.region.size() == 20000)); - - assertTrue(doPuts1.isAlive()); - assertTrue(doPuts2.isAlive()); - assertTrue(doPuts3.isAlive()); - assertTrue(doPuts4.isAlive()); - assertTrue(doPuts5.isAlive()); + exceptionOccurred = false; + quitForLoop = false; + HARegionQueueAttributes harqa = new HARegionQueueAttributes(); + harqa.setBlockingQueueCapacity(10000); + final HARegionQueue hrq = HARegionQueue.getHARegionQueueInstance( + "BlockingHARegionJUnitTest_Region", cache, harqa, HARegionQueue.BLOCKING_HA_QUEUE, false); + Thread thread1 = new DoPuts(hrq, 40000, 1); + Thread thread2 = new DoPuts(hrq, 40000, 2); + Thread thread3 = new DoPuts(hrq, 40000, 3); + Thread thread4 = new DoPuts(hrq, 40000, 4); + Thread thread5 = new DoPuts(hrq, 40000, 5); + + Thread thread6 = new DoTake(hrq, 5000); + Thread thread7 = new DoTake(hrq, 5000); + Thread thread8 = new DoTake(hrq, 5000); + Thread thread9 = new DoTake(hrq, 5000); + Thread thread10 = new DoTake(hrq, 5000); + + thread1.start(); + thread2.start(); + thread3.start(); + thread4.start(); + thread5.start(); + + thread6.start(); + thread7.start(); + thread8.start(); + thread9.start(); + thread10.start(); + + ThreadUtils.join(thread6, 30 * 1000); + ThreadUtils.join(thread7, 30 * 1000); + ThreadUtils.join(thread8, 30 * 1000); + ThreadUtils.join(thread9, 30 * 1000); + ThreadUtils.join(thread10, 30 * 1000); + + WaitCriterion ev = new WaitCriterion() { + public boolean done() { + return hrq.region.size() == 20000; + } + + public String description() { + return null; + } + }; + Wait.waitForCriterion(ev, 30 * 1000, 200, true); + + assertTrue(thread1.isAlive()); + assertTrue(thread2.isAlive()); + assertTrue(thread3.isAlive()); + assertTrue(thread4.isAlive()); + assertTrue(thread5.isAlive()); assertTrue(hrq.region.size() == 20000); + + quitForLoop = true; + + Thread.sleep(2000); + + thread1.interrupt(); + thread2.interrupt(); + thread3.interrupt(); + thread4.interrupt(); + thread5.interrupt(); + + Thread.sleep(2000); + + + ThreadUtils.join(thread1, 30 * 1000); + ThreadUtils.join(thread2, 30 * 1000); + ThreadUtils.join(thread3, 30 * 1000); + ThreadUtils.join(thread4, 30 * 1000); + ThreadUtils.join(thread5, 30 * 1000); + + cache.close(); } /** @@ -237,92 +315,62 @@ public class BlockingHARegionJUnitTest { */ @Test public void testHARQMaxCapacity_Bug37627() throws Exception { - this.queueAttributes.setBlockingQueueCapacity(1); - this.queueAttributes.setExpiryTime(180); - HARegionQueue hrq = getHARegionQueueInstance(REGION, this.cache, this.queueAttributes, - BLOCKING_HA_QUEUE, false); - hrq.setPrimary(true); // fix for 40314 - capacity constraint is checked for primary only - - EventID event1 = new EventID(new byte[] {1}, 1, 2); // violation - EventID event2 = new EventID(new byte[] {1}, 1, 1); // ignored - EventID event3 = new EventID(new byte[] {1}, 1, 3); - - newThread(new Runnable() { - @Override - public void run() { - try { - hrq.put(new ConflatableObject("key1", "value1", event1, false, "region1")); - hrq.take(); - hrq.put(new ConflatableObject("key2", "value1", event2, false, "region1")); - hrq.put(new ConflatableObject("key3", "value1", event3, false, "region1")); - } catch (Exception e) { - errorCollector.addError(e); + try { + exceptionOccurred = false; + quitForLoop = false; + HARegionQueueAttributes harqa = new HARegionQueueAttributes(); + harqa.setBlockingQueueCapacity(1); + harqa.setExpiryTime(180); + final HARegionQueue hrq = HARegionQueue.getHARegionQueueInstance( + "BlockingHARegionJUnitTest_Region", cache, harqa, HARegionQueue.BLOCKING_HA_QUEUE, false); + hrq.setPrimary(true);// fix for 40314 - capacity constraint is checked for primary only. + final EventID id1 = new EventID(new byte[] {1}, 1, 2); // violation + final EventID ignore = new EventID(new byte[] {1}, 1, 1); // + final EventID id2 = new EventID(new byte[] {1}, 1, 3); // + Thread t1 = new Thread() { + public void run() { + try { + hrq.put(new ConflatableObject("key1", "value1", id1, false, "region1")); + hrq.take(); + hrq.put(new ConflatableObject("key2", "value1", ignore, false, "region1")); + hrq.put(new ConflatableObject("key3", "value1", id2, false, "region1")); + } catch (Exception e) { + exceptionString.append("First Put in region queue failed"); + exceptionOccurred = true; + } } + }; + t1.start(); + ThreadUtils.join(t1, 20 * 1000); + if (exceptionOccurred) { + fail(" Test failed due to " + exceptionString); + } + } finally { + if (cache != null) { + cache.close(); } - }); - } - - private Thread newThread(Runnable runnable) { - Thread thread = new Thread(this.threadGroup, runnable); - this.threads.add(thread); - thread.start(); - return thread; - } - - private Thread startDoPuts(HARegionQueue haRegionQueue, int count) { - return startDoPuts(haRegionQueue, count, 0); - } - - private Thread startDoPuts(HARegionQueue haRegionQueue, int count, int regionId) { - Thread thread = new DoPuts(this.threadGroup, haRegionQueue, count, regionId); - this.threads.add(thread); - thread.start(); - return thread; - } - - private Thread startDoTakes(HARegionQueue haRegionQueue, int count) { - Thread thread = new DoTakes(this.threadGroup, haRegionQueue, count); - this.threads.add(thread); - thread.start(); - return thread; - } - - private ConditionFactory await() { - return Awaitility.await().atMost(2, MINUTES); - } - - int nextDoPutsThreadNum() { - synchronized (this.numberForThreadsLock) { - return numberForDoPuts++; - } - } - - int nextDoTakesThreadNum() { - synchronized (this.numberForThreadsLock) { - return numberForDoTakes++; } } /** * class which does specified number of puts on the queue */ - private class DoPuts extends Thread { + private static class DoPuts extends Thread { - private final HARegionQueue regionQueue; + HARegionQueue regionQueue = null; + final int numberOfPuts; - private final int numberOfPuts; + DoPuts(HARegionQueue haRegionQueue, int numberOfPuts) { + this.regionQueue = haRegionQueue; + this.numberOfPuts = numberOfPuts; + } /** * region id can be specified to generate Thread unique events */ - private final int regionId; + int regionId = 0; - DoPuts(ThreadGroup threadGroup, HARegionQueue haRegionQueue, int numberOfPuts) { - this(threadGroup, haRegionQueue, numberOfPuts, 0); - } - - DoPuts(ThreadGroup threadGroup, HARegionQueue haRegionQueue, int numberOfPuts, int regionId) { - super(threadGroup, "DoPuts-" + nextDoPutsThreadNum()); + DoPuts(HARegionQueue haRegionQueue, int numberOfPuts, int regionId) { this.regionQueue = haRegionQueue; this.numberOfPuts = numberOfPuts; this.regionId = regionId; @@ -330,16 +378,19 @@ public class BlockingHARegionJUnitTest { @Override public void run() { - for (int i = 0; i < this.numberOfPuts; i++) { - if (stopThreads || Thread.currentThread().isInterrupted()) { - break; - } + for (int i = 0; i < numberOfPuts; i++) { try { this.regionQueue.put(new ConflatableObject("" + i, "" + i, - new EventID(new byte[this.regionId], i, i), false, REGION)); + new EventID(new byte[regionId], i, i), false, "BlockingHARegionJUnitTest_Region")); + if (quitForLoop) { + break; + } + if (Thread.currentThread().isInterrupted()) { + break; + } } catch (Exception e) { - errorCollector.addError(e); - break; + exceptionOccurred = true; + exceptionString.append(" Exception occurred due to " + e); } } } @@ -348,29 +399,24 @@ public class BlockingHARegionJUnitTest { /** * class which does a specified number of takes */ - private class DoTakes extends Thread { + private static class DoTake extends Thread { - private final HARegionQueue regionQueue; + final HARegionQueue regionQueue; + final int numberOfTakes; - private final int numberOfTakes; - - DoTakes(ThreadGroup threadGroup, HARegionQueue haRegionQueue, int numberOfTakes) { - super(threadGroup, "DoTakes-" + nextDoTakesThreadNum()); + DoTake(HARegionQueue haRegionQueue, int numberOfTakes) { this.regionQueue = haRegionQueue; this.numberOfTakes = numberOfTakes; } @Override public void run() { - for (int i = 0; i < this.numberOfTakes; i++) { - if (stopThreads || Thread.currentThread().isInterrupted()) { - break; - } + for (int i = 0; i < numberOfTakes; i++) { try { assertNotNull(this.regionQueue.take()); } catch (Exception e) { - errorCollector.addError(e); - break; + exceptionOccurred = true; + exceptionString.append(" Exception occurred due to " + e); } } }