Author: jbellis
Date: Fri Apr 17 01:48:58 2009
New Revision: 765831

URL: http://svn.apache.org/viewvc?rev=765831&view=rev
Log:
waitForFlush -> forceBlockingFlush.  ServerTest.cleanup now flushes and cleans 
out
all ColumnFamilyStores and commitlog, allowing remove tests to not step on each
others' toes (all tests pass now).

patch by jbellis; reviewed by Sandeep Tata for #85

Modified:
    incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java
    incubator/cassandra/trunk/src/org/apache/cassandra/db/CommitLog.java
    incubator/cassandra/trunk/src/org/apache/cassandra/db/Table.java
    incubator/cassandra/trunk/test/org/apache/cassandra/ServerTest.java
    
incubator/cassandra/trunk/test/org/apache/cassandra/db/ColumnFamilyStoreTest.java

Modified: 
incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java?rev=765831&r1=765830&r2=765831&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java 
(original)
+++ 
incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java 
Fri Apr 17 01:48:58 2009
@@ -84,7 +84,7 @@
     private AtomicReference<BinaryMemtable> binaryMemtable_;
 
     /* SSTables on disk for this column family */
-    Set<String> ssTables_ = new HashSet<String>();
+    private Set<String> ssTables_ = new HashSet<String>();
 
     /* Modification lock used for protecting reads from compactions. */
     private ReentrantReadWriteLock lock_ = new ReentrantReadWriteLock(true);
@@ -433,11 +433,23 @@
 
     void forceFlush() throws IOException
     {
-        //MemtableManager.instance().submit(getColumnFamilyName(), 
memtable_.get() , CommitLog.CommitLogContext.NULL);
-        //memtable_.get().flush(true, CommitLog.CommitLogContext.NULL);
         memtable_.get().forceflush(this);
     }
 
