This is an automated email from the ASF dual-hosted git repository.

agingade pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new a6ebaa0  GEODE-5605: After removeAll/PutAll messages are processed on replicas, check for cache close is done. (#2481)
a6ebaa0 is described below

commit a6ebaa0053e3f21236ba0cd16f8f0984b1d86907
Author: agingade <[email protected]>
AuthorDate: Mon Sep 17 10:53:34 2018 -0700

    GEODE-5605: After removeAll/PutAll messages are processed on replicas, check for cache close is done. (#2481)
---
 .../geode/internal/cache/DistributedRegion.java    |  4 ++
 .../cache/partitioned/PutAllPRMessage.java         | 28 ++++++++-----
 .../cache/partitioned/RemoveAllPRMessage.java      | 28 ++++++++-----
 .../cache/partitioned/PutAllPRMessageTest.java     | 49 ++++++++++++++++++++++
 .../cache/partitioned/RemoveAllPRMessageTest.java  | 28 +++++++++++++
 5 files changed, 115 insertions(+), 22 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
index d9f5903..42dcd98 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
@@ -2477,6 +2477,10 @@ public class DistributedRegion extends LocalRegion implements InternalDistribute
             this.getFullPath()), ex);
       }
     }
+    waitForCurrentOperations();
+  }
+
+  protected void waitForCurrentOperations() {
     // Fix for #48066 - make sure that region operations are completely
     // distributed to peers before destroying the region.
     Boolean flushOnClose =
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PutAllPRMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PutAllPRMessage.java
index b6450b3..a56ea79 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PutAllPRMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PutAllPRMessage.java
@@ -507,17 +507,7 @@ public class PutAllPRMessage extends PartitionMessageWithDirectReply {
             // encounter cacheWriter exception
             partialKeys.saveFailedKey(key, cwe);
           } finally {
-            try {
-              // Only PutAllPRMessage knows if the thread id is fake. Event has no idea.
-              // So we have to manually set useFakeEventId for this DPAO
-              dpao.setUseFakeEventId(true);
-              r.checkReadiness();
-              bucketRegion.getDataView().postPutAll(dpao, this.versions, bucketRegion);
-            } finally {
-              if (lockedForPrimary) {
-                bucketRegion.doUnlockForPrimary();
-              }
-            }
+            doPostPutAll(r, dpao, bucketRegion, lockedForPrimary);
           }
           if (partialKeys.hasFailure()) {
             partialKeys.addKeysAndVersions(this.versions);
@@ -559,6 +549,22 @@ public class PutAllPRMessage extends PartitionMessageWithDirectReply {
     return true;
   }
 
+  void doPostPutAll(PartitionedRegion r, DistributedPutAllOperation dpao,
+      BucketRegion bucketRegion, boolean lockedForPrimary) {
+    try {
+      // Only PutAllPRMessage knows if the thread id is fake. Event has no idea.
+      // So we have to manually set useFakeEventId for this DPAO
+      dpao.setUseFakeEventId(true);
+      r.checkReadiness();
+      bucketRegion.getDataView().postPutAll(dpao, this.versions, bucketRegion);
+      r.checkReadiness();
+    } finally {
+      if (lockedForPrimary) {
+        bucketRegion.doUnlockForPrimary();
+      }
+    }
+  }
+
   public VersionedObjectList getVersions() {
     return this.versions;
   }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessage.java
index d9f1f47..bfc009d 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessage.java
@@ -516,17 +516,7 @@ public class RemoveAllPRMessage extends PartitionMessageWithDirectReply {
             // encounter cacheWriter exception
             partialKeys.saveFailedKey(key, cwe);
           } finally {
-            try {
-              // Only RemoveAllPRMessage knows if the thread id is fake. Event has no idea.
-              // So we have to manually set useFakeEventId for this op
-              op.setUseFakeEventId(true);
-              r.checkReadiness();
-              bucketRegion.getDataView().postRemoveAll(op, this.versions, bucketRegion);
-            } finally {
-              if (lockedForPrimary) {
-                bucketRegion.doUnlockForPrimary();
-              }
-            }
+            doPostRemoveAll(r, op, bucketRegion, lockedForPrimary);
           }
           if (partialKeys.hasFailure()) {
             partialKeys.addKeysAndVersions(this.versions);
@@ -567,6 +557,22 @@ public class RemoveAllPRMessage extends PartitionMessageWithDirectReply {
     return true;
   }
 
+  void doPostRemoveAll(PartitionedRegion r, DistributedRemoveAllOperation op,
+      BucketRegion bucketRegion, boolean lockedForPrimary) {
+    try {
+      // Only RemoveAllPRMessage knows if the thread id is fake. Event has no idea.
+      // So we have to manually set useFakeEventId for this op
+      op.setUseFakeEventId(true);
+      r.checkReadiness();
+      bucketRegion.getDataView().postRemoveAll(op, this.versions, bucketRegion);
+      r.checkReadiness();
+    } finally {
+      if (lockedForPrimary) {
+        bucketRegion.doUnlockForPrimary();
+      }
+    }
+  }
+
   public VersionedObjectList getVersions() {
     return this.versions;
   }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/PutAllPRMessageTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/PutAllPRMessageTest.java
new file mode 100644
index 0000000..75650ff
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/PutAllPRMessageTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.partitioned;
+
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.junit.Test;
+import org.mockito.InOrder;
+
+import org.apache.geode.internal.cache.BucketRegion;
+import org.apache.geode.internal.cache.DistributedPutAllOperation;
+import org.apache.geode.internal.cache.InternalDataView;
+import org.apache.geode.internal.cache.PartitionedRegion;
+
+public class PutAllPRMessageTest {
+
+  @Test
+  public void doPostPutAllCallsCheckReadinessBeforeAndAfter() throws Exception {
+    PartitionedRegion partitionedRegion = mock(PartitionedRegion.class);
+    DistributedPutAllOperation distributedPutAllOperation = mock(DistributedPutAllOperation.class);
+    BucketRegion bucketRegion = mock(BucketRegion.class);
+    InternalDataView internalDataView = mock(InternalDataView.class);
+    when(bucketRegion.getDataView()).thenReturn(internalDataView);
+    PutAllPRMessage putAllPRMessage = new PutAllPRMessage();
+
+    putAllPRMessage.doPostPutAll(partitionedRegion, distributedPutAllOperation, bucketRegion, true);
+
+    InOrder inOrder = inOrder(partitionedRegion, internalDataView);
+    inOrder.verify(partitionedRegion).checkReadiness();
+    inOrder.verify(internalDataView).postPutAll(any(), any(), any());
+    inOrder.verify(partitionedRegion).checkReadiness();
+  }
+}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessageTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessageTest.java
index e9e4fb0..f8f7102 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessageTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessageTest.java
@@ -14,12 +14,20 @@
  */
 package org.apache.geode.internal.cache.partitioned;
 
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 import org.junit.Test;
+import org.mockito.InOrder;
 
+import org.apache.geode.internal.cache.BucketRegion;
+import org.apache.geode.internal.cache.DistributedRemoveAllOperation;
+import org.apache.geode.internal.cache.InternalDataView;
+import org.apache.geode.internal.cache.PartitionedRegion;
 
 public class RemoveAllPRMessageTest {
 
@@ -32,4 +40,24 @@ public class RemoveAllPRMessageTest {
 
     verify(mockRemoveAllPRMessage, times(1)).appendFields(stringBuilder);
   }
+
+
+  @Test
+  public void doPostRemoveAllCallsCheckReadinessBeforeAndAfter() throws Exception {
+    PartitionedRegion partitionedRegion = mock(PartitionedRegion.class);
+    DistributedRemoveAllOperation distributedRemoveAllOperation =
+        mock(DistributedRemoveAllOperation.class);
+    BucketRegion bucketRegion = mock(BucketRegion.class);
+    InternalDataView internalDataView = mock(InternalDataView.class);
+    when(bucketRegion.getDataView()).thenReturn(internalDataView);
+    RemoveAllPRMessage removeAllPRMessage = new RemoveAllPRMessage();
+
+    removeAllPRMessage.doPostRemoveAll(partitionedRegion, distributedRemoveAllOperation,
+        bucketRegion, true);
+
+    InOrder inOrder = inOrder(partitionedRegion, internalDataView);
+    inOrder.verify(partitionedRegion).checkReadiness();
+    inOrder.verify(internalDataView).postRemoveAll(any(), any(), any());
+    inOrder.verify(partitionedRegion).checkReadiness();
+  }
 }

Reply via email to