Merge branch 'cassandra-2.0' into cassandra-2.1

Conflicts:
        CHANGES.txt
        src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java
        src/java/org/apache/cassandra/db/ColumnFamilyStore.java
        test/unit/org/apache/cassandra/db/ArrayBackedSortedColumnsTest.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/06cd494c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/06cd494c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/06cd494c

Branch: refs/heads/cassandra-2.1
Commit: 06cd494c1496ec96886ed41ff3207847631986c9
Parents: 0c2eaa9 cc5fb19
Author: Aleksey Yeschenko <alek...@apache.org>
Authored: Thu Jan 22 03:17:55 2015 +0300
Committer: Aleksey Yeschenko <alek...@apache.org>
Committed: Thu Jan 22 03:17:55 2015 +0300

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/db/ArrayBackedSortedColumns.java  | 96 +++++++++++++++++--
 .../apache/cassandra/db/AtomicBTreeColumns.java |  5 +
 .../org/apache/cassandra/db/ColumnFamily.java   |  6 ++
 .../apache/cassandra/db/ColumnFamilyStore.java  |  7 +-
 .../cassandra/utils/BatchRemoveIterator.java    | 32 +++++++
 .../db/ArrayBackedSortedColumnsTest.java        | 99 +++++++++++++++++++-
 7 files changed, 236 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/06cd494c/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 9cd8189,0d08cce..a94ca04
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,70 -1,9 +1,71 @@@
 -2.0.13:
 - * Round up time deltas lower than 1ms in BulkLoader (CASSANDRA-8645)
 +2.1.3
 + * Duplicate rows returned when in clause has repeated values (CASSANDRA-6707)
 + * Add tooling to detect hot partitions (CASSANDRA-7974)
 + * Fix cassandra-stress user-mode truncation of partition generation 
(CASSANDRA-8608)
 + * Only stream from unrepaired sstables during inc repair (CASSANDRA-8267)
 + * Don't allow starting multiple inc repairs on the same sstables 
(CASSANDRA-8316)
 + * Invalidate prepared BATCH statements when related tables
 +   or keyspaces are dropped (CASSANDRA-8652)
 + * Fix missing results in secondary index queries on collections
 +   with ALLOW FILTERING (CASSANDRA-8421)
 + * Expose EstimatedHistogram metrics for range slices (CASSANDRA-8627)
 + * (cqlsh) Escape clqshrc passwords properly (CASSANDRA-8618)
 + * Fix NPE when passing wrong argument in ALTER TABLE statement 
(CASSANDRA-8355)
 + * Pig: Refactor and deprecate CqlStorage (CASSANDRA-8599)
 + * Don't reuse the same cleanup strategy for all sstables (CASSANDRA-8537)
 + * Fix case-sensitivity of index name on CREATE and DROP INDEX
 +   statements (CASSANDRA-8365)
 + * Better detection/logging for corruption in compressed sstables 
(CASSANDRA-8192)
 + * Use the correct repairedAt value when closing writer (CASSANDRA-8570)
 + * (cqlsh) Handle a schema mismatch being detected on startup (CASSANDRA-8512)
 + * Properly calculate expected write size during compaction (CASSANDRA-8532)
 + * Invalidate affected prepared statements when a table's columns
 +   are altered (CASSANDRA-7910)
 + * Stress - user defined writes should populate sequentally (CASSANDRA-8524)
 + * Fix regression in SSTableRewriter causing some rows to become unreadable 
 +   during compaction (CASSANDRA-8429)
 + * Run major compactions for repaired/unrepaired in parallel (CASSANDRA-8510)
 + * (cqlsh) Fix compression options in DESCRIBE TABLE output when compression
 +   is disabled (CASSANDRA-8288)
 + * (cqlsh) Fix DESCRIBE output after keyspaces are altered (CASSANDRA-7623)
 + * Make sure we set lastCompactedKey correctly (CASSANDRA-8463)
 + * (cqlsh) Fix output of CONSISTENCY command (CASSANDRA-8507)
 + * (cqlsh) Fixed the handling of LIST statements (CASSANDRA-8370)
 + * Make sstablescrub check leveled manifest again (CASSANDRA-8432)
 + * Check first/last keys in sstable when giving out positions (CASSANDRA-8458)
 + * Disable mmap on Windows (CASSANDRA-6993)
 + * Add missing ConsistencyLevels to cassandra-stress (CASSANDRA-8253)
 + * Add auth support to cassandra-stress (CASSANDRA-7985)
 + * Fix ArrayIndexOutOfBoundsException when generating error message
 +   for some CQL syntax errors (CASSANDRA-8455)
 + * Scale memtable slab allocation logarithmically (CASSANDRA-7882)
 + * cassandra-stress simultaneous inserts over same seed (CASSANDRA-7964)
 + * Reduce cassandra-stress sampling memory requirements (CASSANDRA-7926)
 + * Ensure memtable flush cannot expire commit log entries from its future 
