Implement addAllWithSizeDelta for ThreadSafeSortedColumns (used in Memtable for supercolumns); see #4399
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/587ed053 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/587ed053 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/587ed053 Branch: refs/heads/trunk Commit: 587ed053e9159dd3660283df2aa5a89e7f2464d8 Parents: d2b60f2 Author: Jonathan Ellis <jbel...@apache.org> Authored: Tue Jul 3 12:29:24 2012 -0500 Committer: Jonathan Ellis <jbel...@apache.org> Committed: Tue Jul 3 17:23:42 2012 -0500 ---------------------------------------------------------------------- .../db/AbstractThreadUnsafeSortedColumns.java | 10 +---- .../cassandra/db/ArrayBackedSortedColumns.java | 3 +- .../apache/cassandra/db/AtomicSortedColumns.java | 19 ++------- .../cassandra/db/ThreadSafeSortedColumns.java | 30 ++++++++++++--- .../cassandra/db/TreeMapBackedSortedColumns.java | 3 +- 5 files changed, 33 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/587ed053/src/java/org/apache/cassandra/db/AbstractThreadUnsafeSortedColumns.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/AbstractThreadUnsafeSortedColumns.java b/src/java/org/apache/cassandra/db/AbstractThreadUnsafeSortedColumns.java index 90fa9b4..0e71cb3 100644 --- a/src/java/org/apache/cassandra/db/AbstractThreadUnsafeSortedColumns.java +++ b/src/java/org/apache/cassandra/db/AbstractThreadUnsafeSortedColumns.java @@ -90,21 +90,13 @@ public abstract class AbstractThreadUnsafeSortedColumns implements ISortedColumn } } - // Implementations should implement this rather than addAll to avoid - // having to care about the deletion infos - protected abstract void addAllColumns(ISortedColumns columns, Allocator allocator, Function<IColumn, IColumn> transformation); - public long addAllWithSizeDelta(ISortedColumns cm, Allocator 
allocator, Function<IColumn, IColumn> transformation) { // sizeDelta is only needed by memtable updates which should not be using thread-unsafe containers throw new UnsupportedOperationException(); } - public void addAll(ISortedColumns columns, Allocator allocator, Function<IColumn, IColumn> transformation) - { - addAllColumns(columns, allocator, transformation); - delete(columns.getDeletionInfo()); - } + public abstract void addAll(ISortedColumns columns, Allocator allocator, Function<IColumn, IColumn> transformation); public boolean isEmpty() { http://git-wip-us.apache.org/repos/asf/cassandra/blob/587ed053/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java b/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java index 4dc1e3e..d76b443 100644 --- a/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java +++ b/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java @@ -207,8 +207,9 @@ public class ArrayBackedSortedColumns extends AbstractThreadUnsafeSortedColumns return -mid - (result < 0 ? 
1 : 2); } - protected void addAllColumns(ISortedColumns cm, Allocator allocator, Function<IColumn, IColumn> transformation) + public void addAll(ISortedColumns cm, Allocator allocator, Function<IColumn, IColumn> transformation) { + delete(cm.getDeletionInfo()); if (cm.isEmpty()) return; http://git-wip-us.apache.org/repos/asf/cassandra/blob/587ed053/src/java/org/apache/cassandra/db/AtomicSortedColumns.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/AtomicSortedColumns.java b/src/java/org/apache/cassandra/db/AtomicSortedColumns.java index c8f5f3a..f1629ab 100644 --- a/src/java/org/apache/cassandra/db/AtomicSortedColumns.java +++ b/src/java/org/apache/cassandra/db/AtomicSortedColumns.java @@ -338,41 +338,30 @@ public class AtomicSortedColumns implements ISortedColumns long addColumn(IColumn column, Allocator allocator) { ByteBuffer name = column.name(); - IColumn oldColumn; - long sizeDelta = 0; while (true) { - oldColumn = map.putIfAbsent(name, column); + IColumn oldColumn = map.putIfAbsent(name, column); if (oldColumn == null) - { - sizeDelta += column.dataSize(); - break; - } + return column.dataSize(); if (oldColumn instanceof SuperColumn) { assert column instanceof SuperColumn; long previousSize = oldColumn.dataSize(); ((SuperColumn) oldColumn).putColumn((SuperColumn)column, allocator); - sizeDelta += oldColumn.dataSize() - previousSize; - break; // Delegated to SuperColumn + return oldColumn.dataSize() - previousSize; } else { // calculate reconciled col from old (existing) col and new col IColumn reconciledColumn = column.reconcile(oldColumn, allocator); if (map.replace(name, oldColumn, reconciledColumn)) - { - sizeDelta += reconciledColumn.dataSize() - oldColumn.dataSize(); - break; - } + return reconciledColumn.dataSize() - oldColumn.dataSize(); // We failed to replace column due to a concurrent update or a concurrent removal. Keep trying. 
// (Currently, concurrent removal should not happen (only updates), but let us support that anyway.) } } - - return sizeDelta; } void retainAll(ISortedColumns columns) http://git-wip-us.apache.org/repos/asf/cassandra/blob/587ed053/src/java/org/apache/cassandra/db/ThreadSafeSortedColumns.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ThreadSafeSortedColumns.java b/src/java/org/apache/cassandra/db/ThreadSafeSortedColumns.java index beb33ac..0a32b1c 100644 --- a/src/java/org/apache/cassandra/db/ThreadSafeSortedColumns.java +++ b/src/java/org/apache/cassandra/db/ThreadSafeSortedColumns.java @@ -88,22 +88,31 @@ public class ThreadSafeSortedColumns extends AbstractThreadUnsafeSortedColumns i */ public void addColumn(IColumn column, Allocator allocator) { + addColumnInternal(column, allocator); + } + + private long addColumnInternal(IColumn column, Allocator allocator) + { ByteBuffer name = column.name(); - IColumn oldColumn; - while ((oldColumn = map.putIfAbsent(name, column)) != null) + while (true) { + IColumn oldColumn = map.putIfAbsent(name, column); + if (oldColumn == null) + return column.dataSize(); + if (oldColumn instanceof SuperColumn) { assert column instanceof SuperColumn; + long previousSize = oldColumn.dataSize(); ((SuperColumn) oldColumn).putColumn((SuperColumn)column, allocator); - break; // Delegated to SuperColumn + return oldColumn.dataSize() - previousSize; } else { // calculate reconciled col from old (existing) col and new col IColumn reconciledColumn = column.reconcile(oldColumn, allocator); if (map.replace(name, oldColumn, reconciledColumn)) - break; + return reconciledColumn.dataSize() - oldColumn.dataSize(); // We failed to replace column due to a concurrent update or a concurrent removal. Keep trying. // (Currently, concurrent removal should not happen (only updates), but let us support that anyway.) 
@@ -114,10 +123,19 @@ public class ThreadSafeSortedColumns extends AbstractThreadUnsafeSortedColumns i /** * We need to go through each column in the column container and resolve it before adding */ - protected void addAllColumns(ISortedColumns cm, Allocator allocator, Function<IColumn, IColumn> transformation) + public void addAll(ISortedColumns cm, Allocator allocator, Function<IColumn, IColumn> transformation) + { + addAllWithSizeDelta(cm, allocator, transformation); + } + + @Override + public long addAllWithSizeDelta(ISortedColumns cm, Allocator allocator, Function<IColumn, IColumn> transformation) { + delete(cm.getDeletionInfo()); + long sizeDelta = 0; for (IColumn column : cm.getSortedColumns()) - addColumn(transformation.apply(column), allocator); + sizeDelta += addColumnInternal(transformation.apply(column), allocator); + return sizeDelta; } public boolean replace(IColumn oldColumn, IColumn newColumn) http://git-wip-us.apache.org/repos/asf/cassandra/blob/587ed053/src/java/org/apache/cassandra/db/TreeMapBackedSortedColumns.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/TreeMapBackedSortedColumns.java b/src/java/org/apache/cassandra/db/TreeMapBackedSortedColumns.java index 51779e3..96e1e5d 100644 --- a/src/java/org/apache/cassandra/db/TreeMapBackedSortedColumns.java +++ b/src/java/org/apache/cassandra/db/TreeMapBackedSortedColumns.java @@ -117,8 +117,9 @@ public class TreeMapBackedSortedColumns extends AbstractThreadUnsafeSortedColumn /** * We need to go through each column in the column container and resolve it before adding */ - protected void addAllColumns(ISortedColumns cm, Allocator allocator, Function<IColumn, IColumn> transformation) + public void addAll(ISortedColumns cm, Allocator allocator, Function<IColumn, IColumn> transformation) { + delete(cm.getDeletionInfo()); for (IColumn column : cm.getSortedColumns()) addColumn(transformation.apply(column), allocator); }