Clean up HARegionQueueJUnitTest and BlockingHARegionQueueJUnitTest

Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/a637f76c
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/a637f76c
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/a637f76c

Branch: refs/heads/feature/GEODE-2632-17
Commit: a637f76c8cdc1f9eddd0d4185667879f0221fd78
Parents: 7635f5b
Author: Kirk Lund <kl...@apache.org>
Authored: Mon May 22 17:23:46 2017 -0700
Committer: Kirk Lund <kl...@apache.org>
Committed: Wed May 24 16:41:08 2017 -0700

----------------------------------------------------------------------
 .../ha/BlockingHARegionQueueJUnitTest.java      |  169 +-
 .../cache/ha/HARegionQueueJUnitTest.java        | 2307 +++++++++---------
 2 files changed, 1167 insertions(+), 1309 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/a637f76c/geode-core/src/test/java/org/apache/geode/internal/cache/ha/BlockingHARegionQueueJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/ha/BlockingHARegionQueueJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/ha/BlockingHARegionQueueJUnitTest.java
index 39aa1e6..b529f0c 100755
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/ha/BlockingHARegionQueueJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/ha/BlockingHARegionQueueJUnitTest.java
@@ -14,166 +14,141 @@
  */
 package org.apache.geode.internal.cache.ha;
 
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.hamcrest.CoreMatchers.*;
+import static org.junit.Assert.*;
 
-import java.io.IOException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.awaitility.Awaitility;
-
-import org.apache.geode.test.junit.categories.ClientSubscriptionTest;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-import org.apache.geode.cache.CacheException;
 import org.apache.geode.internal.cache.Conflatable;
 import org.apache.geode.internal.cache.EventID;
