This is an automated email from the ASF dual-hosted git repository.

zhouxj pushed a commit to branch feature/GEODE-8432
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 39ea51eb55f7833ca5a2b867f2acb331895ca1ec
Author: zhouxh <[email protected]>
AuthorDate: Tue Aug 18 10:47:35 2020 -0700

    GEODE-8432: use regionPath directly instead of getRegion when put event 
into parallelGatewaySenderQueue
---
 .../wan/parallel/ParallelGatewaySenderQueue.java   | 16 ++++---
 .../ParallelGatewaySenderQueueJUnitTest.java       | 49 ++++++++++++++++++++++
 2 files changed, 59 insertions(+), 6 deletions(-)

diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
index df531ce..4e3c9db 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
@@ -693,15 +693,19 @@ public class ParallelGatewaySenderQueue implements 
RegionQueue {
 
     boolean isDREvent = isDREvent(sender.getCache(), value);
 
-    Region region = value.getRegion();
-    String regionPath = null;
-    if (isDREvent) {
-      regionPath = region.getFullPath();
-    } else {
+    String regionPath = value.getRegionPath();
+    if (!isDREvent) {
+      Region region = sender.getCache().getRegion(regionPath);
+      if (region == null) {
+        if (isDebugEnabled) {
+          logger.debug("The PR " + regionPath + " has not finished 
initializing.");
+        }
+        region = value.getRegion();
+      }
       regionPath = ColocationHelper.getLeaderRegion((PartitionedRegion) 
region).getFullPath();
     }
     if (isDebugEnabled) {
-      logger.debug("Put is for the region {}", region);
+      logger.debug("Put is for the region {}", regionPath);
     }
     if (!this.userRegionNameToShadowPRMap.containsKey(regionPath)) {
       if (isDebugEnabled) {
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueueJUnitTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueueJUnitTest.java
index b8acbcf..7f51b7b 100644
--- 
a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueueJUnitTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueueJUnitTest.java
@@ -17,7 +17,9 @@ package org.apache.geode.internal.cache.wan.parallel;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -37,6 +39,7 @@ import org.mockito.stubbing.Answer;
 
 import org.apache.geode.CancelCriterion;
 import org.apache.geode.cache.DataPolicy;
+import org.apache.geode.cache.PartitionAttributes;
 import org.apache.geode.cache.PartitionAttributesFactory;
 import org.apache.geode.cache.Region;
 import org.apache.geode.internal.cache.AbstractBucketRegionQueue;
@@ -76,6 +79,52 @@ public class ParallelGatewaySenderQueueJUnitTest {
   }
 
   @Test
+  public void whenReplicatedDataRegionNotReadyShouldNotThrowException() throws 
Exception {
+    GatewaySenderEventImpl event = mock(GatewaySenderEventImpl.class);
+    when(event.makeHeapCopyIfOffHeap()).thenReturn(event);
+    when(event.getRegion()).thenReturn(null);
+    String regionPath = "/testRegion";
+    when(event.getRegionPath()).thenReturn(regionPath);
+    Mockito.doThrow(new IllegalStateException()).when(event).release();
+    Queue backingList = new LinkedList();
+    backingList.add(event);
+
+    queue = spy(queue);
+    doReturn(true).when(queue).isDREvent(any(), any());
+    boolean putDone = queue.put(event);
+    assertThat(putDone).isFalse();
+  }
+
+  @Test
+  public void whenPartitionedDataRegionNotReadyShouldNotThrowException() 
throws Exception {
+    GatewaySenderEventImpl event = mock(GatewaySenderEventImpl.class);
+    when(event.makeHeapCopyIfOffHeap()).thenReturn(event);
+    when(event.getRegion()).thenReturn(null);
+    String regionPath = "/testRegion";
+    when(event.getRegionPath()).thenReturn(regionPath);
+    PartitionedRegion region = mock(PartitionedRegion.class);
+    when(region.getFullPath()).thenReturn(regionPath);
+    when(cache.getRegion(regionPath)).thenReturn(region);
+    PartitionAttributes pa = mock(PartitionAttributes.class);
+    when(region.getPartitionAttributes()).thenReturn(pa);
+    when(pa.getColocatedWith()).thenReturn(null);
+
+    Mockito.doThrow(new IllegalStateException()).when(event).release();
+    Queue backingList = new LinkedList();
+    backingList.add(event);
+
+    BucketRegionQueue bucketRegionQueue = mockBucketRegionQueue(backingList);
+
+    TestableParallelGatewaySenderQueue queue = new 
TestableParallelGatewaySenderQueue(sender,
+        Collections.emptySet(), 0, 1, metaRegionFactory);
+    queue.setMockedAbstractBucketRegionQueue(bucketRegionQueue);
+
+    queue = spy(queue);
+    boolean putDone = queue.put(event);
+    assertThat(putDone).isFalse();
+  }
+
+  @Test
   public void 
whenEventReleaseFromOffHeapFailsExceptionShouldNotBeThrownToAckReaderThread()
       throws Exception {
     GatewaySenderEventImpl event = mock(GatewaySenderEventImpl.class);

Reply via email to