Repository: geode
Updated Branches:
  refs/heads/feature/GEODE-2632-17 5cf271d22 -> 41e2eccd1


Revert BlockingHARegionJUnitTest


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/41e2eccd
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/41e2eccd
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/41e2eccd

Branch: refs/heads/feature/GEODE-2632-17
Commit: 41e2eccd1b92f85b47eb99471b528b2021c4733c
Parents: 5cf271d
Author: Kirk Lund <kl...@apache.org>
Authored: Wed May 24 19:13:53 2017 -0700
Committer: Kirk Lund <kl...@apache.org>
Committed: Wed May 24 19:13:53 2017 -0700

----------------------------------------------------------------------
 .../cache/ha/BlockingHARegionJUnitTest.java     | 494 ++++++++++---------
 1 file changed, 270 insertions(+), 224 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/41e2eccd/geode-core/src/test/java/org/apache/geode/internal/cache/ha/BlockingHARegionJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/ha/BlockingHARegionJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/ha/BlockingHARegionJUnitTest.java
index 1534192..d0f5793 100755
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/ha/BlockingHARegionJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/ha/BlockingHARegionJUnitTest.java
@@ -14,116 +14,76 @@
  */
 package org.apache.geode.internal.cache.ha;
 
-import static java.util.concurrent.TimeUnit.*;
 import static org.apache.geode.distributed.ConfigurationProperties.*;
-import static org.apache.geode.internal.cache.ha.HARegionQueue.*;
 import static org.junit.Assert.*;
 
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Properties;
 
-import org.awaitility.Awaitility;
-import org.awaitility.core.ConditionFactory;
-import org.junit.After;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.test.junit.categories.ClientSubscriptionTest;
 import org.junit.Before;
 import org.junit.Ignore;
-import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import org.apache.geode.cache.Cache;
 import org.apache.geode.cache.CacheFactory;
 import org.apache.geode.distributed.DistributedSystem;
 import org.apache.geode.internal.cache.EventID;
-import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.test.dunit.ThreadUtils;
-import org.apache.geode.test.junit.categories.ClientSubscriptionTest;
+import org.apache.geode.test.dunit.Wait;
+import org.apache.geode.test.dunit.WaitCriterion;
 import org.apache.geode.test.junit.categories.IntegrationTest;
-import org.apache.geode.test.junit.rules.serializable.SerializableErrorCollector;
 
