Author: jbellis
Date: Thu Nov 11 00:32:53 2010
New Revision: 1033785
URL: http://svn.apache.org/viewvc?rev=1033785&view=rev
Log:
fix read repair regression from 0.6.7
patch by jbellis; reviewed by brandonwilliams for CASSANDRA-1727
Modified:
cassandra/branches/cassandra-0.6/CHANGES.txt
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ConsistencyChecker.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ReadResponseResolver.java
Modified: cassandra/branches/cassandra-0.6/CHANGES.txt
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/CHANGES.txt?rev=1033785&r1=1033784&r2=1033785&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.6/CHANGES.txt Thu Nov 11 00:32:53 2010
@@ -1,6 +1,7 @@
-0.6.7
+dev
* Update windows .bat files to work outside of main Cassandra
directory (CASSANDRA-1713)
+ * fix read repair regression from 0.6.7 (CASSANDRA-1727)
0.6.7
Modified:
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ConsistencyChecker.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ConsistencyChecker.java?rev=1033785&r1=1033784&r2=1033785&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ConsistencyChecker.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ConsistencyChecker.java Thu Nov 11 00:32:53 2010
@@ -28,6 +28,7 @@ import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.log4j.Logger;
+import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.cassandra.cache.ICacheExpungeHook;
@@ -110,7 +111,7 @@ class ConsistencyChecker implements Runn
if (!Arrays.equals(ColumnFamily.digest(row_.cf), digest))
{
- IResponseResolver<Row> readResponseResolver = new ReadResponseResolver(table_, replicas_.size());
+ ReadResponseResolver readResponseResolver = new ReadResponseResolver(table_, replicas_.size());
IAsyncCallback responseHandler;
if (replicas_.contains(FBUtilities.getLocalAddress()))
responseHandler = new DataRepairHandler(row_, replicas_.size(), readResponseResolver);
@@ -141,33 +142,32 @@ class ConsistencyChecker implements Runn
static class DataRepairHandler implements IAsyncCallback, ICacheExpungeHook<String, String>
{
private final Collection<Message> responses_ = new LinkedBlockingQueue<Message>();
- private final IResponseResolver<Row> readResponseResolver_;
+ private final ReadResponseResolver readResponseResolver_;
private final int majority_;
- DataRepairHandler(int responseCount, IResponseResolver<Row> readResponseResolver)
+ DataRepairHandler(int responseCount, ReadResponseResolver readResponseResolver)
{
readResponseResolver_ = readResponseResolver;
majority_ = (responseCount / 2) + 1;
}
- public DataRepairHandler(Row localRow, int responseCount, IResponseResolver<Row> readResponseResolver) throws IOException
+ public DataRepairHandler(Row localRow, int responseCount, ReadResponseResolver readResponseResolver) throws IOException
{
this(responseCount, readResponseResolver);
// wrap localRow in a response Message so it doesn't need to be special-cased in the resolver
ReadResponse readResponse = new ReadResponse(localRow);
- DataOutputBuffer out = new DataOutputBuffer();
- ReadResponse.serializer().serialize(readResponse, out);
- byte[] bytes = new byte[out.getLength()];
- System.arraycopy(out.getData(), 0, bytes, 0, bytes.length);
- responses_.add(new Message(FBUtilities.getLocalAddress(), StageManager.RESPONSE_STAGE, StorageService.Verb.READ_RESPONSE, bytes));
+ Message fakeMessage = new Message(FBUtilities.getLocalAddress(), StageManager.RESPONSE_STAGE, StorageService.Verb.READ_RESPONSE, ArrayUtils.EMPTY_BYTE_ARRAY);
+ responses_.add(fakeMessage);
+ readResponseResolver_.injectPreProcessed(fakeMessage, readResponse);
}
// synchronized so the " == majority" is safe
public synchronized void response(Message message)
{
if (logger_.isDebugEnabled())
- logger_.debug("Received responses in DataRepairHandler : " + message.toString());
+ logger_.debug("Received response in DataRepairHandler : " + message.toString());
responses_.add(message);
+ readResponseResolver_.preprocess(message);
if (responses_.size() == majority_)
{
String messageId = message.getMessageId();
Modified:
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ReadResponseResolver.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ReadResponseResolver.java?rev=1033785&r1=1033784&r2=1033785&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ReadResponseResolver.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ReadResponseResolver.java Thu Nov 11 00:32:53 2010
@@ -201,6 +201,12 @@ public class ReadResponseResolver implem
}
}
+ /** hack so ConsistencyChecker doesn't have to serialize/deserialize an extra real Message */
+ public void injectPreProcessed(Message message, ReadResponse result)
+ {
+ results.put(message, result);
+ }
+
public boolean isDataPresent(Collection<Message> responses)
{
int digests = 0;