Author: jbellis
Date: Tue Aug 2 13:27:10 2011
New Revision: 1153115
URL: http://svn.apache.org/viewvc?rev=1153115&view=rev
Log:
fix "short reads" in [multi]get
patch by Byron Clark; reviewed by jbellis for CASSANDRA-2643
Modified:
cassandra/trunk/CHANGES.txt
cassandra/trunk/src/java/org/apache/cassandra/db/AbstractColumnContainer.java
cassandra/trunk/src/java/org/apache/cassandra/service/AbstractRowResolver.java
cassandra/trunk/src/java/org/apache/cassandra/service/IResponseResolver.java
cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
cassandra/trunk/src/java/org/apache/cassandra/service/RepairCallback.java
cassandra/trunk/src/java/org/apache/cassandra/service/RowRepairResolver.java
cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1153115&r1=1153114&r2=1153115&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Tue Aug 2 13:27:10 2011
@@ -25,6 +25,7 @@
* fix potential use of free'd native memory in SerializingCache
(CASSANDRA-1951)
* add paging to get_count (CASSANDRA-2894)
+ * fix "short reads" in [multi]get (CASSANDRA-2643)
0.8.3
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/AbstractColumnContainer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/AbstractColumnContainer.java?rev=1153115&r1=1153114&r2=1153115&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/AbstractColumnContainer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/AbstractColumnContainer.java Tue Aug 2 13:27:10 2011
@@ -225,6 +225,19 @@ public abstract class AbstractColumnCont
return getColumnCount();
}
+ public int getLiveColumnCount()
+ {
+ int count = 0;
+
+ for (IColumn column : columns.values())
+ {
+ if (column.isLive())
+ count++;
+ }
+
+ return count;
+ }
+
public Iterator<IColumn> iterator()
{
return columns.values().iterator();
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/AbstractRowResolver.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/AbstractRowResolver.java?rev=1153115&r1=1153114&r2=1153115&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/AbstractRowResolver.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/AbstractRowResolver.java Tue Aug 2 13:27:10 2011
@@ -83,4 +83,9 @@ public abstract class AbstractRowResolve
{
return replies.keySet();
}
+
+ public int getMaxLiveColumns()
+ {
+ throw new UnsupportedOperationException();
+ }
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/IResponseResolver.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/IResponseResolver.java?rev=1153115&r1=1153114&r2=1153115&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/IResponseResolver.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/IResponseResolver.java Tue Aug 2 13:27:10 2011
@@ -43,4 +43,6 @@ public interface IResponseResolver<T> {
public void preprocess(Message message);
public Iterable<Message> getMessages();
+
+ public int getMaxLiveColumns();
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java?rev=1153115&r1=1153114&r2=1153115&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java Tue Aug 2 13:27:10 2011
@@ -158,4 +158,9 @@ public class RangeSliceResponseResolver
{
return responses;
}
+
+ public int getMaxLiveColumns()
+ {
+ throw new UnsupportedOperationException();
+ }
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/RepairCallback.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/RepairCallback.java?rev=1153115&r1=1153114&r2=1153115&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/RepairCallback.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/RepairCallback.java Tue Aug 2 13:27:10 2011
@@ -82,4 +82,9 @@ public class RepairCallback<T> implement
{
return true;
}
+
+ public int getMaxLiveColumns()
+ {
+ return resolver.getMaxLiveColumns();
+ }
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/RowRepairResolver.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/RowRepairResolver.java?rev=1153115&r1=1153114&r2=1153115&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/RowRepairResolver.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/RowRepairResolver.java Tue Aug 2 13:27:10 2011
@@ -37,6 +37,8 @@ import org.apache.cassandra.utils.*;
public class RowRepairResolver extends AbstractRowResolver
{
+ protected int maxLiveColumns = 0;
+
public RowRepairResolver(String table, ByteBuffer key)
{
super(key, table);
@@ -76,6 +78,12 @@ public class RowRepairResolver extends A
ColumnFamily resolved;
if (versions.size() > 1)
{
+ for (ColumnFamily cf : versions)
+ {
+ int liveColumns = cf.getLiveColumnCount();
+ if (liveColumns > maxLiveColumns)
+ maxLiveColumns = liveColumns;
+ }
resolved = resolveSuperset(versions);
if (logger.isDebugEnabled())
logger.debug("versions merged");
@@ -90,8 +98,9 @@ public class RowRepairResolver extends A
if (logger.isDebugEnabled())
logger.debug("resolve: " + (System.currentTimeMillis() -
startTime) + " ms.");
- return new Row(key, resolved);
- }
+
+ return new Row(key, resolved);
+ }
/**
* For each row version, compare with resolved (the superset of all row
versions);
@@ -163,4 +172,9 @@ public class RowRepairResolver extends A
{
throw new UnsupportedOperationException();
}
+
+ public int getMaxLiveColumns()
+ {
+ return maxLiveColumns;
+ }
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1153115&r1=1153114&r2=1153115&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Tue Aug 2 13:27:10 2011
@@ -501,117 +501,172 @@ public class StorageProxy implements Sto
{
List<ReadCallback<Row>> readCallbacks = new
ArrayList<ReadCallback<Row>>();
List<Row> rows = new ArrayList<Row>();
+ List<ReadCommand> commandsToRetry = Collections.emptyList();
+ List<ReadCommand> repairCommands = Collections.emptyList();
- // send out read requests
- for (ReadCommand command: commands)
+ do
{
- assert !command.isDigestQuery();
- logger.debug("Command/ConsistencyLevel is {}/{}", command,
consistency_level);
+ List<ReadCommand> commandsToSend = commandsToRetry.isEmpty() ?
commands : commandsToRetry;
- List<InetAddress> endpoints =
StorageService.instance.getLiveNaturalEndpoints(command.table, command.key);
-
DatabaseDescriptor.getEndpointSnitch().sortByProximity(FBUtilities.getBroadcastAddress(),
endpoints);
+ if (!commandsToRetry.isEmpty())
+ logger.debug("Retrying {} commands", commandsToRetry.size());
- RowDigestResolver resolver = new RowDigestResolver(command.table,
command.key);
- ReadCallback<Row> handler = getReadCallback(resolver, command,
consistency_level, endpoints);
- handler.assureSufficientLiveNodes();
- assert !handler.endpoints.isEmpty();
-
- // The data-request message is sent to dataPoint, the node that
will actually get
- // the data for us. The other replicas are only sent a digest
query.
- ReadCommand digestCommand = null;
- if (handler.endpoints.size() > 1)
+ // send out read requests
+ for (ReadCommand command : commandsToSend)
{
- digestCommand = command.copy();
- digestCommand.setDigestQuery(true);
- }
+ assert !command.isDigestQuery();
+ logger.debug("Command/ConsistencyLevel is {}/{}", command,
consistency_level);
- InetAddress dataPoint = handler.endpoints.get(0);
- if (dataPoint.equals(FBUtilities.getBroadcastAddress()))
- {
- logger.debug("reading data locally");
- StageManager.getStage(Stage.READ).execute(new
LocalReadRunnable(command, handler));
- }
- else
- {
- logger.debug("reading data from {}", dataPoint);
- MessagingService.instance().sendRR(command, dataPoint,
handler);
- }
+ List<InetAddress> endpoints =
StorageService.instance.getLiveNaturalEndpoints(command.table,
+
command.key);
+
DatabaseDescriptor.getEndpointSnitch().sortByProximity(FBUtilities.getBroadcastAddress(),
endpoints);
- // We lazy-construct the digest Message object since it may not be
necessary if we
- // are doing a local digest read, or no digest reads at all.
- MessageProducer producer = new
CachingMessageProducer(digestCommand);
- for (InetAddress digestPoint : handler.endpoints.subList(1,
handler.endpoints.size()))
- {
- if (digestPoint.equals(FBUtilities.getBroadcastAddress()))
+ RowDigestResolver resolver = new
RowDigestResolver(command.table, command.key);
+ ReadCallback<Row> handler = getReadCallback(resolver, command,
consistency_level, endpoints);
+ handler.assureSufficientLiveNodes();
+ assert !handler.endpoints.isEmpty();
+
+ // The data-request message is sent to dataPoint, the node
that will actually get
+ // the data for us. The other replicas are only sent a digest
query.
+ ReadCommand digestCommand = null;
+ if (handler.endpoints.size() > 1)
+ {
+ digestCommand = command.copy();
+ digestCommand.setDigestQuery(true);
+ }
+
+ InetAddress dataPoint = handler.endpoints.get(0);
+ if (dataPoint.equals(FBUtilities.getBroadcastAddress()))
{
- logger.debug("reading digest locally");
- StageManager.getStage(Stage.READ).execute(new
LocalReadRunnable(digestCommand, handler));
+ logger.debug("reading data locally");
+ StageManager.getStage(Stage.READ).execute(new
LocalReadRunnable(command, handler));
}
else
{
- logger.debug("reading digest from {}", digestPoint);
- MessagingService.instance().sendRR(producer, digestPoint,
handler);
+ logger.debug("reading data from {}", dataPoint);
+ MessagingService.instance().sendRR(command, dataPoint,
handler);
}
- }
-
- readCallbacks.add(handler);
- }
- // read results and make a second pass for any digest mismatches
- List<RepairCallback<Row>> repairResponseHandlers = null;
- for (int i = 0; i < commands.size(); i++)
- {
- ReadCallback<Row> handler = readCallbacks.get(i);
- Row row;
- ReadCommand command = commands.get(i);
- try
- {
- long startTime2 = System.currentTimeMillis();
- row = handler.get(); // CL.ONE is special cased here to ignore
digests even if some have arrived
- if (row != null)
- rows.add(row);
+ // We lazy-construct the digest Message object since it may
not be necessary if we
+ // are doing a local digest read, or no digest reads at all.
+ MessageProducer producer = new
CachingMessageProducer(digestCommand);
+ for (InetAddress digestPoint : handler.endpoints.subList(1,
handler.endpoints.size()))
+ {
+ if (digestPoint.equals(FBUtilities.getBroadcastAddress()))
+ {
+ logger.debug("reading digest locally");
+ StageManager.getStage(Stage.READ).execute(new
LocalReadRunnable(digestCommand, handler));
+ }
+ else
+ {
+ logger.debug("reading digest from {}", digestPoint);
+ MessagingService.instance().sendRR(producer,
digestPoint, handler);
+ }
+ }
- if (logger.isDebugEnabled())
- logger.debug("Read: " + (System.currentTimeMillis() -
startTime2) + " ms.");
- }
- catch (TimeoutException ex)
- {
- if (logger.isDebugEnabled())
- logger.debug("Read timeout: {}", ex.toString());
- throw ex;
+ readCallbacks.add(handler);
}
- catch (DigestMismatchException ex)
- {
- if (logger.isDebugEnabled())
- logger.debug("Digest mismatch: {}", ex.toString());
- RowRepairResolver resolver = new
RowRepairResolver(command.table, command.key);
- RepairCallback<Row> repairHandler = new
RepairCallback<Row>(resolver, handler.endpoints);
- for (InetAddress endpoint : handler.endpoints)
- MessagingService.instance().sendRR(command, endpoint,
repairHandler);
- if (repairResponseHandlers == null)
- repairResponseHandlers = new
ArrayList<RepairCallback<Row>>();
- repairResponseHandlers.add(repairHandler);
- }
- }
+ if (repairCommands != Collections.EMPTY_LIST)
+ repairCommands.clear();
- // read the results for the digest mismatch retries
- if (repairResponseHandlers != null)
- {
- for (RepairCallback<Row> handler : repairResponseHandlers)
+ // read results and make a second pass for any digest mismatches
+ List<RepairCallback<Row>> repairResponseHandlers = null;
+ for (int i = 0; i < commandsToSend.size(); i++)
{
+ ReadCallback<Row> handler = readCallbacks.get(i);
+ Row row;
+ ReadCommand command = commands.get(i);
try
{
- Row row = handler.get();
+ long startTime2 = System.currentTimeMillis();
+ row = handler.get(); // CL.ONE is special cased here to
ignore digests even if some have arrived
if (row != null)
rows.add(row);
+
+ if (logger.isDebugEnabled())
+ logger.debug("Read: " + (System.currentTimeMillis() -
startTime2) + " ms.");
}
- catch (DigestMismatchException e)
+ catch (TimeoutException ex)
{
- throw new AssertionError(e); // full data requested from
each node here, no digests should be sent
+ if (logger.isDebugEnabled())
+ logger.debug("Read timeout: {}", ex.toString());
+ throw ex;
+ }
+ catch (DigestMismatchException ex)
+ {
+ if (logger.isDebugEnabled())
+ logger.debug("Digest mismatch: {}", ex.toString());
+ RowRepairResolver resolver = new
RowRepairResolver(command.table, command.key);
+ RepairCallback<Row> repairHandler = new
RepairCallback<Row>(resolver, handler.endpoints);
+
+ if (repairCommands == Collections.EMPTY_LIST)
+ repairCommands = new ArrayList<ReadCommand>();
+ repairCommands.add(command);
+
+ for (InetAddress endpoint : handler.endpoints)
+ MessagingService.instance().sendRR(command, endpoint,
repairHandler);
+
+ if (repairResponseHandlers == null)
+ repairResponseHandlers = new
ArrayList<RepairCallback<Row>>();
+ repairResponseHandlers.add(repairHandler);
}
}
- }
+
+ if (commandsToRetry != Collections.EMPTY_LIST)
+ commandsToRetry.clear();
+
+ // read the results for the digest mismatch retries
+ if (repairResponseHandlers != null)
+ {
+ for (int i = 0; i < repairCommands.size(); i++)
+ {
+ ReadCommand command = repairCommands.get(i);
+ RepairCallback<Row> handler =
repairResponseHandlers.get(i);
+
+ try
+ {
+ Row row = handler.get();
+
+ if (command instanceof SliceFromReadCommand)
+ {
+ // short reads are only possible on
SliceFromReadCommand
+ SliceFromReadCommand sliceCommand =
(SliceFromReadCommand)command;
+ int maxLiveColumns = handler.getMaxLiveColumns();
+ int liveColumnsInRow = row != null ?
row.cf.getLiveColumnCount() : 0;
+
+ assert maxLiveColumns <= sliceCommand.count;
+ if ((maxLiveColumns == sliceCommand.count) &&
(liveColumnsInRow < sliceCommand.count))
+ {
+ if (logger.isDebugEnabled())
+ logger.debug("detected short read:
expected {} columns, but only resolved {} columns",
+ sliceCommand.count,
liveColumnsInRow);
+
+ int retryCount = sliceCommand.count +
sliceCommand.count - liveColumnsInRow;
+ SliceFromReadCommand retryCommand = new
SliceFromReadCommand(command.table,
+
command.key,
+
command.queryPath,
+
sliceCommand.start,
+
sliceCommand.finish,
+
sliceCommand.reversed,
+
retryCount);
+ if (commandsToRetry == Collections.EMPTY_LIST)
+ commandsToRetry = new
ArrayList<ReadCommand>();
+ commandsToRetry.add(retryCommand);
+ }
+ else if (row != null)
+ rows.add(row);
+ }
+ else if (row != null)
+ rows.add(row);
+ }
+ catch (DigestMismatchException e)
+ {
+ throw new AssertionError(e); // full data requested
from each node here, no digests should be sent
+ }
+ }
+ }
+ } while (!commandsToRetry.isEmpty());
return rows;
}