This is an automated email from the ASF dual-hosted git repository.

eshu11 pushed a commit to branch feature/GEODE-5255
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/feature/GEODE-5255 by this 
push:
     new e96350f  GEODE-5255: check all colocated partitioned regions are ready 
before creating any buckets.
e96350f is described below

commit e96350f556ada8e90fd91976a7fe8b32fcb43a21
Author: eshu <[email protected]>
AuthorDate: Fri May 25 14:17:15 2018 -0700

    GEODE-5255: check all colocated partitioned regions are ready before 
creating any buckets.
---
 .../internal/cache/PartitionedRegionDataStore.java |  37 ++++-
 .../partitioned/ManageBackupBucketMessage.java     |  64 ++++++--
 .../cache/PartitionedRegionDataStoreTest.java      | 171 +++++++++++++++++++++
 .../ManageBackupBucketReplyMessageTest.java        | 125 +++++++++++++++
 4 files changed, 379 insertions(+), 18 deletions(-)

diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java
index dc8f151..ee41293 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java
@@ -265,6 +265,41 @@ public class PartitionedRegionDataStore implements 
HasCachePerfStats {
     return numPrimaries.get();
   }
 
+  public boolean isPartitionedRegionReady(PartitionedRegion partitionedRegion, 
final int bucketId) {
+    List<PartitionedRegion> colocatedWithList = 
getColocatedChildRegions(partitionedRegion);
+    if (colocatedWithList.size() == 0) {
+      return partitionedRegion.isInitialized();
+    }
+    Iterator itr = colocatedWithList.iterator();
+    return areAllColocatedPartitionedRegionsReady(bucketId, itr);
+  }
+
+  private boolean areAllColocatedPartitionedRegionsReady(int bucketId, 
Iterator itr) {
+    while (itr.hasNext()) {
+      PartitionedRegion colocatedChildRegion = (PartitionedRegion) itr.next();
+      boolean success = 
isColocatedPartitionedRegionInitialized(colocatedChildRegion, bucketId);
+      if (!success) {
+        return success;
+      }
+    }
+    return true;
+  }
+
+  private boolean isColocatedPartitionedRegionInitialized(PartitionedRegion 
partitionedRegion,
+      final int bucketId) {
+    if (!partitionedRegion.isInitialized()
+        || !(partitionedRegion.getDataStore().isColocationComplete(bucketId))) 
{
+      return false;
+    }
+    List<PartitionedRegion> colocatedWithList = 
getColocatedChildRegions(partitionedRegion);
+    Iterator itr = colocatedWithList.iterator();
+    return areAllColocatedPartitionedRegionsReady(bucketId, itr);
+  }
+
+  List<PartitionedRegion> getColocatedChildRegions(PartitionedRegion 
partitionedRegion) {
+    return ColocationHelper.getColocatedChildRegions(partitionedRegion);
+  }
+
   /**
    * Try to grab buckets for all the colocated regions /* In case we can't 
grab buckets there is no
    * going back
@@ -621,7 +656,7 @@ public class PartitionedRegionDataStore implements 
HasCachePerfStats {
     return true;
   }
 
-  private boolean isColocationComplete(int bucketId) {
+  boolean isColocationComplete(int bucketId) {
 
     if (!ColocationHelper.isColocationComplete(this.partitionedRegion)) {
       ProxyBucketRegion pb =
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/ManageBackupBucketMessage.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/ManageBackupBucketMessage.java
index 08d8164..b3b31da 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/ManageBackupBucketMessage.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/ManageBackupBucketMessage.java
@@ -64,12 +64,16 @@ public class ManageBackupBucketMessage extends 
PartitionMessage {
 
   private boolean forceCreation = true;
 
+  private enum ReplyType {
+    INITIALIZING, SUCCESS, FAIL;
+  }
+
   /**
    * Empty constructor to satisfy {@link DataSerializer} requirements
    */
   public ManageBackupBucketMessage() {}
 
