Author: jbellis
Date: Mon Jul 26 04:45:16 2010
New Revision: 979160
URL: http://svn.apache.org/viewvc?rev=979160&view=rev
Log:
merge from 0.6
Modified:
cassandra/trunk/ (props changed)
cassandra/trunk/CHANGES.txt
cassandra/trunk/contrib/bmt_example/CassandraBulkLoader.java
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
(props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
(props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
(props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
(props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
(props changed)
cassandra/trunk/src/java/org/apache/cassandra/db/BinaryVerbHandler.java
Propchange: cassandra/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Jul 26 04:45:16 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6:922689-964141,965151,965457,965537,965604,965630-966676
+/cassandra/branches/cassandra-0.6:922689-964141,965151,965457,965537,965604,965630-966676,979156
/cassandra/trunk:978791
/incubator/cassandra/branches/cassandra-0.3:774578-796573
/incubator/cassandra/branches/cassandra-0.4:810145-834239,834349-834350
Modified: cassandra/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=979160&r1=979159&r2=979160&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Mon Jul 26 04:45:16 2010
@@ -65,6 +65,8 @@ dev
* cassandra-cli.bat works on windows (CASSANDRA-1236)
* pre-emptively drop requests that cannot be processed within RPCTimeout
(CASSANDRA-685)
+ * add ack to Binary write verb and update CassandraBulkLoader
+ to wait for acks for each row (CASSANDRA-1093)
0.6.3
Modified: cassandra/trunk/contrib/bmt_example/CassandraBulkLoader.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/contrib/bmt_example/CassandraBulkLoader.java?rev=979160&r1=979159&r2=979160&view=diff
==============================================================================
--- cassandra/trunk/contrib/bmt_example/CassandraBulkLoader.java (original)
+++ cassandra/trunk/contrib/bmt_example/CassandraBulkLoader.java Mon Jul 26 04:45:16 2010
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -17,7 +17,7 @@
*/
/**
- * Cassandra has a backdoor called the Binary Memtable. The purpose of this backdoor is to
+ * Cassandra has a back door called the Binary Memtable. The purpose of this backdoor is to
* mass import large amounts of data, without using the Thrift interface.
*
* Inserting data through the binary memtable, allows you to skip the commit log overhead, and an ack
@@ -36,6 +36,12 @@
* in the mapper, so that the end result generates the data set into a column oriented subset. Once you get to the
* reduce aspect, you can generate the ColumnFamilies you want inserted, and send it to your nodes.
*
+ * For Cassandra 0.6.4, we modified this example to wait for acks from all Cassandra nodes for each row
+ * before proceeding to the next. This means to keep Cassandra similarly busy you can either
+ * 1) add more reducer tasks,
+ * 2) remove the "wait for acks" block of code,
+ * 3) parallelize the writing of rows to Cassandra, e.g. with an Executor.
+ *
* THIS CANNOT RUN ON THE SAME IP ADDRESS AS A CASSANDRA INSTANCE.
*/
@@ -60,7 +66,10 @@ import org.apache.cassandra.dht.BigInteg
import org.apache.cassandra.io.util.DataOutputBuffer;
import java.net.InetAddress;
import java.net.UnknownHostException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.cassandra.net.IAsyncResult;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageService;
@@ -174,10 +183,24 @@ public class CassandraBulkLoader {
/* Get serialized message to send to cluster */
message = createMessage(keyspace, key.getBytes(), cfName,
columnFamilies);
+ List<IAsyncResult> results = new ArrayList<IAsyncResult>();
for (InetAddress endpoint:
StorageService.instance.getNaturalEndpoints(keyspace, key.getBytes()))
{
/* Send message to end point */
- MessagingService.instance.sendOneWay(message, endpoint);
+ results.add(MessagingService.instance.sendRR(message,
endpoint));
+ }
+ /* wait for acks */
+ for (IAsyncResult result : results)
+ {
+ try
+ {
+ result.get(DatabaseDescriptor.getRpcTimeout(),
TimeUnit.MILLISECONDS);
+ }
+ catch (TimeoutException e)
+ {
+ // you should probably add retry logic here
+ throw new RuntimeException(e);
+ }
}
output.collect(key, new Text(" inserted into Cassandra node(s)"));
Propchange:
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Jul 26 04:45:16 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-964141,965151,965457,965537,965604,965630-966676
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-964141,965151,965457,965537,965604,965630-966676,979156
/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:978791
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/Cassandra.java:774578-796573
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/Cassandra.java:810145-834239,834349-834350
Propchange:
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Jul 26 04:45:16 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-964141,965151,965457,965537,965604,965630-966676
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-964141,965151,965457,965537,965604,965630-966676,979156
/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:978791
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/column_t.java:774578-792198
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/Column.java:810145-834239,834349-834350
Propchange:
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Jul 26 04:45:16 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-964141,965151,965457,965537,965604,965630-966676
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-964141,965151,965457,965537,965604,965630-966676,979156
/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:978791
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:774578-796573
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:810145-834239,834349-834350
Propchange:
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Jul 26 04:45:16 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-964141,965151,965457,965537,965604,965630-966676
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-964141,965151,965457,965537,965604,965630-966676,979156
/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:978791
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:774578-796573
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:810145-834239,834349-834350
Propchange:
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Jul 26 04:45:16 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-964141,965151,965457,965537,965604,965630-966676
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-964141,965151,965457,965537,965604,965630-966676,979156
/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:978791
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:774578-792198
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/SuperColumn.java:810145-834239,834349-834350
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/BinaryVerbHandler.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryVerbHandler.java?rev=979160&r1=979159&r2=979160&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/BinaryVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/BinaryVerbHandler.java Mon Jul 26 04:45:16 2010
@@ -23,6 +23,7 @@ import java.io.DataInputStream;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,11 +42,16 @@ public class BinaryVerbHandler implement
RowMutationMessage rmMsg =
RowMutationMessage.serializer().deserialize(new DataInputStream(buffer));
RowMutation rm = rmMsg.getRowMutation();
rm.applyBinary();
+
+ WriteResponse response = new WriteResponse(rm.getTable(),
rm.key(), true);
+ Message responseMessage =
WriteResponse.makeWriteResponseMessage(message, response);
+ if (logger_.isDebugEnabled())
+ logger_.debug("binary " + rm + " applied. Sending response to "
+ message.getMessageId() + "@" + message.getFrom());
+ MessagingService.instance.sendOneWay(responseMessage,
message.getFrom());
}
catch (Exception e)
{
throw new RuntimeException(e);
}
}
-
}