This is an automated email from the ASF dual-hosted git repository.

blambov pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3078aea1cf Introduce SSTableSimpleScanner for compaction
3078aea1cf is described below

commit 3078aea1cfc70092a185bab8ac5dc8a35627330f
Author: Branimir Lambov <[email protected]>
AuthorDate: Tue Nov 19 12:41:41 2024 +0200

    Introduce SSTableSimpleScanner for compaction
    
    This removes the usage of index files during compaction and simplifies
    and improves the performance of compaction.
    
    patch by Branimir Lambov; reviewed by Sylvain Lebresne for CASSANDRA-20092
---
 CHANGES.txt                                        |   1 +
 .../org/apache/cassandra/cache/KeyCacheKey.java    |  12 +-
 .../db/compaction/CompactionIterator.java          |   2 +-
 .../cassandra/io/compress/CompressionMetadata.java |   8 +
 .../io/sstable/SSTableIdentityIterator.java        |  62 +++++++
 .../cassandra/io/sstable/format/SSTableReader.java | 142 ++++++++++++---
 .../io/sstable/format/SSTableScanner.java          |  16 --
 .../io/sstable/format/SSTableSimpleScanner.java    | 195 ++++++++++++++++++++
 .../io/sstable/format/big/BigTableReader.java      |  37 ----
 .../io/sstable/format/big/BigTableScanner.java     |  21 ---
 .../io/sstable/format/bti/BtiTableReader.java      |  23 ---
 .../io/sstable/format/bti/BtiTableScanner.java     |  21 ---
 .../apache/cassandra/tools/SSTablePartitions.java  |   4 +-
 .../io/sstable/SSTableCorruptionDetectionTest.java |  33 ++++
 .../cassandra/io/sstable/SSTableReaderTest.java    | 198 +++++++++++++++++++++
 .../cassandra/io/sstable/SSTableScannerTest.java   | 125 +++++++++++--
 16 files changed, 740 insertions(+), 160 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 49eb15619f..af48dfb1d3 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 5.1
+ * Introduce SSTableSimpleScanner for compaction (CASSANDRA-20092)
  * Include column drop timestamp in alter table transformation 
(CASSANDRA-18961)
  * Make JMX SSL configurable in cassandra.yaml (CASSANDRA-18508)
  * Fix cqlsh CAPTURE command to save query results without trace details when 
TRACING is ON (CASSANDRA-19105)
diff --git a/src/java/org/apache/cassandra/cache/KeyCacheKey.java 
b/src/java/org/apache/cassandra/cache/KeyCacheKey.java
index ac6f1f9693..1a722b05e6 100644
--- a/src/java/org/apache/cassandra/cache/KeyCacheKey.java
+++ b/src/java/org/apache/cassandra/cache/KeyCacheKey.java
@@ -21,7 +21,6 @@ import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Objects;
 
-import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -31,9 +30,7 @@ public class KeyCacheKey extends CacheKey
 {
     public final Descriptor desc;
 
-    private static final long EMPTY_SIZE = ObjectSizes.measure(new 
KeyCacheKey(TableMetadata.builder("ks", "tab")
-                                                                               
             .addPartitionKeyColumn("pk", UTF8Type.instance)
-                                                                               
             .build(), null, ByteBufferUtil.EMPTY_BYTE_BUFFER));
+    private static final long EMPTY_SIZE = ObjectSizes.measure(new 
KeyCacheKey());
 
     // keeping an array instead of a ByteBuffer lowers the overhead of the key 
cache working set,
     // without extra copies on lookup since client-provided key ByteBuffers 
will be array-backed already
@@ -47,6 +44,13 @@ public class KeyCacheKey extends CacheKey
         assert this.key != null;
     }
 
+    private KeyCacheKey() // Only for EMPTY_SIZE
+    {
+        super(null, null);
+        this.desc = null;
+        this.key = null;
+    }
+
     public String toString()
     {
         return String.format("KeyCacheKey(%s, %s)", desc, 
ByteBufferUtil.bytesToHex(ByteBuffer.wrap(key)));
diff --git 
a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java 
b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
index b27da6f0ae..eb1e761493 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
@@ -290,7 +290,7 @@ public class CompactionIterator extends 
CompactionInfo.Holder implements Unfilte
     {
         long n = 0;
         for (ISSTableScanner scanner : scanners)
-            n += scanner.getCurrentPosition();
+            n += scanner.getBytesScanned();
         bytesRead = n;
     }
 
diff --git a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java 
b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
index 96b4ce8418..d5f5f05655 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
@@ -539,6 +539,14 @@ public class CompressionMetadata extends 
WrappedSharedCloseable
         {
             return String.format("Chunk<offset: %d, length: %d>", offset, 
length);
         }
+
+        /**
+         * @return the end of the chunk in the file, including the checksum
+         */
+        public long chunkEnd()
+        {
+            return offset + length + 4;
+        }
     }
 
     static class ChunkSerializer implements IVersionedSerializer<Chunk>
diff --git 
a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java 
b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
index cc201b4125..072a364af3 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
@@ -46,6 +46,7 @@ public class SSTableIdentityIterator implements 
Comparable<SSTableIdentityIterat
 
     protected final SSTableSimpleIterator iterator;
     private final Row staticRow;
+    boolean isClosed = false;
 
     public SSTableIdentityIterator(SSTableReader sstable, DecoratedKey key, 
DeletionTime partitionLevelDeletion,
             String filename, SSTableSimpleIterator iterator) throws IOException
@@ -100,6 +101,28 @@ public class SSTableIdentityIterator implements 
Comparable<SSTableIdentityIterat
         }
     }
 
