Repository: cassandra
Updated Branches:
  refs/heads/trunk 0f67b540e -> 7d266b9e7
Retry acquire MV lock on failure instead of throwing WTE on streaming Patch by Benjamin Roth; Reviewed by Paulo Motta for CASSANDRA-12905 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3faa0d92 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3faa0d92 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3faa0d92 Branch: refs/heads/trunk Commit: 3faa0d925791be085b92949a0a0ec20f7e6ae368 Parents: 9fc1ffb Author: brstgt <[email protected]> Authored: Thu Dec 15 12:42:31 2016 -0200 Committer: Paulo Motta <[email protected]> Committed: Thu Dec 15 16:46:00 2016 -0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + src/java/org/apache/cassandra/db/Keyspace.java | 92 +++++++++++--------- src/java/org/apache/cassandra/db/Mutation.java | 17 ++-- .../db/commitlog/CommitLogReplayer.java | 10 +-- .../cassandra/service/paxos/PaxosState.java | 9 +- .../cassandra/streaming/StreamReceiveTask.java | 5 +- 6 files changed, 63 insertions(+), 71 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/3faa0d92/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index e69bf08..63e095d 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.0.11 + * Improve contention handling on failure to acquire MV lock for streaming and hints (CASSANDRA-12905) * Fix DELETE and UPDATE queries with empty IN restrictions (CASSANDRA-12829) * Mark MVs as built after successful bootstrap (CASSANDRA-12984) * Estimated TS drop-time histogram updated with Cell.NO_DELETION_TIME (CASSANDRA-13040) http://git-wip-us.apache.org/repos/asf/cassandra/blob/3faa0d92/src/java/org/apache/cassandra/db/Keyspace.java ---------------------------------------------------------------------- diff --git 
a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java index ec5102b..3715995 100644 --- a/src/java/org/apache/cassandra/db/Keyspace.java +++ b/src/java/org/apache/cassandra/db/Keyspace.java @@ -26,6 +26,8 @@ import java.util.concurrent.locks.Lock; import com.google.common.base.Function; import com.google.common.collect.Iterables; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.concurrent.StageManager; @@ -50,8 +52,6 @@ import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.JVMStabilityInspector; import org.apache.cassandra.utils.concurrent.OpOrder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * It represents a Keyspace. @@ -62,7 +62,7 @@ public class Keyspace private static final String TEST_FAIL_WRITES_KS = System.getProperty("cassandra.test.fail_writes_ks", ""); private static final boolean TEST_FAIL_WRITES = !TEST_FAIL_WRITES_KS.isEmpty(); - private static int TEST_FAIL_MV_LOCKS_COUNT = Integer.getInteger(System.getProperty("cassandra.test.fail_mv_locks_count", "0"), 0); + private static int TEST_FAIL_MV_LOCKS_COUNT = Integer.getInteger("cassandra.test.fail_mv_locks_count", 0); public final KeyspaceMetrics metric; @@ -379,42 +379,40 @@ public class Keyspace } } - public CompletableFuture<?> apply(Mutation mutation, boolean writeCommitLog) - { - return apply(mutation, writeCommitLog, true, false, null); - } - - /** - * Should be used if caller is blocking and runs in mutation stage. - * Otherwise there is a race condition where ALL mutation workers are beeing blocked ending - * in a complete deadlock of the mutation stage. See CASSANDRA-12689. 
- * - * @param mutation - * @param writeCommitLog - * @return - */ - public CompletableFuture<?> applyNotDeferrable(Mutation mutation, boolean writeCommitLog) + public CompletableFuture<?> applyFuture(Mutation mutation, boolean writeCommitLog, boolean updateIndexes) { - return apply(mutation, writeCommitLog, true, false, false, null); + return apply(mutation, writeCommitLog, updateIndexes, true, true, null); } - public CompletableFuture<?> apply(Mutation mutation, boolean writeCommitLog, boolean updateIndexes) + public void apply(Mutation mutation, boolean writeCommitLog, boolean updateIndexes) { - return apply(mutation, writeCommitLog, updateIndexes, false, null); + apply(mutation, writeCommitLog, updateIndexes, true); } - public CompletableFuture<?> applyFromCommitLog(Mutation mutation) + public void apply(final Mutation mutation, + final boolean writeCommitLog) { - return apply(mutation, false, true, true, null); + apply(mutation, writeCommitLog, true, true); } - public CompletableFuture<?> apply(final Mutation mutation, - final boolean writeCommitLog, - boolean updateIndexes, - boolean isClReplay, - CompletableFuture<?> future) + /** + * If apply is blocking, apply must not be deferred + * Otherwise there is a race condition where ALL mutation workers are beeing blocked ending + * in a complete deadlock of the mutation stage. See CASSANDRA-12689. + * + * @param mutation the row to write. Must not be modified after calling apply, since commitlog append + * may happen concurrently, depending on the CL Executor type. 
+ * @param writeCommitLog false to disable commitlog append entirely + * @param updateIndexes false to disable index updates (used by CollationController "defragmenting") + * @param isDroppable true if this should throw WriteTimeoutException if it does not acquire lock within write_request_timeout_in_ms + * @throws ExecutionException + */ + public void apply(final Mutation mutation, + final boolean writeCommitLog, + boolean updateIndexes, + boolean isDroppable) { - return apply(mutation, writeCommitLog, updateIndexes, isClReplay, true, future); + apply(mutation, writeCommitLog, updateIndexes, isDroppable, false, null); } /** @@ -424,13 +422,13 @@ public class Keyspace * may happen concurrently, depending on the CL Executor type. * @param writeCommitLog false to disable commitlog append entirely * @param updateIndexes false to disable index updates (used by CollationController "defragmenting") - * @param isClReplay true if caller is the commitlog replayer + * @param isDroppable true if this should throw WriteTimeoutException if it does not acquire lock within write_request_timeout_in_ms * @param isDeferrable true if caller is not waiting for future to complete, so that future may be deferred */ - public CompletableFuture<?> apply(final Mutation mutation, + private CompletableFuture<?> apply(final Mutation mutation, final boolean writeCommitLog, boolean updateIndexes, - boolean isClReplay, + boolean isDroppable, boolean isDeferrable, CompletableFuture<?> future) { @@ -438,7 +436,11 @@ public class Keyspace throw new RuntimeException("Testing write failures"); boolean requiresViewUpdate = updateIndexes && viewManager.updatesAffectView(Collections.singleton(mutation), false); - final CompletableFuture<?> mark = future == null ? 
new CompletableFuture<>() : future; + + // If apply is not deferrable, no future is required, returns always null + if (isDeferrable && future == null) { + future = new CompletableFuture<>(); + } Lock lock = null; if (requiresViewUpdate) @@ -453,15 +455,15 @@ public class Keyspace if (lock == null) { - // avoid throwing a WTE during commitlog replay - if (!isClReplay && (System.currentTimeMillis() - mutation.createdAt) > DatabaseDescriptor.getWriteRpcTimeout()) + //throw WTE only if request is droppable + if (isDroppable && (System.currentTimeMillis() - mutation.createdAt) > DatabaseDescriptor.getWriteRpcTimeout()) { logger.trace("Could not acquire lock for {}", ByteBufferUtil.bytesToHex(mutation.key().getKey())); Tracing.trace("Could not acquire MV lock"); if (future != null) { future.completeExceptionally(new WriteTimeoutException(WriteType.VIEW, ConsistencyLevel.LOCAL_ONE, 0, 1)); - return mark; + return future; } else { @@ -472,11 +474,12 @@ public class Keyspace { //This view update can't happen right now. so rather than keep this thread busy // we will re-apply ourself to the queue and try again later + final CompletableFuture<?> mark = future; StageManager.getStage(Stage.MUTATION).execute(() -> - apply(mutation, writeCommitLog, true, isClReplay, mark) + apply(mutation, writeCommitLog, true, isDroppable, true, mark) ); - return mark; + return future; } else { @@ -499,7 +502,9 @@ public class Keyspace else { long acquireTime = System.currentTimeMillis() - mutation.viewLockAcquireStart.get(); - if (!isClReplay) + // Metrics are only collected for droppable write operations + // Bulk non-droppable operations (e.g. 
commitlog replay, hint delivery) are not measured + if (isDroppable) { for (UUID cfid : mutation.getColumnFamilyIds()) columnFamilyStores.get(cfid).metric.viewLockAcquireTime.update(acquireTime, TimeUnit.MILLISECONDS); @@ -534,7 +539,7 @@ public class Keyspace try { Tracing.trace("Creating materialized view mutations from base table replica"); - viewManager.forTable(upd.metadata()).pushViewReplicaUpdates(upd, writeCommitLog && !isClReplay, baseComplete); + viewManager.forTable(upd.metadata()).pushViewReplicaUpdates(upd, writeCommitLog, baseComplete); } catch (Throwable t) { @@ -553,8 +558,11 @@ public class Keyspace if (requiresViewUpdate) baseComplete.set(System.currentTimeMillis()); } - mark.complete(null); - return mark; + + if (future != null) { + future.complete(null); + } + return future; } finally { http://git-wip-us.apache.org/repos/asf/cassandra/blob/3faa0d92/src/java/org/apache/cassandra/db/Mutation.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Mutation.java b/src/java/org/apache/cassandra/db/Mutation.java index 2955677..7ed69c0 100644 --- a/src/java/org/apache/cassandra/db/Mutation.java +++ b/src/java/org/apache/cassandra/db/Mutation.java @@ -202,20 +202,17 @@ public class Mutation implements IMutation public CompletableFuture<?> applyFuture() { Keyspace ks = Keyspace.open(keyspaceName); - return ks.apply(this, Keyspace.open(keyspaceName).getMetadata().params.durableWrites); + return ks.applyFuture(this, Keyspace.open(keyspaceName).getMetadata().params.durableWrites, true); + } + + public void apply(boolean durableWrites, boolean isDroppable) + { + Keyspace.open(keyspaceName).apply(this, durableWrites, true, isDroppable); } public void apply(boolean durableWrites) { - try - { - Keyspace ks = Keyspace.open(keyspaceName); - Uninterruptibles.getUninterruptibly(ks.applyNotDeferrable(this, durableWrites)); - } - catch (ExecutionException e) - { - throw 
Throwables.propagate(e.getCause()); - } + apply(durableWrites, true); } /* http://git-wip-us.apache.org/repos/asf/cassandra/blob/3faa0d92/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java index af8efb4..d53f0f8 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java @@ -642,15 +642,7 @@ public class CommitLogReplayer { assert !newMutation.isEmpty(); - try - { - Uninterruptibles.getUninterruptibly(Keyspace.open(newMutation.getKeyspaceName()).applyFromCommitLog(newMutation)); - } - catch (ExecutionException e) - { - throw Throwables.propagate(e.getCause()); - } - + Keyspace.open(newMutation.getKeyspaceName()).apply(newMutation, false, true, false); keyspacesRecovered.add(keyspace); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/3faa0d92/src/java/org/apache/cassandra/service/paxos/PaxosState.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/paxos/PaxosState.java b/src/java/org/apache/cassandra/service/paxos/PaxosState.java index 0940950..ee1ba6a 100644 --- a/src/java/org/apache/cassandra/service/paxos/PaxosState.java +++ b/src/java/org/apache/cassandra/service/paxos/PaxosState.java @@ -147,14 +147,7 @@ public class PaxosState { Tracing.trace("Committing proposal {}", proposal); Mutation mutation = proposal.makeMutation(); - try - { - Uninterruptibles.getUninterruptibly(Keyspace.open(mutation.getKeyspaceName()).applyNotDeferrable(mutation, true)); - } - catch (ExecutionException e) - { - throw Throwables.propagate(e.getCause()); - } + Keyspace.open(mutation.getKeyspaceName()).apply(mutation, true); } else { 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3faa0d92/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java index 040906b..b6b8387 100644 --- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java +++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java @@ -178,14 +178,15 @@ public class StreamReceiveTask extends StreamTask { for (SSTableReader reader : readers) { + Keyspace ks = Keyspace.open(reader.getKeyspaceName()); try (ISSTableScanner scanner = reader.getScanner()) { while (scanner.hasNext()) { try (UnfilteredRowIterator rowIterator = scanner.next()) { - //Apply unsafe (we will flush below before transaction is done) - new Mutation(PartitionUpdate.fromIterator(rowIterator)).applyUnsafe(); + // MV *can* be applied unsafe as we flush below before transaction is done. + ks.apply(new Mutation(PartitionUpdate.fromIterator(rowIterator)), false, true, false); } } }
