Repository: cassandra
Updated Branches:
  refs/heads/trunk f9fb5339d -> 97dcba601
Ensure that batchlog and hint timeouts do not produce hints patch by Aleksey Yeschenko; reviewed by Jonathan Ellis for CASSANDRA-7058 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2890cc5b Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2890cc5b Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2890cc5b Branch: refs/heads/trunk Commit: 2890cc5be986740cadf491bb5efbb49af2b11c57 Parents: 0547d16 Author: Aleksey Yeschenko <[email protected]> Authored: Tue Apr 22 19:10:51 2014 +0300 Committer: Aleksey Yeschenko <[email protected]> Committed: Tue Apr 22 19:10:51 2014 +0300 ---------------------------------------------------------------------- CHANGES.txt | 1 + src/java/org/apache/cassandra/db/BatchlogManager.java | 2 +- .../org/apache/cassandra/db/HintedHandOffManager.java | 8 +++----- src/java/org/apache/cassandra/net/MessagingService.java | 12 +++++++++++- 4 files changed, 16 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/2890cc5b/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index dc48131..74ddcfd 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -9,6 +9,7 @@ * Fix batchlog to account for CF truncation records (CASSANDRA-6999) * Fix CQLSH parsing of functions and BLOB literals (CASSANDRA-7018) * Require nodetool rebuild_index to specify index names (CASSANDRA-7038) + * Ensure that batchlog and hint timeouts do not produce hints (CASSANDRA-7058) 1.2.16 http://git-wip-us.apache.org/repos/asf/cassandra/blob/2890cc5b/src/java/org/apache/cassandra/db/BatchlogManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/BatchlogManager.java b/src/java/org/apache/cassandra/db/BatchlogManager.java index ea32e9d..02af9d3 100644 --- 
a/src/java/org/apache/cassandra/db/BatchlogManager.java +++ b/src/java/org/apache/cassandra/db/BatchlogManager.java @@ -328,7 +328,7 @@ public class BatchlogManager implements BatchlogManagerMBean } }; WriteResponseHandler handler = new WriteResponseHandler(ep, WriteType.UNLOGGED_BATCH, callback); - MessagingService.instance().sendRR(mutation.createMessage(), ep, handler); + MessagingService.instance().sendUnhintableMutation(mutation, ep, handler); handlers.add(handler); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/2890cc5b/src/java/org/apache/cassandra/db/HintedHandOffManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/HintedHandOffManager.java b/src/java/org/apache/cassandra/db/HintedHandOffManager.java index 427bbf2..a7a3e06 100644 --- a/src/java/org/apache/cassandra/db/HintedHandOffManager.java +++ b/src/java/org/apache/cassandra/db/HintedHandOffManager.java @@ -55,7 +55,6 @@ import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.io.sstable.SSTable; import org.apache.cassandra.metrics.HintedHandoffMetrics; -import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.service.*; import org.apache.cassandra.thrift.*; @@ -391,8 +390,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean continue; } - MessageOut<RowMutation> message = rm.createMessage(); - rateLimiter.acquire(message.serializedSize(MessagingService.current_version)); + rateLimiter.acquire((int) RowMutation.serializer.serializedSize(rm, MessagingService.current_version)); Runnable callback = new Runnable() { public void run() @@ -401,8 +399,8 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean deleteHint(hostIdBytes, hint.name(), hint.maxTimestamp()); } }; - WriteResponseHandler responseHandler = new WriteResponseHandler(endpoint, 
WriteType.UNLOGGED_BATCH, callback); - MessagingService.instance().sendRR(message, endpoint, responseHandler); + WriteResponseHandler responseHandler = new WriteResponseHandler(endpoint, WriteType.SIMPLE, callback); + MessagingService.instance().sendUnhintableMutation(rm, endpoint, responseHandler); responseHandlers.add(responseHandler); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/2890cc5b/src/java/org/apache/cassandra/net/MessagingService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java index 09fa272..3f90d7f 100644 --- a/src/java/org/apache/cassandra/net/MessagingService.java +++ b/src/java/org/apache/cassandra/net/MessagingService.java @@ -559,6 +559,17 @@ public final class MessagingService implements MessagingServiceMBean } /** + * A special version of sendRR that doesn't trigger a hint for the mutation on a timeout. + * Used by BatchlogManager and HintedHandOffManager. + */ + public void sendUnhintableMutation(RowMutation mutation, InetAddress to, IMessageCallback cb) + { + String id = nextId(); + callbacks.put(id, new CallbackInfo(to, cb, WriteResponse.serializer), DatabaseDescriptor.getWriteRpcTimeout()); + sendOneWay(mutation.createMessage(), id, to); + } + + /** * Send a message to a given endpoint. This method specifies a callback * which is invoked with the actual response. * Also holds the message (only mutation messages) to determine if it @@ -568,7 +579,6 @@ public final class MessagingService implements MessagingServiceMBean * @param to endpoint to which the message needs to be sent * @param cb callback interface which is used to pass the responses or * suggest that a timeout occurred to the invoker of the send(). - * suggest that a timeout occurred to the invoker of the send(). 
* @param timeout the timeout used for expiration * @return an reference to message id used to match with the result */