(CASSANDRA-8383)
 + * Make read "defrag" async to reclaim memtables (CASSANDRA-8459)
 + * Remove tmplink files for offline compactions (CASSANDRA-8321)
 + * Reduce maxHintsInProgress (CASSANDRA-8415)
 + * BTree updates may call provided update function twice (CASSANDRA-8018)
 + * Release sstable references after anticompaction (CASSANDRA-8386)
 + * Handle abort() in SSTableRewriter properly (CASSANDRA-8320)
 + * Fix high size calculations for prepared statements (CASSANDRA-8231)
 + * Centralize shared executors (CASSANDRA-8055)
 + * Fix filtering for CONTAINS (KEY) relations on frozen collection
 +   clustering columns when the query is restricted to a single
 +   partition (CASSANDRA-8203)
 + * Do more aggressive entire-sstable TTL expiry checks (CASSANDRA-8243)
 + * Add more log info if readMeter is null (CASSANDRA-8238)
 + * add check of the system wall clock time at startup (CASSANDRA-8305)
 + * Support for frozen collections (CASSANDRA-7859)
 + * Fix overflow on histogram computation (CASSANDRA-8028)
 + * Have paxos reuse the timestamp generation of normal queries 
(CASSANDRA-7801)
 + * Fix incremental repair not remove parent session on remote (CASSANDRA-8291)
 + * Improve JBOD disk utilization (CASSANDRA-7386)
 + * Log failed host when preparing incremental repair (CASSANDRA-8228)
 + * Force config client mode in CQLSSTableWriter (CASSANDRA-8281)
 +Merged from 2.0:
+  * Add batch remove iterator to ABSC (CASSANDRA-8414)
 -
 -
 -2.0.12:
 + * Round up time deltas lower than 1ms in BulkLoader (CASSANDRA-8645)
   * Use more efficient slice size for querying internal secondary
     index tables (CASSANDRA-8550)
   * Fix potentially returning deleted rows with range tombstone 
(CASSANDRA-8558)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/06cd494c/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java
index b5ed8d2,8d553be..64752e3
--- a/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java
+++ b/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java
@@@ -17,42 -17,31 +17,38 @@@
   */
  package org.apache.cassandra.db;
  
- import java.util.AbstractCollection;
- import java.util.Arrays;
- import java.util.Collection;
- import java.util.Comparator;
- import java.util.Iterator;
 -import java.nio.ByteBuffer;
+ import java.util.*;
  
  import com.google.common.base.Function;
  import com.google.common.collect.AbstractIterator;
  import com.google.common.collect.Iterables;
 -import com.google.common.collect.Lists;
  
- import net.nicoulaj.compilecommand.annotations.Inline;
  import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.db.composites.CellName;
 +import org.apache.cassandra.db.composites.Composite;
  import org.apache.cassandra.db.filter.ColumnSlice;
 -import org.apache.cassandra.db.marshal.AbstractType;
 -import org.apache.cassandra.utils.Allocator;
+ import org.apache.cassandra.utils.BatchRemoveIterator;
 +import org.apache.cassandra.utils.memory.AbstractAllocator;
  
  /**
 - * A ColumnFamily backed by an ArrayList.
 + * A ColumnFamily backed by an array.
   * This implementation is not synchronized and should only be used when
   * thread-safety is not required. This implementation makes sense when the
 - * main operations performed are iterating over the map and adding columns
 + * main operations performed are iterating over the cells and adding cells
   * (especially if insertion is in sorted order).
   */
 -public class ArrayBackedSortedColumns extends 
