Author: jbellis
Date: Wed Dec 23 19:20:16 2009
New Revision: 893605
URL: http://svn.apache.org/viewvc?rev=893605&view=rev
Log:
make estimation of pendingtasks for CompactionManager sane. patch by jbellis;
reviewed by eevans for CASSANDRA-599
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManagerMBean.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableScanner.java
incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=893605&r1=893604&r2=893605&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
Wed Dec 23 19:20:16 2009
@@ -559,7 +559,7 @@
public void addSSTable(SSTableReader sstable)
{
ssTables_.add(sstable);
- CompactionManager.instance.submitMinor(this);
+ CompactionManager.instance.submitMinorIfNeeded(this);
}
/*
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java?rev=893605&r1=893604&r2=893605&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
Wed Dec 23 19:20:16 2009
@@ -23,13 +23,12 @@
import java.lang.management.ManagementFactory;
import java.util.*;
import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import javax.management.*;
import org.apache.log4j.Logger;
-import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
+import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.io.*;
import org.apache.cassandra.config.DatabaseDescriptor;
@@ -37,6 +36,7 @@
import org.apache.cassandra.service.AntiEntropyService;
import org.apache.cassandra.utils.FileUtils;
import org.apache.cassandra.utils.FBUtilities;
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
import java.net.InetAddress;
@@ -68,44 +68,56 @@
}
}
- private ExecutorService compactor_ = new
JMXEnabledThreadPoolExecutor("COMPACTION-POOL");
+ private CompactionExecutor executor = new CompactionExecutor();
+ private Map<ColumnFamilyStore, Integer> estimatedCompactions = new
NonBlockingHashMap<ColumnFamilyStore, Integer>();
/**
* Call this whenever a compaction might be needed on the given
columnfamily.
* It's okay to over-call (within reason) since the compactions are
single-threaded,
* and if a call is unnecessary, it will just be no-oped in the bucketing
phase.
*/
- public Future<Integer> submitMinor(final ColumnFamilyStore cfs)
+ public Future<Integer> submitMinorIfNeeded(final ColumnFamilyStore cfs)
{
Callable<Integer> callable = new Callable<Integer>()
{
public Integer call() throws IOException
{
- int filesCompacted = 0;
- if (minimumCompactionThreshold > 0 &&
maximumCompactionThreshold > 0)
+ if (minimumCompactionThreshold <= 0 ||
maximumCompactionThreshold <= 0)
{
- logger.debug("Checking to see if compaction of " +
cfs.columnFamily_ + " would be useful");
- for (List<SSTableReader> sstables :
getCompactionBuckets(cfs.getSSTables(), 50L * 1024L * 1024L))
+ logger.debug("Compaction is currently disabled.");
+ return 0;
+ }
+ logger.debug("Checking to see if compaction of " +
cfs.columnFamily_ + " would be useful");
+ Set<List<SSTableReader>> buckets =
getCompactionBuckets(cfs.getSSTables(), 50L * 1024L * 1024L);
+ updateEstimateFor(cfs, buckets);
+
+ for (List<SSTableReader> sstables : buckets)
+ {
+ if (sstables.size() >= minimumCompactionThreshold)
{
- if (sstables.size() < minimumCompactionThreshold)
- {
- continue;
- }
// if we have too many to compact all at once, compact
older ones first -- this avoids
// re-compacting files we just created.
Collections.sort(sstables);
- filesCompacted += doCompaction(cfs,
sstables.subList(0, Math.min(sstables.size(), maximumCompactionThreshold)),
getDefaultGCBefore());
+ return doCompaction(cfs, sstables.subList(0,
Math.min(sstables.size(), maximumCompactionThreshold)), getDefaultGCBefore());
}
- logger.debug(filesCompacted + " files compacted");
}
- else
- {
- logger.debug("Compaction is currently disabled.");
- }
- return filesCompacted;
+ return 0;
}
};
- return compactor_.submit(callable);
+ return executor.submit(callable);
+ }
+
+ private void updateEstimateFor(ColumnFamilyStore cfs,
Set<List<SSTableReader>> buckets)
+ {
+ int n = 0;
+ for (List<SSTableReader> sstables : buckets)
+ {
+ if (sstables.size() >= minimumCompactionThreshold)
+ {
+ n += 1 + sstables.size() / (maximumCompactionThreshold -
minimumCompactionThreshold);
+ }
+ }
+ estimatedCompactions.put(cfs, n);
}
public Future<Object> submitCleanup(final ColumnFamilyStore cfStore)
@@ -118,7 +130,7 @@
return this;
}
};
- return compactor_.submit(runnable);
+ return executor.submit(runnable);
}
public Future<List<SSTableReader>> submitAnticompaction(final
ColumnFamilyStore cfStore, final Collection<Range> ranges, final InetAddress
target)
@@ -130,7 +142,7 @@
return doAntiCompaction(cfStore, cfStore.getSSTables(),
ranges, target);
}
};
- return compactor_.submit(callable);
+ return executor.submit(callable);
}
public Future submitMajor(final ColumnFamilyStore cfStore)
@@ -165,7 +177,7 @@
return this;
}
};
- return compactor_.submit(callable);
+ return executor.submit(callable);
}
public Future submitReadonly(final ColumnFamilyStore cfStore, final
InetAddress initiator)
@@ -178,7 +190,7 @@
return this;
}
};
- return compactor_.submit(callable);
+ return executor.submit(callable);
}
/**
@@ -258,6 +270,7 @@
SSTableWriter writer;
CompactionIterator ci = new CompactionIterator(sstables, gcBefore,
major); // retain a handle so we can call close()
Iterator<CompactionIterator.CompactedRow> nni = new FilterIterator(ci,
PredicateUtils.notNullPredicate());
+ executor.beginCompaction(cfs, ci);
try
{
@@ -293,7 +306,7 @@
SSTableReader ssTable =
writer.closeAndOpenReader(DatabaseDescriptor.getKeysCachedFraction(table.name));
cfs.replaceCompactedSSTables(sstables, Arrays.asList(ssTable));
gcAfterRpcTimeout();
- instance.submitMinor(cfs);
+ submitMinorIfNeeded(cfs);
String format = "Compacted to %s. %d/%d bytes for %d keys. Time:
%dms.";
long dTime = System.currentTimeMillis() - startTime;
@@ -341,6 +354,7 @@
SSTableWriter writer = null;
CompactionIterator ci = new AntiCompactionIterator(sstables, ranges,
getDefaultGCBefore(), cfs.isCompleteSSTables(sstables));
Iterator<CompactionIterator.CompactedRow> nni = new FilterIterator(ci,
PredicateUtils.notNullPredicate());
+ executor.beginCompaction(cfs, ci);
try
{
@@ -403,6 +417,7 @@
{
Collection<SSTableReader> sstables = cfs.getSSTables();
CompactionIterator ci = new CompactionIterator(sstables,
getDefaultGCBefore(), true);
+ executor.beginCompaction(cfs, ci);
try
{
Iterator<CompactionIterator.CompactedRow> nni = new
FilterIterator(ci, PredicateUtils.notNullPredicate());
@@ -528,21 +543,99 @@
}
}
- public void checkAllColumnFamilies()
+ public void checkAllColumnFamilies() throws IOException
{
+ // perform estimates
for (String tableName : DatabaseDescriptor.getTables())
{
- try
+ for (final ColumnFamilyStore cfs :
Table.open(tableName).getColumnFamilyStores().values())
{
- for (ColumnFamilyStore cfs :
Table.open(tableName).getColumnFamilyStores().values())
+ Runnable runnable = new Runnable()
{
- submitMinor(cfs);
- }
+ public void run ()
+ {
+ logger.debug("Estimating compactions for " +
cfs.columnFamily_);
+ final Set<List<SSTableReader>> buckets =
getCompactionBuckets(cfs.getSSTables(), 50L * 1024L * 1024L);
+ updateEstimateFor(cfs, buckets);
+ }
+ };
+ executor.submit(runnable);
}
- catch (IOException e)
+ }
+
+ // actually schedule compactions. done in a second pass so all the
estimates occur before we
+ // bog down the executor in actual compactions.
+ for (String tableName : DatabaseDescriptor.getTables())
+ {
+ for (final ColumnFamilyStore cfs :
Table.open(tableName).getColumnFamilyStores().values())
{
- throw new RuntimeException(e);
+ submitMinorIfNeeded(cfs);
}
}
}
+
+ private class CompactionExecutor extends DebuggableThreadPoolExecutor
+ {
+ private volatile ColumnFamilyStore cfs;
+ private volatile CompactionIterator ci;
+
+ public CompactionExecutor()
+ {
+ super("COMPACTION-POOL");
+ }
+
+ @Override
+ public void afterExecute(Runnable r, Throwable t)
+ {
+ super.afterExecute(r, t);
+ cfs = null;
+ ci = null;
+ }
+
+ void beginCompaction(ColumnFamilyStore cfs, CompactionIterator ci)
+ {
+ this.cfs = cfs;
+ this.ci = ci;
+ }
+
+ public String getColumnFamilyName()
+ {
+ return cfs == null ? null : cfs.getColumnFamilyName();
+ }
+
+ public Long getBytesTotal()
+ {
+ return ci == null ? null : ci.getTotalBytes();
+ }
+
+ public Long getBytesCompleted()
+ {
+ return ci == null ? null : ci.getBytesRead();
+ }
+ }
+
+ public String getColumnFamilyInProgress()
+ {
+ return executor.getColumnFamilyName();
+ }
+
+ public Long getBytesTotalInProgress()
+ {
+ return executor.getBytesTotal();
+ }
+
+ public Long getBytesCompacted()
+ {
+ return executor.getBytesCompleted();
+ }
+
+ public int getPendingTasks()
+ {
+ int n = 0;
+ for (Integer i : estimatedCompactions.values())
+ {
+ n += i;
+ }
+ return n;
+ }
}
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManagerMBean.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManagerMBean.java?rev=893605&r1=893604&r2=893605&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManagerMBean.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManagerMBean.java
Wed Dec 23 19:20:16 2009
@@ -39,4 +39,24 @@
* Sets the maximum number of sstables in queue before compaction kicks off
*/
public void setMaximumCompactionThreshold(int threshold);
+
+ /**
+ * @return the columnfamily currently being compacted; null if none
+ */
+ public String getColumnFamilyInProgress();
+
+ /**
+ * @return the total (data, not including index and filter) bytes being
compacted; null if none
+ */
+ public Long getBytesTotalInProgress();
+
+ /**
+ * @return the progress on the current compaction; null if none
+ */
+ public Long getBytesCompacted();
+
+ /**
+ * @return estimated number of compactions remaining to perform
+ */
+ public int getPendingTasks();
}
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java?rev=893605&r1=893604&r2=893605&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java
Wed Dec 23 19:20:16 2009
@@ -45,10 +45,15 @@
private final List<IteratingRow> rows = new ArrayList<IteratingRow>();
private final int gcBefore;
- private boolean major;
+ private final boolean major;
+
+ private long totalBytes;
+ private long bytesRead;
+ private long row;
public CompactionIterator(Iterable<SSTableReader> sstables, int gcBefore,
boolean major) throws IOException
{
this(getCollatingIterator(sstables), gcBefore, major);
}
@@ -56,6 +61,15 @@
protected CompactionIterator(Iterator iter, int gcBefore, boolean major)
{
super(iter);
+ row = 0;
+ totalBytes = bytesRead = 0;
+ for (SSTableScanner iter :
(List<SSTableScanner>)((CollatingIterator)source).getIterators())
+ {
+ totalBytes += iter.getFileLength();
+ }
this.gcBefore = gcBefore;
this.major = major;
}
@@ -135,6 +149,14 @@
finally
{
rows.clear();
+ if ((row++ % 1000) == 0)
+ {
+ bytesRead = 0;
+ for (SSTableScanner iter :
(List<SSTableScanner>)((CollatingIterator)source).getIterators())
+ {
+ bytesRead += iter.getFilePointer();
+ }
+ }
}
return new CompactedRow(key, buffer);
}
@@ -147,6 +169,16 @@
}
}
+ public long getTotalBytes()
+ {
+ return totalBytes;
+ }
+
+ public long getBytesRead()
+ {
+ return bytesRead;
+ }
+
public static class CompactedRow
{
public final DecoratedKey key;
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java?rev=893605&r1=893604&r2=893605&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java
(original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java Wed
Dec 23 19:20:16 2009
@@ -198,11 +198,6 @@
}
}
- public long dataBytesOnDisk()
- {
- return new File(path).length();
- }
-
public long bytesOnDisk()
{
long bytes = 0;
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableScanner.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableScanner.java?rev=893605&r1=893604&r2=893605&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableScanner.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableScanner.java
Wed Dec 23 19:20:16 2009
@@ -20,6 +20,7 @@
import java.io.IOException;
import java.io.Closeable;
+import java.io.IOError;
import java.util.Iterator;
import java.util.Arrays;
@@ -72,6 +73,23 @@
}
}
+ public long getFileLength()
+ {
+ try
+ {
+ return file.length();
+ }
+ catch (IOException e)
+ {
+ throw new IOError(e);
+ }
+ }
+
+ public long getFilePointer()
+ {
+ return file.getFilePointer();
+ }
+
public boolean hasNext()
{
if (iterator == null)
Modified:
incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java?rev=893605&r1=893604&r2=893605&view=diff
==============================================================================
---
incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java
(original)
+++
incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java
Wed Dec 23 19:20:16 2009
@@ -66,7 +66,7 @@
}
while (true)
{
- Future<Integer> ft = CompactionManager.instance.submitMinor(store);
+ Future<Integer> ft =
CompactionManager.instance.submitMinorIfNeeded(store);
if (ft.get() == 0)
break;
}