This is an automated email from the ASF dual-hosted git repository.

mkevo pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 304e4d58 GEODE-8455: Fix difference between create region with gw 
sender and a… (#5476)
304e4d58 is described below

commit 304e4d5857fc7539c054713e6eaddb0e74ea01d8
Author: Alberto Gomez <[email protected]>
AuthorDate: Tue Sep 1 11:19:52 2020 +0200

    GEODE-8455: Fix difference between create region with gw sender and a… 
(#5476)
    
    * GEODE-8455: Fix difference between create region with gw sender and alter 
region with gw sender
    
    Geode behaves differently with respect to transactions when
    creating a partitioned region with a parallel gateway sender to
    when first the partitioned region is created and then the
    parallel gateway sender is added by altering the region.
    
    In the first case (create region with gateway sender), when
    sending a transaction for the region, an event is sent
    to each server hosting the bucket for the data in the transaction.
    
    In the second case (create region + alter region),
    when sending a transaction for the region, an event
    is sent to every cache member and not only to those hosting
    the bucket for the data in the transaction. This leads to,
    in the servers not hosting the bucket for the data,
    that the events are stored in the
    bucketToTempQueueMap member variable of the
    ParallelGatewaySenderQueue.
    
    The behavior in the first case is the right one.
    
    The wrong behavior is provoked by the setting to true of the
    requiresNotification variable of the PartitionedRegion
    by the distributeUpdatedProfileOnSenderCreation method.
    This method should only set this variable to true
    in case the sender is serial as it is done when
    a partitioned region is initialized.
    
    * feature/GEODE-8455: Fix bug shown in dist tests and minor changes after 
review
    
    * GEODE-8455: Remove unused parameter from helper method in WANTestBase 
support test class
---
 .../geode/internal/cache/PartitionedRegion.java    |  11 +-
 .../geode/internal/cache/wan/WANTestBase.java      | 296 +++++--------------
 .../parallel/ParallelWANPropagationDUnitTest.java  | 313 ++++++++++++++-------
 .../wan/parallel/ParallelWANStatsDUnitTest.java    |  37 +--
 .../cache/wan/serial/SerialWANStatsDUnitTest.java  |  60 ++--
 5 files changed, 315 insertions(+), 402 deletions(-)

diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
index 5ad77c6..b58b7eb 100755
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
@@ -1253,7 +1253,7 @@ public class PartitionedRegion extends LocalRegion
   void distributeUpdatedProfileOnSenderCreation() {
     if (!(this.isClosed || this.isLocallyDestroyed)) {
       // tell others of the change in status
-      this.requiresNotification = true;
+      requiresNotification = true;
       new UpdateAttributesProcessor(this).distribute(false);
     }
   }
@@ -1262,8 +1262,10 @@ public class PartitionedRegion extends LocalRegion
   public void addGatewaySenderId(String gatewaySenderId) {
     super.addGatewaySenderId(gatewaySenderId);
     new UpdateAttributesProcessor(this).distribute();
-    ((PartitionedRegion) this).distributeUpdatedProfileOnSenderCreation();
     GatewaySender sender = getCache().getGatewaySender(gatewaySenderId);
+    if (sender != null && !sender.isParallel()) {
+      distributeUpdatedProfileOnSenderCreation();
+    }
     if (sender != null && sender.isParallel() && sender.isRunning()) {
       AbstractGatewaySender senderImpl = (AbstractGatewaySender) sender;
       ((ConcurrentParallelGatewaySenderQueue) 
senderImpl.getQueues().toArray(new RegionQueue[1])[0])
@@ -1308,10 +1310,11 @@ public class PartitionedRegion extends LocalRegion
   @Override
   public void addAsyncEventQueueId(String asyncEventQueueId, boolean 
isInternal) {
     super.addAsyncEventQueueId(asyncEventQueueId, isInternal);
-    new UpdateAttributesProcessor(this).distribute();
-    ((PartitionedRegion) this).distributeUpdatedProfileOnSenderCreation();
     GatewaySender sender = getCache()
         
.getGatewaySender(AsyncEventQueueImpl.getSenderIdFromAsyncEventQueueId(asyncEventQueueId));
+    if (sender != null && !sender.isParallel()) {
+      distributeUpdatedProfileOnSenderCreation();
+    }
     if (sender != null && sender.isParallel() && sender.isRunning()) {
       AbstractGatewaySender senderImpl = (AbstractGatewaySender) sender;
       ((ConcurrentParallelGatewaySenderQueue) 
senderImpl.getQueues().toArray(new RegionQueue[1])[0])
diff --git 
a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
 
b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
index 451a6a2..4b465e5 100644
--- 
a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
+++ 
b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
@@ -66,6 +66,7 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 import java.util.StringTokenizer;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.ExecutionException;
@@ -1111,14 +1112,7 @@ public class WANTestBase extends DistributedTestCase {
     IgnoredException exp1 =
         
IgnoredException.addIgnoredException(InterruptedException.class.getName());
     try {
-      Set<GatewaySender> senders = cache.getGatewaySenders();
-      GatewaySender sender = null;
-      for (GatewaySender s : senders) {
-        if (s.getId().equals(senderId)) {
-          sender = s;
-          break;
-        }
-      }
+      GatewaySender sender = getGatewaySender(senderId);
       sender.start();
     } finally {
       exp.remove();
@@ -1136,14 +1130,7 @@ public class WANTestBase extends DistributedTestCase {
     IgnoredException exp1 =
         
IgnoredException.addIgnoredException(InterruptedException.class.getName());
     try {
-      Set<GatewaySender> senders = cache.getGatewaySenders();
-      GatewaySender sender = null;
-      for (GatewaySender s : senders) {
-        if (s.getId().equals(senderId)) {
-          sender = s;
-          break;
-        }
-      }
+      GatewaySender sender = getGatewaySender(senderId);
       sender.startWithCleanQueue();
     } finally {
       exp.remove();
@@ -1154,26 +1141,12 @@ public class WANTestBase extends DistributedTestCase {
   }
 
   public static void enableConflation(String senderId) {
-    Set<GatewaySender> senders = cache.getGatewaySenders();
-    AbstractGatewaySender sender = null;
-    for (GatewaySender s : senders) {
-      if (s.getId().equals(senderId)) {
-        sender = (AbstractGatewaySender) s;
-        break;
-      }
-    }
+    AbstractGatewaySender sender = (AbstractGatewaySender) 
getGatewaySender(senderId);
     sender.test_setBatchConflationEnabled(true);
   }
 
   public static Map getSenderToReceiverConnectionInfo(String senderId) {
-    Set<GatewaySender> senders = cache.getGatewaySenders();
-    GatewaySender sender = null;
-    for (GatewaySender s : senders) {
-      if (s.getId().equals(senderId)) {
-        sender = s;
-        break;
-      }
-    }
+    GatewaySender sender = getGatewaySender(senderId);
     Map connectionInfo = null;
     if (!sender.isParallel() && ((AbstractGatewaySender) sender).isPrimary()) {
       connectionInfo = new HashMap();
@@ -1536,14 +1509,7 @@ public class WANTestBase extends DistributedTestCase {
   }
 
   public static HashMap checkQueue_BR(String senderId, int numBuckets) {
-    Set<GatewaySender> senders = cache.getGatewaySenders();
-    GatewaySender sender = null;
-    for (GatewaySender s : senders) {
-      if (s.getId().equals(senderId)) {
-        sender = s;
-        break;
-      }
-    }
+    GatewaySender sender = getGatewaySender(senderId);
     RegionQueue parallelQueue =
         ((AbstractGatewaySender) sender).getQueues().toArray(new 
RegionQueue[1])[0];
 
@@ -1584,14 +1550,7 @@ public class WANTestBase extends DistributedTestCase {
   }
 
   private void addCacheListenerOnQueueBucketRegion(String senderId, int 
numBuckets) {
-    Set<GatewaySender> senders = cache.getGatewaySenders();
-    GatewaySender sender = null;
-    for (GatewaySender s : senders) {
-      if (s.getId().equals(senderId)) {
-        sender = s;
-        break;
-      }
-    }
+    GatewaySender sender = getGatewaySender(senderId);
     RegionQueue parallelQueue =
         ((AbstractGatewaySender) sender).getQueues().toArray(new 
RegionQueue[1])[0];
 
@@ -1630,14 +1589,7 @@ public class WANTestBase extends DistributedTestCase {
   }
 
   private void addCacheQueueListener(String senderId, boolean isParallel) {
-    Set<GatewaySender> senders = cache.getGatewaySenders();
-    GatewaySender sender = null;
-    for (GatewaySender s : senders) {
-      if (s.getId().equals(senderId)) {
-        sender = s;
-        break;
-      }
-    }
+    GatewaySender sender = getGatewaySender(senderId);
     listener1 = new QueueListener();
     if (!isParallel) {
       Set<RegionQueue> queues = ((AbstractGatewaySender) sender).getQueues();
@@ -1652,14 +1604,7 @@ public class WANTestBase extends DistributedTestCase {
   }
 
   private void addSecondCacheQueueListener(String senderId, boolean 
isParallel) {
-    Set<GatewaySender> senders = cache.getGatewaySenders();
-    GatewaySender sender = null;
-    for (GatewaySender s : senders) {
-      if (s.getId().equals(senderId)) {
-        sender = s;
-        break;
-      }
-    }
+    GatewaySender sender = getGatewaySender(senderId);
     listener2 = new QueueListener();
     if (!isParallel) {
       Set<RegionQueue> queues = ((AbstractGatewaySender) sender).getQueues();
@@ -1678,14 +1623,7 @@ public class WANTestBase extends DistributedTestCase {
     IgnoredException exp =
         
IgnoredException.addIgnoredException(ForceReattemptException.class.getName());
     try {
-      Set<GatewaySender> senders = cache.getGatewaySenders();
-      GatewaySender sender = null;
-      for (GatewaySender s : senders) {
-        if (s.getId().equals(senderId)) {
-          sender = s;
-          break;
-        }
-      }
+      GatewaySender sender = getGatewaySender(senderId);
       sender.pause();
       ((AbstractGatewaySender) 
sender).getEventProcessor().waitForDispatcherToPause();
 
@@ -1700,14 +1638,7 @@ public class WANTestBase extends DistributedTestCase {
     IgnoredException exp =
         
IgnoredException.addIgnoredException(ForceReattemptException.class.getName());
     try {
-      Set<GatewaySender> senders = cache.getGatewaySenders();
-      GatewaySender sender = null;
-      for (GatewaySender s : senders) {
-        if (s.getId().equals(senderId)) {
-          sender = s;
-          break;
-        }
-      }
+      GatewaySender sender = getGatewaySender(senderId);
       sender.resume();
     } finally {
       exp.remove();
@@ -1720,14 +1651,7 @@ public class WANTestBase extends DistributedTestCase {
     IgnoredException exp =
         
IgnoredException.addIgnoredException(ForceReattemptException.class.getName());
     try {
-      Set<GatewaySender> senders = cache.getGatewaySenders();
-      GatewaySender sender = null;
-      for (GatewaySender s : senders) {
-        if (s.getId().equals(senderId)) {
-          sender = s;
-          break;
-        }
-      }
+      GatewaySender sender = getGatewaySender(senderId);
       AbstractGatewaySenderEventProcessor eventProcessor = null;
       if (sender instanceof AbstractGatewaySender) {
         eventProcessor = ((AbstractGatewaySender) sender).getEventProcessor();
@@ -1801,13 +1725,12 @@ public class WANTestBase extends DistributedTestCase {
       Integer maxMemory, Integer batchSize, boolean isConflation, boolean 
isPersistent,
       GatewayEventFilter filter, boolean isManualStart) {
     createSender(dsName, remoteDsId, isParallel, maxMemory, batchSize, 
isConflation, isPersistent,
-        filter, isManualStart, false, -1);
+        filter, isManualStart, false);
   }
 
   public static void createSender(String dsName, int remoteDsId, boolean 
isParallel,
       Integer maxMemory, Integer batchSize, boolean isConflation, boolean 
isPersistent,
-      GatewayEventFilter filter, boolean isManualStart,
-      boolean groupTransactionEvents, int batchTimeInterval) {
+      GatewayEventFilter filter, boolean isManualStart, boolean 
groupTransactionEvents) {
     final IgnoredException exln = IgnoredException.addIgnoredException("Could 
not connect");
     try {
       File persistentDirectory =
@@ -1821,9 +1744,6 @@ public class WANTestBase extends DistributedTestCase {
           GatewaySender.DEFAULT_SOCKET_BUFFER_SIZE);
       gateway.setGroupTransactionEvents(groupTransactionEvents);
       gateway.create(dsName, remoteDsId);
-      if (batchTimeInterval > 0) {
-        gateway.setBatchTimeInterval(batchTimeInterval);
-      }
     } finally {
       exln.remove();
     }
@@ -3176,14 +3096,7 @@ public class WANTestBase extends DistributedTestCase {
     IgnoredException exp1 =
         
IgnoredException.addIgnoredException(ForceReattemptException.class.getName());
     try {
-      Set<GatewaySender> senders = cache.getGatewaySenders();
-      AbstractGatewaySender sender = null;
-      for (GatewaySender s : senders) {
-        if (s.getId().equals(senderId)) {
-          sender = (AbstractGatewaySender) s;
-          break;
-        }
-      }
+      AbstractGatewaySender sender = (AbstractGatewaySender) 
getGatewaySender(senderId);
       if (sender.isPrimary()) {
         logger.info("Gateway sender is killed by a test");
         cache.getDistributedSystem().disconnect();
@@ -3274,14 +3187,7 @@ public class WANTestBase extends DistributedTestCase {
   public static void checkLocatorsinSender(String senderId, InetSocketAddress 
locatorToWaitFor)
       throws InterruptedException {
 
-    Set<GatewaySender> senders = cache.getGatewaySenders();
-    GatewaySender sender = null;
-    for (GatewaySender s : senders) {
-      if (s.getId().equals(senderId)) {
-        sender = s;
-        break;
-      }
-    }
+    GatewaySender sender = getGatewaySender(senderId);
 
     MyLocatorCallback callback =
         (MyLocatorCallback) ((AbstractGatewaySender) 
sender).getLocatorDiscoveryCallback();
@@ -3297,14 +3203,7 @@ public class WANTestBase extends DistributedTestCase {
     IgnoredException exp2 =
         
IgnoredException.addIgnoredException(GatewaySenderException.class.getName());
     try {
-      Set<GatewaySender> senders = cache.getGatewaySenders();
-      GatewaySender sender = null;
-      for (GatewaySender s : senders) {
-        if (s.getId().equals(senderId)) {
-          sender = s;
-          break;
-        }
-      }
+      GatewaySender sender = getGatewaySender(senderId);
 
       if (!sender.isParallel()) {
         final Set<RegionQueue> queues = ((AbstractGatewaySender) 
sender).getQueues();
@@ -3332,14 +3231,7 @@ public class WANTestBase extends DistributedTestCase {
 
   // Ensure that the sender's queue(s) have been closed.
   public static void validateQueueClosedForConcurrentSerialGatewaySender(final 
String senderId) {
-    Set<GatewaySender> senders = cache.getGatewaySenders();
-    GatewaySender sender = null;
-    for (GatewaySender s : senders) {
-      if (s.getId().equals(senderId)) {
-        sender = s;
-        break;
-      }
-    }
+    GatewaySender sender = getGatewaySender(senderId);
     final Set<RegionQueue> regionQueue;
     if (sender instanceof AbstractGatewaySender) {
       regionQueue = ((AbstractGatewaySender) sender).getQueues();
@@ -3351,14 +3243,7 @@ public class WANTestBase extends DistributedTestCase {
 
   public static void 
validateQueueContentsForConcurrentSerialGatewaySender(final String senderId,
       final int regionSize) {
-    Set<GatewaySender> senders = cache.getGatewaySenders();
-    GatewaySender sender = null;
-    for (GatewaySender s : senders) {
-      if (s.getId().equals(senderId)) {
-        sender = s;
-        break;
-      }
-    }
+    GatewaySender sender = getGatewaySender(senderId);
     final Set<RegionQueue> regionQueue;
     if (!sender.isParallel()) {
       regionQueue = ((AbstractGatewaySender) sender).getQueues();
@@ -3375,14 +3260,7 @@ public class WANTestBase extends DistributedTestCase {
   }
 
   public static Integer getSecondaryQueueContentSize(final String senderId) {
-    Set<GatewaySender> senders = cache.getGatewaySenders();
-    GatewaySender sender = null;
-    for (GatewaySender s : senders) {
-      if (s.getId().equals(senderId)) {
-        sender = s;
-        break;
-      }
-    }
+    GatewaySender sender = getGatewaySender(senderId);
     AbstractGatewaySender abstractSender = (AbstractGatewaySender) sender;
     int size = abstractSender.getSecondaryEventQueueSize();
     return size;
@@ -3432,14 +3310,7 @@ public class WANTestBase extends DistributedTestCase {
   }
 
   public static Integer getQueueContentSize(final String senderId, boolean 
includeSecondary) {
-    Set<GatewaySender> senders = cache.getGatewaySenders();
-    GatewaySender sender = null;
-    for (GatewaySender s : senders) {
-      if (s.getId().equals(senderId)) {
-        sender = s;
-        break;
-      }
-    }
+    GatewaySender sender = getGatewaySender(senderId);
 
     if (!sender.isParallel()) {
       // if sender is serial, the queues will be all primary or all secondary 
at one member
@@ -3466,14 +3337,7 @@ public class WANTestBase extends DistributedTestCase {
 
   public static void validateParallelSenderQueueBucketSize(final String 
senderId,
       final int bucketSize) {
-    Set<GatewaySender> senders = cache.getGatewaySenders();
-    GatewaySender sender = null;
-    for (GatewaySender s : senders) {
-      if (s.getId().equals(senderId)) {
-        sender = s;
-        break;
-      }
-    }
+    GatewaySender sender = getGatewaySender(senderId);
     RegionQueue regionQueue =
         ((AbstractGatewaySender) sender).getQueues().toArray(new 
RegionQueue[1])[0];
     Set<BucketRegion> buckets = ((PartitionedRegion) 
regionQueue.getRegion()).getDataStore()
@@ -3491,14 +3355,7 @@ public class WANTestBase extends DistributedTestCase {
     IgnoredException exp1 =
         
IgnoredException.addIgnoredException(ForceReattemptException.class.getName());
     try {
-      Set<GatewaySender> senders = cache.getGatewaySenders();
-      GatewaySender sender = null;
-      for (GatewaySender s : senders) {
-        if (s.getId().equals(senderId)) {
-          sender = s;
-          break;
-        }
-      }
+      GatewaySender sender = getGatewaySender(senderId);
       final AbstractGatewaySender abstractSender = (AbstractGatewaySender) 
sender;
       RegionQueue queue = abstractSender.getEventProcessor().queue;
       await().untilAsserted(() -> {
@@ -3523,14 +3380,7 @@ public class WANTestBase extends DistributedTestCase {
   }
 
   public static Integer validateAfterAck(final String senderId) {
-    Set<GatewaySender> senders = cache.getGatewaySenders();
-    GatewaySender sender = null;
-    for (GatewaySender s : senders) {
-      if (s.getId().equals(senderId)) {
-        sender = s;
-        break;
-      }
-    }
+    GatewaySender sender = getGatewaySender(senderId);
 
     final MyGatewayEventFilter_AfterAck filter =
         (MyGatewayEventFilter_AfterAck) sender.getGatewayEventFilters().get(0);
@@ -3538,14 +3388,7 @@ public class WANTestBase extends DistributedTestCase {
   }
 
   public static int verifyAndGetEventsDispatchedByConcurrentDispatchers(final 
String senderId) {
-    Set<GatewaySender> senders = cache.getGatewaySenders();
-    GatewaySender sender = null;
-    for (GatewaySender s : senders) {
-      if (s.getId().equals(senderId)) {
-        sender = s;
-        break;
-      }
-    }
+    GatewaySender sender = getGatewaySender(senderId);
     ConcurrentParallelGatewaySenderEventProcessor cProc =
         (ConcurrentParallelGatewaySenderEventProcessor) 
((AbstractGatewaySender) sender)
             .getEventProcessor();
@@ -3561,14 +3404,7 @@ public class WANTestBase extends DistributedTestCase {
   }
 
   public static Long getNumberOfEntriesOverflownToDisk(final String senderId) {
-    Set<GatewaySender> senders = cache.getGatewaySenders();
-    GatewaySender sender = null;
-    for (GatewaySender s : senders) {
-      if (s.getId().equals(senderId)) {
-        sender = s;
-        break;
-      }
-    }
+    GatewaySender sender = getGatewaySender(senderId);
 
     long numEntries = 0;
     if (sender.isParallel()) {
@@ -3581,14 +3417,7 @@ public class WANTestBase extends DistributedTestCase {
   }
 
   public static Long getNumberOfEntriesInVM(final String senderId) {
-    Set<GatewaySender> senders = cache.getGatewaySenders();
-    GatewaySender sender = null;
-    for (GatewaySender s : senders) {
-      if (s.getId().equals(senderId)) {
-        sender = s;
-        break;
-      }
-    }
+    GatewaySender sender = getGatewaySender(senderId);
     RegionQueue regionQueue;
     long numEntries = 0;
     if (sender.isParallel()) {
@@ -3599,21 +3428,42 @@ public class WANTestBase extends DistributedTestCase {
   }
 
   public static void verifyTmpDroppedEventSize(String senderId, int size) {
-    Set<GatewaySender> senders = cache.getGatewaySenders();
-    GatewaySender sender = null;
-    for (GatewaySender s : senders) {
-      if (s.getId().equals(senderId)) {
-        sender = s;
-        break;
-      }
-    }
+    GatewaySender sender = getGatewaySender(senderId);
 
     AbstractGatewaySender ags = (AbstractGatewaySender) sender;
     await().untilAsserted(() -> assertEquals("Expected tmpDroppedEvents size: 
" + size
         + " but actual size: " + ags.getTmpDroppedEventSize(), size, 
ags.getTmpDroppedEventSize()));
   }
 
-  public static void verifyQueueSize(String senderId, int size) {
+  /**
+   * Checks that the bucketToTempQueueMap for a partitioned region
+   * that holds events for buckets that are not available locally, is empty.
+   */
+  public static void validateEmptyBucketToTempQueueMap(String senderId) {
+    GatewaySender sender = getGatewaySender(senderId);
+
+    int size = 0;
+    Set<RegionQueue> queues = ((AbstractGatewaySender) sender).getQueues();
+    for (Object queue : queues) {
+      PartitionedRegion region =
+          (PartitionedRegion) ((ConcurrentParallelGatewaySenderQueue) 
queue).getRegion();
+      int buckets = region.getTotalNumberOfBuckets();
+      for (int bucket = 0; bucket < buckets; bucket++) {
+        BlockingQueue<GatewaySenderEventImpl> newQueue =
+            ((ConcurrentParallelGatewaySenderQueue) 
queue).getBucketTmpQueue(bucket);
+        if (newQueue != null) {
+          size += newQueue.size();
+        }
+      }
+    }
+
+    final int finalSize = size;
+    assertEquals("Expected elements in TempQueueMap: " + 0
+        + " but actual size: " + finalSize, 0, finalSize);
+
+  }
+
+  private static GatewaySender getGatewaySender(String senderId) {
     Set<GatewaySender> senders = cache.getGatewaySenders();
     GatewaySender sender = null;
     for (GatewaySender s : senders) {
@@ -3622,6 +3472,11 @@ public class WANTestBase extends DistributedTestCase {
         break;
       }
     }
+    return sender;
+  }
+
+  public static void verifyQueueSize(String senderId, int size) {
+    GatewaySender sender = getGatewaySender(senderId);
 
     if (!sender.isParallel()) {
       final Set<RegionQueue> queues = ((AbstractGatewaySender) 
sender).getQueues();
@@ -3639,14 +3494,7 @@ public class WANTestBase extends DistributedTestCase {
   }
 
   public static void verifyRegionQueueNotEmpty(String senderId) {
-    Set<GatewaySender> senders = cache.getGatewaySenders();
-    GatewaySender sender = null;
-    for (GatewaySender s : senders) {
-      if (s.getId().equals(senderId)) {
-        sender = s;
-        break;
-      }
-    }
+    GatewaySender sender = getGatewaySender(senderId);
 
     if (!sender.isParallel()) {
       final Set<RegionQueue> queues = ((AbstractGatewaySender) 
sender).getQueues();
@@ -3729,26 +3577,12 @@ public class WANTestBase extends DistributedTestCase {
   }
 
   public static void destroySender(String senderId) {
-    Set<GatewaySender> senders = cache.getGatewaySenders();
-    GatewaySender sender = null;
-    for (GatewaySender s : senders) {
-      if (s.getId().equals(senderId)) {
-        sender = s;
-        break;
-      }
-    }
+    GatewaySender sender = getGatewaySender(senderId);
     sender.destroy();
   }
 
   public static void verifySenderDestroyed(String senderId, boolean 
isParallel) {
-    Set<GatewaySender> senders = cache.getGatewaySenders();
-    AbstractGatewaySender sender = null;
-    for (GatewaySender s : senders) {
-      if (s.getId().equals(senderId)) {
-        sender = (AbstractGatewaySender) s;
-        break;
-      }
-    }
+    AbstractGatewaySender sender = (AbstractGatewaySender) 
getGatewaySender(senderId);
     assertNull(sender);
 
     String queueRegionNameSuffix = null;
diff --git 
a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationDUnitTest.java
 
b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationDUnitTest.java
index a964e74..7423dbc 100644
--- 
a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationDUnitTest.java
+++ 
b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationDUnitTest.java
@@ -54,8 +54,8 @@ public class ParallelWANPropagationDUnitTest extends 
WANTestBase {
 
   @Test
   public void test_ParallelGatewaySenderMetaRegionNotExposedToUser_Bug44216() {
-    Integer lnPort = (Integer) vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId(1));
-    Integer nyPort = (Integer) vm1.invoke(() -> 
WANTestBase.createFirstRemoteLocator(2, lnPort));
+    Integer lnPort = vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, 
lnPort));
 
     createCache(lnPort);
     createSender("ln", 2, true, 100, 300, false, false, null, true);
@@ -89,9 +89,9 @@ public class ParallelWANPropagationDUnitTest extends 
WANTestBase {
   }
 
   @Test
-  public void testParallelPropagation_withoutRemoteSite() throws Exception {
-    Integer lnPort = (Integer) vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId(1));
-    Integer nyPort = (Integer) vm1.invoke(() -> 
WANTestBase.createFirstRemoteLocator(2, lnPort));
+  public void testParallelPropagation_withoutRemoteSite() {
+    Integer lnPort = vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, 
lnPort));
 
     createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
 
@@ -101,10 +101,10 @@ public class ParallelWANPropagationDUnitTest extends 
WANTestBase {
     vm6.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 300, false, 
false, null, true));
     vm7.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 300, false, 
false, null, true));
 
-    vm4.invoke(createPartitionedRegionRedundancy1Runnable());
-    vm5.invoke(createPartitionedRegionRedundancy1Runnable());
-    vm6.invoke(createPartitionedRegionRedundancy1Runnable());
-    vm7.invoke(createPartitionedRegionRedundancy1Runnable());
+    vm4.invoke(createPartitionedRegionRunnable("ln", 1));
+    vm5.invoke(createPartitionedRegionRunnable("ln", 1));
+    vm6.invoke(createPartitionedRegionRunnable("ln", 1));
+    vm7.invoke(createPartitionedRegionRunnable("ln", 1));
 
     startSenderInVMs("ln", vm4, vm5, vm6, vm7);
 
@@ -139,9 +139,9 @@ public class ParallelWANPropagationDUnitTest extends 
WANTestBase {
    *
    */
   @Test
-  public void testParallelPropagation() throws Exception {
-    Integer lnPort = (Integer) vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId(1));
-    Integer nyPort = (Integer) vm1.invoke(() -> 
WANTestBase.createFirstRemoteLocator(2, lnPort));
+  public void testParallelPropagation() {
+    Integer lnPort = vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, 
lnPort));
 
     createCacheInVMs(nyPort, vm2, vm3);
     createReceiverInVMs(vm2, vm3);
@@ -152,10 +152,10 @@ public class ParallelWANPropagationDUnitTest extends 
WANTestBase {
     vm6.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, 
false, null, true));
     vm7.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, 
false, null, true));
 
-    vm4.invoke(createPartitionedRegionRedundancy1Runnable());
-    vm5.invoke(createPartitionedRegionRedundancy1Runnable());
-    vm6.invoke(createPartitionedRegionRedundancy1Runnable());
-    vm7.invoke(createPartitionedRegionRedundancy1Runnable());
+    vm4.invoke(createPartitionedRegionRunnable("ln", 1));
+    vm5.invoke(createPartitionedRegionRunnable("ln", 1));
+    vm6.invoke(createPartitionedRegionRunnable("ln", 1));
+    vm7.invoke(createPartitionedRegionRunnable("ln", 1));
 
     startSenderInVMs("ln", vm4, vm5, vm6, vm7);
 
@@ -185,8 +185,10 @@ public class ParallelWANPropagationDUnitTest extends 
WANTestBase {
         isOffHeap());
   }
 
-  protected SerializableRunnableIF 
createPartitionedRegionRedundancy1Runnable() {
-    return () -> WANTestBase.createPartitionedRegion(getTestMethodName() + 
"_PR", "ln", 1, 100,
+  protected SerializableRunnableIF createPartitionedRegionRunnable(String 
senderIds,
+      int redundantCopies) {
+    return () -> WANTestBase.createPartitionedRegion(getTestMethodName() + 
"_PR", senderIds,
+        redundantCopies, 100,
         isOffHeap());
   }
 
@@ -195,9 +197,9 @@ public class ParallelWANPropagationDUnitTest extends 
WANTestBase {
   }
 
   @Test
-  public void testParallelPropagation_ManualStart() throws Exception {
-    Integer lnPort = (Integer) vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId(1));
-    Integer nyPort = (Integer) vm1.invoke(() -> 
WANTestBase.createFirstRemoteLocator(2, lnPort));
+  public void testParallelPropagation_ManualStart() {
+    Integer lnPort = vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, 
lnPort));
 
     createCacheInVMs(nyPort, vm2, vm3);
     createReceiverInVMs(vm2, vm3);
@@ -208,10 +210,10 @@ public class ParallelWANPropagationDUnitTest extends 
WANTestBase {
     vm6.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, 
false, null, false));
     vm7.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, 
false, null, false));
 
-    vm4.invoke(createPartitionedRegionRedundancy1Runnable());
-    vm5.invoke(createPartitionedRegionRedundancy1Runnable());
-    vm6.invoke(createPartitionedRegionRedundancy1Runnable());
-    vm7.invoke(createPartitionedRegionRedundancy1Runnable());
+    vm4.invoke(createPartitionedRegionRunnable("ln", 1));
+    vm5.invoke(createPartitionedRegionRunnable("ln", 1));
+    vm6.invoke(createPartitionedRegionRunnable("ln", 1));
+    vm7.invoke(createPartitionedRegionRunnable("ln", 1));
 
     vm2.invoke(createReceiverPartitionedRegionRedundancy1());
     vm3.invoke(createReceiverPartitionedRegionRedundancy1());
@@ -239,9 +241,9 @@ public class ParallelWANPropagationDUnitTest extends 
WANTestBase {
    *
    */
   @Test
-  public void testParallelPropagationPutBeforeSenderStart() throws Exception {
-    Integer lnPort = (Integer) vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId(1));
-    Integer nyPort = (Integer) vm1.invoke(() -> 
WANTestBase.createFirstRemoteLocator(2, lnPort));
+  public void testParallelPropagationPutBeforeSenderStart() {
+    Integer lnPort = vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, 
lnPort));
 
     createCacheInVMs(nyPort, vm2, vm3);
     createReceiverInVMs(vm2, vm3);
@@ -252,10 +254,10 @@ public class ParallelWANPropagationDUnitTest extends 
WANTestBase {
     vm6.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, 
false, null, true));
     vm7.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, 
false, null, true));
 
-    vm4.invoke(createPartitionedRegionRedundancy1Runnable());
-    vm5.invoke(createPartitionedRegionRedundancy1Runnable());
-    vm6.invoke(createPartitionedRegionRedundancy1Runnable());
-    vm7.invoke(createPartitionedRegionRedundancy1Runnable());
+    vm4.invoke(createPartitionedRegionRunnable("ln", 1));
+    vm5.invoke(createPartitionedRegionRunnable("ln", 1));
+    vm6.invoke(createPartitionedRegionRunnable("ln", 1));
+    vm7.invoke(createPartitionedRegionRunnable("ln", 1));
 
     vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_PR", 1000));
 
@@ -284,9 +286,9 @@ public class ParallelWANPropagationDUnitTest extends 
WANTestBase {
    */
   @Category({WanTest.class})
   @Test
-  public void testParallelPropagationWithLocalCacheClosedAndRebuilt() throws 
Exception {
-    Integer lnPort = (Integer) vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId(1));
-    Integer nyPort = (Integer) vm1.invoke(() -> 
WANTestBase.createFirstRemoteLocator(2, lnPort));
+  public void testParallelPropagationWithLocalCacheClosedAndRebuilt() {
+    Integer lnPort = vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, 
lnPort));
 
     createCacheInVMs(nyPort, vm2, vm3);
     createReceiverInVMs(vm2, vm3);
@@ -297,10 +299,10 @@ public class ParallelWANPropagationDUnitTest extends 
WANTestBase {
     vm6.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, 
false, null, true));
     vm7.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, 
false, null, true));
 
-    vm4.invoke(createPartitionedRegionRedundancy1Runnable());
-    vm5.invoke(createPartitionedRegionRedundancy1Runnable());
-    vm6.invoke(createPartitionedRegionRedundancy1Runnable());
-    vm7.invoke(createPartitionedRegionRedundancy1Runnable());
+    vm4.invoke(createPartitionedRegionRunnable("ln", 1));
+    vm5.invoke(createPartitionedRegionRunnable("ln", 1));
+    vm6.invoke(createPartitionedRegionRunnable("ln", 1));
+    vm7.invoke(createPartitionedRegionRunnable("ln", 1));
 
     startSenderInVMs("ln", vm4, vm5, vm6, vm7);
 
@@ -324,7 +326,7 @@ public class ParallelWANPropagationDUnitTest extends 
WANTestBase {
     vm7.invoke(() -> WANTestBase.killSender());
 
     Integer regionSize =
-        (Integer) vm2.invoke(() -> 
WANTestBase.getRegionSize(getTestMethodName() + "_PR"));
+        vm2.invoke(() -> WANTestBase.getRegionSize(getTestMethodName() + 
"_PR"));
     LogWriterUtils.getLogWriter().info("Region size on remote is: " + 
regionSize);
 
     createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
@@ -334,10 +336,10 @@ public class ParallelWANPropagationDUnitTest extends 
WANTestBase {
     vm6.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, 
false, null, true));
     vm7.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, 
false, null, true));
 
-    vm4.invoke(createPartitionedRegionRedundancy1Runnable());
-    vm5.invoke(createPartitionedRegionRedundancy1Runnable());
-    vm6.invoke(createPartitionedRegionRedundancy1Runnable());
-    vm7.invoke(createPartitionedRegionRedundancy1Runnable());
+    vm4.invoke(createPartitionedRegionRunnable("ln", 1));
+    vm5.invoke(createPartitionedRegionRunnable("ln", 1));
+    vm6.invoke(createPartitionedRegionRunnable("ln", 1));
+    vm7.invoke(createPartitionedRegionRunnable("ln", 1));
 
     startSenderInVMs("ln", vm4, vm5, vm6, vm7);
 
@@ -364,9 +366,9 @@ public class ParallelWANPropagationDUnitTest extends 
WANTestBase {
   }
 
   @Test
-  public void testParallelColocatedPropagation() throws Exception {
-    Integer lnPort = (Integer) vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId(1));
-    Integer nyPort = (Integer) vm1.invoke(() -> 
WANTestBase.createFirstRemoteLocator(2, lnPort));
+  public void testParallelColocatedPropagation() {
+    Integer lnPort = vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, 
lnPort));
 
     createCacheInVMs(nyPort, vm2, vm3);
     createReceiverInVMs(vm2, vm3);
@@ -412,9 +414,9 @@ public class ParallelWANPropagationDUnitTest extends 
WANTestBase {
    */
 
   @Test
-  public void testParallelColocatedPropagation2() throws Exception {
-    Integer lnPort = (Integer) vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId(1));
-    Integer nyPort = (Integer) vm1.invoke(() -> 
WANTestBase.createFirstRemoteLocator(2, lnPort));
+  public void testParallelColocatedPropagation2() {
+    Integer lnPort = vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, 
lnPort));
 
     createCacheInVMs(nyPort, vm2, vm3);
     createReceiverInVMs(vm2, vm3);
@@ -458,9 +460,9 @@ public class ParallelWANPropagationDUnitTest extends 
WANTestBase {
 
   @Category({WanTest.class})
   @Test
-  public void testParallelPropagationWithOverflow() throws Exception {
-    Integer lnPort = (Integer) vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId(1));
-    Integer nyPort = (Integer) vm1.invoke(() -> 
WANTestBase.createFirstRemoteLocator(2, lnPort));
+  public void testParallelPropagationWithOverflow() {
+    Integer lnPort = vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, 
lnPort));
 
     createCacheInVMs(nyPort, vm2, vm3);
     vm2.invoke(
@@ -504,10 +506,10 @@ public class ParallelWANPropagationDUnitTest extends 
WANTestBase {
   }
 
   @Test
-  public void testSerialReplicatedAndParallelPartitionedPropagation() throws 
Exception {
+  public void testSerialReplicatedAndParallelPartitionedPropagation() {
 
-    Integer lnPort = (Integer) vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId(1));
-    Integer nyPort = (Integer) vm1.invoke(() -> 
WANTestBase.createFirstRemoteLocator(2, lnPort));
+    Integer lnPort = vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, 
lnPort));
 
     createCacheInVMs(nyPort, vm2, vm3);
     createReceiverInVMs(vm2, vm3);
@@ -571,10 +573,10 @@ public class ParallelWANPropagationDUnitTest extends 
WANTestBase {
   }
 
   @Test
-  public void testPartitionedParallelPropagationToTwoWanSites() throws 
Exception {
+  public void testPartitionedParallelPropagationToTwoWanSites() {
     Integer lnPort = createFirstLocatorWithDSId(1);
-    Integer nyPort = (Integer) vm0.invoke(() -> 
WANTestBase.createFirstRemoteLocator(2, lnPort));
-    Integer tkPort = (Integer) vm1.invoke(() -> 
WANTestBase.createFirstRemoteLocator(3, lnPort));
+    Integer nyPort = vm0.invoke(() -> WANTestBase.createFirstRemoteLocator(2, 
lnPort));
+    Integer tkPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(3, 
lnPort));
 
     createCacheInVMs(nyPort, vm2);
     vm2.invoke(createReceiverPartitionedRegionRedundancy1());
@@ -654,8 +656,8 @@ public class ParallelWANPropagationDUnitTest extends 
WANTestBase {
     IgnoredException.addIgnoredException("Broken pipe");
     IgnoredException.addIgnoredException("Connection reset");
     IgnoredException.addIgnoredException("Unexpected IOException");
-    Integer lnPort = (Integer) vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId(1));
-    Integer nyPort = (Integer) vm1.invoke(() -> 
WANTestBase.createFirstRemoteLocator(2, lnPort));
+    Integer lnPort = vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, 
lnPort));
 
     createCacheInVMs(nyPort, vm2, vm3);
     createReceiverInVMs(vm2, vm3);
@@ -708,9 +710,9 @@ public class ParallelWANPropagationDUnitTest extends 
WANTestBase {
   }
 
   @Test
-  public void testParallelPropagationWithFilter() throws Exception {
-    Integer lnPort = (Integer) vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId(1));
-    Integer nyPort = (Integer) vm1.invoke(() -> 
WANTestBase.createFirstRemoteLocator(2, lnPort));
+  public void testParallelPropagationWithFilter() {
+    Integer lnPort = vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, 
lnPort));
 
     createCacheInVMs(nyPort, vm2, vm3);
     createReceiverInVMs(vm2, vm3);
@@ -755,10 +757,10 @@ public class ParallelWANPropagationDUnitTest extends 
WANTestBase {
 
 
   @Test
-  public void testParallelPropagationWithPutAll() throws Exception {
+  public void testParallelPropagationWithPutAll() {
 
-    Integer lnPort = (Integer) vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId(1));
-    Integer nyPort = (Integer) vm1.invoke(() -> 
WANTestBase.createFirstRemoteLocator(2, lnPort));
+    Integer lnPort = vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, 
lnPort));
 
     createCacheInVMs(nyPort, vm2, vm3);
     createReceiverInVMs(vm2, vm3);
@@ -769,10 +771,10 @@ public class ParallelWANPropagationDUnitTest extends 
WANTestBase {
     vm6.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, 
false, null, true));
     vm7.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, 
false, null, true));
 
-    vm4.invoke(createPartitionedRegionRedundancy1Runnable());
-    vm5.invoke(createPartitionedRegionRedundancy1Runnable());
-    vm6.invoke(createPartitionedRegionRedundancy1Runnable());
-    vm7.invoke(createPartitionedRegionRedundancy1Runnable());
+    vm4.invoke(createPartitionedRegionRunnable("ln", 1));
+    vm5.invoke(createPartitionedRegionRunnable("ln", 1));
+    vm6.invoke(createPartitionedRegionRunnable("ln", 1));
+    vm7.invoke(createPartitionedRegionRunnable("ln", 1));
 
     startSenderInVMs("ln", vm4, vm5, vm6, vm7);
 
@@ -804,10 +806,10 @@ public class ParallelWANPropagationDUnitTest extends 
WANTestBase {
    *
    */
   @Test
-  public void testParallelPropagationWithDestroy() throws Exception {
+  public void testParallelPropagationWithDestroy() {
 
-    Integer lnPort = (Integer) vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId(1));
-    Integer nyPort = (Integer) vm1.invoke(() -> 
WANTestBase.createFirstRemoteLocator(2, lnPort));
+    Integer lnPort = vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, 
lnPort));
 
     createCacheInVMs(nyPort, vm2, vm3);
     createReceiverInVMs(vm2, vm3);
@@ -818,10 +820,10 @@ public class ParallelWANPropagationDUnitTest extends 
WANTestBase {
     vm6.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 100, false, 
false, null, true));
     vm7.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 100, false, 
false, null, true));
 
-    vm4.invoke(createPartitionedRegionRedundancy1Runnable());
-    vm5.invoke(createPartitionedRegionRedundancy1Runnable());
-    vm6.invoke(createPartitionedRegionRedundancy1Runnable());
-    vm7.invoke(createPartitionedRegionRedundancy1Runnable());
+    vm4.invoke(createPartitionedRegionRunnable("ln", 1));
+    vm5.invoke(createPartitionedRegionRunnable("ln", 1));
+    vm6.invoke(createPartitionedRegionRunnable("ln", 1));
+    vm7.invoke(createPartitionedRegionRunnable("ln", 1));
 
     startSenderInVMs("ln", vm4, vm5, vm6, vm7);
 
@@ -874,9 +876,9 @@ public class ParallelWANPropagationDUnitTest extends 
WANTestBase {
    *
    */
   @Test
-  public void testParallelPropagationTxOperations() throws Exception {
-    Integer lnPort = (Integer) vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId(1));
-    Integer nyPort = (Integer) vm1.invoke(() -> 
WANTestBase.createFirstRemoteLocator(2, lnPort));
+  public void testParallelPropagationTxOperations() {
+    Integer lnPort = vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, 
lnPort));
 
     createCacheInVMs(nyPort, vm2, vm3);
     createReceiverInVMs(vm2, vm3);
@@ -891,8 +893,8 @@ public class ParallelWANPropagationDUnitTest extends 
WANTestBase {
     // vm7.invoke(() -> WANTestBase.createSender( "ln", 2,
     // true, 100, 10, false, false, null, true ));
 
-    vm4.invoke(createPartitionedRegionRedundancy1Runnable());
-    vm5.invoke(createPartitionedRegionRedundancy1Runnable());
+    vm4.invoke(createPartitionedRegionRunnable("ln", 1));
+    vm5.invoke(createPartitionedRegionRunnable("ln", 1));
     // vm6.invoke(() -> WANTestBase.createPartitionedRegion(
     // testName + "_PR", "ln", true, 1, 100, isOffHeap() ));
     // vm7.invoke(() -> WANTestBase.createPartitionedRegion(
@@ -925,8 +927,8 @@ public class ParallelWANPropagationDUnitTest extends 
WANTestBase {
   @Ignore
   @Test
   public void testParallelGatewaySenderQueueLocalSize() {
-    Integer lnPort = (Integer) vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId(1));
-    Integer nyPort = (Integer) vm1.invoke(() -> 
WANTestBase.createFirstRemoteLocator(2, lnPort));
+    Integer lnPort = vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, 
lnPort));
 
     createCacheInVMs(nyPort, vm2);
     createCacheInVMs(nyPort, vm2);
@@ -936,8 +938,8 @@ public class ParallelWANPropagationDUnitTest extends 
WANTestBase {
     vm4.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, 
false, null, true));
     vm5.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, 
false, null, true));
 
-    vm4.invoke(createPartitionedRegionRedundancy1Runnable());
-    vm5.invoke(createPartitionedRegionRedundancy1Runnable());
+    vm4.invoke(createPartitionedRegionRunnable("ln", 1));
+    vm5.invoke(createPartitionedRegionRunnable("ln", 1));
 
     startSenderInVMs("ln", vm4, vm5);
 
@@ -962,8 +964,8 @@ public class ParallelWANPropagationDUnitTest extends 
WANTestBase {
     vm5.invoke(() -> WANTestBase.validateQueueContents("ln", 10));
 
     // instead of checking size as 5 and 5. check that combined size is 10
-    Integer localSize1 = (Integer) vm4.invoke(() -> 
WANTestBase.getPRQLocalSize("ln"));
-    Integer localSize2 = (Integer) vm5.invoke(() -> 
WANTestBase.getPRQLocalSize("ln"));
+    Integer localSize1 = vm4.invoke(() -> WANTestBase.getPRQLocalSize("ln"));
+    Integer localSize2 = vm5.invoke(() -> WANTestBase.getPRQLocalSize("ln"));
     assertEquals(10, localSize1 + localSize2);
   }
 
@@ -973,8 +975,8 @@ public class ParallelWANPropagationDUnitTest extends 
WANTestBase {
     IgnoredException.addIgnoredException("Broken pipe");
     IgnoredException.addIgnoredException("Connection reset");
     IgnoredException.addIgnoredException("Unexpected IOException");
-    Integer lnPort = (Integer) vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId(1));
-    Integer nyPort = (Integer) vm1.invoke(() -> 
WANTestBase.createFirstRemoteLocator(2, lnPort));
+    Integer lnPort = vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, 
lnPort));
 
     createCacheInVMs(nyPort, vm2);
     createReceiverInVMs(vm2);
@@ -983,8 +985,8 @@ public class ParallelWANPropagationDUnitTest extends 
WANTestBase {
     vm4.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, 
false, null, true));
     vm5.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, 
false, null, true));
 
-    vm4.invoke(createPartitionedRegionRedundancy1Runnable());
-    vm5.invoke(createPartitionedRegionRedundancy1Runnable());
+    vm4.invoke(createPartitionedRegionRunnable("ln", 1));
+    vm5.invoke(createPartitionedRegionRunnable("ln", 1));
 
     startSenderInVMs("ln", vm4, vm5);
 
@@ -1008,8 +1010,8 @@ public class ParallelWANPropagationDUnitTest extends 
WANTestBase {
     vm4.invoke(() -> WANTestBase.validateQueueContents("ln", 10));
     vm5.invoke(() -> WANTestBase.validateQueueContents("ln", 10));
 
-    Integer localSize1 = (Integer) vm4.invoke(() -> 
WANTestBase.getPRQLocalSize("ln"));
-    Integer localSize2 = (Integer) vm5.invoke(() -> 
WANTestBase.getPRQLocalSize("ln"));
+    Integer localSize1 = vm4.invoke(() -> WANTestBase.getPRQLocalSize("ln"));
+    Integer localSize2 = vm5.invoke(() -> WANTestBase.getPRQLocalSize("ln"));
     assertEquals(10, localSize1 + localSize2);
 
     vm5.invoke(() -> WANTestBase.killSender());
@@ -1026,8 +1028,8 @@ public class ParallelWANPropagationDUnitTest extends 
WANTestBase {
   @Test
   @Parameters(method = "getRegionShortcuts")
   public void 
testParallelSenderAttachedToChildRegionButNotToParentRegion(RegionShortcut 
shortcut) {
-    Integer lnPort = (Integer) vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId(1));
-    Integer nyPort = (Integer) vm1.invoke(() -> 
WANTestBase.createFirstRemoteLocator(2, lnPort));
+    Integer lnPort = vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, 
lnPort));
 
     // create cache and receiver on site2
     createCacheInVMs(nyPort, vm2);
@@ -1065,9 +1067,9 @@ public class ParallelWANPropagationDUnitTest extends 
WANTestBase {
   }
 
   @Test
-  public void testParallelPropagationWithFilter_AfterAck() throws Exception {
-    Integer lnPort = (Integer) vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId(1));
-    Integer nyPort = (Integer) vm1.invoke(() -> 
WANTestBase.createFirstRemoteLocator(2, lnPort));
+  public void testParallelPropagationWithFilter_AfterAck() {
+    Integer lnPort = vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, 
lnPort));
 
     createCacheInVMs(nyPort, vm6, vm7);
     createReceiverInVMs(vm6, vm7);
@@ -1122,15 +1124,118 @@ public class ParallelWANPropagationDUnitTest extends 
WANTestBase {
     vm6.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 
1000));
     vm7.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 
1000));
 
-    Integer vm2Acks = (Integer) vm2.invoke(() -> 
WANTestBase.validateAfterAck("ln"));
-    Integer vm3Acks = (Integer) vm3.invoke(() -> 
WANTestBase.validateAfterAck("ln"));
-    Integer vm4Acks = (Integer) vm4.invoke(() -> 
WANTestBase.validateAfterAck("ln"));
-    Integer vm5Acks = (Integer) vm5.invoke(() -> 
WANTestBase.validateAfterAck("ln"));
+    Integer vm2Acks = vm2.invoke(() -> WANTestBase.validateAfterAck("ln"));
+    Integer vm3Acks = vm3.invoke(() -> WANTestBase.validateAfterAck("ln"));
+    Integer vm4Acks = vm4.invoke(() -> WANTestBase.validateAfterAck("ln"));
+    Integer vm5Acks = vm5.invoke(() -> WANTestBase.validateAfterAck("ln"));
 
     assertEquals(2000, (vm2Acks + vm3Acks + vm4Acks + vm5Acks));
 
   }
 
+  /**
+   * Test that, when a parallel gateway sender is added to a partitioned 
region through attributes
+   * mutator, transaction events are not sent to all region members but only 
to those who are
+   * hosting the bucket for the event and thus, events are not stored in the 
bucketToTempQueueMap
+   * member of the ParallelGatewaySenderQueue.
+   * Redundancy = 1 in the partitioned region.
+   *
+   */
+  @Test
+  public void 
testParallelPropagationTxNotificationsNotSentToAllRegionMembersWhenAddingParallelGatewaySenderThroughAttributesMutator()
 {
+    Integer lnPort = vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, 
lnPort));
+
+    createCacheInVMs(nyPort, vm2, vm3);
+
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+    vm4.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, 
false, null, true));
+    vm5.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, 
false, null, true));
+    vm6.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, 
false, null, true));
+    vm7.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, 
false, null, true));
+
+    vm4.invoke(createPartitionedRegionRunnable(null, 1));
+    vm5.invoke(createPartitionedRegionRunnable(null, 1));
+    vm6.invoke(createPartitionedRegionRunnable(null, 1));
+    vm7.invoke(createPartitionedRegionRunnable(null, 1));
+
+    vm2.invoke(createReceiverPartitionedRegionRedundancy1());
+    vm3.invoke(createReceiverPartitionedRegionRedundancy1());
+
+    vm4.invoke(() -> addSenderThroughAttributesMutator(getTestMethodName() + 
"_PR", "ln"));
+    vm5.invoke(() -> addSenderThroughAttributesMutator(getTestMethodName() + 
"_PR", "ln"));
+    vm6.invoke(() -> addSenderThroughAttributesMutator(getTestMethodName() + 
"_PR", "ln"));
+    vm7.invoke(() -> addSenderThroughAttributesMutator(getTestMethodName() + 
"_PR", "ln"));
+
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+    vm4.invoke(waitForSenderRunnable());
+    vm5.invoke(waitForSenderRunnable());
+    vm6.invoke(waitForSenderRunnable());
+    vm7.invoke(waitForSenderRunnable());
+
+    vm4.invoke(() -> WANTestBase.doTxPuts(getTestMethodName() + "_PR"));
+
+    vm4.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + 
"_PR", 3));
+    vm4.invoke(() -> WANTestBase.verifyQueueSize("ln", 3));
+
+    vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + 
"_PR", 0));
+    vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + 
"_PR", 0));
+
+    vm4.invoke(() -> WANTestBase.validateEmptyBucketToTempQueueMap("ln"));
+    vm5.invoke(() -> WANTestBase.validateEmptyBucketToTempQueueMap("ln"));
+    vm6.invoke(() -> WANTestBase.validateEmptyBucketToTempQueueMap("ln"));
+    vm7.invoke(() -> WANTestBase.validateEmptyBucketToTempQueueMap("ln"));
+  }
+
+  /**
+   * Test that, when a parallel gateway sender is added to a partitioned 
region through attributes
+   * mutator, transaction events are not sent to all region members but only 
to those who are
+   * hosting the bucket for the event and thus, events are not stored in the 
bucketToTempQueueMap
+   * member of the ParallelGatewaySenderQueue.
+   * No redundancy in the partitioned region.
+   *
+   */
+  @Test
+  public void 
testParallelPropagationTxNotificationsNotSentToAllRegionMembersWhenAddingParallelGatewaySenderThroughAttributesMutatorNoRedundancy()
 {
+    Integer lnPort = vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, 
lnPort));
+
+    createCacheInVMs(nyPort, vm2, vm3);
+
+    createCacheInVMs(lnPort, vm4, vm5);
+
+    vm4.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, 
false, null, true));
+    vm5.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, 
false, null, true));
+
+    vm4.invoke(createPartitionedRegionRunnable(null, 0));
+    vm5.invoke(createPartitionedRegionRunnable(null, 0));
+
+    vm2.invoke(createReceiverPartitionedRegionRedundancy1());
+    vm3.invoke(createReceiverPartitionedRegionRedundancy1());
+
+    vm4.invoke(() -> addSenderThroughAttributesMutator(getTestMethodName() + 
"_PR", "ln"));
+    vm5.invoke(() -> addSenderThroughAttributesMutator(getTestMethodName() + 
"_PR", "ln"));
+
+    startSenderInVMs("ln", vm4, vm5);
+
+    vm4.invoke(waitForSenderRunnable());
+    vm5.invoke(waitForSenderRunnable());
+
+    vm4.invoke(() -> WANTestBase.doTxPuts(getTestMethodName() + "_PR"));
+
+    vm4.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + 
"_PR", 3));
+    vm4.invoke(() -> WANTestBase.verifyQueueSize("ln", 3));
+
+    vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + 
"_PR", 0));
+    vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + 
"_PR", 0));
+
+    vm4.invoke(() -> WANTestBase.validateEmptyBucketToTempQueueMap("ln"));
+    vm5.invoke(() -> WANTestBase.validateEmptyBucketToTempQueueMap("ln"));
+  }
+
+
   private static RegionShortcut[] getRegionShortcuts() {
     return new RegionShortcut[] {RegionShortcut.PARTITION, 
RegionShortcut.PARTITION_PERSISTENT};
   }
