Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.0 f961e84aa -> e097efc5f
  refs/heads/trunk b81942267 -> c5408a3a1


Cache selected index in ReadCommand to avoid multiple lookups

Patch by Sam Tunnicliffe; reviewed by Jake Luciani for CASSANDRA-10215


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e097efc5
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e097efc5
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e097efc5

Branch: refs/heads/cassandra-3.0
Commit: e097efc5f6f76a0da8d15b307301dffff79e4a35
Parents: f961e84
Author: Sam Tunnicliffe <[email protected]>
Authored: Wed Aug 26 16:29:58 2015 +0100
Committer: Sam Tunnicliffe <[email protected]>
Committed: Wed Sep 9 08:44:06 2015 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cql3/statements/SelectStatement.java        |  4 +-
 .../db/AbstractReadCommandBuilder.java          |  8 +-
 .../cassandra/db/PartitionRangeReadCommand.java | 26 ++++--
 .../org/apache/cassandra/db/ReadCommand.java    | 96 ++++++++++++++++++--
 .../org/apache/cassandra/db/ReadOrderGroup.java |  2 +-
 .../db/SinglePartitionReadCommand.java          | 14 ++-
 .../cassandra/index/SecondaryIndexManager.java  | 23 ++---
 .../apache/cassandra/schema/IndexMetadata.java  | 39 +++++++-
 .../org/apache/cassandra/schema/Indexes.java    | 51 ++++++++---
 .../cassandra/schema/UnknownIndexException.java | 39 ++++++++
 .../apache/cassandra/service/StorageProxy.java  |  2 +-
 .../service/pager/RangeSliceQueryPager.java     | 16 ++--
 .../cassandra/thrift/CassandraServer.java       | 14 +--
 14 files changed, 258 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/e097efc5/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a1c66a2..ab1b4ed 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.0-rc1
+ * Cache selected index in read command to reduce lookups (CASSANDRA-10215)
  * Small optimizations of sstable index serialization (CASSANDRA-10232)
  * Support for both encrypted and unencrypted native transport connections 
(CASSANDRA-9590)
 Merged from 2.2:

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e097efc5/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index 2aac6ab..7ad6c09 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -47,6 +47,7 @@ import org.apache.cassandra.db.rows.RowIterator;
 import org.apache.cassandra.db.view.MaterializedView;
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.exceptions.*;
+import org.apache.cassandra.index.Index;
 import org.apache.cassandra.index.SecondaryIndexManager;
 import org.apache.cassandra.serializers.MarshalException;
 import org.apache.cassandra.service.ClientState;
@@ -458,12 +459,13 @@ public class SelectStatement implements CQLStatement
             return ReadQuery.EMPTY;
 
         RowFilter rowFilter = getRowFilter(options);
+
         // The LIMIT provided by the user is the number of CQL row he wants 
returned.
         // We want to have getRangeSlice to count the number of columns, not 
the number of keys.
         AbstractBounds<PartitionPosition> keyBounds = 
restrictions.getPartitionKeyBounds(options);
         return keyBounds == null
              ? ReadQuery.EMPTY
-             : new PartitionRangeReadCommand(cfm, nowInSec, queriedColumns, 
rowFilter, limit, new DataRange(keyBounds, clusteringIndexFilter));
+             : new PartitionRangeReadCommand(cfm, nowInSec, queriedColumns, 
rowFilter, limit, new DataRange(keyBounds, clusteringIndexFilter), 
Optional.empty());
     }
 
     private ClusteringIndexFilter makeClusteringIndexFilter(QueryOptions 
options)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e097efc5/src/java/org/apache/cassandra/db/AbstractReadCommandBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/AbstractReadCommandBuilder.java 
b/src/java/org/apache/cassandra/db/AbstractReadCommandBuilder.java
index 5e3b726..9bb89a6 100644
--- a/src/java/org/apache/cassandra/db/AbstractReadCommandBuilder.java
+++ b/src/java/org/apache/cassandra/db/AbstractReadCommandBuilder.java
@@ -23,9 +23,11 @@ import java.util.*;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.cql3.*;
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.cql3.Operator;
 import org.apache.cassandra.db.filter.*;
-import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.CollectionType;
 import org.apache.cassandra.dht.*;
 import org.apache.cassandra.utils.FBUtilities;
 
@@ -327,7 +329,7 @@ public abstract class AbstractReadCommandBuilder
             else
                 bounds = new ExcludingBounds<>(start, end);
 
-            return new PartitionRangeReadCommand(cfs.metadata, nowInSeconds, 
makeColumnFilter(), filter, makeLimits(), new DataRange(bounds, makeFilter()));
+            return new PartitionRangeReadCommand(cfs.metadata, nowInSeconds, 
makeColumnFilter(), filter, makeLimits(), new DataRange(bounds, makeFilter()), 
Optional.empty());
         }
 
         static DecoratedKey makeKey(CFMetaData metadata, Object... 
partitionKey)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e097efc5/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java 
b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
index da62557..965e9af 100644
--- a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
+++ b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.db;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 
 import com.google.common.collect.Iterables;
 
