Author: jbellis
Date: Thu Aug 11 19:29:38 2011
New Revision: 1156758
URL: http://svn.apache.org/viewvc?rev=1156758&view=rev
Log:
provide monotonic read consistency
patch by jbellis; reviewed by slebresne for CASSANDRA-2494
Modified:
cassandra/trunk/CHANGES.txt
cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
cassandra/trunk/src/java/org/apache/cassandra/service/RepairCallback.java
cassandra/trunk/src/java/org/apache/cassandra/service/RowRepairResolver.java
cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java
Modified: cassandra/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1156758&r1=1156757&r2=1156758&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Thu Aug 11 19:29:38 2011
@@ -30,6 +30,9 @@
* make column family backed column map pluggable and introduce unsynchronized
ArrayList backed one to speedup reads (CASSANDRA-2843)
* refactoring of the secondary index api (CASSANDRA-2982)
+ * make CL > ONE reads wait for digest reconciliation before returning
+ (CASSANDRA-2494)
+
0.8.4
* include files-to-be-streamed in StreamInSession.getSources (CASSANDRA-2972)
Modified:
cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java?rev=1156758&r1=1156757&r2=1156758&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
Thu Aug 11 19:29:38 2011
@@ -22,15 +22,18 @@ import java.io.IOException;
import java.net.InetAddress;
import java.util.*;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
import com.google.common.collect.AbstractIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.RangeSliceReply;
import org.apache.cassandra.db.Row;
+import org.apache.cassandra.net.IAsyncResult;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.CloseableIterator;
@@ -43,9 +46,19 @@ import org.apache.cassandra.utils.MergeI
public class RangeSliceResponseResolver implements
IResponseResolver<Iterable<Row>>
{
private static final Logger logger_ =
LoggerFactory.getLogger(RangeSliceResponseResolver.class);
+
+ private static final Comparator<Pair<Row,InetAddress>> pairComparator =
new Comparator<Pair<Row, InetAddress>>()
+ {
+ public int compare(Pair<Row, InetAddress> o1, Pair<Row, InetAddress>
o2)
+ {
+ return o1.left.key.compareTo(o2.left.key);
+ }
+ };
+
private final String table;
private final List<InetAddress> sources;
protected final Collection<Message> responses = new
LinkedBlockingQueue<Message>();;
+ public final List<IAsyncResult> repairResults = new
ArrayList<IAsyncResult>();
public RangeSliceResponseResolver(String table, List<InetAddress> sources)
{
@@ -73,50 +86,7 @@ public class RangeSliceResponseResolver
iters.add(new RowIterator(reply.rows.iterator(),
response.getFrom()));
}
// for each row, compute the combination of all different versions
seen, and repair incomplete versions
- MergeIterator<Pair<Row,InetAddress>, Row> iter =
MergeIterator.get(iters, new Comparator<Pair<Row,InetAddress>>()
- {
- public int compare(Pair<Row,InetAddress> o1, Pair<Row,InetAddress>
o2)
- {
- return o1.left.key.compareTo(o2.left.key);
- }
- }, new MergeIterator.Reducer<Pair<Row,InetAddress>, Row>()
- {
- List<ColumnFamily> versions = new
ArrayList<ColumnFamily>(sources.size());
- List<InetAddress> versionSources = new
ArrayList<InetAddress>(sources.size());
- DecoratedKey key;
-
- public void reduce(Pair<Row,InetAddress> current)
- {
- key = current.left.key;
- versions.add(current.left.cf);
- versionSources.add(current.right);
- }
-
- protected Row getReduced()
- {
- ColumnFamily resolved = versions.size() > 1
- ?
RowRepairResolver.resolveSuperset(versions)
- : versions.get(0);
- if (versions.size() < sources.size())
- {
- // add placeholder rows for sources that didn't have any
data, so maybeScheduleRepairs sees them
- for (InetAddress source : sources)
- {
- if (!versionSources.contains(source))
- {
- versions.add(null);
- versionSources.add(source);
- }
- }
- }
- // resolved can be null even if versions doesn't have all
nulls because of the call to removeDeleted in resolveSuperSet
- if (resolved != null)
- RowRepairResolver.maybeScheduleRepairs(resolved, table,
key, versions, versionSources);
- versions.clear();
- versionSources.clear();
- return new Row(key, resolved);
- }
- });
+ MergeIterator<Pair<Row,InetAddress>, Row> iter =
MergeIterator.get(iters, pairComparator, new Reducer());
List<Row> resolvedRows = new ArrayList<Row>(n);
while (iter.hasNext())
@@ -163,4 +133,43 @@ public class RangeSliceResponseResolver
{
throw new UnsupportedOperationException();
}
+
+ private class Reducer extends MergeIterator.Reducer<Pair<Row,InetAddress>,
Row>
+ {
+ List<ColumnFamily> versions = new
ArrayList<ColumnFamily>(sources.size());
+ List<InetAddress> versionSources = new
ArrayList<InetAddress>(sources.size());
+ DecoratedKey key;
+
+ public void reduce(Pair<Row,InetAddress> current)
+ {
+ key = current.left.key;
+ versions.add(current.left.cf);
+ versionSources.add(current.right);
+ }
+
+ protected Row getReduced()
+ {
+ ColumnFamily resolved = versions.size() > 1
+ ? RowRepairResolver.resolveSuperset(versions)
+ : versions.get(0);
+ if (versions.size() < sources.size())
+ {
+ // add placeholder rows for sources that didn't have any data,
so maybeScheduleRepairs sees them
+ for (InetAddress source : sources)
+ {
+ if (!versionSources.contains(source))
+ {
+ versions.add(null);
+ versionSources.add(source);
+ }
+ }
+ }
+ // resolved can be null even if versions doesn't have all nulls
because of the call to removeDeleted in resolveSuperSet
+ if (resolved != null)
+
repairResults.addAll(RowRepairResolver.scheduleRepairs(resolved, table, key,
versions, versionSources));
+ versions.clear();
+ versionSources.clear();
+ return new Row(key, resolved);
+ }
+ }
}
Modified:
cassandra/trunk/src/java/org/apache/cassandra/service/RepairCallback.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/RepairCallback.java?rev=1156758&r1=1156757&r2=1156758&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/RepairCallback.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/RepairCallback.java
Thu Aug 11 19:29:38 2011
@@ -29,13 +29,14 @@ import java.util.concurrent.TimeoutExcep
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.Row;
import org.apache.cassandra.net.IAsyncCallback;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.utils.SimpleCondition;
-public class RepairCallback<T> implements IAsyncCallback
+public class RepairCallback implements IAsyncCallback
{
- private final IResponseResolver<T> resolver;
+ public final RowRepairResolver resolver;
private final List<InetAddress> endpoints;
private final SimpleCondition condition = new SimpleCondition();
private final long startTime;
@@ -49,14 +50,14 @@ public class RepairCallback<T> implement
* mismatch, and we're going to do full-data reads from everyone -- that
is, this is the final
* stage in the read process.)
*/
- public RepairCallback(IResponseResolver<T> resolver, List<InetAddress>
endpoints)
+ public RepairCallback(RowRepairResolver resolver, List<InetAddress>
endpoints)
{
this.resolver = resolver;
this.endpoints = endpoints;
this.startTime = System.currentTimeMillis();
}
- public T get() throws TimeoutException, DigestMismatchException,
IOException
+ public Row get() throws TimeoutException, DigestMismatchException,
IOException
{
long timeout = DatabaseDescriptor.getRpcTimeout() -
(System.currentTimeMillis() - startTime);
try
Modified:
cassandra/trunk/src/java/org/apache/cassandra/service/RowRepairResolver.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/RowRepairResolver.java?rev=1156758&r1=1156757&r2=1156758&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/service/RowRepairResolver.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/service/RowRepairResolver.java
Thu Aug 11 19:29:38 2011
@@ -23,6 +23,7 @@ import java.io.IOException;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -31,13 +32,16 @@ import org.apache.cassandra.db.columnite
import org.apache.cassandra.db.filter.QueryFilter;
import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.net.IAsyncResult;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.utils.*;
+import org.apache.cassandra.utils.CloseableIterator;
+import org.apache.cassandra.utils.FBUtilities;
public class RowRepairResolver extends AbstractRowResolver
{
protected int maxLiveColumns = 0;
+ public List<IAsyncResult> repairResults = Collections.emptyList();
public RowRepairResolver(String table, ByteBuffer key)
{
@@ -89,7 +93,7 @@ public class RowRepairResolver extends A
logger.debug("versions merged");
// resolved can be null even if versions doesn't have all nulls
because of the call to removeDeleted in resolveSuperSet
if (resolved != null)
- maybeScheduleRepairs(resolved, table, key, versions,
endpoints);
+ repairResults = scheduleRepairs(resolved, table, key,
versions, endpoints);
}
else
{
@@ -106,8 +110,10 @@ public class RowRepairResolver extends A
* For each row version, compare with resolved (the superset of all row
versions);
* if it is missing anything, send a mutation to the endpoint it come from.
*/
- public static void maybeScheduleRepairs(ColumnFamily resolved, String
table, DecoratedKey key, List<ColumnFamily> versions, List<InetAddress>
endpoints)
+ public static List<IAsyncResult> scheduleRepairs(ColumnFamily resolved,
String table, DecoratedKey key, List<ColumnFamily> versions, List<InetAddress>
endpoints)
{
+ List<IAsyncResult> results = new
ArrayList<IAsyncResult>(versions.size());
+
for (int i = 0; i < versions.size(); i++)
{
ColumnFamily diffCf = ColumnFamily.diff(versions.get(i), resolved);
@@ -126,8 +132,10 @@ public class RowRepairResolver extends A
{
throw new IOError(e);
}
- MessagingService.instance().sendOneWay(repairMessage,
endpoints.get(i));
+ results.add(MessagingService.instance().sendRR(repairMessage,
endpoints.get(i)));
}
+
+ return results;
}
static ColumnFamily resolveSuperset(List<ColumnFamily> versions)
Modified:
cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1156758&r1=1156757&r2=1156758&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Thu
Aug 11 19:29:38 2011
@@ -30,8 +30,9 @@ import javax.management.ObjectName;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Multimap;
-import org.apache.cassandra.net.CachingMessageProducer;
-import org.apache.cassandra.net.MessageProducer;
+
+import org.apache.cassandra.net.*;
+
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
@@ -49,9 +50,6 @@ import org.apache.cassandra.dht.Token;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.locator.AbstractReplicationStrategy;
import org.apache.cassandra.locator.TokenMetadata;
-import org.apache.cassandra.net.IAsyncCallback;
-import org.apache.cassandra.net.Message;
-import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.utils.*;
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.cassandra.thrift.IndexClause;
@@ -571,7 +569,7 @@ public class StorageProxy implements Sto
repairCommands.clear();
// read results and make a second pass for any digest mismatches
- List<RepairCallback<Row>> repairResponseHandlers = null;
+ List<RepairCallback> repairResponseHandlers = null;
for (int i = 0; i < commandsToSend.size(); i++)
{
ReadCallback<Row> handler = readCallbacks.get(i);
@@ -598,7 +596,7 @@ public class StorageProxy implements Sto
if (logger.isDebugEnabled())
logger.debug("Digest mismatch: {}", ex.toString());
RowRepairResolver resolver = new
RowRepairResolver(command.table, command.key);
- RepairCallback<Row> repairHandler = new
RepairCallback<Row>(resolver, handler.endpoints);
+ RepairCallback repairHandler = new
RepairCallback(resolver, handler.endpoints);
if (repairCommands == Collections.EMPTY_LIST)
repairCommands = new ArrayList<ReadCommand>();
@@ -608,7 +606,7 @@ public class StorageProxy implements Sto
MessagingService.instance().sendRR(command, endpoint,
repairHandler);
if (repairResponseHandlers == null)
- repairResponseHandlers = new
ArrayList<RepairCallback<Row>>();
+ repairResponseHandlers = new
ArrayList<RepairCallback>();
repairResponseHandlers.add(repairHandler);
}
}
@@ -622,48 +620,49 @@ public class StorageProxy implements Sto
for (int i = 0; i < repairCommands.size(); i++)
{
ReadCommand command = repairCommands.get(i);
- RepairCallback<Row> handler =
repairResponseHandlers.get(i);
+ RepairCallback handler = repairResponseHandlers.get(i);
+ FBUtilities.waitOnFutures(handler.resolver.repairResults,
DatabaseDescriptor.getRpcTimeout());
+ Row row;
try
{
- Row row = handler.get();
-
- if (command instanceof SliceFromReadCommand)
- {
- // short reads are only possible on
SliceFromReadCommand
- SliceFromReadCommand sliceCommand =
(SliceFromReadCommand)command;
- int maxLiveColumns = handler.getMaxLiveColumns();
- int liveColumnsInRow = row != null ?
row.cf.getLiveColumnCount() : 0;
-
- assert maxLiveColumns <= sliceCommand.count;
- if ((maxLiveColumns == sliceCommand.count) &&
(liveColumnsInRow < sliceCommand.count))
- {
- if (logger.isDebugEnabled())
- logger.debug("detected short read:
expected {} columns, but only resolved {} columns",
- sliceCommand.count,
liveColumnsInRow);
-
- int retryCount = sliceCommand.count +
sliceCommand.count - liveColumnsInRow;
- SliceFromReadCommand retryCommand = new
SliceFromReadCommand(command.table,
-
command.key,
-
command.queryPath,
-
sliceCommand.start,
-
sliceCommand.finish,
-
sliceCommand.reversed,
-
retryCount);
- if (commandsToRetry == Collections.EMPTY_LIST)
- commandsToRetry = new
ArrayList<ReadCommand>();
- commandsToRetry.add(retryCommand);
- }
- else if (row != null)
- rows.add(row);
- }
- else if (row != null)
- rows.add(row);
+ row = handler.get();
}
catch (DigestMismatchException e)
{
throw new AssertionError(e); // full data requested
from each node here, no digests should be sent
}
+
+ // retry short reads, otherwise add the row to our
resultset
+ if (command instanceof SliceFromReadCommand)
+ {
+ // short reads are only possible on
SliceFromReadCommand
+ SliceFromReadCommand sliceCommand =
(SliceFromReadCommand) command;
+ int maxLiveColumns = handler.getMaxLiveColumns();
+ int liveColumnsInRow = row != null ?
row.cf.getLiveColumnCount() : 0;
+
+ assert maxLiveColumns <= sliceCommand.count;
+ if ((maxLiveColumns == sliceCommand.count) &&
(liveColumnsInRow < sliceCommand.count))
+ {
+ if (logger.isDebugEnabled())
+ logger.debug("detected short read: expected {}
columns, but only resolved {} columns",
+ sliceCommand.count,
liveColumnsInRow);
+
+ int retryCount = sliceCommand.count +
sliceCommand.count - liveColumnsInRow;
+ SliceFromReadCommand retryCommand = new
SliceFromReadCommand(command.table,
+
command.key,
+
command.queryPath,
+
sliceCommand.start,
+
sliceCommand.finish,
+
sliceCommand.reversed,
+
retryCount);
+ if (commandsToRetry == Collections.EMPTY_LIST)
+ commandsToRetry = new ArrayList<ReadCommand>();
+ commandsToRetry.add(retryCommand);
+ continue;
+ }
+ }
+ rows.add(row);
}
}
} while (!commandsToRetry.isEmpty());
@@ -769,6 +768,7 @@ public class StorageProxy implements Sto
rows.add(row);
logger.debug("range slices read {}", row.key);
}
+ FBUtilities.waitOnFutures(resolver.repairResults,
DatabaseDescriptor.getRpcTimeout());
}
catch (TimeoutException ex)
{
@@ -1035,6 +1035,7 @@ public class StorageProxy implements Sto
rows.add(row);
logger.debug("read {}", row);
}
+ FBUtilities.waitOnFutures(resolver.repairResults,
DatabaseDescriptor.getRpcTimeout());
}
catch (TimeoutException ex)
{
@@ -1044,7 +1045,7 @@ public class StorageProxy implements Sto
}
catch (DigestMismatchException e)
{
- throw new RuntimeException(e);
+ throw new AssertionError(e);
}
if (rows.size() >= index_clause.count)
return rows.subList(0, index_clause.count);
Modified: cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java?rev=1156758&r1=1156757&r2=1156758&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java Thu
Aug 11 19:29:38 2011
@@ -32,6 +32,8 @@ import java.security.NoSuchAlgorithmExce
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -50,6 +52,7 @@ import org.apache.cassandra.dht.IPartiti
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.locator.PropertyFileSnitch;
+import org.apache.cassandra.net.IAsyncResult;
import org.apache.thrift.TBase;
import org.apache.thrift.TDeserializer;
import org.apache.thrift.TException;
@@ -575,6 +578,12 @@ public class FBUtilities
}
}
+ public static void waitOnFutures(List<IAsyncResult> results, long ms)
throws TimeoutException
+ {
+ for (IAsyncResult result : results)
+ result.get(ms, TimeUnit.MILLISECONDS);
+ }
+
public static IPartitioner newPartitioner(String partitionerClassName)
throws ConfigurationException
{
if (!partitionerClassName.contains("."))