diff --git 
a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
 
b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
index 8213817..7155975 100644
--- 
a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
+++ 
b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
@@ -323,8 +323,7 @@ public class ParallelWANStatsDUnitTest extends WANTestBase {
     createCacheInVMs(nyPort, vm2);
     createReceiverInVMs(vm2);
 
-    int batchTimeInterval = 10000;
-    createSenders(lnPort, false, batchTimeInterval);
+    createSenders(lnPort, false);
 
     createReceiverCustomerOrderShipmentPR(vm2, 0);
 
@@ -394,8 +393,7 @@ public class ParallelWANStatsDUnitTest extends WANTestBase {
     createCacheInVMs(nyPort, vm2);
     createReceiverInVMs(vm2);
 
-    int batchTimeInterval = 10000;
-    createSenders(lnPort, true, batchTimeInterval);
+    createSenders(lnPort, true);
 
     createReceiverCustomerOrderShipmentPR(vm2, 0);
 
@@ -473,8 +471,7 @@ public class ParallelWANStatsDUnitTest extends WANTestBase {
 
     createCacheInVMs(nyPort, vm2);
 
-    int batchTimeInterval = 10000;
-    createSenders(lnPort, false, batchTimeInterval);
+    createSenders(lnPort, false);
 
     createSenderCustomerOrderShipmentPRs(vm4, 0);
 
@@ -540,8 +537,7 @@ public class ParallelWANStatsDUnitTest extends WANTestBase {
 
     createCacheInVMs(nyPort, vm2);
 
-    int batchTimeInterval = 10000;
-    createSenders(lnPort, true, batchTimeInterval);
+    createSenders(lnPort, true);
 
     createReceiverCustomerOrderShipmentPR(vm2, 0);
 
@@ -787,20 +783,16 @@ public class ParallelWANStatsDUnitTest extends 
WANTestBase {
     boolean groupTransactionEvents = true;
     vm4.invoke(
         () -> WANTestBase.createSender("ln", 2, true, 100, batchSize, false, 
false, null, true,
-            groupTransactionEvents,
-            -1));
+            groupTransactionEvents));
     vm5.invoke(
         () -> WANTestBase.createSender("ln", 2, true, 100, batchSize, false, 
false, null, true,
-            groupTransactionEvents,
-            -1));
+            groupTransactionEvents));
     vm6.invoke(
         () -> WANTestBase.createSender("ln", 2, true, 100, batchSize, false, 
false, null, true,
-            groupTransactionEvents,
-            -1));
+            groupTransactionEvents));
     vm7.invoke(
         () -> WANTestBase.createSender("ln", 2, true, 100, batchSize, false, 
false, null, true,
-            groupTransactionEvents,
-            -1));
+            groupTransactionEvents));
 
     startSenderInVMs("ln", vm4, vm5, vm6, vm7);
 
