Author: jbellis
Date: Tue Aug 3 14:58:14 2010
New Revision: 981906
URL: http://svn.apache.org/viewvc?rev=981906&view=rev
Log:
handle index scans across multiple nodes and consistency levels
Modified:
cassandra/trunk/interface/cassandra.thrift
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
cassandra/trunk/src/java/org/apache/cassandra/db/IndexScanCommand.java
cassandra/trunk/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java
cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java
cassandra/trunk/test/system/test_thrift_server.py
cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTest.java
Modified: cassandra/trunk/interface/cassandra.thrift
URL:
http://svn.apache.org/viewvc/cassandra/trunk/interface/cassandra.thrift?rev=981906&r1=981905&r2=981906&view=diff
==============================================================================
--- cassandra/trunk/interface/cassandra.thrift (original)
+++ cassandra/trunk/interface/cassandra.thrift Tue Aug 3 14:58:14 2010
@@ -46,7 +46,7 @@ namespace rb CassandraThrift
# for every edit that doesn't result in a change to major/minor.
#
# See the Semantic Versioning Specification (SemVer) http://semver.org.
-const string VERSION = "9.0.0"
+const string VERSION = "10.0.0"
#
@@ -254,8 +254,8 @@ struct IndexExpression {
struct IndexClause {
1: required list<IndexExpression> expressions
- 2: required i32 count=100,
- 3: optional binary start_key,
+ 2: required binary start_key,
+ 3: required i32 count=100,
}
/**
@@ -274,12 +274,6 @@ struct KeyRange {
5: required i32 count=100
}
-struct RowPredicate {
- 1: optional list<binary> keys,
- 2: optional KeyRange key_range,
- 3: optional IndexClause index_clause
-}
-
/**
A KeySlice is key followed by the data it maps to. A collection of
KeySlice is returned by the get_range_slice operation.
@@ -409,16 +403,6 @@ service Cassandra {
throws (1:InvalidRequestException ire,
2:UnavailableException ue, 3:TimedOutException te),
/**
- Performs a get_slice for column_parent and predicate for the given keys in parallel.
- @Deprecated; use `scan`
- */
- map<binary,list<ColumnOrSuperColumn>> multiget_slice(1:required list<binary>
keys,
- 2:required ColumnParent
column_parent,
- 3:required
SlicePredicate predicate,
- 4:required
ConsistencyLevel consistency_level=ONE)
- throws (1:InvalidRequestException ire,
2:UnavailableException ue, 3:TimedOutException te),
-
- /**
returns the number of columns matching <code>predicate</code> for a particular <code>key</code>,
<code>ColumnFamily</code> and optionally <code>SuperColumn</code>.
*/
@@ -429,6 +413,15 @@ service Cassandra {
throws (1:InvalidRequestException ire, 2:UnavailableException ue,
3:TimedOutException te),
/**
+ Performs a get_slice for column_parent and predicate for the given keys in parallel.
+ */
+ map<binary,list<ColumnOrSuperColumn>> multiget_slice(1:required list<binary>
keys,
+ 2:required ColumnParent
column_parent,
+ 3:required
SlicePredicate predicate,
+ 4:required
ConsistencyLevel consistency_level=ONE)
+ throws (1:InvalidRequestException ire,
2:UnavailableException ue, 3:TimedOutException te),
+
+ /**
Perform a get_count in parallel on the given list<binary> keys. The return value maps keys to the count found.
*/
map<binary, i32> multiget_count(1:required string keyspace,
@@ -439,8 +432,7 @@ service Cassandra {
throws (1:InvalidRequestException ire, 2:UnavailableException ue,
3:TimedOutException te),
/**
- returns a subset of columns for a range of keys.
- @Deprecated; use `scan`
+ returns a subset of columns for a contiguous range of keys.
*/
list<KeySlice> get_range_slices(1:required ColumnParent column_parent,
2:required SlicePredicate predicate,
@@ -448,20 +440,13 @@ service Cassandra {
4:required ConsistencyLevel
consistency_level=ONE)
throws (1:InvalidRequestException ire, 2:UnavailableException
ue, 3:TimedOutException te),
- /** Returns the subset of columns specified in SlicePredicate for the rows requested in RowsPredicate */
- list<KeySlice> scan(1:required ColumnParent column_parent,
- 2:required RowPredicate row_predicate,
- 3:required SlicePredicate column_predicate,
- 4:required ConsistencyLevel consistency_level=ONE)
+ /** Returns the subset of columns specified in SlicePredicate for the rows matching the IndexClause */
+ list<KeySlice> get_indexed_slices(1:required ColumnParent column_parent,
+ 2:required IndexClause index_clause,
+ 3:required SlicePredicate column_predicate,
+ 4:required ConsistencyLevel
consistency_level=ONE)
throws (1:InvalidRequestException ire, 2:UnavailableException
ue, 3:TimedOutException te),
- /** Counts the subset of columns specified in SlicePredicate for the rows requested in RowsPredicate */
- list<KeyCount> scan_count(1:required ColumnParent column_parent,
- 2:required RowPredicate row_predicate,
- 3:required SlicePredicate column_predicate,
- 4:required ConsistencyLevel consistency_level=ONE)
- throws (1:InvalidRequestException ire, 2:UnavailableException ue,
3:TimedOutException te),
-
# modification methods
/**
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=981906&r1=981905&r2=981906&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Tue
Aug 3 14:58:14 2010
@@ -1055,42 +1055,56 @@ public class ColumnFamilyStore implement
return rows;
}
- public List<Row> scan(IndexClause indexClause, IFilter dataFilter)
+ public List<Row> scan(IndexClause clause, AbstractBounds range, IFilter
dataFilter)
{
// TODO: allow merge join instead of just one index + loop
- IndexExpression first = highestSelectivityPredicate(indexClause);
+ IndexExpression first = highestSelectivityPredicate(clause);
ColumnFamilyStore indexCFS =
getIndexedColumnFamilyStore(first.column_name);
assert indexCFS != null;
DecoratedKey indexKey = indexCFS.partitioner_.decorateKey(first.value);
- QueryFilter indexFilter = QueryFilter.getSliceFilter(indexKey,
- new
QueryPath(indexCFS.getColumnFamilyName()),
-
ArrayUtils.EMPTY_BYTE_ARRAY,
-
ArrayUtils.EMPTY_BYTE_ARRAY,
- null,
- false,
-
indexClause.count);
List<Row> rows = new ArrayList<Row>();
- ColumnFamily indexRow = indexCFS.getColumnFamily(indexFilter);
- if (indexRow == null)
- return rows;
-
- for (byte[] dataKey : indexRow.getColumnNames())
- {
- DecoratedKey dk = partitioner_.decorateKey(dataKey);
- ColumnFamily data = getColumnFamily(new QueryFilter(dk, new
QueryPath(columnFamily_), dataFilter));
- boolean accepted = true;
- for (IndexExpression expression : indexClause.expressions)
+ byte[] startKey = clause.start_key;
+
+ outer:
+ while (true)
+ {
+ /* we don't have a way to get the key back from the DK -- we just have a token --
+ * so, we need to loop after starting with start_key, until we get to keys in the given `range`.
+ * But, if the calling StorageProxy is doing a good job estimating data from each range, the range
+ * should be pretty close to `start_key`. */
+ QueryFilter indexFilter = QueryFilter.getSliceFilter(indexKey,
+ new
QueryPath(indexCFS.getColumnFamilyName()),
+ startKey,
+
ArrayUtils.EMPTY_BYTE_ARRAY,
+ null,
+ false,
+ clause.count);
+ ColumnFamily indexRow = indexCFS.getColumnFamily(indexFilter);
+ if (indexRow == null)
+ break;
+
+ byte[] dataKey = null;
+ int n = 0;
+ Iterator<byte[]> iter = indexRow.getColumnNames().iterator();
+ while (iter.hasNext())
{
- // (we can skip "first" since we already know it's satisfied)
- if (expression != first && !satisfies(data, expression))
- {
- accepted = false;
- break;
- }
+ dataKey = iter.next();
+ n++;
+ DecoratedKey dk = partitioner_.decorateKey(dataKey);
+ if (!range.right.equals(partitioner_.getMinimumToken()) &&
range.right.compareTo(dk.token) < 0)
+ break outer;
+ if (!range.contains(dk.token))
+ continue;
+ ColumnFamily data = getColumnFamily(new QueryFilter(dk, new
QueryPath(columnFamily_), dataFilter));
+ if (satisfies(data, clause, first))
+ rows.add(new Row(dk, data));
+ if (rows.size() == clause.count)
+ break outer;
}
- if (accepted)
- rows.add(new Row(dk, data));
+ startKey = dataKey;
+ if (n < clause.count)
+ break;
}
return rows;
@@ -1115,10 +1129,19 @@ public class ColumnFamilyStore implement
return best;
}
- private static boolean satisfies(ColumnFamily data, IndexExpression
expression)
+ private static boolean satisfies(ColumnFamily data, IndexClause clause,
IndexExpression first)
{
- IColumn column = data.getColumn(expression.column_name);
- return column != null && Arrays.equals(column.value(),
expression.value);
+ for (IndexExpression expression : clause.expressions)
+ {
+ // (we can skip "first" since we already know it's satisfied)
+ if (expression == first)
+ continue;
+ // check column data vs expression
+ IColumn column = data.getColumn(expression.column_name);
+ if (column != null && !Arrays.equals(column.value(),
expression.value))
+ return false;
+ }
+ return true;
}
public AbstractType getComparator()
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/IndexScanCommand.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/IndexScanCommand.java?rev=981906&r1=981905&r2=981906&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/IndexScanCommand.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/IndexScanCommand.java Tue
Aug 3 14:58:14 2010
@@ -23,6 +23,7 @@ import java.io.*;
import java.util.Arrays;
import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.io.ICompactSerializer2;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.net.Message;
@@ -42,14 +43,16 @@ public class IndexScanCommand
public final String column_family;
public final IndexClause index_clause;
public final SlicePredicate predicate;
+ public final AbstractBounds range;
- public IndexScanCommand(String keyspace, String column_family, IndexClause
index_clause, SlicePredicate predicate)
+ public IndexScanCommand(String keyspace, String column_family, IndexClause
index_clause, SlicePredicate predicate, AbstractBounds range)
{
this.keyspace = keyspace;
this.column_family = column_family;
this.index_clause = index_clause;
this.predicate = predicate;
+ this.range = range;
}
public Message getMessage()
@@ -85,6 +88,7 @@ public class IndexScanCommand
TSerializer ser = new TSerializer(new TBinaryProtocol.Factory());
FBUtilities.serialize(ser, o.index_clause, out);
FBUtilities.serialize(ser, o.predicate, out);
+ AbstractBounds.serializer().serialize(o.range, out);
}
public IndexScanCommand deserialize(DataInput in) throws IOException
@@ -97,8 +101,9 @@ public class IndexScanCommand
FBUtilities.deserialize(dser, indexClause, in);
SlicePredicate predicate = new SlicePredicate();
FBUtilities.deserialize(dser, predicate, in);
+ AbstractBounds range = AbstractBounds.serializer().deserialize(in);
- return new IndexScanCommand(keyspace, columnFamily, indexClause,
predicate);
+ return new IndexScanCommand(keyspace, columnFamily, indexClause,
predicate, range);
}
}
}
Modified:
cassandra/trunk/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java?rev=981906&r1=981905&r2=981906&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java
Tue Aug 3 14:58:14 2010
@@ -18,6 +18,8 @@
package org.apache.cassandra.service;
+import java.util.List;
+
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.filter.QueryFilter;
import org.apache.cassandra.net.IVerbHandler;
@@ -36,7 +38,8 @@ public class IndexScanVerbHandler implem
{
IndexScanCommand command = IndexScanCommand.read(message);
ColumnFamilyStore cfs =
Table.open(command.keyspace).getColumnFamilyStore(command.column_family);
- RangeSliceReply reply = new
RangeSliceReply(cfs.scan(command.index_clause,
QueryFilter.getFilter(command.predicate, cfs.getComparator())));
+ List<Row> rows = cfs.scan(command.index_clause, command.range,
QueryFilter.getFilter(command.predicate, cfs.getComparator()));
+ RangeSliceReply reply = new RangeSliceReply(rows);
Message response = reply.getReply(message);
if (logger.isDebugEnabled())
logger.debug("Sending " + reply+ " to " +
message.getMessageId() + "@" + message.getFrom());
Modified:
cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=981906&r1=981905&r2=981906&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Tue
Aug 3 14:58:14 2010
@@ -40,10 +40,7 @@ import org.apache.cassandra.concurrent.S
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.*;
-import org.apache.cassandra.dht.AbstractBounds;
-import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.dht.*;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.locator.AbstractReplicationStrategy;
import org.apache.cassandra.locator.TokenMetadata;
@@ -51,9 +48,7 @@ import org.apache.cassandra.net.IAsyncCa
import org.apache.cassandra.net.IAsyncResult;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.thrift.ConsistencyLevel;
-import org.apache.cassandra.thrift.InvalidRequestException;
-import org.apache.cassandra.thrift.UnavailableException;
+import org.apache.cassandra.thrift.*;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.LatencyTracker;
import org.apache.cassandra.utils.WrappedRunnable;
@@ -496,7 +491,7 @@ public class StorageProxy implements Sto
List<AbstractBounds> ranges = getRestrictedRanges(command.range);
// now scan until we have enough results
List<Row> rows = new ArrayList<Row>(command.max_keys);
- for (AbstractBounds range : getRangeIterator(ranges,
command.range.left))
+ for (AbstractBounds range : ranges)
{
List<InetAddress> liveEndpoints =
StorageService.instance.getLiveNaturalEndpoints(command.keyspace, range.right);
@@ -531,8 +526,7 @@ public class StorageProxy implements Sto
RangeSliceResponseResolver resolver = new
RangeSliceResponseResolver(command.keyspace, liveEndpoints);
AbstractReplicationStrategy rs =
StorageService.instance.getReplicationStrategy(command.keyspace);
QuorumResponseHandler<List<Row>> handler =
rs.getQuorumResponseHandler(resolver, consistency_level);
- // TODO bail early if live endpoints can't satisfy requested
- // consistency level
+ // TODO bail early if live endpoints can't satisfy requested consistency level
for (InetAddress endpoint : liveEndpoints)
{
MessagingService.instance.sendRR(message, endpoint,
handler);
@@ -637,43 +631,6 @@ public class StorageProxy implements Sto
}
/**
- * returns an iterator that will return ranges in ring order, starting with the one that contains the start token
- */
- private static Iterable<AbstractBounds> getRangeIterator(final
List<AbstractBounds> ranges, Token start)
- {
- // find the one to start with
- int i;
- for (i = 0; i < ranges.size(); i++)
- {
- AbstractBounds range = ranges.get(i);
- if (range.contains(start) || range.left.equals(start))
- break;
- }
- AbstractBounds range = ranges.get(i);
- assert range.contains(start) || range.left.equals(start); // make sure
the loop didn't just end b/c ranges were exhausted
-
- // return an iterable that starts w/ the correct range and iterates the rest in ring order
- final int begin = i;
- return new Iterable<AbstractBounds>()
- {
- public Iterator<AbstractBounds> iterator()
- {
- return new AbstractIterator<AbstractBounds>()
- {
- int n = 0;
-
- protected AbstractBounds computeNext()
- {
- if (n == ranges.size())
- return endOfData();
- return ranges.get((begin + n++) % ranges.size());
- }
- };
- }
- };
- }
-
- /**
* compute all ranges we're going to query, in sorted order, so that we get the correct results back.
* 1) computing range intersections is necessary because nodes can be replica destinations for many ranges,
* so if we do not restrict each scan to the specific range we want we will get duplicate results.
@@ -720,6 +677,15 @@ public class StorageProxy implements Sto
// sort in order that the original query range would see them.
int queryOrder1 = queryRange.left.compareTo(o1.left);
int queryOrder2 = queryRange.left.compareTo(o2.left);
+
+ // check for exact match with query start
+ assert !(queryOrder1 == 0 && queryOrder2 == 0);
+ if (queryOrder1 == 0)
+ return -1;
+ if (queryOrder2 == 0)
+ return 1;
+
+ // order segments in order they should be traversed
if (queryOrder1 < queryOrder2)
return -1; // o1 comes after query start, o2 wraps to after
if (queryOrder1 > queryOrder2)
@@ -785,26 +751,51 @@ public class StorageProxy implements Sto
return writeStats.getRecentLatencyMicros();
}
- public static List<Row> scan(IndexScanCommand command, ConsistencyLevel
consistency_level)
+ public static List<Row> scan(String keyspace, String column_family,
IndexClause index_clause, SlicePredicate column_predicate, ConsistencyLevel
consistency_level)
throws IOException, TimeoutException
{
IPartitioner p = StorageService.getPartitioner();
- Token startToken = command.index_clause.start_key == null ?
p.getMinimumToken() : p.getToken(command.index_clause.start_key);
- List<InetAddress> endpoints =
StorageService.instance.getLiveNaturalEndpoints(command.keyspace, startToken);
- // TODO iterate through endpoints in token order like getRangeSlice
- Message message = command.getMessage();
- RangeSliceResponseResolver resolver = new
RangeSliceResponseResolver(command.keyspace, endpoints);
- AbstractReplicationStrategy rs =
StorageService.instance.getReplicationStrategy(command.keyspace);
- QuorumResponseHandler<List<Row>> handler =
rs.getQuorumResponseHandler(resolver, consistency_level);
- MessagingService.instance.sendRR(message, endpoints.get(0), handler);
- try
- {
- return handler.get();
- }
- catch (DigestMismatchException e)
+
+ Token leftToken = index_clause.start_key == null ? p.getMinimumToken()
: p.getToken(index_clause.start_key);
+ List<AbstractBounds> ranges = getRestrictedRanges(new
Bounds(leftToken, p.getMinimumToken()));
+ logger.debug("scan ranges are " + StringUtils.join(ranges, ","));
+
+ // now scan until we have enough results
+ List<Row> rows = new ArrayList<Row>(index_clause.count);
+ for (AbstractBounds range : ranges)
{
- throw new RuntimeException(e);
+ List<InetAddress> liveEndpoints =
StorageService.instance.getLiveNaturalEndpoints(keyspace, range.right);
+
DatabaseDescriptor.getEndpointSnitch().sortByProximity(FBUtilities.getLocalAddress(),
liveEndpoints);
+
+ // collect replies and resolve according to consistency level
+ RangeSliceResponseResolver resolver = new
RangeSliceResponseResolver(keyspace, liveEndpoints);
+ AbstractReplicationStrategy rs =
StorageService.instance.getReplicationStrategy(keyspace);
+ QuorumResponseHandler<List<Row>> handler =
rs.getQuorumResponseHandler(resolver, consistency_level);
+ // TODO bail early if live endpoints can't satisfy requested consistency level
+ IndexScanCommand command = new IndexScanCommand(keyspace,
column_family, index_clause, column_predicate, range);
+ Message message = command.getMessage();
+ for (InetAddress endpoint : liveEndpoints)
+ {
+ MessagingService.instance.sendRR(message, endpoint, handler);
+ if (logger.isDebugEnabled())
+ logger.debug("reading " + command + " from " +
message.getMessageId() + "@" + endpoint);
+ }
+
+ List<Row> theseRows;
+ try
+ {
+ theseRows = handler.get();
+ }
+ catch (DigestMismatchException e)
+ {
+ throw new RuntimeException(e);
+ }
+ rows.addAll(theseRows);
+ if (rows.size() >= index_clause.count)
+ return rows.subList(0, index_clause.count);
}
+
+ return rows;
}
static class weakReadLocalCallable implements Callable<Object>
Modified:
cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java?rev=981906&r1=981905&r2=981906&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java
Tue Aug 3 14:58:14 2010
@@ -522,12 +522,6 @@ public class CassandraServer implements
String keyspace = keySpace.get();
checkKeyspaceAndLoginAuthorized(AccessLevel.READONLY);
- return getRangeSlicesInternal(keyspace, column_parent, range,
predicate, consistency_level);
- }
-
- private List<KeySlice> getRangeSlicesInternal(String keyspace,
ColumnParent column_parent, KeyRange range, SlicePredicate predicate,
ConsistencyLevel consistency_level)
- throws InvalidRequestException, UnavailableException, TimedOutException
- {
ThriftValidation.validateColumnParent(keyspace, column_parent);
ThriftValidation.validatePredicate(keyspace, column_parent, predicate);
ThriftValidation.validateKeyRange(range);
@@ -584,48 +578,21 @@ public class CassandraServer implements
return keySlices;
}
- public List<KeySlice> scan(ColumnParent column_parent, RowPredicate
row_predicate, SlicePredicate column_predicate, ConsistencyLevel
consistency_level) throws InvalidRequestException, UnavailableException,
TimedOutException, TException
+ public List<KeySlice> get_indexed_slices(ColumnParent column_parent,
IndexClause index_clause, SlicePredicate column_predicate, ConsistencyLevel
consistency_level) throws InvalidRequestException, UnavailableException,
TimedOutException, TException
{
if (logger.isDebugEnabled())
logger.debug("scan");
checkKeyspaceAndLoginAuthorized(AccessLevel.READONLY);
-
- if (row_predicate.keys != null)
- {
- Map<byte[], List<ColumnOrSuperColumn>> rowMap =
multigetSliceInternal(keySpace.get(), row_predicate.keys, column_parent,
column_predicate, consistency_level);
- List<KeySlice> rows = new ArrayList<KeySlice>(rowMap.size());
- for (Map.Entry<byte[], List<ColumnOrSuperColumn>> entry :
rowMap.entrySet())
- {
- rows.add(new KeySlice(entry.getKey(), entry.getValue()));
- }
- return rows;
- }
-
- if (row_predicate.key_range != null)
- {
- return getRangeSlicesInternal(keySpace.get(), column_parent,
row_predicate.key_range, column_predicate, consistency_level);
- }
-
- if (row_predicate.index_clause != null)
- {
- return scanIndexInternal(keySpace.get(), column_parent,
row_predicate.index_clause, column_predicate, consistency_level);
- }
-
- throw new InvalidRequestException("row predicate must specify keys,
key_range, or index_clause");
- }
-
- private List<KeySlice> scanIndexInternal(String keyspace, ColumnParent
column_parent, IndexClause index_clause, SlicePredicate predicate,
ConsistencyLevel consistency_level)
- throws InvalidRequestException, TimedOutException
- {
+ String keyspace = keySpace.get();
ThriftValidation.validateColumnParent(keyspace, column_parent);
- ThriftValidation.validatePredicate(keyspace, column_parent, predicate);
+ ThriftValidation.validatePredicate(keyspace, column_parent,
column_predicate);
ThriftValidation.validateIndexClauses(keyspace,
column_parent.column_family, index_clause);
- List<Row> rows = null;
+ List<Row> rows;
try
{
- rows = StorageProxy.scan(new IndexScanCommand(keyspace,
column_parent.column_family, index_clause, predicate), consistency_level);
+ rows = StorageProxy.scan(keyspace, column_parent.column_family,
index_clause, column_predicate, consistency_level);
}
catch (IOException e)
{
@@ -635,18 +602,7 @@ public class CassandraServer implements
{
throw new TimedOutException();
}
- return thriftifyKeySlices(rows, column_parent, predicate);
- }
-
- public List<KeyCount> scan_count(ColumnParent column_parent, RowPredicate
row_predicate, SlicePredicate column_predicate, ConsistencyLevel
consistency_level) throws InvalidRequestException, UnavailableException,
TimedOutException, TException
- {
- List<KeySlice> rows = scan(column_parent, row_predicate,
column_predicate, consistency_level);
- List<KeyCount> rowCounts = new ArrayList<KeyCount>(rows.size());
- for (KeySlice slice : rows)
- {
- rowCounts.add(new KeyCount(slice.key, slice.columns.size()));
- }
- return rowCounts;
+ return thriftifyKeySlices(rows, column_parent, column_predicate);
}
public Set<String> describe_keyspaces() throws TException
Modified: cassandra/trunk/test/system/test_thrift_server.py
URL:
http://svn.apache.org/viewvc/cassandra/trunk/test/system/test_thrift_server.py?rev=981906&r1=981905&r2=981906&view=diff
==============================================================================
--- cassandra/trunk/test/system/test_thrift_server.py (original)
+++ cassandra/trunk/test/system/test_thrift_server.py Tue Aug 3 14:58:14 2010
@@ -1273,20 +1273,21 @@ class TestMutations(ThriftTester):
# simple query on one index expression
cp = ColumnParent('Indexed1')
sp = SlicePredicate(slice_range=SliceRange('', ''))
- clause = IndexClause([IndexExpression('birthdate', IndexOperator.EQ,
_i64(1))])
- result = client.scan(cp, RowPredicate(index_clause=clause), sp,
ConsistencyLevel.ONE)
+ clause = IndexClause([IndexExpression('birthdate', IndexOperator.EQ,
_i64(1))], '')
+ result = client.get_indexed_slices(cp, clause, sp,
ConsistencyLevel.ONE)
assert len(result) == 1, result
assert result[0].key == 'key1'
assert len(result[0].columns) == 1, result[0].columns
# solo unindexed expression is invalid
- clause = IndexClause([IndexExpression('b', IndexOperator.EQ, _i64(1))])
- _expect_exception(lambda: client.scan(cp,
RowPredicate(index_clause=clause), sp, ConsistencyLevel.ONE),
InvalidRequestException)
+ clause = IndexClause([IndexExpression('b', IndexOperator.EQ,
_i64(1))], '')
+ _expect_exception(lambda: client.get_indexed_slices(cp, clause, sp,
ConsistencyLevel.ONE), InvalidRequestException)
# but unindexed expression added to indexed one is ok
clause = IndexClause([IndexExpression('b', IndexOperator.EQ, _i64(3)),
- IndexExpression('birthdate', IndexOperator.EQ,
_i64(3))])
- result = client.scan(cp, RowPredicate(index_clause=clause), sp,
ConsistencyLevel.ONE)
+ IndexExpression('birthdate', IndexOperator.EQ,
_i64(3))],
+ '')
+ result = client.get_indexed_slices(cp, clause, sp,
ConsistencyLevel.ONE)
assert len(result) == 1, result
assert result[0].key == 'key3'
assert len(result[0].columns) == 2, result[0].columns
Modified:
cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java?rev=981906&r1=981905&r2=981906&view=diff
==============================================================================
---
cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
(original)
+++
cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
Tue Aug 3 14:58:14 2010
@@ -23,6 +23,8 @@ import java.util.*;
import java.util.concurrent.ExecutionException;
import org.apache.commons.lang.ArrayUtils;
+import org.apache.commons.lang.StringUtils;
+
import static org.junit.Assert.assertNull;
import org.junit.Test;
@@ -31,6 +33,7 @@ import org.apache.cassandra.CleanupHelpe
import org.apache.cassandra.Util;
import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.thrift.IndexClause;
import org.apache.cassandra.thrift.IndexExpression;
@@ -172,12 +175,14 @@ public class ColumnFamilyStoreTest exten
rm.apply();
IndexExpression expr = new
IndexExpression("birthdate".getBytes("UTF8"), IndexOperator.EQ,
FBUtilities.toByteArray(1L));
- IndexClause clause = new IndexClause(Arrays.asList(expr), 100);
+ IndexClause clause = new IndexClause(Arrays.asList(expr),
ArrayUtils.EMPTY_BYTE_ARRAY, 100);
IFilter filter = new IdentityQueryFilter();
- List<Row> rows =
Table.open("Keyspace1").getColumnFamilyStore("Indexed1").scan(clause, filter);
+ IPartitioner p = StorageService.getPartitioner();
+ Range range = new Range(p.getMinimumToken(), p.getMinimumToken());
+ List<Row> rows =
Table.open("Keyspace1").getColumnFamilyStore("Indexed1").scan(clause, range,
filter);
assert rows != null;
- assert rows.size() == 2;
+ assert rows.size() == 2 : StringUtils.join(rows, ",");
assert Arrays.equals("k1".getBytes(), rows.get(0).key.key);
assert Arrays.equals("k3".getBytes(), rows.get(1).key.key);
assert Arrays.equals(FBUtilities.toByteArray(1L),
rows.get(0).cf.getColumn("birthdate".getBytes("UTF8")).value());
Modified:
cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTest.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTest.java?rev=981906&r1=981905&r2=981906&view=diff
==============================================================================
---
cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTest.java
(original)
+++
cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTest.java
Tue Aug 3 14:58:14 2010
@@ -20,7 +20,10 @@ import org.apache.cassandra.db.Timestamp
import org.apache.cassandra.db.filter.IFilter;
import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.thrift.IndexClause;
import org.apache.cassandra.thrift.IndexExpression;
import org.apache.cassandra.thrift.IndexOperator;
@@ -65,9 +68,11 @@ public class SSTableWriterTest extends C
cfs.addSSTable(sstr);
IndexExpression expr = new
IndexExpression("birthdate".getBytes("UTF8"), IndexOperator.EQ,
FBUtilities.toByteArray(1L));
- IndexClause clause = new IndexClause(Arrays.asList(expr), 100);
+ IndexClause clause = new IndexClause(Arrays.asList(expr),
"".getBytes(), 100);
IFilter filter = new IdentityQueryFilter();
- List<Row> rows = cfs.scan(clause, filter);
+ IPartitioner p = StorageService.getPartitioner();
+ Range range = new Range(p.getMinimumToken(), p.getMinimumToken());
+ List<Row> rows = cfs.scan(clause, range, filter);
assertEquals("IndexExpression should return two rows on
recoverAndOpen",2, rows.size());
assertTrue("First result should be
'k1'",Arrays.equals("k1".getBytes(), rows.get(0).key.key));