merge from 1.2

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5556414c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5556414c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5556414c

Branch: refs/heads/trunk
Commit: 5556414c37f565afb01621ddd69527cb3c17ad78
Parents: 89e792f b1d7405
Author: Jonathan Ellis <[email protected]>
Authored: Tue Jun 18 22:41:24 2013 -0500
Committer: Jonathan Ellis <[email protected]>
Committed: Tue Jun 18 22:41:24 2013 -0500

----------------------------------------------------------------------
 CHANGES.txt                                           |  1 +
 .../apache/cassandra/db/RowMutationVerbHandler.java   | 11 +++++------
 src/java/org/apache/cassandra/db/Table.java           |  4 +---
 src/java/org/apache/cassandra/net/MessageOut.java     | 11 -----------
 .../org/apache/cassandra/service/StorageProxy.java    | 14 +-------------
 5 files changed, 8 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/5556414c/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 46b67e3,5d36bd9..1450aa0
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,67 -1,5 +1,68 @@@
 +2.0
 + * Removed on-heap row cache (CASSANDRA-5348)
 + * use nanotime consistently for node-local timeouts (CASSANDRA-5581)
 + * Avoid unnecessary second pass on name-based queries (CASSANDRA-5577)
 + * Experimental triggers (CASSANDRA-1311)
 + * JEMalloc support for off-heap allocation (CASSANDRA-3997)
 + * Single-pass compaction (CASSANDRA-4180)
 + * Removed token range bisection (CASSANDRA-5518)
 + * Removed compatibility with pre-1.2.5 sstables and network messages
 +   (CASSANDRA-5511)
 + * removed PBSPredictor (CASSANDRA-5455)
 + * CAS support (CASSANDRA-5062, 5441, 5442, 5443)
 + * Leveled compaction performs size-tiered compactions in L0 
 +   (CASSANDRA-5371, 5439)
 + * Add yaml network topology snitch for mixed ec2/other envs (CASSANDRA-5339)
 + * Log when a node is down longer than the hint window (CASSANDRA-4554)
 + * Optimize tombstone creation for ExpiringColumns (CASSANDRA-4917)
 + * Improve LeveledScanner work estimation (CASSANDRA-5250, 5407)
 + * Replace compaction lock with runWithCompactionsDisabled (CASSANDRA-3430)
 + * Change Message IDs to ints (CASSANDRA-5307)
 + * Move sstable level information into the Stats component, removing the
 +   need for a separate Manifest file (CASSANDRA-4872)
 + * avoid serializing to byte[] on commitlog append (CASSANDRA-5199)
 + * make index_interval configurable per columnfamily (CASSANDRA-3961, CASSANDRA-5650)
 + * add default_time_to_live (CASSANDRA-3974)
 + * add memtable_flush_period_in_ms (CASSANDRA-4237)
 + * replace supercolumns internally by composites (CASSANDRA-3237, 5123)
 + * upgrade thrift to 0.9.0 (CASSANDRA-3719)
 + * drop unnecessary keyspace parameter from user-defined compaction API 
 +   (CASSANDRA-5139)
 + * more robust solution to incomplete compactions + counters (CASSANDRA-5151)
 + * Change order of directory searching for c*.in.sh (CASSANDRA-3983)
 + * Add tool to reset SSTable compaction level for LCS (CASSANDRA-5271)
 + * Allow custom configuration loader (CASSANDRA-5045)
 + * Remove memory emergency pressure valve logic (CASSANDRA-3534)
 + * Reduce request latency with eager retry (CASSANDRA-4705)
 + * cqlsh: Remove ASSUME command (CASSANDRA-5331)
 + * Rebuild BF when loading sstables if bloom_filter_fp_chance
 +   has changed since compaction (CASSANDRA-5015)
 + * remove row-level bloom filters (CASSANDRA-4885)
 + * Change Kernel Page Cache skipping into row preheating (disabled by default)
 +   (CASSANDRA-4937)
 + * Improve repair by deciding on a gcBefore before sending
 +   out TreeRequests (CASSANDRA-4932)
 + * Add an official way to disable compactions (CASSANDRA-5074)
 + * Reenable ALTER TABLE DROP with new semantics (CASSANDRA-3919)
 + * Add binary protocol versioning (CASSANDRA-5436)
 + * Swap THshaServer for TThreadedSelectorServer (CASSANDRA-5530)
 + * Add alias support to SELECT statement (CASSANDRA-5075)
 + * Don't create empty RowMutations in CommitLogReplayer (CASSANDRA-5541)
 + * Use range tombstones when dropping cfs/columns from schema (CASSANDRA-5579)
 + * cqlsh: drop CQL2/CQL3-beta support (CASSANDRA-5585)
 + * Track max/min column names in sstables to be able to optimize slice
 +   queries (CASSANDRA-5514, CASSANDRA-5595, CASSANDRA-5600)
 + * Binary protocol: allow batching already prepared statements (CASSANDRA-4693)
 + * Allow preparing timestamp, ttl and limit in CQL3 queries (CASSANDRA-4450)
 + * Support native link w/o JNA in Java7 (CASSANDRA-3734)
 + * Use SASL authentication in binary protocol v2 (CASSANDRA-5545)
 + * Replace Thrift HsHa with LMAX Disruptor based implementation (CASSANDRA-5582)
 + * cqlsh: Add row count to SELECT output (CASSANDRA-5636)
 + * Include a timestamp with all read commands to determine column expiration
 +   (CASSANDRA-5149)
 +
  1.2.6
