http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/Slice.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Slice.java b/src/java/org/apache/cassandra/db/Slice.java
index 05c2977..2ffb91e 100644
--- a/src/java/org/apache/cassandra/db/Slice.java
+++ b/src/java/org/apache/cassandra/db/Slice.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.db;
 
-import java.io.DataInput;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.security.MessageDigest;
@@ -25,6 +24,7 @@ import java.util.*;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.ObjectSizes;
@@ -68,8 +68,8 @@ public class Slice
     private Slice(Bound start, Bound end)
     {
         assert start.isStart() && end.isEnd();
-        this.start = start.takeAlias();
-        this.end = end.takeAlias();
+        this.start = start;
+        this.end = end;
     }
 
     public static Slice make(Bound start, Bound end)
@@ -331,7 +331,7 @@ public class Slice
                  + Bound.serializer.serializedSize(slice.end, version, types);
         }
 
-        public Slice deserialize(DataInput in, int version, List<AbstractType<?>> types) throws IOException
+        public Slice deserialize(DataInputPlus in, int version, List<AbstractType<?>> types) throws IOException
         {
             Bound start = Bound.serializer.deserialize(in, version, types);
             Bound end = Bound.serializer.deserialize(in, version, types);
@@ -346,21 +346,19 @@ public class Slice
      */
     public static class Bound extends AbstractClusteringPrefix
     {
-        private static final long EMPTY_SIZE = ObjectSizes.measure(new Bound(Kind.INCL_START_BOUND, new ByteBuffer[0]));
-
         public static final Serializer serializer = new Serializer();
 
-        /** The smallest start bound, i.e. the one that starts before any row. */
-        public static final Bound BOTTOM = inclusiveStartOf();
-        /** The biggest end bound, i.e. the one that ends after any row. */
-        public static final Bound TOP = inclusiveEndOf();
-
-        protected final Kind kind;
-        protected final ByteBuffer[] values;
+        /**
+         * The smallest and biggest bound. Note that as range tombstone bounds are a (special) case of slice bounds,
+         * we want the BOTTOM and TOP to be the same object, but we alias them here because it's cleaner when dealing
+         * with slices to refer to Slice.Bound.BOTTOM and Slice.Bound.TOP.
+         */
+        public static final Bound BOTTOM = RangeTombstone.Bound.BOTTOM;
+        public static final Bound TOP = RangeTombstone.Bound.TOP;
 
         protected Bound(Kind kind, ByteBuffer[] values)
         {
-            this.kind = kind;
-            this.values = values;
+            super(kind, values);
         }
 
         public static Bound create(Kind kind, ByteBuffer[] values)
@@ -396,22 +394,6 @@ public class Slice
             return create(Kind.EXCL_END_BOUND, values);
         }
 
-        public static Bound exclusiveStartOf(ClusteringPrefix prefix)
-        {
-            ByteBuffer[] values = new ByteBuffer[prefix.size()];
-            for (int i = 0; i < prefix.size(); i++)
-                values[i] = prefix.get(i);
-            return exclusiveStartOf(values);
-        }
-
-        public static Bound inclusiveEndOf(ClusteringPrefix prefix)
-        {
-            ByteBuffer[] values = new ByteBuffer[prefix.size()];
-            for (int i = 0; i < prefix.size(); i++)
-                values[i] = prefix.get(i);
-            return inclusiveEndOf(values);
-        }
-
         public static Bound create(ClusteringComparator comparator, boolean isStart, boolean isInclusive, Object... values)
         {
             CBuilder builder = CBuilder.create(comparator);
@@ -426,21 +408,6 @@ public class Slice
             return builder.buildBound(isStart, isInclusive);
         }
 
-        public Kind kind()
-        {
-            return kind;
-        }
-
-        public int size()
-        {
-            return values.length;
-        }
-
-        public ByteBuffer get(int i)
-        {
-            return values[i];
-        }
-
         public Bound withNewKind(Kind kind)
         {
             assert !kind.isBoundary();
@@ -480,24 +447,6 @@ public class Slice
             return withNewKind(kind().invert());
         }
 
-        public ByteBuffer[] getRawValues()
-        {
-            return values;
-        }
-
-        public void digest(MessageDigest digest)
-        {
-            for (int i = 0; i < size(); i++)
-                digest.update(get(i).duplicate());
-            FBUtilities.updateWithByte(digest, kind().ordinal());
-        }
-
-        public void writeTo(Slice.Bound.Writer writer)
-        {
-            super.writeTo(writer);
-            writer.writeBoundKind(kind());
-        }
-
         // For use by intersects, it's called with the sstable bound opposite to the slice bound
         // (so if the slice bound is a start, it's call with the max sstable bound)
         private int compareTo(ClusteringComparator comparator, List<ByteBuffer> sstableBound)
@@ -544,66 +493,10 @@ public class Slice
             return sb.append(")").toString();
         }
 
-        // Overriding to get a more precise type
-        @Override
-        public Bound takeAlias()
-        {
-            return this;
-        }
-
-        @Override
-        public long unsharedHeapSize()
-        {
-            return EMPTY_SIZE + ObjectSizes.sizeOnHeapOf(values);
-        }
-
-        public long unsharedHeapSizeExcludingData()
-        {
-            return EMPTY_SIZE + ObjectSizes.sizeOnHeapExcludingData(values);
-        }
-
-        public static Builder builder(int size)
-        {
-            return new Builder(size);
-        }
-
-        public interface Writer extends ClusteringPrefix.Writer
-        {
-            public void writeBoundKind(Kind kind);
-        }
-
-        public static class Builder implements Writer
-        {
-            private final ByteBuffer[] values;
-            private Kind kind;
-            private int idx;
-
-            private Builder(int size)
-            {
-                this.values = new ByteBuffer[size];
-            }
-
-            public void writeClusteringValue(ByteBuffer value)
-            {
-                values[idx++] = value;
-            }
-
-            public void writeBoundKind(Kind kind)
-            {
-                this.kind = kind;
-            }
-
-            public Slice.Bound build()
-            {
-                assert idx == values.length;
-                return Slice.Bound.create(kind, values);
-            }
-        }
-
         /**
          * Serializer for slice bounds.
         * <p>
-         * Contrarily to {@code Clustering}, a slice bound can only be a true prefix of the full clustering, so we actually record
+         * Contrarily to {@code Clustering}, a slice bound can be a true prefix of the full clustering, so we actually record
         * its size.
         */
        public static class Serializer
@@ -622,31 +515,21 @@ public class Slice
                  + ClusteringPrefix.serializer.valuesWithoutSizeSerializedSize(bound, version, types);
        }
 
-            public Slice.Bound deserialize(DataInput in, int version, List<AbstractType<?>> types) throws IOException
+            public Slice.Bound deserialize(DataInputPlus in, int version, List<AbstractType<?>> types) throws IOException
             {
                 Kind kind = Kind.values()[in.readByte()];
                 return deserializeValues(in, kind, version, types);
             }
 
-            public Slice.Bound deserializeValues(DataInput in, Kind kind, int version, List<AbstractType<?>> types) throws IOException
+            public Slice.Bound deserializeValues(DataInputPlus in, Kind kind, int version, List<AbstractType<?>> types) throws IOException
             {
                 int size = in.readUnsignedShort();
                 if (size == 0)
                     return kind.isStart() ? BOTTOM : TOP;
 
-                Builder builder = builder(size);
-                ClusteringPrefix.serializer.deserializeValuesWithoutSize(in, size, version, types, builder);
-                builder.writeBoundKind(kind);
-                return builder.build();
-            }
-
-            public void deserializeValues(DataInput in, Bound.Kind kind, int version, List<AbstractType<?>> types, Writer writer) throws IOException
-            {
-                int size = in.readUnsignedShort();
-                ClusteringPrefix.serializer.deserializeValuesWithoutSize(in, size, version, types, writer);
-                writer.writeBoundKind(kind);
+                ByteBuffer[] values = ClusteringPrefix.serializer.deserializeValuesWithoutSize(in, size, version, types);
+                return Slice.Bound.create(kind, values);
             }
-        }
     }
 }
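A note on the rewritten Bound.Serializer above: the value count is written explicitly because a slice bound may be a proper prefix of the full clustering, and a count of zero is never materialized into a new object; deserializeValues short-circuits to the shared BOTTOM/TOP constants, which is what keeps identity comparisons like bound == Slice.Bound.BOTTOM (relied on by IndexState.findBlockIndex later in this commit) valid. A minimal standalone sketch of that round-trip, using illustrative types rather than the Cassandra classes:

    import java.io.*;

    public class BoundCodec
    {
        // Stand-ins for Slice.Bound.BOTTOM/TOP: the zero-size shortcut guarantees
        // deserialization returns these exact instances, so == checks keep working.
        static final String[] BOTTOM = new String[0];
        static final String[] TOP = new String[0];

        static void serialize(String[] values, DataOutput out) throws IOException
        {
            out.writeShort(values.length);      // explicit size: a bound may be a proper prefix
            for (String v : values)
                out.writeUTF(v);
        }

        static String[] deserialize(DataInput in, boolean isStart) throws IOException
        {
            int size = in.readUnsignedShort();
            if (size == 0)
                return isStart ? BOTTOM : TOP;  // never rebuild the sentinels
            String[] values = new String[size];
            for (int i = 0; i < size; i++)
                values[i] = in.readUTF();
            return values;
        }

        public static void main(String[] args) throws IOException
        {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            serialize(BOTTOM, new DataOutputStream(bytes));
            DataInput in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
            System.out.println(deserialize(in, true) == BOTTOM);  // true: identity preserved
        }
    }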
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/Slices.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Slices.java b/src/java/org/apache/cassandra/db/Slices.java
index a6c690b..32ca06d 100644
--- a/src/java/org/apache/cassandra/db/Slices.java
+++ b/src/java/org/apache/cassandra/db/Slices.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.db;
 
-import java.io.DataInput;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.*;
@@ -28,6 +27,7 @@ import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 
 /**
@@ -318,7 +318,7 @@ public abstract class Slices implements Iterable<Slice>
             return size;
         }
 
-        public Slices deserialize(DataInput in, int version, CFMetaData metadata) throws IOException
+        public Slices deserialize(DataInputPlus in, int version, CFMetaData metadata) throws IOException
         {
             int size = in.readInt();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index e8247a3..df7e7ef 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -1160,7 +1160,7 @@ public final class SystemKeyspace
 
         // delete all previous values with a single range tombstone.
         int nowInSec = FBUtilities.nowInSeconds();
-        update.addRangeTombstone(Slice.make(SizeEstimates.comparator, table), new SimpleDeletionTime(timestamp - 1, nowInSec));
+        update.add(new RangeTombstone(Slice.make(SizeEstimates.comparator, table), new DeletionTime(timestamp - 1, nowInSec)));
 
         // add a CQL row for each primary token range.
         for (Map.Entry<Range<Token>, Pair<Long, Long>> entry : estimates.entrySet())
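The SystemKeyspace hunk keeps the existing trick of wiping a partition before rewriting it: one range tombstone at timestamp - 1, then fresh rows at timestamp, so last-write-wins reconciliation purges every older value while the new rows beat their own tombstone. A toy illustration of why the - 1 matters, with plain longs standing in for DeletionTime and cell timestamps:

    public class ShadowDemo
    {
        // Last-write-wins: a value survives a covering tombstone only if its
        // timestamp is strictly greater than the tombstone's.
        static boolean isLive(long valueTimestamp, long tombstoneTimestamp)
        {
            return valueTimestamp > tombstoneTimestamp;
        }

        public static void main(String[] args)
        {
            long timestamp = 1000;         // timestamp of the new size_estimates update
            long tombstone = timestamp - 1;

            System.out.println(isLive(950, tombstone));        // false: older values are purged
            System.out.println(isLive(timestamp, tombstone));  // true: the rewritten rows survive
        }
    }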
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/TypeSizes.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/TypeSizes.java b/src/java/org/apache/cassandra/db/TypeSizes.java
index a9e432f..73766c8 100644
--- a/src/java/org/apache/cassandra/db/TypeSizes.java
+++ b/src/java/org/apache/cassandra/db/TypeSizes.java
@@ -68,6 +68,11 @@ public final class TypeSizes
         return sizeof(value.remaining()) + value.remaining();
     }
 
+    public static int sizeofWithVIntLength(ByteBuffer value)
+    {
+        return sizeofVInt(value.remaining()) + value.remaining();
+    }
+
     public static int sizeof(boolean value)
     {
         return BOOL_SIZE;
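The new sizeofWithVIntLength mirrors sizeofWithShortLength directly above it: total size is the encoded length plus the payload, with the fixed two-byte length replaced by a variable-length integer. The real encoding is whatever Cassandra's sizeofVInt implements; the sketch below assumes a generic 7-bits-per-byte varint purely to show the sizing arithmetic, not the actual wire format:

    public class VIntSizing
    {
        // Bytes needed for a non-negative value in a 7-bits-per-byte varint
        // (an illustrative encoding, not Cassandra's vint coding).
        static int sizeofVInt(long value)
        {
            int size = 1;
            while ((value >>>= 7) != 0)
                size++;
            return size;
        }

        // Length-prefixed payload: varint length, then the bytes themselves.
        static int sizeofWithVIntLength(int payloadLength)
        {
            return sizeofVInt(payloadLength) + payloadLength;
        }

        public static void main(String[] args)
        {
            System.out.println(sizeofWithVIntLength(10));     // 11: one length byte + 10 payload bytes
            System.out.println(sizeofWithVIntLength(40000));  // 40003: three length bytes for 40000
        }
    }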
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java b/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java
index cf7c2dd..b3709d2 100644
--- a/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java
+++ b/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java
@@ -111,8 +111,7 @@ public abstract class UnfilteredDeserializer
         private boolean isReady;
         private boolean isDone;
 
-        private final ReusableRow row;
-        private final RangeTombstoneMarker.Builder markerBuilder;
+        private final Row.Builder builder;
 
         private CurrentDeserializer(CFMetaData metadata,
                                     DataInputPlus in,
@@ -122,8 +121,7 @@ public abstract class UnfilteredDeserializer
             super(metadata, in, helper);
             this.header = header;
             this.clusteringDeserializer = new ClusteringPrefix.Deserializer(metadata.comparator, in, header);
-            this.row = new ReusableRow(metadata.clusteringColumns().size(), header.columns().regulars, true, metadata.isCounter());
-            this.markerBuilder = new RangeTombstoneMarker.Builder(metadata.clusteringColumns().size());
+            this.builder = ArrayBackedRow.sortedBuilder(helper.fetchedRegularColumns(header));
         }
 
         public boolean hasNext() throws IOException
@@ -181,17 +179,13 @@ public abstract class UnfilteredDeserializer
             isReady = false;
             if (UnfilteredSerializer.kind(nextFlags) == Unfiltered.Kind.RANGE_TOMBSTONE_MARKER)
             {
-                markerBuilder.reset();
-                RangeTombstone.Bound.Kind kind = clusteringDeserializer.deserializeNextBound(markerBuilder);
-                UnfilteredSerializer.serializer.deserializeMarkerBody(in, header, kind.isBoundary(), markerBuilder);
-                return markerBuilder.build();
+                RangeTombstone.Bound bound = clusteringDeserializer.deserializeNextBound();
+                return UnfilteredSerializer.serializer.deserializeMarkerBody(in, header, bound);
             }
             else
             {
-                Row.Writer writer = row.writer();
-                clusteringDeserializer.deserializeNextClustering(writer);
-                UnfilteredSerializer.serializer.deserializeRowBody(in, header, helper, nextFlags, writer);
-                return row;
+                builder.newRow(clusteringDeserializer.deserializeNextClustering());
+                return UnfilteredSerializer.serializer.deserializeRowBody(in, header, helper, nextFlags, builder);
             }
         }
 
@@ -205,7 +199,7 @@ public abstract class UnfilteredDeserializer
             }
             else
             {
-                UnfilteredSerializer.serializer.skipRowBody(in, header, helper, nextFlags);
+                UnfilteredSerializer.serializer.skipRowBody(in, header, nextFlags);
             }
         }
 
@@ -221,7 +215,6 @@ public abstract class UnfilteredDeserializer
         private final boolean readAllAsDynamic;
         private boolean skipStatic;
 
-        private int nextFlags;
         private boolean isDone;
         private boolean isStart = true;
 
@@ -254,13 +247,7 @@ public abstract class UnfilteredDeserializer
 
         public boolean hasNext() throws IOException
         {
-            if (nextAtom != null)
-                return true;
-
-            if (isDone)
-                return false;
-
-            return deserializeNextAtom();
+            return nextAtom != null || (!isDone && deserializeNextAtom());
         }
 
         private boolean deserializeNextAtom() throws IOException
@@ -392,6 +379,7 @@ public abstract class UnfilteredDeserializer
             grouper.addAtom(nextAtom);
             while (deserializeNextAtom() && grouper.addAtom(nextAtom))
             {
+                // Nothing to do, deserializeNextAtom() changes nextAtom and it's then added to the grouper
             }
 
             // if this was the first static row, we're done with it. Otherwise, we're also done with static.
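The CurrentDeserializer hunk swaps two write-through helpers (ReusableRow and a marker builder) for a single Row.Builder that is restarted with newRow(...) for each row and handed to the serializer to fill in. A condensed standalone sketch of that reuse pattern; RowBuilder below is a stub, not Cassandra's Row.Builder, and the sorted-builder choice is presumably viable because cells arrive in serialization order, so nothing needs re-sorting:

    import java.util.*;

    public class BuilderLoop
    {
        // Stub builder: reusable across rows, restarted via newRow().
        static class RowBuilder
        {
            private String clustering;
            private final List<String> cells = new ArrayList<>();

            void newRow(String clustering)   // resets all state for the next row
            {
                this.clustering = clustering;
                cells.clear();
            }

            void addCell(String cell) { cells.add(cell); }

            String build() { return clustering + cells; }
        }

        public static void main(String[] args)
        {
            RowBuilder builder = new RowBuilder();  // one builder, reused for every row
            for (String clustering : new String[]{ "a", "b" })
            {
                builder.newRow(clustering);
                builder.addCell("cell-of-" + clustering);
                System.out.println(builder.build());
            }
        }
    }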
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java b/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
index b406251..8625112 100644
--- a/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
+++ b/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
@@ -38,8 +38,6 @@ import org.apache.cassandra.utils.ByteBufferUtil;
 
 abstract class AbstractSSTableIterator implements SliceableUnfilteredRowIterator
 {
-    private static final Logger logger = LoggerFactory.getLogger(AbstractSSTableIterator.class);
-
     protected final SSTableReader sstable;
     protected final DecoratedKey key;
     protected final DeletionTime partitionLevelDeletion;
@@ -65,7 +63,7 @@ abstract class AbstractSSTableIterator implements SliceableUnfilteredRowIterator
         this.sstable = sstable;
         this.key = key;
         this.columns = columnFilter;
-        this.helper = new SerializationHelper(sstable.descriptor.version.correspondingMessagingVersion(), SerializationHelper.Flag.LOCAL, columnFilter);
+        this.helper = new SerializationHelper(sstable.metadata, sstable.descriptor.version.correspondingMessagingVersion(), SerializationHelper.Flag.LOCAL, columnFilter);
         this.isForThrift = isForThrift;
 
         if (indexEntry == null)
@@ -81,7 +79,7 @@ abstract class AbstractSSTableIterator implements SliceableUnfilteredRowIterator
         {
             // We seek to the beginning to the partition if either:
             // - the partition is not indexed; we then have a single block to read anyway
-            //   and we need to read the partition deletion time.
+            //   (and we need to read the partition deletion time).
             // - we're querying static columns.
             boolean needSeekAtPartitionStart = !indexEntry.isIndexed() || !columns.fetchedColumns().statics.isEmpty();
@@ -104,24 +102,24 @@ abstract class AbstractSSTableIterator implements SliceableUnfilteredRowIterator
                 // Note that this needs to be called after file != null and after the partitionDeletion has been set, but before readStaticRow
                 // (since it uses it) so we can't move that up (but we'll be able to simplify as soon as we drop support for the old file format).
-                this.reader = needsReader ? createReader(indexEntry, file, needSeekAtPartitionStart, shouldCloseFile) : null;
+                this.reader = needsReader ? createReader(indexEntry, file, true, shouldCloseFile) : null;
                 this.staticRow = readStaticRow(sstable, file, helper, columns.fetchedColumns().statics, isForThrift, reader == null ? null : reader.deserializer);
             }
             else
             {
                 this.partitionLevelDeletion = indexEntry.deletionTime();
                 this.staticRow = Rows.EMPTY_STATIC_ROW;
-                this.reader = needsReader ? createReader(indexEntry, file, needSeekAtPartitionStart, shouldCloseFile) : null;
+                this.reader = needsReader ? createReader(indexEntry, file, false, shouldCloseFile) : null;
             }
 
-            if (reader == null && shouldCloseFile)
+            if (reader == null && file != null && shouldCloseFile)
                 file.close();
         }
         catch (IOException e)
         {
             sstable.markSuspect();
             String filePath = file.getPath();
-            if (shouldCloseFile && file != null)
+            if (shouldCloseFile)
             {
                 try
                 {
@@ -164,7 +162,7 @@ abstract class AbstractSSTableIterator implements SliceableUnfilteredRowIterator
         if (statics.isEmpty() || isForThrift)
             return Rows.EMPTY_STATIC_ROW;
 
-        assert sstable.metadata.isStaticCompactTable() && !isForThrift;
+        assert sstable.metadata.isStaticCompactTable();
 
         // As said above, if it's a CQL query and the table is a "static compact", the only exposed columns are the
         // static ones. So we don't have to mark the position to seek back later.
@@ -221,45 +219,13 @@ abstract class AbstractSSTableIterator implements SliceableUnfilteredRowIterator
     public boolean hasNext()
     {
-        try
-        {
-            return reader != null && reader.hasNext();
-        }
-        catch (IOException e)
-        {
-            try
-            {
-                closeInternal();
-            }
-            catch (IOException suppressed)
-            {
-                e.addSuppressed(suppressed);
-            }
-            sstable.markSuspect();
-            throw new CorruptSSTableException(e, reader.file.getPath());
-        }
+        return reader != null && reader.hasNext();
     }
 
     public Unfiltered next()
     {
-        try
-        {
-            assert reader != null;
-            return reader.next();
-        }
-        catch (IOException e)
-        {
-            try
-            {
-                closeInternal();
-            }
-            catch (IOException suppressed)
-            {
-                e.addSuppressed(suppressed);
-            }
-            sstable.markSuspect();
-            throw new CorruptSSTableException(e, reader.file.getPath());
-        }
+        assert reader != null;
+        return reader.next();
     }
 
     public Iterator<Unfiltered> slice(Slice slice)
@@ -269,7 +235,8 @@ abstract class AbstractSSTableIterator implements SliceableUnfilteredRowIterator
             if (reader == null)
                 return Collections.emptyIterator();
 
-            return reader.slice(slice);
+            reader.setForSlice(slice);
+            return reader;
         }
         catch (IOException e)
         {
@@ -317,7 +284,7 @@ abstract class AbstractSSTableIterator implements SliceableUnfilteredRowIterator
         }
     }
 
-    protected abstract class Reader
+    protected abstract class Reader implements Iterator<Unfiltered>
     {
         private final boolean shouldCloseFile;
         public FileDataInput file;
@@ -327,12 +294,19 @@ abstract class AbstractSSTableIterator implements SliceableUnfilteredRowIterator
         // Records the currently open range tombstone (if any)
         protected DeletionTime openMarker = null;
 
-        protected Reader(FileDataInput file, boolean shouldCloseFile)
+        // !isInit means we have never seeked in the file and thus should seek before reading anything
+        protected boolean isInit;
+
+        protected Reader(FileDataInput file, boolean isInit, boolean shouldCloseFile)
         {
             this.file = file;
+            this.isInit = isInit;
             this.shouldCloseFile = shouldCloseFile;
+
             if (file != null)
                 createDeserializer();
+            else
+                assert !isInit;
         }
 
         private void createDeserializer()
@@ -369,9 +343,62 @@ abstract class AbstractSSTableIterator implements SliceableUnfilteredRowIterator
             return toReturn;
         }
 
-        public abstract boolean hasNext() throws IOException;
-        public abstract Unfiltered next() throws IOException;
-        public abstract Iterator<Unfiltered> slice(Slice slice) throws IOException;
+        public boolean hasNext()
+        {
+            try
+            {
+                if (!isInit)
+                {
+                    init();
+                    isInit = true;
+                }
+
+                return hasNextInternal();
+            }
+            catch (IOException e)
+            {
+                try
+                {
+                    closeInternal();
+                }
+                catch (IOException suppressed)
+                {
+                    e.addSuppressed(suppressed);
+                }
+                sstable.markSuspect();
+                throw new CorruptSSTableException(e, reader.file.getPath());
+            }
+        }
+
+        public Unfiltered next()
+        {
+            try
+            {
+                return nextInternal();
+            }
+            catch (IOException e)
+            {
+                try
+                {
+                    closeInternal();
+                }
+                catch (IOException suppressed)
+                {
+                    e.addSuppressed(suppressed);
+                }
+                sstable.markSuspect();
+                throw new CorruptSSTableException(e, reader.file.getPath());
+            }
+        }
+
+        // Called if hasNext() is called but we haven't yet been initialized
+        protected abstract void init() throws IOException;
+
+        // Set the reader so its hasNext/next methods return values within the provided slice
+        public abstract void setForSlice(Slice slice) throws IOException;
+
+        protected abstract boolean hasNextInternal() throws IOException;
+        protected abstract Unfiltered nextInternal() throws IOException;
 
         public void close() throws IOException
         {
@@ -380,35 +407,61 @@ abstract class AbstractSSTableIterator implements SliceableUnfilteredRowIterator
         }
     }
 
-    protected abstract class IndexedReader extends Reader
+    // Used by indexed readers to store where they are in the index.
+    protected static class IndexState
     {
-        protected final RowIndexEntry indexEntry;
-        protected final List<IndexHelper.IndexInfo> indexes;
+        private final Reader reader;
+        private final ClusteringComparator comparator;
 
-        protected int currentIndexIdx = -1;
+        private final RowIndexEntry indexEntry;
+        private final List<IndexHelper.IndexInfo> indexes;
+        private final boolean reversed;
 
-        // Marks the beginning of the block corresponding to currentIndexIdx.
-        protected FileMark mark;
+        private int currentIndexIdx = -1;
 
-        // !isInit means we have never seeked in the file and thus shouldn't read as we could be anywhere
-        protected boolean isInit;
+        // Marks the beginning of the block corresponding to currentIndexIdx.
+        private FileMark mark;
 
-        protected IndexedReader(FileDataInput file, boolean shouldCloseFile, RowIndexEntry indexEntry, boolean isInit)
+        public IndexState(Reader reader, ClusteringComparator comparator, RowIndexEntry indexEntry, boolean reversed)
         {
-            super(file, shouldCloseFile);
+            this.reader = reader;
+            this.comparator = comparator;
             this.indexEntry = indexEntry;
             this.indexes = indexEntry.columnsIndex();
-            this.isInit = isInit;
+            this.reversed = reversed;
+            this.currentIndexIdx = reversed ? indexEntry.columnsIndex().size() : -1;
+        }
+
+        public boolean isDone()
+        {
+            return reversed ? currentIndexIdx < 0 : currentIndexIdx >= indexes.size();
         }
 
-        // Should be called when we're at the beginning of blockIdx.
-        protected void updateBlock(int blockIdx) throws IOException
+        // Sets the reader to the beginning of blockIdx.
+        public void setToBlock(int blockIdx) throws IOException
         {
-            seekToPosition(indexEntry.position + indexes.get(blockIdx).offset);
+            if (blockIdx >= 0 && blockIdx < indexes.size())
+                reader.seekToPosition(indexEntry.position + indexes.get(blockIdx).offset);
 
             currentIndexIdx = blockIdx;
-            openMarker = blockIdx > 0 ? indexes.get(blockIdx - 1).endOpenMarker : null;
-            mark = file.mark();
+            reader.openMarker = blockIdx > 0 ? indexes.get(blockIdx - 1).endOpenMarker : null;
+            mark = reader.file.mark();
+        }
+
+        public int blocksCount()
+        {
+            return indexes.size();
+        }
+
+        // Check if we've crossed an index boundary (based on the mark on the beginning of the index block).
+        public boolean isPastCurrentBlock()
+        {
+            return currentIndexIdx < indexes.size() && reader.file.bytesPastMark(mark) >= currentIndex().width;
+        }
+
+        public int currentBlockIdx()
+        {
+            return currentIndexIdx;
         }
 
         public IndexHelper.IndexInfo currentIndex()
@@ -416,9 +469,16 @@ abstract class AbstractSSTableIterator implements SliceableUnfilteredRowIterator
             return indexes.get(currentIndexIdx);
         }
 
-        public IndexHelper.IndexInfo previousIndex()
+        // Finds the index of the first block containing the provided bound, starting at the current index.
+        // Will be -1 if the bound is before any block, and blocksCount() if it is after every block.
+        public int findBlockIndex(Slice.Bound bound)
        {
-            return currentIndexIdx <= 1 ? null : indexes.get(currentIndexIdx - 1);
+            if (bound == Slice.Bound.BOTTOM)
+                return -1;
+            if (bound == Slice.Bound.TOP)
+                return blocksCount();
+
+            return IndexHelper.indexFor(bound, indexes, comparator, reversed, currentIndexIdx);
        }
    }
 }
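The Reader restructuring is the heart of this file's diff: hasNext()/next() are now concrete, delegate to the new hasNextInternal()/nextInternal(), and centralize the IOException handling the old code duplicated in every caller: close best-effort, attach any secondary failure via addSuppressed, mark the sstable suspect, rethrow unchecked. The translation is forced by java.util.Iterator (which Reader now implements), whose methods cannot declare checked exceptions. A compilable standalone sketch of the template-method shape, with UncheckedIOException standing in for CorruptSSTableException:

    import java.io.IOException;
    import java.io.UncheckedIOException;
    import java.util.Iterator;

    abstract class IOIterator<T> implements Iterator<T>
    {
        public boolean hasNext()
        {
            try
            {
                return hasNextInternal();
            }
            catch (IOException e)
            {
                try
                {
                    close();                        // best effort: release the underlying file
                }
                catch (IOException suppressed)
                {
                    e.addSuppressed(suppressed);    // keep the close failure visible in the trace
                }
                throw new UncheckedIOException(e);  // stand-in for CorruptSSTableException
            }
        }

        public T next()
        {
            try
            {
                return nextInternal();
            }
            catch (IOException e)
            {
                throw new UncheckedIOException(e);
            }
        }

        protected abstract boolean hasNextInternal() throws IOException;
        protected abstract T nextInternal() throws IOException;
        protected abstract void close() throws IOException;
    }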
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java b/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java
index 4fd5205..a58ea3e 100644
--- a/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java
+++ b/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java
@@ -18,30 +18,19 @@
 package org.apache.cassandra.db.columniterator;
 
 import java.io.IOException;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-
-import com.google.common.collect.AbstractIterator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.util.NoSuchElementException;
 
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.io.sstable.CorruptSSTableException;
-import org.apache.cassandra.io.sstable.IndexHelper;
 import org.apache.cassandra.io.util.FileDataInput;
-import org.apache.cassandra.utils.ByteBufferUtil;
 
 /**
  * A Cell Iterator over SSTable
 */
 public class SSTableIterator extends AbstractSSTableIterator
 {
-    private static final Logger logger = LoggerFactory.getLogger(SSTableIterator.class);
-
     public SSTableIterator(SSTableReader sstable, DecoratedKey key, ColumnFilter columns, boolean isForThrift)
     {
         this(sstable, null, key, sstable.getPosition(key, SSTableReader.Operator.EQ), columns, isForThrift);
@@ -71,222 +60,215 @@ public class SSTableIterator extends AbstractSSTableIterator
 
     private class ForwardReader extends Reader
     {
+        // The start of the current slice. This will be null as soon as we know we've passed that bound.
+        protected Slice.Bound start;
+        // The end of the current slice. Will never be null.
+        protected Slice.Bound end = Slice.Bound.TOP;
+
+        protected Unfiltered next; // the next element to return: this is computed by hasNextInternal().
+
+        protected boolean sliceDone; // set to true once we know we have no more results for the slice. This is in particular
+                                     // used by the indexed reader when we know we can't have results based on the index.
+
         private ForwardReader(FileDataInput file, boolean isAtPartitionStart, boolean shouldCloseFile)
         {
-            super(file, shouldCloseFile);
-            assert isAtPartitionStart;
+            super(file, isAtPartitionStart, shouldCloseFile);
         }
 
-        public boolean hasNext() throws IOException
+        protected void init() throws IOException
         {
-            assert deserializer != null;
-            return deserializer.hasNext();
+            // We should always have been initialized (at the beginning of the partition). Only indexed readers may
+            // have to initialize.
+            throw new IllegalStateException();
        }
 
-        public Unfiltered next() throws IOException
+        public void setForSlice(Slice slice) throws IOException
         {
-            return deserializer.readNext();
+            start = slice.start() == Slice.Bound.BOTTOM ? null : slice.start();
+            end = slice.end();
+
+            sliceDone = false;
+            next = null;
         }
 
-        public Iterator<Unfiltered> slice(final Slice slice) throws IOException
+        // Skip all data that comes before the currently set slice.
+        // Return what should be returned at the end of this, or null if nothing should.
+        private Unfiltered handlePreSliceData() throws IOException
         {
-            return new AbstractIterator<Unfiltered>()
-            {
-                private boolean beforeStart = true;
-
-                protected Unfiltered computeNext()
-                {
-                    try
-                    {
-                        // While we're before the start of the slice, we can skip row but we should keep
-                        // track of open range tombstones
-                        if (beforeStart)
-                        {
-                            // Note that the following comparison is not strict. The reason is that the only cases
-                            // where it can be == is if the "next" is a RT start marker (either a '[' of a ')[' boundary),
-                            // and if we had a strict inequality and an open RT marker before this, we would issue
-                            // the open marker first, and then return then next later, which would yet in the
-                            // stream both '[' (or '(') and then ')[' for the same clustering value, which is wrong.
-                            // By using a non-strict inequality, we avoid that problem (if we do get ')[' for the same
-                            // clustering value than the slice, we'll simply record it in 'openMarker').
-                            while (deserializer.hasNext() && deserializer.compareNextTo(slice.start()) <= 0)
-                            {
-                                if (deserializer.nextIsRow())
-                                    deserializer.skipNext();
-                                else
-                                    updateOpenMarker((RangeTombstoneMarker)deserializer.readNext());
-                            }
-
-                            beforeStart = false;
-
-                            // We've reached the beginning of our queried slice. If we have an open marker
-                            // we should return that first.
-                            if (openMarker != null)
-                                return new RangeTombstoneBoundMarker(slice.start(), openMarker);
-                        }
-
-                        if (deserializer.hasNext() && deserializer.compareNextTo(slice.end()) <= 0)
-                        {
-                            Unfiltered next = deserializer.readNext();
-                            if (next.kind() == Unfiltered.Kind.RANGE_TOMBSTONE_MARKER)
-                                updateOpenMarker((RangeTombstoneMarker)next);
-                            return next;
-                        }
-
-                        // If we have an open marker, we should close it before finishing
-                        if (openMarker != null)
-                            return new RangeTombstoneBoundMarker(slice.end(), getAndClearOpenMarker());
-
-                        return endOfData();
-                    }
-                    catch (IOException e)
-                    {
-                        try
-                        {
-                            close();
-                        }
-                        catch (IOException suppressed)
-                        {
-                            e.addSuppressed(suppressed);
-                        }
-                        sstable.markSuspect();
-                        throw new CorruptSSTableException(e, file.getPath());
-                    }
-                }
-            };
+            // Note that the following comparison is not strict. The reason is that the only cases
+            // where it can be == is if the "next" is a RT start marker (either a '[' or a ')[' boundary),
+            // and if we had a strict inequality and an open RT marker before this, we would issue
+            // the open marker first, and then return the next later, which would send in the
+            // stream both '[' (or '(') and then ')[' for the same clustering value, which is wrong.
+            // By using a non-strict inequality, we avoid that problem (if we do get ')[' for the same
+            // clustering value as the slice, we'll simply record it in 'openMarker').
+            while (deserializer.hasNext() && deserializer.compareNextTo(start) <= 0)
+            {
+                if (deserializer.nextIsRow())
+                    deserializer.skipNext();
+                else
+                    updateOpenMarker((RangeTombstoneMarker)deserializer.readNext());
+            }
+
+            Slice.Bound sliceStart = start;
+            start = null;
+
+            // We've reached the beginning of our queried slice. If we have an open marker
+            // we should return that first.
+            if (openMarker != null)
+                return new RangeTombstoneBoundMarker(sliceStart, openMarker);
+
+            return null;
+        }
+
+        // Compute the next element to return, assuming we're in the middle of the slice
+        // and the next element is either in the slice, or just after it. Returns null
+        // if we're done with the slice.
+        protected Unfiltered computeNext() throws IOException
+        {
+            if (!deserializer.hasNext() || deserializer.compareNextTo(end) > 0)
+                return null;
+
+            Unfiltered next = deserializer.readNext();
+            if (next.kind() == Unfiltered.Kind.RANGE_TOMBSTONE_MARKER)
+                updateOpenMarker((RangeTombstoneMarker)next);
+            return next;
+        }
+
+        protected boolean hasNextInternal() throws IOException
+        {
+            if (next != null)
+                return true;
+
+            if (sliceDone)
+                return false;
+
+            assert deserializer != null;
+
+            if (start != null)
+            {
+                Unfiltered unfiltered = handlePreSliceData();
+                if (unfiltered != null)
+                {
+                    next = unfiltered;
+                    return true;
+                }
+            }
+
+            next = computeNext();
+            if (next != null)
+                return true;
+
+            // If we have an open marker, we should close it before finishing
+            if (openMarker != null)
+            {
+                next = new RangeTombstoneBoundMarker(end, getAndClearOpenMarker());
+                return true;
+            }
+
+            sliceDone = true; // not absolutely necessary but accurate and cheap
+            return false;
+        }
+
+        protected Unfiltered nextInternal() throws IOException
+        {
+            if (!hasNextInternal())
+                throw new NoSuchElementException();
+
+            Unfiltered toReturn = next;
+            next = null;
+            return toReturn;
         }
     }
 
-    private class ForwardIndexedReader extends IndexedReader
+    private class ForwardIndexedReader extends ForwardReader
     {
+        private final IndexState indexState;
+
+        private int lastBlockIdx; // the last index block that has data for the current query
+
         private ForwardIndexedReader(RowIndexEntry indexEntry, FileDataInput file, boolean isAtPartitionStart, boolean shouldCloseFile)
         {
-            super(file, shouldCloseFile, indexEntry, isAtPartitionStart);
+            super(file, isAtPartitionStart, shouldCloseFile);
+            this.indexState = new IndexState(this, sstable.metadata.comparator, indexEntry, false);
+            this.lastBlockIdx = indexState.blocksCount(); // if we never call setForSlice, that's where we want to stop
         }
 
-        public boolean hasNext() throws IOException
+        @Override
+        protected void init() throws IOException
         {
-            // If it's called before we've created the file, create it. This then mean
-            // we're reading from the beginning of the partition.
-            if (!isInit)
-            {
-                seekToPosition(indexEntry.position);
-                ByteBufferUtil.skipShortLength(file); // partition key
-                DeletionTime.serializer.skip(file); // partition deletion
-                if (sstable.header.hasStatic())
-                    UnfilteredSerializer.serializer.skipStaticRow(file, sstable.header, helper);
-                isInit = true;
-            }
-            return deserializer.hasNext();
+            // If this is called, it means we're calling hasNext() before any call to setForSlice. Which means
+            // we're reading everything from the beginning. So just set us up at the beginning of the first block.
+            indexState.setToBlock(0);
         }
 
-        public Unfiltered next() throws IOException
+        @Override
+        public void setForSlice(Slice slice) throws IOException
         {
-            return deserializer.readNext();
-        }
+            super.setForSlice(slice);
 
-        public Iterator<Unfiltered> slice(final Slice slice) throws IOException
-        {
-            final List<IndexHelper.IndexInfo> indexes = indexEntry.columnsIndex();
+            isInit = true;
 
-            if (currentIndexIdx >= indexes.size())
-                return Collections.emptyIterator();
+            // if our previous slicing already got us the biggest row in the sstable, we're done
+            if (indexState.isDone())
+            {
+                sliceDone = true;
+                return;
+            }
 
             // Find the first index block we'll need to read for the slice.
-            final int startIdx = IndexHelper.indexFor(slice.start(), indexes, sstable.metadata.comparator, false, currentIndexIdx);
-            if (startIdx >= indexes.size())
-                return Collections.emptyIterator();
+            int startIdx = indexState.findBlockIndex(slice.start());
+            if (startIdx >= indexState.blocksCount())
+            {
+                sliceDone = true;
+                return;
+            }
 
             // If that's the last block we were reading, we're already where we want to be. Otherwise,
             // seek to that first block
-            if (startIdx != currentIndexIdx)
-                updateBlock(startIdx);
+            if (startIdx != indexState.currentBlockIdx())
+                indexState.setToBlock(startIdx);
 
             // Find the last index block we'll need to read for the slice.
-            final int endIdx = IndexHelper.indexFor(slice.end(), indexes, sstable.metadata.comparator, false, startIdx);
-
-            final IndexHelper.IndexInfo startIndex = currentIndex();
+            lastBlockIdx = indexState.findBlockIndex(slice.end());
 
             // The index search is based on the last name of the index blocks, so at that point we have that:
-            // 1) indexes[startIdx - 1].lastName < slice.start <= indexes[startIdx].lastName
-            // 2) indexes[endIdx - 1].lastName < slice.end <= indexes[endIdx].lastName
-            // so if startIdx == endIdx and slice.end < indexes[startIdx].firstName, we're guaranteed that the
-            // whole slice is between the previous block end and this bloc start, and thus has no corresponding
+            // 1) indexes[currentIdx - 1].lastName < slice.start <= indexes[currentIdx].lastName
+            // 2) indexes[lastBlockIdx - 1].lastName < slice.end <= indexes[lastBlockIdx].lastName
+            // so if currentIdx == lastBlockIdx and slice.end < indexes[currentIdx].firstName, we're guaranteed that the
+            // whole slice is between the previous block end and this block start, and thus has no corresponding
             // data. One exception is if the previous block ends with an openMarker as it will cover our slice
             // and we need to return it.
-            if (startIdx == endIdx && metadata().comparator.compare(slice.end(), startIndex.firstName) < 0 && openMarker == null && sstable.descriptor.version.storeRows())
-                return Collections.emptyIterator();
-
-            return new AbstractIterator<Unfiltered>()
+            if (indexState.currentBlockIdx() == lastBlockIdx
+                && metadata().comparator.compare(slice.end(), indexState.currentIndex().firstName) < 0
+                && openMarker == null
+                && sstable.descriptor.version.storeRows())
             {
-                private boolean beforeStart = true;
-                private int currentIndexIdx = startIdx;
-
-                protected Unfiltered computeNext()
-                {
-                    try
-                    {
-                        // While we're before the start of the slice, we can skip row but we should keep
-                        // track of open range tombstones
-                        if (beforeStart)
-                        {
-                            // See ForwardReader equivalent method to see why this inequality is not strict.
-                            while (deserializer.hasNext() && deserializer.compareNextTo(slice.start()) <= 0)
-                            {
-                                if (deserializer.nextIsRow())
-                                    deserializer.skipNext();
-                                else
-                                    updateOpenMarker((RangeTombstoneMarker)deserializer.readNext());
-                            }
-
-                            beforeStart = false;
-
-                            // We've reached the beginning of our queried slice. If we have an open marker
-                            // we should return that first.
-                            if (openMarker != null)
-                                return new RangeTombstoneBoundMarker(slice.start(), openMarker);
-                        }
-
-                        // If we've crossed an index block boundary, update our informations
-                        if (currentIndexIdx < indexes.size() && file.bytesPastMark(mark) >= currentIndex().width)
-                            updateBlock(++currentIndexIdx);
-
-                        // Return the next atom unless we've reached the end, or we're beyond our slice
-                        // end (note that unless we're on the last block for the slice, there is no point
-                        // in checking the slice end).
-                        if (currentIndexIdx < indexes.size()
-                            && currentIndexIdx <= endIdx
-                            && deserializer.hasNext()
-                            && (currentIndexIdx != endIdx || deserializer.compareNextTo(slice.end()) <= 0))
-                        {
-                            Unfiltered next = deserializer.readNext();
-                            if (next.kind() == Unfiltered.Kind.RANGE_TOMBSTONE_MARKER)
-                                updateOpenMarker((RangeTombstoneMarker)next);
-                            return next;
-                        }
-
-                        // If we have an open marker, we should close it before finishing
-                        if (openMarker != null)
-                            return new RangeTombstoneBoundMarker(slice.end(), getAndClearOpenMarker());
-
-                        return endOfData();
-                    }
-                    catch (IOException e)
-                    {
-                        try
-                        {
-                            close();
-                        }
-                        catch (IOException suppressed)
-                        {
-                            e.addSuppressed(suppressed);
-                        }
-                        sstable.markSuspect();
-                        throw new CorruptSSTableException(e, file.getPath());
-                    }
-                }
-            };
+                sliceDone = true;
+            }
+        }
+
+        @Override
+        protected Unfiltered computeNext() throws IOException
+        {
+            // Our previous read might have made us cross an index block boundary. If so, update our information.
+            if (indexState.isPastCurrentBlock())
+                indexState.setToBlock(indexState.currentBlockIdx() + 1);
+
+            // Return the next unfiltered unless we've reached the end, or we're beyond our slice
+            // end (note that unless we're on the last block for the slice, there is no point
+            // in checking the slice end).
+            if (indexState.isDone()
+                || indexState.currentBlockIdx() > lastBlockIdx
+                || !deserializer.hasNext()
+                || (indexState.currentBlockIdx() == lastBlockIdx && deserializer.compareNextTo(end) > 0))
+                return null;
+
+            Unfiltered next = deserializer.readNext();
+            if (next.kind() == Unfiltered.Kind.RANGE_TOMBSTONE_MARKER)
+                updateOpenMarker((RangeTombstoneMarker)next);
+            return next;
         }
     }
 }
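ForwardReader above unrolls the old anonymous AbstractIterator into an explicit state machine: skip everything before the slice start while tracking any range deletion that opens (handlePreSliceData), stream elements until one compares past the slice end (computeNext), and finally synthesize a closing marker if a deletion is still open. The same control flow over a sorted list of integers; this is a deliberately simplified model where an int stands for a clustering and two booleans stand for range-tombstone open/close markers:

    import java.util.*;

    public class ForwardSlice
    {
        // Events in clustering order: a key plus whether it opens/closes a range deletion.
        record Event(int key, boolean opensRange, boolean closesRange) {}

        // Returns the events visible in [start, end], synthesizing boundary markers
        // for a range deletion that straddles either edge of the slice.
        static List<String> slice(List<Event> stream, int start, int end)
        {
            List<String> out = new ArrayList<>();
            boolean openMarker = false;

            Iterator<Event> it = stream.iterator();
            Event next = null;

            // Phase 1: skip data before the slice, tracking open ranges.
            while (it.hasNext())
            {
                Event e = it.next();
                if (e.key() >= start) { next = e; break; }
                if (e.opensRange()) openMarker = true;
                if (e.closesRange()) openMarker = false;
            }
            if (openMarker)
                out.add("open@" + start);   // re-open the deletion at the slice start

            // Phase 2: emit everything up to the slice end.
            while (next != null && next.key() <= end)
            {
                out.add("event@" + next.key());
                if (next.opensRange()) openMarker = true;
                if (next.closesRange()) openMarker = false;
                next = it.hasNext() ? it.next() : null;
            }

            // Phase 3: close a still-open deletion at the slice end.
            if (openMarker)
                out.add("close@" + end);
            return out;
        }

        public static void main(String[] args)
        {
            List<Event> stream = List.of(new Event(1, true, false),   // range opens before the slice
                                         new Event(5, false, false),
                                         new Event(9, false, true));  // and closes after it
            System.out.println(slice(stream, 3, 7)); // [open@3, event@5, close@7]
        }
    }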
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java b/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java
index 0e18d4a..e15d330 100644
--- a/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java
+++ b/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java
@@ -20,29 +20,19 @@ package org.apache.cassandra.db.columniterator;
 
 import java.io.IOException;
 import java.util.*;
 
-import com.google.common.collect.AbstractIterator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.db.partitions.AbstractPartitionData;
+import org.apache.cassandra.db.partitions.AbstractThreadUnsafePartition;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.io.sstable.CorruptSSTableException;
-import org.apache.cassandra.io.sstable.IndexHelper;
 import org.apache.cassandra.io.util.FileDataInput;
-import org.apache.cassandra.io.util.FileMark;
-import org.apache.cassandra.utils.ByteBufferUtil;
 
 /**
  * A Cell Iterator in reversed clustering order over SSTable
 */
 public class SSTableReversedIterator extends AbstractSSTableIterator
 {
-    private static final Logger logger = LoggerFactory.getLogger(SSTableReversedIterator.class);
-
     public SSTableReversedIterator(SSTableReader sstable, DecoratedKey key, ColumnFilter columns, boolean isForThrift)
     {
         this(sstable, null, key, sstable.getPosition(key, SSTableReader.Operator.EQ), columns, isForThrift);
@@ -70,319 +60,290 @@ public class SSTableReversedIterator extends AbstractSSTableIterator
         return true;
     }
 
-    private ReusablePartitionData createBuffer(int blocksCount)
+    private class ReverseReader extends Reader
     {
-        int estimatedRowCount = 16;
-        int columnCount = metadata().partitionColumns().regulars.columnCount();
-        if (columnCount == 0 || metadata().clusteringColumns().size() == 0)
-        {
-            estimatedRowCount = 1;
-        }
-        else
-        {
-            try
-            {
-                // To avoid wasted resizing we guess-estimate the number of rows we're likely to read. For that
-                // we use the stats on the number of rows per partition for that sstable.
-                // FIXME: so far we only keep stats on cells, so to get a rough estimate on the number of rows,
-                // we divide by the number of regular columns the table has. We should fix once we collect the
-                // stats on rows
-                int estimatedRowsPerPartition = (int)(sstable.getEstimatedColumnCount().percentile(0.75) / columnCount);
-                estimatedRowCount = Math.max(estimatedRowsPerPartition / blocksCount, 1);
-            }
-            catch (IllegalStateException e)
-            {
-                // The EstimatedHistogram mean() method can throw this (if it overflows). While such overflow
-                // shouldn't happen, it's not worth taking the risk of letting the exception bubble up.
-            }
-        }
-        return new ReusablePartitionData(metadata(), partitionKey(), DeletionTime.LIVE, columns(), estimatedRowCount);
-    }
+        protected ReusablePartitionData buffer;
+        protected Iterator<Unfiltered> iterator;
 
-    private class ReverseReader extends Reader
-    {
-        private ReusablePartitionData partition;
-        private UnfilteredRowIterator iterator;
-
         private ReverseReader(FileDataInput file, boolean isAtPartitionStart, boolean shouldCloseFile)
         {
-            super(file, shouldCloseFile);
-            assert isAtPartitionStart;
+            super(file, isAtPartitionStart, shouldCloseFile);
         }
 
-        public boolean hasNext() throws IOException
+        protected ReusablePartitionData createBuffer(int blocksCount)
         {
-            if (partition == null)
+            int estimatedRowCount = 16;
+            int columnCount = metadata().partitionColumns().regulars.columnCount();
+            if (columnCount == 0 || metadata().clusteringColumns().isEmpty())
             {
-                partition = createBuffer(1);
-                partition.populateFrom(this, null, null, new Tester()
+                estimatedRowCount = 1;
+            }
+            else
+            {
+                try
                 {
-                    public boolean isDone()
-                    {
-                        return false;
-                    }
-                });
-                iterator = partition.unfilteredIterator(columns, Slices.ALL, true);
+                    // To avoid wasted resizing we guess-estimate the number of rows we're likely to read. For that
+                    // we use the stats on the number of rows per partition for that sstable.
+                    // FIXME: so far we only keep stats on cells, so to get a rough estimate on the number of rows,
+                    // we divide by the number of regular columns the table has. We should fix once we collect the
+                    // stats on rows
+                    int estimatedRowsPerPartition = (int)(sstable.getEstimatedColumnCount().percentile(0.75) / columnCount);
+                    estimatedRowCount = Math.max(estimatedRowsPerPartition / blocksCount, 1);
+                }
+                catch (IllegalStateException e)
+                {
+                    // The EstimatedHistogram mean() method can throw this (if it overflows). While such overflow
+                    // shouldn't happen, it's not worth taking the risk of letting the exception bubble up.
+                }
             }
-            return iterator.hasNext();
+            return new ReusablePartitionData(metadata(), partitionKey(), columns(), estimatedRowCount);
         }
 
-        public Unfiltered next() throws IOException
+        protected void init() throws IOException
         {
-            if (!hasNext())
-                throw new NoSuchElementException();
-            return iterator.next();
+            // We should always have been initialized (at the beginning of the partition). Only indexed readers may
+            // have to initialize.
+            throw new IllegalStateException();
         }
 
-        public Iterator<Unfiltered> slice(final Slice slice) throws IOException
+        public void setForSlice(Slice slice) throws IOException
         {
-            if (partition == null)
+            // If we have read the data, just create the iterator for the slice. Otherwise, read the data.
+            if (buffer == null)
             {
-                partition = createBuffer(1);
-                partition.populateFrom(this, slice.start(), slice.end(), new Tester()
-                {
-                    public boolean isDone()
-                    {
-                        return false;
-                    }
-                });
+                buffer = createBuffer(1);
+                // Note that we can reuse that buffer between slices (we could alternatively re-read from disk
+                // every time, but that feels more wasteful) so we want to include everything from the beginning.
+                // We can stop at the slice end however since any following slice will be before that.
+                loadFromDisk(null, slice.end());
             }
+            setIterator(slice);
+        }
 
-            return partition.unfilteredIterator(columns, Slices.with(metadata().comparator, slice), true);
+        protected void setIterator(Slice slice)
+        {
+            assert buffer != null;
+            iterator = buffer.unfilteredIterator(columns, Slices.with(metadata().comparator, slice), true);
         }
-    }
 
-    private class ReverseIndexedReader extends IndexedReader
-    {
-        private ReusablePartitionData partition;
-        private UnfilteredRowIterator iterator;
+        protected boolean hasNextInternal() throws IOException
+        {
+            // If we've never called setForSlice, we're reading everything
+            if (iterator == null)
+                setForSlice(Slice.ALL);
 
-        private ReverseIndexedReader(RowIndexEntry indexEntry, FileDataInput file, boolean isAtPartitionStart, boolean shouldCloseFile)
-        {
-            super(file, shouldCloseFile, indexEntry, isAtPartitionStart);
-            this.currentIndexIdx = indexEntry.columnsIndex().size();
+            return iterator.hasNext();
         }
 
-        public boolean hasNext() throws IOException
+        protected Unfiltered nextInternal() throws IOException
         {
-            // If it's called before we've created the file, create it. This then mean
-            // we're reading from the end of the partition.
-            if (!isInit)
-            {
-                seekToPosition(indexEntry.position);
-                ByteBufferUtil.skipShortLength(file); // partition key
-                DeletionTime.serializer.skip(file); // partition deletion
-                if (sstable.header.hasStatic())
-                    UnfilteredSerializer.serializer.skipStaticRow(file, sstable.header, helper);
-                isInit = true;
-            }
+            if (!hasNext())
+                throw new NoSuchElementException();
+            return iterator.next();
+        }
 
-            if (partition == null)
+        protected boolean stopReadingDisk()
+        {
+            return false;
+        }
+
+        // Reads the unfiltereds from disk and loads them into the reader buffer. It stops reading when either the partition
+        // is fully read, or when stopReadingDisk() returns true.
+        protected void loadFromDisk(Slice.Bound start, Slice.Bound end) throws IOException
+        {
+            buffer.reset();
+
+            // If the start might be in this block, skip everything that comes before it.
+            if (start != null)
             {
-                partition = createBuffer(indexes.size());
-                partition.populateFrom(this, null, null, new Tester()
+                while (deserializer.hasNext() && deserializer.compareNextTo(start) <= 0 && !stopReadingDisk())
                 {
-                    public boolean isDone()
-                    {
-                        return false;
-                    }
-                });
-                iterator = partition.unfilteredIterator(columns, Slices.ALL, true);
+                    if (deserializer.nextIsRow())
+                        deserializer.skipNext();
+                    else
+                        updateOpenMarker((RangeTombstoneMarker)deserializer.readNext());
+                }
             }
 
-            return iterator.hasNext();
-        }
+            // If we have an open marker, it's either one from what we just skipped (if start != null), or it's from the previous index block.
+            if (openMarker != null)
+            {
+                RangeTombstone.Bound markerStart = start == null ? RangeTombstone.Bound.BOTTOM : RangeTombstone.Bound.fromSliceBound(start);
+                buffer.add(new RangeTombstoneBoundMarker(markerStart, openMarker));
+            }
 
-        public Unfiltered next() throws IOException
-        {
-            if (!hasNext())
-                throw new NoSuchElementException();
-            return iterator.next();
-        }
+            // Now deserialize everything until we reach our requested end (if we have one)
+            while (deserializer.hasNext()
+                   && (end == null || deserializer.compareNextTo(end) <= 0)
+                   && !stopReadingDisk())
+            {
+                Unfiltered unfiltered = deserializer.readNext();
+                buffer.add(unfiltered);
 
-        private void prepareBlock(int blockIdx, Slice.Bound start, Slice.Bound end) throws IOException
-        {
-            updateBlock(blockIdx);
+                if (unfiltered.isRangeTombstoneMarker())
+                    updateOpenMarker((RangeTombstoneMarker)unfiltered);
+            }
 
-            if (partition == null)
-                partition = createBuffer(indexes.size());
-            else
-                partition.clear();
+            // If we have an open marker, we should close it before finishing
+            if (openMarker != null)
+            {
+                // If we have no end and still an openMarker, this means we're indexed and the marker is closed in a following block.
+                RangeTombstone.Bound markerEnd = end == null ? RangeTombstone.Bound.TOP : RangeTombstone.Bound.fromSliceBound(end);
+                buffer.add(new RangeTombstoneBoundMarker(markerEnd, getAndClearOpenMarker()));
+            }
 
-            final FileMark fileMark = mark;
-            final long width = currentIndex().width;
+            buffer.build();
+        }
+    }
 
-            partition.populateFrom(this, start, end, new Tester()
-            {
-                public boolean isDone()
-                {
-                    return file.bytesPastMark(fileMark) >= width;
-                }
-            });
+    private class ReverseIndexedReader extends ReverseReader
+    {
+        private final IndexState indexState;
+
+        // The slice we're currently iterating over
+        private Slice slice;
+        // The last index block to consider for the slice
+        private int lastBlockIdx;
+
+        private ReverseIndexedReader(RowIndexEntry indexEntry, FileDataInput file, boolean isAtPartitionStart, boolean shouldCloseFile)
+        {
+            super(file, isAtPartitionStart, shouldCloseFile);
+            this.indexState = new IndexState(this, sstable.metadata.comparator, indexEntry, true);
+        }
+
+        protected void init() throws IOException
+        {
+            // If this is called, it means we're calling hasNext() before any call to setForSlice. Which means
+            // we're reading everything from the end. So just set us up on the last block.
+            indexState.setToBlock(indexState.blocksCount() - 1);
         }
 
         @Override
-        public Iterator<Unfiltered> slice(final Slice slice) throws IOException
+        public void setForSlice(Slice slice) throws IOException
         {
-            // if our previous slicing already got us the smallest row in the sstable, we're done
-            if (currentIndexIdx < 0)
-                return Collections.emptyIterator();
+            this.slice = slice;
+            isInit = true;
 
-            final List<IndexHelper.IndexInfo> indexes = indexEntry.columnsIndex();
+            // if our previous slicing already got us past the beginning of the sstable, we're done
+            if (indexState.isDone())
+            {
+                iterator = Collections.emptyIterator();
+                return;
+            }
 
             // Find the first index block we'll need to read for the slice.
-            final int startIdx = IndexHelper.indexFor(slice.end(), indexes, sstable.metadata.comparator, true, currentIndexIdx);
+            int startIdx = indexState.findBlockIndex(slice.end());
             if (startIdx < 0)
-                return Collections.emptyIterator();
+            {
+                iterator = Collections.emptyIterator();
+                return;
+            }
 
-            // Find the last index block we'll need to read for the slice.
-            int lastIdx = IndexHelper.indexFor(slice.start(), indexes, sstable.metadata.comparator, true, startIdx);
+            boolean isCurrentBlock = startIdx == indexState.currentBlockIdx();
+            if (!isCurrentBlock)
+                indexState.setToBlock(startIdx);
 
-            // The index search is by firstname and so lastIdx is such that
-            // indexes[lastIdx].firstName < slice.start <= indexes[lastIdx + 1].firstName
-            // However, if indexes[lastIdx].lastName < slice.start we can bump lastIdx.
-            if (lastIdx >= 0 && metadata().comparator.compare(indexes.get(lastIdx).lastName, slice.start()) < 0)
-                ++lastIdx;
+            lastBlockIdx = indexState.findBlockIndex(slice.start());
 
-            final int endIdx = lastIdx;
+            if (!isCurrentBlock)
+                readCurrentBlock(true);
 
-            // Because we're reversed, even if it is our current block, we should re-prepare the block since we would
-            // have skipped anything not in the previous slice.
-            prepareBlock(startIdx, slice.start(), slice.end());
+            setIterator(slice);
+        }
 
-            return new AbstractIterator<Unfiltered>()
-            {
-                private Iterator<Unfiltered> currentBlockIterator = partition.unfilteredIterator(columns, Slices.with(metadata().comparator, slice), true);
+        @Override
+        protected boolean hasNextInternal() throws IOException
+        {
+            if (super.hasNextInternal())
+                return true;
 
-                protected Unfiltered computeNext()
-                {
-                    try
-                    {
-                        if (currentBlockIterator.hasNext())
-                            return currentBlockIterator.next();
-
-                        --currentIndexIdx;
-                        if (currentIndexIdx < 0 || currentIndexIdx < endIdx)
-                            return endOfData();
-
-                        // Note that since we know we're read blocks backward, there is no point in checking the slice end, so we pass null
-                        prepareBlock(currentIndexIdx, slice.start(), null);
-                        currentBlockIterator = partition.unfilteredIterator(columns, Slices.with(metadata().comparator, slice), true);
-                        return computeNext();
-                    }
-                    catch (IOException e)
-                    {
-                        try
-                        {
-                            close();
-                        }
-                        catch (IOException suppressed)
-                        {
-                            e.addSuppressed(suppressed);
-                        }
-                        sstable.markSuspect();
-                        throw new CorruptSSTableException(e, file.getPath());
-                    }
-                }
-            };
+            // We have nothing more for our current block, move to the previous one.
+            int previousBlockIdx = indexState.currentBlockIdx() - 1;
+            if (previousBlockIdx < 0 || previousBlockIdx < lastBlockIdx)
+                return false;
+
+            // The slice start can be in
+            indexState.setToBlock(previousBlockIdx);
+            readCurrentBlock(false);
+            setIterator(slice);
+            // since that new block is within the bounds we've computed in setForSlice(), we know there will
+            // always be something matching the slice unless we're on the lastBlockIdx (in which case there
+            // may or may not be results, but if there isn't, we're done for the slice).
+            return iterator.hasNext();
         }
-    }
 
-    private abstract class Tester
-    {
-        public abstract boolean isDone();
+        /**
+         * Reads the current block, the last one we've set.
+         *
+         * @param canIncludeSliceEnd whether the block can include the slice end.
+         */
+        private void readCurrentBlock(boolean canIncludeSliceEnd) throws IOException
+        {
+            if (buffer == null)
+                buffer = createBuffer(indexState.blocksCount());
+
+            boolean canIncludeSliceStart = indexState.currentBlockIdx() == lastBlockIdx;
+            loadFromDisk(canIncludeSliceStart ? slice.start() : null, canIncludeSliceEnd ? slice.end() : null);
+        }
+
+        @Override
+        protected boolean stopReadingDisk()
+        {
+            return indexState.isPastCurrentBlock();
+        }
     }
 
-    private class ReusablePartitionData extends AbstractPartitionData
+    private class ReusablePartitionData extends AbstractThreadUnsafePartition
    {
-        private final Writer rowWriter;
-        private final RangeTombstoneCollector markerWriter;
+        private MutableDeletionInfo.Builder deletionBuilder;
+        private MutableDeletionInfo deletionInfo;
 
         private ReusablePartitionData(CFMetaData metadata,
                                       DecoratedKey partitionKey,
-                                      DeletionTime deletionTime,
                                       PartitionColumns columns,
                                       int initialRowCapacity)
         {
-            super(metadata, partitionKey, deletionTime, columns, initialRowCapacity, false);
+            super(metadata, partitionKey, columns, new ArrayList<>(initialRowCapacity));
+        }
 
-            this.rowWriter = new Writer(true);
-            // Note that even though the iterator handles the reverse case, this object holds the data for a single index bock, and we read index blocks in
-            // forward clustering order.
-            this.markerWriter = new RangeTombstoneCollector(false);
+        public DeletionInfo deletionInfo()
+        {
+            return deletionInfo;
         }
 
-        // Note that this method is here rather than in the readers because we want to use it for both readers and they
-        // don't extend one another
-        private void populateFrom(Reader reader, Slice.Bound start, Slice.Bound end, Tester tester) throws IOException
+        protected boolean canHaveShadowedData()
         {
-            // If we have a start bound, skip everything that comes before it.
-            while (reader.deserializer.hasNext() && start != null && reader.deserializer.compareNextTo(start) <= 0 && !tester.isDone())
-            {
-                if (reader.deserializer.nextIsRow())
-                    reader.deserializer.skipNext();
-                else
-                    reader.updateOpenMarker((RangeTombstoneMarker)reader.deserializer.readNext());
-            }
+            return false;
+        }
 
-            // If we have an open marker, it's either one from what we just skipped (if start != null), or it's from the previous index block.
-            if (reader.openMarker != null)
-            {
-                // If we have no start but still an openMarker, this means we're indexed and it's coming from the previous block
-                Slice.Bound markerStart = start;
-                if (start == null)
-                {
-                    ClusteringPrefix c = ((IndexedReader)reader).previousIndex().lastName;
-                    markerStart = Slice.Bound.exclusiveStartOf(c);
-                }
-                writeMarker(markerStart, reader.openMarker);
-            }
+        public Row staticRow()
+        {
+            return Rows.EMPTY_STATIC_ROW; // we don't actually use that
+        }
 
-            // Now deserialize everything until we reach our requested end (if we have one)
-            while (reader.deserializer.hasNext()
-                   && (end == null || reader.deserializer.compareNextTo(end) <= 0)
-                   && !tester.isDone())
-            {
-                Unfiltered unfiltered = reader.deserializer.readNext();
-                if (unfiltered.kind() == Unfiltered.Kind.ROW)
-                {
-                    ((Row) unfiltered).copyTo(rowWriter);
-                }
-                else
-                {
-                    RangeTombstoneMarker marker = (RangeTombstoneMarker) unfiltered;
-                    reader.updateOpenMarker(marker);
-                    marker.copyTo(markerWriter);
-                }
-            }
+        public RowStats stats()
+        {
+            return RowStats.NO_STATS; // we don't actually use that
+        }
 
-            // If we have an open marker, we should close it before finishing
-            if (reader.openMarker != null)
-            {
-                // If we no end and still an openMarker, this means we're indexed and the marker can be close using the blocks end
-                Slice.Bound markerEnd = end;
-                if (end == null)
-                {
-                    ClusteringPrefix c = ((IndexedReader)reader).currentIndex().lastName;
-                    markerEnd = Slice.Bound.inclusiveEndOf(c);
-                }
-                writeMarker(markerEnd, reader.getAndClearOpenMarker());
-            }
+        public void add(Unfiltered unfiltered)
+        {
+            if (unfiltered.isRow())
+                rows.add((Row)unfiltered);
+            else
+                deletionBuilder.add((RangeTombstoneMarker)unfiltered);
         }
 
-        private void writeMarker(Slice.Bound bound, DeletionTime dt)
+        public void reset()
         {
-            bound.writeTo(markerWriter);
-            markerWriter.writeBoundDeletion(dt);
-            markerWriter.endOfMarker();
+            rows.clear();
+            deletionBuilder = MutableDeletionInfo.builder(partitionLevelDeletion, metadata().comparator, false);
         }
 
-        @Override
-        public void clear()
+        public void build()
         {
-            super.clear();
-            rowWriter.reset();
-            markerWriter.reset();
+            deletionInfo = deletionBuilder.build();
+            deletionBuilder = null;
        }
    }
 }
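Both reverse readers work around the fact that a data block can only be decoded front-to-back: each index block is read forward into the ReusablePartitionData buffer, iterated in reverse, and only then does hasNextInternal() step to the previous block. A standalone sketch of that block-at-a-time reversal, with plain lists standing in for index blocks:

    import java.util.*;

    public class ReverseBlocks
    {
        // Iterate items in reverse without ever reading a block backwards:
        // load one block forward into a buffer, drain it reversed, then move
        // to the previous block (mirroring ReverseIndexedReader.hasNextInternal).
        static <T> List<T> reversed(List<List<T>> blocks)
        {
            List<T> out = new ArrayList<>();
            for (int blockIdx = blocks.size() - 1; blockIdx >= 0; blockIdx--)
            {
                List<T> buffer = new ArrayList<>(blocks.get(blockIdx)); // forward read
                Collections.reverse(buffer);                            // reverse in memory
                out.addAll(buffer);
            }
            return out;
        }

        public static void main(String[] args)
        {
            List<List<Integer>> blocks = List.of(List.of(1, 2, 3), List.of(4, 5), List.of(6, 7, 8));
            System.out.println(reversed(blocks)); // [8, 7, 6, 5, 4, 3, 2, 1]
        }
    }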
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
index b3cb370..0149582 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
@@ -26,7 +26,6 @@ import org.apache.cassandra.db.index.SecondaryIndexManager;
 import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.io.sstable.ISSTableScanner;
-import org.apache.cassandra.io.sstable.format.SSTableFormat;
 import org.apache.cassandra.metrics.CompactionMetrics;
 
 /**
@@ -34,7 +33,7 @@ import org.apache.cassandra.metrics.CompactionMetrics;
  * <p>
  * On top of the actual merging the source iterators, this class:
  * <ul>
- *   <li>purge gc-able tombstones if possible (see PurgingPartitionIterator below).</li>
+ *   <li>purge gc-able tombstones if possible (see PurgeIterator below).</li>
 *   <li>update 2ndary indexes if necessary (as we don't read-before-write on index updates, index entries are
 *       not deleted on deletion of the base table data, which is ok because we'll fix index inconsistency
 *       on reads. This however mean that potentially obsolete index entries could be kept a long time for
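On the "purge gc-able tombstones" point: a tombstone can only be dropped once it has expired past gc_grace (the gcBefore passed to the purger below) and once nothing outside the sstables being compacted could still be shadowed by it (the maxPurgeableTimestamp tracked below). A hedged sketch of that per-tombstone test, using field names that mirror PurgeIterator but not its actual code:

// Illustrative only; mirrors the gcBefore/maxPurgeableTimestamp fields below.
final class PurgeCheckSketch
{
    static boolean isPurgeable(long tombstoneTimestamp, int localDeletionTime,
                               int gcBefore, long maxPurgeableTimestamp)
    {
        // Expired past gc_grace AND older than anything an overlapping
        // sstable outside this compaction could still contain.
        return localDeletionTime < gcBefore && tombstoneTimestamp < maxPurgeableTimestamp;
    }
}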
@@ -65,12 +64,9 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte
      */
     private final long[] mergeCounters;
 
-    private final UnfilteredPartitionIterator mergedIterator;
+    private final UnfilteredPartitionIterator compacted;
     private final CompactionMetrics metrics;
 
-    // The number of row/RT merged by the iterator
-    private int merged;
-
     public CompactionIterator(OperationType type, List<ISSTableScanner> scanners, CompactionController controller, int nowInSec, UUID compactionId)
     {
         this(type, scanners, controller, nowInSec, compactionId, null);
@@ -96,9 +92,9 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte
         if (metrics != null)
             metrics.beginCompaction(this);
 
-        this.mergedIterator = scanners.isEmpty()
-                            ? UnfilteredPartitionIterators.EMPTY
-                            : UnfilteredPartitionIterators.convertExpiredCellsToTombstones(new PurgingPartitionIterator(UnfilteredPartitionIterators.merge(scanners, nowInSec, listener()), controller), nowInSec);
+        this.compacted = scanners.isEmpty()
+                       ? UnfilteredPartitionIterators.EMPTY
+                       : new PurgeIterator(UnfilteredPartitionIterators.merge(scanners, nowInSec, listener()), controller);
     }
 
     public boolean isForThrift()
@@ -143,57 +139,46 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte
             CompactionIterator.this.updateCounterFor(merged);
 
-            /*
-             * The row level listener does 2 things:
-             *  - It updates 2ndary indexes for deleted/shadowed cells
-             *  - It updates progress regularly (every UNFILTERED_TO_UPDATE_PROGRESS)
-             */
-            final SecondaryIndexManager.Updater indexer = type == OperationType.COMPACTION
-                                                        ? controller.cfs.indexManager.gcUpdaterFor(partitionKey, nowInSec)
-                                                        : SecondaryIndexManager.nullUpdater;
+            if (type != OperationType.COMPACTION || !controller.cfs.indexManager.hasIndexes())
+                return null;
 
-            return new UnfilteredRowIterators.MergeListener()
+            // If we have a 2ndary index, we must update it with deleted/shadowed cells.
+            // TODO: this should probably be done asynchronously and batched.
+            final SecondaryIndexManager.Updater indexer = controller.cfs.indexManager.gcUpdaterFor(partitionKey, nowInSec);
+            final RowDiffListener diffListener = new RowDiffListener()
             {
-                private Clustering clustering;
+                public void onPrimaryKeyLivenessInfo(int i, Clustering clustering, LivenessInfo merged, LivenessInfo original)
+                {
+                }
 
-                public void onMergePartitionLevelDeletion(DeletionTime mergedDeletion, DeletionTime[] versions)
+                public void onDeletion(int i, Clustering clustering, DeletionTime merged, DeletionTime original)
                 {
                 }
 
-                public void onMergingRows(Clustering clustering, LivenessInfo mergedInfo, DeletionTime mergedDeletion, Row[] versions)
+                public void onComplexDeletion(int i, Clustering clustering, ColumnDefinition column, DeletionTime merged, DeletionTime original)
                 {
-                    this.clustering = clustering;
                 }
 
-                public void onMergedComplexDeletion(ColumnDefinition c, DeletionTime mergedCompositeDeletion, DeletionTime[] versions)
+                public void onCell(int i, Clustering clustering, Cell merged, Cell original)
                 {
+                    if (original != null && (merged == null || !merged.isLive(nowInSec)))
+                        indexer.remove(clustering, original);
                 }
+            };
 
-                public void onMergedCells(Cell mergedCell, Cell[] versions)
+            return new UnfilteredRowIterators.MergeListener()
+            {
+                public void onMergedPartitionLevelDeletion(DeletionTime mergedDeletion, DeletionTime[] versions)
                 {
-                    if (indexer == SecondaryIndexManager.nullUpdater)
-                        return;
-
-                    for (int i = 0; i < versions.length; i++)
-                    {
-                        Cell version = versions[i];
-                        if (version != null && (mergedCell == null || !mergedCell.equals(version)))
-                            indexer.remove(clustering, version);
-                    }
                 }
 
-                public void onRowDone()
+                public void onMergedRows(Row merged, Columns columns, Row[] versions)
                 {
-                    int merged = ++CompactionIterator.this.merged;
-                    if (merged % UNFILTERED_TO_UPDATE_PROGRESS == 0)
-                        updateBytesRead();
+                    Rows.diff(merged, columns, versions, diffListener);
                 }
 
                 public void onMergedRangeTombstoneMarkers(RangeTombstoneMarker mergedMarker, RangeTombstoneMarker[] versions)
                 {
-                    int merged = ++CompactionIterator.this.merged;
-                    if (merged % UNFILTERED_TO_UPDATE_PROGRESS == 0)
-                        updateBytesRead();
                 }
 
                 public void close()
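The listener above delegates per-row work to Rows.diff(): each merged row is diffed against its input versions, and onCell flags any input cell that the merged result dropped or left dead, since its index entry is now stale. A minimal sketch of that diff-and-remove shape, with stand-in types rather than Cassandra's Cell/RowDiffListener:

import java.util.List;

// Stand-in types only; illustrates the shadowed-cell rule from onCell above.
final class IndexCleanupSketch
{
    interface CellDiffListener
    {
        void onCell(int inputIdx, String clustering, Cell merged, Cell original);
    }

    static final class Cell
    {
        final boolean live;
        Cell(boolean live) { this.live = live; }
        boolean isLive(int nowInSec) { return live; }
    }

    // If an input had a cell but the merged row dropped it or merged it into
    // something dead, the corresponding index entry must be removed.
    static CellDiffListener indexCleaner(List<String> removed, int nowInSec)
    {
        return (i, clustering, merged, original) -> {
            if (original != null && (merged == null || !merged.isLive(nowInSec)))
                removed.add(clustering); // stands in for indexer.remove(clustering, original)
        };
    }
}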
@@ -218,12 +203,12 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte
 
     public boolean hasNext()
     {
-        return mergedIterator.hasNext();
+        return compacted.hasNext();
     }
 
     public UnfilteredRowIterator next()
     {
-        return mergedIterator.next();
+        return compacted.next();
     }
 
     public void remove()
@@ -235,7 +220,7 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte
     {
         try
         {
-            mergedIterator.close();
+            compacted.close();
         }
         finally
         {
@@ -249,7 +234,7 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte
         return this.getCompactionInfo().toString();
     }
 
-    private class PurgingPartitionIterator extends TombstonePurgingPartitionIterator
+    private class PurgeIterator extends PurgingPartitionIterator
     {
         private final CompactionController controller;
 
@@ -257,28 +242,33 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte
         private long maxPurgeableTimestamp;
         private boolean hasCalculatedMaxPurgeableTimestamp;
 
-        private PurgingPartitionIterator(UnfilteredPartitionIterator toPurge, CompactionController controller)
+        private long compactedUnfiltered;
+
+        private PurgeIterator(UnfilteredPartitionIterator toPurge, CompactionController controller)
         {
             super(toPurge, controller.gcBefore);
             this.controller = controller;
         }
 
         @Override
-        protected void onEmpty(DecoratedKey key)
+        protected void onEmptyPartitionPostPurge(DecoratedKey key)
         {
             if (type == OperationType.COMPACTION)
                 controller.cfs.invalidateCachedPartition(key);
         }
 
         @Override
-        protected boolean shouldFilter(UnfilteredRowIterator iterator)
+        protected void onNewPartition(DecoratedKey key)
         {
-            currentKey = iterator.partitionKey();
+            currentKey = key;
             hasCalculatedMaxPurgeableTimestamp = false;
+        }
 
-            // TODO: we could be able to skip filtering if UnfilteredRowIterator was giving us some stats
-            // (like the smallest local deletion time).
-            return true;
+        @Override
+        protected void updateProgress()
+        {
+            if ((++compactedUnfiltered) % UNFILTERED_TO_UPDATE_PROGRESS == 0)
+                updateBytesRead();
         }
 
         /*
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/filter/AbstractClusteringIndexFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/AbstractClusteringIndexFilter.java b/src/java/org/apache/cassandra/db/filter/AbstractClusteringIndexFilter.java
index 29ea7fe..ed7584b 100644
--- a/src/java/org/apache/cassandra/db/filter/AbstractClusteringIndexFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/AbstractClusteringIndexFilter.java
@@ -17,13 +17,13 @@
  */
 package org.apache.cassandra.db.filter;
 
-import java.io.DataInput;
 import java.io.IOException;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.marshal.ReversedType;
+import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 
 public abstract class AbstractClusteringIndexFilter implements ClusteringIndexFilter
@@ -68,7 +68,7 @@ public abstract class AbstractClusteringIndexFilter implements ClusteringIndexFi
         int i = 0;
         for (ColumnDefinition column : metadata.clusteringColumns())
             sb.append(i++ == 0 ? "" : ", ").append(column.name).append(column.type instanceof ReversedType ? " ASC" : " DESC");
-        sb.append(")");
+        sb.append(')');
     }
 }
 
@@ -84,7 +84,7 @@ public abstract class AbstractClusteringIndexFilter implements ClusteringIndexFi
         filter.serializeInternal(out, version);
     }
 
-    public ClusteringIndexFilter deserialize(DataInput in, int version, CFMetaData metadata) throws IOException
+    public ClusteringIndexFilter deserialize(DataInputPlus in, int version, CFMetaData metadata) throws IOException
     {
         Kind kind = Kind.values()[in.readUnsignedByte()];
         boolean reversed = in.readBoolean();
@@ -104,6 +104,6 @@ public abstract class AbstractClusteringIndexFilter implements ClusteringIndexFi
     protected static abstract class InternalDeserializer
     {
-        public abstract ClusteringIndexFilter deserialize(DataInput in, int version, CFMetaData metadata, boolean reversed) throws IOException;
+        public abstract ClusteringIndexFilter deserialize(DataInputPlus in, int version, CFMetaData metadata, boolean reversed) throws IOException;
     }
 }
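As with the Slice serializer earlier in the patch, the filter serializer frames its payload with a kind tag followed by shared flags, and deserialization dispatches on the tag. A generic sketch of that tag-then-payload framing over plain java.io streams (the Kind enum is a simplified stand-in, not Cassandra's):

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

// Generic tag-dispatch framing; Kind is illustrative only.
final class TagDispatchSketch
{
    enum Kind { SLICE, NAMES }

    static void serialize(Kind kind, boolean reversed, DataOutput out) throws IOException
    {
        out.writeByte(kind.ordinal());  // tag first, so the reader knows what follows
        out.writeBoolean(reversed);
        // kind-specific payload would follow here
    }

    static Kind deserializeKind(DataInput in) throws IOException
    {
        return Kind.values()[in.readUnsignedByte()];
    }
}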
