Merge branch 'cassandra-2.1' into cassandra-2.2

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/882df8a2
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/882df8a2
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/882df8a2

Branch: refs/heads/cassandra-3.1
Commit: 882df8a21711559d18bf6b38cd6026d78b7e4956
Parents: 29ed6fe 0b26ca6
Author: Marcus Eriksson <marc...@apache.org>
Authored: Wed Dec 2 15:06:43 2015 +0100
Committer: Marcus Eriksson <marc...@apache.org>
Committed: Wed Dec 2 15:06:43 2015 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../org/apache/cassandra/db/ColumnIndex.java    |  16 ++--
 .../org/apache/cassandra/db/RangeTombstone.java |   5 ++
 .../cassandra/db/compaction/Scrubber.java       |  25 +++++-
 .../io/sstable/SSTableIdentityIterator.java     |  76 +++++++++++++++----
 .../io/sstable/format/big/BigTableWriter.java   |   1 +
 .../Keyspace1-Standard3-jb-1-Summary.db         | Bin 71 -> 63 bytes
 .../Keyspace1-StandardInteger1-ka-2-CRC.db      | Bin 0 -> 8 bytes
 .../Keyspace1-StandardInteger1-ka-2-Data.db     | Bin 0 -> 12357 bytes
 .../Keyspace1-StandardInteger1-ka-2-Digest.sha1 |   1 +
 .../Keyspace1-StandardInteger1-ka-2-Filter.db   | Bin 0 -> 176 bytes
 .../Keyspace1-StandardInteger1-ka-2-Index.db    | Bin 0 -> 108 bytes
 ...eyspace1-StandardInteger1-ka-2-Statistics.db | Bin 0 -> 4470 bytes
 .../Keyspace1-StandardInteger1-ka-2-Summary.db  | Bin 0 -> 112 bytes
 .../Keyspace1-StandardInteger1-ka-2-TOC.txt     |   8 ++
 .../apache/cassandra/db/RowIndexEntryTest.java  |   1 +
 .../unit/org/apache/cassandra/db/ScrubTest.java |  60 +++++++++++++++
 .../streaming/StreamingTransferTest.java        |  46 ++++++++++-
 18 files changed, 216 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/882df8a2/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index eaad3a2,e00abfe..bca5fb0
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,23 -1,5 +1,24 @@@
 -2.1.12
 +2.2.4
 + * Show CQL help in cqlsh in web browser (CASSANDRA-7225)
 + * Serialize on disk the proper SSTable compression ratio (CASSANDRA-10775)
 + * Reject index queries while the index is building (CASSANDRA-8505)
 + * CQL.textile syntax incorrectly includes optional keyspace for aggregate SFUNC and FINALFUNC (CASSANDRA-10747)
 + * Fix JSON update with prepared statements (CASSANDRA-10631)
 + * Don't do anticompaction after subrange repair (CASSANDRA-10422)
 + * Fix SimpleDateType type compatibility (CASSANDRA-10027)
 + * (Hadoop) fix splits calculation (CASSANDRA-10640)
 + * (Hadoop) ensure that Cluster instances are always closed (CASSANDRA-10058)
 + * (cqlsh) show partial trace if incomplete after max_trace_wait (CASSANDRA-7645)
 + * Use most up-to-date version of schema for system tables (CASSANDRA-10652)
 + * Deprecate memory_allocator in cassandra.yaml (CASSANDRA-10581,10628)
 + * Expose phi values from failure detector via JMX and tweak debug
 +   and trace logging (CASSANDRA-9526)
 + * Fix RangeNamesQueryPager (CASSANDRA-10509)
 + * Deprecate Pig support (CASSANDRA-10542)
 + * Reduce contention getting instances of CompositeType (CASSANDRA-10433)
 + * Fix IllegalArgumentException in DataOutputBuffer.reallocate for large buffers (CASSANDRA-10592)
 +Merged from 2.1:
+  * Avoid writing range tombstones after END_OF_ROW marker (CASSANDRA-10791)
   * Optimize the way we check if a token is repaired in anticompaction (CASSANDRA-10768)
   * Add proper error handling to stream receiver (CASSANDRA-10774)
   * Warn or fail when changing cluster topology live (CASSANDRA-10243)
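
For reference, the write ordering behind the CASSANDRA-10791 entry above can be sketched roughly as follows. This is a simplified, hypothetical writer rather than the actual Cassandra classes; the visible part of the fix is the finishAddingAtoms() call added in BigTableWriter.appendFromStream below (the accompanying ColumnIndex.java changes are not shown in this diff), and it is assumed here to flush any range tombstones the index builder is still tracking before the row terminator goes out.

    import java.io.DataOutputStream;
    import java.io.IOException;

    final class RowWriterSketch
    {
        // only value that cannot be mistaken for a legal column-name length
        static final int END_OF_ROW = 0x0000;

        // 'Indexer' is a hypothetical stand-in for ColumnIndex.Builder
        static void closeRow(Indexer indexer, DataOutputStream out) throws IOException
        {
            indexer.finishAddingAtoms();        // drain pending atoms, including open range tombstones
            indexer.maybeWriteEmptyRowHeader(); // header for rows that would otherwise be empty
            out.writeShort(END_OF_ROW);         // the terminator must be the last thing written for the row
        }

        interface Indexer
        {
            void finishAddingAtoms() throws IOException;
            void maybeWriteEmptyRowHeader() throws IOException;
        }
    }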

http://git-wip-us.apache.org/repos/asf/cassandra/blob/882df8a2/src/java/org/apache/cassandra/db/RangeTombstone.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/882df8a2/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/Scrubber.java
index 670c682,e02f901..9fd8560
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@@ -162,6 -155,22 +162,22 @@@ public class Scrubber implements Closea
                  if (scrubInfo.isStopRequested())
                      throw new CompactionInterruptedException(scrubInfo.getCompactionInfo());
  
+                 updateIndexKey();
+ 
+                 if (prevKey != null && indexFile != null)
+                 {
+                     long nextRowStart = currentRowPositionFromIndex == -1 ? dataFile.length() : currentRowPositionFromIndex;
+                     if (dataFile.getFilePointer() < nextRowStart)
+                     {
+                         // Encountered CASSANDRA-10791. Place post-END_OF_ROW data in the out-of-order table.
+                         saveOutOfOrderRow(prevKey,
 -                                          SSTableIdentityIterator.createFragmentIterator(sstable, dataFile, prevKey, nextRowStart - dataFile.getFilePointer(), validateColumns),
++                                          SSTableIdentityIterator.createFragmentIterator(sstable, dataFile, prevKey, nextRowStart - dataFile.getFilePointer(), checkData),
+                                           String.format("Row fragment detected after END_OF_ROW at key %s", prevKey));
+                         if (dataFile.isEOF())
+                             break;
+                     }
+                 }
+ 
                  long rowStart = dataFile.getFilePointer();
                  outputHandler.debug("Reading row at " + rowStart);
  

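The new check in the scrub loop above boils down to: after finishing a row, compare the current data-file position with the start of the next row according to the index; any bytes in between were written after END_OF_ROW and are salvaged into the out-of-order table instead of being parsed as a row. A minimal standalone sketch of that comparison, using a plain java.io.RandomAccessFile in place of Cassandra's RandomAccessReader:

    import java.io.IOException;
    import java.io.RandomAccessFile;

    final class ScrubGapCheck
    {
        // nextRowStartFromIndex == -1 means the index has no further entries.
        // Returns how many trailing bytes belong to the previous row's fragment
        // (0 when the data file and the index agree).
        static long fragmentLength(RandomAccessFile dataFile, long nextRowStartFromIndex) throws IOException
        {
            long nextRowStart = nextRowStartFromIndex == -1 ? dataFile.length() : nextRowStartFromIndex;
            return Math.max(nextRowStart - dataFile.getFilePointer(), 0);
        }
    }
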
http://git-wip-us.apache.org/repos/asf/cassandra/blob/882df8a2/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
index 17f9a8d,45994d0..8c02ee7
--- a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
@@@ -20,13 -20,13 +20,16 @@@ package org.apache.cassandra.io.sstable
  import java.io.*;
  import java.util.Iterator;
  
+ import com.google.common.collect.AbstractIterator;
+ 
  import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.config.DatabaseDescriptor;
  import org.apache.cassandra.db.*;
  import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
+ import org.apache.cassandra.db.composites.CellNameType;
 -import org.apache.cassandra.io.sstable.Descriptor.Version;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.io.sstable.format.Version;
 +import org.apache.cassandra.io.util.FileDataInput;
  import org.apache.cassandra.io.util.RandomAccessReader;
  import org.apache.cassandra.serializers.MarshalException;
  
@@@ -60,13 -62,43 +63,42 @@@
      * @param sstable SSTable we are reading from.
      * @param file Reading using this file.
      * @param key Key of this row.
 -     * @param dataSize length of row data
      * @param checkData if true, do its best to deserialize and check the coherence of row data
      */
 -    public SSTableIdentityIterator(SSTableReader sstable, RandomAccessReader file, DecoratedKey key, long dataSize, boolean checkData)
 +    public SSTableIdentityIterator(SSTableReader sstable, RandomAccessReader file, DecoratedKey key, boolean checkData)
      {
 -        this(sstable.metadata, file, file.getPath(), key, dataSize, checkData, sstable, ColumnSerializer.Flag.LOCAL);
 +        this(sstable.metadata, file, file.getPath(), key, checkData, sstable, ColumnSerializer.Flag.LOCAL);
      }
  
+     /**
+      * Used only by scrubber to solve problems with data written after the END_OF_ROW marker. Iterates atoms for the given dataSize only and does not accept an END_OF_ROW marker.
+      */
+     public static SSTableIdentityIterator createFragmentIterator(SSTableReader sstable, final RandomAccessReader file, DecoratedKey key, long dataSize, boolean checkData)
+     {
+         final ColumnSerializer.Flag flag = ColumnSerializer.Flag.LOCAL;
+         final CellNameType type = sstable.metadata.comparator;
+         final int expireBefore = (int) (System.currentTimeMillis() / 1000);
+         final Version version = sstable.descriptor.version;
+         final long dataEnd = file.getFilePointer() + dataSize;
 -        return new SSTableIdentityIterator(sstable.metadata, file, file.getPath(), key, dataSize, checkData, sstable, flag, DeletionTime.LIVE,
++        return new SSTableIdentityIterator(sstable.metadata, file, file.getPath(), key, checkData, sstable, flag, DeletionTime.LIVE,
+                                            new AbstractIterator<OnDiskAtom>()
+                                                    {
+                                                        protected OnDiskAtom computeNext()
+                                                        {
+                                                            if (file.getFilePointer() >= dataEnd)
+                                                                return endOfData();
+                                                            try
+                                                            {
+                                                                return type.onDiskAtomSerializer().deserializeFromSSTable(file, flag, expireBefore, version);
+                                                            }
+                                                            catch (IOException e)
+                                                            {
+                                                                throw new IOError(e);
+                                                            }
+                                                        }
+                                                    });
+     }
+ 
      // sstable may be null *if* checkData is false
      // If it is null, we assume the data is in the current file format
      private SSTableIdentityIterator(CFMetaData metadata,
@@@ -77,22 -110,15 +109,16 @@@
                                      SSTableReader sstable,
                                      ColumnSerializer.Flag flag)
      {
-         assert !checkData || (sstable != null);
-         this.in = in;
-         this.filename = filename;
-         this.key = key;
-         this.flag = flag;
-         this.validateColumns = checkData;
-         this.sstable = sstable;
- 
-         Version dataVersion = sstable == null ? DatabaseDescriptor.getSSTableFormat().info.getLatestVersion() : sstable.descriptor.version;
-         int expireBefore = (int) (System.currentTimeMillis() / 1000);
-         columnFamily = ArrayBackedSortedColumns.factory.create(metadata);
 -        this(metadata, in, filename, key, dataSize, checkData, sstable, flag, readDeletionTime(in, sstable, filename),
 -             metadata.getOnDiskIterator(in, flag, (int) (System.currentTimeMillis() / 1000), sstable == null ? Descriptor.Version.CURRENT : sstable.descriptor.version));
++        this(metadata, in, filename, key, checkData, sstable, flag, readDeletionTime(in, sstable, filename),
++             metadata.getOnDiskIterator(in, flag, (int) (System.currentTimeMillis() / 1000),
++                                        sstable == null ? DatabaseDescriptor.getSSTableFormat().info.getLatestVersion() : sstable.descriptor.version));
+     }
  
+     private static DeletionTime readDeletionTime(DataInput in, SSTableReader sstable, String filename)
+     {
          try
          {
-             columnFamily.delete(DeletionTime.serializer.deserialize(in));
-             atomIterator = columnFamily.metadata().getOnDiskIterator(in, flag, expireBefore, dataVersion);
+             return DeletionTime.serializer.deserialize(in);
          }
          catch (IOException e)
          {
@@@ -102,6 -128,32 +128,30 @@@
          }
      }
  
+     // sstable may be null *if* checkData is false
+     // If it is null, we assume the data is in the current file format
+     private SSTableIdentityIterator(CFMetaData metadata,
+                                     DataInput in,
+                                     String filename,
+                                     DecoratedKey key,
 -                                    long dataSize,
+                                     boolean checkData,
+                                     SSTableReader sstable,
+                                     ColumnSerializer.Flag flag,
+                                     DeletionTime deletion,
+                                     Iterator<OnDiskAtom> atomIterator)
+     {
+         assert !checkData || (sstable != null);
+         this.in = in;
+         this.filename = filename;
+         this.key = key;
 -        this.dataSize = dataSize;
+         this.flag = flag;
+         this.validateColumns = checkData;
+         this.sstable = sstable;
+         columnFamily = ArrayBackedSortedColumns.factory.create(metadata);
+         columnFamily.delete(deletion);
+         this.atomIterator = atomIterator;
+     }
+ 
      public DecoratedKey getKey()
      {
          return key;

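The createFragmentIterator() added above follows a small, reusable pattern: wrap the input in Guava's AbstractIterator and stop at a precomputed byte bound instead of looking for an END_OF_ROW marker, which a 10791-style fragment does not have. A standalone sketch of the same idea, with a plain RandomAccessFile and a hypothetical RecordDecoder standing in for Cassandra's reader and on-disk atom serializer:

    import java.io.IOError;
    import java.io.IOException;
    import java.io.RandomAccessFile;
    import java.util.Iterator;

    import com.google.common.collect.AbstractIterator;

    final class BoundedRecordIterator
    {
        // hypothetical decoder for one on-disk record
        interface RecordDecoder<T>
        {
            T readOne(RandomAccessFile file) throws IOException;
        }

        // Iterates records until the absolute file offset 'dataEnd' is reached;
        // the byte bound alone terminates iteration, there is no end marker.
        static <T> Iterator<T> upTo(final RandomAccessFile file, final long dataEnd, final RecordDecoder<T> decoder)
        {
            return new AbstractIterator<T>()
            {
                protected T computeNext()
                {
                    try
                    {
                        if (file.getFilePointer() >= dataEnd)
                            return endOfData();
                        return decoder.readOne(file);
                    }
                    catch (IOException e)
                    {
                        throw new IOError(e);
                    }
                }
            };
        }
    }
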
http://git-wip-us.apache.org/repos/asf/cassandra/blob/882df8a2/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
----------------------------------------------------------------------
diff --cc 
src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
index d064e69,0000000..505bac0
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
@@@ -1,587 -1,0 +1,588 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.io.sstable.format.big;
 +
 +import java.io.*;
 +import java.nio.ByteBuffer;
 +import java.util.Collections;
 +import java.util.Iterator;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Set;
 +
 +import org.apache.cassandra.db.*;
 +import org.apache.cassandra.io.sstable.*;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.io.sstable.format.SSTableWriter;
 +import org.apache.cassandra.io.sstable.format.Version;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.db.compaction.AbstractCompactedRow;
 +import org.apache.cassandra.dht.IPartitioner;
 +import org.apache.cassandra.io.FSWriteError;
 +import org.apache.cassandra.io.compress.CompressedSequentialWriter;
 +import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 +import org.apache.cassandra.io.sstable.metadata.MetadataComponent;
 +import org.apache.cassandra.io.sstable.metadata.MetadataType;
 +import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
 +import org.apache.cassandra.io.util.*;
 +import org.apache.cassandra.utils.ByteBufferUtil;
 +import org.apache.cassandra.utils.FBUtilities;
 +import org.apache.cassandra.utils.FilterFactory;
 +import org.apache.cassandra.utils.IFilter;
 +import org.apache.cassandra.utils.StreamingHistogram;
 +import org.apache.cassandra.utils.concurrent.Transactional;
 +
 +import static org.apache.cassandra.utils.Throwables.merge;
 +import org.apache.cassandra.utils.SyncUtil;
 +
 +public class BigTableWriter extends SSTableWriter
 +{
 +    private static final Logger logger = 
LoggerFactory.getLogger(BigTableWriter.class);
 +
 +    // not very random, but the only value that can't be mistaken for a legal column-name length
 +    public static final int END_OF_ROW = 0x0000;
 +
 +    private final IndexWriter iwriter;
 +    private SegmentedFile.Builder dbuilder;
 +    private final SequentialWriter dataFile;
 +    private DecoratedKey lastWrittenKey;
 +    private FileMark dataMark;
 +
 +    BigTableWriter(Descriptor descriptor, Long keyCount, Long repairedAt, 
CFMetaData metadata, IPartitioner partitioner, MetadataCollector 
metadataCollector)
 +    {
 +        super(descriptor, keyCount, repairedAt, metadata, partitioner, 
metadataCollector);
 +
 +        if (compression)
 +        {
 +            dataFile = SequentialWriter.open(getFilename(),
 +                                             
descriptor.filenameFor(Component.COMPRESSION_INFO),
 +                                             metadata.compressionParameters(),
 +                                             metadataCollector);
 +            dbuilder = 
SegmentedFile.getCompressedBuilder((CompressedSequentialWriter) dataFile);
 +        }
 +        else
 +        {
 +            dataFile = SequentialWriter.open(new File(getFilename()), new 
File(descriptor.filenameFor(Component.CRC)));
 +            dbuilder = 
SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode(), false);
 +        }
 +        iwriter = new IndexWriter(keyCount, dataFile);
 +    }
 +
 +    public void mark()
 +    {
 +        dataMark = dataFile.mark();
 +        iwriter.mark();
 +    }
 +
 +    public void resetAndTruncate()
 +    {
 +        dataFile.resetAndTruncate(dataMark);
 +        iwriter.resetAndTruncate();
 +    }
 +
 +    /**
 +     * Perform sanity checks on @param decoratedKey and @return the position in the data file before any data is written
 +     */
 +    private long beforeAppend(DecoratedKey decoratedKey)
 +    {
 +        assert decoratedKey != null : "Keys must not be null"; // empty keys 
ARE allowed b/c of indexed column values
 +        if (lastWrittenKey != null && lastWrittenKey.compareTo(decoratedKey) 
>= 0)
 +            throw new RuntimeException("Last written key " + lastWrittenKey + 
" >= current key " + decoratedKey + " writing into " + getFilename());
 +        return (lastWrittenKey == null) ? 0 : dataFile.getFilePointer();
 +    }
 +
 +    private void afterAppend(DecoratedKey decoratedKey, long dataEnd, 
RowIndexEntry index) throws IOException
 +    {
 +        metadataCollector.addKey(decoratedKey.getKey());
 +        lastWrittenKey = decoratedKey;
 +        last = lastWrittenKey;
 +        if (first == null)
 +            first = lastWrittenKey;
 +
 +        if (logger.isTraceEnabled())
 +            logger.trace("wrote {} at {}", decoratedKey, dataEnd);
 +        iwriter.append(decoratedKey, index, dataEnd);
 +        dbuilder.addPotentialBoundary(dataEnd);
 +    }
 +
 +    /**
 +     * @param row
 +     * @return null if the row was compacted away entirely; otherwise, the PK index entry for this row
 +     */
 +    public RowIndexEntry append(AbstractCompactedRow row)
 +    {
 +        long startPosition = beforeAppend(row.key);
 +        RowIndexEntry entry;
 +        try
 +        {
 +            entry = row.write(startPosition, dataFile);
 +            if (entry == null)
 +                return null;
 +            long endPosition = dataFile.getFilePointer();
 +            long rowSize = endPosition - startPosition;
 +            maybeLogLargePartitionWarning(row.key, rowSize);
 +            metadataCollector.update(rowSize, row.columnStats());
 +            afterAppend(row.key, endPosition, entry);
 +            return entry;
 +        }
 +        catch (IOException e)
 +        {
 +            throw new FSWriteError(e, dataFile.getPath());
 +        }
 +    }
 +
 +    public void append(DecoratedKey decoratedKey, ColumnFamily cf)
 +    {
 +        if (decoratedKey.getKey().remaining() > 
FBUtilities.MAX_UNSIGNED_SHORT)
 +        {
 +            logger.error("Key size {} exceeds maximum of {}, skipping row",
 +                         decoratedKey.getKey().remaining(),
 +                         FBUtilities.MAX_UNSIGNED_SHORT);
 +            return;
 +        }
 +
 +        long startPosition = beforeAppend(decoratedKey);
 +        long endPosition;
 +        try
 +        {
 +            RowIndexEntry entry = rawAppend(cf, startPosition, decoratedKey, 
dataFile.stream);
 +            endPosition = dataFile.getFilePointer();
 +            afterAppend(decoratedKey, endPosition, entry);
 +        }
 +        catch (IOException e)
 +        {
 +            throw new FSWriteError(e, dataFile.getPath());
 +        }
 +        long rowSize = endPosition - startPosition;
 +        maybeLogLargePartitionWarning(decoratedKey, rowSize);
 +        metadataCollector.update(endPosition - startPosition, 
cf.getColumnStats());
 +    }
 +
 +    private void maybeLogLargePartitionWarning(DecoratedKey key, long rowSize)
 +    {
 +        if (rowSize > 
DatabaseDescriptor.getCompactionLargePartitionWarningThreshold())
 +        {
 +            String keyString = 
metadata.getKeyValidator().getString(key.getKey());
 +            logger.warn("Writing large partition {}/{}:{} ({} bytes)", 
metadata.ksName, metadata.cfName, keyString, rowSize);
 +        }
 +    }
 +
 +    private static RowIndexEntry rawAppend(ColumnFamily cf, long 
startPosition, DecoratedKey key, DataOutputPlus out) throws IOException
 +    {
 +        assert cf.hasColumns() || cf.isMarkedForDelete();
 +
 +        ColumnIndex.Builder builder = new ColumnIndex.Builder(cf, 
key.getKey(), out);
 +        ColumnIndex index = builder.build(cf);
 +
 +        out.writeShort(END_OF_ROW);
 +        return RowIndexEntry.create(startPosition, 
cf.deletionInfo().getTopLevelDeletion(), index);
 +    }
 +
 +    /**
 +     * @throws IOException if a read from the DataInput fails
 +     * @throws FSWriteError if a write to the dataFile fails
 +     */
 +    public long appendFromStream(DecoratedKey key, CFMetaData metadata, 
DataInput in, Version version) throws IOException
 +    {
 +        long currentPosition = beforeAppend(key);
 +
 +        ColumnStats.MaxLongTracker maxTimestampTracker = new 
ColumnStats.MaxLongTracker(Long.MAX_VALUE);
 +        ColumnStats.MinLongTracker minTimestampTracker = new 
ColumnStats.MinLongTracker(Long.MIN_VALUE);
 +        ColumnStats.MaxIntTracker maxDeletionTimeTracker = new 
ColumnStats.MaxIntTracker(Integer.MAX_VALUE);
 +        List<ByteBuffer> minColumnNames = Collections.emptyList();
 +        List<ByteBuffer> maxColumnNames = Collections.emptyList();
 +        StreamingHistogram tombstones = new 
StreamingHistogram(TOMBSTONE_HISTOGRAM_BIN_SIZE);
 +        boolean hasLegacyCounterShards = false;
 +
 +        ColumnFamily cf = ArrayBackedSortedColumns.factory.create(metadata);
 +        cf.delete(DeletionTime.serializer.deserialize(in));
 +
 +        ColumnIndex.Builder columnIndexer = new ColumnIndex.Builder(cf, 
key.getKey(), dataFile.stream);
 +
 +        if (cf.deletionInfo().getTopLevelDeletion().localDeletionTime < 
Integer.MAX_VALUE)
 +        {
 +            
tombstones.update(cf.deletionInfo().getTopLevelDeletion().localDeletionTime);
 +            
maxDeletionTimeTracker.update(cf.deletionInfo().getTopLevelDeletion().localDeletionTime);
 +            
minTimestampTracker.update(cf.deletionInfo().getTopLevelDeletion().markedForDeleteAt);
 +            
maxTimestampTracker.update(cf.deletionInfo().getTopLevelDeletion().markedForDeleteAt);
 +        }
 +
 +        Iterator<RangeTombstone> rangeTombstoneIterator = 
cf.deletionInfo().rangeIterator();
 +        while (rangeTombstoneIterator.hasNext())
 +        {
 +            RangeTombstone rangeTombstone = rangeTombstoneIterator.next();
 +            tombstones.update(rangeTombstone.getLocalDeletionTime());
 +            minTimestampTracker.update(rangeTombstone.timestamp());
 +            maxTimestampTracker.update(rangeTombstone.timestamp());
 +            
maxDeletionTimeTracker.update(rangeTombstone.getLocalDeletionTime());
 +            minColumnNames = ColumnNameHelper.minComponents(minColumnNames, 
rangeTombstone.min, metadata.comparator);
 +            maxColumnNames = ColumnNameHelper.maxComponents(maxColumnNames, 
rangeTombstone.max, metadata.comparator);
 +        }
 +
 +        Iterator<OnDiskAtom> iter = AbstractCell.onDiskIterator(in, 
ColumnSerializer.Flag.PRESERVE_SIZE, Integer.MIN_VALUE, version, 
metadata.comparator);
 +        try
 +        {
 +            while (iter.hasNext())
 +            {
 +                OnDiskAtom atom = iter.next();
 +                if (atom == null)
 +                    break;
 +
 +                if (atom instanceof CounterCell)
 +                {
 +                    atom = ((CounterCell) atom).markLocalToBeCleared();
 +                    hasLegacyCounterShards = hasLegacyCounterShards || 
((CounterCell) atom).hasLegacyShards();
 +                }
 +
 +                int deletionTime = atom.getLocalDeletionTime();
 +                if (deletionTime < Integer.MAX_VALUE)
 +                    tombstones.update(deletionTime);
 +                minTimestampTracker.update(atom.timestamp());
 +                maxTimestampTracker.update(atom.timestamp());
 +                minColumnNames = ColumnNameHelper.minComponents(minColumnNames, atom.name(), metadata.comparator);
 +                maxColumnNames = ColumnNameHelper.maxComponents(maxColumnNames, atom.name(), metadata.comparator);
 +                maxDeletionTimeTracker.update(atom.getLocalDeletionTime());
 +
 +                columnIndexer.add(atom); // This writes the atom on disk too
 +            }
++            columnIndexer.finishAddingAtoms();
 +
 +            columnIndexer.maybeWriteEmptyRowHeader();
 +            dataFile.stream.writeShort(END_OF_ROW);
 +        }
 +        catch (IOException e)
 +        {
 +            throw new FSWriteError(e, dataFile.getPath());
 +        }
 +
 +        metadataCollector.updateMinTimestamp(minTimestampTracker.get())
 +                         .updateMaxTimestamp(maxTimestampTracker.get())
 +                         
.updateMaxLocalDeletionTime(maxDeletionTimeTracker.get())
 +                         .addRowSize(dataFile.getFilePointer() - 
currentPosition)
 +                         .addColumnCount(columnIndexer.writtenAtomCount())
 +                         .mergeTombstoneHistogram(tombstones)
 +                         .updateMinColumnNames(minColumnNames)
 +                         .updateMaxColumnNames(maxColumnNames)
 +                         
.updateHasLegacyCounterShards(hasLegacyCounterShards);
 +
 +        afterAppend(key, currentPosition, 
RowIndexEntry.create(currentPosition, cf.deletionInfo().getTopLevelDeletion(), 
columnIndexer.build()));
 +        return currentPosition;
 +    }
 +
 +    private Descriptor makeTmpLinks()
 +    {
 +        // create temp links if they don't already exist
 +        Descriptor link = descriptor.asType(Descriptor.Type.TEMPLINK);
 +        if (!new File(link.filenameFor(Component.PRIMARY_INDEX)).exists())
 +        {
 +            FileUtils.createHardLink(new 
File(descriptor.filenameFor(Component.PRIMARY_INDEX)), new 
File(link.filenameFor(Component.PRIMARY_INDEX)));
 +            FileUtils.createHardLink(new 
File(descriptor.filenameFor(Component.DATA)), new 
File(link.filenameFor(Component.DATA)));
 +        }
 +        return link;
 +    }
 +
 +    @SuppressWarnings("resource")
 +    public SSTableReader openEarly()
 +    {
 +        // find the max (exclusive) readable key
 +        IndexSummaryBuilder.ReadableBoundary boundary = 
iwriter.getMaxReadable();
 +        if (boundary == null)
 +            return null;
 +
 +        StatsMetadata stats = statsMetadata();
 +        assert boundary.indexLength > 0 && boundary.dataLength > 0;
 +        Descriptor link = makeTmpLinks();
 +        // open the reader early, giving it a FINAL descriptor type so that 
it is indistinguishable for other consumers
 +        SegmentedFile ifile = 
iwriter.builder.complete(link.filenameFor(Component.PRIMARY_INDEX), 
boundary.indexLength);
 +        SegmentedFile dfile = 
dbuilder.complete(link.filenameFor(Component.DATA), boundary.dataLength);
 +        SSTableReader sstable = 
SSTableReader.internalOpen(descriptor.asType(Descriptor.Type.FINAL),
 +                                                           components, 
metadata,
 +                                                           partitioner, ifile,
 +                                                           dfile, 
iwriter.summary.build(partitioner, boundary),
 +                                                           
iwriter.bf.sharedCopy(), maxDataAge, stats, SSTableReader.OpenReason.EARLY);
 +
 +        // now it's open, find the ACTUAL last readable key (i.e. for which 
the data file has also been flushed)
 +        sstable.first = getMinimalKey(first);
 +        sstable.last = getMinimalKey(boundary.lastKey);
 +        return sstable;
 +    }
 +
 +    public SSTableReader openFinalEarly()
 +    {
 +        // we must ensure the data is completely flushed to disk
 +        dataFile.sync();
 +        iwriter.indexFile.sync();
 +        return openFinal(makeTmpLinks(), SSTableReader.OpenReason.EARLY);
 +    }
 +
 +    @SuppressWarnings("resource")
 +    private SSTableReader openFinal(Descriptor desc, SSTableReader.OpenReason 
openReason)
 +    {
 +        if (maxDataAge < 0)
 +            maxDataAge = System.currentTimeMillis();
 +
 +        StatsMetadata stats = statsMetadata();
 +        // finalize in-memory state for the reader
 +        SegmentedFile ifile = 
iwriter.builder.complete(desc.filenameFor(Component.PRIMARY_INDEX));
 +        SegmentedFile dfile = 
dbuilder.complete(desc.filenameFor(Component.DATA));
 +        SSTableReader sstable = 
SSTableReader.internalOpen(desc.asType(Descriptor.Type.FINAL),
 +                                                           components,
 +                                                           this.metadata,
 +                                                           partitioner,
 +                                                           ifile,
 +                                                           dfile,
 +                                                           
iwriter.summary.build(partitioner),
 +                                                           
iwriter.bf.sharedCopy(),
 +                                                           maxDataAge,
 +                                                           stats,
 +                                                           openReason);
 +        sstable.first = getMinimalKey(first);
 +        sstable.last = getMinimalKey(last);
 +        return sstable;
 +    }
 +
 +    protected SSTableWriter.TransactionalProxy txnProxy()
 +    {
 +        return new TransactionalProxy();
 +    }
 +
 +    class TransactionalProxy extends SSTableWriter.TransactionalProxy
 +    {
 +        // finalise our state on disk, including renaming
 +        protected void doPrepare()
 +        {
 +            iwriter.prepareToCommit();
 +
 +            // write sstable statistics
 +            dataFile.setDescriptor(descriptor).prepareToCommit();
 +            writeMetadata(descriptor, finalizeMetadata());
 +
 +            // save the table of components
 +            SSTable.appendTOC(descriptor, components);
 +
 +            // rename to final
 +            rename(descriptor, components);
 +
 +            if (openResult)
 +                finalReader = 
openFinal(descriptor.asType(Descriptor.Type.FINAL), 
SSTableReader.OpenReason.NORMAL);
 +        }
 +
 +        protected Throwable doCommit(Throwable accumulate)
 +        {
 +            accumulate = dataFile.commit(accumulate);
 +            accumulate = iwriter.commit(accumulate);
 +            return accumulate;
 +        }
 +
 +        @Override
 +        protected Throwable doPreCleanup(Throwable accumulate)
 +        {
 +            accumulate = dbuilder.close(accumulate);
 +            return accumulate;
 +        }
 +
 +        protected Throwable doAbort(Throwable accumulate)
 +        {
 +            accumulate = iwriter.abort(accumulate);
 +            accumulate = dataFile.abort(accumulate);
 +
 +            accumulate = delete(descriptor, accumulate);
 +            if (!openResult)
 +                accumulate = delete(descriptor.asType(Descriptor.Type.FINAL), 
accumulate);
 +            return accumulate;
 +        }
 +
 +        private Throwable delete(Descriptor desc, Throwable accumulate)
 +        {
 +            try
 +            {
 +                Set<Component> components = 
SSTable.discoverComponentsFor(desc);
 +                if (!components.isEmpty())
 +                    SSTable.delete(desc, components);
 +            }
 +            catch (Throwable t)
 +            {
 +                logger.error(String.format("Failed deleting temp components 
for %s", descriptor), t);
 +                accumulate = merge(accumulate, t);
 +            }
 +            return accumulate;
 +        }
 +    }
 +
 +    private static void writeMetadata(Descriptor desc, Map<MetadataType, 
MetadataComponent> components)
 +    {
 +        File file = new File(desc.filenameFor(Component.STATS));
 +        try (SequentialWriter out = SequentialWriter.open(file);)
 +        {
 +            desc.getMetadataSerializer().serialize(components, out.stream);
 +            out.setDescriptor(desc).finish();
 +        }
 +        catch (IOException e)
 +        {
 +            throw new FSWriteError(e, file.getPath());
 +        }
 +    }
 +
 +    public long getFilePointer()
 +    {
 +        return dataFile.getFilePointer();
 +    }
 +
 +    public long getOnDiskFilePointer()
 +    {
 +        return dataFile.getOnDiskFilePointer();
 +    }
 +
 +    /**
 +     * Encapsulates writing the index and filter for an SSTable. The state of 
this object is not valid until it has been closed.
 +     */
 +    class IndexWriter extends AbstractTransactional implements Transactional
 +    {
 +        private final SequentialWriter indexFile;
 +        public final SegmentedFile.Builder builder;
 +        public final IndexSummaryBuilder summary;
 +        public final IFilter bf;
 +        private FileMark mark;
 +
 +        IndexWriter(long keyCount, final SequentialWriter dataFile)
 +        {
 +            indexFile = SequentialWriter.open(new 
File(descriptor.filenameFor(Component.PRIMARY_INDEX)));
 +            builder = 
SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode(), false);
 +            summary = new IndexSummaryBuilder(keyCount, 
metadata.getMinIndexInterval(), Downsampling.BASE_SAMPLING_LEVEL);
 +            bf = FilterFactory.getFilter(keyCount, 
metadata.getBloomFilterFpChance(), true);
 +            // register listeners to be alerted when the data files are 
flushed
 +            indexFile.setPostFlushListener(new Runnable()
 +            {
 +                public void run()
 +                {
 +                    summary.markIndexSynced(indexFile.getLastFlushOffset());
 +                }
 +            });
 +            dataFile.setPostFlushListener(new Runnable()
 +            {
 +                public void run()
 +                {
 +                    summary.markDataSynced(dataFile.getLastFlushOffset());
 +                }
 +            });
 +        }
 +
 +        // finds the last (-offset) decorated key that can be guaranteed to 
occur fully in the flushed portion of the index file
 +        IndexSummaryBuilder.ReadableBoundary getMaxReadable()
 +        {
 +            return summary.getLastReadableBoundary();
 +        }
 +
 +        public void append(DecoratedKey key, RowIndexEntry indexEntry, long 
dataEnd) throws IOException
 +        {
 +            bf.add(key);
 +            long indexStart = indexFile.getFilePointer();
 +            try
 +            {
 +                ByteBufferUtil.writeWithShortLength(key.getKey(), 
indexFile.stream);
 +                rowIndexEntrySerializer.serialize(indexEntry, 
indexFile.stream);
 +            }
 +            catch (IOException e)
 +            {
 +                throw new FSWriteError(e, indexFile.getPath());
 +            }
 +            long indexEnd = indexFile.getFilePointer();
 +
 +            if (logger.isTraceEnabled())
 +                logger.trace("wrote index entry: {} at {}", indexEntry, 
indexStart);
 +
 +            summary.maybeAddEntry(key, indexStart, indexEnd, dataEnd);
 +            builder.addPotentialBoundary(indexStart);
 +        }
 +
 +        /**
 +         * Closes the index and bloomfilter, making the public state of this 
writer valid for consumption.
 +         */
 +        void flushBf()
 +        {
 +            if (components.contains(Component.FILTER))
 +            {
 +                String path = descriptor.filenameFor(Component.FILTER);
 +                try (FileOutputStream fos = new FileOutputStream(path);
 +                     DataOutputStreamPlus stream = new 
BufferedDataOutputStreamPlus(fos))
 +                {
 +                    // bloom filter
 +                    FilterFactory.serialize(bf, stream);
 +                    stream.flush();
 +                    SyncUtil.sync(fos);
 +                }
 +                catch (IOException e)
 +                {
 +                    throw new FSWriteError(e, path);
 +                }
 +            }
 +        }
 +
 +        public void mark()
 +        {
 +            mark = indexFile.mark();
 +        }
 +
 +        public void resetAndTruncate()
 +        {
 +            // we can't un-set the bloom filter addition, but extra keys in 
there are harmless.
 +            // we can't reset dbuilder either, but that is the last thing 
called in afterappend so
 +            // we assume that if that worked then we won't be trying to reset.
 +            indexFile.resetAndTruncate(mark);
 +        }
 +
 +        protected void doPrepare()
 +        {
 +            flushBf();
 +
 +            // truncate index file
 +            long position = iwriter.indexFile.getFilePointer();
 +            iwriter.indexFile.setDescriptor(descriptor).prepareToCommit();
 +            FileUtils.truncate(iwriter.indexFile.getPath(), position);
 +
 +            // save summary
 +            summary.prepareToCommit();
 +            try (IndexSummary summary = iwriter.summary.build(partitioner))
 +            {
 +                SSTableReader.saveSummary(descriptor, first, last, 
iwriter.builder, dbuilder, summary);
 +            }
 +        }
 +
 +        protected Throwable doCommit(Throwable accumulate)
 +        {
 +            return indexFile.commit(accumulate);
 +        }
 +
 +        protected Throwable doAbort(Throwable accumulate)
 +        {
 +            return indexFile.abort(accumulate);
 +        }
 +
 +        @Override
 +        protected Throwable doPreCleanup(Throwable accumulate)
 +        {
 +            accumulate = summary.close(accumulate);
 +            accumulate = bf.close(accumulate);
 +            accumulate = builder.close(accumulate);
 +            return accumulate;
 +        }
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/882df8a2/test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Summary.db
----------------------------------------------------------------------
diff --cc test/data/corrupt-sstables/Keyspace1-StandardInteger1-ka-2-Summary.db
index 0000000,22cfa6a..190922a
mode 000000,100644..100644
Binary files differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/882df8a2/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/882df8a2/test/unit/org/apache/cassandra/db/ScrubTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/ScrubTest.java
index 0d90354,167671b..b69a1f8
--- a/test/unit/org/apache/cassandra/db/ScrubTest.java
+++ b/test/unit/org/apache/cassandra/db/ScrubTest.java
@@@ -20,98 -20,68 +20,103 @@@ package org.apache.cassandra.db
   *
   */
  
 -import java.io.*;
 +import java.lang.reflect.Field;
 +import java.lang.reflect.Modifier;
  import java.nio.ByteBuffer;
 -import java.util.Collections;
 -import java.util.HashSet;
 -import java.util.Iterator;
 -import java.util.List;
 -import java.util.Set;
 +import java.util.*;
  import java.util.concurrent.ExecutionException;
  
 +import java.io.File;
 +import java.io.IOError;
 +import java.io.IOException;
 +import java.io.RandomAccessFile;
 +
 +import org.apache.cassandra.config.KSMetaData;
  import org.apache.cassandra.cql3.QueryProcessor;
  import org.apache.cassandra.db.composites.CellNameType;
 +import org.apache.cassandra.db.marshal.BytesType;
 +import org.apache.cassandra.db.marshal.CounterColumnType;
 +import org.apache.cassandra.db.marshal.UUIDType;
  import org.apache.cassandra.exceptions.ConfigurationException;
 -import org.apache.cassandra.db.marshal.CompositeType;
 -import org.apache.cassandra.db.marshal.LongType;
 -import org.apache.cassandra.db.marshal.UTF8Type;
 +import org.apache.cassandra.db.compaction.OperationType;
 +import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
  import org.apache.cassandra.exceptions.RequestExecutionException;
  import org.apache.cassandra.io.compress.CompressionMetadata;
 +import org.apache.cassandra.io.sstable.format.SSTableFormat;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.locator.SimpleStrategy;
  import org.apache.cassandra.utils.UUIDGen;
+ 
  import org.apache.commons.lang3.StringUtils;
+ 
  import org.junit.BeforeClass;
  import org.junit.Test;
  import org.junit.runner.RunWith;
  
 -import org.apache.cassandra.OrderedJUnit4ClassRunner;
 -import org.apache.cassandra.cql3.UntypedResultSet;
 -import org.apache.cassandra.SchemaLoader;
 -import org.apache.cassandra.Util;
  import org.apache.cassandra.config.CFMetaData;
  import org.apache.cassandra.config.Schema;
 +import org.apache.cassandra.cql3.Operator;
 +import org.apache.cassandra.cql3.UntypedResultSet;
  import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
+ import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
  import org.apache.cassandra.db.compaction.CompactionManager;
  import org.apache.cassandra.db.compaction.Scrubber;
 +import org.apache.cassandra.db.index.SecondaryIndex;
 +import org.apache.cassandra.db.marshal.*;
  import org.apache.cassandra.exceptions.WriteTimeoutException;
  import org.apache.cassandra.io.sstable.Component;
  import org.apache.cassandra.io.sstable.Descriptor;
 -import org.apache.cassandra.io.sstable.SSTableReader;
 +import org.apache.cassandra.io.sstable.SSTableRewriter;
 +import org.apache.cassandra.OrderedJUnit4ClassRunner;
 +import org.apache.cassandra.SchemaLoader;
 +import org.apache.cassandra.service.StorageService;
 +import org.apache.cassandra.Util;
  import org.apache.cassandra.utils.ByteBufferUtil;
  
 +import static org.junit.Assert.*;
 +import static org.junit.Assume.assumeTrue;
 +
  import static org.apache.cassandra.Util.cellname;
  import static org.apache.cassandra.Util.column;
 -import static junit.framework.Assert.assertNotNull;
  import static org.junit.Assert.assertEquals;
 -import static org.junit.Assert.assertTrue;
  import static org.junit.Assert.fail;
 -import static org.junit.Assume.assumeTrue;
  
  @RunWith(OrderedJUnit4ClassRunner.class)
 -public class ScrubTest extends SchemaLoader
 +public class ScrubTest
  {
 -    public String KEYSPACE = "Keyspace1";
 -    public String CF = "Standard1";
 -    public String CF3 = "Standard2";
 -    public String COUNTER_CF = "Counter1";
 -    private static Integer COMPRESSION_CHUNK_LENGTH = 4096;
 +    public static final String KEYSPACE = "Keyspace1";
 +    public static final String CF = "Standard1";
 +    public static final String CF2 = "Standard2";
 +    public static final String CF3 = "Standard3";
++    public static final String CFI1 = "StandardInteger1";
 +    public static final String COUNTER_CF = "Counter1";
 +    public static final String CF_UUID = "UUIDKeys";
 +    public static final String CF_INDEX1 = "Indexed1";
 +    public static final String CF_INDEX2 = "Indexed2";
 +
 +    public static final String COL_KEYS_INDEX = "birthdate";
 +    public static final String COL_COMPOSITES_INDEX = "col1";
 +    public static final String COL_NON_INDEX = "notanindexcol";
 +
 +    public static final Integer COMPRESSION_CHUNK_LENGTH = 4096;
  
      @BeforeClass
 -    public static void loadSchema() throws ConfigurationException
 +    public static void defineSchema() throws ConfigurationException
      {
 -        loadSchema(COMPRESSION_CHUNK_LENGTH);
 +        SchemaLoader.loadSchema();
 +        SchemaLoader.createKeyspace(KEYSPACE,
 +                                    SimpleStrategy.class,
 +                                    KSMetaData.optsWithRF(1),
 +                                    SchemaLoader.standardCFMD(KEYSPACE, CF),
 +                                    SchemaLoader.standardCFMD(KEYSPACE, CF2),
 +                                    SchemaLoader.standardCFMD(KEYSPACE, CF3),
++                                    SchemaLoader.standardCFMD(KEYSPACE, CFI1),
 +                                    SchemaLoader.standardCFMD(KEYSPACE, COUNTER_CF)
 +                                                .defaultValidator(CounterColumnType.instance)
 +                                                .compressionParameters(SchemaLoader.getCompressionParameters(COMPRESSION_CHUNK_LENGTH)),
 +                                    SchemaLoader.standardCFMD(KEYSPACE, CF_UUID).keyValidator(UUIDType.instance),
 +                                    SchemaLoader.indexCFMD(KEYSPACE, CF_INDEX1, true),
 +                                    SchemaLoader.compositeIndexCFMD(KEYSPACE, CF_INDEX2, true));
      }
  
      @Test
@@@ -422,6 -364,59 +427,61 @@@
          assert rows.size() == 6 : "Got " + rows.size();
      }
  
+     @Test
+     public void testScrub10791() throws Exception
+     {
+         // Table is created by StreamingTransferTest.testTransferRangeTombstones with CASSANDRA-10791 fix disabled.
+         CompactionManager.instance.disableAutoCompaction();
+         Keyspace keyspace = Keyspace.open(KEYSPACE);
 -        String columnFamily = "StandardInteger1";
++        String columnFamily = CFI1;
+         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(columnFamily);
+         cfs.clearUnsafe();
+ 
+         String root = System.getProperty("corrupt-sstable-root");
+         assert root != null;
+         File rootDir = new File(root);
+         assert rootDir.isDirectory();
 -        Descriptor desc = new Descriptor(new Descriptor.Version("ka"), rootDir, KEYSPACE, columnFamily, 2, Descriptor.Type.FINAL);
 -        CFMetaData metadata = Schema.instance.getCFMetaData(desc.ksname, desc.cfname);
++        Descriptor desc = new Descriptor("ka", rootDir, KEYSPACE, columnFamily, 2, Descriptor.Type.FINAL, SSTableFormat.Type.LEGACY);
+ 
+         // open without validation for scrubbing
+         Set<Component> components = new HashSet<>();
+         components.add(Component.DATA);
+         components.add(Component.PRIMARY_INDEX);
+         components.add(Component.FILTER);
+         components.add(Component.STATS);
+         components.add(Component.SUMMARY);
+         components.add(Component.TOC);
 -        SSTableReader sstable = SSTableReader.openNoValidation(desc, components, metadata);
++        SSTableReader sstable = SSTableReader.openNoValidation(desc, components, cfs);
+ 
 -        Scrubber scrubber = new Scrubber(cfs, sstable, false, true, true);
 -        scrubber.scrub();
++        try (LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.SCRUB, sstable);
++             Scrubber scrubber = new Scrubber(cfs, txn, false, true, true);)
++        {
++            scrubber.scrub();
++        }
+ 
+         cfs.loadNewSSTables();
+         assertEquals(7, countCells(cfs));
+     }
+ 
+     private int countCells(ColumnFamilyStore cfs)
+     {
+         int cellCount = 0;
+         for (SSTableReader sstable : cfs.getSSTables())
+         {
+             Iterator<OnDiskAtomIterator> it = sstable.getScanner();
+             while (it.hasNext())
+             {
+                 Iterator<OnDiskAtom> itr = it.next();
+                 while (itr.hasNext())
+                 {
+                     ++cellCount;
+                     itr.next();
+                 }
+             }
+         }
+         return cellCount;
+     }
+ 
      private void overrideWithGarbage(SSTableReader sstable, ByteBuffer key1, ByteBuffer key2) throws IOException
      {
          boolean compression = Boolean.parseBoolean(System.getProperty("cassandra.test.compression", "false"));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/882df8a2/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
index 3c799e2,31dc492..e751968
--- a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
@@@ -26,7 -26,7 +26,8 @@@ import java.util.concurrent.TimeUnit
  import com.google.common.collect.Iterables;
  import com.google.common.util.concurrent.FutureCallback;
  import com.google.common.util.concurrent.Futures;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
+ 
  import org.junit.BeforeClass;
  import org.junit.Test;
  import org.junit.runner.RunWith;
@@@ -306,9 -280,21 +312,21 @@@ public class StreamingTransferTes
          // add RangeTombstones
          cf.delete(new DeletionInfo(cellname(2), cellname(3), cf.getComparator(), 1, (int) (System.currentTimeMillis() / 1000)));
          cf.delete(new DeletionInfo(cellname(5), cellname(7), cf.getComparator(), 1, (int) (System.currentTimeMillis() / 1000)));
+         cf.delete(new DeletionInfo(cellname(8), cellname(10), cf.getComparator(), 1, (int) (System.currentTimeMillis() / 1000)));
 -        rm.apply();
++        rm.applyUnsafe();
+ 
+         key = "key1";
+         rm = new Mutation(ks, ByteBufferUtil.bytes(key));
+         // add columns of size slightly less than column_index_size to force insert column index
+         rm.add(cfname, cellname(1), ByteBuffer.wrap(new byte[DatabaseDescriptor.getColumnIndexSize() - 64]), 2);
+         cf = rm.addOrGet(cfname);
+         // add RangeTombstones
+         cf.delete(new DeletionInfo(cellname(2), cellname(3), cf.getComparator(), 1, (int) (System.currentTimeMillis() / 1000)));
 -        rm.apply();
 +        rm.applyUnsafe();
+ 
          cfs.forceBlockingFlush();
  
+         int cellCount = countCells(cfs);
          SSTableReader sstable = cfs.getSSTables().iterator().next();
          cfs.clearUnsafe();
          transferSSTables(sstable);
