Author: jbellis
Date: Mon Jul 26 04:39:42 2010
New Revision: 979156
URL: http://svn.apache.org/viewvc?rev=979156&view=rev
Log:
add ack to Binary write verb. patch by jbellis; reviewed by Toby Jungen for
CASSANDRA-1093
Modified:
cassandra/branches/cassandra-0.6/CHANGES.txt
cassandra/branches/cassandra-0.6/contrib/bmt_example/CassandraBulkLoader.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/BinaryVerbHandler.java
Modified: cassandra/branches/cassandra-0.6/CHANGES.txt
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/CHANGES.txt?rev=979156&r1=979155&r2=979156&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.6/CHANGES.txt Mon Jul 26 04:39:42 2010
@@ -17,6 +17,8 @@
* cassandra-cli.bat works on windows (CASSANDRA-1236)
* pre-emptively drop requests that cannot be processed within RPCTimeout
(CASSANDRA-685)
+ * add ack to Binary write verb and update CassandraBulkLoader
+ to wait for acks for each row (CASSANDRA-1093)
0.6.3
Modified:
cassandra/branches/cassandra-0.6/contrib/bmt_example/CassandraBulkLoader.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/contrib/bmt_example/CassandraBulkLoader.java?rev=979156&r1=979155&r2=979156&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.6/contrib/bmt_example/CassandraBulkLoader.java
(original)
+++
cassandra/branches/cassandra-0.6/contrib/bmt_example/CassandraBulkLoader.java
Mon Jul 26 04:39:42 2010
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -17,7 +17,7 @@
*/
/**
- * Cassandra has a backdoor called the Binary Memtable. The purpose of this
backdoor is to
+ * Cassandra has a back door called the Binary Memtable. The purpose of this
back door is to
* mass import large amounts of data, without using the Thrift interface.
*
* Inserting data through the binary memtable allows you to skip the commit
log overhead, and an ack
@@ -36,6 +36,12 @@
* in the mapper, so that the end result generates the data set into a column
oriented subset. Once you get to the
* reduce aspect, you can generate the ColumnFamilies you want inserted, and
send it to your nodes.
*
+ * For Cassandra 0.6.4, we modified this example to wait for acks from all
Cassandra nodes for each row
+ * before proceeding to the next. This means that, to keep Cassandra as busy
as before, you can do any of the following:
+ * 1) add more reducer tasks,
+ * 2) remove the "wait for acks" block of code, or
+ * 3) parallelize the writing of rows to Cassandra, e.g. with an Executor.
+ *
* THIS CANNOT RUN ON THE SAME IP ADDRESS AS A CASSANDRA INSTANCE.
*/
@@ -60,7 +66,10 @@ import org.apache.cassandra.dht.BigInteg
import org.apache.cassandra.io.util.DataOutputBuffer;
import java.net.InetAddress;
import java.net.UnknownHostException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.cassandra.net.IAsyncResult;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageService;
@@ -173,10 +182,24 @@ public class CassandraBulkLoader {
/* Get serialized message to send to cluster */
message = createMessage(keyspace, key.toString(), cfName,
columnFamilies);
+ List<IAsyncResult> results = new ArrayList<IAsyncResult>();
for (InetAddress endpoint:
StorageService.instance.getNaturalEndpoints(keyspace, key.toString()))
{
/* Send message to end point */
- MessagingService.instance.sendOneWay(message, endpoint);
+ results.add(MessagingService.instance.sendRR(message,
endpoint));
+ }
+ /* wait for acks */
+ for (IAsyncResult result : results)
+ {
+ try
+ {
+ result.get(DatabaseDescriptor.getRpcTimeout(),
TimeUnit.MILLISECONDS);
+ }
+ catch (TimeoutException e)
+ {
+ // you should probably add retry logic here
+ throw new RuntimeException(e);
+ }
}
output.collect(key, new Text(" inserted into Cassandra node(s)"));
Modified:
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/BinaryVerbHandler.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/BinaryVerbHandler.java?rev=979156&r1=979155&r2=979156&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/BinaryVerbHandler.java
(original)
+++
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/BinaryVerbHandler.java
Mon Jul 26 04:39:42 2010
@@ -23,6 +23,7 @@ import java.io.DataInputStream;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
import org.apache.log4j.Logger;
@@ -40,11 +41,16 @@ public class BinaryVerbHandler implement
RowMutationMessage rmMsg =
RowMutationMessage.serializer().deserialize(new DataInputStream(buffer));
RowMutation rm = rmMsg.getRowMutation();
rm.applyBinary();
+
+ WriteResponse response = new WriteResponse(rm.getTable(),
rm.key(), true);
+ Message responseMessage =
WriteResponse.makeWriteResponseMessage(message, response);
+ if (logger_.isDebugEnabled())
+ logger_.debug("binary " + rm + " applied. Sending response to "
+ message.getMessageId() + "@" + message.getFrom());
+ MessagingService.instance.sendOneWay(responseMessage,
message.getFrom());
}
catch (Exception e)
{
throw new RuntimeException(e);
}
}
-
}