+import org.apache.geode.test.junit.categories.ClientSubscriptionTest;
 import org.apache.geode.test.junit.categories.IntegrationTest;
 
 /**
  * Test runs all tests of HARegionQueueJUnitTest using BlockingHARegionQueue instead of
  * HARegionQueue.
- * 
- * 
  */
 @Category({IntegrationTest.class, ClientSubscriptionTest.class})
 public class BlockingHARegionQueueJUnitTest extends HARegionQueueJUnitTest {
 
-  /**
-   * Creates Blocking HA region-queue object
-   * 
-   * @return Blocking HA region-queue object
-   * @throws IOException
-   * @throws ClassNotFoundException
-   * @throws CacheException
-   * @throws InterruptedException
-   */
-  protected HARegionQueue createHARegionQueue(String name)
-      throws IOException, ClassNotFoundException, CacheException, 
InterruptedException {
-    HARegionQueue regionqueue =
-        HARegionQueue.getHARegionQueueInstance(name, cache, 
HARegionQueue.BLOCKING_HA_QUEUE, false);
-    return regionqueue;
-  }
-
-  /**
-   * Creates Blocking HA region-queue object
-   * 
-   * @return Blocking HA region-queue object
-   * @throws IOException
-   * @throws ClassNotFoundException
-   * @throws CacheException
-   * @throws InterruptedException
-   */
-  protected HARegionQueue createHARegionQueue(String name, 
HARegionQueueAttributes attrs)
-      throws IOException, ClassNotFoundException, CacheException, 
InterruptedException {
-    HARegionQueue regionqueue = HARegionQueue.getHARegionQueueInstance(name, 
cache, attrs,
-        HARegionQueue.BLOCKING_HA_QUEUE, false);
-    return regionqueue;
+  @Override
+  protected int queueType() {
+    return HARegionQueue.BLOCKING_HA_QUEUE;
   }
 
   /**
    * Tests the effect of a put which is blocked because of capacity constraint & subsequent passage
    * because of take operation
-   * 
    */
   @Test
-  public void testBlockingPutAndTake()
-      throws InterruptedException, IOException, ClassNotFoundException {
+  public void testBlockingPutAndTake() throws Exception {
     HARegionQueueAttributes hrqa = new HARegionQueueAttributes();
     hrqa.setBlockingQueueCapacity(1);
-    final HARegionQueue hrq = 
this.createHARegionQueue("testBlockingPutAndTake", hrqa);
-    hrq.setPrimary(true);// fix for 40314 - capacity constraint is checked for 
primary only.
+
+    HARegionQueue hrq = createHARegionQueue(this.testName.getMethodName(), 
hrqa);
+    hrq.setPrimary(true); // fix for 40314 - capacity constraint is checked 
for primary only.
+
     EventID id1 = new EventID(new byte[] {1}, 1, 1);
     hrq.put(new ConflatableObject("key1", "val1", id1, false, "testing"));
-    Thread t1 = new Thread(new Runnable() {
-      public void run() {
-        try {
-          EventID id2 = new EventID(new byte[] {1}, 1, 2);
-          hrq.put(new ConflatableObject("key1", "val2", id2, false, 
"testing"));
-        } catch (Exception e) {
-          encounteredException = true;
-        }
+
+    AtomicBoolean threadStarted = new AtomicBoolean(false);
+
+    Thread thread = new Thread(() -> {
+      try {
+        threadStarted.set(true);
+        EventID id2 = new EventID(new byte[] {1}, 1, 2);
+        hrq.put(new ConflatableObject("key1", "val2", id2, false, "testing"));
+      } catch (InterruptedException e) {
+        errorCollector.addError(e);
       }
     });
-    t1.start();
-    Awaitility.await().atMost(1, TimeUnit.MINUTES).until(() -> t1.isAlive());
+    thread.start();
+
+    Awaitility.await().atMost(1, TimeUnit.MINUTES).until(() -> 
threadStarted.get());
+
     Conflatable conf = (Conflatable) hrq.take();
-    assertNotNull(conf);
-    Awaitility.await().atMost(1, TimeUnit.MINUTES).until(() -> !t1.isAlive());
+    assertThat(conf, notNullValue());
+
+    Awaitility.await().atMost(1, TimeUnit.MINUTES).until(() -> 
!thread.isAlive());
   }
 
   /**
    * Test Scenario : BlockingQueue capacity is 1. The first put should be successful. The second put
    * should block till a peek/remove happens.
-   * 
    */
   @Test
-  public void testBlockingPutAndPeekRemove()
-      throws InterruptedException, IOException, ClassNotFoundException {
+  public void testBlockingPutAndPeekRemove() throws Exception {
     HARegionQueueAttributes hrqa = new HARegionQueueAttributes();
     hrqa.setBlockingQueueCapacity(1);
-    final HARegionQueue hrq = 
this.createHARegionQueue("testBlockingPutAndPeekRemove", hrqa);
+
+    HARegionQueue hrq = createHARegionQueue(this.testName.getMethodName(), 
hrqa);
     hrq.setPrimary(true);// fix for 40314 - capacity constraint is checked for 
primary only.
+
     EventID id1 = new EventID(new byte[] {1}, 1, 1);
     hrq.put(new ConflatableObject("key1", "val1", id1, false, "testing"));
-    Thread t1 = new Thread(new Runnable() {
-      public void run() {
-        try {
-          EventID id2 = new EventID(new byte[] {1}, 1, 2);
-          hrq.put(new ConflatableObject("key1", "val2", id2, false, 
"testing"));
-        } catch (Exception e) {
-          encounteredException = true;
-        }
+
+    AtomicBoolean threadStarted = new AtomicBoolean(false);
+
+    Thread thread = new Thread(() -> {
+      try {
+        threadStarted.set(true);
+        EventID id2 = new EventID(new byte[] {1}, 1, 2);
+        hrq.put(new ConflatableObject("key1", "val2", id2, false, "testing"));
+      } catch (Exception e) {
+        errorCollector.addError(e);
       }
     });
-    t1.start();
-    Awaitility.await().atMost(1, TimeUnit.MINUTES).until(() -> t1.isAlive());
+    thread.start();
+
+    Awaitility.await().atMost(1, TimeUnit.MINUTES).until(() -> 
threadStarted.get());
+
     Conflatable conf = (Conflatable) hrq.peek();
-    assertNotNull(conf);
+    assertThat(conf, notNullValue());
+
     hrq.remove();
-    Awaitility.await().atMost(1, TimeUnit.MINUTES).until(() -> !t1.isAlive());
-    assertFalse("Exception occurred in put-thread", encounteredException);
 
+    Awaitility.await().atMost(1, TimeUnit.MINUTES).until(() -> 
!thread.isAlive());
   }
 
   /**
    * Test Scenario :Blocking Queue capacity is 1. The first put should be successful.The second put
    * should block till the first put expires.
-   * 
+   * <p>
+   * fix for 40314 - capacity constraint is checked for primary only and expiry is not applicable on
+   * primary so marking this test as invalid.
    */
-  // fix for 40314 - capacity constraint is checked for primary only and
-  // expiry is not applicable on primary so marking this test as invalid.
-  @Ignore
   @Test
-  public void testBlockingPutAndExpiry()
-      throws InterruptedException, IOException, ClassNotFoundException {
+  public void testBlockingPutAndExpiry() throws Exception {
     HARegionQueueAttributes hrqa = new HARegionQueueAttributes();
     hrqa.setBlockingQueueCapacity(1);
     hrqa.setExpiryTime(1);
-    final HARegionQueue hrq = 
this.createHARegionQueue("testBlockingPutAndExpiry", hrqa);
+
+    HARegionQueue hrq = 
this.createHARegionQueue(this.testName.getMethodName(), hrqa);
 
     EventID id1 = new EventID(new byte[] {1}, 1, 1);
-    long start = System.currentTimeMillis();
+
     hrq.put(new ConflatableObject("key1", "val1", id1, false, "testing"));
-    Thread t1 = new Thread(new Runnable() {
-      public void run() {
-        try {
-          EventID id2 = new EventID(new byte[] {1}, 1, 2);
-          hrq.put(new ConflatableObject("key1", "val2", id2, false, 
"testing"));
-        } catch (Exception e) {
-          encounteredException = true;
-        }
+
+    AtomicBoolean threadStarted = new AtomicBoolean(false);
+
+    Thread thread = new Thread(() -> {
+      try {
+        threadStarted.set(true);
+        EventID id2 = new EventID(new byte[] {1}, 1, 2);
+        hrq.put(new ConflatableObject("key1", "val2", id2, false, "testing"));
+      } catch (Exception e) {
+        errorCollector.addError(e);
       }
     });
-    t1.start();
-    Awaitility.await().atMost(1, TimeUnit.MINUTES).until(() -> t1.isAlive());
-    waitAtLeast(1000, start, () -> {
-      assertFalse("Put-thread blocked unexpectedly", t1.isAlive());
-    });
-    assertFalse("Exception occurred in put-thread", encounteredException);
+    thread.start();
+
+    Awaitility.await().atMost(1, TimeUnit.MINUTES).until(() -> 
threadStarted.get());
+
+    Awaitility.await().atMost(1, TimeUnit.MINUTES).until(() -> 
!thread.isAlive());
   }
 }

Reply via email to