Updated Branches:
  refs/heads/trunk 42c55896b -> d5fc1932e

Merge branch 'cassandra-1.1' into trunk

Conflicts:
        CHANGES.txt
        src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
        src/java/org/apache/cassandra/service/StorageProxy.java
        src/java/org/apache/cassandra/service/WriteResponseHandler.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d5fc1932
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d5fc1932
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d5fc1932

Branch: refs/heads/trunk
Commit: d5fc1932e965a32c944efcdf95fd0236624949fc
Parents: 42c5589 7371e10
Author: Sylvain Lebresne <sylv...@datastax.com>
Authored: Fri Sep 7 10:50:38 2012 +0200
Committer: Sylvain Lebresne <sylv...@datastax.com>
Committed: Fri Sep 7 10:50:38 2012 +0200

----------------------------------------------------------------------
 CHANGES.txt                                        |    6 ++++
 .../org/apache/cassandra/db/CounterColumn.java     |    2 +-
 .../cassandra/db/CounterMutationVerbHandler.java   |   23 +++++++++++----
 .../locator/AbstractReplicationStrategy.java       |    8 ++--
 .../service/AbstractWriteResponseHandler.java      |   17 +++++++++-
 .../DatacenterSyncWriteResponseHandler.java        |   10 +++---
 .../service/DatacenterWriteResponseHandler.java    |   10 +++---
 .../org/apache/cassandra/service/StorageProxy.java |   17 ++++++----
 .../cassandra/service/WriteResponseHandler.java    |   12 ++++----
 9 files changed, 69 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/d5fc1932/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index ec4992c,f192be2..0905cf9
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,58 -1,9 +1,64 @@@
 +1.2-beta1
 + * include message initiation time to replicas so they can more
 +   accurately drop timed-out requests (CASSANDRA-2858)
 + * fix clientutil.jar dependencies (CASSANDRA-4566)
 + * optimize WriteResponse (CASSANDRA-4548)
 + * new metrics (CASSANDRA-4009)
 + * redesign KEYS indexes to avoid read-before-write (CASSANDRA-2897)
 + * debug tracing (CASSANDRA-1123)
 + * parallelize row cache loading (CASSANDRA-4282)
 + * Make compaction, flush JBOD-aware (CASSANDRA-4292)
 + * run local range scans on the read stage (CASSANDRA-3687)
 + * clean up ioexceptions (CASSANDRA-2116)
 + * add disk_failure_policy (CASSANDRA-2118)
 + * Introduce new json format with row level deletion (CASSANDRA-4054)
 + * remove redundant "name" column from schema_keyspaces (CASSANDRA-4433)
 + * improve "nodetool ring" handling of multi-dc clusters (CASSANDRA-3047)
 + * update NTS calculateNaturalEndpoints to be O(N log N) (CASSANDRA-3881)
 + * add UseCondCardMark XX jvm settings on jdk 1.7 (CASSANDRA-4366)
 + * split up rpc timeout by operation type (CASSANDRA-2819)
 + * rewrite key cache save/load to use only sequential i/o (CASSANDRA-3762)
 + * update MS protocol with a version handshake + broadcast address id
 +   (CASSANDRA-4311)
 + * multithreaded hint replay (CASSANDRA-4189)
 + * add inter-node message compression (CASSANDRA-3127)
 + * remove COPP (CASSANDRA-2479)
 + * Track tombstone expiration and compact when tombstone content is
 +   higher than a configurable threshold, default 20% (CASSANDRA-3442, 4234)
 + * update MurmurHash to version 3 (CASSANDRA-2975)
 + * (CLI) track elapsed time for `delete' operation (CASSANDRA-4060)
 + * (CLI) jline version is bumped to 1.0 to properly support
 +   'delete' key function (CASSANDRA-4132)
 + * Save IndexSummary into new SSTable 'Summary' component (CASSANDRA-2392, 4289)
 + * Add support for range tombstones (CASSANDRA-3708)
 + * Improve MessagingService efficiency (CASSANDRA-3617)
 + * Avoid ID conflicts from concurrent schema changes (CASSANDRA-3794)
 + * Set thrift HSHA server thread limit to unlimited by default (CASSANDRA-4277)
 + * Avoids double serialization of CF id in RowMutation messages
 +   (CASSANDRA-4293)
 + * stream compressed sstables directly with java nio (CASSANDRA-4297)
 + * Support multiple ranges in SliceQueryFilter (CASSANDRA-3885)
 + * Add column metadata to system column families (CASSANDRA-4018)
 + * (cql3) Always use composite types by default (CASSANDRA-4329)
 + * (cql3) Add support for set, map and list (CASSANDRA-3647)
 + * Validate date type correctly (CASSANDRA-4441)
 + * (cql3) Allow definitions with only a PK (CASSANDRA-4361)
 + * (cql3) Add support for row key composites (CASSANDRA-4179)
 + * improve DynamicEndpointSnitch by using reservoir sampling (CASSANDRA-4038)
 + * (cql3) Add support for 2ndary indexes (CASSANDRA-3680)
 + * (cql3) fix defining more than one PK to be invalid (CASSANDRA-4477)
 + * remove schema agreement checking from all external APIs (Thrift, CQL and CQL3) (CASSANDRA-4487)
 + * add Murmur3Partitioner and make it default for new installations (CASSANDRA-3772)
 + * (cql3) update pseudo-map syntax to use map syntax (CASSANDRA-4497)
 + * Finer grained exceptions hierarchy and provides error code with exceptions (CASSANDRA-3979)
 +
 +
+ 1.1.6
+   * (cql3) fix potential NPE with both equal and unequal restriction (CASSANDRA-4532)
+   * (cql3) improves ORDER BY validation (CASSANDRA-4624)
+   * Fix potential deadlock during counter writes (CASSANDRA-4578)
+ 
+ 
  1.1.5
   * add SecondaryIndex.reload API (CASSANDRA-4581)
   * use millis + atomicint for commitlog segment creation instead of

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d5fc1932/src/java/org/apache/cassandra/db/CounterColumn.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d5fc1932/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
index f94495d,3ecbe8b..f28e2bd
--- a/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
@@@ -23,34 -25,61 +23,45 @@@ import org.slf4j.Logger
  import org.slf4j.LoggerFactory;
  
  import org.apache.cassandra.config.DatabaseDescriptor;
 -import org.apache.cassandra.io.util.FastByteArrayInputStream;
 -import org.apache.cassandra.net.*;
 -import org.apache.cassandra.utils.FBUtilities;
 +import org.apache.cassandra.exceptions.RequestExecutionException;
 +import org.apache.cassandra.net.IVerbHandler;
 +import org.apache.cassandra.net.MessageIn;
 +import org.apache.cassandra.net.MessagingService;
  import org.apache.cassandra.service.StorageProxy;
 -import org.apache.cassandra.thrift.UnavailableException;
 +import org.apache.cassandra.utils.FBUtilities;
  
 -public class CounterMutationVerbHandler implements IVerbHandler
 +public class CounterMutationVerbHandler implements 
IVerbHandler<CounterMutation>
  {
 -    private static Logger logger = 
LoggerFactory.getLogger(CounterMutationVerbHandler.class);
 +    private static final Logger logger = 
LoggerFactory.getLogger(CounterMutationVerbHandler.class);
  
-     public void doVerb(MessageIn<CounterMutation> message, String id)
 -    public void doVerb(final Message message, final String id)
++    public void doVerb(final MessageIn<CounterMutation> message, final String 
id)
      {
 -        byte[] bytes = message.getMessageBody();
 -        FastByteArrayInputStream buffer = new FastByteArrayInputStream(bytes);
 -
          try
          {
-             CounterMutation cm = message.payload;
 -            DataInputStream is = new DataInputStream(buffer);
 -            final CounterMutation cm = 
CounterMutation.serializer().deserialize(is, message.getVersion());
++            final CounterMutation cm = message.payload;
              if (logger.isDebugEnabled())
                logger.debug("Applying forwarded " + cm);
  
              String localDataCenter = 
DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress());
-             StorageProxy.applyCounterMutationOnLeader(cm, 
localDataCenter).get();
-             WriteResponse response = new WriteResponse();
-             MessagingService.instance().sendReply(response.createMessage(), 
id, message.from);
+             // We should not wait for the result of the write in this thread,
+             // otherwise we could have a distributed deadlock between replicas
+             // running this VerbHandler (see #4578).
+             // Instead, we use a callback to send the response. Note that the 
callback
+             // will not be called if the request timeout, but this is ok
+             // because the coordinator of the counter mutation will timeout on
+             // it's own in that case.
+             StorageProxy.applyCounterMutationOnLeader(cm, localDataCenter, 
new Runnable(){
+                 public void run()
+                 {
 -                    try
 -                    {
 -                        WriteResponse response = new 
WriteResponse(cm.getTable(), cm.key(), true);
 -                        Message responseMessage = 
WriteResponse.makeWriteResponseMessage(message, response);
 -                        
MessagingService.instance().sendReply(responseMessage, id, message.getFrom());
 -                    }
 -                    catch (IOException e)
 -                    {
 -                        logger.error("Error writing response to counter 
mutation", e);
 -                    }
++                    WriteResponse response = new WriteResponse();
++                    
MessagingService.instance().sendReply(response.createMessage(), id, 
message.from);
+                 }
+             });
          }
 -        catch (UnavailableException e)
 -        {
 -            // We check for UnavailableException in the coordinator not. It is
 -            // hence reasonable to let the coordinator timeout in the very
 -            // unlikely case we arrive here
 -        }
 -        catch (TimeoutException e)
 +        catch (RequestExecutionException e)
          {
-             // The coordinator will timeout on itself, so let that go
+             // The coordinator will timeout on it's own so ignore
 +            logger.debug("counter error", e);
          }
          catch (IOException e)
          {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d5fc1932/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d5fc1932/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
----------------------------------------------------------------------
diff --cc 
src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
index 328e1ea,d280a8e..664d9ea
--- a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
@@@ -40,12 -49,12 +45,13 @@@ public abstract class AbstractWriteResp
          startTime = System.currentTimeMillis();
          this.consistencyLevel = consistencyLevel;
          this.writeEndpoints = writeEndpoints;
+         this.callback = callback;
      }
  
 -    public void get() throws TimeoutException
 +    public void get() throws WriteTimeoutException
      {
 -        long timeout = DatabaseDescriptor.getRpcTimeout() - 
(System.currentTimeMillis() - startTime);
 +        long timeout = DatabaseDescriptor.getWriteRpcTimeout() - 
(System.currentTimeMillis() - startTime);
 +
          boolean success;
          try
          {
@@@ -57,15 -66,20 +63,22 @@@
          }
  
          if (!success)
 -        {
 -            throw new TimeoutException();
 -        }
 +            throw new WriteTimeoutException(consistencyLevel, ackCount(), 
blockFor());
      }
  
 +    protected abstract int ackCount();
 +
 +    protected abstract int blockFor();
 +
      /** null message means "response from local write" */
 -    public abstract void response(Message msg);
 +    public abstract void response(MessageIn msg);
  
      public abstract void assureSufficientLiveNodes() throws 
UnavailableException;
+ 
+     protected void signal()
+     {
+         condition.signal();
+         if (callback != null)
+             callback.run();
+     }
  }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d5fc1932/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
----------------------------------------------------------------------
diff --cc 
src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
index a33ce60,cbecf6b..85afade
--- 
a/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
+++ 
b/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
@@@ -46,17 -53,15 +46,17 @@@ public class DatacenterSyncWriteRespons
          localdc = snitch.getDatacenter(FBUtilities.getBroadcastAddress());
      }
  
 -      private final NetworkTopologyStrategy strategy;
 -    private HashMap<String, AtomicInteger> responses = new HashMap<String, 
AtomicInteger>();
 +    private final String table;
 +    private final NetworkTopologyStrategy strategy;
 +    private final HashMap<String, AtomicInteger> responses = new 
HashMap<String, AtomicInteger>();
  
-     protected DatacenterSyncWriteResponseHandler(Collection<InetAddress> 
writeEndpoints, ConsistencyLevel consistencyLevel, String table)
+     protected DatacenterSyncWriteResponseHandler(Collection<InetAddress> 
writeEndpoints, ConsistencyLevel consistencyLevel, String table, Runnable 
callback)
      {
          // Response is been managed by the map so make it 1 for the 
superclass.
-         super(writeEndpoints, consistencyLevel);
+         super(writeEndpoints, consistencyLevel, callback);
          assert consistencyLevel == ConsistencyLevel.EACH_QUORUM;
  
 +        this.table = table;
          strategy = (NetworkTopologyStrategy) 
Table.open(table).getReplicationStrategy();
  
          for (String dc : strategy.getDatacenters())
@@@ -66,12 -71,12 +66,12 @@@
          }
      }
  
-     public static IWriteResponseHandler create(Collection<InetAddress> 
writeEndpoints, ConsistencyLevel consistencyLevel, String table)
+     public static IWriteResponseHandler create(Collection<InetAddress> 
writeEndpoints, ConsistencyLevel consistencyLevel, String table, Runnable 
callback)
      {
-         return new DatacenterSyncWriteResponseHandler(writeEndpoints, 
consistencyLevel, table);
+         return new DatacenterSyncWriteResponseHandler(writeEndpoints, 
consistencyLevel, table, callback);
      }
  
 -    public void response(Message message)
 +    public void response(MessageIn message)
      {
          String dataCenter = message == null
                              ? localdc
@@@ -86,26 -91,9 +86,26 @@@
          }
  
          // all the quorum conditions are met
-         condition.signal();
+         signal();
      }
  
 +    protected int blockFor()
 +    {
 +        return consistencyLevel.blockFor(table);
 +    }
 +
 +    protected int ackCount()
 +    {
 +        int n = 0;
 +        for (Map.Entry<String, AtomicInteger> entry : responses.entrySet())
 +        {
 +            String dc = entry.getKey();
 +            AtomicInteger i = entry.getValue();
 +            n += (strategy.getReplicationFactor(dc) / 2) + 1 - i.get();
 +        }
 +        return n;
 +    }
 +
      public void assureSufficientLiveNodes() throws UnavailableException
      {
          Map<String, AtomicInteger> dcEndpoints = new HashMap<String, 
AtomicInteger>();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d5fc1932/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
----------------------------------------------------------------------
diff --cc 
src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
index 6644b1a,881c99d..2821e3b
--- a/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
@@@ -55,12 -62,20 +55,12 @@@ public class DatacenterWriteResponseHan
      }
  
      @Override
 -    protected int determineBlockFor(String table)
 -    {
 -        NetworkTopologyStrategy strategy = (NetworkTopologyStrategy) 
Table.open(table).getReplicationStrategy();
 -        return (strategy.getReplicationFactor(localdc) / 2) + 1;
 -    }
 -
 -
 -    @Override
 -    public void response(Message message)
 +    public void response(MessageIn message)
      {
 -        if (message == null || 
localdc.equals(snitch.getDatacenter(message.getFrom())))
 +        if (message == null || 
localdc.equals(snitch.getDatacenter(message.from)))
          {
              if (responses.decrementAndGet() == 0)
-                 condition.signal();
+                 signal();
          }
      }
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d5fc1932/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageProxy.java
index 6f461c9,23e0de4..24625df
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@@ -247,8 -246,9 +249,9 @@@ public class StorageProxy implements St
      public static IWriteResponseHandler performWrite(IMutation mutation,
                                                       ConsistencyLevel 
consistency_level,
                                                       String localDataCenter,
-                                                      WritePerformer performer)
+                                                      WritePerformer performer,
+                                                      Runnable callback)
 -    throws UnavailableException, TimeoutException, IOException
 +    throws UnavailableException, OverloadedException, IOException
      {
          String table = mutation.getTable();
          AbstractReplicationStrategy rs = 
Table.open(table).getReplicationStrategy();
@@@ -540,20 -537,20 +543,20 @@@
          }
      }
  
 -
 -
      // Must be called on a replica of the mutation. This replica becomes the
      // leader of this mutation.
-     public static IWriteResponseHandler 
applyCounterMutationOnLeader(CounterMutation cm, String localDataCenter)
 -    public static IWriteResponseHandler 
applyCounterMutationOnLeader(CounterMutation cm, String localDataCenter, 
Runnable callback) throws UnavailableException, TimeoutException, IOException
++    public static IWriteResponseHandler 
applyCounterMutationOnLeader(CounterMutation cm, String localDataCenter, 
Runnable callback)
 +    throws UnavailableException, IOException, OverloadedException
      {
-         return performWrite(cm, cm.consistency(), localDataCenter, 
counterWritePerformer);
+         return performWrite(cm, cm.consistency(), localDataCenter, 
counterWritePerformer, callback);
      }
  
      // Same as applyCounterMutationOnLeader but must with the difference that 
it use the MUTATION stage to execute the write (while
      // applyCounterMutationOnLeader assumes it is on the MUTATION stage 
already)
 -    public static IWriteResponseHandler 
applyCounterMutationOnCoordinator(CounterMutation cm, String localDataCenter) 
throws UnavailableException, TimeoutException, IOException
 +    public static IWriteResponseHandler 
applyCounterMutationOnCoordinator(CounterMutation cm, String localDataCenter)
 +    throws UnavailableException, IOException, OverloadedException
      {
-         return performWrite(cm, cm.consistency(), localDataCenter, 
counterWriteOnCoordinatorPerformer);
+         return performWrite(cm, cm.consistency(), localDataCenter, 
counterWriteOnCoordinatorPerformer, null);
      }
  
      private static Runnable counterWriteTask(final IMutation mutation,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d5fc1932/src/java/org/apache/cassandra/service/WriteResponseHandler.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/WriteResponseHandler.java
index 5b0f64c,baf8558..cc803cb
--- a/src/java/org/apache/cassandra/service/WriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/WriteResponseHandler.java
@@@ -40,19 -41,16 +40,19 @@@ public class WriteResponseHandler exten
      protected static final Logger logger = 
LoggerFactory.getLogger(WriteResponseHandler.class);
  
      protected final AtomicInteger responses;
 +    private final int blockFor;
  
-     protected WriteResponseHandler(Collection<InetAddress> writeEndpoints, 
ConsistencyLevel consistencyLevel, String table)
+     protected WriteResponseHandler(Collection<InetAddress> writeEndpoints, 
ConsistencyLevel consistencyLevel, String table, Runnable callback)
      {
-         super(writeEndpoints, consistencyLevel);
+         super(writeEndpoints, consistencyLevel, callback);
 -        responses = new AtomicInteger(determineBlockFor(table));
 +        blockFor = consistencyLevel.blockFor(table);
 +        responses = new AtomicInteger(blockFor);
      }
  
      protected WriteResponseHandler(InetAddress endpoint)
      {
-         super(Arrays.asList(endpoint), ConsistencyLevel.ALL);
+         super(Arrays.asList(endpoint), ConsistencyLevel.ALL, null);
 +        blockFor = 1;
          responses = new AtomicInteger(1);
      }
  
@@@ -66,20 -64,31 +66,20 @@@
          return new WriteResponseHandler(endpoint);
      }
  
 -    public void response(Message m)
 +    public void response(MessageIn m)
      {
          if (responses.decrementAndGet() == 0)
-             condition.signal();
+             signal();
      }
  
 -    protected int determineBlockFor(String table)
 +    protected int ackCount()
      {
 -        switch (consistencyLevel)
 -        {
 -            case ONE:
 -                return 1;
 -            case ANY:
 -                return 1;
 -            case TWO:
 -                return 2;
 -            case THREE:
 -                return 3;
 -            case QUORUM:
 -                return 
(Table.open(table).getReplicationStrategy().getReplicationFactor() / 2) + 1;
 -            case ALL:
 -                return 
Table.open(table).getReplicationStrategy().getReplicationFactor();
 -            default:
 -                throw new UnsupportedOperationException("invalid consistency 
level: " + consistencyLevel.toString());
 -        }
 +        return blockFor - responses.get();
 +    }
 +
 +    protected int blockFor()
 +    {
 +        return blockFor;
      }
  
      public void assureSufficientLiveNodes() throws UnavailableException

Reply via email to