Merge branch cassandra-2.2 into cassandra-3.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7b430eee
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7b430eee
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7b430eee

Branch: refs/heads/trunk
Commit: 7b430eee69d8f70b086a30a6e9d3c42a9db4aa08
Parents: d300a18 61e0251
Author: Benjamin Lerer <b.le...@gmail.com>
Authored: Fri Nov 27 11:15:08 2015 +0100
Committer: Benjamin Lerer <b.le...@gmail.com>
Committed: Fri Nov 27 11:16:02 2015 +0100

----------------------------------------------------------------------
 CHANGES.txt                                      |  1 +
 .../org/apache/cassandra/db/ReadCommand.java     |  9 +-
 .../index/IndexNotAvailableException.java        | 34 ++++++++
 .../cassandra/index/SecondaryIndexManager.java   | 64 +++++++++++---
 .../index/internal/CassandraIndex.java           |  6 +-
 .../cassandra/net/MessageDeliveryTask.java       |  7 +-
 .../validation/entities/SecondaryIndexTest.java  | 88 ++++++++++++++++----
 .../index/internal/CustomCassandraIndex.java     |  4 +-
 8 files changed, 178 insertions(+), 35 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7b430eee/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 5e6c03c,63305d6..252972c
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,16 -1,5 +1,17 @@@
-2.2.4
+3.0.1
+ * Fix SELECT statement with IN restrictions on partition key,
+   ORDER BY and LIMIT (CASSANDRA-10729)
+ * Improve stress performance over 1k threads (CASSANDRA-7217)
+ * Wait for migration responses to complete before bootstrapping (CASSANDRA-10731)
+ * Unable to create a function with argument of type Inet (CASSANDRA-10741)
+ * Fix backward incompatibiliy in CqlInputFormat (CASSANDRA-10717)
+ * Correctly preserve deletion info on updated rows when notifying indexers
+   of single-row deletions (CASSANDRA-10694)
+ * Notify indexers of partition delete during cleanup (CASSANDRA-10685)
+ * Keep the file open in trySkipCache (CASSANDRA-10669)
+ * Updated trigger example (CASSANDRA-10257)
+Merged from 2.2:
+ * Reject index queries while the index is building (CASSANDRA-8505)
 * CQL.textile syntax incorrectly includes optional keyspace for aggregate SFUNC and FINALFUNC (CASSANDRA-10747)
 * Fix JSON update with prepared statements (CASSANDRA-10631)
 * Don't do anticompaction after subrange repair (CASSANDRA-10422)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7b430eee/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ReadCommand.java
index 301cb86,cd86336..5ab1ee5
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@@ -17,92 -17,39 +17,93 @@@
 */
 package org.apache.cassandra.db;

-import java.io.DataInput;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.*;

-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.filter.IDiskAtomFilter;
-import org.apache.cassandra.db.filter.NamesQueryFilter;
-import org.apache.cassandra.db.filter.SliceQueryFilter;
+import com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.*;
+import org.apache.cassandra.cql3.Operator;
+import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.db.partitions.*;
+import org.apache.cassandra.db.rows.*;
+import
org.apache.cassandra.db.transform.Transformation; +import org.apache.cassandra.dht.AbstractBounds; +import org.apache.cassandra.index.Index; ++import org.apache.cassandra.index.IndexNotAvailableException; import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.metrics.TableMetrics; import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; -import org.apache.cassandra.service.IReadCommand; -import org.apache.cassandra.service.RowDataResolver; -import org.apache.cassandra.service.pager.Pageable; +import org.apache.cassandra.schema.IndexMetadata; +import org.apache.cassandra.schema.UnknownIndexException; +import org.apache.cassandra.service.ClientWarn; +import org.apache.cassandra.tracing.Tracing; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.Pair; -public abstract class ReadCommand implements IReadCommand, Pageable +/** + * General interface for storage-engine read commands (common to both range and + * single partition commands). + * <p> + * This contains all the informations needed to do a local read. + */ +public abstract class ReadCommand implements ReadQuery { - public enum Type + protected static final Logger logger = LoggerFactory.getLogger(ReadCommand.class); + + public static final IVersionedSerializer<ReadCommand> serializer = new Serializer(); + // For RANGE_SLICE verb: will either dispatch on 'serializer' for 3.0 or 'legacyRangeSliceCommandSerializer' for earlier version. + // Can be removed (and replaced by 'serializer') once we drop pre-3.0 backward compatibility. + public static final IVersionedSerializer<ReadCommand> rangeSliceSerializer = new RangeSliceSerializer(); + + public static final IVersionedSerializer<ReadCommand> legacyRangeSliceCommandSerializer = new LegacyRangeSliceCommandSerializer(); + public static final IVersionedSerializer<ReadCommand> legacyPagedRangeCommandSerializer = new LegacyPagedRangeCommandSerializer(); + public static final IVersionedSerializer<ReadCommand> legacyReadCommandSerializer = new LegacyReadCommandSerializer(); + + private final Kind kind; + private final CFMetaData metadata; + private final int nowInSec; + + private final ColumnFilter columnFilter; + private final RowFilter rowFilter; + private final DataLimits limits; + + // SecondaryIndexManager will attempt to provide the most selective of any available indexes + // during execution. Here we also store an the results of that lookup to repeating it over + // the lifetime of the command. + protected Optional<IndexMetadata> index = Optional.empty(); + + // Flag to indicate whether the index manager has been queried to select an index for this + // command. This is necessary as the result of that lookup may be null, in which case we + // still don't want to repeat it. + private boolean indexManagerQueried = false; + + private boolean isDigestQuery; + // if a digest query, the version for which the digest is expected. Ignored if not a digest. 
+ private int digestVersion; + private final boolean isForThrift; + + protected static abstract class SelectionDeserializer { - GET_BY_NAMES((byte)1), - GET_SLICES((byte)2); + public abstract ReadCommand deserialize(DataInputPlus in, int version, boolean isDigest, int digestVersion, boolean isForThrift, CFMetaData metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits, Optional<IndexMetadata> index) throws IOException; + } - public final byte serializedValue; + protected enum Kind + { + SINGLE_PARTITION (SinglePartitionReadCommand.selectionDeserializer), + PARTITION_RANGE (PartitionRangeReadCommand.selectionDeserializer); - private Type(byte b) - { - this.serializedValue = b; - } + private final SelectionDeserializer selectionDeserializer; - public static Type fromSerializedValue(byte b) + Kind(SelectionDeserializer selectionDeserializer) { - return b == 1 ? GET_BY_NAMES : GET_SLICES; + this.selectionDeserializer = selectionDeserializer; } } @@@ -231,707 -95,55 +232,713 @@@ return this; } - public String getColumnFamilyName() + /** + * Sets the digest version, for when digest for that command is requested. + * <p> + * Note that we allow setting this independently of setting the command as a digest query as + * this allows us to use the command as a carrier of the digest version even if we only call + * setIsDigestQuery on some copy of it. + * + * @param digestVersion the version for the digest is this command is used for digest query.. + * @return this read command. + */ + public ReadCommand setDigestVersion(int digestVersion) { - return cfName; + this.digestVersion = digestVersion; + return this; } + /** + * Whether this query is for thrift or not. + * + * @return whether this query is for thrift. + */ + public boolean isForThrift() + { + return isForThrift; + } + + /** + * The clustering index filter this command to use for the provided key. + * <p> + * Note that that method should only be called on a key actually queried by this command + * and in practice, this will almost always return the same filter, but for the sake of + * paging, the filter on the first key of a range command might be slightly different. + * + * @param key a partition key queried by this command. + * + * @return the {@code ClusteringIndexFilter} to use for the partition of key {@code key}. + */ + public abstract ClusteringIndexFilter clusteringIndexFilter(DecoratedKey key); + + /** + * Returns a copy of this command. + * + * @return a copy of this command. + */ public abstract ReadCommand copy(); - public abstract Row getRow(Keyspace keyspace); + protected abstract UnfilteredPartitionIterator queryStorage(ColumnFamilyStore cfs, ReadOrderGroup orderGroup); + + protected abstract int oldestUnrepairedTombstone(); + + public ReadResponse createResponse(UnfilteredPartitionIterator iterator, ColumnFilter selection) + { + return isDigestQuery() + ? ReadResponse.createDigestResponse(iterator, digestVersion) + : ReadResponse.createDataResponse(iterator, selection); + } + + public long indexSerializedSize(int version) + { + if (index.isPresent()) + return IndexMetadata.serializer.serializedSize(index.get(), version); + else + return 0; + } + + public Index getIndex(ColumnFamilyStore cfs) + { + // if we've already consulted the index manager, and it returned a valid index + // the result should be cached here. 
+ if(index.isPresent()) + return cfs.indexManager.getIndex(index.get()); + + // if no cached index is present, but we've already consulted the index manager + // then no registered index is suitable for this command, so just return null. + if (indexManagerQueried) + return null; + + // do the lookup, set the flag to indicate so and cache the result if not null + Index selected = cfs.indexManager.getBestIndexFor(this); + indexManagerQueried = true; - public abstract IDiskAtomFilter filter(); + if (selected == null) + return null; - public String getKeyspace() + index = Optional.of(selected.getIndexMetadata()); + return selected; + } + + /** + * Executes this command on the local host. + * + * @param orderGroup the operation group spanning this command + * + * @return an iterator over the result of executing this command locally. + */ + @SuppressWarnings("resource") // The result iterator is closed upon exceptions (we know it's fine to potentially not close the intermediary + // iterators created inside the try as long as we do close the original resultIterator), or by closing the result. + public UnfilteredPartitionIterator executeLocally(ReadOrderGroup orderGroup) { - return ksName; + long startTimeNanos = System.nanoTime(); + + ColumnFamilyStore cfs = Keyspace.openAndGetStore(metadata()); + Index index = getIndex(cfs); - Index.Searcher searcher = index == null ? null : index.searcherFor(this); + ++ Index.Searcher searcher = null; + if (index != null) ++ { ++ if (!cfs.indexManager.isIndexQueryable(index)) ++ throw new IndexNotAvailableException(index); ++ ++ searcher = index.searcherFor(this); + Tracing.trace("Executing read on {}.{} using index {}", cfs.metadata.ksName, cfs.metadata.cfName, index.getIndexMetadata().name); ++ } + + UnfilteredPartitionIterator resultIterator = searcher == null + ? queryStorage(cfs, orderGroup) + : searcher.search(orderGroup); + + try + { + resultIterator = withMetricsRecording(withoutPurgeableTombstones(resultIterator, cfs), cfs.metric, startTimeNanos); + + // If we've used a 2ndary index, we know the result already satisfy the primary expression used, so + // no point in checking it again. + RowFilter updatedFilter = searcher == null + ? rowFilter() + : index.getPostIndexQueryFilter(rowFilter()); + + // TODO: We'll currently do filtering by the rowFilter here because it's convenient. However, + // we'll probably want to optimize by pushing it down the layer (like for dropped columns) as it + // would be more efficient (the sooner we discard stuff we know we don't care, the less useless + // processing we do on it). + return limits().filter(updatedFilter.filter(resultIterator, nowInSec()), nowInSec()); + } + catch (RuntimeException | Error e) + { + resultIterator.close(); + throw e; + } } - // maybeGenerateRetryCommand is used to generate a retry for short reads - public ReadCommand maybeGenerateRetryCommand(RowDataResolver resolver, Row row) + protected abstract void recordLatency(TableMetrics metric, long latencyNanos); + + public PartitionIterator executeInternal(ReadOrderGroup orderGroup) { - return null; + return UnfilteredPartitionIterators.filter(executeLocally(orderGroup), nowInSec()); } - // maybeTrim removes columns from a response that is too long - public Row maybeTrim(Row row) + public ReadOrderGroup startOrderGroup() { - return row; + return ReadOrderGroup.forCommand(this); } - public long getTimeout() + /** + * Wraps the provided iterator so that metrics on what is scanned by the command are recorded. 
+ * This also log warning/trow TombstoneOverwhelmingException if appropriate. + */ + private UnfilteredPartitionIterator withMetricsRecording(UnfilteredPartitionIterator iter, final TableMetrics metric, final long startTimeNanos) { - return DatabaseDescriptor.getReadRpcTimeout(); + class MetricRecording extends Transformation<UnfilteredRowIterator> + { + private final int failureThreshold = DatabaseDescriptor.getTombstoneFailureThreshold(); + private final int warningThreshold = DatabaseDescriptor.getTombstoneWarnThreshold(); + + private final boolean respectTombstoneThresholds = !Schema.isSystemKeyspace(ReadCommand.this.metadata().ksName); + + private int liveRows = 0; + private int tombstones = 0; + + private DecoratedKey currentKey; + + @Override + public UnfilteredRowIterator applyToPartition(UnfilteredRowIterator iter) + { + currentKey = iter.partitionKey(); + return Transformation.apply(iter, this); + } + + @Override + public Row applyToStatic(Row row) + { + return applyToRow(row); + } + + @Override + public Row applyToRow(Row row) + { + if (row.hasLiveData(ReadCommand.this.nowInSec())) + ++liveRows; + + for (Cell cell : row.cells()) + { + if (!cell.isLive(ReadCommand.this.nowInSec())) + countTombstone(row.clustering()); + } + return row; + } + + @Override + public RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker) + { + countTombstone(marker.clustering()); + return marker; + } + + private void countTombstone(ClusteringPrefix clustering) + { + ++tombstones; + if (tombstones > failureThreshold && respectTombstoneThresholds) + { + String query = ReadCommand.this.toCQLString(); + Tracing.trace("Scanned over {} tombstones for query {}; query aborted (see tombstone_failure_threshold)", failureThreshold, query); + throw new TombstoneOverwhelmingException(tombstones, query, ReadCommand.this.metadata(), currentKey, clustering); + } + } + + @Override + public void onClose() + { + recordLatency(metric, System.nanoTime() - startTimeNanos); + + metric.tombstoneScannedHistogram.update(tombstones); + metric.liveScannedHistogram.update(liveRows); + + boolean warnTombstones = tombstones > warningThreshold && respectTombstoneThresholds; + if (warnTombstones) + { + String msg = String.format("Read %d live rows and %d tombstone cells for query %1.512s (see tombstone_warn_threshold)", liveRows, tombstones, ReadCommand.this.toCQLString()); + ClientWarn.warn(msg); + logger.warn(msg); + } + + Tracing.trace("Read {} live and {} tombstone cells{}", liveRows, tombstones, (warnTombstones ? " (see tombstone_warn_threshold)" : "")); + } + }; + + return Transformation.apply(iter, new MetricRecording()); } -} -class ReadCommandSerializer implements IVersionedSerializer<ReadCommand> -{ - public void serialize(ReadCommand command, DataOutputPlus out, int version) throws IOException + /** + * Creates a message for this command. + */ + public abstract MessageOut<ReadCommand> createMessage(int version); + + protected abstract void appendCQLWhereClause(StringBuilder sb); + + // Skip purgeable tombstones. We do this because it's safe to do (post-merge of the memtable and sstable at least), it + // can save us some bandwith, and avoid making us throw a TombstoneOverwhelmingException for purgeable tombstones (which + // are to some extend an artefact of compaction lagging behind and hence counting them is somewhat unintuitive). 
+ protected UnfilteredPartitionIterator withoutPurgeableTombstones(UnfilteredPartitionIterator iterator, ColumnFamilyStore cfs) + { + final boolean isForThrift = iterator.isForThrift(); + class WithoutPurgeableTombstones extends PurgeFunction + { + public WithoutPurgeableTombstones() + { + super(isForThrift, cfs.gcBefore(nowInSec()), oldestUnrepairedTombstone(), cfs.getCompactionStrategyManager().onlyPurgeRepairedTombstones()); + } + + protected long getMaxPurgeableTimestamp() + { + return Long.MAX_VALUE; + } + } + return Transformation.apply(iterator, new WithoutPurgeableTombstones()); + } + + /** + * Recreate the CQL string corresponding to this query. + * <p> + * Note that in general the returned string will not be exactly the original user string, first + * because there isn't always a single syntax for a given query, but also because we don't have + * all the information needed (we know the non-PK columns queried but not the PK ones as internally + * we query them all). So this shouldn't be relied too strongly, but this should be good enough for + * debugging purpose which is what this is for. + */ + public String toCQLString() + { + StringBuilder sb = new StringBuilder(); + sb.append("SELECT ").append(columnFilter()); + sb.append(" FROM ").append(metadata().ksName).append('.').append(metadata.cfName); + appendCQLWhereClause(sb); + + if (limits() != DataLimits.NONE) + sb.append(' ').append(limits()); + return sb.toString(); + } + + private static class Serializer implements IVersionedSerializer<ReadCommand> + { + private static int digestFlag(boolean isDigest) + { + return isDigest ? 0x01 : 0; + } + + private static boolean isDigest(int flags) + { + return (flags & 0x01) != 0; + } + + private static int thriftFlag(boolean isForThrift) + { + return isForThrift ? 0x02 : 0; + } + + private static boolean isForThrift(int flags) + { + return (flags & 0x02) != 0; + } + + private static int indexFlag(boolean hasIndex) + { + return hasIndex ? 0x04 : 0; + } + + private static boolean hasIndex(int flags) + { + return (flags & 0x04) != 0; + } + + public void serialize(ReadCommand command, DataOutputPlus out, int version) throws IOException + { + // for serialization, createLegacyMessage() should cause legacyReadCommandSerializer to be used directly + assert version >= MessagingService.VERSION_30; + + out.writeByte(command.kind.ordinal()); + out.writeByte(digestFlag(command.isDigestQuery()) | thriftFlag(command.isForThrift()) | indexFlag(command.index.isPresent())); + if (command.isDigestQuery()) + out.writeUnsignedVInt(command.digestVersion()); + CFMetaData.serializer.serialize(command.metadata(), out, version); + out.writeInt(command.nowInSec()); + ColumnFilter.serializer.serialize(command.columnFilter(), out, version); + RowFilter.serializer.serialize(command.rowFilter(), out, version); + DataLimits.serializer.serialize(command.limits(), out, version); + if (command.index.isPresent()) + IndexMetadata.serializer.serialize(command.index.get(), out, version); + + command.serializeSelection(out, version); + } + + public ReadCommand deserialize(DataInputPlus in, int version) throws IOException + { + if (version < MessagingService.VERSION_30) + return legacyReadCommandSerializer.deserialize(in, version); + + Kind kind = Kind.values()[in.readByte()]; + int flags = in.readByte(); + boolean isDigest = isDigest(flags); + boolean isForThrift = isForThrift(flags); + boolean hasIndex = hasIndex(flags); + int digestVersion = isDigest ? 
(int)in.readUnsignedVInt() : 0; + CFMetaData metadata = CFMetaData.serializer.deserialize(in, version); + int nowInSec = in.readInt(); + ColumnFilter columnFilter = ColumnFilter.serializer.deserialize(in, version, metadata); + RowFilter rowFilter = RowFilter.serializer.deserialize(in, version, metadata); + DataLimits limits = DataLimits.serializer.deserialize(in, version); + Optional<IndexMetadata> index = hasIndex + ? deserializeIndexMetadata(in, version, metadata) + : Optional.empty(); + + return kind.selectionDeserializer.deserialize(in, version, isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, index); + } + + private Optional<IndexMetadata> deserializeIndexMetadata(DataInputPlus in, int version, CFMetaData cfm) throws IOException + { + try + { + return Optional.of(IndexMetadata.serializer.deserialize(in, version, cfm)); + } + catch (UnknownIndexException e) + { + String message = String.format("Couldn't find a defined index on %s.%s with the id %s. " + + "If an index was just created, this is likely due to the schema not " + + "being fully propagated. Local read will proceed without using the " + + "index. Please wait for schema agreement after index creation.", + cfm.ksName, cfm.cfName, e.indexId.toString()); + logger.info(message); + return Optional.empty(); + } + } + + public long serializedSize(ReadCommand command, int version) + { + // for serialization, createLegacyMessage() should cause legacyReadCommandSerializer to be used directly + assert version >= MessagingService.VERSION_30; + + return 2 // kind + flags + + (command.isDigestQuery() ? TypeSizes.sizeofUnsignedVInt(command.digestVersion()) : 0) + + CFMetaData.serializer.serializedSize(command.metadata(), version) + + TypeSizes.sizeof(command.nowInSec()) + + ColumnFilter.serializer.serializedSize(command.columnFilter(), version) + + RowFilter.serializer.serializedSize(command.rowFilter(), version) + + DataLimits.serializer.serializedSize(command.limits(), version) + + command.selectionSerializedSize(version) + + command.indexSerializedSize(version); + } + } + + // Dispatch to either Serializer or LegacyRangeSliceCommandSerializer. Only useful as long as we maintain pre-3.0 + // compatibility + private static class RangeSliceSerializer implements IVersionedSerializer<ReadCommand> + { + public void serialize(ReadCommand command, DataOutputPlus out, int version) throws IOException + { + if (version < MessagingService.VERSION_30) + legacyRangeSliceCommandSerializer.serialize(command, out, version); + else + serializer.serialize(command, out, version); + } + + public ReadCommand deserialize(DataInputPlus in, int version) throws IOException + { + return version < MessagingService.VERSION_30 + ? legacyRangeSliceCommandSerializer.deserialize(in, version) + : serializer.deserialize(in, version); + } + + public long serializedSize(ReadCommand command, int version) + { + return version < MessagingService.VERSION_30 + ? legacyRangeSliceCommandSerializer.serializedSize(command, version) + : serializer.serializedSize(command, version); + } + } + + private enum LegacyType + { + GET_BY_NAMES((byte)1), + GET_SLICES((byte)2); + + public final byte serializedValue; + + LegacyType(byte b) + { + this.serializedValue = b; + } + + public static LegacyType fromPartitionFilterKind(ClusteringIndexFilter.Kind kind) + { + return kind == ClusteringIndexFilter.Kind.SLICE + ? GET_SLICES + : GET_BY_NAMES; + } + + public static LegacyType fromSerializedValue(byte b) + { + return b == 1 ? 
GET_BY_NAMES : GET_SLICES; + } + } + + /** + * Serializer for pre-3.0 RangeSliceCommands. + */ + private static class LegacyRangeSliceCommandSerializer implements IVersionedSerializer<ReadCommand> { - out.writeByte(command.commandType.serializedValue); - switch (command.commandType) + public void serialize(ReadCommand command, DataOutputPlus out, int version) throws IOException + { + assert version < MessagingService.VERSION_30; + + PartitionRangeReadCommand rangeCommand = (PartitionRangeReadCommand) command; + assert !rangeCommand.dataRange().isPaging(); + + // convert pre-3.0 incompatible names filters to slice filters + rangeCommand = maybeConvertNamesToSlice(rangeCommand); + + CFMetaData metadata = rangeCommand.metadata(); + + out.writeUTF(metadata.ksName); + out.writeUTF(metadata.cfName); + out.writeLong(rangeCommand.nowInSec() * 1000L); // convert from seconds to millis + + // begin DiskAtomFilterSerializer.serialize() + if (rangeCommand.isNamesQuery()) + { + out.writeByte(1); // 0 for slices, 1 for names + ClusteringIndexNamesFilter filter = (ClusteringIndexNamesFilter) rangeCommand.dataRange().clusteringIndexFilter; + LegacyReadCommandSerializer.serializeNamesFilter(rangeCommand, filter, out); + } + else + { + out.writeByte(0); // 0 for slices, 1 for names + + // slice filter serialization + ClusteringIndexSliceFilter filter = (ClusteringIndexSliceFilter) rangeCommand.dataRange().clusteringIndexFilter; + + boolean makeStaticSlice = !rangeCommand.columnFilter().fetchedColumns().statics.isEmpty() && !filter.requestedSlices().selects(Clustering.STATIC_CLUSTERING); + LegacyReadCommandSerializer.serializeSlices(out, filter.requestedSlices(), filter.isReversed(), makeStaticSlice, metadata); + + out.writeBoolean(filter.isReversed()); + + // limit + DataLimits.Kind kind = rangeCommand.limits().kind(); + boolean isDistinct = (kind == DataLimits.Kind.CQL_LIMIT || kind == DataLimits.Kind.CQL_PAGING_LIMIT) && rangeCommand.limits().perPartitionCount() == 1; + if (isDistinct) + out.writeInt(1); + else + out.writeInt(LegacyReadCommandSerializer.updateLimitForQuery(rangeCommand.limits().count(), filter.requestedSlices())); + + int compositesToGroup; + boolean selectsStatics = !rangeCommand.columnFilter().fetchedColumns().statics.isEmpty() || filter.requestedSlices().selects(Clustering.STATIC_CLUSTERING); + if (kind == DataLimits.Kind.THRIFT_LIMIT) + compositesToGroup = -1; + else if (isDistinct && !selectsStatics) + compositesToGroup = -2; // for DISTINCT queries (CASSANDRA-8490) + else + compositesToGroup = metadata.isDense() ? 
-1 : metadata.clusteringColumns().size(); + + out.writeInt(compositesToGroup); + } + + serializeRowFilter(out, rangeCommand.rowFilter()); + AbstractBounds.rowPositionSerializer.serialize(rangeCommand.dataRange().keyRange(), out, version); + + // maxResults + out.writeInt(rangeCommand.limits().count()); + + // countCQL3Rows + if (rangeCommand.isForThrift() || rangeCommand.limits().perPartitionCount() == 1) // if for Thrift or DISTINCT + out.writeBoolean(false); + else + out.writeBoolean(true); + + // isPaging + out.writeBoolean(false); + } + + public ReadCommand deserialize(DataInputPlus in, int version) throws IOException + { + assert version < MessagingService.VERSION_30; + + String keyspace = in.readUTF(); + String columnFamily = in.readUTF(); + + CFMetaData metadata = Schema.instance.getCFMetaData(keyspace, columnFamily); + if (metadata == null) + { + String message = String.format("Got legacy range command for nonexistent table %s.%s.", keyspace, columnFamily); + throw new UnknownColumnFamilyException(message, null); + } + + int nowInSec = (int) (in.readLong() / 1000); // convert from millis to seconds + + ClusteringIndexFilter filter; + ColumnFilter selection; + int compositesToGroup = 0; + int perPartitionLimit = -1; + byte readType = in.readByte(); // 0 for slices, 1 for names + if (readType == 1) + { + Pair<ColumnFilter, ClusteringIndexNamesFilter> selectionAndFilter = LegacyReadCommandSerializer.deserializeNamesSelectionAndFilter(in, metadata); + selection = selectionAndFilter.left; + filter = selectionAndFilter.right; + } + else + { + Pair<ClusteringIndexSliceFilter, Boolean> p = LegacyReadCommandSerializer.deserializeSlicePartitionFilter(in, metadata); + filter = p.left; + perPartitionLimit = in.readInt(); + compositesToGroup = in.readInt(); + selection = getColumnSelectionForSlice(p.right, compositesToGroup, metadata); + } + + RowFilter rowFilter = deserializeRowFilter(in, metadata); + + AbstractBounds<PartitionPosition> keyRange = AbstractBounds.rowPositionSerializer.deserialize(in, metadata.partitioner, version); + int maxResults = in.readInt(); + + in.readBoolean(); // countCQL3Rows (not needed) + in.readBoolean(); // isPaging (not needed) + + boolean selectsStatics = (!selection.fetchedColumns().statics.isEmpty() || filter.selects(Clustering.STATIC_CLUSTERING)); + boolean isDistinct = compositesToGroup == -2 || (perPartitionLimit == 1 && selectsStatics); + DataLimits limits; + if (isDistinct) + limits = DataLimits.distinctLimits(maxResults); + else if (compositesToGroup == -1) + limits = DataLimits.thriftLimits(maxResults, perPartitionLimit); + else + limits = DataLimits.cqlLimits(maxResults); + + return new PartitionRangeReadCommand(false, 0, true, metadata, nowInSec, selection, rowFilter, limits, new DataRange(keyRange, filter), Optional.empty()); + } + + static void serializeRowFilter(DataOutputPlus out, RowFilter rowFilter) throws IOException + { + ArrayList<RowFilter.Expression> indexExpressions = Lists.newArrayList(rowFilter.iterator()); + out.writeInt(indexExpressions.size()); + for (RowFilter.Expression expression : indexExpressions) + { + ByteBufferUtil.writeWithShortLength(expression.column().name.bytes, out); + expression.operator().writeTo(out); + ByteBufferUtil.writeWithShortLength(expression.getIndexValue(), out); + } + } + + static RowFilter deserializeRowFilter(DataInputPlus in, CFMetaData metadata) throws IOException + { + int numRowFilters = in.readInt(); + if (numRowFilters == 0) + return RowFilter.NONE; + + RowFilter rowFilter = 
RowFilter.create(numRowFilters); + for (int i = 0; i < numRowFilters; i++) + { + ByteBuffer columnName = ByteBufferUtil.readWithShortLength(in); + ColumnDefinition column = metadata.getColumnDefinition(columnName); + Operator op = Operator.readFrom(in); + ByteBuffer indexValue = ByteBufferUtil.readWithShortLength(in); + rowFilter.add(column, op, indexValue); + } + return rowFilter; + } + + static long serializedRowFilterSize(RowFilter rowFilter) + { + long size = TypeSizes.sizeof(0); // rowFilterCount + for (RowFilter.Expression expression : rowFilter) + { + size += ByteBufferUtil.serializedSizeWithShortLength(expression.column().name.bytes); + size += TypeSizes.sizeof(0); // operator int value + size += ByteBufferUtil.serializedSizeWithShortLength(expression.getIndexValue()); + } + return size; + } + + public long serializedSize(ReadCommand command, int version) + { + assert version < MessagingService.VERSION_30; + assert command.kind == Kind.PARTITION_RANGE; + + PartitionRangeReadCommand rangeCommand = (PartitionRangeReadCommand) command; + rangeCommand = maybeConvertNamesToSlice(rangeCommand); + CFMetaData metadata = rangeCommand.metadata(); + + long size = TypeSizes.sizeof(metadata.ksName); + size += TypeSizes.sizeof(metadata.cfName); + size += TypeSizes.sizeof((long) rangeCommand.nowInSec()); + + size += 1; // single byte flag: 0 for slices, 1 for names + if (rangeCommand.isNamesQuery()) + { + PartitionColumns columns = rangeCommand.columnFilter().fetchedColumns(); + ClusteringIndexNamesFilter filter = (ClusteringIndexNamesFilter) rangeCommand.dataRange().clusteringIndexFilter; + size += LegacyReadCommandSerializer.serializedNamesFilterSize(filter, metadata, columns); + } + else + { + ClusteringIndexSliceFilter filter = (ClusteringIndexSliceFilter) rangeCommand.dataRange().clusteringIndexFilter; + boolean makeStaticSlice = !rangeCommand.columnFilter().fetchedColumns().statics.isEmpty() && !filter.requestedSlices().selects(Clustering.STATIC_CLUSTERING); + size += LegacyReadCommandSerializer.serializedSlicesSize(filter.requestedSlices(), makeStaticSlice, metadata); + size += TypeSizes.sizeof(filter.isReversed()); + size += TypeSizes.sizeof(rangeCommand.limits().perPartitionCount()); + size += TypeSizes.sizeof(0); // compositesToGroup + } + + if (rangeCommand.rowFilter().equals(RowFilter.NONE)) + { + size += TypeSizes.sizeof(0); + } + else + { + ArrayList<RowFilter.Expression> indexExpressions = Lists.newArrayList(rangeCommand.rowFilter().iterator()); + size += TypeSizes.sizeof(indexExpressions.size()); + for (RowFilter.Expression expression : indexExpressions) + { + size += ByteBufferUtil.serializedSizeWithShortLength(expression.column().name.bytes); + size += TypeSizes.sizeof(expression.operator().ordinal()); + size += ByteBufferUtil.serializedSizeWithShortLength(expression.getIndexValue()); + } + } + + size += AbstractBounds.rowPositionSerializer.serializedSize(rangeCommand.dataRange().keyRange(), version); + size += TypeSizes.sizeof(rangeCommand.limits().count()); + size += TypeSizes.sizeof(!rangeCommand.isForThrift()); + return size + TypeSizes.sizeof(rangeCommand.dataRange().isPaging()); + } + + static PartitionRangeReadCommand maybeConvertNamesToSlice(PartitionRangeReadCommand command) { - case GET_BY_NAMES: - SliceByNamesReadCommand.serializer.serialize(command, out, version); - break; - case GET_SLICES: - SliceFromReadCommand.serializer.serialize(command, out, version); - break; - default: - throw new AssertionError(); + if (!command.dataRange().isNamesQuery()) + return command; 
+ + CFMetaData metadata = command.metadata(); + if (!LegacyReadCommandSerializer.shouldConvertNamesToSlice(metadata, command.columnFilter().fetchedColumns())) + return command; + + ClusteringIndexNamesFilter filter = (ClusteringIndexNamesFilter) command.dataRange().clusteringIndexFilter; + ClusteringIndexSliceFilter sliceFilter = LegacyReadCommandSerializer.convertNamesFilterToSliceFilter(filter, metadata); + DataRange newRange = new DataRange(command.dataRange().keyRange(), sliceFilter); + return new PartitionRangeReadCommand( + command.isDigestQuery(), command.digestVersion(), command.isForThrift(), metadata, command.nowInSec(), + command.columnFilter(), command.rowFilter(), command.limits(), newRange, Optional.empty()); + } + + static ColumnFilter getColumnSelectionForSlice(boolean selectsStatics, int compositesToGroup, CFMetaData metadata) + { + // A value of -2 indicates this is a DISTINCT query that doesn't select static columns, only partition keys. + // In that case, we'll basically be querying the first row of the partition, but we must make sure we include + // all columns so we get at least one cell if there is a live row as it would confuse pre-3.0 nodes otherwise. + if (compositesToGroup == -2) + return ColumnFilter.all(metadata); + + // if a slice query from a pre-3.0 node doesn't cover statics, we shouldn't select them at all + PartitionColumns columns = selectsStatics + ? metadata.partitionColumns() + : metadata.partitionColumns().withoutStatics(); + return ColumnFilter.selectionBuilder().addAll(columns).build(); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/7b430eee/src/java/org/apache/cassandra/index/IndexNotAvailableException.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/index/IndexNotAvailableException.java index 0000000,0000000..5440e2a new file mode 100644 --- /dev/null +++ b/src/java/org/apache/cassandra/index/IndexNotAvailableException.java @@@ -1,0 -1,0 +1,34 @@@ ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++ ++package org.apache.cassandra.index; ++ ++/** ++ * Thrown if a secondary index is not currently available. ++ */ ++public final class IndexNotAvailableException extends RuntimeException ++{ ++ /** ++ * Creates a new <code>IndexNotAvailableException</code> for the specified index. ++ * @param name the index name ++ */ ++ public IndexNotAvailableException(Index index) ++ { ++ super(String.format("The secondary index '%s' is not yet available", index.getIndexMetadata().name)); ++ } ++}
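The SecondaryIndexManager, CassandraIndex, MessageDeliveryTask and test changes listed in the stat summary are not shown in this excerpt. The isIndexQueryable() check that executeLocally() now performs before building an Index.Searcher implies that the index manager tracks which indexes have completed their initial build. The sketch below illustrates only that bookkeeping idea, with hypothetical class, field and method-parameter names; it is not the code this commit adds to SecondaryIndexManager:

    // Hypothetical sketch: illustrates the behaviour behind isIndexQueryable(),
    // not the actual SecondaryIndexManager changes in commit 7b430eee.
    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;

    final class QueryableIndexTracker
    {
        // Names of indexes whose initial build has finished (hypothetical field).
        private final Set<String> queryableIndexes = ConcurrentHashMap.newKeySet();

        // Called once an index build completes successfully.
        void markQueryable(String indexName)
        {
            queryableIndexes.add(indexName);
        }

        // Called when an index is created or a rebuild starts.
        void markBuilding(String indexName)
        {
            queryableIndexes.remove(indexName);
        }

        // ReadCommand.executeLocally() throws IndexNotAvailableException
        // when the index it selected is not yet queryable.
        boolean isIndexQueryable(String indexName)
        {
            return queryableIndexes.contains(indexName);
        }
    }

Failing fast in this way is the point of CASSANDRA-8505: a query that would otherwise be served by a partially built index is rejected with IndexNotAvailableException instead of silently returning incomplete results.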