Merge branch 'cassandra-1.2' into cassandra-2.0
Conflicts:
src/java/org/apache/cassandra/net/MessagingService.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3a73e392
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3a73e392
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3a73e392
Branch: refs/heads/trunk
Commit: 3a73e392fa424bff5378d4bb72117cfa28f9b0b7
Parents: 1c2a812 2890cc5
Author: Aleksey Yeschenko <[email protected]>
Authored: Tue Apr 22 19:47:42 2014 +0300
Committer: Aleksey Yeschenko <[email protected]>
Committed: Tue Apr 22 19:47:42 2014 +0300
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/db/BatchlogManager.java | 2 +-
.../cassandra/db/HintedHandOffManager.java | 4 +--
.../apache/cassandra/net/MessagingService.java | 27 ++++++++++++++------
.../apache/cassandra/net/WriteCallbackInfo.java | 16 +++++++++---
.../apache/cassandra/service/StorageProxy.java | 7 ++++-
6 files changed, 41 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a73e392/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 9b73c89,74ddcfd..dbed949
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -8,61 -9,10 +8,62 @@@ Merged from 1.2
* Fix batchlog to account for CF truncation records (CASSANDRA-6999)
* Fix CQLSH parsing of functions and BLOB literals (CASSANDRA-7018)
* Require nodetool rebuild_index to specify index names (CASSANDRA-7038)
+ * Ensure that batchlog and hint timeouts do not produce hints
(CASSANDRA-7058)
-1.2.16
+2.0.7
+ * Put nodes in hibernate when join_ring is false (CASSANDRA-6961)
+ * Avoid early loading of non-system keyspaces before compaction-leftovers
+ cleanup at startup (CASSANDRA-6913)
+ * Restrict Windows to parallel repairs (CASSANDRA-6907)
+ * (Hadoop) Allow manually specifying start/end tokens in CFIF
(CASSANDRA-6436)
+ * Fix NPE in MeteredFlusher (CASSANDRA-6820)
+ * Fix race processing range scan responses (CASSANDRA-6820)
+ * Allow deleting snapshots from dropped keyspaces (CASSANDRA-6821)
+ * Add uuid() function (CASSANDRA-6473)
+ * Omit tombstones from schema digests (CASSANDRA-6862)
+ * Include correct consistencyLevel in LWT timeout (CASSANDRA-6884)
+ * Lower chances for losing new SSTables during nodetool refresh and
+ ColumnFamilyStore.loadNewSSTables (CASSANDRA-6514)
+ * Add support for DELETE ... IF EXISTS to CQL3 (CASSANDRA-5708)
+ * Update hadoop_cql3_word_count example (CASSANDRA-6793)
+ * Fix handling of RejectedExecution in sync Thrift server (CASSANDRA-6788)
+ * Log more information when exceeding tombstone_warn_threshold
(CASSANDRA-6865)
+ * Fix truncate to not abort due to unreachable fat clients (CASSANDRA-6864)
+ * Fix schema concurrency exceptions (CASSANDRA-6841)
+ * Fix leaking validator FH in StreamWriter (CASSANDRA-6832)
+ * Fix saving triggers to schema (CASSANDRA-6789)
+ * Fix trigger mutations when base mutation list is immutable (CASSANDRA-6790)
+ * Fix accounting in FileCacheService to allow re-using RAR (CASSANDRA-6838)
+ * Fix static counter columns (CASSANDRA-6827)
+ * Restore expiring->deleted (cell) compaction optimization (CASSANDRA-6844)
+ * Fix CompactionManager.needsCleanup (CASSANDRA-6845)
+ * Correctly compare BooleanType values other than 0 and 1 (CASSANDRA-6779)
+ * Read message id as string from earlier versions (CASSANDRA-6840)
+ * Properly use the Paxos consistency for (non-protocol) batch
(CASSANDRA-6837)
+ * Add paranoid disk failure option (CASSANDRA-6646)
+ * Improve PerRowSecondaryIndex performance (CASSANDRA-6876)
+ * Extend triggers to support CAS updates (CASSANDRA-6882)
+ * Static columns with IF NOT EXISTS don't always work as expected
(CASSANDRA-6873)
+ * Fix paging with SELECT DISTINCT (CASSANDRA-6857)
+ * Fix UnsupportedOperationException on CAS timeout (CASSANDRA-6923)
+ * Improve MeteredFlusher handling of MF-unaffected column families
+ (CASSANDRA-6867)
+ * Add CqlRecordReader using native pagination (CASSANDRA-6311)
+ * Add QueryHandler interface (CASSANDRA-6659)
+ * Track liveRatio per-memtable, not per-CF (CASSANDRA-6945)
+ * Make sure upgradesstables keeps sstable level (CASSANDRA-6958)
+ * Fix LIMIT with static columns (CASSANDRA-6956)
+ * Fix clash with CQL column name in thrift validation (CASSANDRA-6892)
+ * Fix error with super columns in mixed 1.2-2.0 clusters (CASSANDRA-6966)
+ * Fix bad skip of sstables on slice query with composite start/finish
(CASSANDRA-6825)
+ * Fix unintended update with conditional statement (CASSANDRA-6893)
+ * Fix map element access in IF (CASSANDRA-6914)
+ * Avoid costly range calculations for range queries on system keyspaces
+ (CASSANDRA-6906)
+ * Fix SSTable not released if stream session fails (CASSANDRA-6818)
+ * Avoid build failure due to ANTLR timeout (CASSANDRA-6991)
+Merged from 1.2:
* Add UNLOGGED, COUNTER options to BATCH documentation (CASSANDRA-6816)
* add extra SSL cipher suites (CASSANDRA-6613)
* fix nodetool getsstables for blob PK (CASSANDRA-6803)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a73e392/src/java/org/apache/cassandra/db/BatchlogManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/BatchlogManager.java
index 5770994,02af9d3..5aea736
--- a/src/java/org/apache/cassandra/db/BatchlogManager.java
+++ b/src/java/org/apache/cassandra/db/BatchlogManager.java
@@@ -323,7 -328,7 +323,7 @@@ public class BatchlogManager implement
}
};
WriteResponseHandler handler = new WriteResponseHandler(ep,
WriteType.UNLOGGED_BATCH, callback);
- MessagingService.instance().sendRR(mutation.createMessage(), ep,
handler);
- MessagingService.instance().sendUnhintableMutation(mutation, ep,
handler);
++ MessagingService.instance().sendRR(mutation.createMessage(), ep,
handler, false);
handlers.add(handler);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a73e392/src/java/org/apache/cassandra/db/HintedHandOffManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/HintedHandOffManager.java
index 942707e,a7a3e06..13d1bb0
--- a/src/java/org/apache/cassandra/db/HintedHandOffManager.java
+++ b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
@@@ -450,8 -399,8 +450,8 @@@ public class HintedHandOffManager imple
deleteHint(hostIdBytes, hint.name(),
hint.maxTimestamp());
}
};
- WriteResponseHandler responseHandler = new
WriteResponseHandler(endpoint, WriteType.UNLOGGED_BATCH, callback);
- MessagingService.instance().sendRR(message, endpoint,
responseHandler);
+ WriteResponseHandler responseHandler = new
WriteResponseHandler(endpoint, WriteType.SIMPLE, callback);
- MessagingService.instance().sendUnhintableMutation(rm,
endpoint, responseHandler);
++ MessagingService.instance().sendRR(message, endpoint,
responseHandler, false);
responseHandlers.add(responseHandler);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a73e392/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/net/MessagingService.java
index cc5dae5,3f90d7f..cccf698
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@@ -537,21 -527,18 +537,33 @@@ public final class MessagingService imp
return verbHandlers.get(type);
}
- public String addCallback(IMessageCallback cb, MessageOut message,
InetAddress to, long timeout)
+ public int addCallback(IAsyncCallback cb, MessageOut message, InetAddress
to, long timeout)
{
- String messageId = nextId();
- CallbackInfo previous;
-
- // If HH is enabled and this is a mutation message => store the
message to track for potential hints.
- if (DatabaseDescriptor.hintedHandoffEnabled() && message.verb ==
Verb.MUTATION)
- previous = callbacks.put(messageId, new CallbackInfo(to, cb,
message, callbackDeserializers.get(message.verb)), timeout);
- else
- previous = callbacks.put(messageId, new CallbackInfo(to, cb,
callbackDeserializers.get(message.verb)), timeout);
+ assert message.verb != Verb.MUTATION; // mutations need to call the
overload with a ConsistencyLevel
+ int messageId = nextId();
+ CallbackInfo previous = callbacks.put(messageId, new CallbackInfo(to,
cb, callbackDeserializers.get(message.verb)), timeout);
+ assert previous == null : String.format("Callback already exists for
id %d! (%s)", messageId, previous);
+ return messageId;
+ }
- public int addCallback(IAsyncCallback cb, MessageOut<? extends IMutation>
message, InetAddress to, long timeout, ConsistencyLevel consistencyLevel)
- assert previous == null;
++ public int addCallback(IAsyncCallback cb,
++ MessageOut<? extends IMutation> message,
++ InetAddress to,
++ long timeout,
++ ConsistencyLevel consistencyLevel,
++ boolean allowHints)
+ {
+ assert message.verb == Verb.MUTATION || message.verb ==
Verb.COUNTER_MUTATION;
+ int messageId = nextId();
- CallbackInfo previous = callbacks.put(messageId, new
WriteCallbackInfo(to, cb, message, callbackDeserializers.get(message.verb),
consistencyLevel), timeout);
++ CallbackInfo previous = callbacks.put(messageId,
++ new WriteCallbackInfo(to,
++ cb,
++ message,
++
callbackDeserializers.get(message.verb),
++
consistencyLevel,
++
allowHints),
++ timeout);
+ assert previous == null : String.format("Callback already exists for
id %d! (%s)", messageId, previous);
return messageId;
}
@@@ -568,24 -559,14 +580,21 @@@
}
/**
- * A special version of sendRR that doesn't trigger a hint for the
mutation on a timeout.
- * Used by BatchlogManager and HintedHandOffManager.
+ * Send a non-mutation message to a given endpoint. This method specifies
a callback
+ * which is invoked with the actual response.
- * Also holds the message (only mutation messages) to determine if it
- * needs to trigger a hint (uses StorageProxy for that).
+ *
+ * @param message message to be sent.
+ * @param to endpoint to which the message needs to be sent
+ * @param cb callback interface which is used to pass the responses
or
+ * suggest that a timeout occurred to the invoker of the
send().
- * suggest that a timeout occurred to the invoker of the
send().
+ * @param timeout the timeout used for expiration
+ * @return an reference to message id used to match with the result
*/
- public void sendUnhintableMutation(RowMutation mutation, InetAddress to,
IMessageCallback cb)
+ public int sendRR(MessageOut message, InetAddress to, IAsyncCallback cb,
long timeout)
{
- String id = nextId();
- callbacks.put(id, new CallbackInfo(to, cb, WriteResponse.serializer),
DatabaseDescriptor.getWriteRpcTimeout());
- sendOneWay(mutation.createMessage(), id, to);
+ int id = addCallback(cb, message, to, timeout);
+ sendOneWay(message, id, to);
+ return id;
}
/**
@@@ -596,14 -577,24 +605,16 @@@
*
* @param message message to be sent.
* @param to endpoint to which the message needs to be sent
- * @param cb callback interface which is used to pass the responses
or
+ * @param handler callback interface which is used to pass the responses
or
* suggest that a timeout occurred to the invoker of the
send().
- * suggest that a timeout occurred to the invoker of the
send().
- * @param timeout the timeout used for expiration
* @return an reference to message id used to match with the result
*/
- public int sendRR(MessageOut<? extends IMutation> message, InetAddress
to, AbstractWriteResponseHandler handler)
- public String sendRR(MessageOut message, InetAddress to, IMessageCallback
cb, long timeout)
++ public int sendRR(MessageOut<? extends IMutation> message,
++ InetAddress to,
++ AbstractWriteResponseHandler handler,
++ boolean allowHints)
{
- int id = addCallback(handler, message, to, message.getTimeout(),
handler.consistencyLevel);
- String id = addCallback(cb, message, to, timeout);
-
- if (cb instanceof AbstractWriteResponseHandler)
- {
- PBSPredictor.instance().startWriteOperation(id);
- }
- else if (cb instanceof ReadCallback)
- {
- PBSPredictor.instance().startReadOperation(id);
- }
-
++ int id = addCallback(handler, message, to, message.getTimeout(),
handler.consistencyLevel, allowHints);
sendOneWay(message, id, to);
return id;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a73e392/src/java/org/apache/cassandra/net/WriteCallbackInfo.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/net/WriteCallbackInfo.java
index be7b668,0000000..987ec15
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/net/WriteCallbackInfo.java
+++ b/src/java/org/apache/cassandra/net/WriteCallbackInfo.java
@@@ -1,46 -1,0 +1,54 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cassandra.net;
+
+import java.net.InetAddress;
+
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.service.StorageProxy;
+
+public class WriteCallbackInfo extends CallbackInfo
+{
+ public final MessageOut sentMessage;
+ private final ConsistencyLevel consistencyLevel;
++ private final boolean allowHints;
+
- public WriteCallbackInfo(InetAddress target, IAsyncCallback callback,
MessageOut message, IVersionedSerializer<?> serializer, ConsistencyLevel
consistencyLevel)
++ public WriteCallbackInfo(InetAddress target,
++ IAsyncCallback callback,
++ MessageOut message,
++ IVersionedSerializer<?> serializer,
++ ConsistencyLevel consistencyLevel,
++ boolean allowHints)
+ {
+ super(target, callback, serializer);
+ assert message != null;
+ this.sentMessage = message;
+ this.consistencyLevel = consistencyLevel;
++ this.allowHints = allowHints;
+ }
+
+ public boolean shouldHint()
+ {
- return sentMessage.verb != MessagingService.Verb.COUNTER_MUTATION
- && consistencyLevel != ConsistencyLevel.ANY
- && StorageProxy.shouldHint(target);
++ return allowHints
++ && sentMessage.verb != MessagingService.Verb.COUNTER_MUTATION
++ && consistencyLevel != ConsistencyLevel.ANY
++ && StorageProxy.shouldHint(target);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a73e392/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageProxy.java
index 033ce8e,7ef3d72..fc6ee3a
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@@ -917,29 -616,35 +917,34 @@@ public class StorageProxy implements St
Iterator<InetAddress> iter = targets.iterator();
InetAddress target = iter.next();
- // direct writes to local DC or old Cassandra versions
- if (localDC || MessagingService.instance().getVersion(target) <
MessagingService.VERSION_12)
+ // Add the other destinations of the same message as a FORWARD_HEADER
entry
+ DataOutputBuffer out = new DataOutputBuffer();
+ try
{
- // yes, the loop and non-loop code here are the same; this is
clunky but we want to avoid
- // creating a second iterator since we already have a perfectly
good one
- MessagingService.instance().sendRR(message, target, handler);
+ out.writeInt(targets.size() - 1);
while (iter.hasNext())
{
- target = iter.next();
- MessagingService.instance().sendRR(message, target, handler);
+ InetAddress destination = iter.next();
+ CompactEndpointSerializationHelper.serialize(destination,
out);
- int id = MessagingService.instance().addCallback(handler,
message, destination, message.getTimeout(), handler.consistencyLevel);
++ int id = MessagingService.instance().addCallback(handler,
++ message,
++ destination,
++
message.getTimeout(),
++
handler.consistencyLevel,
++ true);
+ out.writeInt(id);
+ logger.trace("Adding FWD message to {}@{}", id, destination);
}
- return;
+ message = message.withParameter(RowMutation.FORWARD_TO,
out.getData());
+ // send the combined message + forward headers
+ int id = MessagingService.instance().sendRR(message, target,
handler);
+ logger.trace("Sending message to {}@{}", id, target);
}
-
- // Add all the other destinations of the same message as a
FORWARD_HEADER entry
- FastByteArrayOutputStream bos = new FastByteArrayOutputStream();
- DataOutputStream dos = new DataOutputStream(bos);
- dos.writeInt(targets.size() - 1);
- while (iter.hasNext())
+ catch (IOException e)
{
- InetAddress destination = iter.next();
- CompactEndpointSerializationHelper.serialize(destination, dos);
- String id = MessagingService.instance().addCallback(handler,
message, destination, message.getTimeout());
- dos.writeUTF(id);
+ // DataOutputBuffer is in-memory, doesn't throw IOException
+ throw new AssertionError(e);
}
- message = message.withParameter(RowMutation.FORWARD_TO,
bos.toByteArray());
- // send the combined message + forward headers
- Tracing.trace("Enqueuing message to {}", target);
- MessagingService.instance().sendRR(message, target, handler);
}
private static void insertLocal(final RowMutation rm, final
AbstractWriteResponseHandler responseHandler)