Merge branch 'cassandra-3.11' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3e4d000c Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3e4d000c Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3e4d000c Branch: refs/heads/trunk Commit: 3e4d000c9e3ffa2df88c32d78c866e0598898dd4 Parents: 76efcc6 7ad1945 Author: Aleksey Yeschenko <[email protected]> Authored: Wed Aug 30 17:32:09 2017 +0100 Committer: Aleksey Yeschenko <[email protected]> Committed: Wed Aug 30 17:37:41 2017 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cql3/statements/SelectStatement.java | 16 +- .../db/AbstractReadCommandBuilder.java | 2 +- .../cassandra/db/PartitionRangeReadCommand.java | 112 +++++++++++--- .../org/apache/cassandra/db/ReadCommand.java | 131 +++++++++------- .../db/SinglePartitionReadCommand.java | 148 ++++++++++++++----- .../cassandra/index/SecondaryIndexManager.java | 8 +- .../internal/composites/CompositesSearcher.java | 3 +- .../index/internal/keys/KeysSearcher.java | 3 +- .../cassandra/service/AbstractReadExecutor.java | 4 +- .../service/pager/PartitionRangeQueryPager.java | 8 +- test/unit/org/apache/cassandra/Util.java | 25 +--- .../apache/cassandra/db/SecondaryIndexTest.java | 10 +- .../db/SinglePartitionSliceCommandTest.java | 29 ++-- .../cassandra/index/sasi/SASIIndexTest.java | 43 +++--- .../cassandra/io/sstable/SSTableReaderTest.java | 2 +- .../cassandra/service/ReadExecutorTest.java | 2 +- 17 files changed, 338 insertions(+), 209 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e4d000c/CHANGES.txt ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e4d000c/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java ---------------------------------------------------------------------- 
diff --cc src/java/org/apache/cassandra/cql3/statements/SelectStatement.java index 77eebcf,85efafb..84fef5e --- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java @@@ -569,18 -552,10 +569,10 @@@ public class SelectStatement implement if (keyBounds == null) return ReadQuery.EMPTY; - PartitionRangeReadCommand command = new PartitionRangeReadCommand(table, - nowInSec, - columnFilter, - rowFilter, - limit, - new DataRange(keyBounds, clusteringIndexFilter), - Optional.empty()); - // If there's a secondary index that the command can use, have it validate - // the request parameters. Note that as a side effect, if a viable Index is - // identified by the CFS's index manager, it will be cached in the command - // and serialized during distribution to replicas in order to avoid performing - // further lookups. + PartitionRangeReadCommand command = - PartitionRangeReadCommand.create(false, cfm, nowInSec, queriedColumns, rowFilter, limit, new DataRange(keyBounds, clusteringIndexFilter)); ++ PartitionRangeReadCommand.create(table, nowInSec, columnFilter, rowFilter, limit, new DataRange(keyBounds, clusteringIndexFilter)); + + // If there's a secondary index that the command can use, have it validate the request parameters. 
command.maybeValidateIndex(); return command; http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e4d000c/src/java/org/apache/cassandra/db/AbstractReadCommandBuilder.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/AbstractReadCommandBuilder.java index 8ced1c7,1c69813..481e906 --- a/src/java/org/apache/cassandra/db/AbstractReadCommandBuilder.java +++ b/src/java/org/apache/cassandra/db/AbstractReadCommandBuilder.java @@@ -336,10 -336,10 +336,10 @@@ public abstract class AbstractReadComma else bounds = new ExcludingBounds<>(start, end); - return new PartitionRangeReadCommand(cfs.metadata(), nowInSeconds, makeColumnFilter(), filter, makeLimits(), new DataRange(bounds, makeFilter()), Optional.empty()); - return PartitionRangeReadCommand.create(false, cfs.metadata, nowInSeconds, makeColumnFilter(), filter, makeLimits(), new DataRange(bounds, makeFilter())); ++ return PartitionRangeReadCommand.create(cfs.metadata(), nowInSeconds, makeColumnFilter(), filter, makeLimits(), new DataRange(bounds, makeFilter())); } - static DecoratedKey makeKey(CFMetaData metadata, Object... partitionKey) + static DecoratedKey makeKey(TableMetadata metadata, Object... 
partitionKey) { if (partitionKey.length == 1 && partitionKey[0] instanceof DecoratedKey) return (DecoratedKey)partitionKey[0]; http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e4d000c/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java index da7daa7,f7b6660..e88f7fb --- a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java +++ b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java @@@ -20,11 -20,11 +20,11 @@@ package org.apache.cassandra.db import java.io.IOException; import java.util.ArrayList; import java.util.List; - import java.util.Optional; + import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Iterables; -import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.filter.*; import org.apache.cassandra.db.lifecycle.View; @@@ -59,30 -60,39 +59,36 @@@ public class PartitionRangeReadCommand private final DataRange dataRange; private int oldestUnrepairedTombstone = Integer.MAX_VALUE; - public PartitionRangeReadCommand(boolean isDigest, + private PartitionRangeReadCommand(boolean isDigest, - int digestVersion, - boolean isForThrift, - CFMetaData metadata, - int nowInSec, - ColumnFilter columnFilter, - RowFilter rowFilter, - DataLimits limits, - DataRange dataRange, - IndexMetadata index) + int digestVersion, + TableMetadata metadata, + int nowInSec, + ColumnFilter columnFilter, + RowFilter rowFilter, + DataLimits limits, + DataRange dataRange, - Optional<IndexMetadata> index) ++ IndexMetadata index) { - super(Kind.PARTITION_RANGE, isDigest, digestVersion, metadata, nowInSec, columnFilter, rowFilter, limits); - super(Kind.PARTITION_RANGE, isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, 
limits, index); ++ super(Kind.PARTITION_RANGE, isDigest, digestVersion, metadata, nowInSec, columnFilter, rowFilter, limits, index); this.dataRange = dataRange; - this.index = index; } - public PartitionRangeReadCommand(TableMetadata metadata, - int nowInSec, - ColumnFilter columnFilter, - RowFilter rowFilter, - DataLimits limits, - DataRange dataRange, - Optional<IndexMetadata> index) - public static PartitionRangeReadCommand create(boolean isForThrift, - CFMetaData metadata, ++ public static PartitionRangeReadCommand create(TableMetadata metadata, + int nowInSec, + ColumnFilter columnFilter, + RowFilter rowFilter, + DataLimits limits, + DataRange dataRange) { - this(false, 0, metadata, nowInSec, columnFilter, rowFilter, limits, dataRange, index); + return new PartitionRangeReadCommand(false, + 0, - isForThrift, + metadata, + nowInSec, + columnFilter, + rowFilter, + limits, + dataRange, + findIndex(metadata, rowFilter)); } /** @@@ -93,9 -103,10 +99,11 @@@ * * @return a newly created read command that queries everything in the table. */ - public static PartitionRangeReadCommand allDataRead(CFMetaData metadata, int nowInSec) + public static PartitionRangeReadCommand allDataRead(TableMetadata metadata, int nowInSec) { - return new PartitionRangeReadCommand(metadata, - return new PartitionRangeReadCommand(false, 0, false, ++ return new PartitionRangeReadCommand(false, ++ 0, + metadata, nowInSec, ColumnFilter.all(metadata), RowFilter.NONE, @@@ -142,18 -152,86 +149,67 @@@ // DataLimits.CQLGroupByLimits.GroupByAwareCounter assumes that if GroupingState.hasClustering(), then we're in // the middle of a group, but we can't make that assumption if we query and range "in advance" of where we are // on the ring. - DataLimits newLimits = isRangeContinuation ? 
limits() : limits().withoutState(); - return new PartitionRangeReadCommand(isDigestQuery(), digestVersion(), metadata(), nowInSec(), columnFilter(), rowFilter(), newLimits, newRange, index); + return new PartitionRangeReadCommand(isDigestQuery(), + digestVersion(), - isForThrift(), + metadata(), + nowInSec(), + columnFilter(), + rowFilter(), + isRangeContinuation ? limits() : limits().withoutState(), + dataRange().forSubRange(range), + indexMetadata()); } public PartitionRangeReadCommand copy() { - return new PartitionRangeReadCommand(isDigestQuery(), digestVersion(), metadata(), nowInSec(), columnFilter(), rowFilter(), limits(), dataRange(), index); + return new PartitionRangeReadCommand(isDigestQuery(), + digestVersion(), - isForThrift(), + metadata(), + nowInSec(), + columnFilter(), + rowFilter(), + limits(), + dataRange(), + indexMetadata()); + } + + public PartitionRangeReadCommand copyAsDigestQuery() + { + return new PartitionRangeReadCommand(true, + digestVersion(), - isForThrift(), + metadata(), + nowInSec(), + columnFilter(), + rowFilter(), + limits(), + dataRange(), + indexMetadata()); + } + + public ReadCommand withUpdatedLimit(DataLimits newLimits) + { + return new PartitionRangeReadCommand(isDigestQuery(), + digestVersion(), - isForThrift(), + metadata(), + nowInSec(), + columnFilter(), + rowFilter(), + newLimits, + dataRange(), + indexMetadata()); } - public PartitionRangeReadCommand withUpdatedLimit(DataLimits newLimits) - public PartitionRangeReadCommand withUpdatedDataRange(DataRange newDataRange) - { - return new PartitionRangeReadCommand(isDigestQuery(), - digestVersion(), - isForThrift(), - metadata(), - nowInSec(), - columnFilter(), - rowFilter(), - limits(), - newDataRange, - indexMetadata()); - } - + public PartitionRangeReadCommand withUpdatedLimitsAndDataRange(DataLimits newLimits, DataRange newDataRange) { - return new PartitionRangeReadCommand(metadata(), nowInSec(), columnFilter(), rowFilter(), newLimits, dataRange(), index); + return 
new PartitionRangeReadCommand(isDigestQuery(), + digestVersion(), - isForThrift(), + metadata(), + nowInSec(), + columnFilter(), + rowFilter(), + newLimits, + newDataRange, + indexMetadata()); } public long getTimeout() @@@ -194,10 -272,11 +250,11 @@@ metric.rangeLatency.addNano(latencyNanos); } - protected UnfilteredPartitionIterator queryStorage(final ColumnFamilyStore cfs, ReadExecutionController executionController) + @VisibleForTesting + public UnfilteredPartitionIterator queryStorage(final ColumnFamilyStore cfs, ReadExecutionController executionController) { ColumnFamilyStore.ViewFragment view = cfs.select(View.selectLive(dataRange().keyRange())); - Tracing.trace("Executing seq scan across {} sstables for {}", view.sstables.size(), dataRange().keyRange().getString(metadata().getKeyValidator())); + Tracing.trace("Executing seq scan across {} sstables for {}", view.sstables.size(), dataRange().keyRange().getString(metadata().partitionKeyType)); // fetch data from current memtable, historical memtables, and SSTables in the correct order. 
final List<UnfilteredPartitionIterator> iterators = new ArrayList<>(Iterables.size(view.memtables) + view.sstables.size()); @@@ -356,7 -438,17 +413,16 @@@ private static class Deserializer extends SelectionDeserializer { - public ReadCommand deserialize(DataInputPlus in, int version, boolean isDigest, int digestVersion, TableMetadata metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits, Optional<IndexMetadata> index) + public ReadCommand deserialize(DataInputPlus in, + int version, + boolean isDigest, + int digestVersion, - boolean isForThrift, - CFMetaData metadata, ++ TableMetadata metadata, + int nowInSec, + ColumnFilter columnFilter, + RowFilter rowFilter, + DataLimits limits, + IndexMetadata index) throws IOException { DataRange range = DataRange.serializer.deserialize(in, version, metadata); http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e4d000c/src/java/org/apache/cassandra/db/ReadCommand.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/ReadCommand.java index 08224bf,54389f0..e135902 --- a/src/java/org/apache/cassandra/db/ReadCommand.java +++ b/src/java/org/apache/cassandra/db/ReadCommand.java @@@ -18,9 -18,13 +18,10 @@@ package org.apache.cassandra.db; import java.io.IOException; -import java.nio.ByteBuffer; --import java.util.*; import java.util.function.Predicate; + import javax.annotation.Nullable; + -import com.google.common.collect.Lists; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@@ -32,22 -37,23 +33,22 @@@ import org.apache.cassandra.db.partitio import org.apache.cassandra.db.rows.*; import org.apache.cassandra.db.transform.StoppingTransformation; import org.apache.cassandra.db.transform.Transformation; -import org.apache.cassandra.dht.AbstractBounds; ++import org.apache.cassandra.exceptions.UnknownIndexException; import org.apache.cassandra.index.Index; import org.apache.cassandra.index.IndexNotAvailableException; -import 
org.apache.cassandra.io.ForwardingVersionedSerializer; import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.metrics.TableMetrics; import org.apache.cassandra.net.MessageOut; -import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.schema.IndexMetadata; -import org.apache.cassandra.schema.UnknownIndexException; +import org.apache.cassandra.schema.Schema; +import org.apache.cassandra.schema.SchemaConstants; +import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.schema.TableMetadata; - import org.apache.cassandra.exceptions.UnknownIndexException; import org.apache.cassandra.service.ClientWarn; import org.apache.cassandra.tracing.Tracing; -import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.utils.Pair; /** * General interface for storage-engine read commands (common to both range and @@@ -69,23 -112,27 +70,25 @@@ public abstract class ReadCommand exten private final RowFilter rowFilter; private final DataLimits limits; - // SecondaryIndexManager will attempt to provide the most selective of any available indexes - // during execution. Here we also store an the results of that lookup to repeating it over - // the lifetime of the command. - protected Optional<IndexMetadata> index = Optional.empty(); - - // Flag to indicate whether the index manager has been queried to select an index for this - // command. This is necessary as the result of that lookup may be null, in which case we - // still don't want to repeat it. - private boolean indexManagerQueried = false; - - private boolean isDigestQuery; + private final boolean isDigestQuery; // if a digest query, the version for which the digest is expected. Ignored if not a digest. 
private int digestVersion; - private final boolean isForThrift; + @Nullable + private final IndexMetadata index; + protected static abstract class SelectionDeserializer { - public abstract ReadCommand deserialize(DataInputPlus in, int version, boolean isDigest, int digestVersion, TableMetadata metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits, Optional<IndexMetadata> index) throws IOException; + public abstract ReadCommand deserialize(DataInputPlus in, + int version, + boolean isDigest, + int digestVersion, - boolean isForThrift, - CFMetaData metadata, ++ TableMetadata metadata, + int nowInSec, + ColumnFilter columnFilter, + RowFilter rowFilter, + DataLimits limits, + IndexMetadata index) throws IOException; } protected enum Kind @@@ -249,6 -288,40 +242,30 @@@ } /** - * Whether this query is for thrift or not. - * - * @return whether this query is for thrift. - */ - public boolean isForThrift() - { - return isForThrift; - } - - /** + * Index (metadata) chosen for this query. Can be null. + * + * @return index (metadata) chosen for this query + */ + @Nullable + public IndexMetadata indexMetadata() + { + return index; + } + + /** + * Index instance chosen for this query. Can be null. + * + * @return Index instance chosen for this query. Can be null. + */ + @Nullable + public Index index() + { + return null == index + ? null + : Keyspace.openAndGetStore(metadata).indexManager.getIndex(index); + } + + /** * The clustering index filter this command to use for the provided key. * <p> * Note that that method should only be called on a key actually queried by this command @@@ -289,25 -366,23 +310,23 @@@ public Index getIndex(ColumnFamilyStore cfs) { - // if we've already consulted the index manager, and it returned a valid index - // the result should be cached here. 
- if(index.isPresent()) - return cfs.indexManager.getIndex(index.get()); - - // if no cached index is present, but we've already consulted the index manager - // then no registered index is suitable for this command, so just return null. - if (indexManagerQueried) + return null != index + ? cfs.indexManager.getIndex(index) + : null; + } + - static IndexMetadata findIndex(CFMetaData table, RowFilter rowFilter) ++ static IndexMetadata findIndex(TableMetadata table, RowFilter rowFilter) + { - if (table.getIndexes().isEmpty() || rowFilter.isEmpty()) ++ if (table.indexes.isEmpty() || rowFilter.isEmpty()) return null; - // do the lookup, set the flag to indicate so and cache the result if not null - Index selected = cfs.indexManager.getBestIndexFor(this); - indexManagerQueried = true; + ColumnFamilyStore cfs = Keyspace.openAndGetStore(table); - if (selected == null) - return null; + Index index = cfs.indexManager.getBestIndexFor(rowFilter); - index = Optional.of(selected.getIndexMetadata()); - return selected; + return null != index + ? 
index.getIndexMetadata() + : null; } /** @@@ -619,11 -695,13 +638,11 @@@ public void serialize(ReadCommand command, DataOutputPlus out, int version) throws IOException { - assert version >= MessagingService.VERSION_30; - out.writeByte(command.kind.ordinal()); - out.writeByte(digestFlag(command.isDigestQuery()) | indexFlag(command.index.isPresent())); - out.writeByte(digestFlag(command.isDigestQuery()) | thriftFlag(command.isForThrift()) | indexFlag(null != command.index)); ++ out.writeByte(digestFlag(command.isDigestQuery()) | indexFlag(null != command.indexMetadata())); if (command.isDigestQuery()) out.writeUnsignedVInt(command.digestVersion()); - CFMetaData.serializer.serialize(command.metadata(), out, version); + command.metadata.id.serialize(out); out.writeInt(command.nowInSec()); ColumnFilter.serializer.serialize(command.columnFilter(), out, version); RowFilter.serializer.serialize(command.rowFilter(), out, version); @@@ -653,19 -726,17 +672,17 @@@ int nowInSec = in.readInt(); ColumnFilter columnFilter = ColumnFilter.serializer.deserialize(in, version, metadata); RowFilter rowFilter = RowFilter.serializer.deserialize(in, version, metadata); - DataLimits limits = DataLimits.serializer.deserialize(in, version, metadata.comparator); - Optional<IndexMetadata> index = hasIndex - ? deserializeIndexMetadata(in, version, metadata) - : Optional.empty(); + DataLimits limits = DataLimits.serializer.deserialize(in, version, metadata.comparator); + IndexMetadata index = hasIndex ? 
deserializeIndexMetadata(in, version, metadata) : null; - return kind.selectionDeserializer.deserialize(in, version, isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, index); + return kind.selectionDeserializer.deserialize(in, version, isDigest, digestVersion, metadata, nowInSec, columnFilter, rowFilter, limits, index); } - private Optional<IndexMetadata> deserializeIndexMetadata(DataInputPlus in, int version, TableMetadata metadata) throws IOException - private IndexMetadata deserializeIndexMetadata(DataInputPlus in, int version, CFMetaData cfm) throws IOException ++ private IndexMetadata deserializeIndexMetadata(DataInputPlus in, int version, TableMetadata metadata) throws IOException { try { - return Optional.of(IndexMetadata.serializer.deserialize(in, version, metadata)); - return IndexMetadata.serializer.deserialize(in, version, cfm); ++ return IndexMetadata.serializer.deserialize(in, version, metadata); } catch (UnknownIndexException e) { @@@ -673,8 -744,8 +690,8 @@@ "If an index was just created, this is likely due to the schema not " + "being fully propagated. Local read will proceed without using the " + "index. 
Please wait for schema agreement after index creation.", - cfm.ksName, cfm.cfName, e.indexId); + metadata.keyspace, metadata.name, e.indexId); - return Optional.empty(); + return null; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e4d000c/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java index bd65535,c7080e7..f4f36d8 --- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java +++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java @@@ -22,6 -22,6 +22,7 @@@ import java.nio.ByteBuffer import java.util.*; import java.util.stream.Collectors; ++import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Iterables; import com.google.common.collect.Sets; @@@ -71,17 -73,19 +72,19 @@@ public class SinglePartitionReadComman private int oldestUnrepairedTombstone = Integer.MAX_VALUE; - public SinglePartitionReadCommand(boolean isDigest, - int digestVersion, - TableMetadata metadata, - int nowInSec, - ColumnFilter columnFilter, - RowFilter rowFilter, - DataLimits limits, - DecoratedKey partitionKey, - ClusteringIndexFilter clusteringIndexFilter) - private SinglePartitionReadCommand(boolean isDigest, - int digestVersion, - boolean isForThrift, - CFMetaData metadata, - int nowInSec, - ColumnFilter columnFilter, - RowFilter rowFilter, - DataLimits limits, - DecoratedKey partitionKey, - ClusteringIndexFilter clusteringIndexFilter, - IndexMetadata index) ++ @VisibleForTesting ++ protected SinglePartitionReadCommand(boolean isDigest, ++ int digestVersion, ++ TableMetadata metadata, ++ int nowInSec, ++ ColumnFilter columnFilter, ++ RowFilter rowFilter, ++ DataLimits limits, ++ DecoratedKey partitionKey, ++ ClusteringIndexFilter clusteringIndexFilter, ++ IndexMetadata index) { - super(Kind.SINGLE_PARTITION, isDigest, digestVersion, metadata, nowInSec, 
columnFilter, rowFilter, limits); - super(Kind.SINGLE_PARTITION, isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, index); ++ super(Kind.SINGLE_PARTITION, isDigest, digestVersion, metadata, nowInSec, columnFilter, rowFilter, limits, index); assert partitionKey.getPartitioner() == metadata.partitioner; this.partitionKey = partitionKey; this.clusteringIndexFilter = clusteringIndexFilter; @@@ -97,6 -102,43 +100,41 @@@ * @param limits the limits to use for the query. * @param partitionKey the partition key for the partition to query. * @param clusteringIndexFilter the clustering index filter to use for the query. + * @param indexMetadata explicitly specified index to use for the query + * + * @return a newly created read command. + */ - public static SinglePartitionReadCommand create(boolean isForThrift, - CFMetaData metadata, ++ public static SinglePartitionReadCommand create(TableMetadata metadata, + int nowInSec, + ColumnFilter columnFilter, + RowFilter rowFilter, + DataLimits limits, + DecoratedKey partitionKey, + ClusteringIndexFilter clusteringIndexFilter, + IndexMetadata indexMetadata) + { + return new SinglePartitionReadCommand(false, + 0, - isForThrift, + metadata, + nowInSec, + columnFilter, + rowFilter, + limits, + partitionKey, + clusteringIndexFilter, + indexMetadata); + } + + /** + * Creates a new read command on a single partition. + * + * @param metadata the table to query. + * @param nowInSec the time in seconds to use are "now" for this query. + * @param columnFilter the column filter to use for the query. + * @param rowFilter the row filter to use for the query. + * @param limits the limits to use for the query. + * @param partitionKey the partition key for the partition to query. + * @param clusteringIndexFilter the clustering index filter to use for the query. * * @return a newly created read command. 
*/ @@@ -108,7 -176,15 +146,14 @@@ DecoratedKey partitionKey, ClusteringIndexFilter clusteringIndexFilter) { - return new SinglePartitionReadCommand(false, 0, metadata, nowInSec, columnFilter, rowFilter, limits, partitionKey, clusteringIndexFilter); - return create(isForThrift, - metadata, ++ return create(metadata, + nowInSec, + columnFilter, + rowFilter, + limits, + partitionKey, + clusteringIndexFilter, + findIndex(metadata, rowFilter)); } /** @@@ -122,7 -198,11 +167,11 @@@ * * @return a newly created read command. The returned command will use no row filter and have no limits. */ - public static SinglePartitionReadCommand create(TableMetadata metadata, int nowInSec, DecoratedKey key, ColumnFilter columnFilter, ClusteringIndexFilter filter) - public static SinglePartitionReadCommand create(CFMetaData metadata, ++ public static SinglePartitionReadCommand create(TableMetadata metadata, + int nowInSec, + DecoratedKey key, + ColumnFilter columnFilter, + ClusteringIndexFilter filter) { return create(metadata, nowInSec, columnFilter, RowFilter.NONE, DataLimits.NONE, key, filter); } @@@ -136,9 -216,9 +185,9 @@@ * * @return a newly created read command that queries all the rows of {@code key}. */ - public static SinglePartitionReadCommand fullPartitionRead(CFMetaData metadata, int nowInSec, DecoratedKey key) + public static SinglePartitionReadCommand fullPartitionRead(TableMetadata metadata, int nowInSec, DecoratedKey key) { - return SinglePartitionReadCommand.create(metadata, nowInSec, key, Slices.ALL); + return create(metadata, nowInSec, key, Slices.ALL); } /** @@@ -150,9 -230,9 +199,9 @@@ * * @return a newly created read command that queries all the rows of {@code key}. 
*/ - public static SinglePartitionReadCommand fullPartitionRead(CFMetaData metadata, int nowInSec, ByteBuffer key) + public static SinglePartitionReadCommand fullPartitionRead(TableMetadata metadata, int nowInSec, ByteBuffer key) { - return SinglePartitionReadCommand.create(metadata, nowInSec, metadata.partitioner.decorateKey(key), Slices.ALL); - return create(metadata, nowInSec, metadata.decorateKey(key), Slices.ALL); ++ return create(metadata, nowInSec, metadata.partitioner.decorateKey(key), Slices.ALL); } /** @@@ -182,10 -262,10 +231,10 @@@ * @return a newly created read command that queries the {@code slices} in {@code key}. The returned query will * query every columns for the table (without limit or row filtering) and be in forward order. */ - public static SinglePartitionReadCommand create(CFMetaData metadata, int nowInSec, DecoratedKey key, Slices slices) + public static SinglePartitionReadCommand create(TableMetadata metadata, int nowInSec, DecoratedKey key, Slices slices) { ClusteringIndexSliceFilter filter = new ClusteringIndexSliceFilter(slices, false); - return SinglePartitionReadCommand.create(metadata, nowInSec, ColumnFilter.all(metadata), RowFilter.NONE, DataLimits.NONE, key, filter); + return create(metadata, nowInSec, ColumnFilter.all(metadata), RowFilter.NONE, DataLimits.NONE, key, filter); } /** @@@ -215,10 -295,10 +264,10 @@@ * @return a newly created read command that queries the {@code names} in {@code key}. The returned query will * query every columns (without limit or row filtering) and be in forward order. 
*/ - public static SinglePartitionReadCommand create(CFMetaData metadata, int nowInSec, DecoratedKey key, NavigableSet<Clustering> names) + public static SinglePartitionReadCommand create(TableMetadata metadata, int nowInSec, DecoratedKey key, NavigableSet<Clustering> names) { ClusteringIndexNamesFilter filter = new ClusteringIndexNamesFilter(names, false); - return SinglePartitionReadCommand.create(metadata, nowInSec, ColumnFilter.all(metadata), RowFilter.NONE, DataLimits.NONE, key, filter); + return create(metadata, nowInSec, ColumnFilter.all(metadata), RowFilter.NONE, DataLimits.NONE, key, filter); } /** @@@ -239,9 -319,99 +288,46 @@@ public SinglePartitionReadCommand copy() { - return new SinglePartitionReadCommand(isDigestQuery(), digestVersion(), metadata(), nowInSec(), columnFilter(), rowFilter(), limits(), partitionKey(), clusteringIndexFilter()); + return new SinglePartitionReadCommand(isDigestQuery(), + digestVersion(), - isForThrift(), + metadata(), + nowInSec(), + columnFilter(), + rowFilter(), + limits(), + partitionKey(), + clusteringIndexFilter(), + indexMetadata()); + } + + public SinglePartitionReadCommand copyAsDigestQuery() + { + return new SinglePartitionReadCommand(true, + digestVersion(), - isForThrift(), + metadata(), + nowInSec(), + columnFilter(), + rowFilter(), + limits(), + partitionKey(), + clusteringIndexFilter(), + indexMetadata()); + } + + public SinglePartitionReadCommand withUpdatedLimit(DataLimits newLimits) + { + return new SinglePartitionReadCommand(isDigestQuery(), + digestVersion(), - isForThrift(), + metadata(), + nowInSec(), + columnFilter(), + rowFilter(), + newLimits, + partitionKey(), + clusteringIndexFilter(), + indexMetadata()); } - public SinglePartitionReadCommand withUpdatedClusteringIndexFilter(ClusteringIndexFilter filter) - { - return new SinglePartitionReadCommand(isDigestQuery(), - digestVersion(), - isForThrift(), - metadata(), - nowInSec(), - columnFilter(), - rowFilter(), - limits(), - partitionKey(), - 
filter, - indexMetadata()); - } - - static SinglePartitionReadCommand legacySliceCommand(boolean isDigest, - int digestVersion, - CFMetaData metadata, - int nowInSec, - ColumnFilter columnFilter, - DataLimits limits, - DecoratedKey partitionKey, - ClusteringIndexSliceFilter filter) - { - // messages from old nodes will expect the thrift format, so always use 'true' for isForThrift - return new SinglePartitionReadCommand(isDigest, - digestVersion, - true, - metadata, - nowInSec, - columnFilter, - RowFilter.NONE, - limits, - partitionKey, - filter, - null); - } - - static SinglePartitionReadCommand legacyNamesCommand(boolean isDigest, - int digestVersion, - CFMetaData metadata, - int nowInSec, - ColumnFilter columnFilter, - DecoratedKey partitionKey, - ClusteringIndexNamesFilter filter) - { - // messages from old nodes will expect the thrift format, so always use 'true' for isForThrift - return new SinglePartitionReadCommand(isDigest, digestVersion, true, metadata, nowInSec, columnFilter, RowFilter.NONE, DataLimits.NONE, partitionKey, filter,null); - } - public DecoratedKey partitionKey() { return partitionKey; @@@ -1094,12 -1260,22 +1167,21 @@@ private static class Deserializer extends SelectionDeserializer { - public ReadCommand deserialize(DataInputPlus in, int version, boolean isDigest, int digestVersion, TableMetadata metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits, Optional<IndexMetadata> index) + public ReadCommand deserialize(DataInputPlus in, + int version, + boolean isDigest, + int digestVersion, - boolean isForThrift, - CFMetaData metadata, ++ TableMetadata metadata, + int nowInSec, + ColumnFilter columnFilter, + RowFilter rowFilter, + DataLimits limits, + IndexMetadata index) throws IOException { - DecoratedKey key = metadata.decorateKey(metadata.getKeyValidator().readValue(in, DatabaseDescriptor.getMaxValueSize())); + DecoratedKey key = metadata.partitioner.decorateKey(metadata.partitionKeyType.readValue(in, 
DatabaseDescriptor.getMaxValueSize())); ClusteringIndexFilter filter = ClusteringIndexFilter.serializer.deserialize(in, version, metadata); - return new SinglePartitionReadCommand(isDigest, digestVersion, metadata, nowInSec, columnFilter, rowFilter, limits, key, filter); - return new SinglePartitionReadCommand(isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, key, filter, index); ++ return new SinglePartitionReadCommand(isDigest, digestVersion, metadata, nowInSec, columnFilter, rowFilter, limits, key, filter, index); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e4d000c/src/java/org/apache/cassandra/index/SecondaryIndexManager.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e4d000c/src/java/org/apache/cassandra/index/internal/composites/CompositesSearcher.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e4d000c/src/java/org/apache/cassandra/index/internal/keys/KeysSearcher.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e4d000c/src/java/org/apache/cassandra/service/AbstractReadExecutor.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e4d000c/src/java/org/apache/cassandra/service/pager/PartitionRangeQueryPager.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e4d000c/test/unit/org/apache/cassandra/Util.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e4d000c/test/unit/org/apache/cassandra/db/SecondaryIndexTest.java ---------------------------------------------------------------------- diff --cc 
test/unit/org/apache/cassandra/db/SecondaryIndexTest.java index eecb55e,f2100db..df5e7ce --- a/test/unit/org/apache/cassandra/db/SecondaryIndexTest.java +++ b/test/unit/org/apache/cassandra/db/SecondaryIndexTest.java @@@ -204,10 -202,10 +204,10 @@@ public class SecondaryIndexTes // verify that it's not being indexed under any other value either ReadCommand rc = Util.cmd(cfs).build(); - assertNull(cfs.indexManager.getBestIndexFor(rc)); + assertNull(rc.index()); // resurrect w/ a newer timestamp - new RowUpdateBuilder(cfs.metadata, 2, "k1").clustering("c").add("birthdate", 1L).build().apply();; + new RowUpdateBuilder(cfs.metadata(), 2, "k1").clustering("c").add("birthdate", 1L).build().apply();; assertIndexedOne(cfs, col, 1L); // verify that row and delete w/ older timestamp does nothing @@@ -220,15 -218,15 +220,15 @@@ // delete the entire row (w/ newer timestamp this time) // todo - checking the # of index searchers for the command is probably not the best thing to test here - RowUpdateBuilder.deleteRow(cfs.metadata, 3, "k1", "c").applyUnsafe(); + RowUpdateBuilder.deleteRow(cfs.metadata(), 3, "k1", "c").applyUnsafe(); rc = Util.cmd(cfs).build(); - assertNull(cfs.indexManager.getBestIndexFor(rc)); + assertNull(rc.index()); // make sure obsolete mutations don't generate an index entry // todo - checking the # of index searchers for the command is probably not the best thing to test here - new RowUpdateBuilder(cfs.metadata, 3, "k1").clustering("c").add("birthdate", 1L).build().apply();; + new RowUpdateBuilder(cfs.metadata(), 3, "k1").clustering("c").add("birthdate", 1L).build().apply();; rc = Util.cmd(cfs).build(); - assertNull(cfs.indexManager.getBestIndexFor(rc)); + assertNull(rc.index()); } @Test @@@ -533,10 -520,10 +533,10 @@@ } private void assertIndexedCount(ColumnFamilyStore cfs, ByteBuffer col, Object val, int count) { - ColumnDefinition cdef = cfs.metadata.getColumnDefinition(col); + ColumnMetadata cdef = cfs.metadata().getColumn(col); ReadCommand rc = 
Util.cmd(cfs).filterOn(cdef.name.toString(), Operator.EQ, ((AbstractType) cdef.cellValueType()).decompose(val)).build(); - Index.Searcher searcher = cfs.indexManager.getBestIndexFor(rc).searcherFor(rc); + Index.Searcher searcher = rc.index().searcherFor(rc); if (count != 0) assertNotNull(searcher); http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e4d000c/test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java index 1d4bdb6,b056da1..f79066b --- a/test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java +++ b/test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java @@@ -115,15 -174,16 +115,15 @@@ public class SinglePartitionSliceComman QueryProcessor.executeInternal("INSERT INTO ks.tbl (k, s) VALUES ('k1', 's')"); Assert.assertFalse(QueryProcessor.executeInternal("SELECT s FROM ks.tbl WHERE k='k1'").isEmpty()); - ColumnFilter columnFilter = ColumnFilter.selection(PartitionColumns.of(s)); + ColumnFilter columnFilter = ColumnFilter.selection(RegularAndStaticColumns.of(s)); ClusteringIndexSliceFilter sliceFilter = new ClusteringIndexSliceFilter(Slices.NONE, false); - ReadCommand cmd = new SinglePartitionReadCommand(false, MessagingService.VERSION_30, metadata, - FBUtilities.nowInSeconds(), - columnFilter, - RowFilter.NONE, - DataLimits.NONE, - key, - sliceFilter); - ReadCommand cmd = SinglePartitionReadCommand.create(true, - cfm, ++ ReadCommand cmd = SinglePartitionReadCommand.create(metadata, + FBUtilities.nowInSeconds(), + columnFilter, + RowFilter.NONE, + DataLimits.NONE, + key, + sliceFilter); // check raw iterator for static cell try (ReadExecutionController executionController = cmd.executionController(); UnfilteredPartitionIterator pi = cmd.executeLocally(executionController)) @@@ -170,19 -230,20 +170,18 @@@ @Test public void toCQLStringIsSafeToCall() throws 
IOException { - DecoratedKey key = cfm.decorateKey(ByteBufferUtil.bytes("k1")); + DecoratedKey key = metadata.partitioner.decorateKey(ByteBufferUtil.bytes("k1")); - ColumnFilter columnFilter = ColumnFilter.selection(PartitionColumns.of(s)); + ColumnFilter columnFilter = ColumnFilter.selection(RegularAndStaticColumns.of(s)); Slice slice = Slice.make(ClusteringBound.BOTTOM, ClusteringBound.inclusiveEndOf(ByteBufferUtil.bytes("i1"))); - ClusteringIndexSliceFilter sliceFilter = new ClusteringIndexSliceFilter(Slices.with(cfm.comparator, slice), false); - ReadCommand cmd = SinglePartitionReadCommand.create(true, - cfm, + ClusteringIndexSliceFilter sliceFilter = new ClusteringIndexSliceFilter(Slices.with(metadata.comparator, slice), false); - ReadCommand cmd = new SinglePartitionReadCommand(false, MessagingService.VERSION_30, metadata, - FBUtilities.nowInSeconds(), - columnFilter, - RowFilter.NONE, - DataLimits.NONE, - key, - sliceFilter); - ++ ReadCommand cmd = SinglePartitionReadCommand.create(metadata, + FBUtilities.nowInSeconds(), + columnFilter, + RowFilter.NONE, + DataLimits.NONE, + key, + sliceFilter); - String ret = cmd.toCQLString(); Assert.assertNotNull(ret); Assert.assertFalse(ret.isEmpty()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e4d000c/test/unit/org/apache/cassandra/index/sasi/SASIIndexTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/index/sasi/SASIIndexTest.java index 986e604,03d89e1..406832a --- a/test/unit/org/apache/cassandra/index/sasi/SASIIndexTest.java +++ b/test/unit/org/apache/cassandra/index/sasi/SASIIndexTest.java @@@ -1304,16 -1307,16 +1304,15 @@@ public class SASIIndexTes ColumnFamilyStore store = loadData(data1, true); RowFilter filter = RowFilter.create(); - filter.add(store.metadata.getColumnDefinition(firstName), Operator.LIKE_CONTAINS, AsciiType.instance.fromString("a")); + filter.add(store.metadata().getColumn(firstName), Operator.LIKE_CONTAINS, 
AsciiType.instance.fromString("a")); - ReadCommand command = new PartitionRangeReadCommand(store.metadata(), - FBUtilities.nowInSeconds(), - ColumnFilter.all(store.metadata()), - filter, - DataLimits.NONE, - DataRange.allData(store.metadata().partitioner), - Optional.empty()); - + ReadCommand command = - PartitionRangeReadCommand.create(false, - store.metadata, ++ PartitionRangeReadCommand.create(store.metadata(), + FBUtilities.nowInSeconds(), - ColumnFilter.all(store.metadata), ++ ColumnFilter.all(store.metadata()), + filter, + DataLimits.NONE, - DataRange.allData(store.metadata.partitioner)); ++ DataRange.allData(store.metadata().partitioner)); try { new QueryPlan(store, command, 0).execute(ReadExecutionController.empty()); @@@ -2267,16 -2270,17 +2266,16 @@@ put("key1", Pair.create("Pavel", 14)); }}, false); - ColumnIndex index = ((SASIIndex) store.indexManager.getIndexByName("first_name")).getIndex(); + ColumnIndex index = ((SASIIndex) store.indexManager.getIndexByName(store.name + "_first_name")).getIndex(); IndexMemtable beforeFlushMemtable = index.getCurrentMemtable(); - PartitionRangeReadCommand command = new PartitionRangeReadCommand(store.metadata(), - FBUtilities.nowInSeconds(), - ColumnFilter.all(store.metadata()), - RowFilter.NONE, - DataLimits.NONE, - DataRange.allData(store.getPartitioner()), - Optional.empty()); + PartitionRangeReadCommand command = - PartitionRangeReadCommand.create(false, - store.metadata, ++ PartitionRangeReadCommand.create(store.metadata(), + FBUtilities.nowInSeconds(), - ColumnFilter.all(store.metadata), ++ ColumnFilter.all(store.metadata()), + RowFilter.NONE, + DataLimits.NONE, + DataRange.allData(store.getPartitioner())); QueryController controller = new QueryController(store, command, Integer.MAX_VALUE); org.apache.cassandra.index.sasi.plan.Expression expression = @@@ -2408,15 -2412,16 +2407,15 @@@ RowFilter filter = RowFilter.create(); for (Expression e : expressions) - filter.add(store.metadata.getColumnDefinition(e.name), 
e.op, e.value); + filter.add(store.metadata().getColumn(e.name), e.op, e.value); - ReadCommand command = new PartitionRangeReadCommand(store.metadata(), - FBUtilities.nowInSeconds(), - columnFilter, - filter, - DataLimits.cqlLimits(maxResults), - range, - Optional.empty()); + ReadCommand command = - PartitionRangeReadCommand.create(false, - store.metadata, ++ PartitionRangeReadCommand.create(store.metadata(), + FBUtilities.nowInSeconds(), + columnFilter, + filter, - DataLimits.thriftLimits(maxResults, DataLimits.NO_LIMIT), ++ DataLimits.cqlLimits(maxResults), + range); return command.executeLocally(command.executionController()); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e4d000c/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e4d000c/test/unit/org/apache/cassandra/service/ReadExecutorTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/service/ReadExecutorTest.java index fca8eca,0000000..7630cc6 mode 100644,000000..100644 --- a/test/unit/org/apache/cassandra/service/ReadExecutorTest.java +++ b/test/unit/org/apache/cassandra/service/ReadExecutorTest.java @@@ -1,215 -1,0 +1,215 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.service; + +import java.net.InetAddress; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import com.google.common.collect.ImmutableList; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.Util; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.ConsistencyLevel; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.SinglePartitionReadCommand; +import org.apache.cassandra.exceptions.ReadFailureException; +import org.apache.cassandra.exceptions.ReadTimeoutException; +import org.apache.cassandra.exceptions.RequestFailureReason; +import org.apache.cassandra.net.MessageOut; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.schema.KeyspaceParams; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +public class ReadExecutorTest +{ + static Keyspace ks; + static ColumnFamilyStore cfs; + static List<InetAddress> targets; + + @BeforeClass + public static void setUpClass() throws Throwable + { + SchemaLoader.loadSchema(); + SchemaLoader.createKeyspace("Foo", KeyspaceParams.simple(3), SchemaLoader.standardCFMD("Foo", "Bar")); + ks = Keyspace.open("Foo"); + cfs = ks.getColumnFamilyStore("Bar"); + targets = ImmutableList.of(InetAddress.getByName("127.0.0.255"), InetAddress.getByName("127.0.0.254"), InetAddress.getByName("127.0.0.253")); + cfs.sampleLatencyNanos = 0; + } + + @Before + public 
void resetCounters() throws Throwable + { + cfs.metric.speculativeInsufficientReplicas.dec(cfs.metric.speculativeInsufficientReplicas.getCount()); + cfs.metric.speculativeRetries.dec(cfs.metric.speculativeRetries.getCount()); + cfs.metric.speculativeFailedRetries.dec(cfs.metric.speculativeFailedRetries.getCount()); + } + + /** + * If speculation would have been beneficial but could not be attempted due to lack of replicas + * count that it occurred + */ + @Test + public void testUnableToSpeculate() throws Throwable + { + assertEquals(0, cfs.metric.speculativeInsufficientReplicas.getCount()); + assertEquals(0, ks.metric.speculativeInsufficientReplicas.getCount()); + AbstractReadExecutor executor = new AbstractReadExecutor.NeverSpeculatingReadExecutor(ks, cfs, new MockSinglePartitionReadCommand(), ConsistencyLevel.LOCAL_QUORUM, targets, System.nanoTime(), true); + executor.maybeTryAdditionalReplicas(); + try + { + executor.get(); + fail(); + } + catch (ReadTimeoutException e) + { + //expected + } + assertEquals(1, cfs.metric.speculativeInsufficientReplicas.getCount()); + assertEquals(1, ks.metric.speculativeInsufficientReplicas.getCount()); + + //Shouldn't increment + executor = new AbstractReadExecutor.NeverSpeculatingReadExecutor(ks, cfs, new MockSinglePartitionReadCommand(), ConsistencyLevel.LOCAL_QUORUM, targets, System.nanoTime(), false); + executor.maybeTryAdditionalReplicas(); + try + { + executor.get(); + fail(); + } + catch (ReadTimeoutException e) + { + //expected + } + assertEquals(1, cfs.metric.speculativeInsufficientReplicas.getCount()); + assertEquals(1, ks.metric.speculativeInsufficientReplicas.getCount()); + } + + /** + * Test that speculation when it is attempted is counted, and when it succeeds + * no failure is counted. 
+ */ + @Test + public void testSpeculateSucceeded() throws Throwable + { + assertEquals(0, cfs.metric.speculativeRetries.getCount()); + assertEquals(0, cfs.metric.speculativeFailedRetries.getCount()); + assertEquals(0, ks.metric.speculativeRetries.getCount()); + assertEquals(0, ks.metric.speculativeFailedRetries.getCount()); + AbstractReadExecutor executor = new AbstractReadExecutor.SpeculatingReadExecutor(ks, cfs, new MockSinglePartitionReadCommand(TimeUnit.DAYS.toMillis(365)), ConsistencyLevel.LOCAL_QUORUM, targets, System.nanoTime()); + executor.maybeTryAdditionalReplicas(); + new Thread() + { + @Override + public void run() + { + //Failures end the read promptly but don't require mock data to be supplied + executor.handler.onFailure(targets.get(0), RequestFailureReason.READ_TOO_MANY_TOMBSTONES); + executor.handler.onFailure(targets.get(1), RequestFailureReason.READ_TOO_MANY_TOMBSTONES); + executor.handler.condition.signalAll(); + } + }.start(); + + try + { + executor.get(); + fail(); + } + catch (ReadFailureException e) + { + //expected + } + assertEquals(1, cfs.metric.speculativeRetries.getCount()); + assertEquals(0, cfs.metric.speculativeFailedRetries.getCount()); + assertEquals(1, ks.metric.speculativeRetries.getCount()); + assertEquals(0, ks.metric.speculativeFailedRetries.getCount()); + + } + + /** + * Test that speculation failure statistics are incremented if speculation occurs + * and the read still times out. 
+ */ + @Test + public void testSpeculateFailed() throws Throwable + { + assertEquals(0, cfs.metric.speculativeRetries.getCount()); + assertEquals(0, cfs.metric.speculativeFailedRetries.getCount()); + assertEquals(0, ks.metric.speculativeRetries.getCount()); + assertEquals(0, ks.metric.speculativeFailedRetries.getCount()); + AbstractReadExecutor executor = new AbstractReadExecutor.SpeculatingReadExecutor(ks, cfs, new MockSinglePartitionReadCommand(), ConsistencyLevel.LOCAL_QUORUM, targets, System.nanoTime()); + executor.maybeTryAdditionalReplicas(); + try + { + executor.get(); + fail(); + } + catch (ReadTimeoutException e) + { + //expected + } + assertEquals(1, cfs.metric.speculativeRetries.getCount()); + assertEquals(1, cfs.metric.speculativeFailedRetries.getCount()); + assertEquals(1, ks.metric.speculativeRetries.getCount()); + assertEquals(1, ks.metric.speculativeFailedRetries.getCount()); + } + + public static class MockSinglePartitionReadCommand extends SinglePartitionReadCommand + { + private final long timeout; + + MockSinglePartitionReadCommand() + { + this(0); + } + + MockSinglePartitionReadCommand(long timeout) + { - super(false, 0, cfs.metadata(), 0, null, null, null, Util.dk("ry@n_luvs_teh_y@nk33z"), null); ++ super(false, 0, cfs.metadata(), 0, null, null, null, Util.dk("ry@n_luvs_teh_y@nk33z"), null, null); + this.timeout = timeout; + } + + @Override + public long getTimeout() + { + return timeout; + } + + @Override + public MessageOut createMessage() + { + return new MessageOut(MessagingService.Verb.BATCH_REMOVE) + { + @Override + public int serializedSize(int version) + { + return 0; + } + }; + } + + } + +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
