This is an automated email from the ASF dual-hosted git repository.
bschuchardt pushed a commit to branch support/1.13
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/support/1.13 by this push:
new 2dfef38 GEODE-5922: concurrency problems in SerialGatewaySenderQueue
(#5870)
2dfef38 is described below
commit 2dfef383298035d6a65b939c5d0edef5060bcd46
Author: Bruce Schuchardt <[email protected]>
AuthorDate: Wed Jan 6 09:56:54 2021 -0800
GEODE-5922: concurrency problems in SerialGatewaySenderQueue (#5870)
reverting 3ed37a754d789bb52cf190db23088e819955fd58
(cherry picked from commit ab16f68c7c3b121af00c3aca64a92d9809cb6019)
---
.../cache/wan/serial/SerialGatewaySenderQueue.java | 124 ++++++++-------------
1 file changed, 49 insertions(+), 75 deletions(-)
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java
index 243fe73..42ff881 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java
@@ -25,7 +25,6 @@ import java.util.Set;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.logging.log4j.Logger;
@@ -141,13 +140,6 @@ public class SerialGatewaySenderQueue implements
RegionQueue {
private boolean isDiskSynchronous;
/**
- * The writeLock of this concurrent lock is used to protect access to the
queue.
- * It is implemented as a fair lock to ensure FIFO ordering of queueing
attempts.
- * Otherwise threads can be unfairly delayed.
- */
- private ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
-
- /**
* The <code>Map</code> mapping the regionName->key to the queue key. This
index allows fast
* updating of entries in the queue for conflation.
*/
@@ -219,23 +211,18 @@ public class SerialGatewaySenderQueue implements
RegionQueue {
}
@Override
- public boolean put(Object event) throws CacheException {
- lock.writeLock().lock();
- try {
- GatewaySenderEventImpl eventImpl = (GatewaySenderEventImpl) event;
- final Region r = eventImpl.getRegion();
- final boolean isPDXRegion =
- (r instanceof DistributedRegion &&
r.getName().equals(PeerTypeRegistration.REGION_NAME));
- final boolean isWbcl =
-
this.regionName.startsWith(AsyncEventQueueImpl.ASYNC_EVENT_QUEUE_PREFIX);
- if (!(isPDXRegion && isWbcl)) {
- putAndGetKey(event);
- return true;
- }
- return false;
- } finally {
- lock.writeLock().unlock();
+ public synchronized boolean put(Object event) throws CacheException {
+ GatewaySenderEventImpl eventImpl = (GatewaySenderEventImpl) event;
+ final Region r = eventImpl.getRegion();
+ final boolean isPDXRegion =
+ (r instanceof DistributedRegion &&
r.getName().equals(PeerTypeRegistration.REGION_NAME));
+ final boolean isWbcl =
+
this.regionName.startsWith(AsyncEventQueueImpl.ASYNC_EVENT_QUEUE_PREFIX);
+ if (!(isPDXRegion && isWbcl)) {
+ putAndGetKey(event);
+ return true;
}
+ return false;
}
private long putAndGetKey(Object object) throws CacheException {
@@ -259,6 +246,7 @@ public class SerialGatewaySenderQueue implements
RegionQueue {
return key.longValue();
}
+
@Override
public AsyncEvent take() throws CacheException {
// Unsupported since we have no callers.
@@ -280,49 +268,44 @@ public class SerialGatewaySenderQueue implements
RegionQueue {
* have peeked. If the entry was not peeked, this method will silently
return.
*/
@Override
- public void remove() throws CacheException {
- lock.writeLock().lock();
+ public synchronized void remove() throws CacheException {
+ if (this.peekedIds.isEmpty()) {
+ return;
+ }
+ Long key = this.peekedIds.remove();
try {
- if (this.peekedIds.isEmpty()) {
- return;
- }
- Long key = this.peekedIds.remove();
- try {
- // Increment the head key
- updateHeadKey(key.longValue());
- removeIndex(key);
- // Remove the entry at that key with a callback arg signifying it is
- // a WAN queue so that AbstractRegionEntry.destroy can get the value
- // even if it has been evicted to disk. In the normal case, the
- // AbstractRegionEntry.destroy only gets the value in the VM.
- this.region.localDestroy(key, WAN_QUEUE_TOKEN);
- this.stats.decQueueSize();
-
- } catch (EntryNotFoundException ok) {
- // this is acceptable because the conflation can remove entries
- // out from underneath us.
- if (logger.isDebugEnabled()) {
- logger.debug(
- "{}: Did not destroy entry at {} it was not there. It should
have been removed by conflation.",
- this, key);
- }
- }
-
- boolean wasEmpty = this.lastDispatchedKey == this.lastDestroyedKey;
- this.lastDispatchedKey = key;
- if (wasEmpty) {
- synchronized (this) {
- notifyAll();
- }
- }
+ // Increment the head key
+ updateHeadKey(key.longValue());
+ removeIndex(key);
+ // Remove the entry at that key with a callback arg signifying it is
+ // a WAN queue so that AbstractRegionEntry.destroy can get the value
+ // even if it has been evicted to disk. In the normal case, the
+ // AbstractRegionEntry.destroy only gets the value in the VM.
+ this.region.localDestroy(key, WAN_QUEUE_TOKEN);
+ this.stats.decQueueSize();
+ } catch (EntryNotFoundException ok) {
+ // this is acceptable because the conflation can remove entries
+ // out from underneath us.
if (logger.isDebugEnabled()) {
logger.debug(
- "{}: Destroyed entry at key {} setting the lastDispatched Key to
{}. The last destroyed entry was {}",
- this, key, this.lastDispatchedKey, this.lastDestroyedKey);
+ "{}: Did not destroy entry at {} it was not there. It should have
been removed by conflation.",
+ this, key);
+ }
+ }
+
+ boolean wasEmpty = this.lastDispatchedKey == this.lastDestroyedKey;
+ this.lastDispatchedKey = key;
+ if (wasEmpty) {
+ synchronized (this) {
+ notifyAll();
}
- } finally {
- lock.writeLock().unlock();
+ }
+
+ if (logger.isDebugEnabled()) {
+ logger.debug(
+ "{}: Destroyed entry at key {} setting the lastDispatched Key to {}.
The last destroyed entry was {}",
+ this, key, this.lastDispatchedKey, this.lastDestroyedKey);
}
}
@@ -463,8 +446,7 @@ public class SerialGatewaySenderQueue implements
RegionQueue {
Object key = object.getKeyToConflate();
Long previousIndex;
- lock.writeLock().lock();
- try {
+ synchronized (this) {
Map<Object, Long> latestIndexesForRegion = this.indexes.get(rName);
if (latestIndexesForRegion == null) {
latestIndexesForRegion = new HashMap<Object, Long>();
@@ -472,8 +454,6 @@ public class SerialGatewaySenderQueue implements
RegionQueue {
}
previousIndex = latestIndexesForRegion.put(key, tailKey);
- } finally {
- lock.writeLock().unlock();
}
if (isDebugEnabled) {
@@ -554,7 +534,7 @@ public class SerialGatewaySenderQueue implements
RegionQueue {
}
/*
- * this must be invoked with lock.writeLock() held
+ * this must be invoked under synchronization
*/
private void removeIndex(Long qkey) {
// Determine whether conflation is enabled for this queue and object
@@ -742,8 +722,7 @@ public class SerialGatewaySenderQueue implements
RegionQueue {
if (tailKey.get() != -1) {
return;
}
- lock.writeLock().lock();
- try {
+ synchronized (this) {
long largestKey = -1;
long largestKeyLessThanHalfMax = -1;
long smallestKey = -1;
@@ -791,8 +770,6 @@ public class SerialGatewaySenderQueue implements
RegionQueue {
logger.debug("{}: Initialized tail key to: {}, head key to: {}", this,
this.tailKey,
this.headKey);
}
- } finally {
- lock.writeLock().unlock();
}
}
@@ -1031,8 +1008,7 @@ public class SerialGatewaySenderQueue implements
RegionQueue {
}
long temp;
- lock.writeLock().lock();
- try {
+ synchronized (SerialGatewaySenderQueue.this) {
temp = lastDispatchedKey;
boolean wasEmpty = temp == lastDestroyedKey;
while (lastDispatchedKey == lastDestroyedKey) {
@@ -1041,8 +1017,6 @@ public class SerialGatewaySenderQueue implements
RegionQueue {
}
if (wasEmpty)
continue;
- } finally {
- lock.writeLock().unlock();
}
// release not needed since disallowOffHeapValues called
EntryEventImpl event = EntryEventImpl.create((LocalRegion) region,
Operation.DESTROY,