Merge branch cassandra-2.2 into cassandra-3.0

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7b430eee
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7b430eee
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7b430eee

Branch: refs/heads/trunk
Commit: 7b430eee69d8f70b086a30a6e9d3c42a9db4aa08
Parents: d300a18 61e0251
Author: Benjamin Lerer <b.le...@gmail.com>
Authored: Fri Nov 27 11:15:08 2015 +0100
Committer: Benjamin Lerer <b.le...@gmail.com>
Committed: Fri Nov 27 11:16:02 2015 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../org/apache/cassandra/db/ReadCommand.java    |  9 +-
 .../index/IndexNotAvailableException.java       | 34 ++++++++
 .../cassandra/index/SecondaryIndexManager.java  | 64 +++++++++++---
 .../index/internal/CassandraIndex.java          |  6 +-
 .../cassandra/net/MessageDeliveryTask.java      |  7 +-
 .../validation/entities/SecondaryIndexTest.java | 88 ++++++++++++++++----
 .../index/internal/CustomCassandraIndex.java    |  4 +-
 8 files changed, 178 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/7b430eee/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 5e6c03c,63305d6..252972c
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,16 -1,5 +1,17 @@@
 -2.2.4
 +3.0.1
 + * Fix SELECT statement with IN restrictions on partition key,
 +   ORDER BY and LIMIT (CASSANDRA-10729)
 + * Improve stress performance over 1k threads (CASSANDRA-7217)
 + * Wait for migration responses to complete before bootstrapping (CASSANDRA-10731)
 + * Unable to create a function with argument of type Inet (CASSANDRA-10741)
 + * Fix backward incompatibiliy in CqlInputFormat (CASSANDRA-10717)
 + * Correctly preserve deletion info on updated rows when notifying indexers
 +   of single-row deletions (CASSANDRA-10694)
 + * Notify indexers of partition delete during cleanup (CASSANDRA-10685)
 + * Keep the file open in trySkipCache (CASSANDRA-10669)
 + * Updated trigger example (CASSANDRA-10257)
 +Merged from 2.2:
+  * Reject index queries while the index is building (CASSANDRA-8505)
   * CQL.textile syntax incorrectly includes optional keyspace for aggregate SFUNC and FINALFUNC (CASSANDRA-10747)
   * Fix JSON update with prepared statements (CASSANDRA-10631)
   * Don't do anticompaction after subrange repair (CASSANDRA-10422)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7b430eee/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ReadCommand.java
index 301cb86,cd86336..5ab1ee5
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@@ -17,92 -17,39 +17,93 @@@
   */
  package org.apache.cassandra.db;
  
 -import java.io.DataInput;
  import java.io.IOException;
  import java.nio.ByteBuffer;
 +import java.util.*;
  
 -import org.apache.cassandra.config.DatabaseDescriptor;
 -import org.apache.cassandra.db.filter.IDiskAtomFilter;
 -import org.apache.cassandra.db.filter.NamesQueryFilter;
 -import org.apache.cassandra.db.filter.SliceQueryFilter;
 +import com.google.common.collect.Lists;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.config.*;
 +import org.apache.cassandra.cql3.Operator;
 +import org.apache.cassandra.db.filter.*;
 +import org.apache.cassandra.db.partitions.*;
 +import org.apache.cassandra.db.rows.*;
 +import org.apache.cassandra.db.transform.Transformation;
 +import org.apache.cassandra.dht.AbstractBounds;
 +import org.apache.cassandra.index.Index;
