This is an automated email from the ASF dual-hosted git repository.
mkevo pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 304e4d58 GEODE-8455: Fix difference between create region with gw
sender and a… (#5476)
304e4d58 is described below
commit 304e4d5857fc7539c054713e6eaddb0e74ea01d8
Author: Alberto Gomez <[email protected]>
AuthorDate: Tue Sep 1 11:19:52 2020 +0200
GEODE-8455: Fix difference between create region with gw sender and a…
(#5476)
* GEODE-8455: Fix difference between create region with gw sender and alter
region with gw sender
Geode behaves differently with respect to transactions when
creating a partitioned region with a parallel gateway sender to
when first the partitioned region is created and then the
parallel gateway sender is added by altering the region.
In the first case (create region with gateway sender), when
sending a transaction for the region, an event is sent
to each server hosting the bucket for the data in the transaction.
In the second case (create region + alter region),
when sending a transaction for the region, an event
is sent to every cache member and not only to those hosting
the bucket for the data in the transaction. This leads to,
in the servers not hosting the bucket for the data,
that the events are stored in the
bucketToTempQueueMap member variable of the
ParallelGatewaySenderQueue.
The behavior in the first case is the right one.
The wrong behavior is provoked by the setting to true of the
requiresNotification variable of the PartitionedRegion
by the distributeUpdatedProfileOnSenderCreation method.
This method should only set this variable to true
in case the sender is serial as it is done when
a partitioned region is initialized.
* feature/GEODE-8455: Fix bug shown in dist tests and minor changes after
review
* GEODE-8455: Remove unused parameter from helper method in WANTestBase
support test class
---
.../geode/internal/cache/PartitionedRegion.java | 11 +-
.../geode/internal/cache/wan/WANTestBase.java | 296 +++++--------------
.../parallel/ParallelWANPropagationDUnitTest.java | 313 ++++++++++++++-------
.../wan/parallel/ParallelWANStatsDUnitTest.java | 37 +--
.../cache/wan/serial/SerialWANStatsDUnitTest.java | 60 ++--
5 files changed, 315 insertions(+), 402 deletions(-)
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
index 5ad77c6..b58b7eb 100755
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
@@ -1253,7 +1253,7 @@ public class PartitionedRegion extends LocalRegion
void distributeUpdatedProfileOnSenderCreation() {
if (!(this.isClosed || this.isLocallyDestroyed)) {
// tell others of the change in status
- this.requiresNotification = true;
+ requiresNotification = true;
new UpdateAttributesProcessor(this).distribute(false);
}
}
@@ -1262,8 +1262,10 @@ public class PartitionedRegion extends LocalRegion
public void addGatewaySenderId(String gatewaySenderId) {
super.addGatewaySenderId(gatewaySenderId);
new UpdateAttributesProcessor(this).distribute();
- ((PartitionedRegion) this).distributeUpdatedProfileOnSenderCreation();
GatewaySender sender = getCache().getGatewaySender(gatewaySenderId);
+ if (sender != null && !sender.isParallel()) {
+ distributeUpdatedProfileOnSenderCreation();
+ }
if (sender != null && sender.isParallel() && sender.isRunning()) {
AbstractGatewaySender senderImpl = (AbstractGatewaySender) sender;
((ConcurrentParallelGatewaySenderQueue)
senderImpl.getQueues().toArray(new RegionQueue[1])[0])
@@ -1308,10 +1310,11 @@ public class PartitionedRegion extends LocalRegion
@Override
public void addAsyncEventQueueId(String asyncEventQueueId, boolean
isInternal) {
super.addAsyncEventQueueId(asyncEventQueueId, isInternal);
- new UpdateAttributesProcessor(this).distribute();
- ((PartitionedRegion) this).distributeUpdatedProfileOnSenderCreation();
GatewaySender sender = getCache()
.getGatewaySender(AsyncEventQueueImpl.getSenderIdFromAsyncEventQueueId(asyncEventQueueId));
+ if (sender != null && !sender.isParallel()) {
+ distributeUpdatedProfileOnSenderCreation();
+ }
if (sender != null && sender.isParallel() && sender.isRunning()) {
AbstractGatewaySender senderImpl = (AbstractGatewaySender) sender;
((ConcurrentParallelGatewaySenderQueue)
senderImpl.getQueues().toArray(new RegionQueue[1])[0])
diff --git
a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
index 451a6a2..4b465e5 100644
---
a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
+++
b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
@@ -66,6 +66,7 @@ import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.StringTokenizer;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutionException;
@@ -1111,14 +1112,7 @@ public class WANTestBase extends DistributedTestCase {
IgnoredException exp1 =
IgnoredException.addIgnoredException(InterruptedException.class.getName());
try {
- Set<GatewaySender> senders = cache.getGatewaySenders();
- GatewaySender sender = null;
- for (GatewaySender s : senders) {
- if (s.getId().equals(senderId)) {
- sender = s;
- break;
- }
- }
+ GatewaySender sender = getGatewaySender(senderId);
sender.start();
} finally {
exp.remove();
@@ -1136,14 +1130,7 @@ public class WANTestBase extends DistributedTestCase {
IgnoredException exp1 =
IgnoredException.addIgnoredException(InterruptedException.class.getName());
try {
- Set<GatewaySender> senders = cache.getGatewaySenders();
- GatewaySender sender = null;
- for (GatewaySender s : senders) {
- if (s.getId().equals(senderId)) {
- sender = s;
- break;
- }
- }
+ GatewaySender sender = getGatewaySender(senderId);
sender.startWithCleanQueue();
} finally {
exp.remove();
@@ -1154,26 +1141,12 @@ public class WANTestBase extends DistributedTestCase {
}
public static void enableConflation(String senderId) {
- Set<GatewaySender> senders = cache.getGatewaySenders();
- AbstractGatewaySender sender = null;
- for (GatewaySender s : senders) {
- if (s.getId().equals(senderId)) {
- sender = (AbstractGatewaySender) s;
- break;
- }
- }
+ AbstractGatewaySender sender = (AbstractGatewaySender)
getGatewaySender(senderId);
sender.test_setBatchConflationEnabled(true);
}
public static Map getSenderToReceiverConnectionInfo(String senderId) {
- Set<GatewaySender> senders = cache.getGatewaySenders();
- GatewaySender sender = null;
- for (GatewaySender s : senders) {
- if (s.getId().equals(senderId)) {
- sender = s;
- break;
- }
- }
+ GatewaySender sender = getGatewaySender(senderId);
Map connectionInfo = null;
if (!sender.isParallel() && ((AbstractGatewaySender) sender).isPrimary()) {
connectionInfo = new HashMap();
@@ -1536,14 +1509,7 @@ public class WANTestBase extends DistributedTestCase {
}
public static HashMap checkQueue_BR(String senderId, int numBuckets) {
- Set<GatewaySender> senders = cache.getGatewaySenders();
- GatewaySender sender = null;
- for (GatewaySender s : senders) {
- if (s.getId().equals(senderId)) {
- sender = s;
- break;
- }
- }
+ GatewaySender sender = getGatewaySender(senderId);
RegionQueue parallelQueue =
((AbstractGatewaySender) sender).getQueues().toArray(new
RegionQueue[1])[0];
@@ -1584,14 +1550,7 @@ public class WANTestBase extends DistributedTestCase {
}
private void addCacheListenerOnQueueBucketRegion(String senderId, int
numBuckets) {
- Set<GatewaySender> senders = cache.getGatewaySenders();
- GatewaySender sender = null;
- for (GatewaySender s : senders) {
- if (s.getId().equals(senderId)) {
- sender = s;
- break;
- }
- }
+ GatewaySender sender = getGatewaySender(senderId);
RegionQueue parallelQueue =
((AbstractGatewaySender) sender).getQueues().toArray(new
RegionQueue[1])[0];
@@ -1630,14 +1589,7 @@ public class WANTestBase extends DistributedTestCase {
}
private void addCacheQueueListener(String senderId, boolean isParallel) {
- Set<GatewaySender> senders = cache.getGatewaySenders();
- GatewaySender sender = null;
- for (GatewaySender s : senders) {
- if (s.getId().equals(senderId)) {
- sender = s;
- break;
- }
- }
+ GatewaySender sender = getGatewaySender(senderId);
listener1 = new QueueListener();
if (!isParallel) {
Set<RegionQueue> queues = ((AbstractGatewaySender) sender).getQueues();
@@ -1652,14 +1604,7 @@ public class WANTestBase extends DistributedTestCase {
}
private void addSecondCacheQueueListener(String senderId, boolean
isParallel) {
- Set<GatewaySender> senders = cache.getGatewaySenders();
- GatewaySender sender = null;
- for (GatewaySender s : senders) {
- if (s.getId().equals(senderId)) {
- sender = s;
- break;
- }
- }
+ GatewaySender sender = getGatewaySender(senderId);
listener2 = new QueueListener();
if (!isParallel) {
Set<RegionQueue> queues = ((AbstractGatewaySender) sender).getQueues();
@@ -1678,14 +1623,7 @@ public class WANTestBase extends DistributedTestCase {
IgnoredException exp =
IgnoredException.addIgnoredException(ForceReattemptException.class.getName());
try {
- Set<GatewaySender> senders = cache.getGatewaySenders();
- GatewaySender sender = null;
- for (GatewaySender s : senders) {
- if (s.getId().equals(senderId)) {
- sender = s;
- break;
- }
- }
+ GatewaySender sender = getGatewaySender(senderId);
sender.pause();
((AbstractGatewaySender)
sender).getEventProcessor().waitForDispatcherToPause();
@@ -1700,14 +1638,7 @@ public class WANTestBase extends DistributedTestCase {
IgnoredException exp =
IgnoredException.addIgnoredException(ForceReattemptException.class.getName());
try {
- Set<GatewaySender> senders = cache.getGatewaySenders();
- GatewaySender sender = null;
- for (GatewaySender s : senders) {
- if (s.getId().equals(senderId)) {
- sender = s;
- break;
- }
- }
+ GatewaySender sender = getGatewaySender(senderId);
sender.resume();
} finally {
exp.remove();
@@ -1720,14 +1651,7 @@ public class WANTestBase extends DistributedTestCase {
IgnoredException exp =
IgnoredException.addIgnoredException(ForceReattemptException.class.getName());
try {
- Set<GatewaySender> senders = cache.getGatewaySenders();
- GatewaySender sender = null;
- for (GatewaySender s : senders) {
- if (s.getId().equals(senderId)) {
- sender = s;
- break;
- }
- }
+ GatewaySender sender = getGatewaySender(senderId);
AbstractGatewaySenderEventProcessor eventProcessor = null;
if (sender instanceof AbstractGatewaySender) {
eventProcessor = ((AbstractGatewaySender) sender).getEventProcessor();
@@ -1801,13 +1725,12 @@ public class WANTestBase extends DistributedTestCase {
Integer maxMemory, Integer batchSize, boolean isConflation, boolean
isPersistent,
GatewayEventFilter filter, boolean isManualStart) {
createSender(dsName, remoteDsId, isParallel, maxMemory, batchSize,
isConflation, isPersistent,
- filter, isManualStart, false, -1);
+ filter, isManualStart, false);
}
public static void createSender(String dsName, int remoteDsId, boolean
isParallel,
Integer maxMemory, Integer batchSize, boolean isConflation, boolean
isPersistent,
- GatewayEventFilter filter, boolean isManualStart,
- boolean groupTransactionEvents, int batchTimeInterval) {
+ GatewayEventFilter filter, boolean isManualStart, boolean
groupTransactionEvents) {
final IgnoredException exln = IgnoredException.addIgnoredException("Could
not connect");
try {
File persistentDirectory =
@@ -1821,9 +1744,6 @@ public class WANTestBase extends DistributedTestCase {
GatewaySender.DEFAULT_SOCKET_BUFFER_SIZE);
gateway.setGroupTransactionEvents(groupTransactionEvents);
gateway.create(dsName, remoteDsId);
- if (batchTimeInterval > 0) {
- gateway.setBatchTimeInterval(batchTimeInterval);
- }
} finally {
exln.remove();
}
@@ -3176,14 +3096,7 @@ public class WANTestBase extends DistributedTestCase {
IgnoredException exp1 =
IgnoredException.addIgnoredException(ForceReattemptException.class.getName());
try {
- Set<GatewaySender> senders = cache.getGatewaySenders();
- AbstractGatewaySender sender = null;
- for (GatewaySender s : senders) {
- if (s.getId().equals(senderId)) {
- sender = (AbstractGatewaySender) s;
- break;
- }
- }
+ AbstractGatewaySender sender = (AbstractGatewaySender)
getGatewaySender(senderId);
if (sender.isPrimary()) {
logger.info("Gateway sender is killed by a test");
cache.getDistributedSystem().disconnect();
@@ -3274,14 +3187,7 @@ public class WANTestBase extends DistributedTestCase {
public static void checkLocatorsinSender(String senderId, InetSocketAddress
locatorToWaitFor)
throws InterruptedException {
- Set<GatewaySender> senders = cache.getGatewaySenders();
- GatewaySender sender = null;
- for (GatewaySender s : senders) {
- if (s.getId().equals(senderId)) {
- sender = s;
- break;
- }
- }
+ GatewaySender sender = getGatewaySender(senderId);
MyLocatorCallback callback =
(MyLocatorCallback) ((AbstractGatewaySender)
sender).getLocatorDiscoveryCallback();
@@ -3297,14 +3203,7 @@ public class WANTestBase extends DistributedTestCase {
IgnoredException exp2 =
IgnoredException.addIgnoredException(GatewaySenderException.class.getName());
try {
- Set<GatewaySender> senders = cache.getGatewaySenders();
- GatewaySender sender = null;
- for (GatewaySender s : senders) {
- if (s.getId().equals(senderId)) {
- sender = s;
- break;
- }
- }
+ GatewaySender sender = getGatewaySender(senderId);
if (!sender.isParallel()) {
final Set<RegionQueue> queues = ((AbstractGatewaySender)
sender).getQueues();
@@ -3332,14 +3231,7 @@ public class WANTestBase extends DistributedTestCase {
// Ensure that the sender's queue(s) have been closed.
public static void validateQueueClosedForConcurrentSerialGatewaySender(final
String senderId) {
- Set<GatewaySender> senders = cache.getGatewaySenders();
- GatewaySender sender = null;
- for (GatewaySender s : senders) {
- if (s.getId().equals(senderId)) {
- sender = s;
- break;
- }
- }
+ GatewaySender sender = getGatewaySender(senderId);
final Set<RegionQueue> regionQueue;
if (sender instanceof AbstractGatewaySender) {
regionQueue = ((AbstractGatewaySender) sender).getQueues();
@@ -3351,14 +3243,7 @@ public class WANTestBase extends DistributedTestCase {
public static void
validateQueueContentsForConcurrentSerialGatewaySender(final String senderId,
final int regionSize) {
- Set<GatewaySender> senders = cache.getGatewaySenders();
- GatewaySender sender = null;
- for (GatewaySender s : senders) {
- if (s.getId().equals(senderId)) {
- sender = s;
- break;
- }
- }
+ GatewaySender sender = getGatewaySender(senderId);
final Set<RegionQueue> regionQueue;
if (!sender.isParallel()) {
regionQueue = ((AbstractGatewaySender) sender).getQueues();
@@ -3375,14 +3260,7 @@ public class WANTestBase extends DistributedTestCase {
}
public static Integer getSecondaryQueueContentSize(final String senderId) {
- Set<GatewaySender> senders = cache.getGatewaySenders();
- GatewaySender sender = null;
- for (GatewaySender s : senders) {
- if (s.getId().equals(senderId)) {
- sender = s;
- break;
- }
- }
+ GatewaySender sender = getGatewaySender(senderId);
AbstractGatewaySender abstractSender = (AbstractGatewaySender) sender;
int size = abstractSender.getSecondaryEventQueueSize();
return size;
@@ -3432,14 +3310,7 @@ public class WANTestBase extends DistributedTestCase {
}
public static Integer getQueueContentSize(final String senderId, boolean
includeSecondary) {
- Set<GatewaySender> senders = cache.getGatewaySenders();
- GatewaySender sender = null;
- for (GatewaySender s : senders) {
- if (s.getId().equals(senderId)) {
- sender = s;
- break;
- }
- }
+ GatewaySender sender = getGatewaySender(senderId);
if (!sender.isParallel()) {
// if sender is serial, the queues will be all primary or all secondary
at one member
@@ -3466,14 +3337,7 @@ public class WANTestBase extends DistributedTestCase {
public static void validateParallelSenderQueueBucketSize(final String
senderId,
final int bucketSize) {
- Set<GatewaySender> senders = cache.getGatewaySenders();
- GatewaySender sender = null;
- for (GatewaySender s : senders) {
- if (s.getId().equals(senderId)) {
- sender = s;
- break;
- }
- }
+ GatewaySender sender = getGatewaySender(senderId);
RegionQueue regionQueue =
((AbstractGatewaySender) sender).getQueues().toArray(new
RegionQueue[1])[0];
Set<BucketRegion> buckets = ((PartitionedRegion)
regionQueue.getRegion()).getDataStore()
@@ -3491,14 +3355,7 @@ public class WANTestBase extends DistributedTestCase {
IgnoredException exp1 =
IgnoredException.addIgnoredException(ForceReattemptException.class.getName());
try {
- Set<GatewaySender> senders = cache.getGatewaySenders();
- GatewaySender sender = null;
- for (GatewaySender s : senders) {
- if (s.getId().equals(senderId)) {
- sender = s;
- break;
- }
- }
+ GatewaySender sender = getGatewaySender(senderId);
final AbstractGatewaySender abstractSender = (AbstractGatewaySender)
sender;
RegionQueue queue = abstractSender.getEventProcessor().queue;
await().untilAsserted(() -> {
@@ -3523,14 +3380,7 @@ public class WANTestBase extends DistributedTestCase {
}
public static Integer validateAfterAck(final String senderId) {
- Set<GatewaySender> senders = cache.getGatewaySenders();
- GatewaySender sender = null;
- for (GatewaySender s : senders) {
- if (s.getId().equals(senderId)) {
- sender = s;
- break;
- }
- }
+ GatewaySender sender = getGatewaySender(senderId);
final MyGatewayEventFilter_AfterAck filter =
(MyGatewayEventFilter_AfterAck) sender.getGatewayEventFilters().get(0);
@@ -3538,14 +3388,7 @@ public class WANTestBase extends DistributedTestCase {
}
public static int verifyAndGetEventsDispatchedByConcurrentDispatchers(final
String senderId) {
- Set<GatewaySender> senders = cache.getGatewaySenders();
- GatewaySender sender = null;
- for (GatewaySender s : senders) {
- if (s.getId().equals(senderId)) {
- sender = s;
- break;
- }
- }
+ GatewaySender sender = getGatewaySender(senderId);
ConcurrentParallelGatewaySenderEventProcessor cProc =
(ConcurrentParallelGatewaySenderEventProcessor)
((AbstractGatewaySender) sender)
.getEventProcessor();
@@ -3561,14 +3404,7 @@ public class WANTestBase extends DistributedTestCase {
}
public static Long getNumberOfEntriesOverflownToDisk(final String senderId) {
- Set<GatewaySender> senders = cache.getGatewaySenders();
- GatewaySender sender = null;
- for (GatewaySender s : senders) {
- if (s.getId().equals(senderId)) {
- sender = s;
- break;
- }
- }
+ GatewaySender sender = getGatewaySender(senderId);
long numEntries = 0;
if (sender.isParallel()) {
@@ -3581,14 +3417,7 @@ public class WANTestBase extends DistributedTestCase {
}
public static Long getNumberOfEntriesInVM(final String senderId) {
- Set<GatewaySender> senders = cache.getGatewaySenders();
- GatewaySender sender = null;
- for (GatewaySender s : senders) {
- if (s.getId().equals(senderId)) {
- sender = s;
- break;
- }
- }
+ GatewaySender sender = getGatewaySender(senderId);
RegionQueue regionQueue;
long numEntries = 0;
if (sender.isParallel()) {
@@ -3599,21 +3428,42 @@ public class WANTestBase extends DistributedTestCase {
}
public static void verifyTmpDroppedEventSize(String senderId, int size) {
- Set<GatewaySender> senders = cache.getGatewaySenders();
- GatewaySender sender = null;
- for (GatewaySender s : senders) {
- if (s.getId().equals(senderId)) {
- sender = s;
- break;
- }
- }
+ GatewaySender sender = getGatewaySender(senderId);
AbstractGatewaySender ags = (AbstractGatewaySender) sender;
await().untilAsserted(() -> assertEquals("Expected tmpDroppedEvents size:
" + size
+ " but actual size: " + ags.getTmpDroppedEventSize(), size,
ags.getTmpDroppedEventSize()));
}
- public static void verifyQueueSize(String senderId, int size) {
+ /**
+ * Checks that the bucketToTempQueueMap for a partitioned region
+ * that holds events for buckets that are not available locally, is empty.
+ */
+ public static void validateEmptyBucketToTempQueueMap(String senderId) {
+ GatewaySender sender = getGatewaySender(senderId);
+
+ int size = 0;
+ Set<RegionQueue> queues = ((AbstractGatewaySender) sender).getQueues();
+ for (Object queue : queues) {
+ PartitionedRegion region =
+ (PartitionedRegion) ((ConcurrentParallelGatewaySenderQueue)
queue).getRegion();
+ int buckets = region.getTotalNumberOfBuckets();
+ for (int bucket = 0; bucket < buckets; bucket++) {
+ BlockingQueue<GatewaySenderEventImpl> newQueue =
+ ((ConcurrentParallelGatewaySenderQueue)
queue).getBucketTmpQueue(bucket);
+ if (newQueue != null) {
+ size += newQueue.size();
+ }
+ }
+ }
+
+ final int finalSize = size;
+ assertEquals("Expected elements in TempQueueMap: " + 0
+ + " but actual size: " + finalSize, 0, finalSize);
+
+ }
+
+ private static GatewaySender getGatewaySender(String senderId) {
Set<GatewaySender> senders = cache.getGatewaySenders();
GatewaySender sender = null;
for (GatewaySender s : senders) {
@@ -3622,6 +3472,11 @@ public class WANTestBase extends DistributedTestCase {
break;
}
}
+ return sender;
+ }
+
+ public static void verifyQueueSize(String senderId, int size) {
+ GatewaySender sender = getGatewaySender(senderId);
if (!sender.isParallel()) {
final Set<RegionQueue> queues = ((AbstractGatewaySender)
sender).getQueues();
@@ -3639,14 +3494,7 @@ public class WANTestBase extends DistributedTestCase {
}
public static void verifyRegionQueueNotEmpty(String senderId) {
- Set<GatewaySender> senders = cache.getGatewaySenders();
- GatewaySender sender = null;
- for (GatewaySender s : senders) {
- if (s.getId().equals(senderId)) {
- sender = s;
- break;
- }
- }
+ GatewaySender sender = getGatewaySender(senderId);
if (!sender.isParallel()) {
final Set<RegionQueue> queues = ((AbstractGatewaySender)
sender).getQueues();
@@ -3729,26 +3577,12 @@ public class WANTestBase extends DistributedTestCase {
}
public static void destroySender(String senderId) {
- Set<GatewaySender> senders = cache.getGatewaySenders();
- GatewaySender sender = null;
- for (GatewaySender s : senders) {
- if (s.getId().equals(senderId)) {
- sender = s;
- break;
- }
- }
+ GatewaySender sender = getGatewaySender(senderId);
sender.destroy();
}
public static void verifySenderDestroyed(String senderId, boolean
isParallel) {
- Set<GatewaySender> senders = cache.getGatewaySenders();
- AbstractGatewaySender sender = null;
- for (GatewaySender s : senders) {
- if (s.getId().equals(senderId)) {
- sender = (AbstractGatewaySender) s;
- break;
- }
- }
+ AbstractGatewaySender sender = (AbstractGatewaySender)
getGatewaySender(senderId);
assertNull(sender);
String queueRegionNameSuffix = null;
diff --git
a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationDUnitTest.java
b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationDUnitTest.java
index a964e74..7423dbc 100644
---
a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationDUnitTest.java
+++
b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationDUnitTest.java
@@ -54,8 +54,8 @@ public class ParallelWANPropagationDUnitTest extends
WANTestBase {
@Test
public void test_ParallelGatewaySenderMetaRegionNotExposedToUser_Bug44216() {
- Integer lnPort = (Integer) vm0.invoke(() ->
WANTestBase.createFirstLocatorWithDSId(1));
- Integer nyPort = (Integer) vm1.invoke(() ->
WANTestBase.createFirstRemoteLocator(2, lnPort));
+ Integer lnPort = vm0.invoke(() ->
WANTestBase.createFirstLocatorWithDSId(1));
+ Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2,
lnPort));
createCache(lnPort);
createSender("ln", 2, true, 100, 300, false, false, null, true);
@@ -89,9 +89,9 @@ public class ParallelWANPropagationDUnitTest extends
WANTestBase {
}
@Test
- public void testParallelPropagation_withoutRemoteSite() throws Exception {
- Integer lnPort = (Integer) vm0.invoke(() ->
WANTestBase.createFirstLocatorWithDSId(1));
- Integer nyPort = (Integer) vm1.invoke(() ->
WANTestBase.createFirstRemoteLocator(2, lnPort));
+ public void testParallelPropagation_withoutRemoteSite() {
+ Integer lnPort = vm0.invoke(() ->
WANTestBase.createFirstLocatorWithDSId(1));
+ Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2,
lnPort));
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
@@ -101,10 +101,10 @@ public class ParallelWANPropagationDUnitTest extends
WANTestBase {
vm6.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 300, false,
false, null, true));
vm7.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 300, false,
false, null, true));
- vm4.invoke(createPartitionedRegionRedundancy1Runnable());
- vm5.invoke(createPartitionedRegionRedundancy1Runnable());
- vm6.invoke(createPartitionedRegionRedundancy1Runnable());
- vm7.invoke(createPartitionedRegionRedundancy1Runnable());
+ vm4.invoke(createPartitionedRegionRunnable("ln", 1));
+ vm5.invoke(createPartitionedRegionRunnable("ln", 1));
+ vm6.invoke(createPartitionedRegionRunnable("ln", 1));
+ vm7.invoke(createPartitionedRegionRunnable("ln", 1));
startSenderInVMs("ln", vm4, vm5, vm6, vm7);
@@ -139,9 +139,9 @@ public class ParallelWANPropagationDUnitTest extends
WANTestBase {
*
*/
@Test
- public void testParallelPropagation() throws Exception {
- Integer lnPort = (Integer) vm0.invoke(() ->
WANTestBase.createFirstLocatorWithDSId(1));
- Integer nyPort = (Integer) vm1.invoke(() ->
WANTestBase.createFirstRemoteLocator(2, lnPort));
+ public void testParallelPropagation() {
+ Integer lnPort = vm0.invoke(() ->
WANTestBase.createFirstLocatorWithDSId(1));
+ Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2,
lnPort));
createCacheInVMs(nyPort, vm2, vm3);
createReceiverInVMs(vm2, vm3);
@@ -152,10 +152,10 @@ public class ParallelWANPropagationDUnitTest extends
WANTestBase {
vm6.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false,
false, null, true));
vm7.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false,
false, null, true));
- vm4.invoke(createPartitionedRegionRedundancy1Runnable());
- vm5.invoke(createPartitionedRegionRedundancy1Runnable());
- vm6.invoke(createPartitionedRegionRedundancy1Runnable());
- vm7.invoke(createPartitionedRegionRedundancy1Runnable());
+ vm4.invoke(createPartitionedRegionRunnable("ln", 1));
+ vm5.invoke(createPartitionedRegionRunnable("ln", 1));
+ vm6.invoke(createPartitionedRegionRunnable("ln", 1));
+ vm7.invoke(createPartitionedRegionRunnable("ln", 1));
startSenderInVMs("ln", vm4, vm5, vm6, vm7);
@@ -185,8 +185,10 @@ public class ParallelWANPropagationDUnitTest extends
WANTestBase {
isOffHeap());
}
- protected SerializableRunnableIF
createPartitionedRegionRedundancy1Runnable() {
- return () -> WANTestBase.createPartitionedRegion(getTestMethodName() +
"_PR", "ln", 1, 100,
+ protected SerializableRunnableIF createPartitionedRegionRunnable(String
senderIds,
+ int redundantCopies) {
+ return () -> WANTestBase.createPartitionedRegion(getTestMethodName() +
"_PR", senderIds,
+ redundantCopies, 100,
isOffHeap());
}
@@ -195,9 +197,9 @@ public class ParallelWANPropagationDUnitTest extends
WANTestBase {
}
@Test
- public void testParallelPropagation_ManualStart() throws Exception {
- Integer lnPort = (Integer) vm0.invoke(() ->
WANTestBase.createFirstLocatorWithDSId(1));
- Integer nyPort = (Integer) vm1.invoke(() ->
WANTestBase.createFirstRemoteLocator(2, lnPort));
+ public void testParallelPropagation_ManualStart() {
+ Integer lnPort = vm0.invoke(() ->
WANTestBase.createFirstLocatorWithDSId(1));
+ Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2,
lnPort));
createCacheInVMs(nyPort, vm2, vm3);
createReceiverInVMs(vm2, vm3);
@@ -208,10 +210,10 @@ public class ParallelWANPropagationDUnitTest extends
WANTestBase {
vm6.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false,
false, null, false));
vm7.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false,
false, null, false));
- vm4.invoke(createPartitionedRegionRedundancy1Runnable());
- vm5.invoke(createPartitionedRegionRedundancy1Runnable());
- vm6.invoke(createPartitionedRegionRedundancy1Runnable());
- vm7.invoke(createPartitionedRegionRedundancy1Runnable());
+ vm4.invoke(createPartitionedRegionRunnable("ln", 1));
+ vm5.invoke(createPartitionedRegionRunnable("ln", 1));
+ vm6.invoke(createPartitionedRegionRunnable("ln", 1));
+ vm7.invoke(createPartitionedRegionRunnable("ln", 1));
vm2.invoke(createReceiverPartitionedRegionRedundancy1());
vm3.invoke(createReceiverPartitionedRegionRedundancy1());
@@ -239,9 +241,9 @@ public class ParallelWANPropagationDUnitTest extends
WANTestBase {
*
*/
@Test
- public void testParallelPropagationPutBeforeSenderStart() throws Exception {
- Integer lnPort = (Integer) vm0.invoke(() ->
WANTestBase.createFirstLocatorWithDSId(1));
- Integer nyPort = (Integer) vm1.invoke(() ->
WANTestBase.createFirstRemoteLocator(2, lnPort));
+ public void testParallelPropagationPutBeforeSenderStart() {
+ Integer lnPort = vm0.invoke(() ->
WANTestBase.createFirstLocatorWithDSId(1));
+ Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2,
lnPort));
createCacheInVMs(nyPort, vm2, vm3);
createReceiverInVMs(vm2, vm3);
@@ -252,10 +254,10 @@ public class ParallelWANPropagationDUnitTest extends
WANTestBase {
vm6.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false,
false, null, true));
vm7.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false,
false, null, true));
- vm4.invoke(createPartitionedRegionRedundancy1Runnable());
- vm5.invoke(createPartitionedRegionRedundancy1Runnable());
- vm6.invoke(createPartitionedRegionRedundancy1Runnable());
- vm7.invoke(createPartitionedRegionRedundancy1Runnable());
+ vm4.invoke(createPartitionedRegionRunnable("ln", 1));
+ vm5.invoke(createPartitionedRegionRunnable("ln", 1));
+ vm6.invoke(createPartitionedRegionRunnable("ln", 1));
+ vm7.invoke(createPartitionedRegionRunnable("ln", 1));
vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_PR", 1000));
@@ -284,9 +286,9 @@ public class ParallelWANPropagationDUnitTest extends
WANTestBase {
*/
@Category({WanTest.class})
@Test
- public void testParallelPropagationWithLocalCacheClosedAndRebuilt() throws
Exception {
- Integer lnPort = (Integer) vm0.invoke(() ->
WANTestBase.createFirstLocatorWithDSId(1));
- Integer nyPort = (Integer) vm1.invoke(() ->
WANTestBase.createFirstRemoteLocator(2, lnPort));
+ public void testParallelPropagationWithLocalCacheClosedAndRebuilt() {
+ Integer lnPort = vm0.invoke(() ->
WANTestBase.createFirstLocatorWithDSId(1));
+ Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2,
lnPort));
createCacheInVMs(nyPort, vm2, vm3);
createReceiverInVMs(vm2, vm3);
@@ -297,10 +299,10 @@ public class ParallelWANPropagationDUnitTest extends
WANTestBase {
vm6.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false,
false, null, true));
vm7.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false,
false, null, true));
- vm4.invoke(createPartitionedRegionRedundancy1Runnable());
- vm5.invoke(createPartitionedRegionRedundancy1Runnable());
- vm6.invoke(createPartitionedRegionRedundancy1Runnable());
- vm7.invoke(createPartitionedRegionRedundancy1Runnable());
+ vm4.invoke(createPartitionedRegionRunnable("ln", 1));
+ vm5.invoke(createPartitionedRegionRunnable("ln", 1));
+ vm6.invoke(createPartitionedRegionRunnable("ln", 1));
+ vm7.invoke(createPartitionedRegionRunnable("ln", 1));
startSenderInVMs("ln", vm4, vm5, vm6, vm7);
@@ -324,7 +326,7 @@ public class ParallelWANPropagationDUnitTest extends
WANTestBase {
vm7.invoke(() -> WANTestBase.killSender());
Integer regionSize =
- (Integer) vm2.invoke(() ->
WANTestBase.getRegionSize(getTestMethodName() + "_PR"));
+ vm2.invoke(() -> WANTestBase.getRegionSize(getTestMethodName() +
"_PR"));
LogWriterUtils.getLogWriter().info("Region size on remote is: " +
regionSize);
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
@@ -334,10 +336,10 @@ public class ParallelWANPropagationDUnitTest extends
WANTestBase {
vm6.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false,
false, null, true));
vm7.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false,
false, null, true));
- vm4.invoke(createPartitionedRegionRedundancy1Runnable());
- vm5.invoke(createPartitionedRegionRedundancy1Runnable());
- vm6.invoke(createPartitionedRegionRedundancy1Runnable());
- vm7.invoke(createPartitionedRegionRedundancy1Runnable());
+ vm4.invoke(createPartitionedRegionRunnable("ln", 1));
+ vm5.invoke(createPartitionedRegionRunnable("ln", 1));
+ vm6.invoke(createPartitionedRegionRunnable("ln", 1));
+ vm7.invoke(createPartitionedRegionRunnable("ln", 1));
startSenderInVMs("ln", vm4, vm5, vm6, vm7);
@@ -364,9 +366,9 @@ public class ParallelWANPropagationDUnitTest extends
WANTestBase {
}
@Test
- public void testParallelColocatedPropagation() throws Exception {
- Integer lnPort = (Integer) vm0.invoke(() ->
WANTestBase.createFirstLocatorWithDSId(1));
- Integer nyPort = (Integer) vm1.invoke(() ->
WANTestBase.createFirstRemoteLocator(2, lnPort));
+ public void testParallelColocatedPropagation() {
+ Integer lnPort = vm0.invoke(() ->
WANTestBase.createFirstLocatorWithDSId(1));
+ Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2,
lnPort));
createCacheInVMs(nyPort, vm2, vm3);
createReceiverInVMs(vm2, vm3);
@@ -412,9 +414,9 @@ public class ParallelWANPropagationDUnitTest extends
WANTestBase {
*/
@Test
- public void testParallelColocatedPropagation2() throws Exception {
- Integer lnPort = (Integer) vm0.invoke(() ->
WANTestBase.createFirstLocatorWithDSId(1));
- Integer nyPort = (Integer) vm1.invoke(() ->
WANTestBase.createFirstRemoteLocator(2, lnPort));
+ public void testParallelColocatedPropagation2() {
+ Integer lnPort = vm0.invoke(() ->
WANTestBase.createFirstLocatorWithDSId(1));
+ Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2,
lnPort));
createCacheInVMs(nyPort, vm2, vm3);
createReceiverInVMs(vm2, vm3);
@@ -458,9 +460,9 @@ public class ParallelWANPropagationDUnitTest extends
WANTestBase {
@Category({WanTest.class})
@Test
- public void testParallelPropagationWithOverflow() throws Exception {
- Integer lnPort = (Integer) vm0.invoke(() ->
WANTestBase.createFirstLocatorWithDSId(1));
- Integer nyPort = (Integer) vm1.invoke(() ->
WANTestBase.createFirstRemoteLocator(2, lnPort));
+ public void testParallelPropagationWithOverflow() {
+ Integer lnPort = vm0.invoke(() ->
WANTestBase.createFirstLocatorWithDSId(1));
+ Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2,
lnPort));
createCacheInVMs(nyPort, vm2, vm3);
vm2.invoke(
@@ -504,10 +506,10 @@ public class ParallelWANPropagationDUnitTest extends
WANTestBase {
}
@Test
- public void testSerialReplicatedAndParallelPartitionedPropagation() throws
Exception {
+ public void testSerialReplicatedAndParallelPartitionedPropagation() {
- Integer lnPort = (Integer) vm0.invoke(() ->
WANTestBase.createFirstLocatorWithDSId(1));
- Integer nyPort = (Integer) vm1.invoke(() ->
WANTestBase.createFirstRemoteLocator(2, lnPort));
+ Integer lnPort = vm0.invoke(() ->
WANTestBase.createFirstLocatorWithDSId(1));
+ Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2,
lnPort));
createCacheInVMs(nyPort, vm2, vm3);
createReceiverInVMs(vm2, vm3);
@@ -571,10 +573,10 @@ public class ParallelWANPropagationDUnitTest extends
WANTestBase {
}
@Test
- public void testPartitionedParallelPropagationToTwoWanSites() throws
Exception {
+ public void testPartitionedParallelPropagationToTwoWanSites() {
Integer lnPort = createFirstLocatorWithDSId(1);
- Integer nyPort = (Integer) vm0.invoke(() ->
WANTestBase.createFirstRemoteLocator(2, lnPort));
- Integer tkPort = (Integer) vm1.invoke(() ->
WANTestBase.createFirstRemoteLocator(3, lnPort));
+ Integer nyPort = vm0.invoke(() -> WANTestBase.createFirstRemoteLocator(2,
lnPort));
+ Integer tkPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(3,
lnPort));
createCacheInVMs(nyPort, vm2);
vm2.invoke(createReceiverPartitionedRegionRedundancy1());
@@ -654,8 +656,8 @@ public class ParallelWANPropagationDUnitTest extends
WANTestBase {
IgnoredException.addIgnoredException("Broken pipe");
IgnoredException.addIgnoredException("Connection reset");
IgnoredException.addIgnoredException("Unexpected IOException");
- Integer lnPort = (Integer) vm0.invoke(() ->
WANTestBase.createFirstLocatorWithDSId(1));
- Integer nyPort = (Integer) vm1.invoke(() ->
WANTestBase.createFirstRemoteLocator(2, lnPort));
+ Integer lnPort = vm0.invoke(() ->
WANTestBase.createFirstLocatorWithDSId(1));
+ Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2,
lnPort));
createCacheInVMs(nyPort, vm2, vm3);
createReceiverInVMs(vm2, vm3);
@@ -708,9 +710,9 @@ public class ParallelWANPropagationDUnitTest extends
WANTestBase {
}
@Test
- public void testParallelPropagationWithFilter() throws Exception {
- Integer lnPort = (Integer) vm0.invoke(() ->
WANTestBase.createFirstLocatorWithDSId(1));
- Integer nyPort = (Integer) vm1.invoke(() ->
WANTestBase.createFirstRemoteLocator(2, lnPort));
+ public void testParallelPropagationWithFilter() {
+ Integer lnPort = vm0.invoke(() ->
WANTestBase.createFirstLocatorWithDSId(1));
+ Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2,
lnPort));
createCacheInVMs(nyPort, vm2, vm3);
createReceiverInVMs(vm2, vm3);
@@ -755,10 +757,10 @@ public class ParallelWANPropagationDUnitTest extends
WANTestBase {
@Test
- public void testParallelPropagationWithPutAll() throws Exception {
+ public void testParallelPropagationWithPutAll() {
- Integer lnPort = (Integer) vm0.invoke(() ->
WANTestBase.createFirstLocatorWithDSId(1));
- Integer nyPort = (Integer) vm1.invoke(() ->
WANTestBase.createFirstRemoteLocator(2, lnPort));
+ Integer lnPort = vm0.invoke(() ->
WANTestBase.createFirstLocatorWithDSId(1));
+ Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2,
lnPort));
createCacheInVMs(nyPort, vm2, vm3);
createReceiverInVMs(vm2, vm3);
@@ -769,10 +771,10 @@ public class ParallelWANPropagationDUnitTest extends
WANTestBase {
vm6.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false,
false, null, true));
vm7.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false,
false, null, true));
- vm4.invoke(createPartitionedRegionRedundancy1Runnable());
- vm5.invoke(createPartitionedRegionRedundancy1Runnable());
- vm6.invoke(createPartitionedRegionRedundancy1Runnable());
- vm7.invoke(createPartitionedRegionRedundancy1Runnable());
+ vm4.invoke(createPartitionedRegionRunnable("ln", 1));
+ vm5.invoke(createPartitionedRegionRunnable("ln", 1));
+ vm6.invoke(createPartitionedRegionRunnable("ln", 1));
+ vm7.invoke(createPartitionedRegionRunnable("ln", 1));
startSenderInVMs("ln", vm4, vm5, vm6, vm7);
@@ -804,10 +806,10 @@ public class ParallelWANPropagationDUnitTest extends
WANTestBase {
*
*/
@Test
- public void testParallelPropagationWithDestroy() throws Exception {
+ public void testParallelPropagationWithDestroy() {
- Integer lnPort = (Integer) vm0.invoke(() ->
WANTestBase.createFirstLocatorWithDSId(1));
- Integer nyPort = (Integer) vm1.invoke(() ->
WANTestBase.createFirstRemoteLocator(2, lnPort));
+ Integer lnPort = vm0.invoke(() ->
WANTestBase.createFirstLocatorWithDSId(1));
+ Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2,
lnPort));
createCacheInVMs(nyPort, vm2, vm3);
createReceiverInVMs(vm2, vm3);
@@ -818,10 +820,10 @@ public class ParallelWANPropagationDUnitTest extends
WANTestBase {
vm6.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 100, false,
false, null, true));
vm7.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 100, false,
false, null, true));
- vm4.invoke(createPartitionedRegionRedundancy1Runnable());
- vm5.invoke(createPartitionedRegionRedundancy1Runnable());
- vm6.invoke(createPartitionedRegionRedundancy1Runnable());
- vm7.invoke(createPartitionedRegionRedundancy1Runnable());
+ vm4.invoke(createPartitionedRegionRunnable("ln", 1));
+ vm5.invoke(createPartitionedRegionRunnable("ln", 1));
+ vm6.invoke(createPartitionedRegionRunnable("ln", 1));
+ vm7.invoke(createPartitionedRegionRunnable("ln", 1));
startSenderInVMs("ln", vm4, vm5, vm6, vm7);
@@ -874,9 +876,9 @@ public class ParallelWANPropagationDUnitTest extends
WANTestBase {
*
*/
@Test
- public void testParallelPropagationTxOperations() throws Exception {
- Integer lnPort = (Integer) vm0.invoke(() ->
WANTestBase.createFirstLocatorWithDSId(1));
- Integer nyPort = (Integer) vm1.invoke(() ->
WANTestBase.createFirstRemoteLocator(2, lnPort));
+ public void testParallelPropagationTxOperations() {
+ Integer lnPort = vm0.invoke(() ->
WANTestBase.createFirstLocatorWithDSId(1));
+ Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2,
lnPort));
createCacheInVMs(nyPort, vm2, vm3);
createReceiverInVMs(vm2, vm3);
@@ -891,8 +893,8 @@ public class ParallelWANPropagationDUnitTest extends
WANTestBase {
// vm7.invoke(() -> WANTestBase.createSender( "ln", 2,
// true, 100, 10, false, false, null, true ));
- vm4.invoke(createPartitionedRegionRedundancy1Runnable());
- vm5.invoke(createPartitionedRegionRedundancy1Runnable());
+ vm4.invoke(createPartitionedRegionRunnable("ln", 1));
+ vm5.invoke(createPartitionedRegionRunnable("ln", 1));
// vm6.invoke(() -> WANTestBase.createPartitionedRegion(
// testName + "_PR", "ln", true, 1, 100, isOffHeap() ));
// vm7.invoke(() -> WANTestBase.createPartitionedRegion(
@@ -925,8 +927,8 @@ public class ParallelWANPropagationDUnitTest extends
WANTestBase {
@Ignore
@Test
public void testParallelGatewaySenderQueueLocalSize() {
- Integer lnPort = (Integer) vm0.invoke(() ->
WANTestBase.createFirstLocatorWithDSId(1));
- Integer nyPort = (Integer) vm1.invoke(() ->
WANTestBase.createFirstRemoteLocator(2, lnPort));
+ Integer lnPort = vm0.invoke(() ->
WANTestBase.createFirstLocatorWithDSId(1));
+ Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2,
lnPort));
createCacheInVMs(nyPort, vm2);
createCacheInVMs(nyPort, vm2);
@@ -936,8 +938,8 @@ public class ParallelWANPropagationDUnitTest extends
WANTestBase {
vm4.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false,
false, null, true));
vm5.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false,
false, null, true));
- vm4.invoke(createPartitionedRegionRedundancy1Runnable());
- vm5.invoke(createPartitionedRegionRedundancy1Runnable());
+ vm4.invoke(createPartitionedRegionRunnable("ln", 1));
+ vm5.invoke(createPartitionedRegionRunnable("ln", 1));
startSenderInVMs("ln", vm4, vm5);
@@ -962,8 +964,8 @@ public class ParallelWANPropagationDUnitTest extends
WANTestBase {
vm5.invoke(() -> WANTestBase.validateQueueContents("ln", 10));
// instead of checking size as 5 and 5. check that combined size is 10
- Integer localSize1 = (Integer) vm4.invoke(() ->
WANTestBase.getPRQLocalSize("ln"));
- Integer localSize2 = (Integer) vm5.invoke(() ->
WANTestBase.getPRQLocalSize("ln"));
+ Integer localSize1 = vm4.invoke(() -> WANTestBase.getPRQLocalSize("ln"));
+ Integer localSize2 = vm5.invoke(() -> WANTestBase.getPRQLocalSize("ln"));
assertEquals(10, localSize1 + localSize2);
}
@@ -973,8 +975,8 @@ public class ParallelWANPropagationDUnitTest extends
WANTestBase {
IgnoredException.addIgnoredException("Broken pipe");
IgnoredException.addIgnoredException("Connection reset");
IgnoredException.addIgnoredException("Unexpected IOException");
- Integer lnPort = (Integer) vm0.invoke(() ->
WANTestBase.createFirstLocatorWithDSId(1));
- Integer nyPort = (Integer) vm1.invoke(() ->
WANTestBase.createFirstRemoteLocator(2, lnPort));
+ Integer lnPort = vm0.invoke(() ->
WANTestBase.createFirstLocatorWithDSId(1));
+ Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2,
lnPort));
createCacheInVMs(nyPort, vm2);
createReceiverInVMs(vm2);
@@ -983,8 +985,8 @@ public class ParallelWANPropagationDUnitTest extends
WANTestBase {
vm4.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false,
false, null, true));
vm5.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false,
false, null, true));
- vm4.invoke(createPartitionedRegionRedundancy1Runnable());
- vm5.invoke(createPartitionedRegionRedundancy1Runnable());
+ vm4.invoke(createPartitionedRegionRunnable("ln", 1));
+ vm5.invoke(createPartitionedRegionRunnable("ln", 1));
startSenderInVMs("ln", vm4, vm5);
@@ -1008,8 +1010,8 @@ public class ParallelWANPropagationDUnitTest extends
WANTestBase {
vm4.invoke(() -> WANTestBase.validateQueueContents("ln", 10));
vm5.invoke(() -> WANTestBase.validateQueueContents("ln", 10));
- Integer localSize1 = (Integer) vm4.invoke(() ->
WANTestBase.getPRQLocalSize("ln"));
- Integer localSize2 = (Integer) vm5.invoke(() ->
WANTestBase.getPRQLocalSize("ln"));
+ Integer localSize1 = vm4.invoke(() -> WANTestBase.getPRQLocalSize("ln"));
+ Integer localSize2 = vm5.invoke(() -> WANTestBase.getPRQLocalSize("ln"));
assertEquals(10, localSize1 + localSize2);
vm5.invoke(() -> WANTestBase.killSender());
@@ -1026,8 +1028,8 @@ public class ParallelWANPropagationDUnitTest extends
WANTestBase {
@Test
@Parameters(method = "getRegionShortcuts")
public void
testParallelSenderAttachedToChildRegionButNotToParentRegion(RegionShortcut
shortcut) {
- Integer lnPort = (Integer) vm0.invoke(() ->
WANTestBase.createFirstLocatorWithDSId(1));
- Integer nyPort = (Integer) vm1.invoke(() ->
WANTestBase.createFirstRemoteLocator(2, lnPort));
+ Integer lnPort = vm0.invoke(() ->
WANTestBase.createFirstLocatorWithDSId(1));
+ Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2,
lnPort));
// create cache and receiver on site2
createCacheInVMs(nyPort, vm2);
@@ -1065,9 +1067,9 @@ public class ParallelWANPropagationDUnitTest extends
WANTestBase {
}
@Test
- public void testParallelPropagationWithFilter_AfterAck() throws Exception {
- Integer lnPort = (Integer) vm0.invoke(() ->
WANTestBase.createFirstLocatorWithDSId(1));
- Integer nyPort = (Integer) vm1.invoke(() ->
WANTestBase.createFirstRemoteLocator(2, lnPort));
+ public void testParallelPropagationWithFilter_AfterAck() {
+ Integer lnPort = vm0.invoke(() ->
WANTestBase.createFirstLocatorWithDSId(1));
+ Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2,
lnPort));
createCacheInVMs(nyPort, vm6, vm7);
createReceiverInVMs(vm6, vm7);
@@ -1122,15 +1124,118 @@ public class ParallelWANPropagationDUnitTest extends
WANTestBase {
vm6.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(),
1000));
vm7.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(),
1000));
- Integer vm2Acks = (Integer) vm2.invoke(() ->
WANTestBase.validateAfterAck("ln"));
- Integer vm3Acks = (Integer) vm3.invoke(() ->
WANTestBase.validateAfterAck("ln"));
- Integer vm4Acks = (Integer) vm4.invoke(() ->
WANTestBase.validateAfterAck("ln"));
- Integer vm5Acks = (Integer) vm5.invoke(() ->
WANTestBase.validateAfterAck("ln"));
+ Integer vm2Acks = vm2.invoke(() -> WANTestBase.validateAfterAck("ln"));
+ Integer vm3Acks = vm3.invoke(() -> WANTestBase.validateAfterAck("ln"));
+ Integer vm4Acks = vm4.invoke(() -> WANTestBase.validateAfterAck("ln"));
+ Integer vm5Acks = vm5.invoke(() -> WANTestBase.validateAfterAck("ln"));
assertEquals(2000, (vm2Acks + vm3Acks + vm4Acks + vm5Acks));
}
+ /**
+ * Test that, when a parallel gateway sender is added to a partitioned
region through attributes
+ * mutator, transaction events are not sent to all region members but only
to those who are
+ * hosting the bucket for the event and thus, events are not stored in the
bucketToTempQueueMap
+ * member of the ParallelGatewaySenderQueue.
+ * Redundancy = 1 in the partitioned region.
+ *
+ */
+ @Test
+ public void
testParallelPropagationTxNotificationsNotSentToAllRegionMembersWhenAddingParallelGatewaySenderThroughAttributesMutator()
{
+ Integer lnPort = vm0.invoke(() ->
WANTestBase.createFirstLocatorWithDSId(1));
+ Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2,
lnPort));
+
+ createCacheInVMs(nyPort, vm2, vm3);
+
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+ vm4.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false,
false, null, true));
+ vm5.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false,
false, null, true));
+ vm6.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false,
false, null, true));
+ vm7.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false,
false, null, true));
+
+ vm4.invoke(createPartitionedRegionRunnable(null, 1));
+ vm5.invoke(createPartitionedRegionRunnable(null, 1));
+ vm6.invoke(createPartitionedRegionRunnable(null, 1));
+ vm7.invoke(createPartitionedRegionRunnable(null, 1));
+
+ vm2.invoke(createReceiverPartitionedRegionRedundancy1());
+ vm3.invoke(createReceiverPartitionedRegionRedundancy1());
+
+ vm4.invoke(() -> addSenderThroughAttributesMutator(getTestMethodName() +
"_PR", "ln"));
+ vm5.invoke(() -> addSenderThroughAttributesMutator(getTestMethodName() +
"_PR", "ln"));
+ vm6.invoke(() -> addSenderThroughAttributesMutator(getTestMethodName() +
"_PR", "ln"));
+ vm7.invoke(() -> addSenderThroughAttributesMutator(getTestMethodName() +
"_PR", "ln"));
+
+ startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+ vm4.invoke(waitForSenderRunnable());
+ vm5.invoke(waitForSenderRunnable());
+ vm6.invoke(waitForSenderRunnable());
+ vm7.invoke(waitForSenderRunnable());
+
+ vm4.invoke(() -> WANTestBase.doTxPuts(getTestMethodName() + "_PR"));
+
+ vm4.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() +
"_PR", 3));
+ vm4.invoke(() -> WANTestBase.verifyQueueSize("ln", 3));
+
+ vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() +
"_PR", 0));
+ vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() +
"_PR", 0));
+
+ vm4.invoke(() -> WANTestBase.validateEmptyBucketToTempQueueMap("ln"));
+ vm5.invoke(() -> WANTestBase.validateEmptyBucketToTempQueueMap("ln"));
+ vm6.invoke(() -> WANTestBase.validateEmptyBucketToTempQueueMap("ln"));
+ vm7.invoke(() -> WANTestBase.validateEmptyBucketToTempQueueMap("ln"));
+ }
+
+ /**
+ * Test that, when a parallel gateway sender is added to a partitioned
region through attributes
+ * mutator, transaction events are not sent to all region members but only
to those who are
+ * hosting the bucket for the event and thus, events are not stored in the
bucketToTempQueueMap
+ * member of the ParallelGatewaySenderQueue.
+ * No redundancy in the partitioned region.
+ *
+ */
+ @Test
+ public void
testParallelPropagationTxNotificationsNotSentToAllRegionMembersWhenAddingParallelGatewaySenderThroughAttributesMutatorNoRedundancy()
{
+ Integer lnPort = vm0.invoke(() ->
WANTestBase.createFirstLocatorWithDSId(1));
+ Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2,
lnPort));
+
+ createCacheInVMs(nyPort, vm2, vm3);
+
+ createCacheInVMs(lnPort, vm4, vm5);
+
+ vm4.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false,
false, null, true));
+ vm5.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false,
false, null, true));
+
+ vm4.invoke(createPartitionedRegionRunnable(null, 0));
+ vm5.invoke(createPartitionedRegionRunnable(null, 0));
+
+ vm2.invoke(createReceiverPartitionedRegionRedundancy1());
+ vm3.invoke(createReceiverPartitionedRegionRedundancy1());
+
+ vm4.invoke(() -> addSenderThroughAttributesMutator(getTestMethodName() +
"_PR", "ln"));
+ vm5.invoke(() -> addSenderThroughAttributesMutator(getTestMethodName() +
"_PR", "ln"));
+
+ startSenderInVMs("ln", vm4, vm5);
+
+ vm4.invoke(waitForSenderRunnable());
+ vm5.invoke(waitForSenderRunnable());
+
+ vm4.invoke(() -> WANTestBase.doTxPuts(getTestMethodName() + "_PR"));
+
+ vm4.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() +
"_PR", 3));
+ vm4.invoke(() -> WANTestBase.verifyQueueSize("ln", 3));
+
+ vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() +
"_PR", 0));
+ vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() +
"_PR", 0));
+
+ vm4.invoke(() -> WANTestBase.validateEmptyBucketToTempQueueMap("ln"));
+ vm5.invoke(() -> WANTestBase.validateEmptyBucketToTempQueueMap("ln"));
+ }
+
+
private static RegionShortcut[] getRegionShortcuts() {
return new RegionShortcut[] {RegionShortcut.PARTITION,
RegionShortcut.PARTITION_PERSISTENT};
}
diff --git
a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
index 8213817..7155975 100644
---
a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
+++
b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
@@ -323,8 +323,7 @@ public class ParallelWANStatsDUnitTest extends WANTestBase {
createCacheInVMs(nyPort, vm2);
createReceiverInVMs(vm2);
- int batchTimeInterval = 10000;
- createSenders(lnPort, false, batchTimeInterval);
+ createSenders(lnPort, false);
createReceiverCustomerOrderShipmentPR(vm2, 0);
@@ -394,8 +393,7 @@ public class ParallelWANStatsDUnitTest extends WANTestBase {
createCacheInVMs(nyPort, vm2);
createReceiverInVMs(vm2);
- int batchTimeInterval = 10000;
- createSenders(lnPort, true, batchTimeInterval);
+ createSenders(lnPort, true);
createReceiverCustomerOrderShipmentPR(vm2, 0);
@@ -473,8 +471,7 @@ public class ParallelWANStatsDUnitTest extends WANTestBase {
createCacheInVMs(nyPort, vm2);
- int batchTimeInterval = 10000;
- createSenders(lnPort, false, batchTimeInterval);
+ createSenders(lnPort, false);
createSenderCustomerOrderShipmentPRs(vm4, 0);
@@ -540,8 +537,7 @@ public class ParallelWANStatsDUnitTest extends WANTestBase {
createCacheInVMs(nyPort, vm2);
- int batchTimeInterval = 10000;
- createSenders(lnPort, true, batchTimeInterval);
+ createSenders(lnPort, true);
createReceiverCustomerOrderShipmentPR(vm2, 0);
@@ -787,20 +783,16 @@ public class ParallelWANStatsDUnitTest extends
WANTestBase {
boolean groupTransactionEvents = true;
vm4.invoke(
() -> WANTestBase.createSender("ln", 2, true, 100, batchSize, false,
false, null, true,
- groupTransactionEvents,
- -1));
+ groupTransactionEvents));
vm5.invoke(
() -> WANTestBase.createSender("ln", 2, true, 100, batchSize, false,
false, null, true,
- groupTransactionEvents,
- -1));
+ groupTransactionEvents));
vm6.invoke(
() -> WANTestBase.createSender("ln", 2, true, 100, batchSize, false,
false, null, true,
- groupTransactionEvents,
- -1));
+ groupTransactionEvents));
vm7.invoke(
() -> WANTestBase.createSender("ln", 2, true, 100, batchSize, false,
false, null, true,
- groupTransactionEvents,
- -1));
+ groupTransactionEvents));
startSenderInVMs("ln", vm4, vm5, vm6, vm7);
@@ -1131,22 +1123,21 @@ public class ParallelWANStatsDUnitTest extends
WANTestBase {
vm.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false,
false, null, true));
}
- protected void createSenders(Integer lnPort, boolean groupTransactionEvents,
- int batchTimeInterval) {
+ protected void createSenders(Integer lnPort, boolean groupTransactionEvents)
{
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
vm4.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false,
false, null, true,
- groupTransactionEvents, batchTimeInterval));
+ groupTransactionEvents));
vm5.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false,
false, null, true,
- groupTransactionEvents, batchTimeInterval));
+ groupTransactionEvents));
vm6.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false,
false, null, true,
- groupTransactionEvents, batchTimeInterval));
+ groupTransactionEvents));
vm7.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false,
false, null, true,
- groupTransactionEvents, batchTimeInterval));
+ groupTransactionEvents));
}
protected void createSenders(Integer lnPort) {
- createSenders(lnPort, false, -1);
+ createSenders(lnPort, false);
}
private void verifyConflationIndexesSize(String senderId, int expectedSize,
VM... vms) {
diff --git
a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANStatsDUnitTest.java
b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANStatsDUnitTest.java
index 0af1d42..2cbdc35 100644
---
a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANStatsDUnitTest.java
+++
b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANStatsDUnitTest.java
@@ -97,24 +97,19 @@ public class SerialWANStatsDUnitTest extends WANTestBase {
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
- int batchTimeInterval = 10000;
boolean groupTransactionEvents = false;
vm4.invoke(
() -> WANTestBase.createSender("ln", 2, false, 100, 10, false, false,
null, true,
- groupTransactionEvents,
- batchTimeInterval));
+ groupTransactionEvents));
vm5.invoke(
() -> WANTestBase.createSender("ln", 2, false, 100, 10, false, false,
null, true,
- groupTransactionEvents,
- batchTimeInterval));
+ groupTransactionEvents));
vm6.invoke(
() -> WANTestBase.createSender("ln", 2, false, 100, 10, false, false,
null, true,
- groupTransactionEvents,
- batchTimeInterval));
+ groupTransactionEvents));
vm7.invoke(
() -> WANTestBase.createSender("ln", 2, false, 100, 10, false, false,
null, true,
- groupTransactionEvents,
- batchTimeInterval));
+ groupTransactionEvents));
vm2.invoke(() -> WANTestBase.createReplicatedRegion(testName + "_RR",
null, isOffHeap()));
@@ -159,24 +154,19 @@ public class SerialWANStatsDUnitTest extends WANTestBase {
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
- int batchTimeInterval = 30000;
boolean groupTransactionEvents = true;
vm4.invoke(
() -> WANTestBase.createSender("ln", 2, false, 100, 10, false, false,
null, true,
- groupTransactionEvents,
- batchTimeInterval));
+ groupTransactionEvents));
vm5.invoke(
() -> WANTestBase.createSender("ln", 2, false, 100, 10, false, false,
null, true,
- groupTransactionEvents,
- batchTimeInterval));
+ groupTransactionEvents));
vm6.invoke(
() -> WANTestBase.createSender("ln", 2, false, 100, 10, false, false,
null, true,
- groupTransactionEvents,
- batchTimeInterval));
+ groupTransactionEvents));
vm7.invoke(
() -> WANTestBase.createSender("ln", 2, false, 100, 10, false, false,
null, true,
- groupTransactionEvents,
- batchTimeInterval));
+ groupTransactionEvents));
vm2.invoke(() -> WANTestBase.createReplicatedRegion(testName + "_RR",
null, isOffHeap()));
@@ -230,24 +220,19 @@ public class SerialWANStatsDUnitTest extends WANTestBase {
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
- int batchTimeInterval = 30000;
boolean groupTransactionEvents = false;
vm4.invoke(
() -> WANTestBase.createSender("ln", 2, false, 100, 10, false, false,
null, true,
- groupTransactionEvents,
- batchTimeInterval));
+ groupTransactionEvents));
vm5.invoke(
() -> WANTestBase.createSender("ln", 2, false, 100, 10, false, false,
null, true,
- groupTransactionEvents,
- batchTimeInterval));
+ groupTransactionEvents));
vm6.invoke(
() -> WANTestBase.createSender("ln", 2, false, 100, 10, false, false,
null, true,
- groupTransactionEvents,
- batchTimeInterval));
+ groupTransactionEvents));
vm7.invoke(
() -> WANTestBase.createSender("ln", 2, false, 100, 10, false, false,
null, true,
- groupTransactionEvents,
- batchTimeInterval));
+ groupTransactionEvents));
vm2.invoke(() -> WANTestBase.createReplicatedRegion(testName + "_RR",
null, isOffHeap()));
@@ -298,25 +283,20 @@ public class SerialWANStatsDUnitTest extends WANTestBase {
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
- int batchTimeInterval = 10000;
boolean groupTransactionEvents = true;
int batchSize = 10;
vm4.invoke(
() -> WANTestBase.createSender("ln", 2, false, 100, batchSize, false,
false, null, true,
- groupTransactionEvents,
- batchTimeInterval));
+ groupTransactionEvents));
vm5.invoke(
() -> WANTestBase.createSender("ln", 2, false, 100, batchSize, false,
false, null, true,
- groupTransactionEvents,
- batchTimeInterval));
+ groupTransactionEvents));
vm6.invoke(
() -> WANTestBase.createSender("ln", 2, false, 100, batchSize, false,
false, null, true,
- groupTransactionEvents,
- batchTimeInterval));
+ groupTransactionEvents));
vm7.invoke(
() -> WANTestBase.createSender("ln", 2, false, 100, batchSize, false,
false, null, true,
- groupTransactionEvents,
- batchTimeInterval));
+ groupTransactionEvents));
vm2.invoke(() -> WANTestBase.createReplicatedRegion(testName + "_RR",
null, isOffHeap()));
@@ -533,16 +513,16 @@ public class SerialWANStatsDUnitTest extends WANTestBase {
boolean groupTransactionEvents = true;
vm4.invoke(
() -> WANTestBase.createSender("ln", 2, false, 100, batchSize, false,
false, null, true,
- groupTransactionEvents, -1));
+ groupTransactionEvents));
vm5.invoke(
() -> WANTestBase.createSender("ln", 2, false, 100, batchSize, false,
false, null, true,
- groupTransactionEvents, -1));
+ groupTransactionEvents));
vm6.invoke(
() -> WANTestBase.createSender("ln", 2, false, 100, batchSize, false,
false, null, true,
- groupTransactionEvents, -1));
+ groupTransactionEvents));
vm7.invoke(
() -> WANTestBase.createSender("ln", 2, false, 100, batchSize, false,
false, null, true,
- groupTransactionEvents, -1));
+ groupTransactionEvents));
vm2.invoke(() -> WANTestBase.createReplicatedRegion(testName + "_RR",
null, isOffHeap()));