Author: slebresne
Date: Fri Nov 4 08:35:33 2011
New Revision: 1197426
URL: http://svn.apache.org/viewvc?rev=1197426&view=rev
Log:
Never return more columns than requested
patch by byronclark; reviewed by slebresne for CASSANDRA-3303 and 3395
Modified:
cassandra/branches/cassandra-1.0/CHANGES.txt
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ReadCommand.java
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageProxy.java
Modified: cassandra/branches/cassandra-1.0/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/CHANGES.txt?rev=1197426&r1=1197425&r2=1197426&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/CHANGES.txt (original)
+++ cassandra/branches/cassandra-1.0/CHANGES.txt Fri Nov 4 08:35:33 2011
@@ -11,6 +11,7 @@
* add JMX call to clean (failed) repair sessions (CASSANDRA-3316)
* fix sstableloader reference acquisition bug (CASSANDRA-3438)
* fix estimated row size regression (CASSANDRA-3451)
+ * make sure we don't return more columns than asked (CASSANDRA-3303, 3395)
Merged from 0.8:
* acquire compactionlock during truncate (CASSANDRA-3399)
* fix displaying cfdef entries for super columnfamilies (CASSANDRA-3415)
Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ReadCommand.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ReadCommand.java?rev=1197426&r1=1197425&r2=1197426&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ReadCommand.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ReadCommand.java Fri Nov 4 08:35:33 2011
@@ -31,6 +31,7 @@ import org.apache.cassandra.io.IVersione
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessageProducer;
import org.apache.cassandra.service.IReadCommand;
+import org.apache.cassandra.service.RepairCallback;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
@@ -66,7 +67,7 @@ public abstract class ReadCommand implem
this.queryPath = queryPath;
this.commandType = cmdType;
}
-
+
public boolean isDigestQuery()
{
return isDigestQuery;
@@ -81,7 +82,7 @@ public abstract class ReadCommand implem
{
return queryPath.columnFamilyName;
}
-
+
public abstract ReadCommand copy();
public abstract Row getRow(Table table) throws IOException;
@@ -95,6 +96,18 @@ public abstract class ReadCommand implem
{
return table;
}
+
+ // maybeGenerateRetryCommand is used to generate a retry for short reads
+ public ReadCommand maybeGenerateRetryCommand(RepairCallback handler, Row row)
+ {
+ return null;
+ }
+
+ // maybeTrim removes columns from a response that is too long
+ public void maybeTrim(Row row)
+ {
+ // noop
+ }
}
class ReadCommandSerializer implements IVersionedSerializer<ReadCommand>
Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/SliceFromReadCommand.java?rev=1197426&r1=1197425&r2=1197426&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/SliceFromReadCommand.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/SliceFromReadCommand.java Fri Nov 4 08:35:33 2011
@@ -19,17 +19,25 @@ package org.apache.cassandra.db;
import java.io.*;
import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
import org.apache.cassandra.db.filter.QueryFilter;
import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.service.RepairCallback;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.thrift.ColumnParent;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class SliceFromReadCommand extends ReadCommand
{
+ static final Logger logger = LoggerFactory.getLogger(SliceFromReadCommand.class);
+
public final ByteBuffer start, finish;
public final boolean reversed;
public final int count;
@@ -62,6 +70,64 @@ public class SliceFromReadCommand extend
}
@Override
+ public ReadCommand maybeGenerateRetryCommand(RepairCallback handler, Row row)
+ {
+ int maxLiveColumns = handler.getMaxLiveColumns();
+ int liveColumnsInRow = row != null ? row.cf.getLiveColumnCount() : 0;
+
+ assert maxLiveColumns <= count;
+ if ((maxLiveColumns == count) && (liveColumnsInRow < count))
+ {
+ int retryCount = count + count - liveColumnsInRow;
+ return new RetriedSliceFromReadCommand(table, key, queryPath, start, finish, reversed, count, retryCount);
+ }
+
+ return null;
+ }
+
+ @Override
+ public void maybeTrim(Row row)
+ {
+ if ((row == null) || (row.cf == null))
+ return;
+
+ int liveColumnsInRow = row.cf.getLiveColumnCount();
+
+ if (liveColumnsInRow > getRequestedCount())
+ {
+ int columnsToTrim = liveColumnsInRow - getRequestedCount();
+
+ logger.debug("trimming {} live columns to the originally requested {}", row.cf.getLiveColumnCount(), getRequestedCount());
+
+ Collection<IColumn> columns;
+ if (reversed)
+ columns = row.cf.getSortedColumns();
+ else
+ columns = row.cf.getReverseSortedColumns();
+
+ Collection<ByteBuffer> toRemove = new HashSet<ByteBuffer>();
+
+ Iterator<IColumn> columnIterator = columns.iterator();
+ while (columnIterator.hasNext() && (toRemove.size() < columnsToTrim))
+ {
+ IColumn column = columnIterator.next();
+ if (column.isLive())
+ toRemove.add(column.name());
+ }
+
+ for (ByteBuffer columnName : toRemove)
+ {
+ row.cf.remove(columnName);
+ }
+ }
+ }
+
+ protected int getRequestedCount()
+ {
+ return count;
+ }
+
+ @Override
public String toString()
{
return "SliceFromReadCommand(" +
Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1197426&r1=1197425&r2=1197426&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageProxy.java Fri Nov 4 08:35:33 2011
@@ -685,7 +685,10 @@ public class StorageProxy implements Sto
long startTime2 = System.currentTimeMillis();
Row row = handler.get();
if (row != null)
+ {
+ command.maybeTrim(row);
rows.add(row);
+ }
if (logger.isDebugEnabled())
logger.debug("Read: " + (System.currentTimeMillis() - startTime2) + " ms.");
@@ -739,35 +742,21 @@ public class StorageProxy implements Sto
throw new AssertionError(e); // full data requested from each node here, no digests should be sent
}
- // retry short reads, otherwise add the row to our resultset
- if (command instanceof SliceFromReadCommand)
+ ReadCommand retryCommand = command.maybeGenerateRetryCommand(handler, row);
+ if (retryCommand != null)
{
- // short reads are only possible on SliceFromReadCommand
- SliceFromReadCommand sliceCommand = (SliceFromReadCommand) command;
- int maxLiveColumns = handler.getMaxLiveColumns();
- int liveColumnsInRow = row != null ? row.cf.getLiveColumnCount() : 0;
-
- assert maxLiveColumns <= sliceCommand.count;
- if ((maxLiveColumns == sliceCommand.count) && (liveColumnsInRow < sliceCommand.count))
- {
- logger.debug("detected short read: expected {} columns, but only resolved {} columns",
- sliceCommand.count, liveColumnsInRow);
+ logger.debug("issuing retry for read command");
+ if (commandsToRetry == Collections.EMPTY_LIST)
+ commandsToRetry = new ArrayList<ReadCommand>();
+ commandsToRetry.add(retryCommand);
+ continue;
+ }
- int retryCount = sliceCommand.count + sliceCommand.count - liveColumnsInRow;
- SliceFromReadCommand retryCommand = new SliceFromReadCommand(command.table,
- command.key,
- command.queryPath,
- sliceCommand.start,
- sliceCommand.finish,
- sliceCommand.reversed,
- retryCount);
- if (commandsToRetry == Collections.EMPTY_LIST)
- commandsToRetry = new ArrayList<ReadCommand>();
- commandsToRetry.add(retryCommand);
- continue;
- }
+ if (row != null)
+ {
+ command.maybeTrim(row);
+ rows.add(row);
}
- rows.add(row);
}
}
} while (!commandsToRetry.isEmpty());