This is an automated email from the ASF dual-hosted git repository. jinmeiliao pushed a commit to branch feature/GEODE-7665 in repository https://gitbox.apache.org/repos/asf/geode.git
commit 862cad919fe6abf03d6bfa33900b7a2b7fc9ee30 Author: agingade <[email protected]> AuthorDate: Mon Jun 8 10:23:50 2020 -0700 GEODE-8173: Add unit test (coverage) for PartitionedRegionClear class. (#5208) * GEODE-8173: Add unit test (coverage) for PartitionedRegionClear class. Co-authored-by: anilkumar gingade <[email protected]> --- .../cache/PRCacheListenerDistributedTest.java | 337 +++++++++++- .../ReplicateCacheListenerDistributedTest.java | 4 +- .../geode/internal/cache/PartitionedRegion.java | 2 +- .../internal/cache/PartitionedRegionClear.java | 83 ++- .../internal/cache/PartitionedRegionClearTest.java | 611 +++++++++++++++++++++ 5 files changed, 999 insertions(+), 38 deletions(-) diff --git a/geode-core/src/distributedTest/java/org/apache/geode/cache/PRCacheListenerDistributedTest.java b/geode-core/src/distributedTest/java/org/apache/geode/cache/PRCacheListenerDistributedTest.java index f4a9ac9..7d95473 100644 --- a/geode-core/src/distributedTest/java/org/apache/geode/cache/PRCacheListenerDistributedTest.java +++ b/geode-core/src/distributedTest/java/org/apache/geode/cache/PRCacheListenerDistributedTest.java @@ -17,10 +17,18 @@ package org.apache.geode.cache; import static org.apache.geode.test.dunit.VM.getVM; import static org.apache.geode.test.dunit.VM.getVMCount; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.fail; +import static org.hamcrest.Matchers.anyOf; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.nullValue; +import java.io.Serializable; import java.util.Arrays; import java.util.Collection; +import org.junit.Before; +import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -28,7 +36,13 @@ import org.junit.runners.Parameterized.Parameter; import org.junit.runners.Parameterized.Parameters; import org.junit.runners.Parameterized.UseParametersRunnerFactory; +import org.apache.geode.cache.util.CacheListenerAdapter; import org.apache.geode.logging.internal.log4j.api.LogService; +import org.apache.geode.test.dunit.rules.CacheRule; +import org.apache.geode.test.dunit.rules.DistributedRule; +import org.apache.geode.test.dunit.rules.SharedCountersRule; +import org.apache.geode.test.dunit.rules.SharedErrorCollector; +import org.apache.geode.test.junit.rules.serializable.SerializableTestName; import org.apache.geode.test.junit.runners.CategoryWithParameterizedRunnerFactory; /** @@ -43,7 +57,28 @@ import org.apache.geode.test.junit.runners.CategoryWithParameterizedRunnerFactor @RunWith(Parameterized.class) @UseParametersRunnerFactory(CategoryWithParameterizedRunnerFactory.class) @SuppressWarnings("serial") -public class PRCacheListenerDistributedTest extends ReplicateCacheListenerDistributedTest { +public class PRCacheListenerDistributedTest implements Serializable { + + protected static final String CLEAR = "CLEAR"; + protected static final String REGION_DESTROY = "REGION_DESTROY"; + private static final String CREATES = "CREATES"; + private static final String UPDATES = "UPDATES"; + private static final String INVALIDATES = "INVALIDATES"; + private static final String DESTROYS = "DESTROYS"; + private static final int ENTRY_VALUE = 0; + private static final int UPDATED_ENTRY_VALUE = 1; + private static final String KEY = "key-1"; + @Rule + public DistributedRule distributedRule = new DistributedRule(); + @Rule + public CacheRule cacheRule = CacheRule.builder().createCacheInAll().build(); + @Rule + public SerializableTestName testName = new SerializableTestName(); + @Rule + public SharedCountersRule sharedCountersRule = new SharedCountersRule(); + @Rule + public SharedErrorCollector errorCollector = new SharedErrorCollector(); + protected String regionName; @Parameters public static Collection<Object[]> data() { @@ -59,7 +94,6 @@ public class PRCacheListenerDistributedTest extends ReplicateCacheListenerDistri @Parameter(1) public Boolean withData; - @Override protected Region<String, Integer> createRegion(final String name, final CacheListener<String, Integer> listener) { return createPartitionedRegion(name, listener, false); @@ -99,22 +133,18 @@ public class PRCacheListenerDistributedTest extends ReplicateCacheListenerDistri } } - @Override protected int expectedCreates() { return 1; } - @Override protected int expectedUpdates() { return 1; } - @Override protected int expectedInvalidates() { return 1; } - @Override protected int expectedDestroys() { return 1; } @@ -132,7 +162,8 @@ public class PRCacheListenerDistributedTest extends ReplicateCacheListenerDistri region.destroyRegion(); - assertThat(sharedCountersRule.getTotal(REGION_DESTROY)).isEqualTo(expectedRegionDestroys()); + assertThat(sharedCountersRule.getTotal(REGION_DESTROY)) + .isGreaterThanOrEqualTo(expectedRegionDestroys()); } @Test @@ -321,4 +352,296 @@ public class PRCacheListenerDistributedTest extends ReplicateCacheListenerDistri assertThat(sharedCountersRule.getTotal(CLEAR)).isEqualTo(1); } + @Before + public void setUp() { + regionName = getClass().getSimpleName(); + + sharedCountersRule.initialize(CREATES); + sharedCountersRule.initialize(DESTROYS); + sharedCountersRule.initialize(INVALIDATES); + sharedCountersRule.initialize(UPDATES); + sharedCountersRule.initialize(CLEAR); + sharedCountersRule.initialize(REGION_DESTROY); + } + + @Test + public void afterCreateIsInvokedInEveryMember() { + CacheListener<String, Integer> listener = new CreateCountingCacheListener(); + Region<String, Integer> region = createRegion(regionName, listener); + for (int i = 0; i < getVMCount(); i++) { + getVM(i).invoke(() -> { + createRegion(regionName, listener); + }); + } + + region.put(KEY, ENTRY_VALUE, cacheRule.getSystem().getDistributedMember()); + + assertThat(sharedCountersRule.getTotal(CREATES)).isEqualTo(expectedCreates()); + } + + @Test + public void afterUpdateIsInvokedInEveryMember() { + CacheListener<String, Integer> listener = new UpdateCountingCacheListener(); + Region<String, Integer> region = createRegion(regionName, listener); + for (int i = 0; i < getVMCount(); i++) { + getVM(i).invoke(() -> { + createRegion(regionName, listener); + }); + } + + region.put(KEY, ENTRY_VALUE, cacheRule.getSystem().getDistributedMember()); + region.put(KEY, UPDATED_ENTRY_VALUE, cacheRule.getSystem().getDistributedMember()); + + assertThat(sharedCountersRule.getTotal(UPDATES)).isEqualTo(expectedUpdates()); + } + + @Test + public void afterInvalidateIsInvokedInEveryMember() { + CacheListener<String, Integer> listener = new InvalidateCountingCacheListener(); + Region<String, Integer> region = createRegion(regionName, listener); + for (int i = 0; i < getVMCount(); i++) { + getVM(i).invoke(() -> { + createRegion(regionName, listener); + }); + } + + region.put(KEY, 0, cacheRule.getSystem().getDistributedMember()); + region.invalidate(KEY); + + assertThat(sharedCountersRule.getTotal(INVALIDATES)).isEqualTo(expectedInvalidates()); + assertThat(region.get(KEY)).isNull(); + } + + @Test + public void afterDestroyIsInvokedInEveryMember() { + CacheListener<String, Integer> listener = new DestroyCountingCacheListener(); + Region<String, Integer> region = createRegion(regionName, listener); + for (int i = 0; i < getVMCount(); i++) { + getVM(i).invoke(() -> { + createRegion(regionName, listener); + }); + } + + region.put(KEY, 0, cacheRule.getSystem().getDistributedMember()); + region.destroy(KEY); + + assertThat(sharedCountersRule.getTotal(DESTROYS)).isEqualTo(expectedDestroys()); + } + + @Test + public void afterClearIsInvokedInEveryMember() { + CacheListener<String, Integer> listener = new ClearCountingCacheListener(); + Region<String, Integer> region = createRegion(regionName, listener); + for (int i = 0; i < getVMCount(); i++) { + getVM(i).invoke(() -> { + createRegion(regionName, listener); + }); + } + + region.clear(); + + assertThat(sharedCountersRule.getTotal(CLEAR)).isEqualTo(expectedClears()); + } + + protected int expectedClears() { + return getVMCount() + 1; + } + + protected int expectedRegionDestroys() { + return getVMCount() + 1; + } + + /** + * Overridden within tests to increment shared counters. + */ + private abstract static class BaseCacheListener extends CacheListenerAdapter<String, Integer> + implements Serializable { + + @Override + public void afterCreate(final EntryEvent<String, Integer> event) { + fail("Unexpected listener callback: afterCreate"); + } + + @Override + public void afterInvalidate(final EntryEvent<String, Integer> event) { + fail("Unexpected listener callback: afterInvalidate"); + } + + @Override + public void afterDestroy(final EntryEvent<String, Integer> event) { + fail("Unexpected listener callback: afterDestroy"); + } + + @Override + public void afterUpdate(final EntryEvent<String, Integer> event) { + fail("Unexpected listener callback: afterUpdate"); + } + + @Override + public void afterRegionInvalidate(final RegionEvent<String, Integer> event) { + fail("Unexpected listener callback: afterRegionInvalidate"); + } + } + + private class CreateCountingCacheListener extends BaseCacheListener { + + @Override + public void afterCreate(final EntryEvent<String, Integer> event) { + sharedCountersRule.increment(CREATES); + + errorCollector.checkThat(event.getDistributedMember(), equalTo(event.getCallbackArgument())); + errorCollector.checkThat(event.getOperation(), equalTo(Operation.CREATE)); + errorCollector.checkThat(event.getOldValue(), nullValue()); + errorCollector.checkThat(event.getNewValue(), equalTo(ENTRY_VALUE)); + + if (event.getSerializedOldValue() != null) { + errorCollector.checkThat(event.getSerializedOldValue().getDeserializedValue(), + equalTo(event.getOldValue())); + } + if (event.getSerializedNewValue() != null) { + errorCollector.checkThat(event.getSerializedNewValue().getDeserializedValue(), + equalTo(event.getNewValue())); + } + } + } + + private class UpdateCountingCacheListener extends BaseCacheListener { + + @Override + public void afterCreate(final EntryEvent<String, Integer> event) { + // nothing + } + + @Override + public void afterUpdate(final EntryEvent<String, Integer> event) { + sharedCountersRule.increment(UPDATES); + + errorCollector.checkThat(event.getDistributedMember(), equalTo(event.getCallbackArgument())); + errorCollector.checkThat(event.getOperation(), equalTo(Operation.UPDATE)); + errorCollector.checkThat(event.getOldValue(), anyOf(equalTo(ENTRY_VALUE), nullValue())); + errorCollector.checkThat(event.getNewValue(), equalTo(UPDATED_ENTRY_VALUE)); + + if (event.getSerializedOldValue() != null) { + errorCollector.checkThat(event.getSerializedOldValue().getDeserializedValue(), + equalTo(event.getOldValue())); + } + if (event.getSerializedNewValue() != null) { + errorCollector.checkThat(event.getSerializedNewValue().getDeserializedValue(), + equalTo(event.getNewValue())); + } + } + } + + private class InvalidateCountingCacheListener extends BaseCacheListener { + + @Override + public void afterCreate(final EntryEvent<String, Integer> event) { + // ignore + } + + @Override + public void afterInvalidate(final EntryEvent<String, Integer> event) { + sharedCountersRule.increment(INVALIDATES); + + if (event.isOriginRemote()) { + errorCollector.checkThat(event.getDistributedMember(), + not(cacheRule.getSystem().getDistributedMember())); + } else { + errorCollector.checkThat(event.getDistributedMember(), + equalTo(cacheRule.getSystem().getDistributedMember())); + } + errorCollector.checkThat(event.getOperation(), equalTo(Operation.INVALIDATE)); + errorCollector.checkThat(event.getOldValue(), anyOf(equalTo(ENTRY_VALUE), nullValue())); + errorCollector.checkThat(event.getNewValue(), nullValue()); + } + } + + private class DestroyCountingCacheListener extends BaseCacheListener { + + @Override + public void afterCreate(final EntryEvent<String, Integer> event) { + sharedCountersRule.increment(CREATES); + } + + @Override + public void afterUpdate(final EntryEvent<String, Integer> event) { + sharedCountersRule.increment(UPDATES); + } + + @Override + public void afterDestroy(final EntryEvent<String, Integer> event) { + sharedCountersRule.increment(DESTROYS); + + if (event.isOriginRemote()) { + errorCollector.checkThat(event.getDistributedMember(), + not(cacheRule.getSystem().getDistributedMember())); + } else { + errorCollector.checkThat(event.getDistributedMember(), + equalTo(cacheRule.getSystem().getDistributedMember())); + } + errorCollector.checkThat(event.getOperation(), equalTo(Operation.DESTROY)); + errorCollector.checkThat(event.getOldValue(), anyOf(equalTo(ENTRY_VALUE), nullValue())); + errorCollector.checkThat(event.getNewValue(), nullValue()); + } + } + + protected class ClearCountingCacheListener extends BaseCacheListener { + + @Override + public void afterCreate(final EntryEvent<String, Integer> event) { + sharedCountersRule.increment(CREATES); + } + + @Override + public void afterUpdate(final EntryEvent<String, Integer> event) { + sharedCountersRule.increment(UPDATES); + } + + @Override + public void afterRegionClear(RegionEvent<String, Integer> event) { + + sharedCountersRule.increment(CLEAR); + if (!event.getRegion().getAttributes().getDataPolicy().withPartitioning()) { + if (event.isOriginRemote()) { + errorCollector.checkThat(event.getDistributedMember(), + not(cacheRule.getSystem().getDistributedMember())); + } else { + errorCollector.checkThat(event.getDistributedMember(), + equalTo(cacheRule.getSystem().getDistributedMember())); + } + } + errorCollector.checkThat(event.getOperation(), equalTo(Operation.REGION_CLEAR)); + errorCollector.checkThat(event.getRegion().getName(), equalTo(regionName)); + } + } + + protected class RegionDestroyCountingCacheListener extends BaseCacheListener { + + @Override + public void afterCreate(final EntryEvent<String, Integer> event) { + sharedCountersRule.increment(CREATES); + } + + @Override + public void afterUpdate(final EntryEvent<String, Integer> event) { + sharedCountersRule.increment(UPDATES); + } + + @Override + public void afterRegionDestroy(final RegionEvent<String, Integer> event) { + sharedCountersRule.increment(REGION_DESTROY); + + if (!event.getRegion().getAttributes().getDataPolicy().withPartitioning()) { + if (event.isOriginRemote()) { + errorCollector.checkThat(event.getDistributedMember(), + not(cacheRule.getSystem().getDistributedMember())); + } else { + errorCollector.checkThat(event.getDistributedMember(), + equalTo(cacheRule.getSystem().getDistributedMember())); + } + } + errorCollector.checkThat(event.getOperation(), equalTo(Operation.REGION_DESTROY)); + errorCollector.checkThat(event.getRegion().getName(), equalTo(regionName)); + } + } } diff --git a/geode-core/src/distributedTest/java/org/apache/geode/cache/ReplicateCacheListenerDistributedTest.java b/geode-core/src/distributedTest/java/org/apache/geode/cache/ReplicateCacheListenerDistributedTest.java index 227c813..497e315 100644 --- a/geode-core/src/distributedTest/java/org/apache/geode/cache/ReplicateCacheListenerDistributedTest.java +++ b/geode-core/src/distributedTest/java/org/apache/geode/cache/ReplicateCacheListenerDistributedTest.java @@ -51,8 +51,8 @@ public class ReplicateCacheListenerDistributedTest implements Serializable { private static final String UPDATES = "UPDATES"; private static final String INVALIDATES = "INVALIDATES"; private static final String DESTROYS = "DESTROYS"; - protected static final String CLEAR = "CLEAR"; - protected static final String REGION_DESTROY = "REGION_DESTROY"; + private static final String CLEAR = "CLEAR"; + private static final String REGION_DESTROY = "REGION_DESTROY"; private static final int ENTRY_VALUE = 0; private static final int UPDATED_ENTRY_VALUE = 1; diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java index b802650..1e019b0 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java @@ -10255,7 +10255,7 @@ public class PartitionedRegion extends LocalRegion void cmnClearRegion(RegionEventImpl regionEvent, boolean cacheWrite, boolean useRVV) { // Synchronized to avoid other threads invoking clear on this vm/node. synchronized (clearLock) { - partitionedRegionClear.doClear(regionEvent, cacheWrite, this); + partitionedRegionClear.doClear(regionEvent, cacheWrite); } } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClear.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClear.java index 69277ef..030b36e 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClear.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClear.java @@ -39,21 +39,24 @@ public class PartitionedRegionClear { private static final Logger logger = LogService.getLogger(); - private static final String CLEAR_OPERATION = "_clearOperation"; + protected static final String CLEAR_OPERATION = "_clearOperation"; private final int retryTime = 2 * 60 * 1000; private final PartitionedRegion partitionedRegion; - private final LockForListenerAndClientNotification lockForListenerAndClientNotification = + protected final LockForListenerAndClientNotification lockForListenerAndClientNotification = new LockForListenerAndClientNotification(); private volatile boolean membershipChange = false; + protected final PartitionedRegionClearListener partitionedRegionClearListener = + new PartitionedRegionClearListener(); + public PartitionedRegionClear(PartitionedRegion partitionedRegion) { this.partitionedRegion = partitionedRegion; partitionedRegion.getDistributionManager() - .addMembershipListener(new PartitionedRegionClearListener()); + .addMembershipListener(partitionedRegionClearListener); } public boolean isLockedForListenerAndClientNotification() { @@ -79,6 +82,10 @@ public class PartitionedRegionClear { } } + protected PartitionedRegionClearListener getPartitionedRegionClearListener() { + return partitionedRegionClearListener; + } + void obtainLockForClear(RegionEventImpl event) { obtainClearLockLocal(partitionedRegion.getDistributionManager().getId()); sendPartitionedRegionClearMessage(event, @@ -100,9 +107,8 @@ public class PartitionedRegionClear { return allBucketsCleared; } - private void waitForPrimary() { + protected void waitForPrimary(PartitionedRegion.RetryTimeKeeper retryTimer) { boolean retry; - PartitionedRegion.RetryTimeKeeper retryTimer = new PartitionedRegion.RetryTimeKeeper(retryTime); do { retry = false; for (BucketRegion bucketRegion : partitionedRegion.getDataStore() @@ -122,7 +128,7 @@ public class PartitionedRegionClear { public ArrayList clearRegionLocal(RegionEventImpl regionEvent) { ArrayList clearedBuckets = new ArrayList(); - membershipChange = false; + setMembershipChange(false); // Synchronized to handle the requester departure. synchronized (lockForListenerAndClientNotification) { if (partitionedRegion.getDataStore() != null) { @@ -130,18 +136,22 @@ public class PartitionedRegionClear { try { boolean retry; do { - waitForPrimary(); - + waitForPrimary(new PartitionedRegion.RetryTimeKeeper(retryTime)); + RegionEventImpl bucketRegionEvent; for (BucketRegion localPrimaryBucketRegion : partitionedRegion.getDataStore() .getAllLocalPrimaryBucketRegions()) { if (localPrimaryBucketRegion.size() > 0) { - localPrimaryBucketRegion.clear(); + bucketRegionEvent = + new RegionEventImpl(localPrimaryBucketRegion, Operation.REGION_CLEAR, null, + false, partitionedRegion.getMyId(), regionEvent.getEventId()); + localPrimaryBucketRegion.cmnClearRegion(bucketRegionEvent, false, true); } clearedBuckets.add(localPrimaryBucketRegion.getId()); } - if (membershipChange) { - membershipChange = false; + if (getMembershipChange()) { + // Retry and reset the membership change status. + setMembershipChange(false); retry = true; } else { retry = false; @@ -160,7 +170,7 @@ public class PartitionedRegionClear { return clearedBuckets; } - private void doAfterClear(RegionEventImpl regionEvent) { + protected void doAfterClear(RegionEventImpl regionEvent) { if (partitionedRegion.hasAnyClientsInterested()) { notifyClients(regionEvent); } @@ -245,7 +255,7 @@ public class PartitionedRegionClear { } } - private List sendPartitionedRegionClearMessage(RegionEventImpl event, + protected List sendPartitionedRegionClearMessage(RegionEventImpl event, PartitionedRegionClearMessage.OperationType op) { RegionEventImpl eventForLocalClear = (RegionEventImpl) event.clone(); eventForLocalClear.setOperation(Operation.REGION_LOCAL_CLEAR); @@ -259,7 +269,7 @@ public class PartitionedRegionClear { } while (true); } - private List attemptToSendPartitionedRegionClearMessage(RegionEventImpl event, + protected List attemptToSendPartitionedRegionClearMessage(RegionEventImpl event, PartitionedRegionClearMessage.OperationType op) throws ForceReattemptException { List bucketsOperated = null; @@ -321,30 +331,27 @@ public class PartitionedRegionClear { return bucketsOperated; } - void doClear(RegionEventImpl regionEvent, boolean cacheWrite, - PartitionedRegion partitionedRegion) { - String lockName = CLEAR_OPERATION + partitionedRegion.getDisplayName(); + void doClear(RegionEventImpl regionEvent, boolean cacheWrite) { + String lockName = CLEAR_OPERATION + partitionedRegion.getName(); try { // distributed lock to make sure only one clear op is in progress in the cluster. acquireDistributedClearLock(lockName); // Force all primary buckets to be created before clear. - PartitionRegionHelper.assignBucketsToPartitions(partitionedRegion); + assignAllPrimaryBuckets(); // do cacheWrite - try { - partitionedRegion.cacheWriteBeforeRegionClear(regionEvent); - } catch (OperationAbortedException operationAbortedException) { - throw new CacheWriterException(operationAbortedException); + if (cacheWrite) { + invokeCacheWriter(regionEvent); } // Check if there are any listeners or clients interested. If so, then clear write // locks needs to be taken on all local and remote primary buckets in order to // preserve the ordering of client events (for concurrent operations on the region). - boolean acquireClearLockForClientNotification = - (partitionedRegion.hasAnyClientsInterested() && partitionedRegion.hasListener()); - if (acquireClearLockForClientNotification) { + boolean acquireClearLockForNotification = + (partitionedRegion.hasAnyClientsInterested() || partitionedRegion.hasListener()); + if (acquireClearLockForNotification) { obtainLockForClear(regionEvent); } try { @@ -362,7 +369,7 @@ public class PartitionedRegionClear { throw new PartitionedRegionPartialClearException(message); } } finally { - if (acquireClearLockForClientNotification) { + if (acquireClearLockForNotification) { releaseLockForClear(regionEvent); } } @@ -372,7 +379,19 @@ public class PartitionedRegionClear { } } - void handleClearFromDepartedMember(InternalDistributedMember departedMember) { + protected void invokeCacheWriter(RegionEventImpl regionEvent) { + try { + partitionedRegion.cacheWriteBeforeRegionClear(regionEvent); + } catch (OperationAbortedException operationAbortedException) { + throw new CacheWriterException(operationAbortedException); + } + } + + protected void assignAllPrimaryBuckets() { + PartitionRegionHelper.assignBucketsToPartitions(partitionedRegion); + } + + protected void handleClearFromDepartedMember(InternalDistributedMember departedMember) { if (departedMember.equals(lockForListenerAndClientNotification.getLockRequester())) { synchronized (lockForListenerAndClientNotification) { if (lockForListenerAndClientNotification.getLockRequester() != null) { @@ -407,12 +426,20 @@ public class PartitionedRegionClear { } } + protected void setMembershipChange(boolean membershipChange) { + this.membershipChange = membershipChange; + } + + protected boolean getMembershipChange() { + return membershipChange; + } + protected class PartitionedRegionClearListener implements MembershipListener { @Override public synchronized void memberDeparted(DistributionManager distributionManager, InternalDistributedMember id, boolean crashed) { - membershipChange = true; + setMembershipChange(true); handleClearFromDepartedMember(id); } } diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionClearTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionClearTest.java new file mode 100644 index 0000000..d8c42af --- /dev/null +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionClearTest.java @@ -0,0 +1,611 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.catchThrowable; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; + +import org.apache.geode.CancelCriterion; +import org.apache.geode.cache.PartitionedRegionPartialClearException; +import org.apache.geode.cache.Region; +import org.apache.geode.distributed.DistributedLockService; +import org.apache.geode.distributed.internal.DMStats; +import org.apache.geode.distributed.internal.DistributionManager; +import org.apache.geode.distributed.internal.InternalDistributedSystem; +import org.apache.geode.distributed.internal.MembershipListener; +import org.apache.geode.distributed.internal.membership.InternalDistributedMember; +import org.apache.geode.internal.cache.partitioned.RegionAdvisor; +import org.apache.geode.internal.cache.versions.RegionVersionVector; + +public class PartitionedRegionClearTest { + + + private PartitionedRegionClear partitionedRegionClear; + private DistributionManager distributionManager; + private PartitionedRegion partitionedRegion; + + @Before + public void setUp() { + + partitionedRegion = mock(PartitionedRegion.class); + distributionManager = mock(DistributionManager.class); + + when(partitionedRegion.getDistributionManager()).thenReturn(distributionManager); + when(partitionedRegion.getName()).thenReturn("prRegion"); + + partitionedRegionClear = new PartitionedRegionClear(partitionedRegion); + } + + private Set<BucketRegion> setupBucketRegions( + PartitionedRegionDataStore partitionedRegionDataStore, + BucketAdvisor bucketAdvisor) { + final int numBuckets = 2; + Set<BucketRegion> bucketRegions = new HashSet<>(); + for (int i = 0; i < numBuckets; i++) { + BucketRegion bucketRegion = mock(BucketRegion.class); + when(bucketRegion.getBucketAdvisor()).thenReturn(bucketAdvisor); + when(bucketRegion.size()).thenReturn(1); + when(bucketRegion.getId()).thenReturn(i); + bucketRegions.add(bucketRegion); + } + + when(partitionedRegionDataStore.getAllLocalBucketRegions()).thenReturn(bucketRegions); + when(partitionedRegionDataStore.getAllLocalPrimaryBucketRegions()).thenReturn(bucketRegions); + + return bucketRegions; + } + + @Test + public void isLockedForListenerAndClientNotificationReturnsTrueWhenLocked() { + InternalDistributedMember internalDistributedMember = mock(InternalDistributedMember.class); + when(distributionManager.isCurrentMember(internalDistributedMember)).thenReturn(true); + partitionedRegionClear.obtainClearLockLocal(internalDistributedMember); + + assertThat(partitionedRegionClear.isLockedForListenerAndClientNotification()).isTrue(); + } + + @Test + public void isLockedForListenerAndClientNotificationReturnsFalseWhenMemberNotInTheSystemRequestsLock() { + InternalDistributedMember internalDistributedMember = mock(InternalDistributedMember.class); + when(distributionManager.isCurrentMember(internalDistributedMember)).thenReturn(false); + + assertThat(partitionedRegionClear.isLockedForListenerAndClientNotification()).isFalse(); + } + + @Test + public void acquireDistributedClearLockGetsDistributedLock() { + DistributedLockService distributedLockService = mock(DistributedLockService.class); + String lockName = PartitionedRegionClear.CLEAR_OPERATION + partitionedRegion.getName(); + when(partitionedRegion.getPartitionedRegionLockService()).thenReturn(distributedLockService); + + partitionedRegionClear.acquireDistributedClearLock(lockName); + + verify(distributedLockService, times(1)).lock(lockName, -1, -1); + } + + @Test + public void releaseDistributedClearLockReleasesDistributedLock() { + DistributedLockService distributedLockService = mock(DistributedLockService.class); + String lockName = PartitionedRegionClear.CLEAR_OPERATION + partitionedRegion.getName(); + when(partitionedRegion.getPartitionedRegionLockService()).thenReturn(distributedLockService); + + partitionedRegionClear.releaseDistributedClearLock(lockName); + + verify(distributedLockService, times(1)).unlock(lockName); + } + + @Test + public void obtainLockForClearGetsLocalLockAndSendsMessageForRemote() throws Exception { + RegionEventImpl regionEvent = mock(RegionEventImpl.class); + when(regionEvent.clone()).thenReturn(mock(RegionEventImpl.class)); + Region<String, PartitionRegionConfig> region = mock(Region.class); + when(partitionedRegion.getPRRoot()).thenReturn(region); + PartitionedRegionClear spyPartitionedRegionClear = spy(partitionedRegionClear); + doReturn(Collections.EMPTY_LIST).when(spyPartitionedRegionClear) + .attemptToSendPartitionedRegionClearMessage(regionEvent, + PartitionedRegionClearMessage.OperationType.OP_LOCK_FOR_PR_CLEAR); + InternalDistributedMember internalDistributedMember = mock(InternalDistributedMember.class); + when(distributionManager.getId()).thenReturn(internalDistributedMember); + + spyPartitionedRegionClear.obtainLockForClear(regionEvent); + + verify(spyPartitionedRegionClear, times(1)).obtainClearLockLocal(internalDistributedMember); + verify(spyPartitionedRegionClear, times(1)).sendPartitionedRegionClearMessage(regionEvent, + PartitionedRegionClearMessage.OperationType.OP_LOCK_FOR_PR_CLEAR); + } + + @Test + public void releaseLockForClearReleasesLocalLockAndSendsMessageForRemote() throws Exception { + RegionEventImpl regionEvent = mock(RegionEventImpl.class); + when(regionEvent.clone()).thenReturn(mock(RegionEventImpl.class)); + Region<String, PartitionRegionConfig> region = mock(Region.class); + when(partitionedRegion.getPRRoot()).thenReturn(region); + PartitionedRegionClear spyPartitionedRegionClear = spy(partitionedRegionClear); + doReturn(Collections.EMPTY_LIST).when(spyPartitionedRegionClear) + .attemptToSendPartitionedRegionClearMessage(regionEvent, + PartitionedRegionClearMessage.OperationType.OP_UNLOCK_FOR_PR_CLEAR); + InternalDistributedMember internalDistributedMember = mock(InternalDistributedMember.class); + when(distributionManager.getId()).thenReturn(internalDistributedMember); + + spyPartitionedRegionClear.releaseLockForClear(regionEvent); + + verify(spyPartitionedRegionClear, times(1)).releaseClearLockLocal(); + verify(spyPartitionedRegionClear, times(1)).sendPartitionedRegionClearMessage(regionEvent, + PartitionedRegionClearMessage.OperationType.OP_UNLOCK_FOR_PR_CLEAR); + } + + @Test + public void clearRegionClearsLocalAndSendsMessageForRemote() throws Exception { + RegionEventImpl regionEvent = mock(RegionEventImpl.class); + when(regionEvent.clone()).thenReturn(mock(RegionEventImpl.class)); + Region<String, PartitionRegionConfig> region = mock(Region.class); + when(partitionedRegion.getPRRoot()).thenReturn(region); + PartitionedRegionClear spyPartitionedRegionClear = spy(partitionedRegionClear); + doReturn(Collections.EMPTY_LIST).when(spyPartitionedRegionClear) + .attemptToSendPartitionedRegionClearMessage(regionEvent, + PartitionedRegionClearMessage.OperationType.OP_PR_CLEAR); + InternalDistributedMember internalDistributedMember = mock(InternalDistributedMember.class); + when(distributionManager.getId()).thenReturn(internalDistributedMember); + RegionVersionVector regionVersionVector = mock(RegionVersionVector.class); + + spyPartitionedRegionClear.clearRegion(regionEvent, false, regionVersionVector); + + verify(spyPartitionedRegionClear, times(1)).clearRegionLocal(regionEvent); + verify(spyPartitionedRegionClear, times(1)).sendPartitionedRegionClearMessage(regionEvent, + PartitionedRegionClearMessage.OperationType.OP_PR_CLEAR); + } + + @Test + public void waitForPrimaryReturnsAfterFindingAllPrimary() { + PartitionedRegionDataStore partitionedRegionDataStore = mock(PartitionedRegionDataStore.class); + BucketAdvisor bucketAdvisor = mock(BucketAdvisor.class); + when(bucketAdvisor.hasPrimary()).thenReturn(true); + setupBucketRegions(partitionedRegionDataStore, bucketAdvisor); + when(partitionedRegion.getDataStore()).thenReturn(partitionedRegionDataStore); + PartitionedRegion.RetryTimeKeeper retryTimer = mock(PartitionedRegion.RetryTimeKeeper.class); + + partitionedRegionClear.waitForPrimary(retryTimer); + + verify(retryTimer, times(0)).waitForBucketsRecovery(); + } + + @Test + public void waitForPrimaryReturnsAfterRetryForPrimary() { + PartitionedRegionDataStore partitionedRegionDataStore = mock(PartitionedRegionDataStore.class); + BucketAdvisor bucketAdvisor = mock(BucketAdvisor.class); + when(bucketAdvisor.hasPrimary()).thenReturn(false).thenReturn(true); + setupBucketRegions(partitionedRegionDataStore, bucketAdvisor); + when(partitionedRegion.getDataStore()).thenReturn(partitionedRegionDataStore); + PartitionedRegion.RetryTimeKeeper retryTimer = mock(PartitionedRegion.RetryTimeKeeper.class); + + partitionedRegionClear.waitForPrimary(retryTimer); + + verify(retryTimer, times(1)).waitForBucketsRecovery(); + } + + @Test + public void waitForPrimaryThrowsPartitionedRegionPartialClearException() { + PartitionedRegionDataStore partitionedRegionDataStore = mock(PartitionedRegionDataStore.class); + BucketAdvisor bucketAdvisor = mock(BucketAdvisor.class); + setupBucketRegions(partitionedRegionDataStore, bucketAdvisor); + when(partitionedRegion.getDataStore()).thenReturn(partitionedRegionDataStore); + PartitionedRegion.RetryTimeKeeper retryTimer = mock(PartitionedRegion.RetryTimeKeeper.class); + when(retryTimer.overMaximum()).thenReturn(true); + + Throwable thrown = catchThrowable(() -> partitionedRegionClear.waitForPrimary(retryTimer)); + + assertThat(thrown) + .isInstanceOf(PartitionedRegionPartialClearException.class) + .hasMessage( + "Unable to find primary bucket region during clear operation for region: prRegion"); + verify(retryTimer, times(0)).waitForBucketsRecovery(); + } + + @Test + public void clearRegionLocalCallsClearOnLocalPrimaryBucketRegions() { + RegionEventImpl regionEvent = mock(RegionEventImpl.class); + BucketAdvisor bucketAdvisor = mock(BucketAdvisor.class); + when(bucketAdvisor.hasPrimary()).thenReturn(true); + PartitionedRegionDataStore partitionedRegionDataStore = mock(PartitionedRegionDataStore.class); + doNothing().when(partitionedRegionDataStore).lockBucketCreationForRegionClear(); + Set<BucketRegion> buckets = setupBucketRegions(partitionedRegionDataStore, bucketAdvisor); + when(partitionedRegion.getDataStore()).thenReturn(partitionedRegionDataStore); + + List bucketsCleared = partitionedRegionClear.clearRegionLocal(regionEvent); + + assertThat(bucketsCleared).hasSize(buckets.size()); + + ArgumentCaptor<RegionEventImpl> argument = ArgumentCaptor.forClass(RegionEventImpl.class); + for (BucketRegion bucketRegion : buckets) { + verify(bucketRegion, times(1)).cmnClearRegion(argument.capture(), eq(false), eq(true)); + RegionEventImpl bucketRegionEvent = argument.getValue(); + assertThat(bucketRegionEvent.getRegion()).isEqualTo(bucketRegion); + } + } + + @Test + public void clearRegionLocalRetriesClearOnLocalPrimaryBucketRegions() { + RegionEventImpl regionEvent = mock(RegionEventImpl.class); + BucketAdvisor bucketAdvisor = mock(BucketAdvisor.class); + when(bucketAdvisor.hasPrimary()).thenReturn(true); + PartitionedRegionDataStore partitionedRegionDataStore = mock(PartitionedRegionDataStore.class); + doNothing().when(partitionedRegionDataStore).lockBucketCreationForRegionClear(); + Set<BucketRegion> buckets = setupBucketRegions(partitionedRegionDataStore, bucketAdvisor); + when(partitionedRegion.getDataStore()).thenReturn(partitionedRegionDataStore); + PartitionedRegionClear spyPartitionedRegionClear = spy(partitionedRegionClear); + when(spyPartitionedRegionClear.getMembershipChange()).thenReturn(true).thenReturn(false); + + List bucketsCleared = spyPartitionedRegionClear.clearRegionLocal(regionEvent); + + int expectedClears = buckets.size() * 2; /* clear is called twice on each bucket */ + assertThat(bucketsCleared).hasSize(expectedClears); + + ArgumentCaptor<RegionEventImpl> argument = ArgumentCaptor.forClass(RegionEventImpl.class); + for (BucketRegion bucketRegion : buckets) { + verify(bucketRegion, times(2)).cmnClearRegion(argument.capture(), eq(false), eq(true)); + RegionEventImpl bucketRegionEvent = argument.getValue(); + assertThat(bucketRegionEvent.getRegion()).isEqualTo(bucketRegion); + } + } + + @Test + public void doAfterClearCallsNotifyClientsWhenClientHaveInterests() { + RegionEventImpl regionEvent = mock(RegionEventImpl.class); + when(partitionedRegion.hasAnyClientsInterested()).thenReturn(true); + FilterProfile filterProfile = mock(FilterProfile.class); + FilterRoutingInfo filterRoutingInfo = mock(FilterRoutingInfo.class); + when(filterProfile.getFilterRoutingInfoPart1(regionEvent, FilterProfile.NO_PROFILES, + Collections.emptySet())).thenReturn(filterRoutingInfo); + when(filterProfile.getFilterRoutingInfoPart2(filterRoutingInfo, regionEvent)).thenReturn( + filterRoutingInfo); + when(partitionedRegion.getFilterProfile()).thenReturn(filterProfile); + + partitionedRegionClear.doAfterClear(regionEvent); + + verify(regionEvent, times(1)).setLocalFilterInfo(any()); + verify(partitionedRegion, times(1)).notifyBridgeClients(regionEvent); + } + + @Test + public void doAfterClearDispatchesListenerEvents() { + RegionEventImpl regionEvent = mock(RegionEventImpl.class); + when(partitionedRegion.hasListener()).thenReturn(true); + + partitionedRegionClear.doAfterClear(regionEvent); + + verify(partitionedRegion, times(1)).dispatchListenerEvent( + EnumListenerEvent.AFTER_REGION_CLEAR, regionEvent); + } + + @Test + public void obtainClearLockLocalGetsLockOnPrimaryBuckets() { + BucketAdvisor bucketAdvisor = mock(BucketAdvisor.class); + when(bucketAdvisor.hasPrimary()).thenReturn(true); + PartitionedRegionDataStore partitionedRegionDataStore = mock(PartitionedRegionDataStore.class); + Set<BucketRegion> buckets = setupBucketRegions(partitionedRegionDataStore, bucketAdvisor); + when(partitionedRegion.getDataStore()).thenReturn(partitionedRegionDataStore); + InternalDistributedMember member = mock(InternalDistributedMember.class); + when(distributionManager.isCurrentMember(member)).thenReturn(true); + + partitionedRegionClear.obtainClearLockLocal(member); + + assertThat(partitionedRegionClear.lockForListenerAndClientNotification.getLockRequester()) + .isSameAs(member); + for (BucketRegion bucketRegion : buckets) { + verify(bucketRegion, times(1)).lockLocallyForClear(partitionedRegion.getDistributionManager(), + partitionedRegion.getMyId(), null); + } + } + + @Test + public void obtainClearLockLocalDoesNotGetLocksOnPrimaryBucketsWhenMemberIsNotCurrent() { + BucketAdvisor bucketAdvisor = mock(BucketAdvisor.class); + when(bucketAdvisor.hasPrimary()).thenReturn(true); + PartitionedRegionDataStore partitionedRegionDataStore = mock(PartitionedRegionDataStore.class); + Set<BucketRegion> buckets = setupBucketRegions(partitionedRegionDataStore, bucketAdvisor); + when(partitionedRegion.getDataStore()).thenReturn(partitionedRegionDataStore); + InternalDistributedMember member = mock(InternalDistributedMember.class); + when(distributionManager.isCurrentMember(member)).thenReturn(false); + + partitionedRegionClear.obtainClearLockLocal(member); + + assertThat(partitionedRegionClear.lockForListenerAndClientNotification.getLockRequester()) + .isNull(); + for (BucketRegion bucketRegion : buckets) { + verify(bucketRegion, times(0)).lockLocallyForClear(partitionedRegion.getDistributionManager(), + partitionedRegion.getMyId(), null); + } + } + + @Test + public void releaseClearLockLocalReleasesLockOnPrimaryBuckets() { + BucketAdvisor bucketAdvisor = mock(BucketAdvisor.class); + when(bucketAdvisor.hasPrimary()).thenReturn(true); + PartitionedRegionDataStore partitionedRegionDataStore = mock(PartitionedRegionDataStore.class); + Set<BucketRegion> buckets = setupBucketRegions(partitionedRegionDataStore, bucketAdvisor); + when(partitionedRegion.getDataStore()).thenReturn(partitionedRegionDataStore); + InternalDistributedMember member = mock(InternalDistributedMember.class); + when(distributionManager.isCurrentMember(member)).thenReturn(true); + partitionedRegionClear.lockForListenerAndClientNotification.setLocked(member); + + partitionedRegionClear.releaseClearLockLocal(); + + for (BucketRegion bucketRegion : buckets) { + verify(bucketRegion, times(1)).releaseLockLocallyForClear(null); + } + } + + @Test + public void releaseClearLockLocalDoesNotReleaseLocksOnPrimaryBucketsWhenMemberIsNotCurrent() { + BucketAdvisor bucketAdvisor = mock(BucketAdvisor.class); + when(bucketAdvisor.hasPrimary()).thenReturn(true); + PartitionedRegionDataStore partitionedRegionDataStore = mock(PartitionedRegionDataStore.class); + Set<BucketRegion> buckets = setupBucketRegions(partitionedRegionDataStore, bucketAdvisor); + when(partitionedRegion.getDataStore()).thenReturn(partitionedRegionDataStore); + InternalDistributedMember member = mock(InternalDistributedMember.class); + + partitionedRegionClear.releaseClearLockLocal(); + + assertThat(partitionedRegionClear.lockForListenerAndClientNotification.getLockRequester()) + .isNull(); + for (BucketRegion bucketRegion : buckets) { + verify(bucketRegion, times(0)).releaseLockLocallyForClear(null); + } + } + + @Test + public void sendPartitionedRegionClearMessageSendsClearMessageToPRNodes() { + RegionEventImpl regionEvent = mock(RegionEventImpl.class); + when(regionEvent.clone()).thenReturn(mock(RegionEventImpl.class)); + Region<String, PartitionRegionConfig> prRoot = mock(Region.class); + when(partitionedRegion.getPRRoot()).thenReturn(prRoot); + InternalDistributedMember member = mock(InternalDistributedMember.class); + RegionAdvisor regionAdvisor = mock(RegionAdvisor.class); + Set<InternalDistributedMember> prNodes = Collections.singleton(member); + Node node = mock(Node.class); + when(node.getMemberId()).thenReturn(member); + Set<Node> configNodes = Collections.singleton(node); + when(regionAdvisor.adviseAllPRNodes()).thenReturn(prNodes); + when(partitionedRegion.getRegionAdvisor()).thenReturn(regionAdvisor); + PartitionRegionConfig partitionRegionConfig = mock(PartitionRegionConfig.class); + when(partitionRegionConfig.getNodes()).thenReturn(configNodes); + when(prRoot.get(any())).thenReturn(partitionRegionConfig); + InternalDistributedSystem internalDistributedSystem = mock(InternalDistributedSystem.class); + when(internalDistributedSystem.getDistributionManager()).thenReturn(distributionManager); + when(partitionedRegion.getSystem()).thenReturn(internalDistributedSystem); + InternalCache internalCache = mock(InternalCache.class); + TXManagerImpl txManager = mock(TXManagerImpl.class); + when(txManager.isDistributed()).thenReturn(false); + when(internalCache.getTxManager()).thenReturn(txManager); + when(partitionedRegion.getCache()).thenReturn(internalCache); + + when(distributionManager.getCancelCriterion()).thenReturn(mock(CancelCriterion.class)); + when(distributionManager.getStats()).thenReturn(mock(DMStats.class)); + + partitionedRegionClear.sendPartitionedRegionClearMessage(regionEvent, + PartitionedRegionClearMessage.OperationType.OP_PR_CLEAR); + + verify(distributionManager, times(1)).putOutgoing(any()); + } + + @Test + public void doClearAcquiresAndReleasesDistributedClearLockAndCreatesAllPrimaryBuckets() { + RegionEventImpl regionEvent = mock(RegionEventImpl.class); + PartitionedRegionClear spyPartitionedRegionClear = spy(partitionedRegionClear); + doNothing().when(spyPartitionedRegionClear).acquireDistributedClearLock(any()); + doNothing().when(spyPartitionedRegionClear).assignAllPrimaryBuckets(); + doReturn(Collections.EMPTY_LIST).when(spyPartitionedRegionClear).clearRegion(regionEvent, false, + null); + + spyPartitionedRegionClear.doClear(regionEvent, false); + + verify(spyPartitionedRegionClear, times(1)).acquireDistributedClearLock(any()); + verify(spyPartitionedRegionClear, times(1)).releaseDistributedClearLock(any()); + verify(spyPartitionedRegionClear, times(1)).assignAllPrimaryBuckets(); + } + + @Test + public void doClearInvokesCacheWriterWhenCacheWriteIsSet() { + boolean cacheWrite = true; + RegionEventImpl regionEvent = mock(RegionEventImpl.class); + PartitionedRegionClear spyPartitionedRegionClear = spy(partitionedRegionClear); + doNothing().when(spyPartitionedRegionClear).acquireDistributedClearLock(any()); + doNothing().when(spyPartitionedRegionClear).assignAllPrimaryBuckets(); + doReturn(Collections.EMPTY_LIST).when(spyPartitionedRegionClear).clearRegion(regionEvent, + cacheWrite, null); + + spyPartitionedRegionClear.doClear(regionEvent, cacheWrite); + + verify(spyPartitionedRegionClear, times(1)).invokeCacheWriter(regionEvent); + } + + @Test + public void doClearDoesNotInvokesCacheWriterWhenCacheWriteIsNotSet() { + boolean cacheWrite = false; + RegionEventImpl regionEvent = mock(RegionEventImpl.class); + PartitionedRegionClear spyPartitionedRegionClear = spy(partitionedRegionClear); + doNothing().when(spyPartitionedRegionClear).acquireDistributedClearLock(any()); + doNothing().when(spyPartitionedRegionClear).assignAllPrimaryBuckets(); + doReturn(Collections.EMPTY_LIST).when(spyPartitionedRegionClear).clearRegion(regionEvent, + cacheWrite, null); + + spyPartitionedRegionClear.doClear(regionEvent, cacheWrite); + + verify(spyPartitionedRegionClear, times(0)).invokeCacheWriter(regionEvent); + } + + @Test + public void doClearObtainsAndReleasesLockForClearWhenRegionHasListener() { + boolean cacheWrite = false; + RegionEventImpl regionEvent = mock(RegionEventImpl.class); + when(partitionedRegion.hasListener()).thenReturn(true); + when(partitionedRegion.hasAnyClientsInterested()).thenReturn(false); + PartitionedRegionClear spyPartitionedRegionClear = spy(partitionedRegionClear); + doNothing().when(spyPartitionedRegionClear).acquireDistributedClearLock(any()); + doNothing().when(spyPartitionedRegionClear).assignAllPrimaryBuckets(); + doNothing().when(spyPartitionedRegionClear).obtainLockForClear(regionEvent); + doNothing().when(spyPartitionedRegionClear).releaseLockForClear(regionEvent); + doReturn(Collections.EMPTY_LIST).when(spyPartitionedRegionClear).clearRegion(regionEvent, + cacheWrite, null); + + spyPartitionedRegionClear.doClear(regionEvent, cacheWrite); + + verify(spyPartitionedRegionClear, times(1)).obtainLockForClear(regionEvent); + verify(spyPartitionedRegionClear, times(1)).releaseLockForClear(regionEvent); + } + + @Test + public void doClearObtainsAndReleasesLockForClearWhenRegionHasClientInterest() { + boolean cacheWrite = false; + RegionEventImpl regionEvent = mock(RegionEventImpl.class); + when(partitionedRegion.hasListener()).thenReturn(false); + when(partitionedRegion.hasAnyClientsInterested()).thenReturn(true); + PartitionedRegionClear spyPartitionedRegionClear = spy(partitionedRegionClear); + doNothing().when(spyPartitionedRegionClear).acquireDistributedClearLock(any()); + doNothing().when(spyPartitionedRegionClear).assignAllPrimaryBuckets(); + doNothing().when(spyPartitionedRegionClear).obtainLockForClear(regionEvent); + doNothing().when(spyPartitionedRegionClear).releaseLockForClear(regionEvent); + doReturn(Collections.EMPTY_LIST).when(spyPartitionedRegionClear).clearRegion(regionEvent, + cacheWrite, null); + + spyPartitionedRegionClear.doClear(regionEvent, cacheWrite); + + verify(spyPartitionedRegionClear, times(1)).obtainLockForClear(regionEvent); + verify(spyPartitionedRegionClear, times(1)).releaseLockForClear(regionEvent); + } + + @Test + public void doClearDoesNotObtainLockForClearWhenRegionHasNoListenerAndNoClientInterest() { + boolean cacheWrite = false; + RegionEventImpl regionEvent = mock(RegionEventImpl.class); + when(partitionedRegion.hasListener()).thenReturn(false); + when(partitionedRegion.hasAnyClientsInterested()).thenReturn(false); + PartitionedRegionClear spyPartitionedRegionClear = spy(partitionedRegionClear); + doNothing().when(spyPartitionedRegionClear).acquireDistributedClearLock(any()); + doNothing().when(spyPartitionedRegionClear).assignAllPrimaryBuckets(); + doNothing().when(spyPartitionedRegionClear).obtainLockForClear(regionEvent); + doNothing().when(spyPartitionedRegionClear).releaseLockForClear(regionEvent); + doReturn(Collections.EMPTY_LIST).when(spyPartitionedRegionClear).clearRegion(regionEvent, + cacheWrite, null); + + spyPartitionedRegionClear.doClear(regionEvent, cacheWrite); + + verify(spyPartitionedRegionClear, times(0)).obtainLockForClear(regionEvent); + verify(spyPartitionedRegionClear, times(0)).releaseLockForClear(regionEvent); + } + + @Test + public void doClearThrowsPartitionedRegionPartialClearException() { + boolean cacheWrite = false; + RegionEventImpl regionEvent = mock(RegionEventImpl.class); + when(partitionedRegion.hasListener()).thenReturn(false); + when(partitionedRegion.hasAnyClientsInterested()).thenReturn(false); + when(partitionedRegion.getTotalNumberOfBuckets()).thenReturn(1); + when(partitionedRegion.getName()).thenReturn("prRegion"); + PartitionedRegionClear spyPartitionedRegionClear = spy(partitionedRegionClear); + doNothing().when(spyPartitionedRegionClear).acquireDistributedClearLock(any()); + doNothing().when(spyPartitionedRegionClear).assignAllPrimaryBuckets(); + doNothing().when(spyPartitionedRegionClear).obtainLockForClear(regionEvent); + doNothing().when(spyPartitionedRegionClear).releaseLockForClear(regionEvent); + doReturn(Collections.EMPTY_LIST).when(spyPartitionedRegionClear).clearRegion(regionEvent, + cacheWrite, null); + + Throwable thrown = + catchThrowable(() -> spyPartitionedRegionClear.doClear(regionEvent, cacheWrite)); + + assertThat(thrown) + .isInstanceOf(PartitionedRegionPartialClearException.class) + .hasMessage( + "Unable to clear all the buckets from the partitioned region prRegion, either data (buckets) moved or member departed."); + } + + @Test + public void handleClearFromDepartedMemberReleasesTheLockForRequesterDeparture() { + InternalDistributedMember member = mock(InternalDistributedMember.class); + partitionedRegionClear.lockForListenerAndClientNotification.setLocked(member); + PartitionedRegionClear spyPartitionedRegionClear = spy(partitionedRegionClear); + + spyPartitionedRegionClear.handleClearFromDepartedMember(member); + + verify(spyPartitionedRegionClear, times(1)).releaseClearLockLocal(); + } + + @Test + public void handleClearFromDepartedMemberDoesNotReleasesTheLockForNonRequesterDeparture() { + InternalDistributedMember requesterMember = mock(InternalDistributedMember.class); + InternalDistributedMember member = mock(InternalDistributedMember.class); + partitionedRegionClear.lockForListenerAndClientNotification.setLocked(requesterMember); + PartitionedRegionClear spyPartitionedRegionClear = spy(partitionedRegionClear); + + spyPartitionedRegionClear.handleClearFromDepartedMember(member); + + verify(spyPartitionedRegionClear, times(0)).releaseClearLockLocal(); + } + + @Test + public void partitionedRegionClearRegistersMembershipListener() { + MembershipListener membershipListener = + partitionedRegionClear.getPartitionedRegionClearListener(); + + verify(distributionManager, times(1)).addMembershipListener(membershipListener); + } + + @Test + public void lockRequesterDepartureReleasesTheLock() { + InternalDistributedMember member = mock(InternalDistributedMember.class); + partitionedRegionClear.lockForListenerAndClientNotification.setLocked(member); + PartitionedRegionClear.PartitionedRegionClearListener partitionedRegionClearListener = + partitionedRegionClear.getPartitionedRegionClearListener(); + + partitionedRegionClearListener.memberDeparted(distributionManager, member, true); + + assertThat(partitionedRegionClear.getMembershipChange()).isTrue(); + assertThat(partitionedRegionClear.lockForListenerAndClientNotification.getLockRequester()) + .isNull(); + } + + @Test + public void nonLockRequesterDepartureDoesNotReleasesTheLock() { + InternalDistributedMember requesterMember = mock(InternalDistributedMember.class); + InternalDistributedMember member = mock(InternalDistributedMember.class); + partitionedRegionClear.lockForListenerAndClientNotification.setLocked(requesterMember); + PartitionedRegionClear.PartitionedRegionClearListener partitionedRegionClearListener = + partitionedRegionClear.getPartitionedRegionClearListener(); + + partitionedRegionClearListener.memberDeparted(distributionManager, member, true); + + assertThat(partitionedRegionClear.getMembershipChange()).isTrue(); + assertThat(partitionedRegionClear.lockForListenerAndClientNotification.getLockRequester()) + .isNotNull(); + } +}
