Merge branch 'cassandra-1.2' into cassandra-2.0

Conflicts:
        src/java/org/apache/cassandra/net/MessagingService.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3a73e392
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3a73e392
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3a73e392

Branch: refs/heads/cassandra-2.1
Commit: 3a73e392fa424bff5378d4bb72117cfa28f9b0b7
Parents: 1c2a812 2890cc5
Author: Aleksey Yeschenko <alek...@apache.org>
Authored: Tue Apr 22 19:47:42 2014 +0300
Committer: Aleksey Yeschenko <alek...@apache.org>
Committed: Tue Apr 22 19:47:42 2014 +0300

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/db/BatchlogManager.java    |  2 +-
 .../cassandra/db/HintedHandOffManager.java      |  4 +--
 .../apache/cassandra/net/MessagingService.java  | 27 ++++++++++++++------
 .../apache/cassandra/net/WriteCallbackInfo.java | 16 +++++++++---
 .../apache/cassandra/service/StorageProxy.java  |  7 ++++-
 6 files changed, 41 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a73e392/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 9b73c89,74ddcfd..dbed949
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -8,61 -9,10 +8,62 @@@ Merged from 1.2
   * Fix batchlog to account for CF truncation records (CASSANDRA-6999)
   * Fix CQLSH parsing of functions and BLOB literals (CASSANDRA-7018)
   * Require nodetool rebuild_index to specify index names (CASSANDRA-7038)
+  * Ensure that batchlog and hint timeouts do not produce hints 
(CASSANDRA-7058)
  
  
 -1.2.16
 +2.0.7
 + * Put nodes in hibernate when join_ring is false (CASSANDRA-6961)
 + * Avoid early loading of non-system keyspaces before compaction-leftovers 
 +   cleanup at startup (CASSANDRA-6913)
 + * Restrict Windows to parallel repairs (CASSANDRA-6907)
 + * (Hadoop) Allow manually specifying start/end tokens in CFIF 
(CASSANDRA-6436)
 + * Fix NPE in MeteredFlusher (CASSANDRA-6820)
 + * Fix race processing range scan responses (CASSANDRA-6820)
 + * Allow deleting snapshots from dropped keyspaces (CASSANDRA-6821)
 + * Add uuid() function (CASSANDRA-6473)
 + * Omit tombstones from schema digests (CASSANDRA-6862)
 + * Include correct consistencyLevel in LWT timeout (CASSANDRA-6884)
 + * Lower chances for losing new SSTables during nodetool refresh and
 +   ColumnFamilyStore.loadNewSSTables (CASSANDRA-6514)
 + * Add support for DELETE ... IF EXISTS to CQL3 (CASSANDRA-5708)
 + * Update hadoop_cql3_word_count example (CASSANDRA-6793)
 + * Fix handling of RejectedExecution in sync Thrift server (CASSANDRA-6788)
 + * Log more information when exceeding tombstone_warn_threshold 
(CASSANDRA-6865)
 + * Fix truncate to not abort due to unreachable fat clients (CASSANDRA-6864)
 + * Fix schema concurrency exceptions (CASSANDRA-6841)
 + * Fix leaking validator FH in StreamWriter (CASSANDRA-6832)
 + * Fix saving triggers to schema (CASSANDRA-6789)
 + * Fix trigger mutations when base mutation list is immutable (CASSANDRA-6790)
 + * Fix accounting in FileCacheService to allow re-using RAR (CASSANDRA-6838)
 + * Fix static counter columns (CASSANDRA-6827)
 + * Restore expiring->deleted (cell) compaction optimization (CASSANDRA-6844)
 + * Fix CompactionManager.needsCleanup (CASSANDRA-6845)
 + * Correctly compare BooleanType values other than 0 and 1 (CASSANDRA-6779)
 + * Read message id as string from earlier versions (CASSANDRA-6840)
 + * Properly use the Paxos consistency for (non-protocol) batch 
(CASSANDRA-6837)
 + * Add paranoid disk failure option (CASSANDRA-6646)
 + * Improve PerRowSecondaryIndex performance (CASSANDRA-6876)
 + * Extend triggers to support CAS updates (CASSANDRA-6882)
 + * Static columns with IF NOT EXISTS don't always work as expected 
