Author: jbellis
Date: Fri Apr 17 01:48:58 2009
New Revision: 765831
URL: http://svn.apache.org/viewvc?rev=765831&view=rev
Log:
waitForFlush -> forceBlockingFlush. ServerTest.cleanup now flushes and cleans
out all ColumnFamilyStores and commitlog, allowing remove tests to not step on
each other's toes (all tests pass now).
patch by jbellis; reviewed by Sandeep Tata for #85
Modified:
incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java
incubator/cassandra/trunk/src/org/apache/cassandra/db/CommitLog.java
incubator/cassandra/trunk/src/org/apache/cassandra/db/Table.java
incubator/cassandra/trunk/test/org/apache/cassandra/ServerTest.java
incubator/cassandra/trunk/test/org/apache/cassandra/db/ColumnFamilyStoreTest.java
Modified:
incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java?rev=765831&r1=765830&r2=765831&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java
(original)
+++
incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java
Fri Apr 17 01:48:58 2009
@@ -84,7 +84,7 @@
private AtomicReference<BinaryMemtable> binaryMemtable_;
/* SSTables on disk for this column family */
- Set<String> ssTables_ = new HashSet<String>();
+ private Set<String> ssTables_ = new HashSet<String>();
/* Modification lock used for protecting reads from compactions. */
private ReentrantReadWriteLock lock_ = new ReentrantReadWriteLock(true);
@@ -433,11 +433,23 @@
void forceFlush() throws IOException
{
- //MemtableManager.instance().submit(getColumnFamilyName(), memtable_.get() , CommitLog.CommitLogContext.NULL);
- //memtable_.get().flush(true, CommitLog.CommitLogContext.NULL);
memtable_.get().forceflush(this);
}
+ void forceBlockingFlush() throws IOException, ExecutionException, InterruptedException
+ {
+ forceFlush();
+ // block for flush to finish by adding a no-op action to the flush executorservice
+ // and waiting for that to finish. (this works since flush ES is single-threaded.)
+ Future f = MemtableManager.instance().flusher_.submit(new Runnable()
+ {
+ public void run()
+ {
+ }
+ });
+ f.get();
+ }
+
void forceFlushBinary()
{
BinaryMemtableManager.instance().submit(getColumnFamilyName(),
binaryMemtable_.get());
@@ -1407,4 +1419,18 @@
{
return memtableSwitchCount;
}
+
+ /**
+ * clears out all data associated with this ColumnFamily.
+ * For use in testing.
+ */
+ public void reset() throws IOException, ExecutionException, InterruptedException
+ {
+ forceBlockingFlush();
+ for (String fName : ssTables_)
+ {
+ new File(fName).delete();
+ }
+ ssTables_.clear();
+ }
}
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/CommitLog.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/CommitLog.java?rev=765831&r1=765830&r2=765831&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/CommitLog.java
(original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/CommitLog.java Fri
Apr 17 01:48:58 2009
@@ -57,7 +57,7 @@
*
* Author : Avinash Lakshman ( [email protected]) & Prashant Malik (
[email protected] )
*/
-class CommitLog
+public class CommitLog
{
private static final int bufSize_ = 128*1024*1024;
private static Map<String, CommitLog> instances_ = new HashMap<String, CommitLog>();
@@ -623,6 +623,11 @@
forcedRollOver_ = true;
}
+ public static void reset()
+ {
+ CommitLog.instances_.clear();
+ }
+
public static void main(String[] args) throws Throwable
{
LogUtil.init();
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/Table.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/Table.java?rev=765831&r1=765830&r2=765831&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/Table.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/Table.java Fri Apr 17
01:48:58 2009
@@ -425,7 +425,7 @@
return columnFamilyStores_;
}
- ColumnFamilyStore getColumnFamilyStore(String cfName)
+ public ColumnFamilyStore getColumnFamilyStore(String cfName)
{
return columnFamilyStores_.get(cfName);
}
Modified: incubator/cassandra/trunk/test/org/apache/cassandra/ServerTest.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/org/apache/cassandra/ServerTest.java?rev=765831&r1=765830&r2=765831&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/org/apache/cassandra/ServerTest.java
(original)
+++ incubator/cassandra/trunk/test/org/apache/cassandra/ServerTest.java Fri Apr
17 01:48:58 2009
@@ -3,15 +3,34 @@
import org.testng.annotations.Test;
import org.testng.annotations.BeforeMethod;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.CommitLog;
import java.io.File;
+import java.io.IOException;
@Test(groups={"serial"})
public class ServerTest {
- // TODO clean up static structures too (e.g. memtables)
@BeforeMethod
public void cleanup()
{
+ Table table = Table.open("Table1");
+ for (String cfName : table.getColumnFamilies())
+ {
+ ColumnFamilyStore cfs = table.getColumnFamilyStore(cfName);
+ try
+ {
+ cfs.reset();
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ CommitLog.reset();
+
String[] directoryNames = {
DatabaseDescriptor.getBootstrapFileLocation(),
DatabaseDescriptor.getLogFileLocation(),
Modified:
incubator/cassandra/trunk/test/org/apache/cassandra/db/ColumnFamilyStoreTest.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/org/apache/cassandra/db/ColumnFamilyStoreTest.java?rev=765831&r1=765830&r2=765831&view=diff
==============================================================================
---
incubator/cassandra/trunk/test/org/apache/cassandra/db/ColumnFamilyStoreTest.java
(original)
+++
incubator/cassandra/trunk/test/org/apache/cassandra/db/ColumnFamilyStoreTest.java
Fri Apr 17 01:48:58 2009
@@ -67,9 +67,8 @@
// validateNameSort(table);
- table.getColumnFamilyStore("Standard1").forceFlush();
- table.getColumnFamilyStore("Super1").forceFlush();
- waitForFlush();
+ table.getColumnFamilyStore("Standard1").forceBlockingFlush();
+ table.getColumnFamilyStore("Super1").forceBlockingFlush();
validateNameSort(table);
}
@@ -93,8 +92,7 @@
validateTimeSort(table);
- table.getColumnFamilyStore("StandardByTime1").forceFlush();
- waitForFlush();
+ table.getColumnFamilyStore("StandardByTime1").forceBlockingFlush();
validateTimeSort(table);
// interleave some new data to test memtable + sstable
@@ -154,18 +152,6 @@
}
}
- private void waitForFlush()
- throws InterruptedException, ExecutionException
- {
- Future f = MemtableManager.instance().flusher_.submit(new Runnable()
- {
- public void run()
- {
- }
- });
- f.get();
- }
-
private void validateNameSort(Table table)
throws ColumnFamilyNotDefinedException, IOException
{
@@ -213,8 +199,7 @@
rm = new RowMutation("Table1", "key1");
rm.add("Standard1:Column1", "asdf".getBytes(), 0);
rm.apply();
- store.forceFlush();
- waitForFlush();
+ store.forceBlockingFlush();
// remove
rm = new RowMutation("Table1", "key1");
@@ -236,8 +221,7 @@
rm = new RowMutation("Table1", "key1");
rm.add("Super1:SC1:Column1", "asdf".getBytes(), 0);
rm.apply();
- store.forceFlush();
- waitForFlush();
+ store.forceBlockingFlush();
// remove
rm = new RowMutation("Table1", "key1");
@@ -259,8 +243,7 @@
rm = new RowMutation("Table1", "key1");
rm.add("Super1:SC1:Column1", "asdf".getBytes(), 0);
rm.apply();
- store.forceFlush();
- waitForFlush();
+ store.forceBlockingFlush();
// remove
rm = new RowMutation("Table1", "key1");
@@ -359,7 +342,6 @@
{
Table table = Table.open("Table1");
ColumnFamilyStore store = table.getColumnFamilyStore("Standard1");
- store.ssTables_.clear(); // TODO integrate this better into test setup/teardown
for (int j = 0; j < 5; j++) {
for (int i = 0; i < 10; i++) {
@@ -369,8 +351,7 @@
rm.add("Standard1:A", new byte[0], epoch);
rm.apply();
}
- store.forceFlush();
- waitForFlush();
+ store.forceBlockingFlush();
}
Future ft = MinorCompactionManager.instance().submit(store);
ft.get();