Repository: cassandra Updated Branches: refs/heads/cassandra-2.1 cd9e4638f -> 512c3d1a4 refs/heads/trunk 6125b0b86 -> d202ffee7
Reduce likelihood of contention on local paxos state locking Patch by Aleksey Yeschenko; reviewed by Benedict Elliott Smith for CASSANDRA-7359 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/512c3d1a Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/512c3d1a Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/512c3d1a Branch: refs/heads/cassandra-2.1 Commit: 512c3d1a4e9f236f22308eab12d5374d24d48a8c Parents: cd9e463 Author: belliottsmith <[email protected]> Authored: Fri Jun 6 20:18:32 2014 +0100 Committer: belliottsmith <[email protected]> Committed: Fri Jun 6 20:18:32 2014 +0100 ---------------------------------------------------------------------- .../apache/cassandra/db/CounterMutation.java | 5 ++- src/java/org/apache/cassandra/db/Keyspace.java | 13 ------- .../cassandra/service/paxos/PaxosState.java | 38 ++++++++++---------- 3 files changed, 23 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/512c3d1a/src/java/org/apache/cassandra/db/CounterMutation.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/CounterMutation.java b/src/java/org/apache/cassandra/db/CounterMutation.java index 95f4ce3..2bfdd4e 100644 --- a/src/java/org/apache/cassandra/db/CounterMutation.java +++ b/src/java/org/apache/cassandra/db/CounterMutation.java @@ -27,6 +27,7 @@ import java.util.concurrent.locks.Lock; import com.google.common.base.Function; import com.google.common.base.Objects; import com.google.common.collect.Iterables; +import com.google.common.util.concurrent.Striped; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.composites.CellName; @@ -45,6 +46,8 @@ public class CounterMutation implements IMutation { public static final CounterMutationSerializer serializer = new CounterMutationSerializer(); + private static final Striped<Lock> LOCKS = Striped.lazyWeakLock(DatabaseDescriptor.getConcurrentCounterWriters() * 1024); + private final Mutation mutation; private final ConsistencyLevel consistency; @@ -146,7 +149,7 @@ public class CounterMutation implements IMutation { long startTime = System.nanoTime(); - for (Lock lock : Keyspace.counterLocksFor(getCounterLockKeys())) + for (Lock lock : LOCKS.bulkGet(getCounterLockKeys())) { long timeout = TimeUnit.MILLISECONDS.toNanos(getTimeout()) - (System.nanoTime() - startTime); try http://git-wip-us.apache.org/repos/asf/cassandra/blob/512c3d1a/src/java/org/apache/cassandra/db/Keyspace.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java index 17d1364..ae9d9ef 100644 --- a/src/java/org/apache/cassandra/db/Keyspace.java +++ b/src/java/org/apache/cassandra/db/Keyspace.java @@ -29,11 +29,9 @@ import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Future; -import java.util.concurrent.locks.Lock; import com.google.common.base.Function; import com.google.common.collect.Iterables; -import com.google.common.util.concurrent.Striped; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -63,8 +61,6 @@ public class Keyspace private static final Logger logger = LoggerFactory.getLogger(Keyspace.class); - private static final Striped<Lock> counterLocks = Striped.lazyWeakLock(DatabaseDescriptor.getConcurrentCounterWriters() * 1024); - // It is possible to call Keyspace.open without a running daemon, so it makes sense to ensure // proper directories here as well as in CassandraDaemon. static @@ -188,15 +184,6 @@ public class Keyspace } /** - * @param keys the keys to grab the locks for - * @return the striped lock instances - */ - public static Iterable<Lock> counterLocksFor(Iterable<Object> keys) - { - return counterLocks.bulkGet(keys); - } - - /** * Take a snapshot of the specific column family, or the entire set of column families * if columnFamily is null with a given timestamp * http://git-wip-us.apache.org/repos/asf/cassandra/blob/512c3d1a/src/java/org/apache/cassandra/service/paxos/PaxosState.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/paxos/PaxosState.java b/src/java/org/apache/cassandra/service/paxos/PaxosState.java index f893acf..df7365d 100644 --- a/src/java/org/apache/cassandra/service/paxos/PaxosState.java +++ b/src/java/org/apache/cassandra/service/paxos/PaxosState.java @@ -1,4 +1,3 @@ -package org.apache.cassandra.service.paxos; /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -19,14 +18,15 @@ package org.apache.cassandra.service.paxos; * under the License. * */ - +package org.apache.cassandra.service.paxos; import java.nio.ByteBuffer; +import java.util.concurrent.locks.Lock; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import com.google.common.util.concurrent.Striped; import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.Mutation; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.SystemKeyspace; @@ -34,19 +34,7 @@ import org.apache.cassandra.tracing.Tracing; public class PaxosState { - private static final Logger logger = LoggerFactory.getLogger(PaxosState.class); - - private static final Object[] locks; - static - { - locks = new Object[1024]; - for (int i = 0; i < locks.length; i++) - locks[i] = new Object(); - } - private static Object lockFor(ByteBuffer key) - { - return locks[(0x7FFFFFFF & key.hashCode()) % locks.length]; - } + private static final Striped<Lock> LOCKS = Striped.lazyWeakLock(DatabaseDescriptor.getConcurrentWriters() * 1024); private final Commit promised; private final Commit accepted; @@ -69,7 +57,9 @@ public class PaxosState public static PrepareResponse prepare(Commit toPrepare) { - synchronized (lockFor(toPrepare.key)) + Lock lock = LOCKS.get(toPrepare.key); + lock.lock(); + try { PaxosState state = SystemKeyspace.loadPaxosState(toPrepare.key, toPrepare.update.metadata()); if (toPrepare.isAfter(state.promised)) @@ -85,11 +75,17 @@ public class PaxosState return new PrepareResponse(false, state.promised, state.mostRecentCommit); } } + finally + { + lock.unlock(); + } } public static Boolean propose(Commit proposal) { - synchronized (lockFor(proposal.key)) + Lock lock = LOCKS.get(proposal.key); + lock.lock(); + try { PaxosState state = SystemKeyspace.loadPaxosState(proposal.key, proposal.update.metadata()); if (proposal.hasBallot(state.promised.ballot) || proposal.isAfter(state.promised)) @@ -104,6 +100,10 @@ public class PaxosState return false; } } + finally + { + lock.unlock(); + } } public static void commit(Commit proposal)