-/**
- * Integration tests for Blocking HARegionQueue.
- *
- * <p>
- * #40314: Filled up queue causes all publishers to block
- *
- * <p>
- * #37627: In case of out of order messages, (sequence Id violation), in spite of HARQ not full, the
- * capacity (putPermits) of the HARQ exhausted.
- */
 @Category({IntegrationTest.class, ClientSubscriptionTest.class})
 public class BlockingHARegionJUnitTest {
 
-  public static final String REGION = "BlockingHARegionJUnitTest_Region";
-  private static final long THREAD_TIMEOUT = 2 * 60 * 1000;
-
-  private final Object numberForThreadsLock = new Object();
-  private int numberForDoPuts;
-  private int numberForDoTakes;
-
-  volatile boolean stopThreads;
+  private static InternalCache cache = null;
 
-  private InternalCache cache;
-  private HARegionQueueAttributes queueAttributes;
-  private List<Thread> threads;
-  private ThreadGroup threadGroup;
-
-  @Rule
-  public SerializableErrorCollector errorCollector = new SerializableErrorCollector();
+  /** boolean to record an exception occurence in another thread **/
+  private static volatile boolean exceptionOccurred = false;
+  /** StringBuffer to store the exception **/
+  private static StringBuffer exceptionString = new StringBuffer();
+  /** boolen to quit the for loop **/
+  private static volatile boolean quitForLoop = false;
 
   @Before
   public void setUp() throws Exception {
-    synchronized (this.numberForThreadsLock) {
-      this.numberForDoPuts = 0;
-      this.numberForDoTakes = 0;
-    }
-
-    this.stopThreads = false;
-    this.threads = new ArrayList<>();
-    this.threadGroup = new ThreadGroup(getClass().getSimpleName()) {
-      @Override
-      public void uncaughtException(Thread t, Throwable e) {
-        errorCollector.addError(e);
-      }
-    };
-
-    this.queueAttributes = new HARegionQueueAttributes();
-
-    Properties config = new Properties();
-    config.setProperty(MCAST_PORT, "0");
-
-    this.cache = (InternalCache) CacheFactory.create(DistributedSystem.connect(config));
-  }
-
-  @After
-  public void tearDown() throws Exception {
-    try {
-      this.stopThreads = true;
-      for (Thread thread : this.threads) {
-        thread.interrupt();
-        ThreadUtils.join(thread, THREAD_TIMEOUT);
-      }
-    } finally {
-      if (this.cache != null) {
-        this.cache.close();
-      }
+    Properties props = new Properties();
+    props.setProperty(MCAST_PORT, "0");
+    if (cache != null) {
+      cache.close(); // fault tolerance
     }
+    cache = (InternalCache) CacheFactory.create(DistributedSystem.connect(props));
   }
 
   /**
-   * This test has a scenario where the HARegionQueue capacity is just 1. There will be two thread.
+   * This test has a scenario where the HAReqionQueue capacity is just 1. There will be two thread.
    * One doing a 1000 puts and the other doing a 1000 takes. The validation for this test is that it
    * should not encounter any exceptions
    */
   @Test
   public void testBoundedPuts() throws Exception {
-    this.queueAttributes.setBlockingQueueCapacity(1);
-    HARegionQueue hrq = getHARegionQueueInstance(REGION, this.cache, this.queueAttributes,
-        BLOCKING_HA_QUEUE, false);
-    hrq.setPrimary(true); // fix for 40314 - capacity constraint is checked for primary only
+    exceptionOccurred = false;
+    HARegionQueueAttributes harqa = new HARegionQueueAttributes();
+    harqa.setBlockingQueueCapacity(1);
+    HARegionQueue hrq = HARegionQueue.getHARegionQueueInstance("BlockingHARegionJUnitTest_Region",
+        cache, harqa, HARegionQueue.BLOCKING_HA_QUEUE, false);
+    hrq.setPrimary(true);// fix for 40314 - capacity constraint is checked for primary only.
+    Thread thread1 = new DoPuts(hrq, 1000);
+    Thread thread2 = new DoTake(hrq, 1000);
+
+    thread1.start();
+    thread2.start();
+
+    ThreadUtils.join(thread1, 30 * 1000);
+    ThreadUtils.join(thread2, 30 * 1000);
+
+    if (exceptionOccurred) {
+      fail(" Test failed due to " + exceptionString);
+    }
 
-    startDoPuts(hrq, 1000);
-    startDoTakes(hrq, 1000);
+    cache.close();
   }
 
   /**
@@ -136,23 +96,62 @@ public class BlockingHARegionJUnitTest {
    */
   @Test
   public void testPutBeingBlocked() throws Exception {
-    this.queueAttributes.setBlockingQueueCapacity(1);
-    HARegionQueue hrq = getHARegionQueueInstance(REGION, this.cache, this.queueAttributes,
-        BLOCKING_HA_QUEUE, false);
-    hrq.setPrimary(true); // fix for 40314 - capacity constraint is checked for primary only
+    exceptionOccurred = false;
+    quitForLoop = false;
+    HARegionQueueAttributes harqa = new HARegionQueueAttributes();
+    harqa.setBlockingQueueCapacity(1);
+    final HARegionQueue hrq = HARegionQueue.getHARegionQueueInstance(
+        "BlockingHARegionJUnitTest_Region", cache, harqa, HARegionQueue.BLOCKING_HA_QUEUE, false);
+    hrq.setPrimary(true);// fix for 40314 - capacity constraint is checked for primary only.
+    final Thread thread1 = new DoPuts(hrq, 2);
+    thread1.start();
+    WaitCriterion ev = new WaitCriterion() {
+      public boolean done() {
+        return hrq.region.size() == 2;
+      }
 
-    Thread doPuts = startDoPuts(hrq, 2);
+      public String description() {
+        return null;
+      }
+    };
+    Wait.waitForCriterion(ev, 1000, 200, true);
+    assertTrue(thread1.isAlive()); // thread should still be alive (in wait state)
+
+    Thread thread2 = new DoTake(hrq, 1);
+    thread2.start(); // start take thread
+    ev = new WaitCriterion() {
+      public boolean done() {
+        return hrq.region.size() == 3;
+      }
 
-    await().until(() -> assertTrue(hrq.region.size() == 2));
+      public String description() {
+        return null;
+      }
+    };
+    // sleep. take will proceed and so will sleeping put
+    Wait.waitForCriterion(ev, 3 * 1000, 200, true);
 
-    // thread should still be alive (in wait state)
-    assertTrue(doPuts.isAlive());
+    // thread should have died since put should have proceeded
+    ev = new WaitCriterion() {
+      public boolean done() {
+        return !thread1.isAlive();
+      }
 
-    startDoTakes(hrq, 1);
+      public String description() {
+        return "thread1 still alive";
+      }
+    };
+    Wait.waitForCriterion(ev, 30 * 1000, 1000, true);
 
-    await().until(() -> assertTrue(hrq.region.size() == 3));
+    ThreadUtils.join(thread1, 30 * 1000); // for completeness
+    ThreadUtils.join(thread2, 30 * 1000);
+    if (exceptionOccurred) {
+      fail(" Test failed due to " + exceptionString);
+    }
+    cache.close();
   }
 
+
   /**
   * This test tests that the region capacity is never exceeded even in highly concurrent
   * environments. The region capacity is set to 10000. Then 5 threads start doing put
@@ -162,26 +161,62 @@ public class BlockingHARegionJUnitTest {
    */
   @Test
   public void testConcurrentPutsNotExceedingLimit() throws Exception {
-    this.queueAttributes.setBlockingQueueCapacity(10000);
-    HARegionQueue hrq = getHARegionQueueInstance(REGION, this.cache, this.queueAttributes,
-        BLOCKING_HA_QUEUE, false);
-    hrq.setPrimary(true); // fix for 40314 - capacity constraint is checked for primary only
-
-    Thread doPuts1 = startDoPuts(hrq, 20000, 1);
-    Thread doPuts2 = startDoPuts(hrq, 20000, 2);
-    Thread doPuts3 = startDoPuts(hrq, 20000, 3);
-    Thread doPuts4 = startDoPuts(hrq, 20000, 4);
-    Thread doPuts5 = startDoPuts(hrq, 20000, 5);
+    exceptionOccurred = false;
+    quitForLoop = false;
+    HARegionQueueAttributes harqa = new HARegionQueueAttributes();
+    harqa.setBlockingQueueCapacity(10000);
+    final HARegionQueue hrq = HARegionQueue.getHARegionQueueInstance(
+        "BlockingHARegionJUnitTest_Region", cache, harqa, HARegionQueue.BLOCKING_HA_QUEUE, false);
+    hrq.setPrimary(true);// fix for 40314 - capacity constraint is checked for primary only.
+    Thread thread1 = new DoPuts(hrq, 20000, 1);
+    Thread thread2 = new DoPuts(hrq, 20000, 2);
+    Thread thread3 = new DoPuts(hrq, 20000, 3);
+    Thread thread4 = new DoPuts(hrq, 20000, 4);
+    Thread thread5 = new DoPuts(hrq, 20000, 5);
+
+    thread1.start();
+    thread2.start();
+    thread3.start();
+    thread4.start();
+    thread5.start();
+
+    WaitCriterion ev = new WaitCriterion() {
+      public boolean done() {
+        return hrq.region.size() == 20000;
+      }
 
-    await().until(() -> assertTrue(hrq.region.size() == 20000));
+      public String description() {
+        return null;
+      }
+    };
+    Wait.waitForCriterion(ev, 30 * 1000, 200, true);
 
-    assertTrue(doPuts1.isAlive());
-    assertTrue(doPuts2.isAlive());
-    assertTrue(doPuts3.isAlive());
-    assertTrue(doPuts4.isAlive());
-    assertTrue(doPuts5.isAlive());
+    assertTrue(thread1.isAlive());
+    assertTrue(thread2.isAlive());
+    assertTrue(thread3.isAlive());
+    assertTrue(thread4.isAlive());
+    assertTrue(thread5.isAlive());
 
     assertTrue(hrq.region.size() == 20000);
+
+    quitForLoop = true;
+    Thread.sleep(20000);
+
+    thread1.interrupt();
+    thread2.interrupt();
+    thread3.interrupt();
+    thread4.interrupt();
+    thread5.interrupt();
+
+    Thread.sleep(2000);
+
+    ThreadUtils.join(thread1, 5 * 60 * 1000);
+    ThreadUtils.join(thread2, 5 * 60 * 1000);
+    ThreadUtils.join(thread3, 5 * 60 * 1000);
+    ThreadUtils.join(thread4, 5 * 60 * 1000);
+    ThreadUtils.join(thread5, 5 * 60 * 1000);
+
+    cache.close();
   }
 
   /**
@@ -191,41 +226,84 @@ public class BlockingHARegionJUnitTest {
   * state. the region size would be verified to be 20000 (10000 puts and 10000 DACE objects). then
    * the threads are interrupted and made to quit the loop
    */
-  @Ignore("Test is disabled until/if blocking queue capacity becomes a hard limit")
+  @Ignore("TODO: test is disabled")
   @Test
   public void testConcurrentPutsTakesNotExceedingLimit() throws Exception {
-    this.queueAttributes.setBlockingQueueCapacity(10000);
-    HARegionQueue hrq = getHARegionQueueInstance(REGION, this.cache, this.queueAttributes,
-        BLOCKING_HA_QUEUE, false);
-    hrq.setPrimary(true); // fix for 40314 - capacity constraint is checked for primary only
-
-    Thread doPuts1 = startDoPuts(hrq, 40000, 1);
-    Thread doPuts2 = startDoPuts(hrq, 40000, 2);
-    Thread doPuts3 = startDoPuts(hrq, 40000, 3);
-    Thread doPuts4 = startDoPuts(hrq, 40000, 4);
-    Thread doPuts5 = startDoPuts(hrq, 40000, 5);
-
-    Thread doTakes1 = startDoTakes(hrq, 5000);
-    Thread doTakes2 = startDoTakes(hrq, 5000);
-    Thread doTakes3 = startDoTakes(hrq, 5000);
-    Thread doTakes4 = startDoTakes(hrq, 5000);
-    Thread doTakes5 = startDoTakes(hrq, 5000);
-
-    ThreadUtils.join(doTakes1, 30 * 1000);
-    ThreadUtils.join(doTakes2, 30 * 1000);
-    ThreadUtils.join(doTakes3, 30 * 1000);
-    ThreadUtils.join(doTakes4, 30 * 1000);
-    ThreadUtils.join(doTakes5, 30 * 1000);
-
-    await().until(() -> assertTrue(hrq.region.size() == 20000));
-
-    assertTrue(doPuts1.isAlive());
-    assertTrue(doPuts2.isAlive());
-    assertTrue(doPuts3.isAlive());
-    assertTrue(doPuts4.isAlive());
-    assertTrue(doPuts5.isAlive());
+    exceptionOccurred = false;
+    quitForLoop = false;
+    HARegionQueueAttributes harqa = new HARegionQueueAttributes();
+    harqa.setBlockingQueueCapacity(10000);
+    final HARegionQueue hrq = HARegionQueue.getHARegionQueueInstance(
+        "BlockingHARegionJUnitTest_Region", cache, harqa, HARegionQueue.BLOCKING_HA_QUEUE, false);
+    Thread thread1 = new DoPuts(hrq, 40000, 1);
+    Thread thread2 = new DoPuts(hrq, 40000, 2);
+    Thread thread3 = new DoPuts(hrq, 40000, 3);
+    Thread thread4 = new DoPuts(hrq, 40000, 4);
+    Thread thread5 = new DoPuts(hrq, 40000, 5);
+
+    Thread thread6 = new DoTake(hrq, 5000);
+    Thread thread7 = new DoTake(hrq, 5000);
+    Thread thread8 = new DoTake(hrq, 5000);
+    Thread thread9 = new DoTake(hrq, 5000);
+    Thread thread10 = new DoTake(hrq, 5000);
+
+    thread1.start();
+    thread2.start();
+    thread3.start();
+    thread4.start();
+    thread5.start();
+
+    thread6.start();
+    thread7.start();
+    thread8.start();
+    thread9.start();
+    thread10.start();
+
+    ThreadUtils.join(thread6, 30 * 1000);
+    ThreadUtils.join(thread7, 30 * 1000);
+    ThreadUtils.join(thread8, 30 * 1000);
+    ThreadUtils.join(thread9, 30 * 1000);
+    ThreadUtils.join(thread10, 30 * 1000);
+
+    WaitCriterion ev = new WaitCriterion() {
+      public boolean done() {
+        return hrq.region.size() == 20000;
+      }
+
+      public String description() {
+        return null;
+      }
+    };
+    Wait.waitForCriterion(ev, 30 * 1000, 200, true);
+
+    assertTrue(thread1.isAlive());
+    assertTrue(thread2.isAlive());
+    assertTrue(thread3.isAlive());
+    assertTrue(thread4.isAlive());
+    assertTrue(thread5.isAlive());
 
     assertTrue(hrq.region.size() == 20000);
+
+    quitForLoop = true;
+
+    Thread.sleep(2000);
+
+    thread1.interrupt();
+    thread2.interrupt();
+    thread3.interrupt();
+    thread4.interrupt();
+    thread5.interrupt();
+
+    Thread.sleep(2000);
+
+
+    ThreadUtils.join(thread1, 30 * 1000);
+    ThreadUtils.join(thread2, 30 * 1000);
+    ThreadUtils.join(thread3, 30 * 1000);
+    ThreadUtils.join(thread4, 30 * 1000);
+    ThreadUtils.join(thread5, 30 * 1000);
+
+    cache.close();
   }
 
   /**
@@ -237,92 +315,62 @@ public class BlockingHARegionJUnitTest {
    */
   @Test
   public void testHARQMaxCapacity_Bug37627() throws Exception {
-    this.queueAttributes.setBlockingQueueCapacity(1);
-    this.queueAttributes.setExpiryTime(180);
-    HARegionQueue hrq = getHARegionQueueInstance(REGION, this.cache, this.queueAttributes,
-        BLOCKING_HA_QUEUE, false);
-    hrq.setPrimary(true); // fix for 40314 - capacity constraint is checked for primary only
-
-    EventID event1 = new EventID(new byte[] {1}, 1, 2); // violation
-    EventID event2 = new EventID(new byte[] {1}, 1, 1); // ignored
-    EventID event3 = new EventID(new byte[] {1}, 1, 3);
-
-    newThread(new Runnable() {
-      @Override
-      public void run() {
-        try {
-          hrq.put(new ConflatableObject("key1", "value1", event1, false, "region1"));
-          hrq.take();
-          hrq.put(new ConflatableObject("key2", "value1", event2, false, "region1"));
-          hrq.put(new ConflatableObject("key3", "value1", event3, false, "region1"));
-        } catch (Exception e) {
-          errorCollector.addError(e);
+    try {
+      exceptionOccurred = false;
+      quitForLoop = false;
+      HARegionQueueAttributes harqa = new HARegionQueueAttributes();
+      harqa.setBlockingQueueCapacity(1);
+      harqa.setExpiryTime(180);
+      final HARegionQueue hrq = HARegionQueue.getHARegionQueueInstance(
+          "BlockingHARegionJUnitTest_Region", cache, harqa, HARegionQueue.BLOCKING_HA_QUEUE, false);
+      hrq.setPrimary(true);// fix for 40314 - capacity constraint is checked for primary only.
+      final EventID id1 = new EventID(new byte[] {1}, 1, 2); // violation
+      final EventID ignore = new EventID(new byte[] {1}, 1, 1); //
+      final EventID id2 = new EventID(new byte[] {1}, 1, 3); //
+      Thread t1 = new Thread() {
+        public void run() {
+          try {
+            hrq.put(new ConflatableObject("key1", "value1", id1, false, "region1"));
+            hrq.take();
+            hrq.put(new ConflatableObject("key2", "value1", ignore, false, "region1"));
+            hrq.put(new ConflatableObject("key3", "value1", id2, false, "region1"));
+          } catch (Exception e) {
+            exceptionString.append("First Put in region queue failed");
+            exceptionOccurred = true;
+          }
         }
+      };
+      t1.start();
+      ThreadUtils.join(t1, 20 * 1000);
+      if (exceptionOccurred) {
+        fail(" Test failed due to " + exceptionString);
+      }
+    } finally {
+      if (cache != null) {
+        cache.close();
       }
-    });
-  }
-
-  private Thread newThread(Runnable runnable) {
-    Thread thread = new Thread(this.threadGroup, runnable);
-    this.threads.add(thread);
-    thread.start();
-    return thread;
-  }
-
-  private Thread startDoPuts(HARegionQueue haRegionQueue, int count) {
-    return startDoPuts(haRegionQueue, count, 0);
-  }
-
-  private Thread startDoPuts(HARegionQueue haRegionQueue, int count, int regionId) {
-    Thread thread = new DoPuts(this.threadGroup, haRegionQueue, count, regionId);
-    this.threads.add(thread);
-    thread.start();
-    return thread;
-  }
-
-  private Thread startDoTakes(HARegionQueue haRegionQueue, int count) {
-    Thread thread = new DoTakes(this.threadGroup, haRegionQueue, count);
-    this.threads.add(thread);
-    thread.start();
-    return thread;
-  }
-
-  private ConditionFactory await() {
-    return Awaitility.await().atMost(2, MINUTES);
-  }
-
-  int nextDoPutsThreadNum() {
-    synchronized (this.numberForThreadsLock) {
-      return numberForDoPuts++;
-    }
-  }
-
-  int nextDoTakesThreadNum() {
-    synchronized (this.numberForThreadsLock) {
-      return numberForDoTakes++;
     }
   }
 
   /**
    * class which does specified number of puts on the queue
    */
-  private class DoPuts extends Thread {
+  private static class DoPuts extends Thread {
 
-    private final HARegionQueue regionQueue;
+    HARegionQueue regionQueue = null;
+    final int numberOfPuts;
 
-    private final int numberOfPuts;
+    DoPuts(HARegionQueue haRegionQueue, int numberOfPuts) {
+      this.regionQueue = haRegionQueue;
+      this.numberOfPuts = numberOfPuts;
+    }
 
     /**
      * region id can be specified to generate Thread unique events
      */
-    private final int regionId;
+    int regionId = 0;
 
-    DoPuts(ThreadGroup threadGroup, HARegionQueue haRegionQueue, int numberOfPuts) {
-      this(threadGroup, haRegionQueue, numberOfPuts, 0);
-    }
-
-    DoPuts(ThreadGroup threadGroup, HARegionQueue haRegionQueue, int numberOfPuts, int regionId) {
-      super(threadGroup, "DoPuts-" + nextDoPutsThreadNum());
+    DoPuts(HARegionQueue haRegionQueue, int numberOfPuts, int regionId) {
       this.regionQueue = haRegionQueue;
       this.numberOfPuts = numberOfPuts;
       this.regionId = regionId;
@@ -330,16 +378,19 @@ public class BlockingHARegionJUnitTest {
 
     @Override
     public void run() {
-      for (int i = 0; i < this.numberOfPuts; i++) {
-        if (stopThreads || Thread.currentThread().isInterrupted()) {
-          break;
-        }
+      for (int i = 0; i < numberOfPuts; i++) {
         try {
           this.regionQueue.put(new ConflatableObject("" + i, "" + i,
-              new EventID(new byte[this.regionId], i, i), false, REGION));
+              new EventID(new byte[regionId], i, i), false, "BlockingHARegionJUnitTest_Region"));
+          if (quitForLoop) {
+            break;
+          }
+          if (Thread.currentThread().isInterrupted()) {
+            break;
+          }
         } catch (Exception e) {
-          errorCollector.addError(e);
-          break;
+          exceptionOccurred = true;
+          exceptionString.append(" Exception occurred due to " + e);
         }
       }
     }
@@ -348,29 +399,24 @@ public class BlockingHARegionJUnitTest {
   /**
    * class which does a specified number of takes
    */
-  private class DoTakes extends Thread {
+  private static class DoTake extends Thread {
 
-    private final HARegionQueue regionQueue;
+    final HARegionQueue regionQueue;
+    final int numberOfTakes;
 
-    private final int numberOfTakes;
-
-    DoTakes(ThreadGroup threadGroup, HARegionQueue haRegionQueue, int numberOfTakes) {
-      super(threadGroup, "DoTakes-" + nextDoTakesThreadNum());
+    DoTake(HARegionQueue haRegionQueue, int numberOfTakes) {
       this.regionQueue = haRegionQueue;
       this.numberOfTakes = numberOfTakes;
     }
 
     @Override
     public void run() {
-      for (int i = 0; i < this.numberOfTakes; i++) {
-        if (stopThreads || Thread.currentThread().isInterrupted()) {
-          break;
-        }
+      for (int i = 0; i < numberOfTakes; i++) {
         try {
           assertNotNull(this.regionQueue.take());
         } catch (Exception e) {
-          errorCollector.addError(e);
-          break;
+          exceptionOccurred = true;
+          exceptionString.append(" Exception occurred due to " + e);
         }
       }
     }

Reply via email to