Author: jbellis
Date: Mon Jul 26 04:45:16 2010
New Revision: 979160

URL: http://svn.apache.org/viewvc?rev=979160&view=rev
Log:
merge from 0.6

Modified:
    cassandra/trunk/   (props changed)
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/contrib/bmt_example/CassandraBulkLoader.java
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java   (props changed)
    cassandra/trunk/src/java/org/apache/cassandra/db/BinaryVerbHandler.java

Propchange: cassandra/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Jul 26 04:45:16 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6:922689-964141,965151,965457,965537,965604,965630-966676
+/cassandra/branches/cassandra-0.6:922689-964141,965151,965457,965537,965604,965630-966676,979156
 /cassandra/trunk:978791
 /incubator/cassandra/branches/cassandra-0.3:774578-796573
 /incubator/cassandra/branches/cassandra-0.4:810145-834239,834349-834350

Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=979160&r1=979159&r2=979160&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Mon Jul 26 04:45:16 2010
@@ -65,6 +65,8 @@ dev
  * cassandra-cli.bat works on windows (CASSANDRA-1236)
  * pre-emptively drop requests that cannot be processed within RPCTimeout
    (CASSANDRA-685)
+ * add ack to Binary write verb and update CassandraBulkLoader
+   to wait for acks for each row (CASSANDRA-1093)
 
 
 0.6.3

Modified: cassandra/trunk/contrib/bmt_example/CassandraBulkLoader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/contrib/bmt_example/CassandraBulkLoader.java?rev=979160&r1=979159&r2=979160&view=diff
==============================================================================
--- cassandra/trunk/contrib/bmt_example/CassandraBulkLoader.java (original)
+++ cassandra/trunk/contrib/bmt_example/CassandraBulkLoader.java Mon Jul 26 04:45:16 2010
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -17,7 +17,7 @@
  */
  
  /**
-  * Cassandra has a backdoor called the Binary Memtable. The purpose of this backdoor is to
+  * Cassandra has a back door called the Binary Memtable. The purpose of this backdoor is to
   * mass import large amounts of data, without using the Thrift interface.
   *
   * Inserting data through the binary memtable, allows you to skip the commit log overhead, and an ack
@@ -36,6 +36,12 @@
   * in the mapper, so that the end result generates the data set into a column oriented subset. Once you get to the
   * reduce aspect, you can generate the ColumnFamilies you want inserted, and send it to your nodes.
   *
+  * For Cassandra 0.6.4, we modified this example to wait for acks from all Cassandra nodes for each row
+  * before proceeding to the next.  This means to keep Cassandra similarly busy you can either
+  * 1) add more reducer tasks,
+  * 2) remove the "wait for acks" block of code,
+  * 3) parallelize the writing of rows to Cassandra, e.g. with an Executor.
+  *
   * THIS CANNOT RUN ON THE SAME IP ADDRESS AS A CASSANDRA INSTANCE.
   */
   
@@ -60,7 +66,10 @@ import org.apache.cassandra.dht.BigInteg
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
+import org.apache.cassandra.net.IAsyncResult;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.StorageService;
@@ -174,10 +183,24 @@ public class CassandraBulkLoader {
 
             /* Get serialized message to send to cluster */
             message = createMessage(keyspace, key.getBytes(), cfName, 
columnFamilies);
+            List<IAsyncResult> results = new ArrayList<IAsyncResult>();
             for (InetAddress endpoint: 
StorageService.instance.getNaturalEndpoints(keyspace, key.getBytes()))
             {
                 /* Send message to end point */
-                MessagingService.instance.sendOneWay(message, endpoint);
+                results.add(MessagingService.instance.sendRR(message, 
endpoint));
+            }
+            /* wait for acks */
+            for (IAsyncResult result : results)
+            {
+                try
+                {
+                    result.get(DatabaseDescriptor.getRpcTimeout(), 
TimeUnit.MILLISECONDS);
+                }
+                catch (TimeoutException e)
+                {
+                    // you should probably add retry logic here
+                    throw new RuntimeException(e);
+                }
             }
             
             output.collect(key, new Text(" inserted into Cassandra node(s)"));

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Jul 26 04:45:16 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-964141,965151,965457,965537,965604,965630-966676
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-964141,965151,965457,965537,965604,965630-966676,979156
 /cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:978791
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/Cassandra.java:774578-796573
 /incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/Cassandra.java:810145-834239,834349-834350

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Jul 26 04:45:16 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-964141,965151,965457,965537,965604,965630-966676
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-964141,965151,965457,965537,965604,965630-966676,979156
 /cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:978791
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/column_t.java:774578-792198
 /incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/Column.java:810145-834239,834349-834350

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Jul 26 04:45:16 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-964141,965151,965457,965537,965604,965630-966676
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-964141,965151,965457,965537,965604,965630-966676,979156
 /cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:978791
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:774578-796573
 /incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:810145-834239,834349-834350

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Jul 26 04:45:16 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-964141,965151,965457,965537,965604,965630-966676
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-964141,965151,965457,965537,965604,965630-966676,979156
 /cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:978791
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:774578-796573
 /incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:810145-834239,834349-834350

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Jul 26 04:45:16 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-964141,965151,965457,965537,965604,965630-966676
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-964141,965151,965457,965537,965604,965630-966676,979156
 /cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:978791
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:774578-792198
 /incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/SuperColumn.java:810145-834239,834349-834350

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/BinaryVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryVerbHandler.java?rev=979160&r1=979159&r2=979160&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/BinaryVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/BinaryVerbHandler.java Mon Jul 26 04:45:16 2010
@@ -23,6 +23,7 @@ import java.io.DataInputStream;
 
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -41,11 +42,16 @@ public class BinaryVerbHandler implement
             RowMutationMessage rmMsg = 
RowMutationMessage.serializer().deserialize(new DataInputStream(buffer));
             RowMutation rm = rmMsg.getRowMutation();
             rm.applyBinary();
+
+            WriteResponse response = new WriteResponse(rm.getTable(), 
rm.key(), true);
+            Message responseMessage = 
WriteResponse.makeWriteResponseMessage(message, response);
+            if (logger_.isDebugEnabled())
+              logger_.debug("binary " + rm + " applied.  Sending response to " 
+ message.getMessageId() + "@" + message.getFrom());
+            MessagingService.instance.sendOneWay(responseMessage, 
message.getFrom());
         }
         catch (Exception e)
         {
             throw new RuntimeException(e);
         }
     }
-
 }


Reply via email to