Author: jbellis
Date: Tue Oct 19 17:32:00 2010
New Revision: 1024332

URL: http://svn.apache.org/viewvc?rev=1024332&view=rev
Log:
merge from 0.6

Modified:
    cassandra/trunk/   (props changed)
    cassandra/trunk/CHANGES.txt
    
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
   (props changed)
    
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
   (props changed)
    
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
   (props changed)
    
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
   (props changed)
    
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
   (props changed)
    cassandra/trunk/src/java/org/apache/cassandra/service/IResponseResolver.java
    
cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java
    
cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
    
cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java

Propchange: cassandra/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Oct 19 17:32:00 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6:922689-1021419
+/cassandra/branches/cassandra-0.6:922689-1024328
 /cassandra/trunk:978791
 /incubator/cassandra/branches/cassandra-0.3:774578-796573
 /incubator/cassandra/branches/cassandra-0.4:810145-834239,834349-834350

Modified: cassandra/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1024332&r1=1024331&r2=1024332&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Tue Oct 19 17:32:00 2010
@@ -88,6 +88,9 @@ dev
    (CASSANDRA-1429)
  * Add CfDef.default_validation_class (CASSANDRA-891)
  * fix EstimatedHistogram.max (CASSANDRA-1413)
+ * quorum read optimization (CASSANDRA-1622)
+
+
  * handle zero-length (or missing) rows during HH paging (CASSANDRA-1432)
  * include secondary indexes during schema migrations (CASSANDRA-1406)
  * fix commitlog header race during schema change (CASSANDRA-1435)

Propchange: 
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Oct 19 17:32:00 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1021419
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1024328
 
/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:978791
 
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/Cassandra.java:774578-796573
 
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/Cassandra.java:810145-834239,834349-834350

Propchange: 
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Oct 19 17:32:00 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1021419
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1024328
 
/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:978791
 
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/column_t.java:774578-792198
 
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/Column.java:810145-834239,834349-834350

Propchange: 
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Oct 19 17:32:00 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1021419
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1024328
 
/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:978791
 
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:774578-796573
 
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:810145-834239,834349-834350

Propchange: 
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Oct 19 17:32:00 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1021419
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1024328
 
/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:978791
 
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:774578-796573
 
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:810145-834239,834349-834350

Propchange: 
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Oct 19 17:32:00 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1021419
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1024328
 