+  * Fix cross-DC mutation forwarding (CASSANDRA-5632)
   * Reduce SSTableLoader memory usage (CASSANDRA-5555)
   * Scale hinted_handoff_throttle_in_kb to cluster size (CASSANDRA-5272)
   * (Hadoop) Add CQL3 input/output formats (CASSANDRA-4421, 5622)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5556414c/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
index 744dca9,eedd134..dcdfc2e
--- a/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
@@@ -37,15 -37,15 +37,15 @@@ public class RowMutationVerbHandler imp
          try
          {
              RowMutation rm = message.payload;
-             logger.debug("Applying mutation");
  
              // Check if there were any forwarding headers in this message
-             InetAddress replyTo = message.from;
              byte[] from = message.parameters.get(RowMutation.FORWARD_FROM);
+             InetAddress replyTo;
              if (from == null)
              {
+                 replyTo = message.from;
                  byte[] forwardBytes = 
message.parameters.get(RowMutation.FORWARD_TO);
 -                if (forwardBytes != null && message.version >= 
MessagingService.VERSION_11)
 +                if (forwardBytes != null)
                      forwardToLocalNodes(rm, message.verb, forwardBytes, 
message.from);
              }
              else
@@@ -70,18 -70,17 +70,17 @@@
       */
      private void forwardToLocalNodes(RowMutation rm, MessagingService.Verb 
verb, byte[] forwardBytes, InetAddress from) throws IOException
      {
 -        DataInputStream dis = new DataInputStream(new 
FastByteArrayInputStream(forwardBytes));
 -        int size = dis.readInt();
 +        DataInputStream in = new DataInputStream(new 
FastByteArrayInputStream(forwardBytes));
 +        int size = in.readInt();
  
-         // remove fwds from message to avoid infinite loop
+         // tell the recipients who to send their ack to
          MessageOut<RowMutation> message = new MessageOut<RowMutation>(verb, 
rm, RowMutation.serializer).withParameter(RowMutation.FORWARD_FROM, 
from.getAddress());
+         // Send a message to each of the addresses on our Forward List
          for (int i = 0; i < size; i++)
          {
-             // Send a message to each of the addresses on our Forward List
 -            InetAddress address = 
CompactEndpointSerializationHelper.deserialize(dis);
 -            String id = dis.readUTF();
 +            InetAddress address = 
CompactEndpointSerializationHelper.deserialize(in);
 +            int id = in.readInt();
-             logger.debug("Forwarding message to {}@{}", id, address);
-             // Let the response go back to the coordinator
+             Tracing.trace("Enqueuing forwarded write to {}", address);
              MessagingService.instance().sendOneWay(message, id, address);
          }
      }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5556414c/src/java/org/apache/cassandra/db/Table.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5556414c/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageProxy.java
index 9d2f6c1,ee045eb..eb3d908
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@@ -812,17 -522,14 +805,12 @@@ public class StorageProxy implements St
  
          if (dcGroups != null)
          {
 -            MessageOut<RowMutation> message = rm.createMessage();
              // for each datacenter, send the message to one node to relay the 
write to other replicas
 -            for (Map.Entry<String, Collection<InetAddress>> entry: 
dcGroups.entrySet())
 -            {
 -                boolean isLocalDC = entry.getKey().equals(localDataCenter);
 -                Collection<InetAddress> dcTargets = entry.getValue();
 -                sendMessagesToOneDC(message, dcTargets, isLocalDC, 
responseHandler);
 -            }
 +            if (message == null)
 +                message = rm.createMessage();
 +
 +            for (Collection<InetAddress> dcTargets : dcGroups.values())
-             {
-                 // clean out any forwards from previous loop iterations
-                 message = message.withHeaderRemoved(RowMutation.FORWARD_TO);
- 
 +                sendMessagesToNonlocalDC(message, dcTargets, responseHandler);
-             }
          }
      }
  

Reply via email to