Updated Branches:
refs/heads/trunk 42c55896b -> d5fc1932e
Merge branch 'cassandra-1.1' into trunk
Conflicts:
CHANGES.txt
src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
src/java/org/apache/cassandra/service/StorageProxy.java
src/java/org/apache/cassandra/service/WriteResponseHandler.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d5fc1932
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d5fc1932
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d5fc1932
Branch: refs/heads/trunk
Commit: d5fc1932e965a32c944efcdf95fd0236624949fc
Parents: 42c5589 7371e10
Author: Sylvain Lebresne <[email protected]>
Authored: Fri Sep 7 10:50:38 2012 +0200
Committer: Sylvain Lebresne <[email protected]>
Committed: Fri Sep 7 10:50:38 2012 +0200
----------------------------------------------------------------------
CHANGES.txt | 6 ++++
.../org/apache/cassandra/db/CounterColumn.java | 2 +-
.../cassandra/db/CounterMutationVerbHandler.java | 23 +++++++++++----
.../locator/AbstractReplicationStrategy.java | 8 ++--
.../service/AbstractWriteResponseHandler.java | 17 +++++++++-
.../DatacenterSyncWriteResponseHandler.java | 10 +++---
.../service/DatacenterWriteResponseHandler.java | 10 +++---
.../org/apache/cassandra/service/StorageProxy.java | 17 ++++++----
.../cassandra/service/WriteResponseHandler.java | 12 ++++----
9 files changed, 69 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d5fc1932/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index ec4992c,f192be2..0905cf9
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,58 -1,9 +1,64 @@@
+1.2-beta1
+ * include message initiation time to replicas so they can more
+ accurately drop timed-out requests (CASSANDRA-2858)
+ * fix clientutil.jar dependencies (CASSANDRA-4566)
+ * optimize WriteResponse (CASSANDRA-4548)
+ * new metrics (CASSANDRA-4009)
+ * redesign KEYS indexes to avoid read-before-write (CASSANDRA-2897)
+ * debug tracing (CASSANDRA-1123)
+ * parallelize row cache loading (CASSANDRA-4282)
+ * Make compaction, flush JBOD-aware (CASSANDRA-4292)
+ * run local range scans on the read stage (CASSANDRA-3687)
+ * clean up ioexceptions (CASSANDRA-2116)
+ * add disk_failure_policy (CASSANDRA-2118)
+ * Introduce new json format with row level deletion (CASSANDRA-4054)
+ * remove redundant "name" column from schema_keyspaces (CASSANDRA-4433)
+ * improve "nodetool ring" handling of multi-dc clusters (CASSANDRA-3047)
+ * update NTS calculateNaturalEndpoints to be O(N log N) (CASSANDRA-3881)
+ * add UseCondCardMark XX jvm settings on jdk 1.7 (CASSANDRA-4366)
+ * split up rpc timeout by operation type (CASSANDRA-2819)
+ * rewrite key cache save/load to use only sequential i/o (CASSANDRA-3762)
+ * update MS protocol with a version handshake + broadcast address id
+ (CASSANDRA-4311)
+ * multithreaded hint replay (CASSANDRA-4189)
+ * add inter-node message compression (CASSANDRA-3127)
+ * remove COPP (CASSANDRA-2479)
+ * Track tombstone expiration and compact when tombstone content is
+ higher than a configurable threshold, default 20% (CASSANDRA-3442, 4234)
+ * update MurmurHash to version 3 (CASSANDRA-2975)
+ * (CLI) track elapsed time for `delete' operation (CASSANDRA-4060)
+ * (CLI) jline version is bumped to 1.0 to properly support
+ 'delete' key function (CASSANDRA-4132)
+ * Save IndexSummary into new SSTable 'Summary' component (CASSANDRA-2392,
4289)
+ * Add support for range tombstones (CASSANDRA-3708)
+ * Improve MessagingService efficiency (CASSANDRA-3617)
+ * Avoid ID conflicts from concurrent schema changes (CASSANDRA-3794)
+ * Set thrift HSHA server thread limit to unlimited by default
(CASSANDRA-4277)
+ * Avoids double serialization of CF id in RowMutation messages
+ (CASSANDRA-4293)
+ * stream compressed sstables directly with java nio (CASSANDRA-4297)
+ * Support multiple ranges in SliceQueryFilter (CASSANDRA-3885)
+ * Add column metadata to system column families (CASSANDRA-4018)
+ * (cql3) Always use composite types by default (CASSANDRA-4329)
+ * (cql3) Add support for set, map and list (CASSANDRA-3647)
+ * Validate date type correctly (CASSANDRA-4441)
+ * (cql3) Allow definitions with only a PK (CASSANDRA-4361)
+ * (cql3) Add support for row key composites (CASSANDRA-4179)
+ * improve DynamicEndpointSnitch by using reservoir sampling (CASSANDRA-4038)
+ * (cql3) Add support for 2ndary indexes (CASSANDRA-3680)
+ * (cql3) fix defining more than one PK to be invalid (CASSANDRA-4477)
+ * remove schema agreement checking from all external APIs (Thrift, CQL and
CQL3) (CASSANDRA-4487)
+ * add Murmur3Partitioner and make it default for new installations
(CASSANDRA-3772)
+ * (cql3) update pseudo-map syntax to use map syntax (CASSANDRA-4497)
+ * Finer grained exceptions hierarchy and provides error code with exceptions
(CASSANDRA-3979)
+
+
+ 1.1.6
+ * (cql3) fix potential NPE with both equal and unequal restriction
(CASSANDRA-4532)
+ * (cql3) improves ORDER BY validation (CASSANDRA-4624)
+ * Fix potential deadlock during counter writes (CASSANDRA-4578)
+
+
1.1.5
* add SecondaryIndex.reload API (CASSANDRA-4581)
* use millis + atomicint for commitlog segment creation instead of
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d5fc1932/src/java/org/apache/cassandra/db/CounterColumn.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d5fc1932/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
index f94495d,3ecbe8b..f28e2bd
--- a/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
@@@ -23,34 -25,61 +23,45 @@@ import org.slf4j.Logger
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.io.util.FastByteArrayInputStream;
-import org.apache.cassandra.net.*;
-import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.exceptions.RequestExecutionException;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageProxy;
-import org.apache.cassandra.thrift.UnavailableException;
+import org.apache.cassandra.utils.FBUtilities;
-public class CounterMutationVerbHandler implements IVerbHandler
+public class CounterMutationVerbHandler implements
IVerbHandler<CounterMutation>
{
- private static Logger logger =
LoggerFactory.getLogger(CounterMutationVerbHandler.class);
+ private static final Logger logger =
LoggerFactory.getLogger(CounterMutationVerbHandler.class);
- public void doVerb(MessageIn<CounterMutation> message, String id)
- public void doVerb(final Message message, final String id)
++ public void doVerb(final MessageIn<CounterMutation> message, final String
id)
{
- byte[] bytes = message.getMessageBody();
- FastByteArrayInputStream buffer = new FastByteArrayInputStream(bytes);
-
try
{
- CounterMutation cm = message.payload;
- DataInputStream is = new DataInputStream(buffer);
- final CounterMutation cm =
CounterMutation.serializer().deserialize(is, message.getVersion());
++ final CounterMutation cm = message.payload;
if (logger.isDebugEnabled())
logger.debug("Applying forwarded " + cm);
String localDataCenter =
DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress());
- StorageProxy.applyCounterMutationOnLeader(cm,
localDataCenter).get();
- WriteResponse response = new WriteResponse();
- MessagingService.instance().sendReply(response.createMessage(),
id, message.from);
+ // We should not wait for the result of the write in this thread,
+ // otherwise we could have a distributed deadlock between replicas
+ // running this VerbHandler (see #4578).
+ // Instead, we use a callback to send the response. Note that the
callback
+ // will not be called if the request times out, but this is ok
+ // because the coordinator of the counter mutation will time out on
+ // its own in that case.
+ StorageProxy.applyCounterMutationOnLeader(cm, localDataCenter,
new Runnable(){
+ public void run()
+ {
- try
- {
- WriteResponse response = new
WriteResponse(cm.getTable(), cm.key(), true);
- Message responseMessage =
WriteResponse.makeWriteResponseMessage(message, response);
-
MessagingService.instance().sendReply(responseMessage, id, message.getFrom());
- }
- catch (IOException e)
- {
- logger.error("Error writing response to counter
mutation", e);
- }
++ WriteResponse response = new WriteResponse();
++
MessagingService.instance().sendReply(response.createMessage(), id,
message.from);
+ }
+ });
}
- catch (UnavailableException e)
- {
- // We check for UnavailableException in the coordinator not. It is
- // hence reasonable to let the coordinator timeout in the very
- // unlikely case we arrive here
- }
- catch (TimeoutException e)
+ catch (RequestExecutionException e)
{
- // The coordinator will timeout on itself, so let that go
+ // The coordinator will time out on its own, so ignore
+ logger.debug("counter error", e);
}
catch (IOException e)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d5fc1932/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d5fc1932/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
----------------------------------------------------------------------
diff --cc
src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
index 328e1ea,d280a8e..664d9ea
--- a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
@@@ -40,12 -49,12 +45,13 @@@ public abstract class AbstractWriteResp
startTime = System.currentTimeMillis();
this.consistencyLevel = consistencyLevel;
this.writeEndpoints = writeEndpoints;
+ this.callback = callback;
}
- public void get() throws TimeoutException
+ public void get() throws WriteTimeoutException
{
- long timeout = DatabaseDescriptor.getRpcTimeout() -
(System.currentTimeMillis() - startTime);
+ long timeout = DatabaseDescriptor.getWriteRpcTimeout() -
(System.currentTimeMillis() - startTime);
+
boolean success;
try
{
@@@ -57,15 -66,20 +63,22 @@@
}
if (!success)
- {
- throw new TimeoutException();
- }
+ throw new WriteTimeoutException(consistencyLevel, ackCount(),
blockFor());
}
+ protected abstract int ackCount();
+
+ protected abstract int blockFor();
+
/** null message means "response from local write" */
- public abstract void response(Message msg);
+ public abstract void response(MessageIn msg);
public abstract void assureSufficientLiveNodes() throws
UnavailableException;
+
+ protected void signal()
+ {
+ condition.signal();
+ if (callback != null)
+ callback.run();
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d5fc1932/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
----------------------------------------------------------------------
diff --cc
src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
index a33ce60,cbecf6b..85afade
---
a/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
+++
b/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
@@@ -46,17 -53,15 +46,17 @@@ public class DatacenterSyncWriteRespons
localdc = snitch.getDatacenter(FBUtilities.getBroadcastAddress());
}
- private final NetworkTopologyStrategy strategy;
- private HashMap<String, AtomicInteger> responses = new HashMap<String,
AtomicInteger>();
+ private final String table;
+ private final NetworkTopologyStrategy strategy;
+ private final HashMap<String, AtomicInteger> responses = new
HashMap<String, AtomicInteger>();
- protected DatacenterSyncWriteResponseHandler(Collection<InetAddress>
writeEndpoints, ConsistencyLevel consistencyLevel, String table)
+ protected DatacenterSyncWriteResponseHandler(Collection<InetAddress>
writeEndpoints, ConsistencyLevel consistencyLevel, String table, Runnable
callback)
{
// Response is being managed by the map so make it 1 for the
superclass.
- super(writeEndpoints, consistencyLevel);
+ super(writeEndpoints, consistencyLevel, callback);
assert consistencyLevel == ConsistencyLevel.EACH_QUORUM;
+ this.table = table;
strategy = (NetworkTopologyStrategy)
Table.open(table).getReplicationStrategy();
for (String dc : strategy.getDatacenters())
@@@ -66,12 -71,12 +66,12 @@@
}
}
- public static IWriteResponseHandler create(Collection<InetAddress>
writeEndpoints, ConsistencyLevel consistencyLevel, String table)
+ public static IWriteResponseHandler create(Collection<InetAddress>
writeEndpoints, ConsistencyLevel consistencyLevel, String table, Runnable
callback)
{
- return new DatacenterSyncWriteResponseHandler(writeEndpoints,
consistencyLevel, table);
+ return new DatacenterSyncWriteResponseHandler(writeEndpoints,
consistencyLevel, table, callback);
}
- public void response(Message message)
+ public void response(MessageIn message)
{
String dataCenter = message == null
? localdc
@@@ -86,26 -91,9 +86,26 @@@
}
// all the quorum conditions are met
- condition.signal();
+ signal();
}
+ protected int blockFor()
+ {
+ return consistencyLevel.blockFor(table);
+ }
+
+ protected int ackCount()
+ {
+ int n = 0;
+ for (Map.Entry<String, AtomicInteger> entry : responses.entrySet())
+ {
+ String dc = entry.getKey();
+ AtomicInteger i = entry.getValue();
+ n += (strategy.getReplicationFactor(dc) / 2) + 1 - i.get();
+ }
+ return n;
+ }
+
public void assureSufficientLiveNodes() throws UnavailableException
{
Map<String, AtomicInteger> dcEndpoints = new HashMap<String,
AtomicInteger>();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d5fc1932/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
----------------------------------------------------------------------
diff --cc
src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
index 6644b1a,881c99d..2821e3b
--- a/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
@@@ -55,12 -62,20 +55,12 @@@ public class DatacenterWriteResponseHan
}
@Override
- protected int determineBlockFor(String table)
- {
- NetworkTopologyStrategy strategy = (NetworkTopologyStrategy)
Table.open(table).getReplicationStrategy();
- return (strategy.getReplicationFactor(localdc) / 2) + 1;
- }
-
-
- @Override
- public void response(Message message)
+ public void response(MessageIn message)
{
- if (message == null ||
localdc.equals(snitch.getDatacenter(message.getFrom())))
+ if (message == null ||
localdc.equals(snitch.getDatacenter(message.from)))
{
if (responses.decrementAndGet() == 0)
- condition.signal();
+ signal();
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d5fc1932/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageProxy.java
index 6f461c9,23e0de4..24625df
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@@ -247,8 -246,9 +249,9 @@@ public class StorageProxy implements St
public static IWriteResponseHandler performWrite(IMutation mutation,
ConsistencyLevel
consistency_level,
String localDataCenter,
- WritePerformer performer)
+ WritePerformer performer,
+ Runnable callback)
- throws UnavailableException, TimeoutException, IOException
+ throws UnavailableException, OverloadedException, IOException
{
String table = mutation.getTable();
AbstractReplicationStrategy rs =
Table.open(table).getReplicationStrategy();
@@@ -540,20 -537,20 +543,20 @@@
}
}
-
-
// Must be called on a replica of the mutation. This replica becomes the
// leader of this mutation.
- public static IWriteResponseHandler
applyCounterMutationOnLeader(CounterMutation cm, String localDataCenter)
- public static IWriteResponseHandler
applyCounterMutationOnLeader(CounterMutation cm, String localDataCenter,
Runnable callback) throws UnavailableException, TimeoutException, IOException
++ public static IWriteResponseHandler
applyCounterMutationOnLeader(CounterMutation cm, String localDataCenter,
Runnable callback)
+ throws UnavailableException, IOException, OverloadedException
{
- return performWrite(cm, cm.consistency(), localDataCenter,
counterWritePerformer);
+ return performWrite(cm, cm.consistency(), localDataCenter,
counterWritePerformer, callback);
}
// Same as applyCounterMutationOnLeader but with the difference that
it uses the MUTATION stage to execute the write (while
// applyCounterMutationOnLeader assumes it is on the MUTATION stage
already)
- public static IWriteResponseHandler
applyCounterMutationOnCoordinator(CounterMutation cm, String localDataCenter)
throws UnavailableException, TimeoutException, IOException
+ public static IWriteResponseHandler
applyCounterMutationOnCoordinator(CounterMutation cm, String localDataCenter)
+ throws UnavailableException, IOException, OverloadedException
{
- return performWrite(cm, cm.consistency(), localDataCenter,
counterWriteOnCoordinatorPerformer);
+ return performWrite(cm, cm.consistency(), localDataCenter,
counterWriteOnCoordinatorPerformer, null);
}
private static Runnable counterWriteTask(final IMutation mutation,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d5fc1932/src/java/org/apache/cassandra/service/WriteResponseHandler.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/WriteResponseHandler.java
index 5b0f64c,baf8558..cc803cb
--- a/src/java/org/apache/cassandra/service/WriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/WriteResponseHandler.java
@@@ -40,19 -41,16 +40,19 @@@ public class WriteResponseHandler exten
protected static final Logger logger =
LoggerFactory.getLogger(WriteResponseHandler.class);
protected final AtomicInteger responses;
+ private final int blockFor;
- protected WriteResponseHandler(Collection<InetAddress> writeEndpoints,
ConsistencyLevel consistencyLevel, String table)
+ protected WriteResponseHandler(Collection<InetAddress> writeEndpoints,
ConsistencyLevel consistencyLevel, String table, Runnable callback)
{
- super(writeEndpoints, consistencyLevel);
+ super(writeEndpoints, consistencyLevel, callback);
- responses = new AtomicInteger(determineBlockFor(table));
+ blockFor = consistencyLevel.blockFor(table);
+ responses = new AtomicInteger(blockFor);
}
protected WriteResponseHandler(InetAddress endpoint)
{
- super(Arrays.asList(endpoint), ConsistencyLevel.ALL);
+ super(Arrays.asList(endpoint), ConsistencyLevel.ALL, null);
+ blockFor = 1;
responses = new AtomicInteger(1);
}
@@@ -66,20 -64,31 +66,20 @@@
return new WriteResponseHandler(endpoint);
}
- public void response(Message m)
+ public void response(MessageIn m)
{
if (responses.decrementAndGet() == 0)
- condition.signal();
+ signal();
}
- protected int determineBlockFor(String table)
+ protected int ackCount()
{
- switch (consistencyLevel)
- {
- case ONE:
- return 1;
- case ANY:
- return 1;
- case TWO:
- return 2;
- case THREE:
- return 3;
- case QUORUM:
- return
(Table.open(table).getReplicationStrategy().getReplicationFactor() / 2) + 1;
- case ALL:
- return
Table.open(table).getReplicationStrategy().getReplicationFactor();
- default:
- throw new UnsupportedOperationException("invalid consistency
level: " + consistencyLevel.toString());
- }
+ return blockFor - responses.get();
+ }
+
+ protected int blockFor()
+ {
+ return blockFor;
}
public void assureSufficientLiveNodes() throws UnavailableException