+    void forceBlockingFlush() throws IOException, ExecutionException, 
InterruptedException
+    {
+        forceFlush();
+        // block for flush to finish by adding a no-op action to the flush 
executorservice
+        // and waiting for that to finish.  (this works since flush ES is 
single-threaded.)
+        Future f = MemtableManager.instance().flusher_.submit(new Runnable()
+        {
+            public void run()
+            {
+            }
+        });
+        f.get();
+    }
+
     void forceFlushBinary()
     {
         BinaryMemtableManager.instance().submit(getColumnFamilyName(), 
binaryMemtable_.get());
@@ -1407,4 +1419,18 @@
     {
         return memtableSwitchCount;
     }
+
+    /**
+     * clears out all data associated with this ColumnFamily.
+     * For use in testing.
+     */
+    public void reset() throws IOException, ExecutionException, 
InterruptedException
+    {
+        forceBlockingFlush();
+        for (String fName : ssTables_)
+        {
+            new File(fName).delete();
+        }
+        ssTables_.clear();
+    }
 }

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/CommitLog.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/CommitLog.java?rev=765831&r1=765830&r2=765831&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/CommitLog.java 
(original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/CommitLog.java Fri 
Apr 17 01:48:58 2009
@@ -57,7 +57,7 @@
  *
  * Author : Avinash Lakshman ( [email protected]) & Prashant Malik ( 
[email protected] )
  */
-class CommitLog
+public class CommitLog
 {
     private static final int bufSize_ = 128*1024*1024;
     private static Map<String, CommitLog> instances_ = new HashMap<String, 
CommitLog>();
@@ -623,6 +623,11 @@
        forcedRollOver_ = true;
     }
 
+    public static void reset()
+    {
+        CommitLog.instances_.clear();
+    }
+
     public static void main(String[] args) throws Throwable
     {
         LogUtil.init();

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/Table.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/Table.java?rev=765831&r1=765830&r2=765831&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/Table.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/Table.java Fri Apr 17 
01:48:58 2009
@@ -425,7 +425,7 @@
         return columnFamilyStores_;
     }
 
-    ColumnFamilyStore getColumnFamilyStore(String cfName)
+    public ColumnFamilyStore getColumnFamilyStore(String cfName)
     {
         return columnFamilyStores_.get(cfName);
     }

Modified: incubator/cassandra/trunk/test/org/apache/cassandra/ServerTest.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/org/apache/cassandra/ServerTest.java?rev=765831&r1=765830&r2=765831&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/org/apache/cassandra/ServerTest.java 
(original)
+++ incubator/cassandra/trunk/test/org/apache/cassandra/ServerTest.java Fri Apr 
17 01:48:58 2009
@@ -3,15 +3,34 @@
 import org.testng.annotations.Test;
 import org.testng.annotations.BeforeMethod;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.CommitLog;
 
 import java.io.File;
+import java.io.IOException;
 
 @Test(groups={"serial"})
 public class ServerTest {
-    // TODO clean up static structures too (e.g. memtables)
     @BeforeMethod
     public void cleanup()
     {
+        Table table = Table.open("Table1");
+        for (String cfName : table.getColumnFamilies())
+        {
+            ColumnFamilyStore cfs = table.getColumnFamilyStore(cfName);
+            try
+            {
+                cfs.reset();
+            }
+            catch (Exception e)
+            {
+                throw new RuntimeException(e);
+            }
+        }
+
+        CommitLog.reset();
+
         String[] directoryNames = {
                 DatabaseDescriptor.getBootstrapFileLocation(),
                 DatabaseDescriptor.getLogFileLocation(),

Modified: 
incubator/cassandra/trunk/test/org/apache/cassandra/db/ColumnFamilyStoreTest.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/org/apache/cassandra/db/ColumnFamilyStoreTest.java?rev=765831&r1=765830&r2=765831&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/test/org/apache/cassandra/db/ColumnFamilyStoreTest.java
 (original)
+++ 
incubator/cassandra/trunk/test/org/apache/cassandra/db/ColumnFamilyStoreTest.java
 Fri Apr 17 01:48:58 2009
@@ -67,9 +67,8 @@
 
         // validateNameSort(table);
 
-        table.getColumnFamilyStore("Standard1").forceFlush();
-        table.getColumnFamilyStore("Super1").forceFlush();
-        waitForFlush();
+        table.getColumnFamilyStore("Standard1").forceBlockingFlush();
+        table.getColumnFamilyStore("Super1").forceBlockingFlush();
         validateNameSort(table);
     }
 
@@ -93,8 +92,7 @@
 
         validateTimeSort(table);
 
-        table.getColumnFamilyStore("StandardByTime1").forceFlush();
-        waitForFlush();
+        table.getColumnFamilyStore("StandardByTime1").forceBlockingFlush();
         validateTimeSort(table);
 
         // interleave some new data to test memtable + sstable
@@ -154,18 +152,6 @@
         }
     }
 
-    private void waitForFlush()
-            throws InterruptedException, ExecutionException
-    {
-        Future f = MemtableManager.instance().flusher_.submit(new Runnable()
-        {
-            public void run()
-            {
-            }
-        });
-        f.get();
-    }
-
     private void validateNameSort(Table table)
             throws ColumnFamilyNotDefinedException, IOException
     {
@@ -213,8 +199,7 @@
         rm = new RowMutation("Table1", "key1");
         rm.add("Standard1:Column1", "asdf".getBytes(), 0);
         rm.apply();
-        store.forceFlush();
-        waitForFlush();
+        store.forceBlockingFlush();
 
         // remove
         rm = new RowMutation("Table1", "key1");
@@ -236,8 +221,7 @@
         rm = new RowMutation("Table1", "key1");
         rm.add("Super1:SC1:Column1", "asdf".getBytes(), 0);
         rm.apply();
-        store.forceFlush();
-        waitForFlush();
+        store.forceBlockingFlush();
 
         // remove
         rm = new RowMutation("Table1", "key1");
@@ -259,8 +243,7 @@
         rm = new RowMutation("Table1", "key1");
         rm.add("Super1:SC1:Column1", "asdf".getBytes(), 0);
         rm.apply();
-        store.forceFlush();
-        waitForFlush();
+        store.forceBlockingFlush();
 
         // remove
         rm = new RowMutation("Table1", "key1");
@@ -359,7 +342,6 @@
     {
         Table table = Table.open("Table1");
         ColumnFamilyStore store = table.getColumnFamilyStore("Standard1");
-        store.ssTables_.clear(); // TODO integrate this better into test 
setup/teardown
 
         for (int j = 0; j < 5; j++) {
             for (int i = 0; i < 10; i++) {
@@ -369,8 +351,7 @@
                 rm.add("Standard1:A", new byte[0], epoch);
                 rm.apply();
             }
-            store.forceFlush();
-            waitForFlush();
+            store.forceBlockingFlush();
         }
         Future ft = MinorCompactionManager.instance().submit(store);
         ft.get();


Reply via email to