Author: jbellis
Date: Thu Apr 1 01:53:56 2010
New Revision: 929777
URL: http://svn.apache.org/viewvc?rev=929777&view=rev
Log:
merge from 0.6
Modified:
cassandra/trunk/ (props changed)
cassandra/trunk/CHANGES.txt
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
(props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
(props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
(props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
(props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
(props changed)
cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java
cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyChecker.java
cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
Propchange: cassandra/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Apr 1 01:53:56 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6:922689-929765
+/cassandra/branches/cassandra-0.6:922689-929775
/incubator/cassandra/branches/cassandra-0.3:774578-796573
/incubator/cassandra/branches/cassandra-0.4:810145-834239,834349-834350
/incubator/cassandra/branches/cassandra-0.5:888872-915439
Modified: cassandra/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=929777&r1=929776&r2=929777&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Thu Apr 1 01:53:56 2010
@@ -8,6 +8,8 @@ dev
0.6.1
* fix NPE in sstable2json when no excluded keys are given (CASSANDRA-934)
+ * keep the replica set constant throughout the read repair process
+ (CASSANDRA-937)
0.6.0-RC1
Propchange:
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Apr 1 01:53:56 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-929765
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-929775
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/Cassandra.java:774578-796573
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/Cassandra.java:810145-834239,834349-834350
/incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/Cassandra.java:888872-903502
Propchange:
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Apr 1 01:53:56 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-929765
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-929775
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/column_t.java:774578-792198
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/Column.java:810145-834239,834349-834350
/incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/Column.java:888872-903502
Propchange:
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Apr 1 01:53:56 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-929765
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-929775
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:774578-796573
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:810145-834239,834349-834350
/incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:888872-903502
Propchange:
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Apr 1 01:53:56 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-929765
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-929775
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:774578-796573
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:810145-834239,834349-834350
/incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:888872-903502
Propchange:
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Apr 1 01:53:56 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-929765
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-929775
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:774578-792198
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/SuperColumn.java:810145-834239,834349-834350
/incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/SuperColumn.java:888872-903502
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java?rev=929777&r1=929776&r2=929777&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java Thu Apr 1 01:53:56 2010
@@ -98,9 +98,7 @@ public class ReadVerbHandler implements
if (message.getHeader(ReadCommand.DO_REPAIR) != null)
{
List<InetAddress> endpoints =
StorageService.instance.getLiveNaturalEndpoints(command.table, command.key);
- /* Remove the local storage endpoint from the list. */
- endpoints.remove(FBUtilities.getLocalAddress());
- if (endpoints.size() > 0)
+ if (endpoints.size() > 1)
StorageService.instance.doConsistencyCheck(row, endpoints,
command);
}
}
Modified:
cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyChecker.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyChecker.java?rev=929777&r1=929776&r2=929777&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyChecker.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyChecker.java Thu Apr 1 01:53:56 2010
@@ -30,10 +30,13 @@ import java.util.concurrent.LinkedBlocki
import org.apache.commons.lang.StringUtils;
import org.apache.cassandra.cache.ICacheExpungeHook;
+import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.ReadCommand;
import org.apache.cassandra.db.ReadResponse;
import org.apache.cassandra.db.Row;
+import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.net.IAsyncCallback;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
@@ -47,19 +50,18 @@ import org.slf4j.LoggerFactory;
class ConsistencyChecker implements Runnable
{
private static Logger logger_ =
LoggerFactory.getLogger(ConsistencyManager.class);
- private static long scheduledTimeMillis_ = 600;
- private static ExpiringMap<String, String> readRepairTable_ = new
ExpiringMap<String, String>(scheduledTimeMillis_);
+ private static ExpiringMap<String, String> readRepairTable_ = new
ExpiringMap<String, String>(DatabaseDescriptor.getRpcTimeout());
private final String table_;
private final Row row_;
protected final List<InetAddress> replicas_;
private final ReadCommand readCommand_;
- public ConsistencyChecker(String table, Row row, List<InetAddress>
replicas, ReadCommand readCommand)
+ public ConsistencyChecker(String table, Row row, List<InetAddress>
endpoints, ReadCommand readCommand)
{
table_ = table;
row_ = row;
- replicas_ = replicas;
+ replicas_ = endpoints;
readCommand_ = readCommand;
}
@@ -71,7 +73,13 @@ class ConsistencyChecker implements Runn
Message message =
readCommandDigestOnly.makeReadMessage();
if (logger_.isDebugEnabled())
logger_.debug("Reading consistency digest for " +
readCommand_.key + " from " + message.getMessageId() + "@[" +
StringUtils.join(replicas_, ", ") + "]");
- MessagingService.instance.sendRR(message, replicas_.toArray(new
InetAddress[replicas_.size()]), new DigestResponseHandler());
+
+ MessagingService.instance.addCallback(new DigestResponseHandler(),
message.getMessageId());
+ for (InetAddress endpoint : replicas_)
+ {
+ if (!endpoint.equals(FBUtilities.getLocalAddress()))
+ MessagingService.instance.sendOneWay(message, endpoint);
+ }
}
catch (IOException ex)
{
@@ -88,48 +96,49 @@ class ConsistencyChecker implements Runn
class DigestResponseHandler implements IAsyncCallback
{
- Collection<Message> responses_ = new
LinkedBlockingQueue<Message>();
+ private boolean repairInvoked;
- // syncronized so "size() == " works
- public synchronized void response(Message msg)
+ public synchronized void response(Message response)
{
- responses_.add(msg);
- if (responses_.size() != ConsistencyChecker.this.replicas_.size())
+ if (repairInvoked)
return;
- for (Message response : responses_)
+ try
{
- try
+ byte[] body = response.getMessageBody();
+ ByteArrayInputStream bufIn = new ByteArrayInputStream(body);
+ ReadResponse result =
ReadResponse.serializer().deserialize(new DataInputStream(bufIn));
+ byte[] digest = result.digest();
+
+ if (!Arrays.equals(ColumnFamily.digest(row_.cf), digest))
{
- byte[] body = response.getMessageBody();
- ByteArrayInputStream bufIn = new
ByteArrayInputStream(body);
- ReadResponse result =
ReadResponse.serializer().deserialize(new DataInputStream(bufIn));
- byte[] digest = result.digest();
- if (!Arrays.equals(ColumnFamily.digest(row_.cf), digest))
+ IResponseResolver<Row> readResponseResolver = new
ReadResponseResolver(table_, replicas_.size());
+ IAsyncCallback responseHandler;
+ if (replicas_.contains(FBUtilities.getLocalAddress()))
+ responseHandler = new DataRepairHandler(row_,
replicas_.size(), readResponseResolver);
+ else
+ responseHandler = new
DataRepairHandler(replicas_.size(), readResponseResolver);
+
+ ReadCommand readCommand = constructReadMessage(false);
+ Message message = readCommand.makeReadMessage();
+ if (logger_.isDebugEnabled())
+ logger_.debug("Performing read repair for " +
readCommand_.key + " to " + message.getMessageId() + "@[" +
StringUtils.join(replicas_, ", ") + "]");
+ MessagingService.instance.addCallback(responseHandler,
message.getMessageId());
+ for (InetAddress endpoint : replicas_)
{
- doReadRepair();
- break;
+ if (!endpoint.equals(FBUtilities.getLocalAddress()))
+ MessagingService.instance.sendOneWay(message,
endpoint);
}
+
+ repairInvoked = true;
}
- catch (Exception e)
- {
- throw new RuntimeException("Error handling responses for "
+ row_, e);
- }
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException("Error handling responses for " +
row_, e);
}
}
-
- private void doReadRepair() throws IOException
- {
- replicas_.add(FBUtilities.getLocalAddress());
- IResponseResolver<Row> readResponseResolver = new
ReadResponseResolver(table_, replicas_.size());
- IAsyncCallback responseHandler = new
DataRepairHandler(replicas_.size(), readResponseResolver);
- ReadCommand readCommand = constructReadMessage(false);
- Message message = readCommand.makeReadMessage();
- if (logger_.isDebugEnabled())
- logger_.debug("Performing read repair for " + readCommand_.key +
" to " + message.getMessageId() + "@[" + StringUtils.join(replicas_, ", ") +
"]");
- MessagingService.instance.sendRR(message, replicas_.toArray(new
InetAddress[replicas_.size()]), responseHandler);
- }
- }
+ }
static class DataRepairHandler implements IAsyncCallback,
ICacheExpungeHook<String, String>
{
@@ -143,6 +152,18 @@ class ConsistencyChecker implements Runn
majority_ = (responseCount / 2) + 1;
}
+ public DataRepairHandler(Row localRow, int responseCount,
IResponseResolver<Row> readResponseResolver) throws IOException
+ {
+ this(responseCount, readResponseResolver);
+ // wrap localRow in a response Message so it doesn't need to be
special-cased in the resolver
+ ReadResponse readResponse = new ReadResponse(localRow);
+ DataOutputBuffer out = new DataOutputBuffer();
+ ReadResponse.serializer().serialize(readResponse, out);
+ byte[] bytes = new byte[out.getLength()];
+ System.arraycopy(out.getData(), 0, bytes, 0, bytes.length);
+ responses_.add(new Message(FBUtilities.getLocalAddress(),
StageManager.RESPONSE_STAGE, StorageService.Verb.READ_RESPONSE, bytes));
+ }
+
// synchronized so the " == majority" is safe
public synchronized void response(Message message)
{
Modified:
cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=929777&r1=929776&r2=929777&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Thu Apr 1 01:53:56 2010
@@ -764,9 +764,7 @@ public class StorageProxy implements Sto
if (DatabaseDescriptor.getConsistencyCheck())
{
List<InetAddress> endpoints =
StorageService.instance.getLiveNaturalEndpoints(command.table, command.key);
- /* Remove the local storage endpoint from the list. */
- endpoints.remove(FBUtilities.getLocalAddress());
- if (endpoints.size() > 0)
+ if (endpoints.size() > 1)
StorageService.instance.doConsistencyCheck(row, endpoints,
command);
}