@@ -39,6 +40,7 @@ import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.metrics.TableMetrics;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.IndexMetadata;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.service.pager.*;
@@ -64,10 +66,12 @@ public class PartitionRangeReadCommand extends ReadCommand
                                      ColumnFilter columnFilter,
                                      RowFilter rowFilter,
                                      DataLimits limits,
-                                     DataRange dataRange)
+                                     DataRange dataRange,
+                                     Optional<IndexMetadata> index)
     {
         super(Kind.PARTITION_RANGE, isDigest, digestVersion, isForThrift, 
metadata, nowInSec, columnFilter, rowFilter, limits);
         this.dataRange = dataRange;
+        this.index = index;
     }
 
     public PartitionRangeReadCommand(CFMetaData metadata,
@@ -75,9 +79,10 @@ public class PartitionRangeReadCommand extends ReadCommand
                                      ColumnFilter columnFilter,
                                      RowFilter rowFilter,
                                      DataLimits limits,
-                                     DataRange dataRange)
+                                     DataRange dataRange,
+                                     Optional<IndexMetadata> index)
     {
-        this(false, 0, false, metadata, nowInSec, columnFilter, rowFilter, 
limits, dataRange);
+        this(false, 0, false, metadata, nowInSec, columnFilter, rowFilter, 
limits, dataRange, index);
     }
 
     /**
@@ -95,7 +100,8 @@ public class PartitionRangeReadCommand extends ReadCommand
                                              ColumnFilter.all(metadata),
                                              RowFilter.NONE,
                                              DataLimits.NONE,
-                                             
DataRange.allData(metadata.partitioner));
+                                             
DataRange.allData(metadata.partitioner),
+                                             Optional.empty());
     }
 
     public DataRange dataRange()
@@ -115,17 +121,17 @@ public class PartitionRangeReadCommand extends ReadCommand
 
     public PartitionRangeReadCommand 
forSubRange(AbstractBounds<PartitionPosition> range)
     {
-        return new PartitionRangeReadCommand(isDigestQuery(), digestVersion(), 
isForThrift(), metadata(), nowInSec(), columnFilter(), rowFilter(), limits(), 
dataRange().forSubRange(range));
+        return new PartitionRangeReadCommand(isDigestQuery(), digestVersion(), 
isForThrift(), metadata(), nowInSec(), columnFilter(), rowFilter(), limits(), 
dataRange().forSubRange(range), index);
     }
 
     public PartitionRangeReadCommand copy()
     {
-        return new PartitionRangeReadCommand(isDigestQuery(), digestVersion(), 
isForThrift(), metadata(), nowInSec(), columnFilter(), rowFilter(), limits(), 
dataRange());
+        return new PartitionRangeReadCommand(isDigestQuery(), digestVersion(), 
isForThrift(), metadata(), nowInSec(), columnFilter(), rowFilter(), limits(), 
dataRange(), index);
     }
 
     public PartitionRangeReadCommand withUpdatedLimit(DataLimits newLimits)
     {
-        return new PartitionRangeReadCommand(metadata(), nowInSec(), 
columnFilter(), rowFilter(), newLimits, dataRange());
+        return new PartitionRangeReadCommand(metadata(), nowInSec(), 
columnFilter(), rowFilter(), newLimits, dataRange(), index);
     }
 
     public long getTimeout()
@@ -275,7 +281,7 @@ public class PartitionRangeReadCommand extends ReadCommand
     public PartitionIterator postReconciliationProcessing(PartitionIterator 
result)
     {
         ColumnFamilyStore cfs = 
Keyspace.open(metadata().ksName).getColumnFamilyStore(metadata().cfName);
-        Index index = getIndex(cfs, false);
+        Index index = getIndex(cfs);
         return index == null ? result : 
index.postProcessorFor(this).apply(result, this);
     }
 
@@ -303,11 +309,11 @@ public class PartitionRangeReadCommand extends ReadCommand
 
     private static class Deserializer extends SelectionDeserializer
     {
-        public ReadCommand deserialize(DataInputPlus in, int version, boolean 
isDigest, int digestVersion, boolean isForThrift, CFMetaData metadata, int 
nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits)
+        public ReadCommand deserialize(DataInputPlus in, int version, boolean 
isDigest, int digestVersion, boolean isForThrift, CFMetaData metadata, int 
nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits, 
Optional<IndexMetadata> index)
         throws IOException
         {
             DataRange range = DataRange.serializer.deserialize(in, version, 
metadata);
-            return new PartitionRangeReadCommand(isDigest, digestVersion, 
isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, range);
+            return new PartitionRangeReadCommand(isDigest, digestVersion, 
isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, range, index);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e097efc5/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java 
b/src/java/org/apache/cassandra/db/ReadCommand.java
index 5a10716..e183963 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -38,6 +38,8 @@ import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.metrics.TableMetrics;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.IndexMetadata;
+import org.apache.cassandra.schema.UnknownIndexException;
 import org.apache.cassandra.service.ClientWarn;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -70,6 +72,16 @@ public abstract class ReadCommand implements ReadQuery
     private final RowFilter rowFilter;
     private final DataLimits limits;
 
+    // SecondaryIndexManager will attempt to provide the most selective of any 
available indexes
+    // during execution. Here we also store an the results of that lookup to 
repeating it over
+    // the lifetime of the command.
+    protected Optional<IndexMetadata> index = Optional.empty();
+
+    // Flag to indicate whether the index manager has been queried to select 
an index for this
+    // command. This is necessary as the result of that lookup may be null, in 
which case we
+    // still don't want to repeat it.
+    private boolean indexManagerQueried = false;
+
     private boolean isDigestQuery;
     // if a digest query, the version for which the digest is expected. 
Ignored if not a digest.
     private int digestVersion;
@@ -77,7 +89,7 @@ public abstract class ReadCommand implements ReadQuery
 
     protected static abstract class SelectionDeserializer
     {
-        public abstract ReadCommand deserialize(DataInputPlus in, int version, 
boolean isDigest, int digestVersion, boolean isForThrift, CFMetaData metadata, 
int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits 
limits) throws IOException;
+        public abstract ReadCommand deserialize(DataInputPlus in, int version, 
boolean isDigest, int digestVersion, boolean isForThrift, CFMetaData metadata, 
int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits 
limits, Optional<IndexMetadata> index) throws IOException;
     }
 
     protected enum Kind
@@ -287,9 +299,35 @@ public abstract class ReadCommand implements ReadQuery
              : ReadResponse.createDataResponse(iterator, selection);
     }
 
-    protected Index getIndex(ColumnFamilyStore cfs, boolean includeInTrace)
+    public long indexSerializedSize(int version)
+    {
+        if (index.isPresent())
+            return IndexMetadata.serializer.serializedSize(index.get(), 
version);
+        else
+            return 0;
+    }
+
+    public Index getIndex(ColumnFamilyStore cfs)
     {
-        return cfs.indexManager.getBestIndexFor(this, includeInTrace);
+        // if we've already consulted the index manager, and it returned a 
valid index
+        // the result should be cached here.
+        if(index.isPresent())
+            return cfs.indexManager.getIndex(index.get());
+
+        // if no cached index is present, but we've already consulted the 
index manager
+        // then no registered index is suitable for this command, so just 
return null.
+        if (indexManagerQueried)
+            return null;
+
+        // do the lookup, set the flag to indicate so and cache the result if 
not null
+        Index selected = cfs.indexManager.getBestIndexFor(this);
+        indexManagerQueried = true;
+
+        if (selected == null)
+            return null;
+
+        index = Optional.of(selected.getIndexMetadata());
+        return selected;
     }
 
     /**
@@ -306,9 +344,12 @@ public abstract class ReadCommand implements ReadQuery
         long startTimeNanos = System.nanoTime();
 
         ColumnFamilyStore cfs = Keyspace.openAndGetStore(metadata());
-        Index index = getIndex(cfs, true);
+        Index index = getIndex(cfs);
         Index.Searcher searcher = index == null ? null : 
index.searcherFor(this);
 
+        if (index != null)
+            Tracing.trace("Executing read on {}.{} using index {}", 
cfs.metadata.ksName, cfs.metadata.cfName, index.getIndexName());
+
         UnfilteredPartitionIterator resultIterator = searcher == null
                                          ? queryStorage(cfs, orderGroup)
                                          : searcher.search(orderGroup);
@@ -505,13 +546,23 @@ public abstract class ReadCommand implements ReadQuery
             return (flags & 0x02) != 0;
         }
 
+        private static int indexFlag(boolean hasIndex)
+        {
+            return hasIndex ? 0x04 : 0;
+        }
+
+        private static boolean hasIndex(int flags)
+        {
+            return (flags & 0x04) != 0;
+        }
+
         public void serialize(ReadCommand command, DataOutputPlus out, int 
version) throws IOException
         {
             // for serialization, createLegacyMessage() should cause 
legacyReadCommandSerializer to be used directly
             assert version >= MessagingService.VERSION_30;
 
             out.writeByte(command.kind.ordinal());
-            out.writeByte(digestFlag(command.isDigestQuery()) | 
thriftFlag(command.isForThrift()));
+            out.writeByte(digestFlag(command.isDigestQuery()) | 
thriftFlag(command.isForThrift()) | indexFlag(command.index.isPresent()));
             if (command.isDigestQuery())
                 out.writeVInt(command.digestVersion());
             CFMetaData.serializer.serialize(command.metadata(), out, version);
@@ -519,6 +570,8 @@ public abstract class ReadCommand implements ReadQuery
             ColumnFilter.serializer.serialize(command.columnFilter(), out, 
version);
             RowFilter.serializer.serialize(command.rowFilter(), out, version);
             DataLimits.serializer.serialize(command.limits(), out, version);
+            if (command.index.isPresent())
+                IndexMetadata.serializer.serialize(command.index.get(), out, 
version);
 
             command.serializeSelection(out, version);
         }
@@ -532,14 +585,36 @@ public abstract class ReadCommand implements ReadQuery
             int flags = in.readByte();
             boolean isDigest = isDigest(flags);
             boolean isForThrift = isForThrift(flags);
+            boolean hasIndex = hasIndex(flags);
             int digestVersion = isDigest ? (int)in.readVInt() : 0;
             CFMetaData metadata = CFMetaData.serializer.deserialize(in, 
version);
             int nowInSec = in.readInt();
             ColumnFilter columnFilter = 
ColumnFilter.serializer.deserialize(in, version, metadata);
             RowFilter rowFilter = RowFilter.serializer.deserialize(in, 
version, metadata);
             DataLimits limits = DataLimits.serializer.deserialize(in, version);
+            Optional<IndexMetadata> index = hasIndex
+                                            ? deserializeIndexMetadata(in, 
version, metadata)
+                                            : Optional.empty();
+
+            return kind.selectionDeserializer.deserialize(in, version, 
isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, 
rowFilter, limits, index);
+        }
 
-            return kind.selectionDeserializer.deserialize(in, version, 
isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, 
rowFilter, limits);
+        private Optional<IndexMetadata> deserializeIndexMetadata(DataInputPlus 
in, int version, CFMetaData cfm) throws IOException
+        {
+            try
+            {
+                return Optional.of(IndexMetadata.serializer.deserialize(in, 
version, cfm));
+            }
+            catch (UnknownIndexException e)
+            {
+                String message = String.format("Couldn't find a defined index 
on %s.%s with the id %s. " +
+                                               "If an index was just created, 
this is likely due to the schema not " +
+                                               "being fully propagated. Local 
read will proceed without using the " +
+                                               "index. Please wait for schema 
agreement after index creation.",
+                                               cfm.ksName, cfm.cfName, 
e.indexId.toString());
+                logger.info(message);
+                return Optional.empty();
+            }
         }
 
         public long serializedSize(ReadCommand command, int version)
@@ -554,7 +629,8 @@ public abstract class ReadCommand implements ReadQuery
                  + 
ColumnFilter.serializer.serializedSize(command.columnFilter(), version)
                  + RowFilter.serializer.serializedSize(command.rowFilter(), 
version)
                  + DataLimits.serializer.serializedSize(command.limits(), 
version)
-                 + command.selectionSerializedSize(version);
+                 + command.selectionSerializedSize(version)
+                 + command.indexSerializedSize(version);
         }
     }
 
@@ -739,7 +815,7 @@ public abstract class ReadCommand implements ReadQuery
             else
                 limits = DataLimits.cqlLimits(maxResults);
 
-            return new PartitionRangeReadCommand(false, 0, true, metadata, 
nowInSec, selection, rowFilter, limits, new DataRange(keyRange, filter));
+            return new PartitionRangeReadCommand(false, 0, true, metadata, 
nowInSec, selection, rowFilter, limits, new DataRange(keyRange, filter), 
Optional.empty());
         }
 
         static void serializeRowFilter(DataOutputPlus out, RowFilter 
rowFilter) throws IOException
@@ -850,7 +926,7 @@ public abstract class ReadCommand implements ReadQuery
             DataRange newRange = new DataRange(command.dataRange().keyRange(), 
sliceFilter);
             return new PartitionRangeReadCommand(
                     command.isDigestQuery(), command.digestVersion(), 
command.isForThrift(), metadata, command.nowInSec(),
-                    command.columnFilter(), command.rowFilter(), 
command.limits(), newRange);
+                    command.columnFilter(), command.rowFilter(), 
command.limits(), newRange, Optional.empty());
         }
 
         static ColumnFilter 
getColumnSelectionForSlice(ClusteringIndexSliceFilter filter, int 
compositesToGroup, CFMetaData metadata)
@@ -1000,7 +1076,7 @@ public abstract class ReadCommand implements ReadQuery
                 // missing without any problems, so we can safely always set 
"inclusive" to false in the data range
                 dataRange = dataRange.forPaging(keyRange, metadata.comparator, 
startBound.getAsClustering(metadata), false);
             }
-            return new PartitionRangeReadCommand(false, 0, true, metadata, 
nowInSec, selection, rowFilter, limits, dataRange);
+            return new PartitionRangeReadCommand(false, 0, true, metadata, 
nowInSec, selection, rowFilter, limits, dataRange, Optional.empty());
         }
 
         public long serializedSize(ReadCommand command, int version)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e097efc5/src/java/org/apache/cassandra/db/ReadOrderGroup.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadOrderGroup.java 
b/src/java/org/apache/cassandra/db/ReadOrderGroup.java
index 44befa2..0720d79 100644
--- a/src/java/org/apache/cassandra/db/ReadOrderGroup.java
+++ b/src/java/org/apache/cassandra/db/ReadOrderGroup.java
@@ -98,7 +98,7 @@ public class ReadOrderGroup implements AutoCloseable
 
     private static ColumnFamilyStore maybeGetIndexCfs(ColumnFamilyStore 
baseCfs, ReadCommand command)
     {
-        Index index = baseCfs.indexManager.getBestIndexFor(command);
+        Index index = command.getIndex(baseCfs);
         return index == null ? null : index.getBackingTable().orElse(null);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e097efc5/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java 
b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index 7b62f5a..c08ef6a 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@ -21,20 +21,26 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.*;
 
-import org.apache.cassandra.cache.*;
+import org.apache.cassandra.cache.IRowCacheEntry;
+import org.apache.cassandra.cache.RowCacheKey;
+import org.apache.cassandra.cache.RowCacheSentinel;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.filter.*;
 import org.apache.cassandra.db.partitions.*;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.db.rows.UnfilteredRowIterators;
 import org.apache.cassandra.exceptions.RequestExecutionException;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.metrics.TableMetrics;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.service.*;
+import org.apache.cassandra.schema.IndexMetadata;
+import org.apache.cassandra.service.CacheService;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.service.pager.*;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.concurrent.OpOrder;
@@ -507,7 +513,7 @@ public abstract class SinglePartitionReadCommand<F extends 
ClusteringIndexFilter
 
     private static class Deserializer extends SelectionDeserializer
     {
-        public ReadCommand deserialize(DataInputPlus in, int version, boolean 
isDigest, int digestVersion, boolean isForThrift, CFMetaData metadata, int 
nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits)
+        public ReadCommand deserialize(DataInputPlus in, int version, boolean 
isDigest, int digestVersion, boolean isForThrift, CFMetaData metadata, int 
nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits, 
Optional<IndexMetadata> index)
         throws IOException
         {
             DecoratedKey key = 
metadata.decorateKey(metadata.getKeyValidator().readValue(in));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e097efc5/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java 
b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
index fabfebc..bd3202d 100644
--- a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
+++ b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
@@ -536,19 +536,15 @@ public class SecondaryIndexManager implements 
IndexRegistry
      * index should be performed in the searcherFor method to ensure that we 
pick the right
      * index regardless of the validity of the expression.
      *
-     * This method is called at various points during the lifecycle of a 
ReadCommand (to obtain a Searcher,
-     * get the index's underlying CFS for ReadOrderGroup, or an estimate of 
the result size from an average index
-     * query).
-     *
-     * Ideally, we would do this relatively expensive operation only once, and 
attach the index to the
-     * ReadCommand for future reference. This requires the index be passed 
onto additional commands generated
-     * to process subranges etc.
+     * This method is only called once during the lifecycle of a ReadCommand 
and the result is
+     * cached for future use when obtaining a Searcher, getting the index's 
underlying CFS for
+     * ReadOrderGroup, or an estimate of the result size from an average index 
query.
      *
      * @param command ReadCommand to be executed
      * @return an Index instance, ready to use during execution of the 
command, or null if none
      * of the registered indexes can support the command.
      */