@@ -1131,22 +1123,21 @@ public class ParallelWANStatsDUnitTest extends 
WANTestBase {
     vm.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, 
false, null, true));
   }
 
-  protected void createSenders(Integer lnPort, boolean groupTransactionEvents,
-      int batchTimeInterval) {
+  protected void createSenders(Integer lnPort, boolean groupTransactionEvents) 
{
     createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
 
     vm4.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, 
false, null, true,
-        groupTransactionEvents, batchTimeInterval));
+        groupTransactionEvents));
     vm5.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, 
false, null, true,
-        groupTransactionEvents, batchTimeInterval));
+        groupTransactionEvents));
     vm6.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, 
false, null, true,
-        groupTransactionEvents, batchTimeInterval));
+        groupTransactionEvents));
     vm7.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, 
false, null, true,
-        groupTransactionEvents, batchTimeInterval));
+        groupTransactionEvents));
   }
 
   protected void createSenders(Integer lnPort) {
-    createSenders(lnPort, false, -1);
+    createSenders(lnPort, false);
   }
 
   private void verifyConflationIndexesSize(String senderId, int expectedSize, 
VM... vms) {
diff --git 
a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANStatsDUnitTest.java
 
b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANStatsDUnitTest.java
index 0af1d42..2cbdc35 100644
--- 
a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANStatsDUnitTest.java
+++ 
b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANStatsDUnitTest.java
@@ -97,24 +97,19 @@ public class SerialWANStatsDUnitTest extends WANTestBase {
 
     createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
 
-    int batchTimeInterval = 10000;
     boolean groupTransactionEvents = false;
     vm4.invoke(
         () -> WANTestBase.createSender("ln", 2, false, 100, 10, false, false, 
null, true,
-            groupTransactionEvents,
-            batchTimeInterval));
+            groupTransactionEvents));
     vm5.invoke(
         () -> WANTestBase.createSender("ln", 2, false, 100, 10, false, false, 
null, true,
-            groupTransactionEvents,
-            batchTimeInterval));
+            groupTransactionEvents));
     vm6.invoke(
         () -> WANTestBase.createSender("ln", 2, false, 100, 10, false, false, 
null, true,
-            groupTransactionEvents,
-            batchTimeInterval));
+            groupTransactionEvents));
     vm7.invoke(
         () -> WANTestBase.createSender("ln", 2, false, 100, 10, false, false, 
null, true,
-            groupTransactionEvents,
-            batchTimeInterval));
+            groupTransactionEvents));
 
     vm2.invoke(() -> WANTestBase.createReplicatedRegion(testName + "_RR", 
null, isOffHeap()));
 
