This is an automated email from the ASF dual-hosted git repository.

zhouxj pushed a commit to branch support/1.13
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/support/1.13 by this push:
     new 52b932b  GEODE-8475: Resolve a potential dead lock in ParallelGatewaySenderQueue (#5492)
52b932b is described below

commit 52b932bb6847dcc89c357c7e5ec960566ad965c7
Author: Xiaojian Zhou <[email protected]>
AuthorDate: Wed Sep 2 10:35:33 2020 -0700

    GEODE-8475: Resolve a potential dead lock in ParallelGatewaySenderQueue (#5492)
    
        Co-authored-by: Darrel Schneider <[email protected]>
    
    (cherry picked from commit b62e033af490d6f1e8f40621ff084b099f5b52e8)
---
 .../wan/parallel/ParallelGatewaySenderQueue.java   | 10 ++-
 .../ParallelGatewaySenderQueueJUnitTest.java       | 91 ++++++++++++++++++++++
 2 files changed, 100 insertions(+), 1 deletion(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
index aff49e1..3749eec 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
@@ -713,12 +713,16 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
                 bucketFullPath, brq);
           }
           if (brq != null) {
+            boolean intializingLocked = brq.lockWhenRegionIsInitializing();
             brq.getInitializationLock().readLock().lock();
             try {
               putIntoBucketRegionQueue(brq, key, value);
               putDone = true;
             } finally {
               brq.getInitializationLock().readLock().unlock();
+              if (intializingLocked) {
+                brq.unlockWhenRegionIsInitializing();
+              }
             }
           } else if (isDREvent) {
             // in case of DR with PGS, if shadow bucket is not found event 
after
@@ -758,12 +762,16 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
                 brq = (AbstractBucketRegionQueue) prQ.getCache()
                     .getInternalRegionByPath(bucketFullPath);
                 if (brq != null) {
+                  boolean intializingLocked = 
brq.lockWhenRegionIsInitializing();
                   brq.getInitializationLock().readLock().lock();
                   try {
                     putIntoBucketRegionQueue(brq, key, value);
                     putDone = true;
                   } finally {
                     brq.getInitializationLock().readLock().unlock();
+                    if (intializingLocked) {
+                      brq.unlockWhenRegionIsInitializing();
+                    }
                   }
                 } else {
                   tempQueue.add(value);
@@ -832,7 +840,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
     }
   }
 
-  private void putIntoBucketRegionQueue(AbstractBucketRegionQueue brq, Object 
key,
+  void putIntoBucketRegionQueue(AbstractBucketRegionQueue brq, Object key,
       GatewaySenderEventImpl value) {
     boolean addedValueToQueue = false;
     try {
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueueJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueueJUnitTest.java
index ba4d6ba..18ee31d 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueueJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueueJUnitTest.java
@@ -16,8 +16,11 @@ package org.apache.geode.internal.cache.wan.parallel;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
@@ -33,6 +36,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.InOrder;
 import org.mockito.Mockito;
 import org.mockito.stubbing.Answer;
 
@@ -42,12 +46,14 @@ import org.apache.geode.cache.PartitionAttributes;
 import org.apache.geode.cache.PartitionAttributesFactory;
 import org.apache.geode.cache.Region;
 import org.apache.geode.internal.cache.AbstractBucketRegionQueue;
+import org.apache.geode.internal.cache.BucketAdvisor;
 import org.apache.geode.internal.cache.BucketRegionQueue;
 import org.apache.geode.internal.cache.DistributedRegion;
 import org.apache.geode.internal.cache.GemFireCacheImpl;
 import org.apache.geode.internal.cache.InternalRegionFactory;
 import org.apache.geode.internal.cache.PartitionedRegion;
 import org.apache.geode.internal.cache.PartitionedRegionDataStore;
+import org.apache.geode.internal.cache.partitioned.RegionAdvisor;
 import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
 import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
 import org.apache.geode.internal.cache.wan.GatewaySenderStats;
@@ -70,6 +76,7 @@ public class ParallelGatewaySenderQueueJUnitTest {
     when(sender.getCache()).thenReturn(cache);
     when(sender.getMaximumQueueMemory()).thenReturn(100);
     when(sender.getLifeCycleLock()).thenReturn(new ReentrantReadWriteLock());
+    when(sender.getId()).thenReturn("");
     metaRegionFactory = mock(MetaRegionFactory.class);
     queue = new ParallelGatewaySenderQueue(sender, Collections.emptySet(), 0, 
1, metaRegionFactory);
   }
@@ -120,6 +127,90 @@ public class ParallelGatewaySenderQueueJUnitTest {
     assertThat(putDone).isFalse();
   }
 
+  private void testEnqueueToBrqAfterLockFailedInitialImageReadLock(boolean 
isTmpQueue)
+      throws InterruptedException {
+    GatewaySenderEventImpl event = mock(GatewaySenderEventImpl.class);
+    String regionPath = "/userPR";
+    when(event.getRegionPath()).thenReturn(regionPath);
+    when(event.makeHeapCopyIfOffHeap()).thenReturn(event);
+    when(event.getRegion()).thenReturn(null);
+    when(event.getBucketId()).thenReturn(1);
+    when(event.getShadowKey()).thenReturn(100L);
+    when(sender.isPersistenceEnabled()).thenReturn(true);
+    PartitionedRegionDataStore prds = mock(PartitionedRegionDataStore.class);
+    PartitionedRegion prQ = mock(PartitionedRegion.class);
+    AbstractBucketRegionQueue brq = mock(AbstractBucketRegionQueue.class);
+    ReentrantReadWriteLock initializationLock = 
mock(ReentrantReadWriteLock.class);
+    ReentrantReadWriteLock.ReadLock readLock = 
mock(ReentrantReadWriteLock.ReadLock.class);
+    when(initializationLock.readLock()).thenReturn(readLock);
+    doNothing().when(readLock).lock();
+    doNothing().when(readLock).unlock();
+    doNothing().when(brq).unlockWhenRegionIsInitializing();
+    when(brq.getInitializationLock()).thenReturn(initializationLock);
+    when(brq.lockWhenRegionIsInitializing()).thenReturn(true);
+    when(prQ.getDataStore()).thenReturn(prds);
+    when(prQ.getCache()).thenReturn(cache);
+    
when(prQ.getBucketName(1)).thenReturn("_B__PARALLEL_GATEWAY_SENDER_QUEUE_1");
+    when(prds.getLocalBucketById(1)).thenReturn(null);
+    PartitionedRegion userPR = mock(PartitionedRegion.class);
+    PartitionAttributes pa = mock(PartitionAttributes.class);
+    when(userPR.getPartitionAttributes()).thenReturn(pa);
+    when(pa.getColocatedWith()).thenReturn(null);
+    when(userPR.getDataPolicy()).thenReturn(DataPolicy.PERSISTENT_PARTITION);
+    when(userPR.getFullPath()).thenReturn(regionPath);
+    when(cache.getRegion("_PARALLEL_GATEWAY_SENDER_QUEUE")).thenReturn(prQ);
+    when(cache.getRegion(regionPath, true)).thenReturn(userPR);
+    when(prQ.getColocatedWithRegion()).thenReturn(userPR);
+    RegionAdvisor ra = mock(RegionAdvisor.class);
+    BucketAdvisor ba = mock(BucketAdvisor.class);
+    when(userPR.getRegionAdvisor()).thenReturn(ra);
+    when(ra.getBucketAdvisor(1)).thenReturn(ba);
+    
when(ba.isShadowBucketDestroyed("/__PR/_B__PARALLEL_GATEWAY_SENDER_QUEUE_1")).thenReturn(false);
+
+    prepareBrq(brq, isTmpQueue);
+
+    Mockito.doThrow(new IllegalStateException()).when(event).release();
+    Queue backingList = new LinkedList();
+    backingList.add(event);
+
+    BucketRegionQueue bucketRegionQueue = mockBucketRegionQueue(backingList);
+
+    TestableParallelGatewaySenderQueue queue = new 
TestableParallelGatewaySenderQueue(sender,
+        Collections.emptySet(), 0, 1, metaRegionFactory);
+    queue.setMockedAbstractBucketRegionQueue(bucketRegionQueue);
+
+    InOrder inOrder = inOrder(brq, readLock);
+    queue = spy(queue);
+    queue.addShadowPartitionedRegionForUserPR(userPR);
+    doNothing().when(queue).putIntoBucketRegionQueue(eq(brq), any(), 
eq(event));
+    boolean putDone = queue.put(event);
+    assertThat(putDone).isTrue();
+    inOrder.verify(brq).lockWhenRegionIsInitializing();
+    inOrder.verify(readLock).lock();
+    inOrder.verify(readLock).unlock();
+    inOrder.verify(brq).unlockWhenRegionIsInitializing();
+  }
+
+  private void prepareBrq(AbstractBucketRegionQueue brq, boolean isTmpQueue) {
+    if (isTmpQueue) {
+      
when(cache.getInternalRegionByPath("/__PR/_B__PARALLEL_GATEWAY_SENDER_QUEUE_1"))
+          .thenReturn(null).thenReturn(brq);
+    } else {
+      
when(cache.getInternalRegionByPath("/__PR/_B__PARALLEL_GATEWAY_SENDER_QUEUE_1"))
+          .thenReturn(brq);
+    }
+  }
+
+  @Test
+  public void enqueueToInitializingBrqShouldLockFailedInitialImageReadLock() 
throws Exception {
+    testEnqueueToBrqAfterLockFailedInitialImageReadLock(false);
+  }
+
+  @Test
+  public void enqueueToTmpQueueShouldLockFailedInitialImageReadLock() throws 
Exception {
+    testEnqueueToBrqAfterLockFailedInitialImageReadLock(true);
+  }
+
   @Test
   public void 
whenEventReleaseFromOffHeapFailsExceptionShouldNotBeThrownToAckReaderThread()
       throws Exception {

Reply via email to