-    public Index getBestIndexFor(ReadCommand command, boolean includeInTrace)
+    public Index getBestIndexFor(ReadCommand command)
     {
         if (indexes.isEmpty() || command.rowFilter().isEmpty())
             return null;
@@ -564,8 +560,7 @@ public class SecondaryIndexManager implements IndexRegistry
         if (searchableIndexes.isEmpty())
         {
             logger.debug("No applicable indexes found");
-            if (includeInTrace)
-                Tracing.trace("No applicable indexes found");
+            Tracing.trace("No applicable indexes found");
             return null;
         }
 
@@ -575,7 +570,7 @@ public class SecondaryIndexManager implements IndexRegistry
                                           .orElseThrow(() -> new 
AssertionError("Could not select most selective index"));
 
         // pay for an additional threadlocal get() rather than build the 
strings unnecessarily
-        if (includeInTrace && Tracing.isTracing())
+        if (Tracing.isTracing())
         {
             Tracing.trace("Index mean cardinalities are {}. Scanning with {}.",
                           searchableIndexes.stream().map(i -> i.getIndexName() 
+ ':' + i.getEstimatedResultRows())
@@ -585,12 +580,6 @@ public class SecondaryIndexManager implements IndexRegistry
         return selected;
     }
 
-    // convenience method which doesn't emit tracing messages
-    public Index getBestIndexFor(ReadCommand command)
-    {
-        return getBestIndexFor(command, false);
-    }
-
     /**
      * Called at write time to ensure that values present in the update
      * are valid according to the rules of all registered indexes which

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e097efc5/src/java/org/apache/cassandra/schema/IndexMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/IndexMetadata.java 
b/src/java/org/apache/cassandra/schema/IndexMetadata.java
index 40a75c6..6846a14 100644
--- a/src/java/org/apache/cassandra/schema/IndexMetadata.java
+++ b/src/java/org/apache/cassandra/schema/IndexMetadata.java
@@ -18,10 +18,9 @@
 
 package org.apache.cassandra.schema;
 
+import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
-import java.util.Collections;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
 
 import com.google.common.base.Objects;
 import com.google.common.collect.ImmutableMap;
@@ -37,7 +36,10 @@ import org.apache.cassandra.cql3.ColumnIdentifier;
 import org.apache.cassandra.cql3.statements.IndexTarget;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.index.Index;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.UUIDSerializer;
 
 /**
  * An immutable representation of secondary index metadata.
@@ -46,6 +48,8 @@ public final class IndexMetadata
 {
     private static final Logger logger = 
LoggerFactory.getLogger(IndexMetadata.class);
 
+    public static final Serializer serializer = new Serializer();
+
     public enum IndexType
     {
         KEYS, CUSTOM, COMPOSITES
@@ -56,6 +60,9 @@ public final class IndexMetadata
         COLUMN, ROW
     }
 
+    // UUID for serialization. This is a deterministic UUID generated from the 
index name
+    // Both the id and name are guaranteed unique per keyspace.
+    public final UUID id;
     public final String name;
     public final IndexType indexType;
     public final TargetType targetType;
@@ -68,6 +75,7 @@ public final class IndexMetadata
                           TargetType targetType,
                           Set<ColumnIdentifier> columns)
     {
+        this.id = UUID.nameUUIDFromBytes(name.getBytes());
         this.name = name;
         this.options = options == null ? ImmutableMap.of() : 
ImmutableMap.copyOf(options);
         this.indexType = indexType;
@@ -194,7 +202,7 @@ public final class IndexMetadata
 
     public int hashCode()
     {
-        return Objects.hashCode(name, indexType, targetType, options, columns);
+        return Objects.hashCode(id, name, indexType, targetType, options, 
columns);
     }
 
     public boolean equalsWithoutName(IndexMetadata other)
@@ -215,12 +223,13 @@ public final class IndexMetadata
 
         IndexMetadata other = (IndexMetadata)obj;
 
-        return Objects.equal(name, other.name) && equalsWithoutName(other);
+        return Objects.equal(id, other.id) && Objects.equal(name, other.name) 
&& equalsWithoutName(other);
     }
 
     public String toString()
     {
         return new ToStringBuilder(this)
+            .append("id", id.toString())
             .append("name", name)
             .append("indexType", indexType)
             .append("targetType", targetType)
@@ -228,4 +237,24 @@ public final class IndexMetadata
             .append("options", options)
             .build();
     }
+
+    public static class Serializer
+    {
+        public void serialize(IndexMetadata metadata, DataOutputPlus out, int 
version) throws IOException
+        {
+            UUIDSerializer.serializer.serialize(metadata.id, out, version);
+        }
+
+        public IndexMetadata deserialize(DataInputPlus in, int version, 
CFMetaData cfm) throws IOException
+        {
+            UUID id = UUIDSerializer.serializer.deserialize(in, version);
+            return cfm.getIndexes().get(id).orElseThrow(() -> new 
UnknownIndexException(cfm, id));
+        }
+
+        public long serializedSize(IndexMetadata metadata, int version)
+        {
+            return UUIDSerializer.serializer.serializedSize(metadata.id, 
version);
+        }
+
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e097efc5/src/java/org/apache/cassandra/schema/Indexes.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/Indexes.java 
b/src/java/org/apache/cassandra/schema/Indexes.java
index 6227e0b..9114f63 100644
--- a/src/java/org/apache/cassandra/schema/Indexes.java
+++ b/src/java/org/apache/cassandra/schema/Indexes.java
@@ -40,12 +40,14 @@ import static com.google.common.collect.Iterables.filter;
  */
 public class Indexes implements Iterable<IndexMetadata>
 {
-    private final ImmutableMap<String, IndexMetadata> indexes;
+    private final ImmutableMap<String, IndexMetadata> indexesByName;
+    private final ImmutableMap<UUID, IndexMetadata> indexesById;
     private final ImmutableMultimap<ColumnIdentifier, IndexMetadata> 
indexesByColumn;
 
     private Indexes(Builder builder)
     {
-        indexes = builder.indexes.build();
+        indexesByName = builder.indexesByName.build();
+        indexesById = builder.indexesById.build();
         indexesByColumn = builder.indexesByColumn.build();
     }
 
@@ -61,17 +63,17 @@ public class Indexes implements Iterable<IndexMetadata>
 
     public Iterator<IndexMetadata> iterator()
     {
-        return indexes.values().iterator();
+        return indexesByName.values().iterator();
     }
 
     public int size()
     {
-        return indexes.size();
+        return indexesByName.size();
     }
 
     public boolean isEmpty()
     {
-        return indexes.isEmpty();
+        return indexesByName.isEmpty();
     }
 
     /**
@@ -82,7 +84,7 @@ public class Indexes implements Iterable<IndexMetadata>
      */
     public Optional<IndexMetadata> get(String name)
     {
-        return indexes.values().stream().filter(def -> 
def.name.equals(name)).findFirst();
+        return Optional.ofNullable(indexesByName.get(name));
     }
 
     /**
@@ -92,7 +94,30 @@ public class Indexes implements Iterable<IndexMetadata>
      */
     public boolean has(String name)
     {
-        return get(name).isPresent();
+        return indexesByName.containsKey(name);
+    }
+
+    /**
+     * Get the index with the specified id
+     *
+     * @param name a UUID which identifies an index
+     * @return an empty {@link Optional} if no index with the specified id is 
found; a non-empty optional of
+     *         {@link IndexMetadata} otherwise
+     */
+
+    public Optional<IndexMetadata> get(UUID id)
+    {
+        return Optional.ofNullable(indexesById.get(id));
+    }
+
+    /**
+     * Answer true if contains an index with the specified id.
+     * @param name a UUID which identifies an index.
+     * @return true if an index with the specified id is found; false otherwise
+     */
+    public boolean has(UUID id)
+    {
+        return indexesById.containsKey(id);
     }
 
     /**
@@ -148,19 +173,19 @@ public class Indexes implements Iterable<IndexMetadata>
     @Override
     public boolean equals(Object o)
     {
-        return this == o || (o instanceof Indexes && indexes.equals(((Indexes) 
o).indexes));
+        return this == o || (o instanceof Indexes && 
indexesByName.equals(((Indexes) o).indexesByName));
     }
 
     @Override
     public int hashCode()
     {
-        return indexes.hashCode();
+        return indexesByName.hashCode();
     }
 
     @Override
     public String toString()
     {
-        return indexes.values().toString();
+        return indexesByName.values().toString();
     }
 
     public static String getAvailableIndexName(String ksName, String cfName, 
ColumnIdentifier columnName)
@@ -179,7 +204,8 @@ public class Indexes implements Iterable<IndexMetadata>
 
     public static final class Builder
     {
-        final ImmutableMap.Builder<String, IndexMetadata> indexes = new 
ImmutableMap.Builder<>();
+        final ImmutableMap.Builder<String, IndexMetadata> indexesByName = new 
ImmutableMap.Builder<>();
+        final ImmutableMap.Builder<UUID, IndexMetadata> indexesById = new 
ImmutableMap.Builder<>();
         final ImmutableMultimap.Builder<ColumnIdentifier, IndexMetadata> 
indexesByColumn = new ImmutableMultimap.Builder<>();
 
         private Builder()
@@ -193,7 +219,8 @@ public class Indexes implements Iterable<IndexMetadata>
 
         public Builder add(IndexMetadata index)
         {
-            indexes.put(index.name, index);
+            indexesByName.put(index.name, index);
+            indexesById.put(index.id, index);
             // All indexes are column indexes at the moment
             if (index.isColumnIndex())
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e097efc5/src/java/org/apache/cassandra/schema/UnknownIndexException.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/UnknownIndexException.java 
b/src/java/org/apache/cassandra/schema/UnknownIndexException.java
new file mode 100644
index 0000000..5daf631
--- /dev/null
+++ b/src/java/org/apache/cassandra/schema/UnknownIndexException.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.schema;
+
+import java.io.IOException;
+import java.util.UUID;
+
+import org.apache.cassandra.config.CFMetaData;
+
+/**
+ * Exception thrown when we read an index id from a serialized ReadCommand and 
no corresponding IndexMetadata
+ * can be found in the CFMetaData#indexes collection. Note that this is an 
internal exception and is not meant
+ * to be user facing, the node reading the ReadCommand should proceed as if no 
index id were present.
+ */
+public class UnknownIndexException extends IOException
+{
+    public final UUID indexId;
+    public UnknownIndexException(CFMetaData metadata, UUID id)
+    {
+        super(String.format("Unknown index %s for table %s.%s", id.toString(), 
metadata.ksName, metadata.cfName));
+        indexId = id;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e097efc5/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java 
b/src/java/org/apache/cassandra/service/StorageProxy.java
index 59f1c1c..e3b884e 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -1717,7 +1717,7 @@ public class StorageProxy implements StorageProxyMBean
     private static float estimateResultsPerRange(PartitionRangeReadCommand 
command, Keyspace keyspace)
     {
         ColumnFamilyStore cfs = 
keyspace.getColumnFamilyStore(command.metadata().cfId);
-        Index index = cfs.indexManager.getBestIndexFor(command);
+        Index index = command.getIndex(cfs);
         float maxExpectedResults = index == null
                                  ? command.limits().estimateTotalResults(cfs)
                                  : index.getEstimatedResultRows();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e097efc5/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java 
b/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java
index 2e57a8b..87eb018 100644
--- a/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java
+++ b/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java
@@ -17,15 +17,17 @@
  */
 package org.apache.cassandra.service.pager;
 
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.db.filter.*;
-import org.apache.cassandra.dht.*;
-import org.apache.cassandra.exceptions.RequestExecutionException;
+import java.util.Optional;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.DataLimits;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.dht.*;
+import org.apache.cassandra.exceptions.RequestExecutionException;
+
 /**
  * Pages a RangeSliceCommand whose predicate is a slice query.
  *
@@ -89,7 +91,9 @@ public class RangeSliceQueryPager extends AbstractQueryPager
             }
         }
 
-        return new PartitionRangeReadCommand(command.metadata(), 
command.nowInSec(), command.columnFilter(), command.rowFilter(), limits, 
pageRange);
+        // it won't hurt for the next page command to query the index manager
+        // again to check for an applicable index, so don't supply one here
+        return new PartitionRangeReadCommand(command.metadata(), 
command.nowInSec(), command.columnFilter(), command.rowFilter(), limits, 
pageRange, Optional.empty());
     }
 
     protected void recordLast(DecoratedKey key, Row last)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e097efc5/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java 
b/src/java/org/apache/cassandra/thrift/CassandraServer.java
index 038384e..9cd1653 100644
--- a/src/java/org/apache/cassandra/thrift/CassandraServer.java
+++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java
@@ -30,12 +30,9 @@ import java.util.zip.Inflater;
 import com.google.common.base.Joiner;
 import com.google.common.collect.*;
 import com.google.common.primitives.Longs;
-
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.db.filter.ColumnFilter;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.cassandra.auth.Permission;
 import org.apache.cassandra.config.*;
 import org.apache.cassandra.cql3.QueryOptions;
@@ -1520,7 +1517,8 @@ public class CassandraServer implements Cassandra.Iface
                                                                               
columns,
                                                                               
ThriftConversion.rowFilterFromThrift(metadata, range.row_filter),
                                                                               
limits,
-                                                                              
new DataRange(bounds, filter));
+                                                                              
new DataRange(bounds, filter),
+                                                                              
Optional.empty());
                 try (PartitionIterator results = 
StorageProxy.getRangeSlice(cmd, consistencyLevel))
                 {
                     assert results != null;
@@ -1613,7 +1611,8 @@ public class CassandraServer implements Cassandra.Iface
                                                                               
ColumnFilter.all(metadata),
                                                                               
RowFilter.NONE,
                                                                               
limits,
-                                                                              
new DataRange(bounds, filter).forPaging(bounds, metadata.comparator, pageFrom, 
true));
+                                                                              
new DataRange(bounds, filter).forPaging(bounds, metadata.comparator, pageFrom, 
true),
+                                                                              
Optional.empty());
                 try (PartitionIterator results = 
StorageProxy.getRangeSlice(cmd, consistencyLevel))
                 {
                     return thriftifyKeySlices(results, new 
ColumnParent(column_family), limits.perPartitionCount());
@@ -1704,7 +1703,8 @@ public class CassandraServer implements Cassandra.Iface
                                                                           
columns,
                                                                           
ThriftConversion.rowFilterFromThrift(metadata, index_clause.expressions),
                                                                           
limits,
-                                                                          new 
DataRange(bounds, filter));
+                                                                          new 
DataRange(bounds, filter),
+                                                                          
Optional.empty());
             try (PartitionIterator results = StorageProxy.getRangeSlice(cmd, 
consistencyLevel))
             {
                 return thriftifyKeySlices(results, column_parent, 
limits.perPartitionCount());

Reply via email to