Always send Paxos commit to all replicas. Patch by kohlisankalp; reviewed by slebresne for CASSANDRA-7479.
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/536a08c2 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/536a08c2 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/536a08c2 Branch: refs/heads/trunk Commit: 536a08c29773548203845ad562e278e899b35a4d Parents: 3fa5a9e Author: Sylvain Lebresne <[email protected]> Authored: Mon Aug 25 15:55:43 2014 +0200 Committer: Sylvain Lebresne <[email protected]> Committed: Mon Aug 25 16:00:35 2014 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/service/StorageProxy.java | 24 +++++++++++++------- 2 files changed, 17 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/536a08c2/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 2329333..a0ff1d1 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -72,6 +72,7 @@ Merged from 1.2: values (CASSANDRA-7792) * Fix ordering of static cells (CASSANDRA-7763) Merged from 2.0: + * Always send Paxos commit to all replicas (CASSANDRA-7479) * Configure system.paxos with LeveledCompactionStrategy (CASSANDRA-7753) * Fix ALTER clustering column type from DateType to TimestampType when using DESC clustering order (CASSANRDA-7797) http://git-wip-us.apache.org/repos/asf/cassandra/blob/536a08c2/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index 1c0c482..ff6d89c 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -251,10 +251,7 @@ public class StorageProxy implements StorageProxyMBean 
Tracing.trace("CAS precondition is met; proposing client-requested updates for {}", ballot); if (proposePaxos(proposal, liveEndpoints, requiredParticipants, true, consistencyForPaxos)) { - if (consistencyForCommit == ConsistencyLevel.ANY) - sendCommit(proposal, liveEndpoints); - else - commitPaxos(proposal, consistencyForCommit); + commitPaxos(proposal, consistencyForCommit); Tracing.trace("CAS successful"); return null; } @@ -416,23 +413,34 @@ public class StorageProxy implements StorageProxyMBean private static void commitPaxos(Commit proposal, ConsistencyLevel consistencyLevel) throws WriteTimeoutException { + boolean shouldBlock = consistencyLevel != ConsistencyLevel.ANY; Keyspace keyspace = Keyspace.open(proposal.update.metadata().ksName); Token tk = StorageService.getPartitioner().getToken(proposal.key); List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(keyspace.getName(), tk); Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, keyspace.getName()); - AbstractReplicationStrategy rs = keyspace.getReplicationStrategy(); - AbstractWriteResponseHandler responseHandler = rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistencyLevel, null, WriteType.SIMPLE); + AbstractWriteResponseHandler responseHandler = null; + if (shouldBlock) + { + AbstractReplicationStrategy rs = keyspace.getReplicationStrategy(); + responseHandler = rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistencyLevel, null, WriteType.SIMPLE); + } MessageOut<Commit> message = new MessageOut<Commit>(MessagingService.Verb.PAXOS_COMMIT, proposal, Commit.serializer); for (InetAddress destination : Iterables.concat(naturalEndpoints, pendingEndpoints)) { if (FailureDetector.instance.isAlive(destination)) - MessagingService.instance().sendRR(message, destination, responseHandler); + { + if (shouldBlock) + MessagingService.instance().sendRR(message, destination, responseHandler); + 
else + MessagingService.instance().sendOneWay(message, destination); + } } - responseHandler.get(); + if (shouldBlock) + responseHandler.get(); } /**
