Updated Branches: refs/heads/cassandra-2.0 d7bf566ae -> 65773b1cd
Add more hooks for compaction strategy implementations patch by Aleksey Yeschenko; reviewed by Jonathan Ellis for CASSANDRA-6111 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/65773b1c Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/65773b1c Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/65773b1c Branch: refs/heads/cassandra-2.0 Commit: 65773b1cdcec02eef0d73c15890fd0071bb78949 Parents: d7bf566 Author: Aleksey Yeschenko <[email protected]> Authored: Mon Sep 30 01:28:09 2013 +0300 Committer: Aleksey Yeschenko <[email protected]> Committed: Mon Sep 30 01:28:09 2013 +0300 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/db/ColumnFamilyStore.java | 36 ++++++++------------ .../org/apache/cassandra/db/DataTracker.java | 14 +++++--- .../org/apache/cassandra/db/MeteredFlusher.java | 18 +++++----- .../compaction/AbstractCompactionStrategy.java | 30 ++++++++++++++++ .../MemtableRenewedNotification.java | 30 ++++++++++++++++ 6 files changed, 95 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/65773b1c/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index f62dcde..061ad12 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -9,6 +9,7 @@ * Create snapshot dir if it does not exist when migrating leveled manifest (CASSANDRA-6093) * make sequential nodetool repair the default (CASSANDRA-5950) + * Add more hooks for compaction strategy implementations (CASSANDRA-6111) Merged from 1.2: * Allow estimated memtable size to exceed slab allocator size (CASSANDRA-6078) * Start MeteredFlusher earlier to prevent OOM during CL replay (CASSANDRA-6087) http://git-wip-us.apache.org/repos/asf/cassandra/blob/65773b1c/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 719e90f..0fd55f0 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -1071,9 +1071,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean void replaceFlushed(Memtable memtable, SSTableReader sstable) { - data.replaceFlushed(memtable, sstable); - if (sstable != null) - CompactionManager.instance.submitBackground(this); + compactionStrategy.replaceFlushed(memtable, sstable); } public boolean isValid() @@ -1406,7 +1404,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean { List<SSTableReader> findSSTables(DataTracker.View view) { - return view.intervalTree.search(key); + return compactionStrategy.filterSSTablesForReads(view.intervalTree.search(key)); } }); } @@ -1421,7 +1419,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean { List<SSTableReader> findSSTables(DataTracker.View view) { - return sstablesForRowBounds(rowBounds, view); + return compactionStrategy.filterSSTablesForReads(sstablesForRowBounds(rowBounds, view)); } }); } @@ -1921,26 +1919,22 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean // that was part of the flushed we forced; otherwise on a tie, it won't get deleted. Uninterruptibles.sleepUninterruptibly(1, TimeUnit.MILLISECONDS); } - else + + // nuke the memtable data w/o writing to disk first + Keyspace.switchLock.writeLock().lock(); + try { - // just nuke the memtable data w/o writing to disk first - Keyspace.switchLock.writeLock().lock(); - try - { - for (ColumnFamilyStore cfs : concatWithIndexes()) - { - Memtable mt = cfs.getMemtableThreadSafe(); - if (!mt.isClean()) - { - mt.cfs.data.renewMemtable(); - } - } - } - finally + for (ColumnFamilyStore cfs : concatWithIndexes()) { - Keyspace.switchLock.writeLock().unlock(); + Memtable mt = cfs.getMemtableThreadSafe(); + if (!mt.isClean()) + mt.cfs.data.renewMemtable(); } } + finally + { + Keyspace.switchLock.writeLock().unlock(); + } Runnable truncateRunnable = new Runnable() { http://git-wip-us.apache.org/repos/asf/cassandra/blob/65773b1c/src/java/org/apache/cassandra/db/DataTracker.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/DataTracker.java b/src/java/org/apache/cassandra/db/DataTracker.java index c2337ea..1a19fef 100644 --- a/src/java/org/apache/cassandra/db/DataTracker.java +++ b/src/java/org/apache/cassandra/db/DataTracker.java @@ -33,11 +33,7 @@ import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.metrics.StorageMetrics; -import org.apache.cassandra.notifications.INotification; -import org.apache.cassandra.notifications.INotificationConsumer; -import org.apache.cassandra.notifications.SSTableAddedNotification; -import org.apache.cassandra.notifications.SSTableDeletingNotification; -import org.apache.cassandra.notifications.SSTableListChangedNotification; +import org.apache.cassandra.notifications.*; import org.apache.cassandra.utils.Interval; import org.apache.cassandra.utils.IntervalTree; @@ -133,6 +129,7 @@ public class DataTracker newView = currentView.renewMemtable(newMemtable); } while (!view.compareAndSet(currentView, newView)); + notifyRenewed(currentView.memtable); } public void replaceFlushed(Memtable memtable, SSTableReader sstable) @@ -433,6 +430,13 @@ public class DataTracker subscriber.handleNotification(notification, this); } + public void notifyRenewed(Memtable renewed) + { + INotification notification = new MemtableRenewedNotification(renewed); + for (INotificationConsumer subscriber : subscribers) + subscriber.handleNotification(notification, this); + } + public void subscribe(INotificationConsumer consumer) { subscribers.add(consumer); http://git-wip-us.apache.org/repos/asf/cassandra/blob/65773b1c/src/java/org/apache/cassandra/db/MeteredFlusher.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/MeteredFlusher.java b/src/java/org/apache/cassandra/db/MeteredFlusher.java index f16b8a0..f1a3ac9 100644 --- a/src/java/org/apache/cassandra/db/MeteredFlusher.java +++ b/src/java/org/apache/cassandra/db/MeteredFlusher.java @@ -23,7 +23,6 @@ import java.util.Comparator; import java.util.List; import com.google.common.collect.Iterables; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -59,7 +58,7 @@ public class MeteredFlusher implements Runnable + DatabaseDescriptor.getFlushWriters() + DatabaseDescriptor.getFlushQueueSize()) / (1 + cfs.indexManager.getIndexesBackedByCfs().size())); - if (totalMemtableBytesUnused > 0 && size > totalMemtableBytesUnused / maxInFlight) + if (cfs.getCompactionStrategy().isAffectedByMeteredFlusher() && totalMemtableBytesUnused > 0 && size > totalMemtableBytesUnused / maxInFlight) { logger.info("flushing high-traffic column family {} (estimated {} bytes)", cfs, size); cfs.forceFlush(); @@ -102,12 +101,15 @@ public class MeteredFlusher implements Runnable break; ColumnFamilyStore cfs = sorted.remove(sorted.size() - 1); - long size = cfs.getTotalMemtableLiveSize(); - if (size == 0) - break; - logger.info("flushing {} to free up {} bytes", cfs, size); - liveBytes -= size; - cfs.forceFlush(); + if (cfs.getCompactionStrategy().isAffectedByMeteredFlusher()) + { + long size = cfs.getTotalMemtableLiveSize(); + if (size == 0) + break; + logger.info("flushing {} to free up {} bytes", cfs, size); + liveBytes -= size; + cfs.forceFlush(); + } } } finally http://git-wip-us.apache.org/repos/asf/cassandra/blob/65773b1c/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java index 3db2469..b63caab 100644 --- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java @@ -26,6 +26,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Memtable; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.exceptions.ConfigurationException; @@ -184,6 +185,35 @@ public abstract class AbstractCompactionStrategy } /** + * @return whether or not MeteredFlusher should be able to trigger memtable flushes for this CF. + */ + public boolean isAffectedByMeteredFlusher() + { + return true; + } + + /** + * Handle a flushed memtable. + * + * @param memtable the flushed memtable + * @param sstable the written sstable. can be null if the memtable was clean. + */ + public void replaceFlushed(Memtable memtable, SSTableReader sstable) + { + cfs.getDataTracker().replaceFlushed(memtable, sstable); + if (sstable != null) + CompactionManager.instance.submitBackground(cfs); + } + + /** + * @return a subset of the suggested sstables that are relevant for read requests. + */ + public List<SSTableReader> filterSSTablesForReads(List<SSTableReader> sstables) + { + return sstables; + } + + /** * Filters SSTables that are to be blacklisted from the given collection * * @param originalCandidates The collection to check for blacklisted SSTables http://git-wip-us.apache.org/repos/asf/cassandra/blob/65773b1c/src/java/org/apache/cassandra/notifications/MemtableRenewedNotification.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/notifications/MemtableRenewedNotification.java b/src/java/org/apache/cassandra/notifications/MemtableRenewedNotification.java new file mode 100644 index 0000000..4c7e6c5 --- /dev/null +++ b/src/java/org/apache/cassandra/notifications/MemtableRenewedNotification.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.notifications; + +import org.apache.cassandra.db.Memtable; + +public class MemtableRenewedNotification implements INotification +{ + public final Memtable renewed; + + public MemtableRenewedNotification(Memtable renewed) + { + this.renewed = renewed; + } +}
