This is an automated email from the ASF dual-hosted git repository. zhouxj pushed a commit to branch feature/GEODE-8334 in repository https://gitbox.apache.org/repos/asf/geode.git
commit 4e6705335950d30fcc1e5831ca2d09612e47b78f Author: zhouxh <[email protected]> AuthorDate: Thu Jul 9 15:41:01 2020 -0700 GEODE-8334: PR.clear should sync with putAll or removeAll on rvvLock --- .../apache/geode/internal/cache/BucketRegion.java | 8 +---- .../cache/partitioned/PutAllPRMessage.java | 9 ++++++ .../cache/partitioned/RemoveAllPRMessage.java | 9 ++++++ .../cache/partitioned/PutAllPRMessageTest.java | 33 +++++++++++++++++++++ .../cache/partitioned/RemoveAllPRMessageTest.java | 34 ++++++++++++++++++++++ 5 files changed, 86 insertions(+), 7 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java index c37e1a3..6793ecb 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java @@ -566,12 +566,6 @@ public class BucketRegion extends DistributedRegion implements Bucket { return; } - boolean enableRVV = useRVV && getConcurrencyChecksEnabled(); - RegionVersionVector rvv = null; - if (enableRVV) { - rvv = getVersionVector().getCloneForTransmission(); - } - // get rvvLock Set<InternalDistributedMember> participants = getCacheDistributionAdvisor().adviseInvalidateRegion(); @@ -585,7 +579,7 @@ public class BucketRegion extends DistributedRegion implements Bucket { // no need to dominate my own rvv. // Clear is on going here, there won't be GII for this member clearRegionLocally(regionEvent, cacheWrite, null); - distributeClearOperation(regionEvent, rvv, participants); + distributeClearOperation(regionEvent, null, participants); // TODO: call reindexUserDataRegion if there're lucene indexes } finally { diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PutAllPRMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PutAllPRMessage.java index 0c690c5..6bb666c 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PutAllPRMessage.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PutAllPRMessage.java @@ -413,6 +413,7 @@ public class PutAllPRMessage extends PartitionMessageWithDirectReply { Object[] keys = getKeysToBeLocked(); if (!notificationOnly) { boolean locked = false; + boolean rvvLocked = false; try { if (putAllPRData.length > 0) { if (this.posDup && bucketRegion.getConcurrencyChecksEnabled()) { @@ -438,6 +439,10 @@ public class PutAllPRMessage extends PartitionMessageWithDirectReply { bucketRegion.recordBulkOpStart(membershipID, eventID); } locked = bucketRegion.waitUntilLocked(keys); + if (!rvvLocked) { + bucketRegion.lockRVVForBulkOp(); + rvvLocked = true; + } boolean lockedForPrimary = false; final HashMap succeeded = new HashMap(); PutAllPartialResult partialKeys = new PutAllPartialResult(putAllPRDataSize); @@ -518,6 +523,10 @@ public class PutAllPRMessage extends PartitionMessageWithDirectReply { } catch (RegionDestroyedException e) { ds.checkRegionDestroyedOnBucket(bucketRegion, true, e); } finally { + if (rvvLocked) { + bucketRegion.unlockRVVForBulkOp(); + rvvLocked = false; + } if (locked) { bucketRegion.removeAndNotifyKeys(keys); } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessage.java index 6f355d6..f295136 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessage.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessage.java @@ -406,6 +406,7 @@ public class RemoveAllPRMessage extends PartitionMessageWithDirectReply { if (!notificationOnly) { boolean locked = false; + boolean rvvLocked = false; try { if (removeAllPRData.length > 0) { if (this.posDup && bucketRegion.getConcurrencyChecksEnabled()) { @@ -431,6 +432,10 @@ public class RemoveAllPRMessage extends PartitionMessageWithDirectReply { bucketRegion.recordBulkOpStart(membershipID, eventID); } locked = bucketRegion.waitUntilLocked(keys); + if (!rvvLocked) { + bucketRegion.lockRVVForBulkOp(); + rvvLocked = true; + } boolean lockedForPrimary = false; final ArrayList<Object> succeeded = new ArrayList<Object>(); PutAllPartialResult partialKeys = new PutAllPartialResult(removeAllPRDataSize); @@ -526,6 +531,10 @@ public class RemoveAllPRMessage extends PartitionMessageWithDirectReply { } catch (RegionDestroyedException e) { ds.checkRegionDestroyedOnBucket(bucketRegion, true, e); } finally { + if (rvvLocked) { + bucketRegion.unlockRVVForBulkOp(); + rvvLocked = false; + } if (locked) { bucketRegion.removeAndNotifyKeys(keys); } diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/PutAllPRMessageTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/PutAllPRMessageTest.java index ab82a93..c5dd140 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/PutAllPRMessageTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/PutAllPRMessageTest.java @@ -15,9 +15,12 @@ package org.apache.geode.internal.cache.partitioned; +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.RETURNS_DEEP_STUBS; +import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; @@ -119,4 +122,34 @@ public class PutAllPRMessageTest { eq(regionDestroyedException)); } + @Test + public void rvvLockedAfterKeysAreLockedAndUnlockRVVBeforeKeys() throws Exception { + PutAllPRMessage message = spy(new PutAllPRMessage(bucketId, 1, false, false, false, null)); + message.addEntry(entryData); + doReturn(keys).when(message).getKeysToBeLocked(); + when(bucketRegion.waitUntilLocked(keys)).thenReturn(true); + when(bucketRegion.doLockForPrimary(false)).thenThrow(new PrimaryBucketException()); + doNothing().when(bucketRegion).lockRVVForBulkOp(); + doNothing().when(bucketRegion).unlockRVVForBulkOp(); + + InternalCache cache = mock(InternalCache.class); + InternalDistributedSystem ids = mock(InternalDistributedSystem.class); + when(bucketRegion.getCache()).thenReturn(cache); + when(cache.getDistributedSystem()).thenReturn(ids); + when(ids.getOffHeapStore()).thenReturn(null); + + try { + message.doLocalPutAll(partitionedRegion, mock(InternalDistributedMember.class), 1); + fail("Expect PrimaryBucketException"); + } catch (Exception e) { + assertThat(e instanceof PrimaryBucketException); + } + + InOrder inOrder = inOrder(bucketRegion); + inOrder.verify(bucketRegion).waitUntilLocked(keys); + inOrder.verify(bucketRegion).lockRVVForBulkOp(); + inOrder.verify(bucketRegion).unlockRVVForBulkOp(); + inOrder.verify(bucketRegion).removeAndNotifyKeys(keys); + } + } diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessageTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessageTest.java index 2309cb0..19d508e 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessageTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessageTest.java @@ -14,9 +14,12 @@ */ package org.apache.geode.internal.cache.partitioned; +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.RETURNS_DEEP_STUBS; +import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; @@ -131,4 +134,35 @@ public class RemoveAllPRMessageTest { verify(dataStore).checkRegionDestroyedOnBucket(eq(bucketRegion), eq(true), eq(regionDestroyedException)); } + + @Test + public void rvvLockedAfterKeysAreLockedAndUnlockRVVBeforeKeys() throws Exception { + RemoveAllPRMessage message = + spy(new RemoveAllPRMessage(bucketId, 1, false, false, false, null)); + message.addEntry(entryData); + doReturn(keys).when(message).getKeysToBeLocked(); + when(bucketRegion.waitUntilLocked(keys)).thenReturn(true); + when(bucketRegion.doLockForPrimary(false)).thenThrow(new PrimaryBucketException()); + doNothing().when(bucketRegion).lockRVVForBulkOp(); + doNothing().when(bucketRegion).unlockRVVForBulkOp(); + + InternalCache cache = mock(InternalCache.class); + InternalDistributedSystem ids = mock(InternalDistributedSystem.class); + when(bucketRegion.getCache()).thenReturn(cache); + when(cache.getDistributedSystem()).thenReturn(ids); + when(ids.getOffHeapStore()).thenReturn(null); + + try { + message.doLocalRemoveAll(partitionedRegion, mock(InternalDistributedMember.class), true); + fail("Expect PrimaryBucketException"); + } catch (Exception e) { + assertThat(e instanceof PrimaryBucketException); + } + + InOrder inOrder = inOrder(bucketRegion); + inOrder.verify(bucketRegion).waitUntilLocked(keys); + inOrder.verify(bucketRegion).lockRVVForBulkOp(); + inOrder.verify(bucketRegion).unlockRVVForBulkOp(); + inOrder.verify(bucketRegion).removeAndNotifyKeys(keys); + } }
