Author: jbellis
Date: Wed Feb 3 20:00:46 2010
New Revision: 906209
URL: http://svn.apache.org/viewvc?rev=906209&view=rev
Log:
allow wrapped range queries. patch by jbellis; reviewed by stuhood for
CASSANDRA-758
Removed:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeReply.java
incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreUtils.java
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
incubator/cassandra/trunk/test/unit/org/apache/cassandra/Util.java
incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=906209&r1=906208&r2=906209&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
Wed Feb 3 20:00:46 2010
@@ -415,7 +415,7 @@
return switchMemtable(memtable_, true);
}
- void forceBlockingFlush() throws IOException, ExecutionException,
InterruptedException
+ public void forceBlockingFlush() throws IOException, ExecutionException,
InterruptedException
{
Future<?> future = forceFlush();
if (future != null)
@@ -937,9 +937,11 @@
range_slice. still opens one randomaccessfile per key, which sucks.
something like compactioniterator
would be better.
*/
- private RangeReply getKeyRange(final DecoratedKey startWith, final
DecoratedKey stopAt, int maxResults)
+ private boolean getKeyRange(List<String> keys, final DecoratedKey
startWith, final DecoratedKey stopAt, int maxResults)
throws IOException, ExecutionException, InterruptedException
{
+ // getKeyRange requires start <= stop. getRangeSlice handles range
wrapping if necessary.
+ assert stopAt.isEmpty() || startWith.compareTo(stopAt) <= 0;
// create a CollatedIterator that will return unique keys from
different sources
// (current memtable, historical memtables, and SSTables) in the
correct order.
List<Iterator<DecoratedKey>> iterators = new
ArrayList<Iterator<DecoratedKey>>();
@@ -1009,23 +1011,20 @@
try
{
// pull keys out of the CollatedIterator
- List<String> keys = new ArrayList<String>();
boolean rangeCompletedLocally = false;
for (DecoratedKey current : reduced)
{
if (!stopAt.isEmpty() && stopAt.compareTo(current) < 0)
{
- rangeCompletedLocally = true;
- break;
+ return true;
}
keys.add(current.key);
if (keys.size() >= maxResults)
{
- rangeCompletedLocally = true;
- break;
+ return true;
}
}
- return new RangeReply(keys, rangeCompletedLocally);
+ return false;
}
finally
{
@@ -1054,19 +1053,34 @@
public RangeSliceReply getRangeSlice(byte[] super_column, final
DecoratedKey startKey, final DecoratedKey finishKey, int keyMax, SliceRange
sliceRange, List<byte[]> columnNames)
throws IOException, ExecutionException, InterruptedException
{
- RangeReply rr = getKeyRange(startKey, finishKey, keyMax);
- List<Row> rows = new ArrayList<Row>(rr.keys.size());
+ List<String> keys = new ArrayList<String>();
+ boolean completed;
+ if (finishKey.isEmpty() || startKey.compareTo(finishKey) <= 0)
+ {
+ completed = getKeyRange(keys, startKey, finishKey, keyMax);
+ }
+ else
+ {
+ // wrapped range
+ DecoratedKey emptyKey = new
DecoratedKey(StorageService.getPartitioner().getMinimumToken(), null);
+ completed = getKeyRange(keys, startKey, emptyKey, keyMax);
+ if (!completed)
+ {
+ completed = getKeyRange(keys, emptyKey, finishKey, keyMax);
+ }
+ }
+ List<Row> rows = new ArrayList<Row>(keys.size());
final QueryPath queryPath = new QueryPath(columnFamily_,
super_column, null);
final SortedSet<byte[]> columnNameSet = new
TreeSet<byte[]>(getComparator());
if (columnNames != null)
columnNameSet.addAll(columnNames);
- for (String key : rr.keys)
+ for (String key : keys)
{
QueryFilter filter = sliceRange == null ? new
NamesQueryFilter(key, queryPath, columnNameSet) : new SliceQueryFilter(key,
queryPath, sliceRange.start, sliceRange.finish, sliceRange.reversed,
sliceRange.count);
rows.add(new Row(key, getColumnFamily(filter)));
}
- return new RangeSliceReply(rows, rr.rangeCompletedLocally);
+ return new RangeSliceReply(rows, completed);
}
public AbstractType getComparator()
Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/Util.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/Util.java?rev=906209&r1=906208&r2=906209&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/Util.java
(original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/Util.java Wed Feb
3 20:00:46 2010
@@ -71,4 +71,26 @@
new SliceRange(ArrayUtils.EMPTY_BYTE_ARRAY,
ArrayUtils.EMPTY_BYTE_ARRAY, false, 10000),
null);
}
+
+ /**
+ * Writes out a bunch of rows for a single column family.
+ *
+ * @param rms A group of RowMutations for the same table and column
family.
+ * @return The ColumnFamilyStore that was used.
+ */
+ public static ColumnFamilyStore writeColumnFamily(List<RowMutation> rms)
throws IOException, ExecutionException, InterruptedException
+ {
+ RowMutation first = rms.get(0);
+ String tablename = first.getTable();
+ String cfname = first.columnFamilyNames().iterator().next();
+
+ Table table = Table.open(tablename);
+ ColumnFamilyStore store = table.getColumnFamilyStore(cfname);
+
+ for (RowMutation rm : rms)
+ rm.apply();
+
+ store.forceBlockingFlush();
+ return store;
+ }
}
Modified:
incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java?rev=906209&r1=906208&r2=906209&view=diff
==============================================================================
---
incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
(original)
+++
incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
Wed Feb 3 20:00:46 2010
@@ -28,6 +28,9 @@
import static junit.framework.Assert.assertEquals;
import org.apache.cassandra.CleanupHelper;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.thrift.SliceRange;
import org.apache.cassandra.utils.WrappedRunnable;
import java.net.InetAddress;
@@ -62,7 +65,7 @@
rm.add(new QueryPath("Standard1", null, "Column1".getBytes()),
"asdf".getBytes(), 0);
rm.add(new QueryPath("Standard1", null, "Column2".getBytes()),
"asdf".getBytes(), 0);
rms.add(rm);
- ColumnFamilyStore store =
ColumnFamilyStoreUtils.writeColumnFamily(rms);
+ ColumnFamilyStore store = Util.writeColumnFamily(rms);
Table table = Table.open("Keyspace1");
List<SSTableReader> ssTables = table.getAllSSTablesOnDisk();
@@ -111,7 +114,7 @@
rm.add(new QueryPath(columnFamilyName, null, "0".getBytes()), new
byte[0], j);
rms.add(rm);
}
- ColumnFamilyStore store =
ColumnFamilyStoreUtils.writeColumnFamily(rms);
+ ColumnFamilyStore store = Util.writeColumnFamily(rms);
List<Range> ranges = new ArrayList<Range>();
IPartitioner partitioner = new CollatingOrderPreservingPartitioner();
@@ -126,5 +129,25 @@
public void testAntiCompaction1() throws IOException, ExecutionException,
InterruptedException
{
testAntiCompaction("Standard1", 100);
- }
+ }
+
+ @Test
+ public void testWrappedRangeQuery() throws IOException,
ExecutionException, InterruptedException
+ {
+ List<RowMutation> rms = new LinkedList<RowMutation>();
+ RowMutation rm;
+ rm = new RowMutation("Keyspace2", "key1");
+ rm.add(new QueryPath("Standard1", null, "Column1".getBytes()),
"asdf".getBytes(), 0);
+ rms.add(rm);
+ Util.writeColumnFamily(rms);
+
+ rm = new RowMutation("Keyspace2", "key2");
+ rm.add(new QueryPath("Standard1", null, "Column1".getBytes()),
"asdf".getBytes(), 0);
+ rms.add(rm);
+ ColumnFamilyStore cfs = Util.writeColumnFamily(rms);
+
+ IPartitioner p = StorageService.getPartitioner();
+ RangeSliceReply result =
cfs.getRangeSlice(ArrayUtils.EMPTY_BYTE_ARRAY, p.decorateKey("key2"),
p.decorateKey("key1"), 10, null, Arrays.asList("asdf".getBytes()));
+ assertEquals(2, result.rows.size());
+ }
}
Modified:
incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java?rev=906209&r1=906208&r2=906209&view=diff
==============================================================================
---
incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java
(original)
+++
incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java
Wed Feb 3 20:00:46 2010
@@ -38,6 +38,7 @@
import org.apache.cassandra.CleanupHelper;
import org.apache.cassandra.config.DatabaseDescriptorTest;
+import org.apache.cassandra.Util;
import org.junit.Before;
import org.junit.Test;
@@ -111,7 +112,7 @@
rm = new RowMutation(tablename, "key1");
rm.add(new QueryPath(cfname, null, "Column1".getBytes()),
"asdf".getBytes(), 0);
rms.add(rm);
- ColumnFamilyStoreUtils.writeColumnFamily(rms);
+ Util.writeColumnFamily(rms);
// sample
validator = new Validator(new CFPair(tablename, cfname));
@@ -170,8 +171,8 @@
rm.add(new QueryPath(cfname, null, "Column1".getBytes()),
"asdf".getBytes(), 0);
rms.add(rm);
// with two SSTables
- ColumnFamilyStoreUtils.writeColumnFamily(rms);
- ColumnFamilyStore store =
ColumnFamilyStoreUtils.writeColumnFamily(rms);
+ Util.writeColumnFamily(rms);
+ ColumnFamilyStore store = Util.writeColumnFamily(rms);
TreePair old = aes.getRendezvousPair(tablename, cfname, REMOTE);
// force a readonly compaction, and wait for it to finish