This is an automated email from the ASF dual-hosted git repository. jinmeiliao pushed a commit to branch feature/GEODE-7665 in repository https://gitbox.apache.org/repos/asf/geode.git
commit 7e2ee0911f742fdaa855745e2e9fd3357db568e6 Author: Kirk Lund <[email protected]> AuthorDate: Wed Apr 21 15:24:06 2021 -0700 GEODE-9132: Always acquire write lock for PR clear --- ...gionClearWithConcurrentOperationsDUnitTest.java | 110 ++++++++--------- .../internal/cache/PartitionedRegionClear.java | 115 ++++++++++++------ .../internal/cache/PartitionedRegionClearTest.java | 131 +++++++++++++++++---- 3 files changed, 246 insertions(+), 110 deletions(-) diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearWithConcurrentOperationsDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearWithConcurrentOperationsDUnitTest.java index 7ef187f..710ae6f 100644 --- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearWithConcurrentOperationsDUnitTest.java +++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearWithConcurrentOperationsDUnitTest.java @@ -14,6 +14,9 @@ */ package org.apache.geode.internal.cache; +import static java.time.Duration.ofMillis; +import static org.apache.geode.distributed.ConfigurationProperties.MAX_WAIT_TIME_RECONNECT; +import static org.apache.geode.distributed.ConfigurationProperties.MEMBER_TIMEOUT; import static org.apache.geode.internal.util.ArrayUtils.asList; import static org.apache.geode.test.awaitility.GeodeAwaitility.await; import static org.apache.geode.test.dunit.VM.getVM; @@ -21,6 +24,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import java.io.Serializable; +import java.time.Duration; import java.time.Instant; import java.util.ArrayList; import java.util.Arrays; @@ -73,11 +77,16 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements private static final int BUCKETS = 13; private static final String REGION_NAME = "PartitionedRegion"; + private static final Duration WORK_DURATION = Duration.ofSeconds(15); @Rule public DistributedRule distributedRule = new DistributedRule(3); @Rule - public CacheRule cacheRule = CacheRule.builder().createCacheInAll().build(); + public CacheRule cacheRule = CacheRule.builder() + .addSystemProperty(MAX_WAIT_TIME_RECONNECT, "1000") + .addSystemProperty(MEMBER_TIMEOUT, "2000") + .createCacheInAll() + .build(); private VM server1; private VM server2; @@ -106,15 +115,15 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements createRegions(regionShortcut); // Let all VMs continuously execute puts and gets for 60 seconds. - final int workMillis = 60000; + // final int workMillis = 60000; final int entries = 15000; List<AsyncInvocation<Void>> asyncInvocationList = Arrays.asList( - server1.invokeAsync(() -> executePuts(entries, workMillis)), - server2.invokeAsync(() -> executeGets(entries, workMillis)), - accessor.invokeAsync(() -> executeRemoves(entries, workMillis))); + server1.invokeAsync(() -> executePuts(entries, WORK_DURATION)), + server2.invokeAsync(() -> executeGets(entries, WORK_DURATION)), + accessor.invokeAsync(() -> executeRemoves(entries, WORK_DURATION))); // Clear the region every second for 60 seconds. - getVM(coordinatorVM.getVmId()).invoke(() -> executeClears(workMillis, 1000)); + getVM(coordinatorVM.getVmId()).invoke(() -> executeClears(WORK_DURATION, ofMillis(1000))); // Let asyncInvocations finish. for (AsyncInvocation<Void> asyncInvocation : asyncInvocationList) { @@ -142,17 +151,17 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements createRegions(regionShortcut); // Let all VMs continuously execute putAll and removeAll for 15 seconds. - final int workMillis = 15000; + // final int workMillis = 15000; List<AsyncInvocation<Void>> asyncInvocationList = Arrays.asList( - server1.invokeAsync(() -> executePutAlls(0, 2000, workMillis)), - server1.invokeAsync(() -> executeRemoveAlls(0, 2000, workMillis)), - server2.invokeAsync(() -> executePutAlls(2000, 4000, workMillis)), - server2.invokeAsync(() -> executeRemoveAlls(2000, 4000, workMillis)), - accessor.invokeAsync(() -> executePutAlls(4000, 6000, workMillis)), - accessor.invokeAsync(() -> executeRemoveAlls(4000, 6000, workMillis))); + server1.invokeAsync(() -> executePutAlls(0, 2000, WORK_DURATION)), + server1.invokeAsync(() -> executeRemoveAlls(0, 2000, WORK_DURATION)), + server2.invokeAsync(() -> executePutAlls(2000, 4000, WORK_DURATION)), + server2.invokeAsync(() -> executeRemoveAlls(2000, 4000, WORK_DURATION)), + accessor.invokeAsync(() -> executePutAlls(4000, 6000, WORK_DURATION)), + accessor.invokeAsync(() -> executeRemoveAlls(4000, 6000, WORK_DURATION))); // Clear the region every half second for 15 seconds. - getVM(coordinatorVM.getVmId()).invoke(() -> executeClears(workMillis, 500)); + getVM(coordinatorVM.getVmId()).invoke(() -> executeClears(WORK_DURATION, ofMillis(500))); // Let asyncInvocations finish. for (AsyncInvocation<Void> asyncInvocation : asyncInvocationList) { @@ -226,12 +235,12 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements server2.invoke(() -> DistributionMessageObserver.setInstance(new MemberKiller(false))); // Let all VMs (except the one to kill) continuously execute gets, put and removes for 30" - final int workMillis = 30000; + // final int workMillis = 30000; List<AsyncInvocation<Void>> asyncInvocationList = Arrays.asList( - server1.invokeAsync(() -> executeGets(entries, workMillis)), - server1.invokeAsync(() -> executePuts(entries, workMillis)), - accessor.invokeAsync(() -> executeGets(entries, workMillis)), - accessor.invokeAsync(() -> executeRemoves(entries, workMillis))); + server1.invokeAsync(() -> executeGets(entries, WORK_DURATION)), + server1.invokeAsync(() -> executePuts(entries, WORK_DURATION)), + accessor.invokeAsync(() -> executeGets(entries, WORK_DURATION)), + accessor.invokeAsync(() -> executeRemoves(entries, WORK_DURATION))); // Retry the clear operation on the region until success (server2 will go down, but other // members will eventually become primary for those buckets previously hosted by server2). @@ -278,12 +287,12 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements server2.invoke(() -> DistributionMessageObserver.setInstance(new MemberKiller(false))); // Let all VMs (except the one to kill) continuously execute gets, put and removes for 30" - final int workMillis = 30000; + // final int workMillis = 30000; List<AsyncInvocation<Void>> asyncInvocationList = Arrays.asList( - server1.invokeAsync(() -> executeGets(entries, workMillis)), - server1.invokeAsync(() -> executePuts(entries, workMillis)), - accessor.invokeAsync(() -> executeGets(entries, workMillis)), - accessor.invokeAsync(() -> executeRemoves(entries, workMillis))); + server1.invokeAsync(() -> executeGets(entries, WORK_DURATION)), + server1.invokeAsync(() -> executePuts(entries, WORK_DURATION)), + accessor.invokeAsync(() -> executeGets(entries, WORK_DURATION)), + accessor.invokeAsync(() -> executeRemoves(entries, WORK_DURATION))); // Clear the region. getVM(coordinatorVM.getVmId()).invoke(() -> { @@ -315,10 +324,10 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements server2.invoke(() -> DistributionMessageObserver.setInstance(new MemberKiller(false))); // Let all VMs continuously execute putAll/removeAll for 30 seconds. - final int workMillis = 30000; + // final int workMillis = 30000; List<AsyncInvocation<Void>> asyncInvocationList = Arrays.asList( - server1.invokeAsync(() -> executePutAlls(0, 6000, workMillis)), - accessor.invokeAsync(() -> executeRemoveAlls(2000, 4000, workMillis))); + server1.invokeAsync(() -> executePutAlls(0, 6000, WORK_DURATION)), + accessor.invokeAsync(() -> executeRemoveAlls(2000, 4000, WORK_DURATION))); // Retry the clear operation on the region until success (server2 will go down, but other // members will eventually become primary for those buckets previously hosted by server2). @@ -360,10 +369,9 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements createRegions(RegionShortcut.PARTITION); server2.invoke(() -> DistributionMessageObserver.setInstance(new MemberKiller(false))); - final int workMillis = 30000; List<AsyncInvocation<Void>> asyncInvocationList = Arrays.asList( - server1.invokeAsync(() -> executePutAlls(0, 6000, workMillis)), - accessor.invokeAsync(() -> executeRemoveAlls(2000, 4000, workMillis))); + server1.invokeAsync(() -> executePutAlls(0, 6000, WORK_DURATION)), + accessor.invokeAsync(() -> executeRemoveAlls(2000, 4000, WORK_DURATION))); // Clear the region. getVM(coordinatorVM.getVmId()).invoke(() -> { @@ -520,12 +528,12 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements } /** - * Continuously execute get operations on the PartitionedRegion for the given durationInMillis. + * Continuously execute get operations on the PartitionedRegion for the given duration. */ - private void executeGets(final int numEntries, final long durationInMillis) { + private void executeGets(final int numEntries, final Duration duration) { Cache cache = cacheRule.getCache(); Region<String, String> region = cache.getRegion(REGION_NAME); - Instant finishTime = Instant.now().plusMillis(durationInMillis); + Instant finishTime = Instant.now().plusMillis(duration.toMillis()); while (Instant.now().isBefore(finishTime)) { // Region might have been cleared in between, that's why we check for null. @@ -537,12 +545,12 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements } /** - * Continuously execute put operations on the PartitionedRegion for the given durationInMillis. + * Continuously execute put operations on the PartitionedRegion for the given duration. */ - private void executePuts(final int numEntries, final long durationInMillis) { + private void executePuts(final int numEntries, final Duration duration) { Cache cache = cacheRule.getCache(); Region<String, String> region = cache.getRegion(REGION_NAME); - Instant finishTime = Instant.now().plusMillis(durationInMillis); + Instant finishTime = Instant.now().plusMillis(duration.toMillis()); while (Instant.now().isBefore(finishTime)) { IntStream.range(0, numEntries).forEach(i -> region.put(String.valueOf(i), "Value_" + i)); @@ -550,16 +558,15 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements } /** - * Continuously execute putAll operations on the PartitionedRegion for the given - * durationInMillis. + * Continuously execute putAll operations on the PartitionedRegion for the given duration. */ - private void executePutAlls(final int startKey, final int finalKey, final long durationInMillis) { + private void executePutAlls(final int startKey, final int finalKey, final Duration duration) { Cache cache = cacheRule.getCache(); Map<String, String> valuesToInsert = new HashMap<>(); Region<String, String> region = cache.getRegion(REGION_NAME); IntStream.range(startKey, finalKey) .forEach(i -> valuesToInsert.put(String.valueOf(i), "Value_" + i)); - Instant finishTime = Instant.now().plusMillis(durationInMillis); + Instant finishTime = Instant.now().plusMillis(duration.toMillis()); while (Instant.now().isBefore(finishTime)) { region.putAll(valuesToInsert); @@ -567,13 +574,12 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements } /** - * Continuously execute remove operations on the PartitionedRegion for the given - * durationInMillis. + * Continuously execute remove operations on the PartitionedRegion for the given duration. */ - private void executeRemoves(final int numEntries, final long durationInMillis) { + private void executeRemoves(final int numEntries, final Duration duration) { Cache cache = cacheRule.getCache(); Region<String, String> region = cache.getRegion(REGION_NAME); - Instant finishTime = Instant.now().plusMillis(durationInMillis); + Instant finishTime = Instant.now().plusMillis(duration.toMillis()); while (Instant.now().isBefore(finishTime)) { // Region might have been cleared in between, that's why we check for null. @@ -585,16 +591,14 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements } /** - * Continuously execute removeAll operations on the PartitionedRegion for the given - * durationInMillis. + * Continuously execute removeAll operations on the PartitionedRegion for the given duration. */ - private void executeRemoveAlls(final int startKey, final int finalKey, - final long durationInMillis) { + private void executeRemoveAlls(final int startKey, final int finalKey, final Duration duration) { Cache cache = cacheRule.getCache(); List<String> keysToRemove = new ArrayList<>(); Region<String, String> region = cache.getRegion(REGION_NAME); IntStream.range(startKey, finalKey).forEach(i -> keysToRemove.add(String.valueOf(i))); - Instant finishTime = Instant.now().plusMillis(durationInMillis); + Instant finishTime = Instant.now().plusMillis(duration.toMillis()); while (Instant.now().isBefore(finishTime)) { region.removeAll(keysToRemove); @@ -622,13 +626,13 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements } /** - * Continuously execute clear operations on the PartitionedRegion every periodInMillis for the - * given durationInMillis. + * Continuously execute clear operations on the PartitionedRegion every period for the + * given duration. */ - private void executeClears(final long durationInMillis, final long periodInMillis) { + private void executeClears(final Duration duration, final Duration period) { Cache cache = cacheRule.getCache(); Region<String, String> region = cache.getRegion(REGION_NAME); - long minimumInvocationCount = durationInMillis / periodInMillis; + long minimumInvocationCount = duration.toMillis() / period.toMillis(); for (int invocationCount = 0; invocationCount < minimumInvocationCount; invocationCount++) { region.clear(); diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClear.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClear.java index 569f78c..b8597c1 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClear.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClear.java @@ -23,6 +23,7 @@ import java.util.Set; import org.apache.logging.log4j.Logger; import org.apache.geode.CancelException; +import org.apache.geode.annotations.VisibleForTesting; import org.apache.geode.cache.CacheWriterException; import org.apache.geode.cache.Operation; import org.apache.geode.cache.OperationAbortedException; @@ -30,10 +31,12 @@ import org.apache.geode.cache.PartitionedRegionPartialClearException; import org.apache.geode.cache.asyncqueue.AsyncEventQueue; import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueImpl; import org.apache.geode.cache.partition.PartitionRegionHelper; +import org.apache.geode.distributed.DistributedLockService; import org.apache.geode.distributed.internal.DistributionManager; import org.apache.geode.distributed.internal.MembershipListener; import org.apache.geode.distributed.internal.ReplyException; import org.apache.geode.distributed.internal.membership.InternalDistributedMember; +import org.apache.geode.internal.cache.PartitionedRegionClearMessage.PartitionedRegionClearResponse; import org.apache.geode.internal.serialization.KnownVersion; import org.apache.geode.logging.internal.log4j.api.LogService; @@ -47,6 +50,8 @@ public class PartitionedRegionClear { private final PartitionedRegion partitionedRegion; + private final DistributedLockService distributedLockService; + protected final LockForListenerAndClientNotification lockForListenerAndClientNotification = new LockForListenerAndClientNotification(); @@ -55,8 +60,35 @@ public class PartitionedRegionClear { protected final PartitionedRegionClearListener partitionedRegionClearListener = new PartitionedRegionClearListener(); + private final ColocationLeaderRegionProvider colocationLeaderRegionProvider; + private final AssignBucketsToPartitions assignBucketsToPartitions; + private final UpdateAttributesProcessorFactory updateAttributesProcessorFactory; + public PartitionedRegionClear(PartitionedRegion partitionedRegion) { + this(partitionedRegion, + partitionedRegion.getPartitionedRegionLockService(), + ColocationHelper::getLeaderRegion, + PartitionRegionHelper::assignBucketsToPartitions, + pr -> new UpdateAttributesProcessor(pr, true)); + } + + @VisibleForTesting + PartitionedRegionClear(PartitionedRegion partitionedRegion, + DistributedLockService distributedLockService, + ColocationLeaderRegionProvider colocationLeaderRegionProvider, + AssignBucketsToPartitions assignBucketsToPartitions, + UpdateAttributesProcessorFactory updateAttributesProcessorFactory) { this.partitionedRegion = partitionedRegion; + this.distributedLockService = distributedLockService; + this.colocationLeaderRegionProvider = colocationLeaderRegionProvider; + this.assignBucketsToPartitions = assignBucketsToPartitions; + this.updateAttributesProcessorFactory = updateAttributesProcessorFactory; + + // TODO: initialize needs to move out of constructor to prevent escape of reference to 'this' + initialize(); + } + + private void initialize() { partitionedRegion.getDistributionManager() .addMembershipListener(partitionedRegionClearListener); } @@ -67,7 +99,7 @@ public class PartitionedRegionClear { void acquireDistributedClearLock(String clearLock) { try { - partitionedRegion.getPartitionedRegionLockService().lock(clearLock, -1, -1); + distributedLockService.lock(clearLock, -1, -1); } catch (IllegalStateException e) { partitionedRegion.lockCheckReadiness(); throw e; @@ -76,7 +108,7 @@ public class PartitionedRegionClear { void releaseDistributedClearLock(String clearLock) { try { - partitionedRegion.getPartitionedRegionLockService().unlock(clearLock); + distributedLockService.unlock(clearLock); } catch (IllegalStateException e) { partitionedRegion.lockCheckReadiness(); } catch (Exception ex) { @@ -303,7 +335,7 @@ public class PartitionedRegionClear { protected Set<Integer> attemptToSendPartitionedRegionClearMessage(RegionEventImpl event, PartitionedRegionClearMessage.OperationType op) throws ForceReattemptException { - Set<Integer> bucketsOperated = new HashSet<>(); + Set<Integer> clearedBuckets = new HashSet<>(); if (partitionedRegion.getPRRoot() == null) { if (logger.isDebugEnabled()) { @@ -311,8 +343,10 @@ public class PartitionedRegionClear { "Partition region {} failed to initialize. Remove its profile from remote members.", this.partitionedRegion); } - new UpdateAttributesProcessor(partitionedRegion, true).distribute(false); - return bucketsOperated; + updateAttributesProcessorFactory + .create(partitionedRegion) + .distribute(false); + return clearedBuckets; } final Set<InternalDistributedMember> configRecipients = @@ -325,9 +359,9 @@ public class PartitionedRegionClear { if (prConfig != null) { for (Node node : prConfig.getNodes()) { - InternalDistributedMember idm = node.getMemberId(); - if (!idm.equals(partitionedRegion.getMyId())) { - configRecipients.add(idm); + InternalDistributedMember memberId = node.getMemberId(); + if (!memberId.equals(partitionedRegion.getMyId())) { + configRecipients.add(memberId); } } } @@ -336,29 +370,29 @@ public class PartitionedRegionClear { } try { - PartitionedRegionClearMessage.PartitionedRegionClearResponse resp = - new PartitionedRegionClearMessage.PartitionedRegionClearResponse( - partitionedRegion.getSystem(), configRecipients); - PartitionedRegionClearMessage partitionedRegionClearMessage = - new PartitionedRegionClearMessage(configRecipients, partitionedRegion, resp, op, event); - partitionedRegionClearMessage.send(); + PartitionedRegionClearResponse clearResponse = + new PartitionedRegionClearResponse(partitionedRegion.getSystem(), configRecipients); + PartitionedRegionClearMessage clearMessage = + new PartitionedRegionClearMessage(configRecipients, partitionedRegion, clearResponse, op, + event); + clearMessage.send(); - resp.waitForRepliesUninterruptibly(); - bucketsOperated = resp.bucketsCleared; + clearResponse.waitForRepliesUninterruptibly(); + clearedBuckets = clearResponse.bucketsCleared; } catch (ReplyException e) { - Throwable t = e.getCause(); - if (t instanceof ForceReattemptException) { - throw (ForceReattemptException) t; + Throwable cause = e.getCause(); + if (cause instanceof ForceReattemptException) { + throw (ForceReattemptException) cause; } - if (t instanceof PartitionedRegionPartialClearException) { - throw new PartitionedRegionPartialClearException(t.getMessage(), t); + if (cause instanceof PartitionedRegionPartialClearException) { + throw (PartitionedRegionPartialClearException) cause; } logger.warn( "PartitionedRegionClear#sendPartitionedRegionClearMessage: Caught exception during ClearRegionMessage send and waiting for response", e); } - return bucketsOperated; + return clearedBuckets; } /** @@ -412,14 +446,9 @@ public class PartitionedRegionClear { invokeCacheWriter(regionEvent); } - // Check if there are any listeners or clients interested. If so, then clear write - // locks needs to be taken on all local and remote primary buckets in order to - // preserve the ordering of client events (for concurrent operations on the region). - boolean acquireClearLockForNotification = - (partitionedRegion.hasAnyClientsInterested() || partitionedRegion.hasListener()); - if (acquireClearLockForNotification) { - obtainLockForClear(regionEvent); - } + // clear write locks need to be taken on all local and remote primary buckets + // whether or not the partitioned region has any listeners clients interested + obtainLockForClear(regionEvent); try { Set<Integer> bucketsCleared = clearRegion(regionEvent); @@ -435,9 +464,7 @@ public class PartitionedRegionClear { throw new PartitionedRegionPartialClearException(message); } } finally { - if (acquireClearLockForNotification) { - releaseLockForClear(regionEvent); - } + releaseLockForClear(regionEvent); } } finally { releaseDistributedClearLock(lockName); @@ -458,8 +485,8 @@ public class PartitionedRegionClear { } protected void assignAllPrimaryBuckets() { - PartitionedRegion leader = ColocationHelper.getLeaderRegion(partitionedRegion); - PartitionRegionHelper.assignBucketsToPartitions(leader); + PartitionedRegion leader = colocationLeaderRegionProvider.getLeaderRegion(partitionedRegion); + assignBucketsToPartitions.assignBucketsToPartitions(leader); } protected void handleClearFromDepartedMember(InternalDistributedMember departedMember) { @@ -505,6 +532,24 @@ public class PartitionedRegionClear { return membershipChange; } + @FunctionalInterface + @VisibleForTesting + interface ColocationLeaderRegionProvider { + PartitionedRegion getLeaderRegion(PartitionedRegion partitionedRegion); + } + + @FunctionalInterface + @VisibleForTesting + interface AssignBucketsToPartitions { + void assignBucketsToPartitions(PartitionedRegion partitionedRegion); + } + + @FunctionalInterface + @VisibleForTesting + interface UpdateAttributesProcessorFactory { + UpdateAttributesProcessor create(PartitionedRegion partitionedRegion); + } + protected class PartitionedRegionClearListener implements MembershipListener { @Override diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionClearTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionClearTest.java index 721d236..376fc8e 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionClearTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionClearTest.java @@ -22,6 +22,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.catchThrowable; import static org.mockito.ArgumentCaptor.forClass; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doNothing; @@ -42,7 +43,6 @@ import org.mockito.ArgumentCaptor; import org.apache.geode.CancelCriterion; import org.apache.geode.cache.PartitionedRegionPartialClearException; import org.apache.geode.cache.Region; -import org.apache.geode.cache.asyncqueue.AsyncEventQueue; import org.apache.geode.distributed.DistributedLockService; import org.apache.geode.distributed.internal.DMStats; import org.apache.geode.distributed.internal.DistributionManager; @@ -50,41 +50,74 @@ import org.apache.geode.distributed.internal.InternalDistributedSystem; import org.apache.geode.distributed.internal.MembershipListener; import org.apache.geode.distributed.internal.membership.InternalDistributedMember; import org.apache.geode.internal.cache.PartitionedRegion.RetryTimeKeeper; +import org.apache.geode.internal.cache.PartitionedRegionClear.AssignBucketsToPartitions; +import org.apache.geode.internal.cache.PartitionedRegionClear.ColocationLeaderRegionProvider; import org.apache.geode.internal.cache.PartitionedRegionClear.PartitionedRegionClearListener; +import org.apache.geode.internal.cache.PartitionedRegionClear.UpdateAttributesProcessorFactory; import org.apache.geode.internal.cache.PartitionedRegionClearMessage.OperationType; import org.apache.geode.internal.cache.partitioned.RegionAdvisor; import org.apache.geode.internal.serialization.KnownVersion; public class PartitionedRegionClearTest { - private GemFireCacheImpl cache; - private HashSet<AsyncEventQueue> allAEQs = new HashSet<>(); private PartitionedRegionClear partitionedRegionClear; private DistributionManager distributionManager; private PartitionedRegion partitionedRegion; private RegionAdvisor regionAdvisor; private InternalDistributedMember internalDistributedMember; + private DistributedLockService distributedLockService; @Before public void setUp() { - - cache = mock(GemFireCacheImpl.class); + AssignBucketsToPartitions assignBucketsToPartitions = mock(AssignBucketsToPartitions.class); + GemFireCacheImpl cache = mock(GemFireCacheImpl.class); + ColocationLeaderRegionProvider colocationLeaderRegionProvider = + mock(ColocationLeaderRegionProvider.class); + distributedLockService = mock(DistributedLockService.class); distributionManager = mock(DistributionManager.class); + FilterProfile filterProfile = mock(FilterProfile.class); internalDistributedMember = mock(InternalDistributedMember.class); partitionedRegion = mock(PartitionedRegion.class); regionAdvisor = mock(RegionAdvisor.class); + UpdateAttributesProcessorFactory updateAttributesProcessorFactory = + mock(UpdateAttributesProcessorFactory.class); + + when(cache.getAsyncEventQueues(false)) + .thenReturn(emptySet()); + when(colocationLeaderRegionProvider.getLeaderRegion(any())) + .thenReturn(partitionedRegion); + when(distributedLockService.lock(anyString(), anyInt(), anyInt())) + .thenReturn(true); + when(distributionManager.getDistributionManagerId()) + .thenReturn(internalDistributedMember); + when(distributionManager.getId()) + .thenReturn(internalDistributedMember); + when(internalDistributedMember.getVersion()) + .thenReturn(KnownVersion.CURRENT); + when(partitionedRegion.getCache()) + .thenReturn(cache); + when(partitionedRegion.getDistributionManager()) + .thenReturn(distributionManager); + when(partitionedRegion.getName()) + .thenReturn("prRegion"); + when(partitionedRegion.getRegionAdvisor()) + .thenReturn(regionAdvisor); + when(partitionedRegion.getFilterProfile()) + .thenReturn(filterProfile); + when(filterProfile.getFilterRoutingInfoPart1(any(), any(), any())) + .thenReturn(mock(FilterRoutingInfo.class)); + when(filterProfile.getFilterRoutingInfoPart2(any(), any())) + .thenReturn(mock(FilterRoutingInfo.class)); + when(regionAdvisor.getDistributionManager()) + .thenReturn(distributionManager); + when(updateAttributesProcessorFactory.create(any())) + .thenReturn(mock(UpdateAttributesProcessor.class)); - when(distributionManager.getDistributionManagerId()).thenReturn(internalDistributedMember); - when(distributionManager.getId()).thenReturn(internalDistributedMember); - when(internalDistributedMember.getVersion()).thenReturn(KnownVersion.CURRENT); - when(partitionedRegion.getCache()).thenReturn(cache); - when(cache.getAsyncEventQueues(false)).thenReturn(allAEQs); - when(partitionedRegion.getDistributionManager()).thenReturn(distributionManager); - when(partitionedRegion.getName()).thenReturn("prRegion"); - when(partitionedRegion.getRegionAdvisor()).thenReturn(regionAdvisor); - when(regionAdvisor.getDistributionManager()).thenReturn(distributionManager); + doNothing().when(distributedLockService).unlock(anyString()); - partitionedRegionClear = new PartitionedRegionClear(partitionedRegion); + partitionedRegionClear = new PartitionedRegionClear(partitionedRegion, distributedLockService, + colocationLeaderRegionProvider, assignBucketsToPartitions, + updateAttributesProcessorFactory); } @Test @@ -115,9 +148,7 @@ public class PartitionedRegionClearTest { @Test public void acquireDistributedClearLockGetsDistributedLock() { // arrange - DistributedLockService distributedLockService = mock(DistributedLockService.class); String lockName = PartitionedRegionClear.CLEAR_OPERATION + partitionedRegion.getName(); - when(partitionedRegion.getPartitionedRegionLockService()).thenReturn(distributedLockService); // act partitionedRegionClear.acquireDistributedClearLock(lockName); @@ -129,9 +160,7 @@ public class PartitionedRegionClearTest { @Test public void releaseDistributedClearLockReleasesDistributedLock() { // arrange - DistributedLockService distributedLockService = mock(DistributedLockService.class); String lockName = PartitionedRegionClear.CLEAR_OPERATION + partitionedRegion.getName(); - when(partitionedRegion.getPartitionedRegionLockService()).thenReturn(distributedLockService); // act partitionedRegionClear.releaseDistributedClearLock(lockName); @@ -567,6 +596,7 @@ public class PartitionedRegionClearTest { public void doClearAcquiresAndReleasesDistributedClearLockAndCreatesAllPrimaryBuckets() { // arrange RegionEventImpl regionEvent = mock(RegionEventImpl.class); + when(regionEvent.clone()).thenReturn(regionEvent); // partial mocking to stub some methods and verify PartitionedRegionClear spyPartitionedRegionClear = spy(partitionedRegionClear); @@ -587,6 +617,7 @@ public class PartitionedRegionClearTest { public void doClearInvokesCacheWriterWhenCacheWriteIsSet() { // arrange RegionEventImpl regionEvent = mock(RegionEventImpl.class); + when(regionEvent.clone()).thenReturn(regionEvent); // partial mocking to stub some methods and verify PartitionedRegionClear spyPartitionedRegionClear = spy(partitionedRegionClear); @@ -605,6 +636,7 @@ public class PartitionedRegionClearTest { public void doClearDoesNotInvokesCacheWriterWhenCacheWriteIsNotSet() { // arrange RegionEventImpl regionEvent = mock(RegionEventImpl.class); + when(regionEvent.clone()).thenReturn(regionEvent); // partial mocking to stub some methods and verify PartitionedRegionClear spyPartitionedRegionClear = spy(partitionedRegionClear); @@ -669,12 +701,13 @@ public class PartitionedRegionClearTest { } @Test - public void doClearDoesNotObtainLockForClearWhenRegionHasNoListenerAndNoClientInterest() { + public void doClearObtainsLockForClearWhenRegionHasNoListenerAndNoClientInterest() { // arrange RegionEventImpl regionEvent = mock(RegionEventImpl.class); when(partitionedRegion.hasAnyClientsInterested()).thenReturn(false); when(partitionedRegion.hasListener()).thenReturn(false); + when(regionEvent.clone()).thenReturn(regionEvent); // partial mocking to stub some methods and verify PartitionedRegionClear spyPartitionedRegionClear = spy(partitionedRegionClear); @@ -688,8 +721,8 @@ public class PartitionedRegionClearTest { spyPartitionedRegionClear.doClear(regionEvent, false); // assert - verify(spyPartitionedRegionClear, never()).obtainLockForClear(regionEvent); - verify(spyPartitionedRegionClear, never()).releaseLockForClear(regionEvent); + verify(spyPartitionedRegionClear).obtainLockForClear(regionEvent); + verify(spyPartitionedRegionClear).releaseLockForClear(regionEvent); } @Test @@ -867,6 +900,60 @@ public class PartitionedRegionClearTest { .isNotNull(); } + @Test + public void doClearAcquiresLockForClearWhenHasAnyClientsInterestedIsTrue() { + // arrange + RegionEventImpl regionEvent = mock(RegionEventImpl.class); + when(partitionedRegion.hasAnyClientsInterested()).thenReturn(true); + when(partitionedRegion.hasListener()).thenReturn(false); + when(regionEvent.clone()).thenReturn(regionEvent); + + partitionedRegionClear = spy(partitionedRegionClear); + doNothing().when(partitionedRegionClear).obtainLockForClear(regionEvent); + + // act + partitionedRegionClear.doClear(regionEvent, false); + + // assert + verify(partitionedRegionClear).obtainLockForClear(regionEvent); + } + + @Test + public void doClearAcquiresLockForClearWhenHasListenerIsTrue() { + // arrange + RegionEventImpl regionEvent = mock(RegionEventImpl.class); + when(partitionedRegion.hasAnyClientsInterested()).thenReturn(false); + when(partitionedRegion.hasListener()).thenReturn(true); + when(regionEvent.clone()).thenReturn(regionEvent); + + partitionedRegionClear = spy(partitionedRegionClear); + doNothing().when(partitionedRegionClear).obtainLockForClear(regionEvent); + + // act + partitionedRegionClear.doClear(regionEvent, false); + + // assert + verify(partitionedRegionClear).obtainLockForClear(regionEvent); + } + + @Test + public void doClearAcquiresLockForClearWhenHasAnyClientsInterestedAndHasListenerAreFalse() { + // arrange + RegionEventImpl regionEvent = mock(RegionEventImpl.class); + when(partitionedRegion.hasAnyClientsInterested()).thenReturn(false); + when(partitionedRegion.hasListener()).thenReturn(false); + when(regionEvent.clone()).thenReturn(regionEvent); + + partitionedRegionClear = spy(partitionedRegionClear); + doNothing().when(partitionedRegionClear).obtainLockForClear(regionEvent); + + // act + partitionedRegionClear.doClear(regionEvent, false); + + // assert + verify(partitionedRegionClear).obtainLockForClear(regionEvent); + } + private Set<BucketRegion> setupBucketRegions( PartitionedRegionDataStore dataStore, BucketAdvisor bucketAdvisor) {
