add ack count to TimedOutException on writes
patch by jbellis; reviewed by slebresne for CASSANDRA-4414


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c9a13c3c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c9a13c3c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c9a13c3c

Branch: refs/heads/trunk
Commit: c9a13c3c452d3e9e49bca158da93f9a9bbae7379
Parents: 2711548
Author: Jonathan Ellis <[email protected]>
Authored: Wed Jul 11 18:13:25 2012 -0500
Committer: Jonathan Ellis <[email protected]>
Committed: Fri Jul 13 11:32:12 2012 -0500

----------------------------------------------------------------------
 interface/cassandra.thrift                         |    8 +-
 .../org/apache/cassandra/thrift/Cassandra.java     |    4 +
 .../org/apache/cassandra/thrift/Constants.java     |    2 +-
 .../apache/cassandra/thrift/TimedOutException.java |  118 ++++++++++++++-
 .../org/apache/cassandra/cql/QueryProcessor.java   |   17 +--
 .../cql3/statements/ModificationStatement.java     |    9 +-
 .../org/apache/cassandra/db/CounterColumn.java     |    2 +-
 .../cassandra/db/CounterMutationVerbHandler.java   |    7 +-
 .../apache/cassandra/db/HintedHandOffManager.java  |    4 +-
 .../service/AbstractWriteResponseHandler.java      |    7 +-
 .../DatacenterSyncWriteResponseHandler.java        |   14 ++-
 .../cassandra/service/IWriteResponseHandler.java   |    3 +-
 .../org/apache/cassandra/service/StorageProxy.java |   18 +-
 .../cassandra/service/WriteResponseHandler.java    |   10 +-
 .../apache/cassandra/thrift/CassandraServer.java   |   17 +--
 15 files changed, 182 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9a13c3c/interface/cassandra.thrift
----------------------------------------------------------------------
diff --git a/interface/cassandra.thrift b/interface/cassandra.thrift
index 6df9628..1041661 100644
--- a/interface/cassandra.thrift
+++ b/interface/cassandra.thrift
@@ -55,7 +55,7 @@ namespace rb CassandraThrift
 # An effort should be made not to break forward-client-compatibility either
 # (e.g. one should avoid removing obsolete fields from the IDL), but no
 # guarantees in this respect are made by the Cassandra project.
-const string VERSION = "19.32.0"
+const string VERSION = "19.33.0"
 
 
 #
