Avoid deadlock due to MV lock contention

Patch by Benjamin Roth; reviewed by Tyler Hobbs for CASSANDRA-12689


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d38a732c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d38a732c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d38a732c

Branch: refs/heads/cassandra-3.X
Commit: d38a732ce15caab57ce6dddb3c0d6a436506db29
Parents: e4f840a
Author: brstgt <brs...@googlemail.com>
Authored: Fri Oct 28 15:39:03 2016 -0500
Committer: Tyler Hobbs <tylerlho...@gmail.com>
Committed: Fri Oct 28 15:39:03 2016 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cassandra/config/DatabaseDescriptor.java    |   2 +-
 src/java/org/apache/cassandra/db/Keyspace.java  | 107 ++++++++++++++-----
 src/java/org/apache/cassandra/db/Mutation.java  |  12 +--
 .../cassandra/service/paxos/PaxosState.java     |   2 +-
 5 files changed, 87 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/d38a732c/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index bf1e7d6..c80e045 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.10
+ * Avoid deadlock due to materialized view lock contention (CASSANDRA-12689)
  * Fix for KeyCacheCqlTest flakiness (CASSANDRA-12801)
  * Include SSTable filename in compacting large row message (CASSANDRA-12384)
  * Fix potential socket leak (CASSANDRA-12329, CASSANDRA-12330)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d38a732c/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java 
b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index baea210..7b32a34 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -392,7 +392,7 @@ public class DatabaseDescriptor
             throw new ConfigurationException("concurrent_reads must be at 
least 2, but was " + conf.concurrent_reads, false);
         }
 
