This is an automated email from the ASF dual-hosted git repository.
eshu11 pushed a commit to branch feature/GEODE-5255
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/feature/GEODE-5255 by this
push:
new e96350f GEODE-5255: check all colocated partitioned regions are ready
before creating any buckets.
e96350f is described below
commit e96350f556ada8e90fd91976a7fe8b32fcb43a21
Author: eshu <[email protected]>
AuthorDate: Fri May 25 14:17:15 2018 -0700
GEODE-5255: check all colocated partitioned regions are ready before
creating any buckets.
---
.../internal/cache/PartitionedRegionDataStore.java | 37 ++++-
.../partitioned/ManageBackupBucketMessage.java | 64 ++++++--
.../cache/PartitionedRegionDataStoreTest.java | 171 +++++++++++++++++++++
.../ManageBackupBucketReplyMessageTest.java | 125 +++++++++++++++
4 files changed, 379 insertions(+), 18 deletions(-)
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java
index dc8f151..ee41293 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java
@@ -265,6 +265,41 @@ public class PartitionedRegionDataStore implements
HasCachePerfStats {
return numPrimaries.get();
}
+ public boolean isPartitionedRegionReady(PartitionedRegion partitionedRegion,
final int bucketId) {
+ List<PartitionedRegion> colocatedWithList =
getColocatedChildRegions(partitionedRegion);
+ if (colocatedWithList.size() == 0) {
+ return partitionedRegion.isInitialized();
+ }
+ Iterator itr = colocatedWithList.iterator();
+ return areAllColocatedPartitionedRegionsReady(bucketId, itr);
+ }
+
+ private boolean areAllColocatedPartitionedRegionsReady(int bucketId,
Iterator itr) {
+ while (itr.hasNext()) {
+ PartitionedRegion colocatedChildRegion = (PartitionedRegion) itr.next();
+ boolean success =
isColocatedPartitionedRegionInitialized(colocatedChildRegion, bucketId);
+ if (!success) {
+ return success;
+ }
+ }
+ return true;
+ }
+
+ private boolean isColocatedPartitionedRegionInitialized(PartitionedRegion
partitionedRegion,
+ final int bucketId) {
+ if (!partitionedRegion.isInitialized()
+ || !(partitionedRegion.getDataStore().isColocationComplete(bucketId)))
{
+ return false;
+ }
+ List<PartitionedRegion> colocatedWithList =
getColocatedChildRegions(partitionedRegion);
+ Iterator itr = colocatedWithList.iterator();
+ return areAllColocatedPartitionedRegionsReady(bucketId, itr);
+ }
+
+ List<PartitionedRegion> getColocatedChildRegions(PartitionedRegion
partitionedRegion) {
+ return ColocationHelper.getColocatedChildRegions(partitionedRegion);
+ }
+
/**
* Try to grab buckets for all the colocated regions /* In case we can't
grab buckets there is no
* going back
@@ -621,7 +656,7 @@ public class PartitionedRegionDataStore implements
HasCachePerfStats {
return true;
}
- private boolean isColocationComplete(int bucketId) {
+ boolean isColocationComplete(int bucketId) {
if (!ColocationHelper.isColocationComplete(this.partitionedRegion)) {
ProxyBucketRegion pb =
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/ManageBackupBucketMessage.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/ManageBackupBucketMessage.java
index 08d8164..b3b31da 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/ManageBackupBucketMessage.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/ManageBackupBucketMessage.java
@@ -64,12 +64,16 @@ public class ManageBackupBucketMessage extends
PartitionMessage {
private boolean forceCreation = true;
+ private enum ReplyType {
+ INITIALIZING, SUCCESS, FAIL;
+ }
+
/**
* Empty constructor to satisfy {@link DataSerializer} requirements
*/
public ManageBackupBucketMessage() {}
- private ManageBackupBucketMessage(InternalDistributedMember recipient, int
regionId,
+ ManageBackupBucketMessage(InternalDistributedMember recipient, int regionId,
ReplyProcessor21 processor, int bucketId, boolean isRebalance, boolean
replaceOfflineData,
InternalDistributedMember moveSource, boolean forceCreation) {
super(recipient, regionId, processor);
@@ -128,36 +132,53 @@ public class ManageBackupBucketMessage extends
PartitionMessage {
* indefinitely for the acknowledgement
*/
@Override
- protected boolean operateOnPartitionedRegion(ClusterDistributionManager dm,
PartitionedRegion r,
- long startTime) {
+ protected boolean operateOnPartitionedRegion(ClusterDistributionManager dm,
+ PartitionedRegion partitionedRegion, long startTime) {
if (logger.isTraceEnabled(LogMarker.DM_VERBOSE)) {
logger.trace(LogMarker.DM_VERBOSE, "ManageBucketMessage operateOnRegion:
{}",
- r.getFullPath());
+ partitionedRegion.getFullPath());
}
- // This is to ensure that initialization is complete before bucket
creation request is
- // serviced. BUGFIX for 35888
- if (!r.isInitialized()) {
+ partitionedRegion.checkReadiness(); // Don't allow closed
PartitionedRegions that have
+ // datastores to host buckets
+ PartitionedRegionDataStore prDs = partitionedRegion.getDataStore();
+
+ // This is to ensure that initialization is complete for all colocated
regions
+ // before bucket creation request is serviced. BUGFIX for 35888
+ // GEODE-5255
+ boolean isReady = prDs.isPartitionedRegionReady(partitionedRegion,
bucketId);
+ if (!isReady) {
// This VM is NOT ready to manage a new bucket, refuse operation
- ManageBackupBucketReplyMessage.sendStillInitializing(getSender(),
getProcessorId(), dm);
+ sendManageBackupBucketReplyMessage(dm, partitionedRegion, startTime,
ReplyType.INITIALIZING);
return false;
}
- r.checkReadiness(); // Don't allow closed PartitionedRegions that have
datastores to host
- // buckets
- PartitionedRegionDataStore prDs = r.getDataStore();
boolean managingBucket = prDs.grabBucket(this.bucketId, this.moveSource,
this.forceCreation,
replaceOfflineData, this.isRebalance, null, false) ==
CreateBucketResult.CREATED;
- r.getPrStats().endPartitionMessagesProcessing(startTime);
- if (managingBucket) {
- ManageBackupBucketReplyMessage.sendAcceptance(getSender(),
getProcessorId(), dm);
- } else {
- ManageBackupBucketReplyMessage.sendRefusal(getSender(),
getProcessorId(), dm);
- }
+ sendManageBackupBucketReplyMessage(dm, partitionedRegion, startTime,
+ managingBucket ? ReplyType.SUCCESS : ReplyType.FAIL);
return false;
}
+ private void sendManageBackupBucketReplyMessage(ClusterDistributionManager
dm,
+ PartitionedRegion partitionedRegion, long startTime, ReplyType type) {
+ partitionedRegion.getPrStats().endPartitionMessagesProcessing(startTime);
+ switch (type) {
+ case INITIALIZING:
+ ManageBackupBucketReplyMessage.sendStillInitializing(getSender(),
getProcessorId(), dm);
+ break;
+ case FAIL:
+ ManageBackupBucketReplyMessage.sendRefusal(getSender(),
getProcessorId(), dm);
+ break;
+ case SUCCESS:
+ ManageBackupBucketReplyMessage.sendAcceptance(getSender(),
getProcessorId(), dm);
+ break;
+ default:
+ throw new RuntimeException("unreachable");
+ }
+ }
+
public int getDSFID() {
return PR_MANAGE_BACKUP_BUCKET_MESSAGE;
}
@@ -219,6 +240,7 @@ public class ManageBackupBucketMessage extends
PartitionMessage {
*/
public static class ManageBackupBucketReplyMessage extends ReplyMessage {
+
protected boolean acceptedBucket;
/** true if the vm refused because it was still in initialization */
@@ -239,6 +261,14 @@ public class ManageBackupBucketMessage extends
PartitionMessage {
this.notYetInitialized = initializing;
}
+ boolean isAcceptedBucket() {
+ return acceptedBucket;
+ }
+
+ boolean isNotYetInitialized() {
+ return notYetInitialized;
+ }
+
/**
* Refuse the request to manage the bucket
*
diff --git
a/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionDataStoreTest.java
b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionDataStoreTest.java
new file mode 100644
index 0000000..913a728
--- /dev/null
+++
b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionDataStoreTest.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+import static org.mockito.MockitoAnnotations.initMocks;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mock;
+
+import org.apache.geode.test.junit.categories.UnitTest;
+
+@Category(UnitTest.class)
+public class PartitionedRegionDataStoreTest {
+ @Mock
+ private PartitionedRegion partitionedRegion;
+ @Mock
+ private PartitionedRegion childRegion1;
+ @Mock
+ private PartitionedRegion childRegion2;
+ @Mock
+ private PartitionedRegion grandChildRegion1_1;
+ @Mock
+ private PartitionedRegion grandChildRegion1_2;
+ @Mock
+ private PartitionedRegion grandChildRegion2_1;
+ @Mock
+ private PartitionedRegion grandChildRegion2_2;
+ @Mock
+ private PartitionedRegion grandChildRegion2_3;
+ @Mock
+ private PartitionedRegionDataStore colocatedRegionDateStore;
+ @Mock
+ private PartitionedRegionDataStore grandChildRegionDateStore2_3;
+ private final int bucketId = 29;
+
+ @Before
+ public void setup() {
+ initMocks(this);
+
+ when(partitionedRegion.isInitialized()).thenReturn(true);
+ when(childRegion1.isInitialized()).thenReturn(true);
+ when(childRegion2.isInitialized()).thenReturn(true);
+ when(grandChildRegion1_1.isInitialized()).thenReturn(true);
+ when(grandChildRegion1_2.isInitialized()).thenReturn(true);
+ when(grandChildRegion2_1.isInitialized()).thenReturn(true);
+ when(grandChildRegion2_2.isInitialized()).thenReturn(true);
+ when(grandChildRegion2_3.isInitialized()).thenReturn(true);
+
+ when(childRegion1.getDataStore()).thenReturn(colocatedRegionDateStore);
+ when(childRegion2.getDataStore()).thenReturn(colocatedRegionDateStore);
+
when(grandChildRegion1_1.getDataStore()).thenReturn(colocatedRegionDateStore);
+
when(grandChildRegion1_2.getDataStore()).thenReturn(colocatedRegionDateStore);
+
when(grandChildRegion2_1.getDataStore()).thenReturn(colocatedRegionDateStore);
+
when(grandChildRegion2_2.getDataStore()).thenReturn(colocatedRegionDateStore);
+
when(grandChildRegion2_3.getDataStore()).thenReturn(grandChildRegionDateStore2_3);
+
+
when(colocatedRegionDateStore.isColocationComplete(bucketId)).thenReturn(true);
+
when(grandChildRegionDateStore2_3.isColocationComplete(bucketId)).thenReturn(true);
+ }
+
+ @Test
+ public void
initializedPartitionedRegionWithoutColocationReturnsRegionReady() {
+ PartitionedRegionDataStore partitionedRegionDataStore = spy(new
PartitionedRegionDataStore());
+ List<PartitionedRegion> colocatedChildRegions = new
ArrayList<PartitionedRegion>();
+
+ doReturn(colocatedChildRegions).when(partitionedRegionDataStore)
+ .getColocatedChildRegions(partitionedRegion);
+
+
assertThat(partitionedRegionDataStore.isPartitionedRegionReady(partitionedRegion,
bucketId))
+ .isTrue();
+ }
+
+ @Test
+ public void
notInitializedPartitionedRegionWithoutColocationReturnsRegionNotReady() {
+ PartitionedRegionDataStore partitionedRegionDataStore = spy(new
PartitionedRegionDataStore());
+ List<PartitionedRegion> colocatedChildRegions = new
ArrayList<PartitionedRegion>();
+
+ doReturn(colocatedChildRegions).when(partitionedRegionDataStore)
+ .getColocatedChildRegions(partitionedRegion);
+ when(partitionedRegion.isInitialized()).thenReturn(false);
+
+
assertThat(partitionedRegionDataStore.isPartitionedRegionReady(partitionedRegion,
bucketId))
+ .isFalse();
+ }
+
+ @Test
+ public void returnRegionReadyIfAllColocatedRegionsAreReady() {
+ PartitionedRegionDataStore partitionedRegionDataStore = spy(new
PartitionedRegionDataStore());
+
+ setupColocatedRegions(partitionedRegionDataStore);
+
+
assertThat(partitionedRegionDataStore.isPartitionedRegionReady(partitionedRegion,
bucketId))
+ .isTrue();
+ }
+
+ private void setupColocatedRegions(PartitionedRegionDataStore
partitionedRegionDataStore) {
+ List<PartitionedRegion> colocatedChildRegions = new
ArrayList<PartitionedRegion>();
+ colocatedChildRegions.add(childRegion1);
+ colocatedChildRegions.add(childRegion2);
+ doReturn(colocatedChildRegions).when(partitionedRegionDataStore)
+ .getColocatedChildRegions(partitionedRegion);
+
+ List<PartitionedRegion> childRegion1ColocatedRegions = new
ArrayList<PartitionedRegion>();
+ colocatedChildRegions.add(grandChildRegion1_1);
+ colocatedChildRegions.add(grandChildRegion1_2);
+ doReturn(childRegion1ColocatedRegions).when(partitionedRegionDataStore)
+ .getColocatedChildRegions(childRegion1);
+
+ List<PartitionedRegion> childRegion2ColocatedRegions = new
ArrayList<PartitionedRegion>();
+ colocatedChildRegions.add(grandChildRegion2_1);
+ colocatedChildRegions.add(grandChildRegion2_2);
+ colocatedChildRegions.add(grandChildRegion2_3);
+ doReturn(childRegion2ColocatedRegions).when(partitionedRegionDataStore)
+ .getColocatedChildRegions(childRegion2);
+
+ List<PartitionedRegion> emptyList = new ArrayList<PartitionedRegion>();
+ doReturn(emptyList).when(partitionedRegionDataStore)
+ .getColocatedChildRegions(grandChildRegion1_1);
+ doReturn(emptyList).when(partitionedRegionDataStore)
+ .getColocatedChildRegions(grandChildRegion1_2);
+ doReturn(emptyList).when(partitionedRegionDataStore)
+ .getColocatedChildRegions(grandChildRegion2_1);
+ doReturn(emptyList).when(partitionedRegionDataStore)
+ .getColocatedChildRegions(grandChildRegion2_2);
+ doReturn(emptyList).when(partitionedRegionDataStore)
+ .getColocatedChildRegions(grandChildRegion2_3);
+ }
+
+ @Test
+ public void
returnRegionNotReadyIfColocationNotCompletedForAColocatedRegion() {
+ PartitionedRegionDataStore partitionedRegionDataStore = spy(new
PartitionedRegionDataStore());
+
+ setupColocatedRegions(partitionedRegionDataStore);
+
when(grandChildRegionDateStore2_3.isColocationComplete(bucketId)).thenReturn(false);
+
+
assertThat(partitionedRegionDataStore.isPartitionedRegionReady(partitionedRegion,
bucketId))
+ .isFalse();
+ }
+
+ @Test
+ public void returnRegionNotReadyIfAColocatedRegionIsNotInitialized() {
+ PartitionedRegionDataStore partitionedRegionDataStore = spy(new
PartitionedRegionDataStore());
+
+ setupColocatedRegions(partitionedRegionDataStore);
+ when(grandChildRegion2_2.isInitialized()).thenReturn(false);
+
+
assertThat(partitionedRegionDataStore.isPartitionedRegionReady(partitionedRegion,
bucketId))
+ .isFalse();
+ }
+}
diff --git
a/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/ManageBackupBucketReplyMessageTest.java
b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/ManageBackupBucketReplyMessageTest.java
new file mode 100644
index 0000000..97d9663
--- /dev/null
+++
b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/ManageBackupBucketReplyMessageTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.partitioned;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.mockito.MockitoAnnotations.initMocks;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+
+import org.apache.geode.distributed.internal.ClusterDistributionManager;
+import org.apache.geode.distributed.internal.ReplyProcessor21;
+import
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.internal.cache.PartitionedRegionDataStore;
+import org.apache.geode.internal.cache.PartitionedRegionStats;
+import org.apache.geode.test.junit.categories.UnitTest;
+
+@Category(UnitTest.class)
+public class ManageBackupBucketReplyMessageTest {
+ @Mock
+ private ClusterDistributionManager distributionManager;
+ @Mock
+ private PartitionedRegion partitionedRegion;
+ @Mock
+ private PartitionedRegionDataStore partitionedRegionDataStore;
+ @Mock
+ private PartitionedRegionStats partitionedRegionStats;
+ @Mock
+ private InternalDistributedMember source;
+ @Mock
+ private InternalDistributedMember recipent;
+ @Mock
+ private ReplyProcessor21 processor;
+ @Captor
+ private
ArgumentCaptor<ManageBackupBucketMessage.ManageBackupBucketReplyMessage>
replyMessage;
+ private final int bucketId = 15;
+ private final int regionId = 2;
+ private final boolean isReblance = true;
+ private final boolean replaceOfflineDate = false;
+ private final boolean forecCreation = true;
+
+ @Before
+ public void setup() {
+ initMocks(this);
+
+
when(partitionedRegion.getDataStore()).thenReturn(partitionedRegionDataStore);
+ when(partitionedRegion.getPrStats()).thenReturn(partitionedRegionStats);
+ }
+
+ @Test
+ public void sendStillInitializingReplyIfColocatedRegionsAreInitializing() {
+ ManageBackupBucketMessage message = new ManageBackupBucketMessage();
+
+
when(partitionedRegionDataStore.isPartitionedRegionReady(partitionedRegion,
bucketId))
+ .thenReturn(false);
+
+ message.operateOnPartitionedRegion(distributionManager, partitionedRegion,
1);
+
+ verify(distributionManager, times(1)).putOutgoing(replyMessage.capture());
+
+ assertThat(replyMessage.getValue().isAcceptedBucket()).isFalse();
+ assertThat(replyMessage.getValue().isNotYetInitialized()).isTrue();
+ }
+
+ @Test
+ public void sendRefusalReplyIfDataStoreNotGrabbedBucket() {
+ ManageBackupBucketMessage message = spy(new
ManageBackupBucketMessage(recipent, regionId,
+ processor, bucketId, isReblance, replaceOfflineDate, source,
forecCreation));
+
+
when(partitionedRegionDataStore.isPartitionedRegionReady(partitionedRegion,
bucketId))
+ .thenReturn(true);
+ when(partitionedRegionDataStore.grabBucket(bucketId, source,
forecCreation, replaceOfflineDate,
+ isReblance, null,
false)).thenReturn(PartitionedRegionDataStore.CreateBucketResult.FAILED);
+ doReturn(recipent).when(message).getSender();
+
+ message.operateOnPartitionedRegion(distributionManager, partitionedRegion,
1);
+
+ verify(distributionManager, times(1)).putOutgoing(replyMessage.capture());
+
+ assertThat(replyMessage.getValue().isAcceptedBucket()).isFalse();
+ assertThat(replyMessage.getValue().isNotYetInitialized()).isFalse();
+ }
+
+ @Test
+ public void sendAcceptanceReplyIfDataStoreGrabbedBucket() {
+ ManageBackupBucketMessage message = spy(new
ManageBackupBucketMessage(recipent, regionId,
+ processor, bucketId, isReblance, replaceOfflineDate, source,
forecCreation));
+
+
when(partitionedRegionDataStore.isPartitionedRegionReady(partitionedRegion,
bucketId))
+ .thenReturn(true);
+ when(partitionedRegionDataStore.grabBucket(bucketId, source,
forecCreation, replaceOfflineDate,
+ isReblance, null,
false)).thenReturn(PartitionedRegionDataStore.CreateBucketResult.CREATED);
+ doReturn(recipent).when(message).getSender();
+
+ message.operateOnPartitionedRegion(distributionManager, partitionedRegion,
1);
+
+ verify(distributionManager, times(1)).putOutgoing(replyMessage.capture());
+
+ assertThat(replyMessage.getValue().isAcceptedBucket()).isTrue();
+ assertThat(replyMessage.getValue().isNotYetInitialized()).isFalse();
+ }
+
+}
--
To stop receiving notification emails like this one, please contact
[email protected].