merge from 1.2
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5556414c Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5556414c Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5556414c Branch: refs/heads/trunk Commit: 5556414c37f565afb01621ddd69527cb3c17ad78 Parents: 89e792f b1d7405 Author: Jonathan Ellis <[email protected]> Authored: Tue Jun 18 22:41:24 2013 -0500 Committer: Jonathan Ellis <[email protected]> Committed: Tue Jun 18 22:41:24 2013 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/db/RowMutationVerbHandler.java | 11 +++++------ src/java/org/apache/cassandra/db/Table.java | 4 +--- src/java/org/apache/cassandra/net/MessageOut.java | 11 ----------- .../org/apache/cassandra/service/StorageProxy.java | 14 +------------- 5 files changed, 8 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/5556414c/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index 46b67e3,5d36bd9..1450aa0 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,67 -1,5 +1,68 @@@ +2.0 + * Removed on-heap row cache (CASSANDRA-5348) + * use nanotime consistently for node-local timeouts (CASSANDRA-5581) + * Avoid unnecessary second pass on name-based queries (CASSANDRA-5577) + * Experimental triggers (CASSANDRA-1311) + * JEMalloc support for off-heap allocation (CASSANDRA-3997) + * Single-pass compaction (CASSANDRA-4180) + * Removed token range bisection (CASSANDRA-5518) + * Removed compatibility with pre-1.2.5 sstables and network messages + (CASSANDRA-5511) + * removed PBSPredictor (CASSANDRA-5455) + * CAS support (CASSANDRA-5062, 5441, 5442, 5443) + * Leveled compaction performs size-tiered compactions in L0 + (CASSANDRA-5371, 5439) + * Add yaml network topology snitch for mixed ec2/other envs (CASSANDRA-5339) + * Log when a node is down longer than the hint window (CASSANDRA-4554) + * Optimize tombstone creation for ExpiringColumns (CASSANDRA-4917) + * Improve LeveledScanner work estimation (CASSANDRA-5250, 5407) + * Replace compaction lock with runWithCompactionsDisabled (CASSANDRA-3430) + * Change Message IDs to ints (CASSANDRA-5307) + * Move sstable level information into the Stats component, removing the + need for a separate Manifest file (CASSANDRA-4872) + * avoid serializing to byte[] on commitlog append (CASSANDRA-5199) + * make index_interval configurable per columnfamily (CASSANDRA-3961, CASSANDRA-5650) + * add default_time_to_live (CASSANDRA-3974) + * add memtable_flush_period_in_ms (CASSANDRA-4237) + * replace supercolumns internally by composites (CASSANDRA-3237, 5123) + * upgrade thrift to 0.9.0 (CASSANDRA-3719) + * drop unnecessary keyspace parameter from user-defined compaction API + (CASSANDRA-5139) + * more robust solution to incomplete compactions + counters (CASSANDRA-5151) + * Change order of directory searching for c*.in.sh (CASSANDRA-3983) + * Add tool to reset SSTable compaction level for LCS (CASSANDRA-5271) + * Allow custom configuration loader (CASSANDRA-5045) + * Remove memory emergency pressure valve logic (CASSANDRA-3534) + * Reduce request latency with eager retry (CASSANDRA-4705) + * cqlsh: Remove ASSUME command (CASSANDRA-5331) + * Rebuild BF when loading sstables if bloom_filter_fp_chance + has changed since compaction (CASSANDRA-5015) + * remove row-level bloom filters (CASSANDRA-4885) + * Change Kernel Page Cache skipping into row preheating (disabled by default) + (CASSANDRA-4937) + * Improve repair by deciding on a gcBefore before sending + out TreeRequests (CASSANDRA-4932) + * Add an official way to disable compactions (CASSANDRA-5074) + * Reenable ALTER TABLE DROP with new semantics (CASSANDRA-3919) + * Add binary protocol versioning (CASSANDRA-5436) + * Swap THshaServer for TThreadedSelectorServer (CASSANDRA-5530) + * Add alias support to SELECT statement (CASSANDRA-5075) + * Don't create empty RowMutations in CommitLogReplayer (CASSANDRA-5541) + * Use range tombstones when dropping cfs/columns from schema (CASSANDRA-5579) + * cqlsh: drop CQL2/CQL3-beta support (CASSANDRA-5585) + * Track max/min column names in sstables to be able to optimize slice + queries (CASSANDRA-5514, CASSANDRA-5595, CASSANDRA-5600) + * Binary protocol: allow batching already prepared statements (CASSANDRA-4693) + * Allow preparing timestamp, ttl and limit in CQL3 queries (CASSANDRA-4450) + * Support native link w/o JNA in Java7 (CASSANDRA-3734) + * Use SASL authentication in binary protocol v2 (CASSANDRA-5545) + * Replace Thrift HsHa with LMAX Disruptor based implementation (CASSANDRA-5582) + * cqlsh: Add row count to SELECT output (CASSANDRA-5636) + * Include a timestamp with all read commands to determine column expiration + (CASSANDRA-5149) + 1.2.6 + * Fix cross-DC mutation forwarding (CASSANDRA-5632) * Reduce SSTableLoader memory usage (CASSANDRA-5555) * Scale hinted_handoff_throttle_in_kb to cluster size (CASSANDRA-5272) * (Hadoop) Add CQL3 input/output formats (CASSANDRA-4421, 5622) http://git-wip-us.apache.org/repos/asf/cassandra/blob/5556414c/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/RowMutationVerbHandler.java index 744dca9,eedd134..dcdfc2e --- a/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java +++ b/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java @@@ -37,15 -37,15 +37,15 @@@ public class RowMutationVerbHandler imp try { RowMutation rm = message.payload; - logger.debug("Applying mutation"); // Check if there were any forwarding headers in this message - InetAddress replyTo = message.from; byte[] from = message.parameters.get(RowMutation.FORWARD_FROM); + InetAddress replyTo; if (from == null) { + replyTo = message.from; byte[] forwardBytes = message.parameters.get(RowMutation.FORWARD_TO); - if (forwardBytes != null && message.version >= MessagingService.VERSION_11) + if (forwardBytes != null) forwardToLocalNodes(rm, message.verb, forwardBytes, message.from); } else @@@ -70,18 -70,17 +70,17 @@@ */ private void forwardToLocalNodes(RowMutation rm, MessagingService.Verb verb, byte[] forwardBytes, InetAddress from) throws IOException { - DataInputStream dis = new DataInputStream(new FastByteArrayInputStream(forwardBytes)); - int size = dis.readInt(); + DataInputStream in = new DataInputStream(new FastByteArrayInputStream(forwardBytes)); + int size = in.readInt(); - // remove fwds from message to avoid infinite loop + // tell the recipients who to send their ack to MessageOut<RowMutation> message = new MessageOut<RowMutation>(verb, rm, RowMutation.serializer).withParameter(RowMutation.FORWARD_FROM, from.getAddress()); + // Send a message to each of the addresses on our Forward List for (int i = 0; i < size; i++) { - // Send a message to each of the addresses on our Forward List - InetAddress address = CompactEndpointSerializationHelper.deserialize(dis); - String id = dis.readUTF(); + InetAddress address = CompactEndpointSerializationHelper.deserialize(in); + int id = in.readInt(); - logger.debug("Forwarding message to {}@{}", id, address); - // Let the response go back to the coordinator + Tracing.trace("Enqueuing forwarded write to {}", address); MessagingService.instance().sendOneWay(message, id, address); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/5556414c/src/java/org/apache/cassandra/db/Table.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/5556414c/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/service/StorageProxy.java index 9d2f6c1,ee045eb..eb3d908 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@@ -812,17 -522,14 +805,12 @@@ public class StorageProxy implements St if (dcGroups != null) { - MessageOut<RowMutation> message = rm.createMessage(); // for each datacenter, send the message to one node to relay the write to other replicas - for (Map.Entry<String, Collection<InetAddress>> entry: dcGroups.entrySet()) - { - boolean isLocalDC = entry.getKey().equals(localDataCenter); - Collection<InetAddress> dcTargets = entry.getValue(); - sendMessagesToOneDC(message, dcTargets, isLocalDC, responseHandler); - } + if (message == null) + message = rm.createMessage(); + + for (Collection<InetAddress> dcTargets : dcGroups.values()) - { - // clean out any forwards from previous loop iterations - message = message.withHeaderRemoved(RowMutation.FORWARD_TO); - + sendMessagesToNonlocalDC(message, dcTargets, responseHandler); - } } }
