This is an automated email from the ASF dual-hosted git repository.

klund pushed a commit to branch feature/GEODE-7665
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/feature/GEODE-7665 by this push: new e4a7b61 GEODE-9195: Remove PR clear local locking (#6410) e4a7b61 is described below commit e4a7b618ccccbf56d2582cfd11b6a86ea91b6e44 Author: Kirk Lund <kl...@apache.org> AuthorDate: Fri Apr 30 13:51:33 2021 -0700 GEODE-9195: Remove PR clear local locking (#6410) Unit test changes in BucketRegion and DistributedRegion. Unit test most of PartitionedRegionClearMessage. --- .../codeAnalysis/sanctionedDataSerializables.txt | 2 +- .../apache/geode/internal/cache/BucketRegion.java | 25 +- .../geode/internal/cache/DistributedRegion.java | 29 ++- .../internal/cache/PartitionedRegionClear.java | 15 +- .../cache/PartitionedRegionClearMessage.java | 109 ++++++-- .../geode/internal/cache/RegionEventFactory.java | 30 +++ .../internal/cache/BucketRegionJUnitTest.java | 59 ++++- .../internal/cache/DistributedRegionTest.java | 93 +++++-- .../cache/PartitionedRegionClearMessageTest.java | 285 +++++++++++++++++++++ 9 files changed, 561 insertions(+), 86 deletions(-) diff --git a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt index d1a8742..35d7a2b 100644 --- a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt +++ b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt @@ -1076,7 +1076,7 @@ fromData,207 toData,178 org/apache/geode/internal/cache/PartitionedRegionClearMessage,2 -fromData,40 +fromData,49 toData,36 org/apache/geode/internal/cache/PartitionedRegionClearMessage$PartitionedRegionClearReplyMessage,2 diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java index 49f6aad..18f2ef9 100644 --- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java @@ -37,6 +37,7 @@ import org.apache.geode.InternalGemFireError; import org.apache.geode.InvalidDeltaException; import org.apache.geode.SystemFailure; import org.apache.geode.annotations.Immutable; +import org.apache.geode.annotations.VisibleForTesting; import org.apache.geode.cache.CacheException; import org.apache.geode.cache.CacheWriter; import org.apache.geode.cache.CacheWriterException; @@ -577,11 +578,9 @@ public class BucketRegion extends DistributedRegion implements Bucket { // get rvvLock Set<InternalDistributedMember> participants = getCacheDistributionAdvisor().adviseInvalidateRegion(); - boolean isLockedAlready = this.partitionedRegion.getPartitionedRegionClear() - .isLockedForListenerAndClientNotification(); try { - obtainWriteLocksForClear(regionEvent, participants, isLockedAlready); + obtainWriteLocksForClear(regionEvent, participants); // no need to dominate my own rvv. 
// Clear is on going here, there won't be GII for this member clearRegionLocally(regionEvent, cacheWrite, null); @@ -589,10 +588,28 @@ public class BucketRegion extends DistributedRegion implements Bucket { // TODO: call reindexUserDataRegion if there're lucene indexes } finally { - releaseWriteLocksForClear(regionEvent, participants, isLockedAlready); + releaseWriteLocksForClear(regionEvent, participants); } } + @Override + protected void obtainWriteLocksForClear(RegionEventImpl regionEvent, + Set<InternalDistributedMember> participants) { + lockAndFlushClearToOthers(regionEvent, participants); + } + + @Override + protected void releaseWriteLocksForClear(RegionEventImpl regionEvent, + Set<InternalDistributedMember> participants) { + distributedClearOperationReleaseLocks(regionEvent, participants); + } + + @VisibleForTesting + void distributedClearOperationReleaseLocks(RegionEventImpl regionEvent, + Set<InternalDistributedMember> participants) { + DistributedClearOperation.releaseLocks(regionEvent, participants); + } + long generateTailKey() { long key = eventSeqNum.addAndGet(partitionedRegion.getTotalNumberOfBuckets()); if (key < 0 || key % getPartitionedRegion().getTotalNumberOfBuckets() != getId()) { diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java index 3d6df11..0f419ad 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java @@ -2027,13 +2027,13 @@ public class DistributedRegion extends LocalRegion implements InternalDistribute getCacheDistributionAdvisor().adviseInvalidateRegion(); // pause all generation of versions and flush from the other members to this one try { - obtainWriteLocksForClear(regionEvent, participants, false); + obtainWriteLocksForClear(regionEvent, participants); clearRegionLocally(regionEvent, 
cacheWrite, null); if (!regionEvent.isOriginRemote() && regionEvent.getOperation().isDistributed()) { distributeClearOperation(regionEvent, null, participants); } } finally { - releaseWriteLocksForClear(regionEvent, participants, false); + releaseWriteLocksForClear(regionEvent, participants); } } finally { distributedUnlockForClear(); @@ -2082,30 +2082,31 @@ public class DistributedRegion extends LocalRegion implements InternalDistribute } } - /** * obtain locks preventing generation of new versions in other members */ protected void obtainWriteLocksForClear(RegionEventImpl regionEvent, - Set<InternalDistributedMember> participants, boolean localLockedAlready) { - if (!localLockedAlready) { - lockLocallyForClear(getDistributionManager(), getMyId(), regionEvent); - } - lockAndFlushClearToOthers(regionEvent, participants); + Set<InternalDistributedMember> recipients) { + lockLocallyForClear(getDistributionManager(), getMyId(), regionEvent); + lockAndFlushClearToOthers(regionEvent, recipients); } /** * releases the locks obtained in obtainWriteLocksForClear */ protected void releaseWriteLocksForClear(RegionEventImpl regionEvent, - Set<InternalDistributedMember> participants, - boolean localLockedAlready) { - if (!localLockedAlready) { - releaseLockLocallyForClear(regionEvent); - } - DistributedClearOperation.releaseLocks(regionEvent, participants); + Set<InternalDistributedMember> recipients) { + releaseLockLocallyForClear(regionEvent); + distributedClearOperationReleaseLocks(regionEvent, recipients); } + @VisibleForTesting + void distributedClearOperationReleaseLocks(RegionEventImpl regionEvent, + Set<InternalDistributedMember> recipients) { + DistributedClearOperation.releaseLocks(regionEvent, recipients); + } + + @VisibleForTesting void lockAndFlushClearToOthers(RegionEventImpl regionEvent, Set<InternalDistributedMember> participants) { DistributedClearOperation.lockAndFlushToOthers(regionEvent, participants); diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClear.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClear.java index 5f4e589..8403306 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClear.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClear.java @@ -36,6 +36,7 @@ import org.apache.geode.distributed.internal.DistributionManager; import org.apache.geode.distributed.internal.MembershipListener; import org.apache.geode.distributed.internal.ReplyException; import org.apache.geode.distributed.internal.membership.InternalDistributedMember; +import org.apache.geode.internal.cache.PartitionedRegionClearMessage.OperationType; import org.apache.geode.internal.cache.PartitionedRegionClearMessage.PartitionedRegionClearResponse; import org.apache.geode.internal.serialization.KnownVersion; import org.apache.geode.logging.internal.log4j.api.LogService; @@ -141,8 +142,7 @@ public class PartitionedRegionClear { */ void obtainLockForClear(RegionEventImpl event) { obtainClearLockLocal(partitionedRegion.getDistributionManager().getId()); - sendPartitionedRegionClearMessage(event, - PartitionedRegionClearMessage.OperationType.OP_LOCK_FOR_PR_CLEAR); + sendPartitionedRegionClearMessage(event, OperationType.OP_LOCK_FOR_PR_CLEAR); } /** @@ -150,8 +150,7 @@ public class PartitionedRegionClear { */ void releaseLockForClear(RegionEventImpl event) { releaseClearLockLocal(); - sendPartitionedRegionClearMessage(event, - PartitionedRegionClearMessage.OperationType.OP_UNLOCK_FOR_PR_CLEAR); + sendPartitionedRegionClearMessage(event, OperationType.OP_UNLOCK_FOR_PR_CLEAR); } /** @@ -162,7 +161,7 @@ public class PartitionedRegionClear { Set<Integer> localPrimaryBuckets = clearRegionLocal(regionEvent); // this includes all remote primary buckets and their secondaries Set<Integer> remotePrimaryBuckets = sendPartitionedRegionClearMessage(regionEvent, - 
PartitionedRegionClearMessage.OperationType.OP_PR_CLEAR); + OperationType.OP_PR_CLEAR); Set<Integer> allBucketsCleared = new HashSet<>(); allBucketsCleared.addAll(localPrimaryBuckets); @@ -332,7 +331,7 @@ public class PartitionedRegionClear { } protected Set<Integer> sendPartitionedRegionClearMessage(RegionEventImpl event, - PartitionedRegionClearMessage.OperationType op) { + OperationType op) { RegionEventImpl eventForLocalClear = (RegionEventImpl) event.clone(); eventForLocalClear.setOperation(Operation.REGION_LOCAL_CLEAR); @@ -349,7 +348,7 @@ public class PartitionedRegionClear { * @return buckets that are cleared. empty set if any exception happened */ protected Set<Integer> attemptToSendPartitionedRegionClearMessage(RegionEventImpl event, - PartitionedRegionClearMessage.OperationType op) + OperationType op) throws ForceReattemptException { Set<Integer> clearedBuckets = new HashSet<>(); @@ -394,7 +393,7 @@ public class PartitionedRegionClear { clearMessage.send(); clearResponse.waitForRepliesUninterruptibly(); - clearedBuckets = clearResponse.bucketsCleared; + clearedBuckets = clearResponse.getBucketsCleared(); } catch (ReplyException e) { Throwable cause = e.getCause(); diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClearMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClearMessage.java index cd33f78..c4c1ca5 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClearMessage.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClearMessage.java @@ -14,15 +14,19 @@ */ package org.apache.geode.internal.cache; +import static java.util.Collections.unmodifiableSet; + import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.util.Collection; import java.util.Objects; import java.util.Set; import org.apache.logging.log4j.Logger; import org.apache.geode.DataSerializer; +import 
org.apache.geode.annotations.VisibleForTesting; import org.apache.geode.cache.CacheException; import org.apache.geode.cache.Operation; import org.apache.geode.distributed.internal.ClusterDistributionManager; @@ -52,22 +56,47 @@ public class PartitionedRegionClearMessage extends PartitionMessage { private Object callbackArgument; private OperationType operationType; private EventID eventId; - private PartitionedRegion partitionedRegion; private Set<Integer> bucketsCleared; + private DistributionManager distributionManager; + private RegionEventFactory regionEventFactory; public PartitionedRegionClearMessage() { // nothing } - PartitionedRegionClearMessage(Set<InternalDistributedMember> recipients, - PartitionedRegion partitionedRegion, ReplyProcessor21 replyProcessor21, - PartitionedRegionClearMessage.OperationType operationType, + PartitionedRegionClearMessage(Collection<InternalDistributedMember> recipients, + PartitionedRegion partitionedRegion, + ReplyProcessor21 replyProcessor21, + OperationType operationType, final RegionEventImpl regionEvent) { - super(recipients, partitionedRegion.getPRId(), replyProcessor21); - this.partitionedRegion = partitionedRegion; + this(recipients, + partitionedRegion.getDistributionManager(), + partitionedRegion.getPRId(), + replyProcessor21, + operationType, + regionEvent.getRawCallbackArgument(), + regionEvent.getEventId(), + partitionedRegion.getCache().getTxManager().isDistributed(), + RegionEventImpl::new); + } + + @VisibleForTesting + PartitionedRegionClearMessage(Collection<InternalDistributedMember> recipients, + DistributionManager distributionManager, + int partitionedRegionId, + ReplyProcessor21 replyProcessor21, + OperationType operationType, + Object callbackArgument, + EventID eventId, + boolean isTransactionDistributed, + RegionEventFactory regionEventFactory) { + super(recipients, partitionedRegionId, replyProcessor21); + setTransactionDistributed(isTransactionDistributed); + this.distributionManager = 
distributionManager; this.operationType = operationType; - callbackArgument = regionEvent.getRawCallbackArgument(); - eventId = regionEvent.getEventId(); + this.callbackArgument = callbackArgument; + this.eventId = eventId; + this.regionEventFactory = regionEventFactory; } @Override @@ -82,8 +111,7 @@ public class PartitionedRegionClearMessage extends PartitionMessage { public void send() { Objects.requireNonNull(getRecipients(), "ClearMessage NULL recipients set"); - setTransactionDistributed(partitionedRegion.getCache().getTxManager().isDistributed()); - partitionedRegion.getDistributionManager().putOutgoing(this); + distributionManager.putOutgoing(this); } @Override @@ -108,15 +136,17 @@ public class PartitionedRegionClearMessage extends PartitionMessage { return true; } + PartitionedRegionClear partitionedRegionClear = partitionedRegion.getPartitionedRegionClear(); + if (operationType == OperationType.OP_LOCK_FOR_PR_CLEAR) { - partitionedRegion.getPartitionedRegionClear().obtainClearLockLocal(getSender()); + partitionedRegionClear.obtainClearLockLocal(getSender()); } else if (operationType == OperationType.OP_UNLOCK_FOR_PR_CLEAR) { - partitionedRegion.getPartitionedRegionClear().releaseClearLockLocal(); + partitionedRegionClear.releaseClearLockLocal(); } else { - RegionEventImpl event = - new RegionEventImpl(partitionedRegion, Operation.REGION_CLEAR, callbackArgument, true, + RegionEventImpl event = (RegionEventImpl) regionEventFactory + .create(partitionedRegion, Operation.REGION_CLEAR, callbackArgument, true, partitionedRegion.getMyId(), getEventID()); - bucketsCleared = partitionedRegion.getPartitionedRegionClear().clearRegionLocal(event); + bucketsCleared = partitionedRegionClear.clearRegionLocal(event); } return true; } @@ -125,9 +155,9 @@ public class PartitionedRegionClearMessage extends PartitionMessage { protected void appendFields(StringBuilder stringBuilder) { super.appendFields(stringBuilder); stringBuilder - .append(" cbArg=") + .append(" 
callbackArgument=") .append(callbackArgument) - .append(" op=") + .append(" operationType=") .append(operationType); } @@ -141,8 +171,10 @@ public class PartitionedRegionClearMessage extends PartitionMessage { throws IOException, ClassNotFoundException { super.fromData(in, context); callbackArgument = DataSerializer.readObject(in); - operationType = PartitionedRegionClearMessage.OperationType.values()[in.readByte()]; + operationType = OperationType.values()[in.readByte()]; eventId = DataSerializer.readObject(in); + + regionEventFactory = RegionEventImpl::new; } @Override @@ -160,21 +192,36 @@ public class PartitionedRegionClearMessage extends PartitionMessage { if (partitionedRegion != null && startTime > 0) { partitionedRegion.getPrStats().endPartitionMessagesProcessing(startTime); } - PartitionedRegionClearMessage.PartitionedRegionClearReplyMessage + PartitionedRegionClearReplyMessage .send(recipient, processorId, getReplySender(distributionManager), operationType, bucketsCleared, replyException); } + @VisibleForTesting + DistributionManager getDistributionManagerForTesting() { + return distributionManager; + } + + @VisibleForTesting + Object getCallbackArgumentForTesting() { + return callbackArgument; + } + + @VisibleForTesting + RegionEventFactory getRegionEventFactoryForTesting() { + return regionEventFactory; + } + /** * The response on which to wait for all the replies. 
This response ignores any exceptions * received from the "far side" */ public static class PartitionedRegionClearResponse extends ReplyProcessor21 { - CopyOnWriteHashSet<Integer> bucketsCleared = new CopyOnWriteHashSet<>(); + private final Set<Integer> bucketsCleared = new CopyOnWriteHashSet<>(); public PartitionedRegionClearResponse(InternalDistributedSystem system, - Set<InternalDistributedMember> recipients) { + Collection<InternalDistributedMember> recipients) { super(system, recipients); } @@ -188,12 +235,15 @@ public class PartitionedRegionClearMessage extends PartitionMessage { } process(message, true); } + + Set<Integer> getBucketsCleared() { + return unmodifiableSet(bucketsCleared); + } } public static class PartitionedRegionClearReplyMessage extends ReplyMessage { private Set<Integer> bucketsCleared; - private OperationType operationType; @Override @@ -201,14 +251,17 @@ public class PartitionedRegionClearMessage extends PartitionMessage { return true; } - public static void send(InternalDistributedMember recipient, int processorId, - ReplySender replySender, OperationType operationType, Set<Integer> bucketsCleared, + private static void send(InternalDistributedMember recipient, + int processorId, + ReplySender replySender, + OperationType operationType, + Set<Integer> bucketsCleared, ReplyException replyException) { Objects.requireNonNull(recipient, "partitionedRegionClearReplyMessage NULL reply message"); - PartitionedRegionClearMessage.PartitionedRegionClearReplyMessage replyMessage = - new PartitionedRegionClearMessage.PartitionedRegionClearReplyMessage(processorId, - operationType, bucketsCleared, replyException); + PartitionedRegionClearReplyMessage replyMessage = + new PartitionedRegionClearReplyMessage(processorId, operationType, bucketsCleared, + replyException); replyMessage.setRecipient(recipient); replySender.putOutgoing(replyMessage); @@ -260,7 +313,7 @@ public class PartitionedRegionClearMessage extends PartitionMessage { public void 
fromData(DataInput in, DeserializationContext context) throws IOException, ClassNotFoundException { super.fromData(in, context); - operationType = PartitionedRegionClearMessage.OperationType.values()[in.readByte()]; + operationType = OperationType.values()[in.readByte()]; bucketsCleared = DataSerializer.readObject(in); } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/RegionEventFactory.java b/geode-core/src/main/java/org/apache/geode/internal/cache/RegionEventFactory.java new file mode 100644 index 0000000..c759a44 --- /dev/null +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/RegionEventFactory.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. 
+ */ +package org.apache.geode.internal.cache; + +import org.apache.geode.cache.Operation; +import org.apache.geode.cache.RegionEvent; +import org.apache.geode.distributed.DistributedMember; + +@FunctionalInterface +public interface RegionEventFactory { + + RegionEvent create(PartitionedRegion partitionedRegion, + Operation operation, + Object callbackArgument, + boolean originRemote, + DistributedMember distributedMember, + EventID eventId); +} diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionJUnitTest.java index 0d1cc87..c0a635f 100755 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionJUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionJUnitTest.java @@ -14,11 +14,13 @@ */ package org.apache.geode.internal.cache; +import static java.util.Collections.emptySet; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.CALLS_REAL_METHODS; import static org.mockito.Mockito.anyLong; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; @@ -29,6 +31,7 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.util.Set; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -36,6 +39,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import org.junit.Test; import org.apache.geode.cache.RegionAttributes; +import org.apache.geode.distributed.internal.membership.InternalDistributedMember; import org.apache.geode.internal.cache.versions.RegionVersionVector; import 
org.apache.geode.internal.statistics.StatisticsClock; @@ -183,17 +187,6 @@ public class BucketRegionJUnitTest extends DistributedRegionJUnitTest { } @Test - public void obtainWriteLocksForClearInBRShouldDistribute() { - RegionEventImpl event = createClearRegionEvent(); - BucketRegion region = (BucketRegion) event.getRegion(); - doNothing().when(region).lockLocallyForClear(any(), any(), any()); - doNothing().when(region).lockAndFlushClearToOthers(any(), any()); - region.obtainWriteLocksForClear(event, null, false); - verify(region).lockLocallyForClear(any(), any(), eq(event)); - verify(region).lockAndFlushClearToOthers(eq(event), eq(null)); - } - - @Test public void updateSizeToZeroOnClearBucketRegion() { RegionEventImpl event = createClearRegionEvent(); BucketRegion region = (BucketRegion) event.getRegion(); @@ -211,4 +204,48 @@ public class BucketRegionJUnitTest extends DistributedRegionJUnitTest { long sizeAfterClear = region.getTotalBytes(); assertEquals(0, sizeAfterClear); } + + @Test + public void obtainWriteLocksForClearInBRShouldLockAndFlushToOthers() { + RegionEventImpl event = createClearRegionEvent(); + BucketRegion region = (BucketRegion) event.getRegion(); + doNothing().when(region).lockAndFlushClearToOthers(any(), any()); + region.obtainWriteLocksForClear(event, null); + verify(region).lockAndFlushClearToOthers(eq(event), eq(null)); + } + + @Test + public void obtainWriteLocksForClear_invokes_lockAndFlushClearToOthers() { + Set<InternalDistributedMember> recipients = emptySet(); + BucketRegion bucketRegion = bucketRegionForClearLocking(); + RegionEventImpl regionEvent = mock(RegionEventImpl.class); + + bucketRegion.obtainWriteLocksForClear(regionEvent, recipients); + + verify(bucketRegion).lockAndFlushClearToOthers(regionEvent, recipients); + } + + @Test + public void releaseWriteLocksForClear_invokes_distributedClearOperationReleaseLocks() { + Set<InternalDistributedMember> recipients = emptySet(); + BucketRegion bucketRegion = 
bucketRegionForClearLocking(); + RegionEventImpl regionEvent = mock(RegionEventImpl.class); + + bucketRegion.releaseWriteLocksForClear(regionEvent, recipients); + + verify(bucketRegion).distributedClearOperationReleaseLocks(regionEvent, recipients); + } + + private BucketRegion bucketRegionForClearLocking() { + // use partial-mock with null fields to verify method invocations + BucketRegion bucketRegion = mock(BucketRegion.class, CALLS_REAL_METHODS); + + // doNothing when invoking locking methods for clear + doNothing().when(bucketRegion).lockAndFlushClearToOthers(any(), any()); + + // doNothing when invoking unlocking methods for clear + doNothing().when(bucketRegion).distributedClearOperationReleaseLocks(any(), any()); + + return bucketRegion; + } } diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRegionTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRegionTest.java index 13a2685..185c67d 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRegionTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRegionTest.java @@ -14,14 +14,19 @@ */ package org.apache.geode.internal.cache; +import static java.util.Collections.emptySet; import static org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueImpl.getSenderIdFromAsyncEventQueueId; +import static org.apache.geode.util.internal.UncheckedUtils.uncheckedCast; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Matchers.eq; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.CALLS_REAL_METHODS; import static org.mockito.Mockito.RETURNS_DEEP_STUBS; import static org.mockito.Mockito.doCallRealMethod; +import static org.mockito.Mockito.doNothing; +import static 
org.mockito.Mockito.doReturn; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -29,6 +34,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.util.Collections; +import java.util.Set; import org.junit.Before; import org.junit.Test; @@ -43,36 +49,22 @@ import org.apache.geode.internal.cache.versions.VersionSource; import org.apache.geode.internal.cache.wan.AsyncEventQueueConfigurationException; import org.apache.geode.internal.cache.wan.GatewaySenderConfigurationException; - public class DistributedRegionTest { + private RegionVersionVector<VersionSource<Object>> vector; private RegionVersionHolder<VersionSource<Object>> holder; private VersionSource<Object> lostMemberVersionID; private InternalDistributedMember member; @Before - @SuppressWarnings("unchecked") public void setup() { - vector = mock(RegionVersionVector.class); - holder = mock(RegionVersionHolder.class); - lostMemberVersionID = mock(VersionSource.class); + vector = uncheckedCast(mock(RegionVersionVector.class)); + holder = uncheckedCast(mock(RegionVersionHolder.class)); + lostMemberVersionID = uncheckedCast(mock(VersionSource.class)); member = mock(InternalDistributedMember.class); } @Test - public void shouldBeMockable() throws Exception { - DistributedRegion mockDistributedRegion = mock(DistributedRegion.class); - EntryEventImpl mockEntryEventImpl = mock(EntryEventImpl.class); - Object returnValue = new Object(); - - when(mockDistributedRegion.validatedDestroy(any(), eq(mockEntryEventImpl))) - .thenReturn(returnValue); - - assertThat(mockDistributedRegion.validatedDestroy(new Object(), mockEntryEventImpl)) - .isSameAs(returnValue); - } - - @Test public void cleanUpAfterFailedInitialImageHoldsLockForClear() { DistributedRegion distributedRegion = mock(DistributedRegion.class, RETURNS_DEEP_STUBS); RegionMap regionMap = mock(RegionMap.class); @@ -99,7 +91,7 @@ public class 
DistributedRegionTest { distributedRegion.cleanUpAfterFailedGII(true); - verify(diskRegion).resetRecoveredEntries(eq(distributedRegion)); + verify(diskRegion).resetRecoveredEntries(distributedRegion); verify(distributedRegion, never()).closeEntries(); } @@ -260,4 +252,65 @@ public class DistributedRegionTest { .hasMessage("Parallel Gateway Sender " + senderId + " can not be used with replicated region " + regionPath); } + + @Test + public void obtainWriteLocksForClear_invokes_lockLocallyForClear() { + DistributedRegion distributedRegion = distributedRegionForClearLocking(); + RegionEventImpl regionEvent = mock(RegionEventImpl.class); + + distributedRegion.obtainWriteLocksForClear(regionEvent, emptySet()); + + verify(distributedRegion).lockLocallyForClear(any(), any(), eq(regionEvent)); + } + + @Test + public void obtainWriteLocksForClear_invokes_lockAndFlushClearToOthers() { + Set<InternalDistributedMember> recipients = emptySet(); + DistributedRegion distributedRegion = distributedRegionForClearLocking(); + RegionEventImpl regionEvent = mock(RegionEventImpl.class); + + distributedRegion.obtainWriteLocksForClear(regionEvent, recipients); + + verify(distributedRegion).lockAndFlushClearToOthers(regionEvent, recipients); + } + + @Test + public void releaseWriteLocksForClear_invokes_releaseLockLocallyForClear() { + DistributedRegion distributedRegion = distributedRegionForClearLocking(); + RegionEventImpl regionEvent = mock(RegionEventImpl.class); + + distributedRegion.releaseWriteLocksForClear(regionEvent, emptySet()); + + verify(distributedRegion).releaseLockLocallyForClear(regionEvent); + } + + @Test + public void releaseWriteLocksForClear_invokes_distributedClearOperationReleaseLocks() { + Set<InternalDistributedMember> recipients = emptySet(); + DistributedRegion distributedRegion = distributedRegionForClearLocking(); + RegionEventImpl regionEvent = mock(RegionEventImpl.class); + + distributedRegion.releaseWriteLocksForClear(regionEvent, recipients); + + 
verify(distributedRegion).distributedClearOperationReleaseLocks(regionEvent, recipients); + } + + private DistributedRegion distributedRegionForClearLocking() { + // use partial-mock with null fields to verify method invocations + DistributedRegion distributedRegion = mock(DistributedRegion.class, CALLS_REAL_METHODS); + + // stub out getDistributionManager and getMyId + doReturn(null).when(distributedRegion).getDistributionManager(); + doReturn(null).when(distributedRegion).getMyId(); + + // doNothing when invoking locking methods for clear + doNothing().when(distributedRegion).lockAndFlushClearToOthers(any(), any()); + doNothing().when(distributedRegion).lockLocallyForClear(any(), any(), any()); + + // doNothing when invoking unlocking methods for clear + doNothing().when(distributedRegion).distributedClearOperationReleaseLocks(any(), any()); + doNothing().when(distributedRegion).releaseLockLocallyForClear(any()); + + return distributedRegion; + } } diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionClearMessageTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionClearMessageTest.java new file mode 100644 index 0000000..4e67fc1 --- /dev/null +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionClearMessageTest.java @@ -0,0 +1,285 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. 
You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.geode.internal.cache;

import static java.util.Collections.emptySet;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.catchThrowable;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.util.Collection;

import org.junit.Before;
import org.junit.Test;

import org.apache.geode.cache.Operation;
import org.apache.geode.cache.RegionEvent;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.internal.ClusterDistributionManager;
import org.apache.geode.distributed.internal.DistributionAdvisor;
import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.distributed.internal.ReplyProcessor21;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.cache.PartitionedRegionClearMessage.OperationType;

/**
 * Unit tests for {@link PartitionedRegionClearMessage}.
 *
 * <p>
 * Every collaborator is a Mockito mock created in {@link #setUp()}; the tests only check values
 * returned by the message and interactions with those mocks.
 */
public class PartitionedRegionClearMessageTest {

  private Collection<InternalDistributedMember> recipients;
  private DistributionManager distributionManager;
  private PartitionedRegion partitionedRegion;
  private ReplyProcessor21 replyProcessor21;
  private Object callbackArgument;
  private EventID eventId;
  private RegionEventFactory regionEventFactory;

  /** Creates fresh mocks and an empty recipient set before each test. */
  @Before
  public void setUp() {
    recipients = emptySet();
    distributionManager = mock(DistributionManager.class);
    partitionedRegion = mock(PartitionedRegion.class);
    replyProcessor21 = mock(ReplyProcessor21.class);
    callbackArgument = new Object();
    eventId = mock(EventID.class);
    regionEventFactory = mock(RegionEventFactory.class);
  }

  /** Constructing a message with null recipients must fail with a NullPointerException. */
  @Test
  public void construction_throwsNullPointerExceptionIfRecipientsIsNull() {
    Throwable thrown = catchThrowable(() -> {
      new PartitionedRegionClearMessage(null, distributionManager, 1,
          replyProcessor21, OperationType.OP_PR_CLEAR, callbackArgument, eventId, false,
          regionEventFactory);
    });

    assertThat(thrown).isInstanceOf(NullPointerException.class);
  }

  /**
   * The region/event based constructor must derive the distribution manager, callback argument,
   * region id, event id, transaction flag and a working region event factory from its arguments.
   */
  @Test
  public void construction_findsAllDependencies() {
    boolean isTransactionDistributed = true;
    int regionId = 10;
    InternalCache cache = mock(InternalCache.class);
    RegionEventImpl regionEvent = mock(RegionEventImpl.class);
    TXManagerImpl txManager = mock(TXManagerImpl.class);
    when(cache.getTxManager()).thenReturn(txManager);
    when(partitionedRegion.getCache()).thenReturn(cache);
    when(partitionedRegion.getDistributionManager()).thenReturn(distributionManager);
    when(partitionedRegion.getPRId()).thenReturn(regionId);
    when(regionEvent.getEventId()).thenReturn(eventId);
    when(regionEvent.getRawCallbackArgument()).thenReturn(callbackArgument);
    when(txManager.isDistributed()).thenReturn(isTransactionDistributed);

    PartitionedRegionClearMessage message = new PartitionedRegionClearMessage(recipients,
        partitionedRegion,
        replyProcessor21,
        OperationType.OP_PR_CLEAR,
        regionEvent);

    assertThat(message.getDistributionManagerForTesting()).isSameAs(distributionManager);
    assertThat(message.getCallbackArgumentForTesting()).isSameAs(callbackArgument);
    assertThat(message.getRegionId()).isEqualTo(regionId);
    assertThat(message.getEventID()).isEqualTo(eventId);
    assertThat(message.isTransactionDistributed()).isEqualTo(isTransactionDistributed);

    // named differently from the mocked field so it is clear the message's own factory is used
    RegionEventFactory actualFactory = message.getRegionEventFactoryForTesting();
    RegionEvent<?, ?> created =
        actualFactory.create(partitionedRegion, Operation.DESTROY, callbackArgument, false,
            mock(DistributedMember.class), mock(EventID.class));
    assertThat(created).isInstanceOf(RegionEventImpl.class);
  }

  /** The transaction-distributed flag passed to the constructor is reported back unchanged. */
  @Test
  public void construction_setsTransactionDistributed() {
    boolean isTransactionDistributed = true;
    PartitionedRegionClearMessage message =
        new PartitionedRegionClearMessage(recipients, distributionManager, 1,
            replyProcessor21, OperationType.OP_PR_CLEAR, callbackArgument, eventId,
            isTransactionDistributed, regionEventFactory);

    boolean value = message.isTransactionDistributed();

    assertThat(value).isEqualTo(isTransactionDistributed);
  }

  /** getEventID returns the very instance supplied at construction. */
  @Test
  public void getEventID_returnsTheEventId() {
    PartitionedRegionClearMessage message =
        new PartitionedRegionClearMessage(recipients, distributionManager, 1,
            replyProcessor21, OperationType.OP_PR_CLEAR, callbackArgument, eventId, false,
            regionEventFactory);

    EventID value = message.getEventID();

    assertThat(value).isSameAs(eventId);
  }

  /** getOperationType returns the operation type supplied at construction. */
  @Test
  public void getOperationType_returnsTheOperationType() {
    PartitionedRegionClearMessage message =
        new PartitionedRegionClearMessage(recipients, distributionManager, 1,
            replyProcessor21, OperationType.OP_PR_CLEAR, callbackArgument, eventId, false,
            regionEventFactory);

    OperationType value = message.getOperationType();

    assertThat(value).isSameAs(OperationType.OP_PR_CLEAR);
  }

  /** send() hands the message to the distribution manager via putOutgoing. */
  @Test
  public void send_putsOutgoing() {
    PartitionedRegionClearMessage message =
        new PartitionedRegionClearMessage(recipients, distributionManager, 1,
            replyProcessor21, OperationType.OP_PR_CLEAR, callbackArgument, eventId, false,
            regionEventFactory);

    message.send();

    verify(distributionManager).putOutgoing(message);
  }

  /**
   * When the region's distribution advisor reports it is not initialized, processCheckForPR
   * returns a ForceReattemptException.
   */
  @Test
  public void processCheckForPR_returnsForceReattemptException_whenRegionIsNotInitialized() {
    DistributionAdvisor distributionAdvisor = mock(DistributionAdvisor.class);
    when(distributionAdvisor.isInitialized()).thenReturn(false);
    when(partitionedRegion.getDistributionAdvisor()).thenReturn(distributionAdvisor);
    PartitionedRegionClearMessage message =
        new PartitionedRegionClearMessage(recipients, distributionManager, 1,
            replyProcessor21, OperationType.OP_PR_CLEAR, callbackArgument, eventId, false,
            regionEventFactory);

    Throwable throwable = message.processCheckForPR(partitionedRegion, distributionManager);

    assertThat(throwable)
        .isInstanceOf(ForceReattemptException.class)
        .hasMessageContaining("could not find partitioned region with Id");
  }

  /** processCheckForPR returns null when no region is supplied. */
  @Test
  public void processCheckForPR_returnsNull_whenRegionIsNull() {
    PartitionedRegionClearMessage message =
        new PartitionedRegionClearMessage(recipients, distributionManager, 1,
            replyProcessor21, OperationType.OP_PR_CLEAR, callbackArgument, eventId, false,
            regionEventFactory);

    Throwable throwable = message.processCheckForPR(null, distributionManager);

    assertThat(throwable).isNull();
  }

  /**
   * processCheckForPR returns null when the region's distribution advisor reports it is
   * initialized.
   */
  @Test
  public void processCheckForPR_returnsNull_whenRegionIsInitialized() {
    DistributionAdvisor distributionAdvisor = mock(DistributionAdvisor.class);
    when(distributionAdvisor.isInitialized()).thenReturn(true);
    when(partitionedRegion.getDistributionAdvisor()).thenReturn(distributionAdvisor);
    PartitionedRegionClearMessage message =
        new PartitionedRegionClearMessage(recipients, distributionManager, 1,
            replyProcessor21, OperationType.OP_PR_CLEAR, callbackArgument, eventId, false,
            regionEventFactory);

    // pass the stubbed region; passing null would only repeat the null-region test above
    Throwable throwable = message.processCheckForPR(partitionedRegion, distributionManager);

    assertThat(throwable).isNull();
  }

  /**
   * operateOnPartitionedRegion returns true when no region is supplied.
   *
   * <p>
   * NOTE(review): assumes the production method tolerates a null region, as the test name
   * states - confirm against PartitionedRegionClearMessage.
   */
  @Test
  public void operateOnPartitionedRegion_returnsTrue_whenRegionIsNull() {
    ClusterDistributionManager clusterDistributionManager = mock(ClusterDistributionManager.class);
    PartitionedRegionClearMessage message =
        new PartitionedRegionClearMessage(recipients, distributionManager, 1,
            replyProcessor21, OperationType.OP_PR_CLEAR, callbackArgument, eventId, false,
            regionEventFactory);

    // pass null so the scenario named by this test is the one actually exercised
    boolean result =
        message.operateOnPartitionedRegion(clusterDistributionManager, null, 30);

    assertThat(result).isTrue();
  }

  /** operateOnPartitionedRegion returns true when the region reports it is destroyed. */
  @Test
  public void operateOnPartitionedRegion_returnsTrue_whenRegionIsDestroyed() {
    ClusterDistributionManager clusterDistributionManager = mock(ClusterDistributionManager.class);
    when(partitionedRegion.isDestroyed()).thenReturn(true);
    PartitionedRegionClearMessage message =
        new PartitionedRegionClearMessage(recipients, distributionManager, 1,
            replyProcessor21, OperationType.OP_PR_CLEAR, callbackArgument, eventId, false,
            regionEventFactory);

    boolean result =
        message.operateOnPartitionedRegion(clusterDistributionManager, partitionedRegion, 30);

    assertThat(result).isTrue();
  }

  /** An OP_LOCK_FOR_PR_CLEAR message obtains the local clear lock and returns true. */
  @Test
  public void operateOnPartitionedRegion_obtainsClearLockLocal_whenOperationTypeIs_OP_LOCK_FOR_PR_CLEAR() {
    ClusterDistributionManager clusterDistributionManager = mock(ClusterDistributionManager.class);
    PartitionedRegionClear partitionedRegionClear = mock(PartitionedRegionClear.class);
    when(partitionedRegion.getPartitionedRegionClear()).thenReturn(partitionedRegionClear);
    PartitionedRegionClearMessage message = new PartitionedRegionClearMessage(recipients,
        clusterDistributionManager, 1, replyProcessor21,
        OperationType.OP_LOCK_FOR_PR_CLEAR, callbackArgument, eventId, false,
        regionEventFactory);

    boolean result =
        message.operateOnPartitionedRegion(clusterDistributionManager, partitionedRegion, 30);

    assertThat(result).isTrue();
    verify(partitionedRegionClear).obtainClearLockLocal(any());
  }

  /** An OP_UNLOCK_FOR_PR_CLEAR message releases the local clear lock and returns true. */
  @Test
  public void operateOnPartitionedRegion_releasesClearLockLocal_whenOperationTypeIs_OP_UNLOCK_FOR_PR_CLEAR() {
    ClusterDistributionManager clusterDistributionManager = mock(ClusterDistributionManager.class);
    PartitionedRegionClear partitionedRegionClear = mock(PartitionedRegionClear.class);
    when(partitionedRegion.getPartitionedRegionClear()).thenReturn(partitionedRegionClear);
    PartitionedRegionClearMessage message = new PartitionedRegionClearMessage(recipients,
        clusterDistributionManager, 1, replyProcessor21,
        OperationType.OP_UNLOCK_FOR_PR_CLEAR, callbackArgument, eventId, false,
        regionEventFactory);

    boolean result =
        message.operateOnPartitionedRegion(clusterDistributionManager, partitionedRegion, 30);

    assertThat(result).isTrue();
    verify(partitionedRegionClear).releaseClearLockLocal();
  }

  /** An OP_PR_CLEAR message clears the region locally with a RegionEventImpl and returns true. */
  @Test
  public void operateOnPartitionedRegion_clearsRegionLocal_whenOperationTypeIs_OP_PR_CLEAR() {
    ClusterDistributionManager clusterDistributionManager = mock(ClusterDistributionManager.class);
    PartitionedRegionClear partitionedRegionClear = mock(PartitionedRegionClear.class);
    when(partitionedRegion.getPartitionedRegionClear())
        .thenReturn(partitionedRegionClear);
    // the factory must yield a non-null event: any(RegionEventImpl.class) below does not match null
    when(regionEventFactory.create(any(), any(), any(), anyBoolean(), any(), any()))
        .thenReturn(mock(RegionEventImpl.class));
    PartitionedRegionClearMessage message = new PartitionedRegionClearMessage(recipients,
        clusterDistributionManager, 1, replyProcessor21,
        OperationType.OP_PR_CLEAR, callbackArgument, eventId, false,
        regionEventFactory);

    boolean result =
        message.operateOnPartitionedRegion(clusterDistributionManager, partitionedRegion, 30);

    assertThat(result).isTrue();
    verify(partitionedRegionClear).clearRegionLocal(any(RegionEventImpl.class));
  }
}