This is an automated email from the ASF dual-hosted git repository.
zhouxj pushed a commit to branch support/1.13
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/support/1.13 by this push:
new 52b932b GEODE-8475: Resolve a potential deadlock in
ParallelGatewaySenderQueue (#5492)
52b932b is described below
commit 52b932bb6847dcc89c357c7e5ec960566ad965c7
Author: Xiaojian Zhou <[email protected]>
AuthorDate: Wed Sep 2 10:35:33 2020 -0700
GEODE-8475: Resolve a potential deadlock in ParallelGatewaySenderQueue
(#5492)
Co-authored-by: Darrel Schneider <[email protected]>
(cherry picked from commit b62e033af490d6f1e8f40621ff084b099f5b52e8)
---
.../wan/parallel/ParallelGatewaySenderQueue.java | 10 ++-
.../ParallelGatewaySenderQueueJUnitTest.java | 91 ++++++++++++++++++++++
2 files changed, 100 insertions(+), 1 deletion(-)
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
index aff49e1..3749eec 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
@@ -713,12 +713,16 @@ public class ParallelGatewaySenderQueue implements
RegionQueue {
bucketFullPath, brq);
}
if (brq != null) {
+ boolean intializingLocked = brq.lockWhenRegionIsInitializing();
brq.getInitializationLock().readLock().lock();
try {
putIntoBucketRegionQueue(brq, key, value);
putDone = true;
} finally {
brq.getInitializationLock().readLock().unlock();
+ if (intializingLocked) {
+ brq.unlockWhenRegionIsInitializing();
+ }
}
} else if (isDREvent) {
// in case of DR with PGS, if shadow bucket is not found event
after
@@ -758,12 +762,16 @@ public class ParallelGatewaySenderQueue implements
RegionQueue {
brq = (AbstractBucketRegionQueue) prQ.getCache()
.getInternalRegionByPath(bucketFullPath);
if (brq != null) {
+ boolean intializingLocked =
brq.lockWhenRegionIsInitializing();
brq.getInitializationLock().readLock().lock();
try {
putIntoBucketRegionQueue(brq, key, value);
putDone = true;
} finally {
brq.getInitializationLock().readLock().unlock();
+ if (intializingLocked) {
+ brq.unlockWhenRegionIsInitializing();
+ }
}
} else {
tempQueue.add(value);
@@ -832,7 +840,7 @@ public class ParallelGatewaySenderQueue implements
RegionQueue {
}
}
- private void putIntoBucketRegionQueue(AbstractBucketRegionQueue brq, Object
key,
+ void putIntoBucketRegionQueue(AbstractBucketRegionQueue brq, Object key,
GatewaySenderEventImpl value) {
boolean addedValueToQueue = false;
try {
diff --git
a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueueJUnitTest.java
b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueueJUnitTest.java
index ba4d6ba..18ee31d 100644
---
a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueueJUnitTest.java
+++
b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueueJUnitTest.java
@@ -16,8 +16,11 @@ package org.apache.geode.internal.cache.wan.parallel;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
@@ -33,6 +36,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.InOrder;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;
@@ -42,12 +46,14 @@ import org.apache.geode.cache.PartitionAttributes;
import org.apache.geode.cache.PartitionAttributesFactory;
import org.apache.geode.cache.Region;
import org.apache.geode.internal.cache.AbstractBucketRegionQueue;
+import org.apache.geode.internal.cache.BucketAdvisor;
import org.apache.geode.internal.cache.BucketRegionQueue;
import org.apache.geode.internal.cache.DistributedRegion;
import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.cache.InternalRegionFactory;
import org.apache.geode.internal.cache.PartitionedRegion;
import org.apache.geode.internal.cache.PartitionedRegionDataStore;
+import org.apache.geode.internal.cache.partitioned.RegionAdvisor;
import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
import org.apache.geode.internal.cache.wan.GatewaySenderStats;
@@ -70,6 +76,7 @@ public class ParallelGatewaySenderQueueJUnitTest {
when(sender.getCache()).thenReturn(cache);
when(sender.getMaximumQueueMemory()).thenReturn(100);
when(sender.getLifeCycleLock()).thenReturn(new ReentrantReadWriteLock());
+ when(sender.getId()).thenReturn("");
metaRegionFactory = mock(MetaRegionFactory.class);
queue = new ParallelGatewaySenderQueue(sender, Collections.emptySet(), 0,
1, metaRegionFactory);
}
@@ -120,6 +127,90 @@ public class ParallelGatewaySenderQueueJUnitTest {
assertThat(putDone).isFalse();
}
+ private void testEnqueueToBrqAfterLockFailedInitialImageReadLock(boolean
isTmpQueue)
+ throws InterruptedException {
+ GatewaySenderEventImpl event = mock(GatewaySenderEventImpl.class);
+ String regionPath = "/userPR";
+ when(event.getRegionPath()).thenReturn(regionPath);
+ when(event.makeHeapCopyIfOffHeap()).thenReturn(event);
+ when(event.getRegion()).thenReturn(null);
+ when(event.getBucketId()).thenReturn(1);
+ when(event.getShadowKey()).thenReturn(100L);
+ when(sender.isPersistenceEnabled()).thenReturn(true);
+ PartitionedRegionDataStore prds = mock(PartitionedRegionDataStore.class);
+ PartitionedRegion prQ = mock(PartitionedRegion.class);
+ AbstractBucketRegionQueue brq = mock(AbstractBucketRegionQueue.class);
+ ReentrantReadWriteLock initializationLock =
mock(ReentrantReadWriteLock.class);
+ ReentrantReadWriteLock.ReadLock readLock =
mock(ReentrantReadWriteLock.ReadLock.class);
+ when(initializationLock.readLock()).thenReturn(readLock);
+ doNothing().when(readLock).lock();
+ doNothing().when(readLock).unlock();
+ doNothing().when(brq).unlockWhenRegionIsInitializing();
+ when(brq.getInitializationLock()).thenReturn(initializationLock);
+ when(brq.lockWhenRegionIsInitializing()).thenReturn(true);
+ when(prQ.getDataStore()).thenReturn(prds);
+ when(prQ.getCache()).thenReturn(cache);
+
when(prQ.getBucketName(1)).thenReturn("_B__PARALLEL_GATEWAY_SENDER_QUEUE_1");
+ when(prds.getLocalBucketById(1)).thenReturn(null);
+ PartitionedRegion userPR = mock(PartitionedRegion.class);
+ PartitionAttributes pa = mock(PartitionAttributes.class);
+ when(userPR.getPartitionAttributes()).thenReturn(pa);
+ when(pa.getColocatedWith()).thenReturn(null);
+ when(userPR.getDataPolicy()).thenReturn(DataPolicy.PERSISTENT_PARTITION);
+ when(userPR.getFullPath()).thenReturn(regionPath);
+ when(cache.getRegion("_PARALLEL_GATEWAY_SENDER_QUEUE")).thenReturn(prQ);
+ when(cache.getRegion(regionPath, true)).thenReturn(userPR);
+ when(prQ.getColocatedWithRegion()).thenReturn(userPR);
+ RegionAdvisor ra = mock(RegionAdvisor.class);
+ BucketAdvisor ba = mock(BucketAdvisor.class);
+ when(userPR.getRegionAdvisor()).thenReturn(ra);
+ when(ra.getBucketAdvisor(1)).thenReturn(ba);
+
when(ba.isShadowBucketDestroyed("/__PR/_B__PARALLEL_GATEWAY_SENDER_QUEUE_1")).thenReturn(false);
+
+ prepareBrq(brq, isTmpQueue);
+
+ Mockito.doThrow(new IllegalStateException()).when(event).release();
+ Queue backingList = new LinkedList();
+ backingList.add(event);
+
+ BucketRegionQueue bucketRegionQueue = mockBucketRegionQueue(backingList);
+
+ TestableParallelGatewaySenderQueue queue = new
TestableParallelGatewaySenderQueue(sender,
+ Collections.emptySet(), 0, 1, metaRegionFactory);
+ queue.setMockedAbstractBucketRegionQueue(bucketRegionQueue);
+
+ InOrder inOrder = inOrder(brq, readLock);
+ queue = spy(queue);
+ queue.addShadowPartitionedRegionForUserPR(userPR);
+ doNothing().when(queue).putIntoBucketRegionQueue(eq(brq), any(),
eq(event));
+ boolean putDone = queue.put(event);
+ assertThat(putDone).isTrue();
+ inOrder.verify(brq).lockWhenRegionIsInitializing();
+ inOrder.verify(readLock).lock();
+ inOrder.verify(readLock).unlock();
+ inOrder.verify(brq).unlockWhenRegionIsInitializing();
+ }
+
+ private void prepareBrq(AbstractBucketRegionQueue brq, boolean isTmpQueue) {
+ if (isTmpQueue) {
+
when(cache.getInternalRegionByPath("/__PR/_B__PARALLEL_GATEWAY_SENDER_QUEUE_1"))
+ .thenReturn(null).thenReturn(brq);
+ } else {
+
when(cache.getInternalRegionByPath("/__PR/_B__PARALLEL_GATEWAY_SENDER_QUEUE_1"))
+ .thenReturn(brq);
+ }
+ }
+
+ @Test
+ public void enqueueToInitializingBrqShouldLockFailedInitialImageReadLock()
throws Exception {
+ testEnqueueToBrqAfterLockFailedInitialImageReadLock(false);
+ }
+
+ @Test
+ public void enqueueToTmpQueueShouldLockFailedInitialImageReadLock() throws
Exception {
+ testEnqueueToBrqAfterLockFailedInitialImageReadLock(true);
+ }
+
@Test
public void
whenEventReleaseFromOffHeapFailsExceptionShouldNotBeThrownToAckReaderThread()
throws Exception {