Updated Branches: refs/heads/cassandra-2.0 6b3fe5ee7 -> d280e970e
Update memtable size while flushing patch by Aleksey Yeschenko; reviewed by Jonathan Ellis for CASSANDRA-6249 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d280e970 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d280e970 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d280e970 Branch: refs/heads/cassandra-2.0 Commit: d280e970e934447f42144cd8651ea678497a9785 Parents: 6b3fe5e Author: Aleksey Yeschenko <[email protected]> Authored: Sun Oct 27 23:34:10 2013 +0300 Committer: Aleksey Yeschenko <[email protected]> Committed: Sun Oct 27 23:34:10 2013 +0300 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/cassandra/db/ColumnFamily.java | 8 ++++++++ .../apache/cassandra/db/ColumnFamilyStore.java | 20 +++++++++++++++++--- .../org/apache/cassandra/db/DataTracker.java | 9 +++++++++ src/java/org/apache/cassandra/db/Memtable.java | 7 ++++++- 5 files changed, 41 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/d280e970/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 1052901..7e6ba95 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -2,6 +2,7 @@ * Fix modifying column_metadata from thrift (CASSANDRA-6182) * cqlsh: fix LIST USERS output (CASSANDRA-6242) * Add IRequestSink interface (CASSANDRA-6248) + * Update memtable size while flushing (CASSANDRA-6249) 2.0.2 http://git-wip-us.apache.org/repos/asf/cassandra/blob/d280e970/src/java/org/apache/cassandra/db/ColumnFamily.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamily.java b/src/java/org/apache/cassandra/db/ColumnFamily.java index 4031ebc..7b5642a 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamily.java +++ b/src/java/org/apache/cassandra/db/ColumnFamily.java @@ -319,6 +319,14 @@ public abstract class ColumnFamily implements Iterable<Column>, IRowCacheEntry return ObjectSizes.measureDeep(this); } + public long dataSize() + { + long size = 0; + for (Column column : this) + size += column.dataSize(); + return size; + } + public long maxTimestamp() { long maxTimestamp = deletionInfo().maxTimestamp(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/d280e970/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 597ca53..4346224 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -884,11 +884,12 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean return removeDeletedCF(cf, gcBefore); } - private static void removeDeletedColumnsOnly(ColumnFamily cf, int gcBefore, SecondaryIndexManager.Updater indexer) + private static long removeDeletedColumnsOnly(ColumnFamily cf, int gcBefore, SecondaryIndexManager.Updater indexer) { Iterator<Column> iter = cf.iterator(); DeletionInfo.InOrderTester tester = cf.inOrderDeletionTester(); boolean hasDroppedColumns = !cf.metadata.getDroppedColumns().isEmpty(); + long removedBytes = 0; while (iter.hasNext()) { Column c = iter.next(); @@ -900,13 +901,15 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean { iter.remove(); indexer.remove(c); + removedBytes += c.dataSize(); } } + return removedBytes; } - public static void removeDeletedColumnsOnly(ColumnFamily cf, int gcBefore) + public static long removeDeletedColumnsOnly(ColumnFamily cf, int gcBefore) { - removeDeletedColumnsOnly(cf, gcBefore, SecondaryIndexManager.nullUpdater); + return removeDeletedColumnsOnly(cf, gcBefore, SecondaryIndexManager.nullUpdater); } // returns true if @@ -1094,6 +1097,17 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean return getMemtableDataSize() + indexManager.getTotalLiveSize(); } + /** + * @return the live size of all the memtables (the current active one and pending flush). + */ + public long getAllMemtablesLiveSize() + { + long size = 0; + for (Memtable mt : getDataTracker().getAllMemtables()) + size += mt.getLiveSize(); + return size; + } + public int getMemtableSwitchCount() { return (int) metric.memtableSwitchCount.count(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/d280e970/src/java/org/apache/cassandra/db/DataTracker.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/DataTracker.java b/src/java/org/apache/cassandra/db/DataTracker.java index 365d607..1c25f44 100644 --- a/src/java/org/apache/cassandra/db/DataTracker.java +++ b/src/java/org/apache/cassandra/db/DataTracker.java @@ -62,6 +62,15 @@ public class DataTracker return view.get().memtablesPendingFlush; } + /** + * @return the active memtable and all the memtables that are pending flush. + */ + public Iterable<Memtable> getAllMemtables() + { + View snapshot = view.get(); + return Iterables.concat(snapshot.memtablesPendingFlush, Collections.singleton(snapshot.memtable)); + } + public Set<SSTableReader> getSSTables() { return view.get().sstables; http://git-wip-us.apache.org/repos/asf/cassandra/blob/d280e970/src/java/org/apache/cassandra/db/Memtable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java index 9a8f810..12d36bf 100644 --- a/src/java/org/apache/cassandra/db/Memtable.java +++ b/src/java/org/apache/cassandra/db/Memtable.java @@ -233,6 +233,7 @@ public class Memtable private Iterator<Map.Entry<RowPosition, AtomicSortedColumns>> iter = stopAt.isMinimum(cfs.partitioner) ? rows.tailMap(startWith).entrySet().iterator() : rows.subMap(startWith, true, stopAt, true).entrySet().iterator(); + private Map.Entry<RowPosition, AtomicSortedColumns> currentEntry; public boolean hasNext() { @@ -242,6 +243,8 @@ public class Memtable public Map.Entry<DecoratedKey, AtomicSortedColumns> next() { Map.Entry<RowPosition, AtomicSortedColumns> entry = iter.next(); + // Store the reference to the current entry so that remove() can update the current size. + currentEntry = entry; // Actual stored key should be true DecoratedKey assert entry.getKey() instanceof DecoratedKey; // Object cast is required since otherwise we can't turn RowPosition into DecoratedKey @@ -251,6 +254,8 @@ public class Memtable public void remove() { iter.remove(); + currentSize.addAndGet(-currentEntry.getValue().dataSize()); + currentEntry = null; } }; } @@ -355,7 +360,7 @@ public class Memtable // the table has secondary indexes, or else the stale entries wouldn't be cleaned up during compaction, // and will only be dropped during 2i query read-repair, if at all. if (!cfs.indexManager.hasIndexes()) - ColumnFamilyStore.removeDeletedColumnsOnly(cf, Integer.MIN_VALUE); + currentSize.addAndGet(-ColumnFamilyStore.removeDeletedColumnsOnly(cf, Integer.MIN_VALUE)); } writer.append((DecoratedKey)entry.getKey(), cf); }
