Author: jbellis
Date: Thu Nov 11 00:32:53 2010
New Revision: 1033785

URL: http://svn.apache.org/viewvc?rev=1033785&view=rev
Log:
fix read repair regression from 0.6.7
patch by jbellis; reviewed by brandonwilliams for CASSANDRA-1727

Modified:
    cassandra/branches/cassandra-0.6/CHANGES.txt
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ConsistencyChecker.java
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ReadResponseResolver.java

Modified: cassandra/branches/cassandra-0.6/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/CHANGES.txt?rev=1033785&r1=1033784&r2=1033785&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.6/CHANGES.txt Thu Nov 11 00:32:53 2010
@@ -1,6 +1,7 @@
-0.6.7
+dev
  * Update windows .bat files to work outside of main Cassandra
    directory (CASSANDRA-1713)
+ * fix read repair regression from 0.6.7 (CASSANDRA-1727)
 
 
 0.6.7

Modified: 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ConsistencyChecker.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ConsistencyChecker.java?rev=1033785&r1=1033784&r2=1033785&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ConsistencyChecker.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ConsistencyChecker.java Thu Nov 11 00:32:53 2010
@@ -28,6 +28,7 @@ import java.util.List;
 import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.log4j.Logger;
+import org.apache.commons.lang.ArrayUtils;
 import org.apache.commons.lang.StringUtils;
 
 import org.apache.cassandra.cache.ICacheExpungeHook;
@@ -110,7 +111,7 @@ class ConsistencyChecker implements Runn
 
                 if (!Arrays.equals(ColumnFamily.digest(row_.cf), digest))
                 {
-                    IResponseResolver<Row> readResponseResolver = new 
ReadResponseResolver(table_, replicas_.size());
+                    ReadResponseResolver readResponseResolver = new 
ReadResponseResolver(table_, replicas_.size());
                     IAsyncCallback responseHandler;
                     if (replicas_.contains(FBUtilities.getLocalAddress()))
                         responseHandler = new DataRepairHandler(row_, 
replicas_.size(), readResponseResolver);
@@ -141,33 +142,32 @@ class ConsistencyChecker implements Runn
        static class DataRepairHandler implements IAsyncCallback, 
ICacheExpungeHook<String, String>
        {
                private final Collection<Message> responses_ = new 
LinkedBlockingQueue<Message>();
-               private final IResponseResolver<Row> readResponseResolver_;
+               private final ReadResponseResolver readResponseResolver_;
                private final int majority_;
                
-               DataRepairHandler(int responseCount, IResponseResolver<Row> 
readResponseResolver)
+               DataRepairHandler(int responseCount, ReadResponseResolver 
readResponseResolver)
                {
                        readResponseResolver_ = readResponseResolver;
                        majority_ = (responseCount / 2) + 1;  
                }
 
-        public DataRepairHandler(Row localRow, int responseCount, 
IResponseResolver<Row> readResponseResolver) throws IOException
+        public DataRepairHandler(Row localRow, int responseCount, 
ReadResponseResolver readResponseResolver) throws IOException
         {
             this(responseCount, readResponseResolver);
             // wrap localRow in a response Message so it doesn't need to be 
special-cased in the resolver
             ReadResponse readResponse = new ReadResponse(localRow);
-            DataOutputBuffer out = new DataOutputBuffer();
-            ReadResponse.serializer().serialize(readResponse, out);
-            byte[] bytes = new byte[out.getLength()];
-            System.arraycopy(out.getData(), 0, bytes, 0, bytes.length);
-            responses_.add(new Message(FBUtilities.getLocalAddress(), 
StageManager.RESPONSE_STAGE, StorageService.Verb.READ_RESPONSE, bytes));
+            Message fakeMessage = new Message(FBUtilities.getLocalAddress(), 
StageManager.RESPONSE_STAGE, StorageService.Verb.READ_RESPONSE, 
ArrayUtils.EMPTY_BYTE_ARRAY);
+            responses_.add(fakeMessage);
+            readResponseResolver_.injectPreProcessed(fakeMessage, 
readResponse);
         }
 
         // synchronized so the " == majority" is safe
                public synchronized void response(Message message)
                {
                        if (logger_.isDebugEnabled())
-                         logger_.debug("Received responses in 
DataRepairHandler : " + message.toString());
+                         logger_.debug("Received response in DataRepairHandler 
: " + message.toString());
                        responses_.add(message);
+            readResponseResolver_.preprocess(message);
             if (responses_.size() == majority_)
             {
                 String messageId = message.getMessageId();

Modified: 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ReadResponseResolver.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ReadResponseResolver.java?rev=1033785&r1=1033784&r2=1033785&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ReadResponseResolver.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ReadResponseResolver.java Thu Nov 11 00:32:53 2010
@@ -201,6 +201,12 @@ public class ReadResponseResolver implem
         }
     }
 
+    /** hack so ConsistencyChecker doesn't have to serialize/deserialize an extra real Message */
+    public void injectPreProcessed(Message message, ReadResponse result)
+    {
+        results.put(message, result);
+    }
+
     public boolean isDataPresent(Collection<Message> responses)
        {
         int digests = 0;


Reply via email to