Author: jbellis
Date: Tue Sep 28 17:24:06 2010
New Revision: 1002275

URL: http://svn.apache.org/viewvc?rev=1002275&view=rev
Log:
make nodetool compact and cleanup blocking.
patch by Nirmal Ranganathan; reviewed by jbellis for CASSANDRA-1449

Modified:
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
    cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
    cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
    cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
    cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java
    cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java
    cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java
    cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java
    cassandra/trunk/test/unit/org/apache/cassandra/db/OneCompactionTest.java
    cassandra/trunk/test/unit/org/apache/cassandra/db/RemoveSuperColumnTest.java
    cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java
    cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java

Modified: cassandra/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1002275&r1=1002274&r2=1002275&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Tue Sep 28 17:24:06 2010
@@ -108,6 +108,7 @@ dev
  * support TTL'd index values (CASSANDRA-1536)
  * make removetoken work like decommission (CASSANDRA-1216)
  * make cli comparator-aware and improve quote rules (CASSANDRA-1523,-1524)
+ * make nodetool compact and cleanup blocking (CASSANDRA-1449)
 
 
 0.7-beta1

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1002275&r1=1002274&r2=1002275&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Tue Sep 28 17:24:06 2010
@@ -728,9 +728,9 @@ public class ColumnFamilyStore implement
         return maxFile;
     }
 
-    void forceCleanup()
+    void forceCleanup() throws ExecutionException, InterruptedException
     {
-        CompactionManager.instance.submitCleanup(ColumnFamilyStore.this);
+        CompactionManager.instance.performCleanup(ColumnFamilyStore.this);
     }
 
     void markCompacted(Collection<SSTableReader> sstables)
@@ -1436,9 +1436,9 @@ public class ColumnFamilyStore implement
         ssTables.getRowCache().remove(key);
     }
 
-    public void forceMajorCompaction()
+    public void forceMajorCompaction() throws InterruptedException, 
ExecutionException
     {
-        CompactionManager.instance.submitMajor(this);
+        CompactionManager.instance.performMajor(this);
     }
 
     public void invalidateRowCache()

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java?rev=1002275&r1=1002274&r2=1002275&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java Tue Sep 28 17:24:06 2010
@@ -18,6 +18,7 @@
 
 package org.apache.cassandra.db;
 
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.io.IOException;
 import java.util.concurrent.atomic.AtomicLongArray;
@@ -133,7 +134,7 @@ public interface ColumnFamilyStoreMBean
     /**
      * force a major compaction of this column family
      */
-    public void forceMajorCompaction();
+    public void forceMajorCompaction() throws ExecutionException, 
InterruptedException;
 
     /**
      * invalidate the row cache; for use after bulk loading via BinaryMemtable

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java?rev=1002275&r1=1002274&r2=1002275&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java Tue Sep 28 17:24:06 2010
@@ -25,6 +25,7 @@ import java.net.InetAddress;
 import java.util.*;
 import java.util.Map.Entry;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
@@ -135,7 +136,7 @@ public class CompactionManager implement
         }
     }
 
-    public Future<Object> submitCleanup(final ColumnFamilyStore cfStore)
+    public void performCleanup(final ColumnFamilyStore cfStore) throws 
InterruptedException, ExecutionException
     {
         Callable<Object> runnable = new Callable<Object>()
         {
@@ -145,7 +146,7 @@ public class CompactionManager implement
                 return this;
             }
         };
-        return executor.submit(runnable);
+        executor.submit(runnable).get();
     }
 
     public Future<List<SSTableReader>> submitAnticompaction(final 
ColumnFamilyStore cfStore, final Collection<Range> ranges, final InetAddress 
target)
@@ -160,12 +161,12 @@ public class CompactionManager implement
         return executor.submit(callable);
     }
 
-    public Future submitMajor(final ColumnFamilyStore cfStore)
+    public void performMajor(final ColumnFamilyStore cfStore) throws 
InterruptedException, ExecutionException
     {
-        return submitMajor(cfStore, 0, (int) (System.currentTimeMillis() / 
1000) - cfStore.metadata.gcGraceSeconds);
+        submitMajor(cfStore, 0, (int) (System.currentTimeMillis() / 1000) - 
cfStore.metadata.gcGraceSeconds).get();
     }
 
-    public Future submitMajor(final ColumnFamilyStore cfStore, final long 
skip, final int gcBefore)
+    public Future<Object> submitMajor(final ColumnFamilyStore cfStore, final 
long skip, final int gcBefore)
     {
         Callable<Object> callable = new Callable<Object>()
         {
@@ -195,7 +196,7 @@ public class CompactionManager implement
         return executor.submit(callable);
     }
 
-    public Future submitValidation(final ColumnFamilyStore cfStore, final 
AntiEntropyService.Validator validator)
+    public Future<Object> submitValidation(final ColumnFamilyStore cfStore, 
final AntiEntropyService.Validator validator)
     {
         Callable<Object> callable = new Callable<Object>()
         {

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=1002275&r1=1002274&r2=1002275&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Tue Sep 28 
17:24:06 2010
@@ -138,7 +138,7 @@ public class Table
     /**
      * Do a cleanup of keys that do not belong locally.
      */
