This is an automated email from the ASF dual-hosted git repository.
eshu11 pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 262dc08 GEODE-10094: Fix NPE when populating index in partitioned
region. (#7410)
262dc08 is described below
commit 262dc08901cfe36c95c833f0b4deb317f1546e97
Author: Eric Shu <[email protected]>
AuthorDate: Wed Mar 2 13:27:19 2022 -0800
GEODE-10094: Fix NPE when populating index in partitioned region. (#7410)
* IndexUtils.getIndexManager can return null if the bucket region is
destroyed because the cache is closing. Make sure a CancelException is
thrown in this case.
---
.../geode/internal/cache/PartitionedRegion.java | 9 ++-
.../internal/cache/PartitionedRegionTest.java | 86 ++++++++++++++++++++++
2 files changed, 93 insertions(+), 2 deletions(-)
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
index 540ed07..2dc2f87 100755
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
@@ -8581,7 +8581,8 @@ public class PartitionedRegion extends LocalRegion
}
}
- private boolean populateEmptyIndexes(Set<Index> indexes,
+ @VisibleForTesting
+ boolean populateEmptyIndexes(Set<Index> indexes,
HashMap<String, Exception> exceptionsMap) {
boolean throwException = false;
if (getDataStore() != null && indexes.size() > 0) {
@@ -8594,6 +8595,9 @@ public class PartitionedRegion extends LocalRegion
continue;
}
IndexManager bucketIndexManager = IndexUtils.getIndexManager(cache,
bucket, true);
+ if (bucketIndexManager == null) {
+ cache.getCancelCriterion().checkCancelInProgress();
+ }
Set<Index> bucketIndexes = getBucketIndexesForPRIndexes(bucket,
indexes);
try {
bucketIndexManager.populateIndexes(bucketIndexes);
@@ -8606,7 +8610,8 @@ public class PartitionedRegion extends LocalRegion
return throwException;
}
- private Set<Index> getBucketIndexesForPRIndexes(Region bucket, Set<Index>
indexes) {
+ @VisibleForTesting
+ Set<Index> getBucketIndexesForPRIndexes(Region bucket, Set<Index> indexes) {
Set<Index> bucketIndexes = new HashSet<>();
for (Index ind : indexes) {
bucketIndexes.addAll(((PartitionedIndex) ind).getBucketIndexes(bucket));
diff --git
a/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionTest.java
b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionTest.java
index 110c5e0..67fdf9f 100644
---
a/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionTest.java
+++
b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionTest.java
@@ -42,9 +42,12 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import junitparams.Parameters;
import junitparams.naming.TestCaseName;
+import org.jetbrains.annotations.NotNull;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -55,6 +58,7 @@ import org.mockito.junit.MockitoRule;
import org.apache.geode.CancelCriterion;
import org.apache.geode.Statistics;
import org.apache.geode.cache.AttributesFactory;
+import org.apache.geode.cache.CacheClosedException;
import org.apache.geode.cache.CacheLoader;
import org.apache.geode.cache.CacheWriter;
import org.apache.geode.cache.Operation;
@@ -65,6 +69,9 @@ import org.apache.geode.cache.RegionShortcut;
import org.apache.geode.cache.TransactionDataRebalancedException;
import org.apache.geode.cache.TransactionException;
import org.apache.geode.cache.asyncqueue.AsyncEventQueue;
+import org.apache.geode.cache.query.Index;
+import org.apache.geode.cache.query.MultiIndexCreationException;
+import org.apache.geode.cache.query.internal.index.IndexManager;
import org.apache.geode.cache.wan.GatewaySender;
import org.apache.geode.distributed.DistributedLockService;
import org.apache.geode.distributed.DistributedSystemDisconnectedException;
@@ -637,6 +644,85 @@ public class PartitionedRegionTest {
assertThat(partitionedRegion.isRegionCreateNotified()).isTrue();
}
+ @Test
+ public void
populateEmptyIndexesThrowsIfBucketRegionDestroyedDueToCacheClose() {
+ PartitionedRegion spyPartitionedRegion = spy(partitionedRegion);
+ BucketRegion bucketRegion = mock(BucketRegion.class);
+ when(bucketRegion.isDestroyed()).thenReturn(true);
+
+ Set<Index> indexes = setupIndexes();
+ ConcurrentMap<Integer, BucketRegion> map = setupBuckets(bucketRegion);
+ setupDataStore(spyPartitionedRegion, map);
+ CacheClosedException cacheClosedException = new CacheClosedException();
+ setupCancelCriterion(cacheClosedException);
+
+ assertThatThrownBy(() ->
spyPartitionedRegion.populateEmptyIndexes(indexes, new HashMap<>()))
+ .isEqualTo(cacheClosedException);
+ }
+
+ @NotNull
+ private Set<Index> setupIndexes() {
+ Set<Index> indexes = new HashSet<>();
+ Index index = mock(Index.class);
+ indexes.add(index);
+ return indexes;
+ }
+
+ @NotNull
+ private ConcurrentMap<Integer, BucketRegion> setupBuckets(BucketRegion
bucketRegion) {
+ ConcurrentMap<Integer, BucketRegion> map = new ConcurrentHashMap<>();
+ map.put(1, bucketRegion);
+ return map;
+ }
+
+ private void setupDataStore(PartitionedRegion spyPartitionedRegion,
+ ConcurrentMap<Integer, BucketRegion> map) {
+ PartitionedRegionDataStore dataStore =
mock(PartitionedRegionDataStore.class);
+ doReturn(dataStore).when(spyPartitionedRegion).getDataStore();
+ when(dataStore.getAllLocalBuckets()).thenReturn(map.entrySet());
+ }
+
+ private void setupCancelCriterion(CacheClosedException cacheClosedException)
{
+ CancelCriterion cancelCriterion = mock(CancelCriterion.class);
+ when(cache.getCancelCriterion()).thenReturn(cancelCriterion);
+
doThrow(cacheClosedException).when(cancelCriterion).checkCancelInProgress();
+ }
+
+ @Test
+ public void
populateEmptyIndexesReturnsFalseIfIndexManagerPopulateIndexesSuccessfully() {
+ PartitionedRegion spyPartitionedRegion = spy(partitionedRegion);
+ IndexManager indexManager = mock(IndexManager.class);
+ BucketRegion bucketRegion = mock(BucketRegion.class);
+ when(bucketRegion.getIndexManager()).thenReturn(indexManager);
+
+ Set<Index> indexes = setupIndexes();
+ ConcurrentMap<Integer, BucketRegion> map = setupBuckets(bucketRegion);
+ setupDataStore(spyPartitionedRegion, map);
+
doReturn(indexes).when(spyPartitionedRegion).getBucketIndexesForPRIndexes(bucketRegion,
+ indexes);
+
+ assertThat(spyPartitionedRegion.populateEmptyIndexes(indexes, new
HashMap<>())).isFalse();
+ }
+
+ @Test
+ public void
populateEmptyIndexesReturnsTrueIfIndexManagerPopulateIndexesThrows()
+ throws Exception {
+ PartitionedRegion spyPartitionedRegion = spy(partitionedRegion);
+ IndexManager indexManager = mock(IndexManager.class);
+ BucketRegion bucketRegion = mock(BucketRegion.class);
+ when(bucketRegion.getIndexManager()).thenReturn(indexManager);
+ MultiIndexCreationException exception =
mock(MultiIndexCreationException.class);
+
+ Set<Index> indexes = setupIndexes();
+ ConcurrentMap<Integer, BucketRegion> map = setupBuckets(bucketRegion);
+ setupDataStore(spyPartitionedRegion, map);
+
doReturn(indexes).when(spyPartitionedRegion).getBucketIndexesForPRIndexes(bucketRegion,
+ indexes);
+ doThrow(exception).when(indexManager).populateIndexes(indexes);
+
+ assertThat(spyPartitionedRegion.populateEmptyIndexes(indexes, new
HashMap<>())).isTrue();
+ }
+
private static <K> Set<K> asSet(K... values) {
Set<K> set = new HashSet<>();
Collections.addAll(set, values);