This is an automated email from the ASF dual-hosted git repository.
mivanac pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new bbe9a3a GEODE-9853: get all members hosting bucket (#7144)
bbe9a3a is described below
commit bbe9a3acf2f0812ef733dfe74f07fb9412c886e3
Author: Mario Ivanac <[email protected]>
AuthorDate: Thu Jan 27 11:23:29 2022 +0100
GEODE-9853: get all members hosting bucket (#7144)
* GEODE-9853: get all members hosting bucket
---
.../apache/geode/internal/cache/BucketAdvisor.java | 2 +-
...currentParallelGatewaySenderEventProcessor.java | 2 +-
.../wan/parallel/ParallelGatewaySenderQueue.java | 74 +++++++++----
.../wan/parallel/ParallelQueueRemovalMessage.java | 8 +-
.../geode/internal/cache/BucketAdvisorTest.java | 89 ++++++++++++++++
.../ParallelQueueRemovalMessageJUnitTest.java | 8 +-
.../geode/internal/cache/wan/WANTestBase.java | 114 ++++++++++----------
.../ParallelGatewaySenderOperationsDUnitTest.java | 116 +++++++++++++++++++++
8 files changed, 328 insertions(+), 85 deletions(-)
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java
index 29287c9..2b70f86 100755
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java
@@ -1768,7 +1768,7 @@ public class BucketAdvisor extends
CacheDistributionAdvisor {
return redundancyTracker.getCurrentRedundancy();
}
- Set<InternalDistributedMember> adviseInitialized() {
+ public Set<InternalDistributedMember> adviseInitialized() {
return adviseFilter(profile -> {
assert profile instanceof BucketProfile;
BucketProfile bucketProfile = (BucketProfile) profile;
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java
index bcd70b7..84205a0 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java
@@ -167,7 +167,7 @@ public class ConcurrentParallelGatewaySenderEventProcessor
ParallelGatewaySenderQueue pgsq = (ParallelGatewaySenderQueue)
cpgsq.getQueueByBucket(bucketId);
boolean isPrimary =
prQ.getRegionAdvisor().getBucketAdvisor(bucketId).isPrimary();
if (isPrimary) {
- pgsq.sendQueueRemovalMesssageForDroppedEvent(prQ, bucketId, shadowKey);
+ pgsq.sendQueueRemovalMessageForDroppedEvent(prQ, bucketId, shadowKey);
sender.getStatistics().incEventsDroppedDueToPrimarySenderNotRunning();
if (logger.isDebugEnabled()) {
logger.debug("register dropped event for primary queue. BucketId is "
+ bucketId
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
index 235947e..54715b7 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
@@ -105,7 +105,8 @@ public class ParallelGatewaySenderQueue implements
RegionQueue {
SEPARATOR + PartitionedRegionHelper.PR_ROOT_REGION_NAME + SEPARATOR;
// <PartitionedRegion, Map<Integer, List<Object>>>
- private final Map regionToDispatchedKeysMap = new ConcurrentHashMap();
+ private final Map<String, Map<Integer, List<Object>>>
regionToDispatchedKeysMap =
+ new ConcurrentHashMap<String, Map<Integer, List<Object>>>();
protected final StoppableReentrantLock buckToDispatchLock;
private final StoppableCondition regionToDispatchedKeysMapEmpty;
@@ -1172,9 +1173,10 @@ public class ParallelGatewaySenderQueue implements
RegionQueue {
lock.lock();
boolean wasEmpty = regionToDispatchedKeysMap.isEmpty();
try {
- Map bucketIdToDispatchedKeys = (Map)
regionToDispatchedKeysMap.get(prQ.getFullPath());
+ Map<Integer, List<Object>> bucketIdToDispatchedKeys =
+ regionToDispatchedKeysMap.get(prQ.getFullPath());
if (bucketIdToDispatchedKeys == null) {
- bucketIdToDispatchedKeys = new ConcurrentHashMap();
+ bucketIdToDispatchedKeys = new ConcurrentHashMap<Integer,
List<Object>>();
regionToDispatchedKeysMap.put(prQ.getFullPath(),
bucketIdToDispatchedKeys);
}
addRemovedEventToMap(bucketIdToDispatchedKeys, bucketId, key);
@@ -1187,23 +1189,26 @@ public class ParallelGatewaySenderQueue implements
RegionQueue {
}
}
- public void sendQueueRemovalMesssageForDroppedEvent(PartitionedRegion prQ,
int bucketId,
+ public void sendQueueRemovalMessageForDroppedEvent(PartitionedRegion prQ,
int bucketId,
Object key) {
- final HashMap<String, Map<Integer, List>> temp = new HashMap<>();
- Map bucketIdToDispatchedKeys = new ConcurrentHashMap();
- temp.put(prQ.getFullPath(), bucketIdToDispatchedKeys);
- addRemovedEventToMap(bucketIdToDispatchedKeys, bucketId, key);
+
Set<InternalDistributedMember> recipients =
- removalThread.getAllRecipients(sender.getCache(), temp);
+ removalThread.getAllRecipientsForEvent(sender.getCache(),
prQ.getFullPath(), bucketId);
+
if (!recipients.isEmpty()) {
+ final Map<String, Map<Integer, List<Object>>> temp = new HashMap<>();
+ Map<Integer, List<Object>> bucketIdToDispatchedKeys = new
ConcurrentHashMap<>();
+ temp.put(prQ.getFullPath(), bucketIdToDispatchedKeys);
+ addRemovedEventToMap(bucketIdToDispatchedKeys, bucketId, key);
ParallelQueueRemovalMessage pqrm = new ParallelQueueRemovalMessage(temp);
pqrm.setRecipients(recipients);
sender.getCache().getInternalDistributedSystem().getDistributionManager().putOutgoing(pqrm);
}
}
- private void addRemovedEventToMap(Map bucketIdToDispatchedKeys, int
bucketId, Object key) {
- List dispatchedKeys = (List) bucketIdToDispatchedKeys.get(bucketId);
+ private void addRemovedEventToMap(Map<Integer, List<Object>>
bucketIdToDispatchedKeys,
+ int bucketId, Object key) {
+ List<Object> dispatchedKeys = bucketIdToDispatchedKeys.get(bucketId);
if (dispatchedKeys == null) {
dispatchedKeys = new ArrayList<>();
bucketIdToDispatchedKeys.put(bucketId, dispatchedKeys);
@@ -1215,9 +1220,10 @@ public class ParallelGatewaySenderQueue implements
RegionQueue {
buckToDispatchLock.lock();
boolean wasEmpty = regionToDispatchedKeysMap.isEmpty();
try {
- Map bucketIdToDispatchedKeys = (Map)
regionToDispatchedKeysMap.get(prQ.getFullPath());
+ Map<Integer, List<Object>> bucketIdToDispatchedKeys =
+ regionToDispatchedKeysMap.get(prQ.getFullPath());
if (bucketIdToDispatchedKeys == null) {
- bucketIdToDispatchedKeys = new ConcurrentHashMap();
+ bucketIdToDispatchedKeys = new ConcurrentHashMap<>();
regionToDispatchedKeysMap.put(prQ.getFullPath(),
bucketIdToDispatchedKeys);
}
addRemovedEventsToMap(bucketIdToDispatchedKeys, bucketId, shadowKeys);
@@ -1233,9 +1239,9 @@ public class ParallelGatewaySenderQueue implements
RegionQueue {
buckToDispatchLock.lock();
boolean wasEmpty = regionToDispatchedKeysMap.isEmpty();
try {
- Map bucketIdToDispatchedKeys = (Map)
regionToDispatchedKeysMap.get(prQPath);
+ Map<Integer, List<Object>> bucketIdToDispatchedKeys =
regionToDispatchedKeysMap.get(prQPath);
if (bucketIdToDispatchedKeys == null) {
- bucketIdToDispatchedKeys = new ConcurrentHashMap();
+ bucketIdToDispatchedKeys = new ConcurrentHashMap<>();
regionToDispatchedKeysMap.put(prQPath, bucketIdToDispatchedKeys);
}
addRemovedEventsToMap(bucketIdToDispatchedKeys, bucketId, shadowKeys);
@@ -1247,8 +1253,9 @@ public class ParallelGatewaySenderQueue implements
RegionQueue {
}
}
- private void addRemovedEventsToMap(Map bucketIdToDispatchedKeys, int
bucketId, List keys) {
- List dispatchedKeys = (List) bucketIdToDispatchedKeys.get(bucketId);
+ private void addRemovedEventsToMap(Map<Integer, List<Object>>
bucketIdToDispatchedKeys,
+ int bucketId, List<Object> keys) {
+ List<Object> dispatchedKeys = bucketIdToDispatchedKeys.get(bucketId);
if (dispatchedKeys == null) {
dispatchedKeys = keys == null ? new ArrayList<>() : keys;
} else {
@@ -1886,7 +1893,7 @@ public class ParallelGatewaySenderQueue implements
RegionQueue {
}
}
- final HashMap<String, Map<Integer, List>> temp;
+ final Map<String, Map<Integer, List<Object>>> temp;
buckToDispatchLock.lock();
try {
boolean wasEmpty = regionToDispatchedKeysMap.isEmpty();
@@ -1897,7 +1904,9 @@ public class ParallelGatewaySenderQueue implements
RegionQueue {
continue;
}
// TODO: This should be optimized.
+
temp = new HashMap<>(regionToDispatchedKeysMap);
+
regionToDispatchedKeysMap.clear();
} finally {
buckToDispatchLock.unlock();
@@ -1966,6 +1975,35 @@ public class ParallelGatewaySenderQueue implements
RegionQueue {
return recipients;
}
+ private Set<InternalDistributedMember>
getAllRecipientsForEvent(InternalCache cache,
+ String partitionedRegionName, int bucketId) {
+ Set<InternalDistributedMember> recipients = new ObjectOpenHashSet<>();
+ PartitionedRegion partitionedRegion =
+ (PartitionedRegion) cache.getRegion(partitionedRegionName);
+ if (partitionedRegion != null && partitionedRegion.getRegionAdvisor() !=
null) {
+ final String bucketFullPath =
+ SEPARATOR + PartitionedRegionHelper.PR_ROOT_REGION_NAME + SEPARATOR
+ + partitionedRegion.getBucketName(bucketId);
+ AbstractBucketRegionQueue bucketRegionQueue =
+ (AbstractBucketRegionQueue)
cache.getInternalRegionByPath(bucketFullPath);
+ if (bucketRegionQueue != null && bucketRegionQueue.getBucketAdvisor()
!= null) {
+ Set<InternalDistributedMember> bucketMembers =
+ bucketRegionQueue.getBucketAdvisor().adviseInitialized();
+ if (!bucketMembers.isEmpty()) {
+ recipients.addAll(bucketMembers);
+ } else {
+
recipients.addAll(partitionedRegion.getRegionAdvisor().adviseDataStore());
+ }
+ } else {
+
recipients.addAll(partitionedRegion.getRegionAdvisor().adviseDataStore());
+ }
+ }
+
+ return recipients;
+ }
+
+
+
/**
* shutdown this thread and the caller thread will join this thread
*/
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java
index cc801f2..10951e8 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java
@@ -20,7 +20,6 @@ import static
org.apache.geode.internal.cache.LocalRegion.InitializationLevel.BE
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -59,12 +58,13 @@ public class ParallelQueueRemovalMessage extends
PooledDistributionMessage {
private static final Logger logger = LogService.getLogger();
- private HashMap regionToDispatchedKeysMap;
+ private Map<String, Map<Integer, List<Object>>> regionToDispatchedKeysMap;
public ParallelQueueRemovalMessage() {}
- public ParallelQueueRemovalMessage(HashMap rgnToDispatchedKeysMap) {
- regionToDispatchedKeysMap = rgnToDispatchedKeysMap;
+ public ParallelQueueRemovalMessage(
+ Map<String, Map<Integer, List<Object>>> rgnToDispatchedKeysMap) {
+ this.regionToDispatchedKeysMap = rgnToDispatchedKeysMap;
}
@Override
diff --git
a/geode-core/src/test/java/org/apache/geode/internal/cache/BucketAdvisorTest.java
b/geode-core/src/test/java/org/apache/geode/internal/cache/BucketAdvisorTest.java
index 1c8f1e1..817386c 100644
---
a/geode-core/src/test/java/org/apache/geode/internal/cache/BucketAdvisorTest.java
+++
b/geode-core/src/test/java/org/apache/geode/internal/cache/BucketAdvisorTest.java
@@ -228,4 +228,93 @@ public class BucketAdvisorTest {
// Unknown Shadow Bucket
assertThat(bucketAdvisor.isShadowBucketDestroyed(SEPARATOR +
"b5")).isFalse();
}
+
+ @Test
+ public void
testGetAllHostingMembersReturnsNoMembersWhenBucketAdvisorHasNoProfiles() {
+ DistributionManager distributionManager = mock(DistributionManager.class);
+ when(distributionManager.getId()).thenReturn(new
InternalDistributedMember("localhost", 321));
+
+ Bucket bucket = mock(Bucket.class);
+ when(bucket.isHosting()).thenReturn(true);
+ when(bucket.isPrimary()).thenReturn(false);
+ when(bucket.getDistributionManager()).thenReturn(distributionManager);
+
+ PartitionedRegion partitionedRegion = mock(PartitionedRegion.class);
+ when(partitionedRegion.getRedundantCopies()).thenReturn(0);
+ when(partitionedRegion.getPartitionAttributes()).thenReturn(new
PartitionAttributesImpl());
+ RegionAdvisor regionAdvisor = mock(RegionAdvisor.class);
+ when(regionAdvisor.getPartitionedRegion()).thenReturn(partitionedRegion);
+
+ BucketAdvisor bucketAdvisor = BucketAdvisor.createBucketAdvisor(bucket,
regionAdvisor);
+ bucketAdvisor.setInitialized();
+
+ assertThat(bucketAdvisor.adviseInitialized().isEmpty()).isTrue();
+ }
+
+ @Test
+ public void
testGetAllHostingMembersReturnsMemberWhenBucketAdvisorHasOneProfileWithHostingBucket()
{
+ DistributionManager distributionManager = mock(DistributionManager.class);
+ InternalDistributedMember memberId = new
InternalDistributedMember("localhost", 321);
+
+ when(distributionManager.getId()).thenReturn(memberId);
+
+ Bucket bucket = mock(Bucket.class);
+ when(bucket.isHosting()).thenReturn(true);
+ when(bucket.isPrimary()).thenReturn(false);
+ when(bucket.getDistributionManager()).thenReturn(distributionManager);
+
+ PartitionedRegion partitionedRegion = mock(PartitionedRegion.class);
+ when(partitionedRegion.getRedundantCopies()).thenReturn(0);
+ when(partitionedRegion.getPartitionAttributes()).thenReturn(new
PartitionAttributesImpl());
+ when(partitionedRegion.getRedundancyTracker())
+ .thenReturn(mock(PartitionedRegionRedundancyTracker.class));
+
+ RegionAdvisor regionAdvisor = mock(RegionAdvisor.class);
+ when(regionAdvisor.getPartitionedRegion()).thenReturn(partitionedRegion);
+
+ BucketAdvisor bucketAdvisor = BucketAdvisor.createBucketAdvisor(bucket,
regionAdvisor);
+ bucketAdvisor.setInitialized();
+
+ BucketAdvisor.BucketProfile bp = new BucketAdvisor.BucketProfile(memberId,
0, bucket);
+
+ assertThat(bucketAdvisor.putProfile(bp, true)).isTrue();
+ assertThat(bucketAdvisor.adviseInitialized().size()).isEqualTo(1);
+ }
+
+ @Test
+ public void
testGetAllHostingMembersReturnsMemberWhenBucketAdvisorHasTwoProfilesAndOneIsHostingBucket()
{
+ DistributionManager distributionManager = mock(DistributionManager.class);
+ InternalDistributedMember memberId = new
InternalDistributedMember("localhost", 321);
+ InternalDistributedMember memberId2 = new
InternalDistributedMember("localhost", 323);
+
+ when(distributionManager.getId()).thenReturn(memberId);
+
+ Bucket bucket = mock(Bucket.class);
+ when(bucket.isHosting()).thenReturn(true);
+ when(bucket.isPrimary()).thenReturn(false);
+ when(bucket.getDistributionManager()).thenReturn(distributionManager);
+
+ PartitionedRegion partitionedRegion = mock(PartitionedRegion.class);
+ when(partitionedRegion.getRedundantCopies()).thenReturn(0);
+ when(partitionedRegion.getPartitionAttributes()).thenReturn(new
PartitionAttributesImpl());
+ when(partitionedRegion.getRedundancyTracker())
+ .thenReturn(mock(PartitionedRegionRedundancyTracker.class));
+
+ RegionAdvisor regionAdvisor = mock(RegionAdvisor.class);
+ when(regionAdvisor.getPartitionedRegion()).thenReturn(partitionedRegion);
+
+ BucketAdvisor bucketAdvisor = BucketAdvisor.createBucketAdvisor(bucket,
regionAdvisor);
+
+ BucketAdvisor.BucketProfile bp = new BucketAdvisor.BucketProfile(memberId,
0, bucket);
+ BucketAdvisor.BucketProfile bp2 = new
BucketAdvisor.BucketProfile(memberId2, 0, bucket);
+ bp2.isHosting = false;
+ bp2.isInitializing = true;
+ bp2.isPrimary = false;
+
+ bucketAdvisor.setInitialized();
+ assertThat(bucketAdvisor.putProfile(bp, true)).isTrue();
+ assertThat(bucketAdvisor.putProfile(bp2, true)).isTrue();
+
+ assertThat(bucketAdvisor.adviseInitialized().size()).isEqualTo(1);
+ }
}
diff --git
a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessageJUnitTest.java
b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessageJUnitTest.java
index 2e58626..84a6048 100644
---
a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessageJUnitTest.java
+++
b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessageJUnitTest.java
@@ -249,10 +249,10 @@ public class ParallelQueueRemovalMessageJUnitTest {
message.process((ClusterDistributionManager)
cache.getDistributionManager());
}
- private HashMap<String, Map<Integer, List<Long>>>
createRegionToDispatchedKeysMap() {
- HashMap<String, Map<Integer, List<Long>>> regionToDispatchedKeys = new
HashMap<>();
- Map<Integer, List<Long>> bucketIdToDispatchedKeys = new HashMap<>();
- List<Long> dispatchedKeys = new ArrayList<>();
+ private HashMap<String, Map<Integer, List<Object>>>
createRegionToDispatchedKeysMap() {
+ HashMap<String, Map<Integer, List<Object>>> regionToDispatchedKeys = new
HashMap<>();
+ Map<Integer, List<Object>> bucketIdToDispatchedKeys = new HashMap<>();
+ List<Object> dispatchedKeys = new ArrayList<>();
dispatchedKeys.add(KEY);
bucketIdToDispatchedKeys.put(BUCKET_ID, dispatchedKeys);
regionToDispatchedKeys.put(ParallelGatewaySenderHelper.getRegionQueueName(GATEWAY_SENDER_ID),
diff --git
a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
index 4c5bba9..d272baa 100644
---
a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
+++
b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
@@ -243,9 +243,9 @@ public class WANTestBase extends DistributedTestCase {
public void setUpWANTestBase() throws Exception {
shuffleNumDispatcherThreads();
Invoke.invokeInEveryVM(() ->
setNumDispatcherThreadsForTheRun(dispatcherThreads.get(0)));
- IgnoredException.addIgnoredException("Connection refused");
- IgnoredException.addIgnoredException("Software caused connection abort");
- IgnoredException.addIgnoredException("Connection reset");
+ addIgnoredException("Connection refused");
+ addIgnoredException("Software caused connection abort");
+ addIgnoredException("Connection reset");
postSetUpWANTestBase();
}
@@ -417,11 +417,11 @@ public class WANTestBase extends DistributedTestCase {
public static void createReplicatedRegion(String regionName, String
senderIds, Boolean offHeap) {
IgnoredException exp =
-
IgnoredException.addIgnoredException(ForceReattemptException.class.getName());
+ addIgnoredException(ForceReattemptException.class.getName());
IgnoredException exp1 =
-
IgnoredException.addIgnoredException(InterruptedException.class.getName());
+ addIgnoredException(InterruptedException.class.getName());
IgnoredException exp2 =
-
IgnoredException.addIgnoredException(GatewaySenderException.class.getName());
+ addIgnoredException(GatewaySenderException.class.getName());
try {
RegionFactory fact = cache.createRegionFactory(RegionShortcut.REPLICATE);
if (senderIds != null) {
@@ -444,11 +444,11 @@ public class WANTestBase extends DistributedTestCase {
public static void createReplicatedProxyRegion(String regionName, String
senderIds,
Boolean offHeap) {
IgnoredException exp =
-
IgnoredException.addIgnoredException(ForceReattemptException.class.getName());
+ addIgnoredException(ForceReattemptException.class.getName());
IgnoredException exp1 =
-
IgnoredException.addIgnoredException(InterruptedException.class.getName());
+ addIgnoredException(InterruptedException.class.getName());
IgnoredException exp2 =
-
IgnoredException.addIgnoredException(GatewaySenderException.class.getName());
+ addIgnoredException(GatewaySenderException.class.getName());
try {
RegionFactory fact =
cache.createRegionFactory(RegionShortcut.REPLICATE_PROXY);
if (senderIds != null) {
@@ -499,7 +499,7 @@ public class WANTestBase extends DistributedTestCase {
public static void createReplicatedRegionWithAsyncEventQueue(String
regionName,
String asyncQueueIds, Boolean offHeap) {
IgnoredException exp1 =
-
IgnoredException.addIgnoredException(ForceReattemptException.class.getName());
+ addIgnoredException(ForceReattemptException.class.getName());
try {
RegionFactory fact = cache.createRegionFactory(RegionShortcut.REPLICATE);
if (asyncQueueIds != null) {
@@ -520,7 +520,7 @@ public class WANTestBase extends DistributedTestCase {
public static void createReplicatedRegionWithSenderAndAsyncEventQueue(String
regionName,
String senderIds, String asyncChannelId, Boolean offHeap) {
IgnoredException exp =
-
IgnoredException.addIgnoredException(ForceReattemptException.class.getName());
+ addIgnoredException(ForceReattemptException.class.getName());
try {
RegionFactory fact = cache.createRegionFactory(RegionShortcut.REPLICATE);
if (senderIds != null) {
@@ -609,9 +609,9 @@ public class WANTestBase extends DistributedTestCase {
Integer redundantCopies, Integer totalNumBuckets, Boolean offHeap,
RegionShortcut shortcut,
boolean statisticsEnabled, boolean concurrencyChecksEnabled) {
IgnoredException exp =
-
IgnoredException.addIgnoredException(ForceReattemptException.class.getName());
+ addIgnoredException(ForceReattemptException.class.getName());
IgnoredException exp1 =
-
IgnoredException.addIgnoredException(PartitionOfflineException.class.getName());
+ addIgnoredException(PartitionOfflineException.class.getName());
try {
RegionFactory fact = cache.createRegionFactory(shortcut);
if (senderIds != null) {
@@ -640,9 +640,9 @@ public class WANTestBase extends DistributedTestCase {
public static void createPartitionedRegionWithPersistence(String regionName,
String senderIds,
Integer redundantCopies, Integer totalNumBuckets) {
IgnoredException exp =
-
IgnoredException.addIgnoredException(ForceReattemptException.class.getName());
+ addIgnoredException(ForceReattemptException.class.getName());
IgnoredException exp1 =
-
IgnoredException.addIgnoredException(PartitionOfflineException.class.getName());
+ addIgnoredException(PartitionOfflineException.class.getName());
try {
RegionFactory fact =
cache.createRegionFactory(RegionShortcut.PARTITION_PERSISTENT);
if (senderIds != null) {
@@ -667,9 +667,9 @@ public class WANTestBase extends DistributedTestCase {
public static void createColocatedPartitionedRegion(String regionName,
String senderIds,
Integer redundantCopies, Integer totalNumBuckets, String colocatedWith) {
IgnoredException exp =
-
IgnoredException.addIgnoredException(ForceReattemptException.class.getName());
+ addIgnoredException(ForceReattemptException.class.getName());
IgnoredException exp1 =
-
IgnoredException.addIgnoredException(PartitionOfflineException.class.getName());
+ addIgnoredException(PartitionOfflineException.class.getName());
try {
RegionFactory fact = cache.createRegionFactory(RegionShortcut.PARTITION);
if (senderIds != null) {
@@ -750,9 +750,9 @@ public class WANTestBase extends DistributedTestCase {
Integer redundantCopies, Integer totalNumBuckets, Boolean offHeap) {
IgnoredException exp =
-
IgnoredException.addIgnoredException(ForceReattemptException.class.getName());
+ addIgnoredException(ForceReattemptException.class.getName());
IgnoredException exp1 =
-
IgnoredException.addIgnoredException(PartitionOfflineException.class.getName());
+ addIgnoredException(PartitionOfflineException.class.getName());
try {
RegionFactory fact =
cache.createRegionFactory(RegionShortcut.PARTITION_PERSISTENT);
@@ -778,7 +778,7 @@ public class WANTestBase extends DistributedTestCase {
public static void createCustomerOrderShipmentPartitionedRegion(String
senderIds,
Integer redundantCopies, Integer totalNumBuckets, Boolean offHeap) {
IgnoredException exp =
-
IgnoredException.addIgnoredException(ForceReattemptException.class.getName());
+ addIgnoredException(ForceReattemptException.class.getName());
try {
RegionFactory fact = cache.createRegionFactory(RegionShortcut.PARTITION);
if (senderIds != null) {
@@ -1100,12 +1100,12 @@ public class WANTestBase extends DistributedTestCase {
}
public static void startSender(String senderId) {
- final IgnoredException exln = IgnoredException.addIgnoredException("Could
not connect");
+ final IgnoredException exln = addIgnoredException("Could not connect");
IgnoredException exp =
-
IgnoredException.addIgnoredException(ForceReattemptException.class.getName());
+ addIgnoredException(ForceReattemptException.class.getName());
IgnoredException exp1 =
-
IgnoredException.addIgnoredException(InterruptedException.class.getName());
+ addIgnoredException(InterruptedException.class.getName());
try {
GatewaySender sender = getGatewaySender(senderId);
sender.start();
@@ -1118,12 +1118,12 @@ public class WANTestBase extends DistributedTestCase {
}
public static void startSenderwithCleanQueues(String senderId) {
- final IgnoredException exln = IgnoredException.addIgnoredException("Could
not connect");
+ final IgnoredException exln = addIgnoredException("Could not connect");
IgnoredException exp =
-
IgnoredException.addIgnoredException(ForceReattemptException.class.getName());
+ addIgnoredException(ForceReattemptException.class.getName());
IgnoredException exp1 =
-
IgnoredException.addIgnoredException(InterruptedException.class.getName());
+ addIgnoredException(InterruptedException.class.getName());
try {
GatewaySender sender = getGatewaySender(senderId);
sender.startWithCleanQueue();
@@ -1431,7 +1431,7 @@ public class WANTestBase extends DistributedTestCase {
}
public static void waitForSenderRunningState(String senderId) {
- final IgnoredException exln = IgnoredException.addIgnoredException("Could
not connect");
+ final IgnoredException exln = addIgnoredException("Could not connect");
try {
Set<GatewaySender> senders = cache.getGatewaySenders();
final GatewaySender sender = getGatewaySenderById(senders, senderId);
@@ -1641,9 +1641,9 @@ public class WANTestBase extends DistributedTestCase {
}
public static void pauseSender(String senderId) {
- final IgnoredException exln = IgnoredException.addIgnoredException("Could
not connect");
+ final IgnoredException exln = addIgnoredException("Could not connect");
IgnoredException exp =
-
IgnoredException.addIgnoredException(ForceReattemptException.class.getName());
+ addIgnoredException(ForceReattemptException.class.getName());
try {
GatewaySender sender = getGatewaySender(senderId);
sender.pause();
@@ -1656,9 +1656,9 @@ public class WANTestBase extends DistributedTestCase {
}
public static void resumeSender(String senderId) {
- final IgnoredException exln = IgnoredException.addIgnoredException("Could
not connect");
+ final IgnoredException exln = addIgnoredException("Could not connect");
IgnoredException exp =
-
IgnoredException.addIgnoredException(ForceReattemptException.class.getName());
+ addIgnoredException(ForceReattemptException.class.getName());
try {
GatewaySender sender = getGatewaySender(senderId);
sender.resume();
@@ -1683,9 +1683,9 @@ public class WANTestBase extends DistributedTestCase {
}
public static void stopSender(String senderId) {
- final IgnoredException exln = IgnoredException.addIgnoredException("Could
not connect");
+ final IgnoredException exln = addIgnoredException("Could not connect");
IgnoredException exp =
-
IgnoredException.addIgnoredException(ForceReattemptException.class.getName());
+ addIgnoredException(ForceReattemptException.class.getName());
try {
GatewaySender sender = getGatewaySender(senderId);
AbstractGatewaySenderEventProcessor eventProcessor = null;
@@ -1767,7 +1767,7 @@ public class WANTestBase extends DistributedTestCase {
public static void createSender(String dsName, int remoteDsId, boolean
isParallel,
Integer maxMemory, Integer batchSize, boolean isConflation, boolean
isPersistent,
GatewayEventFilter filter, boolean isManualStart, boolean
groupTransactionEvents) {
- final IgnoredException exln = IgnoredException.addIgnoredException("Could
not connect");
+ final IgnoredException exln = addIgnoredException("Could not connect");
try {
File persistentDirectory =
new File(dsName + "_disk_" + System.currentTimeMillis() + "_" +
VM.getCurrentVMNum());
@@ -1803,7 +1803,7 @@ public class WANTestBase extends DistributedTestCase {
boolean isParallel, Integer maxMemory, Integer batchSize, boolean
isConflation,
boolean isPersistent, GatewayEventFilter filter, boolean isManualStart,
int numDispatchers,
OrderPolicy orderPolicy, int socketBufferSize) {
- final IgnoredException exln = IgnoredException.addIgnoredException("Could
not connect");
+ final IgnoredException exln = addIgnoredException("Could not connect");
try {
File persistentDirectory =
new File(dsName + "_disk_" + System.currentTimeMillis() + "_" +
VM.getCurrentVMNum());
@@ -1870,7 +1870,7 @@ public class WANTestBase extends DistributedTestCase {
List<GatewayEventFilter> eventFilters, List<GatewayTransportFilter>
transportFilters,
boolean isManualStart, boolean isDiskSync) {
IgnoredException exp1 =
-
IgnoredException.addIgnoredException(RegionDestroyedException.class.getName());
+ addIgnoredException(RegionDestroyedException.class.getName());
try {
File persistentDirectory =
new File(dsName + "_disk_" + System.currentTimeMillis() + "_" +
VM.getCurrentVMNum());
@@ -2286,9 +2286,9 @@ public class WANTestBase extends DistributedTestCase {
txMgr.setDistributed(true);
IgnoredException exp1 =
-
IgnoredException.addIgnoredException(InterruptedException.class.getName());
+ addIgnoredException(InterruptedException.class.getName());
IgnoredException exp2 =
-
IgnoredException.addIgnoredException(GatewaySenderException.class.getName());
+ addIgnoredException(GatewaySenderException.class.getName());
try {
Region<Object, Object> r = cache.getRegion(SEPARATOR + regionName);
for (long i = 1; i <= numPuts; i++) {
@@ -2305,9 +2305,9 @@ public class WANTestBase extends DistributedTestCase {
public static void doPuts(String regionName, int numPuts, Object value) {
IgnoredException exp1 =
-
IgnoredException.addIgnoredException(InterruptedException.class.getName());
+ addIgnoredException(InterruptedException.class.getName());
IgnoredException exp2 =
-
IgnoredException.addIgnoredException(GatewaySenderException.class.getName());
+ addIgnoredException(GatewaySenderException.class.getName());
try {
Region<Object, Object> r = cache.getRegion(SEPARATOR + regionName);
for (long i = 0; i < numPuts; i++) {
@@ -2321,9 +2321,9 @@ public class WANTestBase extends DistributedTestCase {
public static void doPuts(String regionName, int numPuts) {
IgnoredException exp1 =
-
IgnoredException.addIgnoredException(InterruptedException.class.getName());
+ addIgnoredException(InterruptedException.class.getName());
IgnoredException exp2 =
-
IgnoredException.addIgnoredException(GatewaySenderException.class.getName());
+ addIgnoredException(GatewaySenderException.class.getName());
try {
Region<Object, Object> r = cache.getRegion(SEPARATOR + regionName);
for (long i = 0; i < numPuts; i++) {
@@ -2337,9 +2337,9 @@ public class WANTestBase extends DistributedTestCase {
public static void doTxPuts(String regionName, int numPuts) {
try (
- IgnoredException ignored =
IgnoredException.addIgnoredException(InterruptedException.class);
+ IgnoredException ignored =
addIgnoredException(InterruptedException.class);
IgnoredException ignored1 =
-
IgnoredException.addIgnoredException(GatewaySenderException.class)) {
+ addIgnoredException(GatewaySenderException.class)) {
Region<Object, Object> r = cache.getRegion(SEPARATOR + regionName);
for (long i = 0; i < numPuts; i++) {
cache.getCacheTransactionManager().begin();
@@ -2351,9 +2351,9 @@ public class WANTestBase extends DistributedTestCase {
public static void doPutsSameKey(String regionName, int numPuts, String key)
{
IgnoredException exp1 =
-
IgnoredException.addIgnoredException(InterruptedException.class.getName());
+ addIgnoredException(InterruptedException.class.getName());
IgnoredException exp2 =
-
IgnoredException.addIgnoredException(GatewaySenderException.class.getName());
+ addIgnoredException(GatewaySenderException.class.getName());
try {
Region<Object, Object> r = cache.getRegion(SEPARATOR + regionName);
for (long i = 0; i < numPuts; i++) {
@@ -2473,7 +2473,7 @@ public class WANTestBase extends DistributedTestCase {
public static void localDestroyRegion(String regionName) {
IgnoredException exp =
-
IgnoredException.addIgnoredException(PRLocallyDestroyedException.class.getName());
+ addIgnoredException(PRLocallyDestroyedException.class.getName());
try {
Region<?, ?> r = cache.getRegion(SEPARATOR + regionName);
r.localDestroyRegion();
@@ -2774,7 +2774,7 @@ public class WANTestBase extends DistributedTestCase {
public static void doNextPuts(String regionName, int start, int numPuts) {
IgnoredException exp =
-
IgnoredException.addIgnoredException(CacheClosedException.class.getName());
+ addIgnoredException(CacheClosedException.class.getName());
try {
Region<Object, Object> r = cache.getRegion(SEPARATOR + regionName);
for (long i = start; i < numPuts; i++) {
@@ -2900,9 +2900,9 @@ public class WANTestBase extends DistributedTestCase {
public static void validateRegionSize(String regionName, final int
regionSize) {
IgnoredException exp =
-
IgnoredException.addIgnoredException(ForceReattemptException.class.getName());
+ addIgnoredException(ForceReattemptException.class.getName());
IgnoredException exp1 =
-
IgnoredException.addIgnoredException(CacheClosedException.class.getName());
+ addIgnoredException(CacheClosedException.class.getName());
try {
final Region<?, ?> r = cache.getRegion(SEPARATOR + regionName);
if (regionSize != r.keySet().size()) {
@@ -3169,11 +3169,11 @@ public class WANTestBase extends DistributedTestCase {
}
public static Boolean killSender(String senderId) {
- final IgnoredException exln = IgnoredException.addIgnoredException("Could
not connect");
+ final IgnoredException exln = addIgnoredException("Could not connect");
IgnoredException exp =
-
IgnoredException.addIgnoredException(CacheClosedException.class.getName());
+ addIgnoredException(CacheClosedException.class.getName());
IgnoredException exp1 =
-
IgnoredException.addIgnoredException(ForceReattemptException.class.getName());
+ addIgnoredException(ForceReattemptException.class.getName());
try {
AbstractGatewaySender sender = (AbstractGatewaySender)
getGatewaySender(senderId);
if (sender.isPrimary()) {
@@ -3278,9 +3278,9 @@ public class WANTestBase extends DistributedTestCase {
public static void validateQueueContents(final String senderId, final int
regionSize) {
IgnoredException exp1 =
-
IgnoredException.addIgnoredException(InterruptedException.class.getName());
+ addIgnoredException(InterruptedException.class.getName());
IgnoredException exp2 =
-
IgnoredException.addIgnoredException(GatewaySenderException.class.getName());
+ addIgnoredException(GatewaySenderException.class.getName());
try {
GatewaySender sender = getGatewaySender(senderId);
@@ -3430,9 +3430,9 @@ public class WANTestBase extends DistributedTestCase {
public static void validateParallelSenderQueueAllBucketsDrained(final String
senderId) {
IgnoredException exp =
-
IgnoredException.addIgnoredException(RegionDestroyedException.class.getName());
+ addIgnoredException(RegionDestroyedException.class.getName());
IgnoredException exp1 =
-
IgnoredException.addIgnoredException(ForceReattemptException.class.getName());
+ addIgnoredException(ForceReattemptException.class.getName());
try {
GatewaySender sender = getGatewaySender(senderId);
final AbstractGatewaySender abstractSender = (AbstractGatewaySender)
sender;
diff --git
a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java
b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java
index 7cd8f61..9a75d6b 100644
---
a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java
+++
b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java
@@ -26,6 +26,7 @@ import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
+import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -39,10 +40,12 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
+import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -55,6 +58,9 @@ import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionShortcut;
import org.apache.geode.cache.wan.GatewayEventFilter;
import org.apache.geode.cache.wan.GatewaySender;
+import org.apache.geode.distributed.internal.ClusterDistributionManager;
+import org.apache.geode.distributed.internal.DistributionMessage;
+import org.apache.geode.distributed.internal.DistributionMessageObserver;
import org.apache.geode.internal.cache.BucketRegion;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.PartitionedRegion;
@@ -69,6 +75,7 @@ import org.apache.geode.test.awaitility.GeodeAwaitility;
import org.apache.geode.test.dunit.AsyncInvocation;
import org.apache.geode.test.dunit.IgnoredException;
import org.apache.geode.test.dunit.RMIException;
+import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.dunit.rules.ClusterStartupRule;
import org.apache.geode.test.dunit.rules.DistributedRestoreSystemProperties;
import org.apache.geode.test.dunit.rules.MemberVM;
@@ -100,6 +107,13 @@ public class ParallelGatewaySenderOperationsDUnitTest
extends WANTestBase {
addIgnoredException("could not get remote locator information");
}
+ @After
+ public void tearDown() {
+ for (VM vm : asList(vm4, vm5, vm6, vm7)) {
+ vm.invoke(() -> DistributionMessageObserver.setInstance(null));
+ }
+ }
+
@Test(timeout = 300_000)
public void testStopOneConcurrentGatewaySenderWithSSL() throws Exception {
Integer lnPort = vm0.invoke(() -> createFirstLocatorWithDSId(1));
@@ -1182,6 +1196,84 @@ public class ParallelGatewaySenderOperationsDUnitTest
extends WANTestBase {
}
+ /**
+ * Put entries in region after gateway sender is stopped. Count number of
PQRM messages sent.
+ */
+ @Test
+ public void
testDroppedEventsSignalizationToSecondaryQueueWhileSenderStopped() {
+ int lnPort = vm0.invoke(() -> createFirstLocatorWithDSId(1));
+ int nyPort = vm1.invoke(() -> createFirstRemoteLocator(2, lnPort));
+
+ createSendersReceiversAndPartitionedRegion(lnPort, nyPort, false, true);
+
+ // make sure all the senders are running before doing any puts
+ waitForSendersRunning();
+
+ // FIRST RUN: now, the senders are started. So, start the puts
+ vm4.invoke(() -> doPuts(getUniqueName() + "_PR", 100));
+
+ vm2.invoke(() -> validateRegionSizeRemainsSame(getUniqueName() + "_PR",
100));
+
+ stopSenders();
+
+ waitForAllSendersNotRunning();
+
+ vm4.invoke(() -> {
+ DistributionMessageObserver.setInstance(new CountSentPQRMObserver());
+ });
+ vm5.invoke(() -> {
+ DistributionMessageObserver.setInstance(new CountSentPQRMObserver());
+ });
+ vm6.invoke(() -> {
+ DistributionMessageObserver.setInstance(new CountSentPQRMObserver());
+ });
+ vm7.invoke(() -> {
+ DistributionMessageObserver.setInstance(new CountSentPQRMObserver());
+ });
+
+ // SECOND RUN: keep one thread doing puts to the region
+ vm4.invoke(() -> doPutsFrom(getUniqueName() + "_PR", 100, 200));
+
+ vm2.invoke(() -> validateRegionSizeRemainsSame(getUniqueName() + "_PR",
100));
+
+ int parallelQueueRemovalMessageCountInVm4 = vm4.invoke(() -> {
+ CountSentPQRMObserver observer =
+ (CountSentPQRMObserver) DistributionMessageObserver.getInstance();
+ return observer.getNumberOfSentPQRM();
+ });
+
+ int parallelQueueRemovalMessageCountInVm5 = vm5.invoke(() -> {
+ CountSentPQRMObserver observer =
+ (CountSentPQRMObserver) DistributionMessageObserver.getInstance();
+ return observer.getNumberOfSentPQRM();
+ });
+
+ int parallelQueueRemovalMessageCountInVm6 = vm6.invoke(() -> {
+ CountSentPQRMObserver observer =
+ (CountSentPQRMObserver) DistributionMessageObserver.getInstance();
+ return observer.getNumberOfSentPQRM();
+ });
+
+ int parallelQueueRemovalMessageCountInVm7 = vm7.invoke(() -> {
+ CountSentPQRMObserver observer =
+ (CountSentPQRMObserver) DistributionMessageObserver.getInstance();
+ return observer.getNumberOfSentPQRM();
+ });
+
+ assertThat(parallelQueueRemovalMessageCountInVm4 +
parallelQueueRemovalMessageCountInVm5
+ + parallelQueueRemovalMessageCountInVm6 +
parallelQueueRemovalMessageCountInVm7)
+ .isEqualTo(100);
+
+ await().untilAsserted(() -> {
+ int vm4SecondarySize = vm4.invoke(() ->
getSecondaryQueueSizeInStats("ln"));
+ int vm5SecondarySize = vm5.invoke(() ->
getSecondaryQueueSizeInStats("ln"));
+ int vm6SecondarySize = vm6.invoke(() ->
getSecondaryQueueSizeInStats("ln"));
+ int vm7SecondarySize = vm7.invoke(() ->
getSecondaryQueueSizeInStats("ln"));
+ assertThat(vm4SecondarySize + vm5SecondarySize + vm6SecondarySize +
vm7SecondarySize)
+ .isEqualTo(0);
+ });
+
+ }
private void clearShadowBucketRegions(PartitionedRegion shadowRegion) {
PartitionedRegionDataStore.BucketVisitor bucketVisitor =
@@ -1506,10 +1598,34 @@ public class ParallelGatewaySenderOperationsDUnitTest
extends WANTestBase {
vm7.invoke(() -> waitForSenderRunningState("ln"));
}
+ private void waitForAllSendersNotRunning() {
+ vm4.invoke(() -> waitForSenderNonRunningState("ln"));
+ vm5.invoke(() -> waitForSenderNonRunningState("ln"));
+ vm6.invoke(() -> waitForSenderNonRunningState("ln"));
+ vm7.invoke(() -> waitForSenderNonRunningState("ln"));
+ }
+
private void validateParallelSenderQueueAllBucketsDrained() {
vm4.invoke(() -> validateParallelSenderQueueAllBucketsDrained("ln"));
vm5.invoke(() -> validateParallelSenderQueueAllBucketsDrained("ln"));
vm6.invoke(() -> validateParallelSenderQueueAllBucketsDrained("ln"));
vm7.invoke(() -> validateParallelSenderQueueAllBucketsDrained("ln"));
}
+
+ private static class CountSentPQRMObserver extends
DistributionMessageObserver
+ implements Serializable {
+ private final AtomicInteger numberOfSentPQRM = new AtomicInteger(0);
+
+ @Override
+ public void beforeSendMessage(ClusterDistributionManager dm,
DistributionMessage message) {
+ if (message instanceof ParallelQueueRemovalMessage) {
+ numberOfSentPQRM.addAndGet(message.getRecipients().size());
+ }
+ }
+
+ public int getNumberOfSentPQRM() {
+ return numberOfSentPQRM.get();
+ }
+ }
+
}