Updated Branches: refs/heads/cassandra-1.1 be969899c -> e0f4c7ccc
implement addAllWithSizeDelta for ThreadSafeSortedColumns (used in Memtable for supercolumns); see #4399 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e0f4c7cc Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e0f4c7cc Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e0f4c7cc Branch: refs/heads/cassandra-1.1 Commit: e0f4c7cccff023140b33ae2223fbb2f36e20265f Parents: be96989 Author: Jonathan Ellis <[email protected]> Authored: Tue Jul 3 12:29:24 2012 -0500 Committer: Jonathan Ellis <[email protected]> Committed: Tue Jul 3 12:29:24 2012 -0500 ---------------------------------------------------------------------- .../db/AbstractThreadUnsafeSortedColumns.java | 10 +---- .../cassandra/db/ArrayBackedSortedColumns.java | 3 +- .../apache/cassandra/db/AtomicSortedColumns.java | 19 ++------- .../cassandra/db/ThreadSafeSortedColumns.java | 30 ++++++++++++--- .../cassandra/db/TreeMapBackedSortedColumns.java | 3 +- 5 files changed, 33 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/e0f4c7cc/src/java/org/apache/cassandra/db/AbstractThreadUnsafeSortedColumns.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/AbstractThreadUnsafeSortedColumns.java b/src/java/org/apache/cassandra/db/AbstractThreadUnsafeSortedColumns.java index 1360336..b50dda5 100644 --- a/src/java/org/apache/cassandra/db/AbstractThreadUnsafeSortedColumns.java +++ b/src/java/org/apache/cassandra/db/AbstractThreadUnsafeSortedColumns.java @@ -89,21 +89,13 @@ public abstract class AbstractThreadUnsafeSortedColumns implements ISortedColumn } } - // Implementations should implement this rather than addAll to avoid - // having to care about the deletion infos - protected abstract void addAllColumns(ISortedColumns columns, Allocator allocator, Function<IColumn, IColumn> transformation); - public long addAllWithSizeDelta(ISortedColumns cm, Allocator allocator, Function<IColumn, IColumn> transformation) { // sizeDelta is only needed by memtable updates which should not be using thread-unsafe containers throw new UnsupportedOperationException(); } - public void addAll(ISortedColumns columns, Allocator allocator, Function<IColumn, IColumn> transformation) - { - addAllColumns(columns, allocator, transformation); - delete(columns.getDeletionInfo()); - } + public abstract void addAll(ISortedColumns columns, Allocator allocator, Function<IColumn, IColumn> transformation); public boolean isEmpty() { http://git-wip-us.apache.org/repos/asf/cassandra/blob/e0f4c7cc/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java b/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java index 246b133..1ce3aac 100644 --- a/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java +++ b/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java @@ -201,8 +201,9 @@ public class ArrayBackedSortedColumns extends AbstractThreadUnsafeSortedColumns return -mid - (result < 0 ? 1 : 2); } - protected void addAllColumns(ISortedColumns cm, Allocator allocator, Function<IColumn, IColumn> transformation) + public void addAll(ISortedColumns cm, Allocator allocator, Function<IColumn, IColumn> transformation) { + delete(cm.getDeletionInfo()); if (cm.isEmpty()) return; http://git-wip-us.apache.org/repos/asf/cassandra/blob/e0f4c7cc/src/java/org/apache/cassandra/db/AtomicSortedColumns.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/AtomicSortedColumns.java b/src/java/org/apache/cassandra/db/AtomicSortedColumns.java index 9cb44d2..d421d56 100644 --- a/src/java/org/apache/cassandra/db/AtomicSortedColumns.java +++ b/src/java/org/apache/cassandra/db/AtomicSortedColumns.java @@ -342,41 +342,30 @@ public class AtomicSortedColumns implements ISortedColumns long addColumn(IColumn column, Allocator allocator) { ByteBuffer name = column.name(); - IColumn oldColumn; - long sizeDelta = 0; while (true) { - oldColumn = map.putIfAbsent(name, column); + IColumn oldColumn = map.putIfAbsent(name, column); if (oldColumn == null) - { - sizeDelta += column.serializedSize(); - break; - } + return column.serializedSize(); if (oldColumn instanceof SuperColumn) { assert column instanceof SuperColumn; long previousSize = oldColumn.serializedSize(); ((SuperColumn) oldColumn).putColumn((SuperColumn)column, allocator); - sizeDelta += oldColumn.serializedSize() - previousSize; - break; // Delegated to SuperColumn + return oldColumn.serializedSize() - previousSize; } else { // calculate reconciled col from old (existing) col and new col IColumn reconciledColumn = column.reconcile(oldColumn, allocator); if (map.replace(name, oldColumn, reconciledColumn)) - { - sizeDelta += reconciledColumn.serializedSize() - oldColumn.serializedSize(); - break; - } + return reconciledColumn.serializedSize() - oldColumn.serializedSize(); // We failed to replace column due to a concurrent update or a concurrent removal. Keep trying. // (Currently, concurrent removal should not happen (only updates), but let us support that anyway.) } } - - return sizeDelta; } void retainAll(ISortedColumns columns) http://git-wip-us.apache.org/repos/asf/cassandra/blob/e0f4c7cc/src/java/org/apache/cassandra/db/ThreadSafeSortedColumns.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ThreadSafeSortedColumns.java b/src/java/org/apache/cassandra/db/ThreadSafeSortedColumns.java index c82c658..4d5dd50 100644 --- a/src/java/org/apache/cassandra/db/ThreadSafeSortedColumns.java +++ b/src/java/org/apache/cassandra/db/ThreadSafeSortedColumns.java @@ -87,22 +87,31 @@ public class ThreadSafeSortedColumns extends AbstractThreadUnsafeSortedColumns i */ public void addColumn(IColumn column, Allocator allocator) { + addColumnInternal(column, allocator); + } + + private long addColumnInternal(IColumn column, Allocator allocator) + { ByteBuffer name = column.name(); - IColumn oldColumn; - while ((oldColumn = map.putIfAbsent(name, column)) != null) + while (true) { + IColumn oldColumn = map.putIfAbsent(name, column); + if (oldColumn == null) + return column.serializedSize(); + if (oldColumn instanceof SuperColumn) { assert column instanceof SuperColumn; + long previousSize = oldColumn.serializedSize(); ((SuperColumn) oldColumn).putColumn((SuperColumn)column, allocator); - break; // Delegated to SuperColumn + return oldColumn.serializedSize() - previousSize; } else { // calculate reconciled col from old (existing) col and new col IColumn reconciledColumn = column.reconcile(oldColumn, allocator); if (map.replace(name, oldColumn, reconciledColumn)) - break; + return reconciledColumn.serializedSize() - oldColumn.serializedSize(); // We failed to replace column due to a concurrent update or a concurrent removal. Keep trying. // (Currently, concurrent removal should not happen (only updates), but let us support that anyway.) @@ -113,10 +122,19 @@ public class ThreadSafeSortedColumns extends AbstractThreadUnsafeSortedColumns i /** * We need to go through each column in the column container and resolve it before adding */ - protected void addAllColumns(ISortedColumns cm, Allocator allocator, Function<IColumn, IColumn> transformation) + public void addAll(ISortedColumns cm, Allocator allocator, Function<IColumn, IColumn> transformation) + { + addAllWithSizeDelta(cm, allocator, transformation); + } + + @Override + public long addAllWithSizeDelta(ISortedColumns cm, Allocator allocator, Function<IColumn, IColumn> transformation) { + delete(cm.getDeletionInfo()); + long sizeDelta = 0; for (IColumn column : cm.getSortedColumns()) - addColumn(transformation.apply(column), allocator); + sizeDelta += addColumnInternal(transformation.apply(column), allocator); + return sizeDelta; } public boolean replace(IColumn oldColumn, IColumn newColumn) http://git-wip-us.apache.org/repos/asf/cassandra/blob/e0f4c7cc/src/java/org/apache/cassandra/db/TreeMapBackedSortedColumns.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/TreeMapBackedSortedColumns.java b/src/java/org/apache/cassandra/db/TreeMapBackedSortedColumns.java index 8c4e76d..cd7cec3 100644 --- a/src/java/org/apache/cassandra/db/TreeMapBackedSortedColumns.java +++ b/src/java/org/apache/cassandra/db/TreeMapBackedSortedColumns.java @@ -116,8 +116,9 @@ public class TreeMapBackedSortedColumns extends AbstractThreadUnsafeSortedColumn /** * We need to go through each column in the column container and resolve it before adding */ - protected void addAllColumns(ISortedColumns cm, Allocator allocator, Function<IColumn, IColumn> transformation) + public void addAll(ISortedColumns cm, Allocator allocator, Function<IColumn, IColumn> transformation) { + delete(cm.getDeletionInfo()); for (IColumn column : cm.getSortedColumns()) addColumn(transformation.apply(column), allocator); }
