Author: jbellis
Date: Tue Sep 28 17:24:06 2010
New Revision: 1002275
URL: http://svn.apache.org/viewvc?rev=1002275&view=rev
Log:
make the nodetool compact and cleanup commands blocking (they now wait for the operation to finish).
patch by Nirmal Ranganathan; reviewed by jbellis for CASSANDRA-1449
Modified:
cassandra/trunk/CHANGES.txt
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java
cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java
cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java
cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java
cassandra/trunk/test/unit/org/apache/cassandra/db/OneCompactionTest.java
cassandra/trunk/test/unit/org/apache/cassandra/db/RemoveSuperColumnTest.java
cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java
cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
Modified: cassandra/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1002275&r1=1002274&r2=1002275&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Tue Sep 28 17:24:06 2010
@@ -108,6 +108,7 @@ dev
* support TTL'd index values (CASSANDRA-1536)
* make removetoken work like decommission (CASSANDRA-1216)
* make cli comparator-aware and improve quote rules (CASSANDRA-1523,-1524)
+ * make nodetool compact and cleanup blocking (CASSANDRA-1449)
0.7-beta1
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1002275&r1=1002274&r2=1002275&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Tue Sep 28 17:24:06 2010
@@ -728,9 +728,9 @@ public class ColumnFamilyStore implement
return maxFile;
}
- void forceCleanup()
+ void forceCleanup() throws ExecutionException, InterruptedException
{
- CompactionManager.instance.submitCleanup(ColumnFamilyStore.this);
+ CompactionManager.instance.performCleanup(ColumnFamilyStore.this);
}
void markCompacted(Collection<SSTableReader> sstables)
@@ -1436,9 +1436,9 @@ public class ColumnFamilyStore implement
ssTables.getRowCache().remove(key);
}
- public void forceMajorCompaction()
+ public void forceMajorCompaction() throws InterruptedException,
ExecutionException
{
- CompactionManager.instance.submitMajor(this);
+ CompactionManager.instance.performMajor(this);
}
public void invalidateRowCache()
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java?rev=1002275&r1=1002274&r2=1002275&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java Tue Sep 28 17:24:06 2010
@@ -18,6 +18,7 @@
package org.apache.cassandra.db;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicLongArray;
@@ -133,7 +134,7 @@ public interface ColumnFamilyStoreMBean
/**
* force a major compaction of this column family
*/
- public void forceMajorCompaction();
+ public void forceMajorCompaction() throws ExecutionException,
InterruptedException;
/**
* invalidate the row cache; for use after bulk loading via BinaryMemtable
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java?rev=1002275&r1=1002274&r2=1002275&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java Tue Sep 28 17:24:06 2010
@@ -25,6 +25,7 @@ import java.net.InetAddress;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import javax.management.MBeanServer;
import javax.management.ObjectName;
@@ -135,7 +136,7 @@ public class CompactionManager implement
}
}
- public Future<Object> submitCleanup(final ColumnFamilyStore cfStore)
+ public void performCleanup(final ColumnFamilyStore cfStore) throws
InterruptedException, ExecutionException
{
Callable<Object> runnable = new Callable<Object>()
{
@@ -145,7 +146,7 @@ public class CompactionManager implement
return this;
}
};
- return executor.submit(runnable);
+ executor.submit(runnable).get();
}
public Future<List<SSTableReader>> submitAnticompaction(final
ColumnFamilyStore cfStore, final Collection<Range> ranges, final InetAddress
target)
@@ -160,12 +161,12 @@ public class CompactionManager implement
return executor.submit(callable);
}
- public Future submitMajor(final ColumnFamilyStore cfStore)
+ public void performMajor(final ColumnFamilyStore cfStore) throws
InterruptedException, ExecutionException
{
- return submitMajor(cfStore, 0, (int) (System.currentTimeMillis() /
1000) - cfStore.metadata.gcGraceSeconds);
+ submitMajor(cfStore, 0, (int) (System.currentTimeMillis() / 1000) -
cfStore.metadata.gcGraceSeconds).get();
}
- public Future submitMajor(final ColumnFamilyStore cfStore, final long
skip, final int gcBefore)
+ public Future<Object> submitMajor(final ColumnFamilyStore cfStore, final
long skip, final int gcBefore)
{
Callable<Object> callable = new Callable<Object>()
{
@@ -195,7 +196,7 @@ public class CompactionManager implement
return executor.submit(callable);
}
- public Future submitValidation(final ColumnFamilyStore cfStore, final
AntiEntropyService.Validator validator)
+ public Future<Object> submitValidation(final ColumnFamilyStore cfStore,
final AntiEntropyService.Validator validator)
{
Callable<Object> callable = new Callable<Object>()
{
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=1002275&r1=1002274&r2=1002275&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Tue Sep 28 17:24:06 2010
@@ -138,7 +138,7 @@ public class Table
/**
* Do a cleanup of keys that do not belong locally.
*/
- public void forceCleanup()
+ public void forceCleanup() throws IOException, ExecutionException,
InterruptedException
{
if (name.equals(SYSTEM_TABLE))
throw new RuntimeException("Cleanup of the system table is neither
necessary nor wise");
@@ -146,8 +146,7 @@ public class Table
for (ColumnFamilyStore cfStore : columnFamilyStores.values())
cfStore.forceCleanup();
}
-
-
+
/**
* Take a snapshot of the entire set of column families with a given
timestamp.
*
@@ -201,10 +200,10 @@ public class Table
* This method is an ADMIN operation to force compaction
* of all SSTables on disk.
*/
- public void forceCompaction()
+ public void forceCompaction() throws IOException, ExecutionException,
InterruptedException
{
for (ColumnFamilyStore cfStore : columnFamilyStores.values())
- CompactionManager.instance.submitMajor(cfStore);
+ CompactionManager.instance.performMajor(cfStore);
}
/**
Modified:
cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=1002275&r1=1002274&r2=1002275&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Tue Sep 28 17:24:06 2010
@@ -25,6 +25,7 @@ import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.*;
import java.util.concurrent.*;
+
import javax.management.MBeanServer;
import javax.management.ObjectName;
@@ -1168,7 +1169,7 @@ public class StorageService implements I
return
Gossiper.instance.getCurrentGenerationNumber(FBUtilities.getLocalAddress());
}
- public void forceTableCleanup() throws IOException
+ public void forceTableCleanup() throws IOException, ExecutionException,
InterruptedException
{
List<String> tables = DatabaseDescriptor.getNonSystemTables();
for (String tName : tables)
@@ -1178,19 +1179,19 @@ public class StorageService implements I
}
}
- public void forceTableCleanup(String tableName) throws IOException
+ public void forceTableCleanup(String tableName) throws IOException,
ExecutionException, InterruptedException
{
Table table = getValidTable(tableName);
table.forceCleanup();
}
-
- public void forceTableCompaction() throws IOException
+
+ public void forceTableCompaction() throws IOException, ExecutionException,
InterruptedException
{
for (Table table : Table.all())
table.forceCompaction();
}
- public void forceTableCompaction(String tableName) throws IOException
+ public void forceTableCompaction(String tableName) throws IOException,
ExecutionException, InterruptedException
{
Table table = getValidTable(tableName);
table.forceCompaction();
@@ -1270,14 +1271,15 @@ public class StorageService implements I
* @param columnFamilies
* @throws IOException
*/
- public void forceTableFlush(final String tableName, final String...
columnFamilies) throws IOException
+ public void forceTableFlush(final String tableName, final String...
columnFamilies)
+ throws IOException, ExecutionException, InterruptedException
{
for (ColumnFamilyStore cfStore : getValidColumnFamilies(tableName,
columnFamilies))
{
logger_.debug("Forcing binary flush on keyspace " + tableName + ",
CF " + cfStore.getColumnFamilyName());
cfStore.forceFlushBinary();
logger_.debug("Forcing flush on keyspace " + tableName + ", CF " +
cfStore.getColumnFamilyName());
- cfStore.forceFlush();
+ cfStore.forceBlockingFlush();
}
}
Modified:
cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java?rev=1002275&r1=1002274&r2=1002275&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java Tue Sep 28 17:24:06 2010
@@ -132,22 +132,22 @@ public interface StorageServiceMBean
/**
* Forces major compaction (all sstable files compacted)
*/
- public void forceTableCompaction() throws IOException;
+ public void forceTableCompaction() throws IOException, ExecutionException,
InterruptedException;
/**
* Forces major compaction of a single keyspace
*/
- public void forceTableCompaction(String tableName) throws IOException;
+ public void forceTableCompaction(String tableName) throws IOException,
ExecutionException, InterruptedException;
/**
* Trigger a cleanup of keys on all tables.
*/
- public void forceTableCleanup() throws IOException;
+ public void forceTableCleanup() throws IOException, ExecutionException,
InterruptedException;
/**
* Trigger a cleanup of keys on a single keyspace
*/
- public void forceTableCleanup(String tableName) throws IOException;
+ public void forceTableCleanup(String tableName) throws IOException,
ExecutionException, InterruptedException;
/**
* Takes the snapshot for a given table.
@@ -176,7 +176,7 @@ public interface StorageServiceMBean
* @param columnFamilies
* @throws IOException
*/
- public void forceTableFlush(String tableName, String... columnFamilies)
throws IOException;
+ public void forceTableFlush(String tableName, String... columnFamilies)
throws IOException, ExecutionException, InterruptedException;
/**
* Triggers proactive repair for given column families, or all
columnfamilies for the given table
Modified: cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java?rev=1002275&r1=1002274&r2=1002275&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java Tue Sep 28 17:24:06 2010
@@ -427,17 +427,35 @@ public class NodeCmd {
}
else if (cmdName.equals("cleanup"))
{
- if (arguments.length > 1)
- probe.forceTableCleanup(arguments[1]);
- else
- probe.forceTableCleanup();
+ try
+ {
+ if (arguments.length > 1)
+ probe.forceTableCleanup(arguments[1]);
+ else
+ probe.forceTableCleanup();
+ }
+ catch (ExecutionException ee)
+ {
+ System.err.println("Error occured during Keyspace cleanup");
+ ee.printStackTrace();
+ System.exit(3);
+ }
}
else if (cmdName.equals("compact"))
{
- if (arguments.length > 1)
- probe.forceTableCompaction(arguments[1]);
- else
- probe.forceTableCompaction();
+ try
+ {
+ if (arguments.length > 1)
+ probe.forceTableCompaction(arguments[1]);
+ else
+ probe.forceTableCompaction();
+ }
+ catch (ExecutionException ee)
+ {
+ System.err.println("Error occured during Keyspace compaction");
+ ee.printStackTrace();
+ System.exit(3);
+ }
}
else if (cmdName.equals("cfstats"))
{
@@ -511,7 +529,15 @@ public class NodeCmd {
columnFamilies[i] = cmd.getArgs()[i + 2];
}
if (cmdName.equals("flush"))
- probe.forceTableFlush(cmd.getArgs()[1], columnFamilies);
+ try
+ {
+ probe.forceTableFlush(cmd.getArgs()[1], columnFamilies);
+ } catch (ExecutionException ee)
+ {
+ System.err.println("Error occured during flushing");
+ ee.printStackTrace();
+ System.exit(3);
+ }
else // cmdName.equals("repair")
probe.forceTableRepair(cmd.getArgs()[1], columnFamilies);
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java?rev=1002275&r1=1002274&r2=1002275&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java Tue Sep 28 17:24:06 2010
@@ -122,27 +122,27 @@ public class NodeProbe
mbeanServerConn, ManagementFactory.RUNTIME_MXBEAN_NAME,
RuntimeMXBean.class);
}
- public void forceTableCleanup() throws IOException
+ public void forceTableCleanup() throws IOException, ExecutionException,
InterruptedException
{
ssProxy.forceTableCleanup();
}
- public void forceTableCleanup(String tableName) throws IOException
+ public void forceTableCleanup(String tableName) throws IOException,
ExecutionException, InterruptedException
{
ssProxy.forceTableCleanup(tableName);
}
- public void forceTableCompaction() throws IOException
+ public void forceTableCompaction() throws IOException, ExecutionException,
InterruptedException
{
ssProxy.forceTableCompaction();
}
- public void forceTableCompaction(String tableName) throws IOException
+ public void forceTableCompaction(String tableName) throws IOException,
ExecutionException, InterruptedException
{
ssProxy.forceTableCompaction(tableName);
}
- public void forceTableFlush(String tableName, String... columnFamilies)
throws IOException
+ public void forceTableFlush(String tableName, String... columnFamilies)
throws IOException, ExecutionException, InterruptedException
{
ssProxy.forceTableFlush(tableName, columnFamilies);
}
Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java?rev=1002275&r1=1002274&r2=1002275&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java Tue Sep 28 17:24:06 2010
@@ -75,7 +75,7 @@ public class CompactionsTest extends Cle
}
if (store.getSSTables().size() > 1)
{
- CompactionManager.instance.submitMajor(store).get();
+ CompactionManager.instance.performMajor(store);
}
assertEquals(inserted.size(), Util.getRangeSlice(store).size());
}
Modified:
cassandra/trunk/test/unit/org/apache/cassandra/db/OneCompactionTest.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/OneCompactionTest.java?rev=1002275&r1=1002274&r2=1002275&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/OneCompactionTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/OneCompactionTest.java Tue Sep 28 17:24:06 2010
@@ -50,7 +50,7 @@ public class OneCompactionTest extends C
store.forceBlockingFlush();
assertEquals(inserted.size(), Util.getRangeSlice(store).size());
}
- CompactionManager.instance.submitMajor(store).get();
+ CompactionManager.instance.performMajor(store);
assertEquals(1, store.getSSTables().size());
}
Modified:
cassandra/trunk/test/unit/org/apache/cassandra/db/RemoveSuperColumnTest.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/RemoveSuperColumnTest.java?rev=1002275&r1=1002274&r2=1002275&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/RemoveSuperColumnTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/RemoveSuperColumnTest.java Tue Sep 28 17:24:06 2010
@@ -60,7 +60,7 @@ public class RemoveSuperColumnTest exten
store.forceBlockingFlush();
validateRemoveTwoSources(dk);
- CompactionManager.instance.submitMajor(store).get();
+ CompactionManager.instance.performMajor(store);
assertEquals(1, store.getSSTables().size());
validateRemoveCompacted(dk);
}
@@ -150,7 +150,7 @@ public class RemoveSuperColumnTest exten
store.forceBlockingFlush();
validateRemoveWithNewData(dk);
- CompactionManager.instance.submitMajor(store).get();
+ CompactionManager.instance.performMajor(store);
assertEquals(1, store.getSSTables().size());
validateRemoveWithNewData(dk);
}
Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java?rev=1002275&r1=1002274&r2=1002275&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java Tue Sep 28 17:24:06 2010
@@ -392,7 +392,7 @@ public class TableTest extends CleanupHe
// compact so we have a big row with more than the minimum index count
if (cfStore.getSSTables().size() > 1)
{
- CompactionManager.instance.submitMajor(cfStore).get();
+ CompactionManager.instance.performMajor(cfStore);
}
SSTableReader sstable = cfStore.getSSTables().iterator().next();
long position = sstable.getPosition(key, SSTableReader.Operator.EQ);
Modified:
cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java?rev=1002275&r1=1002274&r2=1002275&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java Tue Sep 28 17:24:06 2010
@@ -67,7 +67,7 @@ public class SSTableReaderTest extends C
rm.apply();
}
store.forceBlockingFlush();
- CompactionManager.instance.submitMajor(store).get();
+ CompactionManager.instance.performMajor(store);
List<Range> ranges = new ArrayList<Range>();
// 1 key
@@ -108,7 +108,7 @@ public class SSTableReaderTest extends C
rm.apply();
}
store.forceBlockingFlush();
- CompactionManager.instance.submitMajor(store).get();
+ CompactionManager.instance.performMajor(store);
// check that all our keys are found correctly
SSTableReader sstable = store.getSSTables().iterator().next();