+    public static SSTableIdentityIterator create(SSTableReader sstable, 
FileDataInput dfile, boolean tombstoneOnly)
+    {
+        try
+        {
+            DecoratedKey key = 
sstable.decorateKey(ByteBufferUtil.readWithShortLength(dfile));
+            DeletionTime partitionLevelDeletion = 
DeletionTime.getSerializer(sstable.descriptor.version).deserialize(dfile);
+            if (!partitionLevelDeletion.validate())
+                UnfilteredValidation.handleInvalid(sstable.metadata(), key, 
sstable, "partitionLevelDeletion="+partitionLevelDeletion.toString());
+
+            DeserializationHelper helper = new 
DeserializationHelper(sstable.metadata(), 
sstable.descriptor.version.correspondingMessagingVersion(), 
DeserializationHelper.Flag.LOCAL);
+            SSTableSimpleIterator iterator = tombstoneOnly
+                                             ? 
SSTableSimpleIterator.createTombstoneOnly(sstable.metadata(), dfile, 
sstable.header, helper, partitionLevelDeletion)
+                                             : 
SSTableSimpleIterator.create(sstable.metadata(), dfile, sstable.header, helper, 
partitionLevelDeletion);
+            return new SSTableIdentityIterator(sstable, key, 
partitionLevelDeletion, dfile.getPath(), iterator);
+        }
+        catch (IOException e)
+        {
+            sstable.markSuspect();
+            throw new CorruptSSTableException(e, dfile.getPath());
+        }
+    }
+
     public TableMetadata metadata()
     {
         return iterator.metadata;
@@ -159,6 +182,9 @@ public class SSTableIdentityIterator implements 
Comparable<SSTableIdentityIterat
     {
         try
         {
+            if (isClosed)
+                throw new IllegalStateException("Iterator used after 
closing.");
+
             return doCompute();
         }
         catch (IndexOutOfBoundsException | VIntOutOfRangeException | 
AssertionError e)
@@ -190,6 +216,42 @@ public class SSTableIdentityIterator implements 
Comparable<SSTableIdentityIterat
     public void close()
     {
         // creator is responsible for closing file when finished
+        isClosed = true;
+    }
+
+    public boolean isClosed()
+    {
+        return isClosed;
+    }
+
+    /**
+     * Called to advance to the next partition and make sure that we process 
all outstanding rows if user did not
+     * do so. Unlike next() and hasNext(), this can and will be called after 
the iterator is closed.
+     */
+    public void exhaust()
+    {
+        try
+        {
+            while (iterator.hasNext())
+                iterator.next();
+        }
+        catch (IndexOutOfBoundsException | VIntOutOfRangeException | 
AssertionError e)
+        {
+            sstable.markSuspect();
+            throw new CorruptSSTableException(e, filename);
+        }
+        catch (IOError e)
+        {
+            if (e.getCause() instanceof IOException)
+            {
+                sstable.markSuspect();
+                throw new CorruptSSTableException((Exception)e.getCause(), 
filename);
+            }
+            else
+            {
+                throw e;
+            }
+        }
     }
 
     public String getPath()
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java 
b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index 716d9a8157..fe99b8430d 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@ -719,7 +719,7 @@ public abstract class SSTableReader extends SSTable 
implements UnfilteredSource,
     /**
      * Determine the minimal set of sections that can be extracted from this 
SSTable to cover the given ranges.
      *
-     * @return A sorted list of (offset,end) pairs that cover the given ranges 
in the datafile for this SSTable.
+     * @return A sorted list of [offset,end) pairs that cover the given ranges 
in the datafile for this SSTable.
      */
     public List<PartitionPositionBounds> 
getPositionsForRanges(Collection<Range<Token>> ranges)
     {
@@ -728,27 +728,110 @@ public abstract class SSTableReader extends SSTable 
implements UnfilteredSource,
         for (Range<Token> range : Range.normalize(ranges))
         {
             assert !range.isWrapAround() || range.right.isMinimum();
-            // truncate the range so it at most covers the sstable
             AbstractBounds<PartitionPosition> bounds = 
Range.makeRowRange(range);
-            PartitionPosition leftBound = bounds.left.compareTo(first) > 0 ? 
bounds.left : first.getToken().minKeyBound();
-            PartitionPosition rightBound = bounds.right.isMinimum() ? 
last.getToken().maxKeyBound() : bounds.right;
+            PartitionPositionBounds pb = getPositionsForBounds(bounds);
+            if (pb != null)
+                positions.add(pb);
+        }
+        return positions;
+    }
 
-            if (leftBound.compareTo(last) > 0 || rightBound.compareTo(first) < 
0)
-                continue;
+    /**
+     * Get a list of data positions in this SSTable that correspond to the 
given list of bounds. This method will remove
+     * non-covered intervals, but will not correct order or overlap in the 
supplied list, e.g. if bounds overlap, the
+     * result will be sections of the data file that repeat the same positions.
+     *
+     * @return A sorted list of [offset,end) pairs corresponding to the given 
boundsList in the datafile for this
+     *         SSTable.
+     */
+    public List<PartitionPositionBounds> 
getPositionsForBoundsIterator(Iterator<AbstractBounds<PartitionPosition>> 
boundsList)
+    {
+        // use the index to determine a minimal section for each range
+        List<PartitionPositionBounds> positions = new ArrayList<>();
+        while (boundsList.hasNext())
+        {
+            AbstractBounds<PartitionPosition> bounds = boundsList.next();
+            PartitionPositionBounds pb = getPositionsForBounds(bounds);
+            if (pb != null)
+                positions.add(pb);
+        }
+        return positions;
+    }
 
-            long left = getPosition(leftBound, Operator.GT);
-            long right = (rightBound.compareTo(last) > 0)
-                         ? uncompressedLength()
-                         : getPosition(rightBound, Operator.GT);
+    /**
+     * Determine the data positions in this SSTable that cover the given 
bounds.
+     *
+     * @return An [offset,end) pair that cover the given bounds in the 
datafile for this SSTable, or null if the range
+     *         is not covered by the sstable or is empty.
+     */
+    public PartitionPositionBounds 
getPositionsForBounds(AbstractBounds<PartitionPosition> bounds)
+    {
+        long left = getPosition(bounds.left, bounds.inclusiveLeft() ? 
Operator.GE : Operator.GT);
+        // Note: getPosition will apply a moved start if the sstable is in 
MOVED_START state.
+        if (left < 0) // empty range
+            return null;
 
-            if (left == right)
-                // empty range
-                continue;
+        long right = bounds.right.isMinimum() ? -1
+                                              : getPosition(bounds.right, 
bounds.inclusiveRight() ? Operator.GT
+                                                                               
                   : Operator.GE);
+        if (right < 0) // right is beyond end
+            right = uncompressedLength();   // this should also be correct for 
EARLY readers
+
+        if (left >= right) // empty range
+            return null;
+
+        return new PartitionPositionBounds(left, right);
+    }
+
+    /**
+     * Return an [offset,end) pair that covers the whole file. This could be 
null if the sstable's moved start has
+     * made the sstable effectively empty.
+     */
+    public PartitionPositionBounds getPositionsForFullRange()
+    {
+        if (openReason != OpenReason.MOVED_START)
+            return new PartitionPositionBounds(0, uncompressedLength());
+        else
+        {
+            // query a full range, so that the required adjustments can be 
applied
+            PartitionPosition minToken = 
getPartitioner().getMinimumToken().minKeyBound();
+            return getPositionsForBounds(new Range<>(minToken, minToken));
+        }
+    }
 
-            assert left < right : String.format("Range=%s openReason=%s 
first=%s last=%s left=%d right=%d", range, openReason, first, last, left, 
right);
-            positions.add(new PartitionPositionBounds(left, right));
+    /**
+     * Calculate a total on-disk (compressed) size for the given partition 
positions. For uncompressed files this is
+     * equal to the sum of the size of the covered ranges. For compressed 
files this is the sum of the size of the
+     * chunks that contain the requested ranges and may be significantly 
bigger than the size of the requested ranges.
+     *
+     * @param positionBounds a list of [offset,end) pairs that specify the 
relevant sections of the data file; this must
+     *                       be non-overlapping and in ascending order.
+     */
+    public long 
onDiskSizeForPartitionPositions(Collection<PartitionPositionBounds> 
positionBounds)
+    {
+        long total = 0;
+        if (!compression)
+        {
+            for (PartitionPositionBounds position : positionBounds)
+                total += position.upperPosition - position.lowerPosition;
         }
-        return positions;
+        else
+        {
+            final CompressionMetadata compressionMetadata = 
getCompressionMetadata();
+            long lastEnd = 0;
+            for (PartitionPositionBounds position : positionBounds)
+            {
+                // The end of the chunk that contains the last required byte 
from the range.
+                long upperChunkEnd = 
compressionMetadata.chunkFor(position.upperPosition - 1).chunkEnd();
+                // The start of the chunk that contains the first required 
byte from the range.
+                long lowerChunkStart = 
compressionMetadata.chunkFor(position.lowerPosition).offset;
+                if (lowerChunkStart < lastEnd)  // if regions include the same 
chunk, count it only once
+                    lowerChunkStart = lastEnd;
+                total += upperChunkEnd - lowerChunkStart;
+                lastEnd = upperChunkEnd;
+            }
+        }
+        return total;
     }
 
     /**
@@ -940,11 +1023,18 @@ public abstract class SSTableReader extends SSTable 
implements UnfilteredSource,
     }
 
     /**
-     * Direct I/O SSTableScanner over the entirety of the sstable..
+     * Direct I/O SSTableScanner over the entirety of the sstable.
      *
      * @return A Scanner over the full content of the SSTable.
      */
-    public abstract ISSTableScanner getScanner();
+    public ISSTableScanner getScanner()
+    {
+        PartitionPositionBounds fullRange = getPositionsForFullRange();
+        if (fullRange != null)
+            return new SSTableSimpleScanner(this, 
Collections.singletonList(fullRange));
+        else
+            return new SSTableSimpleScanner(this, Collections.emptyList());
+    }
 
     /**
      * Direct I/O SSTableScanner over a defined collection of ranges of tokens.
@@ -952,15 +1042,25 @@ public abstract class SSTableReader extends SSTable 
implements UnfilteredSource,
      * @param ranges the range of keys to cover
      * @return A Scanner for seeking over the rows of the SSTable.
      */
-    public abstract ISSTableScanner getScanner(Collection<Range<Token>> 
ranges);
+    public ISSTableScanner getScanner(Collection<Range<Token>> ranges)
+    {
+        if (ranges != null)
+            return new SSTableSimpleScanner(this, 
getPositionsForRanges(ranges));
+        else
+            return getScanner();
+    }
 
     /**
      * Direct I/O SSTableScanner over an iterator of bounds.
      *
-     * @param rangeIterator the keys to cover
+     * @param boundsIterator the keys to cover
      * @return A Scanner for seeking over the rows of the SSTable.
      */
-    public abstract ISSTableScanner 
getScanner(Iterator<AbstractBounds<PartitionPosition>> rangeIterator);
+    public ISSTableScanner 
getScanner(Iterator<AbstractBounds<PartitionPosition>> boundsIterator)
+    {
+        return new SSTableSimpleScanner(this, 
getPositionsForBoundsIterator(boundsIterator));
+    }
+
 
     /**
      * Create a {@link FileDataInput} for the data file of the sstable 
represented by this reader. This method returns
diff --git 
a/src/java/org/apache/cassandra/io/sstable/format/SSTableScanner.java 
b/src/java/org/apache/cassandra/io/sstable/format/SSTableScanner.java
index 217c177206..86c0f7ec2d 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableScanner.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableScanner.java
@@ -19,7 +19,6 @@ package org.apache.cassandra.io.sstable.format;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
@@ -35,9 +34,7 @@ import 
org.apache.cassandra.db.rows.LazilyInitializedUnfilteredRowIterator;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.AbstractBounds.Boundary;
-import org.apache.cassandra.dht.Bounds;
 import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.sstable.AbstractRowIndexEntry;
 import org.apache.cassandra.io.sstable.CorruptSSTableException;
 import org.apache.cassandra.io.sstable.ISSTableScanner;
@@ -86,14 +83,6 @@ implements ISSTableScanner
         this.listener = listener;
     }
 
-    protected static List<AbstractBounds<PartitionPosition>> 
makeBounds(SSTableReader sstable, Collection<Range<Token>> tokenRanges)
-    {
-        List<AbstractBounds<PartitionPosition>> boundsList = new 
ArrayList<>(tokenRanges.size());
-        for (Range<Token> range : Range.normalize(tokenRanges))
-            addRange(sstable, Range.makeRowRange(range), boundsList);
-        return boundsList;
-    }
-
     protected static List<AbstractBounds<PartitionPosition>> 
makeBounds(SSTableReader sstable, DataRange dataRange)
     {
         List<AbstractBounds<PartitionPosition>> boundsList = new 
ArrayList<>(2);
@@ -101,11 +90,6 @@ implements ISSTableScanner
         return boundsList;
     }
 
-    protected static AbstractBounds<PartitionPosition> fullRange(SSTableReader 
sstable)
-    {
-        return new Bounds<>(sstable.getFirst(), sstable.getLast());
-    }
-
     private static void addRange(SSTableReader sstable, 
AbstractBounds<PartitionPosition> requested, 
List<AbstractBounds<PartitionPosition>> boundsList)
     {
         if (requested instanceof Range && ((Range<?>) 
requested).isWrapAround())
diff --git 
a/src/java/org/apache/cassandra/io/sstable/format/SSTableSimpleScanner.java 
b/src/java/org/apache/cassandra/io/sstable/format/SSTableSimpleScanner.java
new file mode 100644
index 0000000000..6015265ba0
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableSimpleScanner.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.format;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.io.sstable.CorruptSSTableException;
+import org.apache.cassandra.io.sstable.ISSTableScanner;
+import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
+import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.schema.TableMetadata;
+
+import static 
org.apache.cassandra.io.sstable.format.SSTableReader.PartitionPositionBounds;
+
+/// Simple SSTable scanner that reads sequentially through an SSTable without 
using the index.
+///
+/// This is a significant improvement for the performance of compaction over 
using the full-blown DataRange-capable
+/// [SSTableScanner] and enables correct calculation of data sizes to process.
+public class SSTableSimpleScanner
+implements ISSTableScanner
+{
+    private final AtomicBoolean isClosed = new AtomicBoolean(false);
+    private final RandomAccessReader dfile;
+    private final SSTableReader sstable;
+
+    private final Iterator<PartitionPositionBounds> rangeIterator;
+
+    private long bytesScannedInPreviousRanges;
+
+    private final long sizeInBytes;
+    private final long compressedSizeInBytes;
+
+    private long currentEndPosition;
+    private long currentStartPosition;
+
+    private SSTableIdentityIterator currentIterator;
+    private DecoratedKey lastKey;
+
+    /// Create a new simple scanner over the given sstables and the given 
ranges of uncompressed positions.
+    /// Each range must start and end on a partition boundary, and, to satisfy 
the contract of [ISSTableScanner], the
+    /// ranges must be non-overlapping and in ascending order. This scanner 
will throw an [IllegalArgumentException] if
+    /// the latter is not true.
+    ///
+    /// The ranges can be constructed by [SSTableReader#getPositionsForRanges] 
and similar methods as done by the
+    /// various [SSTableReader#getScanner] variations.
+    public SSTableSimpleScanner(SSTableReader sstable,
+                                Collection<PartitionPositionBounds> boundsList)
+    {
+        assert sstable != null;
+
+        this.dfile = sstable.openDataReader();
+        this.sstable = sstable;
+        this.sizeInBytes = boundsList.stream().mapToLong(ppb -> 
ppb.upperPosition - ppb.lowerPosition).sum();
+        this.compressedSizeInBytes = sstable.compression ? 
sstable.onDiskSizeForPartitionPositions(boundsList) : sizeInBytes;
+        this.rangeIterator = boundsList.iterator();
+        this.currentEndPosition = 0;
+        this.currentStartPosition = 0;
+        this.bytesScannedInPreviousRanges = 0;
+        this.currentIterator = null;
+        this.lastKey = null;
+    }
+
+    public void close()
+    {
+        if (isClosed.compareAndSet(false, true))
+        {
+            // ensure we report what we have actually processed
+            bytesScannedInPreviousRanges += dfile.getFilePointer() - 
currentStartPosition;
+            dfile.close();
+            // close() may change the file pointer, update so that the 
difference is 0 when reported by getBytesScanned()
+            currentStartPosition = dfile.getFilePointer();
+        }
+    }
+
+    @Override
+    public long getLengthInBytes()
+    {
+        return sizeInBytes;
+    }
+
+
+    public long getCompressedLengthInBytes()
+    {
+        return compressedSizeInBytes;
+    }
+
+    @Override
+    public long getCurrentPosition()
+    {
+        return dfile.getFilePointer();
+    }
+
+    public long getBytesScanned()
+    {
+        return bytesScannedInPreviousRanges + dfile.getFilePointer() - 
currentStartPosition;
+    }
+
+    @Override
+    public Set<SSTableReader> getBackingSSTables()
+    {
+        return ImmutableSet.of(sstable);
+    }
+
+    public TableMetadata metadata()
+    {
+        return sstable.metadata();
+    }
+
+    public boolean hasNext()
+    {
+        if (currentIterator != null)
+        {
+            currentIterator.close(); // Ensure that the iterator cannot be 
used further. No op if already closed.
+
+            // Row iterator must be exhausted to advance to next partition
+            currentIterator.exhaust();
+            currentIterator = null;
+        }
+
+        if (dfile.getFilePointer() < currentEndPosition)
+            return true;
+
+        return advanceRange();
+    }
+
+    boolean advanceRange()
+    {
+        if (!rangeIterator.hasNext())
+            return false;
+
+        bytesScannedInPreviousRanges += currentEndPosition - 
currentStartPosition;
+
+        PartitionPositionBounds nextRange = rangeIterator.next();
+        if (currentEndPosition > nextRange.lowerPosition)
+            throw new IllegalArgumentException("Ranges supplied to 
SSTableSimpleScanner must be non-overlapping and in ascending order.");
+
+        currentEndPosition = nextRange.upperPosition;
+        currentStartPosition = nextRange.lowerPosition;
+        dfile.seek(currentStartPosition);
+        return true;
+    }
+
+    public UnfilteredRowIterator next()
+    {
+        if (!hasNext())
+            throw new NoSuchElementException();
+
+        currentIterator = SSTableIdentityIterator.create(sstable, dfile, 
false);
+        DecoratedKey currentKey = currentIterator.partitionKey();
+        if (lastKey != null && lastKey.compareTo(currentKey) >= 0)
+        {
+            sstable.markSuspect();
+            throw new CorruptSSTableException(new 
IllegalStateException(String.format("Invalid key order: current %s <= previous 
%s",
+                                                                               
       currentKey,
+                                                                               
       lastKey)),
+                                              sstable.getFilename());
+        }
+        lastKey = currentKey;
+        return currentIterator;
+    }
+
+    public void remove()
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public String toString()
+    {
+        return String.format("%s(sstable=%s)", getClass().getSimpleName(), 
sstable);
+    }
+}
diff --git 
a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java 
b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
index b58dbc532e..692cadf34d 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
@@ -21,7 +21,6 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -46,7 +45,6 @@ import org.apache.cassandra.db.rows.Rows;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 import org.apache.cassandra.db.rows.UnfilteredRowIteratorWithLowerBound;
 import org.apache.cassandra.db.rows.UnfilteredRowIterators;
-import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.sstable.AbstractRowIndexEntry;
@@ -157,41 +155,6 @@ public class BigTableReader extends 
SSTableReaderWithFilter implements IndexSumm
         return BigTableKeyReader.create(ifile, rowIndexEntrySerializer);
     }
 
-    /**
-     * Direct I/O SSTableScanner over an iterator of bounds.
-     *
-     * @param boundsIterator the keys to cover
-     * @return A Scanner for seeking over the rows of the SSTable.
-     */
-    public ISSTableScanner 
getScanner(Iterator<AbstractBounds<PartitionPosition>> boundsIterator)
-    {
-        return BigTableScanner.getScanner(this, boundsIterator);
-    }
-
-    /**
-     * Direct I/O SSTableScanner over the full sstable.
-     *
-     * @return A Scanner for reading the full SSTable.
-     */
-    public ISSTableScanner getScanner()
-    {
-        return BigTableScanner.getScanner(this);
-    }
-
-    /**
-     * Direct I/O SSTableScanner over a defined collection of ranges of tokens.
-     *
-     * @param ranges the range of keys to cover
-     * @return A Scanner for seeking over the rows of the SSTable.
-     */
-    public ISSTableScanner getScanner(Collection<Range<Token>> ranges)
-    {
-        if (ranges != null)
-            return BigTableScanner.getScanner(this, ranges);
-        else
-            return getScanner();
-    }
-
     /**
      * Finds and returns the first key beyond a given token in this SSTable or 
null if no such key exists.
      */
diff --git 
a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java 
b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
index 887d997846..83243529c4 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
@@ -18,11 +18,8 @@
 package org.apache.cassandra.io.sstable.format.big;
 
 import java.io.IOException;
-import java.util.Collection;
 import java.util.Iterator;
 
-import com.google.common.collect.Iterators;
-
 import org.apache.cassandra.db.DataRange;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.PartitionPosition;
@@ -30,8 +27,6 @@ import org.apache.cassandra.db.filter.ClusteringIndexFilter;
 import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 import org.apache.cassandra.dht.AbstractBounds;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.sstable.CorruptSSTableException;
 import org.apache.cassandra.io.sstable.ISSTableScanner;
 import org.apache.cassandra.io.sstable.SSTable;
@@ -50,12 +45,6 @@ public class BigTableScanner extends 
SSTableScanner<BigTableReader, RowIndexEntr
 
     private final RowIndexEntry.IndexSerializer rowIndexEntrySerializer;
 
-    // Full scan of the sstables
-    public static ISSTableScanner getScanner(BigTableReader sstable)
-    {
-        return getScanner(sstable, 
Iterators.singletonIterator(fullRange(sstable)));
-    }
-
     public static ISSTableScanner getScanner(BigTableReader sstable,
                                              ColumnFilter columns,
                                              DataRange dataRange,
@@ -64,16 +53,6 @@ public class BigTableScanner extends 
SSTableScanner<BigTableReader, RowIndexEntr
         return new BigTableScanner(sstable, columns, dataRange, 
makeBounds(sstable, dataRange).iterator(), listener);
     }
 
-    public static ISSTableScanner getScanner(BigTableReader sstable, 
Collection<Range<Token>> tokenRanges)
-    {
-        return getScanner(sstable, makeBounds(sstable, 
tokenRanges).iterator());
-    }
-
-    public static ISSTableScanner getScanner(BigTableReader sstable, 
Iterator<AbstractBounds<PartitionPosition>> rangeIterator)
-    {
-        return new BigTableScanner(sstable, 
ColumnFilter.all(sstable.metadata()), null, rangeIterator, 
SSTableReadsListener.NOOP_LISTENER);
-    }
-
     private BigTableScanner(BigTableReader sstable,
                             ColumnFilter columns,
                             DataRange dataRange,
diff --git 
a/src/java/org/apache/cassandra/io/sstable/format/bti/BtiTableReader.java 
b/src/java/org/apache/cassandra/io/sstable/format/bti/BtiTableReader.java
index c5571e7fbb..9a65be1137 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/bti/BtiTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/bti/BtiTableReader.java
@@ -22,7 +22,6 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.Iterator;
 import java.util.List;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -45,7 +44,6 @@ import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.sstable.CorruptSSTableException;
 import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.io.sstable.ISSTableScanner;
 import org.apache.cassandra.io.sstable.IVerifier;
 import org.apache.cassandra.io.sstable.SSTable;
 import org.apache.cassandra.io.sstable.SSTableReadsListener;
@@ -382,27 +380,6 @@ public class BtiTableReader extends SSTableReaderWithFilter
             return new SSTableIterator(this, dataFileInput, key, indexEntry, 
slices, selectedColumns, rowIndexFile);
     }
 
-    @Override
-    public ISSTableScanner getScanner()
-    {
-        return BtiTableScanner.getScanner(this);
-    }
-
-    @Override
-    public ISSTableScanner getScanner(Collection<Range<Token>> ranges)
-    {
-        if (ranges != null)
-            return BtiTableScanner.getScanner(this, ranges);
-        else
-            return getScanner();
-    }
-
-    @Override
-    public ISSTableScanner 
getScanner(Iterator<AbstractBounds<PartitionPosition>> rangeIterator)
-    {
-        return BtiTableScanner.getScanner(this, rangeIterator);
-    }
-
     @VisibleForTesting
     @Override
     public BtiTableReader cloneAndReplace(IFilter filter)
diff --git 
a/src/java/org/apache/cassandra/io/sstable/format/bti/BtiTableScanner.java 
b/src/java/org/apache/cassandra/io/sstable/format/bti/BtiTableScanner.java
index a9f862c68b..4507ccf7f5 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/bti/BtiTableScanner.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/bti/BtiTableScanner.java
@@ -19,11 +19,8 @@ package org.apache.cassandra.io.sstable.format.bti;
 
 import java.io.Closeable;
 import java.io.IOException;
-import java.util.Collection;
 import java.util.Iterator;
 
-import com.google.common.collect.Iterators;
-
 import org.apache.cassandra.db.DataRange;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.PartitionPosition;
@@ -31,20 +28,12 @@ import org.apache.cassandra.db.filter.ClusteringIndexFilter;
 import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 import org.apache.cassandra.dht.AbstractBounds;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.sstable.SSTableReadsListener;
 import org.apache.cassandra.io.sstable.format.SSTableScanner;
 import org.apache.cassandra.io.util.FileUtils;
 
 public class BtiTableScanner extends SSTableScanner<BtiTableReader, 
TrieIndexEntry, BtiTableScanner.BtiScanningIterator>
 {
-    // Full scan of the sstables
-    public static BtiTableScanner getScanner(BtiTableReader sstable)
-    {
-        return getScanner(sstable, 
Iterators.singletonIterator(fullRange(sstable)));
-    }
-
     public static BtiTableScanner getScanner(BtiTableReader sstable,
                                              ColumnFilter columns,
                                              DataRange dataRange,
@@ -53,16 +42,6 @@ public class BtiTableScanner extends 
SSTableScanner<BtiTableReader, TrieIndexEnt
         return new BtiTableScanner(sstable, columns, dataRange, 
makeBounds(sstable, dataRange).iterator(), listener);
     }
 
-    public static BtiTableScanner getScanner(BtiTableReader sstable, 
Collection<Range<Token>> tokenRanges)
-    {
-        return getScanner(sstable, makeBounds(sstable, 
tokenRanges).iterator());
-    }
-
-    public static BtiTableScanner getScanner(BtiTableReader sstable, 
Iterator<AbstractBounds<PartitionPosition>> rangeIterator)
-    {
-        return new BtiTableScanner(sstable, 
ColumnFilter.all(sstable.metadata()), null, rangeIterator, 
SSTableReadsListener.NOOP_LISTENER);
-    }
-
     private BtiTableScanner(BtiTableReader sstable,
                             ColumnFilter columns,
                             DataRange dataRange,
diff --git a/src/java/org/apache/cassandra/tools/SSTablePartitions.java 
b/src/java/org/apache/cassandra/tools/SSTablePartitions.java
index 2181346271..b435994bba 100644
--- a/src/java/org/apache/cassandra/tools/SSTablePartitions.java
+++ b/src/java/org/apache/cassandra/tools/SSTablePartitions.java
@@ -369,13 +369,15 @@ public class SSTablePartitions
         {
             while (scanner.hasNext())
             {
+                // hasNext() positions us on the next partition, next() has to 
advance to read its header.
+                long startOfPartition = scanner.getCurrentPosition();
                 try (UnfilteredRowIterator partition = scanner.next())
                 {
                     ByteBuffer key = partition.partitionKey().getKey();
                     boolean isExcluded = 
excludedKeys.contains(metadata.partitionKeyType.getString(key));
 
                     PartitionStats partitionStats = new PartitionStats(key,
-                                                                       
scanner.getCurrentPosition(),
+                                                                       
startOfPartition,
                                                                        
partition.partitionLevelDeletion().isLive());
 
                     // Consume the partition to populate the stats.
diff --git 
a/test/unit/org/apache/cassandra/io/sstable/SSTableCorruptionDetectionTest.java 
b/test/unit/org/apache/cassandra/io/sstable/SSTableCorruptionDetectionTest.java
index 21ac51ee86..c631d5d099 100644
--- 
a/test/unit/org/apache/cassandra/io/sstable/SSTableCorruptionDetectionTest.java
+++ 
b/test/unit/org/apache/cassandra/io/sstable/SSTableCorruptionDetectionTest.java
@@ -37,6 +37,7 @@ import org.apache.cassandra.cache.ChunkCache;
 import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DataRange;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.Slices;
@@ -156,6 +157,12 @@ public class SSTableCorruptionDetectionTest extends 
SSTableWriterTestBase
         bruteForceCorruptionTest(ssTableReader, sstableScanner());
     }
 
+    @Test
+    public void testSSTableSimpleScanner() throws Throwable
+    {
+        bruteForceCorruptionTest(ssTableReader, sstableSimpleScanner());
+    }
+
     private void bruteForceCorruptionTest(SSTableReader ssTableReader, 
Consumer<SSTableReader> walker) throws Throwable
     {
         FileChannel fc = new 
File(ssTableReader.getFilename()).newReadWriteChannel();
@@ -193,6 +200,32 @@ public class SSTableCorruptionDetectionTest extends 
SSTableWriterTestBase
     }
 
     private Consumer<SSTableReader> sstableScanner()
+    {
+        return (SSTableReader sstable) -> {
+            try (var scanner = sstable.partitionIterator(ColumnFilter.NONE, 
DataRange.allData(sstable.getPartitioner()), 
SSTableReadsListener.NOOP_LISTENER))
+            {
+                while (scanner.hasNext())
+                {
+                    try (UnfilteredRowIterator rowIter = scanner.next())
+                    {
+                        if (rowIter.hasNext())
+                        {
+                            Unfiltered unfiltered = rowIter.next();
+                            if (unfiltered.isRow())
+                            {
+                                Row row = (Row) unfiltered;
+                                assertEquals(2, row.clustering().size());
+                                // no-op read
+                            }
+                        }
+                    }
+
+                }
+            }
+        };
+    }
+
+    private Consumer<SSTableReader> sstableSimpleScanner()
     {
         return (SSTableReader sstable) -> {
             try (ISSTableScanner scanner = sstable.getScanner())
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java 
b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
index 9cb7ca3d14..a87ec024b5 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
@@ -30,8 +30,11 @@ import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import java.util.stream.Stream;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Sets;
 import org.junit.Assume;
 import org.junit.BeforeClass;
@@ -89,6 +92,7 @@ import org.mockito.Mockito;
 
 import static java.lang.String.format;
 import static org.apache.cassandra.cql3.QueryProcessor.executeInternal;
+import static org.apache.cassandra.db.ColumnFamilyStore.FlushReason.UNIT_TESTS;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -178,6 +182,200 @@ public class SSTableReaderTest
         }
     }
 
+    @Test
+    public void testOnDiskSizeForRanges()
+    {
+        ColumnFamilyStore store = discardSSTables(KEYSPACE1, CF_STANDARD2);
+        partitioner = store.getPartitioner();
+        int count = 1000;
+
+        // insert data and compact to a single sstable
+        for (int j = 0; j < count; j++)
+        {
+            new RowUpdateBuilder(store.metadata(), 15000, k0(j))
+            .clustering("0")
+            .add("val", ByteBufferUtil.EMPTY_BYTE_BUFFER)
+            .build()
+            .applyUnsafe();
+        }
+        store.forceBlockingFlush(UNIT_TESTS);
+        store.forceMajorCompaction();
+
+        SSTableReader sstable = store.getLiveSSTables().iterator().next();
+
+        // Non-compression-dependent checks
+        // Check several ways of going through the whole file
+        assertEquals(sstable.onDiskLength(),
+                     onDiskSizeForRanges(sstable, Collections.singleton(new 
Range<>(t(cut(k0(0), 1)), t0(count - 1)))));
+        assertEquals(sstable.onDiskLength(),
+                     onDiskSizeForRanges(sstable, Collections.singleton(new 
Range<>(sstable.getPartitioner().getMinimumToken(),
+                                                                               
     sstable.getPartitioner().getMinimumToken()))));
+        assertEquals(sstable.onDiskLength(),
+                     onDiskSizeForRanges(sstable, Collections.singleton(new 
Range<>(sstable.getPartitioner().getMinimumToken(),
+                                                                               
     sstable.getLast().getToken()))));
+
+        // Split at exact match
+        assertEquals(sstable.onDiskLength(),
+                     onDiskSizeForRanges(sstable, ImmutableList.of(new 
Range<>(t(cut(k0(0), 1)), t0(347)),
+                                                                   new 
Range<>(t0(347), t0(count - 1)))));
+
+        // Split at different prefixes pointing to the same position
+        assertEquals(sstable.onDiskLength(),
+                     onDiskSizeForRanges(sstable, ImmutableList.of(new 
Range<>(t(cut(k0(0), 1)), t(cut(k0(600), 2))),
+                                                                   new 
Range<>(t(cut(k0(600), 1)), t0(count - 1)))));
+
+        // Size one row
+        double oneRowSize = sstable.uncompressedLength() * 1.0 / count;
+        System.out.println("One row size: " + oneRowSize);
+
+        if (!sstable.compression)
+        {
+            double delta = 0.9;
+
+            // Ranges are end-inclusive, indexes are adjusted by one here to 
account for that.
+            assertEquals((52 - 38),
+                         onDiskSizeForRanges(sstable, 
Collections.singleton(new Range<>(t0(37), t0(51)))) / oneRowSize,
+                         delta);
+
+            // Try non-matching positions (inexact indexes are not adjusted 
for the count).
+            assertEquals((34 - 30),
+                         onDiskSizeForRanges(sstable, 
Collections.singleton(new Range<>(t(cut(k0(30), 1)),
+                                                                               
         t0(33)))) / oneRowSize,
+                         delta);
+
+            assertEquals((700 - 554),
+                         onDiskSizeForRanges(sstable, 
Collections.singleton(new Range<>(t0(553),
+                                                                               
        t(cut(k0(700), 2))))) / oneRowSize,
+                         delta);
+
+            assertEquals((500 - 30),
+                         onDiskSizeForRanges(sstable, 
Collections.singleton(new Range<>(t(cut(k0(30), 1)),
+                                                                               
        t(cut(k0(500), 2))))) / oneRowSize,
+                         delta);
+
+            // Try a list
+            List<Range<Token>> ranges = ImmutableList.of(new Range<>(t0(37), 
t0(51)),
+                                                         new Range<>(t0(71), 
t(cut(k0(100), 2))),
+                                                         new 
Range<>(t(cut(k0(230), 1)), t0(243)),
+                                                         new 
Range<>(t(cut(k0(260), 1)), t(cut(k0(300), 2))),
+                                                         new Range<>(t0(373), 
t0(382)),
+                                                         new Range<>(t0(382), 
t0(385)),
+                                                         new 
Range<>(t(cut(k0(400), 2)), t(cut(k0(400), 1))),  // empty range
+                                                         new Range<>(t0(563), 
t(cut(k0(600), 2))), // touching ranges
+                                                         new 
Range<>(t(cut(k0(600), 1)), t0(621))
+            );
+            assertEquals((52 - 38 + 100 - 72 + 244 - 230 + 300 - 260 + 383 - 
374 + 386 - 383 + 400 - 400 + 622 - 564),
+                         onDiskSizeForRanges(sstable, ranges) / oneRowSize,
+                         delta);
+
+            // Check going through the whole file
+            assertEquals(sstable.onDiskLength(),
+                         onDiskSizeForRanges(sstable, 
Collections.singleton(new Range<>(t(cut(k0(0), 1)), t0(count - 1)))));
+
+            assertEquals(sstable.onDiskLength(),
+                         onDiskSizeForRanges(sstable, ImmutableList.of(new 
Range<>(t(cut(k0(0), 1)), t0(347)),
+                                                                      new 
Range<>(t0(347), t0(count - 1)))));
+
+            assertEquals(sstable.onDiskLength(),
+                         onDiskSizeForRanges(sstable, ImmutableList.of(new 
Range<>(t(cut(k0(0), 1)), t(cut(k0(600), 2))),
+                                                                      new 
Range<>(t(cut(k0(600), 1)), t0(count - 1)))));
+        }
+        else
+        {
+            // It's much harder to test with compression.
+
+            // Check first three rows have the same size (they must be in the 
same chunk)
+            final long row0size = onDiskSizeForRanges(sstable, 
Collections.singleton(new Range<>(t(cut(k0(0), 1)), t0(0))));
+            assertEquals(row0size, onDiskSizeForRanges(sstable, 
Collections.singleton(new Range<>(t0(0), t0(1)))));
+            assertEquals(row0size, onDiskSizeForRanges(sstable, 
Collections.singleton(new Range<>(t0(1), t0(2)))));
+
+            // As well as the first three rows together
+            assertEquals(row0size, onDiskSizeForRanges(sstable, 
Collections.singleton(new Range<>(t(cut(k0(0), 1)), t0(2)))));
+
+            // And also when we query for them in separate ranges
+            assertEquals(row0size, onDiskSizeForRanges(sstable, 
ImmutableList.of(new Range<>(t(cut(k0(0), 1)), t0(0)),
+                                                                               
 new Range<>(t0(0), t0(1)))));
+            assertEquals(row0size, onDiskSizeForRanges(sstable, 
ImmutableList.of(new Range<>(t(cut(k0(0), 1)), t0(0)),
+                                                                               
 new Range<>(t0(1), t0(2)))));
+            assertEquals(row0size, onDiskSizeForRanges(sstable, 
ImmutableList.of(new Range<>(t(cut(k0(0), 1)), t0(0)),
+                                                                               
 new Range<>(t0(0), t0(1)),
+                                                                               
 new Range<>(t0(1), t0(2)))));
+
+            // Finally, check that if we query for every second row we get the 
total size of the file.
+            assertEquals(sstable.onDiskLength(),
+                         onDiskSizeForRanges(sstable, IntStream.range(0, count)
+                                                              .filter(i -> i % 
2 != 0)
+                                                              .mapToObj(i -> 
new Range<>(t0(i), t0(i + 1)))
+                                                              
.collect(Collectors.toList())));
+        }
+    }
+
+
+    @Test
+    public void testOnDiskSizeCompressedBoundaries()
+    {
+        ColumnFamilyStore store = discardSSTables(KEYSPACE1, CF_COMPRESSED);
+        partitioner = store.getPartitioner();
+        int count = 1000;
+        // Use a longish string to let a key align with a chunk boundary
+        ByteBuffer dataBuf = ByteBufferUtil.bytes(String.format("%43d", 123));
+
+        // insert data and compact to a single sstable
+        for (int j = 0; j < count; j++)
+        {
+            new RowUpdateBuilder(store.metadata(), 15000, k0(j))
+            .clustering("0")
+            .add("val", dataBuf)
+            .build()
+            .applyUnsafe();
+        }
+        store.forceBlockingFlush(UNIT_TESTS);
+        store.forceMajorCompaction();
+
+        SSTableReader sstable = store.getLiveSSTables().iterator().next();
+
+        int chunkLength = sstable.getCompressionMetadata().chunkLength();
+        System.out.println("Chunk length: " + chunkLength);
+        int[] alignedKeys = IntStream.range(0, count).filter(i -> 
(sstable.getPosition(dk0(i), SSTableReader.Operator.EQ) & (chunkLength - 1)) == 
0).toArray();
+        assertTrue("Test needs an aligned key, try changing the length of 
dataBuf", alignedKeys.length > 1);
+        for (int k : alignedKeys)
+            assertEquals("Coverage must not include chunk starting at end 
position",
+                         
sstable.getCompressionMetadata().chunkFor(sstable.getPosition(dk0(k), 
SSTableReader.Operator.EQ)).offset,
+                         onDiskSizeForRanges(sstable, 
Collections.singleton(new Range<>(partitioner.getMinimumToken(), t0(k - 1))))); 
  // inclusive end
+    }
+
+
+    long onDiskSizeForRanges(SSTableReader sstable, Collection<Range<Token>> 
ranges)
+    {
+        return 
sstable.onDiskSizeForPartitionPositions(sstable.getPositionsForRanges(ranges));
+    }
+
+    private Token t(String key)
+    {
+        return partitioner.getToken(ByteBufferUtil.bytes(key));
+    }
+
+    private String k0(int k)
+    {
+        return String.format("%08d", k);
+    }
+
+    private Token t0(int k)
+    {
+        return t(k0(k));
+    }
+
+    private DecoratedKey dk0(int k)
+    {
+        return partitioner.decorateKey(ByteBufferUtil.bytes(k0(k)));
+    }
+
+    private String cut(String s, int n)
+    {
+        return s.substring(0, s.length() - n);
+    }
+
+
     @Test
     public void testSpannedIndexPositions() throws IOException
     {
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableScannerTest.java 
b/test/unit/org/apache/cassandra/io/sstable/SSTableScannerTest.java
index 17b8a6cbb2..73195b0617 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableScannerTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableScannerTest.java
@@ -21,10 +21,13 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.function.Consumer;
+import java.util.function.Function;
 
 import com.google.common.collect.Iterables;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -40,6 +43,7 @@ import org.apache.cassandra.db.Slices;
 import org.apache.cassandra.db.filter.ClusteringIndexSliceFilter;
 import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.db.rows.Row;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.ByteOrderedPartitioner;
@@ -49,6 +53,7 @@ import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.utils.ByteBufferUtil;
+import org.hamcrest.Matchers;
 
 import static org.apache.cassandra.dht.AbstractBounds.isEmpty;
 import static org.junit.Assert.assertEquals;
@@ -180,6 +185,12 @@ public class SSTableScannerTest
     }
 
     private static void assertScanMatches(SSTableReader sstable, int 
scanStart, int scanEnd, int ... boundaries)
+    {
+        assertScanMatchesUsingScanner(sstable, scanStart, scanEnd, boundaries);
+        assertScanMatchesUsingSimple(sstable, scanStart, scanEnd, boundaries);
+    }
+
+    private static void assertScanMatchesUsingScanner(SSTableReader sstable, 
int scanStart, int scanEnd, int ... boundaries)
     {
         assert boundaries.length % 2 == 0;
         for (DataRange range : dataRanges(sstable.metadata(), scanStart, 
scanEnd))
@@ -200,6 +211,28 @@ public class SSTableScannerTest
         }
     }
 
+    private static void assertScanMatchesUsingSimple(SSTableReader sstable, 
int scanStart, int scanEnd, int ... boundaries)
+    {
+        assert boundaries.length % 2 == 0;
+        for (DataRange range : dataRanges(sstable.metadata(), scanStart, 
scanEnd))
+        {
+            if (range.isWrapAround() && !range.keyRange().right.isMinimum()) 
// getScanner on AbstractBounds<PartitionPosition> does not handle wraparounds
+                continue;
+
+            try(UnfilteredPartitionIterator scanner = 
sstable.getScanner(Collections.singleton(range.keyRange()).iterator()))
+            {
+                for (int b = 0; b < boundaries.length; b += 2)
+                    for (int i = boundaries[b]; i <= boundaries[b + 1]; i++)
+                        assertEquals(toKey(i), new 
String(scanner.next().partitionKey().getKey().array()));
+                assertFalse(scanner.hasNext());
+            }
+            catch (Exception e)
+            {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
     private static void assertScanEmpty(SSTableReader sstable, int scanStart, 
int scanEnd)
     {
         assertScanMatches(sstable, scanStart, scanEnd);
@@ -547,7 +580,30 @@ public class SSTableScannerTest
         assertScanContainsRanges(scanner, 205, 205);
     }
 
-    private static void 
testRequestNextRowIteratorWithoutConsumingPrevious(Consumer<ISSTableScanner> 
consumer)
+    private static void 
testRequestNextRowIteratorWithoutConsumingPrevious(Function<SSTableReader, 
UnfilteredPartitionIterator> makeScanner,
+                                                                           
Consumer<UnfilteredPartitionIterator> requestNext,
+                                                                           
String messagePattern)
+    {
+        final SSTableReader sstable = prepareSmallSSTable();
+
+        try (UnfilteredPartitionIterator scanner = makeScanner.apply(sstable);
+             UnfilteredRowIterator currentRowIterator = scanner.next())
+        {
+            assertTrue(currentRowIterator.hasNext());
+            try
+            {
+                requestNext.accept(scanner);
+                currentRowIterator.next();
+                fail("Should have thrown IllegalStateException");
+            }
+            catch (IllegalStateException e)
+            {
+                Assert.assertThat(e.getMessage(), 
Matchers.matchesPattern(messagePattern));
+            }
+        }
+    }
+
+    private static SSTableReader prepareSmallSSTable()
     {
         Keyspace keyspace = Keyspace.open(KEYSPACE);
         ColumnFamilyStore store = keyspace.getColumnFamilyStore(TABLE);
@@ -557,38 +613,77 @@ public class SSTableScannerTest
         store.disableAutoCompaction();
 
         insertRowWithKey(store.metadata(), 0);
+        insertRowWithKey(store.metadata(), 3);
         store.forceBlockingFlush(ColumnFamilyStore.FlushReason.UNIT_TESTS);
 
         assertEquals(1, store.getLiveSSTables().size());
         SSTableReader sstable = store.getLiveSSTables().iterator().next();
+        return sstable;
+    }
 
-        try (ISSTableScanner scanner = sstable.getScanner();
-             UnfilteredRowIterator currentRowIterator = scanner.next())
+    @Test
+    public void testSimpleHasNextRowIteratorWithoutConsumingPrevious()
+    {
+        
testRequestNextRowIteratorWithoutConsumingPrevious(SSTableReader::getScanner,
+                                                           
UnfilteredPartitionIterator::hasNext,
+                                                           "Iterator used 
after closing.");
+    }
+
+    @Test
+    public void testSimpleNextRowIteratorWithoutConsumingPrevious()
+    {
+        
testRequestNextRowIteratorWithoutConsumingPrevious(SSTableReader::getScanner,
+                                                           
UnfilteredPartitionIterator::next,
+                                                           "Iterator used 
after closing.");
+    }
+
+    @Test
+    public void testHasNextRowIteratorWithoutConsumingPrevious()
+    {
+        testRequestNextRowIteratorWithoutConsumingPrevious(r -> 
r.partitionIterator(ColumnFilter.NONE, DataRange.allData(r.getPartitioner()), 
SSTableReadsListener.NOOP_LISTENER),
+                                                           
UnfilteredPartitionIterator::hasNext,
+                                                           
".*UnfilteredRowIterator.*must be closed.*");
+    }
+
+    @Test
+    public void testNextRowIteratorWithoutConsumingPrevious()
+    {
+        testRequestNextRowIteratorWithoutConsumingPrevious(r -> 
r.partitionIterator(ColumnFilter.NONE, DataRange.allData(r.getPartitioner()), 
SSTableReadsListener.NOOP_LISTENER),
+                                                           
UnfilteredPartitionIterator::next,
+                                                           
".*UnfilteredRowIterator.*must be closed.*");
+    }
+
+    private static void 
testRequestNextRowIteratorAfterClosingPrevious(Function<SSTableReader, 
UnfilteredPartitionIterator> makeScanner)
+    {
+        final SSTableReader sstable = prepareSmallSSTable();
+
+        try (UnfilteredPartitionIterator scanner = makeScanner.apply(sstable))
         {
-            assertTrue(currentRowIterator.hasNext());
-            try
+            try (UnfilteredRowIterator p = scanner.next())
             {
-                consumer.accept(scanner);
-                fail("Should have thrown IllegalStateException");
+                assertEquals(toKey(0), new 
String(p.partitionKey().getKey().array()));
+                // do not read it, but close it
             }
-            catch (IllegalStateException e)
+
+            try (UnfilteredRowIterator p = scanner.next())
             {
-                assertEquals("The UnfilteredRowIterator returned by the last 
call to next() was initialized: " +
-                             "it must be closed before calling hasNext() or 
next() again.",
-                             e.getMessage());
+                assertEquals(toKey(3), new 
String(p.partitionKey().getKey().array()));
+                assertTrue(p.hasNext());
+                assertTrue(p.next() instanceof Row);
             }
         }
     }
 
+
     @Test
-    public void testHasNextRowIteratorWithoutConsumingPrevious()
+    public void testSimpleRequestNextRowIteratorAfterClosingPreviouss()
     {
-        
testRequestNextRowIteratorWithoutConsumingPrevious(ISSTableScanner::hasNext);
+        
testRequestNextRowIteratorAfterClosingPrevious(SSTableReader::getScanner);
     }
 
     @Test
-    public void testNextRowIteratorWithoutConsumingPrevious()
+    public void testRequestNextRowIteratorAfterClosingPrevious()
     {
-        
testRequestNextRowIteratorWithoutConsumingPrevious(ISSTableScanner::next);
+        testRequestNextRowIteratorAfterClosingPrevious(r -> 
r.partitionIterator(ColumnFilter.NONE, DataRange.allData(r.getPartitioner()), 
SSTableReadsListener.NOOP_LISTENER));
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to