Repository: cassandra Updated Branches: refs/heads/cassandra-2.0 618441b97 -> 5d4740c58
Always send Paxos commit to all replicas patch by kohlisankalp; reviewed by slebresne for CASSANDRA-7479 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5d4740c5 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5d4740c5 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5d4740c5 Branch: refs/heads/cassandra-2.0 Commit: 5d4740c5841c9c3a8d6c24578c1c6fb512524321 Parents: 618441b Author: Sylvain Lebresne <[email protected]> Authored: Mon Aug 25 15:55:43 2014 +0200 Committer: Sylvain Lebresne <[email protected]> Committed: Mon Aug 25 15:55:43 2014 +0200 ---------------------------------------------------------------------- CHANGES.txt | 3 +++ .../apache/cassandra/service/StorageProxy.java | 24 +++++++++++++------- 2 files changed, 19 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d4740c5/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 9aeeb29..4d5d851 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,3 +1,6 @@ +2.0.11: + * Always send Paxos commit to all replicas (CASSANDRA-7479) + 2.0.10 * Don't send schema change responses and events for no-op DDL statements (CASSANDRA-7600) http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d4740c5/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index c606d75..904d602 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -248,10 +248,7 @@ public class StorageProxy implements StorageProxyMBean Tracing.trace("CAS precondition is met; proposing client-requested updates for {}", ballot); if (proposePaxos(proposal, liveEndpoints, requiredParticipants, true, consistencyForPaxos)) { - if (consistencyForCommit == ConsistencyLevel.ANY) - sendCommit(proposal, liveEndpoints); - else - commitPaxos(proposal, consistencyForCommit); + commitPaxos(proposal, consistencyForCommit); Tracing.trace("CAS successful"); return null; } @@ -413,23 +410,34 @@ public class StorageProxy implements StorageProxyMBean private static void commitPaxos(Commit proposal, ConsistencyLevel consistencyLevel) throws WriteTimeoutException { + boolean shouldBlock = consistencyLevel != ConsistencyLevel.ANY; Keyspace keyspace = Keyspace.open(proposal.update.metadata().ksName); Token tk = StorageService.getPartitioner().getToken(proposal.key); List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(keyspace.getName(), tk); Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, keyspace.getName()); - AbstractReplicationStrategy rs = keyspace.getReplicationStrategy(); - AbstractWriteResponseHandler responseHandler = rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistencyLevel, null, WriteType.SIMPLE); + AbstractWriteResponseHandler responseHandler = null; + if (shouldBlock) + { + AbstractReplicationStrategy rs = keyspace.getReplicationStrategy(); + responseHandler = rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistencyLevel, null, WriteType.SIMPLE); + } MessageOut<Commit> message = new MessageOut<Commit>(MessagingService.Verb.PAXOS_COMMIT, proposal, Commit.serializer); for (InetAddress destination : Iterables.concat(naturalEndpoints, pendingEndpoints)) { if (FailureDetector.instance.isAlive(destination)) - MessagingService.instance().sendRR(message, destination, responseHandler); + { + if (shouldBlock) + MessagingService.instance().sendRR(message, destination, responseHandler); + else + MessagingService.instance().sendOneWay(message, destination); + } } - responseHandler.get(); + if (shouldBlock) + responseHandler.get(); } /**
