Avoid deadlock due to materialized view (MV) lock contention. Patch by Benjamin Roth; reviewed by Tyler Hobbs for CASSANDRA-12689
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d38a732c Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d38a732c Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d38a732c Branch: refs/heads/cassandra-3.X Commit: d38a732ce15caab57ce6dddb3c0d6a436506db29 Parents: e4f840a Author: brstgt <brs...@googlemail.com> Authored: Fri Oct 28 15:39:03 2016 -0500 Committer: Tyler Hobbs <tylerlho...@gmail.com> Committed: Fri Oct 28 15:39:03 2016 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/config/DatabaseDescriptor.java | 2 +- src/java/org/apache/cassandra/db/Keyspace.java | 107 ++++++++++++++----- src/java/org/apache/cassandra/db/Mutation.java | 12 +-- .../cassandra/service/paxos/PaxosState.java | 2 +- 5 files changed, 87 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/d38a732c/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index bf1e7d6..c80e045 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.0.10 + * Avoid deadlock due to materialized view lock contention (CASSANDRA-12689) * Fix for KeyCacheCqlTest flakiness (CASSANDRA-12801) * Include SSTable filename in compacting large row message (CASSANDRA-12384) * Fix potential socket leak (CASSANDRA-12329, CASSANDRA-12330) http://git-wip-us.apache.org/repos/asf/cassandra/blob/d38a732c/src/java/org/apache/cassandra/config/DatabaseDescriptor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index baea210..7b32a34 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ 
b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -392,7 +392,7 @@ public class DatabaseDescriptor throw new ConfigurationException("concurrent_reads must be at least 2, but was " + conf.concurrent_reads, false); } - if (conf.concurrent_writes != null && conf.concurrent_writes < 2) + if (conf.concurrent_writes != null && conf.concurrent_writes < 2 && System.getProperty("cassandra.test.fail_mv_locks_count", "").isEmpty()) { throw new ConfigurationException("concurrent_writes must be at least 2, but was " + conf.concurrent_writes, false); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/d38a732c/src/java/org/apache/cassandra/db/Keyspace.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java index 8d710d1..75aab8f 100644 --- a/src/java/org/apache/cassandra/db/Keyspace.java +++ b/src/java/org/apache/cassandra/db/Keyspace.java @@ -63,6 +63,7 @@ public class Keyspace private static final String TEST_FAIL_WRITES_KS = System.getProperty("cassandra.test.fail_writes_ks", ""); private static final boolean TEST_FAIL_WRITES = !TEST_FAIL_WRITES_KS.isEmpty(); + private static int TEST_FAIL_MV_LOCKS_COUNT = Integer.getInteger(System.getProperty("cassandra.test.fail_mv_locks_count", "0"), 0); public final KeyspaceMetrics metric; @@ -384,6 +385,20 @@ public class Keyspace return apply(mutation, writeCommitLog, true, false, null); } + /** + * Should be used if caller is blocking and runs in mutation stage. + * Otherwise there is a race condition where ALL mutation workers are being blocked, ending + * in a complete deadlock of the mutation stage. See CASSANDRA-12689.
+ * + * @param mutation + * @param writeCommitLog + * @return + */ + public CompletableFuture<?> applyNotDeferrable(Mutation mutation, boolean writeCommitLog) + { + return apply(mutation, writeCommitLog, true, false, false, null); + } + public CompletableFuture<?> apply(Mutation mutation, boolean writeCommitLog, boolean updateIndexes) { return apply(mutation, writeCommitLog, updateIndexes, false, null); @@ -394,6 +409,15 @@ public class Keyspace return apply(mutation, false, true, true, null); } + public CompletableFuture<?> apply(final Mutation mutation, + final boolean writeCommitLog, + boolean updateIndexes, + boolean isClReplay, + CompletableFuture<?> future) + { + return apply(mutation, writeCommitLog, updateIndexes, isClReplay, true, future); + } + /** * This method appends a row to the global CommitLog, then updates memtables and indexes. * @@ -402,57 +426,86 @@ public class Keyspace * @param writeCommitLog false to disable commitlog append entirely * @param updateIndexes false to disable index updates (used by CollationController "defragmenting") * @param isClReplay true if caller is the commitlog replayer + * @param isDeferrable true if caller is not waiting for future to complete, so that future may be deferred */ public CompletableFuture<?> apply(final Mutation mutation, final boolean writeCommitLog, boolean updateIndexes, boolean isClReplay, + boolean isDeferrable, CompletableFuture<?> future) { if (TEST_FAIL_WRITES && metadata.name.equals(TEST_FAIL_WRITES_KS)) throw new RuntimeException("Testing write failures"); - Lock lock = null; boolean requiresViewUpdate = updateIndexes && viewManager.updatesAffectView(Collections.singleton(mutation), false); final CompletableFuture<?> mark = future == null ? 
new CompletableFuture<>() : future; + Lock lock = null; if (requiresViewUpdate) { mutation.viewLockAcquireStart.compareAndSet(0L, System.currentTimeMillis()); - lock = ViewManager.acquireLockFor(mutation.key().getKey()); - - if (lock == null) + while (true) { - // avoid throwing a WTE during commitlog replay - if (!isClReplay && (System.currentTimeMillis() - mutation.createdAt) > DatabaseDescriptor.getWriteRpcTimeout()) - { - logger.trace("Could not acquire lock for {}", ByteBufferUtil.bytesToHex(mutation.key().getKey())); - Tracing.trace("Could not acquire MV lock"); - if (future != null) - future.completeExceptionally(new WriteTimeoutException(WriteType.VIEW, ConsistencyLevel.LOCAL_ONE, 0, 1)); - else - throw new WriteTimeoutException(WriteType.VIEW, ConsistencyLevel.LOCAL_ONE, 0, 1); - } + if (TEST_FAIL_MV_LOCKS_COUNT == 0) + lock = ViewManager.acquireLockFor(mutation.key().getKey()); else + TEST_FAIL_MV_LOCKS_COUNT--; + + if (lock == null) { - //This view update can't happen right now. so rather than keep this thread busy - // we will re-apply ourself to the queue and try again later - StageManager.getStage(Stage.MUTATION).execute(() -> - apply(mutation, writeCommitLog, true, isClReplay, mark) - ); + // avoid throwing a WTE during commitlog replay + if (!isClReplay && (System.currentTimeMillis() - mutation.createdAt) > DatabaseDescriptor.getWriteRpcTimeout()) + { + logger.trace("Could not acquire lock for {}", ByteBufferUtil.bytesToHex(mutation.key().getKey())); + Tracing.trace("Could not acquire MV lock"); + if (future != null) + { + future.completeExceptionally(new WriteTimeoutException(WriteType.VIEW, ConsistencyLevel.LOCAL_ONE, 0, 1)); + return mark; + } + else + { + throw new WriteTimeoutException(WriteType.VIEW, ConsistencyLevel.LOCAL_ONE, 0, 1); + } + } + else if (isDeferrable) + { + //This view update can't happen right now. 
so rather than keep this thread busy + // we will re-apply ourself to the queue and try again later + StageManager.getStage(Stage.MUTATION).execute(() -> + apply(mutation, writeCommitLog, true, isClReplay, mark) + ); - return mark; + return mark; + } + else + { + // Retry lock on same thread, if mutation is not deferrable. + // Mutation is not deferrable, if applied from MutationStage and caller is waiting for future to finish + // If blocking caller defers future, this may lead to deadlock situation with all MutationStage workers + // being blocked by waiting for futures which will never be processed as all workers are blocked + try + { + // Wait a little bit before retrying to lock + Thread.sleep(10); + } + catch (InterruptedException e) + { + // Just continue + } + // continue in while loop + } } - } - else - { - long acquireTime = System.currentTimeMillis() - mutation.viewLockAcquireStart.get(); - if (!isClReplay) + else { - for(UUID cfid : mutation.getColumnFamilyIds()) + long acquireTime = System.currentTimeMillis() - mutation.viewLockAcquireStart.get(); + if (!isClReplay) { - columnFamilyStores.get(cfid).metric.viewLockAcquireTime.update(acquireTime, TimeUnit.MILLISECONDS); + for (UUID cfid : mutation.getColumnFamilyIds()) + columnFamilyStores.get(cfid).metric.viewLockAcquireTime.update(acquireTime, TimeUnit.MILLISECONDS); } + break; } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/d38a732c/src/java/org/apache/cassandra/db/Mutation.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Mutation.java b/src/java/org/apache/cassandra/db/Mutation.java index c6ad9b8..2955677 100644 --- a/src/java/org/apache/cassandra/db/Mutation.java +++ b/src/java/org/apache/cassandra/db/Mutation.java @@ -199,22 +199,18 @@ public class Mutation implements IMutation return new Mutation(ks, key, modifications); } - private CompletableFuture<?> applyFuture(boolean durableWrites) - { - Keyspace ks = 
Keyspace.open(keyspaceName); - return ks.apply(this, durableWrites); - } - public CompletableFuture<?> applyFuture() { - return applyFuture(Keyspace.open(keyspaceName).getMetadata().params.durableWrites); + Keyspace ks = Keyspace.open(keyspaceName); + return ks.apply(this, Keyspace.open(keyspaceName).getMetadata().params.durableWrites); } public void apply(boolean durableWrites) { try { - Uninterruptibles.getUninterruptibly(applyFuture(durableWrites)); + Keyspace ks = Keyspace.open(keyspaceName); + Uninterruptibles.getUninterruptibly(ks.applyNotDeferrable(this, durableWrites)); } catch (ExecutionException e) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/d38a732c/src/java/org/apache/cassandra/service/paxos/PaxosState.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/paxos/PaxosState.java b/src/java/org/apache/cassandra/service/paxos/PaxosState.java index e01f568..0940950 100644 --- a/src/java/org/apache/cassandra/service/paxos/PaxosState.java +++ b/src/java/org/apache/cassandra/service/paxos/PaxosState.java @@ -149,7 +149,7 @@ public class PaxosState Mutation mutation = proposal.makeMutation(); try { - Uninterruptibles.getUninterruptibly(Keyspace.open(mutation.getKeyspaceName()).apply(mutation, true)); + Uninterruptibles.getUninterruptibly(Keyspace.open(mutation.getKeyspaceName()).applyNotDeferrable(mutation, true)); } catch (ExecutionException e) {