++import org.apache.cassandra.index.IndexNotAvailableException;
  import org.apache.cassandra.io.IVersionedSerializer;
 +import org.apache.cassandra.io.util.DataInputPlus;
  import org.apache.cassandra.io.util.DataOutputPlus;
 +import org.apache.cassandra.metrics.TableMetrics;
  import org.apache.cassandra.net.MessageOut;
  import org.apache.cassandra.net.MessagingService;
 -import org.apache.cassandra.service.IReadCommand;
 -import org.apache.cassandra.service.RowDataResolver;
 -import org.apache.cassandra.service.pager.Pageable;
 +import org.apache.cassandra.schema.IndexMetadata;
 +import org.apache.cassandra.schema.UnknownIndexException;
 +import org.apache.cassandra.service.ClientWarn;
 +import org.apache.cassandra.tracing.Tracing;
 +import org.apache.cassandra.utils.ByteBufferUtil;
 +import org.apache.cassandra.utils.Pair;
  
 -public abstract class ReadCommand implements IReadCommand, Pageable
 +/**
 + * General interface for storage-engine read commands (common to both range 
and
 + * single partition commands).
 + * <p>
 + * This contains all the informations needed to do a local read.
 + */
 +public abstract class ReadCommand implements ReadQuery
  {
 -    public enum Type
 +    protected static final Logger logger = 
LoggerFactory.getLogger(ReadCommand.class);
 +
 +    public static final IVersionedSerializer<ReadCommand> serializer = new 
Serializer();
 +    // For RANGE_SLICE verb: will either dispatch on 'serializer' for 3.0 or 
'legacyRangeSliceCommandSerializer' for earlier version.
 +    // Can be removed (and replaced by 'serializer') once we drop pre-3.0 
backward compatibility.
 +    public static final IVersionedSerializer<ReadCommand> 
rangeSliceSerializer = new RangeSliceSerializer();
 +
 +    public static final IVersionedSerializer<ReadCommand> 
legacyRangeSliceCommandSerializer = new LegacyRangeSliceCommandSerializer();
 +    public static final IVersionedSerializer<ReadCommand> 
legacyPagedRangeCommandSerializer = new LegacyPagedRangeCommandSerializer();
 +    public static final IVersionedSerializer<ReadCommand> 
legacyReadCommandSerializer = new LegacyReadCommandSerializer();
 +
 +    private final Kind kind;
 +    private final CFMetaData metadata;
 +    private final int nowInSec;
 +
 +    private final ColumnFilter columnFilter;
 +    private final RowFilter rowFilter;
 +    private final DataLimits limits;
 +
 +    // SecondaryIndexManager will attempt to provide the most selective of 
any available indexes
 +    // during execution. Here we also store an the results of that lookup to 
repeating it over
 +    // the lifetime of the command.
 +    protected Optional<IndexMetadata> index = Optional.empty();
 +
 +    // Flag to indicate whether the index manager has been queried to select 
an index for this
 +    // command. This is necessary as the result of that lookup may be null, 
in which case we
 +    // still don't want to repeat it.
 +    private boolean indexManagerQueried = false;
 +
 +    private boolean isDigestQuery;
 +    // if a digest query, the version for which the digest is expected. 
Ignored if not a digest.
 +    private int digestVersion;
 +    private final boolean isForThrift;
 +
 +    protected static abstract class SelectionDeserializer
      {
 -        GET_BY_NAMES((byte)1),
 -        GET_SLICES((byte)2);
 +        public abstract ReadCommand deserialize(DataInputPlus in, int 
version, boolean isDigest, int digestVersion, boolean isForThrift, CFMetaData 
metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, 
DataLimits limits, Optional<IndexMetadata> index) throws IOException;
 +    }
  
 -        public final byte serializedValue;
 +    protected enum Kind
 +    {
 +        SINGLE_PARTITION (SinglePartitionReadCommand.selectionDeserializer),
 +        PARTITION_RANGE  (PartitionRangeReadCommand.selectionDeserializer);
  
 -        private Type(byte b)
 -        {
 -            this.serializedValue = b;
 -        }
 +        private final SelectionDeserializer selectionDeserializer;
  
 -        public static Type fromSerializedValue(byte b)
 +        Kind(SelectionDeserializer selectionDeserializer)
          {
 -            return b == 1 ? GET_BY_NAMES : GET_SLICES;
 +            this.selectionDeserializer = selectionDeserializer;
          }
      }
  
@@@ -231,707 -95,55 +232,713 @@@
          return this;
      }
  
 -    public String getColumnFamilyName()
 +    /**
 +     * Sets the digest version, for when digest for that command is requested.
 +     * <p>
 +     * Note that we allow setting this independently of setting the command 
as a digest query as
 +     * this allows us to use the command as a carrier of the digest version 
even if we only call
 +     * setIsDigestQuery on some copy of it.
 +     *
 +     * @param digestVersion the version for the digest is this command is 
used for digest query..
 +     * @return this read command.
 +     */
 +    public ReadCommand setDigestVersion(int digestVersion)
      {
 -        return cfName;
 +        this.digestVersion = digestVersion;
 +        return this;
      }
  
 +    /**
 +     * Whether this query is for thrift or not.
 +     *
 +     * @return whether this query is for thrift.
 +     */
 +    public boolean isForThrift()
 +    {
 +        return isForThrift;
 +    }
 +
 +    /**
 +     * The clustering index filter this command to use for the provided key.
 +     * <p>
 +     * Note that that method should only be called on a key actually queried 
by this command
 +     * and in practice, this will almost always return the same filter, but 
for the sake of
 +     * paging, the filter on the first key of a range command might be 
slightly different.
 +     *
 +     * @param key a partition key queried by this command.
 +     *
 +     * @return the {@code ClusteringIndexFilter} to use for the partition of 
key {@code key}.
 +     */
 +    public abstract ClusteringIndexFilter clusteringIndexFilter(DecoratedKey 
key);
 +
 +    /**
 +     * Returns a copy of this command.
 +     *
 +     * @return a copy of this command.
 +     */
      public abstract ReadCommand copy();
  
 -    public abstract Row getRow(Keyspace keyspace);
 +    protected abstract UnfilteredPartitionIterator 
queryStorage(ColumnFamilyStore cfs, ReadOrderGroup orderGroup);
 +
 +    protected abstract int oldestUnrepairedTombstone();
 +
 +    public ReadResponse createResponse(UnfilteredPartitionIterator iterator, 
ColumnFilter selection)
 +    {
 +        return isDigestQuery()
 +             ? ReadResponse.createDigestResponse(iterator, digestVersion)
 +             : ReadResponse.createDataResponse(iterator, selection);
 +    }
 +
 +    public long indexSerializedSize(int version)
 +    {
 +        if (index.isPresent())
 +            return IndexMetadata.serializer.serializedSize(index.get(), 
version);
 +        else
 +            return 0;
 +    }
 +
 +    public Index getIndex(ColumnFamilyStore cfs)
 +    {
 +        // if we've already consulted the index manager, and it returned a 
valid index
 +        // the result should be cached here.
 +        if(index.isPresent())
 +            return cfs.indexManager.getIndex(index.get());
 +
 +        // if no cached index is present, but we've already consulted the 
index manager
 +        // then no registered index is suitable for this command, so just 
return null.
 +        if (indexManagerQueried)
 +            return null;
 +
 +        // do the lookup, set the flag to indicate so and cache the result if 
not null
 +        Index selected = cfs.indexManager.getBestIndexFor(this);
 +        indexManagerQueried = true;
  
 -    public abstract IDiskAtomFilter filter();
 +        if (selected == null)
 +            return null;
  
 -    public String getKeyspace()
 +        index = Optional.of(selected.getIndexMetadata());
 +        return selected;
 +    }
 +
 +    /**
 +     * Executes this command on the local host.
 +     *
 +     * @param orderGroup the operation group spanning this command
 +     *
 +     * @return an iterator over the result of executing this command locally.
 +     */
 +    @SuppressWarnings("resource") // The result iterator is closed upon 
