Repository: cassandra Updated Branches: refs/heads/cassandra-3.0 f961e84aa -> e097efc5f refs/heads/trunk b81942267 -> c5408a3a1
Cache selected index in ReadCommand to avoid multiple lookups Patch by Sam Tunnicliffe; reviewed by Jake Luciani for CASSANDRA-10215 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e097efc5 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e097efc5 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e097efc5 Branch: refs/heads/cassandra-3.0 Commit: e097efc5f6f76a0da8d15b307301dffff79e4a35 Parents: f961e84 Author: Sam Tunnicliffe <[email protected]> Authored: Wed Aug 26 16:29:58 2015 +0100 Committer: Sam Tunnicliffe <[email protected]> Committed: Wed Sep 9 08:44:06 2015 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cql3/statements/SelectStatement.java | 4 +- .../db/AbstractReadCommandBuilder.java | 8 +- .../cassandra/db/PartitionRangeReadCommand.java | 26 ++++-- .../org/apache/cassandra/db/ReadCommand.java | 96 ++++++++++++++++++-- .../org/apache/cassandra/db/ReadOrderGroup.java | 2 +- .../db/SinglePartitionReadCommand.java | 14 ++- .../cassandra/index/SecondaryIndexManager.java | 23 ++--- .../apache/cassandra/schema/IndexMetadata.java | 39 +++++++- .../org/apache/cassandra/schema/Indexes.java | 51 ++++++++--- .../cassandra/schema/UnknownIndexException.java | 39 ++++++++ .../apache/cassandra/service/StorageProxy.java | 2 +- .../service/pager/RangeSliceQueryPager.java | 16 ++-- .../cassandra/thrift/CassandraServer.java | 14 +-- 14 files changed, 258 insertions(+), 77 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/e097efc5/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index a1c66a2..ab1b4ed 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.0.0-rc1 + * Cache selected index in read command to reduce lookups (CASSANDRA-10215) * Small optimizations 
of sstable index serialization (CASSANDRA-10232) * Support for both encrypted and unencrypted native transport connections (CASSANDRA-9590) Merged from 2.2: http://git-wip-us.apache.org/repos/asf/cassandra/blob/e097efc5/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java index 2aac6ab..7ad6c09 100644 --- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java @@ -47,6 +47,7 @@ import org.apache.cassandra.db.rows.RowIterator; import org.apache.cassandra.db.view.MaterializedView; import org.apache.cassandra.dht.AbstractBounds; import org.apache.cassandra.exceptions.*; +import org.apache.cassandra.index.Index; import org.apache.cassandra.index.SecondaryIndexManager; import org.apache.cassandra.serializers.MarshalException; import org.apache.cassandra.service.ClientState; @@ -458,12 +459,13 @@ public class SelectStatement implements CQLStatement return ReadQuery.EMPTY; RowFilter rowFilter = getRowFilter(options); + // The LIMIT provided by the user is the number of CQL row he wants returned. // We want to have getRangeSlice to count the number of columns, not the number of keys. AbstractBounds<PartitionPosition> keyBounds = restrictions.getPartitionKeyBounds(options); return keyBounds == null ? 
ReadQuery.EMPTY - : new PartitionRangeReadCommand(cfm, nowInSec, queriedColumns, rowFilter, limit, new DataRange(keyBounds, clusteringIndexFilter)); + : new PartitionRangeReadCommand(cfm, nowInSec, queriedColumns, rowFilter, limit, new DataRange(keyBounds, clusteringIndexFilter), Optional.empty()); } private ClusteringIndexFilter makeClusteringIndexFilter(QueryOptions options) http://git-wip-us.apache.org/repos/asf/cassandra/blob/e097efc5/src/java/org/apache/cassandra/db/AbstractReadCommandBuilder.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/AbstractReadCommandBuilder.java b/src/java/org/apache/cassandra/db/AbstractReadCommandBuilder.java index 5e3b726..9bb89a6 100644 --- a/src/java/org/apache/cassandra/db/AbstractReadCommandBuilder.java +++ b/src/java/org/apache/cassandra/db/AbstractReadCommandBuilder.java @@ -23,9 +23,11 @@ import java.util.*; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.ColumnDefinition; -import org.apache.cassandra.cql3.*; +import org.apache.cassandra.cql3.ColumnIdentifier; +import org.apache.cassandra.cql3.Operator; import org.apache.cassandra.db.filter.*; -import org.apache.cassandra.db.marshal.*; +import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.db.marshal.CollectionType; import org.apache.cassandra.dht.*; import org.apache.cassandra.utils.FBUtilities; @@ -327,7 +329,7 @@ public abstract class AbstractReadCommandBuilder else bounds = new ExcludingBounds<>(start, end); - return new PartitionRangeReadCommand(cfs.metadata, nowInSeconds, makeColumnFilter(), filter, makeLimits(), new DataRange(bounds, makeFilter())); + return new PartitionRangeReadCommand(cfs.metadata, nowInSeconds, makeColumnFilter(), filter, makeLimits(), new DataRange(bounds, makeFilter()), Optional.empty()); } static DecoratedKey makeKey(CFMetaData metadata, Object... 
partitionKey) http://git-wip-us.apache.org/repos/asf/cassandra/blob/e097efc5/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java index da62557..965e9af 100644 --- a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java +++ b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java @@ -20,6 +20,7 @@ package org.apache.cassandra.db; import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.Optional; import com.google.common.collect.Iterables; @@ -39,6 +40,7 @@ import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.metrics.TableMetrics; import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.schema.IndexMetadata; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.service.StorageProxy; import org.apache.cassandra.service.pager.*; @@ -64,10 +66,12 @@ public class PartitionRangeReadCommand extends ReadCommand ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits, - DataRange dataRange) + DataRange dataRange, + Optional<IndexMetadata> index) { super(Kind.PARTITION_RANGE, isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits); this.dataRange = dataRange; + this.index = index; } public PartitionRangeReadCommand(CFMetaData metadata, @@ -75,9 +79,10 @@ public class PartitionRangeReadCommand extends ReadCommand ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits, - DataRange dataRange) + DataRange dataRange, + Optional<IndexMetadata> index) { - this(false, 0, false, metadata, nowInSec, columnFilter, rowFilter, limits, dataRange); + this(false, 0, false, metadata, nowInSec, columnFilter, rowFilter, limits, dataRange, index); 
} /** @@ -95,7 +100,8 @@ public class PartitionRangeReadCommand extends ReadCommand ColumnFilter.all(metadata), RowFilter.NONE, DataLimits.NONE, - DataRange.allData(metadata.partitioner)); + DataRange.allData(metadata.partitioner), + Optional.empty()); } public DataRange dataRange() @@ -115,17 +121,17 @@ public class PartitionRangeReadCommand extends ReadCommand public PartitionRangeReadCommand forSubRange(AbstractBounds<PartitionPosition> range) { - return new PartitionRangeReadCommand(isDigestQuery(), digestVersion(), isForThrift(), metadata(), nowInSec(), columnFilter(), rowFilter(), limits(), dataRange().forSubRange(range)); + return new PartitionRangeReadCommand(isDigestQuery(), digestVersion(), isForThrift(), metadata(), nowInSec(), columnFilter(), rowFilter(), limits(), dataRange().forSubRange(range), index); } public PartitionRangeReadCommand copy() { - return new PartitionRangeReadCommand(isDigestQuery(), digestVersion(), isForThrift(), metadata(), nowInSec(), columnFilter(), rowFilter(), limits(), dataRange()); + return new PartitionRangeReadCommand(isDigestQuery(), digestVersion(), isForThrift(), metadata(), nowInSec(), columnFilter(), rowFilter(), limits(), dataRange(), index); } public PartitionRangeReadCommand withUpdatedLimit(DataLimits newLimits) { - return new PartitionRangeReadCommand(metadata(), nowInSec(), columnFilter(), rowFilter(), newLimits, dataRange()); + return new PartitionRangeReadCommand(metadata(), nowInSec(), columnFilter(), rowFilter(), newLimits, dataRange(), index); } public long getTimeout() @@ -275,7 +281,7 @@ public class PartitionRangeReadCommand extends ReadCommand public PartitionIterator postReconciliationProcessing(PartitionIterator result) { ColumnFamilyStore cfs = Keyspace.open(metadata().ksName).getColumnFamilyStore(metadata().cfName); - Index index = getIndex(cfs, false); + Index index = getIndex(cfs); return index == null ? 
result : index.postProcessorFor(this).apply(result, this); } @@ -303,11 +309,11 @@ public class PartitionRangeReadCommand extends ReadCommand private static class Deserializer extends SelectionDeserializer { - public ReadCommand deserialize(DataInputPlus in, int version, boolean isDigest, int digestVersion, boolean isForThrift, CFMetaData metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits) + public ReadCommand deserialize(DataInputPlus in, int version, boolean isDigest, int digestVersion, boolean isForThrift, CFMetaData metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits, Optional<IndexMetadata> index) throws IOException { DataRange range = DataRange.serializer.deserialize(in, version, metadata); - return new PartitionRangeReadCommand(isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, range); + return new PartitionRangeReadCommand(isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, range, index); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e097efc5/src/java/org/apache/cassandra/db/ReadCommand.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java index 5a10716..e183963 100644 --- a/src/java/org/apache/cassandra/db/ReadCommand.java +++ b/src/java/org/apache/cassandra/db/ReadCommand.java @@ -38,6 +38,8 @@ import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.metrics.TableMetrics; import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.schema.IndexMetadata; +import org.apache.cassandra.schema.UnknownIndexException; import org.apache.cassandra.service.ClientWarn; import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.utils.ByteBufferUtil; @@ -70,6 +72,16 @@ 
public abstract class ReadCommand implements ReadQuery private final RowFilter rowFilter; private final DataLimits limits; + // SecondaryIndexManager will attempt to provide the most selective of any available indexes + // during execution. Here we also store the results of that lookup to avoid repeating it over + // the lifetime of the command. + protected Optional<IndexMetadata> index = Optional.empty(); + + // Flag to indicate whether the index manager has been queried to select an index for this + // command. This is necessary as the result of that lookup may be null, in which case we + // still don't want to repeat it. + private boolean indexManagerQueried = false; + private boolean isDigestQuery; // if a digest query, the version for which the digest is expected. Ignored if not a digest. private int digestVersion; @@ -77,7 +89,7 @@ public abstract class ReadCommand implements ReadQuery protected static abstract class SelectionDeserializer { - public abstract ReadCommand deserialize(DataInputPlus in, int version, boolean isDigest, int digestVersion, boolean isForThrift, CFMetaData metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits) throws IOException; + public abstract ReadCommand deserialize(DataInputPlus in, int version, boolean isDigest, int digestVersion, boolean isForThrift, CFMetaData metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits, Optional<IndexMetadata> index) throws IOException; } protected enum Kind @@ -287,9 +299,35 @@ public abstract class ReadCommand implements ReadQuery : ReadResponse.createDataResponse(iterator, selection); } - protected Index getIndex(ColumnFamilyStore cfs, boolean includeInTrace) + public long indexSerializedSize(int version) + { + if (index.isPresent()) + return IndexMetadata.serializer.serializedSize(index.get(), version); + else + return 0; + } + + public Index getIndex(ColumnFamilyStore cfs) { - return cfs.indexManager.getBestIndexFor(this, 
includeInTrace); + // if we've already consulted the index manager, and it returned a valid index + // the result should be cached here. + if(index.isPresent()) + return cfs.indexManager.getIndex(index.get()); + + // if no cached index is present, but we've already consulted the index manager + // then no registered index is suitable for this command, so just return null. + if (indexManagerQueried) + return null; + + // do the lookup, set the flag to indicate so and cache the result if not null + Index selected = cfs.indexManager.getBestIndexFor(this); + indexManagerQueried = true; + + if (selected == null) + return null; + + index = Optional.of(selected.getIndexMetadata()); + return selected; } /** @@ -306,9 +344,12 @@ public abstract class ReadCommand implements ReadQuery long startTimeNanos = System.nanoTime(); ColumnFamilyStore cfs = Keyspace.openAndGetStore(metadata()); - Index index = getIndex(cfs, true); + Index index = getIndex(cfs); Index.Searcher searcher = index == null ? null : index.searcherFor(this); + if (index != null) + Tracing.trace("Executing read on {}.{} using index {}", cfs.metadata.ksName, cfs.metadata.cfName, index.getIndexName()); + UnfilteredPartitionIterator resultIterator = searcher == null ? queryStorage(cfs, orderGroup) : searcher.search(orderGroup); @@ -505,13 +546,23 @@ public abstract class ReadCommand implements ReadQuery return (flags & 0x02) != 0; } + private static int indexFlag(boolean hasIndex) + { + return hasIndex ? 
0x04 : 0; + } + + private static boolean hasIndex(int flags) + { + return (flags & 0x04) != 0; + } + public void serialize(ReadCommand command, DataOutputPlus out, int version) throws IOException { // for serialization, createLegacyMessage() should cause legacyReadCommandSerializer to be used directly assert version >= MessagingService.VERSION_30; out.writeByte(command.kind.ordinal()); - out.writeByte(digestFlag(command.isDigestQuery()) | thriftFlag(command.isForThrift())); + out.writeByte(digestFlag(command.isDigestQuery()) | thriftFlag(command.isForThrift()) | indexFlag(command.index.isPresent())); if (command.isDigestQuery()) out.writeVInt(command.digestVersion()); CFMetaData.serializer.serialize(command.metadata(), out, version); @@ -519,6 +570,8 @@ public abstract class ReadCommand implements ReadQuery ColumnFilter.serializer.serialize(command.columnFilter(), out, version); RowFilter.serializer.serialize(command.rowFilter(), out, version); DataLimits.serializer.serialize(command.limits(), out, version); + if (command.index.isPresent()) + IndexMetadata.serializer.serialize(command.index.get(), out, version); command.serializeSelection(out, version); } @@ -532,14 +585,36 @@ public abstract class ReadCommand implements ReadQuery int flags = in.readByte(); boolean isDigest = isDigest(flags); boolean isForThrift = isForThrift(flags); + boolean hasIndex = hasIndex(flags); int digestVersion = isDigest ? (int)in.readVInt() : 0; CFMetaData metadata = CFMetaData.serializer.deserialize(in, version); int nowInSec = in.readInt(); ColumnFilter columnFilter = ColumnFilter.serializer.deserialize(in, version, metadata); RowFilter rowFilter = RowFilter.serializer.deserialize(in, version, metadata); DataLimits limits = DataLimits.serializer.deserialize(in, version); + Optional<IndexMetadata> index = hasIndex + ? 
deserializeIndexMetadata(in, version, metadata) + : Optional.empty(); + + return kind.selectionDeserializer.deserialize(in, version, isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, index); + } - return kind.selectionDeserializer.deserialize(in, version, isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits); + private Optional<IndexMetadata> deserializeIndexMetadata(DataInputPlus in, int version, CFMetaData cfm) throws IOException + { + try + { + return Optional.of(IndexMetadata.serializer.deserialize(in, version, cfm)); + } + catch (UnknownIndexException e) + { + String message = String.format("Couldn't find a defined index on %s.%s with the id %s. " + + "If an index was just created, this is likely due to the schema not " + + "being fully propagated. Local read will proceed without using the " + + "index. Please wait for schema agreement after index creation.", + cfm.ksName, cfm.cfName, e.indexId.toString()); + logger.info(message); + return Optional.empty(); + } } public long serializedSize(ReadCommand command, int version) @@ -554,7 +629,8 @@ public abstract class ReadCommand implements ReadQuery + ColumnFilter.serializer.serializedSize(command.columnFilter(), version) + RowFilter.serializer.serializedSize(command.rowFilter(), version) + DataLimits.serializer.serializedSize(command.limits(), version) - + command.selectionSerializedSize(version); + + command.selectionSerializedSize(version) + + command.indexSerializedSize(version); } } @@ -739,7 +815,7 @@ public abstract class ReadCommand implements ReadQuery else limits = DataLimits.cqlLimits(maxResults); - return new PartitionRangeReadCommand(false, 0, true, metadata, nowInSec, selection, rowFilter, limits, new DataRange(keyRange, filter)); + return new PartitionRangeReadCommand(false, 0, true, metadata, nowInSec, selection, rowFilter, limits, new DataRange(keyRange, filter), Optional.empty()); } static void 
serializeRowFilter(DataOutputPlus out, RowFilter rowFilter) throws IOException @@ -850,7 +926,7 @@ public abstract class ReadCommand implements ReadQuery DataRange newRange = new DataRange(command.dataRange().keyRange(), sliceFilter); return new PartitionRangeReadCommand( command.isDigestQuery(), command.digestVersion(), command.isForThrift(), metadata, command.nowInSec(), - command.columnFilter(), command.rowFilter(), command.limits(), newRange); + command.columnFilter(), command.rowFilter(), command.limits(), newRange, Optional.empty()); } static ColumnFilter getColumnSelectionForSlice(ClusteringIndexSliceFilter filter, int compositesToGroup, CFMetaData metadata) @@ -1000,7 +1076,7 @@ public abstract class ReadCommand implements ReadQuery // missing without any problems, so we can safely always set "inclusive" to false in the data range dataRange = dataRange.forPaging(keyRange, metadata.comparator, startBound.getAsClustering(metadata), false); } - return new PartitionRangeReadCommand(false, 0, true, metadata, nowInSec, selection, rowFilter, limits, dataRange); + return new PartitionRangeReadCommand(false, 0, true, metadata, nowInSec, selection, rowFilter, limits, dataRange, Optional.empty()); } public long serializedSize(ReadCommand command, int version) http://git-wip-us.apache.org/repos/asf/cassandra/blob/e097efc5/src/java/org/apache/cassandra/db/ReadOrderGroup.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ReadOrderGroup.java b/src/java/org/apache/cassandra/db/ReadOrderGroup.java index 44befa2..0720d79 100644 --- a/src/java/org/apache/cassandra/db/ReadOrderGroup.java +++ b/src/java/org/apache/cassandra/db/ReadOrderGroup.java @@ -98,7 +98,7 @@ public class ReadOrderGroup implements AutoCloseable private static ColumnFamilyStore maybeGetIndexCfs(ColumnFamilyStore baseCfs, ReadCommand command) { - Index index = baseCfs.indexManager.getBestIndexFor(command); + Index index = 
command.getIndex(baseCfs); return index == null ? null : index.getBackingTable().orElse(null); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e097efc5/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java index 7b62f5a..c08ef6a 100644 --- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java +++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java @@ -21,20 +21,26 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.*; -import org.apache.cassandra.cache.*; +import org.apache.cassandra.cache.IRowCacheEntry; +import org.apache.cassandra.cache.RowCacheKey; +import org.apache.cassandra.cache.RowCacheSentinel; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.ColumnDefinition; import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.db.rows.*; import org.apache.cassandra.db.filter.*; import org.apache.cassandra.db.partitions.*; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; +import org.apache.cassandra.db.rows.UnfilteredRowIterators; import org.apache.cassandra.exceptions.RequestExecutionException; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.metrics.TableMetrics; import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; -import org.apache.cassandra.service.*; +import org.apache.cassandra.schema.IndexMetadata; +import org.apache.cassandra.service.CacheService; +import org.apache.cassandra.service.ClientState; +import org.apache.cassandra.service.StorageProxy; import org.apache.cassandra.service.pager.*; import org.apache.cassandra.tracing.Tracing; import 
org.apache.cassandra.utils.concurrent.OpOrder; @@ -507,7 +513,7 @@ public abstract class SinglePartitionReadCommand<F extends ClusteringIndexFilter private static class Deserializer extends SelectionDeserializer { - public ReadCommand deserialize(DataInputPlus in, int version, boolean isDigest, int digestVersion, boolean isForThrift, CFMetaData metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits) + public ReadCommand deserialize(DataInputPlus in, int version, boolean isDigest, int digestVersion, boolean isForThrift, CFMetaData metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits, Optional<IndexMetadata> index) throws IOException { DecoratedKey key = metadata.decorateKey(metadata.getKeyValidator().readValue(in)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/e097efc5/src/java/org/apache/cassandra/index/SecondaryIndexManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java index fabfebc..bd3202d 100644 --- a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java +++ b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java @@ -536,19 +536,15 @@ public class SecondaryIndexManager implements IndexRegistry * index should be performed in the searcherFor method to ensure that we pick the right * index regardless of the validity of the expression. * - * This method is called at various points during the lifecycle of a ReadCommand (to obtain a Searcher, - * get the index's underlying CFS for ReadOrderGroup, or an estimate of the result size from an average index - * query). - * - * Ideally, we would do this relatively expensive operation only once, and attach the index to the - * ReadCommand for future reference. This requires the index be passed onto additional commands generated - * to process subranges etc. 
+ * This method is only called once during the lifecycle of a ReadCommand and the result is + * cached for future use when obtaining a Searcher, getting the index's underlying CFS for + * ReadOrderGroup, or an estimate of the result size from an average index query. * * @param command ReadCommand to be executed * @return an Index instance, ready to use during execution of the command, or null if none * of the registered indexes can support the command. */ - public Index getBestIndexFor(ReadCommand command, boolean includeInTrace) + public Index getBestIndexFor(ReadCommand command) { if (indexes.isEmpty() || command.rowFilter().isEmpty()) return null; @@ -564,8 +560,7 @@ public class SecondaryIndexManager implements IndexRegistry if (searchableIndexes.isEmpty()) { logger.debug("No applicable indexes found"); - if (includeInTrace) - Tracing.trace("No applicable indexes found"); + Tracing.trace("No applicable indexes found"); return null; } @@ -575,7 +570,7 @@ public class SecondaryIndexManager implements IndexRegistry .orElseThrow(() -> new AssertionError("Could not select most selective index")); // pay for an additional threadlocal get() rather than build the strings unnecessarily - if (includeInTrace && Tracing.isTracing()) + if (Tracing.isTracing()) { Tracing.trace("Index mean cardinalities are {}. 
Scanning with {}.", searchableIndexes.stream().map(i -> i.getIndexName() + ':' + i.getEstimatedResultRows()) @@ -585,12 +580,6 @@ public class SecondaryIndexManager implements IndexRegistry return selected; } - // convenience method which doesn't emit tracing messages - public Index getBestIndexFor(ReadCommand command) - { - return getBestIndexFor(command, false); - } - /** * Called at write time to ensure that values present in the update * are valid according to the rules of all registered indexes which http://git-wip-us.apache.org/repos/asf/cassandra/blob/e097efc5/src/java/org/apache/cassandra/schema/IndexMetadata.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/schema/IndexMetadata.java b/src/java/org/apache/cassandra/schema/IndexMetadata.java index 40a75c6..6846a14 100644 --- a/src/java/org/apache/cassandra/schema/IndexMetadata.java +++ b/src/java/org/apache/cassandra/schema/IndexMetadata.java @@ -18,10 +18,9 @@ package org.apache.cassandra.schema; +import java.io.IOException; import java.lang.reflect.InvocationTargetException; -import java.util.Collections; -import java.util.Map; -import java.util.Set; +import java.util.*; import com.google.common.base.Objects; import com.google.common.collect.ImmutableMap; @@ -37,7 +36,10 @@ import org.apache.cassandra.cql3.ColumnIdentifier; import org.apache.cassandra.cql3.statements.IndexTarget; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.index.Index; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.UUIDSerializer; /** * An immutable representation of secondary index metadata. 
@@ -46,6 +48,8 @@ public final class IndexMetadata { private static final Logger logger = LoggerFactory.getLogger(IndexMetadata.class); + public static final Serializer serializer = new Serializer(); + public enum IndexType { KEYS, CUSTOM, COMPOSITES @@ -56,6 +60,9 @@ public final class IndexMetadata COLUMN, ROW } + // UUID for serialization. This is a deterministic UUID generated from the index name + // Both the id and name are guaranteed unique per keyspace. + public final UUID id; public final String name; public final IndexType indexType; public final TargetType targetType; @@ -68,6 +75,7 @@ public final class IndexMetadata TargetType targetType, Set<ColumnIdentifier> columns) { + this.id = UUID.nameUUIDFromBytes(name.getBytes()); this.name = name; this.options = options == null ? ImmutableMap.of() : ImmutableMap.copyOf(options); this.indexType = indexType; @@ -194,7 +202,7 @@ public final class IndexMetadata public int hashCode() { - return Objects.hashCode(name, indexType, targetType, options, columns); + return Objects.hashCode(id, name, indexType, targetType, options, columns); } public boolean equalsWithoutName(IndexMetadata other) @@ -215,12 +223,13 @@ public final class IndexMetadata IndexMetadata other = (IndexMetadata)obj; - return Objects.equal(name, other.name) && equalsWithoutName(other); + return Objects.equal(id, other.id) && Objects.equal(name, other.name) && equalsWithoutName(other); } public String toString() { return new ToStringBuilder(this) + .append("id", id.toString()) .append("name", name) .append("indexType", indexType) .append("targetType", targetType) @@ -228,4 +237,24 @@ public final class IndexMetadata .append("options", options) .build(); } + + public static class Serializer + { + public void serialize(IndexMetadata metadata, DataOutputPlus out, int version) throws IOException + { + UUIDSerializer.serializer.serialize(metadata.id, out, version); + } + + public IndexMetadata deserialize(DataInputPlus in, int version, CFMetaData cfm) 
throws IOException + { + UUID id = UUIDSerializer.serializer.deserialize(in, version); + return cfm.getIndexes().get(id).orElseThrow(() -> new UnknownIndexException(cfm, id)); + } + + public long serializedSize(IndexMetadata metadata, int version) + { + return UUIDSerializer.serializer.serializedSize(metadata.id, version); + } + + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e097efc5/src/java/org/apache/cassandra/schema/Indexes.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/schema/Indexes.java b/src/java/org/apache/cassandra/schema/Indexes.java index 6227e0b..9114f63 100644 --- a/src/java/org/apache/cassandra/schema/Indexes.java +++ b/src/java/org/apache/cassandra/schema/Indexes.java @@ -40,12 +40,14 @@ import static com.google.common.collect.Iterables.filter; */ public class Indexes implements Iterable<IndexMetadata> { - private final ImmutableMap<String, IndexMetadata> indexes; + private final ImmutableMap<String, IndexMetadata> indexesByName; + private final ImmutableMap<UUID, IndexMetadata> indexesById; private final ImmutableMultimap<ColumnIdentifier, IndexMetadata> indexesByColumn; private Indexes(Builder builder) { - indexes = builder.indexes.build(); + indexesByName = builder.indexesByName.build(); + indexesById = builder.indexesById.build(); indexesByColumn = builder.indexesByColumn.build(); } @@ -61,17 +63,17 @@ public class Indexes implements Iterable<IndexMetadata> public Iterator<IndexMetadata> iterator() { - return indexes.values().iterator(); + return indexesByName.values().iterator(); } public int size() { - return indexes.size(); + return indexesByName.size(); } public boolean isEmpty() { - return indexes.isEmpty(); + return indexesByName.isEmpty(); } /** @@ -82,7 +84,7 @@ public class Indexes implements Iterable<IndexMetadata> */ public Optional<IndexMetadata> get(String name) { - return indexes.values().stream().filter(def -> def.name.equals(name)).findFirst(); + 
return Optional.ofNullable(indexesByName.get(name)); } /** @@ -92,7 +94,30 @@ public class Indexes implements Iterable<IndexMetadata> */ public boolean has(String name) { - return get(name).isPresent(); + return indexesByName.containsKey(name); + } + + /** + * Get the index with the specified id + * + * @param id a UUID which identifies an index + * @return an empty {@link Optional} if no index with the specified id is found; a non-empty optional of + * {@link IndexMetadata} otherwise + */ + + public Optional<IndexMetadata> get(UUID id) + { + return Optional.ofNullable(indexesById.get(id)); + } + + /** + * Answer true if contains an index with the specified id. + * @param id a UUID which identifies an index. + * @return true if an index with the specified id is found; false otherwise + */ + public boolean has(UUID id) + { + return indexesById.containsKey(id); } /** @@ -148,19 +173,19 @@ public class Indexes implements Iterable<IndexMetadata> @Override public boolean equals(Object o) { - return this == o || (o instanceof Indexes && indexes.equals(((Indexes) o).indexes)); + return this == o || (o instanceof Indexes && indexesByName.equals(((Indexes) o).indexesByName)); } @Override public int hashCode() { - return indexes.hashCode(); + return indexesByName.hashCode(); } @Override public String toString() { - return indexes.values().toString(); + return indexesByName.values().toString(); } public static String getAvailableIndexName(String ksName, String cfName, ColumnIdentifier columnName) @@ -179,7 +204,8 @@ public class Indexes implements Iterable<IndexMetadata> public static final class Builder { - final ImmutableMap.Builder<String, IndexMetadata> indexes = new ImmutableMap.Builder<>(); + final ImmutableMap.Builder<String, IndexMetadata> indexesByName = new ImmutableMap.Builder<>(); + final ImmutableMap.Builder<UUID, IndexMetadata> indexesById = new ImmutableMap.Builder<>(); final ImmutableMultimap.Builder<ColumnIdentifier, IndexMetadata> indexesByColumn = new 
ImmutableMultimap.Builder<>(); private Builder() @@ -193,7 +219,8 @@ public class Indexes implements Iterable<IndexMetadata> public Builder add(IndexMetadata index) { - indexes.put(index.name, index); + indexesByName.put(index.name, index); + indexesById.put(index.id, index); // All indexes are column indexes at the moment if (index.isColumnIndex()) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/e097efc5/src/java/org/apache/cassandra/schema/UnknownIndexException.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/schema/UnknownIndexException.java b/src/java/org/apache/cassandra/schema/UnknownIndexException.java new file mode 100644 index 0000000..5daf631 --- /dev/null +++ b/src/java/org/apache/cassandra/schema/UnknownIndexException.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.schema; + +import java.io.IOException; +import java.util.UUID; + +import org.apache.cassandra.config.CFMetaData; + +/** + * Exception thrown when we read an index id from a serialized ReadCommand and no corresponding IndexMetadata + * can be found in the CFMetaData#indexes collection. 
Note that this is an internal exception and is not meant + * to be user-facing; the node reading the ReadCommand should proceed as if no index id were present. + */ +public class UnknownIndexException extends IOException +{ + public final UUID indexId; + public UnknownIndexException(CFMetaData metadata, UUID id) + { + super(String.format("Unknown index %s for table %s.%s", id.toString(), metadata.ksName, metadata.cfName)); + indexId = id; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/e097efc5/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index 59f1c1c..e3b884e 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -1717,7 +1717,7 @@ public class StorageProxy implements StorageProxyMBean private static float estimateResultsPerRange(PartitionRangeReadCommand command, Keyspace keyspace) { ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(command.metadata().cfId); - Index index = cfs.indexManager.getBestIndexFor(command); + Index index = command.getIndex(cfs); float maxExpectedResults = index == null ? 
command.limits().estimateTotalResults(cfs) : index.getEstimatedResultRows(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/e097efc5/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java b/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java index 2e57a8b..87eb018 100644 --- a/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java +++ b/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java @@ -17,15 +17,17 @@ */ package org.apache.cassandra.service.pager; -import org.apache.cassandra.db.*; -import org.apache.cassandra.db.rows.*; -import org.apache.cassandra.db.filter.*; -import org.apache.cassandra.dht.*; -import org.apache.cassandra.exceptions.RequestExecutionException; +import java.util.Optional; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.filter.DataLimits; +import org.apache.cassandra.db.rows.Row; +import org.apache.cassandra.dht.*; +import org.apache.cassandra.exceptions.RequestExecutionException; + /** * Pages a RangeSliceCommand whose predicate is a slice query. 
* @@ -89,7 +91,9 @@ public class RangeSliceQueryPager extends AbstractQueryPager } } - return new PartitionRangeReadCommand(command.metadata(), command.nowInSec(), command.columnFilter(), command.rowFilter(), limits, pageRange); + // it won't hurt for the next page command to query the index manager + // again to check for an applicable index, so don't supply one here + return new PartitionRangeReadCommand(command.metadata(), command.nowInSec(), command.columnFilter(), command.rowFilter(), limits, pageRange, Optional.empty()); } protected void recordLast(DecoratedKey key, Row last) http://git-wip-us.apache.org/repos/asf/cassandra/blob/e097efc5/src/java/org/apache/cassandra/thrift/CassandraServer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java index 038384e..9cd1653 100644 --- a/src/java/org/apache/cassandra/thrift/CassandraServer.java +++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java @@ -30,12 +30,9 @@ import java.util.zip.Inflater; import com.google.common.base.Joiner; import com.google.common.collect.*; import com.google.common.primitives.Longs; - -import org.apache.cassandra.config.ColumnDefinition; -import org.apache.cassandra.db.filter.ColumnFilter; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import org.apache.cassandra.auth.Permission; import org.apache.cassandra.config.*; import org.apache.cassandra.cql3.QueryOptions; @@ -1520,7 +1517,8 @@ public class CassandraServer implements Cassandra.Iface columns, ThriftConversion.rowFilterFromThrift(metadata, range.row_filter), limits, - new DataRange(bounds, filter)); + new DataRange(bounds, filter), + Optional.empty()); try (PartitionIterator results = StorageProxy.getRangeSlice(cmd, consistencyLevel)) { assert results != null; @@ -1613,7 +1611,8 @@ public class CassandraServer implements Cassandra.Iface 
ColumnFilter.all(metadata), RowFilter.NONE, limits, - new DataRange(bounds, filter).forPaging(bounds, metadata.comparator, pageFrom, true)); + new DataRange(bounds, filter).forPaging(bounds, metadata.comparator, pageFrom, true), + Optional.empty()); try (PartitionIterator results = StorageProxy.getRangeSlice(cmd, consistencyLevel)) { return thriftifyKeySlices(results, new ColumnParent(column_family), limits.perPartitionCount()); @@ -1704,7 +1703,8 @@ public class CassandraServer implements Cassandra.Iface columns, ThriftConversion.rowFilterFromThrift(metadata, index_clause.expressions), limits, - new DataRange(bounds, filter)); + new DataRange(bounds, filter), + Optional.empty()); try (PartitionIterator results = StorageProxy.getRangeSlice(cmd, consistencyLevel)) { return thriftifyKeySlices(results, column_parent, limits.perPartitionCount());