-  private ManageBackupBucketMessage(InternalDistributedMember recipient, int 
regionId,
+  ManageBackupBucketMessage(InternalDistributedMember recipient, int regionId,
       ReplyProcessor21 processor, int bucketId, boolean isRebalance, boolean 
replaceOfflineData,
       InternalDistributedMember moveSource, boolean forceCreation) {
     super(recipient, regionId, processor);
@@ -128,36 +132,53 @@ public class ManageBackupBucketMessage extends 
PartitionMessage {
    * indefinitely for the acknowledgement
    */
   @Override
-  protected boolean operateOnPartitionedRegion(ClusterDistributionManager dm, 
PartitionedRegion r,
-      long startTime) {
+  protected boolean operateOnPartitionedRegion(ClusterDistributionManager dm,
+      PartitionedRegion partitionedRegion, long startTime) {
     if (logger.isTraceEnabled(LogMarker.DM_VERBOSE)) {
       logger.trace(LogMarker.DM_VERBOSE, "ManageBucketMessage operateOnRegion: 
{}",
-          r.getFullPath());
+          partitionedRegion.getFullPath());
     }
 
-    // This is to ensure that initialization is complete before bucket 
creation request is
-    // serviced. BUGFIX for 35888
-    if (!r.isInitialized()) {
+    partitionedRegion.checkReadiness(); // Don't allow closed 
PartitionedRegions that have
+                                        // datastores to host buckets
+    PartitionedRegionDataStore prDs = partitionedRegion.getDataStore();
+
+    // This is to ensure that initialization is complete for all colocated 
regions
+    // before bucket creation request is serviced. BUGFIX for 35888
+    // GEODE-5255
+    boolean isReady = prDs.isPartitionedRegionReady(partitionedRegion, 
bucketId);
+    if (!isReady) {
       // This VM is NOT ready to manage a new bucket, refuse operation
-      ManageBackupBucketReplyMessage.sendStillInitializing(getSender(), 
getProcessorId(), dm);
+      sendManageBackupBucketReplyMessage(dm, partitionedRegion, startTime, 
ReplyType.INITIALIZING);
       return false;
     }
 
-    r.checkReadiness(); // Don't allow closed PartitionedRegions that have 
datastores to host
-                        // buckets
-    PartitionedRegionDataStore prDs = r.getDataStore();
     boolean managingBucket = prDs.grabBucket(this.bucketId, this.moveSource, 
this.forceCreation,
         replaceOfflineData, this.isRebalance, null, false) == 
CreateBucketResult.CREATED;
 
-    r.getPrStats().endPartitionMessagesProcessing(startTime);
-    if (managingBucket) {
-      ManageBackupBucketReplyMessage.sendAcceptance(getSender(), 
getProcessorId(), dm);
-    } else {
-      ManageBackupBucketReplyMessage.sendRefusal(getSender(), 
getProcessorId(), dm);
-    }
+    sendManageBackupBucketReplyMessage(dm, partitionedRegion, startTime,
+        managingBucket ? ReplyType.SUCCESS : ReplyType.FAIL);
     return false;
   }
 
+  private void sendManageBackupBucketReplyMessage(ClusterDistributionManager 
dm,
+      PartitionedRegion partitionedRegion, long startTime, ReplyType type) {
+    partitionedRegion.getPrStats().endPartitionMessagesProcessing(startTime);
+    switch (type) {
+      case INITIALIZING:
+        ManageBackupBucketReplyMessage.sendStillInitializing(getSender(), 
getProcessorId(), dm);
+        break;
+      case FAIL:
+        ManageBackupBucketReplyMessage.sendRefusal(getSender(), 
getProcessorId(), dm);
+        break;
+      case SUCCESS:
+        ManageBackupBucketReplyMessage.sendAcceptance(getSender(), 
getProcessorId(), dm);
+        break;
+      default:
+        throw new RuntimeException("unreachable");
+    }
+  }
+
   public int getDSFID() {
     return PR_MANAGE_BACKUP_BUCKET_MESSAGE;
   }
@@ -219,6 +240,7 @@ public class ManageBackupBucketMessage extends 
PartitionMessage {
    */
   public static class ManageBackupBucketReplyMessage extends ReplyMessage {
 
+
     protected boolean acceptedBucket;
 
     /** true if the vm refused because it was still in initialization */
@@ -239,6 +261,14 @@ public class ManageBackupBucketMessage extends 
PartitionMessage {
       this.notYetInitialized = initializing;
     }
 
+    boolean isAcceptedBucket() {
+      return acceptedBucket;
+    }
+
+    boolean isNotYetInitialized() {
+      return notYetInitialized;
+    }
+
     /**
      * Refuse the request to manage the bucket
      *
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionDataStoreTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionDataStoreTest.java
new file mode 100644
index 0000000..913a728
--- /dev/null
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionDataStoreTest.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+import static org.mockito.MockitoAnnotations.initMocks;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mock;
+
+import org.apache.geode.test.junit.categories.UnitTest;
+
+@Category(UnitTest.class)
+public class PartitionedRegionDataStoreTest {
+  @Mock
+  private PartitionedRegion partitionedRegion;
+  @Mock
+  private PartitionedRegion childRegion1;
+  @Mock
+  private PartitionedRegion childRegion2;
+  @Mock
+  private PartitionedRegion grandChildRegion1_1;
+  @Mock
+  private PartitionedRegion grandChildRegion1_2;
+  @Mock
+  private PartitionedRegion grandChildRegion2_1;
+  @Mock
+  private PartitionedRegion grandChildRegion2_2;
+  @Mock
+  private PartitionedRegion grandChildRegion2_3;
+  @Mock
+  private PartitionedRegionDataStore colocatedRegionDateStore;
+  @Mock
+  private PartitionedRegionDataStore grandChildRegionDateStore2_3;
+  private final int bucketId = 29;
+
+  @Before
+  public void setup() {
+    initMocks(this);
+
+    when(partitionedRegion.isInitialized()).thenReturn(true);
+    when(childRegion1.isInitialized()).thenReturn(true);
+    when(childRegion2.isInitialized()).thenReturn(true);
+    when(grandChildRegion1_1.isInitialized()).thenReturn(true);
+    when(grandChildRegion1_2.isInitialized()).thenReturn(true);
+    when(grandChildRegion2_1.isInitialized()).thenReturn(true);
+    when(grandChildRegion2_2.isInitialized()).thenReturn(true);
+    when(grandChildRegion2_3.isInitialized()).thenReturn(true);
+
+    when(childRegion1.getDataStore()).thenReturn(colocatedRegionDateStore);
+    when(childRegion2.getDataStore()).thenReturn(colocatedRegionDateStore);
+    
when(grandChildRegion1_1.getDataStore()).thenReturn(colocatedRegionDateStore);
+    
when(grandChildRegion1_2.getDataStore()).thenReturn(colocatedRegionDateStore);
+    
when(grandChildRegion2_1.getDataStore()).thenReturn(colocatedRegionDateStore);
+    
when(grandChildRegion2_2.getDataStore()).thenReturn(colocatedRegionDateStore);
+    
when(grandChildRegion2_3.getDataStore()).thenReturn(grandChildRegionDateStore2_3);
+
+    
when(colocatedRegionDateStore.isColocationComplete(bucketId)).thenReturn(true);
+    
when(grandChildRegionDateStore2_3.isColocationComplete(bucketId)).thenReturn(true);
+  }
+
+  @Test
+  public void 
initializedPartitionedRegionWithoutColocationReturnsRegionReady() {
+    PartitionedRegionDataStore partitionedRegionDataStore = spy(new 
PartitionedRegionDataStore());
+    List<PartitionedRegion> colocatedChildRegions = new 
ArrayList<PartitionedRegion>();
+
+    doReturn(colocatedChildRegions).when(partitionedRegionDataStore)
+        .getColocatedChildRegions(partitionedRegion);
+
+    
assertThat(partitionedRegionDataStore.isPartitionedRegionReady(partitionedRegion,
 bucketId))
+        .isTrue();
+  }
+
+  @Test
+  public void 
notInitializedPartitionedRegionWithoutColocationReturnsRegionNotReady() {
+    PartitionedRegionDataStore partitionedRegionDataStore = spy(new 
PartitionedRegionDataStore());
+    List<PartitionedRegion> colocatedChildRegions = new 
ArrayList<PartitionedRegion>();
+
+    doReturn(colocatedChildRegions).when(partitionedRegionDataStore)
+        .getColocatedChildRegions(partitionedRegion);
+    when(partitionedRegion.isInitialized()).thenReturn(false);
+
+    
assertThat(partitionedRegionDataStore.isPartitionedRegionReady(partitionedRegion,
 bucketId))
+        .isFalse();
+  }
+
+  @Test
+  public void returnRegionReadyIfAllColocatedRegionsAreReady() {
+    PartitionedRegionDataStore partitionedRegionDataStore = spy(new 
PartitionedRegionDataStore());
+
+    setupColocatedRegions(partitionedRegionDataStore);
+
+    
assertThat(partitionedRegionDataStore.isPartitionedRegionReady(partitionedRegion,
 bucketId))
+        .isTrue();
+  }
+
+  private void setupColocatedRegions(PartitionedRegionDataStore 
partitionedRegionDataStore) {
+    List<PartitionedRegion> colocatedChildRegions = new 
ArrayList<PartitionedRegion>();
+    colocatedChildRegions.add(childRegion1);
+    colocatedChildRegions.add(childRegion2);
+    doReturn(colocatedChildRegions).when(partitionedRegionDataStore)
+        .getColocatedChildRegions(partitionedRegion);
+
+    List<PartitionedRegion> childRegion1ColocatedRegions = new 
ArrayList<PartitionedRegion>();
+    colocatedChildRegions.add(grandChildRegion1_1);
+    colocatedChildRegions.add(grandChildRegion1_2);
+    doReturn(childRegion1ColocatedRegions).when(partitionedRegionDataStore)
+        .getColocatedChildRegions(childRegion1);
+
+    List<PartitionedRegion> childRegion2ColocatedRegions = new 
ArrayList<PartitionedRegion>();
+    colocatedChildRegions.add(grandChildRegion2_1);
+    colocatedChildRegions.add(grandChildRegion2_2);
+    colocatedChildRegions.add(grandChildRegion2_3);
+    doReturn(childRegion2ColocatedRegions).when(partitionedRegionDataStore)
+        .getColocatedChildRegions(childRegion2);
+
+    List<PartitionedRegion> emptyList = new ArrayList<PartitionedRegion>();
+    doReturn(emptyList).when(partitionedRegionDataStore)
+        .getColocatedChildRegions(grandChildRegion1_1);
+    doReturn(emptyList).when(partitionedRegionDataStore)
+        .getColocatedChildRegions(grandChildRegion1_2);
+    doReturn(emptyList).when(partitionedRegionDataStore)
+        .getColocatedChildRegions(grandChildRegion2_1);
+    doReturn(emptyList).when(partitionedRegionDataStore)
+        .getColocatedChildRegions(grandChildRegion2_2);
+    doReturn(emptyList).when(partitionedRegionDataStore)
+        .getColocatedChildRegions(grandChildRegion2_3);
+  }
+
+  @Test
+  public void 
returnRegionNotReadyIfColocationNotCompletedForAColocatedRegion() {
+    PartitionedRegionDataStore partitionedRegionDataStore = spy(new 
PartitionedRegionDataStore());
+
+    setupColocatedRegions(partitionedRegionDataStore);
+    
when(grandChildRegionDateStore2_3.isColocationComplete(bucketId)).thenReturn(false);
+
+    
assertThat(partitionedRegionDataStore.isPartitionedRegionReady(partitionedRegion,
 bucketId))
+        .isFalse();
+  }
+
+  @Test
+  public void returnRegionNotReadyIfAColocatedRegionIsNotInitialized() {
+    PartitionedRegionDataStore partitionedRegionDataStore = spy(new 
PartitionedRegionDataStore());
+
+    setupColocatedRegions(partitionedRegionDataStore);
+    when(grandChildRegion2_2.isInitialized()).thenReturn(false);
+
+    
assertThat(partitionedRegionDataStore.isPartitionedRegionReady(partitionedRegion,
 bucketId))
+        .isFalse();
+  }
+}
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/ManageBackupBucketReplyMessageTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/ManageBackupBucketReplyMessageTest.java
new file mode 100644
index 0000000..97d9663
--- /dev/null
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/ManageBackupBucketReplyMessageTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.partitioned;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.mockito.MockitoAnnotations.initMocks;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+
+import org.apache.geode.distributed.internal.ClusterDistributionManager;
+import org.apache.geode.distributed.internal.ReplyProcessor21;
+import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.internal.cache.PartitionedRegionDataStore;
+import org.apache.geode.internal.cache.PartitionedRegionStats;
+import org.apache.geode.test.junit.categories.UnitTest;
+
+@Category(UnitTest.class)
+public class ManageBackupBucketReplyMessageTest {
+  @Mock
+  private ClusterDistributionManager distributionManager;
+  @Mock
+  private PartitionedRegion partitionedRegion;
+  @Mock
+  private PartitionedRegionDataStore partitionedRegionDataStore;
+  @Mock
+  private PartitionedRegionStats partitionedRegionStats;
+  @Mock
+  private InternalDistributedMember source;
+  @Mock
+  private InternalDistributedMember recipent;
+  @Mock
+  private ReplyProcessor21 processor;
+  @Captor
+  private 
ArgumentCaptor<ManageBackupBucketMessage.ManageBackupBucketReplyMessage> 
replyMessage;
+  private final int bucketId = 15;
+  private final int regionId = 2;
+  private final boolean isReblance = true;
+  private final boolean replaceOfflineDate = false;
+  private final boolean forecCreation = true;
+
+  @Before
+  public void setup() {
+    initMocks(this);
+
+    
when(partitionedRegion.getDataStore()).thenReturn(partitionedRegionDataStore);
+    when(partitionedRegion.getPrStats()).thenReturn(partitionedRegionStats);
+  }
+
+  @Test
+  public void sendStillInitializingReplyIfColocatedRegionsAreInitializing() {
+    ManageBackupBucketMessage message = new ManageBackupBucketMessage();
+
+    
when(partitionedRegionDataStore.isPartitionedRegionReady(partitionedRegion, 
bucketId))
+        .thenReturn(false);
+
+    message.operateOnPartitionedRegion(distributionManager, partitionedRegion, 
1);
+
+    verify(distributionManager, times(1)).putOutgoing(replyMessage.capture());
+
+    assertThat(replyMessage.getValue().isAcceptedBucket()).isFalse();
+    assertThat(replyMessage.getValue().isNotYetInitialized()).isTrue();
+  }
+
+  @Test
+  public void sendRefusalReplyIfDataStoreNotGrabbedBucket() {
+    ManageBackupBucketMessage message = spy(new 
ManageBackupBucketMessage(recipent, regionId,
+        processor, bucketId, isReblance, replaceOfflineDate, source, 
forecCreation));
+
+    
when(partitionedRegionDataStore.isPartitionedRegionReady(partitionedRegion, 
bucketId))
+        .thenReturn(true);
+    when(partitionedRegionDataStore.grabBucket(bucketId, source, 
forecCreation, replaceOfflineDate,
+        isReblance, null, 
false)).thenReturn(PartitionedRegionDataStore.CreateBucketResult.FAILED);
+    doReturn(recipent).when(message).getSender();
+
+    message.operateOnPartitionedRegion(distributionManager, partitionedRegion, 
1);
+
+    verify(distributionManager, times(1)).putOutgoing(replyMessage.capture());
+
+    assertThat(replyMessage.getValue().isAcceptedBucket()).isFalse();
+    assertThat(replyMessage.getValue().isNotYetInitialized()).isFalse();
+  }
+
+  @Test
+  public void sendAcceptanceReplyIfDataStoreGrabbedBucket() {
+    ManageBackupBucketMessage message = spy(new 
ManageBackupBucketMessage(recipent, regionId,
+        processor, bucketId, isReblance, replaceOfflineDate, source, 
forecCreation));
+
+    
when(partitionedRegionDataStore.isPartitionedRegionReady(partitionedRegion, 
bucketId))
+        .thenReturn(true);
+    when(partitionedRegionDataStore.grabBucket(bucketId, source, 
forecCreation, replaceOfflineDate,
+        isReblance, null, 
false)).thenReturn(PartitionedRegionDataStore.CreateBucketResult.CREATED);
+    doReturn(recipent).when(message).getSender();
+
+    message.operateOnPartitionedRegion(distributionManager, partitionedRegion, 
1);
+
+    verify(distributionManager, times(1)).putOutgoing(replyMessage.capture());
+
+    assertThat(replyMessage.getValue().isAcceptedBucket()).isTrue();
+    assertThat(replyMessage.getValue().isNotYetInitialized()).isFalse();
+  }
+
+}

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to