-    public void forceCleanup()
+    public void forceCleanup() throws IOException, ExecutionException, 
InterruptedException
     {
         if (name.equals(SYSTEM_TABLE))
             throw new RuntimeException("Cleanup of the system table is neither 
necessary nor wise");
@@ -146,8 +146,7 @@ public class Table
         for (ColumnFamilyStore cfStore : columnFamilyStores.values())
             cfStore.forceCleanup();
     }
-    
-    
+
     /**
      * Take a snapshot of the entire set of column families with a given 
timestamp.
      * 
@@ -201,10 +200,10 @@ public class Table
      * This method is an ADMIN operation to force compaction
      * of all SSTables on disk. 
      */
-    public void forceCompaction()
+    public void forceCompaction() throws IOException, ExecutionException, 
InterruptedException
     {
         for (ColumnFamilyStore cfStore : columnFamilyStores.values())
-            CompactionManager.instance.submitMajor(cfStore);
+            CompactionManager.instance.performMajor(cfStore);
     }
 
     /**

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=1002275&r1=1002274&r2=1002275&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java 
Tue Sep 28 17:24:06 2010
@@ -25,6 +25,7 @@ import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.*;
 import java.util.concurrent.*;
+
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
@@ -1168,7 +1169,7 @@ public class StorageService implements I
         return 
Gossiper.instance.getCurrentGenerationNumber(FBUtilities.getLocalAddress());
     }
 
-    public void forceTableCleanup() throws IOException
+    public void forceTableCleanup() throws IOException, ExecutionException, 
InterruptedException
     {
         List<String> tables = DatabaseDescriptor.getNonSystemTables();
         for (String tName : tables)
@@ -1178,19 +1179,19 @@ public class StorageService implements I
         }
     }
 
-    public void forceTableCleanup(String tableName) throws IOException
+    public void forceTableCleanup(String tableName) throws IOException, 
ExecutionException, InterruptedException
     {
         Table table = getValidTable(tableName);
         table.forceCleanup();
     }
-    
-    public void forceTableCompaction() throws IOException
+
+    public void forceTableCompaction() throws IOException, ExecutionException, 
InterruptedException
     {
         for (Table table : Table.all())
             table.forceCompaction();
     }
 
-    public void forceTableCompaction(String tableName) throws IOException
+    public void forceTableCompaction(String tableName) throws IOException, 
ExecutionException, InterruptedException
     {
         Table table = getValidTable(tableName);
         table.forceCompaction();
@@ -1270,14 +1271,15 @@ public class StorageService implements I
      * @param columnFamilies
      * @throws IOException
      */
-    public void forceTableFlush(final String tableName, final String... 
columnFamilies) throws IOException
+    public void forceTableFlush(final String tableName, final String... 
columnFamilies)
+                throws IOException, ExecutionException, InterruptedException
     {
         for (ColumnFamilyStore cfStore : getValidColumnFamilies(tableName, 
columnFamilies))
         {
             logger_.debug("Forcing binary flush on keyspace " + tableName + ", 
CF " + cfStore.getColumnFamilyName());
             cfStore.forceFlushBinary();
             logger_.debug("Forcing flush on keyspace " + tableName + ", CF " + 
cfStore.getColumnFamilyName());
-            cfStore.forceFlush();
+            cfStore.forceBlockingFlush();
         }
     }
 

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java?rev=1002275&r1=1002274&r2=1002275&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java 
(original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java 
Tue Sep 28 17:24:06 2010
@@ -132,22 +132,22 @@ public interface StorageServiceMBean
     /**
      * Forces major compaction (all sstable files compacted)
      */
-    public void forceTableCompaction() throws IOException;
+    public void forceTableCompaction() throws IOException, ExecutionException, 
InterruptedException;
 
     /**
      * Forces major compaction of a single keyspace
      */
-    public void forceTableCompaction(String tableName) throws IOException;
+    public void forceTableCompaction(String tableName) throws IOException, 
ExecutionException, InterruptedException;
 
     /**
      * Trigger a cleanup of keys on all tables.
      */
-    public void forceTableCleanup() throws IOException;
+    public void forceTableCleanup() throws IOException, ExecutionException, 
InterruptedException;
 
     /**
      * Trigger a cleanup of keys on a single keyspace
      */
-    public void forceTableCleanup(String tableName) throws IOException;
+    public void forceTableCleanup(String tableName) throws IOException, 
ExecutionException, InterruptedException;
 
     /**
      * Takes the snapshot for a given table.
@@ -176,7 +176,7 @@ public interface StorageServiceMBean
      * @param columnFamilies
      * @throws IOException
      */
-    public void forceTableFlush(String tableName, String... columnFamilies) 
throws IOException;
+    public void forceTableFlush(String tableName, String... columnFamilies) 
throws IOException, ExecutionException, InterruptedException;
 
     /**
      * Triggers proactive repair for given column families, or all 
columnfamilies for the given table

Modified: cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java?rev=1002275&r1=1002274&r2=1002275&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java Tue Sep 28 
17:24:06 2010
@@ -427,17 +427,35 @@ public class NodeCmd {
         }
         else if (cmdName.equals("cleanup"))
         {
-            if (arguments.length > 1)
-                probe.forceTableCleanup(arguments[1]);
-            else
-                probe.forceTableCleanup();
+            try
+            {
+                if (arguments.length > 1)
+                    probe.forceTableCleanup(arguments[1]);
+                else
+                    probe.forceTableCleanup();
+            }
+            catch (ExecutionException ee)
+            {
+                System.err.println("Error occured during Keyspace cleanup");
+                ee.printStackTrace();
+                System.exit(3);
+            }
         }
         else if (cmdName.equals("compact"))
         {
-            if (arguments.length > 1)
-                probe.forceTableCompaction(arguments[1]);
-            else
-                probe.forceTableCompaction();
+            try
+            {
+                if (arguments.length > 1)
+                    probe.forceTableCompaction(arguments[1]);
+                else
+                    probe.forceTableCompaction();
+            }
+            catch (ExecutionException ee)
+            {
+                System.err.println("Error occured during Keyspace compaction");
+                ee.printStackTrace();
+                System.exit(3);
+            }
         }
         else if (cmdName.equals("cfstats"))
         {
@@ -511,7 +529,15 @@ public class NodeCmd {
                 columnFamilies[i] = cmd.getArgs()[i + 2];
             }
             if (cmdName.equals("flush"))
-                probe.forceTableFlush(cmd.getArgs()[1], columnFamilies);
+                try
+                {
+                    probe.forceTableFlush(cmd.getArgs()[1], columnFamilies);
+                } catch (ExecutionException ee)
+                {
+                    System.err.println("Error occured during flushing");
+                    ee.printStackTrace();
+                    System.exit(3);
+                }
             else // cmdName.equals("repair")
                 probe.forceTableRepair(cmd.getArgs()[1], columnFamilies);
         }

Modified: cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java?rev=1002275&r1=1002274&r2=1002275&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java Tue Sep 
28 17:24:06 2010
@@ -122,27 +122,27 @@ public class NodeProbe
                 mbeanServerConn, ManagementFactory.RUNTIME_MXBEAN_NAME, 
RuntimeMXBean.class);
     }
 
-    public void forceTableCleanup() throws IOException
+    public void forceTableCleanup() throws IOException, ExecutionException, 
InterruptedException
     {
         ssProxy.forceTableCleanup();
     }
 
-    public void forceTableCleanup(String tableName) throws IOException
+    public void forceTableCleanup(String tableName) throws IOException, 
ExecutionException, InterruptedException
     {
         ssProxy.forceTableCleanup(tableName);
     }
 
-    public void forceTableCompaction() throws IOException
+    public void forceTableCompaction() throws IOException, ExecutionException, 
InterruptedException
     {
         ssProxy.forceTableCompaction();
     }
 
-    public void forceTableCompaction(String tableName) throws IOException
+    public void forceTableCompaction(String tableName) throws IOException, 
ExecutionException, InterruptedException
     {
         ssProxy.forceTableCompaction(tableName);
     }
 
-    public void forceTableFlush(String tableName, String... columnFamilies) 
throws IOException
+    public void forceTableFlush(String tableName, String... columnFamilies) 
throws IOException, ExecutionException, InterruptedException
     {
         ssProxy.forceTableFlush(tableName, columnFamilies);
     }

Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java?rev=1002275&r1=1002274&r2=1002275&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java 
(original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java Tue 
Sep 28 17:24:06 2010
@@ -75,7 +75,7 @@ public class CompactionsTest extends Cle
         }
         if (store.getSSTables().size() > 1)
         {
-            CompactionManager.instance.submitMajor(store).get();
+            CompactionManager.instance.performMajor(store);
         }
         assertEquals(inserted.size(), Util.getRangeSlice(store).size());
     }

Modified: 
cassandra/trunk/test/unit/org/apache/cassandra/db/OneCompactionTest.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/OneCompactionTest.java?rev=1002275&r1=1002274&r2=1002275&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/OneCompactionTest.java 
(original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/OneCompactionTest.java 
Tue Sep 28 17:24:06 2010
@@ -50,7 +50,7 @@ public class OneCompactionTest extends C
             store.forceBlockingFlush();
             assertEquals(inserted.size(), Util.getRangeSlice(store).size());
         }
-        CompactionManager.instance.submitMajor(store).get();
+        CompactionManager.instance.performMajor(store);
         assertEquals(1, store.getSSTables().size());
     }
 

Modified: 
cassandra/trunk/test/unit/org/apache/cassandra/db/RemoveSuperColumnTest.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/RemoveSuperColumnTest.java?rev=1002275&r1=1002274&r2=1002275&view=diff
==============================================================================
--- 
cassandra/trunk/test/unit/org/apache/cassandra/db/RemoveSuperColumnTest.java 
(original)
+++ 
cassandra/trunk/test/unit/org/apache/cassandra/db/RemoveSuperColumnTest.java 
Tue Sep 28 17:24:06 2010
@@ -60,7 +60,7 @@ public class RemoveSuperColumnTest exten
         store.forceBlockingFlush();
         validateRemoveTwoSources(dk);
 
-        CompactionManager.instance.submitMajor(store).get();
+        CompactionManager.instance.performMajor(store);
         assertEquals(1, store.getSSTables().size());
         validateRemoveCompacted(dk);
     }
@@ -150,7 +150,7 @@ public class RemoveSuperColumnTest exten
         store.forceBlockingFlush();
         validateRemoveWithNewData(dk);
 
-        CompactionManager.instance.submitMajor(store).get();
+        CompactionManager.instance.performMajor(store);
         assertEquals(1, store.getSSTables().size());
         validateRemoveWithNewData(dk);
     }

Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java?rev=1002275&r1=1002274&r2=1002275&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java Tue Sep 28 
17:24:06 2010
@@ -392,7 +392,7 @@ public class TableTest extends CleanupHe
         // compact so we have a big row with more than the minimum index count
         if (cfStore.getSSTables().size() > 1)
         {
-            CompactionManager.instance.submitMajor(cfStore).get();
+            CompactionManager.instance.performMajor(cfStore);
         }
         SSTableReader sstable = cfStore.getSSTables().iterator().next();
         long position = sstable.getPosition(key, SSTableReader.Operator.EQ);

Modified: 
cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java?rev=1002275&r1=1002274&r2=1002275&view=diff
==============================================================================
--- 
cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
 (original)
+++ 
cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
 Tue Sep 28 17:24:06 2010
@@ -67,7 +67,7 @@ public class SSTableReaderTest extends C
             rm.apply();
         }
         store.forceBlockingFlush();
-        CompactionManager.instance.submitMajor(store).get();
+        CompactionManager.instance.performMajor(store);
 
         List<Range> ranges = new ArrayList<Range>();
         // 1 key
@@ -108,7 +108,7 @@ public class SSTableReaderTest extends C
             rm.apply();
         }
         store.forceBlockingFlush();
-        CompactionManager.instance.submitMajor(store).get();
+        CompactionManager.instance.performMajor(store);
 
         // check that all our keys are found correctly
         SSTableReader sstable = store.getSSTables().iterator().next();


Reply via email to