-        if (conf.concurrent_writes != null && conf.concurrent_writes < 2)
+        if (conf.concurrent_writes != null && conf.concurrent_writes < 2 && 
System.getProperty("cassandra.test.fail_mv_locks_count", "").isEmpty())
         {
             throw new ConfigurationException("concurrent_writes must be at 
least 2, but was " + conf.concurrent_writes, false);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d38a732c/src/java/org/apache/cassandra/db/Keyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Keyspace.java 
b/src/java/org/apache/cassandra/db/Keyspace.java
index 8d710d1..75aab8f 100644
--- a/src/java/org/apache/cassandra/db/Keyspace.java
+++ b/src/java/org/apache/cassandra/db/Keyspace.java
@@ -63,6 +63,7 @@ public class Keyspace
 
     private static final String TEST_FAIL_WRITES_KS = 
System.getProperty("cassandra.test.fail_writes_ks", "");
     private static final boolean TEST_FAIL_WRITES = 
!TEST_FAIL_WRITES_KS.isEmpty();
+    private static int TEST_FAIL_MV_LOCKS_COUNT = 
Integer.getInteger(System.getProperty("cassandra.test.fail_mv_locks_count", 
"0"), 0);
 
     public final KeyspaceMetrics metric;
 
@@ -384,6 +385,20 @@ public class Keyspace
         return apply(mutation, writeCommitLog, true, false, null);
     }
 
+    /**
+     * Should be used if caller is blocking and runs in mutation stage.
+     * Otherwise there is a race condition where ALL mutation workers are 
being blocked, ending
+     * in a complete deadlock of the mutation stage. See CASSANDRA-12689.
+     *
+     * @param mutation
+     * @param writeCommitLog
+     * @return
+     */
+    public CompletableFuture<?> applyNotDeferrable(Mutation mutation, boolean 
writeCommitLog)
+    {
+        return apply(mutation, writeCommitLog, true, false, false, null);
+    }
+
     public CompletableFuture<?> apply(Mutation mutation, boolean 
writeCommitLog, boolean updateIndexes)
     {
         return apply(mutation, writeCommitLog, updateIndexes, false, null);
@@ -394,6 +409,15 @@ public class Keyspace
         return apply(mutation, false, true, true, null);
     }
 
+    public CompletableFuture<?> apply(final Mutation mutation,
+                                      final boolean writeCommitLog,
+                                      boolean updateIndexes,
+                                      boolean isClReplay,
+                                      CompletableFuture<?> future)
+    {
+        return apply(mutation, writeCommitLog, updateIndexes, isClReplay, 
true, future);
+    }
+
     /**
      * This method appends a row to the global CommitLog, then updates 
memtables and indexes.
      *
@@ -402,57 +426,86 @@ public class Keyspace
      * @param writeCommitLog false to disable commitlog append entirely
      * @param updateIndexes  false to disable index updates (used by 
CollationController "defragmenting")
      * @param isClReplay     true if caller is the commitlog replayer
+     * @param isDeferrable   true if caller is not waiting for future to 
complete, so that future may be deferred
      */
     public CompletableFuture<?> apply(final Mutation mutation,
                                       final boolean writeCommitLog,
                                       boolean updateIndexes,
                                       boolean isClReplay,
+                                      boolean isDeferrable,
                                       CompletableFuture<?> future)
     {
         if (TEST_FAIL_WRITES && metadata.name.equals(TEST_FAIL_WRITES_KS))
             throw new RuntimeException("Testing write failures");
 
-        Lock lock = null;
         boolean requiresViewUpdate = updateIndexes && 
viewManager.updatesAffectView(Collections.singleton(mutation), false);
         final CompletableFuture<?> mark = future == null ? new 
CompletableFuture<>() : future;
 
+        Lock lock = null;
         if (requiresViewUpdate)
         {
             mutation.viewLockAcquireStart.compareAndSet(0L, 
System.currentTimeMillis());
-            lock = ViewManager.acquireLockFor(mutation.key().getKey());
-
-            if (lock == null)
+            while (true)
             {
-                // avoid throwing a WTE during commitlog replay
-                if (!isClReplay && (System.currentTimeMillis() - 
mutation.createdAt) > DatabaseDescriptor.getWriteRpcTimeout())
-                {
-                    logger.trace("Could not acquire lock for {}", 
ByteBufferUtil.bytesToHex(mutation.key().getKey()));
-                    Tracing.trace("Could not acquire MV lock");
-                    if (future != null)
-                        future.completeExceptionally(new 
WriteTimeoutException(WriteType.VIEW, ConsistencyLevel.LOCAL_ONE, 0, 1));
-                    else
-                        throw new WriteTimeoutException(WriteType.VIEW, 
ConsistencyLevel.LOCAL_ONE, 0, 1);
-                }
+                if (TEST_FAIL_MV_LOCKS_COUNT == 0)
+                    lock = ViewManager.acquireLockFor(mutation.key().getKey());
                 else
+                    TEST_FAIL_MV_LOCKS_COUNT--;
+
+                if (lock == null)
                 {
-                    //This view update can't happen right now. so rather than 
keep this thread busy
-                    // we will re-apply ourself to the queue and try again 
later
-                    StageManager.getStage(Stage.MUTATION).execute(() ->
-                        apply(mutation, writeCommitLog, true, isClReplay, mark)
-                    );
+                    // avoid throwing a WTE during commitlog replay
+                    if (!isClReplay && (System.currentTimeMillis() - 
mutation.createdAt) > DatabaseDescriptor.getWriteRpcTimeout())
+                    {
+                        logger.trace("Could not acquire lock for {}", 
ByteBufferUtil.bytesToHex(mutation.key().getKey()));
+                        Tracing.trace("Could not acquire MV lock");
+                        if (future != null)
+                        {
+                            future.completeExceptionally(new 
WriteTimeoutException(WriteType.VIEW, ConsistencyLevel.LOCAL_ONE, 0, 1));
+                            return mark;
+                        }
+                        else
+                        {
+                            throw new WriteTimeoutException(WriteType.VIEW, 
ConsistencyLevel.LOCAL_ONE, 0, 1);
+                        }
+                    }
+                    else if (isDeferrable)
+                    {
+                        //This view update can't happen right now. so rather 
than keep this thread busy
+                        // we will re-apply ourself to the queue and try again 
later
+                        StageManager.getStage(Stage.MUTATION).execute(() ->
+                                apply(mutation, writeCommitLog, true, 
isClReplay, mark)
+                        );
 
-                    return mark;
+                        return mark;
+                    }
+                    else
+                    {
+                        // Retry lock on same thread, if mutation is not 
deferrable.
+                        // Mutation is not deferrable, if applied from 
MutationStage and caller is waiting for future to finish
+                        // If blocking caller defers future, this may lead to 
deadlock situation with all MutationStage workers
+                        // being blocked by waiting for futures which will 
never be processed as all workers are blocked
+                        try
+                        {
+                            // Wait a little bit before retrying to lock
+                            Thread.sleep(10);
+                        }
+                        catch (InterruptedException e)
+                        {
+                            // Just continue
+                        }
+                        // continue in while loop
+                    }
                 }
-            }
-            else
-            {
-                long acquireTime = System.currentTimeMillis() - 
mutation.viewLockAcquireStart.get();
-                if (!isClReplay)
+                else
                 {
-                    for(UUID cfid : mutation.getColumnFamilyIds())
+                    long acquireTime = System.currentTimeMillis() - 
mutation.viewLockAcquireStart.get();
+                    if (!isClReplay)
                     {
-                        
columnFamilyStores.get(cfid).metric.viewLockAcquireTime.update(acquireTime, 
TimeUnit.MILLISECONDS);
+                        for (UUID cfid : mutation.getColumnFamilyIds())
+                            
columnFamilyStores.get(cfid).metric.viewLockAcquireTime.update(acquireTime, 
TimeUnit.MILLISECONDS);
                     }
+                    break;
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d38a732c/src/java/org/apache/cassandra/db/Mutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Mutation.java 
b/src/java/org/apache/cassandra/db/Mutation.java
index c6ad9b8..2955677 100644
--- a/src/java/org/apache/cassandra/db/Mutation.java
+++ b/src/java/org/apache/cassandra/db/Mutation.java
@@ -199,22 +199,18 @@ public class Mutation implements IMutation
         return new Mutation(ks, key, modifications);
     }
 
-    private CompletableFuture<?> applyFuture(boolean durableWrites)
-    {
-        Keyspace ks = Keyspace.open(keyspaceName);
-        return ks.apply(this, durableWrites);
-    }
-
     public CompletableFuture<?> applyFuture()
     {
-        return 
applyFuture(Keyspace.open(keyspaceName).getMetadata().params.durableWrites);
+        Keyspace ks = Keyspace.open(keyspaceName);
+        return ks.apply(this, 
Keyspace.open(keyspaceName).getMetadata().params.durableWrites);
     }
 
     public void apply(boolean durableWrites)
     {
         try
         {
-            Uninterruptibles.getUninterruptibly(applyFuture(durableWrites));
+            Keyspace ks = Keyspace.open(keyspaceName);
+            Uninterruptibles.getUninterruptibly(ks.applyNotDeferrable(this, 
durableWrites));
         }
         catch (ExecutionException e)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d38a732c/src/java/org/apache/cassandra/service/paxos/PaxosState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/paxos/PaxosState.java 
b/src/java/org/apache/cassandra/service/paxos/PaxosState.java
index e01f568..0940950 100644
--- a/src/java/org/apache/cassandra/service/paxos/PaxosState.java
+++ b/src/java/org/apache/cassandra/service/paxos/PaxosState.java
@@ -149,7 +149,7 @@ public class PaxosState
                 Mutation mutation = proposal.makeMutation();
                 try
                 {
-                    
Uninterruptibles.getUninterruptibly(Keyspace.open(mutation.getKeyspaceName()).apply(mutation,
 true));
+                    
Uninterruptibles.getUninterruptibly(Keyspace.open(mutation.getKeyspaceName()).applyNotDeferrable(mutation,
 true));
                 }
                 catch (ExecutionException e)
                 {

Reply via email to