Cleaning up some duplicate code in AsyncEventListenerDUnitTest

This class was now getting a "code to large" complilation error with the
lambdas.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/58038ec1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/58038ec1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/58038ec1

Branch: refs/heads/feature/GEODE-866
Commit: 58038ec1ff842128a0337f29d73495188af0fe2b
Parents: 51c03a9
Author: Dan Smith <upthewatersp...@apache.org>
Authored: Thu Feb 11 16:41:44 2016 -0800
Committer: Dan Smith <upthewatersp...@apache.org>
Committed: Tue Feb 16 14:08:40 2016 -0800

----------------------------------------------------------------------
 .../asyncqueue/AsyncEventListenerDUnitTest.java | 446 +++++--------------
 1 file changed, 108 insertions(+), 338 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/58038ec1/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java
----------------------------------------------------------------------
diff --git 
a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java
 
b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java
index f5ed32b..8bf2443 100644
--- 
a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java
+++ 
b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java
@@ -84,10 +84,7 @@ public class AsyncEventListenerDUnitTest extends 
AsyncEventQueueTestBase {
   public void testSerialAsyncEventQueueSize() {
     Integer lnPort = (Integer)vm0.invoke(() -> 
AsyncEventQueueTestBase.createFirstLocatorWithDSId( 1 ));
 
-    vm4.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort ));
-    vm5.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort ));
-    vm6.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort ));
-    vm7.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort ));
+    createCaches(lnPort);
 
     vm4.invoke(() -> AsyncEventQueueTestBase.createAsyncEventQueue( "ln",
         false, 100, 100, false, false, null, false ));
