Updated Branches:
refs/heads/trunk aa83c7f2f -> f774b7fc3
Make scrub and cleanup operations throttled
patch by Vijay; reviewed by yukim for CASSANDRA-4100
resolved Conflicts in:
src/java/org/apache/cassandra/db/compaction/CompactionController.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f774b7fc
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f774b7fc
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f774b7fc
Branch: refs/heads/trunk
Commit: f774b7fc396f4fec611247159023c58863de5f85
Parents: aa83c7f
Author: Vijay Parthasarathy <[email protected]>
Authored: Tue Apr 10 17:34:18 2012 -0700
Committer: Vijay Parthasarathy <[email protected]>
Committed: Tue Apr 10 17:41:03 2012 -0700
----------------------------------------------------------------------
.../db/compaction/AbstractCompactionIterable.java | 20 --------------
.../db/compaction/CompactionController.java | 21 +++++++++++++++
.../db/compaction/CompactionIterable.java | 2 +-
.../cassandra/db/compaction/CompactionManager.java | 6 ++++
.../db/compaction/ParallelCompactionIterable.java | 2 +-
5 files changed, 29 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f774b7fc/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java
index d5bd3d4..2a590bf 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java
@@ -24,12 +24,9 @@ import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.io.sstable.SSTableScanner;
-import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.CloseableIterator;
-import org.apache.cassandra.utils.Throttle;
public abstract class AbstractCompactionIterable extends CompactionInfo.Holder implements Iterable<AbstractCompactedRow>
{
@@ -41,8 +38,6 @@ public abstract class AbstractCompactionIterable extends CompactionInfo.Holder i
protected volatile long bytesRead;
protected final List<SSTableScanner> scanners;
- protected final Throttle throttle;
-
public AbstractCompactionIterable(CompactionController controller, OperationType type, List<SSTableScanner> scanners)
{
this.controller = controller;
@@ -54,21 +49,6 @@ public abstract class AbstractCompactionIterable extends CompactionInfo.Holder i
for (SSTableScanner scanner : scanners)
bytes += scanner.getFileLength();
this.totalBytes = bytes;
-
- this.throttle = new Throttle(toString(), new Throttle.ThroughputFunction()
- {
- /** @return Instantaneous throughput target in bytes per millisecond. */
- public int targetThroughput()
- {
- if (DatabaseDescriptor.getCompactionThroughputMbPerSec() < 1 || StorageService.instance.isBootstrapMode())
- // throttling disabled
- return 0;
- // total throughput
- int totalBytesPerMS = DatabaseDescriptor.getCompactionThroughputMbPerSec() * 1024 * 1024 / 1000;
- // per stream throughput (target bytes per MS)
- return totalBytesPerMS / Math.max(1, CompactionManager.instance.getActiveCompactions());
- }
- });
}
protected static List<SSTableScanner> getScanners(Iterable<SSTableReader> sstables) throws IOException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f774b7fc/src/java/org/apache/cassandra/db/compaction/CompactionController.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionController.java b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
index cb4e87a..f7d5354 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
@@ -31,6 +31,8 @@ import org.apache.cassandra.db.DataTracker;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.Throttle;
import org.apache.cassandra.utils.IntervalTree.Interval;
import org.apache.cassandra.utils.IntervalTree.IntervalTree;
@@ -46,6 +48,20 @@ public class CompactionController
public final int gcBefore;
public final int mergeShardBefore;
+ private final Throttle throttle = new Throttle("Cassandra_Throttle", new Throttle.ThroughputFunction()
+ {
+ /** @return Instantaneous throughput target in bytes per millisecond. */
+ public int targetThroughput()
+ {
+ if (DatabaseDescriptor.getCompactionThroughputMbPerSec() < 1 || StorageService.instance.isBootstrapMode())
+ // throttling disabled
+ return 0;
+ // total throughput
+ int totalBytesPerMS = DatabaseDescriptor.getCompactionThroughputMbPerSec() * 1024 * 1024 / 1000;
+ // per stream throughput (target bytes per MS)
+ return totalBytesPerMS / Math.max(1, CompactionManager.instance.getActiveCompactions());
+ }
+ });
public CompactionController(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, int gcBefore, boolean forceDeserialize)
{
@@ -119,4 +135,9 @@ public class CompactionController
{
return getCompactedRow(Collections.singletonList(row));
}
+
+ public void mayThrottle(long currentBytes)
+ {
+ throttle.throttle(currentBytes);
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f774b7fc/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java b/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
index ba4e3c2..b50066c 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
@@ -115,7 +115,7 @@ public class CompactionIterable extends AbstractCompactionIterable
for (SSTableScanner scanner : scanners)
n += scanner.getFilePointer();
bytesRead = n;
- throttle.throttle(bytesRead);
+ controller.mayThrottle(bytesRead);
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f774b7fc/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 36f05c1..5e19ce2 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -467,6 +467,7 @@ public class CompactionManager implements CompactionManagerMBean
// row header (key or data size) is corrupt. (This means our position in the index file will be one row
// "ahead" of the data file.)
final RandomAccessReader dataFile = sstable.openDataReader(true);
+ long rowsRead = 0;
RandomAccessReader indexFile = RandomAccessReader.open(new File(sstable.descriptor.filenameFor(Component.PRIMARY_INDEX)), true);
ScrubInfo scrubInfo = new ScrubInfo(dataFile, sstable);
executor.beginCompaction(scrubInfo);
@@ -607,6 +608,8 @@ public class CompactionManager implements CompactionManagerMBean
badRows++;
}
}
+ if ((rowsRead++ % 1000) == 0)
+ controller.mayThrottle(dataFile.getFilePointer());
}
if (writer.getFilePointer() > 0)
@@ -690,6 +693,7 @@ public class CompactionManager implements CompactionManagerMBean
throw new IOException("disk full");
SSTableScanner scanner = sstable.getDirectScanner();
+ long rowsRead = 0;
Collection<ByteBuffer> indexedColumns = cfs.indexManager.getIndexedColumns();
List<IColumn> indexedColumnsInRow = null;
@@ -749,6 +753,8 @@ public class CompactionManager implements CompactionManagerMBean
}
}
}
+ if ((rowsRead++ % 1000) == 0)
+ controller.mayThrottle(scanner.getFilePointer());
}
if (writer != null)
newSstable = writer.closeAndOpenReader(sstable.maxDataAge);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f774b7fc/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java b/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
index a04bb91..c9fab64 100644
--- a/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
+++ b/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
@@ -164,7 +164,7 @@ public class ParallelCompactionIterable extends AbstractCompactionIterable
for (SSTableScanner scanner : scanners)
n += scanner.getFilePointer();
bytesRead = n;
- throttle.throttle(bytesRead);
+ controller.mayThrottle(bytesRead);
}
return compacted;
}