exceptions (we know it's fine to potentially not close the intermediary
 +                                  // iterators created inside the try as long 
as we do close the original resultIterator), or by closing the result.
 +    public UnfilteredPartitionIterator executeLocally(ReadOrderGroup 
orderGroup)
      {
 -        return ksName;
 +        long startTimeNanos = System.nanoTime();
 +
 +        ColumnFamilyStore cfs = Keyspace.openAndGetStore(metadata());
 +        Index index = getIndex(cfs);
-         Index.Searcher searcher = index == null ? null : 
index.searcherFor(this);
 +
++        Index.Searcher searcher = null;
 +        if (index != null)
++        {
++            if (!cfs.indexManager.isIndexQueryable(index))
++                throw new IndexNotAvailableException(index);
++
++            searcher = index.searcherFor(this);
 +            Tracing.trace("Executing read on {}.{} using index {}", 
cfs.metadata.ksName, cfs.metadata.cfName, index.getIndexMetadata().name);
++        }
 +
 +        UnfilteredPartitionIterator resultIterator = searcher == null
 +                                         ? queryStorage(cfs, orderGroup)
 +                                         : searcher.search(orderGroup);
 +
 +        try
 +        {
 +            resultIterator = 
withMetricsRecording(withoutPurgeableTombstones(resultIterator, cfs), 
cfs.metric, startTimeNanos);
 +
 +            // If we've used a 2ndary index, we know the result already 
satisfy the primary expression used, so
 +            // no point in checking it again.
 +            RowFilter updatedFilter = searcher == null
 +                                    ? rowFilter()
 +                                    : 
index.getPostIndexQueryFilter(rowFilter());
 +
 +            // TODO: We'll currently do filtering by the rowFilter here 
because it's convenient. However,
 +            // we'll probably want to optimize by pushing it down the layer 
(like for dropped columns) as it
 +            // would be more efficient (the sooner we discard stuff we know 
we don't care, the less useless
 +            // processing we do on it).
 +            return limits().filter(updatedFilter.filter(resultIterator, 
nowInSec()), nowInSec());
 +        }
 +        catch (RuntimeException | Error e)
 +        {
 +            resultIterator.close();
 +            throw e;
 +        }
      }
  
 -    // maybeGenerateRetryCommand is used to generate a retry for short reads
 -    public ReadCommand maybeGenerateRetryCommand(RowDataResolver resolver, 
Row row)
 +    protected abstract void recordLatency(TableMetrics metric, long 
latencyNanos);
 +
 +    public PartitionIterator executeInternal(ReadOrderGroup orderGroup)
      {
 -        return null;
 +        return 
UnfilteredPartitionIterators.filter(executeLocally(orderGroup), nowInSec());
      }
  
 -    // maybeTrim removes columns from a response that is too long
 -    public Row maybeTrim(Row row)
 +    public ReadOrderGroup startOrderGroup()
      {
 -        return row;
 +        return ReadOrderGroup.forCommand(this);
      }
  
 -    public long getTimeout()
 +    /**
 +     * Wraps the provided iterator so that metrics on what is scanned by the 
command are recorded.
 +     * This also log warning/trow TombstoneOverwhelmingException if 
appropriate.
 +     */
 +    private UnfilteredPartitionIterator 