@@ -98,19 +95,9 @@ public class AsyncEventListenerDUnitTest extends 
AsyncEventQueueTestBase {
     vm7.invoke(() -> AsyncEventQueueTestBase.createAsyncEventQueue( "ln",
         false, 100, 100, false, false, null, false ));
 
-    vm4.invoke(() -> 
AsyncEventQueueTestBase.createReplicatedRegionWithAsyncEventQueue( 
getTestMethodName() + "_RR", "ln", isOffHeap() ));
-    vm5.invoke(() -> 
AsyncEventQueueTestBase.createReplicatedRegionWithAsyncEventQueue( 
getTestMethodName() + "_RR", "ln", isOffHeap() ));
-    vm6.invoke(() -> 
AsyncEventQueueTestBase.createReplicatedRegionWithAsyncEventQueue( 
getTestMethodName() + "_RR", "ln", isOffHeap() ));
-    vm7.invoke(() -> 
AsyncEventQueueTestBase.createReplicatedRegionWithAsyncEventQueue( 
getTestMethodName() + "_RR", "ln", isOffHeap() ));
+    createReplicatedRegionWithAsyncEventQueue();
 
-    vm4
-        .invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue( "ln" ));
-    vm5
-        .invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue( "ln" ));
-    vm6
-        .invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue( "ln" ));
-    vm7
-        .invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue( "ln" ));
+    pauseAsyncEventQueues();
     Wait.pause(1000);// pause at least for the batchTimeInterval
 
     vm4.invoke(() -> AsyncEventQueueTestBase.doPuts( getTestMethodName() + 
"_RR",
@@ -121,6 +108,13 @@ public class AsyncEventListenerDUnitTest extends 
AsyncEventQueueTestBase {
     assertEquals("Size of AsyncEventQueue is incorrect", 1000, vm4size);
     assertEquals("Size of AsyncEventQueue is incorrect", 1000, vm5size);
   }
+
+  protected void createCaches(Integer lnPort) {
+    vm4.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort ));
+    vm5.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort ));
+    vm6.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort ));
+    vm7.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort ));
+  }
   
   /**
    * Added to reproduce defect #50366: 
@@ -129,20 +123,14 @@ public class AsyncEventListenerDUnitTest extends 
AsyncEventQueueTestBase {
   public void testConcurrentSerialAsyncEventQueueSize() {
        Integer lnPort = (Integer)vm0.invoke(() -> 
AsyncEventQueueTestBase.createFirstLocatorWithDSId( 1 ));
 
-       vm4.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort ));
-       vm5.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort ));
-       vm6.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort ));
-       vm7.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort ));
+       createCaches(lnPort);
 
     vm4.invoke(() -> AsyncEventQueueTestBase.createConcurrentAsyncEventQueue( 
"ln",
         false, 100, 150, true, false, null, false, 2, OrderPolicy.KEY ));
     vm5.invoke(() -> AsyncEventQueueTestBase.createConcurrentAsyncEventQueue( 
"ln",
         false, 100, 150, true, false, null, false, 2, OrderPolicy.KEY ));
 
-       vm4.invoke(() -> 
AsyncEventQueueTestBase.createReplicatedRegionWithAsyncEventQueue( 
getTestMethodName() + "_RR", "ln", isOffHeap() ));
-       vm5.invoke(() -> 
AsyncEventQueueTestBase.createReplicatedRegionWithAsyncEventQueue( 
getTestMethodName() + "_RR", "ln", isOffHeap() ));
-       vm6.invoke(() -> 
AsyncEventQueueTestBase.createReplicatedRegionWithAsyncEventQueue( 
getTestMethodName() + "_RR", "ln", isOffHeap() ));
-       vm7.invoke(() -> 
AsyncEventQueueTestBase.createReplicatedRegionWithAsyncEventQueue( 
getTestMethodName() + "_RR", "ln", isOffHeap() ));
+       createReplicatedRegionWithAsyncEventQueue();
 
        vm4
          .invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue( "ln" ));
@@ -159,6 +147,13 @@ public class AsyncEventListenerDUnitTest extends 
AsyncEventQueueTestBase {
        assertEquals("Size of AsyncEventQueue is incorrect", 1000, vm4size);
        assertEquals("Size of AsyncEventQueue is incorrect", 1000, vm5size);
   }
+
+  protected void createReplicatedRegionWithAsyncEventQueue() {
+    vm4.invoke(() -> 
AsyncEventQueueTestBase.createReplicatedRegionWithAsyncEventQueue( 
getTestMethodName() + "_RR", "ln", isOffHeap() ));
+    vm5.invoke(() -> 
AsyncEventQueueTestBase.createReplicatedRegionWithAsyncEventQueue( 
getTestMethodName() + "_RR", "ln", isOffHeap() ));
+    vm6.invoke(() -> 
AsyncEventQueueTestBase.createReplicatedRegionWithAsyncEventQueue( 
getTestMethodName() + "_RR", "ln", isOffHeap() ));
+    vm7.invoke(() -> 
AsyncEventQueueTestBase.createReplicatedRegionWithAsyncEventQueue( 
getTestMethodName() + "_RR", "ln", isOffHeap() ));
+  }
   
   /**
    * Test configuration::
@@ -170,10 +165,7 @@ public class AsyncEventListenerDUnitTest extends 
AsyncEventQueueTestBase {
   public void testReplicatedSerialAsyncEventQueue() {
     Integer lnPort = (Integer)vm0.invoke(() -> 
AsyncEventQueueTestBase.createFirstLocatorWithDSId( 1 ));
 
-    vm4.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort ));
-    vm5.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort ));
-    vm6.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort ));
-    vm7.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort ));
+    createCaches(lnPort);
 
     vm4.invoke(() -> AsyncEventQueueTestBase.createAsyncEventQueue( "ln",
         false, 100, 100, false, false, null, false ));
@@ -184,18 +176,12 @@ public class AsyncEventListenerDUnitTest extends 
AsyncEventQueueTestBase {
     vm7.invoke(() -> AsyncEventQueueTestBase.createAsyncEventQueue( "ln",
         false, 100, 100, false, false, null, false ));
 
-    vm4.invoke(() -> 
AsyncEventQueueTestBase.createReplicatedRegionWithAsyncEventQueue( 
getTestMethodName() + "_RR", "ln", isOffHeap() ));
-    vm5.invoke(() -> 
AsyncEventQueueTestBase.createReplicatedRegionWithAsyncEventQueue( 
getTestMethodName() + "_RR", "ln", isOffHeap() ));
-    vm6.invoke(() -> 
AsyncEventQueueTestBase.createReplicatedRegionWithAsyncEventQueue( 
getTestMethodName() + "_RR", "ln", isOffHeap() ));
-    vm7.invoke(() -> 
AsyncEventQueueTestBase.createReplicatedRegionWithAsyncEventQueue( 
getTestMethodName() + "_RR", "ln", isOffHeap() ));
+    createReplicatedRegionWithAsyncEventQueue();
 
     vm4.invoke(() -> AsyncEventQueueTestBase.doPuts( getTestMethodName() + 
"_RR",
         1000 ));
 
-    vm4.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener( "ln", 
1000 ));// primary sender
-    vm5.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener( "ln", 
0 ));// secondary
-    vm6.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener( "ln", 
0 ));// secondary
-    vm7.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener( "ln", 
0 ));// secondary
+    validate1000Puts();
   }
   
   /**
@@ -205,10 +191,7 @@ public class AsyncEventListenerDUnitTest extends 
AsyncEventQueueTestBase {
   public void testReplicatedSerialAsyncEventQueueWithCacheLoader() {
     Integer lnPort = (Integer)vm0.invoke(() -> 
AsyncEventQueueTestBase.createFirstLocatorWithDSId( 1 ));
 
-    vm4.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort ));
-    vm5.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort ));
-    vm6.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort ));
-    vm7.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort ));
+    createCaches(lnPort);
 
     vm4.invoke(() -> AsyncEventQueueTestBase.createAsyncEventQueue( "ln",
         false, 100, 100, false, false, null, false, 
"MyAsyncEventListener_CacheLoader" ));
@@ -248,10 +231,7 @@ public class AsyncEventListenerDUnitTest extends 
AsyncEventQueueTestBase {
   public void testReplicatedSerialAsyncEventQueue_ExceptionScenario() {
     Integer lnPort = (Integer)vm0.invoke(() -> 
AsyncEventQueueTestBase.createFirstLocatorWithDSId( 1 ));
 
-    vm4.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort ));
-    vm5.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort ));
-    vm6.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort ));
-    vm7.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort ));
+    createCaches(lnPort);
 
     vm4.invoke(() -> 
AsyncEventQueueTestBase.createAsyncEventQueueWithCustomListener( "ln",
         false, 100, 100, false, false, null, false, 1 ));
@@ -262,19 +242,9 @@ public class AsyncEventListenerDUnitTest extends 
AsyncEventQueueTestBase {
     vm7.invoke(() -> 
AsyncEventQueueTestBase.createAsyncEventQueueWithCustomListener( "ln",
         false, 100, 100, false, false, null, false, 1 ));
 
-    vm4.invoke(() -> 
AsyncEventQueueTestBase.createReplicatedRegionWithAsyncEventQueue( 
getTestMethodName() + "_RR", "ln", isOffHeap() ));
-    vm5.invoke(() -> 
AsyncEventQueueTestBase.createReplicatedRegionWithAsyncEventQueue( 
getTestMethodName() + "_RR", "ln", isOffHeap() ));
-    vm6.invoke(() -> 
AsyncEventQueueTestBase.createReplicatedRegionWithAsyncEventQueue( 
getTestMethodName() + "_RR", "ln", isOffHeap() ));
-    vm7.invoke(() -> 
AsyncEventQueueTestBase.createReplicatedRegionWithAsyncEventQueue( 
getTestMethodName() + "_RR", "ln", isOffHeap() ));
+    createReplicatedRegionWithAsyncEventQueue();
     
-    vm4
-        .invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue( "ln" ));
-    vm5
-        .invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue( "ln" ));
-    vm6
-        .invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue( "ln" ));
-    vm7
-        .invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue( "ln" ));
+    pauseAsyncEventQueues();
     Wait.pause(2000);// pause at least for the batchTimeInterval
 
     vm4.invoke(() -> AsyncEventQueueTestBase.doPuts( getTestMethodName() + 
"_RR",
@@ -300,10 +270,7 @@ public class AsyncEventListenerDUnitTest extends 
AsyncEventQueueTestBase {
   public void testReplicatedSerialAsyncEventQueueWithConflationEnabled() {
     Integer lnPort = (Integer)vm0.invoke(() -> 
AsyncEventQueueTestBase.createFirstLocatorWithDSId( 1 ));
 
-    vm4.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort ));
-    vm5.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort ));
-    vm6.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort ));
-    vm7.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort ));
+    createCaches(lnPort);
 
     vm4.invoke(() -> AsyncEventQueueTestBase.createAsyncEventQueue( "ln",
         false, 100, 100, true, false, null, false ));
@@ -314,19 +281,9 @@ public class AsyncEventListenerDUnitTest extends 
AsyncEventQueueTestBase {
     vm7.invoke(() -> AsyncEventQueueTestBase.createAsyncEventQueue( "ln",
         false, 100, 100, true, false, null, false ));
 
-    vm4.invoke(() -> 
AsyncEventQueueTestBase.createReplicatedRegionWithAsyncEventQueue( 
getTestMethodName() + "_RR", "ln", isOffHeap() ));
-    vm5.invoke(() -> 
AsyncEventQueueTestBase.createReplicatedRegionWithAsyncEventQueue( 
getTestMethodName() + "_RR", "ln", isOffHeap() ));
-    vm6.invoke(() -> 
AsyncEventQueueTestBase.createReplicatedRegionWithAsyncEventQueue( 
getTestMethodName() + "_RR", "ln", isOffHeap() ));
-    vm7.invoke(() -> 
AsyncEventQueueTestBase.createReplicatedRegionWithAsyncEventQueue( 
getTestMethodName() + "_RR", "ln", isOffHeap() ));
+    createReplicatedRegionWithAsyncEventQueue();
 
-    vm4
-        .invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue( "ln" ));
-    vm5
-        .invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue( "ln" ));
-    vm6
-        .invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue( "ln" ));
-    vm7
-        .invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue( "ln" ));
+    pauseAsyncEventQueues();
     Wait.pause(1000);// pause at least for the batchTimeInterval
 
     final Map keyValues = new HashMap();
@@ -367,10 +324,7 @@ public class AsyncEventListenerDUnitTest extends 
AsyncEventQueueTestBase {
     vm6.invoke(() -> AsyncEventQueueTestBase.resumeAsyncEventQueue( "ln" ));
     vm7.invoke(() -> AsyncEventQueueTestBase.resumeAsyncEventQueue( "ln" ));
 
-    vm4.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener( "ln", 
1000 ));// primary sender
-    vm5.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener( "ln", 
0 ));// secondary
-    vm6.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener( "ln", 
0 ));// secondary
-    vm7.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener( "ln", 
0 ));// secondary
+    validate1000Puts();
   }
 
   
@@ -400,18 +354,12 @@ public class AsyncEventListenerDUnitTest extends 
AsyncEventQueueTestBase {
     vm7.invoke(() -> AsyncEventQueueTestBase.createAsyncEventQueue( "ln",
         false, 100, 100, false, false, null, false ));
 
-    vm4.invoke(() -> 
AsyncEventQueueTestBase.createReplicatedRegionWithAsyncEventQueue( 
getTestMethodName() + "_RR", "ln", isOffHeap() ));
-    vm5.invoke(() -> 
AsyncEventQueueTestBase.createReplicatedRegionWithAsyncEventQueue( 
getTestMethodName() + "_RR", "ln", isOffHeap() ));
-    vm6.invoke(() -> 
AsyncEventQueueTestBase.createReplicatedRegionWithAsyncEventQueue( 
getTestMethodName() + "_RR", "ln", isOffHeap() ));
-    vm7.invoke(() -> 
AsyncEventQueueTestBase.createReplicatedRegionWithAsyncEventQueue( 
getTestMethodName() + "_RR", "ln", isOffHeap() ));
+    createReplicatedRegionWithAsyncEventQueue();
 
     vm4.invoke(() -> AsyncEventQueueTestBase.doPuts( getTestMethodName() + 
"_RR",
         1000 ));
 
-    vm4.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener( "ln", 
1000 ));// primary sender
-    vm5.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener( "ln", 
0 ));// secondary
-    vm6.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener( "ln", 
0 ));// secondary
-    vm7.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener( "ln", 
0 ));// secondary
+    validate1000Puts();
   }
 
   /**
@@ -426,10 +374,7 @@ public class AsyncEventListenerDUnitTest extends 
AsyncEventQueueTestBase {
   public void testReplicatedSerialAsyncEventQueueWithPeristenceEnabled() {
     Integer lnPort = (Integer)vm0.invoke(() -> 
AsyncEventQueueTestBase.createFirstLocatorWithDSId( 1 ));
 
-    vm4.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort ));
-    vm5.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort ));
-    vm6.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort ));
-    vm7.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort ));
+    createCaches(lnPort);
 
     vm4.invoke(() -> AsyncEventQueueTestBase.createAsyncEventQueue( "ln",
         false, 100, 100, true, false, null, false ));
@@ -440,13 +385,14 @@ public class AsyncEventListenerDUnitTest extends 
AsyncEventQueueTestBase {
     vm7.invoke(() -> AsyncEventQueueTestBase.createAsyncEventQueue( "ln",
         false, 100, 100, true, false, null, false ));
 
-    vm4.invoke(() -> 
AsyncEventQueueTestBase.createReplicatedRegionWithAsyncEventQueue( 
getTestMethodName() + "_RR", "ln", isOffHeap() ));
-    vm5.invoke(() -> 
AsyncEventQueueTestBase.createReplicatedRegionWithAsyncEventQueue( 
getTestMethodName() + "_RR", "ln", isOffHeap() ));
-    vm6.invoke(() -> 
AsyncEventQueueTestBase.createReplicatedRegionWithAsyncEventQueue( 
getTestMethodName() + "_RR", "ln", isOffHeap() ));
-    vm7.invoke(() -> 
AsyncEventQueueTestBase.createReplicatedRegionWithAsyncEventQueue( 
getTestMethodName() + "_RR", "ln", isOffHeap() ));
+    createReplicatedRegionWithAsyncEventQueue();
 
     vm4.invoke(() -> AsyncEventQueueTestBase.doPuts( getTestMethodName() + 
"_RR",
         1000 ));
+    validate1000Puts();
+  }
+
+  protected void validate1000Puts() {
     vm4.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener( "ln", 
1000 ));// primary sender
     vm5.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener( "ln", 
0 ));// secondary
     vm6.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener( "ln", 
0 ));// secondary
@@ -466,10 +412,7 @@ public class AsyncEventListenerDUnitTest extends 
AsyncEventQueueTestBase {
   public void 
DISABLED_testReplicatedSerialAsyncEventQueueWithPeristenceEnabled_Restart() {
     Integer lnPort = (Integer)vm0.invoke(() -> 
AsyncEventQueueTestBase.createFirstLocatorWithDSId( 1 ));
 
-    vm4.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort ));
-    vm5.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort ));
-    vm6.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort ));
-    vm7.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort ));
+    createCaches(lnPort);
 
     String firstDStore = (String)vm4.invoke(() -> 
AsyncEventQueueTestBase.createAsyncEventQueueWithDiskStore( "ln", false, 100,
             100, true, null ));
@@ -549,10 +492,7 @@ public class AsyncEventListenerDUnitTest extends 
AsyncEventQueueTestBase {
   public void testConcurrentSerialAsyncEventQueueWithReplicatedRegion() {
     Integer lnPort = (Integer)vm0.invoke(() -> 
AsyncEventQueueTestBase.createFirstLocatorWithDSId( 1 ));
 
-    vm4.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort ));
-    vm5.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort ));
-    vm6.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort ));
-    vm7.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort ));
+    createCaches(lnPort);
 
     vm4.invoke(() -> AsyncEventQueueTestBase.createConcurrentAsyncEventQueue( 
"ln",
         false, 100, 100, true, false, null, false, 3, OrderPolicy.KEY ));
@@ -563,17 +503,11 @@ public class AsyncEventListenerDUnitTest extends 
AsyncEventQueueTestBase {
     vm7.invoke(() -> AsyncEventQueueTestBase.createConcurrentAsyncEventQueue( 
"ln",
         false, 100, 100, true, false, null, false, 3, OrderPolicy.KEY ));
 
-    vm4.invoke(() -> 
AsyncEventQueueTestBase.createReplicatedRegionWithAsyncEventQueue( 
getTestMethodName() + "_RR", "ln", isOffHeap() ));
-    vm5.invoke(() -> 
AsyncEventQueueTestBase.createReplicatedRegionWithAsyncEventQueue( 
getTestMethodName() + "_RR", "ln", isOffHeap() ));
-    vm6.invoke(() -> 
AsyncEventQueueTestBase.createReplicatedRegionWithAsyncEventQueue( 
getTestMethodName() + "_RR", "ln", isOffHeap() ));
-    vm7.invoke(() -> 
AsyncEventQueueTestBase.createReplicatedRegionWithAsyncEventQueue( 
getTestMethodName() + "_RR", "ln", isOffHeap() ));
+    createReplicatedRegionWithAsyncEventQueue();
 
     vm4.invoke(() -> AsyncEventQueueTestBase.doPuts( getTestMethodName() + 
"_RR",
         1000 ));
-    vm4.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener("ln", 
1000 ));// primary sender
-    vm5.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener("ln", 
0 ));// secondary
-    vm6.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener("ln", 
0 ));// secondary
-    vm7.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener("ln", 
0 ));// secondary
+    validate1000Puts();
   }
   
   /**
@@ -587,10 +521,7 @@ public class AsyncEventListenerDUnitTest extends 
AsyncEventQueueTestBase {
   public void testConcurrentSerialAsyncEventQueueWithReplicatedRegion_2() {
     Integer lnPort = (Integer)vm0.invoke(() -> 
AsyncEventQueueTestBase.createFirstLocatorWithDSId( 1 ));
 
-    vm4.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort ));
-    vm5.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort ));
-    vm6.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort ));
-    vm7.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort ));
+    createCaches(lnPort);
 
     vm4.invoke(() -> AsyncEventQueueTestBase.createConcurrentAsyncEventQueue( 
"ln",
         false, 100, 100, true, false, null, false, 3, OrderPolicy.THREAD ));
@@ -601,10 +532,7 @@ public class AsyncEventListenerDUnitTest extends 
AsyncEventQueueTestBase {
     vm7.invoke(() -> AsyncEventQueueTestBase.createConcurrentAsyncEventQueue( 
"ln",
         false, 100, 100, true, false, null, false, 3, OrderPolicy.THREAD ));
 
-    vm4.invoke(() -> 
AsyncEventQueueTestBase.createReplicatedRegionWithAsyncEventQueue( 
getTestMethodName() + "_RR", "ln", isOffHeap() ));
-    vm5.invoke(() -> 
AsyncEventQueueTestBase.createReplicatedRegionWithAsyncEventQueue( 
getTestMethodName() + "_RR", "ln", isOffHeap() ));
-    vm6.invoke(() -> 
AsyncEventQueueTestBase.createReplicatedRegionWithAsyncEventQueue( 
getTestMethodName() + "_RR", "ln", isOffHeap() ));
-    vm7.invoke(() -> 
AsyncEventQueueTestBase.createReplicatedRegionWithAsyncEventQueue( 
getTestMethodName() + "_RR", "ln", isOffHeap() ));
+    createReplicatedRegionWithAsyncEventQueue();
 
     vm4.invokeAsync(() -> AsyncEventQueueTestBase.doPuts( getTestMethodName() 
+ "_RR",
         500 ));
@@ -614,10 +542,7 @@ public class AsyncEventListenerDUnitTest extends 
AsyncEventQueueTestBase {
 //    vm4.invokeAsync(() -> AsyncEventQueueTestBase.doPuts( 
getTestMethodName() + "_RR",
 //      1000, 1500 ));
     
-    vm4.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener("ln", 
1000 ));// primary sender
-    vm5.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener("ln", 
0 ));// secondary
-    vm6.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener("ln", 
0 ));// secondary
-    vm7.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener("ln", 
0 ));// secondary
+    validate1000Puts();
   }
   
   /**
@@ -627,10 +552,7 @@ public class AsyncEventListenerDUnitTest extends 
AsyncEventQueueTestBase {
   public void testConcurrentSerialAsyncEventQueueWithoutOrderPolicy() {
     Integer lnPort = (Integer)vm0.invoke(() -> 
AsyncEventQueueTestBase.createFirstLocatorWithDSId( 1 ));
 
-    vm4.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort ));
-    vm5.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort ));
-    vm6.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort ));
-    vm7.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort ));
+    createCaches(lnPort);
 
     vm4.invoke(() -> AsyncEventQueueTestBase.createConcurrentAsyncEventQueue( 
"ln",
         false, 100, 100, true, false, null, false, 3, null ));
@@ -641,17 +563,11 @@ public class AsyncEventListenerDUnitTest extends 
AsyncEventQueueTestBase {
     vm7.invoke(() -> AsyncEventQueueTestBase.createConcurrentAsyncEventQueue( 
"ln",
         false, 100, 100, true, false, null, false, 3, null ));
 
-    vm4.invoke(() -> 
AsyncEventQueueTestBase.createReplicatedRegionWithAsyncEventQueue( 
getTestMethodName() + "_RR", "ln", isOffHeap() ));
-    vm5.invoke(() -> 
AsyncEventQueueTestBase.createReplicatedRegionWithAsyncEventQueue( 
getTestMethodName() + "_RR", "ln", isOffHeap() ));
-    vm6.invoke(() -> 
AsyncEventQueueTestBase.createReplicatedRegionWithAsyncEventQueue( 
getTestMethodName() + "_RR", "ln", isOffHeap() ));
-    vm7.invoke(() -> 
AsyncEventQueueTestBase.createReplicatedRegionWithAsyncEventQueue( 
getTestMethodName() + "_RR", "ln", isOffHeap() ));
+    createReplicatedRegionWithAsyncEventQueue();
 
     vm4.invoke(() -> AsyncEventQueueTestBase.doPuts( getTestMethodName() + 
"_RR",
         1000 ));
-    vm4.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener("ln", 
1000 ));// primary sender
-    vm5.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener("ln", 
0 ));// secondary
-    vm6.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener("ln", 
0 ));// secondary
-    vm7.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener("ln", 
0 ));// secondary
+    validate1000Puts();
   }
 
   /**
@@ -663,10 +579,7 @@ public class AsyncEventListenerDUnitTest extends 
AsyncEventQueueTestBase {
   public void testPartitionedSerialAsyncEventQueue() {
     Integer lnPort = (Integer)vm0.invoke(() -> 
AsyncEventQueueTestBase.createFirstLocatorWithDSId( 1 ));
 
-    vm4.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort ));
-    vm5.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort ));
-    vm6.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort ));
-    vm7.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort ));
+    createCaches(lnPort);
 
     vm4.invoke(() -> AsyncEventQueueTestBase.createAsyncEventQueue( "ln",
         false, 100, 100, false, false, null, false ));
@@ -677,19 +590,20 @@ public class AsyncEventListenerDUnitTest extends 
AsyncEventQueueTestBase {
     vm7.invoke(() -> AsyncEventQueueTestBase.createAsyncEventQueue( "ln",
         false, 100, 100, false, false, null, false ));
 
-    vm4.invoke(() -> 
AsyncEventQueueTestBase.createPartitionedRegionWithAsyncEventQueue( 
getTestMethodName() + "_PR", "ln", isOffHeap() ));
-    vm5.invoke(() -> 
AsyncEventQueueTestBase.createPartitionedRegionWithAsyncEventQueue( 
getTestMethodName() + "_PR", "ln", isOffHeap() ));
-    vm6.invoke(() -> 
AsyncEventQueueTestBase.createPartitionedRegionWithAsyncEventQueue( 
getTestMethodName() + "_PR", "ln", isOffHeap() ));
-    vm7.invoke(() -> 
AsyncEventQueueTestBase.createPartitionedRegionWithAsyncEventQueue( 
getTestMethodName() + "_PR", "ln", isOffHeap() ));
+    createPartitionedRegions();
 
     vm4.invoke(() -> AsyncEventQueueTestBase.doPuts( getTestMethodName() + 
"_PR",
         500 ));
     vm5.invoke(() -> AsyncEventQueueTestBase.doPutsFrom(
         getTestMethodName() + "_PR", 500, 1000 ));
-    vm4.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener( "ln", 
1000 ));// primary sender
-    vm5.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener( "ln", 
0 ));// secondary
-    vm6.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener( "ln", 
0 ));// secondary
-    vm7.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener( "ln", 
0 ));// secondary
+    validate1000Puts();
+  }
+
+  protected void createPartitionedRegions() {
+    vm4.invoke(() -> 
AsyncEventQueueTestBase.createPartitionedRegionWithAsyncEventQueue( 
getTestMethodName() + "_PR", "ln", isOffHeap() ));
+    vm5.invoke(() -> 
AsyncEventQueueTestBase.createPartitionedRegionWithAsyncEventQueue( 
getTestMethodName() + "_PR", "ln", isOffHeap() ));
+    vm6.invoke(() -> 
AsyncEventQueueTestBase.createPartitionedRegionWithAsyncEventQueue( 
getTestMethodName() + "_PR", "ln", isOffHeap() ));
+    vm7.invoke(() -> 
AsyncEventQueueTestBase.createPartitionedRegionWithAsyncEventQueue( 
getTestMethodName() + "_PR", "ln", isOffHeap() ));
   }
 
   /**
@@ -701,10 +615,7 @@ public class AsyncEventListenerDUnitTest extends 
AsyncEventQueueTestBase {
   public void testPartitionedSerialAsyncEventQueueWithConflationEnabled() {
     Integer lnPort = (Integer)vm0.invoke(() -> 
AsyncEventQueueTestBase.createFirstLocatorWithDSId( 1 ));
 
-    vm4.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort ));
-    vm5.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort ));
-    vm6.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort ));
-    vm7.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort ));
+    createCaches(lnPort);
 
     vm4.invoke(() -> AsyncEventQueueTestBase.createAsyncEventQueue( "ln",
         false, 100, 100, true, false, null, false ));
@@ -715,19 +626,9 @@ public class AsyncEventListenerDUnitTest extends 
AsyncEventQueueTestBase {
     vm7.invoke(() -> AsyncEventQueueTestBase.createAsyncEventQueue( "ln",
         false, 100, 100, true, false, null, false ));
 
-    vm4.invoke(() -> 
AsyncEventQueueTestBase.createPartitionedRegionWithAsyncEventQueue( 
getTestMethodName() + "_PR", "ln", isOffHeap() ));
-    vm5.invoke(() -> 
AsyncEventQueueTestBase.createPartitionedRegionWithAsyncEventQueue( 
getTestMethodName() + "_PR", "ln", isOffHeap() ));
-    vm6.invoke(() -> 
AsyncEventQueueTestBase.createPartitionedRegionWithAsyncEventQueue( 
getTestMethodName() + "_PR", "ln", isOffHeap() ));
-    vm7.invoke(() -> 
AsyncEventQueueTestBase.createPartitionedRegionWithAsyncEventQueue( 
getTestMethodName() + "_PR", "ln", isOffHeap() ));
+    createPartitionedRegions();
 
-    vm4
-        .invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue( "ln" ));
-    vm5
-        .invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue( "ln" ));
-    vm6
-        .invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue( "ln" ));
-    vm7
-        .invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue( "ln" ));
+    pauseAsyncEventQueues();
     
     Wait.pause(2000);
 
@@ -768,10 +669,18 @@ public class AsyncEventListenerDUnitTest extends 
AsyncEventQueueTestBase {
     vm6.invoke(() -> AsyncEventQueueTestBase.resumeAsyncEventQueue( "ln" ));
     vm7.invoke(() -> AsyncEventQueueTestBase.resumeAsyncEventQueue( "ln" ));
 
-    vm4.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener( "ln", 
1000 ));// primary sender
-    vm5.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener( "ln", 
0 ));// secondary
-    vm6.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener( "ln", 
0 ));// secondary
-    vm7.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener( "ln", 
0 ));// secondary
+    validate1000Puts();
+  }
+
+  protected void pauseAsyncEventQueues() {
+    vm4
+        .invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue( "ln" ));
+    vm5
+        .invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue( "ln" ));
+    vm6
+        .invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue( "ln" ));
+    vm7
+        .invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue( "ln" ));
   }
 
   /**
@@ -785,10 +694,7 @@ public class AsyncEventListenerDUnitTest extends 
AsyncEventQueueTestBase {
   public void testPartitionedSerialAsyncEventQueueWithPeristenceEnabled() {
     Integer lnPort = (Integer)vm0.invoke(() -> 
AsyncEventQueueTestBase.createFirstLocatorWithDSId( 1 ));
 
-    vm4.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort ));
-    vm5.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort ));
-    vm6.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort ));
-    vm7.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort ));
+    createCaches(lnPort);
 
     vm4.invoke(() -> AsyncEventQueueTestBase.createAsyncEventQueue( "ln",
         false, 100, 100, false, true, null, false ));
@@ -799,19 +705,13 @@ public class AsyncEventListenerDUnitTest extends 
AsyncEventQueueTestBase {
     vm7.invoke(() -> AsyncEventQueueTestBase.createAsyncEventQueue( "ln",
         false, 100, 100, false, true, null, false ));
 
-    vm4.invoke(() -> 
AsyncEventQueueTestBase.createPartitionedRegionWithAsyncEventQueue( 
getTestMethodName() + "_PR", "ln", isOffHeap() ));
-    vm5.invoke(() -> 
AsyncEventQueueTestBase.createPartitionedRegionWithAsyncEventQueue( 
getTestMethodName() + "_PR", "ln", isOffHeap() ));
-    vm6.invoke(() -> 
AsyncEventQueueTestBase.createPartitionedRegionWithAsyncEventQueue( 
getTestMethodName() + "_PR", "ln", isOffHeap() ));
-    vm7.invoke(() -> 
AsyncEventQueueTestBase.createPartitionedRegionWithAsyncEventQueue( 
getTestMethodName() + "_PR", "ln", isOffHeap() ));
+    createPartitionedRegions();
 
     vm4.invoke(() -> AsyncEventQueueTestBase.doPuts( getTestMethodName() + 
"_PR",
         500 ));
     vm5.invoke(() -> AsyncEventQueueTestBase.doPutsFrom(
         getTestMethodName() + "_PR", 500, 1000 ));
-    vm4.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener( "ln", 
1000 ));// primary sender
-    vm5.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener( "ln", 
0 ));// secondary
-    vm6.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener( "ln", 
0 ));// secondary
-    vm7.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener( "ln", 
0 ));// secondary
+    validate1000Puts();
   }
 
   /**
@@ -825,10 +725,7 @@ public class AsyncEventListenerDUnitTest extends 
AsyncEventQueueTestBase {
   public void 
testPartitionedSerialAsyncEventQueueWithPeristenceEnabled_Restart() {
     Integer lnPort = (Integer)vm0.invoke(() -> 
AsyncEventQueueTestBase.createFirstLocatorWithDSId( 1 ));
 
-    vm4.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort ));
-    vm5.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort ));
-    vm6.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort ));
-    vm7.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort ));
+    createCaches(lnPort);
 
     String firstDStore = (String)vm4.invoke(() -> 
AsyncEventQueueTestBase.createAsyncEventQueueWithDiskStore( "ln", false, 100,
             100, true, null ));
@@ -858,10 +755,7 @@ public class AsyncEventListenerDUnitTest extends 
AsyncEventQueueTestBase {
     try {
       Integer lnPort = (Integer)vm0.invoke(() -> 
AsyncEventQueueTestBase.createFirstLocatorWithDSId( 1 ));
 
-      vm4.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort ));
-      vm5.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort ));
-      vm6.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort ));
-      vm7.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort ));
+      createCaches(lnPort);
 
       vm4.invoke(() -> AsyncEventQueueTestBase.createAsyncEventQueue(
           "ln", true, 100, 100, true, false, null, false ));
@@ -887,10 +781,7 @@ public class AsyncEventListenerDUnitTest extends 
AsyncEventQueueTestBase {
   public void testParallelAsyncEventQueue() {
     Integer lnPort = (Integer)vm0.invoke(() -> 
AsyncEventQueueTestBase.createFirstLocatorWithDSId( 1 ));
 
-    vm4.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort ));
-    vm5.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort ));
-    vm6.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort ));
-    vm7.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort ));
+    createCaches(lnPort);
 
     vm4.invoke(() -> AsyncEventQueueTestBase.createAsyncEventQueue( "ln",
         true, 100, 100, false, false, null, false ));
@@ -901,18 +792,12 @@ public class AsyncEventListenerDUnitTest extends 
AsyncEventQueueTestBase {
     vm7.invoke(() -> AsyncEventQueueTestBase.createAsyncEventQueue( "ln",
         true, 100, 100, false, false, null, false ));
 
-    vm4.invoke(() -> 
AsyncEventQueueTestBase.createPartitionedRegionWithAsyncEventQueue( 
getTestMethodName() + "_PR", "ln", isOffHeap() ));
-    vm5.invoke(() -> 
AsyncEventQueueTestBase.createPartitionedRegionWithAsyncEventQueue( 
getTestMethodName() + "_PR", "ln", isOffHeap() ));
-    vm6.invoke(() -> 
AsyncEventQueueTestBase.createPartitionedRegionWithAsyncEventQueue( 
getTestMethodName() + "_PR", "ln", isOffHeap() ));
-    vm7.invoke(() -> 
AsyncEventQueueTestBase.createPartitionedRegionWithAsyncEventQueue( 
getTestMethodName() + "_PR", "ln", isOffHeap() ));
+    createPartitionedRegions();
 
     vm4.invoke(() -> AsyncEventQueueTestBase.doPuts( getTestMethodName() + 
"_PR",
         256 ));
     
-    vm4.invoke(() -> AsyncEventQueueTestBase.waitForAsyncQueueToGetEmpty( "ln" 
));
-    vm5.invoke(() -> AsyncEventQueueTestBase.waitForAsyncQueueToGetEmpty( "ln" 
));
-    vm6.invoke(() -> AsyncEventQueueTestBase.waitForAsyncQueueToGetEmpty( "ln" 
));
-    vm7.invoke(() -> AsyncEventQueueTestBase.waitForAsyncQueueToGetEmpty( "ln" 
));
+    waitForAsyncQueueToGetEmpty();
     
     int vm4size = (Integer)vm4.invoke(() -> 
AsyncEventQueueTestBase.getAsyncEventListenerMapSize( "ln"));
     int vm5size = (Integer)vm5.invoke(() -> 
AsyncEventQueueTestBase.getAsyncEventListenerMapSize( "ln"));
@@ -929,10 +814,7 @@ public class AsyncEventListenerDUnitTest extends 
AsyncEventQueueTestBase {
   public void testParallelAsyncEventQueueWithCacheLoader() {
     Integer lnPort = (Integer)vm0.invoke(() -> 
AsyncEventQueueTestBase.createFirstLocatorWithDSId( 1 ));
 
-    vm4.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort ));
-    vm5.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort ));
-    vm6.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort ));
-    vm7.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort ));
+    createCaches(lnPort);
 
     vm4.invoke(() -> AsyncEventQueueTestBase.createAsyncEventQueue( "ln",
        true, 100, 100, false, false, null, false, 
"MyAsyncEventListener_CacheLoader" ));
@@ -959,10 +841,7 @@ public class AsyncEventListenerDUnitTest extends 
AsyncEventQueueTestBase {
   public void testParallelAsyncEventQueueSize() {
     Integer lnPort = (Integer)vm0.invoke(() -> 
AsyncEventQueueTestBase.createFirstLocatorWithDSId( 1 ));
 
-    vm4.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort ));
-    vm5.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort ));
-    vm6.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort ));
-    vm7.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort ));
+    createCaches(lnPort);
 
     vm4.invoke(() -> AsyncEventQueueTestBase.createAsyncEventQueue( "ln",
         true, 100, 100, false, false, null, false ));
@@ -973,19 +852,9 @@ public class AsyncEventListenerDUnitTest extends 
AsyncEventQueueTestBase {
     vm7.invoke(() -> AsyncEventQueueTestBase.createAsyncEventQueue( "ln",
         true, 100, 100, false, false, null, false ));
 
-    vm4.invoke(() -> 
AsyncEventQueueTestBase.createPartitionedRegionWithAsyncEventQueue( 
getTestMethodName() + "_PR", "ln", isOffHeap() ));
-    vm5.invoke(() -> 
AsyncEventQueueTestBase.createPartitionedRegionWithAsyncEventQueue( 
getTestMethodName() + "_PR", "ln", isOffHeap() ));
-    vm6.invoke(() -> 
AsyncEventQueueTestBase.createPartitionedRegionWithAsyncEventQueue( 
getTestMethodName() + "_PR", "ln", isOffHeap() ));
-    vm7.invoke(() -> 
AsyncEventQueueTestBase.createPartitionedRegionWithAsyncEventQueue( 
getTestMethodName() + "_PR", "ln", isOffHeap() ));
+    createPartitionedRegions();
 
-    vm4
-        .invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue( "ln" ));
-    vm5
-        .invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue( "ln" ));
-    vm6
-        .invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue( "ln" ));
-    vm7
-        .invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue( "ln" ));
+    pauseAsyncEventQueues();
     Wait.pause(1000);// pause at least for the batchTimeInterval
 
     vm4.invoke(() -> AsyncEventQueueTestBase.doPuts( getTestMethodName() + 
"_PR",
@@ -1005,10 +874,7 @@ public class AsyncEventListenerDUnitTest extends 
AsyncEventQueueTestBase {
   public void testConcurrentParallelAsyncEventQueueSize() {
        Integer lnPort = (Integer)vm0.invoke(() -> 
AsyncEventQueueTestBase.createFirstLocatorWithDSId( 1 ));
 
-       vm4.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort ));
-       vm5.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort ));
-       vm6.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort ));
-       vm7.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort ));
+       createCaches(lnPort);
 
        vm4.invoke(() -> 
AsyncEventQueueTestBase.createConcurrentAsyncEventQueue( "ln",
          true, 100, 100, false, false, null, false, 2, OrderPolicy.KEY ));
@@ -1019,19 +885,9 @@ public class AsyncEventListenerDUnitTest extends 
AsyncEventQueueTestBase {
        vm7.invoke(() -> 
AsyncEventQueueTestBase.createConcurrentAsyncEventQueue( "ln",
          true, 100, 100, false, false, null, false, 2, OrderPolicy.KEY ));
 
-       vm4.invoke(() -> 
AsyncEventQueueTestBase.createPartitionedRegionWithAsyncEventQueue( 
getTestMethodName() + "_PR", "ln", isOffHeap() ));
-       vm5.invoke(() -> 
AsyncEventQueueTestBase.createPartitionedRegionWithAsyncEventQueue( 
getTestMethodName() + "_PR", "ln", isOffHeap() ));
-       vm6.invoke(() -> 
AsyncEventQueueTestBase.createPartitionedRegionWithAsyncEventQueue( 
getTestMethodName() + "_PR", "ln", isOffHeap() ));
-       vm7.invoke(() -> 
AsyncEventQueueTestBase.createPartitionedRegionWithAsyncEventQueue( 
getTestMethodName() + "_PR", "ln", isOffHeap() ));
+       createPartitionedRegions();
 
-       vm4
-         .invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue( "ln" ));
-       vm5
-         .invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue( "ln" ));
-       vm6
-         .invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue( "ln" ));
-       vm7
-         .invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue( "ln" ));
+       pauseAsyncEventQueues();
        Wait.pause(1000);// pause at least for the batchTimeInterval
 
        vm4.invoke(() -> AsyncEventQueueTestBase.doPuts( getTestMethodName() + 
"_PR",
@@ -1047,10 +903,7 @@ public class AsyncEventListenerDUnitTest extends 
AsyncEventQueueTestBase {
   public void testParallelAsyncEventQueueWithConflationEnabled() {
     Integer lnPort = (Integer)vm0.invoke(() -> 
AsyncEventQueueTestBase.createFirstLocatorWithDSId( 1 ));
 
-    vm4.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort ));
-    vm5.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort ));
-    vm6.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort ));
-    vm7.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort ));
+    createCaches(lnPort);
 
     vm4.invoke(() -> AsyncEventQueueTestBase.createAsyncEventQueue( "ln",
         true, 100, 100, true, false, null, false ));
@@ -1061,19 +914,9 @@ public class AsyncEventListenerDUnitTest extends 
AsyncEventQueueTestBase {
     vm7.invoke(() -> AsyncEventQueueTestBase.createAsyncEventQueue( "ln",
         true, 100, 100, true, false, null, false ));
 
-    vm4.invoke(() -> 
AsyncEventQueueTestBase.createPartitionedRegionWithAsyncEventQueue( 
getTestMethodName() + "_PR", "ln", isOffHeap() ));
-    vm5.invoke(() -> 
AsyncEventQueueTestBase.createPartitionedRegionWithAsyncEventQueue( 
getTestMethodName() + "_PR", "ln", isOffHeap() ));
-    vm6.invoke(() -> 
AsyncEventQueueTestBase.createPartitionedRegionWithAsyncEventQueue( 
getTestMethodName() + "_PR", "ln", isOffHeap() ));
-    vm7.invoke(() -> 
AsyncEventQueueTestBase.createPartitionedRegionWithAsyncEventQueue( 
getTestMethodName() + "_PR", "ln", isOffHeap() ));
+    createPartitionedRegions();
 
-    vm4
-        .invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue( "ln" ));
-    vm5
-        .invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue( "ln" ));
-    vm6
-        .invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue( "ln" ));
-    vm7
-        .invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue( "ln" ));
+    pauseAsyncEventQueues();
 
     Wait.pause(2000);// pause for the batchTimeInterval to ensure that all the
     // senders are paused
@@ -1112,10 +955,7 @@ public class AsyncEventListenerDUnitTest extends 
AsyncEventQueueTestBase {
     vm6.invoke(() -> AsyncEventQueueTestBase.resumeAsyncEventQueue( "ln" ));
     vm7.invoke(() -> AsyncEventQueueTestBase.resumeAsyncEventQueue( "ln" ));
 
-    vm4.invoke(() -> AsyncEventQueueTestBase.waitForAsyncQueueToGetEmpty( "ln" 
));
-    vm5.invoke(() -> AsyncEventQueueTestBase.waitForAsyncQueueToGetEmpty( "ln" 
));
-    vm6.invoke(() -> AsyncEventQueueTestBase.waitForAsyncQueueToGetEmpty( "ln" 
));
-    vm7.invoke(() -> AsyncEventQueueTestBase.waitForAsyncQueueToGetEmpty( "ln" 
));
+    waitForAsyncQueueToGetEmpty();
     
     int vm4size = (Integer)vm4.invoke(() -> 
AsyncEventQueueTestBase.getAsyncEventListenerMapSize( "ln"));
     int vm5size = (Integer)vm5.invoke(() -> 
AsyncEventQueueTestBase.getAsyncEventListenerMapSize( "ln"));
@@ -1131,10 +971,7 @@ public class AsyncEventListenerDUnitTest extends 
AsyncEventQueueTestBase {
   public void testParallelAsyncEventQueueWithConflationEnabled_bug47213() {
     Integer lnPort = (Integer)vm0.invoke(() -> 
AsyncEventQueueTestBase.createFirstLocatorWithDSId( 1 ));
 
-    vm4.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort ));
-    vm5.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort ));
-    vm6.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort ));
-    vm7.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort ));
+    createCaches(lnPort);
 
     vm4.invoke(() -> AsyncEventQueueTestBase.createAsyncEventQueue( "ln",
         true, 100, 100, true, false, null, false ));
@@ -1150,14 +987,7 @@ public class AsyncEventListenerDUnitTest extends 
AsyncEventQueueTestBase {
     vm6.invoke(() -> 
AsyncEventQueueTestBase.createPRWithRedundantCopyWithAsyncEventQueue( 
getTestMethodName() + "_PR", "ln", isOffHeap() ));
     vm7.invoke(() -> 
AsyncEventQueueTestBase.createPRWithRedundantCopyWithAsyncEventQueue( 
getTestMethodName() + "_PR", "ln", isOffHeap() ));
 
-    vm4
-        .invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue( "ln" ));
-    vm5
-        .invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue( "ln" ));
-    vm6
-        .invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue( "ln" ));
-    vm7
-        .invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue( "ln" ));
+    pauseAsyncEventQueues();
 
     Wait.pause(2000);// pause for the batchTimeInterval to ensure that all the
     // senders are paused
@@ -1195,10 +1025,7 @@ public class AsyncEventListenerDUnitTest extends 
AsyncEventQueueTestBase {
     vm6.invoke(() -> AsyncEventQueueTestBase.resumeAsyncEventQueue( "ln" ));
     vm7.invoke(() -> AsyncEventQueueTestBase.resumeAsyncEventQueue( "ln" ));
 
-    vm4.invoke(() -> AsyncEventQueueTestBase.waitForAsyncQueueToGetEmpty( "ln" 
));
-    vm5.invoke(() -> AsyncEventQueueTestBase.waitForAsyncQueueToGetEmpty( "ln" 
));
-    vm6.invoke(() -> AsyncEventQueueTestBase.waitForAsyncQueueToGetEmpty( "ln" 
));
-    vm7.invoke(() -> AsyncEventQueueTestBase.waitForAsyncQueueToGetEmpty( "ln" 
));
+    waitForAsyncQueueToGetEmpty();
     
     int vm4size = (Integer)vm4.invoke(() -> 
AsyncEventQueueTestBase.getAsyncEventListenerMapSize( "ln"));
     int vm5size = (Integer)vm5.invoke(() -> 
AsyncEventQueueTestBase.getAsyncEventListenerMapSize( "ln"));
@@ -1213,10 +1040,7 @@ public class AsyncEventListenerDUnitTest extends 
AsyncEventQueueTestBase {
     Integer lnPort = (Integer)vm0.invoke(() -> 
AsyncEventQueueTestBase.createFirstLocatorWithDSId( 1 ));
 
     vm3.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort ));
-    vm4.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort ));
-    vm5.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort ));
-    vm6.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort ));
-    vm7.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort ));
+    createCaches(lnPort);
 
     vm3.invoke(() -> AsyncEventQueueTestBase.createAsyncEventQueue( "ln",
         true, 100, 100, false, false, null, false ));
@@ -1231,18 +1055,12 @@ public class AsyncEventListenerDUnitTest extends 
AsyncEventQueueTestBase {
 
     vm3.invoke(() -> 
AsyncEventQueueTestBase.createPartitionedRegionAccessorWithAsyncEventQueue(
             getTestMethodName() + "_PR", "ln" ));
-    vm4.invoke(() -> 
AsyncEventQueueTestBase.createPartitionedRegionWithAsyncEventQueue( 
getTestMethodName() + "_PR", "ln", isOffHeap() ));
-    vm5.invoke(() -> 
AsyncEventQueueTestBase.createPartitionedRegionWithAsyncEventQueue( 
getTestMethodName() + "_PR", "ln", isOffHeap() ));
-    vm6.invoke(() -> 
AsyncEventQueueTestBase.createPartitionedRegionWithAsyncEventQueue( 
getTestMethodName() + "_PR", "ln", isOffHeap() ));
-    vm7.invoke(() -> 
AsyncEventQueueTestBase.createPartitionedRegionWithAsyncEventQueue( 
getTestMethodName() + "_PR", "ln", isOffHeap() ));
+    createPartitionedRegions();
 
     vm3.invoke(() -> AsyncEventQueueTestBase.doPuts( getTestMethodName() + 
"_PR",
         256 ));
     
-    vm4.invoke(() -> AsyncEventQueueTestBase.waitForAsyncQueueToGetEmpty( "ln" 
));
-    vm5.invoke(() -> AsyncEventQueueTestBase.waitForAsyncQueueToGetEmpty( "ln" 
));
-    vm6.invoke(() -> AsyncEventQueueTestBase.waitForAsyncQueueToGetEmpty( "ln" 
));
-    vm7.invoke(() -> AsyncEventQueueTestBase.waitForAsyncQueueToGetEmpty( "ln" 
));
+    waitForAsyncQueueToGetEmpty();
     
     vm3.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener( "ln", 
0 ));
     
@@ -1255,13 +1073,17 @@ public class AsyncEventListenerDUnitTest extends 
AsyncEventQueueTestBase {
 
   }
 
+  protected void waitForAsyncQueueToGetEmpty() {
+    vm4.invoke(() -> AsyncEventQueueTestBase.waitForAsyncQueueToGetEmpty( "ln" 
));
+    vm5.invoke(() -> AsyncEventQueueTestBase.waitForAsyncQueueToGetEmpty( "ln" 
));
+    vm6.invoke(() -> AsyncEventQueueTestBase.waitForAsyncQueueToGetEmpty( "ln" 
));
+    vm7.invoke(() -> AsyncEventQueueTestBase.waitForAsyncQueueToGetEmpty( "ln" 
));
+  }
+
   public void testParallelAsyncEventQueueWithPersistence() {
     Integer lnPort = (Integer)vm0.invoke(() -> 
AsyncEventQueueTestBase.createFirstLocatorWithDSId( 1 ));
 
-    vm4.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort ));
-    vm5.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort ));
-    vm6.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort ));
-    vm7.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort ));
+    createCaches(lnPort);
 
     vm4.invoke(() -> AsyncEventQueueTestBase.createAsyncEventQueue( "ln",
         true, 100, 100, false, true, null, false ));
@@ -1272,18 +1094,12 @@ public class AsyncEventListenerDUnitTest extends 
AsyncEventQueueTestBase {
     vm7.invoke(() -> AsyncEventQueueTestBase.createAsyncEventQueue( "ln",
         true, 100, 100, false, true, null, false ));
 
-    vm4.invoke(() -> 
AsyncEventQueueTestBase.createPartitionedRegionWithAsyncEventQueue( 
getTestMethodName() + "_PR", "ln", isOffHeap() ));
-    vm5.invoke(() -> 
AsyncEventQueueTestBase.createPartitionedRegionWithAsyncEventQueue( 
getTestMethodName() + "_PR", "ln", isOffHeap() ));
-    vm6.invoke(() -> 
AsyncEventQueueTestBase.createPartitionedRegionWithAsyncEventQueue( 
getTestMethodName() + "_PR", "ln", isOffHeap() ));
-    vm7.invoke(() -> 
AsyncEventQueueTestBase.createPartitionedRegionWithAsyncEventQueue( 
getTestMethodName() + "_PR", "ln", isOffHeap() ));
+    createPartitionedRegions();
 
     vm4.invoke(() -> AsyncEventQueueTestBase.doPuts( getTestMethodName() + 
"_PR",
         256 ));
     
-    vm4.invoke(() -> AsyncEventQueueTestBase.waitForAsyncQueueToGetEmpty( "ln" 
));
-    vm5.invoke(() -> AsyncEventQueueTestBase.waitForAsyncQueueToGetEmpty( "ln" 
));
-    vm6.invoke(() -> AsyncEventQueueTestBase.waitForAsyncQueueToGetEmpty( "ln" 
));
-    vm7.invoke(() -> AsyncEventQueueTestBase.waitForAsyncQueueToGetEmpty( "ln" 
));
+    waitForAsyncQueueToGetEmpty();
     
     int vm4size = (Integer)vm4.invoke(() -> 
AsyncEventQueueTestBase.getAsyncEventListenerMapSize( "ln"));
     int vm5size = (Integer)vm5.invoke(() -> 
AsyncEventQueueTestBase.getAsyncEventListenerMapSize( "ln"));
@@ -1293,52 +1109,6 @@ public class AsyncEventListenerDUnitTest extends 
AsyncEventQueueTestBase {
     assertEquals(vm4size + vm5size + vm6size + vm7size, 256);
   }
   
-  /**
-   * Below test is disabled intentionally Replicated region with Parallel Async
-   * Event queue is not supported. Test is added for the same
-   * testParallelAsyncEventQueueWithReplicatedRegion
-   * 
-   * We are gone support this configuration in upcoming releases
-   */
-  
-  public void DISABLED_DUETO_BUG51491_testReplicatedParallelAsyncEventQueue() {
-    Integer lnPort = (Integer)vm0.invoke(() -> 
AsyncEventQueueTestBase.createFirstLocatorWithDSId( 1 ));
-
-    vm4.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort ));
-    vm5.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort ));
-    vm6.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort ));
-    vm7.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort ));
-
-    vm4.invoke(() -> AsyncEventQueueTestBase.createAsyncEventQueue( "ln",
-        true, 100, 100, false, false, null, false ));
-    vm5.invoke(() -> AsyncEventQueueTestBase.createAsyncEventQueue( "ln",
-        true, 100, 100, false, false, null, false ));
-    vm6.invoke(() -> AsyncEventQueueTestBase.createAsyncEventQueue( "ln",
-        true, 100, 100, false, false, null, false ));
-    vm7.invoke(() -> AsyncEventQueueTestBase.createAsyncEventQueue( "ln",
-        true, 100, 100, false, false, null, false ));
-
-    vm4.invoke(() -> 
AsyncEventQueueTestBase.createReplicatedRegionWithAsyncEventQueue( 
getTestMethodName() + "_RR", "ln", isOffHeap() ));
-    vm5.invoke(() -> 
AsyncEventQueueTestBase.createReplicatedRegionWithAsyncEventQueue( 
getTestMethodName() + "_RR", "ln", isOffHeap() ));
-    vm6.invoke(() -> 
AsyncEventQueueTestBase.createReplicatedRegionWithAsyncEventQueue( 
getTestMethodName() + "_RR", "ln", isOffHeap() ));
-    vm7.invoke(() -> 
AsyncEventQueueTestBase.createReplicatedRegionWithAsyncEventQueue( 
getTestMethodName() + "_RR", "ln", isOffHeap() ));
-
-    vm4.invoke(() -> AsyncEventQueueTestBase.doPuts( getTestMethodName() + 
"_RR",
-        1000 ));
-
-    vm4.invoke(() -> AsyncEventQueueTestBase.waitForAsyncQueueToGetEmpty( "ln" 
));
-    vm5.invoke(() -> AsyncEventQueueTestBase.waitForAsyncQueueToGetEmpty( "ln" 
));
-    vm6.invoke(() -> AsyncEventQueueTestBase.waitForAsyncQueueToGetEmpty( "ln" 
));
-    vm7.invoke(() -> AsyncEventQueueTestBase.waitForAsyncQueueToGetEmpty( "ln" 
));
-
-    int vm4size = (Integer)vm4.invoke(() -> 
AsyncEventQueueTestBase.getAsyncEventListenerMapSize( "ln" ));
-    int vm5size = (Integer)vm5.invoke(() -> 
AsyncEventQueueTestBase.getAsyncEventListenerMapSize( "ln" ));
-    int vm6size = (Integer)vm6.invoke(() -> 
AsyncEventQueueTestBase.getAsyncEventListenerMapSize( "ln" ));
-    int vm7size = (Integer)vm7.invoke(() -> 
AsyncEventQueueTestBase.getAsyncEventListenerMapSize( "ln" ));
-
-    assertEquals(vm4size + vm5size + vm6size + vm7size, 1000);
-  }
-  
 /**
  * Test case to test possibleDuplicates. vm4 & vm5 are hosting the PR. vm5 is
  * killed so the buckets hosted by it are shifted to vm4.

Reply via email to