This is an automated email from the ASF dual-hosted git repository. jinmeiliao pushed a commit to branch feature/GEODE-7665 in repository https://gitbox.apache.org/repos/asf/geode.git
commit 9a67231f5a107cf019af551f5a94d1ce49614d94 Author: Xiaojian Zhou <[email protected]> AuthorDate: Fri Jul 10 08:59:26 2020 -0700 GEODE-8334: PR.clear should sync with putAll or removeAll on rvvLock (#5365) Co-authored-by: Xiaojian Zhou <[email protected]> Co-authored-by: Anil Gingade <[email protected]> --- .../apache/geode/internal/cache/BucketRegion.java | 8 +----- .../cache/partitioned/PutAllPRMessage.java | 9 +++++++ .../cache/partitioned/RemoveAllPRMessage.java | 9 +++++++ .../cache/partitioned/PutAllPRMessageTest.java | 29 ++++++++++++++++++++++ .../cache/partitioned/RemoveAllPRMessageTest.java | 29 ++++++++++++++++++++++ 5 files changed, 77 insertions(+), 7 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java index c37e1a3..6793ecb 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java @@ -566,12 +566,6 @@ public class BucketRegion extends DistributedRegion implements Bucket { return; } - boolean enableRVV = useRVV && getConcurrencyChecksEnabled(); - RegionVersionVector rvv = null; - if (enableRVV) { - rvv = getVersionVector().getCloneForTransmission(); - } - // get rvvLock Set<InternalDistributedMember> participants = getCacheDistributionAdvisor().adviseInvalidateRegion(); @@ -585,7 +579,7 @@ public class BucketRegion extends DistributedRegion implements Bucket { // no need to dominate my own rvv. // Clear is on going here, there won't be GII for this member clearRegionLocally(regionEvent, cacheWrite, null); - distributeClearOperation(regionEvent, rvv, participants); + distributeClearOperation(regionEvent, null, participants); // TODO: call reindexUserDataRegion if there're lucene indexes } finally { diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PutAllPRMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PutAllPRMessage.java index 0c690c5..6bb666c 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PutAllPRMessage.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PutAllPRMessage.java @@ -413,6 +413,7 @@ public class PutAllPRMessage extends PartitionMessageWithDirectReply { Object[] keys = getKeysToBeLocked(); if (!notificationOnly) { boolean locked = false; + boolean rvvLocked = false; try { if (putAllPRData.length > 0) { if (this.posDup && bucketRegion.getConcurrencyChecksEnabled()) { @@ -438,6 +439,10 @@ public class PutAllPRMessage extends PartitionMessageWithDirectReply { bucketRegion.recordBulkOpStart(membershipID, eventID); } locked = bucketRegion.waitUntilLocked(keys); + if (!rvvLocked) { + bucketRegion.lockRVVForBulkOp(); + rvvLocked = true; + } boolean lockedForPrimary = false; final HashMap succeeded = new HashMap(); PutAllPartialResult partialKeys = new PutAllPartialResult(putAllPRDataSize); @@ -518,6 +523,10 @@ public class PutAllPRMessage extends PartitionMessageWithDirectReply { } catch (RegionDestroyedException e) { ds.checkRegionDestroyedOnBucket(bucketRegion, true, e); } finally { + if (rvvLocked) { + bucketRegion.unlockRVVForBulkOp(); + rvvLocked = false; + } if (locked) { bucketRegion.removeAndNotifyKeys(keys); } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessage.java index 6f355d6..f295136 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessage.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessage.java @@ -406,6 +406,7 @@ public class RemoveAllPRMessage extends PartitionMessageWithDirectReply { if (!notificationOnly) { boolean locked = false; + boolean rvvLocked = false; try { if (removeAllPRData.length > 0) { if (this.posDup && bucketRegion.getConcurrencyChecksEnabled()) { @@ -431,6 +432,10 @@ public class RemoveAllPRMessage extends PartitionMessageWithDirectReply { bucketRegion.recordBulkOpStart(membershipID, eventID); } locked = bucketRegion.waitUntilLocked(keys); + if (!rvvLocked) { + bucketRegion.lockRVVForBulkOp(); + rvvLocked = true; + } boolean lockedForPrimary = false; final ArrayList<Object> succeeded = new ArrayList<Object>(); PutAllPartialResult partialKeys = new PutAllPartialResult(removeAllPRDataSize); @@ -526,6 +531,10 @@ public class RemoveAllPRMessage extends PartitionMessageWithDirectReply { } catch (RegionDestroyedException e) { ds.checkRegionDestroyedOnBucket(bucketRegion, true, e); } finally { + if (rvvLocked) { + bucketRegion.unlockRVVForBulkOp(); + rvvLocked = false; + } if (locked) { bucketRegion.removeAndNotifyKeys(keys); } diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/PutAllPRMessageTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/PutAllPRMessageTest.java index ab82a93..f5480a5 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/PutAllPRMessageTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/PutAllPRMessageTest.java @@ -15,9 +15,11 @@ package org.apache.geode.internal.cache.partitioned; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.RETURNS_DEEP_STUBS; +import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; @@ -119,4 +121,31 @@ public class PutAllPRMessageTest { eq(regionDestroyedException)); } + @Test + public void rvvLockedAfterKeysAreLockedAndUnlockRVVBeforeKeys() throws Exception { + PutAllPRMessage message = spy(new PutAllPRMessage(bucketId, 1, false, false, false, null)); + message.addEntry(entryData); + doReturn(keys).when(message).getKeysToBeLocked(); + when(bucketRegion.waitUntilLocked(keys)).thenReturn(true); + when(bucketRegion.doLockForPrimary(false)).thenThrow(new PrimaryBucketException()); + doNothing().when(bucketRegion).lockRVVForBulkOp(); + doNothing().when(bucketRegion).unlockRVVForBulkOp(); + + InternalCache cache = mock(InternalCache.class); + InternalDistributedSystem ids = mock(InternalDistributedSystem.class); + when(bucketRegion.getCache()).thenReturn(cache); + when(cache.getDistributedSystem()).thenReturn(ids); + when(ids.getOffHeapStore()).thenReturn(null); + + assertThatThrownBy( + () -> message.doLocalPutAll(partitionedRegion, mock(InternalDistributedMember.class), 1)) + .isInstanceOf(PrimaryBucketException.class); + + InOrder inOrder = inOrder(bucketRegion); + inOrder.verify(bucketRegion).waitUntilLocked(keys); + inOrder.verify(bucketRegion).lockRVVForBulkOp(); + inOrder.verify(bucketRegion).unlockRVVForBulkOp(); + inOrder.verify(bucketRegion).removeAndNotifyKeys(keys); + } + } diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessageTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessageTest.java index 2309cb0..a3ee31b 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessageTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessageTest.java @@ -14,9 +14,11 @@ */ package org.apache.geode.internal.cache.partitioned; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.RETURNS_DEEP_STUBS; +import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; @@ -131,4 +133,31 @@ public class RemoveAllPRMessageTest { verify(dataStore).checkRegionDestroyedOnBucket(eq(bucketRegion), eq(true), eq(regionDestroyedException)); } + + @Test + public void rvvLockedAfterKeysAreLockedAndUnlockRVVBeforeKeys() throws Exception { + RemoveAllPRMessage message = + spy(new RemoveAllPRMessage(bucketId, 1, false, false, false, null)); + message.addEntry(entryData); + doReturn(keys).when(message).getKeysToBeLocked(); + when(bucketRegion.waitUntilLocked(keys)).thenReturn(true); + when(bucketRegion.doLockForPrimary(false)).thenThrow(new PrimaryBucketException()); + doNothing().when(bucketRegion).lockRVVForBulkOp(); + doNothing().when(bucketRegion).unlockRVVForBulkOp(); + + InternalCache cache = mock(InternalCache.class); + InternalDistributedSystem ids = mock(InternalDistributedSystem.class); + when(bucketRegion.getCache()).thenReturn(cache); + when(cache.getDistributedSystem()).thenReturn(ids); + when(ids.getOffHeapStore()).thenReturn(null); + + assertThatThrownBy(() -> message.doLocalRemoveAll(partitionedRegion, + mock(InternalDistributedMember.class), true)).isInstanceOf(PrimaryBucketException.class); + + InOrder inOrder = inOrder(bucketRegion); + inOrder.verify(bucketRegion).waitUntilLocked(keys); + inOrder.verify(bucketRegion).lockRVVForBulkOp(); + inOrder.verify(bucketRegion).unlockRVVForBulkOp(); + inOrder.verify(bucketRegion).removeAndNotifyKeys(keys); + } }