(CASSANDRA-6873)
 + * Fix paging with SELECT DISTINCT (CASSANDRA-6857)
 + * Fix UnsupportedOperationException on CAS timeout (CASSANDRA-6923)
 + * Improve MeteredFlusher handling of MF-unaffected column families
 +   (CASSANDRA-6867)
 + * Add CqlRecordReader using native pagination (CASSANDRA-6311)
 + * Add QueryHandler interface (CASSANDRA-6659)
 + * Track liveRatio per-memtable, not per-CF (CASSANDRA-6945)
 + * Make sure upgradesstables keeps sstable level (CASSANDRA-6958)
 + * Fix LIMIT with static columns (CASSANDRA-6956)
 + * Fix clash with CQL column name in thrift validation (CASSANDRA-6892)
 + * Fix error with super columns in mixed 1.2-2.0 clusters (CASSANDRA-6966)
 + * Fix bad skip of sstables on slice query with composite start/finish 
(CASSANDRA-6825)
 + * Fix unintended update with conditional statement (CASSANDRA-6893)
 + * Fix map element access in IF (CASSANDRA-6914)
 + * Avoid costly range calculations for range queries on system keyspaces
 +   (CASSANDRA-6906)
 + * Fix SSTable not released if stream session fails (CASSANDRA-6818)
 + * Avoid build failure due to ANTLR timeout (CASSANDRA-6991)
 +Merged from 1.2:
   * Add UNLOGGED, COUNTER options to BATCH documentation (CASSANDRA-6816)
   * add extra SSL cipher suites (CASSANDRA-6613)
   * fix nodetool getsstables for blob PK (CASSANDRA-6803)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a73e392/src/java/org/apache/cassandra/db/BatchlogManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/BatchlogManager.java
index 5770994,02af9d3..5aea736
--- a/src/java/org/apache/cassandra/db/BatchlogManager.java
+++ b/src/java/org/apache/cassandra/db/BatchlogManager.java
@@@ -323,7 -328,7 +323,7 @@@ public class BatchlogManager implement
                  }
              };
              WriteResponseHandler handler = new WriteResponseHandler(ep, 
WriteType.UNLOGGED_BATCH, callback);
-             MessagingService.instance().sendRR(mutation.createMessage(), ep, 
handler);
 -            MessagingService.instance().sendUnhintableMutation(mutation, ep, 
handler);
++            MessagingService.instance().sendRR(mutation.createMessage(), ep, 
handler, false);
              handlers.add(handler);
          }
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a73e392/src/java/org/apache/cassandra/db/HintedHandOffManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/HintedHandOffManager.java
index 942707e,a7a3e06..13d1bb0
--- a/src/java/org/apache/cassandra/db/HintedHandOffManager.java
+++ b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
@@@ -450,8 -399,8 +450,8 @@@ public class HintedHandOffManager imple
                          deleteHint(hostIdBytes, hint.name(), 
hint.maxTimestamp());
                      }
                  };
