Updated Branches: refs/heads/cassandra-1.1 64d279289 -> 9f90a7997
Fix bug with counter in super columns patch by slebresne; reviewed by yukim for CASSANDRA-3821 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9f90a799 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9f90a799 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9f90a799 Branch: refs/heads/cassandra-1.1 Commit: 9f90a7997a08848e48f26526f9ffe7ae859d046c Parents: 64d2792 Author: Sylvain Lebresne <[email protected]> Authored: Wed Feb 22 09:41:31 2012 +0100 Committer: Sylvain Lebresne <[email protected]> Committed: Wed Feb 22 09:41:31 2012 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/db/CollationController.java | 6 +- src/java/org/apache/cassandra/db/Memtable.java | 3 +- .../cassandra/db/ThreadSafeSortedColumns.java | 184 +++++++++++++++ 4 files changed, 191 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/9f90a799/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index e93778b..ade1ba8 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -5,6 +5,7 @@ * Show Effective Owership via Nodetool ring <keyspace> (CASSANDRA-3412) * Update ORDER BY syntax for CQL3 (CASSANDRA-3925) * Fix BulkRecordWriter to not throw NPE if reducer gets no map data from Hadoop (CASSANDRA-3944) + * Fix bug with counters in super columns (CASSANDRA-3821) Merged from 1.0: * remove the wait on hint future during write (CASSANDRA-3870) http://git-wip-us.apache.org/repos/asf/cassandra/blob/9f90a799/src/java/org/apache/cassandra/db/CollationController.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/CollationController.java b/src/java/org/apache/cassandra/db/CollationController.java index dd7464d..c9589bc 100644 --- a/src/java/org/apache/cassandra/db/CollationController.java +++ b/src/java/org/apache/cassandra/db/CollationController.java @@ -75,8 +75,9 @@ public class CollationController { logger.debug("collectTimeOrderedData"); + // AtomicSortedColumns doesn't work for super columns (see #3821) ISortedColumns.Factory factory = mutableColumns - ? AtomicSortedColumns.factory() + ? cfs.metadata.cfType == ColumnFamilyType.Super ? ThreadSafeSortedColumns.factory() : AtomicSortedColumns.factory() : TreeMapBackedSortedColumns.factory(); ColumnFamily container = ColumnFamily.create(cfs.metadata, factory, filter.filter.isReversed()); List<IColumnIterator> iterators = new ArrayList<IColumnIterator>(); @@ -208,8 +209,9 @@ public class CollationController private ColumnFamily collectAllData() { logger.debug("collectAllData"); + // AtomicSortedColumns doesn't work for super columns (see #3821) ISortedColumns.Factory factory = mutableColumns - ? AtomicSortedColumns.factory() + ? cfs.metadata.cfType == ColumnFamilyType.Super ? ThreadSafeSortedColumns.factory() : AtomicSortedColumns.factory() : ArrayBackedSortedColumns.factory(); List<IColumnIterator> iterators = new ArrayList<IColumnIterator>(); ColumnFamily returnCF = ColumnFamily.create(cfs.metadata, factory, filter.filter.isReversed()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/9f90a799/src/java/org/apache/cassandra/db/Memtable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java index c0c6289..ddf2179 100644 --- a/src/java/org/apache/cassandra/db/Memtable.java +++ b/src/java/org/apache/cassandra/db/Memtable.java @@ -223,7 +223,8 @@ public class Memtable if (previous == null) { - ColumnFamily empty = cf.cloneMeShallow(AtomicSortedColumns.factory(), false); + // AtomicSortedColumns doesn't work for super columns (see #3821) + ColumnFamily empty = cf.cloneMeShallow(cf.isSuper() ? ThreadSafeSortedColumns.factory() : AtomicSortedColumns.factory(), false); // We'll add the columns later. This avoids wasting works if we get beaten in the putIfAbsent previous = columnFamilies.putIfAbsent(new DecoratedKey(key.token, allocator.clone(key.key)), empty); if (previous == null) http://git-wip-us.apache.org/repos/asf/cassandra/blob/9f90a799/src/java/org/apache/cassandra/db/ThreadSafeSortedColumns.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ThreadSafeSortedColumns.java b/src/java/org/apache/cassandra/db/ThreadSafeSortedColumns.java new file mode 100644 index 0000000..c82c658 --- /dev/null +++ b/src/java/org/apache/cassandra/db/ThreadSafeSortedColumns.java @@ -0,0 +1,184 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db; + +import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.Iterator; +import java.util.SortedMap; +import java.util.SortedSet; +import java.util.concurrent.ConcurrentSkipListMap; + +import com.google.common.base.Function; + +import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.utils.Allocator; + +public class ThreadSafeSortedColumns extends AbstractThreadUnsafeSortedColumns implements ISortedColumns +{ + private final ConcurrentSkipListMap<ByteBuffer, IColumn> map; + + public static final ISortedColumns.Factory factory = new Factory() + { + public ISortedColumns create(AbstractType<?> comparator, boolean insertReversed) + { + return new ThreadSafeSortedColumns(comparator); + } + + public ISortedColumns fromSorted(SortedMap<ByteBuffer, IColumn> sortedMap, boolean insertReversed) + { + return new ThreadSafeSortedColumns(sortedMap); + } + }; + + public static ISortedColumns.Factory factory() + { + return factory; + } + + public AbstractType<?> getComparator() + { + return (AbstractType<?>)map.comparator(); + } + + private ThreadSafeSortedColumns(AbstractType<?> comparator) + { + this.map = new ConcurrentSkipListMap<ByteBuffer, IColumn>(comparator); + } + + private ThreadSafeSortedColumns(SortedMap<ByteBuffer, IColumn> columns) + { + this.map = new ConcurrentSkipListMap<ByteBuffer, IColumn>(columns); + } + + public ISortedColumns.Factory getFactory() + { + return factory(); + } + + public ISortedColumns cloneMe() + { + return new ThreadSafeSortedColumns(map); + } + + public boolean isInsertReversed() + { + return false; + } + + /* + * If we find an old column that has the same name + * the ask it to resolve itself else add the new column + */ + public void addColumn(IColumn column, Allocator allocator) + { + ByteBuffer name = column.name(); + IColumn oldColumn; + while ((oldColumn = map.putIfAbsent(name, column)) != null) + { + if (oldColumn instanceof SuperColumn) + { + assert column instanceof SuperColumn; + ((SuperColumn) oldColumn).putColumn((SuperColumn)column, allocator); + break; // Delegated to SuperColumn + } + else + { + // calculate reconciled col from old (existing) col and new col + IColumn reconciledColumn = column.reconcile(oldColumn, allocator); + if (map.replace(name, oldColumn, reconciledColumn)) + break; + + // We failed to replace column due to a concurrent update or a concurrent removal. Keep trying. + // (Currently, concurrent removal should not happen (only updates), but let us support that anyway.) + } + } + } + + /** + * We need to go through each column in the column container and resolve it before adding + */ + protected void addAllColumns(ISortedColumns cm, Allocator allocator, Function<IColumn, IColumn> transformation) + { + for (IColumn column : cm.getSortedColumns()) + addColumn(transformation.apply(column), allocator); + } + + public boolean replace(IColumn oldColumn, IColumn newColumn) + { + if (!oldColumn.name().equals(newColumn.name())) + throw new IllegalArgumentException(); + + return map.replace(oldColumn.name(), oldColumn, newColumn); + } + + public IColumn getColumn(ByteBuffer name) + { + return map.get(name); + } + + public void removeColumn(ByteBuffer name) + { + map.remove(name); + } + + public void clear() + { + map.clear(); + } + + public int size() + { + return map.size(); + } + + public Collection<IColumn> getSortedColumns() + { + return map.values(); + } + + public Collection<IColumn> getReverseSortedColumns() + { + return map.descendingMap().values(); + } + + public SortedSet<ByteBuffer> getColumnNames() + { + return map.navigableKeySet(); + } + + public Iterator<IColumn> iterator() + { + return map.values().iterator(); + } + + public Iterator<IColumn> reverseIterator() + { + return getReverseSortedColumns().iterator(); + } + + public Iterator<IColumn> iterator(ByteBuffer start) + { + return map.tailMap(start).values().iterator(); + } + + public Iterator<IColumn> reverseIterator(ByteBuffer start) + { + return map.descendingMap().tailMap(start).values().iterator(); + } +}
