Updated Branches:
  refs/heads/cassandra-1.2 64890d86d -> 5440a0a67
  refs/heads/cassandra-2.0 70239e17b -> 76cb10ca9
  refs/heads/trunk dca300d6c -> 87e19fc8f
6132 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5440a0a6 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5440a0a6 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5440a0a6 Branch: refs/heads/cassandra-1.2 Commit: 5440a0a6767544d6ea1ba34f5d2a3e223f260fb5 Parents: 64890d8 Author: Jonathan Ellis <[email protected]> Authored: Wed Oct 2 14:09:29 2013 -0500 Committer: Jonathan Ellis <[email protected]> Committed: Wed Oct 2 14:09:29 2013 -0500 ---------------------------------------------------------------------- .../org/apache/cassandra/net/CallbackInfo.java | 17 ++---------- .../apache/cassandra/net/MessagingService.java | 20 ++++++++------- .../apache/cassandra/net/WriteCallbackInfo.java | 26 +++++++++++++++++++ .../apache/cassandra/service/StorageProxy.java | 27 ++++++++++++++++---- 4 files changed, 61 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/5440a0a6/src/java/org/apache/cassandra/net/CallbackInfo.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/CallbackInfo.java b/src/java/org/apache/cassandra/net/CallbackInfo.java index f0e48e9..f90df8d 100644 --- a/src/java/org/apache/cassandra/net/CallbackInfo.java +++ b/src/java/org/apache/cassandra/net/CallbackInfo.java @@ -31,7 +31,6 @@ public class CallbackInfo { protected final InetAddress target; protected final IMessageCallback callback; - protected final MessageOut<?> sentMessage; protected final IVersionedSerializer<?> serializer; /** @@ -41,27 +40,15 @@ public class CallbackInfo * @param callback * @param serializer serializer to deserialize response message */ - public CallbackInfo(InetAddress target, IMessageCallback callback, IVersionedSerializer<?> serializer) - { - this(target, callback, null, serializer); - } - - public 
CallbackInfo(InetAddress target, IMessageCallback callback, MessageOut<?> sentMessage, IVersionedSerializer<?> serializer) + public CallbackInfo(InetAddress target, IMessageCallback callback, IVersionedSerializer<?> serializer) { this.target = target; this.callback = callback; - this.sentMessage = sentMessage; this.serializer = serializer; } - /** - * @return TRUE iff a hint should be written for this target. - * - * NOTE: - * Assumes it is only called after the write of "sentMessage" to "target" has timed out. - */ public boolean shouldHint() { - return sentMessage != null && StorageProxy.shouldHint(target); + return false; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/5440a0a6/src/java/org/apache/cassandra/net/MessagingService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java index a199e83..dd02ca6 100644 --- a/src/java/org/apache/cassandra/net/MessagingService.java +++ b/src/java/org/apache/cassandra/net/MessagingService.java @@ -329,8 +329,7 @@ public final class MessagingService implements MessagingServiceMBean if (expiredCallbackInfo.shouldHint()) { - assert expiredCallbackInfo.sentMessage != null; - RowMutation rm = (RowMutation) expiredCallbackInfo.sentMessage.payload; + RowMutation rm = (RowMutation) ((WriteCallbackInfo) expiredCallbackInfo).sentMessage.payload; return StorageProxy.submitHint(rm, expiredCallbackInfo.target, null, null); } @@ -522,15 +521,18 @@ public final class MessagingService implements MessagingServiceMBean public String addCallback(IMessageCallback cb, MessageOut message, InetAddress to, long timeout) { + assert message.verb != Verb.MUTATION; // mutations need to call the overload with a ConsistencyLevel String messageId = nextId(); - CallbackInfo previous; - - // If HH is enabled and this is a mutation message => store the message to track for potential hints. 
- if (DatabaseDescriptor.hintedHandoffEnabled() && message.verb == Verb.MUTATION) - previous = callbacks.put(messageId, new CallbackInfo(to, cb, message, callbackDeserializers.get(message.verb)), timeout); - else - previous = callbacks.put(messageId, new CallbackInfo(to, cb, callbackDeserializers.get(message.verb)), timeout); + CallbackInfo previous = callbacks.put(messageId, new CallbackInfo(to, cb, callbackDeserializers.get(message.verb)), timeout); + assert previous == null; + return messageId; + } + public String addCallback(IMessageCallback cb, MessageOut message, InetAddress to, long timeout, ConsistencyLevel consistencyLevel) + { + assert message.verb == Verb.MUTATION; + String messageId = nextId(); + CallbackInfo previous = callbacks.put(messageId, new WriteCallbackInfo(to, cb, message, callbackDeserializers.get(message.verb), consistencyLevel), timeout); assert previous == null; return messageId; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/5440a0a6/src/java/org/apache/cassandra/net/WriteCallbackInfo.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/WriteCallbackInfo.java b/src/java/org/apache/cassandra/net/WriteCallbackInfo.java new file mode 100644 index 0000000..8badbcf --- /dev/null +++ b/src/java/org/apache/cassandra/net/WriteCallbackInfo.java @@ -0,0 +1,26 @@ +package org.apache.cassandra.net; + +import java.net.InetAddress; + +import org.apache.cassandra.db.ConsistencyLevel; +import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.service.StorageProxy; + +public class WriteCallbackInfo extends CallbackInfo +{ + public final MessageOut sentMessage; + private final ConsistencyLevel consistencyLevel; + + public WriteCallbackInfo(InetAddress target, IMessageCallback callback, MessageOut message, IVersionedSerializer<?> serializer, ConsistencyLevel consistencyLevel) + { + super(target, callback, serializer); + assert message != null; + 
this.sentMessage = message; + this.consistencyLevel = consistencyLevel; + } + + public boolean shouldHint() + { + return consistencyLevel != ConsistencyLevel.ANY && StorageProxy.shouldHint(target); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/5440a0a6/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index 8a6e52e..b23de1f 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -196,14 +196,31 @@ public class StorageProxy implements StorageProxyMBean { responseHandler.get(); } - } catch (WriteTimeoutException ex) { - writeMetrics.timeouts.mark(); - ClientRequestMetrics.writeTimeouts.inc(); - Tracing.trace("Write timeout; received {} of {} required replies", ex.received, ex.blockFor); - throw ex; + if (consistency_level == ConsistencyLevel.ANY) + { + for (IMutation mutation : mutations) + { + if (mutation instanceof CounterMutation) + continue; + + Token tk = StorageService.getPartitioner().getToken(mutation.key()); + List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(mutation.getTable(), tk); + Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, mutation.getTable()); + for (InetAddress target : Iterables.concat(naturalEndpoints, pendingEndpoints)) + submitHint((RowMutation) mutation, target, null, consistency_level); + } + Tracing.trace("Wrote hint to satisfy CL.ANY after no replicas acknowledged the write"); + } + else + { + writeMetrics.timeouts.mark(); + ClientRequestMetrics.writeTimeouts.inc(); + Tracing.trace("Write timeout; received {} of {} required replies", ex.received, ex.blockFor); + throw ex; + } } catch (UnavailableException e) {
