Author: jbellis
Date: Wed Sep 22 03:42:16 2010
New Revision: 999745
URL: http://svn.apache.org/viewvc?rev=999745&view=rev
Log:
add test of secondary index recreation to StreamingTransferTest
patch by jbellis; reviewed by gdusbabek for CASSANDRA-1415
Modified:
cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
Modified:
cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java?rev=999745&r1=999744&r2=999745&view=diff
==============================================================================
---
cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
(original)
+++
cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
Wed Sep 22 03:42:16 2010
@@ -20,13 +20,18 @@ package org.apache.cassandra.streaming;
*/
import static junit.framework.Assert.assertEquals;
+import static org.apache.cassandra.Util.column;
import java.net.InetAddress;
import java.util.*;
+import org.apache.commons.lang.ArrayUtils;
+
import org.apache.cassandra.CleanupHelper;
import org.apache.cassandra.Util;
import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
+import org.apache.cassandra.db.filter.IFilter;
import org.apache.cassandra.db.filter.QueryFilter;
import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.dht.IPartitioner;
@@ -34,6 +39,9 @@ import org.apache.cassandra.dht.Range;
import org.apache.cassandra.io.sstable.SSTableUtils;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.thrift.IndexClause;
+import org.apache.cassandra.thrift.IndexExpression;
+import org.apache.cassandra.thrift.IndexOperator;
import org.apache.cassandra.utils.FBUtilities;
import org.junit.BeforeClass;
@@ -52,37 +60,55 @@ public class StreamingTransferTest exten
@Test
public void testTransferTable() throws Exception
{
- // write a temporary SSTable, but don't register it
- Set<String> content = new HashSet<String>();
- content.add("key");
- content.add("key2");
- content.add("key3");
- SSTableReader sstable = SSTableUtils.writeSSTable(content);
- String tablename = sstable.getTableName();
- String cfname = sstable.getColumnFamilyName();
+ Table table = Table.open("Keyspace1");
+ ColumnFamilyStore cfs = table.getColumnFamilyStore("Indexed1");
+
+ // write a temporary SSTable, and unregister it
+ for (int i = 1; i <= 3; i++)
+ {
+ String key = "key" + i;
+ RowMutation rm = new RowMutation("Keyspace1", key.getBytes());
+ ColumnFamily cf = ColumnFamily.create(table.name,
cfs.columnFamily);
+ cf.addColumn(column(key, "v", new TimestampClock(0)));
+ cf.addColumn(new Column("birthdate".getBytes("UTF8"),
FBUtilities.toByteArray((long) i), new TimestampClock(0)));
+ rm.add(cf);
+ rm.apply();
+ }
+ cfs.forceBlockingFlush();
+ assert cfs.getSSTables().size() == 1;
+ SSTableReader sstable = cfs.getSSTables().iterator().next();
+ cfs.removeAllSSTables();
// transfer the first and last key
IPartitioner p = StorageService.getPartitioner();
List<Range> ranges = new ArrayList<Range>();
- ranges.add(new Range(p.getMinimumToken(),
p.getToken("key".getBytes())));
+ ranges.add(new Range(p.getMinimumToken(),
p.getToken("key1".getBytes())));
ranges.add(new Range(p.getToken("key2".getBytes()),
p.getMinimumToken()));
- StreamOutSession session = StreamOutSession.create(tablename, LOCAL,
null);
+ StreamOutSession session = StreamOutSession.create(table.name, LOCAL,
null);
StreamOut.transferSSTables(session, Arrays.asList(sstable), ranges);
session.await();
// confirm that the SSTable was transferred and registered
- ColumnFamilyStore cfstore =
Table.open(tablename).getColumnFamilyStore(cfname);
- List<Row> rows = Util.getRangeSlice(cfstore);
+ List<Row> rows = Util.getRangeSlice(cfs);
assertEquals(2, rows.size());
- assert Arrays.equals(rows.get(0).key.key, "key".getBytes());
+ assert Arrays.equals(rows.get(0).key.key, "key1".getBytes());
assert Arrays.equals(rows.get(1).key.key, "key3".getBytes());
- assert rows.get(0).cf.getColumnsMap().size() == 1;
- assert rows.get(1).cf.getColumnsMap().size() == 1;
+ assertEquals(2, rows.get(0).cf.getColumnsMap().size());
+ assertEquals(2, rows.get(1).cf.getColumnsMap().size());
assert rows.get(1).cf.getColumn("key3".getBytes()) != null;
// and that the index and filter were properly recovered
- assert null !=
cfstore.getColumnFamily(QueryFilter.getIdentityFilter(Util.dk("key"), new
QueryPath("Standard1")));
- assert null !=
cfstore.getColumnFamily(QueryFilter.getIdentityFilter(Util.dk("key3"), new
QueryPath("Standard1")));
+ assert null !=
cfs.getColumnFamily(QueryFilter.getIdentityFilter(Util.dk("key1"), new
QueryPath(cfs.columnFamily)));
+ assert null !=
cfs.getColumnFamily(QueryFilter.getIdentityFilter(Util.dk("key3"), new
QueryPath(cfs.columnFamily)));
+
+ // and that the secondary index works
+ IndexExpression expr = new
IndexExpression("birthdate".getBytes("UTF8"), IndexOperator.EQ,
FBUtilities.toByteArray(3L));
+ IndexClause clause = new IndexClause(Arrays.asList(expr),
ArrayUtils.EMPTY_BYTE_ARRAY, 100);
+ IFilter filter = new IdentityQueryFilter();
+ Range range = new Range(p.getMinimumToken(), p.getMinimumToken());
+ rows = cfs.scan(clause, range, filter);
+ assertEquals(1, rows.size());
+ assert Arrays.equals(rows.get(0).key.key, "key3".getBytes());
}
@Test
@@ -115,21 +141,17 @@ public class StreamingTransferTest exten
// confirm that the SSTable was transferred and registered
ColumnFamilyStore cfstore =
Table.open(tablename).getColumnFamilyStore(cfname);
List<Row> rows = Util.getRangeSlice(cfstore);
- assertEquals(8, rows.size());
- assert Arrays.equals(rows.get(0).key.key, "key".getBytes());
- assert Arrays.equals(rows.get(2).key.key, "test".getBytes());
- assert Arrays.equals(rows.get(5).key.key, "transfer1".getBytes());
+ assertEquals(6, rows.size());
+ assert Arrays.equals(rows.get(0).key.key, "test".getBytes());
+ assert Arrays.equals(rows.get(3).key.key, "transfer1".getBytes());
assert rows.get(0).cf.getColumnsMap().size() == 1;
- assert rows.get(2).cf.getColumnsMap().size() == 1;
- assert rows.get(5).cf.getColumnsMap().size() == 1;
- assert rows.get(0).cf.getColumn("key".getBytes()) != null;
-
+ assert rows.get(3).cf.getColumnsMap().size() == 1;
+
// these keys fall outside of the ranges and should not be transferred.
assert null !=
cfstore.getColumnFamily(QueryFilter.getIdentityFilter(Util.dk("transfer2"), new
QueryPath("Standard1")));
assert null !=
cfstore.getColumnFamily(QueryFilter.getIdentityFilter(Util.dk("transfer3"), new
QueryPath("Standard1")));
// and that the index and filter were properly recovered
- assert null !=
cfstore.getColumnFamily(QueryFilter.getIdentityFilter(Util.dk("key"), new
QueryPath("Standard1")));
assert null !=
cfstore.getColumnFamily(QueryFilter.getIdentityFilter(Util.dk("test"), new
QueryPath("Standard1")));
assert null !=
cfstore.getColumnFamily(QueryFilter.getIdentityFilter(Util.dk("transfer1"), new
QueryPath("Standard1")));
}