Author: slebresne
Date: Wed Apr 20 09:22:36 2011
New Revision: 1095335
URL: http://svn.apache.org/viewvc?rev=1095335&view=rev
Log:
merge CASSANDRA-2457 from 0.8
Modified:
cassandra/trunk/ (props changed)
cassandra/trunk/contrib/ (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
(props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
(props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
(props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
(props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
(props changed)
cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxyMBean.java
cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java
Propchange: cassandra/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Apr 20 09:22:36 2011
@@ -1,7 +1,7 @@
/cassandra/branches/cassandra-0.6:922689-1052356,1052358-1053452,1053454,1053456-1081914,1083000
/cassandra/branches/cassandra-0.7:1026516-1094195,1094604,1094647,1094796,1094809,1094818,1094831
/cassandra/branches/cassandra-0.7.0:1053690-1055654
-/cassandra/branches/cassandra-0.8:1090935-1095106
+/cassandra/branches/cassandra-0.8:1090935-1095106,1095334
/cassandra/tags/cassandra-0.7.0-rc3:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3:774578-796573
/incubator/cassandra/branches/cassandra-0.4:810145-834239,834349-834350
Propchange: cassandra/trunk/contrib/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Apr 20 09:22:36 2011
@@ -1,7 +1,7 @@
/cassandra/branches/cassandra-0.6/contrib:922689-1052356,1052358-1053452,1053454,1053456-1068009
/cassandra/branches/cassandra-0.7/contrib:1026516-1094195,1094481,1094604,1094647,1094796,1094809,1094818,1094831
/cassandra/branches/cassandra-0.7.0/contrib:1053690-1055654
-/cassandra/branches/cassandra-0.8/contrib:1090935-1095106
+/cassandra/branches/cassandra-0.8/contrib:1090935-1095106,1095334
/cassandra/tags/cassandra-0.7.0-rc3/contrib:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/contrib:774578-796573
/incubator/cassandra/branches/cassandra-0.4/contrib:810145-810987,810994-834239,834349-834350
Propchange:
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Apr 20 09:22:36 2011
@@ -1,7 +1,7 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1052356,1052358-1053452,1053454,1053456-1081914,1083000
/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1094195,1094481,1094604,1094647,1094796,1094809,1094818,1094831
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1053690-1055654
-/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1090935-1095106
+/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1090935-1095106,1095334
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/Cassandra.java:774578-796573
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/Cassandra.java:810145-834239,834349-834350
Propchange:
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Apr 20 09:22:36 2011
@@ -1,7 +1,7 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1052356,1052358-1053452,1053454,1053456-1081914,1083000
/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1094195,1094481,1094604,1094647,1094796,1094809,1094818,1094831
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1053690-1055654
-/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1090935-1095106
+/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1090935-1095106,1095334
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/column_t.java:774578-792198
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/Column.java:810145-834239,834349-834350
Propchange:
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Apr 20 09:22:36 2011
@@ -1,7 +1,7 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1052356,1052358-1053452,1053454,1053456-1081914,1083000
/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1094195,1094481,1094604,1094647,1094796,1094809,1094818,1094831
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1053690-1055654
-/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1090935-1095106
+/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1090935-1095106,1095334
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:774578-796573
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:810145-834239,834349-834350
Propchange:
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Apr 20 09:22:36 2011
@@ -1,7 +1,7 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1052356,1052358-1053452,1053454,1053456-1081914,1083000
/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1094195,1094481,1094604,1094647,1094796,1094809,1094818,1094831
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1053690-1055654
-/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1090935-1095106
+/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1090935-1095106,1095334
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:774578-796573
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:810145-834239,834349-834350
Propchange:
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Apr 20 09:22:36 2011
@@ -1,7 +1,7 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1052356,1052358-1053452,1053454,1053456-1081914,1083000
/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1094195,1094481,1094604,1094647,1094796,1094809,1094818,1094831
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1053690-1055654
-/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1090935-1095106
+/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1090935-1095106,1095334
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:774578-792198
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/SuperColumn.java:810145-834239,834349-834350
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java?rev=1095335&r1=1095334&r2=1095335&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
Wed Apr 20 09:22:36 2011
@@ -25,12 +25,10 @@ import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.Collections;
-import org.apache.cassandra.net.IVerbHandler;
-import org.apache.cassandra.net.Message;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.net.*;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.service.StorageProxy;
@@ -53,7 +51,8 @@ public class CounterMutationVerbHandler
if (logger.isDebugEnabled())
logger.debug("Applying forwarded " + cm);
- StorageProxy.applyCounterMutationOnLeader(cm);
+ String localDataCenter =
DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getLocalAddress());
+ StorageProxy.applyCounterMutationOnLeader(cm,
localDataCenter).get();
WriteResponse response = new WriteResponse(cm.getTable(),
cm.key(), true);
Message responseMessage =
WriteResponse.makeWriteResponseMessage(message, response);
MessagingService.instance().sendReply(responseMessage, id,
message.getFrom());
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java?rev=1095335&r1=1095334&r2=1095335&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java Wed Apr
20 09:22:36 2011
@@ -226,27 +226,6 @@ public class RowMutation implements IMut
return new Message(FBUtilities.getLocalAddress(), verb,
getSerializedBuffer(version), version);
}
- public static RowMutation getRowMutationFromMutations(String keyspace,
ByteBuffer key, Map<String, List<Mutation>> cfmap)
- {
- RowMutation rm = new RowMutation(keyspace, key);
- for (Map.Entry<String, List<Mutation>> entry : cfmap.entrySet())
- {
- String cfName = entry.getKey();
- for (Mutation mutation : entry.getValue())
- {
- if (mutation.deletion != null)
- {
- deleteColumnOrSuperColumnToRowMutation(rm, cfName,
mutation.deletion);
- }
- if (mutation.column_or_supercolumn != null)
- {
- addColumnOrSuperColumnToRowMutation(rm, cfName,
mutation.column_or_supercolumn);
- }
- }
- }
- return rm;
- }
-
public synchronized byte[] getSerializedBuffer(int version) throws
IOException
{
byte[] preserializedBuffer = preserializedBuffers.get(version);
@@ -288,47 +267,47 @@ public class RowMutation implements IMut
return buff.append("])").toString();
}
- private static void addColumnOrSuperColumnToRowMutation(RowMutation rm,
String cfName, ColumnOrSuperColumn cosc)
+ public void addColumnOrSuperColumn(String cfName, ColumnOrSuperColumn cosc)
{
if (cosc.super_column != null)
{
for (org.apache.cassandra.thrift.Column column :
cosc.super_column.columns)
{
- rm.add(new QueryPath(cfName, cosc.super_column.name,
column.name), column.value, column.timestamp, column.ttl);
+ add(new QueryPath(cfName, cosc.super_column.name,
column.name), column.value, column.timestamp, column.ttl);
}
}
else if (cosc.column != null)
{
- rm.add(new QueryPath(cfName, null, cosc.column.name),
cosc.column.value, cosc.column.timestamp, cosc.column.ttl);
+ add(new QueryPath(cfName, null, cosc.column.name),
cosc.column.value, cosc.column.timestamp, cosc.column.ttl);
}
else if (cosc.counter_super_column != null)
{
for (org.apache.cassandra.thrift.CounterColumn column :
cosc.counter_super_column.columns)
{
- rm.addCounter(new QueryPath(cfName,
cosc.counter_super_column.name, column.name), column.value);
+ addCounter(new QueryPath(cfName,
cosc.counter_super_column.name, column.name), column.value);
}
}
else // cosc.counter_column != null
{
- rm.addCounter(new QueryPath(cfName, null,
cosc.counter_column.name), cosc.counter_column.value);
+ addCounter(new QueryPath(cfName, null, cosc.counter_column.name),
cosc.counter_column.value);
}
}
- private static void deleteColumnOrSuperColumnToRowMutation(RowMutation rm,
String cfName, Deletion del)
+ public void deleteColumnOrSuperColumn(String cfName, Deletion del)
{
if (del.predicate != null && del.predicate.column_names != null)
{
for(ByteBuffer c : del.predicate.column_names)
{
- if (del.super_column == null &&
DatabaseDescriptor.getColumnFamilyType(rm.table_, cfName) ==
ColumnFamilyType.Super)
- rm.delete(new QueryPath(cfName, c), del.timestamp);
+ if (del.super_column == null &&
DatabaseDescriptor.getColumnFamilyType(table_, cfName) ==
ColumnFamilyType.Super)
+ delete(new QueryPath(cfName, c), del.timestamp);
else
- rm.delete(new QueryPath(cfName, del.super_column, c),
del.timestamp);
+ delete(new QueryPath(cfName, del.super_column, c),
del.timestamp);
}
}
else
{
- rm.delete(new QueryPath(cfName, del.super_column), del.timestamp);
+ delete(new QueryPath(cfName, del.super_column), del.timestamp);
}
}
Modified:
cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1095335&r1=1095334&r2=1095335&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Wed
Apr 20 09:22:36 2011
@@ -67,9 +67,6 @@ public class StorageProxy implements Sto
private static final LatencyTracker readStats = new LatencyTracker();
private static final LatencyTracker rangeStats = new LatencyTracker();
private static final LatencyTracker writeStats = new LatencyTracker();
- // we keep counter latency appart from normal write because write with
- // consistency > CL.ONE involves a read in the write path
- private static final LatencyTracker counterWriteStats = new
LatencyTracker();
private static boolean hintedHandoffEnabled =
DatabaseDescriptor.hintedHandoffEnabled();
private static int maxHintWindow = DatabaseDescriptor.getMaxHintWindow();
public static final String UNREACHABLE = "UNREACHABLE";
@@ -127,7 +124,7 @@ public class StorageProxy implements Sto
}
/**
- * Use this method to have these RowMutations applied
+ * Use this method to have these Mutations applied
* across all replicas. This method will take care
* of the possibility of a replica being down and hint
* the data across to some other replica.
@@ -135,27 +132,7 @@ public class StorageProxy implements Sto
* @param mutations the mutations to be applied across the replicas
* @param consistency_level the consistency level for the operation
*/
- public static void mutate(List<RowMutation> mutations, ConsistencyLevel
consistency_level) throws UnavailableException, TimeoutException
- {
- write(mutations, consistency_level, standardWritePerformer, true);
- }
-
- /**
- * Perform the write of a batch of mutations given a WritePerformer.
- * For each mutation, gather the list of write endpoints, apply locally
and/or
- * forward the mutation to said write endpoint (deletaged to the actual
- * WritePerformer) and wait for the responses based on consistency level.
- *
- * @param mutations the mutations to be applied
- * @param consistency_level the consistency level for the write operation
- * @param performer the WritePerformer in charge of appliying the mutation
- * given the list of write endpoints (either standardWritePerformer for
- * standard writes or counterWritePerformer for counter writes).
- * @param updateStats whether or not to update the writeStats. This must be
- * true for standard writes but false for counter writes as the latency of
- * the latter is tracked in mutateCounters() by counterWriteStats.
- */
- public static void write(List<? extends IMutation> mutations,
ConsistencyLevel consistency_level, WritePerformer performer, boolean
updateStats) throws UnavailableException, TimeoutException
+ public static void mutate(List<? extends IMutation> mutations,
ConsistencyLevel consistency_level) throws UnavailableException,
TimeoutException
{
final String localDataCenter =
DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getLocalAddress());
@@ -168,19 +145,14 @@ public class StorageProxy implements Sto
for (IMutation mutation : mutations)
{
mostRecentMutation = mutation;
- String table = mutation.getTable();
- AbstractReplicationStrategy rs =
Table.open(table).getReplicationStrategy();
-
- Collection<InetAddress> writeEndpoints =
getWriteEndpoints(table, mutation.key());
- Multimap<InetAddress, InetAddress> hintedEndpoints =
rs.getHintedEndpoints(writeEndpoints);
-
- final IWriteResponseHandler responseHandler =
rs.getWriteResponseHandler(writeEndpoints, hintedEndpoints, consistency_level);
-
- // exit early if we can't fulfill the CL at this time
- responseHandler.assureSufficientLiveNodes();
-
- responseHandlers.add(responseHandler);
- performer.apply(mutation, hintedEndpoints, responseHandler,
localDataCenter, consistency_level);
+ if (mutation instanceof CounterMutation)
+ {
+
responseHandlers.add(mutateCounter((CounterMutation)mutation, localDataCenter));
+ }
+ else
+ {
+ responseHandlers.add(performWrite(mutation,
consistency_level, localDataCenter, standardWritePerformer));
+ }
}
// wait for writes. throws timeoutexception if necessary
for (IWriteResponseHandler responseHandler : responseHandlers)
@@ -195,11 +167,39 @@ public class StorageProxy implements Sto
}
finally
{
- if (updateStats)
- writeStats.addNano(System.nanoTime() - startTime);
+ writeStats.addNano(System.nanoTime() - startTime);
}
}
+ /**
+ * Perform the write of a mutation given a WritePerformer.
+ * Gather the list of write endpoints, apply locally and/or forward the
mutation to
+ * said write endpoint (delegated to the actual WritePerformer) and wait
for the
+ * responses based on consistency level.
+ *
+ * @param mutation the mutation to be applied
+ * @param consistency_level the consistency level for the write operation
+ * @param performer the WritePerformer in charge of applying the mutation
+ * given the list of write endpoints (either standardWritePerformer for
+ * standard writes or counterWritePerformer for counter writes).
+ */
+ public static IWriteResponseHandler performWrite(IMutation mutation,
ConsistencyLevel consistency_level, String localDataCenter, WritePerformer
performer) throws UnavailableException, TimeoutException, IOException
+ {
+ String table = mutation.getTable();
+ AbstractReplicationStrategy rs =
Table.open(table).getReplicationStrategy();
+
+ Collection<InetAddress> writeEndpoints = getWriteEndpoints(table,
mutation.key());
+ Multimap<InetAddress, InetAddress> hintedEndpoints =
rs.getHintedEndpoints(writeEndpoints);
+
+ IWriteResponseHandler responseHandler =
rs.getWriteResponseHandler(writeEndpoints, hintedEndpoints, consistency_level);
+
+ // exit early if we can't fulfill the CL at this time
+ responseHandler.assureSufficientLiveNodes();
+
+ performer.apply(mutation, hintedEndpoints, responseHandler,
localDataCenter, consistency_level);
+ return responseHandler;
+ }
+
private static Collection<InetAddress> getWriteEndpoints(String table,
ByteBuffer key)
{
StorageService ss = StorageService.instance;
@@ -351,13 +351,12 @@ public class StorageProxy implements Sto
}
/**
- * The equivalent of mutate() for counters.
- * (Note that each CounterMutation ship the consistency level)
+ * Handle counter mutation on the coordinator host.
*
* A counter mutation needs to first be applied to a replica (that we'll
call the leader for the mutation) before being
* replicated to the other endpoints. To achieve this, there are two cases:
* 1) the coordinator host is a replica: we proceed to applying the
update locally and replicate through
- * applyCounterMutationOnLeader
+ * applyCounterMutationOnCoordinator
* 2) the coordinator is not a replica: we forward the (counter)mutation
to a chosen replica (that will proceed through
* applyCounterMutationOnLeader upon receive) and wait for its
acknowledgment.
*
@@ -365,60 +364,31 @@ public class StorageProxy implements Sto
* quicker response and because the WriteResponseHandlers don't make it
easy to send back an error. We also always gather
* the write latencies at the coordinator node to make gathering point
similar to the case of standard writes.
*/
- public static void mutateCounters(List<CounterMutation> mutations) throws
UnavailableException, TimeoutException
+ public static IWriteResponseHandler mutateCounter(CounterMutation cm,
String localDataCenter) throws UnavailableException, TimeoutException,
IOException
{
- long startTime = System.nanoTime();
- ArrayList<IWriteResponseHandler> responseHandlers = new
ArrayList<IWriteResponseHandler>();
-
- CounterMutation mostRecentMutation = null;
- StorageService ss = StorageService.instance;
-
- try
- {
- for (CounterMutation cm : mutations)
- {
- mostRecentMutation = cm;
- InetAddress endpoint = findSuitableEndpoint(cm.getTable(),
cm.key());
-
- if (endpoint.equals(FBUtilities.getLocalAddress()))
- {
- applyCounterMutationOnCoordinator(cm);
- }
- else
- {
- // Exit now if we can't fulfill the CL here instead of
forwarding to the leader replica
- String table = cm.getTable();
- AbstractReplicationStrategy rs =
Table.open(table).getReplicationStrategy();
- Collection<InetAddress> writeEndpoints =
getWriteEndpoints(table, cm.key());
- Multimap<InetAddress, InetAddress> hintedEndpoints =
rs.getHintedEndpoints(writeEndpoints);
- rs.getWriteResponseHandler(writeEndpoints,
hintedEndpoints, cm.consistency()).assureSufficientLiveNodes();
-
- // Forward the actual update to the chosen leader replica
- IWriteResponseHandler responseHandler =
WriteResponseHandler.create(endpoint);
- responseHandlers.add(responseHandler);
+ InetAddress endpoint = findSuitableEndpoint(cm.getTable(), cm.key());
- Message message =
cm.makeMutationMessage(Gossiper.instance.getVersion(endpoint));
- if (logger.isDebugEnabled())
- logger.debug("forwarding counter update of key " +
ByteBufferUtil.bytesToHex(cm.key()) + " to " + endpoint);
- MessagingService.instance().sendRR(message, endpoint,
responseHandler);
- }
- }
- // wait for writes. throws timeoutexception if necessary
- for (IWriteResponseHandler responseHandler : responseHandlers)
- {
- responseHandler.get();
- }
- }
- catch (IOException e)
+ if (endpoint.equals(FBUtilities.getLocalAddress()))
{
- if (mostRecentMutation == null)
- throw new RuntimeException("no mutations were seen but found
an error during write anyway", e);
- else
- throw new RuntimeException("error writing key " +
ByteBufferUtil.bytesToHex(mostRecentMutation.key()), e);
+ return applyCounterMutationOnCoordinator(cm, localDataCenter);
}
- finally
+ else
{
- counterWriteStats.addNano(System.nanoTime() - startTime);
+ // Exit now if we can't fulfill the CL here instead of forwarding
to the leader replica
+ String table = cm.getTable();
+ AbstractReplicationStrategy rs =
Table.open(table).getReplicationStrategy();
+ Collection<InetAddress> writeEndpoints = getWriteEndpoints(table,
cm.key());
+ Multimap<InetAddress, InetAddress> hintedEndpoints =
rs.getHintedEndpoints(writeEndpoints);
+ rs.getWriteResponseHandler(writeEndpoints, hintedEndpoints,
cm.consistency()).assureSufficientLiveNodes();
+
+ // Forward the actual update to the chosen leader replica
+ IWriteResponseHandler responseHandler =
WriteResponseHandler.create(endpoint);
+
+ Message message =
cm.makeMutationMessage(Gossiper.instance.getVersion(endpoint));
+ if (logger.isDebugEnabled())
+ logger.debug("forwarding counter update of key " +
ByteBufferUtil.bytesToHex(cm.key()) + " to " + endpoint);
+ MessagingService.instance().sendRR(message, endpoint,
responseHandler);
+ return responseHandler;
}
}
@@ -433,16 +403,16 @@ public class StorageProxy implements Sto
// Must be called on a replica of the mutation. This replica becomes the
// leader of this mutation.
- public static void applyCounterMutationOnLeader(CounterMutation cm) throws
UnavailableException, TimeoutException, IOException
+ public static IWriteResponseHandler
applyCounterMutationOnLeader(CounterMutation cm, String localDataCenter) throws
UnavailableException, TimeoutException, IOException
{
- write(Collections.singletonList(cm), cm.consistency(),
counterWritePerformer, false);
+ return performWrite(cm, cm.consistency(), localDataCenter,
counterWritePerformer);
}
// Same as applyCounterMutationOnLeader, with the difference that
it uses the MUTATION stage to execute the write (while
// applyCounterMutationOnLeader assumes it is on the MUTATION stage
already)
- public static void applyCounterMutationOnCoordinator(CounterMutation cm)
throws UnavailableException, TimeoutException, IOException
+ public static IWriteResponseHandler
applyCounterMutationOnCoordinator(CounterMutation cm, String localDataCenter)
throws UnavailableException, TimeoutException, IOException
{
- write(Collections.singletonList(cm), cm.consistency(),
counterWriteOnCoordinatorPerformer, false);
+ return performWrite(cm, cm.consistency(), localDataCenter,
counterWriteOnCoordinatorPerformer);
}
private static void applyCounterMutation(final IMutation mutation, final
Multimap<InetAddress, InetAddress> hintedEndpoints, final IWriteResponseHandler
responseHandler, final String localDataCenter, final ConsistencyLevel
consistency_level, boolean executeOnMutationStage)
@@ -948,31 +918,6 @@ public class StorageProxy implements Sto
return writeStats.getRecentLatencyHistogramMicros();
}
- public long getCounterWriteOperations()
- {
- return counterWriteStats.getOpCount();
- }
-
- public long getTotalCounterWriteLatencyMicros()
- {
- return counterWriteStats.getTotalLatencyMicros();
- }
-
- public double getRecentCounterWriteLatencyMicros()
- {
- return counterWriteStats.getRecentLatencyMicros();
- }
-
- public long[] getTotalCounterWriteLatencyHistogramMicros()
- {
- return counterWriteStats.getTotalLatencyHistogramMicros();
- }
-
- public long[] getRecentCounterWriteLatencyHistogramMicros()
- {
- return counterWriteStats.getRecentLatencyHistogramMicros();
- }
-
public static List<Row> scan(final String keyspace, String column_family,
IndexClause index_clause, SlicePredicate column_predicate, ConsistencyLevel
consistency_level)
throws IOException, TimeoutException, UnavailableException
{
Modified:
cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxyMBean.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxyMBean.java?rev=1095335&r1=1095334&r2=1095335&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxyMBean.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxyMBean.java
Wed Apr 20 09:22:36 2011
@@ -38,12 +38,6 @@ public interface StorageProxyMBean
public long[] getTotalWriteLatencyHistogramMicros();
public long[] getRecentWriteLatencyHistogramMicros();
- public long getCounterWriteOperations();
- public long getTotalCounterWriteLatencyMicros();
- public double getRecentCounterWriteLatencyMicros();
- public long[] getTotalCounterWriteLatencyHistogramMicros();
- public long[] getRecentCounterWriteLatencyHistogramMicros();
-
public boolean getHintedHandoffEnabled();
public void setHintedHandoffEnabled(boolean b);
public int getMaxHintWindow();
Modified:
cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java?rev=1095335&r1=1095334&r2=1095335&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java
Wed Apr 20 09:22:36 2011
@@ -440,13 +440,18 @@ public class CassandraServer implements
throws InvalidRequestException, UnavailableException, TimedOutException
{
List<String> cfamsSeen = new ArrayList<String>();
- List<RowMutation> rowMutations = new ArrayList<RowMutation>();
+ List<IMutation> rowMutations = new ArrayList<IMutation>();
String keyspace = state().getKeyspace();
for (Map.Entry<ByteBuffer, Map<String, List<Mutation>>> mutationEntry:
mutation_map.entrySet())
{
ByteBuffer key = mutationEntry.getKey();
+ // We need to separate row mutation for standard cf and counter cf
(that will be encapsulated in a
+ // CounterMutation) because it doesn't follow the same code path
+ RowMutation rmStandard = null;
+ RowMutation rmCounter = null;
+
Map<String, List<Mutation>> columnFamilyToMutations =
mutationEntry.getValue();
for (Map.Entry<String, List<Mutation>> columnFamilyMutations :
columnFamilyToMutations.entrySet())
{
@@ -462,17 +467,37 @@ public class CassandraServer implements
CFMetaData metadata =
ThriftValidation.validateColumnFamily(keyspace, cfName);
ThriftValidation.validateKey(metadata, key);
+ RowMutation rm;
if (metadata.getDefaultValidator().isCommutative())
+ {
ThriftValidation.validateCommutativeForWrite(metadata,
consistency_level);
+ rmCounter = rmCounter == null ? new RowMutation(keyspace,
key) : rmCounter;
+ rm = rmCounter;
+ }
+ else
+ {
+ rmStandard = rmStandard == null ? new
RowMutation(keyspace, key) : rmStandard;
+ rm = rmStandard;
+ }
for (Mutation mutation : columnFamilyMutations.getValue())
{
ThriftValidation.validateMutation(metadata, mutation);
+
+ if (mutation.deletion != null)
+ {
+ rm.deleteColumnOrSuperColumn(cfName,
mutation.deletion);
+ }
+ if (mutation.column_or_supercolumn != null)
+ {
+ rm.addColumnOrSuperColumn(cfName,
mutation.column_or_supercolumn);
+ }
}
}
- RowMutation rm = RowMutation.getRowMutationFromMutations(keyspace,
key, columnFamilyToMutations);
- if (!rm.isEmpty())
- rowMutations.add(rm);
+ if (rmStandard != null && !rmStandard.isEmpty())
+ rowMutations.add(rmStandard);
+ if (rmCounter != null && !rmCounter.isEmpty())
+ rowMutations.add(new
org.apache.cassandra.db.CounterMutation(rmCounter, consistency_level));
}
doInsert(consistency_level, rowMutations);
@@ -500,7 +525,10 @@ public class CassandraServer implements
RowMutation rm = new RowMutation(state().getKeyspace(), key);
rm.delete(new QueryPath(column_path), timestamp);
- doInsert(consistency_level, Arrays.asList(rm));
+ if (isCommutativeOp)
+ doInsert(consistency_level, Arrays.asList(new CounterMutation(rm,
consistency_level)));
+ else
+ doInsert(consistency_level, Arrays.asList(rm));
}
public void remove(ByteBuffer key, ColumnPath column_path, long timestamp,
ConsistencyLevel consistency_level)
@@ -511,7 +539,7 @@ public class CassandraServer implements
internal_remove(key, column_path, timestamp, consistency_level, false);
}
- private void doInsert(ConsistencyLevel consistency_level,
List<RowMutation> mutations) throws UnavailableException, TimedOutException
+ private void doInsert(ConsistencyLevel consistency_level, List<? extends
IMutation> mutations) throws UnavailableException, TimedOutException
{
try
{
@@ -520,22 +548,7 @@ public class CassandraServer implements
try
{
if (!mutations.isEmpty())
- {
- // FIXME: Mighty ugly but we've made sure above this will
always work
- if
(mutations.iterator().next().getColumnFamilies().iterator().next().metadata().getDefaultValidator().isCommutative())
- {
- List<org.apache.cassandra.db.CounterMutation>
cmutations = new
ArrayList<org.apache.cassandra.db.CounterMutation>(mutations.size());
- for (RowMutation mutation : mutations)
- {
- cmutations.add(new
org.apache.cassandra.db.CounterMutation(mutation, consistency_level));
- }
- StorageProxy.mutateCounters(cmutations);
- }
- else
- {
- StorageProxy.mutate(mutations, consistency_level);
- }
- }
+ StorageProxy.mutate(mutations, consistency_level);
}
catch (TimeoutException e)
{
@@ -1045,7 +1058,7 @@ public class CassandraServer implements
{
throw new InvalidRequestException(e.getMessage());
}
- doInsert(consistency_level, Arrays.asList(rm));
+ doInsert(consistency_level, Arrays.asList(new CounterMutation(rm,
consistency_level)));
}
public void remove_counter(ByteBuffer key, ColumnPath path,
ConsistencyLevel consistency_level)