http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/SuperColumns.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SuperColumns.java b/src/java/org/apache/cassandra/db/SuperColumns.java deleted file mode 100644 index 65e153f..0000000 --- a/src/java/org/apache/cassandra/db/SuperColumns.java +++ /dev/null @@ -1,230 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.db; - -import java.io.DataInput; -import java.io.IOError; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.SortedSet; -import java.util.TreeSet; - -import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.db.composites.*; -import org.apache.cassandra.db.filter.*; -import org.apache.cassandra.db.marshal.AbstractType; -import org.apache.cassandra.utils.ByteBufferUtil; - -public class SuperColumns -{ - public static Iterator<OnDiskAtom> onDiskIterator(DataInput in, int superColumnCount, ColumnSerializer.Flag flag, int expireBefore, CellNameType type) - { - return new SCIterator(in, superColumnCount, flag, expireBefore, type); - } - - public static void deserializerSuperColumnFamily(DataInput in, ColumnFamily cf, ColumnSerializer.Flag flag, int version) throws IOException - { - // Note that there was no way to insert a range tombstone in a SCF in 1.2 - cf.delete(cf.getComparator().deletionInfoSerializer().deserialize(in, version)); - assert !cf.deletionInfo().rangeIterator().hasNext(); - - Iterator<OnDiskAtom> iter = onDiskIterator(in, in.readInt(), flag, Integer.MIN_VALUE, cf.getComparator()); - while (iter.hasNext()) - cf.addAtom(iter.next()); - } - - private static class SCIterator implements Iterator<OnDiskAtom> - { - private final DataInput in; - private final int scCount; - - private final ColumnSerializer.Flag flag; - private final int expireBefore; - - private final CellNameType type; - - private int read; - private ByteBuffer scName; - private Iterator<Cell> subColumnsIterator; - - private SCIterator(DataInput in, int superColumnCount, ColumnSerializer.Flag flag, int expireBefore, CellNameType type) - { - this.in = in; - this.scCount = superColumnCount; - this.flag = flag; - this.expireBefore = expireBefore; - this.type = type; - } - - public boolean hasNext() - { - return (subColumnsIterator != null && subColumnsIterator.hasNext()) || read < scCount; - } - - public OnDiskAtom next() - { - try - { - if (subColumnsIterator != null && subColumnsIterator.hasNext()) - { - Cell c = subColumnsIterator.next(); - return c.withUpdatedName(type.makeCellName(scName, c.name().toByteBuffer())); - } - - // Read 
one more super column - ++read; - - scName = ByteBufferUtil.readWithShortLength(in); - DeletionInfo delInfo = new DeletionInfo(DeletionTime.serializer.deserialize(in)); - - /* read the number of columns */ - int size = in.readInt(); - List<Cell> subCells = new ArrayList<>(size); - - ColumnSerializer colSer = subType(type).columnSerializer(); - for (int i = 0; i < size; ++i) - subCells.add(colSer.deserialize(in, flag, expireBefore)); - - subColumnsIterator = subCells.iterator(); - - // If the SC was deleted, return that first, otherwise return the first subcolumn - DeletionTime dtime = delInfo.getTopLevelDeletion(); - if (!dtime.equals(DeletionTime.LIVE)) - return new RangeTombstone(startOf(scName), endOf(scName), dtime); - - return next(); - } - catch (IOException e) - { - throw new IOError(e); - } - } - - public void remove() - { - throw new UnsupportedOperationException(); - } - } - - private static CellNameType subType(CellNameType type) - { - return new SimpleDenseCellNameType(type.subtype(1)); - } - - public static CellNameType scNameType(CellNameType type) - { - return new SimpleDenseCellNameType(type.subtype(0)); - } - - public static AbstractType<?> getComparatorFor(CFMetaData metadata, ByteBuffer superColumn) - { - return getComparatorFor(metadata, superColumn != null); - } - - public static AbstractType<?> getComparatorFor(CFMetaData metadata, boolean subColumn) - { - return metadata.isSuper() - ? metadata.comparator.subtype(subColumn ? 1 : 0) - : metadata.comparator.asAbstractType(); - } - - // Extract the first component of a columnName, i.e. the super column name - public static ByteBuffer scName(Composite columnName) - { - return columnName.get(0); - } - - // Extract the 2nd component of a columnName, i.e. the sub-column name - public static ByteBuffer subName(Composite columnName) - { - return columnName.get(1); - } - - public static Composite startOf(ByteBuffer scName) - { - return CellNames.compositeDense(scName).start(); - } - - public static Composite endOf(ByteBuffer scName) - { - return CellNames.compositeDense(scName).end(); - } - - public static IDiskAtomFilter fromSCFilter(CellNameType type, ByteBuffer scName, IDiskAtomFilter filter) - { - if (filter instanceof NamesQueryFilter) - return fromSCNamesFilter(type, scName, (NamesQueryFilter)filter); - else - return fromSCSliceFilter(type, scName, (SliceQueryFilter)filter); - } - - public static IDiskAtomFilter fromSCNamesFilter(CellNameType type, ByteBuffer scName, NamesQueryFilter filter) - { - if (scName == null) - { - ColumnSlice[] slices = new ColumnSlice[filter.columns.size()]; - int i = 0; - for (CellName name : filter.columns) - { - // Note that, because the filter in argument is the one from thrift, 'name' are SimpleDenseCellName. - // So calling name.slice() would be incorrect, as simple cell names don't handle the EOC properly. - // This is why we call buffer() and rebuild a Composite of the right type before call slice(). 
- slices[i++] = type.make(name.toByteBuffer()).slice(); - } - return new SliceQueryFilter(slices, false, slices.length, 1); - } - else - { - SortedSet<CellName> newColumns = new TreeSet<>(type); - for (CellName c : filter.columns) - newColumns.add(type.makeCellName(scName, c.toByteBuffer())); - return filter.withUpdatedColumns(newColumns); - } - } - - public static SliceQueryFilter fromSCSliceFilter(CellNameType type, ByteBuffer scName, SliceQueryFilter filter) - { - assert filter.slices.length == 1; - if (scName == null) - { - // The filter is on the super column name - CBuilder builder = type.builder(); - Composite start = filter.start().isEmpty() - ? Composites.EMPTY - : builder.buildWith(filter.start().toByteBuffer()).withEOC(filter.reversed ? Composite.EOC.END : Composite.EOC.START); - Composite finish = filter.finish().isEmpty() - ? Composites.EMPTY - : builder.buildWith(filter.finish().toByteBuffer()).withEOC(filter.reversed ? Composite.EOC.START : Composite.EOC.END); - return new SliceQueryFilter(start, finish, filter.reversed, filter.count, 1); - } - else - { - CBuilder builder = type.builder().add(scName); - Composite start = filter.start().isEmpty() - ? builder.build().withEOC(filter.reversed ? Composite.EOC.END : Composite.EOC.START) - : builder.buildWith(filter.start().toByteBuffer()); - Composite end = filter.finish().isEmpty() - ? builder.build().withEOC(filter.reversed ? Composite.EOC.START : Composite.EOC.END) - : builder.buildWith(filter.finish().toByteBuffer()); - return new SliceQueryFilter(start, end, filter.reversed, filter.count); - } - } -}
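For review context: the SuperColumns helper deleted above worked by flattening a super column read into two-component composite cell names — scName() and subName() just pick components 0 and 1 of the composite, and startOf()/endOf() derive slice bounds from the end-of-component (EOC) byte. The self-contained sketch below illustrates that encoding for reviewers without the 2.x sources handy. The helper names (compose, component, str) are mine, and the layout (16-bit length, value bytes, one EOC byte per component, with EOC -1/0/1 for start/none/end) reflects the pre-3.0 composite format as I understand it — this is not code from the patch.

import java.nio.ByteBuffer;

public class CompositeNameSketch
{
    // Encode components as: <unsigned short length><bytes><EOC byte>.
    // Only the last component carries the requested EOC; earlier ones get 0.
    static ByteBuffer compose(byte eoc, ByteBuffer... components)
    {
        int size = 0;
        for (ByteBuffer c : components)
            size += 2 + c.remaining() + 1;
        ByteBuffer out = ByteBuffer.allocate(size);
        for (int i = 0; i < components.length; i++)
        {
            ByteBuffer c = components[i].duplicate();
            out.putShort((short) c.remaining());
            out.put(c);
            out.put(i == components.length - 1 ? eoc : (byte) 0);
        }
        out.flip();
        return out;
    }

    // Extract the idx-th component, as Composite.get(idx) does for scName()/subName().
    static ByteBuffer component(ByteBuffer composite, int idx)
    {
        ByteBuffer in = composite.duplicate();
        for (int i = 0; ; i++)
        {
            int length = in.getShort() & 0xFFFF;
            ByteBuffer value = in.slice();
            value.limit(length);
            if (i == idx)
                return value;
            in.position(in.position() + length + 1); // skip value and its EOC byte
        }
    }

    static String str(ByteBuffer b)
    {
        byte[] bytes = new byte[b.remaining()];
        b.duplicate().get(bytes);
        return new String(bytes);
    }

    public static void main(String[] args)
    {
        ByteBuffer name = compose((byte) 0,
                                  ByteBuffer.wrap("sc1".getBytes()),  // super column name
                                  ByteBuffer.wrap("col".getBytes())); // sub-column name
        System.out.println(str(component(name, 0))); // "sc1", cf. SuperColumns.scName()
        System.out.println(str(component(name, 1))); // "col", cf. SuperColumns.subName()

        // Range bounds as built by startOf()/endOf(): same value, EOC of -1 or 1.
        ByteBuffer start = compose((byte) -1, ByteBuffer.wrap("sc1".getBytes()));
        ByteBuffer end = compose((byte) 1, ByteBuffer.wrap("sc1".getBytes()));
        System.out.println(start.remaining() + " / " + end.remaining()); // both 6 bytes
    }
}

The EOC byte is what makes a whole super column addressable as a slice: every real cell under "sc1" has EOC 0 in its first component, so it sorts strictly between the (-1) start bound and the (+1) end bound.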
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/SystemKeyspace.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java index 4bc1522..34c617f 100644 --- a/src/java/org/apache/cassandra/db/SystemKeyspace.java +++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java @@ -36,11 +36,10 @@ import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.KSMetaData; import org.apache.cassandra.cql3.QueryProcessor; import org.apache.cassandra.cql3.UntypedResultSet; +import org.apache.cassandra.db.partitions.*; import org.apache.cassandra.db.commitlog.ReplayPosition; import org.apache.cassandra.db.compaction.CompactionHistoryTabularData; import org.apache.cassandra.db.compaction.LeveledCompactionStrategy; -import org.apache.cassandra.db.composites.Composite; -import org.apache.cassandra.db.filter.QueryFilter; import org.apache.cassandra.db.marshal.*; import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.Range; @@ -126,8 +125,10 @@ public final class SystemKeyspace + "in_progress_ballot timeuuid," + "most_recent_commit blob," + "most_recent_commit_at timeuuid," + + "most_recent_commit_version int," + "proposal blob," + "proposal_ballot timeuuid," + + "proposal_version int," + "PRIMARY KEY ((row_key), cf_id))") .compactionStrategyClass(LeveledCompactionStrategy.class); @@ -831,27 +832,22 @@ public final class SystemKeyspace public static boolean isIndexBuilt(String keyspaceName, String indexName) { - ColumnFamilyStore cfs = Keyspace.open(NAME).getColumnFamilyStore(BUILT_INDEXES); - QueryFilter filter = QueryFilter.getNamesFilter(decorate(ByteBufferUtil.bytes(keyspaceName)), - BUILT_INDEXES, - FBUtilities.singleton(cfs.getComparator().makeCellName(indexName), cfs.getComparator()), - System.currentTimeMillis()); - return ColumnFamilyStore.removeDeleted(cfs.getColumnFamily(filter), Integer.MAX_VALUE) != null; + String req = "SELECT index_name FROM %s.\"%s\" WHERE table_name=? AND index_name=?"; + UntypedResultSet result = executeInternal(String.format(req, NAME, BUILT_INDEXES), keyspaceName, indexName); + return !result.isEmpty(); } public static void setIndexBuilt(String keyspaceName, String indexName) { - ColumnFamily cf = ArrayBackedSortedColumns.factory.create(NAME, BUILT_INDEXES); - cf.addColumn(new BufferCell(cf.getComparator().makeCellName(indexName), ByteBufferUtil.EMPTY_BYTE_BUFFER, FBUtilities.timestampMicros())); - new Mutation(NAME, ByteBufferUtil.bytes(keyspaceName), cf).apply(); + String req = "INSERT INTO %s.\"%s\" (table_name, index_name) VALUES (?, ?)"; + executeInternal(String.format(req, NAME, BUILT_INDEXES), keyspaceName, indexName); forceBlockingFlush(BUILT_INDEXES); } public static void setIndexRemoved(String keyspaceName, String indexName) { - Mutation mutation = new Mutation(NAME, ByteBufferUtil.bytes(keyspaceName)); - mutation.delete(BUILT_INDEXES, BuiltIndexes.comparator.makeCellName(indexName), FBUtilities.timestampMicros()); - mutation.apply(); + String req = "DELETE FROM %s.\"%s\" WHERE table_name = ? 
AND index_name = ?"; + executeInternal(String.format(req, NAME, BUILT_INDEXES), keyspaceName, indexName); forceBlockingFlush(BUILT_INDEXES); } @@ -884,23 +880,26 @@ public final class SystemKeyspace return hostId; } - public static PaxosState loadPaxosState(ByteBuffer key, CFMetaData metadata) + + public static PaxosState loadPaxosState(DecoratedKey key, CFMetaData metadata) { String req = "SELECT * FROM system.%s WHERE row_key = ? AND cf_id = ?"; - UntypedResultSet results = executeInternal(String.format(req, PAXOS), key, metadata.cfId); + UntypedResultSet results = executeInternal(String.format(req, PAXOS), key.getKey(), metadata.cfId); if (results.isEmpty()) return new PaxosState(key, metadata); UntypedResultSet.Row row = results.one(); Commit promised = row.has("in_progress_ballot") - ? new Commit(key, row.getUUID("in_progress_ballot"), ArrayBackedSortedColumns.factory.create(metadata)) + ? new Commit(row.getUUID("in_progress_ballot"), new PartitionUpdate(metadata, key, metadata.partitionColumns(), 1)) : Commit.emptyCommit(key, metadata); // either we have both a recently accepted ballot and update or we have neither + int proposalVersion = row.has("proposal_version") ? row.getInt("proposal_version") : MessagingService.VERSION_21; Commit accepted = row.has("proposal") - ? new Commit(key, row.getUUID("proposal_ballot"), ColumnFamily.fromBytes(row.getBytes("proposal"))) + ? new Commit(row.getUUID("proposal_ballot"), PartitionUpdate.fromBytes(row.getBytes("proposal"), proposalVersion, key)) : Commit.emptyCommit(key, metadata); // either most_recent_commit and most_recent_commit_at will both be set, or neither + int mostRecentVersion = row.has("most_recent_commit_version") ? row.getInt("most_recent_commit_version") : MessagingService.VERSION_21; Commit mostRecent = row.has("most_recent_commit") - ? new Commit(key, row.getUUID("most_recent_commit_at"), ColumnFamily.fromBytes(row.getBytes("most_recent_commit"))) + ? new Commit(row.getUUID("most_recent_commit_at"), PartitionUpdate.fromBytes(row.getBytes("most_recent_commit"), mostRecentVersion, key)) : Commit.emptyCommit(key, metadata); return new PaxosState(promised, accepted, mostRecent); } @@ -910,21 +909,22 @@ public final class SystemKeyspace String req = "UPDATE system.%s USING TIMESTAMP ? AND TTL ? SET in_progress_ballot = ? WHERE row_key = ? AND cf_id = ?"; executeInternal(String.format(req, PAXOS), UUIDGen.microsTimestamp(promise.ballot), - paxosTtl(promise.update.metadata), + paxosTtl(promise.update.metadata()), promise.ballot, - promise.key, - promise.update.id()); + promise.update.partitionKey().getKey(), + promise.update.metadata().cfId); } public static void savePaxosProposal(Commit proposal) { - executeInternal(String.format("UPDATE system.%s USING TIMESTAMP ? AND TTL ? SET proposal_ballot = ?, proposal = ? WHERE row_key = ? AND cf_id = ?", PAXOS), + executeInternal(String.format("UPDATE system.%s USING TIMESTAMP ? AND TTL ? SET proposal_ballot = ?, proposal = ?, proposal_version = ? WHERE row_key = ? 
AND cf_id = ?", PAXOS), UUIDGen.microsTimestamp(proposal.ballot), - paxosTtl(proposal.update.metadata), + paxosTtl(proposal.update.metadata()), proposal.ballot, - proposal.update.toBytes(), - proposal.key, - proposal.update.id()); + PartitionUpdate.toBytes(proposal.update, MessagingService.current_version), + MessagingService.current_version, + proposal.update.partitionKey().getKey(), + proposal.update.metadata().cfId); } private static int paxosTtl(CFMetaData metadata) @@ -937,14 +937,15 @@ public final class SystemKeyspace { // We always erase the last proposal (with the commit timestamp to no erase more recent proposal in case the commit is old) // even though that's really just an optimization since SP.beginAndRepairPaxos will exclude accepted proposal older than the mrc. - String cql = "UPDATE system.%s USING TIMESTAMP ? AND TTL ? SET proposal_ballot = null, proposal = null, most_recent_commit_at = ?, most_recent_commit = ? WHERE row_key = ? AND cf_id = ?"; + String cql = "UPDATE system.%s USING TIMESTAMP ? AND TTL ? SET proposal_ballot = null, proposal = null, most_recent_commit_at = ?, most_recent_commit = ?, most_recent_commit_version = ? WHERE row_key = ? AND cf_id = ?"; executeInternal(String.format(cql, PAXOS), UUIDGen.microsTimestamp(commit.ballot), - paxosTtl(commit.update.metadata), + paxosTtl(commit.update.metadata()), commit.ballot, - commit.update.toBytes(), - commit.key, - commit.update.id()); + PartitionUpdate.toBytes(commit.update, MessagingService.current_version), + MessagingService.current_version, + commit.update.partitionKey().getKey(), + commit.update.metadata().cfId); } /** @@ -998,24 +999,24 @@ public final class SystemKeyspace public static void updateSizeEstimates(String keyspace, String table, Map<Range<Token>, Pair<Long, Long>> estimates) { long timestamp = FBUtilities.timestampMicros(); - Mutation mutation = new Mutation(NAME, UTF8Type.instance.decompose(keyspace)); + DecoratedKey key = decorate(UTF8Type.instance.decompose(keyspace)); + PartitionUpdate update = new PartitionUpdate(SizeEstimates, key, SizeEstimates.partitionColumns(), estimates.size()); + Mutation mutation = new Mutation(update); // delete all previous values with a single range tombstone. - mutation.deleteRange(SIZE_ESTIMATES, - SizeEstimates.comparator.make(table).start(), - SizeEstimates.comparator.make(table).end(), - timestamp - 1); + int nowInSec = FBUtilities.nowInSeconds(); + update.addRangeTombstone(Slice.make(SizeEstimates.comparator, table), new SimpleDeletionTime(timestamp - 1, nowInSec)); // add a CQL row for each primary token range. 
- ColumnFamily cells = mutation.addOrGet(SizeEstimates); for (Map.Entry<Range<Token>, Pair<Long, Long>> entry : estimates.entrySet()) { Range<Token> range = entry.getKey(); Pair<Long, Long> values = entry.getValue(); - Composite prefix = SizeEstimates.comparator.make(table, range.left.toString(), range.right.toString()); - CFRowAdder adder = new CFRowAdder(cells, prefix, timestamp); - adder.add("partitions_count", values.left) - .add("mean_partition_size", values.right); + new RowUpdateBuilder(SizeEstimates, timestamp, mutation) + .clustering(table, range.left.toString(), range.right.toString()) + .add("partitions_count", values.left) + .add("mean_partition_size", values.right) + .build(); } mutation.apply(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java b/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java new file mode 100644 index 0000000..a15fb61 --- /dev/null +++ b/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java @@ -0,0 +1,414 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db; + +import java.io.DataInput; +import java.io.IOException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.net.MessagingService; + +/** + * Helper class to deserialize Unfiltered object from disk efficiently. + * + * More precisely, this class is used by the low-level reader to ensure + * we don't do more work than necessary (i.e. we don't allocate/deserialize + * objects for things we don't care about). + */ +public abstract class UnfilteredDeserializer +{ + private static final Logger logger = LoggerFactory.getLogger(UnfilteredDeserializer.class); + + protected final CFMetaData metadata; + protected final DataInput in; + protected final SerializationHelper helper; + + protected UnfilteredDeserializer(CFMetaData metadata, + DataInput in, + SerializationHelper helper) + { + this.metadata = metadata; + this.in = in; + this.helper = helper; + } + + public static UnfilteredDeserializer create(CFMetaData metadata, + DataInput in, + SerializationHeader header, + SerializationHelper helper, + DeletionTime partitionDeletion, + boolean readAllAsDynamic) + { + if (helper.version >= MessagingService.VERSION_30) + return new CurrentDeserializer(metadata, in, header, helper); + else + return new OldFormatDeserializer(metadata, in, helper, partitionDeletion, readAllAsDynamic); + } + + /** + * Whether or not there is more atom to read. 
+ */ + public abstract boolean hasNext() throws IOException; + + /** + * Compare the provided bound to the next atom to read on disk. + * + * This will not read/deserialize the whole atom but only what is necessary for the + * comparison. Whenever we know what to do with this atom (read it or skip it), + * readNext or skipNext should be called. + */ + public abstract int compareNextTo(Slice.Bound bound) throws IOException; + + /** + * Returns whether the next atom is a row or not. + */ + public abstract boolean nextIsRow() throws IOException; + + /** + * Returns whether the next atom is the static row or not. + */ + public abstract boolean nextIsStatic() throws IOException; + + /** + * Returns the next atom. + */ + public abstract Unfiltered readNext() throws IOException; + + /** + * Clears any state in this deserializer. + */ + public abstract void clearState() throws IOException; + + /** + * Skips the next atom. + */ + public abstract void skipNext() throws IOException; + + private static class CurrentDeserializer extends UnfilteredDeserializer + { + private final ClusteringPrefix.Deserializer clusteringDeserializer; + private final SerializationHeader header; + + private int nextFlags; + private boolean isReady; + private boolean isDone; + + private final ReusableRow row; + private final RangeTombstoneMarker.Builder markerBuilder; + + private CurrentDeserializer(CFMetaData metadata, + DataInput in, + SerializationHeader header, + SerializationHelper helper) + { + super(metadata, in, helper); + this.header = header; + this.clusteringDeserializer = new ClusteringPrefix.Deserializer(metadata.comparator, in, header); + this.row = new ReusableRow(metadata.clusteringColumns().size(), header.columns().regulars, true, metadata.isCounter()); + this.markerBuilder = new RangeTombstoneMarker.Builder(metadata.clusteringColumns().size()); + } + + public boolean hasNext() throws IOException + { + if (isReady) + return true; + + prepareNext(); + return !isDone; + } + + private void prepareNext() throws IOException + { + if (isDone) + return; + + nextFlags = in.readUnsignedByte(); + if (UnfilteredSerializer.isEndOfPartition(nextFlags)) + { + isDone = true; + isReady = false; + return; + } + + clusteringDeserializer.prepare(nextFlags); + isReady = true; + } + + public int compareNextTo(Slice.Bound bound) throws IOException + { + if (!isReady) + prepareNext(); + + assert !isDone; + + return clusteringDeserializer.compareNextTo(bound); + } + + public boolean nextIsRow() throws IOException + { + if (!isReady) + prepareNext(); + + return UnfilteredSerializer.kind(nextFlags) == Unfiltered.Kind.ROW; + } + + public boolean nextIsStatic() throws IOException + { + // This exists only for the sake of the OldFormatDeserializer + throw new UnsupportedOperationException(); + } + + public Unfiltered readNext() throws IOException + { + isReady = false; + if (UnfilteredSerializer.kind(nextFlags) == Unfiltered.Kind.RANGE_TOMBSTONE_MARKER) + { + markerBuilder.reset(); + RangeTombstone.Bound.Kind kind = clusteringDeserializer.deserializeNextBound(markerBuilder); + UnfilteredSerializer.serializer.deserializeMarkerBody(in, header, kind.isBoundary(), markerBuilder); + return markerBuilder.build(); + } + else + { + Row.Writer writer = row.writer(); + clusteringDeserializer.deserializeNextClustering(writer); + UnfilteredSerializer.serializer.deserializeRowBody(in, header, helper, nextFlags, writer); + return row; + } + } + + public void skipNext() throws IOException + { + isReady = false; + ClusteringPrefix.Kind kind = 
clusteringDeserializer.skipNext(); + if (UnfilteredSerializer.kind(nextFlags) == Unfiltered.Kind.RANGE_TOMBSTONE_MARKER) + { + UnfilteredSerializer.serializer.skipMarkerBody(in, header, kind.isBoundary()); + } + else + { + UnfilteredSerializer.serializer.skipRowBody(in, header, helper, nextFlags); + } + } + + public void clearState() + { + isReady = false; + isDone = false; + } + } + + public static class OldFormatDeserializer extends UnfilteredDeserializer + { + private final boolean readAllAsDynamic; + private boolean skipStatic; + + private int nextFlags; + private boolean isDone; + private boolean isStart = true; + + private final LegacyLayout.CellGrouper grouper; + private LegacyLayout.LegacyAtom nextAtom; + + private boolean staticFinished; + private LegacyLayout.LegacyAtom savedAtom; + + private final LegacyLayout.TombstoneTracker tombstoneTracker; + + private RangeTombstoneMarker closingMarker; + + private OldFormatDeserializer(CFMetaData metadata, + DataInput in, + SerializationHelper helper, + DeletionTime partitionDeletion, + boolean readAllAsDynamic) + { + super(metadata, in, helper); + this.readAllAsDynamic = readAllAsDynamic; + this.grouper = new LegacyLayout.CellGrouper(metadata, helper); + this.tombstoneTracker = new LegacyLayout.TombstoneTracker(metadata, partitionDeletion); + } + + public void setSkipStatic() + { + this.skipStatic = true; + } + + public boolean hasNext() throws IOException + { + if (nextAtom != null) + return true; + + if (isDone) + return false; + + return deserializeNextAtom(); + } + + private boolean deserializeNextAtom() throws IOException + { + if (staticFinished && savedAtom != null) + { + nextAtom = savedAtom; + savedAtom = null; + return true; + } + + while (true) + { + nextAtom = LegacyLayout.readLegacyAtom(metadata, in, readAllAsDynamic); + if (nextAtom == null) + { + isDone = true; + return false; + } + else if (tombstoneTracker.isShadowed(nextAtom)) + { + // We don't want to return shadowed data because that would fail the contract + // of UnfilteredRowIterator. However the old format could have shadowed data, so filter it here. + nextAtom = null; + continue; + } + + tombstoneTracker.update(nextAtom); + + // For static compact tables, the "column_metadata" columns are supposed to be static, but in the old + // format they are intermingled with other columns. We deal with that with 2 different strategy: + // 1) for thrift queries, we basically consider everything as a "dynamic" cell. This is ok because + // that's basically what we end up with on ThriftResultsMerger has done its thing. + // 2) otherwise, we make sure to extract the "static" columns first (see AbstractSSTableIterator.readStaticRow + // and SSTableSimpleIterator.readStaticRow) as a first pass. So, when we do a 2nd pass for dynamic columns + // (which in practice we only do for compactions), we want to ignore those extracted static columns. + if (skipStatic && metadata.isStaticCompactTable() && nextAtom.isCell()) + { + LegacyLayout.LegacyCell cell = nextAtom.asCell(); + if (cell.name.column.isStatic()) + { + nextAtom = null; + continue; + } + } + + // We want to fetch the static row as the first thing this deserializer return. + // However, in practice, it's possible to have range tombstone before the static row cells + // if that tombstone has an empty start. So if we do, we save it initially so we can get + // to the static parts (if there is any). 
+ if (isStart) + { + isStart = false; + if (!nextAtom.isCell()) + { + LegacyLayout.LegacyRangeTombstone tombstone = nextAtom.asRangeTombstone(); + if (tombstone.start.bound.size() == 0) + { + savedAtom = tombstone; + nextAtom = LegacyLayout.readLegacyAtom(metadata, in, readAllAsDynamic); + if (nextAtom == null) + { + // That was actually the only atom so use it after all + nextAtom = savedAtom; + savedAtom = null; + } + else if (!nextAtom.isStatic()) + { + // We don't have anything static. So we do want to send first + // the saved atom, so switch + LegacyLayout.LegacyAtom atom = nextAtom; + nextAtom = savedAtom; + savedAtom = atom; + } + } + } + } + + return true; + } + } + + private void checkReady() throws IOException + { + if (nextAtom == null) + hasNext(); + assert !isDone; + } + + public int compareNextTo(Slice.Bound bound) throws IOException + { + checkReady(); + return metadata.comparator.compare(nextAtom, bound); + } + + public boolean nextIsRow() throws IOException + { + checkReady(); + if (nextAtom.isCell()) + return true; + + LegacyLayout.LegacyRangeTombstone tombstone = nextAtom.asRangeTombstone(); + return tombstone.isCollectionTombstone() || tombstone.isRowDeletion(metadata); + } + + public boolean nextIsStatic() throws IOException + { + checkReady(); + return nextAtom.isStatic(); + } + + public Unfiltered readNext() throws IOException + { + if (!nextIsRow()) + { + LegacyLayout.LegacyRangeTombstone tombstone = nextAtom.asRangeTombstone(); + // TODO: this is actually more complex, we can have repeated markers etc.... + if (closingMarker == null) + throw new UnsupportedOperationException(); + closingMarker = new RangeTombstoneBoundMarker(tombstone.stop.bound, tombstone.deletionTime); + return new RangeTombstoneBoundMarker(tombstone.start.bound, tombstone.deletionTime); + } + + LegacyLayout.CellGrouper grouper = nextAtom.isStatic() + ? LegacyLayout.CellGrouper.staticGrouper(metadata, helper) + : this.grouper; + + grouper.reset(); + grouper.addAtom(nextAtom); + while (deserializeNextAtom() && grouper.addAtom(nextAtom)) + { + } + + // if this was the first static row, we're done with it. Otherwise, we're also done with static. + staticFinished = true; + return grouper.getRow(); + } + + public void skipNext() throws IOException + { + readNext(); + } + + public void clearState() + { + isDone = false; + nextAtom = null; + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/UnknownColumnException.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/UnknownColumnException.java b/src/java/org/apache/cassandra/db/UnknownColumnException.java new file mode 100644 index 0000000..55dc453 --- /dev/null +++ b/src/java/org/apache/cassandra/db/UnknownColumnException.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db; + +import java.nio.ByteBuffer; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.db.marshal.UTF8Type; +import org.apache.cassandra.utils.ByteBufferUtil; + +/** + * Exception thrown when we read a column internally that is unknown. Note that + * this is an internal exception and is not meant to be user facing. + */ +public class UnknownColumnException extends Exception +{ + public final ByteBuffer columnName; + + public UnknownColumnException(CFMetaData metadata, ByteBuffer columnName) + { + super(String.format("Unknown column %s in table %s.%s", stringify(columnName), metadata.ksName, metadata.cfName)); + this.columnName = columnName; + } + + private static String stringify(ByteBuffer name) + { + try + { + return UTF8Type.instance.getString(name); + } + catch (Exception e) + { + return ByteBufferUtil.bytesToHex(name); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java b/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java new file mode 100644 index 0000000..b406251 --- /dev/null +++ b/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java @@ -0,0 +1,424 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.db.columniterator; + +import java.io.IOException; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.filter.ColumnFilter; +import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.sstable.CorruptSSTableException; +import org.apache.cassandra.io.sstable.IndexHelper; +import org.apache.cassandra.io.util.FileDataInput; +import org.apache.cassandra.io.util.FileMark; +import org.apache.cassandra.utils.ByteBufferUtil; + +abstract class AbstractSSTableIterator implements SliceableUnfilteredRowIterator +{ + private static final Logger logger = LoggerFactory.getLogger(AbstractSSTableIterator.class); + + protected final SSTableReader sstable; + protected final DecoratedKey key; + protected final DeletionTime partitionLevelDeletion; + protected final ColumnFilter columns; + protected final SerializationHelper helper; + + protected final Row staticRow; + protected final Reader reader; + + private final boolean isForThrift; + + private boolean isClosed; + + @SuppressWarnings("resource") // We need this because the analysis is not able to determine that we do close + // file on every path where we created it. + protected AbstractSSTableIterator(SSTableReader sstable, + FileDataInput file, + DecoratedKey key, + RowIndexEntry indexEntry, + ColumnFilter columnFilter, + boolean isForThrift) + { + this.sstable = sstable; + this.key = key; + this.columns = columnFilter; + this.helper = new SerializationHelper(sstable.descriptor.version.correspondingMessagingVersion(), SerializationHelper.Flag.LOCAL, columnFilter); + this.isForThrift = isForThrift; + + if (indexEntry == null) + { + this.partitionLevelDeletion = DeletionTime.LIVE; + this.reader = null; + this.staticRow = Rows.EMPTY_STATIC_ROW; + } + else + { + boolean shouldCloseFile = file == null; + try + { + // We seek to the beginning to the partition if either: + // - the partition is not indexed; we then have a single block to read anyway + // and we need to read the partition deletion time. + // - we're querying static columns. + boolean needSeekAtPartitionStart = !indexEntry.isIndexed() || !columns.fetchedColumns().statics.isEmpty(); + + // For CQL queries on static compact tables, we only want to consider static value (only those are exposed), + // but readStaticRow have already read them and might in fact have consumed the whole partition (when reading + // the legacy file format), so set the reader to null so we don't try to read anything more. 
We can remove this + // once we drop support for the legacy file format + boolean needsReader = sstable.descriptor.version.storeRows() || isForThrift || !sstable.metadata.isStaticCompactTable(); + + if (needSeekAtPartitionStart) + { + // Not indexed (or is reading static), set to the beginning of the partition and read partition level deletion there + if (file == null) + file = sstable.getFileDataInput(indexEntry.position); + else + file.seek(indexEntry.position); + + ByteBufferUtil.skipShortLength(file); // Skip partition key + this.partitionLevelDeletion = DeletionTime.serializer.deserialize(file); + + // Note that this needs to be called after file != null and after the partitionDeletion has been set, but before readStaticRow + // (since it uses it) so we can't move that up (but we'll be able to simplify as soon as we drop support for the old file format). + this.reader = needsReader ? createReader(indexEntry, file, needSeekAtPartitionStart, shouldCloseFile) : null; + this.staticRow = readStaticRow(sstable, file, helper, columns.fetchedColumns().statics, isForThrift, reader == null ? null : reader.deserializer); + } + else + { + this.partitionLevelDeletion = indexEntry.deletionTime(); + this.staticRow = Rows.EMPTY_STATIC_ROW; + this.reader = needsReader ? createReader(indexEntry, file, needSeekAtPartitionStart, shouldCloseFile) : null; + } + + if (reader == null && shouldCloseFile) + file.close(); + } + catch (IOException e) + { + sstable.markSuspect(); + String filePath = file.getPath(); + if (shouldCloseFile && file != null) + { + try + { + file.close(); + } + catch (IOException suppressed) + { + e.addSuppressed(suppressed); + } + } + throw new CorruptSSTableException(e, filePath); + } + } + } + + private static Row readStaticRow(SSTableReader sstable, + FileDataInput file, + SerializationHelper helper, + Columns statics, + boolean isForThrift, + UnfilteredDeserializer deserializer) throws IOException + { + if (!sstable.descriptor.version.storeRows()) + { + if (!sstable.metadata.isCompactTable()) + { + assert deserializer != null; + return deserializer.hasNext() && deserializer.nextIsStatic() + ? (Row)deserializer.readNext() + : Rows.EMPTY_STATIC_ROW; + } + + // For compact tables, we use statics for the "column_metadata" definition. However, in the old format, those + // "column_metadata" are intermingled as any other "cell". In theory, this means that we'd have to do a first + // pass to extract the static values. However, for thrift, we'll use the ThriftResultsMerger right away which + // will re-merge static values with dynamic ones, so we can just ignore static and read every cell as a + // "dynamic" one. For CQL, if the table is a "static compact", then is has only static columns exposed and no + // dynamic ones. So we do a pass to extract static columns here, but will have no more work to do. Otherwise, + // the table won't have static columns. + if (statics.isEmpty() || isForThrift) + return Rows.EMPTY_STATIC_ROW; + + assert sstable.metadata.isStaticCompactTable() && !isForThrift; + + // As said above, if it's a CQL query and the table is a "static compact", the only exposed columns are the + // static ones. So we don't have to mark the position to seek back later. 
+ return LegacyLayout.extractStaticColumns(sstable.metadata, file, statics); + } + + if (!sstable.header.hasStatic()) + return Rows.EMPTY_STATIC_ROW; + + if (statics.isEmpty()) + { + UnfilteredSerializer.serializer.skipStaticRow(file, sstable.header, helper); + return Rows.EMPTY_STATIC_ROW; + } + else + { + return UnfilteredSerializer.serializer.deserializeStaticRow(file, sstable.header, helper); + } + } + + protected abstract Reader createReader(RowIndexEntry indexEntry, FileDataInput file, boolean isAtPartitionStart, boolean shouldCloseFile); + + public CFMetaData metadata() + { + return sstable.metadata; + } + + public PartitionColumns columns() + { + return columns.fetchedColumns(); + } + + public DecoratedKey partitionKey() + { + return key; + } + + public DeletionTime partitionLevelDeletion() + { + return partitionLevelDeletion; + } + + public Row staticRow() + { + return staticRow; + } + + public RowStats stats() + { + // We could return sstable.header.stats(), but this may not be as accurate as the actual sstable stats (see + // SerializationHeader.make() for details) so we use the latter instead. + return new RowStats(sstable.getMinTimestamp(), sstable.getMinLocalDeletionTime(), sstable.getMinTTL(), sstable.getAvgColumnSetPerRow()); + } + + public boolean hasNext() + { + try + { + return reader != null && reader.hasNext(); + } + catch (IOException e) + { + try + { + closeInternal(); + } + catch (IOException suppressed) + { + e.addSuppressed(suppressed); + } + sstable.markSuspect(); + throw new CorruptSSTableException(e, reader.file.getPath()); + } + } + + public Unfiltered next() + { + try + { + assert reader != null; + return reader.next(); + } + catch (IOException e) + { + try + { + closeInternal(); + } + catch (IOException suppressed) + { + e.addSuppressed(suppressed); + } + sstable.markSuspect(); + throw new CorruptSSTableException(e, reader.file.getPath()); + } + } + + public Iterator<Unfiltered> slice(Slice slice) + { + try + { + if (reader == null) + return Collections.emptyIterator(); + + return reader.slice(slice); + } + catch (IOException e) + { + try + { + closeInternal(); + } + catch (IOException suppressed) + { + e.addSuppressed(suppressed); + } + sstable.markSuspect(); + throw new CorruptSSTableException(e, reader.file.getPath()); + } + } + + public void remove() + { + throw new UnsupportedOperationException(); + } + + private void closeInternal() throws IOException + { + // It's important to make closing idempotent since it would be bad to double-close 'file' as it's a RandomAccessReader + // and its close is not idempotent in the case where we recycle it.
+ if (isClosed) + return; + + if (reader != null) + reader.close(); + + isClosed = true; + } + + public void close() + { + try + { + closeInternal(); + } + catch (IOException e) + { + sstable.markSuspect(); + throw new CorruptSSTableException(e, reader.file.getPath()); + } + } + + protected abstract class Reader + { + private final boolean shouldCloseFile; + public FileDataInput file; + + protected UnfilteredDeserializer deserializer; + + // Records the currently open range tombstone (if any) + protected DeletionTime openMarker = null; + + protected Reader(FileDataInput file, boolean shouldCloseFile) + { + this.file = file; + this.shouldCloseFile = shouldCloseFile; + if (file != null) + createDeserializer(); + } + + private void createDeserializer() + { + assert file != null && deserializer == null; + deserializer = UnfilteredDeserializer.create(sstable.metadata, file, sstable.header, helper, partitionLevelDeletion, isForThrift); + } + + protected void seekToPosition(long position) throws IOException + { + // This may be the first time we're actually looking into the file + if (file == null) + { + file = sstable.getFileDataInput(position); + createDeserializer(); + } + else + { + file.seek(position); + deserializer.clearState(); + } + } + + protected void updateOpenMarker(RangeTombstoneMarker marker) + { + // Note that we always read index blocks in forward order so this method is always called in forward order + openMarker = marker.isOpen(false) ? marker.openDeletionTime(false) : null; + } + + protected DeletionTime getAndClearOpenMarker() + { + DeletionTime toReturn = openMarker; + openMarker = null; + return toReturn; + } + + public abstract boolean hasNext() throws IOException; + public abstract Unfiltered next() throws IOException; + public abstract Iterator<Unfiltered> slice(Slice slice) throws IOException; + + public void close() throws IOException + { + if (shouldCloseFile && file != null) + file.close(); + } + } + + protected abstract class IndexedReader extends Reader + { + protected final RowIndexEntry indexEntry; + protected final List<IndexHelper.IndexInfo> indexes; + + protected int currentIndexIdx = -1; + + // Marks the beginning of the block corresponding to currentIndexIdx. + protected FileMark mark; + + // !isInit means we have never seeked in the file and thus shouldn't read as we could be anywhere + protected boolean isInit; + + protected IndexedReader(FileDataInput file, boolean shouldCloseFile, RowIndexEntry indexEntry, boolean isInit) + { + super(file, shouldCloseFile); + this.indexEntry = indexEntry; + this.indexes = indexEntry.columnsIndex(); + this.isInit = isInit; + } + + // Should be called when we're at the beginning of blockIdx. + protected void updateBlock(int blockIdx) throws IOException + { + seekToPosition(indexEntry.position + indexes.get(blockIdx).offset); + + currentIndexIdx = blockIdx; + openMarker = blockIdx > 0 ? indexes.get(blockIdx - 1).endOpenMarker : null; + mark = file.mark(); + } + + public IndexHelper.IndexInfo currentIndex() + { + return indexes.get(currentIndexIdx); + } + + public IndexHelper.IndexInfo previousIndex() + { + return currentIndexIdx <= 1 ? 
null : indexes.get(currentIndexIdx - 1); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/columniterator/IColumnIteratorFactory.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/columniterator/IColumnIteratorFactory.java b/src/java/org/apache/cassandra/db/columniterator/IColumnIteratorFactory.java deleted file mode 100644 index 46983e9..0000000 --- a/src/java/org/apache/cassandra/db/columniterator/IColumnIteratorFactory.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.db.columniterator; -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - - -public interface IColumnIteratorFactory -{ - OnDiskAtomIterator create(); -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/columniterator/IdentityQueryFilter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/columniterator/IdentityQueryFilter.java b/src/java/org/apache/cassandra/db/columniterator/IdentityQueryFilter.java deleted file mode 100644 index 7185eef..0000000 --- a/src/java/org/apache/cassandra/db/columniterator/IdentityQueryFilter.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.db.columniterator; - -import org.apache.cassandra.db.composites.Composites; -import org.apache.cassandra.db.filter.SliceQueryFilter; - -public class IdentityQueryFilter extends SliceQueryFilter -{ - /** - * Will read entire CF into memory. Use with caution. - */ - public IdentityQueryFilter() - { - super(Composites.EMPTY, Composites.EMPTY, false, Integer.MAX_VALUE); - } - - @Override - protected boolean respectTombstoneThresholds() - { - return false; - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/columniterator/LazyColumnIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/columniterator/LazyColumnIterator.java b/src/java/org/apache/cassandra/db/columniterator/LazyColumnIterator.java deleted file mode 100644 index 9d1cecb..0000000 --- a/src/java/org/apache/cassandra/db/columniterator/LazyColumnIterator.java +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.db.columniterator; -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - - -import com.google.common.collect.AbstractIterator; -import org.apache.cassandra.db.ColumnFamily; -import org.apache.cassandra.db.DecoratedKey; -import org.apache.cassandra.db.OnDiskAtom; - -import java.io.IOException; - - -/* - * The goal of this encapsulating OnDiskAtomIterator is to delay the use of - * the filter until columns are actually queried. 
- * The reason for that is get_paged_slice because it change the start of - * the filter after having seen the first row, and so we must not use the - * filter before the row data is actually queried. However, mergeIterator - * needs to "fetch" a row in advance. But all it needs is the key and so - * this IColumnIterator make sure getKey() can be called without triggering - * the use of the filter itself. - */ -public class LazyColumnIterator extends AbstractIterator<OnDiskAtom> implements OnDiskAtomIterator -{ - private final DecoratedKey key; - private final IColumnIteratorFactory subIteratorFactory; - - private OnDiskAtomIterator subIterator; - - public LazyColumnIterator(DecoratedKey key, IColumnIteratorFactory subIteratorFactory) - { - this.key = key; - this.subIteratorFactory = subIteratorFactory; - } - - private OnDiskAtomIterator getSubIterator() - { - if (subIterator == null) - subIterator = subIteratorFactory.create(); - return subIterator; - } - - protected OnDiskAtom computeNext() - { - getSubIterator(); - return subIterator.hasNext() ? subIterator.next() : endOfData(); - } - - public ColumnFamily getColumnFamily() - { - return getSubIterator().getColumnFamily(); - } - - public DecoratedKey getKey() - { - return key; - } - - public void close() throws IOException - { - if (subIterator != null) - subIterator.close(); - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/columniterator/OnDiskAtomIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/columniterator/OnDiskAtomIterator.java b/src/java/org/apache/cassandra/db/columniterator/OnDiskAtomIterator.java deleted file mode 100644 index 21c38f7..0000000 --- a/src/java/org/apache/cassandra/db/columniterator/OnDiskAtomIterator.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.db.columniterator; - -import java.io.IOException; -import org.apache.cassandra.db.ColumnFamily; -import org.apache.cassandra.db.DecoratedKey; -import org.apache.cassandra.db.OnDiskAtom; -import org.apache.cassandra.utils.CloseableIterator; - -public interface OnDiskAtomIterator extends CloseableIterator<OnDiskAtom> -{ - /** - * @return A ColumnFamily holding metadata for the row being iterated. - * Do not modify this CF. Whether it is empty or not is implementation-dependent. 
- */ - public abstract ColumnFamily getColumnFamily(); - - /** - * @return the current row key - */ - public DecoratedKey getKey(); - - /** clean up any open resources */ - public void close() throws IOException; -} - http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java b/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java new file mode 100644 index 0000000..4fd5205 --- /dev/null +++ b/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java @@ -0,0 +1,292 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.columniterator; + +import java.io.IOException; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; + +import com.google.common.collect.AbstractIterator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.filter.ColumnFilter; +import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.sstable.CorruptSSTableException; +import org.apache.cassandra.io.sstable.IndexHelper; +import org.apache.cassandra.io.util.FileDataInput; +import org.apache.cassandra.utils.ByteBufferUtil; + +/** + * A Cell Iterator over SSTable + */ +public class SSTableIterator extends AbstractSSTableIterator +{ + private static final Logger logger = LoggerFactory.getLogger(SSTableIterator.class); + + public SSTableIterator(SSTableReader sstable, DecoratedKey key, ColumnFilter columns, boolean isForThrift) + { + this(sstable, null, key, sstable.getPosition(key, SSTableReader.Operator.EQ), columns, isForThrift); + } + + public SSTableIterator(SSTableReader sstable, + FileDataInput file, + DecoratedKey key, + RowIndexEntry indexEntry, + ColumnFilter columns, + boolean isForThrift) + { + super(sstable, file, key, indexEntry, columns, isForThrift); + } + + protected Reader createReader(RowIndexEntry indexEntry, FileDataInput file, boolean isAtPartitionStart, boolean shouldCloseFile) + { + return indexEntry.isIndexed() + ? 
new ForwardIndexedReader(indexEntry, file, isAtPartitionStart, shouldCloseFile) + : new ForwardReader(file, isAtPartitionStart, shouldCloseFile); + } + + public boolean isReverseOrder() + { + return false; + } + + private class ForwardReader extends Reader + { + private ForwardReader(FileDataInput file, boolean isAtPartitionStart, boolean shouldCloseFile) + { + super(file, shouldCloseFile); + assert isAtPartitionStart; + } + + public boolean hasNext() throws IOException + { + assert deserializer != null; + return deserializer.hasNext(); + } + + public Unfiltered next() throws IOException + { + return deserializer.readNext(); + } + + public Iterator<Unfiltered> slice(final Slice slice) throws IOException + { + return new AbstractIterator<Unfiltered>() + { + private boolean beforeStart = true; + + protected Unfiltered computeNext() + { + try + { + // While we're before the start of the slice, we can skip rows but we should keep + // track of open range tombstones + if (beforeStart) + { + // Note that the following comparison is not strict. The reason is that the only case + // where it can be == is if the "next" is a RT start marker (either a '[' or a ')[' boundary), + // and if we had a strict inequality and an open RT marker before this, we would issue + // the open marker first, and then return the next one later, which would yield in the + // stream both '[' (or '(') and then ')[' for the same clustering value, which is wrong. + // By using a non-strict inequality, we avoid that problem (if we do get ')[' for the same + // clustering value as the slice start, we'll simply record it in 'openMarker'). + while (deserializer.hasNext() && deserializer.compareNextTo(slice.start()) <= 0) + { + if (deserializer.nextIsRow()) + deserializer.skipNext(); + else + updateOpenMarker((RangeTombstoneMarker)deserializer.readNext()); + } + + beforeStart = false; + + // We've reached the beginning of our queried slice. If we have an open marker + // we should return that first. + if (openMarker != null) + return new RangeTombstoneBoundMarker(slice.start(), openMarker); + } + + if (deserializer.hasNext() && deserializer.compareNextTo(slice.end()) <= 0) + { + Unfiltered next = deserializer.readNext(); + if (next.kind() == Unfiltered.Kind.RANGE_TOMBSTONE_MARKER) + updateOpenMarker((RangeTombstoneMarker)next); + return next; + } + + // If we have an open marker, we should close it before finishing + if (openMarker != null) + return new RangeTombstoneBoundMarker(slice.end(), getAndClearOpenMarker()); + + return endOfData(); + } + catch (IOException e) + { + try + { + close(); + } + catch (IOException suppressed) + { + e.addSuppressed(suppressed); + } + sstable.markSuspect(); + throw new CorruptSSTableException(e, file.getPath()); + } + } + }; + } + } + + private class ForwardIndexedReader extends IndexedReader + { + private ForwardIndexedReader(RowIndexEntry indexEntry, FileDataInput file, boolean isAtPartitionStart, boolean shouldCloseFile) + { + super(file, shouldCloseFile, indexEntry, isAtPartitionStart); + } + + public boolean hasNext() throws IOException + { + // If it's called before we've created the file, create it. This then means + // we're reading from the beginning of the partition.
+ if (!isInit) + { + seekToPosition(indexEntry.position); + ByteBufferUtil.skipShortLength(file); // partition key + DeletionTime.serializer.skip(file); // partition deletion + if (sstable.header.hasStatic()) + UnfilteredSerializer.serializer.skipStaticRow(file, sstable.header, helper); + isInit = true; + } + return deserializer.hasNext(); + } + + public Unfiltered next() throws IOException + { + return deserializer.readNext(); + } + + public Iterator<Unfiltered> slice(final Slice slice) throws IOException + { + final List<IndexHelper.IndexInfo> indexes = indexEntry.columnsIndex(); + + // If our previous slicing already got us the biggest row in the sstable, we're done + if (currentIndexIdx >= indexes.size()) + return Collections.emptyIterator(); + + // Find the first index block we'll need to read for the slice. + final int startIdx = IndexHelper.indexFor(slice.start(), indexes, sstable.metadata.comparator, false, currentIndexIdx); + if (startIdx >= indexes.size()) + return Collections.emptyIterator(); + + // If that's the last block we were reading, we're already where we want to be. Otherwise, + // seek to that first block + if (startIdx != currentIndexIdx) + updateBlock(startIdx); + + // Find the last index block we'll need to read for the slice. + final int endIdx = IndexHelper.indexFor(slice.end(), indexes, sstable.metadata.comparator, false, startIdx); + + final IndexHelper.IndexInfo startIndex = currentIndex(); + + // The index search is based on the last name of the index blocks, so at that point we have that: + // 1) indexes[startIdx - 1].lastName < slice.start <= indexes[startIdx].lastName + // 2) indexes[endIdx - 1].lastName < slice.end <= indexes[endIdx].lastName + // so if startIdx == endIdx and slice.end < indexes[startIdx].firstName, we're guaranteed that the + // whole slice is between the previous block end and this block start, and thus has no corresponding + // data. One exception is if the previous block ends with an openMarker, as it will cover our slice + // and we need to return it. + if (startIdx == endIdx && metadata().comparator.compare(slice.end(), startIndex.firstName) < 0 && openMarker == null && sstable.descriptor.version.storeRows()) + return Collections.emptyIterator(); + + return new AbstractIterator<Unfiltered>() + { + private boolean beforeStart = true; + private int currentIndexIdx = startIdx; + + protected Unfiltered computeNext() + { + try + { + // While we're before the start of the slice, we can skip rows but we should keep + // track of open range tombstones + if (beforeStart) + { + // See ForwardReader's equivalent method to see why this inequality is not strict. + while (deserializer.hasNext() && deserializer.compareNextTo(slice.start()) <= 0) + { + if (deserializer.nextIsRow()) + deserializer.skipNext(); + else + updateOpenMarker((RangeTombstoneMarker)deserializer.readNext()); + } + + beforeStart = false; + + // We've reached the beginning of our queried slice. If we have an open marker + // we should return that first. + if (openMarker != null) + return new RangeTombstoneBoundMarker(slice.start(), openMarker); + } + + // If we've crossed an index block boundary, update our information + if (currentIndexIdx < indexes.size() && file.bytesPastMark(mark) >= currentIndex().width) + updateBlock(++currentIndexIdx); + + // Return the next atom unless we've reached the end, or we're beyond our slice + // end (note that unless we're on the last block for the slice, there is no point + // in checking the slice end).
+ if (currentIndexIdx < indexes.size() + && currentIndexIdx <= endIdx + && deserializer.hasNext() + && (currentIndexIdx != endIdx || deserializer.compareNextTo(slice.end()) <= 0)) + { + Unfiltered next = deserializer.readNext(); + if (next.kind() == Unfiltered.Kind.RANGE_TOMBSTONE_MARKER) + updateOpenMarker((RangeTombstoneMarker)next); + return next; + } + + // If we have an open marker, we should close it before finishing + if (openMarker != null) + return new RangeTombstoneBoundMarker(slice.end(), getAndClearOpenMarker()); + + return endOfData(); + } + catch (IOException e) + { + try + { + close(); + } + catch (IOException suppressed) + { + e.addSuppressed(suppressed); + } + sstable.markSuspect(); + throw new CorruptSSTableException(e, file.getPath()); + } + } + }; + } + } +}
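The openMarker bookkeeping in the two slice() implementations above is easier to follow on toy data. What follows is a minimal, self-contained sketch of the same three-phase logic -- skip-and-track before the slice start, emit within the slice, synthesize a closing bound at the slice end. Every name in it (Atom, Kind, the integer clusterings) is a simplified stand-in for illustration, not one of the real Cassandra classes:

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class SliceSketch
{
    // Stand-ins for Unfiltered: a row, or a marker opening/closing a range tombstone.
    enum Kind { ROW, OPEN, CLOSE }

    static final class Atom
    {
        final int clustering;
        final Kind kind;
        Atom(int clustering, Kind kind) { this.clustering = clustering; this.kind = kind; }
    }

    // Returns a printable trace of what a forward read of [start, end] should yield.
    static List<String> slice(List<Atom> stream, int start, int end)
    {
        List<String> out = new ArrayList<>();
        Iterator<Atom> it = stream.iterator();
        Atom next = it.hasNext() ? it.next() : null;
        boolean openMarker = false;

        // Phase 1: before the slice start, skip rows but track tombstone markers.
        // The comparison is non-strict, for the reason given in ForwardReader.slice().
        while (next != null && next.clustering <= start)
        {
            if (next.kind == Kind.OPEN) openMarker = true;
            else if (next.kind == Kind.CLOSE) openMarker = false;
            next = it.hasNext() ? it.next() : null;
        }
        // A deletion opened before the slice covers its beginning: surface it first,
        // as the readers do with RangeTombstoneBoundMarker(slice.start(), openMarker).
        if (openMarker)
            out.add("RT open at " + start);

        // Phase 2: emit everything up to the slice end, still tracking markers.
        while (next != null && next.clustering <= end)
        {
            if (next.kind == Kind.OPEN) openMarker = true;
            else if (next.kind == Kind.CLOSE) openMarker = false;
            out.add(next.kind + " at " + next.clustering);
            next = it.hasNext() ? it.next() : null;
        }

        // Phase 3: a deletion still open when the slice ends must be closed at its bound.
        if (openMarker)
            out.add("RT close at " + end);
        return out;
    }

    public static void main(String[] args)
    {
        List<Atom> stream = List.of(new Atom(1, Kind.OPEN),  // tombstone opened before the slice
                                    new Atom(2, Kind.ROW),   // skipped: before the slice start
                                    new Atom(5, Kind.ROW),
                                    new Atom(7, Kind.CLOSE));
        System.out.println(slice(stream, 3, 6)); // [RT open at 3, ROW at 5, RT close at 6]
    }
}

Note how the tombstone opened at clustering 1 is not lost while skipping: it resurfaces as a synthetic bound at the slice start, which is exactly the job openMarker performs in both readers.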

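ForwardIndexedReader.slice() additionally leans on the contract of IndexHelper.indexFor spelled out in its comments: for the returned idx, indexes[idx - 1].lastName < target <= indexes[idx].lastName. The real method binary-searches IndexInfo entries with the table's comparator; the linear stand-in below, over plain ints, only illustrates that contract and the empty-slice shortcut, and every name in it is hypothetical:

import java.util.List;

public class IndexForSketch
{
    // Toy index block: the clustering range [firstName, lastName] it covers.
    record Block(int firstName, int lastName) {}

    // First block at or after fromIdx whose lastName >= target, or blocks.size()
    // if the target sorts after every block.
    static int indexFor(int target, List<Block> blocks, int fromIdx)
    {
        for (int i = fromIdx; i < blocks.size(); i++)
            if (target <= blocks.get(i).lastName())
                return i;
        return blocks.size();
    }

    public static void main(String[] args)
    {
        // Two blocks with a gap between clusterings 10 and 20.
        List<Block> blocks = List.of(new Block(0, 10), new Block(20, 30));

        int startIdx = indexFor(12, blocks, 0);        // -> 1
        int endIdx   = indexFor(15, blocks, startIdx); // -> 1

        // startIdx == endIdx and slice.end (15) < blocks[startIdx].firstName (20):
        // the whole slice [12, 15] falls in the gap between blocks, so with no open
        // marker there is nothing to return -- the Collections.emptyIterator() shortcut.
        System.out.println(startIdx == endIdx && 15 < blocks.get(startIdx).firstName()); // true
    }
}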