-                 WriteResponseHandler responseHandler = new 
WriteResponseHandler(endpoint, WriteType.UNLOGGED_BATCH, callback);
-                 MessagingService.instance().sendRR(message, endpoint, 
responseHandler);
+                 WriteResponseHandler responseHandler = new 
WriteResponseHandler(endpoint, WriteType.SIMPLE, callback);
 -                MessagingService.instance().sendUnhintableMutation(rm, 
endpoint, responseHandler);
++                MessagingService.instance().sendRR(message, endpoint, 
responseHandler, false);
                  responseHandlers.add(responseHandler);
              }
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a73e392/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/net/MessagingService.java
index cc5dae5,3f90d7f..cccf698
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@@ -537,21 -527,18 +537,33 @@@ public final class MessagingService imp
          return verbHandlers.get(type);
      }
  
 -    public String addCallback(IMessageCallback cb, MessageOut message, 
InetAddress to, long timeout)
 +    public int addCallback(IAsyncCallback cb, MessageOut message, InetAddress 
to, long timeout)
      {
 -        String messageId = nextId();
 -        CallbackInfo previous;
 -
 -        // If HH is enabled and this is a mutation message => store the 
message to track for potential hints.
 -        if (DatabaseDescriptor.hintedHandoffEnabled() && message.verb == 
Verb.MUTATION)
 -            previous = callbacks.put(messageId, new CallbackInfo(to, cb, 
message, callbackDeserializers.get(message.verb)), timeout);
 -        else
 -            previous = callbacks.put(messageId, new CallbackInfo(to, cb, 
callbackDeserializers.get(message.verb)), timeout);
 +        assert message.verb != Verb.MUTATION; // mutations need to call the 
overload with a ConsistencyLevel
 +        int messageId = nextId();
 +        CallbackInfo previous = callbacks.put(messageId, new CallbackInfo(to, 
cb, callbackDeserializers.get(message.verb)), timeout);
 +        assert previous == null : String.format("Callback already exists for 
id %d! (%s)", messageId, previous);
 +        return messageId;
 +    }
  
-     public int addCallback(IAsyncCallback cb, MessageOut<? extends IMutation> 
message, InetAddress to, long timeout, ConsistencyLevel consistencyLevel)
 -        assert previous == null;
++    public int addCallback(IAsyncCallback cb,
++                           MessageOut<? extends IMutation> message,
++                           InetAddress to,
++                           long timeout,
++                           ConsistencyLevel consistencyLevel,
++                           boolean allowHints)
 +    {
 +        assert message.verb == Verb.MUTATION || message.verb == 
Verb.COUNTER_MUTATION;
 +        int messageId = nextId();
-         CallbackInfo previous = callbacks.put(messageId, new 
WriteCallbackInfo(to, cb, message, callbackDeserializers.get(message.verb), 
consistencyLevel), timeout);
++        CallbackInfo previous = callbacks.put(messageId,
++                                              new WriteCallbackInfo(to,
++                                                                    cb,
++                                                                    message,
++                                                                    
callbackDeserializers.get(message.verb),
++                                                                    
consistencyLevel,
++                                                                    
allowHints),
++                                                                    timeout);
 +        assert previous == null : String.format("Callback already exists for 
id %d! (%s)", messageId, previous);
          return messageId;
      }
  
@@@ -568,24 -559,14 +580,21 @@@
      }
  
      /**
 -     * A special version of sendRR that doesn't trigger a hint for the 
mutation on a timeout.
 -     * Used by BatchlogManager and HintedHandOffManager.
 +     * Send a non-mutation message to a given endpoint. This method specifies 
a callback
 +     * which is invoked with the actual response.
-      * Also holds the message (only mutation messages) to determine if it
-      * needs to trigger a hint (uses StorageProxy for that).
 +     *
 +     * @param message message to be sent.
 +     * @param to      endpoint to which the message needs to be sent
 +     * @param cb      callback interface which is used to pass the responses 
or
 +     *                suggest that a timeout occurred to the invoker of the 
send().
-      *                suggest that a timeout occurred to the invoker of the 
send().
 +     * @param timeout the timeout used for expiration
 +     * @return an reference to message id used to match with the result
       */
 -    public void sendUnhintableMutation(RowMutation mutation, InetAddress to, 
IMessageCallback cb)
 +    public int sendRR(MessageOut message, InetAddress to, IAsyncCallback cb, 
long timeout)
      {
 -        String id = nextId();
 -        callbacks.put(id, new CallbackInfo(to, cb, WriteResponse.serializer), 
DatabaseDescriptor.getWriteRpcTimeout());
 -        sendOneWay(mutation.createMessage(), id, to);
 +        int id = addCallback(cb, message, to, timeout);
 +        sendOneWay(message, id, to);
 +        return id;
      }
  
      /**
@@@ -596,14 -577,24 +605,16 @@@
       *
       * @param message message to be sent.
       * @param to      endpoint to which the message needs to be sent
 -     * @param cb      callback interface which is used to pass the responses 
or
 +     * @param handler callback interface which is used to pass the responses 
or
       *                suggest that a timeout occurred to the invoker of the 
send().
-      *                suggest that a timeout occurred to the invoker of the 
send().
 -     * @param timeout the timeout used for expiration
       * @return an reference to message id used to match with the result
       */
-     public int sendRR(MessageOut<? extends IMutation> message, InetAddress 
to, AbstractWriteResponseHandler handler)
 -    public String sendRR(MessageOut message, InetAddress to, IMessageCallback 
cb, long timeout)
++    public int sendRR(MessageOut<? extends IMutation> message,
++                      InetAddress to,
++                      AbstractWriteResponseHandler handler,
++                      boolean allowHints)
      {
-         int id = addCallback(handler, message, to, message.getTimeout(), 
handler.consistencyLevel);
 -        String id = addCallback(cb, message, to, timeout);
 -
 -        if (cb instanceof AbstractWriteResponseHandler)
 -        {
 -            PBSPredictor.instance().startWriteOperation(id);
 -        }
 -        else if (cb instanceof ReadCallback)
 -        {
 -            PBSPredictor.instance().startReadOperation(id);
 -        }
 -
++        int id = addCallback(handler, message, to, message.getTimeout(), 
handler.consistencyLevel, allowHints);
          sendOneWay(message, id, to);
          return id;
      }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a73e392/src/java/org/apache/cassandra/net/WriteCallbackInfo.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/net/WriteCallbackInfo.java
index be7b668,0000000..987ec15
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/net/WriteCallbackInfo.java
+++ b/src/java/org/apache/cassandra/net/WriteCallbackInfo.java
@@@ -1,46 -1,0 +1,54 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.cassandra.net;
 +
 +import java.net.InetAddress;
 +
 +import org.apache.cassandra.db.ConsistencyLevel;
 +import org.apache.cassandra.io.IVersionedSerializer;
 +import org.apache.cassandra.service.StorageProxy;
 +
 +public class WriteCallbackInfo extends CallbackInfo
 +{
 +    public final MessageOut sentMessage;
 +    private final ConsistencyLevel consistencyLevel;
++    private final boolean allowHints;
 +
-     public WriteCallbackInfo(InetAddress target, IAsyncCallback callback, 
MessageOut message, IVersionedSerializer<?> serializer, ConsistencyLevel 
consistencyLevel)
++    public WriteCallbackInfo(InetAddress target,
++                             IAsyncCallback callback,
++                             MessageOut message,
++                             IVersionedSerializer<?> serializer,
++                             ConsistencyLevel consistencyLevel,
++                             boolean allowHints)
 +    {
 +        super(target, callback, serializer);
 +        assert message != null;
 +        this.sentMessage = message;
 +        this.consistencyLevel = consistencyLevel;
++        this.allowHints = allowHints;
 +    }
 +
 +    public boolean shouldHint()
 +    {
-         return sentMessage.verb != MessagingService.Verb.COUNTER_MUTATION
-                && consistencyLevel != ConsistencyLevel.ANY
-                && StorageProxy.shouldHint(target);
++        return allowHints
++            && sentMessage.verb != MessagingService.Verb.COUNTER_MUTATION
++            && consistencyLevel != ConsistencyLevel.ANY
++            && StorageProxy.shouldHint(target);
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3a73e392/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageProxy.java
index 033ce8e,7ef3d72..fc6ee3a
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@@ -917,29 -616,35 +917,34 @@@ public class StorageProxy implements St
          Iterator<InetAddress> iter = targets.iterator();
          InetAddress target = iter.next();
  
 -        // direct writes to local DC or old Cassandra versions
 -        if (localDC || MessagingService.instance().getVersion(target) < 
MessagingService.VERSION_12)
 +        // Add the other destinations of the same message as a FORWARD_HEADER 
entry
 +        DataOutputBuffer out = new DataOutputBuffer();
 +        try
          {
 -            // yes, the loop and non-loop code here are the same; this is 
clunky but we want to avoid
 -            // creating a second iterator since we already have a perfectly 
good one
 -            MessagingService.instance().sendRR(message, target, handler);
 +            out.writeInt(targets.size() - 1);
              while (iter.hasNext())
              {
 -                target = iter.next();
 -                MessagingService.instance().sendRR(message, target, handler);
 +                InetAddress destination = iter.next();
 +                CompactEndpointSerializationHelper.serialize(destination, 
out);
-                 int id = MessagingService.instance().addCallback(handler, 
message, destination, message.getTimeout(), handler.consistencyLevel);
++                int id = MessagingService.instance().addCallback(handler,
++                                                                 message,
++                                                                 destination,
++                                                                 
message.getTimeout(),
++                                                                 
handler.consistencyLevel,
++                                                                 true);
 +                out.writeInt(id);
 +                logger.trace("Adding FWD message to {}@{}", id, destination);
              }
 -            return;
 +            message = message.withParameter(RowMutation.FORWARD_TO, 
out.getData());
 +            // send the combined message + forward headers
 +            int id = MessagingService.instance().sendRR(message, target, 
handler);
 +            logger.trace("Sending message to {}@{}", id, target);
          }
 -
 -        // Add all the other destinations of the same message as a 
FORWARD_HEADER entry
 -        FastByteArrayOutputStream bos = new FastByteArrayOutputStream();
 -        DataOutputStream dos = new DataOutputStream(bos);
 -        dos.writeInt(targets.size() - 1);
 -        while (iter.hasNext())
 +        catch (IOException e)
          {
 -            InetAddress destination = iter.next();
 -            CompactEndpointSerializationHelper.serialize(destination, dos);
 -            String id = MessagingService.instance().addCallback(handler, 
message, destination, message.getTimeout());
 -            dos.writeUTF(id);
 +            // DataOutputBuffer is in-memory, doesn't throw IOException
 +            throw new AssertionError(e);
          }
 -        message = message.withParameter(RowMutation.FORWARD_TO, 
bos.toByteArray());
 -        // send the combined message + forward headers
 -        Tracing.trace("Enqueuing message to {}", target);
 -        MessagingService.instance().sendRR(message, target, handler);
      }
  
      private static void insertLocal(final RowMutation rm, final 
AbstractWriteResponseHandler responseHandler)

Reply via email to