Author: jbellis
Date: Tue Dec 21 21:40:46 2010
New Revision: 1051662
URL: http://svn.apache.org/viewvc?rev=1051662&view=rev
Log:
fix NPE regression caused by CASSANDRA-1830
patch by jbellis; reviewed by mdennis and tjake
Modified:
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ConsistencyChecker.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ReadResponseResolver.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java
Modified:
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ConsistencyChecker.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ConsistencyChecker.java?rev=1051662&r1=1051661&r2=1051662&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ConsistencyChecker.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ConsistencyChecker.java Tue Dec 21 21:40:46 2010
@@ -153,7 +153,7 @@ class ConsistencyChecker implements Runn
public DataRepairHandler() throws IOException
{
- readResponseResolver_ = new ReadResponseResolver(readCommand_.table, replicas_.size());
+ readResponseResolver_ = new ReadResponseResolver(readCommand_.table, readCommand_.key, replicas_.size());
majority_ = (replicas_.size() / 2) + 1;
// wrap original data Row in a response Message so it doesn't need to be special-cased in the resolver
ReadResponse readResponse = new ReadResponse(row_);
Modified:
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ReadResponseResolver.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ReadResponseResolver.java?rev=1051662&r1=1051661&r2=1051662&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ReadResponseResolver.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ReadResponseResolver.java Tue Dec 21 21:40:46 2010
@@ -24,11 +24,8 @@ import java.io.IOError;
import java.io.IOException;
import java.util.*;
-import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.db.ReadResponse;
-import org.apache.cassandra.db.Row;
-import org.apache.cassandra.db.RowMutation;
-import org.apache.cassandra.db.RowMutationMessage;
+import org.apache.cassandra.db.*;
+
import java.net.InetAddress;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
@@ -48,14 +45,16 @@ public class ReadResponseResolver implem
private final String table;
private final int responseCount;
private final Map<Message, ReadResponse> results = new
NonBlockingHashMap<Message, ReadResponse>();
+ private String key;
- public ReadResponseResolver(String table, int responseCount)
+ public ReadResponseResolver(String table, String key, int responseCount)
{
assert 1 <= responseCount && responseCount <=
DatabaseDescriptor.getReplicationFactor(table)
: "invalid response count " + responseCount;
this.responseCount = responseCount;
this.table = table;
+ this.key = key;
}
/*
@@ -73,7 +72,6 @@ public class ReadResponseResolver implem
long startTime = System.currentTimeMillis();
List<ColumnFamily> versions = new
ArrayList<ColumnFamily>(responses.size());
List<InetAddress> endPoints = new
ArrayList<InetAddress>(responses.size());
- String key = null;
byte[] digest = null;
/*
@@ -104,7 +102,6 @@ public class ReadResponseResolver implem
{
versions.add(result.row().cf);
endPoints.add(message.getFrom());
- key = result.row().key;
}
}
Modified:
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1051662&r1=1051661&r2=1051662&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java Tue Dec 21 21:40:46 2010
@@ -470,7 +470,8 @@ public class StorageProxy implements Sto
if (logger.isDebugEnabled())
logger.debug("strongread reading " + (m == message ? "data" : "digest") + " for " + command + " from " + m.getMessageId() + "@" + endpoint);
}
- QuorumResponseHandler<Row> quorumResponseHandler = new QuorumResponseHandler<Row>(responseCount, new ReadResponseResolver(command.table, responseCount));
+ ReadResponseResolver resolver = new ReadResponseResolver(command.table, command.key, responseCount);
+ QuorumResponseHandler<Row> quorumResponseHandler = new QuorumResponseHandler<Row>(responseCount, resolver);
MessagingService.instance.sendRR(messages, endPoints,
quorumResponseHandler);
quorumResponseHandlers.add(quorumResponseHandler);
commandEndPoints.add(endPoints);
@@ -500,7 +501,8 @@ public class StorageProxy implements Sto
if (logger.isDebugEnabled())
logger.debug("Digest mismatch:", ex);
int responseCount =
determineBlockFor(DatabaseDescriptor.getReplicationFactor(command.table),
consistency_level);
- QuorumResponseHandler<Row> qrhRepair = new QuorumResponseHandler<Row>(responseCount, new ReadResponseResolver(command.table, responseCount));
+ ReadResponseResolver resolver = new ReadResponseResolver(command.table, command.key, responseCount);
+ QuorumResponseHandler<Row> qrhRepair = new QuorumResponseHandler<Row>(responseCount, resolver);
Message messageRepair = command.makeReadMessage();
MessagingService.instance.sendRR(messageRepair,
commandEndPoints.get(i), qrhRepair);
if (repairResponseHandlers == null)