Updated Branches: refs/heads/trunk 42c55896b -> d5fc1932e
Merge branch 'cassandra-1.1' into trunk Conflicts: CHANGES.txt src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java src/java/org/apache/cassandra/service/StorageProxy.java src/java/org/apache/cassandra/service/WriteResponseHandler.java Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d5fc1932 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d5fc1932 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d5fc1932 Branch: refs/heads/trunk Commit: d5fc1932e965a32c944efcdf95fd0236624949fc Parents: 42c5589 7371e10 Author: Sylvain Lebresne <sylv...@datastax.com> Authored: Fri Sep 7 10:50:38 2012 +0200 Committer: Sylvain Lebresne <sylv...@datastax.com> Committed: Fri Sep 7 10:50:38 2012 +0200 ---------------------------------------------------------------------- CHANGES.txt | 6 ++++ .../org/apache/cassandra/db/CounterColumn.java | 2 +- .../cassandra/db/CounterMutationVerbHandler.java | 23 +++++++++++---- .../locator/AbstractReplicationStrategy.java | 8 ++-- .../service/AbstractWriteResponseHandler.java | 17 +++++++++- .../DatacenterSyncWriteResponseHandler.java | 10 +++--- .../service/DatacenterWriteResponseHandler.java | 10 +++--- .../org/apache/cassandra/service/StorageProxy.java | 17 ++++++---- .../cassandra/service/WriteResponseHandler.java | 12 ++++---- 9 files changed, 69 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/d5fc1932/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index ec4992c,f192be2..0905cf9 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,58 -1,9 +1,64 @@@ +1.2-beta1 + * include message initiation time to replicas so they can more + accurately drop timed-out requests (CASSANDRA-2858) + * fix clientutil.jar dependencies (CASSANDRA-4566) + * optimize WriteResponse (CASSANDRA-4548) + * new 
metrics (CASSANDRA-4009) + * redesign KEYS indexes to avoid read-before-write (CASSANDRA-2897) + * debug tracing (CASSANDRA-1123) + * parallelize row cache loading (CASSANDRA-4282) + * Make compaction, flush JBOD-aware (CASSANDRA-4292) + * run local range scans on the read stage (CASSANDRA-3687) + * clean up ioexceptions (CASSANDRA-2116) + * add disk_failure_policy (CASSANDRA-2118) + * Introduce new json format with row level deletion (CASSANDRA-4054) + * remove redundant "name" column from schema_keyspaces (CASSANDRA-4433) + * improve "nodetool ring" handling of multi-dc clusters (CASSANDRA-3047) + * update NTS calculateNaturalEndpoints to be O(N log N) (CASSANDRA-3881) + * add UseCondCardMark XX jvm settings on jdk 1.7 (CASSANDRA-4366) + * split up rpc timeout by operation type (CASSANDRA-2819) + * rewrite key cache save/load to use only sequential i/o (CASSANDRA-3762) + * update MS protocol with a version handshake + broadcast address id + (CASSANDRA-4311) + * multithreaded hint replay (CASSANDRA-4189) + * add inter-node message compression (CASSANDRA-3127) + * remove COPP (CASSANDRA-2479) + * Track tombstone expiration and compact when tombstone content is + higher than a configurable threshold, default 20% (CASSANDRA-3442, 4234) + * update MurmurHash to version 3 (CASSANDRA-2975) + * (CLI) track elapsed time for `delete' operation (CASSANDRA-4060) + * (CLI) jline version is bumped to 1.0 to properly support + 'delete' key function (CASSANDRA-4132) + * Save IndexSummary into new SSTable 'Summary' component (CASSANDRA-2392, 4289) + * Add support for range tombstones (CASSANDRA-3708) + * Improve MessagingService efficiency (CASSANDRA-3617) + * Avoid ID conflicts from concurrent schema changes (CASSANDRA-3794) + * Set thrift HSHA server thread limit to unlimited by default (CASSANDRA-4277) + * Avoids double serialization of CF id in RowMutation messages + (CASSANDRA-4293) + * stream compressed sstables directly with java nio (CASSANDRA-4297) + * Support multiple 
ranges in SliceQueryFilter (CASSANDRA-3885) + * Add column metadata to system column families (CASSANDRA-4018) + * (cql3) Always use composite types by default (CASSANDRA-4329) + * (cql3) Add support for set, map and list (CASSANDRA-3647) + * Validate date type correctly (CASSANDRA-4441) + * (cql3) Allow definitions with only a PK (CASSANDRA-4361) + * (cql3) Add support for row key composites (CASSANDRA-4179) + * improve DynamicEndpointSnitch by using reservoir sampling (CASSANDRA-4038) + * (cql3) Add support for 2ndary indexes (CASSANDRA-3680) + * (cql3) fix defining more than one PK to be invalid (CASSANDRA-4477) + * remove schema agreement checking from all external APIs (Thrift, CQL and CQL3) (CASSANDRA-4487) + * add Murmur3Partitioner and make it default for new installations (CASSANDRA-3772) + * (cql3) update pseudo-map syntax to use map syntax (CASSANDRA-4497) + * Finer grained exceptions hierarchy and provides error code with exceptions (CASSANDRA-3979) + + + 1.1.6 + * (cql3) fix potential NPE with both equal and unequal restriction (CASSANDRA-4532) + * (cql3) improves ORDER BY validation (CASSANDRA-4624) + * Fix potential deadlock during counter writes (CASSANDRA-4578) + + 1.1.5 * add SecondaryIndex.reload API (CASSANDRA-4581) * use millis + atomicint for commitlog segment creation instead of http://git-wip-us.apache.org/repos/asf/cassandra/blob/d5fc1932/src/java/org/apache/cassandra/db/CounterColumn.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/d5fc1932/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java index f94495d,3ecbe8b..f28e2bd --- a/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java +++ b/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java @@@ -23,34 -25,61 +23,45 @@@ import 
org.slf4j.Logger import org.slf4j.LoggerFactory; import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.io.util.FastByteArrayInputStream; -import org.apache.cassandra.net.*; -import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.exceptions.RequestExecutionException; +import org.apache.cassandra.net.IVerbHandler; +import org.apache.cassandra.net.MessageIn; +import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.service.StorageProxy; -import org.apache.cassandra.thrift.UnavailableException; +import org.apache.cassandra.utils.FBUtilities; -public class CounterMutationVerbHandler implements IVerbHandler +public class CounterMutationVerbHandler implements IVerbHandler<CounterMutation> { - private static Logger logger = LoggerFactory.getLogger(CounterMutationVerbHandler.class); + private static final Logger logger = LoggerFactory.getLogger(CounterMutationVerbHandler.class); - public void doVerb(MessageIn<CounterMutation> message, String id) - public void doVerb(final Message message, final String id) ++ public void doVerb(final MessageIn<CounterMutation> message, final String id) { - byte[] bytes = message.getMessageBody(); - FastByteArrayInputStream buffer = new FastByteArrayInputStream(bytes); - try { - CounterMutation cm = message.payload; - DataInputStream is = new DataInputStream(buffer); - final CounterMutation cm = CounterMutation.serializer().deserialize(is, message.getVersion()); ++ final CounterMutation cm = message.payload; if (logger.isDebugEnabled()) logger.debug("Applying forwarded " + cm); String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress()); - StorageProxy.applyCounterMutationOnLeader(cm, localDataCenter).get(); - WriteResponse response = new WriteResponse(); - MessagingService.instance().sendReply(response.createMessage(), id, message.from); + // We should not wait for the result of the write in this thread, + // otherwise 
we could have a distributed deadlock between replicas + // running this VerbHandler (see #4578). + // Instead, we use a callback to send the response. Note that the callback + // will not be called if the request times out, but this is ok + // because the coordinator of the counter mutation will time out on + // its own in that case. + StorageProxy.applyCounterMutationOnLeader(cm, localDataCenter, new Runnable(){ + public void run() + { - try - { - WriteResponse response = new WriteResponse(cm.getTable(), cm.key(), true); - Message responseMessage = WriteResponse.makeWriteResponseMessage(message, response); - MessagingService.instance().sendReply(responseMessage, id, message.getFrom()); - } - catch (IOException e) - { - logger.error("Error writing response to counter mutation", e); - } ++ WriteResponse response = new WriteResponse(); ++ MessagingService.instance().sendReply(response.createMessage(), id, message.from); + } + }); } - catch (UnavailableException e) - { - // We check for UnavailableException in the coordinator not. 
It is - // hence reasonable to let the coordinator timeout in the very - // unlikely case we arrive here - } - catch (TimeoutException e) + catch (RequestExecutionException e) { - // The coordinator will timeout on itself, so let that go + // The coordinator will time out on its own, so ignore + logger.debug("counter error", e); } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/d5fc1932/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/d5fc1932/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java index 328e1ea,d280a8e..664d9ea --- a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java +++ b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java @@@ -40,12 -49,12 +45,13 @@@ public abstract class AbstractWriteResp startTime = System.currentTimeMillis(); this.consistencyLevel = consistencyLevel; this.writeEndpoints = writeEndpoints; + this.callback = callback; } - public void get() throws TimeoutException + public void get() throws WriteTimeoutException { - long timeout = DatabaseDescriptor.getRpcTimeout() - (System.currentTimeMillis() - startTime); + long timeout = DatabaseDescriptor.getWriteRpcTimeout() - (System.currentTimeMillis() - startTime); + boolean success; try { @@@ -57,15 -66,20 +63,22 @@@ } if (!success) - { - throw new TimeoutException(); - } + throw new WriteTimeoutException(consistencyLevel, ackCount(), blockFor()); } + protected abstract int ackCount(); + + protected abstract int blockFor(); + /** null message means "response from local write" */ - public abstract void response(Message msg); + public abstract void response(MessageIn msg); public abstract void 
assureSufficientLiveNodes() throws UnavailableException; + + protected void signal() + { + condition.signal(); + if (callback != null) + callback.run(); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/d5fc1932/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java index a33ce60,cbecf6b..85afade --- a/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java +++ b/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java @@@ -46,17 -53,15 +46,17 @@@ public class DatacenterSyncWriteRespons localdc = snitch.getDatacenter(FBUtilities.getBroadcastAddress()); } - private final NetworkTopologyStrategy strategy; - private HashMap<String, AtomicInteger> responses = new HashMap<String, AtomicInteger>(); + private final String table; + private final NetworkTopologyStrategy strategy; + private final HashMap<String, AtomicInteger> responses = new HashMap<String, AtomicInteger>(); - protected DatacenterSyncWriteResponseHandler(Collection<InetAddress> writeEndpoints, ConsistencyLevel consistencyLevel, String table) + protected DatacenterSyncWriteResponseHandler(Collection<InetAddress> writeEndpoints, ConsistencyLevel consistencyLevel, String table, Runnable callback) { // Response is been managed by the map so make it 1 for the superclass. 
- super(writeEndpoints, consistencyLevel); + super(writeEndpoints, consistencyLevel, callback); assert consistencyLevel == ConsistencyLevel.EACH_QUORUM; + this.table = table; strategy = (NetworkTopologyStrategy) Table.open(table).getReplicationStrategy(); for (String dc : strategy.getDatacenters()) @@@ -66,12 -71,12 +66,12 @@@ } } - public static IWriteResponseHandler create(Collection<InetAddress> writeEndpoints, ConsistencyLevel consistencyLevel, String table) + public static IWriteResponseHandler create(Collection<InetAddress> writeEndpoints, ConsistencyLevel consistencyLevel, String table, Runnable callback) { - return new DatacenterSyncWriteResponseHandler(writeEndpoints, consistencyLevel, table); + return new DatacenterSyncWriteResponseHandler(writeEndpoints, consistencyLevel, table, callback); } - public void response(Message message) + public void response(MessageIn message) { String dataCenter = message == null ? localdc @@@ -86,26 -91,9 +86,26 @@@ } // all the quorum conditions are met - condition.signal(); + signal(); } + protected int blockFor() + { + return consistencyLevel.blockFor(table); + } + + protected int ackCount() + { + int n = 0; + for (Map.Entry<String, AtomicInteger> entry : responses.entrySet()) + { + String dc = entry.getKey(); + AtomicInteger i = entry.getValue(); + n += (strategy.getReplicationFactor(dc) / 2) + 1 - i.get(); + } + return n; + } + public void assureSufficientLiveNodes() throws UnavailableException { Map<String, AtomicInteger> dcEndpoints = new HashMap<String, AtomicInteger>(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/d5fc1932/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java index 6644b1a,881c99d..2821e3b --- a/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java +++ 
b/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java @@@ -55,12 -62,20 +55,12 @@@ public class DatacenterWriteResponseHan } @Override - protected int determineBlockFor(String table) - { - NetworkTopologyStrategy strategy = (NetworkTopologyStrategy) Table.open(table).getReplicationStrategy(); - return (strategy.getReplicationFactor(localdc) / 2) + 1; - } - - - @Override - public void response(Message message) + public void response(MessageIn message) { - if (message == null || localdc.equals(snitch.getDatacenter(message.getFrom()))) + if (message == null || localdc.equals(snitch.getDatacenter(message.from))) { if (responses.decrementAndGet() == 0) - condition.signal(); + signal(); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/d5fc1932/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/service/StorageProxy.java index 6f461c9,23e0de4..24625df --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@@ -247,8 -246,9 +249,9 @@@ public class StorageProxy implements St public static IWriteResponseHandler performWrite(IMutation mutation, ConsistencyLevel consistency_level, String localDataCenter, - WritePerformer performer) + WritePerformer performer, + Runnable callback) - throws UnavailableException, TimeoutException, IOException + throws UnavailableException, OverloadedException, IOException { String table = mutation.getTable(); AbstractReplicationStrategy rs = Table.open(table).getReplicationStrategy(); @@@ -540,20 -537,20 +543,20 @@@ } } - - // Must be called on a replica of the mutation. This replica becomes the // leader of this mutation. 
- public static IWriteResponseHandler applyCounterMutationOnLeader(CounterMutation cm, String localDataCenter) - public static IWriteResponseHandler applyCounterMutationOnLeader(CounterMutation cm, String localDataCenter, Runnable callback) throws UnavailableException, TimeoutException, IOException ++ public static IWriteResponseHandler applyCounterMutationOnLeader(CounterMutation cm, String localDataCenter, Runnable callback) + throws UnavailableException, IOException, OverloadedException { - return performWrite(cm, cm.consistency(), localDataCenter, counterWritePerformer); + return performWrite(cm, cm.consistency(), localDataCenter, counterWritePerformer, callback); } // Same as applyCounterMutationOnLeader but must with the difference that it use the MUTATION stage to execute the write (while // applyCounterMutationOnLeader assumes it is on the MUTATION stage already) - public static IWriteResponseHandler applyCounterMutationOnCoordinator(CounterMutation cm, String localDataCenter) throws UnavailableException, TimeoutException, IOException + public static IWriteResponseHandler applyCounterMutationOnCoordinator(CounterMutation cm, String localDataCenter) + throws UnavailableException, IOException, OverloadedException { - return performWrite(cm, cm.consistency(), localDataCenter, counterWriteOnCoordinatorPerformer); + return performWrite(cm, cm.consistency(), localDataCenter, counterWriteOnCoordinatorPerformer, null); } private static Runnable counterWriteTask(final IMutation mutation, http://git-wip-us.apache.org/repos/asf/cassandra/blob/d5fc1932/src/java/org/apache/cassandra/service/WriteResponseHandler.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/service/WriteResponseHandler.java index 5b0f64c,baf8558..cc803cb --- a/src/java/org/apache/cassandra/service/WriteResponseHandler.java +++ b/src/java/org/apache/cassandra/service/WriteResponseHandler.java @@@ -40,19 -41,16 +40,19 @@@ public class 
WriteResponseHandler exten protected static final Logger logger = LoggerFactory.getLogger(WriteResponseHandler.class); protected final AtomicInteger responses; + private final int blockFor; - protected WriteResponseHandler(Collection<InetAddress> writeEndpoints, ConsistencyLevel consistencyLevel, String table) + protected WriteResponseHandler(Collection<InetAddress> writeEndpoints, ConsistencyLevel consistencyLevel, String table, Runnable callback) { - super(writeEndpoints, consistencyLevel); + super(writeEndpoints, consistencyLevel, callback); - responses = new AtomicInteger(determineBlockFor(table)); + blockFor = consistencyLevel.blockFor(table); + responses = new AtomicInteger(blockFor); } protected WriteResponseHandler(InetAddress endpoint) { - super(Arrays.asList(endpoint), ConsistencyLevel.ALL); + super(Arrays.asList(endpoint), ConsistencyLevel.ALL, null); + blockFor = 1; responses = new AtomicInteger(1); } @@@ -66,20 -64,31 +66,20 @@@ return new WriteResponseHandler(endpoint); } - public void response(Message m) + public void response(MessageIn m) { if (responses.decrementAndGet() == 0) - condition.signal(); + signal(); } - protected int determineBlockFor(String table) + protected int ackCount() { - switch (consistencyLevel) - { - case ONE: - return 1; - case ANY: - return 1; - case TWO: - return 2; - case THREE: - return 3; - case QUORUM: - return (Table.open(table).getReplicationStrategy().getReplicationFactor() / 2) + 1; - case ALL: - return Table.open(table).getReplicationStrategy().getReplicationFactor(); - default: - throw new UnsupportedOperationException("invalid consistency level: " + consistencyLevel.toString()); - } + return blockFor - responses.get(); + } + + protected int blockFor() + { + return blockFor; } public void assureSufficientLiveNodes() throws UnavailableException