This is an automated email from the ASF dual-hosted git repository.
eshu11 pushed a commit to branch support/1.13
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/support/1.13 by this push:
new 3734528 GEODE-8073: Fix NPE after FetchKeysMessage failed. (#5055)
3734528 is described below
commit 37345287327a5b5cd993ee266d23ffc83befff1a
Author: Eric Shu <[email protected]>
AuthorDate: Wed May 6 08:56:20 2020 -0700
GEODE-8073: Fix NPE after FetchKeysMessage failed. (#5055)
(cherry picked from commit 643c617ec681918db3508030bd22922c76b87b25)
---
.../geode/internal/cache/PartitionedRegion.java | 73 ++++++++++++++--------
.../internal/cache/PartitionedRegionTest.java | 49 +++++++++++++++
2 files changed, 95 insertions(+), 27 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
index cacab2d..53754e3 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
@@ -4593,39 +4593,58 @@ public class PartitionedRegion extends LocalRegion
buckets = bucketKeys.keySet();
}
- for (Integer bucket : buckets) {
- Set keys = null;
- if (bucketKeys == null) {
- try {
- FetchKeysResponse fkr = FetchKeysMessage.send(member, this, bucket, true);
- keys = fkr.waitForKeys();
- } catch (ForceReattemptException ignore) {
- failures.add(bucket);
- }
- } else {
- keys = bucketKeys.get(bucket);
+ fetchKeysAndValues(values, servConn, failures, member, bucketKeys, buckets);
+ }
+ return failures;
+ }
+
+ void fetchKeysAndValues(VersionedObjectList values, ServerConnection servConn,
+ Set<Integer> failures, InternalDistributedMember member,
+ HashMap<Integer, HashSet> bucketKeys, Set<Integer> buckets)
+ throws IOException {
+ for (Integer bucket : buckets) {
+ Set keys = null;
+ if (bucketKeys == null) {
+ try {
+ FetchKeysResponse fetchKeysResponse = getFetchKeysResponse(member, bucket);
+ keys = fetchKeysResponse.waitForKeys();
+ } catch (ForceReattemptException ignore) {
+ failures.add(bucket);
}
+ } else {
+ keys = bucketKeys.get(bucket);
+ }
+ if (keys != null) {
+ getValuesForKeys(values, servConn, keys);
+ }
+ }
+ }
- // TODO (ashetkar) Use single Get70 instance for all?
- for (Object key : keys) {
- Get70 command = (Get70) Get70.getCommand();
- Get70.Entry ge = command.getValueAndIsObject(this, key, null, servConn);
+ FetchKeysResponse getFetchKeysResponse(InternalDistributedMember member,
+ Integer bucket)
+ throws ForceReattemptException {
+ return FetchKeysMessage.send(member, this, bucket, true);
+ }
- if (ge.keyNotPresent) {
- values.addObjectPartForAbsentKey(key, ge.value, ge.versionTag);
- } else {
- values.addObjectPart(key, ge.value, ge.isObject, ge.versionTag);
- }
+ void getValuesForKeys(VersionedObjectList values, ServerConnection servConn, Set keys)
+ throws IOException {
+ // TODO (ashetkar) Use single Get70 instance for all?
+ for (Object key : keys) {
+ Get70 command = (Get70) Get70.getCommand();
+ Get70.Entry ge = command.getValueAndIsObject(this, key, null, servConn);
- if (values.size() == BaseCommand.MAXIMUM_CHUNK_SIZE) {
- BaseCommand.sendNewRegisterInterestResponseChunk(this, "keyList", values, false,
- servConn);
- values.clear();
- }
- }
+ if (ge.keyNotPresent) {
+ values.addObjectPartForAbsentKey(key, ge.value, ge.versionTag);
+ } else {
+ values.addObjectPart(key, ge.value, ge.isObject, ge.versionTag);
+ }
+
+ if (values.size() == BaseCommand.MAXIMUM_CHUNK_SIZE) {
+ BaseCommand.sendNewRegisterInterestResponseChunk(this, "keyList", values, false,
+ servConn);
+ values.clear();
}
}
- return failures;
}
/**
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionTest.java
index 210ce16..ae580e3 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionTest.java
@@ -25,9 +25,11 @@ import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isNull;
+import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -69,7 +71,10 @@ import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.cache.control.InternalResourceManager;
+import org.apache.geode.internal.cache.partitioned.FetchKeysMessage;
import org.apache.geode.internal.cache.partitioned.colocation.ColocationLoggerFactory;
+import org.apache.geode.internal.cache.tier.sockets.ServerConnection;
+import org.apache.geode.internal.cache.tier.sockets.VersionedObjectList;
@RunWith(JUnitParamsRunner.class)
@SuppressWarnings({"deprecation", "unchecked", "unused"})
@@ -573,6 +578,50 @@ public class PartitionedRegionTest {
.hasMessage(PartitionedRegion.DATA_MOVED_BY_REBALANCE).hasCause(exception);
}
+ @Test
+ public void failuresSavedIfFetchKeysThrows() throws Exception {
+ PartitionedRegion spyPartitionedRegion = spy(partitionedRegion);
+
+ VersionedObjectList values = mock(VersionedObjectList.class);
+ ServerConnection serverConnection = mock(ServerConnection.class);
+ Set<Integer> failures = new HashSet<>();
+ InternalDistributedMember member = mock(InternalDistributedMember.class);
+ Set<Integer> buckets = new HashSet<>();
+ buckets.add(1);
+ doThrow(new ForceReattemptException("")).when(spyPartitionedRegion).getFetchKeysResponse(member,
+ 1);
+
+ spyPartitionedRegion.fetchKeysAndValues(values, serverConnection, failures, member, null,
+ buckets);
+
+ verify(spyPartitionedRegion, never()).getValuesForKeys(values, serverConnection, null);
+ assertThat(failures.contains(1)).isTrue();
+ }
+
+ @Test
+ public void fetchKeysAndValuesInvokesGetValuesForKeys() throws Exception {
+ PartitionedRegion spyPartitionedRegion = spy(partitionedRegion);
+
+ VersionedObjectList values = mock(VersionedObjectList.class);
+ ServerConnection serverConnection = mock(ServerConnection.class);
+ Set<Integer> failures = new HashSet<>();
+ InternalDistributedMember member = mock(InternalDistributedMember.class);
+ Set<Integer> buckets = new HashSet<>();
+ buckets.add(1);
+ FetchKeysMessage.FetchKeysResponse fetchKeysResponse =
+ mock(FetchKeysMessage.FetchKeysResponse.class);
+ doReturn(fetchKeysResponse).when(spyPartitionedRegion).getFetchKeysResponse(member, 1);
+ Set keys = new HashSet();
+ when(fetchKeysResponse.waitForKeys()).thenReturn(keys);
+ doNothing().when(spyPartitionedRegion).getValuesForKeys(values, serverConnection, keys);
+
+ spyPartitionedRegion.fetchKeysAndValues(values, serverConnection, failures, member, null,
+ buckets);
+
+ verify(spyPartitionedRegion).getValuesForKeys(values, serverConnection, keys);
+ assertThat(failures.contains(1)).isFalse();
+ }
+
private static <K> Set<K> asSet(K... values) {
Set<K> set = new HashSet<>();
Collections.addAll(set, values);