@@ -159,24 +154,19 @@ public class SerialWANStatsDUnitTest extends WANTestBase {
 
     createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
 
-    int batchTimeInterval = 30000;
     boolean groupTransactionEvents = true;
     vm4.invoke(
         () -> WANTestBase.createSender("ln", 2, false, 100, 10, false, false, 
null, true,
-            groupTransactionEvents,
-            batchTimeInterval));
+            groupTransactionEvents));
     vm5.invoke(
         () -> WANTestBase.createSender("ln", 2, false, 100, 10, false, false, 
null, true,
-            groupTransactionEvents,
-            batchTimeInterval));
+            groupTransactionEvents));
     vm6.invoke(
         () -> WANTestBase.createSender("ln", 2, false, 100, 10, false, false, 
null, true,
-            groupTransactionEvents,
-            batchTimeInterval));
+            groupTransactionEvents));
     vm7.invoke(
         () -> WANTestBase.createSender("ln", 2, false, 100, 10, false, false, 
null, true,
-            groupTransactionEvents,
-            batchTimeInterval));
+            groupTransactionEvents));
 
     vm2.invoke(() -> WANTestBase.createReplicatedRegion(testName + "_RR", 
null, isOffHeap()));
 
@@ -230,24 +220,19 @@ public class SerialWANStatsDUnitTest extends WANTestBase {
 
     createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
 
-    int batchTimeInterval = 30000;
     boolean groupTransactionEvents = false;
     vm4.invoke(
         () -> WANTestBase.createSender("ln", 2, false, 100, 10, false, false, 
null, true,
-            groupTransactionEvents,
-            batchTimeInterval));
+            groupTransactionEvents));
     vm5.invoke(
         () -> WANTestBase.createSender("ln", 2, false, 100, 10, false, false, 
null, true,
-            groupTransactionEvents,
-            batchTimeInterval));
+            groupTransactionEvents));
     vm6.invoke(
         () -> WANTestBase.createSender("ln", 2, false, 100, 10, false, false, 
null, true,
-            groupTransactionEvents,
-            batchTimeInterval));
+            groupTransactionEvents));
     vm7.invoke(
         () -> WANTestBase.createSender("ln", 2, false, 100, 10, false, false, 
null, true,
-            groupTransactionEvents,
-            batchTimeInterval));
+            groupTransactionEvents));
 
     vm2.invoke(() -> WANTestBase.createReplicatedRegion(testName + "_RR", 
null, isOffHeap()));
 
