Author: jbellis
Date: Wed Jan 5 07:18:16 2011
New Revision: 1055326
URL: http://svn.apache.org/viewvc?rev=1055326&view=rev
Log:
merge from 0.7
Added:
cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterReadCallback.java
cassandra/trunk/src/java/org/apache/cassandra/service/ReadCallback.java
cassandra/trunk/src/java/org/apache/cassandra/service/RepairCallback.java
Removed:
cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterQuorumResponseHandler.java
cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java
Modified:
cassandra/trunk/ (props changed)
cassandra/trunk/CHANGES.txt
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
(props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
(props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
(props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
(props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
(props changed)
cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
cassandra/trunk/src/java/org/apache/cassandra/service/IResponseResolver.java
cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
cassandra/trunk/test/unit/org/apache/cassandra/service/ConsistencyLevelTest.java
Propchange: cassandra/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Jan 5 07:18:16 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6:922689-1052356,1052358-1053452,1053454,1053456-1055311
-/cassandra/branches/cassandra-0.7:1026516-1055313
+/cassandra/branches/cassandra-0.7:1026516-1055325
/cassandra/branches/cassandra-0.7.0:1053690-1054631
/cassandra/tags/cassandra-0.7.0-rc3:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3:774578-796573
Modified: cassandra/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1055326&r1=1055325&r2=1055326&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Wed Jan 5 07:18:16 2011
@@ -15,6 +15,8 @@
* Make snitches configurable at runtime (CASSANDRA-1374)
* retry hadoop split requests on connection failure (CASSANDRA-1927)
* implement describeOwnership for BOP, COPP (CASSANDRA-1928)
+ * make read repair behave as expected for ConsistencyLevel > ONE
+ (CASSANDRA-982)
0.7.0-rc4
Propchange:
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Jan 5 07:18:16 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1052356,1052358-1053452,1053454,1053456-1055311
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1055313
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1055325
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1053690-1054631
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/Cassandra.java:774578-796573
Propchange:
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Jan 5 07:18:16 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1052356,1052358-1053452,1053454,1053456-1055311
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1055313
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1055325
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1053690-1054631
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/column_t.java:774578-792198
Propchange:
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Jan 5 07:18:16 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1052356,1052358-1053452,1053454,1053456-1055311
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1055313
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1055325
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1053690-1054631
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:774578-796573
Propchange:
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Jan 5 07:18:16 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1052356,1052358-1053452,1053454,1053456-1055311
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1055313
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1055325
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1053690-1054631
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:774578-796573
Propchange:
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Jan 5 07:18:16 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1052356,1052358-1053452,1053454,1053456-1055311
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1055313
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1055325
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1053690-1054631
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:774578-792198
Modified:
cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java?rev=1055326&r1=1055325&r2=1055326&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
Wed Jan 5 07:18:16 2011
@@ -223,15 +223,6 @@ public abstract class AbstractReplicatio
return getAddressRanges(temp).get(pendingAddress);
}
- public QuorumResponseHandler getQuorumResponseHandler(IResponseResolver
responseResolver, ConsistencyLevel consistencyLevel)
- {
- if (consistencyLevel.equals(ConsistencyLevel.LOCAL_QUORUM) ||
consistencyLevel.equals(ConsistencyLevel.EACH_QUORUM))
- {
- return new DatacenterQuorumResponseHandler(responseResolver,
consistencyLevel, table);
- }
- return new QuorumResponseHandler(responseResolver, consistencyLevel,
table);
- }
-
public void invalidateCachedTokenEndpointValues()
{
clearEndpointCache();
Added:
cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterReadCallback.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterReadCallback.java?rev=1055326&view=auto
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterReadCallback.java
(added)
+++
cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterReadCallback.java
Wed Jan 5 07:18:16 2011
@@ -0,0 +1,106 @@
+package org.apache.cassandra.service;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+import java.net.InetAddress;
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.locator.IEndpointSnitch;
+import org.apache.cassandra.locator.NetworkTopologyStrategy;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.cassandra.thrift.UnavailableException;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * Read callback for datacenter-local quorum reads: blocks until a quorum of the replicas
+ * in this node's datacenter have responded and the data response is present. Responses
+ * from other datacenters are still handed to the resolver but do not count towards blockfor.
+ */
+public class DatacenterReadCallback<T> extends ReadCallback<T>
+{
+ private static final IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
+ private static final String localdc = snitch.getDatacenter(FBUtilities.getLocalAddress());
+ // local-DC responses still required; goes negative when more than blockfor local replicas reply
+ private final AtomicInteger localResponses;
+
+ public DatacenterReadCallback(IResponseResolver<T> resolver, ConsistencyLevel consistencyLevel, String table)
+ {
+ super(resolver, consistencyLevel, table);
+ localResponses = new AtomicInteger(blockfor);
+ }
+
+ /**
+ * Hands the response to the resolver; responses from the local datacenter also count
+ * towards blockfor. Signals the waiting reader once enough local responses have arrived
+ * and the data response is present.
+ */
+ @Override
+ public void response(Message message)
+ {
+ resolver.preprocess(message);
+
+ int remaining = localdc.equals(snitch.getDatacenter(message.getFrom()))
+ ? localResponses.decrementAndGet()
+ : localResponses.get();
+
+ // Must be <= 0, not == 0: with read repair every replica is queried, so more than
+ // blockfor local replies can arrive. If the data response is one of those extra
+ // replies the counter is already negative, and an == 0 test would never signal.
+ if (remaining <= 0 && resolver.isDataPresent())
+ {
+ condition.signal();
+ }
+ }
+
+ /**
+ * @return a quorum of this datacenter's replication factor. The keyspace must use
+ * NetworkTopologyStrategy (the cast fails otherwise). This runs from the ReadCallback
+ * constructor, before this class's fields are assigned, so it only uses static state.
+ */
+ @Override
+ public int determineBlockFor(ConsistencyLevel consistency_level, String table)
+ {
+ NetworkTopologyStrategy strategy = (NetworkTopologyStrategy) Table.open(table).getReplicationStrategy();
+ return (strategy.getReplicationFactor(localdc) / 2) + 1;
+ }
+
+ /**
+ * @throws UnavailableException if fewer than blockfor of the endpoints are in this datacenter
+ */
+ @Override
+ public void assureSufficientLiveNodes(Collection<InetAddress> endpoints) throws UnavailableException
+ {
+ int localEndpoints = 0;
+ for (InetAddress endpoint : endpoints)
+ {
+ if (localdc.equals(snitch.getDatacenter(endpoint)))
+ localEndpoints++;
+ }
+
+ if (localEndpoints < blockfor)
+ throw new UnavailableException();
+ }
+}
Modified:
cassandra/trunk/src/java/org/apache/cassandra/service/IResponseResolver.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/IResponseResolver.java?rev=1055326&r1=1055325&r2=1055326&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/service/IResponseResolver.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/service/IResponseResolver.java
Wed Jan 5 07:18:16 2011
@@ -24,7 +24,7 @@ import org.apache.cassandra.net.Message;
public interface IResponseResolver<T> {
- /*
+ /**
* This Method resolves the responses that are passed in . for example
: if
* its write response then all we get is true or false return values
which
* implies if the writes were successful but for reads its more
complicated
@@ -33,8 +33,14 @@ public interface IResponseResolver<T> {
* needs from this interface.
*/
public T resolve() throws DigestMismatchException, IOException;
+
public boolean isDataPresent();
+ /**
+ * Returns the data response without comparing it against any digests; only call once isDataPresent() is true.
+ */
+ public T getData() throws IOException;
+
public void preprocess(Message message);
public Iterable<Message> getMessages();
public int getMessageCount();
Modified:
cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java?rev=1055326&r1=1055325&r2=1055326&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
Wed Jan 5 07:18:16 2011
@@ -54,6 +54,15 @@ public class RangeSliceResponseResolver
this.table = table;
}
+ public List<Row> getData() throws IOException
+ {
+ // only the first response is deserialized; it is not compared against the others
+ Message first = responses.iterator().next();
+ return RangeSliceReply.read(first.getMessageBody()).rows;
+ }
+
+ // Note: this deserializes the response a 2nd time if getData was called first
+ // (this is not currently an issue since we don't do read repair for range queries.)
public List<Row> resolve() throws IOException
{
CollatingIterator collator = new CollatingIterator(new
Comparator<Pair<Row,InetAddress>>()
Added: cassandra/trunk/src/java/org/apache/cassandra/service/ReadCallback.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/ReadCallback.java?rev=1055326&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/ReadCallback.java
(added)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/ReadCallback.java Wed
Jan 5 07:18:16 2011
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.Collection;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.net.IAsyncCallback;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.cassandra.thrift.UnavailableException;
+import org.apache.cassandra.utils.SimpleCondition;
+
+/**
+ * Collects the replica responses for a read and lets the coordinator block until enough
+ * of them (blockfor, derived from the ConsistencyLevel) have arrived, one of which must
+ * be the full-data response.
+ */
+public class ReadCallback<T> implements IAsyncCallback
+{
+ protected static final Logger logger = LoggerFactory.getLogger( ReadCallback.class );
+
+ public final IResponseResolver<T> resolver;
+ protected final SimpleCondition condition = new SimpleCondition();
+ private final long startTime;
+ // number of responses required to satisfy the requested ConsistencyLevel
+ protected final int blockfor;
+
+ /**
+ * Constructor when response count has to be calculated and blocked for.
+ *
+ * determineBlockFor is overridable and runs before any subclass constructor body,
+ * so overrides must rely only on their arguments and static state.
+ */
+ public ReadCallback(IResponseResolver<T> resolver, ConsistencyLevel consistencyLevel, String table)
+ {
+ this.blockfor = determineBlockFor(consistencyLevel, table);
+ this.resolver = resolver;
+ this.startTime = System.currentTimeMillis();
+
+ logger.debug("ReadCallback blocking for {} responses", blockfor);
+ }
+
+ /**
+ * Waits until blockfor responses including the data response have arrived, or until the
+ * rpc timeout (measured from construction of this callback) expires.
+ *
+ * @return resolver.getData() when a single response suffices, otherwise resolver.resolve()
+ * @throws TimeoutException if not enough responses arrived in time
+ * @throws DigestMismatchException propagated from resolver.resolve()
+ */
+ public T get() throws TimeoutException, DigestMismatchException, IOException
+ {
+ long timeout = DatabaseDescriptor.getRpcTimeout() - (System.currentTimeMillis() - startTime);
+ boolean success;
+ try
+ {
+ success = condition.await(timeout, TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException ex)
+ {
+ throw new AssertionError(ex);
+ }
+
+ if (!success)
+ {
+ // comma-separate the responders without leaving a dangling separator in the message
+ StringBuilder sb = new StringBuilder();
+ for (Message message : resolver.getMessages())
+ {
+ if (sb.length() > 0)
+ sb.append(", ");
+ sb.append(message.getFrom());
+ }
+ throw new TimeoutException("Operation timed out - received only " + resolver.getMessageCount() + " responses from " + sb + ".");
+ }
+
+ return blockfor == 1 ? resolver.getData() : resolver.resolve();
+ }
+
+ /**
+ * Unregisters the messaging callbacks keyed by the ids of the responses received so far.
+ */
+ public void close()
+ {
+ for (Message response : resolver.getMessages())
+ {
+ MessagingService.removeRegisteredCallback(response.getMessageId());
+ }
+ }
+
+ /**
+ * Hands the response to the resolver and wakes the waiting reader once at least blockfor
+ * responses, including the data response, have been received.
+ */
+ public void response(Message message)
+ {
+ resolver.preprocess(message);
+ if (resolver.getMessageCount() < blockfor)
+ return;
+ if (resolver.isDataPresent())
+ condition.signal();
+ }
+
+ /**
+ * @return the number of responses needed for consistencyLevel: 1 for ONE and ANY, a
+ * majority of the replication factor for QUORUM, every replica for ALL
+ * @throws UnsupportedOperationException for any other ConsistencyLevel
+ */
+ public int determineBlockFor(ConsistencyLevel consistencyLevel, String table)
+ {
+ switch (consistencyLevel)
+ {
+ case ONE:
+ case ANY:
+ return 1;
+ case QUORUM:
+ return (Table.open(table).getReplicationStrategy().getReplicationFactor() / 2) + 1;
+ case ALL:
+ return Table.open(table).getReplicationStrategy().getReplicationFactor();
+ default:
+ throw new UnsupportedOperationException("invalid consistency level: " + consistencyLevel);
+ }
+ }
+
+ /**
+ * @throws UnavailableException if fewer than blockfor endpoints are supplied
+ */
+ public void assureSufficientLiveNodes(Collection<InetAddress> endpoints) throws UnavailableException
+ {
+ if (endpoints.size() < blockfor)
+ throw new UnavailableException();
+ }
+}
Modified:
cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java?rev=1055326&r1=1055325&r2=1055326&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
Wed Jan 5 07:18:16 2011
@@ -27,6 +27,7 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,8 +48,9 @@ public class ReadResponseResolver implem
{
private static Logger logger_ =
LoggerFactory.getLogger(ReadResponseResolver.class);
private final String table;
- private final Map<Message, ReadResponse> results = new
NonBlockingHashMap<Message, ReadResponse>();
+ private final ConcurrentMap<Message, ReadResponse> results = new
NonBlockingHashMap<Message, ReadResponse>();
private DecoratedKey key;
+ private ByteBuffer digest;
public ReadResponseResolver(String table, ByteBuffer key)
{
@@ -56,14 +58,29 @@ public class ReadResponseResolver implem
this.key = StorageService.getPartitioner().decorateKey(key);
}
+ public Row getData() throws IOException
+ {
+ // hand back a full-data (non-digest) response as-is; digests are not checked here
+ for (ReadResponse response : results.values())
+ {
+ if (!response.isDigestQuery())
+ return response.row();
+ }
+
+ throw new AssertionError("getData should not be invoked when no data is present");
+ }
+
/*
- * This method handles two different scenarios:
+ * This method handles three different scenarios:
*
- * 1) we're handling the initial read, of data from the closest replica +
digests
+ * 1a)we're handling the initial read, of data from the closest replica +
digests
* from the rest. In this case we check the digests against each other,
* throw an exception if there is a mismatch, otherwise return the data
row.
*
- * 2) there was a mismatch on the initial read, so we redid the digest
requests
+ * 1b)we're checking additional digests that arrived after the minimum to handle
+ * the requested ConsistencyLevel, i.e. asynchronous read repair check
+ *
+ * 2) there was a mismatch on the initial read (1a or 1b), so we redid the
digest requests
* as full data reads. In this case we need to compute the most recent
version
* of each column, and send diffs to out-of-date replicas.
*/
@@ -75,10 +92,13 @@ public class ReadResponseResolver implem
long startTime = System.currentTimeMillis();
List<ColumnFamily> versions = new ArrayList<ColumnFamily>();
List<InetAddress> endpoints = new ArrayList<InetAddress>();
- ByteBuffer digest = null;
// validate digests against each other; throw immediately on mismatch.
// also, collects data results into versions/endpoints lists.
+ //
+ // results are cleared as we process them, to avoid unnecessary
duplication of work
+ // when resolve() is called a second time for read repair on responses
that were not
+ // necessary to satisfy ConsistencyLevel.
for (Map.Entry<Message, ReadResponse> entry : results.entrySet())
{
ReadResponse result = entry.getValue();
@@ -106,6 +126,8 @@ public class ReadResponseResolver implem
versions.add(cf);
endpoints.add(from);
}
+
+ results.remove(message);
}
if (logger_.isDebugEnabled())
Added: cassandra/trunk/src/java/org/apache/cassandra/service/RepairCallback.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/RepairCallback.java?rev=1055326&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/RepairCallback.java
(added)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/RepairCallback.java
Wed Jan 5 07:18:16 2011
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.net.IAsyncCallback;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.utils.SimpleCondition;
+
+/**
+ * Callback for read-repair requests. Unlike ReadCallback it has no ConsistencyLevel to
+ * satisfy: it waits until every expected endpoint has replied or the rpc timeout expires,
+ * then resolves whatever responses it has.
+ */
+public class RepairCallback<T> implements IAsyncCallback
+{
+ public final IResponseResolver<T> resolver;
+ private final List<InetAddress> endpoints;
+ protected final SimpleCondition condition = new SimpleCondition();
+ private final long startTime;
+
+ /**
+ * @param resolver collects and resolves the responses
+ * @param endpoints the endpoints expected to reply; only its size is used, to detect
+ * when every one of them has replied
+ */
+ public RepairCallback(IResponseResolver<T> resolver, List<InetAddress> endpoints)
+ {
+ this.resolver = resolver;
+ this.endpoints = endpoints;
+ this.startTime = System.currentTimeMillis();
+ }
+
+ /**
+ * The main difference between this and ReadCallback is, ReadCallback has a ConsistencyLevel
+ * it needs to achieve. Repair on the other hand is happy to repair whoever replies within the timeout.
+ *
+ * @return resolver.resolve() over the responses received by the time the wait ends
+ */
+ public T get() throws TimeoutException, DigestMismatchException, IOException
+ {
+ long timeout = DatabaseDescriptor.getRpcTimeout() - (System.currentTimeMillis() - startTime);
+ try
+ {
+ // the await result is ignored on purpose: timing out just means repairing with fewer responses
+ condition.await(timeout, TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException ex)
+ {
+ throw new AssertionError(ex);
+ }
+
+ return resolver.resolve();
+ }
+
+ /**
+ * Hands the response to the resolver and wakes get() early once every endpoint has replied.
+ */
+ public void response(Message message)
+ {
+ resolver.preprocess(message);
+ if (resolver.getMessageCount() == endpoints.size())
+ condition.signal();
+ }
+}
Modified:
cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1055326&r1=1055325&r2=1055326&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Wed
Jan 5 07:18:16 2011
@@ -50,7 +50,6 @@ import org.apache.cassandra.gms.Gossiper
import org.apache.cassandra.locator.AbstractReplicationStrategy;
import org.apache.cassandra.locator.TokenMetadata;
import org.apache.cassandra.net.IAsyncCallback;
-import org.apache.cassandra.net.IAsyncResult;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.thrift.*;
@@ -65,6 +64,8 @@ public class StorageProxy implements Sto
{
private static final Logger logger =
LoggerFactory.getLogger(StorageProxy.class);
+ private static final ScheduledExecutorService repairExecutor = new ScheduledThreadPoolExecutor(1); // runs the read-repair checks deferred by fetchRows; TODO JMX-enable this
+
private static final Random random = new Random();
// mbean stuff
private static final LatencyTracker readStats = new LatencyTracker();
@@ -351,15 +352,7 @@ public class StorageProxy implements Sto
List<Row> rows;
try
{
- if (consistency_level == ConsistencyLevel.ONE)
- {
- rows = weakRead(commands);
- }
- else
- {
- assert consistency_level.getValue() >=
ConsistencyLevel.QUORUM.getValue();
- rows = strongRead(commands, consistency_level);
- }
+ rows = fetchRows(commands, consistency_level);
}
finally
{
@@ -368,91 +361,23 @@ public class StorageProxy implements Sto
return rows;
}
- private static List<Row> weakRead(List<ReadCommand> commands) throws
IOException, UnavailableException, TimeoutException
- {
- List<Row> rows = new ArrayList<Row>();
-
- // send off all the commands asynchronously
- List<Future<Object>> localFutures = null;
- HashMap<ReadCommand, IAsyncResult> remoteResults = null;
- for (ReadCommand command: commands)
- {
- InetAddress endPoint =
StorageService.instance.findSuitableEndpoint(command.table, command.key);
- if (endPoint.equals(FBUtilities.getLocalAddress()))
- {
- if (logger.isDebugEnabled())
- logger.debug("weakread reading " + command + " locally");
-
- if (localFutures == null)
- localFutures = new ArrayList<Future<Object>>();
- Callable<Object> callable = new weakReadLocalCallable(command);
-
localFutures.add(StageManager.getStage(Stage.READ).submit(callable));
- }
- else
- {
- if (remoteResults == null)
- remoteResults = new HashMap<ReadCommand, IAsyncResult>();
- Message message = command.makeReadMessage();
- if (logger.isDebugEnabled())
- logger.debug("weakread reading " + command + " from " +
message.getMessageId() + "@" + endPoint);
- remoteResults.put(command,
MessagingService.instance().sendRR(message, endPoint));
- }
- }
-
- // wait for results
- if (localFutures != null)
- {
- for (Future<Object> future : localFutures)
- {
- Row row;
- try
- {
- row = (Row) future.get();
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
- rows.add(row);
- }
- }
- if (remoteResults != null)
- {
- for (Map.Entry<ReadCommand, IAsyncResult> entry :
remoteResults.entrySet())
- {
- ReadCommand command = entry.getKey();
- IAsyncResult iar = entry.getValue();
- byte[] body;
- body = iar.get(DatabaseDescriptor.getRpcTimeout(),
TimeUnit.MILLISECONDS);
- ByteArrayInputStream bufIn = new ByteArrayInputStream(body);
- ReadResponse response =
ReadResponse.serializer().deserialize(new DataInputStream(bufIn));
- assert response.row() != null;
- rows.add(response.row());
- if (randomlyReadRepair(command))
- StorageService.instance.doConsistencyCheck(response.row(),
command, iar.getFrom());
- }
- }
-
- return rows;
- }
-
- /*
- * This function executes the read protocol.
- // 1. Get the N nodes from storage service where the data needs to be
- // replicated
- // 2. Construct a message for read\write
- * 3. Set one of the messages to get the data and the rest to get the
digest
- // 4. SendRR ( to all the nodes above )
- // 5. Wait for a response from at least X nodes where X <= N and the
data node
- * 6. If the digest matches return the data.
- * 7. else carry out read repair by getting data from all the nodes.
- // 5. return success
+ /**
+ * This function executes local and remote reads, and blocks for the
results:
+ *
+ * 1. Get the replica locations, sorted by response time according to the
snitch
+ * 2. Send a data request to the closest replica, and digest requests to
either
+ * a) all the replicas, if read repair is enabled
+ * b) the closest R-1 replicas, where R is the number required to
satisfy the ConsistencyLevel
+ * 3. Wait for a response from R replicas
+ * 4. If the digests (if any) match the data, return the data
+ * 5. else carry out read repair by getting data from all the nodes.
*/
- private static List<Row> strongRead(List<ReadCommand> commands,
ConsistencyLevel consistency_level) throws IOException, UnavailableException,
TimeoutException
+ private static List<Row> fetchRows(List<ReadCommand> commands,
ConsistencyLevel consistency_level) throws IOException, UnavailableException,
TimeoutException
{
- List<QuorumResponseHandler<Row>> quorumResponseHandlers = new
ArrayList<QuorumResponseHandler<Row>>();
+ List<ReadCallback<Row>> readCallbacks = new
ArrayList<ReadCallback<Row>>();
List<List<InetAddress>> commandEndpoints = new
ArrayList<List<InetAddress>>();
List<Row> rows = new ArrayList<Row>();
+ Set<ReadCommand> repairs = new HashSet<ReadCommand>();
// send out read requests
for (ReadCommand command: commands)
@@ -468,53 +393,65 @@ public class StorageProxy implements Sto
AbstractReplicationStrategy rs =
Table.open(command.table).getReplicationStrategy();
ReadResponseResolver resolver = new
ReadResponseResolver(command.table, command.key);
- QuorumResponseHandler<Row> handler =
rs.getQuorumResponseHandler(resolver, consistency_level);
+ ReadCallback<Row> handler = getReadCallback(resolver,
command.table, consistency_level);
handler.assureSufficientLiveNodes(endpoints);
- Message messages[] = new Message[endpoints.size()];
+ int targets;
+ if (randomlyReadRepair(command))
+ {
+ targets = endpoints.size();
+ if (targets > handler.blockfor)
+ repairs.add(command);
+ }
+ else
+ {
+ targets = handler.blockfor;
+ }
+ Message[] messages = new Message[targets];
+
// data-request message is sent to dataPoint, the node that will
actually get
// the data for us. The other replicas are only sent a digest
query.
- int n = 0;
- for (InetAddress endpoint : endpoints)
+ for (int i = 0; i < messages.length; i++)
{
+ InetAddress endpoint = endpoints.get(i);
Message m = endpoint.equals(dataPoint) ? message :
messageDigestOnly;
- messages[n++] = m;
+ messages[i] = m;
if (logger.isDebugEnabled())
- logger.debug("strongread reading " + (m == message ?
"data" : "digest") + " for " + command + " from " + m.getMessageId() + "@" +
endpoint);
+ logger.debug("reading " + (m == message ? "data" :
"digest") + " for " + command + " from " + m.getMessageId() + "@" + endpoint);
}
MessagingService.instance().sendRR(messages, endpoints, handler);
- quorumResponseHandlers.add(handler);
+ readCallbacks.add(handler);
commandEndpoints.add(endpoints);
}
// read results and make a second pass for any digest mismatches
- List<QuorumResponseHandler<Row>> repairResponseHandlers = null;
+ List<RepairCallback<Row>> repairResponseHandlers = null;
for (int i = 0; i < commands.size(); i++)
{
- QuorumResponseHandler<Row> quorumResponseHandler =
quorumResponseHandlers.get(i);
+ ReadCallback<Row> readCallback = readCallbacks.get(i);
Row row;
ReadCommand command = commands.get(i);
+ List<InetAddress> endpoints = commandEndpoints.get(i);
try
{
long startTime2 = System.currentTimeMillis();
- row = quorumResponseHandler.get();
+ row = readCallback.get();
if (row != null)
rows.add(row);
if (logger.isDebugEnabled())
- logger.debug("quorumResponseHandler: " +
(System.currentTimeMillis() - startTime2) + " ms.");
+ logger.debug("Read: " + (System.currentTimeMillis() -
startTime2) + " ms.");
+
+ if (repairs.contains(command))
+ repairExecutor.schedule(new
RepairRunner(readCallback.resolver, command, endpoints),
DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
}
catch (DigestMismatchException ex)
{
- AbstractReplicationStrategy rs =
Table.open(command.table).getReplicationStrategy();
- ReadResponseResolver resolver = new
ReadResponseResolver(command.table, command.key);
- QuorumResponseHandler<Row> handler =
rs.getQuorumResponseHandler(resolver, consistency_level);
if (logger.isDebugEnabled())
logger.debug("Digest mismatch:", ex);
- Message messageRepair = command.makeReadMessage();
- MessagingService.instance().sendRR(messageRepair,
commandEndpoints.get(i), handler);
+ RepairCallback<Row> handler = repair(command, endpoints);
if (repairResponseHandlers == null)
- repairResponseHandlers = new
ArrayList<QuorumResponseHandler<Row>>();
+ repairResponseHandlers = new
ArrayList<RepairCallback<Row>>();
repairResponseHandlers.add(handler);
}
}
@@ -522,7 +459,7 @@ public class StorageProxy implements Sto
// read the results for the digest mismatch retries
if (repairResponseHandlers != null)
{
- for (QuorumResponseHandler<Row> handler : repairResponseHandlers)
+ for (RepairCallback<Row> handler : repairResponseHandlers)
{
try
{
@@ -540,6 +477,26 @@ public class StorageProxy implements Sto
return rows;
}
+ static <T> ReadCallback<T> getReadCallback(IResponseResolver<T> resolver,
String table, ConsistencyLevel consistencyLevel)
+ {
+ if (consistencyLevel.equals(ConsistencyLevel.LOCAL_QUORUM) ||
consistencyLevel.equals(ConsistencyLevel.EACH_QUORUM))
+ {
+ return new DatacenterReadCallback(resolver, consistencyLevel,
table);
+ }
+ return new ReadCallback(resolver, consistencyLevel, table);
+ }
+
+ // TODO repair resolver shouldn't take consistencylevel (it should repair
exactly as many as it receives replies for)
+ private static RepairCallback<Row> repair(ReadCommand command,
List<InetAddress> endpoints)
+ throws IOException
+ {
+ ReadResponseResolver resolver = new
ReadResponseResolver(command.table, command.key);
+ RepairCallback<Row> handler = new RepairCallback<Row>(resolver,
endpoints);
+ Message messageRepair = command.makeReadMessage();
+ MessagingService.instance().sendRR(messageRepair, endpoints, handler);
+ return handler;
+ }
+
/*
* This function executes the read protocol locally. Consistency checks
are performed in the background.
*/
@@ -590,7 +547,7 @@ public class StorageProxy implements Sto
// collect replies and resolve according to consistency
level
RangeSliceResponseResolver resolver = new
RangeSliceResponseResolver(command.keyspace, liveEndpoints);
AbstractReplicationStrategy rs =
Table.open(command.keyspace).getReplicationStrategy();
- QuorumResponseHandler<List<Row>> handler =
rs.getQuorumResponseHandler(resolver, consistency_level);
+ ReadCallback<List<Row>> handler =
getReadCallback(resolver, command.keyspace, consistency_level);
// TODO bail early if live endpoints can't satisfy
requested consistency level
for (InetAddress endpoint : liveEndpoints)
{
@@ -837,7 +794,7 @@ public class StorageProxy implements Sto
// collect replies and resolve according to consistency level
RangeSliceResponseResolver resolver = new
RangeSliceResponseResolver(keyspace, liveEndpoints);
AbstractReplicationStrategy rs =
Table.open(keyspace).getReplicationStrategy();
- QuorumResponseHandler<List<Row>> handler =
rs.getQuorumResponseHandler(resolver, consistency_level);
+ ReadCallback<List<Row>> handler = getReadCallback(resolver,
keyspace, consistency_level);
// bail early if live endpoints can't satisfy requested
consistency level
if(handler.blockfor > liveEndpoints.size())
@@ -889,31 +846,6 @@ public class StorageProxy implements Sto
return hintedHandoffEnabled;
}
- static class weakReadLocalCallable implements Callable<Object>
- {
- private ReadCommand command;
-
- weakReadLocalCallable(ReadCommand command)
- {
- this.command = command;
- }
-
- public Object call() throws IOException
- {
- if (logger.isDebugEnabled())
- logger.debug("weakreadlocal reading " + command);
-
- Table table = Table.open(command.table);
- Row row = command.getRow(table);
-
- // Do the consistency checks in the background
- if (randomlyReadRepair(command))
- StorageService.instance.doConsistencyCheck(row, command,
FBUtilities.getLocalAddress());
-
- return row;
- }
- }
-
/**
* Performs the truncate operatoin, which effectively deletes all data from
* the column family cfname
@@ -959,4 +891,32 @@ public class StorageProxy implements Sto
{
return !Gossiper.instance.getUnreachableMembers().isEmpty();
}
+
+ private static class RepairRunner extends WrappedRunnable
+ {
+ private final IResponseResolver<Row> resolver;
+ private final ReadCommand command;
+ private final List<InetAddress> endpoints;
+
+ public RepairRunner(IResponseResolver<Row> resolver, ReadCommand
command, List<InetAddress> endpoints)
+ {
+ this.resolver = resolver;
+ this.command = command;
+ this.endpoints = endpoints;
+ }
+
+ protected void runMayThrow() throws IOException
+ {
+ try
+ {
+ resolver.resolve();
+ }
+ catch (DigestMismatchException e)
+ {
+ if (logger.isDebugEnabled())
+ logger.debug("Digest mismatch:", e);
+ repair(command, endpoints);
+ }
+ }
+ }
}
Modified:
cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=1055326&r1=1055325&r2=1055326&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
Wed Jan 5 07:18:16 2011
@@ -454,18 +454,6 @@ public class StorageService implements I
}
/**
- * This method performs the requisite operations to make
- * sure that the N replicas are in sync. We do this in the
- * background when we do not care much about consistency.
- */
- public void doConsistencyCheck(Row row, ReadCommand command, InetAddress
dataSource)
- {
- List<InetAddress> endpoints =
StorageService.instance.getLiveNaturalEndpoints(command.table, command.key);
- if (endpoints.size() > 1)
- consistencyManager_.submit(new ConsistencyChecker(command, row,
endpoints, dataSource));
- }
-
- /**
* for a keyspace, return the ranges and corresponding hosts for a given
keyspace.
* @param keyspace
* @return
Modified:
cassandra/trunk/test/unit/org/apache/cassandra/service/ConsistencyLevelTest.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/service/ConsistencyLevelTest.java?rev=1055326&r1=1055325&r2=1055326&view=diff
==============================================================================
---
cassandra/trunk/test/unit/org/apache/cassandra/service/ConsistencyLevelTest.java
(original)
+++
cassandra/trunk/test/unit/org/apache/cassandra/service/ConsistencyLevelTest.java
Wed Jan 5 07:18:16 2011
@@ -96,7 +96,7 @@ public class ConsistencyLevelTest extend
IWriteResponseHandler writeHandler =
strategy.getWriteResponseHandler(hosts, hintedNodes, c);
- QuorumResponseHandler<Row> readHandler =
strategy.getQuorumResponseHandler(new ReadResponseResolver(table,
ByteBufferUtil.bytes("foo")), c);
+ ReadCallback<Row> readHandler =
StorageProxy.getReadCallback(new ReadResponseResolver(table,
ByteBufferUtil.bytes("foo")), table, c);
boolean isWriteUnavailable = false;
boolean isReadUnavailable = false;