This is an automated email from the ASF dual-hosted git repository.
dschneider pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 6b79dab GEODE-8339: fix Redis Rename hang (#5501)
6b79dab is described below
commit 6b79dab953089979657bc9763321765f45c0f37e
Author: Ray Ingles <[email protected]>
AuthorDate: Thu Sep 10 15:36:09 2020 -0400
GEODE-8339: fix Redis Rename hang (#5501)
The hang was caused by a thread holding a read lock while the rebalance waited
for that thread so it could get the write lock; meanwhile, another thread tried
to get the same read lock and was blocked behind the pending write lock. This other
thread needs to complete before the first thread will release its read lock, so
we ended up deadlocked.
Now the second thread is told that the read lock is already held on its
behalf, so it does not try to obtain it again.
Co-authored-by: Ray Ingles <[email protected]>
Co-authored-by: Sarah <[email protected]>
---
.../internal/cache/AbstractBucketRegionQueue.java | 4 +--
.../apache/geode/internal/cache/BucketRegion.java | 40 ++++++++++++++++------
.../geode/internal/cache/BucketRegionQueue.java | 4 +--
.../sanctioned-geode-core-serializables.txt | 1 +
.../executor/CrashAndNoRepeatDUnitTest.java | 9 ++---
.../redis/internal/data/AbstractRedisData.java | 12 +++++--
.../key/RedisKeyCommandsFunctionInvoker.java | 7 ++--
.../internal/executor/key/RenameFunction.java | 2 +-
8 files changed, 52 insertions(+), 27 deletions(-)
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java
index fd8be1d..0813eab 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java
@@ -294,11 +294,11 @@ public abstract class AbstractBucketRegionQueue extends
BucketRegion {
@Override
public boolean virtualPut(EntryEventImpl event, boolean ifNew, boolean ifOld,
Object expectedOldValue, boolean requireOldValue, long lastModified,
- boolean overwriteDestroyed, boolean invokeCallbacks, boolean
throwConcurrentModificaiton)
+ boolean overwriteDestroyed, boolean invokeCallbacks, boolean
throwConcurrentModification)
throws TimeoutException, CacheWriterException {
try {
boolean success = super.virtualPut(event, ifNew, ifOld,
expectedOldValue, requireOldValue,
- lastModified, overwriteDestroyed, invokeCallbacks,
throwConcurrentModificaiton);
+ lastModified, overwriteDestroyed, invokeCallbacks,
throwConcurrentModification);
if (success) {
if (logger.isDebugEnabled()) {
logger.debug("Key : ----> {}", event.getKey());
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
index 0136de6..8662a6e 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
@@ -18,6 +18,7 @@ package org.apache.geode.internal.cache;
import java.io.DataOutput;
import java.io.IOException;
import java.io.InputStream;
+import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -517,9 +518,11 @@ public class BucketRegion extends DistributedRegion
implements Bucket {
@Override
public boolean virtualPut(EntryEventImpl event, boolean ifNew, boolean ifOld,
Object expectedOldValue, boolean requireOldValue, long lastModified,
- boolean overwriteDestroyed, boolean invokeCallbacks, boolean
throwConcurrentModificaiton)
+ boolean overwriteDestroyed, boolean invokeCallbacks,
+ boolean throwConcurrentModification)
throws TimeoutException, CacheWriterException {
- boolean locked = lockKeysAndPrimary(event);
+
+ boolean isLocked = lockKeysAndPrimary(event);
try {
if (partitionedRegion.isParallelWanEnabled()) {
@@ -551,7 +554,7 @@ public class BucketRegion extends DistributedRegion
implements Bucket {
}
return true;
} finally {
- if (locked) {
+ if (isLocked) {
releaseLockForKeysAndPrimary(event);
}
}
@@ -753,7 +756,11 @@ public class BucketRegion extends DistributedRegion
implements Bucket {
Object[] keys = getKeysToBeLocked(event);
waitUntilLocked(keys); // it might wait for long time
+ if (wasPrimaryLockedPreviously(event)) {
+ return true;
+ }
boolean lockedForPrimary = false;
+
try {
lockedForPrimary = doLockForPrimary(false);
// tryLock is false means doLockForPrimary won't return false.
@@ -872,7 +879,9 @@ public class BucketRegion extends DistributedRegion
implements Bucket {
* And release/remove the lockObject on the key(s)
*/
void releaseLockForKeysAndPrimary(EntryEventImpl event) {
- doUnlockForPrimary();
+ if (!wasPrimaryLockedPreviously(event)) {
+ doUnlockForPrimary();
+ }
Object[] keys = getKeysToBeLocked(event);
removeAndNotifyKeys(keys);
@@ -1214,6 +1223,13 @@ public class BucketRegion extends DistributedRegion
implements Bucket {
}
}
+ public static class PrimaryMoveReadLockAcquired implements Serializable {
+ };
+
+ private boolean wasPrimaryLockedPreviously(EntryEventImpl event) {
+ return event.getCallbackArgument() instanceof PrimaryMoveReadLockAcquired;
+ }
+
protected void distributeDestroyOperation(EntryEventImpl event) {
long token = -1;
DestroyOperation op = null;
@@ -2093,9 +2109,7 @@ public class BucketRegion extends DistributedRegion
implements Bucket {
// if GII has failed, because there is not primary. So it's safe to set
these
// counters to 0.
oldMemValue = bytesInMemory.getAndSet(0);
- }
-
- else {
+ } else {
throw new InternalGemFireError(
"Trying to clear a bucket region that was not destroyed or in
initialization.");
}
@@ -2255,8 +2269,9 @@ public class BucketRegion extends DistributedRegion
implements Bucket {
final int memoryDelta = op.computeMemoryDelta(oldSize, newSize);
- if (memoryDelta == 0)
+ if (memoryDelta == 0) {
return;
+ }
// do the bigger one first to keep the sum > 0
updateBucketMemoryStats(memoryDelta);
}
@@ -2308,8 +2323,9 @@ public class BucketRegion extends DistributedRegion
implements Bucket {
}
public void incNumOverflowBytesOnDisk(long delta) {
- if (delta == 0)
+ if (delta == 0) {
return;
+ }
numOverflowBytesOnDisk.addAndGet(delta);
// The following could be reenabled at a future time.
// I deadcoded for now to make sure I didn't have it break
@@ -2346,11 +2362,13 @@ public class BucketRegion extends DistributedRegion
implements Bucket {
public int getSizeForEviction() {
EvictionAttributes ea = getAttributes().getEvictionAttributes();
- if (ea == null)
+ if (ea == null) {
return 0;
+ }
EvictionAlgorithm algo = ea.getAlgorithm();
- if (!algo.isLRUHeap())
+ if (!algo.isLRUHeap()) {
return 0;
+ }
EvictionAction action = ea.getAction();
return action.isLocalDestroy() ? getRegionMap().sizeInVM() : (int)
getNumEntriesInVM();
}
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
index 198ba87..f9d5ab7 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
@@ -251,11 +251,11 @@ public class BucketRegionQueue extends
AbstractBucketRegionQueue {
@Override
public boolean virtualPut(EntryEventImpl event, boolean ifNew, boolean ifOld,
Object expectedOldValue, boolean requireOldValue, long lastModified,
- boolean overwriteDestroyed, boolean invokeCallbacks, boolean
throwConcurrentModificaiton)
+ boolean overwriteDestroyed, boolean invokeCallbacks, boolean
throwConcurrentModification)
throws TimeoutException, CacheWriterException {
try {
boolean success = super.virtualPut(event, ifNew, ifOld,
expectedOldValue, requireOldValue,
- lastModified, overwriteDestroyed, invokeCallbacks,
throwConcurrentModificaiton);
+ lastModified, overwriteDestroyed, invokeCallbacks,
throwConcurrentModification);
if (success) {
if (getPartitionedRegion().getColocatedWith() == null) {
diff --git
a/geode-core/src/main/resources/org/apache/geode/internal/sanctioned-geode-core-serializables.txt
b/geode-core/src/main/resources/org/apache/geode/internal/sanctioned-geode-core-serializables.txt
index 2f14f8b..652d1b2 100644
---
a/geode-core/src/main/resources/org/apache/geode/internal/sanctioned-geode-core-serializables.txt
+++
b/geode-core/src/main/resources/org/apache/geode/internal/sanctioned-geode-core-serializables.txt
@@ -272,6 +272,7 @@
org/apache/geode/internal/admin/remote/DistributionLocatorId,true,65873901869719
org/apache/geode/internal/admin/remote/EntryValueNodeImpl,false,fields:org/apache/geode/internal/admin/remote/EntryValueNodeImpl[],name:java/lang/String,primitive:boolean,primitiveVal:java/lang/Object,type:java/lang/String
org/apache/geode/internal/cache/BucketAdvisor$SetFromMap,true,2454657854757543876,m:java/util/Map
org/apache/geode/internal/cache/BucketNotFoundException,true,2898657229184289911
+org/apache/geode/internal/cache/BucketRegion$PrimaryMoveReadLockAcquired,false
org/apache/geode/internal/cache/BucketRegion$SizeOp,false
org/apache/geode/internal/cache/CacheClientStatus,true,-56148046466517217,_id:org/apache/geode/internal/cache/tier/sockets/ClientProxyMembershipID,_memberId:java/lang/String,_numberOfConnections:int,_socketAddresses:java/util/List,_socketPorts:java/util/List
org/apache/geode/internal/cache/CommitReplyException,true,-7711083075296622596,exceptions:java/util/Set
diff --git
a/geode-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/CrashAndNoRepeatDUnitTest.java
b/geode-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/CrashAndNoRepeatDUnitTest.java
index 2ef20a8..00dfbbb 100644
---
a/geode-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/CrashAndNoRepeatDUnitTest.java
+++
b/geode-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/CrashAndNoRepeatDUnitTest.java
@@ -34,11 +34,11 @@ import org.apache.logging.log4j.Logger;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
-import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.exceptions.JedisConnectionException;
+import redis.clients.jedis.exceptions.JedisDataException;
import org.apache.geode.cache.control.RebalanceFactory;
import org.apache.geode.cache.control.RebalanceResults;
@@ -176,7 +176,6 @@ public class CrashAndNoRepeatDUnitTest {
}
@Test
- @Ignore("GEODE-8339")
public void givenServerCrashesDuringRename_thenDataIsNotLost() throws
Exception {
AtomicBoolean running1 = new AtomicBoolean(true);
AtomicBoolean running2 = new AtomicBoolean(true);
@@ -245,13 +244,15 @@ public class CrashAndNoRepeatDUnitTest {
try {
jedisRef.get().rename(oldKey, newKey);
iterationCount += 1;
- } catch (JedisConnectionException ex) {
+ } catch (JedisConnectionException | JedisDataException ex) {
if (ex.getMessage().contains("Unexpected end of stream.")) {
if (!doWithRetry(() -> connect(jedisRef).exists(oldKey))) {
iterationCount += 1;
}
} else if (ex.getMessage().contains("no such key")) {
- iterationCount += 1;
+ if (!doWithRetry(() -> connect(jedisRef).exists(oldKey))) {
+ iterationCount += 1;
+ }
} else {
throw ex;
}
diff --git
a/geode-redis/src/main/java/org/apache/geode/redis/internal/data/AbstractRedisData.java
b/geode-redis/src/main/java/org/apache/geode/redis/internal/data/AbstractRedisData.java
index 2d90ad1..a07f652 100644
---
a/geode-redis/src/main/java/org/apache/geode/redis/internal/data/AbstractRedisData.java
+++
b/geode-redis/src/main/java/org/apache/geode/redis/internal/data/AbstractRedisData.java
@@ -24,7 +24,9 @@ import java.util.Objects;
import org.apache.geode.DataSerializer;
import org.apache.geode.InvalidDeltaException;
+import org.apache.geode.cache.EntryNotFoundException;
import org.apache.geode.cache.Region;
+import org.apache.geode.internal.cache.BucketRegion;
import org.apache.geode.redis.internal.delta.AddsDeltaInfo;
import org.apache.geode.redis.internal.delta.AppendDeltaInfo;
import org.apache.geode.redis.internal.delta.DeltaInfo;
@@ -33,6 +35,9 @@ import org.apache.geode.redis.internal.delta.RemsDeltaInfo;
import org.apache.geode.redis.internal.delta.TimestampDeltaInfo;
public abstract class AbstractRedisData implements RedisData {
+ private static final BucketRegion.PrimaryMoveReadLockAcquired
primaryMoveReadLockAcquired =
+ new BucketRegion.PrimaryMoveReadLockAcquired();
+
@Override
public String toString() {
return "expirationTimestamp=" + expirationTimestamp;
@@ -81,8 +86,11 @@ public abstract class AbstractRedisData implements RedisData
{
@Override
public boolean rename(Region<ByteArrayWrapper, RedisData> region,
ByteArrayWrapper oldKey,
ByteArrayWrapper newKey) {
- region.put(newKey, this);
- region.remove(oldKey);
+ region.put(newKey, this, primaryMoveReadLockAcquired);
+ try {
+ region.destroy(oldKey, primaryMoveReadLockAcquired);
+ } catch (EntryNotFoundException ignore) {
+ }
return true;
}
diff --git
a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/key/RedisKeyCommandsFunctionInvoker.java
b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/key/RedisKeyCommandsFunctionInvoker.java
index 00f12af..d7a2b13 100644
---
a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/key/RedisKeyCommandsFunctionInvoker.java
+++
b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/key/RedisKeyCommandsFunctionInvoker.java
@@ -31,9 +31,8 @@ import org.apache.geode.redis.internal.data.RedisData;
import org.apache.geode.redis.internal.executor.RedisCommandsFunctionInvoker;
/**
- * This class is used by netty redis key command executors
- * to invoke a geode function that will run on a
- * particular server to do the redis command.
+ * This class is used by netty redis key command executors to invoke a geode
function that will run
+ * on a particular server to do the redis command.
*/
public class RedisKeyCommandsFunctionInvoker extends
RedisCommandsFunctionInvoker
implements RedisKeyCommands {
@@ -74,9 +73,7 @@ public class RedisKeyCommandsFunctionInvoker extends
RedisCommandsFunctionInvoke
@Override
- @SuppressWarnings("unchecked")
public boolean rename(ByteArrayWrapper oldKey, ByteArrayWrapper newKey) {
-
if (!region.containsKey(oldKey)) {
return false;
}
diff --git
a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/key/RenameFunction.java
b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/key/RenameFunction.java
index 6f0c013..e8c85bd 100644
---
a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/key/RenameFunction.java
+++
b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/key/RenameFunction.java
@@ -51,7 +51,6 @@ public class RenameFunction implements InternalFunction {
FunctionService.registerFunction(new RenameFunction(dataRegion,
stripedExecutor, redisStats));
}
-
public RenameFunction(
Region<ByteArrayWrapper, RedisData> dataRegion,
StripedExecutor stripedExecutor,
@@ -72,6 +71,7 @@ public class RenameFunction implements InternalFunction {
};
partitionedRegion.computeWithPrimaryLocked(renameContext.getKeyToLock(),
computation);
+
} else {
Object result = acquireLockIfNeeded(renameContext);
context.getResultSender().lastResult(result);