Compute liveRatio as live size : serialized data size instead of live size : serialized write throughput; patch by jbellis; reviewed by slebresne for CASSANDRA-4399
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/67dec69f Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/67dec69f Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/67dec69f Branch: refs/heads/cassandra-1.1 Commit: 67dec69f53d2bfd3818fea4ede40e5d5a6b2356b Parents: 8674784 Author: Jonathan Ellis <[email protected]> Authored: Mon Jul 2 01:40:38 2012 -0500 Committer: Jonathan Ellis <[email protected]> Committed: Tue Jul 3 11:44:46 2012 -0500 ---------------------------------------------------------------------- .../cassandra/db/AbstractColumnContainer.java | 8 ++- .../db/AbstractThreadUnsafeSortedColumns.java | 6 +++ .../apache/cassandra/db/AtomicSortedColumns.java | 31 +++++++++++++- .../org/apache/cassandra/db/ISortedColumns.java | 7 +++ src/java/org/apache/cassandra/db/Memtable.java | 24 +++++------ 5 files changed, 57 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/67dec69f/src/java/org/apache/cassandra/db/AbstractColumnContainer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/AbstractColumnContainer.java b/src/java/org/apache/cassandra/db/AbstractColumnContainer.java index c7922b1..c35c63c 100644 --- a/src/java/org/apache/cassandra/db/AbstractColumnContainer.java +++ b/src/java/org/apache/cassandra/db/AbstractColumnContainer.java @@ -84,9 +84,11 @@ public abstract class AbstractColumnContainer implements IColumnContainer, IIter columns.maybeResetDeletionTimes(gcBefore); } - /** - * We need to go through each column in the column container and resolve it before adding - */ + public long addAllWithSizeDelta(AbstractColumnContainer cc, Allocator allocator, Function<IColumn, IColumn> transformation) + { + return columns.addAllWithSizeDelta(cc.columns, allocator, transformation); + } + 
public void addAll(AbstractColumnContainer cc, Allocator allocator, Function<IColumn, IColumn> transformation) { columns.addAll(cc.columns, allocator, transformation); http://git-wip-us.apache.org/repos/asf/cassandra/blob/67dec69f/src/java/org/apache/cassandra/db/AbstractThreadUnsafeSortedColumns.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/AbstractThreadUnsafeSortedColumns.java b/src/java/org/apache/cassandra/db/AbstractThreadUnsafeSortedColumns.java index b09b5ee..1360336 100644 --- a/src/java/org/apache/cassandra/db/AbstractThreadUnsafeSortedColumns.java +++ b/src/java/org/apache/cassandra/db/AbstractThreadUnsafeSortedColumns.java @@ -93,6 +93,12 @@ public abstract class AbstractThreadUnsafeSortedColumns implements ISortedColumn // having to care about the deletion infos protected abstract void addAllColumns(ISortedColumns columns, Allocator allocator, Function<IColumn, IColumn> transformation); + public long addAllWithSizeDelta(ISortedColumns cm, Allocator allocator, Function<IColumn, IColumn> transformation) + { + // sizeDelta is only needed by memtable updates which should not be using thread-unsafe containers + throw new UnsupportedOperationException(); + } + public void addAll(ISortedColumns columns, Allocator allocator, Function<IColumn, IColumn> transformation) { addAllColumns(columns, allocator, transformation); http://git-wip-us.apache.org/repos/asf/cassandra/blob/67dec69f/src/java/org/apache/cassandra/db/AtomicSortedColumns.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/AtomicSortedColumns.java b/src/java/org/apache/cassandra/db/AtomicSortedColumns.java index 5fdc0f6..9cb44d2 100644 --- a/src/java/org/apache/cassandra/db/AtomicSortedColumns.java +++ b/src/java/org/apache/cassandra/db/AtomicSortedColumns.java @@ -154,6 +154,11 @@ public class AtomicSortedColumns implements ISortedColumns public void 
addAll(ISortedColumns cm, Allocator allocator, Function<IColumn, IColumn> transformation) { + addAllWithSizeDelta(cm, allocator, transformation); + } + + public long addAllWithSizeDelta(ISortedColumns cm, Allocator allocator, Function<IColumn, IColumn> transformation) + { /* * This operation needs to atomicity and isolation. To that end, we * add the new column to a copy of the map (a cheap O(1) snapTree @@ -166,9 +171,12 @@ public class AtomicSortedColumns implements ISortedColumns * we bail early, avoiding unnecessary work if possible. */ Holder current, modified; + long sizeDelta; + main_loop: do { + sizeDelta = 0; current = ref.get(); DeletionInfo newDelInfo = current.deletionInfo; if (newDelInfo.markedForDeleteAt < cm.getDeletionInfo().markedForDeleteAt) @@ -177,13 +185,15 @@ public class AtomicSortedColumns implements ISortedColumns for (IColumn column : cm.getSortedColumns()) { - modified.addColumn(transformation.apply(column), allocator); + sizeDelta += modified.addColumn(transformation.apply(column), allocator); // bail early if we know we've been beaten if (ref.get() != current) continue main_loop; } } while (!ref.compareAndSet(current, modified)); + + return sizeDelta; } public boolean replace(IColumn oldColumn, IColumn newColumn) @@ -329,16 +339,26 @@ public class AtomicSortedColumns implements ISortedColumns return new Holder(new SnapTreeMap<ByteBuffer, IColumn>(map.comparator()), deletionInfo); } - void addColumn(IColumn column, Allocator allocator) + long addColumn(IColumn column, Allocator allocator) { ByteBuffer name = column.name(); IColumn oldColumn; - while ((oldColumn = map.putIfAbsent(name, column)) != null) + long sizeDelta = 0; + while (true) { + oldColumn = map.putIfAbsent(name, column); + if (oldColumn == null) + { + sizeDelta += column.serializedSize(); + break; + } + if (oldColumn instanceof SuperColumn) { assert column instanceof SuperColumn; + long previousSize = oldColumn.serializedSize(); ((SuperColumn) 
oldColumn).putColumn((SuperColumn)column, allocator); + sizeDelta += oldColumn.serializedSize() - previousSize; break; // Delegated to SuperColumn } else @@ -346,12 +366,17 @@ public class AtomicSortedColumns implements ISortedColumns // calculate reconciled col from old (existing) col and new col IColumn reconciledColumn = column.reconcile(oldColumn, allocator); if (map.replace(name, oldColumn, reconciledColumn)) + { + sizeDelta += reconciledColumn.serializedSize() - oldColumn.serializedSize(); break; + } // We failed to replace column due to a concurrent update or a concurrent removal. Keep trying. // (Currently, concurrent removal should not happen (only updates), but let us support that anyway.) } } + + return sizeDelta; } void retainAll(ISortedColumns columns) http://git-wip-us.apache.org/repos/asf/cassandra/blob/67dec69f/src/java/org/apache/cassandra/db/ISortedColumns.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ISortedColumns.java b/src/java/org/apache/cassandra/db/ISortedColumns.java index 1200544..00c8ea5 100644 --- a/src/java/org/apache/cassandra/db/ISortedColumns.java +++ b/src/java/org/apache/cassandra/db/ISortedColumns.java @@ -68,6 +68,13 @@ public interface ISortedColumns extends IIterableColumns * add(c); * </code> * but is potentially faster. 
+ * + * @return the difference in size seen after merging the given columns + */ + public long addAllWithSizeDelta(ISortedColumns cm, Allocator allocator, Function<IColumn, IColumn> transformation); + + /** + * Adds the columns without necessarily computing the size delta */ public void addAll(ISortedColumns cm, Allocator allocator, Function<IColumn, IColumn> transformation); http://git-wip-us.apache.org/repos/asf/cassandra/blob/67dec69f/src/java/org/apache/cassandra/db/Memtable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java index f1dcc56..347b997 100644 --- a/src/java/org/apache/cassandra/db/Memtable.java +++ b/src/java/org/apache/cassandra/db/Memtable.java @@ -80,7 +80,7 @@ public class Memtable volatile static Memtable activelyMeasuring; private volatile boolean isFrozen; - private final AtomicLong currentThroughput = new AtomicLong(0); + private final AtomicLong currentSize = new AtomicLong(0); private final AtomicLong currentOperations = new AtomicLong(0); // We index the memtable by RowPosition only for the purpose of being able @@ -122,12 +122,12 @@ public class Memtable { // 25% fudge factor on the base throughput * liveRatio calculation. (Based on observed // pre-slabbing behavior -- not sure what accounts for this. May have changed with introduction of slabbing.) 
- return (long) (currentThroughput.get() * cfs.liveRatio * 1.25); + return (long) (currentSize.get() * cfs.liveRatio * 1.25); } public long getSerializedSize() { - return currentThroughput.get(); + return currentSize.get(); } public long getOperations() @@ -190,7 +190,7 @@ public class Memtable deepSize += meter.measureDeep(entry.getKey()) + meter.measureDeep(entry.getValue()); objects += entry.getValue().getColumnCount(); } - double newRatio = (double) deepSize / currentThroughput.get(); + double newRatio = (double) deepSize / currentSize.get(); if (newRatio < MIN_SANE_LIVE_RATIO) { @@ -226,12 +226,6 @@ public class Memtable private void resolve(DecoratedKey key, ColumnFamily cf) { - currentThroughput.addAndGet(cf.size()); - currentOperations.addAndGet((cf.getColumnCount() == 0) - ? cf.isMarkedForDelete() ? 1 : 0 - : cf.getColumnCount()); - - ColumnFamily previous = columnFamilies.get(key); if (previous == null) @@ -244,7 +238,11 @@ public class Memtable previous = empty; } - previous.addAll(cf, allocator, localCopyFunction); + long sizeDelta = previous.addAllWithSizeDelta(cf, allocator, localCopyFunction); + currentSize.addAndGet(sizeDelta); + currentOperations.addAndGet((cf.getColumnCount() == 0) + ? cf.isMarkedForDelete() ? 1 : 0 + : cf.getColumnCount()); } // for debugging @@ -274,7 +272,7 @@ public class Memtable } long estimatedSize = (long) ((keySize // index entries + keySize // keys in data file - + currentThroughput.get()) // data + + currentSize.get()) // data * 1.2); // bloom filter and row index overhead SSTableReader ssTable; // errors when creating the writer that may leave empty temp files. @@ -325,7 +323,7 @@ public class Memtable public String toString() { return String.format("Memtable-%s@%s(%s/%s serialized/live bytes, %s ops)", - cfs.getColumnFamilyName(), hashCode(), currentThroughput, getLiveSize(), currentOperations); + cfs.getColumnFamilyName(), hashCode(), currentSize, getLiveSize(), currentOperations); } /**