/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:978791
 
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:774578-792198
 
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/SuperColumn.java:810145-834239,834349-834350

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/service/IResponseResolver.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/IResponseResolver.java?rev=1024332&r1=1024331&r2=1024332&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/service/IResponseResolver.java 
(original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/service/IResponseResolver.java 
Tue Oct 19 17:32:00 2010
@@ -37,4 +37,5 @@ public interface IResponseResolver<T> {
        public T resolve(Collection<Message> responses) throws DigestMismatchException, IOException;
        public boolean isDataPresent(Collection<Message> responses);
 
+    public void preprocess(Message message);
 }

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java?rev=1024332&r1=1024331&r2=1024332&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java
 (original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java
 Tue Oct 19 17:32:00 2010
@@ -92,6 +92,7 @@ public class QuorumResponseHandler<T> im
     public void response(Message message)
     {
         responses.add(message);
+        responseResolver.preprocess(message);
         if (responses.size() < blockfor) {
             return;
         }

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java?rev=1024332&r1=1024331&r2=1024332&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
 (original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
 Tue Oct 19 17:32:00 2010
@@ -108,6 +108,10 @@ public class RangeSliceResponseResolver 
         return resolvedRows;
     }
 
+    public void preprocess(Message message)
+    {
+    }
+
     public boolean isDataPresent(Collection<Message> responses)
     {
         return responses.size() >= sources.size();

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java?rev=1024332&r1=1024331&r2=1024332&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java 
(original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java 
Tue Oct 19 17:32:00 2010
@@ -22,16 +22,14 @@ import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.IOError;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
+import java.util.*;
 
 import org.apache.cassandra.db.*;
 import java.net.InetAddress;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.FBUtilities;
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -44,6 +42,7 @@ public class ReadResponseResolver implem
 {
        private static Logger logger_ = LoggerFactory.getLogger(ReadResponseResolver.class);
     private final String table;
+    private final Map<Message, ReadResponse> results = new NonBlockingHashMap<Message, ReadResponse>();
 
     public ReadResponseResolver(String table)
     {
@@ -59,6 +58,9 @@ public class ReadResponseResolver implem
       */
        public Row resolve(Collection<Message> responses) throws DigestMismatchException, IOException
     {
+        if (logger_.isDebugEnabled())
+            logger_.debug("resolving " + responses.size() + " responses");
+
         long startTime = System.currentTimeMillis();
                 List<ColumnFamily> versions = new ArrayList<ColumnFamily>(responses.size());
                 List<InetAddress> endpoints = new ArrayList<InetAddress>(responses.size());
@@ -72,11 +74,11 @@ public class ReadResponseResolver implem
          * query exists then we need to compare the digest with 
          * the digest of the data that is received.
         */
-               for (Message response : responses)
-               {                                                   
-            byte[] body = response.getMessageBody();
-            ByteArrayInputStream bufIn = new ByteArrayInputStream(body);
-            ReadResponse result = ReadResponse.serializer().deserialize(new DataInputStream(bufIn));
+               for (Message message : responses)
+               {
+            ReadResponse result = results.get(message);
+            if (result == null)
+                continue; // arrived after quorum already achieved
             if (result.isDigestQuery())
             {
                 digest = result.digest();
@@ -85,11 +87,12 @@ public class ReadResponseResolver implem
             else
             {
                 versions.add(result.row().cf);
-                endpoints.add(response.getFrom());
+                endpoints.add(message.getFrom());
                 key = result.row().key;
             }
         }
-               // If there was a digest query compare it with all the data digests 
+
+               // If there was a digest query compare it with all the data digests
                // If there is a mismatch then throw an exception so that read repair can happen.
         if (isDigestQuery)
         {
@@ -102,10 +105,22 @@ public class ReadResponseResolver implem
                     throw new DigestMismatchException(s);
                 }
             }
+            if (logger_.isDebugEnabled())
+                logger_.debug("digests verified");
         }
 
-        ColumnFamily resolved = resolveSuperset(versions);
-        maybeScheduleRepairs(resolved, table, key, versions, endpoints);
+        ColumnFamily resolved;
+        if (versions.size() > 1)
+        {
+            resolved = resolveSuperset(versions);
+            if (logger_.isDebugEnabled())
+                logger_.debug("versions merged");
+            maybeScheduleRepairs(resolved, table, key, versions, endpoints);
+        }
+        else
+        {
+            resolved = versions.get(0);
+        }
 
         if (logger_.isDebugEnabled())
             logger_.debug("resolve: " + (System.currentTimeMillis() - startTime) + " ms.");
@@ -162,27 +177,31 @@ public class ReadResponseResolver implem
         return resolved;
     }
 
-       public boolean isDataPresent(Collection<Message> responses)
+    public void preprocess(Message message)
+    {
+        byte[] body = message.getMessageBody();
+        ByteArrayInputStream bufIn = new ByteArrayInputStream(body);
+        try
+        {
+            ReadResponse result = ReadResponse.serializer().deserialize(new DataInputStream(bufIn));
+            results.put(message, result);
+        }
+        catch (IOException e)
+        {
+            throw new IOError(e);
+        }
+    }
+
+    public boolean isDataPresent(Collection<Message> responses)
        {
-        boolean isDataPresent = false;
-        for (Message response : responses)
+        for (Message message : responses)
         {
-            byte[] body = response.getMessageBody();
-            ByteArrayInputStream bufIn = new ByteArrayInputStream(body);
-            try
-            {
-                ReadResponse result = ReadResponse.serializer().deserialize(new DataInputStream(bufIn));
-                if (!result.isDigestQuery())
-                {
-                    isDataPresent = true;
-                }
-                bufIn.close();
-            }
-            catch (IOException ex)
-            {
-                throw new RuntimeException(ex);
-            }
+            ReadResponse result = results.get(message);
+            if (result == null)
+                continue; // arrived concurrently
+            if (result.isDigestQuery())
+                return true;
         }
-        return isDataPresent;
+        return false;
     }
 }


Reply via email to