This is an automated email from the ASF dual-hosted git repository.

eshu11 pushed a commit to branch support/1.13
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/support/1.13 by this push:
     new 3734528  GEODE-8073: Fix NPE after FetchKeysMessage failed. (#5055)
3734528 is described below

commit 37345287327a5b5cd993ee266d23ffc83befff1a
Author: Eric Shu <[email protected]>
AuthorDate: Wed May 6 08:56:20 2020 -0700

    GEODE-8073: Fix NPE after FetchKeysMessage failed. (#5055)
    
    
    (cherry picked from commit 643c617ec681918db3508030bd22922c76b87b25)
---
 .../geode/internal/cache/PartitionedRegion.java    | 73 ++++++++++++++--------
 .../internal/cache/PartitionedRegionTest.java      | 49 +++++++++++++++
 2 files changed, 95 insertions(+), 27 deletions(-)

diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
index cacab2d..53754e3 100755
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
@@ -4593,39 +4593,58 @@ public class PartitionedRegion extends LocalRegion
         buckets = bucketKeys.keySet();
       }
 
-      for (Integer bucket : buckets) {
-        Set keys = null;
-        if (bucketKeys == null) {
-          try {
-            FetchKeysResponse fkr = FetchKeysMessage.send(member, this, 
bucket, true);
-            keys = fkr.waitForKeys();
-          } catch (ForceReattemptException ignore) {
-            failures.add(bucket);
-          }
-        } else {
-          keys = bucketKeys.get(bucket);
+      fetchKeysAndValues(values, servConn, failures, member, bucketKeys, 
buckets);
+    }
+    return failures;
+  }
+
+  void fetchKeysAndValues(VersionedObjectList values, ServerConnection 
servConn,
+      Set<Integer> failures, InternalDistributedMember member,
+      HashMap<Integer, HashSet> bucketKeys, Set<Integer> buckets)
+      throws IOException {
+    for (Integer bucket : buckets) {
+      Set keys = null;
+      if (bucketKeys == null) {
+        try {
+          FetchKeysResponse fetchKeysResponse = getFetchKeysResponse(member, 
bucket);
+          keys = fetchKeysResponse.waitForKeys();
+        } catch (ForceReattemptException ignore) {
+          failures.add(bucket);
         }
+      } else {
+        keys = bucketKeys.get(bucket);
+      }
+      if (keys != null) {
+        getValuesForKeys(values, servConn, keys);
+      }
+    }
+  }
 
-        // TODO (ashetkar) Use single Get70 instance for all?
-        for (Object key : keys) {
-          Get70 command = (Get70) Get70.getCommand();
-          Get70.Entry ge = command.getValueAndIsObject(this, key, null, 
servConn);
+  FetchKeysResponse getFetchKeysResponse(InternalDistributedMember member,
+      Integer bucket)
+      throws ForceReattemptException {
+    return FetchKeysMessage.send(member, this, bucket, true);
+  }
 
-          if (ge.keyNotPresent) {
-            values.addObjectPartForAbsentKey(key, ge.value, ge.versionTag);
-          } else {
-            values.addObjectPart(key, ge.value, ge.isObject, ge.versionTag);
-          }
+  void getValuesForKeys(VersionedObjectList values, ServerConnection servConn, 
Set keys)
+      throws IOException {
+    // TODO (ashetkar) Use single Get70 instance for all?
+    for (Object key : keys) {
+      Get70 command = (Get70) Get70.getCommand();
+      Get70.Entry ge = command.getValueAndIsObject(this, key, null, servConn);
 
-          if (values.size() == BaseCommand.MAXIMUM_CHUNK_SIZE) {
-            BaseCommand.sendNewRegisterInterestResponseChunk(this, "keyList", 
values, false,
-                servConn);
-            values.clear();
-          }
-        }
+      if (ge.keyNotPresent) {
+        values.addObjectPartForAbsentKey(key, ge.value, ge.versionTag);
+      } else {
+        values.addObjectPart(key, ge.value, ge.isObject, ge.versionTag);
+      }
+
+      if (values.size() == BaseCommand.MAXIMUM_CHUNK_SIZE) {
+        BaseCommand.sendNewRegisterInterestResponseChunk(this, "keyList", 
values, false,
+            servConn);
+        values.clear();
       }
     }
-    return failures;
   }
 
   /**
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionTest.java
index 210ce16..ae580e3 100644
--- 
a/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionTest.java
@@ -25,9 +25,11 @@ import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.ArgumentMatchers.isNull;
+import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -69,7 +71,10 @@ import 
org.apache.geode.distributed.internal.DistributionManager;
 import org.apache.geode.distributed.internal.InternalDistributedSystem;
 import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.cache.control.InternalResourceManager;
+import org.apache.geode.internal.cache.partitioned.FetchKeysMessage;
 import 
org.apache.geode.internal.cache.partitioned.colocation.ColocationLoggerFactory;
+import org.apache.geode.internal.cache.tier.sockets.ServerConnection;
+import org.apache.geode.internal.cache.tier.sockets.VersionedObjectList;
 
 @RunWith(JUnitParamsRunner.class)
 @SuppressWarnings({"deprecation", "unchecked", "unused"})
@@ -573,6 +578,50 @@ public class PartitionedRegionTest {
         
.hasMessage(PartitionedRegion.DATA_MOVED_BY_REBALANCE).hasCause(exception);
   }
 
+  @Test
+  public void failuresSavedIfFetchKeysThrows() throws Exception {
+    PartitionedRegion spyPartitionedRegion = spy(partitionedRegion);
+
+    VersionedObjectList values = mock(VersionedObjectList.class);
+    ServerConnection serverConnection = mock(ServerConnection.class);
+    Set<Integer> failures = new HashSet<>();
+    InternalDistributedMember member = mock(InternalDistributedMember.class);
+    Set<Integer> buckets = new HashSet<>();
+    buckets.add(1);
+    doThrow(new 
ForceReattemptException("")).when(spyPartitionedRegion).getFetchKeysResponse(member,
+        1);
+
+    spyPartitionedRegion.fetchKeysAndValues(values, serverConnection, 
failures, member, null,
+        buckets);
+
+    verify(spyPartitionedRegion, never()).getValuesForKeys(values, 
serverConnection, null);
+    assertThat(failures.contains(1)).isTrue();
+  }
+
+  @Test
+  public void fetchKeysAndValuesInvokesGetValuesForKeys() throws Exception {
+    PartitionedRegion spyPartitionedRegion = spy(partitionedRegion);
+
+    VersionedObjectList values = mock(VersionedObjectList.class);
+    ServerConnection serverConnection = mock(ServerConnection.class);
+    Set<Integer> failures = new HashSet<>();
+    InternalDistributedMember member = mock(InternalDistributedMember.class);
+    Set<Integer> buckets = new HashSet<>();
+    buckets.add(1);
+    FetchKeysMessage.FetchKeysResponse fetchKeysResponse =
+        mock(FetchKeysMessage.FetchKeysResponse.class);
+    
doReturn(fetchKeysResponse).when(spyPartitionedRegion).getFetchKeysResponse(member,
 1);
+    Set keys = new HashSet();
+    when(fetchKeysResponse.waitForKeys()).thenReturn(keys);
+    doNothing().when(spyPartitionedRegion).getValuesForKeys(values, 
serverConnection, keys);
+
+    spyPartitionedRegion.fetchKeysAndValues(values, serverConnection, 
failures, member, null,
+        buckets);
+
+    verify(spyPartitionedRegion).getValuesForKeys(values, serverConnection, 
keys);
+    assertThat(failures.contains(1)).isFalse();
+  }
+
   private static <K> Set<K> asSet(K... values) {
     Set<K> set = new HashSet<>();
     Collections.addAll(set, values);

Reply via email to