AbstractThreadUnsafeSortedColumns
 +public class ArrayBackedSortedColumns extends ColumnFamily
  {
 +    private static final Cell[] EMPTY_ARRAY = new Cell[0];
 +    private static final int MINIMAL_CAPACITY = 10;
 +
      private final boolean reversed;
 -    private final ArrayList<Column> columns;
 +
 +    private DeletionInfo deletionInfo;
 +    private Cell[] cells;
 +    private int size;
 +    private int sortedSize;
 +    private volatile boolean isSorted;
  
      public static final ColumnFamily.Factory<ArrayBackedSortedColumns> 
factory = new Factory<ArrayBackedSortedColumns>()
      {
@@@ -114,15 -80,15 +110,103 @@@
          return reversed;
      }
  
 -    private Comparator<ByteBuffer> internalComparator()
++    public BatchRemoveIterator<Cell> batchRemoveIterator()
++    {
++        maybeSortCells();
++
++        return new BatchRemoveIterator<Cell>()
++        {
++            private final Iterator<Cell> iter = iterator();
++            private BitSet removedIndexes = new BitSet(size);
++            private int idx = -1;
++            private boolean shouldCallNext = false;
++            private boolean isCommitted = false;
++            private boolean removedAnything = false;
++
++            public void commit()
++            {
++                if (isCommitted)
++                    throw new IllegalStateException();
++                isCommitted = true;
++
++                if (!removedAnything)
++                    return;
++
++                // the lowest index both not visited and known to be not 
removed
++                int keepIdx = removedIndexes.nextClearBit(0);
++                // the running total of kept items
++                int resultLength = 0;
++                // start from the first not-removed cell, and shift left.
++                int removeIdx = removedIndexes.nextSetBit(keepIdx + 1);
++                while (removeIdx >= 0)
++                {
++                    int length = removeIdx - keepIdx;
++                    if (length > 0)
++                    {
++                        copy(keepIdx, resultLength, length);
++                        resultLength += length;
++                    }
++                    keepIdx = removedIndexes.nextClearBit(removeIdx + 1);
++                    if (keepIdx < 0)
++                        keepIdx = size;
++                    removeIdx = removedIndexes.nextSetBit(keepIdx + 1);
++                }
++                // Copy everything after the last deleted column
++                int length = size - keepIdx;
++                if (length > 0)
++                {
++                    copy(keepIdx, resultLength, length);
++                    resultLength += length;
++                }
++
++                for (int i = resultLength; i < size; i++)
++                    cells[i] = null;
++
++                size = sortedSize = resultLength;
++            }
++
++            private void copy(int src, int dst, int len)
++            {
++                // [src, src+len) and [dst, dst+len) might overlap but it's 
okay because we're going from left to right
++                assert dst <= src : "dst must not be greater than src";
++
++                if (dst < src)
++                    System.arraycopy(cells, src, cells, dst, len);
++            }
++
++            public boolean hasNext()
++            {
++                return iter.hasNext();
++            }
++
++            public Cell next()
++            {
++                idx++;
++                shouldCallNext = false;
++                return iter.next();
++            }
++
++            public void remove()
++            {
++                if (shouldCallNext)
++                    throw new IllegalStateException();
++
++                removedIndexes.set(reversed ? size - idx - 1 : idx);
++                removedAnything = true;
++                shouldCallNext = true;
++            }
++        };
++    }
++
 +    private Comparator<Composite> internalComparator()
      {
 -        return reversed ? getComparator().reverseComparator : getComparator();
 +        return reversed ? getComparator().reverseComparator() : 
getComparator();
      }
  
 -    public Column getColumn(ByteBuffer name)
 +    private void maybeSortCells()
      {
 -        int pos = binarySearch(name);
 -        return pos >= 0 ? columns.get(pos) : null;
 +        if (!isSorted)
 +            sortCells();
      }
  
      /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/06cd494c/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/AtomicBTreeColumns.java
index dc2b5ee,0000000..47f0b85
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java
+++ b/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java
@@@ -1,559 -1,0 +1,564 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.db;
 +
 +import java.util.AbstractCollection;
 +import java.util.ArrayList;
 +import java.util.Collection;
 +import java.util.Comparator;
 +import java.util.Iterator;
 +import java.util.List;
 +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 +
 +import com.google.common.base.Function;
 +import com.google.common.base.Functions;
 +import com.google.common.collect.AbstractIterator;
 +import com.google.common.collect.Iterators;
 +
 +import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.db.composites.CellName;
 +import org.apache.cassandra.db.composites.Composite;
 +import org.apache.cassandra.db.filter.ColumnSlice;
 +import org.apache.cassandra.utils.*;
 +import org.apache.cassandra.utils.btree.BTree;
 +import org.apache.cassandra.utils.btree.UpdateFunction;
 +import org.apache.cassandra.utils.concurrent.Locks;
 +import org.apache.cassandra.utils.concurrent.OpOrder;
 +import org.apache.cassandra.utils.memory.HeapAllocator;
 +import org.apache.cassandra.utils.memory.MemtableAllocator;
 +import org.apache.cassandra.utils.memory.NativePool;
 +
 +import static org.apache.cassandra.db.index.SecondaryIndexManager.Updater;
 +
 +/**
 + * A thread-safe and atomic ISortedColumns implementation.
 + * Operations (in particular addAll) on this implemenation are atomic and
 + * isolated (in the sense of ACID). Typically a addAll is guaranteed that no
 + * other thread can see the state where only parts but not all columns have
 + * been added.
 + * <p/>
 + * WARNING: removing element through getSortedColumns().iterator() is *not* 
supported
 + */
 +public class AtomicBTreeColumns extends ColumnFamily
 +{
 +    static final long EMPTY_SIZE = ObjectSizes.measure(new 
AtomicBTreeColumns(CFMetaData.IndexCf, null))
 +            + ObjectSizes.measure(new Holder(null, null));
 +
 +    // Reserved values for wasteTracker field. These values must not be 
consecutive (see avoidReservedValues)
 +    private static final int TRACKER_NEVER_WASTED = 0;
 +    private static final int TRACKER_PESSIMISTIC_LOCKING = Integer.MAX_VALUE;
 +
 +    // The granularity with which we track wasted allocation/work; we round up
 +    private static final int ALLOCATION_GRANULARITY_BYTES = 1024;
 +    // The number of bytes we have to waste in excess of our acceptable 
realtime rate of waste (defined below)
 +    private static final long EXCESS_WASTE_BYTES = 10 * 1024 * 1024L;
 +    private static final int EXCESS_WASTE_OFFSET = (int) (EXCESS_WASTE_BYTES 
/ ALLOCATION_GRANULARITY_BYTES);
 +    // Note this is a shift, because dividing a long time and then picking 
the low 32 bits doesn't give correct rollover behavior
 +    private static final int CLOCK_SHIFT = 17;
 +    // CLOCK_GRANULARITY = 1^9ns >> CLOCK_SHIFT == 132us == (1/7.63)ms
 +
 +    /**
 +     * (clock + allocation) granularity are combined to give us an acceptable 
(waste) allocation rate that is defined by
 +     * the passage of real time of 
ALLOCATION_GRANULARITY_BYTES/CLOCK_GRANULARITY, or in this case 7.63Kb/ms, or 
7.45Mb/s
 +     *
 +     * in wasteTracker we maintain within EXCESS_WASTE_OFFSET before the 
current time; whenever we waste bytes
 +     * we increment the current value if it is within this window, and set it 
to the min of the window plus our waste
 +     * otherwise.
 +     */
 +    private volatile int wasteTracker = TRACKER_NEVER_WASTED;
 +
 +    private static final AtomicIntegerFieldUpdater<AtomicBTreeColumns> 
wasteTrackerUpdater = 
AtomicIntegerFieldUpdater.newUpdater(AtomicBTreeColumns.class, "wasteTracker");
 +
 +    private static final Function<Cell, CellName> NAME = new Function<Cell, 
CellName>()
 +    {
 +        public CellName apply(Cell column)
 +        {
 +            return column.name();
 +        }
 +    };
 +
 +    public static final Factory<AtomicBTreeColumns> factory = new 
Factory<AtomicBTreeColumns>()
 +    {
 +        public AtomicBTreeColumns create(CFMetaData metadata, boolean 
insertReversed, int initialCapacity)
 +        {
 +            if (insertReversed)
 +                throw new IllegalArgumentException();
 +            return new AtomicBTreeColumns(metadata);
 +        }
 +    };
 +
 +    private static final DeletionInfo LIVE = DeletionInfo.live();
 +    // This is a small optimization: DeletionInfo is mutable, but we know 
that we will always copy it in that class,
 +    // so we can safely alias one DeletionInfo.live() reference and avoid 
some allocations.
 +    private static final Holder EMPTY = new Holder(BTree.empty(), LIVE);
 +
 +    private volatile Holder ref;
 +
 +    private static final AtomicReferenceFieldUpdater<AtomicBTreeColumns, 
Holder> refUpdater = 
AtomicReferenceFieldUpdater.newUpdater(AtomicBTreeColumns.class, Holder.class, 
"ref");
 +
 +    private AtomicBTreeColumns(CFMetaData metadata)
 +    {
 +        this(metadata, EMPTY);
 +    }
 +
 +    private AtomicBTreeColumns(CFMetaData metadata, Holder holder)
 +    {
 +        super(metadata);
 +        this.ref = holder;
 +    }
 +
 +    public Factory getFactory()
 +    {
 +        return factory;
 +    }
 +
 +    public ColumnFamily cloneMe()
 +    {
 +        return new AtomicBTreeColumns(metadata, ref);
 +    }
 +
 +    public DeletionInfo deletionInfo()
 +    {
 +        return ref.deletionInfo;
 +    }
 +
 +    public void delete(DeletionTime delTime)
 +    {
 +        delete(new DeletionInfo(delTime));
 +    }
 +
 +    protected void delete(RangeTombstone tombstone)
 +    {
 +        delete(new DeletionInfo(tombstone, getComparator()));
 +    }
 +
 +    public void delete(DeletionInfo info)
 +    {
 +        if (info.isLive())
 +            return;
 +
 +        // Keeping deletion info for max markedForDeleteAt value
 +        while (true)
 +        {
 +            Holder current = ref;
 +            DeletionInfo curDelInfo = current.deletionInfo;
 +            DeletionInfo newDelInfo = info.mayModify(curDelInfo) ? 
curDelInfo.copy().add(info) : curDelInfo;
 +            if (refUpdater.compareAndSet(this, current, 
current.with(newDelInfo)))
 +                break;
 +        }
 +    }
 +
 +    public void setDeletionInfo(DeletionInfo newInfo)
 +    {
 +        ref = ref.with(newInfo);
 +    }
 +
 +    public void purgeTombstones(int gcBefore)
 +    {
 +        while (true)
 +        {
 +            Holder current = ref;
 +            if (!current.deletionInfo.hasPurgeableTombstones(gcBefore))
 +                break;
 +
 +            DeletionInfo purgedInfo = current.deletionInfo.copy();
 +            purgedInfo.purge(gcBefore);
 +            if (refUpdater.compareAndSet(this, current, 
current.with(purgedInfo)))
 +                break;
 +        }
 +    }
 +
 +    /**
 +     * This is only called by Memtable.resolve, so only AtomicBTreeColumns 
needs to implement it.
 +     *
 +     * @return the difference in size seen after merging the given columns
 +     */
 +    public Pair<Long, Long> addAllWithSizeDelta(final ColumnFamily cm, 
MemtableAllocator allocator, OpOrder.Group writeOp, Updater indexer)
 +    {
 +        ColumnUpdater updater = new ColumnUpdater(this, cm.metadata, 
allocator, writeOp, indexer);
 +        DeletionInfo inputDeletionInfoCopy = null;
 +
 +        boolean monitorOwned = false;
 +        try
 +        {
 +            if (usePessimisticLocking())
 +            {
 +                Locks.monitorEnterUnsafe(this);
 +                monitorOwned = true;
 +            }
 +            while (true)
 +            {
 +                Holder current = ref;
 +                updater.ref = current;
 +                updater.reset();
 +
 +                DeletionInfo deletionInfo;
 +                if (cm.deletionInfo().mayModify(current.deletionInfo))
 +                {
 +                    if (inputDeletionInfoCopy == null)
 +                        inputDeletionInfoCopy = 
cm.deletionInfo().copy(HeapAllocator.instance);
 +
 +                    deletionInfo = 
current.deletionInfo.copy().add(inputDeletionInfoCopy);
 +                    updater.allocated(deletionInfo.unsharedHeapSize() - 
current.deletionInfo.unsharedHeapSize());
 +                }
 +                else
 +                {
 +                    deletionInfo = current.deletionInfo;
 +                }
 +
 +                Object[] tree = BTree.update(current.tree, 
metadata.comparator.columnComparator(Memtable.MEMORY_POOL instanceof 
NativePool), cm, cm.getColumnCount(), true, updater);
 +
 +                if (tree != null && refUpdater.compareAndSet(this, current, 
new Holder(tree, deletionInfo)))
 +                {
 +                    indexer.updateRowLevelIndexes();
 +                    updater.finish();
 +                    return Pair.create(updater.dataSize, 
updater.colUpdateTimeDelta);
 +                }
 +                else if (!monitorOwned)
 +                {
 +                    boolean shouldLock = usePessimisticLocking();
 +                    if (!shouldLock)
 +                    {
 +                        shouldLock = 
updateWastedAllocationTracker(updater.heapSize);
 +                    }
 +                    if (shouldLock)
 +                    {
 +                        Locks.monitorEnterUnsafe(this);
 +                        monitorOwned = true;
 +                    }
 +                }
 +            }
 +        }
 +        finally
 +        {
 +            if (monitorOwned)
 +                Locks.monitorExitUnsafe(this);
 +        }
 +    }
 +
 +    boolean usePessimisticLocking()
 +    {
 +        return wasteTracker == TRACKER_PESSIMISTIC_LOCKING;
 +    }
 +
 +    /**
 +     * Update the wasted allocation tracker state based on newly wasted 
allocation information
 +     *
 +     * @param wastedBytes the number of bytes wasted by this thread
 +     * @return true if the caller should now proceed with pessimistic locking 
because the waste limit has been reached
 +     */
 +    private boolean updateWastedAllocationTracker(long wastedBytes) {
 +        // Early check for huge allocation that exceeds the limit
 +        if (wastedBytes < EXCESS_WASTE_BYTES)
 +        {
 +            // We round up to ensure work < granularity are still accounted 
for
 +            int wastedAllocation = ((int) (wastedBytes + 
ALLOCATION_GRANULARITY_BYTES - 1)) / ALLOCATION_GRANULARITY_BYTES;
 +
 +            int oldTrackerValue;
 +            while (TRACKER_PESSIMISTIC_LOCKING != (oldTrackerValue = 
wasteTracker))
 +            {
 +                // Note this time value has an arbitrary offset, but is a 
constant rate 32 bit counter (that may wrap)
 +                int time = (int) (System.nanoTime() >>> CLOCK_SHIFT);
 +                int delta = oldTrackerValue - time;
 +                if (oldTrackerValue == TRACKER_NEVER_WASTED || delta >= 0 || 
delta < -EXCESS_WASTE_OFFSET)
 +                    delta = -EXCESS_WASTE_OFFSET;
 +                delta += wastedAllocation;
 +                if (delta >= 0)
 +                    break;
 +                if (wasteTrackerUpdater.compareAndSet(this, oldTrackerValue, 
avoidReservedValues(time + delta)))
 +                    return false;
 +            }
 +        }
 +        // We have definitely reached our waste limit so set the state if it 
isn't already
 +        wasteTrackerUpdater.set(this, TRACKER_PESSIMISTIC_LOCKING);
 +        // And tell the caller to proceed with pessimistic locking
 +        return true;
 +    }
 +
 +    private static int avoidReservedValues(int wasteTracker)
 +    {
 +        if (wasteTracker == TRACKER_NEVER_WASTED || wasteTracker == 
TRACKER_PESSIMISTIC_LOCKING)
 +            return wasteTracker + 1;
 +        return wasteTracker;
 +    }
 +
 +    // no particular reason not to implement these next methods, we just 
haven't needed them yet
 +
 +    public void addColumn(Cell column)
 +    {
 +        throw new UnsupportedOperationException();
 +    }
 +
 +    public void maybeAppendColumn(Cell cell, DeletionInfo.InOrderTester 
tester, int gcBefore)
 +    {
 +        throw new UnsupportedOperationException();
 +    }
 +
 +    public void addAll(ColumnFamily cf)
 +    {
 +        throw new UnsupportedOperationException();
 +    }
 +
 +    public void clear()
 +    {
 +        throw new UnsupportedOperationException();
 +    }
 +
 +    public Cell getColumn(CellName name)
 +    {
 +        return (Cell) BTree.find(ref.tree, asymmetricComparator(), name);
 +    }
 +
 +    private Comparator<Object> asymmetricComparator()
 +    {
 +        return 
metadata.comparator.asymmetricColumnComparator(Memtable.MEMORY_POOL instanceof 
NativePool);
 +    }
 +
 +    public Iterable<CellName> getColumnNames()
 +    {
 +        return collection(false, NAME);
 +    }
 +
 +    public Collection<Cell> getSortedColumns()
 +    {
 +        return collection(true, Functions.<Cell>identity());
 +    }
 +
 +    public Collection<Cell> getReverseSortedColumns()
 +    {
 +        return collection(false, Functions.<Cell>identity());
 +    }
 +
 +    private <V> Collection<V> collection(final boolean forwards, final 
Function<Cell, V> f)
 +    {
 +        final Holder ref = this.ref;
 +        return new AbstractCollection<V>()
 +        {
 +            public Iterator<V> iterator()
 +            {
 +                return Iterators.transform(BTree.<Cell>slice(ref.tree, 
forwards), f);
 +            }
 +
 +            public int size()
 +            {
 +                return BTree.slice(ref.tree, true).count();
 +            }
 +        };
 +    }
 +
 +    public int getColumnCount()
 +    {
 +        return BTree.slice(ref.tree, true).count();
 +    }
 +
 +    public boolean hasColumns()
 +    {
 +        return !BTree.isEmpty(ref.tree);
 +    }
 +
 +    public Iterator<Cell> iterator(ColumnSlice[] slices)
 +    {
 +        return slices.length == 1
 +             ? slice(ref.tree, asymmetricComparator(), slices[0].start, 
slices[0].finish, true)
 +             : new SliceIterator(ref.tree, asymmetricComparator(), true, 
slices);
 +    }
 +
 +    public Iterator<Cell> reverseIterator(ColumnSlice[] slices)
 +    {
 +        return slices.length == 1
 +             ? slice(ref.tree, asymmetricComparator(), slices[0].finish, 
slices[0].start, false)
 +             : new SliceIterator(ref.tree, asymmetricComparator(), false, 
slices);
 +    }
 +
 +    public boolean isInsertReversed()
 +    {
 +        return false;
 +    }
 +
++    public BatchRemoveIterator<Cell> batchRemoveIterator()
++    {
++        throw new UnsupportedOperationException();
++    }
++
 +    private static final class Holder
 +    {
 +        final DeletionInfo deletionInfo;
 +        // the btree of columns
 +        final Object[] tree;
 +
 +        Holder(Object[] tree, DeletionInfo deletionInfo)
 +        {
 +            this.tree = tree;
 +            this.deletionInfo = deletionInfo;
 +        }
 +
 +        Holder with(DeletionInfo info)
 +        {
 +            return new Holder(this.tree, info);
 +        }
 +    }
 +
 +    // the function we provide to the btree utilities to perform any column 
replacements
 +    private static final class ColumnUpdater implements UpdateFunction<Cell>
 +    {
 +        final AtomicBTreeColumns updating;
 +        final CFMetaData metadata;
 +        final MemtableAllocator allocator;
 +        final OpOrder.Group writeOp;
 +        final Updater indexer;
 +        Holder ref;
 +        long dataSize;
 +        long heapSize;
 +        long colUpdateTimeDelta = Long.MAX_VALUE;
 +        final MemtableAllocator.DataReclaimer reclaimer;
 +        List<Cell> inserted; // TODO: replace with walk of aborted BTree
 +
 +        private ColumnUpdater(AtomicBTreeColumns updating, CFMetaData 
metadata, MemtableAllocator allocator, OpOrder.Group writeOp, Updater indexer)
 +        {
 +            this.updating = updating;
 +            this.allocator = allocator;
 +            this.writeOp = writeOp;
 +            this.indexer = indexer;
 +            this.metadata = metadata;
 +            this.reclaimer = allocator.reclaimer();
 +        }
 +
 +        public Cell apply(Cell insert)
 +        {
 +            indexer.insert(insert);
 +            insert = insert.localCopy(metadata, allocator, writeOp);
 +            this.dataSize += insert.cellDataSize();
 +            this.heapSize += insert.unsharedHeapSizeExcludingData();
 +            if (inserted == null)
 +                inserted = new ArrayList<>();
 +            inserted.add(insert);
 +            return insert;
 +        }
 +
 +        public Cell apply(Cell existing, Cell update)
 +        {
 +            Cell reconciled = existing.reconcile(update);
 +            indexer.update(existing, reconciled);
 +            if (existing != reconciled)
 +            {
 +                reconciled = reconciled.localCopy(metadata, allocator, 
writeOp);
 +                dataSize += reconciled.cellDataSize() - 
existing.cellDataSize();
 +                heapSize += reconciled.unsharedHeapSizeExcludingData() - 
existing.unsharedHeapSizeExcludingData();
 +                if (inserted == null)
 +                    inserted = new ArrayList<>();
 +                inserted.add(reconciled);
 +                discard(existing);
 +                //Getting the minimum delta for an update containing multiple 
columns
 +                colUpdateTimeDelta =  Math.min(Math.abs(existing.timestamp()  
- update.timestamp()), colUpdateTimeDelta);
 +            }
 +            return reconciled;
 +        }
 +
 +        protected void reset()
 +        {
 +            this.dataSize = 0;
 +            this.heapSize = 0;
 +            if (inserted != null)
 +            {
 +                for (Cell cell : inserted)
 +                    abort(cell);
 +                inserted.clear();
 +            }
 +            reclaimer.cancel();
 +        }
 +
 +        protected void abort(Cell abort)
 +        {
 +            reclaimer.reclaimImmediately(abort);
 +        }
 +
 +        protected void discard(Cell discard)
 +        {
 +            reclaimer.reclaim(discard);
 +        }
 +
 +        public boolean abortEarly()
 +        {
 +            return updating.ref != ref;
 +        }
 +
 +        public void allocated(long heapSize)
 +        {
 +            this.heapSize += heapSize;
 +        }
 +
 +        protected void finish()
 +        {
 +            allocator.onHeap().allocate(heapSize, writeOp);
 +            reclaimer.commit();
 +        }
 +    }
 +
 +    private static class SliceIterator extends AbstractIterator<Cell>
 +    {
 +        private final Object[] btree;
 +        private final boolean forwards;
 +        private final Comparator<Object> comparator;
 +        private final ColumnSlice[] slices;
 +
 +        private int idx = 0;
 +        private Iterator<Cell> currentSlice;
 +
 +        SliceIterator(Object[] btree, Comparator<Object> comparator, boolean 
forwards, ColumnSlice[] slices)
 +        {
 +            this.btree = btree;
 +            this.comparator = comparator;
 +            this.slices = slices;
 +            this.forwards = forwards;
 +        }
 +
 +        protected Cell computeNext()
 +        {
 +            while (currentSlice != null || idx < slices.length)
 +            {
 +                if (currentSlice == null)
 +                {
 +                    ColumnSlice slice = slices[idx++];
 +                    if (forwards)
 +                        currentSlice = slice(btree, comparator, slice.start, 
slice.finish, true);
 +                    else
 +                        currentSlice = slice(btree, comparator, slice.finish, 
slice.start, false);
 +                }
 +
 +                if (currentSlice.hasNext())
 +                    return currentSlice.next();
 +
 +                currentSlice = null;
 +            }
 +
 +            return endOfData();
 +        }
 +    }
 +
 +    private static Iterator<Cell> slice(Object[] btree, Comparator<Object> 
comparator, Composite start, Composite finish, boolean forwards)
 +    {
 +        return BTree.slice(btree,
 +                           comparator,
 +                           start.isEmpty() ? null : start,
 +                           true,
 +                           finish.isEmpty() ? null : finish,
 +                           true,
 +                           forwards);
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/06cd494c/src/java/org/apache/cassandra/db/ColumnFamily.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ColumnFamily.java
index 483ecb0,19f8c16..f21d161
--- a/src/java/org/apache/cassandra/db/ColumnFamily.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamily.java
@@@ -514,6 -532,38 +514,12 @@@ public abstract class ColumnFamily impl
          return ByteBuffer.wrap(out.getData(), 0, out.getLength());
      }
  
+ 
+     /**
+      * @return an iterator where the removes are carried out once everything 
has been iterated
+      */
 -    public BatchRemoveIterator<Column> batchRemoveIterator()
 -    {
 -        // Default implementation is the ordinary iterator
 -        return new BatchRemoveIterator<Column>()
 -        {
 -            private final Iterator<Column> iter = iterator();
 -
 -            public void commit()
 -            {
 -            }
 -
 -            public boolean hasNext()
 -            {
 -                return iter.hasNext();
 -            }
 -
 -            public Column next()
 -            {
 -                return iter.next();
 -            }
 -
 -            public void remove()
 -            {
 -                iter.remove();
 -            }
 -        };
 -    }
++    public abstract BatchRemoveIterator<Cell> batchRemoveIterator();
+ 
      public abstract static class Factory <T extends ColumnFamily>
      {
          /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/06cd494c/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 0c95b0e,34d3f1d..3822648
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@@ -1237,14 -951,15 +1237,14 @@@ public class ColumnFamilyStore implemen
       * columns that have been dropped from the schema (for CQL3 tables only).
       * @return the updated ColumnFamily
       */
 -    public static long removeDeletedColumnsOnly(ColumnFamily cf, int 
gcBefore, SecondaryIndexManager.Updater indexer)
 +    public static ColumnFamily removeDeletedColumnsOnly(ColumnFamily cf, int 
gcBefore, SecondaryIndexManager.Updater indexer)
      {
-         Iterator<Cell> iter = cf.iterator();
 -        BatchRemoveIterator<Column> iter = cf.batchRemoveIterator();
++        BatchRemoveIterator<Cell> iter = cf.batchRemoveIterator();
          DeletionInfo.InOrderTester tester = cf.inOrderDeletionTester();
          boolean hasDroppedColumns = 
!cf.metadata.getDroppedColumns().isEmpty();
 -        long removedBytes = 0;
          while (iter.hasNext())
          {
 -            Column c = iter.next();
 +            Cell c = iter.next();
              // remove columns if
              // (a) the column itself is gcable or
              // (b) the column is shadowed by a CF tombstone
@@@ -1253,10 -968,16 +1253,10 @@@
              {
                  iter.remove();
                  indexer.remove(c);
 -                removedBytes += c.dataSize();
              }
          }
- 
+         iter.commit();
 -        return removedBytes;
 -    }
 -
 -    public static long removeDeletedColumnsOnly(ColumnFamily cf, int gcBefore)
 -    {
 -        return removeDeletedColumnsOnly(cf, gcBefore, 
SecondaryIndexManager.nullUpdater);
 +        return cf;
      }
  
      // returns true if
@@@ -1273,7 -994,7 +1273,7 @@@
          if (cf == null || cf.metadata.getDroppedColumns().isEmpty())
              return;
  
-         Iterator<Cell> iter = cf.iterator();
 -        BatchRemoveIterator<Column> iter = cf.batchRemoveIterator();
++        BatchRemoveIterator<Cell> iter = cf.batchRemoveIterator();
          while (iter.hasNext())
              if (isDroppedColumn(iter.next(), metadata))
                  iter.remove();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/06cd494c/test/unit/org/apache/cassandra/db/ArrayBackedSortedColumnsTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/ArrayBackedSortedColumnsTest.java
index 83a58e4,90cd70f..18851d4
--- a/test/unit/org/apache/cassandra/db/ArrayBackedSortedColumnsTest.java
+++ b/test/unit/org/apache/cassandra/db/ArrayBackedSortedColumnsTest.java
@@@ -25,13 -27,16 +25,16 @@@ import org.junit.Test
  
  import static org.junit.Assert.*;
  
 -import com.google.common.base.Functions;
+ import com.google.common.collect.Sets;
+ 
  import org.apache.cassandra.SchemaLoader;
  import org.apache.cassandra.config.CFMetaData;
  import org.apache.cassandra.config.Schema;
- import org.apache.cassandra.utils.ByteBufferUtil;
 +import org.apache.cassandra.db.composites.*;
 +import org.apache.cassandra.db.filter.ColumnSlice;
 +import org.apache.cassandra.db.marshal.Int32Type;
+ import org.apache.cassandra.utils.BatchRemoveIterator;
+ import org.apache.cassandra.utils.ByteBufferUtil;
 -import org.apache.cassandra.db.filter.ColumnSlice;
 -import org.apache.cassandra.utils.HeapAllocator;
  
  public class ArrayBackedSortedColumnsTest extends SchemaLoader
  {
@@@ -265,4 -195,95 +268,98 @@@
          iter.remove();
          assertTrue(!iter.hasNext());
      }
+ 
+     @Test(expected = IllegalStateException.class)
+     public void testBatchRemoveTwice()
+     {
++        CellNameType type = new SimpleDenseCellNameType(Int32Type.instance);
+         ColumnFamily map = 
ArrayBackedSortedColumns.factory.create(metadata(), false);
 -        map.addColumn(new Column(ByteBufferUtil.bytes(1)), 
HeapAllocator.instance);
 -        map.addColumn(new Column(ByteBufferUtil.bytes(2)), 
HeapAllocator.instance);
++        map.addColumn(new BufferCell(type.makeCellName(1)));
++        map.addColumn(new BufferCell(type.makeCellName(2)));
+ 
 -        BatchRemoveIterator<Column> batchIter = map.batchRemoveIterator();
++        BatchRemoveIterator<Cell> batchIter = map.batchRemoveIterator();
+         batchIter.next();
+         batchIter.remove();
+         batchIter.remove();
+     }
+ 
+     @Test(expected = IllegalStateException.class)
+     public void testBatchCommitTwice()
+     {
++        CellNameType type = new SimpleDenseCellNameType(Int32Type.instance);
+         ColumnFamily map = 
ArrayBackedSortedColumns.factory.create(metadata(), false);
 -        map.addColumn(new Column(ByteBufferUtil.bytes(1)), 
HeapAllocator.instance);
 -        map.addColumn(new Column(ByteBufferUtil.bytes(2)), 
HeapAllocator.instance);
++        map.addColumn(new BufferCell(type.makeCellName(1)));
++        map.addColumn(new BufferCell(type.makeCellName(2)));
+ 
 -        BatchRemoveIterator<Column> batchIter = map.batchRemoveIterator();
++        BatchRemoveIterator<Cell> batchIter = map.batchRemoveIterator();
+         batchIter.next();
+         batchIter.remove();
+         batchIter.commit();
+         batchIter.commit();
+     }
+ 
+     @Test
+     public void testBatchRemove()
+     {
+         testBatchRemoveInternal(false);
+         testBatchRemoveInternal(true);
+     }
+ 
+     public void testBatchRemoveInternal(boolean reversed)
+     {
++        CellNameType type = new SimpleDenseCellNameType(Int32Type.instance);
+         ColumnFamily map = 
ArrayBackedSortedColumns.factory.create(metadata(), reversed);
+         int[] values = new int[]{ 1, 2, 3, 5 };
+ 
+         for (int i = 0; i < values.length; ++i)
 -            map.addColumn(new Column(ByteBufferUtil.bytes(values[reversed ? 
values.length - 1 - i : i])), HeapAllocator.instance);
++            map.addColumn(new BufferCell(type.makeCellName(values[reversed ? 
values.length - 1 - i : i])));
+ 
 -        BatchRemoveIterator<Column> batchIter = map.batchRemoveIterator();
++        BatchRemoveIterator<Cell> batchIter = map.batchRemoveIterator();
+         batchIter.next();
+         batchIter.remove();
+         batchIter.next();
+         batchIter.remove();
+ 
 -        assertEquals("1st column before commit", 1, 
map.iterator().next().name().getInt(0));
++        assertEquals("1st column before commit", 1, 
map.iterator().next().name().toByteBuffer().getInt(0));
+ 
+         batchIter.commit();
+ 
 -        assertEquals("1st column after commit", 3, 
map.iterator().next().name().getInt(0));
++        assertEquals("1st column after commit", 3, 
map.iterator().next().name().toByteBuffer().getInt(0));
+     }
+ 
+     @Test
+     public void testBatchRemoveCopy()
+     {
+         // Test delete some random columns and check the result
++        CellNameType type = new SimpleDenseCellNameType(Int32Type.instance);
+         ColumnFamily map = 
ArrayBackedSortedColumns.factory.create(metadata(), false);
+         int n = 127;
+         int[] values = new int[n];
 -        for (int i = 0; i < n; i++) values[i] = i;
++        for (int i = 0; i < n; i++)
++            values[i] = i;
+         Set<Integer> toRemove = Sets.newHashSet(3, 12, 13, 15, 58, 103, 112);
+ 
+         for (int value : values)
 -            map.addColumn(new Column(ByteBufferUtil.bytes(value)), 
HeapAllocator.instance);
++            map.addColumn(new BufferCell(type.makeCellName(value)));
+ 
 -        BatchRemoveIterator<Column> batchIter = map.batchRemoveIterator();
++        BatchRemoveIterator<Cell> batchIter = map.batchRemoveIterator();
+         while (batchIter.hasNext())
 -            if (toRemove.contains(batchIter.next().name().getInt(0)))
++            if 
(toRemove.contains(batchIter.next().name().toByteBuffer().getInt(0)))
+                 batchIter.remove();
+ 
+         batchIter.commit();
+ 
+         int expected = 0;
 -
+         while (toRemove.contains(expected))
+             expected++;
+ 
 -        for (Column column : map)
++        for (Cell column : map)
+         {
 -            assertEquals(expected, column.name().getInt(0));
++            assertEquals(expected, column.name().toByteBuffer().getInt(0));
+             expected++;
+             while (toRemove.contains(expected))
+                 expected++;
+         }
 -
+         assertEquals(expected, n);
+     }
  }

Reply via email to