Author: jbellis
Date: Tue Jan 18 16:07:32 2011
New Revision: 1060432
URL: http://svn.apache.org/viewvc?rev=1060432&view=rev
Log:
support CL.QUORUM, ALL for counters
patch by slebresne; reviewed by Kelvin Kakugawa and jbellis for CASSANDRA-1944
Added:
cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutation.java
cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
cassandra/trunk/src/java/org/apache/cassandra/db/IMutation.java
Removed:
cassandra/trunk/src/java/org/apache/cassandra/db/ReplicateOnWriteTask.java
cassandra/trunk/src/java/org/apache/cassandra/db/ReplicateOnWriteVerbHandler.java
Modified:
cassandra/trunk/CHANGES.txt
cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxyMBean.java
cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java
cassandra/trunk/src/java/org/apache/cassandra/thrift/ThriftValidation.java
Modified: cassandra/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1060432&r1=1060431&r2=1060432&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Tue Jan 18 16:07:32 2011
@@ -1,6 +1,7 @@
0.8-dev
* avoid double RowMutation serialization on write path (CASSANDRA-1800)
- * adds support for columns that act as incr/decr counters (CASSANDRA-1072)
+ * adds support for columns that act as incr/decr counters
+ (CASSANDRA-1072, 1944)
* make NetworkTopologyStrategy the default (CASSANDRA-1960)
Added: cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutation.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutation.java?rev=1060432&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutation.java
(added)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutation.java Tue
Jan 18 16:07:32 2011
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.net.InetAddress;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.LinkedList;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.db.marshal.AbstractCommutativeType;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.thrift.ConsistencyLevel;
+
+public class CounterMutation implements IMutation
+{
+ private static final Logger logger =
LoggerFactory.getLogger(CounterMutation.class);
+ private static final CounterMutationSerializer serializer = new
CounterMutationSerializer();
+
+ private final RowMutation rowMutation;
+ private final ConsistencyLevel consistency;
+
+ public CounterMutation(RowMutation rowMutation, ConsistencyLevel
consistency)
+ {
+ this.rowMutation = rowMutation;
+ this.consistency = consistency;
+ }
+
+ public String getTable()
+ {
+ return rowMutation.getTable();
+ }
+
+ public ByteBuffer key()
+ {
+ return rowMutation.key();
+ }
+
+ public RowMutation rowMutation()
+ {
+ return rowMutation;
+ }
+
+ public ConsistencyLevel consistency()
+ {
+ return consistency;
+ }
+
+ public static CounterMutationSerializer serializer()
+ {
+ return serializer;
+ }
+
+ public RowMutation makeReplicationMutation() throws IOException
+ {
+ List<ReadCommand> readCommands = new LinkedList<ReadCommand>();
+ for (ColumnFamily columnFamily : rowMutation.getColumnFamilies())
+ {
+ if (!columnFamily.metadata().getReplicateOnWrite())
+ continue;
+
+ // CF type: regular
+ if (!columnFamily.isSuper())
+ {
+ QueryPath queryPath = new
QueryPath(columnFamily.metadata().cfName);
+ ReadCommand readCommand = new
SliceByNamesReadCommand(rowMutation.getTable(), rowMutation.key(), queryPath,
columnFamily.getColumnNames());
+ readCommands.add(readCommand);
+ continue;
+ }
+
+ // CF type: super
+ for (IColumn superColumn : columnFamily.getSortedColumns())
+ {
+ QueryPath queryPath = new
QueryPath(columnFamily.metadata().cfName, superColumn.name());
+
+ // construct set of sub-column names
+ Collection<IColumn> subColumns = superColumn.getSubColumns();
+ Collection<ByteBuffer> subColNames = new
HashSet<ByteBuffer>(subColumns.size());
+ for (IColumn subCol : subColumns)
+ {
+ subColNames.add(subCol.name());
+ }
+
+ ReadCommand readCommand = new
SliceByNamesReadCommand(rowMutation.getTable(), rowMutation.key(), queryPath,
subColNames);
+ readCommands.add(readCommand);
+ }
+ }
+
+ // replicate to non-local replicas
+ List<InetAddress> foreignReplicas =
StorageService.instance.getLiveNaturalEndpoints(rowMutation.getTable(),
rowMutation.key());
+ foreignReplicas.remove(FBUtilities.getLocalAddress()); // remove local
replica
+
+ // create a replication RowMutation
+ RowMutation replicationMutation = new
RowMutation(rowMutation.getTable(), rowMutation.key());
+ for (ReadCommand readCommand : readCommands)
+ {
+ Table table = Table.open(readCommand.table);
+ Row row = readCommand.getRow(table);
+ AbstractType defaultValidator =
row.cf.metadata().getDefaultValidator();
+ if (defaultValidator.isCommutative())
+ {
+ /**
+ * Clean out contexts for all nodes we're sending the repair to;
+ * otherwise, we could send a context which is local to one of the
+ * foreign replicas, which would then incorrectly add it to its own
+ * count, because local resolution aggregates.
+ */
+ // note: the following logic could be optimized
+ for (InetAddress foreignNode : foreignReplicas)
+ {
+
((AbstractCommutativeType)defaultValidator).cleanContext(row.cf, foreignNode);
+ }
+ }
+ replicationMutation.add(row.cf);
+ }
+ return replicationMutation;
+ }
+
+ public Message makeMutationMessage() throws IOException
+ {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(bos);
+ serializer().serialize(this, dos);
+ return new Message(FBUtilities.getLocalAddress(),
StorageService.Verb.COUNTER_MUTATION, bos.toByteArray());
+ }
+
+ public boolean shouldReplicateOnWrite()
+ {
+ for (ColumnFamily cf : rowMutation.getColumnFamilies())
+ if (cf.metadata().getReplicateOnWrite())
+ return true;
+ return false;
+ }
+
+ public void apply() throws IOException
+ {
+ rowMutation.updateCommutativeTypes(FBUtilities.getLocalAddress());
+
+ rowMutation.deepCopy().apply();
+ }
+
+ @Override
+ public String toString()
+ {
+ return toString(false);
+ }
+
+ public String toString(boolean shallow)
+ {
+ StringBuilder buff = new StringBuilder("CounterMutation(");
+ buff.append(rowMutation.toString(shallow));
+ buff.append(", ").append(consistency.toString());
+ return buff.append(")").toString();
+ }
+}
+
+class CounterMutationSerializer implements ICompactSerializer<CounterMutation> // wire format: serialized RowMutation, then the ConsistencyLevel name written with writeUTF
+{
+ public void serialize(CounterMutation cm, DataOutputStream dos) throws
IOException
+ {
+ RowMutation.serializer().serialize(cm.rowMutation(), dos);
+ dos.writeUTF(cm.consistency().name()); // enum constant name; read back in deserialize() via Enum.valueOf
+ }
+
+ public CounterMutation deserialize(DataInputStream dis) throws IOException
+ {
+ RowMutation rm = RowMutation.serializer().deserialize(dis);
+ ConsistencyLevel consistency = Enum.valueOf(ConsistencyLevel.class,
dis.readUTF()); // must mirror serialize(); Enum.valueOf throws IllegalArgumentException for an unknown name
+ return new CounterMutation(rm, consistency);
+ }
+}
Added:
cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java?rev=1060432&view=auto
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
(added)
+++
cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
Tue Jan 18 16:07:32 2011
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.io.*;
+import java.util.concurrent.TimeoutException;
+
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.net.*;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.service.StorageProxy;
+import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.cassandra.thrift.UnavailableException;
+
+public class CounterMutationVerbHandler implements IVerbHandler
+{
+ private static Logger logger =
LoggerFactory.getLogger(CounterMutationVerbHandler.class);
+
+ public void doVerb(Message message)
+ {
+ byte[] bytes = message.getMessageBody();
+ ByteArrayInputStream buffer = new ByteArrayInputStream(bytes);
+
+ try
+ {
+ DataInputStream is = new DataInputStream(buffer);
+ CounterMutation cm = CounterMutation.serializer().deserialize(is);
+ if (logger.isDebugEnabled())
+ logger.debug("Applying forwarded " + cm);
+
+ StorageProxy.applyCounterMutationOnLeader(cm);
+ WriteResponse response = new WriteResponse(cm.getTable(),
cm.key(), true);
+ Message responseMessage =
WriteResponse.makeWriteResponseMessage(message, response);
+ MessagingService.instance().sendOneWay(responseMessage,
message.getFrom());
+ }
+ catch (UnavailableException e)
+ {
+ // We already check for UnavailableException on the coordinator, so it
+ // is reasonable to let the coordinator time out in the very
+ // unlikely case we arrive here
+ }
+ catch (TimeoutException e)
+ {
+ // The coordinator node will time out on its own, so we let this go
+ }
+ catch (IOException e)
+ {
+ logger.error("Error in counter mutation", e);
+ }
+ }
+}
Added: cassandra/trunk/src/java/org/apache/cassandra/db/IMutation.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/IMutation.java?rev=1060432&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/IMutation.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/IMutation.java Tue Jan 18
16:07:32 2011
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.nio.ByteBuffer;
+import java.io.IOException;
+
+import org.apache.cassandra.net.Message;
+
+public interface IMutation // common view of RowMutation and CounterMutation, so StorageProxy.write() can handle either
+{
+ public String getTable(); // table name; write() passes it to Table.open() to get the replication strategy
+ public ByteBuffer key(); // row key; write() uses it with the table name to compute the write endpoints
+ public String toString(boolean shallow); // presumably shallow=true omits per-column detail -- TODO confirm against RowMutation.toString(boolean)
+}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java?rev=1060432&r1=1060431&r2=1060432&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java Tue Jan
18 16:07:32 2011
@@ -40,7 +40,7 @@ import org.apache.cassandra.thrift.Mutat
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
-public class RowMutation
+public class RowMutation implements IMutation
{
private static RowMutationSerializer serializer_;
public static final String HINT = "HINT";
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java?rev=1060432&r1=1060431&r2=1060432&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
Tue Jan 18 16:07:32 2011
@@ -79,10 +79,6 @@ public class RowMutationVerbHandler impl
if (logger_.isDebugEnabled())
logger_.debug(rm + " applied. Sending response to " +
message.getMessageId() + "@" + message.getFrom());
MessagingService.instance().sendOneWay(responseMessage,
message.getFrom());
-
- // repair-on-write (remote message)
- ReplicateOnWriteTask replicateOnWriteTask = new
ReplicateOnWriteTask(rm);
-
StageManager.getStage(Stage.REPLICATE_ON_WRITE).execute(replicateOnWriteTask);
}
catch (IOException e)
{
Modified:
cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1060432&r1=1060431&r2=1060432&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Tue
Jan 18 16:07:32 2011
@@ -41,7 +41,6 @@ import org.apache.cassandra.config.CFMet
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.filter.QueryFilter;
-import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.dht.Bounds;
import org.apache.cassandra.dht.IPartitioner;
@@ -52,8 +51,12 @@ import org.apache.cassandra.locator.Toke
import org.apache.cassandra.net.IAsyncCallback;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.thrift.*;
import org.apache.cassandra.utils.*;
+import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.cassandra.thrift.IndexClause;
+import org.apache.cassandra.thrift.InvalidRequestException;
+import org.apache.cassandra.thrift.SlicePredicate;
+import org.apache.cassandra.thrift.UnavailableException;
import static com.google.common.base.Charsets.UTF_8;
@@ -68,9 +71,15 @@ public class StorageProxy implements Sto
private static final LatencyTracker readStats = new LatencyTracker();
private static final LatencyTracker rangeStats = new LatencyTracker();
private static final LatencyTracker writeStats = new LatencyTracker();
+ // we track counter write latency apart from normal writes because counter
+ // writes with consistency > CL.ONE involve a read in the write path
+ private static final LatencyTracker counterWriteStats = new
LatencyTracker();
private static boolean hintedHandoffEnabled =
DatabaseDescriptor.hintedHandoffEnabled();
private static final String UNREACHABLE = "UNREACHABLE";
+ private static final WritePerformer standardWritePerformer;
+ private static final WritePerformer counterWritePerformer;
+
private StorageProxy() {}
static
{
@@ -83,6 +92,23 @@ public class StorageProxy implements Sto
{
throw new RuntimeException(e);
}
+
+ standardWritePerformer = new WritePerformer()
+ {
+ public void apply(IMutation mutation, Multimap<InetAddress,
InetAddress> hintedEndpoints, IWriteResponseHandler responseHandler, String
localDataCenter) throws IOException
+ {
+ assert mutation instanceof RowMutation;
+ sendToHintedEndpoints((RowMutation) mutation, hintedEndpoints,
responseHandler, localDataCenter, true);
+ }
+ };
+
+ counterWritePerformer = new WritePerformer()
+ {
+ public void apply(IMutation mutation, Multimap<InetAddress,
InetAddress> hintedEndpoints, IWriteResponseHandler responseHandler, String
localDataCenter) throws IOException
+ {
+ applyCounterMutation(mutation, hintedEndpoints,
responseHandler, localDataCenter);
+ }
+ };
}
/**
@@ -96,157 +122,147 @@ public class StorageProxy implements Sto
*/
public static void mutate(List<RowMutation> mutations, ConsistencyLevel
consistency_level) throws UnavailableException, TimeoutException
{
+ write(mutations, consistency_level, standardWritePerformer, true);
+ }
+
+ /**
+ * Perform the write of a batch of mutations given a WritePerformer.
+ * For each mutation, gather the list of write endpoints, apply locally and/or
+ * forward the mutation to said write endpoints (delegated to the actual
+ * WritePerformer) and wait for the responses based on consistency level.
+ *
+ * @param mutations the mutations to be applied
+ * @param consistency_level the consistency level for the write operation
+ * @param performer the WritePerformer in charge of applying the mutation
+ * given the list of write endpoints (either standardWritePerformer for
+ * standard writes or counterWritePerformer for counter writes).
+ * @param updateStats whether or not to update the writeStats. This must be
+ * true for standard writes but false for counter writes as the latency of
+ * the latter is tracked in mutateCounters() by counterWriteStats.
+ */
+ public static void write(List<? extends IMutation> mutations,
ConsistencyLevel consistency_level, WritePerformer performer, boolean
updateStats) throws UnavailableException, TimeoutException
+ {
+ final String localDataCenter =
DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getLocalAddress());
+
long startTime = System.nanoTime();
List<IWriteResponseHandler> responseHandlers = new
ArrayList<IWriteResponseHandler>();
- RowMutation mostRecentRowMutation = null;
- StorageService ss = StorageService.instance;
- String localDataCenter =
DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getLocalAddress());
-
+ IMutation mostRecentMutation = null;
try
{
- for (RowMutation rm : mutations)
+ for (IMutation mutation : mutations)
{
- mostRecentRowMutation = rm;
- String table = rm.getTable();
+ mostRecentMutation = mutation;
+ String table = mutation.getTable();
AbstractReplicationStrategy rs =
Table.open(table).getReplicationStrategy();
- List<InetAddress> naturalEndpoints =
ss.getNaturalEndpoints(table, rm.key());
- Collection<InetAddress> writeEndpoints =
ss.getTokenMetadata().getWriteEndpoints(StorageService.getPartitioner().getToken(rm.key()),
table, naturalEndpoints);
+ Collection<InetAddress> writeEndpoints =
getWriteEndpoints(table, mutation.key());
Multimap<InetAddress, InetAddress> hintedEndpoints =
rs.getHintedEndpoints(writeEndpoints);
-
+
final IWriteResponseHandler responseHandler =
rs.getWriteResponseHandler(writeEndpoints, hintedEndpoints, consistency_level);
-
+
// exit early if we can't fulfill the CL at this time
responseHandler.assureSufficientLiveNodes();
-
- responseHandlers.add(responseHandler);
-
- // Multimap that holds onto all the messages and addresses
meant for a specific datacenter
- Map<String, Multimap<Message, InetAddress>> dcMessages = new
HashMap<String, Multimap<Message, InetAddress>>(hintedEndpoints.size());
- Message unhintedMessage = null;
-
- //XXX: if commutative value, only allow CL.ONE write
- updateDestinationForCommutativeTypes(consistency_level, rm,
hintedEndpoints);
-
- for (Map.Entry<InetAddress, Collection<InetAddress>> entry :
hintedEndpoints.asMap().entrySet())
- {
- InetAddress destination = entry.getKey();
- Collection<InetAddress> targets = entry.getValue();
-
- String dc =
DatabaseDescriptor.getEndpointSnitch().getDatacenter(destination);
-
- if (targets.size() == 1 &&
targets.iterator().next().equals(destination))
- {
- // only non-hinted writes are supported
- rm.updateCommutativeTypes(destination);
-
- // unhinted writes
- if (destination.equals(FBUtilities.getLocalAddress()))
- {
- insertLocalMessage(rm, responseHandler);
- }
- else
- {
- // belongs on a different server
- if (unhintedMessage == null)
- {
- unhintedMessage = rm.makeRowMutationMessage();
-
MessagingService.instance().addCallback(responseHandler,
unhintedMessage.getMessageId());
- }
- if (logger.isDebugEnabled())
- logger.debug("insert writing key " +
FBUtilities.bytesToHex(rm.key()) + " to " + unhintedMessage.getMessageId() +
"@" + destination);
-
-
- Multimap<Message, InetAddress> messages =
dcMessages.get(dc);
- if (messages == null)
- {
- messages = HashMultimap.create();
- dcMessages.put(dc, messages);
- }
-
- messages.put(unhintedMessage, destination);
- }
- }
- else
- {
- // hinted
- Message hintedMessage = rm.makeRowMutationMessage();
- for (InetAddress target : targets)
- {
- if (!target.equals(destination))
- {
- addHintHeader(hintedMessage, target);
- if (logger.isDebugEnabled())
- logger.debug("insert writing key " +
FBUtilities.bytesToHex(rm.key()) + " to " + hintedMessage.getMessageId() + "@"
+ destination + " for " + target);
- }
- }
- responseHandler.addHintCallback(hintedMessage,
destination);
-
- Multimap<Message, InetAddress> messages =
dcMessages.get(dc);
-
- if (messages == null)
- {
- messages = HashMultimap.create();
- dcMessages.put(dc, messages);
- }
-
- messages.put(hintedMessage, destination);
- }
- }
- sendMessages(localDataCenter, dcMessages);
+ responseHandlers.add(responseHandler);
+ performer.apply(mutation, hintedEndpoints, responseHandler,
localDataCenter);
}
-
// wait for writes. throws timeoutexception if necessary
for (IWriteResponseHandler responseHandler : responseHandlers)
+ {
responseHandler.get();
+ }
}
catch (IOException e)
{
- if (mostRecentRowMutation == null)
- throw new RuntimeException("no mutations were seen but found
an error during write anyway", e);
- else
- throw new RuntimeException("error writing key " +
FBUtilities.bytesToHex(mostRecentRowMutation.key()), e);
+ assert mostRecentMutation != null;
+ throw new RuntimeException("error writing key " +
FBUtilities.bytesToHex(mostRecentMutation.key()), e);
}
finally
{
- writeStats.addNano(System.nanoTime() - startTime);
+ if (updateStats)
+ writeStats.addNano(System.nanoTime() - startTime);
}
}
- /**
- * Update destination endpoints depending on the clock type.
- */
- private static void updateDestinationForCommutativeTypes(ConsistencyLevel
consistency_level, RowMutation rm,
- Multimap<InetAddress, InetAddress> destinationEndpoints)
+ private static Collection<InetAddress> getWriteEndpoints(String table,
ByteBuffer key)
{
- AbstractType defaultValidator =
rm.getColumnFamilies().iterator().next().metadata().getDefaultValidator();
- if (!defaultValidator.isCommutative())
- return;
-
- InetAddress randomDestination =
pickRandomDestination(destinationEndpoints);
- destinationEndpoints.clear();
- destinationEndpoints.put(randomDestination, randomDestination);
+ StorageService ss = StorageService.instance;
+ List<InetAddress> naturalEndpoints = ss.getNaturalEndpoints(table,
key);
+ return
ss.getTokenMetadata().getWriteEndpoints(StorageService.getPartitioner().getToken(key),
table, naturalEndpoints);
}
- /**
- * @param endpoints potential destinations.
- * @return one destination randomly chosen from the endpoints unless
localhost is in the map, then that is returned.
- */
- private static InetAddress pickRandomDestination(Multimap<InetAddress,
InetAddress> endpoints)
+ private static void sendToHintedEndpoints(RowMutation rm,
Multimap<InetAddress, InetAddress> hintedEndpoints, IWriteResponseHandler
responseHandler, String localDataCenter, boolean insertLocalMessages)
+ throws IOException
{
- Set<InetAddress> destinationSet = endpoints.keySet();
+ // Multimap that holds onto all the messages and addresses meant for a
specific datacenter
+ Map<String, Multimap<Message, InetAddress>> dcMessages = new
HashMap<String, Multimap<Message, InetAddress>>(hintedEndpoints.size());
+ Message unhintedMessage = null;
- if (destinationSet.contains(FBUtilities.getLocalAddress()))
+ for (Map.Entry<InetAddress, Collection<InetAddress>> entry :
hintedEndpoints.asMap().entrySet())
{
- return FBUtilities.getLocalAddress();
- }
- else
- {
- InetAddress[] destinations = destinationSet.toArray(new
InetAddress[0]);
- return destinations[random.nextInt(destinations.length)];
+ InetAddress destination = entry.getKey();
+ Collection<InetAddress> targets = entry.getValue();
+
+ String dc =
DatabaseDescriptor.getEndpointSnitch().getDatacenter(destination);
+
+ if (targets.size() == 1 &&
targets.iterator().next().equals(destination))
+ {
+ // unhinted writes
+ if (destination.equals(FBUtilities.getLocalAddress()))
+ {
+ if (insertLocalMessages)
+ insertLocalMessage(rm, responseHandler);
+ }
+ else
+ {
+ // belongs on a different server
+ if (unhintedMessage == null)
+ {
+ unhintedMessage = rm.makeRowMutationMessage();
+
MessagingService.instance().addCallback(responseHandler,
unhintedMessage.getMessageId());
+ }
+ if (logger.isDebugEnabled())
+ logger.debug("insert writing key " +
FBUtilities.bytesToHex(rm.key()) + " to " + unhintedMessage.getMessageId() +
"@" + destination);
+
+ Multimap<Message, InetAddress> messages =
dcMessages.get(dc);
+ if (messages == null)
+ {
+ messages = HashMultimap.create();
+ dcMessages.put(dc, messages);
+ }
+
+ messages.put(unhintedMessage, destination);
+ }
+ }
+ else
+ {
+ // hinted
+ Message hintedMessage = rm.makeRowMutationMessage();
+ for (InetAddress target : targets)
+ {
+ if (!target.equals(destination))
+ {
+ addHintHeader(hintedMessage, target);
+ if (logger.isDebugEnabled())
+ logger.debug("insert writing key " +
FBUtilities.bytesToHex(rm.key()) + " to " + hintedMessage.getMessageId() + "@"
+ destination + " for " + target);
+ }
+ }
+ responseHandler.addHintCallback(hintedMessage, destination);
+
+ Multimap<Message, InetAddress> messages = dcMessages.get(dc);
+
+ if (messages == null)
+ {
+ messages = HashMultimap.create();
+ dcMessages.put(dc, messages);
+ }
+
+ messages.put(hintedMessage, destination);
+ }
}
+
+ sendMessages(localDataCenter, dcMessages);
}
/**
@@ -327,10 +343,122 @@ public class StorageProxy implements Sto
{
rm.deepCopy().apply();
responseHandler.response(null);
+ }
+ };
+ StageManager.getStage(Stage.MUTATION).execute(runnable);
+ }
+
+ /**
+ * The equivalent of mutate() for counters.
+ * (Note that each CounterMutation carries its own consistency level.)
+ *
+ * A counter mutation must first be applied to one replica (which we call
+ * the leader for the mutation) before being replicated to the other
+ * endpoints. There are two cases:
+ * 1) the coordinator host is the chosen replica: we apply the update
+ * locally and replicate it through applyCounterMutationOnLeader;
+ * 2) the coordinator is not the chosen replica: we forward the counter
+ * mutation to the chosen replica (which will go through
+ * applyCounterMutationOnLeader upon receipt) and wait for its
+ * acknowledgment.
+ *
+ * Implementation note: we check whether the CL can be fulfilled on the
+ * coordinator even when it is not a replica, both to respond faster and
+ * because the WriteResponseHandlers don't make it easy to send back an
+ * error. We also always measure the write latency on the coordinator, so
+ * that it is gathered at the same point as for standard writes.
+ */
+ public static void mutateCounters(List<CounterMutation> mutations) throws
UnavailableException, TimeoutException
+ {
+ long startTime = System.nanoTime();
+ ArrayList<IWriteResponseHandler> responseHandlers = new
ArrayList<IWriteResponseHandler>();
+
+ CounterMutation mostRecentMutation = null;
+ StorageService ss = StorageService.instance;
+
+ try
+ {
+ for (CounterMutation cm : mutations)
+ {
+ mostRecentMutation = cm;
+ InetAddress endpoint = ss.findSuitableEndpoint(cm.getTable(),
cm.key());
+
+ if (endpoint.equals(FBUtilities.getLocalAddress()))
+ {
+ applyCounterMutationOnLeader(cm);
+ }
+ else
+ {
+ // Exit now if we can't fulfill the CL here instead of
forwarding to the leader replica
+ String table = cm.getTable();
+ AbstractReplicationStrategy rs =
Table.open(table).getReplicationStrategy();
+ Collection<InetAddress> writeEndpoints =
getWriteEndpoints(table, cm.key());
+ Multimap<InetAddress, InetAddress> hintedEndpoints =
rs.getHintedEndpoints(writeEndpoints);
+ rs.getWriteResponseHandler(writeEndpoints,
hintedEndpoints, cm.consistency()).assureSufficientLiveNodes();
+
+ // Forward the actual update to the chosen leader replica
+ IWriteResponseHandler responseHandler =
WriteResponseHandler.create(endpoint);
+ responseHandlers.add(responseHandler);
+
+ Message msg = cm.makeMutationMessage();
+ MessagingService.instance().addCallback(responseHandler,
msg.getMessageId());
+ if (logger.isDebugEnabled())
+ logger.debug("forwarding counter update of key " +
FBUtilities.bytesToHex(cm.key()) + " to " + msg.getMessageId() + "@" +
endpoint);
+ MessagingService.instance().sendOneWay(msg, endpoint);
+ }
+ }
+ // wait for writes. throws timeoutexception if necessary
+ for (IWriteResponseHandler responseHandler : responseHandlers)
+ {
+ responseHandler.get();
+ }
+ }
+ catch (IOException e)
+ {
+ if (mostRecentMutation == null)
+ throw new RuntimeException("no mutations were seen but found
an error during write anyway", e);
+ else
+ throw new RuntimeException("error writing key " +
FBUtilities.bytesToHex(mostRecentMutation.key()), e);
+ }
+ finally
+ {
+ counterWriteStats.addNano(System.nanoTime() - startTime);
+ }
+ }
+
+ // Must be called on a replica of the mutation. This replica becomes the
+ // leader of this mutation.
+ public static void applyCounterMutationOnLeader(CounterMutation cm) throws
UnavailableException, TimeoutException, IOException
+ {
+ write(Collections.singletonList(cm), cm.consistency(),
counterWritePerformer, false);
+ }
+
+ private static void applyCounterMutation(final IMutation mutation, final
Multimap<InetAddress, InetAddress> hintedEndpoints, final IWriteResponseHandler
responseHandler, final String localDataCenter)
+ {
+ // we apply locally first, then send it to the other replicas
+ if (logger.isDebugEnabled())
+ logger.debug("insert writing local & replicate " +
mutation.toString(true));
+
+ Runnable runnable = new WrappedRunnable()
+ {
+ public void runMayThrow() throws IOException
+ {
+ assert mutation instanceof CounterMutation;
+ final CounterMutation cm = (CounterMutation) mutation;
+
+ // apply mutation
+ cm.apply();
+
+ responseHandler.response(null);
- // repair-on-write (local message)
- ReplicateOnWriteTask replicateOnWriteTask = new
ReplicateOnWriteTask(rm);
-
StageManager.getStage(Stage.REPLICATE_ON_WRITE).execute(replicateOnWriteTask);
+ if (cm.shouldReplicateOnWrite())
+ {
+ // We do the replication on another stage because it involves a read
+ // (see CM.makeReplicationMutation) and we want to avoid blocking the
+ // MUTATION stage for too long
+
StageManager.getStage(Stage.REPLICATE_ON_WRITE).execute(new WrappedRunnable()
+ {
+ public void runMayThrow() throws IOException
+ {
+ // send mutation to other replica
+ sendToHintedEndpoints(cm.makeReplicationMutation(),
hintedEndpoints, responseHandler, localDataCenter, false);
+ }
+ });
+ }
}
};
StageManager.getStage(Stage.MUTATION).execute(runnable);
@@ -771,6 +899,31 @@ public class StorageProxy implements Sto
return writeStats.getRecentLatencyHistogramMicros();
}
+    /**
+     * Number of counter write operations recorded in counterWriteStats
+     * (the tracker fed by the counter write path's finally block).
+     */
+    public long getCounterWriteOperations()
+    {
+        return counterWriteStats.getOpCount();
+    }
+
+    /**
+     * Cumulative latency of all counter writes recorded in counterWriteStats,
+     * in microseconds.
+     */
+    public long getTotalCounterWriteLatencyMicros()
+    {
+        return counterWriteStats.getTotalLatencyMicros();
+    }
+
+    /**
+     * Recent counter write latency in microseconds, as reported by
+     * counterWriteStats.getRecentLatencyMicros().
+     * NOTE(review): "recent" presumably means since the previous call - confirm in LatencyTracker.
+     */
+    public double getRecentCounterWriteLatencyMicros()
+    {
+        return counterWriteStats.getRecentLatencyMicros();
+    }
+
+    /**
+     * Histogram of all counter write latencies recorded in counterWriteStats,
+     * in microseconds.
+     */
+    public long[] getTotalCounterWriteLatencyHistogramMicros()
+    {
+        return counterWriteStats.getTotalLatencyHistogramMicros();
+    }
+
+    /**
+     * Histogram of recent counter write latencies in microseconds, as reported
+     * by counterWriteStats.getRecentLatencyHistogramMicros().
+     * NOTE(review): the window for "recent" is defined by LatencyTracker - confirm there.
+     */
+    public long[] getRecentCounterWriteLatencyHistogramMicros()
+    {
+        return counterWriteStats.getRecentLatencyHistogramMicros();
+    }
+
public static List<Row> scan(String keyspace, String column_family,
IndexClause index_clause, SlicePredicate column_predicate, ConsistencyLevel
consistency_level)
throws IOException, TimeoutException, UnavailableException
{
@@ -923,4 +1076,9 @@ public class StorageProxy implements Sto
}
}
}
+
+    /**
+     * Pluggable step of the write path that delivers a single mutation to its
+     * replicas. It is passed to write(...) (e.g. counterWritePerformer for
+     * counter mutations). It receives the hinted endpoint map, the response
+     * handler, and the local data center name.
+     */
+    private interface WritePerformer
+    {
+        void apply(IMutation mutation, Multimap<InetAddress, InetAddress> hintedEndpoints, IWriteResponseHandler responseHandler, String localDataCenter) throws IOException;
+    }
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxyMBean.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxyMBean.java?rev=1060432&r1=1060431&r2=1060432&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxyMBean.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxyMBean.java Tue Jan 18 16:07:32 2011
@@ -38,6 +38,12 @@ public interface StorageProxyMBean
public long[] getTotalWriteLatencyHistogramMicros();
public long[] getRecentWriteLatencyHistogramMicros();
+    /** Number of counter write operations recorded by the proxy. */
+    public long getCounterWriteOperations();
+    /** Cumulative latency of counter writes, in microseconds. */
+    public long getTotalCounterWriteLatencyMicros();
+    /** Recent counter write latency, in microseconds (window defined by the implementation). */
+    public double getRecentCounterWriteLatencyMicros();
+    /** Histogram of all counter write latencies, in microseconds. */
+    public long[] getTotalCounterWriteLatencyHistogramMicros();
+    /** Histogram of recent counter write latencies, in microseconds (window defined by the implementation). */
+    public long[] getRecentCounterWriteLatencyHistogramMicros();
+
public boolean getHintedHandoffEnabled();
public void setHintedHandoffEnabled(boolean b);
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=1060432&r1=1060431&r2=1060432&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Tue Jan 18 16:07:32 2011
@@ -107,7 +107,7 @@ public class StorageService implements I
INDEX_SCAN,
REPLICATION_FINISHED,
INTERNAL_RESPONSE, // responses to internal calls
- REPLICATE_ON_WRITE,
+ COUNTER_MUTATION,
;
// remember to add new verbs at the end, since we serialize by ordinal
}
@@ -136,7 +136,7 @@ public class StorageService implements I
put(Verb.INDEX_SCAN, Stage.READ);
put(Verb.REPLICATION_FINISHED, Stage.MISC);
put(Verb.INTERNAL_RESPONSE, Stage.INTERNAL_RESPONSE);
- put(Verb.REPLICATE_ON_WRITE, Stage.REPLICATE_ON_WRITE);
+ put(Verb.COUNTER_MUTATION, Stage.MUTATION);
}};
@@ -224,7 +224,7 @@ public class StorageService implements I
MessagingService.instance().registerVerbHandlers(Verb.READ, new
ReadVerbHandler());
MessagingService.instance().registerVerbHandlers(Verb.RANGE_SLICE, new
RangeSliceVerbHandler());
MessagingService.instance().registerVerbHandlers(Verb.INDEX_SCAN, new
IndexScanVerbHandler());
-
MessagingService.instance().registerVerbHandlers(Verb.REPLICATE_ON_WRITE, new
ReplicateOnWriteVerbHandler());
+
MessagingService.instance().registerVerbHandlers(Verb.COUNTER_MUTATION, new
CounterMutationVerbHandler());
// see BootStrapper for a summary of how the bootstrap verbs interact
MessagingService.instance().registerVerbHandlers(Verb.BOOTSTRAP_TOKEN,
new BootStrapper.BootstrapTokenVerbHandler());
MessagingService.instance().registerVerbHandlers(Verb.STREAM_REQUEST,
new StreamRequestVerbHandler());
Modified: cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java?rev=1060432&r1=1060431&r2=1060432&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java Tue Jan 18 16:07:32 2011
@@ -18,7 +18,6 @@
package org.apache.cassandra.thrift;
-import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
@@ -397,7 +396,9 @@ public class CassandraServer implements
ThriftValidation.validateMutation(state().getKeyspace(),
cfName, mutation);
}
}
-
rowMutations.add(RowMutation.getRowMutationFromMutations(state().getKeyspace(),
key, columnFamilyToMutations));
+ RowMutation rm =
RowMutation.getRowMutationFromMutations(state().getKeyspace(), key,
columnFamilyToMutations);
+ if (!rm.isEmpty())
+ rowMutations.add(rm);
}
doInsert(consistency_level, rowMutations);
@@ -441,7 +442,23 @@ public class CassandraServer implements
try
{
- StorageProxy.mutate(mutations, consistency_level);
+ if (!mutations.isEmpty())
+ {
+ // FIXME: Mighty ugly but we've made sure above this will
always work
+ if
(mutations.iterator().next().getColumnFamilies().iterator().next().metadata().getDefaultValidator().isCommutative())
+ {
+ List<org.apache.cassandra.db.CounterMutation>
cmutations = new
ArrayList<org.apache.cassandra.db.CounterMutation>(mutations.size());
+ for (RowMutation mutation : mutations)
+ {
+ cmutations.add(new
org.apache.cassandra.db.CounterMutation(mutation, consistency_level));
+ }
+ StorageProxy.mutateCounters(cmutations);
+ }
+ else
+ {
+ StorageProxy.mutate(mutations, consistency_level);
+ }
+ }
}
catch (TimeoutException e)
{
@@ -961,13 +978,8 @@ public class CassandraServer implements
{
logger.debug("add");
- if (ConsistencyLevel.ONE != consistency_level)
- {
- throw new InvalidRequestException("Commutative CFs only support
ConsistencyLevel.ONE");
- }
-
String keyspace = state().getKeyspace();
- ThriftValidation.validateCommutative(keyspace,
column_parent.column_family);
+ ThriftValidation.validateCommutativeForWrite(keyspace,
column_parent.column_family, consistency_level);
internal_insert(key, column_parent, getCounterColumn(column),
consistency_level);
}
@@ -1017,11 +1029,6 @@ public class CassandraServer implements
{
logger.debug("batch_add");
- if (ConsistencyLevel.ONE != consistency_level)
- {
- throw new InvalidRequestException("Commutative CFs only support
ConsistencyLevel.ONE");
- }
-
String keyspace = state().getKeyspace();
Map<ByteBuffer,Map<String,List<Mutation>>> mutation_map = new
HashMap<ByteBuffer,Map<String,List<Mutation>>>();
@@ -1032,7 +1039,7 @@ public class CassandraServer implements
for (Entry<String, List<CounterMutation>> innerEntry :
entry.getValue().entrySet())
{
- ThriftValidation.validateCommutative(keyspace,
innerEntry.getKey());
+ ThriftValidation.validateCommutativeForWrite(keyspace,
innerEntry.getKey(), consistency_level);
List<Mutation> mutations = new
ArrayList<Mutation>(innerEntry.getValue().size());
for (CounterMutation cm : innerEntry.getValue())
Modified: cassandra/trunk/src/java/org/apache/cassandra/thrift/ThriftValidation.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/thrift/ThriftValidation.java?rev=1060432&r1=1060431&r2=1060432&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/thrift/ThriftValidation.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/thrift/ThriftValidation.java Tue Jan 18 16:07:32 2011
@@ -445,7 +445,7 @@ public class ThriftValidation
return metadata;
}
- static void validateCommutative(String tablename, String cfName) throws
InvalidRequestException
+ public static CFMetaData validateCommutative(String tablename, String
cfName) throws InvalidRequestException
{
validateTable(tablename);
CFMetaData metadata = validateCFMetaData(tablename, cfName);
@@ -453,5 +453,15 @@ public class ThriftValidation
{
throw new InvalidRequestException("not commutative columnfamily "
+ cfName);
}
+ return metadata;
+ }
+
+    /**
+     * Validates that a counter (commutative) column family may be written at
+     * the requested consistency level.
+     *
+     * Unless the column family has replicate_on_write enabled, only
+     * ConsistencyLevel.ONE is accepted. Any other level is rejected, including
+     * levels weaker than ONE.
+     *
+     * @param tablename   keyspace containing the column family
+     * @param cfName      name of the column family being written
+     * @param consistency consistency level requested by the client
+     * @throws InvalidRequestException if validateCommutative rejects the
+     *         keyspace/column family, or if consistency is not ONE while
+     *         replicate_on_write is disabled
+     */
+    public static void validateCommutativeForWrite(String tablename, String cfName, ConsistencyLevel consistency) throws InvalidRequestException
+    {
+        CFMetaData metadata = validateCommutative(tablename, cfName);
+        if (!metadata.getReplicateOnWrite() && consistency != ConsistencyLevel.ONE)
+        {
+            // The guard rejects every level other than ONE (e.g. ANY as well as QUORUM/ALL),
+            // so report the requested level rather than claiming it is "> CL.ONE".
+            throw new InvalidRequestException("cannot achieve CL." + consistency
+                                              + " without replicate_on_write on columnfamily " + cfName
+                                              + " (only CL.ONE is supported)");
+        }
     }
}