Author: jbellis
Date: Wed Sep 22 03:41:52 2010
New Revision: 999742
URL: http://svn.apache.org/viewvc?rev=999742&view=rev
Log:
Split secondary (2ary) index build out from the bloom filter/row index build, and move it
into stream session post-processing. Bloom filter/row index construction moved into
SSTableWriter.Builder and is now run on the CompactionManager executor.
Patch by jbellis; reviewed by gdusbabek for CASSANDRA-1415.
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java
cassandra/trunk/src/java/org/apache/cassandra/io/ICompactionInfo.java
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/KeyIterator.java
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java
cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTest.java
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=999742&r1=999741&r2=999742&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Wed
Sep 22 03:41:52 2010
@@ -33,6 +33,7 @@ import javax.management.ObjectName;
import com.google.common.collect.Iterables;
import org.apache.commons.collections.IteratorUtils;
import org.apache.commons.lang.ArrayUtils;
+import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -205,12 +206,9 @@ public class ColumnFamilyStore implement
logger.info("Creating index {}.{}", table,
indexedCfMetadata.cfName);
Runnable runnable = new WrappedRunnable()
{
- public void runMayThrow() throws IOException,
ExecutionException, InterruptedException
+ public void runMayThrow() throws IOException
{
- logger.debug("Submitting index build to
compactionmanager");
- ReducingKeyIterator iter = new
ReducingKeyIterator(getSSTables());
- Future future =
CompactionManager.instance.submitIndexBuild(ColumnFamilyStore.this,
FBUtilities.getSingleColumnSet(info.name), iter);
- future.get();
+ buildSecondaryIndexes(getSSTables(),
FBUtilities.getSingleColumnSet(info.name));
logger.info("Index {} complete", indexedCfMetadata.cfName);
SystemTable.setIndexBuilt(table, indexedCfMetadata.cfName);
}
@@ -220,6 +218,26 @@ public class ColumnFamilyStore implement
indexedColumns.put(info.name, indexedCfs);
}
+ public void buildSecondaryIndexes(Collection<SSTableReader> sstables,
SortedSet<byte[]> columns)
+ {
+ logger.debug("Submitting index build to compactionmanager");
+ Future future = CompactionManager.instance.submitIndexBuild(this,
columns, new ReducingKeyIterator(sstables));
+ try
+ {
+ future.get();
+ for (byte[] column : columns)
+ getIndexedColumnFamilyStore(column).forceBlockingFlush();
+ }
+ catch (InterruptedException e)
+ {
+ throw new AssertionError(e);
+ }
+ catch (ExecutionException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
// called when dropping or renaming a CF. Performs mbean housekeeping.
void unregisterMBean()
{
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java?rev=999742&r1=999741&r2=999742&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java Wed
Sep 22 03:41:52 2010
@@ -18,38 +18,38 @@
package org.apache.cassandra.db;
-import java.io.IOException;
import java.io.File;
+import java.io.IOException;
import java.lang.management.ManagementFactory;
+import java.net.InetAddress;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
-import javax.management.*;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import org.apache.commons.collections.PredicateUtils;
+import org.apache.commons.collections.iterators.CollatingIterator;
+import org.apache.commons.collections.iterators.FilterIterator;
+import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.io.*;
+import org.apache.cassandra.io.AbstractCompactedRow;
+import org.apache.cassandra.io.CompactionIterator;
+import org.apache.cassandra.io.ICompactionInfo;
import org.apache.cassandra.io.sstable.*;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.service.AntiEntropyService;
import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.service.AntiEntropyService;
+import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;
import org.cliffc.high_scale_lib.NonBlockingHashMap;
-import java.net.InetAddress;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.collections.iterators.FilterIterator;
-import org.apache.commons.collections.iterators.CollatingIterator;
-import org.apache.commons.collections.PredicateUtils;
-
public class CompactionManager implements CompactionManagerMBean
{
public static final String MBEAN_OBJECT_NAME =
"org.apache.cassandra.db:type=CompactionManager";
@@ -508,6 +508,20 @@ public class CompactionManager implement
return executor.submit(runnable);
}
+ public Future<SSTableReader> submitSSTableBuild(Descriptor desc)
+ {
+ final SSTableWriter.Builder builder =
SSTableWriter.createBuilder(desc);
+ Callable<SSTableReader> callable = new Callable<SSTableReader>()
+ {
+ public SSTableReader call() throws IOException
+ {
+ executor.beginCompaction(builder.cfs, builder);
+ return builder.build();
+ }
+ };
+ return executor.submit(callable);
+ }
+
private static class AntiCompactionIterator extends CompactionIterator
{
private Set<SSTableScanner> scanners;
@@ -550,6 +564,11 @@ public class CompactionManager implement
}
return scanners;
}
+
+ public String getTaskType()
+ {
+ return "Anticompaction";
+ }
}
public void checkAllColumnFamilies() throws IOException
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=999742&r1=999741&r2=999742&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Wed Sep 22
03:41:52 2010
@@ -456,7 +456,8 @@ public class Table
synchronized (indexLockFor(key.key))
{
ColumnFamily cf = readCurrentIndexedColumns(key, cfs,
columns);
- applyIndexUpdates(key.key, memtablesToFlush, cf, cfs,
cf.getColumnNames(), null);
+ if (cf != null)
+ applyIndexUpdates(key.key, memtablesToFlush, cf, cfs,
cf.getColumnNames(), null);
}
}
finally
Modified:
cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java?rev=999742&r1=999741&r2=999742&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java
Wed Sep 22 03:41:52 2010
@@ -161,4 +161,8 @@ implements Closeable, ICompactionInfo
return bytesRead;
}
+ public String getTaskType()
+ {
+ return "Compaction";
+ }
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/ICompactionInfo.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/ICompactionInfo.java?rev=999742&r1=999741&r2=999742&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/ICompactionInfo.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/ICompactionInfo.java Wed
Sep 22 03:41:52 2010
@@ -5,4 +5,6 @@ public interface ICompactionInfo
public long getTotalBytes();
public long getBytesRead();
+
+ public String getTaskType();
}
Modified:
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/KeyIterator.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/KeyIterator.java?rev=999742&r1=999741&r2=999742&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/KeyIterator.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/KeyIterator.java
Wed Sep 22 03:41:52 2010
@@ -4,16 +4,16 @@ import java.io.Closeable;
import java.io.File;
import java.io.IOError;
import java.io.IOException;
+import java.util.Iterator;
import com.google.common.collect.AbstractIterator;
import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.io.ICompactionInfo;
import org.apache.cassandra.io.util.BufferedRandomAccessFile;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
-public class KeyIterator extends AbstractIterator<DecoratedKey> implements
IKeyIterator
+public class KeyIterator extends AbstractIterator<DecoratedKey> implements
Iterator<DecoratedKey>, Closeable
{
private final BufferedRandomAccessFile in;
private final Descriptor desc;
Modified:
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java?rev=999742&r1=999741&r2=999742&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java
Wed Sep 22 03:41:52 2010
@@ -66,6 +66,11 @@ public class ReducingKeyIterator impleme
return m;
}
+ public String getTaskType()
+ {
+ return "Secondary index build";
+ }
+
public boolean hasNext()
{
return iter.hasNext();
@@ -73,7 +78,7 @@ public class ReducingKeyIterator impleme
public DecoratedKey next()
{
- return (DecoratedKey) iter.next();
+ return iter.next();
}
public void remove()
Modified:
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java?rev=999742&r1=999741&r2=999742&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
Wed Sep 22 03:41:52 2010
@@ -32,6 +32,7 @@ import org.apache.cassandra.config.Datab
import org.apache.cassandra.db.*;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.io.AbstractCompactedRow;
+import org.apache.cassandra.io.ICompactionInfo;
import org.apache.cassandra.io.util.BufferedRandomAccessFile;
import org.apache.cassandra.io.util.SegmentedFile;
import org.apache.cassandra.service.StorageService;
@@ -211,62 +212,35 @@ public class SSTableWriter extends SSTab
return dfile.length() / (dataPosition / keys);
}
+ public static Builder createBuilder(Descriptor desc)
+ {
+ if (!desc.isLatestVersion)
+ // TODO: streaming between different versions will fail: need
support for
+ // recovering other versions to provide a stable streaming api
+ throw new RuntimeException(String.format("Cannot recover SSTable
with version %s (current version %s).",
+ desc.version,
Descriptor.CURRENT_VERSION));
+
+ return new Builder(desc);
+ }
+
/**
- * If either of the index or filter files are missing, rebuilds both.
- * TODO: Builds most of the in-memory state of the sstable, but doesn't
actually open it.
+ * Removes the given SSTable from temporary status and opens it,
rebuilding the
+ * bloom filter and row index from the data file.
*/
- private static void maybeRecover(Descriptor desc) throws IOException
+ public static class Builder implements ICompactionInfo
{
- logger.debug("In maybeRecover with Descriptor {}", desc);
- File ifile = new File(desc.filenameFor(SSTable.COMPONENT_INDEX));
- File ffile = new File(desc.filenameFor(SSTable.COMPONENT_FILTER));
- if (ifile.exists() && ffile.exists())
- // nothing to do
- return;
-
- ColumnFamilyStore cfs =
Table.open(desc.ksname).getColumnFamilyStore(desc.cfname);
-
- // remove existing files
- ifile.delete();
- ffile.delete();
-
- // open the data file for input, and an IndexWriter for output
- BufferedRandomAccessFile dfile = new
BufferedRandomAccessFile(desc.filenameFor(SSTable.COMPONENT_DATA), "r", 8 *
1024 * 1024);
- IndexWriter iwriter;
- long estimatedRows;
- try
- {
- estimatedRows = estimateRows(desc, dfile);
- iwriter = new IndexWriter(desc, StorageService.getPartitioner(),
estimatedRows);
- }
- catch(IOException e)
- {
- dfile.close();
- throw e;
- }
+ private final Descriptor desc;
+ public final ColumnFamilyStore cfs;
+ private BufferedRandomAccessFile dfile;
- // build the index and filter
- long rows = 0;
- try
- {
- DecoratedKey key;
- long dataPosition = 0;
- while (dataPosition < dfile.length())
- {
- key = SSTableReader.decodeKey(StorageService.getPartitioner(),
desc, FBUtilities.readShortByteArray(dfile));
- long dataSize = SSTableReader.readRowSize(dfile, desc);
- iwriter.afterAppend(key, dataPosition);
- dataPosition = dfile.getFilePointer() + dataSize;
- dfile.seek(dataPosition);
- rows++;
- }
- }
- finally
+ public Builder(Descriptor desc)
{
+
+ this.desc = desc;
+ cfs = Table.open(desc.ksname).getColumnFamilyStore(desc.cfname);
try
{
- dfile.close();
- iwriter.close();
+ dfile = new
BufferedRandomAccessFile(desc.filenameFor(SSTable.COMPONENT_DATA), "r", 8 *
1024 * 1024);
}
catch (IOException e)
{
@@ -274,44 +248,80 @@ public class SSTableWriter extends SSTab
}
}
- if (!cfs.getIndexedColumns().isEmpty())
+ public SSTableReader build() throws IOException
{
- Future future = CompactionManager.instance.submitIndexBuild(cfs,
cfs.getIndexedColumns(), new KeyIterator(desc));
+ File ifile = new File(desc.filenameFor(SSTable.COMPONENT_INDEX));
+ File ffile = new File(desc.filenameFor(SSTable.COMPONENT_FILTER));
+ assert !ifile.exists();
+ assert !ffile.exists();
+
+ IndexWriter iwriter;
+ long estimatedRows;
try
{
- future.get();
- for (byte[] column : cfs.getIndexedColumns())
-
cfs.getIndexedColumnFamilyStore(column).forceBlockingFlush();
+ estimatedRows = estimateRows(desc, dfile);
+ iwriter = new IndexWriter(desc,
StorageService.getPartitioner(), estimatedRows);
+ }
+ catch(IOException e)
+ {
+ dfile.close();
+ throw e;
}
- catch (InterruptedException e)
+
+ // build the index and filter
+ long rows = 0;
+ try
{
- throw new AssertionError(e);
+ DecoratedKey key;
+ long dataPosition = 0;
+ while (dataPosition < dfile.length())
+ {
+ key =
SSTableReader.decodeKey(StorageService.getPartitioner(), desc,
FBUtilities.readShortByteArray(dfile));
+ long dataSize = SSTableReader.readRowSize(dfile, desc);
+ iwriter.afterAppend(key, dataPosition);
+ dataPosition = dfile.getFilePointer() + dataSize;
+ dfile.seek(dataPosition);
+ rows++;
+ }
}
- catch (ExecutionException e)
+ finally
{
- throw new RuntimeException(e);
+ try
+ {
+ dfile.close();
+ iwriter.close();
+ }
+ catch (IOException e)
+ {
+ throw new IOError(e);
+ }
}
+
+ logger.debug("estimated row count was %s of real count",
((double)estimatedRows) / rows);
+ return SSTableReader.open(rename(desc,
SSTable.componentsFor(desc)));
}
- logger.debug("estimated row count was %s of real count",
((double)estimatedRows) / rows);
- }
+ public long getTotalBytes()
+ {
+ try
+ {
+ return dfile.length();
+ }
+ catch (IOException e)
+ {
+ throw new IOError(e);
+ }
+ }
- /**
- * Removes the given SSTable from temporary status and opens it,
rebuilding the non-essential portions of the
- * file if necessary.
- */
- public static SSTableReader recoverAndOpen(Descriptor desc) throws
IOException
- {
- if (!desc.isLatestVersion)
- // TODO: streaming between different versions will fail: need
support for
- // recovering other versions to provide a stable streaming api
- throw new RuntimeException(String.format("Cannot recover SSTable
with version %s (current version %s).",
- desc.version,
Descriptor.CURRENT_VERSION));
+ public long getBytesRead()
+ {
+ return dfile.getFilePointer();
+ }
- // FIXME: once maybeRecover is recovering BMIs, it should return the
recovered
- // components
- maybeRecover(desc);
- return SSTableReader.open(rename(desc, SSTable.componentsFor(desc)));
+ public String getTaskType()
+ {
+ return "SSTable rebuild";
+ }
}
/**
Modified:
cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java?rev=999742&r1=999741&r2=999742&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
Wed Sep 22 03:41:52 2010
@@ -100,24 +100,6 @@ public class IncomingStreamReader
fc.close();
}
- addSSTable(localFile);
- session.finished(remoteFile);
- }
-
- public static void addSSTable(PendingFile pendingFile)
- {
- // file was successfully streamed
- Descriptor desc = pendingFile.desc;
- try
- {
- SSTableReader sstable =
SSTableWriter.recoverAndOpen(pendingFile.desc);
-
Table.open(desc.ksname).getColumnFamilyStore(desc.cfname).addSSTable(sstable);
- logger.info("Streaming added " + sstable);
- }
- catch (IOException e)
- {
- logger.error("Failed adding {}", pendingFile, e);
- throw new RuntimeException("Not able to add streamed file " +
pendingFile.getFilename(), e);
- }
+ session.finished(remoteFile, localFile);
}
}
Modified:
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java?rev=999742&r1=999741&r2=999742&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java
Wed Sep 22 03:41:52 2010
@@ -22,7 +22,13 @@ import java.io.IOException;
import java.net.InetAddress;
import java.util.*;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.CompactionManager;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.io.sstable.*;
import org.apache.cassandra.net.MessagingService;
import org.cliffc.high_scale_lib.NonBlockingHashMap;
import org.apache.cassandra.utils.Pair;
@@ -41,6 +47,8 @@ public class StreamInSession
private final Pair<InetAddress, Long> context;
private final Runnable callback;
private String table;
+ private final List<Future<SSTableReader>> buildFutures = new
ArrayList<Future<SSTableReader>>();
+ private ColumnFamilyStore cfs;
private StreamInSession(Pair<InetAddress, Long> context, Runnable callback)
{
@@ -84,13 +92,19 @@ public class StreamInSession
if(logger.isDebugEnabled())
logger.debug("Adding file {} to Stream Request queue",
file.getFilename());
this.files.add(file);
+ if (cfs == null)
+ cfs =
Table.open(file.desc.ksname).getColumnFamilyStore(file.desc.cfname);
}
}
- public void finished(PendingFile remoteFile) throws IOException
+ public void finished(PendingFile remoteFile, PendingFile localFile) throws
IOException
{
if (logger.isDebugEnabled())
logger.debug("Finished {}. Sending ack to {}", remoteFile, this);
+
+ Future future =
CompactionManager.instance.submitSSTableBuild(localFile.desc);
+ buildFutures.add(future);
+
files.remove(remoteFile);
StreamReply reply = new StreamReply(remoteFile.getFilename(),
getSessionId(), StreamReply.Status.FILE_FINISHED);
// send a StreamStatus message telling the source node it can delete
this file
@@ -108,6 +122,31 @@ public class StreamInSession
{
if (files.isEmpty())
{
+ // wait for bloom filters and row indexes to finish building
+ List<SSTableReader> sstables = new
ArrayList<SSTableReader>(buildFutures.size());
+ for (Future<SSTableReader> future : buildFutures)
+ {
+ try
+ {
+ SSTableReader sstable = future.get();
+ cfs.addSSTable(sstable);
+ sstables.add(sstable);
+ }
+ catch (InterruptedException e)
+ {
+ throw new AssertionError(e);
+ }
+ catch (ExecutionException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ // build secondary indexes
+ if (cfs != null && !cfs.getIndexedColumns().isEmpty())
+ cfs.buildSecondaryIndexes(sstables, cfs.getIndexedColumns());
+
+ // send reply to source that we're done
StreamReply reply = new StreamReply("", getSessionId(),
StreamReply.Status.SESSION_FINISHED);
logger.info("Finished streaming session {} from {}",
getSessionId(), getHost());
MessagingService.instance.sendOneWay(reply.createMessage(),
getHost());
Modified:
cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTest.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTest.java?rev=999742&r1=999741&r2=999742&view=diff
==============================================================================
---
cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTest.java
(original)
+++
cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTest.java
Wed Sep 22 03:41:52 2010
@@ -29,21 +29,17 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ExecutionException;
import org.apache.cassandra.CleanupHelper;
-import org.apache.cassandra.db.Column;
-import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.Row;
-import org.apache.cassandra.db.RowMutation;
-import org.apache.cassandra.db.Table;
-import org.apache.cassandra.db.TimestampClock;
+import org.apache.cassandra.db.*;
import org.apache.cassandra.db.filter.IFilter;
import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.thrift.IndexClause;
import org.apache.cassandra.thrift.IndexExpression;
@@ -54,7 +50,7 @@ import org.junit.Test;
public class SSTableWriterTest extends CleanupHelper {
@Test
- public void testRecoverAndOpen() throws IOException
+ public void testRecoverAndOpen() throws IOException, ExecutionException,
InterruptedException
{
RowMutation rm;
@@ -80,13 +76,13 @@ public class SSTableWriterTest extends C
SSTableReader orig = SSTableUtils.writeRawSSTable("Keyspace1",
"Indexed1", entries);
// whack the index to trigger the recover
- new File(orig.desc.filenameFor(Component.PRIMARY_INDEX)).delete();
- new File(orig.desc.filenameFor(Component.FILTER)).delete();
-
- SSTableReader sstr = SSTableWriter.recoverAndOpen(orig.desc);
-
+
FileUtils.deleteWithConfirm(orig.desc.filenameFor(Component.PRIMARY_INDEX));
+ FileUtils.deleteWithConfirm(orig.desc.filenameFor(Component.FILTER));
+
+ SSTableReader sstr =
CompactionManager.instance.submitSSTableBuild(orig.desc).get();
ColumnFamilyStore cfs =
Table.open("Keyspace1").getColumnFamilyStore("Indexed1");
cfs.addSSTable(sstr);
+ cfs.buildSecondaryIndexes(cfs.getSSTables(), cfs.getIndexedColumns());
IndexExpression expr = new
IndexExpression("birthdate".getBytes("UTF8"), IndexOperator.EQ,
FBUtilities.toByteArray(1L));
IndexClause clause = new IndexClause(Arrays.asList(expr),
"".getBytes(), 100);
@@ -95,7 +91,7 @@ public class SSTableWriterTest extends C
Range range = new Range(p.getMinimumToken(), p.getMinimumToken());
List<Row> rows = cfs.scan(clause, range, filter);
- assertEquals("IndexExpression should return two rows on
recoverAndOpen",2, rows.size());
+ assertEquals("IndexExpression should return two rows on
recoverAndOpen", 2, rows.size());
assertTrue("First result should be
'k1'",Arrays.equals("k1".getBytes(), rows.get(0).key.key));
}
}