@@ -298,25 +283,20 @@ public class SerialWANStatsDUnitTest extends WANTestBase {
 
     createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
 
-    int batchTimeInterval = 10000;
     boolean groupTransactionEvents = true;
     int batchSize = 10;
     vm4.invoke(
         () -> WANTestBase.createSender("ln", 2, false, 100, batchSize, false, 
false, null, true,
-            groupTransactionEvents,
-            batchTimeInterval));
+            groupTransactionEvents));
     vm5.invoke(
         () -> WANTestBase.createSender("ln", 2, false, 100, batchSize, false, 
false, null, true,
-            groupTransactionEvents,
-            batchTimeInterval));
+            groupTransactionEvents));
     vm6.invoke(
         () -> WANTestBase.createSender("ln", 2, false, 100, batchSize, false, 
false, null, true,
-            groupTransactionEvents,
-            batchTimeInterval));
+            groupTransactionEvents));
     vm7.invoke(
         () -> WANTestBase.createSender("ln", 2, false, 100, batchSize, false, 
false, null, true,
-            groupTransactionEvents,
-            batchTimeInterval));
+            groupTransactionEvents));
 
     vm2.invoke(() -> WANTestBase.createReplicatedRegion(testName + "_RR", 
null, isOffHeap()));
 
@@ -533,16 +513,16 @@ public class SerialWANStatsDUnitTest extends WANTestBase {
     boolean groupTransactionEvents = true;
     vm4.invoke(
         () -> WANTestBase.createSender("ln", 2, false, 100, batchSize, false, 
false, null, true,
-            groupTransactionEvents, -1));
+            groupTransactionEvents));
     vm5.invoke(
         () -> WANTestBase.createSender("ln", 2, false, 100, batchSize, false, 
false, null, true,
-            groupTransactionEvents, -1));
+            groupTransactionEvents));
     vm6.invoke(
         () -> WANTestBase.createSender("ln", 2, false, 100, batchSize, false, 
false, null, true,
-            groupTransactionEvents, -1));
+            groupTransactionEvents));
     vm7.invoke(
         () -> WANTestBase.createSender("ln", 2, false, 100, batchSize, false, 
false, null, true,
-            groupTransactionEvents, -1));
+            groupTransactionEvents));
 
     vm2.invoke(() -> WANTestBase.createReplicatedRegion(testName + "_RR", 
null, isOffHeap()));
 

Reply via email to