add ack count to TimedOutException on writes; patch by jbellis, reviewed by slebresne for CASSANDRA-4414
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c9a13c3c Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c9a13c3c Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c9a13c3c Branch: refs/heads/trunk Commit: c9a13c3c452d3e9e49bca158da93f9a9bbae7379 Parents: 2711548 Author: Jonathan Ellis <[email protected]> Authored: Wed Jul 11 18:13:25 2012 -0500 Committer: Jonathan Ellis <[email protected]> Committed: Fri Jul 13 11:32:12 2012 -0500 ---------------------------------------------------------------------- interface/cassandra.thrift | 8 +- .../org/apache/cassandra/thrift/Cassandra.java | 4 + .../org/apache/cassandra/thrift/Constants.java | 2 +- .../apache/cassandra/thrift/TimedOutException.java | 118 ++++++++++++++- .../org/apache/cassandra/cql/QueryProcessor.java | 17 +-- .../cql3/statements/ModificationStatement.java | 9 +- .../org/apache/cassandra/db/CounterColumn.java | 2 +- .../cassandra/db/CounterMutationVerbHandler.java | 7 +- .../apache/cassandra/db/HintedHandOffManager.java | 4 +- .../service/AbstractWriteResponseHandler.java | 7 +- .../DatacenterSyncWriteResponseHandler.java | 14 ++- .../cassandra/service/IWriteResponseHandler.java | 3 +- .../org/apache/cassandra/service/StorageProxy.java | 18 +- .../cassandra/service/WriteResponseHandler.java | 10 +- .../apache/cassandra/thrift/CassandraServer.java | 17 +-- 15 files changed, 182 insertions(+), 58 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9a13c3c/interface/cassandra.thrift ---------------------------------------------------------------------- diff --git a/interface/cassandra.thrift b/interface/cassandra.thrift index 6df9628..1041661 100644 --- a/interface/cassandra.thrift +++ b/interface/cassandra.thrift @@ -55,7 +55,7 @@ namespace rb CassandraThrift # An effort should be made not to break 
forward-client-compatibility either # (e.g. one should avoid removing obsolete fields from the IDL), but no # guarantees in this respect are made by the Cassandra project. -const string VERSION = "19.32.0" +const string VERSION = "19.33.0" # @@ -140,6 +140,12 @@ exception UnavailableException { /** RPC timeout was exceeded. either a node failed mid-operation, or load was too high, or the requested op was too large. */ exception TimedOutException { + /** + * if a write operation was acknowledged some replicas but not enough to + * satisfy the required ConsistencyLevel, the number of successful + * replies will be given here + */ + 1: optional i32 acknowledged_by } /** invalid authentication request (invalid keyspace, user does not exist, or credentials invalid) */ http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9a13c3c/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java ---------------------------------------------------------------------- diff --git a/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java b/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java index 1c6ec69..ec4982e 100644 --- a/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java +++ b/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java @@ -17817,6 +17817,8 @@ public class Cassandra { private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException { try { + // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor. 
+ __isset_bit_vector = new BitSet(1); read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in))); } catch (org.apache.thrift.TException te) { throw new java.io.IOException(te); @@ -34876,6 +34878,8 @@ public class Cassandra { private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException { try { + // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor. + __isset_bit_vector = new BitSet(1); read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in))); } catch (org.apache.thrift.TException te) { throw new java.io.IOException(te); http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9a13c3c/interface/thrift/gen-java/org/apache/cassandra/thrift/Constants.java ---------------------------------------------------------------------- diff --git a/interface/thrift/gen-java/org/apache/cassandra/thrift/Constants.java b/interface/thrift/gen-java/org/apache/cassandra/thrift/Constants.java index 7e183c7..9d0701f 100644 --- a/interface/thrift/gen-java/org/apache/cassandra/thrift/Constants.java +++ b/interface/thrift/gen-java/org/apache/cassandra/thrift/Constants.java @@ -44,6 +44,6 @@ import org.slf4j.LoggerFactory; public class Constants { - public static final String VERSION = "19.32.0"; + public static final String VERSION = "19.33.0"; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9a13c3c/interface/thrift/gen-java/org/apache/cassandra/thrift/TimedOutException.java ---------------------------------------------------------------------- diff --git a/interface/thrift/gen-java/org/apache/cassandra/thrift/TimedOutException.java b/interface/thrift/gen-java/org/apache/cassandra/thrift/TimedOutException.java index ea1d648..bdb63dc 100644 --- a/interface/thrift/gen-java/org/apache/cassandra/thrift/TimedOutException.java +++ 
b/interface/thrift/gen-java/org/apache/cassandra/thrift/TimedOutException.java @@ -48,11 +48,23 @@ import org.slf4j.LoggerFactory; public class TimedOutException extends Exception implements org.apache.thrift.TBase<TimedOutException, TimedOutException._Fields>, java.io.Serializable, Cloneable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("TimedOutException"); + private static final org.apache.thrift.protocol.TField ACKNOWLEDGED_BY_FIELD_DESC = new org.apache.thrift.protocol.TField("acknowledged_by", org.apache.thrift.protocol.TType.I32, (short)1); + /** + * if a write operation was acknowledged some replicas but not enough to + * satisfy the required ConsistencyLevel, the number of successful + * replies will be given here + */ + public int acknowledged_by; // required /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ public enum _Fields implements org.apache.thrift.TFieldIdEnum { -; + /** + * if a write operation was acknowledged some replicas but not enough to + * satisfy the required ConsistencyLevel, the number of successful + * replies will be given here + */ + ACKNOWLEDGED_BY((short)1, "acknowledged_by"); private static final Map<String, _Fields> byName = new HashMap<String, _Fields>(); @@ -67,6 +79,8 @@ public class TimedOutException extends Exception implements org.apache.thrift.TB */ public static _Fields findByThriftId(int fieldId) { switch(fieldId) { + case 1: // ACKNOWLEDGED_BY + return ACKNOWLEDGED_BY; default: return null; } @@ -105,9 +119,16 @@ public class TimedOutException extends Exception implements org.apache.thrift.TB return _fieldName; } } + + // isset id assignments + private static final int __ACKNOWLEDGED_BY_ISSET_ID = 0; + private BitSet __isset_bit_vector = new BitSet(1); + public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; static { Map<_Fields, 
org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); + tmpMap.put(_Fields.ACKNOWLEDGED_BY, new org.apache.thrift.meta_data.FieldMetaData("acknowledged_by", org.apache.thrift.TFieldRequirementType.OPTIONAL, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I32))); metaDataMap = Collections.unmodifiableMap(tmpMap); org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(TimedOutException.class, metaDataMap); } @@ -119,6 +140,9 @@ public class TimedOutException extends Exception implements org.apache.thrift.TB * Performs a deep copy on <i>other</i>. */ public TimedOutException(TimedOutException other) { + __isset_bit_vector.clear(); + __isset_bit_vector.or(other.__isset_bit_vector); + this.acknowledged_by = other.acknowledged_by; } public TimedOutException deepCopy() { @@ -127,15 +151,61 @@ public class TimedOutException extends Exception implements org.apache.thrift.TB @Override public void clear() { + setAcknowledged_byIsSet(false); + this.acknowledged_by = 0; + } + + /** + * if a write operation was acknowledged some replicas but not enough to + * satisfy the required ConsistencyLevel, the number of successful + * replies will be given here + */ + public int getAcknowledged_by() { + return this.acknowledged_by; + } + + /** + * if a write operation was acknowledged some replicas but not enough to + * satisfy the required ConsistencyLevel, the number of successful + * replies will be given here + */ + public TimedOutException setAcknowledged_by(int acknowledged_by) { + this.acknowledged_by = acknowledged_by; + setAcknowledged_byIsSet(true); + return this; + } + + public void unsetAcknowledged_by() { + __isset_bit_vector.clear(__ACKNOWLEDGED_BY_ISSET_ID); + } + + /** Returns true if field acknowledged_by is set (has been assigned a value) and false otherwise */ + public boolean isSetAcknowledged_by() { + return 
__isset_bit_vector.get(__ACKNOWLEDGED_BY_ISSET_ID); + } + + public void setAcknowledged_byIsSet(boolean value) { + __isset_bit_vector.set(__ACKNOWLEDGED_BY_ISSET_ID, value); } public void setFieldValue(_Fields field, Object value) { switch (field) { + case ACKNOWLEDGED_BY: + if (value == null) { + unsetAcknowledged_by(); + } else { + setAcknowledged_by((Integer)value); + } + break; + } } public Object getFieldValue(_Fields field) { switch (field) { + case ACKNOWLEDGED_BY: + return Integer.valueOf(getAcknowledged_by()); + } throw new IllegalStateException(); } @@ -147,6 +217,8 @@ public class TimedOutException extends Exception implements org.apache.thrift.TB } switch (field) { + case ACKNOWLEDGED_BY: + return isSetAcknowledged_by(); } throw new IllegalStateException(); } @@ -164,6 +236,15 @@ public class TimedOutException extends Exception implements org.apache.thrift.TB if (that == null) return false; + boolean this_present_acknowledged_by = true && this.isSetAcknowledged_by(); + boolean that_present_acknowledged_by = true && that.isSetAcknowledged_by(); + if (this_present_acknowledged_by || that_present_acknowledged_by) { + if (!(this_present_acknowledged_by && that_present_acknowledged_by)) + return false; + if (this.acknowledged_by != that.acknowledged_by) + return false; + } + return true; } @@ -171,6 +252,11 @@ public class TimedOutException extends Exception implements org.apache.thrift.TB public int hashCode() { HashCodeBuilder builder = new HashCodeBuilder(); + boolean present_acknowledged_by = true && (isSetAcknowledged_by()); + builder.append(present_acknowledged_by); + if (present_acknowledged_by) + builder.append(acknowledged_by); + return builder.toHashCode(); } @@ -182,6 +268,16 @@ public class TimedOutException extends Exception implements org.apache.thrift.TB int lastComparison = 0; TimedOutException typedOther = (TimedOutException)other; + lastComparison = Boolean.valueOf(isSetAcknowledged_by()).compareTo(typedOther.isSetAcknowledged_by()); + if 
(lastComparison != 0) { + return lastComparison; + } + if (isSetAcknowledged_by()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.acknowledged_by, typedOther.acknowledged_by); + if (lastComparison != 0) { + return lastComparison; + } + } return 0; } @@ -199,6 +295,14 @@ public class TimedOutException extends Exception implements org.apache.thrift.TB break; } switch (field.id) { + case 1: // ACKNOWLEDGED_BY + if (field.type == org.apache.thrift.protocol.TType.I32) { + this.acknowledged_by = iprot.readI32(); + setAcknowledged_byIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type); + } + break; default: org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type); } @@ -214,6 +318,11 @@ public class TimedOutException extends Exception implements org.apache.thrift.TB validate(); oprot.writeStructBegin(STRUCT_DESC); + if (isSetAcknowledged_by()) { + oprot.writeFieldBegin(ACKNOWLEDGED_BY_FIELD_DESC); + oprot.writeI32(this.acknowledged_by); + oprot.writeFieldEnd(); + } oprot.writeFieldStop(); oprot.writeStructEnd(); } @@ -223,6 +332,11 @@ public class TimedOutException extends Exception implements org.apache.thrift.TB StringBuilder sb = new StringBuilder("TimedOutException("); boolean first = true; + if (isSetAcknowledged_by()) { + sb.append("acknowledged_by:"); + sb.append(this.acknowledged_by); + first = false; + } sb.append(")"); return sb.toString(); } @@ -241,6 +355,8 @@ public class TimedOutException extends Exception implements org.apache.thrift.TB private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException { try { + // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor. 
+ __isset_bit_vector = new BitSet(1); read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in))); } catch (org.apache.thrift.TException te) { throw new java.io.IOException(te); http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9a13c3c/src/java/org/apache/cassandra/cql/QueryProcessor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql/QueryProcessor.java b/src/java/org/apache/cassandra/cql/QueryProcessor.java index 9b29ecd..7695055 100644 --- a/src/java/org/apache/cassandra/cql/QueryProcessor.java +++ b/src/java/org/apache/cassandra/cql/QueryProcessor.java @@ -264,10 +264,6 @@ public class QueryProcessor { throw new UnavailableException(); } - catch (TimeoutException e) - { - throw new TimedOutException(); - } } private static IFilter filterFromSelect(SelectStatement select, CFMetaData metadata, List<ByteBuffer> variables) @@ -617,10 +613,6 @@ public class QueryProcessor { throw new UnavailableException(); } - catch (TimeoutException e) - { - throw new TimedOutException(); - } result.type = CqlResultType.VOID; return result; @@ -664,14 +656,7 @@ public class QueryProcessor validateKey(deletion.key()); } - try - { - StorageProxy.mutate(deletions, delete.getConsistencyLevel()); - } - catch (TimeoutException e) - { - throw new TimedOutException(); - } + StorageProxy.mutate(deletions, delete.getConsistencyLevel()); result.type = CqlResultType.VOID; return result; http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9a13c3c/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java index b91c08c..34fffd4 100644 --- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java +++ 
b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java @@ -75,14 +75,7 @@ public abstract class ModificationStatement extends CFStatement implements CQLSt public ResultMessage execute(ClientState state, List<ByteBuffer> variables) throws InvalidRequestException, UnavailableException, TimedOutException { - try - { - StorageProxy.mutate(getMutations(state, variables), getConsistencyLevel()); - } - catch (TimeoutException e) - { - throw new TimedOutException(); - } + StorageProxy.mutate(getMutations(state, variables), getConsistencyLevel()); return null; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9a13c3c/src/java/org/apache/cassandra/db/CounterColumn.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/CounterColumn.java b/src/java/org/apache/cassandra/db/CounterColumn.java index 7ea6514..ecfe2f1 100644 --- a/src/java/org/apache/cassandra/db/CounterColumn.java +++ b/src/java/org/apache/cassandra/db/CounterColumn.java @@ -367,7 +367,7 @@ public class CounterColumn extends Column StorageProxy.performWrite(rm, ConsistencyLevel.ANY, localDataCenter, new StorageProxy.WritePerformer() { public void apply(IMutation mutation, Collection<InetAddress> targets, IWriteResponseHandler responseHandler, String localDataCenter, ConsistencyLevel consistency_level) - throws IOException, TimeoutException, UnavailableException + throws IOException, UnavailableException { // We should only send to the remote replica, not the local one targets.remove(local); http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9a13c3c/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java b/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java index 470de64..aa71231 100644 --- 
a/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java +++ b/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java @@ -30,6 +30,7 @@ import org.apache.cassandra.net.IVerbHandler; import org.apache.cassandra.net.MessageIn; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.service.StorageProxy; +import org.apache.cassandra.thrift.TimedOutException; import org.apache.cassandra.thrift.UnavailableException; import org.apache.cassandra.utils.FBUtilities; @@ -52,13 +53,15 @@ public class CounterMutationVerbHandler implements IVerbHandler<CounterMutation> } catch (UnavailableException e) { - // We check for UnavailableException in the coordinator not. It is + // We check for UnavailableException in the coordinator now. It is // hence reasonable to let the coordinator timeout in the very // unlikely case we arrive here + logger.debug("counter unavailable", e); } - catch (TimeoutException e) + catch (TimedOutException e) { // The coordinator node will have timeout itself so we let that goes + logger.debug("counter timeout", e); } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9a13c3c/src/java/org/apache/cassandra/db/HintedHandOffManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/HintedHandOffManager.java b/src/java/org/apache/cassandra/db/HintedHandOffManager.java index e880e43..a00dc90 100644 --- a/src/java/org/apache/cassandra/db/HintedHandOffManager.java +++ b/src/java/org/apache/cassandra/db/HintedHandOffManager.java @@ -125,7 +125,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean StorageService.optionalTasks.scheduleWithFixedDelay(runnable, 10, 10, TimeUnit.MINUTES); } - private static void sendMutation(InetAddress endpoint, MessageOut<?> message) throws TimeoutException + private static void sendMutation(InetAddress endpoint, MessageOut<?> message) throws TimedOutException { 
IWriteResponseHandler responseHandler = WriteResponseHandler.create(endpoint); MessagingService.instance().sendRR(message, endpoint, responseHandler); @@ -364,7 +364,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean } deleteHint(hostIdBytes, hint.name(), hint.maxTimestamp()); } - catch (TimeoutException e) + catch (TimedOutException e) { logger.info(String.format("Timed out replaying hints to %s; aborting further deliveries", endpoint)); break delivery; http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9a13c3c/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java index db85a47..4458b06 100644 --- a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java +++ b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java @@ -25,6 +25,7 @@ import java.util.concurrent.TimeoutException; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.net.MessageIn; import org.apache.cassandra.thrift.ConsistencyLevel; +import org.apache.cassandra.thrift.TimedOutException; import org.apache.cassandra.thrift.UnavailableException; import org.apache.cassandra.utils.SimpleCondition; @@ -42,7 +43,7 @@ public abstract class AbstractWriteResponseHandler implements IWriteResponseHand this.writeEndpoints = writeEndpoints; } - public void get() throws TimeoutException + public void get() throws TimedOutException { long timeout = DatabaseDescriptor.getWriteRpcTimeout() - (System.currentTimeMillis() - startTime); @@ -58,10 +59,12 @@ public abstract class AbstractWriteResponseHandler implements IWriteResponseHand if (!success) { - throw new TimeoutException(); + throw new TimedOutException().setAcknowledged_by(ackCount()); } } + protected abstract int ackCount(); + 
/** null message means "response from local write" */ public abstract void response(MessageIn msg); http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9a13c3c/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java b/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java index 8996324..0dc2ac5 100644 --- a/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java +++ b/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java @@ -79,7 +79,7 @@ public class DatacenterSyncWriteResponseHandler extends AbstractWriteResponseHan for (AtomicInteger i : responses.values()) { - if (0 < i.get()) + if (i.get() > 0) return; } @@ -87,6 +87,18 @@ public class DatacenterSyncWriteResponseHandler extends AbstractWriteResponseHan condition.signal(); } + protected int ackCount() + { + int n = 0; + for (Map.Entry<String, AtomicInteger> entry : responses.entrySet()) + { + String dc = entry.getKey(); + AtomicInteger i = entry.getValue(); + n += (strategy.getReplicationFactor(dc) / 2) + 1 - i.get(); + } + return n; + } + public void assureSufficientLiveNodes() throws UnavailableException { Map<String, AtomicInteger> dcEndpoints = new HashMap<String, AtomicInteger>(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9a13c3c/src/java/org/apache/cassandra/service/IWriteResponseHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/IWriteResponseHandler.java b/src/java/org/apache/cassandra/service/IWriteResponseHandler.java index 330b51e..6c2ba3d 100644 --- a/src/java/org/apache/cassandra/service/IWriteResponseHandler.java +++ b/src/java/org/apache/cassandra/service/IWriteResponseHandler.java @@ -20,10 +20,11 @@ package org.apache.cassandra.service; import 
java.util.concurrent.TimeoutException; import org.apache.cassandra.net.IAsyncCallback; +import org.apache.cassandra.thrift.TimedOutException; import org.apache.cassandra.thrift.UnavailableException; public interface IWriteResponseHandler extends IAsyncCallback { - public void get() throws TimeoutException; + public void get() throws TimedOutException; public void assureSufficientLiveNodes() throws UnavailableException; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9a13c3c/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index 58cc99d..e033962 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -112,7 +112,7 @@ public class StorageProxy implements StorageProxyMBean IWriteResponseHandler responseHandler, String localDataCenter, ConsistencyLevel consistency_level) - throws IOException, TimeoutException, UnavailableException + throws IOException, UnavailableException { assert mutation instanceof RowMutation; sendToHintedEndpoints((RowMutation) mutation, targets, responseHandler, localDataCenter, consistency_level); @@ -169,7 +169,7 @@ public class StorageProxy implements StorageProxyMBean * @param mutations the mutations to be applied across the replicas * @param consistency_level the consistency level for the operation */ - public static void mutate(List<? extends IMutation> mutations, ConsistencyLevel consistency_level) throws UnavailableException, TimeoutException + public static void mutate(List<? 
extends IMutation> mutations, ConsistencyLevel consistency_level) throws UnavailableException, TimedOutException { logger.debug("Mutations/ConsistencyLevel are {}/{}", mutations, consistency_level); final String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress()); @@ -200,7 +200,7 @@ public class StorageProxy implements StorageProxyMBean } } - catch (TimeoutException ex) + catch (TimedOutException ex) { ClientRequestMetrics.writeTimeouts.inc(); if (logger.isDebugEnabled()) @@ -244,7 +244,7 @@ public class StorageProxy implements StorageProxyMBean ConsistencyLevel consistency_level, String localDataCenter, WritePerformer performer) - throws UnavailableException, TimeoutException, IOException + throws UnavailableException, IOException { String table = mutation.getTable(); AbstractReplicationStrategy rs = Table.open(table).getReplicationStrategy(); @@ -287,7 +287,7 @@ public class StorageProxy implements StorageProxyMBean IWriteResponseHandler responseHandler, String localDataCenter, ConsistencyLevel consistency_level) - throws IOException, TimeoutException, UnavailableException + throws IOException, UnavailableException { // Multimap that holds onto all the messages and addresses meant for a specific datacenter Map<String, Multimap<MessageOut, InetAddress>> dcMessages = new HashMap<String, Multimap<MessageOut, InetAddress>>(targets.size()); @@ -473,7 +473,7 @@ public class StorageProxy implements StorageProxyMBean * quicker response and because the WriteResponseHandlers don't make it easy to send back an error. We also always gather * the write latencies at the coordinator node to make gathering point similar to the case of standard writes. 
*/ - public static IWriteResponseHandler mutateCounter(CounterMutation cm, String localDataCenter) throws UnavailableException, TimeoutException, IOException + public static IWriteResponseHandler mutateCounter(CounterMutation cm, String localDataCenter) throws UnavailableException, IOException { InetAddress endpoint = findSuitableEndpoint(cm.getTable(), cm.key(), localDataCenter); @@ -539,14 +539,14 @@ public class StorageProxy implements StorageProxyMBean // Must be called on a replica of the mutation. This replica becomes the // leader of this mutation. - public static IWriteResponseHandler applyCounterMutationOnLeader(CounterMutation cm, String localDataCenter) throws UnavailableException, TimeoutException, IOException + public static IWriteResponseHandler applyCounterMutationOnLeader(CounterMutation cm, String localDataCenter) throws UnavailableException, IOException { return performWrite(cm, cm.consistency(), localDataCenter, counterWritePerformer); } // Same as applyCounterMutationOnLeader but must with the difference that it use the MUTATION stage to execute the write (while // applyCounterMutationOnLeader assumes it is on the MUTATION stage already) - public static IWriteResponseHandler applyCounterMutationOnCoordinator(CounterMutation cm, String localDataCenter) throws UnavailableException, TimeoutException, IOException + public static IWriteResponseHandler applyCounterMutationOnCoordinator(CounterMutation cm, String localDataCenter) throws UnavailableException, IOException { return performWrite(cm, cm.consistency(), localDataCenter, counterWriteOnCoordinatorPerformer); } @@ -1254,7 +1254,7 @@ public class StorageProxy implements StorageProxyMBean public interface WritePerformer { - public void apply(IMutation mutation, Collection<InetAddress> targets, IWriteResponseHandler responseHandler, String localDataCenter, ConsistencyLevel consistency_level) throws IOException, TimeoutException, UnavailableException; + public void apply(IMutation mutation, 
Collection<InetAddress> targets, IWriteResponseHandler responseHandler, String localDataCenter, ConsistencyLevel consistency_level) throws IOException, UnavailableException; } private static abstract class DroppableRunnable implements Runnable http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9a13c3c/src/java/org/apache/cassandra/service/WriteResponseHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/WriteResponseHandler.java b/src/java/org/apache/cassandra/service/WriteResponseHandler.java index 41a6ac3..0164c32 100644 --- a/src/java/org/apache/cassandra/service/WriteResponseHandler.java +++ b/src/java/org/apache/cassandra/service/WriteResponseHandler.java @@ -40,16 +40,19 @@ public class WriteResponseHandler extends AbstractWriteResponseHandler protected static final Logger logger = LoggerFactory.getLogger(WriteResponseHandler.class); protected final AtomicInteger responses; + private final int blockFor; protected WriteResponseHandler(Collection<InetAddress> writeEndpoints, ConsistencyLevel consistencyLevel, String table) { super(writeEndpoints, consistencyLevel); - responses = new AtomicInteger(determineBlockFor(table)); + blockFor = determineBlockFor(table); + responses = new AtomicInteger(blockFor); } protected WriteResponseHandler(InetAddress endpoint) { super(Arrays.asList(endpoint), ConsistencyLevel.ALL); + blockFor = 1; responses = new AtomicInteger(1); } @@ -69,6 +72,11 @@ public class WriteResponseHandler extends AbstractWriteResponseHandler condition.signal(); } + protected int ackCount() + { + return blockFor - responses.get(); + } + protected int determineBlockFor(String table) { switch (consistencyLevel) http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9a13c3c/src/java/org/apache/cassandra/thrift/CassandraServer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java 
b/src/java/org/apache/cassandra/thrift/CassandraServer.java index efe0372..15084c6 100644 --- a/src/java/org/apache/cassandra/thrift/CassandraServer.java +++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java @@ -623,22 +623,15 @@ public class CassandraServer implements Cassandra.Iface ThriftValidation.validateConsistencyLevel(state().getKeyspace(), consistency_level, RequestType.WRITE); if (mutations.isEmpty()) return; + + schedule(DatabaseDescriptor.getWriteRpcTimeout()); try { - schedule(DatabaseDescriptor.getWriteRpcTimeout()); - try - { - StorageProxy.mutate(mutations, consistency_level); - } - finally - { - release(); - } + StorageProxy.mutate(mutations, consistency_level); } - catch (TimeoutException e) + finally { - logger.debug("... timed out"); - throw new TimedOutException(); + release(); } }
