Merge branch 'cassandra-3.0' into cassandra-3.11
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7ad1945e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7ad1945e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7ad1945e

Branch: refs/heads/trunk
Commit: 7ad1945ee7592990027bee4fe6bbfcac72940954
Parents: 6d6081e 7f297bc
Author: Aleksey Yeschenko <[email protected]>
Authored: Wed Aug 30 17:04:49 2017 +0100
Committer: Aleksey Yeschenko <[email protected]>
Committed: Wed Aug 30 17:08:53 2017 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cql3/statements/SelectStatement.java        |  16 +-
 .../db/AbstractReadCommandBuilder.java          |   2 +-
 .../cassandra/db/PartitionRangeReadCommand.java | 149 ++++++++++---
 .../org/apache/cassandra/db/ReadCommand.java    | 149 +++++++------
 .../db/SinglePartitionReadCommand.java          | 209 +++++++++++++++----
 .../cassandra/index/SecondaryIndexManager.java  |   9 +-
 .../internal/composites/CompositesSearcher.java |   6 +-
 .../index/internal/keys/KeysSearcher.java       |   3 +-
 .../cassandra/service/AbstractReadExecutor.java |   4 +-
 .../service/pager/PartitionRangeQueryPager.java |   8 +-
 .../cassandra/thrift/CassandraServer.java       |  68 +++---
 test/unit/org/apache/cassandra/Util.java        |  26 +--
 .../apache/cassandra/db/SecondaryIndexTest.java |  10 +-
 .../db/SinglePartitionSliceCommandTest.java     |  45 ++--
 .../cassandra/index/sasi/SASIIndexTest.java     |  46 ++--
 .../cassandra/io/sstable/SSTableReaderTest.java |   2 +-
 17 files changed, 479 insertions(+), 274 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7ad1945e/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index c4aee3a,aca9e1f..d848eff
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,11 -1,6 +1,12 @@@
-3.0.15
+3.11.1
+ * Fix cassandra-stress hang issues when an error during cluster connection happens (CASSANDRA-12938)
+ * Better bootstrap failure message when blocked by (potential) range movement (CASSANDRA-13744)
+ * "ignore" option is ignored in sstableloader (CASSANDRA-13721)
+ * Deadlock in AbstractCommitLogSegmentManager (CASSANDRA-13652)
+ * Duplicate the buffer before passing it to analyser in SASI operation (CASSANDRA-13512)
+ * Properly evict pstmts from prepared statements cache (CASSANDRA-13641)
+Merged from 3.0:
+ * Fix race condition in read command serialization (CASSANDRA-13363)
- * Enable segement creation before recovering commitlogs (CASSANDRA-13587)
  * Fix AssertionError in short read protection (CASSANDRA-13747)
  * Don't skip corrupted sstables on startup (CASSANDRA-13620)
  * Fix the merging of cells with different user type versions (CASSANDRA-13776)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7ad1945e/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7ad1945e/src/java/org/apache/cassandra/db/AbstractReadCommandBuilder.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7ad1945e/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
index d0487a3,9e557e0..f7b6660
--- a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
+++ b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
@@@ -121,41 -129,74 +130,108 @@@ public class PartitionRangeReadCommand
         return dataRange.isNamesQuery();
     }

-    public PartitionRangeReadCommand forSubRange(AbstractBounds<PartitionPosition> range)
+    /**
+     * Returns an equivalent command that only queries data within the provided range.
+     *
+     * @param range the sub-range to restrict the command to. This method <b>assumes</b> that this is a proper sub-range
+     * of the command this is applied to.
+     * @param isRangeContinuation whether {@code range} is a direct continuation of whatever previous range we have
+     * queried. This matters for the {@code DataLimits} that may contain state when we do paging and in the context of
+     * parallel queries: that state only makes sense if the range queried is indeed the follow-up of whatever range we've
+     * previously queried (that yielded said state). In practice this means that ranges for which {@code isRangeContinuation}
+     * is false may have to be slightly pessimistic when counting data and may include a little bit more than necessary, and
+     * this should be dealt with post-query (in the case of {@code StorageProxy.getRangeSlice()}, which uses this method
+     * for replica queries, this is dealt with by re-counting results on the coordinator). Note that if this is the
+     * first range we queried, then the {@code DataLimits} will have no state and the value of this parameter doesn't
+     * matter.
+     */
+    public PartitionRangeReadCommand forSubRange(AbstractBounds<PartitionPosition> range, boolean isRangeContinuation)
     {
-        DataRange newRange = dataRange().forSubRange(range);
+        // If we're not a continuation of whatever range we've previously queried, we should ignore the state of the
+        // DataLimits as it's either useless or misleading. This is particularly important for GROUP BY queries, where
+        // DataLimits.CQLGroupByLimits.GroupByAwareCounter assumes that if GroupingState.hasClustering(), then we're in
+        // the middle of a group, but we can't make that assumption if we query a range "in advance" of where we are
+        // on the ring.
-        DataLimits newLimits = isRangeContinuation ? limits() : limits().withoutState();
-        return new PartitionRangeReadCommand(isDigestQuery(), digestVersion(), isForThrift(), metadata(), nowInSec(), columnFilter(), rowFilter(), newLimits, newRange, index);
+        return new PartitionRangeReadCommand(isDigestQuery(),
+                                             digestVersion(),
+                                             isForThrift(),
+                                             metadata(),
+                                             nowInSec(),
+                                             columnFilter(),
+                                             rowFilter(),
-                                             limits(),
++                                            isRangeContinuation ? limits() : limits().withoutState(),
+                                             dataRange().forSubRange(range),
+                                             indexMetadata());
     }

     public PartitionRangeReadCommand copy()
     {
-        return new PartitionRangeReadCommand(isDigestQuery(), digestVersion(), isForThrift(), metadata(), nowInSec(), columnFilter(), rowFilter(), limits(), dataRange(), index);
+        return new PartitionRangeReadCommand(isDigestQuery(),
+                                             digestVersion(),
+                                             isForThrift(),
+                                             metadata(),
+                                             nowInSec(),
+                                             columnFilter(),
+                                             rowFilter(),
+                                             limits(),
+                                             dataRange(),
+                                             indexMetadata());
+    }
+
+    public PartitionRangeReadCommand copyAsDigestQuery()
+    {
+        return new PartitionRangeReadCommand(true,
+                                             digestVersion(),
+                                             isForThrift(),
+                                             metadata(),
+                                             nowInSec(),
+                                             columnFilter(),
+                                             rowFilter(),
+                                             limits(),
+                                             dataRange(),
+                                             indexMetadata());
+    }
+
++   public ReadCommand withUpdatedLimit(DataLimits newLimits)
++   {
++       return new PartitionRangeReadCommand(isDigestQuery(),
++                                            digestVersion(),
++                                            isForThrift(),
++                                            metadata(),
++                                            nowInSec(),
++                                            columnFilter(),
++                                            rowFilter(),
++                                            newLimits,
++                                            dataRange(),
++                                            indexMetadata());
++   }
++
+    public PartitionRangeReadCommand withUpdatedDataRange(DataRange newDataRange)
+    {
+        return new PartitionRangeReadCommand(isDigestQuery(),
+                                             digestVersion(),
+                                             isForThrift(),
+                                             metadata(),
+                                             nowInSec(),
+                                             columnFilter(),
+                                             rowFilter(),
+                                             limits(),
+                                             newDataRange,
+                                             indexMetadata());
     }

-    public PartitionRangeReadCommand withUpdatedLimit(DataLimits newLimits)
+    public PartitionRangeReadCommand withUpdatedLimitsAndDataRange(DataLimits newLimits, DataRange newDataRange)
     {
-        return new PartitionRangeReadCommand(metadata(), nowInSec(), columnFilter(), rowFilter(), newLimits, dataRange(), index);
+        return new PartitionRangeReadCommand(isDigestQuery(),
+                                             digestVersion(),
+                                             isForThrift(),
+                                             metadata(),
+                                             nowInSec(),
+                                             columnFilter(),
+                                             rowFilter(),
+                                             newLimits,
+                                             newDataRange,
+                                             indexMetadata());
     }

     public long getTimeout()
@@@ -196,7 -237,8 +272,8 @@@
         metric.rangeLatency.addNano(latencyNanos);
     }

-    protected UnfilteredPartitionIterator queryStorage(final ColumnFamilyStore cfs, ReadExecutionController executionController)
+    @VisibleForTesting
-    public UnfilteredPartitionIterator queryStorage(final ColumnFamilyStore cfs, ReadOrderGroup orderGroup)
++   public UnfilteredPartitionIterator queryStorage(final ColumnFamilyStore cfs, ReadExecutionController executionController)
     {
         ColumnFamilyStore.ViewFragment view = cfs.select(View.selectLive(dataRange().keyRange()));
         Tracing.trace("Executing seq scan across {} sstables for {}", view.sstables.size(), dataRange().keyRange().getString(metadata().getKeyValidator()));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7ad1945e/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ReadCommand.java
index 050546c,66985b6..54389f0
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@@ -322,7 -329,12 +341,12 @@@ public abstract class ReadCommand exten
      */
     public abstract ReadCommand copy();

+    /**
+     * Returns a copy of this command with isDigestQuery set to true.
+     */
+    public abstract ReadCommand copyAsDigestQuery();
+
-    protected abstract UnfilteredPartitionIterator queryStorage(ColumnFamilyStore cfs, ReadOrderGroup orderGroup);
+    protected abstract UnfilteredPartitionIterator queryStorage(ColumnFamilyStore cfs, ReadExecutionController executionController);

     protected abstract int oldestUnrepairedTombstone();

@@@ -684,9 -630,9 +705,9 @@@
             out.writeInt(command.nowInSec());
             ColumnFilter.serializer.serialize(command.columnFilter(), out, version);
             RowFilter.serializer.serialize(command.rowFilter(), out, version);
-            DataLimits.serializer.serialize(command.limits(), out, version);
+            DataLimits.serializer.serialize(command.limits(), out, version, command.metadata.comparator);
-            if (command.index.isPresent())
-                IndexMetadata.serializer.serialize(command.index.get(), out, version);
+            if (null != command.index)
+                IndexMetadata.serializer.serialize(command.index, out, version);
             command.serializeSelection(out, version);
         }

@@@ -705,10 -651,8 +726,8 @@@
             int nowInSec = in.readInt();
             ColumnFilter columnFilter = ColumnFilter.serializer.deserialize(in, version, metadata);
             RowFilter rowFilter = RowFilter.serializer.deserialize(in, version, metadata);
-            DataLimits limits = DataLimits.serializer.deserialize(in, version);
+            DataLimits limits = DataLimits.serializer.deserialize(in, version, metadata.comparator);
-            Optional<IndexMetadata> index = hasIndex
-                                          ? deserializeIndexMetadata(in, version, metadata)
-                                          : Optional.empty();
+            IndexMetadata index = hasIndex ? deserializeIndexMetadata(in, version, metadata) : null;

             return kind.selectionDeserializer.deserialize(in, version, isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, index);
         }

@@@ -721,12 -665,13 +740,12 @@@
             }
             catch (UnknownIndexException e)
             {
-                String message = String.format("Couldn't find a defined index on %s.%s with the id %s. " +
-                                               "If an index was just created, this is likely due to the schema not " +
-                                               "being fully propagated. Local read will proceed without using the " +
-                                               "index. Please wait for schema agreement after index creation.",
-                                               cfm.ksName, cfm.cfName, e.indexId.toString());
-                logger.info(message);
+                logger.info("Couldn't find a defined index on {}.{} with the id {}. " +
+                            "If an index was just created, this is likely due to the schema not " +
+                            "being fully propagated. Local read will proceed without using the " +
+                            "index. Please wait for schema agreement after index creation.",
+                            cfm.ksName, cfm.cfName, e.indexId);
-                return Optional.empty();
+                return null;
             }
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7ad1945e/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index 5d93c65,00464ca..c7080e7
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@@ -268,7 -316,82 +319,97 @@@ public class SinglePartitionReadComman
     public SinglePartitionReadCommand copy()
     {
-        return new SinglePartitionReadCommand(isDigestQuery(), digestVersion(), isForThrift(), metadata(), nowInSec(), columnFilter(), rowFilter(), limits(), partitionKey(), clusteringIndexFilter());
+        return new SinglePartitionReadCommand(isDigestQuery(),
+                                              digestVersion(),
+                                              isForThrift(),
+                                              metadata(),
+                                              nowInSec(),
+                                              columnFilter(),
+                                              rowFilter(),
+                                              limits(),
+                                              partitionKey(),
+                                              clusteringIndexFilter(),
+                                              indexMetadata());
+    }
+
+    public SinglePartitionReadCommand copyAsDigestQuery()
+    {
+        return new SinglePartitionReadCommand(true,
+                                              digestVersion(),
+                                              isForThrift(),
+                                              metadata(),
+                                              nowInSec(),
+                                              columnFilter(),
+                                              rowFilter(),
+                                              limits(),
+                                              partitionKey(),
+                                              clusteringIndexFilter(),
+                                              indexMetadata());
+    }
+
++   public SinglePartitionReadCommand withUpdatedLimit(DataLimits newLimits)
++   {
++       return new SinglePartitionReadCommand(isDigestQuery(),
++                                             digestVersion(),
++                                             isForThrift(),
++                                             metadata(),
++                                             nowInSec(),
++                                             columnFilter(),
++                                             rowFilter(),
++                                             newLimits,
++                                             partitionKey(),
++                                             clusteringIndexFilter(),
++                                             indexMetadata());
++   }
++
+    public SinglePartitionReadCommand withUpdatedClusteringIndexFilter(ClusteringIndexFilter filter)
+    {
+        return new SinglePartitionReadCommand(isDigestQuery(),
+                                              digestVersion(),
+                                              isForThrift(),
+                                              metadata(),
+                                              nowInSec(),
+                                              columnFilter(),
+                                              rowFilter(),
+                                              limits(),
+                                              partitionKey(),
+                                              filter,
+                                              indexMetadata());
+    }
+
+    static SinglePartitionReadCommand legacySliceCommand(boolean isDigest,
+                                                         int digestVersion,
+                                                         CFMetaData metadata,
+                                                         int nowInSec,
+                                                         ColumnFilter columnFilter,
+                                                         DataLimits limits,
+                                                         DecoratedKey partitionKey,
+                                                         ClusteringIndexSliceFilter filter)
+    {
+        // messages from old nodes will expect the thrift format, so always use 'true' for isForThrift
+        return new SinglePartitionReadCommand(isDigest,
+                                              digestVersion,
+                                              true,
+                                              metadata,
+                                              nowInSec,
+                                              columnFilter,
+                                              RowFilter.NONE,
+                                              limits,
+                                              partitionKey,
+                                              filter,
+                                              null);
+    }
+
+    static SinglePartitionReadCommand legacyNamesCommand(boolean isDigest,
+                                                         int digestVersion,
+                                                         CFMetaData metadata,
+                                                         int nowInSec,
+                                                         ColumnFilter columnFilter,
+                                                         DecoratedKey partitionKey,
+                                                         ClusteringIndexNamesFilter filter)
+    {
+        // messages from old nodes will expect the thrift format, so always use 'true' for isForThrift
+        return new SinglePartitionReadCommand(isDigest, digestVersion, true, metadata, nowInSec, columnFilter, RowFilter.NONE, DataLimits.NONE, partitionKey, filter, null);
+    }

     public DecoratedKey partitionKey()
@@@ -334,26 -457,12 +475,12 @@@
                                              lastReturned == null ? clusteringIndexFilter() : clusteringIndexFilter.forPaging(metadata().comparator, lastReturned, false));
     }

-    public SinglePartitionReadCommand withUpdatedLimit(DataLimits newLimits)
-    {
-        return new SinglePartitionReadCommand(isDigestQuery(),
-                                              digestVersion(),
-                                              isForThrift(),
-                                              metadata(),
-                                              nowInSec(),
-                                              columnFilter(),
-                                              rowFilter(),
-                                              newLimits,
-                                              partitionKey,
-                                              clusteringIndexFilter);
-    }
-
-    public PartitionIterator execute(ConsistencyLevel consistency, ClientState clientState) throws RequestExecutionException
+    public PartitionIterator execute(ConsistencyLevel consistency, ClientState clientState, long queryStartNanoTime) throws RequestExecutionException
     {
-        return StorageProxy.read(Group.one(this), consistency, clientState);
+        return StorageProxy.read(Group.one(this), consistency, clientState, queryStartNanoTime);
     }

-    public SinglePartitionPager getPager(PagingState pagingState, int protocolVersion)
+    public SinglePartitionPager getPager(PagingState pagingState, ProtocolVersion protocolVersion)
     {
         return getPager(this, pagingState, protocolVersion);
     }

@@@ -449,7 -558,7 +576,7 @@@
         final int rowsToCache = metadata().params.caching.rowsPerPartitionToCache();

         @SuppressWarnings("resource") // we close on exception or upon closing the result of this method
-        UnfilteredRowIterator iter = SinglePartitionReadCommand.fullPartitionRead(metadata(), nowInSec(), partitionKey()).queryMemtableAndDisk(cfs, executionController);
-        UnfilteredRowIterator iter = fullPartitionRead(metadata(), nowInSec(), partitionKey()).queryMemtableAndDisk(cfs, readOp);
++       UnfilteredRowIterator iter = fullPartitionRead(metadata(), nowInSec(), partitionKey()).queryMemtableAndDisk(cfs, executionController);
         try
         {
             // Use a custom iterator instead of DataLimits to avoid stopping the original iterator

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7ad1945e/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7ad1945e/src/java/org/apache/cassandra/index/internal/composites/CompositesSearcher.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/index/internal/composites/CompositesSearcher.java
index e777620,f8a7c66..2007800
--- a/src/java/org/apache/cassandra/index/internal/composites/CompositesSearcher.java
+++ b/src/java/org/apache/cassandra/index/internal/composites/CompositesSearcher.java
@@@ -109,65 -108,43 +109,67 @@@ public class CompositesSearcher extend
                 nextEntry = index.decodeEntry(indexKey, indexHits.next());
             }

-            // Gather all index hits belonging to the same partition and query the data for those hits.
-            // TODO: it's much more efficient to do 1 read for all hits to the same partition than doing
-            // 1 read per index hit. However, this basically mean materializing all hits for a partition
-            // in memory so we should consider adding some paging mechanism. However, index hits should
-            // be relatively small so it's much better than the previous code that was materializing all
-            // *data* for a given partition.
-            BTreeSet.Builder<Clustering> clusterings = BTreeSet.builder(index.baseCfs.getComparator());
-            List<IndexEntry> entries = new ArrayList<>();
+            SinglePartitionReadCommand dataCmd;
             DecoratedKey partitionKey = index.baseCfs.decorateKey(nextEntry.indexedKey);
-
-            while (nextEntry != null && partitionKey.getKey().equals(nextEntry.indexedKey))
+            List<IndexEntry> entries = new ArrayList<>();
+            if (isStaticColumn())
             {
-                // We're queried a slice of the index, but some hits may not match some of the clustering column constraints
-                if (isMatchingEntry(partitionKey, nextEntry, command))
-                {
-                    clusterings.add(nextEntry.indexedEntryClustering);
-                    entries.add(nextEntry);
+                // The index hit may not match the command key constraint
+                if (!isMatchingEntry(partitionKey, nextEntry, command))
+                {
+                    nextEntry = indexHits.hasNext() ? index.decodeEntry(indexKey, indexHits.next()) : null;
+                    continue;
                 }

+                // If the index is on a static column, we just need to do a full read on the partition.
+                // Note that we want to re-use the command.columnFilter() in case of future change.
+                dataCmd = SinglePartitionReadCommand.create(index.baseCfs.metadata,
+                                                            command.nowInSec(),
+                                                            command.columnFilter(),
+                                                            RowFilter.NONE,
+                                                            DataLimits.NONE,
+                                                            partitionKey,
+                                                            new ClusteringIndexSliceFilter(Slices.ALL, false));
+                entries.add(nextEntry);
                 nextEntry = indexHits.hasNext() ? index.decodeEntry(indexKey, indexHits.next()) : null;
             }
+            else
+            {
+                // Gather all index hits belonging to the same partition and query the data for those hits.
+                // TODO: it's much more efficient to do 1 read for all hits to the same partition than doing
+                // 1 read per index hit. However, this basically means materializing all hits for a partition
+                // in memory so we should consider adding some paging mechanism. However, index hits should
+                // be relatively small so it's much better than the previous code that was materializing all
+                // *data* for a given partition.
+                BTreeSet.Builder<Clustering> clusterings = BTreeSet.builder(index.baseCfs.getComparator());
+                while (nextEntry != null && partitionKey.getKey().equals(nextEntry.indexedKey))
+                {
+                    // We've queried a slice of the index, but some hits may not match some of the clustering column constraints
+                    if (isMatchingEntry(partitionKey, nextEntry, command))
+                    {
+                        clusterings.add(nextEntry.indexedEntryClustering);
+                        entries.add(nextEntry);
+                    }
+
+                    nextEntry = indexHits.hasNext() ? index.decodeEntry(indexKey, indexHits.next()) : null;
+                }

-            // Because we've eliminated entries that don't match the clustering columns, it's possible we added nothing
-            if (clusterings.isEmpty())
-                continue;
+                // Because we've eliminated entries that don't match the clustering columns, it's possible we added nothing
+                if (clusterings.isEmpty())
+                    continue;
+
+                // Query the gathered index hits. We still need to filter stale hits from the resulting query.
+                ClusteringIndexNamesFilter filter = new ClusteringIndexNamesFilter(clusterings.build(), false);
-                dataCmd = SinglePartitionReadCommand.create(index.baseCfs.metadata,
++               dataCmd = SinglePartitionReadCommand.create(isForThrift(),
++                                                           index.baseCfs.metadata,
+                                                            command.nowInSec(),
+                                                            command.columnFilter(),
+                                                            command.rowFilter(),
+                                                            DataLimits.NONE,
+                                                            partitionKey,
-                                                            filter);
++                                                           filter,
++                                                           null);
+            }

-            // Query the gathered index hits. We still need to filter stale hits from the resulting query.
-            ClusteringIndexNamesFilter filter = new ClusteringIndexNamesFilter(clusterings.build(), false);
-            SinglePartitionReadCommand dataCmd = SinglePartitionReadCommand.create(isForThrift(),
-                                                                                   index.baseCfs.metadata,
-                                                                                   command.nowInSec(),
-                                                                                   command.columnFilter(),
-                                                                                   command.rowFilter(),
-                                                                                   DataLimits.NONE,
-                                                                                   partitionKey,
-                                                                                   filter,
-                                                                                   null);

             @SuppressWarnings("resource") // We close right away if empty, and if it's assigned to next it will be called either
                                           // by the next caller of next, or through closing this iterator if this comes before.
             UnfilteredRowIterator dataIter =

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7ad1945e/src/java/org/apache/cassandra/index/internal/keys/KeysSearcher.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7ad1945e/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7ad1945e/src/java/org/apache/cassandra/service/pager/PartitionRangeQueryPager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/pager/PartitionRangeQueryPager.java
index 5ba13a4,ea79017..e6ad3d4
--- a/src/java/org/apache/cassandra/service/pager/PartitionRangeQueryPager.java
+++ b/src/java/org/apache/cassandra/service/pager/PartitionRangeQueryPager.java
@@@ -17,16 -17,14 +17,12 @@@
  */
 package org.apache.cassandra.service.pager;

- import java.util.Optional;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
--
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.DataLimits;
 import org.apache.cassandra.db.rows.Row;
 import org.apache.cassandra.dht.*;
 import org.apache.cassandra.exceptions.RequestExecutionException;
- import org.apache.cassandra.index.Index;
- import org.apache.cassandra.schema.IndexMetadata;
+import org.apache.cassandra.transport.ProtocolVersion;

 /**
  * Pages a PartitionRangeReadCommand.
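For illustration of the two-argument forSubRange() added to PartitionRangeReadCommand above (the method a coordinator such as StorageProxy.getRangeSlice() calls when it splits one range read into per-replica sub-range commands, and whose results this pager consumes): a minimal sketch of the calling convention, assuming a Cassandra 3.11 classpath. The SubRangeSplitExample class and forSubRanges() helper are hypothetical names for this sketch, not part of the patch.

    import java.util.List;

    import org.apache.cassandra.db.PartitionPosition;
    import org.apache.cassandra.db.PartitionRangeReadCommand;
    import org.apache.cassandra.dht.AbstractBounds;

    // Hypothetical helper illustrating forSubRange(range, isRangeContinuation).
    final class SubRangeSplitExample
    {
        static PartitionRangeReadCommand[] forSubRanges(PartitionRangeReadCommand command,
                                                        List<AbstractBounds<PartitionPosition>> subRanges)
        {
            PartitionRangeReadCommand[] commands = new PartitionRangeReadCommand[subRanges.size()];
            for (int i = 0; i < subRanges.size(); i++)
            {
                // Per the new javadoc: for the first range the flag doesn't matter (no
                // DataLimits state exists yet); ranges queried "in advance" of the ring
                // position are not continuations, so their limits are taken withoutState()
                // and any over-count is corrected by re-counting on the coordinator.
                commands[i] = command.forSubRange(subRanges.get(i), i == 0);
            }
            return commands;
        }
    }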
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7ad1945e/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/thrift/CassandraServer.java
index e71f512,cb74b15..f43b7a4
--- a/src/java/org/apache/cassandra/thrift/CassandraServer.java
+++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java
@@@ -1547,17 -1519,17 +1547,17 @@@ public class CassandraServer implement
             ColumnFilter columns = makeColumnFilter(metadata, column_parent, predicate);
             ClusteringIndexFilter filter = toInternalFilter(metadata, column_parent, predicate);
             DataLimits limits = getLimits(range.count, metadata.isSuper() && !column_parent.isSetSuper_column(), predicate);
-            PartitionRangeReadCommand cmd = new PartitionRangeReadCommand(false,
-                                                                          0,
-                                                                          true,
-                                                                          metadata,
-                                                                          nowInSec,
-                                                                          columns,
-                                                                          ThriftConversion.rowFilterFromThrift(metadata, range.row_filter),
-                                                                          limits,
-                                                                          new DataRange(bounds, filter),
-                                                                          Optional.empty());
+
+            PartitionRangeReadCommand cmd =
+                PartitionRangeReadCommand.create(true,
+                                                 metadata,
+                                                 nowInSec,
+                                                 columns,
+                                                 ThriftConversion.rowFilterFromThrift(metadata, range.row_filter),
+                                                 limits,
+                                                 new DataRange(bounds, filter));
+
-            try (PartitionIterator results = StorageProxy.getRangeSlice(cmd, consistencyLevel))
+            try (PartitionIterator results = StorageProxy.getRangeSlice(cmd, consistencyLevel, queryStartNanoTime))
             {
                 assert results != null;
                 return thriftifyKeySlices(results, column_parent, limits.perPartitionCount());

@@@ -1640,19 -1611,19 +1640,19 @@@
             ClusteringIndexFilter filter = new ClusteringIndexSliceFilter(Slices.ALL, false);
             DataLimits limits = getLimits(range.count, true, Integer.MAX_VALUE);
             Clustering pageFrom = metadata.isSuper()
-                                ? new Clustering(start_column)
+                                ? Clustering.make(start_column)
                                 : LegacyLayout.decodeCellName(metadata, start_column).clustering;
-            PartitionRangeReadCommand cmd = new PartitionRangeReadCommand(false,
-                                                                          0,
-                                                                          true,
-                                                                          metadata,
-                                                                          nowInSec,
-                                                                          ColumnFilter.all(metadata),
-                                                                          RowFilter.NONE,
-                                                                          limits,
-                                                                          new DataRange(bounds, filter).forPaging(bounds, metadata.comparator, pageFrom, true),
-                                                                          Optional.empty());
+
+            PartitionRangeReadCommand cmd =
+                PartitionRangeReadCommand.create(true,
+                                                 metadata,
+                                                 nowInSec,
+                                                 ColumnFilter.all(metadata),
+                                                 RowFilter.NONE,
+                                                 limits,
+                                                 new DataRange(bounds, filter).forPaging(bounds, metadata.comparator, pageFrom, true));
+
-            try (PartitionIterator results = StorageProxy.getRangeSlice(cmd, consistencyLevel))
+            try (PartitionIterator results = StorageProxy.getRangeSlice(cmd, consistencyLevel, queryStartNanoTime))
             {
                 return thriftifyKeySlices(results, new ColumnParent(column_family), limits.perPartitionCount());
             }

@@@ -1735,24 -1705,20 +1735,20 @@@
             ColumnFilter columns = makeColumnFilter(metadata, column_parent, column_predicate);
             ClusteringIndexFilter filter = toInternalFilter(metadata, column_parent, column_predicate);
             DataLimits limits = getLimits(index_clause.count, metadata.isSuper() && !column_parent.isSetSuper_column(), column_predicate);
-            PartitionRangeReadCommand cmd = new PartitionRangeReadCommand(false,
-                                                                          0,
-                                                                          true,
-                                                                          metadata,
-                                                                          nowInSec,
-                                                                          columns,
-                                                                          ThriftConversion.rowFilterFromThrift(metadata, index_clause.expressions),
-                                                                          limits,
-                                                                          new DataRange(bounds, filter),
-                                                                          Optional.empty());
-            // If there's a secondary index that the command can use, have it validate
-            // the request parameters. Note that as a side effect, if a viable Index is
-            // identified by the CFS's index manager, it will be cached in the command
-            // and serialized during distribution to replicas in order to avoid performing
-            // further lookups.
+
+            PartitionRangeReadCommand cmd =
+                PartitionRangeReadCommand.create(true,
+                                                 metadata,
+                                                 nowInSec,
+                                                 columns,
+                                                 ThriftConversion.rowFilterFromThrift(metadata, index_clause.expressions),
+                                                 limits,
+                                                 new DataRange(bounds, filter));
+
+            // If there's a secondary index that the command can use, have it validate the request parameters.
             cmd.maybeValidateIndex();

-            try (PartitionIterator results = StorageProxy.getRangeSlice(cmd, consistencyLevel))
+            try (PartitionIterator results = StorageProxy.getRangeSlice(cmd, consistencyLevel, queryStartNanoTime))
             {
                 return thriftifyKeySlices(results, column_parent, limits.perPartitionCount());
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7ad1945e/test/unit/org/apache/cassandra/Util.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/Util.java
index 3fa24d7,d758efe..a3ad653
--- a/test/unit/org/apache/cassandra/Util.java
+++ b/test/unit/org/apache/cassandra/Util.java
@@@ -668,33 -627,9 +668,9 @@@ public class Uti
     public static UnfilteredPartitionIterator executeLocally(PartitionRangeReadCommand command,
                                                              ColumnFamilyStore cfs,
-                                                             ReadOrderGroup orderGroup)
+                                                             ReadExecutionController controller)
     {
-        return new InternalPartitionRangeReadCommand(command).queryStorageInternal(cfs, controller);
-    }
-
-    private static final class InternalPartitionRangeReadCommand extends PartitionRangeReadCommand
-    {
-
-        private InternalPartitionRangeReadCommand(PartitionRangeReadCommand original)
-        {
-            super(original.isDigestQuery(),
-                  original.digestVersion(),
-                  original.isForThrift(),
-                  original.metadata(),
-                  original.nowInSec(),
-                  original.columnFilter(),
-                  original.rowFilter(),
-                  original.limits(),
-                  original.dataRange(),
-                  Optional.empty());
-        }
-
-        private UnfilteredPartitionIterator queryStorageInternal(ColumnFamilyStore cfs,
-                                                                 ReadExecutionController controller)
-        {
-            return queryStorage(cfs, controller);
-        }
-        return command.queryStorage(cfs, orderGroup);
++       return command.queryStorage(cfs, controller);
     }

     public static Closeable markDirectoriesUnwriteable(ColumnFamilyStore cfs)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7ad1945e/test/unit/org/apache/cassandra/db/SecondaryIndexTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/SecondaryIndexTest.java
index a037d90,2457c4a..f2100db
--- a/test/unit/org/apache/cassandra/db/SecondaryIndexTest.java
+++ b/test/unit/org/apache/cassandra/db/SecondaryIndexTest.java
@@@ -115,9 -118,8 +115,9 @@@ public class SecondaryIndexTes
                                          .filterOn("birthdate", Operator.EQ, 1L)
                                          .build();

-        Index.Searcher searcher = cfs.indexManager.getBestIndexFor(rc).searcherFor(rc);
+        Index.Searcher searcher = rc.index().searcherFor(rc);
-        try (ReadOrderGroup orderGroup = rc.startOrderGroup(); UnfilteredPartitionIterator pi = searcher.search(orderGroup))
+        try (ReadExecutionController executionController = rc.executionController();
+             UnfilteredPartitionIterator pi = searcher.search(executionController))
         {
             assertTrue(pi.hasNext());
             pi.next().close();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7ad1945e/test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java
----------------------------------------------------------------------
diff --cc
test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java
index 3c09c93,02b642e..b056da1
--- a/test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java
+++ b/test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java
@@@ -117,15 -114,16 +117,16 @@@ public class SinglePartitionSliceComman
         ColumnFilter columnFilter = ColumnFilter.selection(PartitionColumns.of(v));
         ByteBuffer zero = ByteBufferUtil.bytes(0);
-        Slices slices = Slices.with(cfm.comparator, Slice.make(Slice.Bound.inclusiveStartOf(zero), Slice.Bound.inclusiveEndOf(zero)));
+        Slices slices = Slices.with(cfm.comparator, Slice.make(ClusteringBound.inclusiveStartOf(zero), ClusteringBound.inclusiveEndOf(zero)));
         ClusteringIndexSliceFilter sliceFilter = new ClusteringIndexSliceFilter(slices, false);
-        ReadCommand cmd = new SinglePartitionReadCommand(false, MessagingService.VERSION_30, true, cfm,
-                                                         FBUtilities.nowInSeconds(),
-                                                         columnFilter,
-                                                         RowFilter.NONE,
-                                                         DataLimits.NONE,
-                                                         key,
-                                                         sliceFilter);
+        ReadCommand cmd = SinglePartitionReadCommand.create(true,
+                                                            cfm,
+                                                            FBUtilities.nowInSeconds(),
+                                                            columnFilter,
+                                                            RowFilter.NONE,
+                                                            DataLimits.NONE,
+                                                            key,
+                                                            sliceFilter);

         DataOutputBuffer out = new DataOutputBuffer((int) ReadCommand.legacyReadCommandSerializer.serializedSize(cmd, MessagingService.VERSION_21));
         ReadCommand.legacyReadCommandSerializer.serialize(cmd, out, MessagingService.VERSION_21);

@@@ -175,16 -167,17 +176,17 @@@
         ColumnFilter columnFilter = ColumnFilter.selection(PartitionColumns.of(s));
         ClusteringIndexSliceFilter sliceFilter = new ClusteringIndexSliceFilter(Slices.NONE, false);
-        ReadCommand cmd = new SinglePartitionReadCommand(false, MessagingService.VERSION_30, true, cfm,
-                                                         FBUtilities.nowInSeconds(),
-                                                         columnFilter,
-                                                         RowFilter.NONE,
-                                                         DataLimits.NONE,
-                                                         key,
-                                                         sliceFilter);
+        ReadCommand cmd = SinglePartitionReadCommand.create(true,
+                                                            cfm,
+                                                            FBUtilities.nowInSeconds(),
+                                                            columnFilter,
+                                                            RowFilter.NONE,
+                                                            DataLimits.NONE,
+                                                            key,
+                                                            sliceFilter);

         // check raw iterator for static cell
-        try (ReadOrderGroup orderGroup = cmd.startOrderGroup(); UnfilteredPartitionIterator pi = cmd.executeLocally(orderGroup))
+        try (ReadExecutionController executionController = cmd.executionController(); UnfilteredPartitionIterator pi = cmd.executeLocally(executionController))
         {
             checkForS(pi);
         }

@@@ -231,15 -224,16 +233,16 @@@
         DecoratedKey key = cfm.decorateKey(ByteBufferUtil.bytes("k1"));
         ColumnFilter columnFilter = ColumnFilter.selection(PartitionColumns.of(s));
-        Slice slice = Slice.make(Slice.Bound.BOTTOM, Slice.Bound.inclusiveEndOf(ByteBufferUtil.bytes("i1")));
+        Slice slice = Slice.make(ClusteringBound.BOTTOM, ClusteringBound.inclusiveEndOf(ByteBufferUtil.bytes("i1")));
         ClusteringIndexSliceFilter sliceFilter = new ClusteringIndexSliceFilter(Slices.with(cfm.comparator, slice), false);
-        ReadCommand cmd = new SinglePartitionReadCommand(false, MessagingService.VERSION_30, true, cfm,
-                                                         FBUtilities.nowInSeconds(),
-                                                         columnFilter,
-                                                         RowFilter.NONE,
-                                                         DataLimits.NONE,
-                                                         key,
-                                                         sliceFilter);
+        ReadCommand cmd = SinglePartitionReadCommand.create(true,
+                                                            cfm,
+                                                            FBUtilities.nowInSeconds(),
+                                                            columnFilter,
+                                                            RowFilter.NONE,
+                                                            DataLimits.NONE,
+                                                            key,
+                                                            sliceFilter);

         String ret = cmd.toCQLString();
         Assert.assertNotNull(ret);
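For illustration of the copy()/copyAsDigestQuery() pair introduced in ReadCommand above (part of the CASSANDRA-13363 fix for the race condition in read command serialization noted in CHANGES.txt): digest requests are now derived as fresh immutable copies rather than by flipping a flag on a shared command. A minimal sketch; the DigestRequestExample class and makeRequests() helper are hypothetical names, not from this patch.

    import org.apache.cassandra.db.ReadCommand;

    // Hypothetical sketch: build one data request plus digest requests for the
    // remaining replicas, each from an immutable copy of the original command.
    final class DigestRequestExample
    {
        static ReadCommand[] makeRequests(ReadCommand command, int replicaCount)
        {
            ReadCommand[] requests = new ReadCommand[replicaCount];
            requests[0] = command.copy();                  // full data read from one replica
            for (int i = 1; i < replicaCount; i++)
                requests[i] = command.copyAsDigestQuery(); // isDigestQuery() == true on the copy
            return requests;
        }
    }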
