merge from 1.2
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e7c90e04 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e7c90e04 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e7c90e04 Branch: refs/heads/cassandra-2.0 Commit: e7c90e04c1e85609ecc80099ad9b5d81b62828be Parents: 76cb10c 6b58745 Author: Jonathan Ellis <[email protected]> Authored: Wed Oct 2 19:09:24 2013 -0500 Committer: Jonathan Ellis <[email protected]> Committed: Wed Oct 2 19:09:24 2013 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/cassandra/net/CallbackInfo.java | 16 +----------- .../apache/cassandra/net/MessagingService.java | 20 ++++++++------- .../apache/cassandra/net/WriteCallbackInfo.java | 26 +++++++++++++++++++ .../apache/cassandra/service/StorageProxy.java | 27 ++++++++++++++++---- 5 files changed, 61 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/e7c90e04/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index f012ed1,cc04eca..c1023f6 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,21 -1,5 +1,22 @@@ -1.2.11 +2.0.2 + * Add configurable metrics reporting (CASSANDRA-4430) + * drop queries exceeding a configurable number of tombstones (CASSANDRA-6117) + * Track and persist sstable read activity (CASSANDRA-5515) + * Fixes for speculative retry (CASSANDRA-5932) + * Improve memory usage of metadata min/max column names (CASSANDRA-6077) + * Fix thrift validation refusing row markers on CQL3 tables (CASSANDRA-6081) + * Fix insertion of collections with CAS (CASSANDRA-6069) + * Correctly send metadata on SELECT COUNT (CASSANDRA-6080) + * Track clients' remote addresses in ClientState (CASSANDRA-6070) + * Create snapshot dir if it does not exist when migrating + leveled manifest (CASSANDRA-6093) + * make sequential nodetool repair the default (CASSANDRA-5950) + * Add more hooks for compaction strategy implementations (CASSANDRA-6111) + * Fix potential NPE on composite 2ndary indexes (CASSANDRA-6098) + * Delete can potentially be skipped in batch (CASSANDRA-6115) + * Allow alter keyspace on system_traces (CASSANDRA-6016) +Merged from 1.2: + * Never return WriteTimeout for CL.ANY (CASSANDRA-6032) * Tracing should log write failure rather than raw exceptions (CASSANDRA-6133) * lock access to TM.endpointToHostIdMap (CASSANDRA-6103) * Allow estimated memtable size to exceed slab allocator size (CASSANDRA-6078) http://git-wip-us.apache.org/repos/asf/cassandra/blob/e7c90e04/src/java/org/apache/cassandra/net/CallbackInfo.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/net/CallbackInfo.java index bb1a4e0,f90df8d..0edfee9 --- a/src/java/org/apache/cassandra/net/CallbackInfo.java +++ b/src/java/org/apache/cassandra/net/CallbackInfo.java @@@ -20,7 -20,7 +20,6 @@@ package org.apache.cassandra.net import java.net.InetAddress; import org.apache.cassandra.io.IVersionedSerializer; --import org.apache.cassandra.service.StorageProxy; /** * Encapsulates the callback information. @@@ -30,8 -30,7 +29,7 @@@ public class CallbackInfo { protected final InetAddress target; - protected final IMessageCallback callback; + protected final IAsyncCallback callback; - protected final MessageOut<?> sentMessage; protected final IVersionedSerializer<?> serializer; /** @@@ -41,16 -40,10 +39,10 @@@ * @param callback * @param serializer serializer to deserialize response message */ - public CallbackInfo(InetAddress target, IMessageCallback callback, IVersionedSerializer<?> serializer) + public CallbackInfo(InetAddress target, IAsyncCallback callback, IVersionedSerializer<?> serializer) { - this(target, callback, null, serializer); - } - - public CallbackInfo(InetAddress target, IAsyncCallback callback, MessageOut<?> sentMessage, IVersionedSerializer<?> serializer) - { this.target = target; this.callback = callback; - this.sentMessage = sentMessage; this.serializer = serializer; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e7c90e04/src/java/org/apache/cassandra/net/MessagingService.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/net/MessagingService.java index 33e3bfb,dd02ca6..ca3845b --- a/src/java/org/apache/cassandra/net/MessagingService.java +++ b/src/java/org/apache/cassandra/net/MessagingService.java @@@ -538,17 -519,20 +537,20 @@@ public final class MessagingService imp return verbHandlers.get(type); } - public String addCallback(IMessageCallback cb, MessageOut message, InetAddress to, long timeout) + public int addCallback(IAsyncCallback cb, MessageOut message, InetAddress to, long timeout) { + assert message.verb != Verb.MUTATION; // mutations need to call the overload with a ConsistencyLevel - String messageId = nextId(); + int messageId = nextId(); - CallbackInfo previous; - - // If HH is enabled and this is a mutation message => store the message to track for potential hints. - if (DatabaseDescriptor.hintedHandoffEnabled() && message.verb == Verb.MUTATION) - previous = callbacks.put(messageId, new CallbackInfo(to, cb, message, callbackDeserializers.get(message.verb)), timeout); - else - previous = callbacks.put(messageId, new CallbackInfo(to, cb, callbackDeserializers.get(message.verb)), timeout); + CallbackInfo previous = callbacks.put(messageId, new CallbackInfo(to, cb, callbackDeserializers.get(message.verb)), timeout); + assert previous == null; + return messageId; + } - public String addCallback(IMessageCallback cb, MessageOut message, InetAddress to, long timeout, ConsistencyLevel consistencyLevel) ++ public int addCallback(IAsyncCallback cb, MessageOut message, InetAddress to, long timeout, ConsistencyLevel consistencyLevel) + { + assert message.verb == Verb.MUTATION; - String messageId = nextId(); ++ int messageId = nextId(); + CallbackInfo previous = callbacks.put(messageId, new WriteCallbackInfo(to, cb, message, callbackDeserializers.get(message.verb), consistencyLevel), timeout); assert previous == null; return messageId; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e7c90e04/src/java/org/apache/cassandra/net/WriteCallbackInfo.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/net/WriteCallbackInfo.java index 0000000,8badbcf..abded75 mode 000000,100644..100644 --- a/src/java/org/apache/cassandra/net/WriteCallbackInfo.java +++ b/src/java/org/apache/cassandra/net/WriteCallbackInfo.java @@@ -1,0 -1,26 +1,26 @@@ + package org.apache.cassandra.net; + + import java.net.InetAddress; + + import org.apache.cassandra.db.ConsistencyLevel; + import org.apache.cassandra.io.IVersionedSerializer; + import org.apache.cassandra.service.StorageProxy; + + public class WriteCallbackInfo extends CallbackInfo + { + public final MessageOut sentMessage; + private final ConsistencyLevel consistencyLevel; + - public WriteCallbackInfo(InetAddress target, IMessageCallback callback, MessageOut message, IVersionedSerializer<?> serializer, ConsistencyLevel consistencyLevel) ++ public WriteCallbackInfo(InetAddress target, IAsyncCallback callback, MessageOut message, IVersionedSerializer<?> serializer, ConsistencyLevel consistencyLevel) + { + super(target, callback, serializer); + assert message != null; + this.sentMessage = message; + this.consistencyLevel = consistencyLevel; + } + + public boolean shouldHint() + { + return consistencyLevel != ConsistencyLevel.ANY && StorageProxy.shouldHint(target); + } + } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e7c90e04/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/service/StorageProxy.java index 51f171d,b23de1f..c430bc2 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@@ -516,11 -199,28 +516,28 @@@ public class StorageProxy implements St } catch (WriteTimeoutException ex) { - writeMetrics.timeouts.mark(); - ClientRequestMetrics.writeTimeouts.inc(); - Tracing.trace("Write timeout; received {} of {} required replies", ex.received, ex.blockFor); - throw ex; + if (consistency_level == ConsistencyLevel.ANY) + { + for (IMutation mutation : mutations) + { + if (mutation instanceof CounterMutation) + continue; + + Token tk = StorageService.getPartitioner().getToken(mutation.key()); - List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(mutation.getTable(), tk); - Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, mutation.getTable()); ++ List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(mutation.getKeyspaceName(), tk); ++ Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, mutation.getKeyspaceName()); + for (InetAddress target : Iterables.concat(naturalEndpoints, pendingEndpoints)) + submitHint((RowMutation) mutation, target, null, consistency_level); + } + Tracing.trace("Wrote hint to satisfy CL.ANY after no replicas acknowledged the write"); + } + else + { + writeMetrics.timeouts.mark(); + ClientRequestMetrics.writeTimeouts.inc(); + Tracing.trace("Write timeout; received {} of {} required replies", ex.received, ex.blockFor); + throw ex; + } } catch (UnavailableException e) {
