This is an automated email from the ASF dual-hosted git repository.

zhouxj pushed a commit to branch feature/GEODE-8334
in repository https://gitbox.apache.org/repos/asf/geode.git

commit a4104033ceb20e80b491ed4b3ae89da2c6969f5f
Author: zhouxh <gz...@pivotal.io>
AuthorDate: Mon Jul 6 17:46:14 2020 -0700

    GEODE-8334: PutAll/RemoveAll at primary bucket should get rvvLock to sync 
with clear
---
 .../apache/geode/internal/cache/partitioned/PutAllPRMessage.java  | 8 ++++++++
 .../geode/internal/cache/partitioned/RemoveAllPRMessage.java      | 8 ++++++++
 2 files changed, 16 insertions(+)

diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PutAllPRMessage.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PutAllPRMessage.java
index 5c2cf3d..fcd379a 100755
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PutAllPRMessage.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PutAllPRMessage.java
@@ -418,6 +418,7 @@ public class PutAllPRMessage extends 
PartitionMessageWithDirectReply {
       Object[] keys = getKeysToBeLocked();
       if (!notificationOnly) {
         boolean locked = false;
+        boolean rvvLocked = false;
         try {
           if (putAllPRData.length > 0) {
             if (this.posDup && bucketRegion.getConcurrencyChecksEnabled()) {
@@ -443,6 +444,10 @@ public class PutAllPRMessage extends 
PartitionMessageWithDirectReply {
             bucketRegion.recordBulkOpStart(membershipID, eventID);
           }
           locked = bucketRegion.waitUntilLocked(keys);
+          if (!rvvLocked) {
+            bucketRegion.lockRVVForBulkOp();
+            rvvLocked = true;
+          }
           boolean lockedForPrimary = false;
           final HashMap succeeded = new HashMap();
           PutAllPartialResult partialKeys = new 
PutAllPartialResult(putAllPRDataSize);
@@ -523,6 +528,9 @@ public class PutAllPRMessage extends 
PartitionMessageWithDirectReply {
         } catch (RegionDestroyedException e) {
           ds.checkRegionDestroyedOnBucket(bucketRegion, true, e);
         } finally {
+          if (rvvLocked) {
+            bucketRegion.unlockRVVForBulkOp();
+          }
           if (locked) {
             bucketRegion.removeAndNotifyKeys(keys);
           }
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessage.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessage.java
index 6e05a41..51fdb43 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessage.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessage.java
@@ -412,6 +412,7 @@ public class RemoveAllPRMessage extends 
PartitionMessageWithDirectReply {
 
       if (!notificationOnly) {
         boolean locked = false;
+        boolean rvvLocked = false;
         try {
           if (removeAllPRData.length > 0) {
             if (this.posDup && bucketRegion.getConcurrencyChecksEnabled()) {
@@ -437,6 +438,10 @@ public class RemoveAllPRMessage extends 
PartitionMessageWithDirectReply {
             bucketRegion.recordBulkOpStart(membershipID, eventID);
           }
           locked = bucketRegion.waitUntilLocked(keys);
+          if (!rvvLocked) {
+            bucketRegion.lockRVVForBulkOp();
+            rvvLocked = true;
+          }
           boolean lockedForPrimary = false;
           final ArrayList<Object> succeeded = new ArrayList<Object>();
           PutAllPartialResult partialKeys = new 
PutAllPartialResult(removeAllPRDataSize);
@@ -532,6 +537,9 @@ public class RemoveAllPRMessage extends 
PartitionMessageWithDirectReply {
         } catch (RegionDestroyedException e) {
           ds.checkRegionDestroyedOnBucket(bucketRegion, true, e);
         } finally {
+          if (rvvLocked) {
+            bucketRegion.unlockRVVForBulkOp();
+          }
           if (locked) {
             bucketRegion.removeAndNotifyKeys(keys);
           }

Reply via email to