Updated Branches: refs/heads/cassandra-1.2 6e2401e00 -> dfe493760 refs/heads/cassandra-2.0 cfa097cdd -> 1e0d9513b refs/heads/trunk 3700e0121 -> 494ba4582
add file_cache_size_in_mb setting patch by pyaskevich and jbellis for CASSANDRA-5661 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/dfe49376 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/dfe49376 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/dfe49376 Branch: refs/heads/cassandra-1.2 Commit: dfe49376097cf73c04c0d8506782263a2e820cb0 Parents: 6e2401e Author: Jonathan Ellis <[email protected]> Authored: Tue Sep 17 16:49:11 2013 -0500 Committer: Jonathan Ellis <[email protected]> Committed: Tue Sep 17 16:53:11 2013 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + conf/cassandra.yaml | 4 + .../org/apache/cassandra/config/Config.java | 2 + .../cassandra/config/DatabaseDescriptor.java | 8 ++ .../cassandra/io/util/PoolingSegmentedFile.java | 14 +- .../cassandra/io/util/RandomAccessReader.java | 5 + .../cassandra/metrics/FileCacheMetrics.java | 64 +++++++++ .../cassandra/service/FileCacheService.java | 139 +++++++++++++++++++ 8 files changed, 229 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/dfe49376/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index fb4f3f4..e4066ab 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 1.2.10 + * add file_cache_size_in_mb setting (CASSANDRA-5661) * Tuning knobs for dealing with large blobs and many CFs (CASSANDRA-5982) * (Hadoop) Fix CQLRW for thrift tables (CASSANDRA-6002) * Fix possible divide-by-zero in HHOM (CASSANDRA-5990) http://git-wip-us.apache.org/repos/asf/cassandra/blob/dfe49376/conf/cassandra.yaml ---------------------------------------------------------------------- diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml index 27ac09b..2916ed9 100644 --- a/conf/cassandra.yaml +++ b/conf/cassandra.yaml @@ -275,6 +275,10 @@ reduce_cache_capacity_to: 0.6 concurrent_reads: 32 concurrent_writes: 32 +# Total memory to use for sstable-reading buffers. Defaults to +# the smaller of 1/4 of heap or 512MB. +# file_cache_size_in_mb: 512 + # Total memory to use for memtables. Cassandra will flush the largest # memtable when this much memory is used. # If omitted, Cassandra will set it to 1/3 of the heap. http://git-wip-us.apache.org/repos/asf/cassandra/blob/dfe49376/src/java/org/apache/cassandra/config/Config.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index a924a4c..c5a4aa1 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -169,6 +169,8 @@ public class Config public String row_cache_provider = SerializingCacheProvider.class.getSimpleName(); public boolean populate_io_cache_on_flush = false; + public Integer file_cache_size_in_mb; + public boolean inter_dc_tcp_nodelay = true; public String memtable_allocator = "SlabAllocator"; http://git-wip-us.apache.org/repos/asf/cassandra/blob/dfe49376/src/java/org/apache/cassandra/config/DatabaseDescriptor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index 8e3cbe2..dbf0905 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -272,6 +272,9 @@ public class DatabaseDescriptor throw new ConfigurationException("concurrent_replicates must be at least 2"); } + if (conf.file_cache_size_in_mb == null) + conf.file_cache_size_in_mb = Math.min(512, (int) (Runtime.getRuntime().maxMemory() / (4 * 1048576))); + if (conf.memtable_total_space_in_mb == null) conf.memtable_total_space_in_mb = (int) (Runtime.getRuntime().maxMemory() / (3 * 1048576)); if (conf.memtable_total_space_in_mb <= 0) @@ -1209,6 +1212,11 @@ public class DatabaseDescriptor return conf.memtable_flush_queue_size; } + public static int getFileCacheSizeInMB() + { + return conf.file_cache_size_in_mb; + } + public static int getTotalMemtableSpaceInMB() { // should only be called if estimatesRealMemtableSize() is true http://git-wip-us.apache.org/repos/asf/cassandra/blob/dfe49376/src/java/org/apache/cassandra/io/util/PoolingSegmentedFile.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/PoolingSegmentedFile.java b/src/java/org/apache/cassandra/io/util/PoolingSegmentedFile.java index 4173d5a..892611c 100644 --- a/src/java/org/apache/cassandra/io/util/PoolingSegmentedFile.java +++ b/src/java/org/apache/cassandra/io/util/PoolingSegmentedFile.java @@ -17,13 +17,10 @@ */ package org.apache.cassandra.io.util; -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; +import org.apache.cassandra.service.FileCacheService; public abstract class PoolingSegmentedFile extends SegmentedFile { - public final Queue<RandomAccessReader> pool = new ConcurrentLinkedQueue<RandomAccessReader>(); - protected PoolingSegmentedFile(String path, long length) { super(path, length); @@ -36,9 +33,11 @@ public abstract class PoolingSegmentedFile extends SegmentedFile public FileDataInput getSegment(long position) { - RandomAccessReader reader = pool.poll(); + RandomAccessReader reader = FileCacheService.instance.get(path); + if (reader == null) reader = createReader(path); + reader.seek(position); return reader; } @@ -47,12 +46,11 @@ public abstract class PoolingSegmentedFile extends SegmentedFile public void recycle(RandomAccessReader reader) { - pool.add(reader); + FileCacheService.instance.put(reader); } public void cleanup() { - for (RandomAccessReader reader : pool) - reader.deallocate(); + FileCacheService.instance.invalidate(path); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/dfe49376/src/java/org/apache/cassandra/io/util/RandomAccessReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java index 3210372..64c5cf7 100644 --- a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java +++ b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java @@ -186,6 +186,11 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu return filePath; } + public int getBufferSize() + { + return buffer.length; + } + public void reset() { seek(markedPointer); http://git-wip-us.apache.org/repos/asf/cassandra/blob/dfe49376/src/java/org/apache/cassandra/metrics/FileCacheMetrics.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/metrics/FileCacheMetrics.java b/src/java/org/apache/cassandra/metrics/FileCacheMetrics.java new file mode 100644 index 0000000..9b21de6 --- /dev/null +++ b/src/java/org/apache/cassandra/metrics/FileCacheMetrics.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.metrics; + +import java.util.concurrent.TimeUnit; + +import com.yammer.metrics.Metrics; +import com.yammer.metrics.core.Gauge; +import com.yammer.metrics.core.Meter; +import com.yammer.metrics.core.MetricName; +import com.yammer.metrics.util.RatioGauge; +import org.apache.cassandra.service.FileCacheService; + +public class FileCacheMetrics +{ + /** Total number of hits */ + public final Meter hits; + /** Total number of requests */ + public final Meter requests; + /** hit rate */ + public final Gauge<Double> hitRate; + /** Total size of file cache, in bytes */ + public final Gauge<Long> size; + + public FileCacheMetrics() + { + hits = Metrics.newMeter(new MetricName(FileCacheService.class, "Hits"), "hits", TimeUnit.SECONDS); + requests = Metrics.newMeter(new MetricName(FileCacheService.class, "Requests"), "requests", TimeUnit.SECONDS); + hitRate = Metrics.newGauge(new MetricName(FileCacheService.class, "HitRate"), new RatioGauge() + { + protected double getNumerator() + { + return hits.count(); + } + + protected double getDenominator() + { + return requests.count(); + } + }); + size = Metrics.newGauge(new MetricName(FileCacheService.class, "Size"), new Gauge<Long>() + { + public Long value() + { + return FileCacheService.instance.sizeInBytes(); + } + }); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/dfe49376/src/java/org/apache/cassandra/service/FileCacheService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/FileCacheService.java b/src/java/org/apache/cassandra/service/FileCacheService.java new file mode 100644 index 0000000..9dd1b15 --- /dev/null +++ b/src/java/org/apache/cassandra/service/FileCacheService.java @@ -0,0 +1,139 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.service; + +import java.util.Queue; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.io.util.RandomAccessReader; +import org.apache.cassandra.metrics.FileCacheMetrics; + +public class FileCacheService +{ + private static final Logger logger = LoggerFactory.getLogger(FileCacheService.class); + + private static final long MEMORY_USAGE_THRESHOLD = DatabaseDescriptor.getFileCacheSizeInMB() * 1024 * 1024; + private static final int AFTER_ACCESS_EXPIRATION = 512; // in millis + + public static FileCacheService instance = new FileCacheService(); + + private final Cache<String, Queue<RandomAccessReader>> cache; + private final FileCacheMetrics metrics = new FileCacheMetrics(); + public final Callable<Queue<RandomAccessReader>> cacheForPathCreator = new Callable<Queue<RandomAccessReader>>() + { + @Override + public Queue<RandomAccessReader> call() throws Exception + { + return new ConcurrentLinkedQueue<RandomAccessReader>(); + } + }; + + protected FileCacheService() + { + cache = CacheBuilder.<String, Queue<RandomAccessReader>>newBuilder() + .expireAfterAccess(AFTER_ACCESS_EXPIRATION, TimeUnit.MILLISECONDS) + .concurrencyLevel(DatabaseDescriptor.getConcurrentReaders()) + .removalListener(new RemovalListener<String, Queue<RandomAccessReader>>() + { + @Override + public void onRemoval(RemovalNotification<String, Queue<RandomAccessReader>> notification) + { + Queue<RandomAccessReader> cachedInstances = notification.getValue(); + + if (cachedInstances == null) + return; + + for (RandomAccessReader reader : cachedInstances) + reader.deallocate(); + } + }) + .build(); + } + + public RandomAccessReader get(String path) + { + metrics.requests.mark(); + + Queue<RandomAccessReader> instances = getCacheFor(path); + + if (instances == null) + return null; + + RandomAccessReader result = instances.poll(); + + if (result != null) + metrics.hits.mark(); + + return result; + } + + private Queue<RandomAccessReader> getCacheFor(String path) + { + try + { + return cache.get(path, cacheForPathCreator); + } + catch (ExecutionException e) + { + // if something bad happened, let's just carry on and return null + // as dysfunctional queue should not interrupt normal operation + logger.debug("Exception fetching cache", e); + } + + return null; + } + + public void put(RandomAccessReader instance) + { + // This wouldn't be precise sometimes when CRAR is used because + // there is a way for users to dynamically change the size of the buffer, + // but we don't expect that to happen frequently in production. + // Doing accounting this way also allows us to avoid atomic CAS operation on read path. + long memoryUsage = (cache.size() + 1) * instance.getBufferSize(); + + if (memoryUsage >= MEMORY_USAGE_THRESHOLD) + instance.deallocate(); + else + getCacheFor(instance.getPath()).add(instance); + } + + public void invalidate(String path) + { + cache.invalidate(path); + } + + public long sizeInBytes() + { + long n = 0; + for (Queue<RandomAccessReader> queue : cache.asMap().values()) + for (RandomAccessReader reader : queue) + n += reader.getBufferSize(); + return n; + } +}
