This is an automated email from the ASF dual-hosted git repository.
blambov pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 3078aea1cf Introduce SSTableSimpleScanner for compaction
3078aea1cf is described below
commit 3078aea1cfc70092a185bab8ac5dc8a35627330f
Author: Branimir Lambov <[email protected]>
AuthorDate: Tue Nov 19 12:41:41 2024 +0200
Introduce SSTableSimpleScanner for compaction
This removes the use of index files during compaction, which simplifies
compaction and improves its performance.
patch by Branimir Lambov; reviewed by Sylvain Lebresne for CASSANDRA-20092
---
CHANGES.txt | 1 +
.../org/apache/cassandra/cache/KeyCacheKey.java | 12 +-
.../db/compaction/CompactionIterator.java | 2 +-
.../cassandra/io/compress/CompressionMetadata.java | 8 +
.../io/sstable/SSTableIdentityIterator.java | 62 +++++++
.../cassandra/io/sstable/format/SSTableReader.java | 142 ++++++++++++---
.../io/sstable/format/SSTableScanner.java | 16 --
.../io/sstable/format/SSTableSimpleScanner.java | 195 ++++++++++++++++++++
.../io/sstable/format/big/BigTableReader.java | 37 ----
.../io/sstable/format/big/BigTableScanner.java | 21 ---
.../io/sstable/format/bti/BtiTableReader.java | 23 ---
.../io/sstable/format/bti/BtiTableScanner.java | 21 ---
.../apache/cassandra/tools/SSTablePartitions.java | 4 +-
.../io/sstable/SSTableCorruptionDetectionTest.java | 33 ++++
.../cassandra/io/sstable/SSTableReaderTest.java | 198 +++++++++++++++++++++
.../cassandra/io/sstable/SSTableScannerTest.java | 125 +++++++++++--
16 files changed, 740 insertions(+), 160 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 49eb15619f..af48dfb1d3 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
5.1
+ * Introduce SSTableSimpleScanner for compaction (CASSANDRA-20092)
* Include column drop timestamp in alter table transformation
(CASSANDRA-18961)
* Make JMX SSL configurable in cassandra.yaml (CASSANDRA-18508)
* Fix cqlsh CAPTURE command to save query results without trace details when
TRACING is ON (CASSANDRA-19105)
diff --git a/src/java/org/apache/cassandra/cache/KeyCacheKey.java
b/src/java/org/apache/cassandra/cache/KeyCacheKey.java
index ac6f1f9693..1a722b05e6 100644
--- a/src/java/org/apache/cassandra/cache/KeyCacheKey.java
+++ b/src/java/org/apache/cassandra/cache/KeyCacheKey.java
@@ -21,7 +21,6 @@ import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Objects;
-import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.utils.ByteBufferUtil;
@@ -31,9 +30,7 @@ public class KeyCacheKey extends CacheKey
{
public final Descriptor desc;
- private static final long EMPTY_SIZE = ObjectSizes.measure(new
KeyCacheKey(TableMetadata.builder("ks", "tab")
-
.addPartitionKeyColumn("pk", UTF8Type.instance)
-
.build(), null, ByteBufferUtil.EMPTY_BYTE_BUFFER));
+ private static final long EMPTY_SIZE = ObjectSizes.measure(new
KeyCacheKey());
// keeping an array instead of a ByteBuffer lowers the overhead of the key
cache working set,
// without extra copies on lookup since client-provided key ByteBuffers
will be array-backed already
@@ -47,6 +44,13 @@ public class KeyCacheKey extends CacheKey
assert this.key != null;
}
+ private KeyCacheKey() // Only for EMPTY_SIZE
+ {
+ super(null, null);
+ this.desc = null;
+ this.key = null;
+ }
+
public String toString()
{
return String.format("KeyCacheKey(%s, %s)", desc,
ByteBufferUtil.bytesToHex(ByteBuffer.wrap(key)));
diff --git
a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
index b27da6f0ae..eb1e761493 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
@@ -290,7 +290,7 @@ public class CompactionIterator extends
CompactionInfo.Holder implements Unfilte
{
long n = 0;
for (ISSTableScanner scanner : scanners)
- n += scanner.getCurrentPosition();
+ n += scanner.getBytesScanned();
bytesRead = n;
}
diff --git a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
index 96b4ce8418..d5f5f05655 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
@@ -539,6 +539,14 @@ public class CompressionMetadata extends
WrappedSharedCloseable
{
return String.format("Chunk<offset: %d, length: %d>", offset,
length);
}
+
+ /**
+ * @return the end of the chunk in the file, including the checksum
+ */
+ public long chunkEnd()
+ {
+ return offset + length + 4;
+ }
}
static class ChunkSerializer implements IVersionedSerializer<Chunk>
diff --git
a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
index cc201b4125..072a364af3 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
@@ -46,6 +46,7 @@ public class SSTableIdentityIterator implements
Comparable<SSTableIdentityIterat
protected final SSTableSimpleIterator iterator;
private final Row staticRow;
+ boolean isClosed = false;
public SSTableIdentityIterator(SSTableReader sstable, DecoratedKey key,
DeletionTime partitionLevelDeletion,
String filename, SSTableSimpleIterator iterator) throws IOException
@@ -100,6 +101,28 @@ public class SSTableIdentityIterator implements
Comparable<SSTableIdentityIterat
}
}
+ public static SSTableIdentityIterator create(SSTableReader sstable,
FileDataInput dfile, boolean tombstoneOnly)
+ {
+ try
+ {
+ DecoratedKey key =
sstable.decorateKey(ByteBufferUtil.readWithShortLength(dfile));
+ DeletionTime partitionLevelDeletion =
DeletionTime.getSerializer(sstable.descriptor.version).deserialize(dfile);
+ if (!partitionLevelDeletion.validate())
+ UnfilteredValidation.handleInvalid(sstable.metadata(), key,
sstable, "partitionLevelDeletion="+partitionLevelDeletion.toString());
+
+ DeserializationHelper helper = new
DeserializationHelper(sstable.metadata(),
sstable.descriptor.version.correspondingMessagingVersion(),
DeserializationHelper.Flag.LOCAL);
+ SSTableSimpleIterator iterator = tombstoneOnly
+ ?
SSTableSimpleIterator.createTombstoneOnly(sstable.metadata(), dfile,
sstable.header, helper, partitionLevelDeletion)
+ :
SSTableSimpleIterator.create(sstable.metadata(), dfile, sstable.header, helper,
partitionLevelDeletion);
+ return new SSTableIdentityIterator(sstable, key,
partitionLevelDeletion, dfile.getPath(), iterator);
+ }
+ catch (IOException e)
+ {
+ sstable.markSuspect();
+ throw new CorruptSSTableException(e, dfile.getPath());
+ }
+ }
+
public TableMetadata metadata()
{
return iterator.metadata;
@@ -159,6 +182,9 @@ public class SSTableIdentityIterator implements
Comparable<SSTableIdentityIterat
{
try
{
+ if (isClosed)
+ throw new IllegalStateException("Iterator used after
closing.");
+
return doCompute();
}
catch (IndexOutOfBoundsException | VIntOutOfRangeException |
AssertionError e)
@@ -190,6 +216,42 @@ public class SSTableIdentityIterator implements
Comparable<SSTableIdentityIterat
public void close()
{
// creator is responsible for closing file when finished
+ isClosed = true;
+ }
+
+ public boolean isClosed()
+ {
+ return isClosed;
+ }
+
+ /**
+ * Called to advance to the next partition and make sure that we process
all outstanding rows if the user did not
+ * do so. Unlike next() and hasNext(), this can and will be called after
the iterator is closed.
+ */
+ public void exhaust()
+ {
+ try
+ {
+ while (iterator.hasNext())
+ iterator.next();
+ }
+ catch (IndexOutOfBoundsException | VIntOutOfRangeException |
AssertionError e)
+ {
+ sstable.markSuspect();
+ throw new CorruptSSTableException(e, filename);
+ }
+ catch (IOError e)
+ {
+ if (e.getCause() instanceof IOException)
+ {
+ sstable.markSuspect();
+ throw new CorruptSSTableException((Exception)e.getCause(),
filename);
+ }
+ else
+ {
+ throw e;
+ }
+ }
}
public String getPath()
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index 716d9a8157..fe99b8430d 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@ -719,7 +719,7 @@ public abstract class SSTableReader extends SSTable
implements UnfilteredSource,
/**
* Determine the minimal set of sections that can be extracted from this
SSTable to cover the given ranges.
*
- * @return A sorted list of (offset,end) pairs that cover the given ranges
in the datafile for this SSTable.
+ * @return A sorted list of [offset,end) pairs that cover the given ranges
in the datafile for this SSTable.
*/
public List<PartitionPositionBounds>
getPositionsForRanges(Collection<Range<Token>> ranges)
{
@@ -728,27 +728,110 @@ public abstract class SSTableReader extends SSTable
implements UnfilteredSource,
for (Range<Token> range : Range.normalize(ranges))
{
assert !range.isWrapAround() || range.right.isMinimum();
- // truncate the range so it at most covers the sstable
AbstractBounds<PartitionPosition> bounds =
Range.makeRowRange(range);
- PartitionPosition leftBound = bounds.left.compareTo(first) > 0 ?
bounds.left : first.getToken().minKeyBound();
- PartitionPosition rightBound = bounds.right.isMinimum() ?
last.getToken().maxKeyBound() : bounds.right;
+ PartitionPositionBounds pb = getPositionsForBounds(bounds);
+ if (pb != null)
+ positions.add(pb);
+ }
+ return positions;
+ }
- if (leftBound.compareTo(last) > 0 || rightBound.compareTo(first) <
0)
- continue;
+ /**
+ * Get a list of data positions in this SSTable that correspond to the
given list of bounds. This method will remove
+ * non-covered intervals, but will not correct order or overlap in the
supplied list, e.g. if bounds overlap, the
+ * result will be sections of the data file that repeat the same positions.
+ *
+ * @return A list of [offset,end) pairs, in the same order as the given
boundsList (sorted only if the input is sorted), in the datafile for this
+ * SSTable.
+ */
+ public List<PartitionPositionBounds>
getPositionsForBoundsIterator(Iterator<AbstractBounds<PartitionPosition>>
boundsList)
+ {
+ // use the index to determine a minimal section for each range
+ List<PartitionPositionBounds> positions = new ArrayList<>();
+ while (boundsList.hasNext())
+ {
+ AbstractBounds<PartitionPosition> bounds = boundsList.next();
+ PartitionPositionBounds pb = getPositionsForBounds(bounds);
+ if (pb != null)
+ positions.add(pb);
+ }
+ return positions;
+ }
- long left = getPosition(leftBound, Operator.GT);
- long right = (rightBound.compareTo(last) > 0)
- ? uncompressedLength()
- : getPosition(rightBound, Operator.GT);
+ /**
+ * Determine the data positions in this SSTable that cover the given
bounds.
+ *
+ * @return An [offset,end) pair that covers the given bounds in the
datafile for this SSTable, or null if the range
+ * is not covered by the sstable or is empty.
+ */
+ public PartitionPositionBounds
getPositionsForBounds(AbstractBounds<PartitionPosition> bounds)
+ {
+ long left = getPosition(bounds.left, bounds.inclusiveLeft() ?
Operator.GE : Operator.GT);
+ // Note: getPosition will apply a moved start if the sstable is in
MOVED_START state.
+ if (left < 0) // empty range
+ return null;
- if (left == right)
- // empty range
- continue;
+ long right = bounds.right.isMinimum() ? -1
+ : getPosition(bounds.right,
bounds.inclusiveRight() ? Operator.GT
+
: Operator.GE);
+ if (right < 0) // right is beyond end
+ right = uncompressedLength(); // this should also be correct for
EARLY readers
+
+ if (left >= right) // empty range
+ return null;
+
+ return new PartitionPositionBounds(left, right);
+ }
+
+ /**
+ * Return an [offset,end) pair that covers the whole file. This could be
null if the sstable's moved start has
+ * made the sstable effectively empty.
+ */
+ public PartitionPositionBounds getPositionsForFullRange()
+ {
+ if (openReason != OpenReason.MOVED_START)
+ return new PartitionPositionBounds(0, uncompressedLength());
+ else
+ {
+ // query a full range, so that the required adjustments can be
applied
+ PartitionPosition minToken =
getPartitioner().getMinimumToken().minKeyBound();
+ return getPositionsForBounds(new Range<>(minToken, minToken));
+ }
+ }
- assert left < right : String.format("Range=%s openReason=%s
first=%s last=%s left=%d right=%d", range, openReason, first, last, left,
right);
- positions.add(new PartitionPositionBounds(left, right));
+ /**
+ * Calculate a total on-disk (compressed) size for the given partition
positions. For uncompressed files this is
+ * equal to the sum of the size of the covered ranges. For compressed
files this is the sum of the size of the
+ * chunks that contain the requested ranges and may be significantly
bigger than the size of the requested ranges.
+ *
+ * @param positionBounds a list of [offset,end) pairs that specify the
relevant sections of the data file; this must
+ * be non-overlapping and in ascending order.
+ */
+ public long
onDiskSizeForPartitionPositions(Collection<PartitionPositionBounds>
positionBounds)
+ {
+ long total = 0;
+ if (!compression)
+ {
+ for (PartitionPositionBounds position : positionBounds)
+ total += position.upperPosition - position.lowerPosition;
}
- return positions;
+ else
+ {
+ final CompressionMetadata compressionMetadata =
getCompressionMetadata();
+ long lastEnd = 0;
+ for (PartitionPositionBounds position : positionBounds)
+ {
+ // The end of the chunk that contains the last required byte
from the range.
+ long upperChunkEnd =
compressionMetadata.chunkFor(position.upperPosition - 1).chunkEnd();
+ // The start of the chunk that contains the first required
byte from the range.
+ long lowerChunkStart =
compressionMetadata.chunkFor(position.lowerPosition).offset;
+ if (lowerChunkStart < lastEnd) // if regions include the same
chunk, count it only once
+ lowerChunkStart = lastEnd;
+ total += upperChunkEnd - lowerChunkStart;
+ lastEnd = upperChunkEnd;
+ }
+ }
+ return total;
}
/**
@@ -940,11 +1023,18 @@ public abstract class SSTableReader extends SSTable
implements UnfilteredSource,
}
/**
- * Direct I/O SSTableScanner over the entirety of the sstable..
+ * Direct I/O SSTableScanner over the entirety of the sstable.
*
* @return A Scanner over the full content of the SSTable.
*/
- public abstract ISSTableScanner getScanner();
+ public ISSTableScanner getScanner()
+ {
+ PartitionPositionBounds fullRange = getPositionsForFullRange();
+ if (fullRange != null)
+ return new SSTableSimpleScanner(this,
Collections.singletonList(fullRange));
+ else
+ return new SSTableSimpleScanner(this, Collections.emptyList());
+ }
/**
* Direct I/O SSTableScanner over a defined collection of ranges of tokens.
@@ -952,15 +1042,25 @@ public abstract class SSTableReader extends SSTable
implements UnfilteredSource,
* @param ranges the range of keys to cover
* @return A Scanner for seeking over the rows of the SSTable.
*/
- public abstract ISSTableScanner getScanner(Collection<Range<Token>>
ranges);
+ public ISSTableScanner getScanner(Collection<Range<Token>> ranges)
+ {
+ if (ranges != null)
+ return new SSTableSimpleScanner(this,
getPositionsForRanges(ranges));
+ else
+ return getScanner();
+ }
/**
* Direct I/O SSTableScanner over an iterator of bounds.
*
- * @param rangeIterator the keys to cover
+ * @param boundsIterator the keys to cover
* @return A Scanner for seeking over the rows of the SSTable.
*/
- public abstract ISSTableScanner
getScanner(Iterator<AbstractBounds<PartitionPosition>> rangeIterator);
+ public ISSTableScanner
getScanner(Iterator<AbstractBounds<PartitionPosition>> boundsIterator)
+ {
+ return new SSTableSimpleScanner(this,
getPositionsForBoundsIterator(boundsIterator));
+ }
+
/**
* Create a {@link FileDataInput} for the data file of the sstable
represented by this reader. This method returns
diff --git
a/src/java/org/apache/cassandra/io/sstable/format/SSTableScanner.java
b/src/java/org/apache/cassandra/io/sstable/format/SSTableScanner.java
index 217c177206..86c0f7ec2d 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableScanner.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableScanner.java
@@ -19,7 +19,6 @@ package org.apache.cassandra.io.sstable.format;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
@@ -35,9 +34,7 @@ import
org.apache.cassandra.db.rows.LazilyInitializedUnfilteredRowIterator;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.dht.AbstractBounds.Boundary;
-import org.apache.cassandra.dht.Bounds;
import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.sstable.AbstractRowIndexEntry;
import org.apache.cassandra.io.sstable.CorruptSSTableException;
import org.apache.cassandra.io.sstable.ISSTableScanner;
@@ -86,14 +83,6 @@ implements ISSTableScanner
this.listener = listener;
}
- protected static List<AbstractBounds<PartitionPosition>>
makeBounds(SSTableReader sstable, Collection<Range<Token>> tokenRanges)
- {
- List<AbstractBounds<PartitionPosition>> boundsList = new
ArrayList<>(tokenRanges.size());
- for (Range<Token> range : Range.normalize(tokenRanges))
- addRange(sstable, Range.makeRowRange(range), boundsList);
- return boundsList;
- }
-
protected static List<AbstractBounds<PartitionPosition>>
makeBounds(SSTableReader sstable, DataRange dataRange)
{
List<AbstractBounds<PartitionPosition>> boundsList = new
ArrayList<>(2);
@@ -101,11 +90,6 @@ implements ISSTableScanner
return boundsList;
}
- protected static AbstractBounds<PartitionPosition> fullRange(SSTableReader
sstable)
- {
- return new Bounds<>(sstable.getFirst(), sstable.getLast());
- }
-
private static void addRange(SSTableReader sstable,
AbstractBounds<PartitionPosition> requested,
List<AbstractBounds<PartitionPosition>> boundsList)
{
if (requested instanceof Range && ((Range<?>)
requested).isWrapAround())
diff --git
a/src/java/org/apache/cassandra/io/sstable/format/SSTableSimpleScanner.java
b/src/java/org/apache/cassandra/io/sstable/format/SSTableSimpleScanner.java
new file mode 100644
index 0000000000..6015265ba0
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableSimpleScanner.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.format;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.io.sstable.CorruptSSTableException;
+import org.apache.cassandra.io.sstable.ISSTableScanner;
+import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
+import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.schema.TableMetadata;
+
+import static
org.apache.cassandra.io.sstable.format.SSTableReader.PartitionPositionBounds;
+
+/// Simple SSTable scanner that reads sequentially through an SSTable without
using the index.
+///
+/// This is a significant improvement for the performance of compaction over
using the full-blown DataRange-capable
+/// [SSTableScanner] and enables correct calculation of data sizes to process.
+public class SSTableSimpleScanner
+implements ISSTableScanner
+{
+ private final AtomicBoolean isClosed = new AtomicBoolean(false);
+ private final RandomAccessReader dfile;
+ private final SSTableReader sstable;
+
+ private final Iterator<PartitionPositionBounds> rangeIterator;
+
+ private long bytesScannedInPreviousRanges;
+
+ private final long sizeInBytes;
+ private final long compressedSizeInBytes;
+
+ private long currentEndPosition;
+ private long currentStartPosition;
+
+ private SSTableIdentityIterator currentIterator;
+ private DecoratedKey lastKey;
+
+ /// Create a new simple scanner over the given sstable and the given
ranges of uncompressed positions.
+ /// Each range must start and end on a partition boundary, and, to satisfy
the contract of [ISSTableScanner], the
+ /// ranges must be non-overlapping and in ascending order. This scanner
will throw an [IllegalArgumentException] if
+ /// the latter is not true.
+ ///
+ /// The ranges can be constructed by [SSTableReader#getPositionsForRanges]
and similar methods as done by the
+ /// various [SSTableReader#getScanner] variations.
+ public SSTableSimpleScanner(SSTableReader sstable,
+ Collection<PartitionPositionBounds> boundsList)
+ {
+ assert sstable != null;
+
+ this.dfile = sstable.openDataReader();
+ this.sstable = sstable;
+ this.sizeInBytes = boundsList.stream().mapToLong(ppb ->
ppb.upperPosition - ppb.lowerPosition).sum();
+ this.compressedSizeInBytes = sstable.compression ?
sstable.onDiskSizeForPartitionPositions(boundsList) : sizeInBytes;
+ this.rangeIterator = boundsList.iterator();
+ this.currentEndPosition = 0;
+ this.currentStartPosition = 0;
+ this.bytesScannedInPreviousRanges = 0;
+ this.currentIterator = null;
+ this.lastKey = null;
+ }
+
+ public void close()
+ {
+ if (isClosed.compareAndSet(false, true))
+ {
+ // ensure we report what we have actually processed
+ bytesScannedInPreviousRanges += dfile.getFilePointer() -
currentStartPosition;
+ dfile.close();
+ // close() may change the file pointer, update so that the
difference is 0 when reported by getBytesScanned()
+ currentStartPosition = dfile.getFilePointer();
+ }
+ }
+
+ @Override
+ public long getLengthInBytes()
+ {
+ return sizeInBytes;
+ }
+
+
+ public long getCompressedLengthInBytes()
+ {
+ return compressedSizeInBytes;
+ }
+
+ @Override
+ public long getCurrentPosition()
+ {
+ return dfile.getFilePointer();
+ }
+
+ public long getBytesScanned()
+ {
+ return bytesScannedInPreviousRanges + dfile.getFilePointer() -
currentStartPosition;
+ }
+
+ @Override
+ public Set<SSTableReader> getBackingSSTables()
+ {
+ return ImmutableSet.of(sstable);
+ }
+
+ public TableMetadata metadata()
+ {
+ return sstable.metadata();
+ }
+
+ public boolean hasNext()
+ {
+ if (currentIterator != null)
+ {
+ currentIterator.close(); // Ensure that the iterator cannot be
used further. No op if already closed.
+
+ // Row iterator must be exhausted to advance to next partition
+ currentIterator.exhaust();
+ currentIterator = null;
+ }
+
+ if (dfile.getFilePointer() < currentEndPosition)
+ return true;
+
+ return advanceRange();
+ }
+
+ boolean advanceRange()
+ {
+ if (!rangeIterator.hasNext())
+ return false;
+
+ bytesScannedInPreviousRanges += currentEndPosition -
currentStartPosition;
+
+ PartitionPositionBounds nextRange = rangeIterator.next();
+ if (currentEndPosition > nextRange.lowerPosition)
+ throw new IllegalArgumentException("Ranges supplied to
SSTableSimpleScanner must be non-overlapping and in ascending order.");
+
+ currentEndPosition = nextRange.upperPosition;
+ currentStartPosition = nextRange.lowerPosition;
+ dfile.seek(currentStartPosition);
+ return true;
+ }
+
+ public UnfilteredRowIterator next()
+ {
+ if (!hasNext())
+ throw new NoSuchElementException();
+
+ currentIterator = SSTableIdentityIterator.create(sstable, dfile,
false);
+ DecoratedKey currentKey = currentIterator.partitionKey();
+ if (lastKey != null && lastKey.compareTo(currentKey) >= 0)
+ {
+ sstable.markSuspect();
+ throw new CorruptSSTableException(new
IllegalStateException(String.format("Invalid key order: current %s <= previous
%s",
+
currentKey,
+
lastKey)),
+ sstable.getFilename());
+ }
+ lastKey = currentKey;
+ return currentIterator;
+ }
+
+ public void remove()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public String toString()
+ {
+ return String.format("%s(sstable=%s)", getClass().getSimpleName(),
sstable);
+ }
+}
diff --git
a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
index b58dbc532e..692cadf34d 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
@@ -21,7 +21,6 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -46,7 +45,6 @@ import org.apache.cassandra.db.rows.Rows;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.db.rows.UnfilteredRowIteratorWithLowerBound;
import org.apache.cassandra.db.rows.UnfilteredRowIterators;
-import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.sstable.AbstractRowIndexEntry;
@@ -157,41 +155,6 @@ public class BigTableReader extends
SSTableReaderWithFilter implements IndexSumm
return BigTableKeyReader.create(ifile, rowIndexEntrySerializer);
}
- /**
- * Direct I/O SSTableScanner over an iterator of bounds.
- *
- * @param boundsIterator the keys to cover
- * @return A Scanner for seeking over the rows of the SSTable.
- */
- public ISSTableScanner
getScanner(Iterator<AbstractBounds<PartitionPosition>> boundsIterator)
- {
- return BigTableScanner.getScanner(this, boundsIterator);
- }
-
- /**
- * Direct I/O SSTableScanner over the full sstable.
- *
- * @return A Scanner for reading the full SSTable.
- */
- public ISSTableScanner getScanner()
- {
- return BigTableScanner.getScanner(this);
- }
-
- /**
- * Direct I/O SSTableScanner over a defined collection of ranges of tokens.
- *
- * @param ranges the range of keys to cover
- * @return A Scanner for seeking over the rows of the SSTable.
- */
- public ISSTableScanner getScanner(Collection<Range<Token>> ranges)
- {
- if (ranges != null)
- return BigTableScanner.getScanner(this, ranges);
- else
- return getScanner();
- }
-
/**
* Finds and returns the first key beyond a given token in this SSTable or
null if no such key exists.
*/
diff --git
a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
index 887d997846..83243529c4 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
@@ -18,11 +18,8 @@
package org.apache.cassandra.io.sstable.format.big;
import java.io.IOException;
-import java.util.Collection;
import java.util.Iterator;
-import com.google.common.collect.Iterators;
-
import org.apache.cassandra.db.DataRange;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.PartitionPosition;
@@ -30,8 +27,6 @@ import org.apache.cassandra.db.filter.ClusteringIndexFilter;
import org.apache.cassandra.db.filter.ColumnFilter;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.dht.AbstractBounds;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.sstable.CorruptSSTableException;
import org.apache.cassandra.io.sstable.ISSTableScanner;
import org.apache.cassandra.io.sstable.SSTable;
@@ -50,12 +45,6 @@ public class BigTableScanner extends
SSTableScanner<BigTableReader, RowIndexEntr
private final RowIndexEntry.IndexSerializer rowIndexEntrySerializer;
- // Full scan of the sstables
- public static ISSTableScanner getScanner(BigTableReader sstable)
- {
- return getScanner(sstable,
Iterators.singletonIterator(fullRange(sstable)));
- }
-
public static ISSTableScanner getScanner(BigTableReader sstable,
ColumnFilter columns,
DataRange dataRange,
@@ -64,16 +53,6 @@ public class BigTableScanner extends
SSTableScanner<BigTableReader, RowIndexEntr
return new BigTableScanner(sstable, columns, dataRange,
makeBounds(sstable, dataRange).iterator(), listener);
}
- public static ISSTableScanner getScanner(BigTableReader sstable,
Collection<Range<Token>> tokenRanges)
- {
- return getScanner(sstable, makeBounds(sstable,
tokenRanges).iterator());
- }
-
- public static ISSTableScanner getScanner(BigTableReader sstable,
Iterator<AbstractBounds<PartitionPosition>> rangeIterator)
- {
- return new BigTableScanner(sstable,
ColumnFilter.all(sstable.metadata()), null, rangeIterator,
SSTableReadsListener.NOOP_LISTENER);
- }
-
private BigTableScanner(BigTableReader sstable,
ColumnFilter columns,
DataRange dataRange,
diff --git
a/src/java/org/apache/cassandra/io/sstable/format/bti/BtiTableReader.java
b/src/java/org/apache/cassandra/io/sstable/format/bti/BtiTableReader.java
index c5571e7fbb..9a65be1137 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/bti/BtiTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/bti/BtiTableReader.java
@@ -22,7 +22,6 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
-import java.util.Iterator;
import java.util.List;
import com.google.common.annotations.VisibleForTesting;
@@ -45,7 +44,6 @@ import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.sstable.CorruptSSTableException;
import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.io.sstable.ISSTableScanner;
import org.apache.cassandra.io.sstable.IVerifier;
import org.apache.cassandra.io.sstable.SSTable;
import org.apache.cassandra.io.sstable.SSTableReadsListener;
@@ -382,27 +380,6 @@ public class BtiTableReader extends SSTableReaderWithFilter
return new SSTableIterator(this, dataFileInput, key, indexEntry,
slices, selectedColumns, rowIndexFile);
}
- @Override
- public ISSTableScanner getScanner()
- {
- return BtiTableScanner.getScanner(this);
- }
-
- @Override
- public ISSTableScanner getScanner(Collection<Range<Token>> ranges)
- {
- if (ranges != null)
- return BtiTableScanner.getScanner(this, ranges);
- else
- return getScanner();
- }
-
- @Override
- public ISSTableScanner
getScanner(Iterator<AbstractBounds<PartitionPosition>> rangeIterator)
- {
- return BtiTableScanner.getScanner(this, rangeIterator);
- }
-
@VisibleForTesting
@Override
public BtiTableReader cloneAndReplace(IFilter filter)
diff --git
a/src/java/org/apache/cassandra/io/sstable/format/bti/BtiTableScanner.java
b/src/java/org/apache/cassandra/io/sstable/format/bti/BtiTableScanner.java
index a9f862c68b..4507ccf7f5 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/bti/BtiTableScanner.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/bti/BtiTableScanner.java
@@ -19,11 +19,8 @@ package org.apache.cassandra.io.sstable.format.bti;
import java.io.Closeable;
import java.io.IOException;
-import java.util.Collection;
import java.util.Iterator;
-import com.google.common.collect.Iterators;
-
import org.apache.cassandra.db.DataRange;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.PartitionPosition;
@@ -31,20 +28,12 @@ import org.apache.cassandra.db.filter.ClusteringIndexFilter;
import org.apache.cassandra.db.filter.ColumnFilter;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.dht.AbstractBounds;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.sstable.SSTableReadsListener;
import org.apache.cassandra.io.sstable.format.SSTableScanner;
import org.apache.cassandra.io.util.FileUtils;
public class BtiTableScanner extends SSTableScanner<BtiTableReader,
TrieIndexEntry, BtiTableScanner.BtiScanningIterator>
{
- // Full scan of the sstables
- public static BtiTableScanner getScanner(BtiTableReader sstable)
- {
- return getScanner(sstable,
Iterators.singletonIterator(fullRange(sstable)));
- }
-
public static BtiTableScanner getScanner(BtiTableReader sstable,
ColumnFilter columns,
DataRange dataRange,
@@ -53,16 +42,6 @@ public class BtiTableScanner extends SSTableScanner<BtiTableReader, TrieIndexEnt
return new BtiTableScanner(sstable, columns, dataRange,
makeBounds(sstable, dataRange).iterator(), listener);
}
- public static BtiTableScanner getScanner(BtiTableReader sstable,
Collection<Range<Token>> tokenRanges)
- {
- return getScanner(sstable, makeBounds(sstable,
tokenRanges).iterator());
- }
-
- public static BtiTableScanner getScanner(BtiTableReader sstable,
Iterator<AbstractBounds<PartitionPosition>> rangeIterator)
- {
- return new BtiTableScanner(sstable,
ColumnFilter.all(sstable.metadata()), null, rangeIterator,
SSTableReadsListener.NOOP_LISTENER);
- }
-
private BtiTableScanner(BtiTableReader sstable,
ColumnFilter columns,
DataRange dataRange,
diff --git a/src/java/org/apache/cassandra/tools/SSTablePartitions.java b/src/java/org/apache/cassandra/tools/SSTablePartitions.java
index 2181346271..b435994bba 100644
--- a/src/java/org/apache/cassandra/tools/SSTablePartitions.java
+++ b/src/java/org/apache/cassandra/tools/SSTablePartitions.java
@@ -369,13 +369,15 @@ public class SSTablePartitions
{
while (scanner.hasNext())
{
+ // hasNext() positions us on the next partition, next() has to advance to read its header.
+ long startOfPartition = scanner.getCurrentPosition();
try (UnfilteredRowIterator partition = scanner.next())
{
ByteBuffer key = partition.partitionKey().getKey();
boolean isExcluded =
excludedKeys.contains(metadata.partitionKeyType.getString(key));
PartitionStats partitionStats = new PartitionStats(key,
-
scanner.getCurrentPosition(),
+
startOfPartition,
partition.partitionLevelDeletion().isLive());
// Consume the partition to populate the stats.
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableCorruptionDetectionTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableCorruptionDetectionTest.java
index 21ac51ee86..c631d5d099 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableCorruptionDetectionTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableCorruptionDetectionTest.java
@@ -37,6 +37,7 @@ import org.apache.cassandra.cache.ChunkCache;
import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DataRange;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.Slices;
@@ -156,6 +157,12 @@ public class SSTableCorruptionDetectionTest extends SSTableWriterTestBase
bruteForceCorruptionTest(ssTableReader, sstableScanner());
}
+ @Test
+ public void testSSTableSimpleScanner() throws Throwable
+ {
+ bruteForceCorruptionTest(ssTableReader, sstableSimpleScanner());
+ }
+
private void bruteForceCorruptionTest(SSTableReader ssTableReader,
Consumer<SSTableReader> walker) throws Throwable
{
FileChannel fc = new
File(ssTableReader.getFilename()).newReadWriteChannel();
@@ -193,6 +200,32 @@ public class SSTableCorruptionDetectionTest extends
SSTableWriterTestBase
}
private Consumer<SSTableReader> sstableScanner()
+ {
+ return (SSTableReader sstable) -> {
+ try (var scanner = sstable.partitionIterator(ColumnFilter.NONE,
DataRange.allData(sstable.getPartitioner()),
SSTableReadsListener.NOOP_LISTENER))
+ {
+ while (scanner.hasNext())
+ {
+ try (UnfilteredRowIterator rowIter = scanner.next())
+ {
+ if (rowIter.hasNext())
+ {
+ Unfiltered unfiltered = rowIter.next();
+ if (unfiltered.isRow())
+ {
+ Row row = (Row) unfiltered;
+ assertEquals(2, row.clustering().size());
+ // no-op read
+ }
+ }
+ }
+
+ }
+ }
+ };
+ }
+
+ private Consumer<SSTableReader> sstableSimpleScanner()
{
return (SSTableReader sstable) -> {
try (ISSTableScanner scanner = sstable.getScanner())
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
index 9cb7ca3d14..a87ec024b5 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
@@ -30,8 +30,11 @@ import java.util.concurrent.Future;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import java.util.stream.Stream;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.Sets;
import org.junit.Assume;
import org.junit.BeforeClass;
@@ -89,6 +92,7 @@ import org.mockito.Mockito;
import static java.lang.String.format;
import static org.apache.cassandra.cql3.QueryProcessor.executeInternal;
+import static org.apache.cassandra.db.ColumnFamilyStore.FlushReason.UNIT_TESTS;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
@@ -178,6 +182,200 @@ public class SSTableReaderTest
}
}
+ @Test
+ public void testOnDiskSizeForRanges()
+ {
+ ColumnFamilyStore store = discardSSTables(KEYSPACE1, CF_STANDARD2);
+ partitioner = store.getPartitioner();
+ int count = 1000;
+
+ // insert data and compact to a single sstable
+ for (int j = 0; j < count; j++)
+ {
+ new RowUpdateBuilder(store.metadata(), 15000, k0(j))
+ .clustering("0")
+ .add("val", ByteBufferUtil.EMPTY_BYTE_BUFFER)
+ .build()
+ .applyUnsafe();
+ }
+ store.forceBlockingFlush(UNIT_TESTS);
+ store.forceMajorCompaction();
+
+ SSTableReader sstable = store.getLiveSSTables().iterator().next();
+
+ // Non-compression-dependent checks
+ // Check several ways of going through the whole file
+ assertEquals(sstable.onDiskLength(),
+ onDiskSizeForRanges(sstable, Collections.singleton(new
Range<>(t(cut(k0(0), 1)), t0(count - 1)))));
+ assertEquals(sstable.onDiskLength(),
+ onDiskSizeForRanges(sstable, Collections.singleton(new
Range<>(sstable.getPartitioner().getMinimumToken(),
+
sstable.getPartitioner().getMinimumToken()))));
+ assertEquals(sstable.onDiskLength(),
+ onDiskSizeForRanges(sstable, Collections.singleton(new
Range<>(sstable.getPartitioner().getMinimumToken(),
+
sstable.getLast().getToken()))));
+
+ // Split at exact match
+ assertEquals(sstable.onDiskLength(),
+ onDiskSizeForRanges(sstable, ImmutableList.of(new
Range<>(t(cut(k0(0), 1)), t0(347)),
+ new
Range<>(t0(347), t0(count - 1)))));
+
+ // Split at different prefixes pointing to the same position
+ assertEquals(sstable.onDiskLength(),
+ onDiskSizeForRanges(sstable, ImmutableList.of(new
Range<>(t(cut(k0(0), 1)), t(cut(k0(600), 2))),
+ new
Range<>(t(cut(k0(600), 1)), t0(count - 1)))));
+
+ // Size one row
+ double oneRowSize = sstable.uncompressedLength() * 1.0 / count;
+ System.out.println("One row size: " + oneRowSize);
+
+ if (!sstable.compression)
+ {
+ double delta = 0.9;
+
+ // Ranges are end-inclusive, indexes are adjusted by one here to account for that.
+ assertEquals((52 - 38),
+ onDiskSizeForRanges(sstable,
Collections.singleton(new Range<>(t0(37), t0(51)))) / oneRowSize,
+ delta);
+
+ // Try non-matching positions (inexact indexes are not adjusted for the count).
+ assertEquals((34 - 30),
+ onDiskSizeForRanges(sstable,
Collections.singleton(new Range<>(t(cut(k0(30), 1)),
+
t0(33)))) / oneRowSize,
+ delta);
+
+ assertEquals((700 - 554),
+ onDiskSizeForRanges(sstable,
Collections.singleton(new Range<>(t0(553),
+
t(cut(k0(700), 2))))) / oneRowSize,
+ delta);
+
+ assertEquals((500 - 30),
+ onDiskSizeForRanges(sstable,
Collections.singleton(new Range<>(t(cut(k0(30), 1)),
+
t(cut(k0(500), 2))))) / oneRowSize,
+ delta);
+
+ // Try a list
+ List<Range<Token>> ranges = ImmutableList.of(new Range<>(t0(37),
t0(51)),
+ new Range<>(t0(71),
t(cut(k0(100), 2))),
+ new
Range<>(t(cut(k0(230), 1)), t0(243)),
+ new
Range<>(t(cut(k0(260), 1)), t(cut(k0(300), 2))),
+ new Range<>(t0(373),
t0(382)),
+ new Range<>(t0(382),
t0(385)),
+ new
Range<>(t(cut(k0(400), 2)), t(cut(k0(400), 1))), // empty range
+ new Range<>(t0(563),
t(cut(k0(600), 2))), // touching ranges
+ new
Range<>(t(cut(k0(600), 1)), t0(621))
+ );
+ assertEquals((52 - 38 + 100 - 72 + 244 - 230 + 300 - 260 + 383 -
374 + 386 - 383 + 400 - 400 + 622 - 564),
+ onDiskSizeForRanges(sstable, ranges) / oneRowSize,
+ delta);
+
+ // Check going through the whole file
+ assertEquals(sstable.onDiskLength(),
+ onDiskSizeForRanges(sstable,
Collections.singleton(new Range<>(t(cut(k0(0), 1)), t0(count - 1)))));
+
+ assertEquals(sstable.onDiskLength(),
+ onDiskSizeForRanges(sstable, ImmutableList.of(new
Range<>(t(cut(k0(0), 1)), t0(347)),
+ new
Range<>(t0(347), t0(count - 1)))));
+
+ assertEquals(sstable.onDiskLength(),
+ onDiskSizeForRanges(sstable, ImmutableList.of(new
Range<>(t(cut(k0(0), 1)), t(cut(k0(600), 2))),
+ new
Range<>(t(cut(k0(600), 1)), t0(count - 1)))));
+ }
+ else
+ {
+ // It's much harder to test with compression.
+
+ // Check first three rows have the same size (they must be in the same chunk)
+ final long row0size = onDiskSizeForRanges(sstable,
Collections.singleton(new Range<>(t(cut(k0(0), 1)), t0(0))));
+ assertEquals(row0size, onDiskSizeForRanges(sstable,
Collections.singleton(new Range<>(t0(0), t0(1)))));
+ assertEquals(row0size, onDiskSizeForRanges(sstable,
Collections.singleton(new Range<>(t0(1), t0(2)))));
+
+ // As well as the first three rows together
+ assertEquals(row0size, onDiskSizeForRanges(sstable,
Collections.singleton(new Range<>(t(cut(k0(0), 1)), t0(2)))));
+
+ // And also when we query for them in separate ranges
+ assertEquals(row0size, onDiskSizeForRanges(sstable,
ImmutableList.of(new Range<>(t(cut(k0(0), 1)), t0(0)),
+
new Range<>(t0(0), t0(1)))));
+ assertEquals(row0size, onDiskSizeForRanges(sstable,
ImmutableList.of(new Range<>(t(cut(k0(0), 1)), t0(0)),
+
new Range<>(t0(1), t0(2)))));
+ assertEquals(row0size, onDiskSizeForRanges(sstable,
ImmutableList.of(new Range<>(t(cut(k0(0), 1)), t0(0)),
+
new Range<>(t0(0), t0(1)),
+
new Range<>(t0(1), t0(2)))));
+
+ // Finally, check that if we query for every second row we get the total size of the file.
+ assertEquals(sstable.onDiskLength(),
+ onDiskSizeForRanges(sstable, IntStream.range(0, count)
+ .filter(i -> i %
2 != 0)
+ .mapToObj(i ->
new Range<>(t0(i), t0(i + 1)))
+
.collect(Collectors.toList())));
+ }
+ }
+
+
+ @Test
+ public void testOnDiskSizeCompressedBoundaries()
+ {
+ ColumnFamilyStore store = discardSSTables(KEYSPACE1, CF_COMPRESSED);
+ partitioner = store.getPartitioner();
+ int count = 1000;
+ // Use a longish string to let a key align with a chunk boundary
+ ByteBuffer dataBuf = ByteBufferUtil.bytes(String.format("%43d", 123));
+
+ // insert data and compact to a single sstable
+ for (int j = 0; j < count; j++)
+ {
+ new RowUpdateBuilder(store.metadata(), 15000, k0(j))
+ .clustering("0")
+ .add("val", dataBuf)
+ .build()
+ .applyUnsafe();
+ }
+ store.forceBlockingFlush(UNIT_TESTS);
+ store.forceMajorCompaction();
+
+ SSTableReader sstable = store.getLiveSSTables().iterator().next();
+
+ int chunkLength = sstable.getCompressionMetadata().chunkLength();
+ System.out.println("Chunk length: " + chunkLength);
+ int[] alignedKeys = IntStream.range(0, count).filter(i ->
(sstable.getPosition(dk0(i), SSTableReader.Operator.EQ) & (chunkLength - 1)) ==
0).toArray();
+ assertTrue("Test needs an aligned key, try changing the length of
dataBuf", alignedKeys.length > 1);
+ for (int k : alignedKeys)
+ assertEquals("Coverage must not include chunk starting at end
position",
+
sstable.getCompressionMetadata().chunkFor(sstable.getPosition(dk0(k),
SSTableReader.Operator.EQ)).offset,
+ onDiskSizeForRanges(sstable,
Collections.singleton(new Range<>(partitioner.getMinimumToken(), t0(k - 1)))));
// inclusive end
+ }
+
+
+ long onDiskSizeForRanges(SSTableReader sstable, Collection<Range<Token>>
ranges)
+ {
+ return
sstable.onDiskSizeForPartitionPositions(sstable.getPositionsForRanges(ranges));
+ }
+
+ private Token t(String key)
+ {
+ return partitioner.getToken(ByteBufferUtil.bytes(key));
+ }
+
+ private String k0(int k)
+ {
+ return String.format("%08d", k);
+ }
+
+ private Token t0(int k)
+ {
+ return t(k0(k));
+ }
+
+ private DecoratedKey dk0(int k)
+ {
+ return partitioner.decorateKey(ByteBufferUtil.bytes(k0(k)));
+ }
+
+ private String cut(String s, int n)
+ {
+ return s.substring(0, s.length() - n);
+ }
+
+
@Test
public void testSpannedIndexPositions() throws IOException
{
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableScannerTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableScannerTest.java
index 17b8a6cbb2..73195b0617 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableScannerTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableScannerTest.java
@@ -21,10 +21,13 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;
+import java.util.function.Function;
import com.google.common.collect.Iterables;
+import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -40,6 +43,7 @@ import org.apache.cassandra.db.Slices;
import org.apache.cassandra.db.filter.ClusteringIndexSliceFilter;
import org.apache.cassandra.db.filter.ColumnFilter;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.dht.ByteOrderedPartitioner;
@@ -49,6 +53,7 @@ import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.schema.KeyspaceParams;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.utils.ByteBufferUtil;
+import org.hamcrest.Matchers;
import static org.apache.cassandra.dht.AbstractBounds.isEmpty;
import static org.junit.Assert.assertEquals;
@@ -180,6 +185,12 @@ public class SSTableScannerTest
}
private static void assertScanMatches(SSTableReader sstable, int
scanStart, int scanEnd, int ... boundaries)
+ {
+ assertScanMatchesUsingScanner(sstable, scanStart, scanEnd, boundaries);
+ assertScanMatchesUsingSimple(sstable, scanStart, scanEnd, boundaries);
+ }
+
+ private static void assertScanMatchesUsingScanner(SSTableReader sstable,
int scanStart, int scanEnd, int ... boundaries)
{
assert boundaries.length % 2 == 0;
for (DataRange range : dataRanges(sstable.metadata(), scanStart,
scanEnd))
@@ -200,6 +211,28 @@ public class SSTableScannerTest
}
}
+ private static void assertScanMatchesUsingSimple(SSTableReader sstable,
int scanStart, int scanEnd, int ... boundaries)
+ {
+ assert boundaries.length % 2 == 0;
+ for (DataRange range : dataRanges(sstable.metadata(), scanStart,
scanEnd))
+ {
+ if (range.isWrapAround() && !range.keyRange().right.isMinimum())
// getScanner on AbstractBounds<PartitionPosition> does not handle wraparounds
+ continue;
+
+ try(UnfilteredPartitionIterator scanner =
sstable.getScanner(Collections.singleton(range.keyRange()).iterator()))
+ {
+ for (int b = 0; b < boundaries.length; b += 2)
+ for (int i = boundaries[b]; i <= boundaries[b + 1]; i++)
+ assertEquals(toKey(i), new
String(scanner.next().partitionKey().getKey().array()));
+ assertFalse(scanner.hasNext());
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
private static void assertScanEmpty(SSTableReader sstable, int scanStart,
int scanEnd)
{
assertScanMatches(sstable, scanStart, scanEnd);
@@ -547,7 +580,30 @@ public class SSTableScannerTest
assertScanContainsRanges(scanner, 205, 205);
}
- private static void
testRequestNextRowIteratorWithoutConsumingPrevious(Consumer<ISSTableScanner>
consumer)
+ private static void
testRequestNextRowIteratorWithoutConsumingPrevious(Function<SSTableReader,
UnfilteredPartitionIterator> makeScanner,
+
Consumer<UnfilteredPartitionIterator> requestNext,
+
String messagePattern)
+ {
+ final SSTableReader sstable = prepareSmallSSTable();
+
+ try (UnfilteredPartitionIterator scanner = makeScanner.apply(sstable);
+ UnfilteredRowIterator currentRowIterator = scanner.next())
+ {
+ assertTrue(currentRowIterator.hasNext());
+ try
+ {
+ requestNext.accept(scanner);
+ currentRowIterator.next();
+ fail("Should have thrown IllegalStateException");
+ }
+ catch (IllegalStateException e)
+ {
+ Assert.assertThat(e.getMessage(),
Matchers.matchesPattern(messagePattern));
+ }
+ }
+ }
+
+ private static SSTableReader prepareSmallSSTable()
{
Keyspace keyspace = Keyspace.open(KEYSPACE);
ColumnFamilyStore store = keyspace.getColumnFamilyStore(TABLE);
@@ -557,38 +613,77 @@ public class SSTableScannerTest
store.disableAutoCompaction();
insertRowWithKey(store.metadata(), 0);
+ insertRowWithKey(store.metadata(), 3);
store.forceBlockingFlush(ColumnFamilyStore.FlushReason.UNIT_TESTS);
assertEquals(1, store.getLiveSSTables().size());
SSTableReader sstable = store.getLiveSSTables().iterator().next();
+ return sstable;
+ }
- try (ISSTableScanner scanner = sstable.getScanner();
- UnfilteredRowIterator currentRowIterator = scanner.next())
+ @Test
+ public void testSimpleHasNextRowIteratorWithoutConsumingPrevious()
+ {
+
testRequestNextRowIteratorWithoutConsumingPrevious(SSTableReader::getScanner,
+
UnfilteredPartitionIterator::hasNext,
+ "Iterator used
after closing.");
+ }
+
+ @Test
+ public void testSimpleNextRowIteratorWithoutConsumingPrevious()
+ {
+
testRequestNextRowIteratorWithoutConsumingPrevious(SSTableReader::getScanner,
+
UnfilteredPartitionIterator::next,
+ "Iterator used
after closing.");
+ }
+
+ @Test
+ public void testHasNextRowIteratorWithoutConsumingPrevious()
+ {
+ testRequestNextRowIteratorWithoutConsumingPrevious(r ->
r.partitionIterator(ColumnFilter.NONE, DataRange.allData(r.getPartitioner()),
SSTableReadsListener.NOOP_LISTENER),
+
UnfilteredPartitionIterator::hasNext,
+
".*UnfilteredRowIterator.*must be closed.*");
+ }
+
+ @Test
+ public void testNextRowIteratorWithoutConsumingPrevious()
+ {
+ testRequestNextRowIteratorWithoutConsumingPrevious(r ->
r.partitionIterator(ColumnFilter.NONE, DataRange.allData(r.getPartitioner()),
SSTableReadsListener.NOOP_LISTENER),
+
UnfilteredPartitionIterator::next,
+
".*UnfilteredRowIterator.*must be closed.*");
+ }
+
+ private static void
testRequestNextRowIteratorAfterClosingPrevious(Function<SSTableReader,
UnfilteredPartitionIterator> makeScanner)
+ {
+ final SSTableReader sstable = prepareSmallSSTable();
+
+ try (UnfilteredPartitionIterator scanner = makeScanner.apply(sstable))
{
- assertTrue(currentRowIterator.hasNext());
- try
+ try (UnfilteredRowIterator p = scanner.next())
{
- consumer.accept(scanner);
- fail("Should have thrown IllegalStateException");
+ assertEquals(toKey(0), new
String(p.partitionKey().getKey().array()));
+ // do not read it, but close it
}
- catch (IllegalStateException e)
+
+ try (UnfilteredRowIterator p = scanner.next())
{
- assertEquals("The UnfilteredRowIterator returned by the last
call to next() was initialized: " +
- "it must be closed before calling hasNext() or
next() again.",
- e.getMessage());
+ assertEquals(toKey(3), new
String(p.partitionKey().getKey().array()));
+ assertTrue(p.hasNext());
+ assertTrue(p.next() instanceof Row);
}
}
}
+
@Test
- public void testHasNextRowIteratorWithoutConsumingPrevious()
+ public void testSimpleRequestNextRowIteratorAfterClosingPreviouss()
{
-
testRequestNextRowIteratorWithoutConsumingPrevious(ISSTableScanner::hasNext);
+
testRequestNextRowIteratorAfterClosingPrevious(SSTableReader::getScanner);
}
@Test
- public void testNextRowIteratorWithoutConsumingPrevious()
+ public void testRequestNextRowIteratorAfterClosingPrevious()
{
-
testRequestNextRowIteratorWithoutConsumingPrevious(ISSTableScanner::next);
+ testRequestNextRowIteratorAfterClosingPrevious(r ->
r.partitionIterator(ColumnFilter.NONE, DataRange.allData(r.getPartitioner()),
SSTableReadsListener.NOOP_LISTENER));
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]