This is an automated email from the ASF dual-hosted git repository. zhouxj pushed a commit to branch feature/GEODE-8334 in repository https://gitbox.apache.org/repos/asf/geode.git
commit a4104033ceb20e80b491ed4b3ae89da2c6969f5f
Author: zhouxh <gz...@pivotal.io>
AuthorDate: Mon Jul 6 17:46:14 2020 -0700

    GEODE-8334: PutAll/RemoveAll at primary bucket should get rvvLock to sync with clear
---
 .../apache/geode/internal/cache/partitioned/PutAllPRMessage.java | 8 ++++++++
 .../geode/internal/cache/partitioned/RemoveAllPRMessage.java | 8 ++++++++
 2 files changed, 16 insertions(+)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PutAllPRMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PutAllPRMessage.java
index 5c2cf3d..fcd379a 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PutAllPRMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PutAllPRMessage.java
@@ -418,6 +418,7 @@ public class PutAllPRMessage extends PartitionMessageWithDirectReply {
     Object[] keys = getKeysToBeLocked();
     if (!notificationOnly) {
       boolean locked = false;
+      boolean rvvLocked = false;
       try {
         if (putAllPRData.length > 0) {
           if (this.posDup && bucketRegion.getConcurrencyChecksEnabled()) {
@@ -443,6 +444,10 @@ public class PutAllPRMessage extends PartitionMessageWithDirectReply {
             bucketRegion.recordBulkOpStart(membershipID, eventID);
           }
           locked = bucketRegion.waitUntilLocked(keys);
+          if (!rvvLocked) {
+            bucketRegion.lockRVVForBulkOp();
+            rvvLocked = true;
+          }
           boolean lockedForPrimary = false;
           final HashMap succeeded = new HashMap();
           PutAllPartialResult partialKeys = new PutAllPartialResult(putAllPRDataSize);
@@ -523,6 +528,9 @@ public class PutAllPRMessage extends PartitionMessageWithDirectReply {
       } catch (RegionDestroyedException e) {
         ds.checkRegionDestroyedOnBucket(bucketRegion, true, e);
       } finally {
+        if (rvvLocked) {
+          bucketRegion.unlockRVVForBulkOp();
+        }
         if (locked) {
           bucketRegion.removeAndNotifyKeys(keys);
         }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessage.java
index 6e05a41..51fdb43 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessage.java
@@ -412,6 +412,7 @@ public class RemoveAllPRMessage extends PartitionMessageWithDirectReply {
 
     if (!notificationOnly) {
       boolean locked = false;
+      boolean rvvLocked = false;
       try {
         if (removeAllPRData.length > 0) {
           if (this.posDup && bucketRegion.getConcurrencyChecksEnabled()) {
@@ -437,6 +438,10 @@ public class RemoveAllPRMessage extends PartitionMessageWithDirectReply {
             bucketRegion.recordBulkOpStart(membershipID, eventID);
           }
           locked = bucketRegion.waitUntilLocked(keys);
+          if (!rvvLocked) {
+            bucketRegion.lockRVVForBulkOp();
+            rvvLocked = true;
+          }
           boolean lockedForPrimary = false;
           final ArrayList<Object> succeeded = new ArrayList<Object>();
           PutAllPartialResult partialKeys = new PutAllPartialResult(removeAllPRDataSize);
@@ -532,6 +537,9 @@ public class RemoveAllPRMessage extends PartitionMessageWithDirectReply {
       } catch (RegionDestroyedException e) {
         ds.checkRegionDestroyedOnBucket(bucketRegion, true, e);
       } finally {
+        if (rvvLocked) {
+          bucketRegion.unlockRVVForBulkOp();
+        }
         if (locked) {
           bucketRegion.removeAndNotifyKeys(keys);
         }