Updated Branches: refs/heads/cassandra-1.1 eef93e7d6 -> 092dc586f
Make scrub and cleanup operations throttled patch by Vijay; reviewed by yukim for CASSANDRA-4100 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/092dc586 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/092dc586 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/092dc586 Branch: refs/heads/cassandra-1.1 Commit: 092dc586f556b1c2bef048d9ee8672240a31f442 Parents: eef93e7 Author: Vijay Parthasarathy <[email protected]> Authored: Tue Apr 10 17:34:18 2012 -0700 Committer: Vijay Parthasarathy <[email protected]> Committed: Tue Apr 10 17:34:18 2012 -0700 ---------------------------------------------------------------------- .../db/compaction/AbstractCompactionIterable.java | 20 -------------- .../db/compaction/CompactionController.java | 21 +++++++++++++++ .../db/compaction/CompactionIterable.java | 2 +- .../cassandra/db/compaction/CompactionManager.java | 6 ++++ .../db/compaction/ParallelCompactionIterable.java | 2 +- 5 files changed, 29 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/092dc586/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java index e05a64c..95e6590 100644 --- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java +++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java @@ -28,12 +28,9 @@ import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.io.sstable.SSTableScanner; -import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.CloseableIterator; -import org.apache.cassandra.utils.Throttle; public abstract class AbstractCompactionIterable extends CompactionInfo.Holder implements Iterable<AbstractCompactedRow> { @@ -45,8 +42,6 @@ public abstract class AbstractCompactionIterable extends CompactionInfo.Holder i protected volatile long bytesRead; protected final List<SSTableScanner> scanners; - protected final Throttle throttle; - public AbstractCompactionIterable(CompactionController controller, OperationType type, List<SSTableScanner> scanners) { this.controller = controller; @@ -58,21 +53,6 @@ public abstract class AbstractCompactionIterable extends CompactionInfo.Holder i for (SSTableScanner scanner : scanners) bytes += scanner.getFileLength(); this.totalBytes = bytes; - - this.throttle = new Throttle(toString(), new Throttle.ThroughputFunction() - { - /** @return Instantaneous throughput target in bytes per millisecond. */ - public int targetThroughput() - { - if (DatabaseDescriptor.getCompactionThroughputMbPerSec() < 1 || StorageService.instance.isBootstrapMode()) - // throttling disabled - return 0; - // total throughput - int totalBytesPerMS = DatabaseDescriptor.getCompactionThroughputMbPerSec() * 1024 * 1024 / 1000; - // per stream throughput (target bytes per MS) - return totalBytesPerMS / Math.max(1, CompactionManager.instance.getActiveCompactions()); - } - }); } protected static List<SSTableScanner> getScanners(Iterable<SSTableReader> sstables) throws IOException http://git-wip-us.apache.org/repos/asf/cassandra/blob/092dc586/src/java/org/apache/cassandra/db/compaction/CompactionController.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionController.java b/src/java/org/apache/cassandra/db/compaction/CompactionController.java index 1da6f9c..9eaefe7 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java @@ -30,6 +30,8 @@ import org.apache.cassandra.db.*; import org.apache.cassandra.io.sstable.SSTableIdentityIterator; import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.service.CacheService; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.utils.Throttle; import org.apache.cassandra.utils.IntervalTree.Interval; import org.apache.cassandra.utils.IntervalTree.IntervalTree; @@ -47,6 +49,20 @@ public class CompactionController public final int gcBefore; public boolean keyExistenceIsExpensive; public final int mergeShardBefore; + private final Throttle throttle = new Throttle("Cassandra_Throttle", new Throttle.ThroughputFunction() + { + /** @return Instantaneous throughput target in bytes per millisecond. */ + public int targetThroughput() + { + if (DatabaseDescriptor.getCompactionThroughputMbPerSec() < 1 || StorageService.instance.isBootstrapMode()) + // throttling disabled + return 0; + // total throughput + int totalBytesPerMS = DatabaseDescriptor.getCompactionThroughputMbPerSec() * 1024 * 1024 / 1000; + // per stream throughput (target bytes per MS) + return totalBytesPerMS / Math.max(1, CompactionManager.instance.getActiveCompactions()); + } + }); public CompactionController(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, int gcBefore, boolean forceDeserialize) { @@ -153,4 +169,9 @@ public class CompactionController { return getCompactedRow(Collections.singletonList(row)); } + + public void mayThrottle(long currentBytes) + { + throttle.throttle(currentBytes); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/092dc586/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java b/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java index eb88489..8a4aa0e 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java @@ -119,7 +119,7 @@ public class CompactionIterable extends AbstractCompactionIterable for (SSTableScanner scanner : scanners) n += scanner.getFilePointer(); bytesRead = n; - throttle.throttle(bytesRead); + controller.mayThrottle(bytesRead); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/092dc586/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java index 37361cf..d14a13a 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -468,6 +468,7 @@ public class CompactionManager implements CompactionManagerMBean // row header (key or data size) is corrupt. (This means our position in the index file will be one row // "ahead" of the data file.) final RandomAccessReader dataFile = sstable.openDataReader(true); + long rowsRead = 0; RandomAccessReader indexFile = RandomAccessReader.open(new File(sstable.descriptor.filenameFor(Component.PRIMARY_INDEX)), true); ScrubInfo scrubInfo = new ScrubInfo(dataFile, sstable); executor.beginCompaction(scrubInfo); @@ -606,6 +607,8 @@ public class CompactionManager implements CompactionManagerMBean badRows++; } } + if ((rowsRead++ % 1000) == 0) + controller.mayThrottle(dataFile.getFilePointer()); } if (writer.getFilePointer() > 0) @@ -689,6 +692,7 @@ public class CompactionManager implements CompactionManagerMBean throw new IOException("disk full"); SSTableScanner scanner = sstable.getDirectScanner(); + long rowsRead = 0; Collection<ByteBuffer> indexedColumns = cfs.indexManager.getIndexedColumns(); List<IColumn> indexedColumnsInRow = null; @@ -748,6 +752,8 @@ public class CompactionManager implements CompactionManagerMBean } } } + if ((rowsRead++ % 1000) == 0) + controller.mayThrottle(scanner.getFilePointer()); } if (writer != null) newSstable = writer.closeAndOpenReader(sstable.maxDataAge); http://git-wip-us.apache.org/repos/asf/cassandra/blob/092dc586/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java b/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java index 764a549..03a29cd 100644 --- a/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java +++ b/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java @@ -167,7 +167,7 @@ public class ParallelCompactionIterable extends AbstractCompactionIterable for (SSTableScanner scanner : scanners) n += scanner.getFilePointer(); bytesRead = n; - throttle.throttle(bytesRead); + controller.mayThrottle(bytesRead); } return compacted; }