@@ -140,6 +140,12 @@ exception UnavailableException {
 
 /** RPC timeout was exceeded.  either a node failed mid-operation, or load was 
too high, or the requested op was too large. */
 exception TimedOutException {
+    /** 
+     * if a write operation was acknowledged some replicas but not enough to 
+     * satisfy the required ConsistencyLevel, the number of successful 
+     * replies will be given here
+     */
+    1: optional i32 acknowledged_by
 }
 
 /** invalid authentication request (invalid keyspace, user does not exist, or 
credentials invalid) */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9a13c3c/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
----------------------------------------------------------------------
diff --git 
a/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java 
b/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
index 1c6ec69..ec4982e 100644
--- a/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
+++ b/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
@@ -17817,6 +17817,8 @@ public class Cassandra {
 
     private void readObject(java.io.ObjectInputStream in) throws 
java.io.IOException, ClassNotFoundException {
       try {
+        // it doesn't seem like you should have to do this, but java 
serialization is wacky, and doesn't call the default constructor.
+        __isset_bit_vector = new BitSet(1);
         read(new org.apache.thrift.protocol.TCompactProtocol(new 
org.apache.thrift.transport.TIOStreamTransport(in)));
       } catch (org.apache.thrift.TException te) {
         throw new java.io.IOException(te);
@@ -34876,6 +34878,8 @@ public class Cassandra {
 
     private void readObject(java.io.ObjectInputStream in) throws 
java.io.IOException, ClassNotFoundException {
       try {
+        // it doesn't seem like you should have to do this, but java 
serialization is wacky, and doesn't call the default constructor.
+        __isset_bit_vector = new BitSet(1);
         read(new org.apache.thrift.protocol.TCompactProtocol(new 
org.apache.thrift.transport.TIOStreamTransport(in)));
       } catch (org.apache.thrift.TException te) {
         throw new java.io.IOException(te);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9a13c3c/interface/thrift/gen-java/org/apache/cassandra/thrift/Constants.java
----------------------------------------------------------------------
diff --git 
a/interface/thrift/gen-java/org/apache/cassandra/thrift/Constants.java 
b/interface/thrift/gen-java/org/apache/cassandra/thrift/Constants.java
index 7e183c7..9d0701f 100644
--- a/interface/thrift/gen-java/org/apache/cassandra/thrift/Constants.java
+++ b/interface/thrift/gen-java/org/apache/cassandra/thrift/Constants.java
@@ -44,6 +44,6 @@ import org.slf4j.LoggerFactory;
 
 public class Constants {
 
-  public static final String VERSION = "19.32.0";
+  public static final String VERSION = "19.33.0";
 
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9a13c3c/interface/thrift/gen-java/org/apache/cassandra/thrift/TimedOutException.java
----------------------------------------------------------------------
diff --git 
a/interface/thrift/gen-java/org/apache/cassandra/thrift/TimedOutException.java 
b/interface/thrift/gen-java/org/apache/cassandra/thrift/TimedOutException.java
index ea1d648..bdb63dc 100644
--- 
a/interface/thrift/gen-java/org/apache/cassandra/thrift/TimedOutException.java
+++ 
b/interface/thrift/gen-java/org/apache/cassandra/thrift/TimedOutException.java
@@ -48,11 +48,23 @@ import org.slf4j.LoggerFactory;
 public class TimedOutException extends Exception implements 
org.apache.thrift.TBase<TimedOutException, TimedOutException._Fields>, 
java.io.Serializable, Cloneable {
   private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new 
org.apache.thrift.protocol.TStruct("TimedOutException");
 
+  private static final org.apache.thrift.protocol.TField 
ACKNOWLEDGED_BY_FIELD_DESC = new 
org.apache.thrift.protocol.TField("acknowledged_by", 
org.apache.thrift.protocol.TType.I32, (short)1);
 
+  /**
+   * if a write operation was acknowledged some replicas but not enough to
+   * satisfy the required ConsistencyLevel, the number of successful
+   * replies will be given here
+   */
+  public int acknowledged_by; // required
 
   /** The set of fields this struct contains, along with convenience methods 
for finding and manipulating them. */
   public enum _Fields implements org.apache.thrift.TFieldIdEnum {
-;
+    /**
+     * if a write operation was acknowledged some replicas but not enough to
+     * satisfy the required ConsistencyLevel, the number of successful
+     * replies will be given here
+     */
+    ACKNOWLEDGED_BY((short)1, "acknowledged_by");
 
     private static final Map<String, _Fields> byName = new HashMap<String, 
_Fields>();
 
@@ -67,6 +79,8 @@ public class TimedOutException extends Exception implements 
org.apache.thrift.TB
      */
     public static _Fields findByThriftId(int fieldId) {
       switch(fieldId) {
+        case 1: // ACKNOWLEDGED_BY
+          return ACKNOWLEDGED_BY;
         default:
           return null;
       }
@@ -105,9 +119,16 @@ public class TimedOutException extends Exception 
implements org.apache.thrift.TB
       return _fieldName;
     }
   }
+
+  // isset id assignments
+  private static final int __ACKNOWLEDGED_BY_ISSET_ID = 0;
+  private BitSet __isset_bit_vector = new BitSet(1);
+
   public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> 
metaDataMap;
   static {
     Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new 
EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+    tmpMap.put(_Fields.ACKNOWLEDGED_BY, new 
org.apache.thrift.meta_data.FieldMetaData("acknowledged_by", 
org.apache.thrift.TFieldRequirementType.OPTIONAL, 
+        new 
org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I32)));
     metaDataMap = Collections.unmodifiableMap(tmpMap);
     
org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(TimedOutException.class,
 metaDataMap);
   }
@@ -119,6 +140,9 @@ public class TimedOutException extends Exception implements 
org.apache.thrift.TB
    * Performs a deep copy on <i>other</i>.
    */
   public TimedOutException(TimedOutException other) {
+    __isset_bit_vector.clear();
+    __isset_bit_vector.or(other.__isset_bit_vector);
+    this.acknowledged_by = other.acknowledged_by;
   }
 
   public TimedOutException deepCopy() {
@@ -127,15 +151,61 @@ public class TimedOutException extends Exception 
implements org.apache.thrift.TB
 
   @Override
   public void clear() {
+    setAcknowledged_byIsSet(false);
+    this.acknowledged_by = 0;
+  }
+
+  /**
+   * if a write operation was acknowledged some replicas but not enough to
+   * satisfy the required ConsistencyLevel, the number of successful
+   * replies will be given here
+   */
+  public int getAcknowledged_by() {
+    return this.acknowledged_by;
+  }
+
+  /**
+   * if a write operation was acknowledged some replicas but not enough to
+   * satisfy the required ConsistencyLevel, the number of successful
+   * replies will be given here
+   */
+  public TimedOutException setAcknowledged_by(int acknowledged_by) {
+    this.acknowledged_by = acknowledged_by;
+    setAcknowledged_byIsSet(true);
+    return this;
+  }
+
+  public void unsetAcknowledged_by() {
+    __isset_bit_vector.clear(__ACKNOWLEDGED_BY_ISSET_ID);
+  }
+
+  /** Returns true if field acknowledged_by is set (has been assigned a value) 
and false otherwise */
+  public boolean isSetAcknowledged_by() {
+    return __isset_bit_vector.get(__ACKNOWLEDGED_BY_ISSET_ID);
+  }
+
+  public void setAcknowledged_byIsSet(boolean value) {
+    __isset_bit_vector.set(__ACKNOWLEDGED_BY_ISSET_ID, value);
   }
 
   public void setFieldValue(_Fields field, Object value) {
     switch (field) {
+    case ACKNOWLEDGED_BY:
+      if (value == null) {
+        unsetAcknowledged_by();
+      } else {
+        setAcknowledged_by((Integer)value);
+      }
+      break;
+
     }
   }
 
   public Object getFieldValue(_Fields field) {
     switch (field) {
+    case ACKNOWLEDGED_BY:
+      return Integer.valueOf(getAcknowledged_by());
+
     }
     throw new IllegalStateException();
   }
@@ -147,6 +217,8 @@ public class TimedOutException extends Exception implements 
org.apache.thrift.TB
     }
 
     switch (field) {
+    case ACKNOWLEDGED_BY:
+      return isSetAcknowledged_by();
     }
     throw new IllegalStateException();
   }
@@ -164,6 +236,15 @@ public class TimedOutException extends Exception 
implements org.apache.thrift.TB
     if (that == null)
       return false;
 
+    boolean this_present_acknowledged_by = true && this.isSetAcknowledged_by();
+    boolean that_present_acknowledged_by = true && that.isSetAcknowledged_by();
+    if (this_present_acknowledged_by || that_present_acknowledged_by) {
+      if (!(this_present_acknowledged_by && that_present_acknowledged_by))
+        return false;
+      if (this.acknowledged_by != that.acknowledged_by)
+        return false;
+    }
+
     return true;
   }
 
@@ -171,6 +252,11 @@ public class TimedOutException extends Exception 
implements org.apache.thrift.TB
   public int hashCode() {
     HashCodeBuilder builder = new HashCodeBuilder();
 
+    boolean present_acknowledged_by = true && (isSetAcknowledged_by());
+    builder.append(present_acknowledged_by);
+    if (present_acknowledged_by)
+      builder.append(acknowledged_by);
+
     return builder.toHashCode();
   }
 
@@ -182,6 +268,16 @@ public class TimedOutException extends Exception 
implements org.apache.thrift.TB
     int lastComparison = 0;
     TimedOutException typedOther = (TimedOutException)other;
 
+    lastComparison = 
Boolean.valueOf(isSetAcknowledged_by()).compareTo(typedOther.isSetAcknowledged_by());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (isSetAcknowledged_by()) {
+      lastComparison = 
org.apache.thrift.TBaseHelper.compareTo(this.acknowledged_by, 
typedOther.acknowledged_by);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
     return 0;
   }
 
@@ -199,6 +295,14 @@ public class TimedOutException extends Exception 
implements org.apache.thrift.TB
         break;
       }
       switch (field.id) {
+        case 1: // ACKNOWLEDGED_BY
+          if (field.type == org.apache.thrift.protocol.TType.I32) {
+            this.acknowledged_by = iprot.readI32();
+            setAcknowledged_byIsSet(true);
+          } else { 
+            org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
+          }
+          break;
         default:
           org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
       }
@@ -214,6 +318,11 @@ public class TimedOutException extends Exception 
implements org.apache.thrift.TB
     validate();
 
     oprot.writeStructBegin(STRUCT_DESC);
+    if (isSetAcknowledged_by()) {
+      oprot.writeFieldBegin(ACKNOWLEDGED_BY_FIELD_DESC);
+      oprot.writeI32(this.acknowledged_by);
+      oprot.writeFieldEnd();
+    }
     oprot.writeFieldStop();
     oprot.writeStructEnd();
   }
@@ -223,6 +332,11 @@ public class TimedOutException extends Exception 
implements org.apache.thrift.TB
     StringBuilder sb = new StringBuilder("TimedOutException(");
     boolean first = true;
 
+    if (isSetAcknowledged_by()) {
+      sb.append("acknowledged_by:");
+      sb.append(this.acknowledged_by);
+      first = false;
+    }
     sb.append(")");
     return sb.toString();
   }
@@ -241,6 +355,8 @@ public class TimedOutException extends Exception implements 
org.apache.thrift.TB
 
   private void readObject(java.io.ObjectInputStream in) throws 
java.io.IOException, ClassNotFoundException {
     try {
+      // it doesn't seem like you should have to do this, but java 
serialization is wacky, and doesn't call the default constructor.
+      __isset_bit_vector = new BitSet(1);
       read(new org.apache.thrift.protocol.TCompactProtocol(new 
org.apache.thrift.transport.TIOStreamTransport(in)));
     } catch (org.apache.thrift.TException te) {
       throw new java.io.IOException(te);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9a13c3c/src/java/org/apache/cassandra/cql/QueryProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql/QueryProcessor.java 
b/src/java/org/apache/cassandra/cql/QueryProcessor.java
index 9b29ecd..7695055 100644
--- a/src/java/org/apache/cassandra/cql/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql/QueryProcessor.java
@@ -264,10 +264,6 @@ public class QueryProcessor
         {
             throw new UnavailableException();
         }
-        catch (TimeoutException e)
-        {
-            throw new TimedOutException();
-        }
     }
 
     private static IFilter filterFromSelect(SelectStatement select, CFMetaData 
metadata, List<ByteBuffer> variables)
@@ -617,10 +613,6 @@ public class QueryProcessor
                 {
                     throw new UnavailableException();
                 }
-                catch (TimeoutException e)
-                {
-                    throw new TimedOutException();
-                }
 
                 result.type = CqlResultType.VOID;
                 return result;
@@ -664,14 +656,7 @@ public class QueryProcessor
                     validateKey(deletion.key());
                 }
 
-                try
-                {
-                    StorageProxy.mutate(deletions, 
delete.getConsistencyLevel());
-                }
-                catch (TimeoutException e)
-                {
-                    throw new TimedOutException();
-                }
+                StorageProxy.mutate(deletions, delete.getConsistencyLevel());
 
                 result.type = CqlResultType.VOID;
                 return result;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9a13c3c/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index b91c08c..34fffd4 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -75,14 +75,7 @@ public abstract class ModificationStatement extends 
CFStatement implements CQLSt
 
     public ResultMessage execute(ClientState state, List<ByteBuffer> 
variables) throws InvalidRequestException, UnavailableException, 
TimedOutException
     {
-        try
-        {
-            StorageProxy.mutate(getMutations(state, variables), 
getConsistencyLevel());
-        }
-        catch (TimeoutException e)
-        {
-            throw new TimedOutException();
-        }
+        StorageProxy.mutate(getMutations(state, variables), 
getConsistencyLevel());
         return null;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9a13c3c/src/java/org/apache/cassandra/db/CounterColumn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterColumn.java 
b/src/java/org/apache/cassandra/db/CounterColumn.java
index 7ea6514..ecfe2f1 100644
--- a/src/java/org/apache/cassandra/db/CounterColumn.java
+++ b/src/java/org/apache/cassandra/db/CounterColumn.java
@@ -367,7 +367,7 @@ public class CounterColumn extends Column
         StorageProxy.performWrite(rm, ConsistencyLevel.ANY, localDataCenter, 
new StorageProxy.WritePerformer()
         {
             public void apply(IMutation mutation, Collection<InetAddress> 
targets, IWriteResponseHandler responseHandler, String localDataCenter, 
ConsistencyLevel consistency_level)
-            throws IOException, TimeoutException, UnavailableException
+            throws IOException, UnavailableException
             {
                 // We should only send to the remote replica, not the local one
                 targets.remove(local);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9a13c3c/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java 
b/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
index 470de64..aa71231 100644
--- a/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
@@ -30,6 +30,7 @@ import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.StorageProxy;
+import org.apache.cassandra.thrift.TimedOutException;
 import org.apache.cassandra.thrift.UnavailableException;
 import org.apache.cassandra.utils.FBUtilities;
 
@@ -52,13 +53,15 @@ public class CounterMutationVerbHandler implements 
IVerbHandler<CounterMutation>
         }
         catch (UnavailableException e)
         {
-            // We check for UnavailableException in the coordinator not. It is
+            // We check for UnavailableException in the coordinator now. It is
             // hence reasonable to let the coordinator timeout in the very
             // unlikely case we arrive here
+            logger.debug("counter unavailable", e);
         }
-        catch (TimeoutException e)
+        catch (TimedOutException e)
         {
             // The coordinator node will have timeout itself so we let that 
goes
+            logger.debug("counter timeout", e);
         }
         catch (IOException e)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9a13c3c/src/java/org/apache/cassandra/db/HintedHandOffManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/HintedHandOffManager.java 
b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
index e880e43..a00dc90 100644
--- a/src/java/org/apache/cassandra/db/HintedHandOffManager.java
+++ b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
@@ -125,7 +125,7 @@ public class HintedHandOffManager implements 
HintedHandOffManagerMBean
         StorageService.optionalTasks.scheduleWithFixedDelay(runnable, 10, 10, 
TimeUnit.MINUTES);
     }
 
-    private static void sendMutation(InetAddress endpoint, MessageOut<?> 
message) throws TimeoutException
+    private static void sendMutation(InetAddress endpoint, MessageOut<?> 
message) throws TimedOutException
     {
         IWriteResponseHandler responseHandler = 
WriteResponseHandler.create(endpoint);
         MessagingService.instance().sendRR(message, endpoint, responseHandler);
@@ -364,7 +364,7 @@ public class HintedHandOffManager implements 
HintedHandOffManagerMBean
                     }
                     deleteHint(hostIdBytes, hint.name(), hint.maxTimestamp());
                 }
-                catch (TimeoutException e)
+                catch (TimedOutException e)
                 {
                     logger.info(String.format("Timed out replaying hints to 
%s; aborting further deliveries", endpoint));
                     break delivery;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9a13c3c/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java 
b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
index db85a47..4458b06 100644
--- a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
@@ -25,6 +25,7 @@ import java.util.concurrent.TimeoutException;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.cassandra.thrift.TimedOutException;
 import org.apache.cassandra.thrift.UnavailableException;
 import org.apache.cassandra.utils.SimpleCondition;
 
@@ -42,7 +43,7 @@ public abstract class AbstractWriteResponseHandler implements 
IWriteResponseHand
         this.writeEndpoints = writeEndpoints;
     }
 
-    public void get() throws TimeoutException
+    public void get() throws TimedOutException
     {
         long timeout = DatabaseDescriptor.getWriteRpcTimeout() - 
(System.currentTimeMillis() - startTime);
 
@@ -58,10 +59,12 @@ public abstract class AbstractWriteResponseHandler 
implements IWriteResponseHand
 
         if (!success)
         {
-            throw new TimeoutException();
+            throw new TimedOutException().setAcknowledged_by(ackCount());
         }
     }
 
+    protected abstract int ackCount();
+
     /** null message means "response from local write" */
     public abstract void response(MessageIn msg);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9a13c3c/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java 
b/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
index 8996324..0dc2ac5 100644
--- 
a/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
+++ 
b/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
@@ -79,7 +79,7 @@ public class DatacenterSyncWriteResponseHandler extends 
AbstractWriteResponseHan
 
         for (AtomicInteger i : responses.values())
         {
-            if (0 < i.get())
+            if (i.get() > 0)
                 return;
         }
 
@@ -87,6 +87,18 @@ public class DatacenterSyncWriteResponseHandler extends 
AbstractWriteResponseHan
         condition.signal();
     }
 
+    protected int ackCount()
+    {
+        int n = 0;
+        for (Map.Entry<String, AtomicInteger> entry : responses.entrySet())
+        {
+            String dc = entry.getKey();
+            AtomicInteger i = entry.getValue();
+            n += (strategy.getReplicationFactor(dc) / 2) + 1 - i.get();
+        }
+        return n;
+    }
+
     public void assureSufficientLiveNodes() throws UnavailableException
     {
         Map<String, AtomicInteger> dcEndpoints = new HashMap<String, 
AtomicInteger>();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9a13c3c/src/java/org/apache/cassandra/service/IWriteResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/IWriteResponseHandler.java 
b/src/java/org/apache/cassandra/service/IWriteResponseHandler.java
index 330b51e..6c2ba3d 100644
--- a/src/java/org/apache/cassandra/service/IWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/IWriteResponseHandler.java
@@ -20,10 +20,11 @@ package org.apache.cassandra.service;
 import java.util.concurrent.TimeoutException;
 
 import org.apache.cassandra.net.IAsyncCallback;
+import org.apache.cassandra.thrift.TimedOutException;
 import org.apache.cassandra.thrift.UnavailableException;
 
 public interface IWriteResponseHandler extends IAsyncCallback
 {
-    public void get() throws TimeoutException;
+    public void get() throws TimedOutException;
     public void assureSufficientLiveNodes() throws UnavailableException;
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9a13c3c/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java 
b/src/java/org/apache/cassandra/service/StorageProxy.java
index 58cc99d..e033962 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -112,7 +112,7 @@ public class StorageProxy implements StorageProxyMBean
                               IWriteResponseHandler responseHandler,
                               String localDataCenter,
                               ConsistencyLevel consistency_level)
-            throws IOException, TimeoutException, UnavailableException
+            throws IOException, UnavailableException
             {
                 assert mutation instanceof RowMutation;
                 sendToHintedEndpoints((RowMutation) mutation, targets, 
responseHandler, localDataCenter, consistency_level);
@@ -169,7 +169,7 @@ public class StorageProxy implements StorageProxyMBean
      * @param mutations the mutations to be applied across the replicas
      * @param consistency_level the consistency level for the operation
      */
-    public static void mutate(List<? extends IMutation> mutations, 
ConsistencyLevel consistency_level) throws UnavailableException, 
TimeoutException
+    public static void mutate(List<? extends IMutation> mutations, 
ConsistencyLevel consistency_level) throws UnavailableException, 
TimedOutException
     {
         logger.debug("Mutations/ConsistencyLevel are {}/{}", mutations, 
consistency_level);
         final String localDataCenter = 
DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress());
@@ -200,7 +200,7 @@ public class StorageProxy implements StorageProxyMBean
             }
 
         }
-        catch (TimeoutException ex)
+        catch (TimedOutException ex)
         {
             ClientRequestMetrics.writeTimeouts.inc();
             if (logger.isDebugEnabled())
@@ -244,7 +244,7 @@ public class StorageProxy implements StorageProxyMBean
                                                      ConsistencyLevel 
consistency_level,
                                                      String localDataCenter,
                                                      WritePerformer performer)
-    throws UnavailableException, TimeoutException, IOException
+    throws UnavailableException, IOException
     {
         String table = mutation.getTable();
         AbstractReplicationStrategy rs = 
Table.open(table).getReplicationStrategy();
@@ -287,7 +287,7 @@ public class StorageProxy implements StorageProxyMBean
                                              IWriteResponseHandler 
responseHandler,
                                              String localDataCenter,
                                              ConsistencyLevel 
consistency_level)
-    throws IOException, TimeoutException, UnavailableException
+    throws IOException, UnavailableException
     {
         // Multimap that holds onto all the messages and addresses meant for a 
specific datacenter
         Map<String, Multimap<MessageOut, InetAddress>> dcMessages = new 
HashMap<String, Multimap<MessageOut, InetAddress>>(targets.size());
@@ -473,7 +473,7 @@ public class StorageProxy implements StorageProxyMBean
      * quicker response and because the WriteResponseHandlers don't make it 
easy to send back an error. We also always gather
      * the write latencies at the coordinator node to make gathering point 
similar to the case of standard writes.
      */
-    public static IWriteResponseHandler mutateCounter(CounterMutation cm, 
String localDataCenter) throws UnavailableException, TimeoutException, 
IOException
+    public static IWriteResponseHandler mutateCounter(CounterMutation cm, 
String localDataCenter) throws UnavailableException, IOException
     {
         InetAddress endpoint = findSuitableEndpoint(cm.getTable(), cm.key(), 
localDataCenter);
 
@@ -539,14 +539,14 @@ public class StorageProxy implements StorageProxyMBean
 
     // Must be called on a replica of the mutation. This replica becomes the
     // leader of this mutation.
-    public static IWriteResponseHandler 
applyCounterMutationOnLeader(CounterMutation cm, String localDataCenter) throws 
UnavailableException, TimeoutException, IOException
+    public static IWriteResponseHandler 
applyCounterMutationOnLeader(CounterMutation cm, String localDataCenter) throws 
UnavailableException, IOException
     {
         return performWrite(cm, cm.consistency(), localDataCenter, 
counterWritePerformer);
     }
 
     // Same as applyCounterMutationOnLeader but must with the difference that 
it use the MUTATION stage to execute the write (while
     // applyCounterMutationOnLeader assumes it is on the MUTATION stage 
already)
-    public static IWriteResponseHandler 
applyCounterMutationOnCoordinator(CounterMutation cm, String localDataCenter) 
throws UnavailableException, TimeoutException, IOException
+    public static IWriteResponseHandler 
applyCounterMutationOnCoordinator(CounterMutation cm, String localDataCenter) 
throws UnavailableException, IOException
     {
         return performWrite(cm, cm.consistency(), localDataCenter, 
counterWriteOnCoordinatorPerformer);
     }
@@ -1254,7 +1254,7 @@ public class StorageProxy implements StorageProxyMBean
 
     public interface WritePerformer
     {
-        public void apply(IMutation mutation, Collection<InetAddress> targets, 
IWriteResponseHandler responseHandler, String localDataCenter, ConsistencyLevel 
consistency_level) throws IOException, TimeoutException, UnavailableException;
+        public void apply(IMutation mutation, Collection<InetAddress> targets, 
IWriteResponseHandler responseHandler, String localDataCenter, ConsistencyLevel 
consistency_level) throws IOException, UnavailableException;
     }
 
     private static abstract class DroppableRunnable implements Runnable

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9a13c3c/src/java/org/apache/cassandra/service/WriteResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/WriteResponseHandler.java 
b/src/java/org/apache/cassandra/service/WriteResponseHandler.java
index 41a6ac3..0164c32 100644
--- a/src/java/org/apache/cassandra/service/WriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/WriteResponseHandler.java
@@ -40,16 +40,19 @@ public class WriteResponseHandler extends 
AbstractWriteResponseHandler
     protected static final Logger logger = 
LoggerFactory.getLogger(WriteResponseHandler.class);
 
     protected final AtomicInteger responses;
+    private final int blockFor;
 
     protected WriteResponseHandler(Collection<InetAddress> writeEndpoints, 
ConsistencyLevel consistencyLevel, String table)
     {
         super(writeEndpoints, consistencyLevel);
-        responses = new AtomicInteger(determineBlockFor(table));
+        blockFor = determineBlockFor(table);
+        responses = new AtomicInteger(blockFor);
     }
 
     protected WriteResponseHandler(InetAddress endpoint)
     {
         super(Arrays.asList(endpoint), ConsistencyLevel.ALL);
+        blockFor = 1;
         responses = new AtomicInteger(1);
     }
 
@@ -69,6 +72,11 @@ public class WriteResponseHandler extends 
AbstractWriteResponseHandler
             condition.signal();
     }
 
+    protected int ackCount()
+    {
+        return blockFor - responses.get();
+    }
+
     protected int determineBlockFor(String table)
     {
         switch (consistencyLevel)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9a13c3c/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java 
b/src/java/org/apache/cassandra/thrift/CassandraServer.java
index efe0372..15084c6 100644
--- a/src/java/org/apache/cassandra/thrift/CassandraServer.java
+++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java
@@ -623,22 +623,15 @@ public class CassandraServer implements Cassandra.Iface
         ThriftValidation.validateConsistencyLevel(state().getKeyspace(), 
consistency_level, RequestType.WRITE);
         if (mutations.isEmpty())
             return;
+
+        schedule(DatabaseDescriptor.getWriteRpcTimeout());
         try
         {
-            schedule(DatabaseDescriptor.getWriteRpcTimeout());
-            try
-            {
-                StorageProxy.mutate(mutations, consistency_level);
-            }
-            finally
-            {
-                release();
-            }
+            StorageProxy.mutate(mutations, consistency_level);
         }
-        catch (TimeoutException e)
+        finally
         {
-            logger.debug("... timed out");
-            throw new TimedOutException();
+            release();
         }
     }
 

Reply via email to