withMetricsRecording(UnfilteredPartitionIterator iter, final TableMetrics 
metric, final long startTimeNanos)
      {
 -        return DatabaseDescriptor.getReadRpcTimeout();
 +        class MetricRecording extends Transformation<UnfilteredRowIterator>
 +        {
 +            private final int failureThreshold = 
DatabaseDescriptor.getTombstoneFailureThreshold();
 +            private final int warningThreshold = 
DatabaseDescriptor.getTombstoneWarnThreshold();
 +
 +            private final boolean respectTombstoneThresholds = 
!Schema.isSystemKeyspace(ReadCommand.this.metadata().ksName);
 +
 +            private int liveRows = 0;
 +            private int tombstones = 0;
 +
 +            private DecoratedKey currentKey;
 +
 +            @Override
 +            public UnfilteredRowIterator 
applyToPartition(UnfilteredRowIterator iter)
 +            {
 +                currentKey = iter.partitionKey();
 +                return Transformation.apply(iter, this);
 +            }
 +
 +            @Override
 +            public Row applyToStatic(Row row)
 +            {
 +                return applyToRow(row);
 +            }
 +
 +            @Override
 +            public Row applyToRow(Row row)
 +            {
 +                if (row.hasLiveData(ReadCommand.this.nowInSec()))
 +                    ++liveRows;
 +
 +                for (Cell cell : row.cells())
 +                {
 +                    if (!cell.isLive(ReadCommand.this.nowInSec()))
 +                        countTombstone(row.clustering());
 +                }
 +                return row;
 +            }
 +
 +            @Override
 +            public RangeTombstoneMarker applyToMarker(RangeTombstoneMarker 
marker)
 +            {
 +                countTombstone(marker.clustering());
 +                return marker;
 +            }
 +
 +            private void countTombstone(ClusteringPrefix clustering)
 +            {
 +                ++tombstones;
 +                if (tombstones > failureThreshold && 
respectTombstoneThresholds)
 +                {
 +                    String query = ReadCommand.this.toCQLString();
 +                    Tracing.trace("Scanned over {} tombstones for query {}; 
query aborted (see tombstone_failure_threshold)", failureThreshold, query);
 +                    throw new TombstoneOverwhelmingException(tombstones, 
query, ReadCommand.this.metadata(), currentKey, clustering);
 +                }
 +            }
 +
 +            @Override
 +            public void onClose()
 +            {
 +                recordLatency(metric, System.nanoTime() - startTimeNanos);
 +
 +                metric.tombstoneScannedHistogram.update(tombstones);
 +                metric.liveScannedHistogram.update(liveRows);
 +
 +                boolean warnTombstones = tombstones > warningThreshold && 
respectTombstoneThresholds;
 +                if (warnTombstones)
 +                {
 +                    String msg = String.format("Read %d live rows and %d 
tombstone cells for query %1.512s (see tombstone_warn_threshold)", liveRows, 
tombstones, ReadCommand.this.toCQLString());
 +                    ClientWarn.warn(msg);
 +                    logger.warn(msg);
 +                }
 +
 +                Tracing.trace("Read {} live and {} tombstone cells{}", 
liveRows, tombstones, (warnTombstones ? " (see tombstone_warn_threshold)" : 
""));
 +            }
 +        };
 +
 +        return Transformation.apply(iter, new MetricRecording());
      }
 -}
  
 -class ReadCommandSerializer implements IVersionedSerializer<ReadCommand>
 -{
 -    public void serialize(ReadCommand command, DataOutputPlus out, int 
version) throws IOException
 +    /**
 +     * Creates a message for this command.
 +     */
 +    public abstract MessageOut<ReadCommand> createMessage(int version);
 +
 +    protected abstract void appendCQLWhereClause(StringBuilder sb);
 +
 +    // Skip purgeable tombstones. We do this because it's safe to do 
(post-merge of the memtable and sstable at least), it
 +    // can save us some bandwith, and avoid making us throw a 
TombstoneOverwhelmingException for purgeable tombstones (which
 +    // are to some extend an artefact of compaction lagging behind and hence 
counting them is somewhat unintuitive).
 +    protected UnfilteredPartitionIterator 
withoutPurgeableTombstones(UnfilteredPartitionIterator iterator, 
ColumnFamilyStore cfs)
 +    {
 +        final boolean isForThrift = iterator.isForThrift();
 +        class WithoutPurgeableTombstones extends PurgeFunction
 +        {
 +            public WithoutPurgeableTombstones()
 +            {
 +                super(isForThrift, cfs.gcBefore(nowInSec()), 
oldestUnrepairedTombstone(), 
cfs.getCompactionStrategyManager().onlyPurgeRepairedTombstones());
 +            }
 +
 +            protected long getMaxPurgeableTimestamp()
 +            {
 +                return Long.MAX_VALUE;
 +            }
 +        }
 +        return Transformation.apply(iterator, new 
WithoutPurgeableTombstones());
 +    }
 +
 +    /**
 +     * Recreate the CQL string corresponding to this query.
 +     * <p>
 +     * Note that in general the returned string will not be exactly the 
original user string, first
 +     * because there isn't always a single syntax for a given query,  but 
also because we don't have
 +     * all the information needed (we know the non-PK columns queried but not 
the PK ones as internally
 +     * we query them all). So this shouldn't be relied too strongly, but this 
should be good enough for
 +     * debugging purpose which is what this is for.
 +     */
 +    public String toCQLString()
 +    {
 +        StringBuilder sb = new StringBuilder();
 +        sb.append("SELECT ").append(columnFilter());
 +        sb.append(" FROM 
").append(metadata().ksName).append('.').append(metadata.cfName);
 +        appendCQLWhereClause(sb);
 +
 +        if (limits() != DataLimits.NONE)
 +            sb.append(' ').append(limits());
 +        return sb.toString();
 +    }
 +
 +    private static class Serializer implements 
IVersionedSerializer<ReadCommand>
 +    {
 +        private static int digestFlag(boolean isDigest)
 +        {
 +            return isDigest ? 0x01 : 0;
 +        }
 +
 +        private static boolean isDigest(int flags)
 +        {
 +            return (flags & 0x01) != 0;
 +        }
 +
 +        private static int thriftFlag(boolean isForThrift)
 +        {
 +            return isForThrift ? 0x02 : 0;
 +        }
 +
 +        private static boolean isForThrift(int flags)
 +        {
 +            return (flags & 0x02) != 0;
 +        }
 +
 +        private static int indexFlag(boolean hasIndex)
 +        {
 +            return hasIndex ? 0x04 : 0;
 +        }
 +
 +        private static boolean hasIndex(int flags)
 +        {
 +            return (flags & 0x04) != 0;
 +        }
 +
 +        public void serialize(ReadCommand command, DataOutputPlus out, int 
version) throws IOException
 +        {
 +            // for serialization, createLegacyMessage() should cause 
legacyReadCommandSerializer to be used directly
 +            assert version >= MessagingService.VERSION_30;
 +
 +            out.writeByte(command.kind.ordinal());
 +            out.writeByte(digestFlag(command.isDigestQuery()) | 
thriftFlag(command.isForThrift()) | indexFlag(command.index.isPresent()));
 +            if (command.isDigestQuery())
 +                out.writeUnsignedVInt(command.digestVersion());
 +            CFMetaData.serializer.serialize(command.metadata(), out, version);
 +            out.writeInt(command.nowInSec());
 +            ColumnFilter.serializer.serialize(command.columnFilter(), out, 
version);
 +            RowFilter.serializer.serialize(command.rowFilter(), out, version);
 +            DataLimits.serializer.serialize(command.limits(), out, version);
 +            if (command.index.isPresent())
 +                IndexMetadata.serializer.serialize(command.index.get(), out, 
version);
 +
 +            command.serializeSelection(out, version);
 +        }
 +
 +        public ReadCommand deserialize(DataInputPlus in, int version) throws 
IOException
 +        {
 +            if (version < MessagingService.VERSION_30)
 +                return legacyReadCommandSerializer.deserialize(in, version);
 +
 +            Kind kind = Kind.values()[in.readByte()];
 +            int flags = in.readByte();
 +            boolean isDigest = isDigest(flags);
 +            boolean isForThrift = isForThrift(flags);
 +            boolean hasIndex = hasIndex(flags);
 +            int digestVersion = isDigest ? (int)in.readUnsignedVInt() : 0;
 +            CFMetaData metadata = CFMetaData.serializer.deserialize(in, 
version);
 +            int nowInSec = in.readInt();
 +            ColumnFilter columnFilter = 
ColumnFilter.serializer.deserialize(in, version, metadata);
 +            RowFilter rowFilter = RowFilter.serializer.deserialize(in, 
version, metadata);
 +            DataLimits limits = DataLimits.serializer.deserialize(in, 
version);
 +            Optional<IndexMetadata> index = hasIndex
 +                                            ? deserializeIndexMetadata(in, 
version, metadata)
 +                                            : Optional.empty();
 +
 +            return kind.selectionDeserializer.deserialize(in, version, 
isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, 
rowFilter, limits, index);
 +        }
 +
 +        private Optional<IndexMetadata> 
deserializeIndexMetadata(DataInputPlus in, int version, CFMetaData cfm) throws 
IOException
 +        {
 +            try
 +            {
 +                return Optional.of(IndexMetadata.serializer.deserialize(in, 
version, cfm));
 +            }
 +            catch (UnknownIndexException e)
 +            {
 +                String message = String.format("Couldn't find a defined index 
on %s.%s with the id %s. " +
 +                                               "If an index was just created, 
this is likely due to the schema not " +
 +                                               "being fully propagated. Local 
read will proceed without using the " +
 +                                               "index. Please wait for schema 
agreement after index creation.",
 +                                               cfm.ksName, cfm.cfName, 
e.indexId.toString());
 +                logger.info(message);
 +                return Optional.empty();
 +            }
 +        }
 +
 +        public long serializedSize(ReadCommand command, int version)
 +        {
 +            // for serialization, createLegacyMessage() should cause 
legacyReadCommandSerializer to be used directly
 +            assert version >= MessagingService.VERSION_30;
 +
 +            return 2 // kind + flags
 +                 + (command.isDigestQuery() ? 
TypeSizes.sizeofUnsignedVInt(command.digestVersion()) : 0)
 +                 + CFMetaData.serializer.serializedSize(command.metadata(), 
version)
 +                 + TypeSizes.sizeof(command.nowInSec())
 +                 + 
ColumnFilter.serializer.serializedSize(command.columnFilter(), version)
 +                 + RowFilter.serializer.serializedSize(command.rowFilter(), 
version)
 +                 + DataLimits.serializer.serializedSize(command.limits(), 
version)
 +                 + command.selectionSerializedSize(version)
 +                 + command.indexSerializedSize(version);
 +        }
 +    }
 +
 +    // Dispatch to either Serializer or LegacyRangeSliceCommandSerializer. 
Only useful as long as we maintain pre-3.0
 +    // compatibility
 +    private static class RangeSliceSerializer implements 
IVersionedSerializer<ReadCommand>
 +    {
 +        public void serialize(ReadCommand command, DataOutputPlus out, int 
version) throws IOException
 +        {
 +            if (version < MessagingService.VERSION_30)
 +                legacyRangeSliceCommandSerializer.serialize(command, out, 
version);
 +            else
 +                serializer.serialize(command, out, version);
 +        }
 +
 +        public ReadCommand deserialize(DataInputPlus in, int version) throws 
IOException
 +        {
 +            return version < MessagingService.VERSION_30
 +                 ? legacyRangeSliceCommandSerializer.deserialize(in, version)
 +                 : serializer.deserialize(in, version);
 +        }
 +
 +        public long serializedSize(ReadCommand command, int version)
 +        {
 +            return version < MessagingService.VERSION_30
 +                 ? legacyRangeSliceCommandSerializer.serializedSize(command, 
version)
 +                 : serializer.serializedSize(command, version);
 +        }
 +    }
 +
 +    private enum LegacyType
 +    {
 +        GET_BY_NAMES((byte)1),
 +        GET_SLICES((byte)2);
 +
 +        public final byte serializedValue;
 +
 +        LegacyType(byte b)
 +        {
 +            this.serializedValue = b;
 +        }
 +
 +        public static LegacyType 
fromPartitionFilterKind(ClusteringIndexFilter.Kind kind)
 +        {
 +            return kind == ClusteringIndexFilter.Kind.SLICE
 +                   ? GET_SLICES
 +                   : GET_BY_NAMES;
 +        }
 +
 +        public static LegacyType fromSerializedValue(byte b)
 +        {
 +            return b == 1 ? GET_BY_NAMES : GET_SLICES;
 +        }
 +    }
 +
 +    /**
 +     * Serializer for pre-3.0 RangeSliceCommands.
 +     */
 +    private static class LegacyRangeSliceCommandSerializer implements 
IVersionedSerializer<ReadCommand>
      {
 -        out.writeByte(command.commandType.serializedValue);
 -        switch (command.commandType)
 +        public void serialize(ReadCommand command, DataOutputPlus out, int 
version) throws IOException
 +        {
 +            assert version < MessagingService.VERSION_30;
 +
 +            PartitionRangeReadCommand rangeCommand = 
(PartitionRangeReadCommand) command;
 +            assert !rangeCommand.dataRange().isPaging();
 +
 +            // convert pre-3.0 incompatible names filters to slice filters
 +            rangeCommand = maybeConvertNamesToSlice(rangeCommand);
 +
 +            CFMetaData metadata = rangeCommand.metadata();
 +
 +            out.writeUTF(metadata.ksName);
 +            out.writeUTF(metadata.cfName);
 +            out.writeLong(rangeCommand.nowInSec() * 1000L);  // convert from 
seconds to millis
 +
 +            // begin DiskAtomFilterSerializer.serialize()
 +            if (rangeCommand.isNamesQuery())
 +            {
 +                out.writeByte(1);  // 0 for slices, 1 for names
 +                ClusteringIndexNamesFilter filter = 
(ClusteringIndexNamesFilter) rangeCommand.dataRange().clusteringIndexFilter;
 +                
LegacyReadCommandSerializer.serializeNamesFilter(rangeCommand, filter, out);
 +            }
 +            else
 +            {
 +                out.writeByte(0);  // 0 for slices, 1 for names
 +
 +                // slice filter serialization
 +                ClusteringIndexSliceFilter filter = 
(ClusteringIndexSliceFilter) rangeCommand.dataRange().clusteringIndexFilter;
 +
 +                boolean makeStaticSlice = 
!rangeCommand.columnFilter().fetchedColumns().statics.isEmpty() && 
!filter.requestedSlices().selects(Clustering.STATIC_CLUSTERING);
 +                LegacyReadCommandSerializer.serializeSlices(out, 
filter.requestedSlices(), filter.isReversed(), makeStaticSlice, metadata);
 +
 +                out.writeBoolean(filter.isReversed());
 +
 +                // limit
 +                DataLimits.Kind kind = rangeCommand.limits().kind();
 +                boolean isDistinct = (kind == DataLimits.Kind.CQL_LIMIT || 
kind == DataLimits.Kind.CQL_PAGING_LIMIT) && 
rangeCommand.limits().perPartitionCount() == 1;
 +                if (isDistinct)
 +                    out.writeInt(1);
 +                else
 +                    
out.writeInt(LegacyReadCommandSerializer.updateLimitForQuery(rangeCommand.limits().count(),
 filter.requestedSlices()));
 +
 +                int compositesToGroup;
 +                boolean selectsStatics = 
!rangeCommand.columnFilter().fetchedColumns().statics.isEmpty() || 
filter.requestedSlices().selects(Clustering.STATIC_CLUSTERING);
 +                if (kind == DataLimits.Kind.THRIFT_LIMIT)
 +                    compositesToGroup = -1;
 +                else if (isDistinct && !selectsStatics)
 +                    compositesToGroup = -2;  // for DISTINCT queries 
(CASSANDRA-8490)
 +                else
 +                    compositesToGroup = metadata.isDense() ? -1 : 
metadata.clusteringColumns().size();
 +
 +                out.writeInt(compositesToGroup);
 +            }
 +
 +            serializeRowFilter(out, rangeCommand.rowFilter());
 +            
AbstractBounds.rowPositionSerializer.serialize(rangeCommand.dataRange().keyRange(),
 out, version);
 +
 +            // maxResults
 +            out.writeInt(rangeCommand.limits().count());
 +
 +            // countCQL3Rows
 +            if (rangeCommand.isForThrift() || 
rangeCommand.limits().perPartitionCount() == 1)  // if for Thrift or DISTINCT
 +                out.writeBoolean(false);
 +            else
 +                out.writeBoolean(true);
 +
 +            // isPaging
 +            out.writeBoolean(false);
 +        }
 +
 +        public ReadCommand deserialize(DataInputPlus in, int version) throws 
IOException
 +        {
 +            assert version < MessagingService.VERSION_30;
 +
 +            String keyspace = in.readUTF();
 +            String columnFamily = in.readUTF();
 +
 +            CFMetaData metadata = Schema.instance.getCFMetaData(keyspace, 
columnFamily);
 +            if (metadata == null)
 +            {
 +                String message = String.format("Got legacy range command for 
nonexistent table %s.%s.", keyspace, columnFamily);
 +                throw new UnknownColumnFamilyException(message, null);
 +            }
 +
 +            int nowInSec = (int) (in.readLong() / 1000);  // convert from 
millis to seconds
 +
 +            ClusteringIndexFilter filter;
 +            ColumnFilter selection;
 +            int compositesToGroup = 0;
 +            int perPartitionLimit = -1;
 +            byte readType = in.readByte();  // 0 for slices, 1 for names
 +            if (readType == 1)
 +            {
 +                Pair<ColumnFilter, ClusteringIndexNamesFilter> 
selectionAndFilter = 
LegacyReadCommandSerializer.deserializeNamesSelectionAndFilter(in, metadata);
 +                selection = selectionAndFilter.left;
 +                filter = selectionAndFilter.right;
 +            }
 +            else
 +            {
 +                Pair<ClusteringIndexSliceFilter, Boolean> p = 
LegacyReadCommandSerializer.deserializeSlicePartitionFilter(in, metadata);
 +                filter = p.left;
 +                perPartitionLimit = in.readInt();
 +                compositesToGroup = in.readInt();
 +                selection = getColumnSelectionForSlice(p.right, 
compositesToGroup, metadata);
 +            }
 +
 +            RowFilter rowFilter = deserializeRowFilter(in, metadata);
 +
 +            AbstractBounds<PartitionPosition> keyRange = 
AbstractBounds.rowPositionSerializer.deserialize(in, metadata.partitioner, 
version);
 +            int maxResults = in.readInt();
 +
 +            in.readBoolean();  // countCQL3Rows (not needed)
 +            in.readBoolean();  // isPaging (not needed)
 +
 +            boolean selectsStatics = 
(!selection.fetchedColumns().statics.isEmpty() || 
filter.selects(Clustering.STATIC_CLUSTERING));
 +            boolean isDistinct = compositesToGroup == -2 || 
(perPartitionLimit == 1 && selectsStatics);
 +            DataLimits limits;
 +            if (isDistinct)
 +                limits = DataLimits.distinctLimits(maxResults);
 +            else if (compositesToGroup == -1)
 +                limits = DataLimits.thriftLimits(maxResults, 
perPartitionLimit);
 +            else
 +                limits = DataLimits.cqlLimits(maxResults);
 +
 +            return new PartitionRangeReadCommand(false, 0, true, metadata, 
nowInSec, selection, rowFilter, limits, new DataRange(keyRange, filter), 
Optional.empty());
 +        }
 +
 +        static void serializeRowFilter(DataOutputPlus out, RowFilter 
rowFilter) throws IOException
 +        {
 +            ArrayList<RowFilter.Expression> indexExpressions = 
Lists.newArrayList(rowFilter.iterator());
 +            out.writeInt(indexExpressions.size());
 +            for (RowFilter.Expression expression : indexExpressions)
 +            {
 +                
ByteBufferUtil.writeWithShortLength(expression.column().name.bytes, out);
 +                expression.operator().writeTo(out);
 +                
ByteBufferUtil.writeWithShortLength(expression.getIndexValue(), out);
 +            }
 +        }
 +
 +        static RowFilter deserializeRowFilter(DataInputPlus in, CFMetaData 
metadata) throws IOException
 +        {
 +            int numRowFilters = in.readInt();
 +            if (numRowFilters == 0)
 +                return RowFilter.NONE;
 +
 +            RowFilter rowFilter = RowFilter.create(numRowFilters);
 +            for (int i = 0; i < numRowFilters; i++)
 +            {
 +                ByteBuffer columnName = 
ByteBufferUtil.readWithShortLength(in);
 +                ColumnDefinition column = 
metadata.getColumnDefinition(columnName);
 +                Operator op = Operator.readFrom(in);
 +                ByteBuffer indexValue = 
ByteBufferUtil.readWithShortLength(in);
 +                rowFilter.add(column, op, indexValue);
 +            }
 +            return rowFilter;
 +        }
 +
 +        static long serializedRowFilterSize(RowFilter rowFilter)
 +        {
 +            long size = TypeSizes.sizeof(0);  // rowFilterCount
 +            for (RowFilter.Expression expression : rowFilter)
 +            {
 +                size += 
ByteBufferUtil.serializedSizeWithShortLength(expression.column().name.bytes);
 +                size += TypeSizes.sizeof(0);  // operator int value
 +                size += 
ByteBufferUtil.serializedSizeWithShortLength(expression.getIndexValue());
 +            }
 +            return size;
 +        }
 +
 +        public long serializedSize(ReadCommand command, int version)
 +        {
 +            assert version < MessagingService.VERSION_30;
 +            assert command.kind == Kind.PARTITION_RANGE;
 +
 +            PartitionRangeReadCommand rangeCommand = 
(PartitionRangeReadCommand) command;
 +            rangeCommand = maybeConvertNamesToSlice(rangeCommand);
 +            CFMetaData metadata = rangeCommand.metadata();
 +
 +            long size = TypeSizes.sizeof(metadata.ksName);
 +            size += TypeSizes.sizeof(metadata.cfName);
 +            size += TypeSizes.sizeof((long) rangeCommand.nowInSec());
 +
 +            size += 1;  // single byte flag: 0 for slices, 1 for names
 +            if (rangeCommand.isNamesQuery())
 +            {
 +                PartitionColumns columns = 
rangeCommand.columnFilter().fetchedColumns();
 +                ClusteringIndexNamesFilter filter = 
(ClusteringIndexNamesFilter) rangeCommand.dataRange().clusteringIndexFilter;
 +                size += 
LegacyReadCommandSerializer.serializedNamesFilterSize(filter, metadata, 
columns);
 +            }
 +            else
 +            {
 +                ClusteringIndexSliceFilter filter = 
(ClusteringIndexSliceFilter) rangeCommand.dataRange().clusteringIndexFilter;
 +                boolean makeStaticSlice = 
!rangeCommand.columnFilter().fetchedColumns().statics.isEmpty() && 
!filter.requestedSlices().selects(Clustering.STATIC_CLUSTERING);
 +                size += 
LegacyReadCommandSerializer.serializedSlicesSize(filter.requestedSlices(), 
makeStaticSlice, metadata);
 +                size += TypeSizes.sizeof(filter.isReversed());
 +                size += 
TypeSizes.sizeof(rangeCommand.limits().perPartitionCount());
 +                size += TypeSizes.sizeof(0); // compositesToGroup
 +            }
 +
 +            if (rangeCommand.rowFilter().equals(RowFilter.NONE))
 +            {
 +                size += TypeSizes.sizeof(0);
 +            }
 +            else
 +            {
 +                ArrayList<RowFilter.Expression> indexExpressions = 
Lists.newArrayList(rangeCommand.rowFilter().iterator());
 +                size += TypeSizes.sizeof(indexExpressions.size());
 +                for (RowFilter.Expression expression : indexExpressions)
 +                {
 +                    size += 
ByteBufferUtil.serializedSizeWithShortLength(expression.column().name.bytes);
 +                    size += TypeSizes.sizeof(expression.operator().ordinal());
 +                    size += 
ByteBufferUtil.serializedSizeWithShortLength(expression.getIndexValue());
 +                }
 +            }
 +
 +            size += 
AbstractBounds.rowPositionSerializer.serializedSize(rangeCommand.dataRange().keyRange(),
 version);
 +            size += TypeSizes.sizeof(rangeCommand.limits().count());
 +            size += TypeSizes.sizeof(!rangeCommand.isForThrift());
 +            return size + 
TypeSizes.sizeof(rangeCommand.dataRange().isPaging());
 +        }
 +
 +        static PartitionRangeReadCommand 
maybeConvertNamesToSlice(PartitionRangeReadCommand command)
          {
 -            case GET_BY_NAMES:
 -                SliceByNamesReadCommand.serializer.serialize(command, out, 
version);
 -                break;
 -            case GET_SLICES:
 -                SliceFromReadCommand.serializer.serialize(command, out, 
version);
 -                break;
 -            default:
 -                throw new AssertionError();
 +            if (!command.dataRange().isNamesQuery())
 +                return command;
 +
 +            CFMetaData metadata = command.metadata();
 +            if 
(!LegacyReadCommandSerializer.shouldConvertNamesToSlice(metadata, 
command.columnFilter().fetchedColumns()))
 +                return command;
 +
 +            ClusteringIndexNamesFilter filter = (ClusteringIndexNamesFilter) 
command.dataRange().clusteringIndexFilter;
 +            ClusteringIndexSliceFilter sliceFilter = 
LegacyReadCommandSerializer.convertNamesFilterToSliceFilter(filter, metadata);
 +            DataRange newRange = new 
DataRange(command.dataRange().keyRange(), sliceFilter);
 +            return new PartitionRangeReadCommand(
 +                    command.isDigestQuery(), command.digestVersion(), 
command.isForThrift(), metadata, command.nowInSec(),
 +                    command.columnFilter(), command.rowFilter(), 
command.limits(), newRange, Optional.empty());
 +        }
 +
 +        static ColumnFilter getColumnSelectionForSlice(boolean 
selectsStatics, int compositesToGroup, CFMetaData metadata)
 +        {
 +            // A value of -2 indicates this is a DISTINCT query that doesn't 
select static columns, only partition keys.
 +            // In that case, we'll basically be querying the first row of the 
partition, but we must make sure we include
 +            // all columns so we get at least one cell if there is a live row 
as it would confuse pre-3.0 nodes otherwise.
 +            if (compositesToGroup == -2)
 +                return ColumnFilter.all(metadata);
 +
 +            // if a slice query from a pre-3.0 node doesn't cover statics, we 
shouldn't select them at all
 +            PartitionColumns columns = selectsStatics
 +                                     ? metadata.partitionColumns()
 +                                     : 
metadata.partitionColumns().withoutStatics();
 +            return ColumnFilter.selectionBuilder().addAll(columns).build();
          }
      }
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7b430eee/src/java/org/apache/cassandra/index/IndexNotAvailableException.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/index/IndexNotAvailableException.java
index 0000000,0000000..5440e2a
new file mode 100644
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/IndexNotAvailableException.java
@@@ -1,0 -1,0 +1,34 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.cassandra.index;
++
++/**
++ * Thrown if a secondary index is not currently available.
++ */
++public final class IndexNotAvailableException extends RuntimeException
++{
++    /**
++     * Creates a new <code>IndexNotAvailableException</code> for the 
specified index.
++     * @param index the secondary index that is not yet available
++     */
++    public IndexNotAvailableException(Index index)
++    {
++        super(String.format("The secondary index '%s' is not yet available", 
index.getIndexMetadata().name));
++    }
++}

Reply via email to