Updated Branches:
  refs/heads/cassandra-2.0 ee553f32d -> c8915cea4
  refs/heads/trunk a1be1a913 -> 6d099b416
Track and persist sstable read activity patch by Tyler Hobbs; reviewed by jbellis for CASSANDRA-5515 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c8915cea Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c8915cea Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c8915cea Branch: refs/heads/cassandra-2.0 Commit: c8915cea4ba58174208c083b145fecaf93c7e69a Parents: ee553f3 Author: Jonathan Ellis <[email protected]> Authored: Sat Sep 28 10:22:17 2013 -0500 Committer: Jonathan Ellis <[email protected]> Committed: Sat Sep 28 10:22:17 2013 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/cassandra/config/CFMetaData.java | 9 ++++ .../org/apache/cassandra/config/KSMetaData.java | 3 +- .../cassandra/db/CollationController.java | 2 + .../org/apache/cassandra/db/DataTracker.java | 1 + .../org/apache/cassandra/db/SystemKeyspace.java | 55 +++++++++++++++++++- .../io/sstable/SSTableDeletingTask.java | 4 ++ .../cassandra/io/sstable/SSTableReader.java | 45 +++++++++++++--- 8 files changed, 111 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/c8915cea/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 9e962ca..f62dcde 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.0.2 + * Track and persist sstable read activity (CASSANDRA-5515) * Fixes for speculative retry (CASSANDRA-5932) * Improve memory usage of metadata min/max column names (CASSANDRA-6077) * Fix thrift validation refusing row markers on CQL3 tables (CASSANDRA-6081) http://git-wip-us.apache.org/repos/asf/cassandra/blob/c8915cea/src/java/org/apache/cassandra/config/CFMetaData.java ---------------------------------------------------------------------- diff --git 
a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java index 3a5309f..29df8c3 100644 --- a/src/java/org/apache/cassandra/config/CFMetaData.java +++ b/src/java/org/apache/cassandra/config/CFMetaData.java @@ -259,6 +259,15 @@ public final class CFMetaData + "PRIMARY KEY (row_key, cf_id)" + ") WITH COMMENT='in-progress paxos proposals'"); + public static final CFMetaData SSTableActivityCF = compile("CREATE TABLE " + SystemKeyspace.SSTABLE_ACTIVITY_CF + " (" + + "keyspace_name text," + + "columnfamily_name text," + + "generation int," + + "rate_15m double," + + "rate_120m double," + + "PRIMARY KEY ((keyspace_name, columnfamily_name, generation))" + + ") WITH COMMENT='historic sstable read rates'"); + public enum Caching { ALL, KEYS_ONLY, ROWS_ONLY, NONE; http://git-wip-us.apache.org/repos/asf/cassandra/blob/c8915cea/src/java/org/apache/cassandra/config/KSMetaData.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/KSMetaData.java b/src/java/org/apache/cassandra/config/KSMetaData.java index 198df8d..4b74dc6 100644 --- a/src/java/org/apache/cassandra/config/KSMetaData.java +++ b/src/java/org/apache/cassandra/config/KSMetaData.java @@ -90,7 +90,8 @@ public final class KSMetaData CFMetaData.SchemaColumnFamiliesCf, CFMetaData.SchemaColumnsCf, CFMetaData.CompactionLogCf, - CFMetaData.PaxosCf); + CFMetaData.PaxosCf, + CFMetaData.SSTableActivityCF); return new KSMetaData(Keyspace.SYSTEM_KS, LocalStrategy.class, Collections.<String, String>emptyMap(), true, cfDefs); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/c8915cea/src/java/org/apache/cassandra/db/CollationController.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/CollationController.java b/src/java/org/apache/cassandra/db/CollationController.java index 859135d..758d523 100644 --- 
a/src/java/org/apache/cassandra/db/CollationController.java +++ b/src/java/org/apache/cassandra/db/CollationController.java @@ -246,6 +246,7 @@ public class CollationController continue; } + sstable.incrementReadCount(); OnDiskAtomIterator iter = filter.getSSTableColumnIterator(sstable); iterators.add(iter); if (iter.getColumnFamily() != null) @@ -268,6 +269,7 @@ public class CollationController if (sstable.getMaxTimestamp() <= minTimestamp) continue; + sstable.incrementReadCount(); OnDiskAtomIterator iter = filter.getSSTableColumnIterator(sstable); if (iter.getColumnFamily() == null) continue; http://git-wip-us.apache.org/repos/asf/cassandra/blob/c8915cea/src/java/org/apache/cassandra/db/SystemKeyspace.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java index 0342dbb..1d5927a 100644 --- a/src/java/org/apache/cassandra/db/SystemKeyspace.java +++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java @@ -28,6 +28,8 @@ import com.google.common.collect.HashMultimap; import com.google.common.collect.Iterables; import 
com.google.common.collect.SetMultimap; import com.google.common.collect.Sets; + +import org.apache.cassandra.metrics.RestorableMeter; import org.apache.cassandra.transport.Server; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; @@ -77,6 +79,7 @@ public class SystemKeyspace public static final String SCHEMA_TRIGGERS_CF = "schema_triggers"; public static final String COMPACTION_LOG = "compactions_in_progress"; public static final String PAXOS_CF = "paxos"; + public static final String SSTABLE_ACTIVITY_CF = "sstable_activity"; private static final String LOCAL_KEY = "local"; private static final ByteBuffer ALL_LOCAL_NODE_ID_KEY = ByteBufferUtil.bytes("Local"); @@ -865,4 +868,55 @@ public class SystemKeyspace ByteBufferUtil.bytesToHex(commit.key), commit.update.id())); } -} + + /** + * Returns a RestorableMeter tracking the average read rate of a particular SSTable, restoring the last-seen rate + * from values in system.sstable_activity if present. + * @param keyspace the keyspace the sstable belongs to + * @param table the table the sstable belongs to + * @param generation the generation number for the sstable + */ + public static RestorableMeter getSSTableReadMeter(String keyspace, String table, int generation) + { + String cql = "SELECT * FROM %s WHERE keyspace_name='%s' and columnfamily_name='%s' and generation=%d"; + UntypedResultSet results = processInternal(String.format(cql, + SSTABLE_ACTIVITY_CF, + keyspace, + table, + generation)); + + if (results.isEmpty()) + return new RestorableMeter(); + + UntypedResultSet.Row row = results.one(); + double m15rate = row.getDouble("rate_15m"); + double m120rate = row.getDouble("rate_120m"); + return new RestorableMeter(m15rate, m120rate); + } + + /** + * Writes the current read rates for a given SSTable to system.sstable_activity + */ + public static void persistSSTableReadMeter(String keyspace, String table, int generation, RestorableMeter meter) + { + // Store values with a ten-day TTL (864000 seconds) to handle 
corner cases where cleanup might not occur + // Rates use %s (Double.toString) rather than %f: %f is locale-sensitive (e.g. ',' as decimal separator) and rounds to six decimal places + String cql = "INSERT INTO %s (keyspace_name, columnfamily_name, generation, rate_15m, rate_120m) VALUES ('%s', '%s', %d, %s, %s) USING TTL 864000"; + processInternal(String.format(cql, + SSTABLE_ACTIVITY_CF, + keyspace, + table, + generation, + meter.fifteenMinuteRate(), + meter.twoHourRate())); + } + + /** + * Clears persisted read rates from system.sstable_activity for SSTables that have been deleted. + */ + public static void clearSSTableReadMeter(String keyspace, String table, int generation) + { + String cql = "DELETE FROM %s WHERE keyspace_name='%s' AND columnfamily_name='%s' and generation=%d"; + processInternal(String.format(cql, SSTABLE_ACTIVITY_CF, keyspace, table, generation)); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/c8915cea/src/java/org/apache/cassandra/io/sstable/SSTableDeletingTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableDeletingTask.java b/src/java/org/apache/cassandra/io/sstable/SSTableDeletingTask.java index a577999..fb7f036 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableDeletingTask.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableDeletingTask.java @@ -28,6 +28,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.db.DataTracker; +import org.apache.cassandra.db.SystemKeyspace; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.FBUtilities; @@ -70,6 +71,12 @@ public class SSTableDeletingTask implements Runnable if (tracker != null) tracker.notifyDeleting(referent); + // Cancel the periodic read-rate sync first; otherwise it could re-insert the row cleared below + if (referent.readMeterSyncFuture != null) + referent.readMeterSyncFuture.cancel(false); + if (referent.readMeter != null) + 
SystemKeyspace.clearSSTableReadMeter(referent.getKeyspaceName(), referent.getColumnFamilyName(), referent.descriptor.generation); + // If we can't successfully delete the DATA component, set the task to be retried later: see above File datafile = new 
File(desc.filenameFor(Component.DATA)); if (!datafile.delete()) http://git-wip-us.apache.org/repos/asf/cassandra/blob/c8915cea/src/java/org/apache/cassandra/io/sstable/SSTableReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java index 4da579c..abd7c9f 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java @@ -22,6 +22,8 @@ import java.nio.ByteBuffer; import java.util.*; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -39,11 +41,7 @@ import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.ColumnDefinition; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.Schema; -import org.apache.cassandra.db.DataRange; -import org.apache.cassandra.db.DataTracker; -import org.apache.cassandra.db.DecoratedKey; -import org.apache.cassandra.db.RowIndexEntry; -import org.apache.cassandra.db.RowPosition; +import org.apache.cassandra.db.*; import org.apache.cassandra.db.columniterator.OnDiskAtomIterator; import org.apache.cassandra.db.commitlog.ReplayPosition; import org.apache.cassandra.db.compaction.ICompactionScanner; @@ -53,6 +51,7 @@ import org.apache.cassandra.io.compress.CompressedRandomAccessReader; import org.apache.cassandra.io.compress.CompressedThrottledReader; import org.apache.cassandra.io.compress.CompressionMetadata; import org.apache.cassandra.io.util.*; +import org.apache.cassandra.metrics.RestorableMeter; import org.apache.cassandra.service.CacheService; import org.apache.cassandra.service.StorageService; import 
org.apache.cassandra.tracing.Tracing; @@ -68,6 +67,9 @@ public class SSTableReader extends SSTable implements Closeable { private static final Logger logger = LoggerFactory.getLogger(SSTableReader.class); + private static final ScheduledThreadPoolExecutor syncExecutor = new ScheduledThreadPoolExecutor(1); + private static final RateLimiter meterSyncThrottle = RateLimiter.create(100.0); + /** * maxDataAge is a timestamp in local server time (e.g. System.currentTimeMilli) which represents an uppper bound * to the newest piece of data stored in the sstable. In other words, this sstable does not contain items created @@ -105,6 +107,10 @@ public class SSTableReader extends SSTable implements Closeable private final AtomicLong keyCacheHit = new AtomicLong(0); private final AtomicLong keyCacheRequest = new AtomicLong(0); + public final RestorableMeter readMeter; + // handle for the periodic sync of readMeter to system.sstable_activity (null when readMeter is null); SSTableDeletingTask cancels it + public final ScheduledFuture<?> readMeterSyncFuture; + public static long getApproximateKeyCount(Iterable<SSTableReader> sstables, CFMetaData metadata) { long count = 0; @@ -311,7 +317,7 @@ public class SSTableReader extends SSTable implements Closeable } - private SSTableReader(Descriptor desc, + private SSTableReader(final Descriptor desc, Set<Component> components, CFMetaData metadata, IPartitioner partitioner, @@ -322,7 +328,26 @@ public class SSTableReader extends SSTable implements Closeable this.sstableMetadata = sstableMetadata; this.maxDataAge = maxDataAge; - this.deletingTask = new SSTableDeletingTask(this); + deletingTask = new SSTableDeletingTask(this); + + // Don't track read rates for tables in the system keyspace + if (Keyspace.SYSTEM_KS.equals(desc.ksname)) + { + readMeter = null; + readMeterSyncFuture = null; + return; + } + + readMeter = SystemKeyspace.getSSTableReadMeter(desc.ksname, desc.cfname, desc.generation); + // sync the average read rate to system.sstable_activity every five minutes, starting one 
minute from now + readMeterSyncFuture = syncExecutor.scheduleAtFixedRate(new Runnable() + { + public void run() + { + meterSyncThrottle.acquire(); + 
SystemKeyspace.persistSSTableReadMeter(desc.ksname, desc.cfname, desc.generation, readMeter); + } + }, 1, 5, TimeUnit.MINUTES); } private SSTableReader(Descriptor desc, @@ -1432,6 +1457,16 @@ public class SSTableReader extends SSTable implements Closeable } } + /** + * Increment the total row read count and read rate for this SSTable. This should not be incremented for range + * slice queries, row cache hits, or non-query reads, like compaction. + */ + public void incrementReadCount() + { + if (readMeter != null) + readMeter.mark(); + } + protected class EmptyCompactionScanner implements ICompactionScanner { private final String filename;
