Merge branch 'cassandra-2.2' into cassandra-3.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/dcc57d0b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/dcc57d0b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/dcc57d0b

Branch: refs/heads/cassandra-3.5
Commit: dcc57d0bb2761f0b71f6064f4830af9fa140d0cf
Parents: ec0092b 5182376
Author: Marcus Eriksson <marc...@apache.org>
Authored: Wed Mar 16 10:48:30 2016 +0100
Committer: Marcus Eriksson <marc...@apache.org>
Committed: Wed Mar 16 10:48:30 2016 +0100

----------------------------------------------------------------------
 CHANGES.txt                                                 | 1 +
 .../org/apache/cassandra/db/SinglePartitionReadCommand.java | 5 ++++-
 2 files changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/dcc57d0b/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 126039a,dca4f8a..87691f9
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -58,12 -29,15 +58,13 @@@ Merged from 2.2
   * (cqlsh) Support utf-8/cp65001 encoding on Windows (CASSANDRA-11030)
   * Fix paging on DISTINCT queries repeats result when first row in partition changes (CASSANDRA-10010)
+  * cqlsh: change default encoding to UTF-8 (CASSANDRA-11124)
  Merged from 2.1:
+  * Don't do defragmentation if reading from repaired sstables (CASSANDRA-10342)
   * Fix streaming_socket_timeout_in_ms not enforced (CASSANDRA-11286)
   * Avoid dropping message too quickly due to missing unit conversion (CASSANDRA-11302)
-  * COPY FROM on large datasets: fix progress report and debug performance (CASSANDRA-11053)
-  * InvalidateKeys should have a weak ref to key cache (CASSANDRA-11176)
   * Don't remove FailureDetector history on removeEndpoint (CASSANDRA-10371)
   * Only notify if repair status changed (CASSANDRA-11172)
-  * Add partition key to TombstoneOverwhelmingException error message (CASSANDRA-10888)
   * Use logback setting for 'cassandra -v' command (CASSANDRA-10767)
   * Fix sstableloader to unthrottle streaming by default (CASSANDRA-9714)
   * Fix incorrect warning in 'nodetool status' (CASSANDRA-10176)


http://git-wip-us.apache.org/repos/asf/cassandra/blob/dcc57d0b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index de4c9c7,0000000..14923b9
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@@ -1,988 -1,0 +1,991 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.db;
 +
 +import java.io.IOException;
 +import java.nio.ByteBuffer;
 +import java.util.*;
 +
 +import com.google.common.collect.Iterables;
 +import com.google.common.collect.Sets;
 +
 +import org.apache.cassandra.cache.IRowCacheEntry;
 +import org.apache.cassandra.cache.RowCacheKey;
 +import org.apache.cassandra.cache.RowCacheSentinel;
 +import org.apache.cassandra.concurrent.Stage;
 +import org.apache.cassandra.concurrent.StageManager;
 +import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.config.ColumnDefinition;
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.db.lifecycle.*;
 +import org.apache.cassandra.db.filter.*;
 +import org.apache.cassandra.db.partitions.*;
 +import org.apache.cassandra.db.rows.*;
 +import org.apache.cassandra.exceptions.RequestExecutionException;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.io.util.DataInputPlus;
 +import org.apache.cassandra.io.util.DataOutputPlus;
 +import org.apache.cassandra.metrics.TableMetrics;
 +import org.apache.cassandra.net.MessageOut;
 +import org.apache.cassandra.net.MessagingService;
 +import org.apache.cassandra.schema.IndexMetadata;
 +import org.apache.cassandra.service.CacheService;
 +import org.apache.cassandra.service.ClientState;
 +import org.apache.cassandra.service.StorageProxy;
 +import org.apache.cassandra.service.pager.*;
 +import org.apache.cassandra.thrift.ThriftResultsMerger;
 +import org.apache.cassandra.tracing.Tracing;
 +import org.apache.cassandra.utils.FBUtilities;
 +import org.apache.cassandra.utils.SearchIterator;
 +import org.apache.cassandra.utils.btree.BTreeSet;
 +import org.apache.cassandra.utils.concurrent.OpOrder;
 +import org.apache.cassandra.utils.memory.HeapAllocator;
 +
 +
 +/**
 + * A read command that selects a (part of a) single partition.
 + */
 +public class SinglePartitionReadCommand extends ReadCommand
 +{
 +    protected static final SelectionDeserializer selectionDeserializer = new Deserializer();
 +
 +    private final DecoratedKey partitionKey;
 +    private final ClusteringIndexFilter clusteringIndexFilter;
 +
 +    private int oldestUnrepairedTombstone = Integer.MAX_VALUE;
 +
 +    public SinglePartitionReadCommand(boolean isDigest,
 +                                      int digestVersion,
 +                                      boolean isForThrift,
 +                                      CFMetaData metadata,
 +                                      int nowInSec,
 +                                      ColumnFilter columnFilter,
 +                                      RowFilter rowFilter,
 +                                      DataLimits limits,
 +                                      DecoratedKey partitionKey,
 +                                      ClusteringIndexFilter clusteringIndexFilter)
 +    {
 +        super(Kind.SINGLE_PARTITION, isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits);
 +        assert partitionKey.getPartitioner() == metadata.partitioner;
 +        this.partitionKey = partitionKey;
 +        this.clusteringIndexFilter = clusteringIndexFilter;
 +    }
 +
 +    /**
 +     * Creates a new read command on a single partition.
 +     *
 +     * @param metadata the table to query.
 +     * @param nowInSec the time in seconds to use as "now" for this query.
 +     * @param columnFilter the column filter to use for the query.
 +     * @param rowFilter the row filter to use for the query.
 +     * @param limits the limits to use for the query.
 +     * @param partitionKey the partition key for the partition to query.
 +     * @param clusteringIndexFilter the clustering index filter to use for the query.
 +     *
 +     * @return a newly created read command.
 +     */
 +    public static SinglePartitionReadCommand create(CFMetaData metadata,
 +                                                    int nowInSec,
 +                                                    ColumnFilter columnFilter,
 +                                                    RowFilter rowFilter,
 +                                                    DataLimits limits,
 +                                                    DecoratedKey partitionKey,
 +                                                    ClusteringIndexFilter clusteringIndexFilter)
 +    {
 +        return create(false, metadata, nowInSec, columnFilter, rowFilter, limits, partitionKey, clusteringIndexFilter);
 +    }
 +
 +    /**
 +     * Creates a new read command on a single partition for thrift.
 +     *
 +     * @param isForThrift whether the query is for thrift or not.
 +     * @param metadata the table to query.
 +     * @param nowInSec the time in seconds to use as "now" for this query.
 +     * @param columnFilter the column filter to use for the query.
 +     * @param rowFilter the row filter to use for the query.
 +     * @param limits the limits to use for the query.
 +     * @param partitionKey the partition key for the partition to query.
 +     * @param clusteringIndexFilter the clustering index filter to use for the query.
 +     *
 +     * @return a newly created read command.
 +     */
 +    public static SinglePartitionReadCommand create(boolean isForThrift,
 +                                                    CFMetaData metadata,
 +                                                    int nowInSec,
 +                                                    ColumnFilter columnFilter,
 +                                                    RowFilter rowFilter,
 +                                                    DataLimits limits,
 +                                                    DecoratedKey partitionKey,
 +                                                    ClusteringIndexFilter clusteringIndexFilter)
 +    {
 +        return new SinglePartitionReadCommand(false, 0, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, partitionKey, clusteringIndexFilter);
 +    }
 +
 +    /**
 +     * Creates a new read command on a single partition.
 +     *
 +     * @param metadata the table to query.
 +     * @param nowInSec the time in seconds to use as "now" for this query.
 +     * @param key the partition key for the partition to query.
 +     * @param columnFilter the column filter to use for the query.
 +     * @param filter the clustering index filter to use for the query.
 +     *
 +     * @return a newly created read command. The returned command will use no row filter and have no limits.
 +     */
 +    public static SinglePartitionReadCommand create(CFMetaData metadata, int nowInSec, DecoratedKey key, ColumnFilter columnFilter, ClusteringIndexFilter filter)
 +    {
 +        return create(metadata, nowInSec, columnFilter, RowFilter.NONE, DataLimits.NONE, key, filter);
 +    }
 +
 +    /**
 +     * Creates a new read command that queries a single partition in its entirety.
 +     *
 +     * @param metadata the table to query.
 +     * @param nowInSec the time in seconds to use as "now" for this query.
 +     * @param key the partition key for the partition to query.
 +     *
 +     * @return a newly created read command that queries all the rows of {@code key}.
 +     */
 +    public static SinglePartitionReadCommand fullPartitionRead(CFMetaData metadata, int nowInSec, DecoratedKey key)
 +    {
 +        return SinglePartitionReadCommand.create(metadata, nowInSec, key, Slices.ALL);
 +    }
 +
 +    /**
 +     * Creates a new read command that queries a single partition in its entirety.
 +     *
 +     * @param metadata the table to query.
 +     * @param nowInSec the time in seconds to use as "now" for this query.
 +     * @param key the partition key for the partition to query.
 +     *
 +     * @return a newly created read command that queries all the rows of {@code key}.
 +     */
 +    public static SinglePartitionReadCommand fullPartitionRead(CFMetaData metadata, int nowInSec, ByteBuffer key)
 +    {
 +        return SinglePartitionReadCommand.create(metadata, nowInSec, metadata.decorateKey(key), Slices.ALL);
 +    }
 +
 +    /**
 +     * Creates a new single partition slice command for the provided single slice.
 +     *
 +     * @param metadata the table to query.
 +     * @param nowInSec the time in seconds to use as "now" for this query.
 +     * @param key the partition key for the partition to query.
 +     * @param slice the slice of rows to query.
 +     *
 +     * @return a newly created read command that queries {@code slice} in {@code key}. The returned query will
 +     * query every column of the table (without limit or row filtering) and be in forward order.
 +     */
 +    public static SinglePartitionReadCommand create(CFMetaData metadata, int nowInSec, DecoratedKey key, Slice slice)
 +    {
 +        return create(metadata, nowInSec, key, Slices.with(metadata.comparator, slice));
 +    }
 +
 +    /**
 +     * Creates a new single partition slice command for the provided slices.
 +     *
 +     * @param metadata the table to query.
 +     * @param nowInSec the time in seconds to use as "now" for this query.
 +     * @param key the partition key for the partition to query.
 +     * @param slices the slices of rows to query.
 +     *
 +     * @return a newly created read command that queries the {@code slices} in {@code key}. The returned query will
 +     * query every column of the table (without limit or row filtering) and be in forward order.
 +     */
 +    public static SinglePartitionReadCommand create(CFMetaData metadata, int nowInSec, DecoratedKey key, Slices slices)
 +    {
 +        ClusteringIndexSliceFilter filter = new ClusteringIndexSliceFilter(slices, false);
 +        return SinglePartitionReadCommand.create(metadata, nowInSec, ColumnFilter.all(metadata), RowFilter.NONE, DataLimits.NONE, key, filter);
 +    }
 +
 +    /**
 +     * Creates a new single partition slice command for the provided slices.
 +     *
 +     * @param metadata the table to query.
 +     * @param nowInSec the time in seconds to use as "now" for this query.
 +     * @param key the partition key for the partition to query.
 +     * @param slices the slices of rows to query.
 +     *
 +     * @return a newly created read command that queries the {@code slices} in {@code key}. The returned query will
 +     * query every column of the table (without limit or row filtering) and be in forward order.
 +     */
 +    public static SinglePartitionReadCommand create(CFMetaData metadata, int nowInSec, ByteBuffer key, Slices slices)
 +    {
 +        return create(metadata, nowInSec, metadata.decorateKey(key), slices);
 +    }
 +
 +    public SinglePartitionReadCommand copy()
 +    {
 +        return new SinglePartitionReadCommand(isDigestQuery(), digestVersion(), isForThrift(), metadata(), nowInSec(), columnFilter(), rowFilter(), limits(), partitionKey(), clusteringIndexFilter());
 +    }
 +
 +    public DecoratedKey partitionKey()
 +    {
 +        return partitionKey;
 +    }
 +
 +    public ClusteringIndexFilter clusteringIndexFilter()
 +    {
 +        return clusteringIndexFilter;
 +    }
 +
 +    public ClusteringIndexFilter clusteringIndexFilter(DecoratedKey key)
 +    {
 +        return clusteringIndexFilter;
 +    }
 +
 +    public long getTimeout()
 +    {
 +        return DatabaseDescriptor.getReadRpcTimeout();
 +    }
 +
 +    public boolean selectsKey(DecoratedKey key)
 +    {
 +        if (!this.partitionKey().equals(key))
 +            return false;
 +
 +        return rowFilter().partitionKeyRestrictionsAreSatisfiedBy(key, metadata().getKeyValidator());
 +    }
 +
 +    public boolean selectsClustering(DecoratedKey key, Clustering clustering)
 +    {
 +        if (clustering == Clustering.STATIC_CLUSTERING)
 +            return !columnFilter().fetchedColumns().statics.isEmpty();
 +
 +        if (!clusteringIndexFilter().selects(clustering))
 +            return false;
 +
 +        return rowFilter().clusteringKeyRestrictionsAreSatisfiedBy(clustering);
 +    }
 +
 +    /**
 +     * Returns a new command suitable for paging from the last returned row.
 +     *
 +     * @param lastReturned the last row returned by the previous page. The newly created command
 +     * will only query rows that come after this (in query order). This can be {@code null} if this
 +     * is the first page.
 +     * @param pageSize the size to use for the page to query.
 +     *
 +     * @return the newly created command.
 +     */
 +    public SinglePartitionReadCommand forPaging(Clustering lastReturned, int pageSize)
 +    {
 +        // We shouldn't have set digest yet when reaching that point
 +        assert !isDigestQuery();
 +        return create(isForThrift(),
 +                      metadata(),
 +                      nowInSec(),
 +                      columnFilter(),
 +                      rowFilter(),
 +                      limits().forPaging(pageSize),
 +                      partitionKey(),
 +                      lastReturned == null ? clusteringIndexFilter() : clusteringIndexFilter.forPaging(metadata().comparator, lastReturned, false));
 +    }
 +
 +    public PartitionIterator execute(ConsistencyLevel consistency, ClientState clientState) throws RequestExecutionException
 +    {
 +        return StorageProxy.read(Group.one(this), consistency, clientState);
 +    }
 +
 +    public SinglePartitionPager getPager(PagingState pagingState, int protocolVersion)
 +    {
 +        return getPager(this, pagingState, protocolVersion);
 +    }
 +
 +    private static SinglePartitionPager getPager(SinglePartitionReadCommand command, PagingState pagingState, int protocolVersion)
 +    {
 +        return new SinglePartitionPager(command, pagingState, protocolVersion);
 +    }
 +
 +    protected void recordLatency(TableMetrics metric, long latencyNanos)
 +    {
 +        metric.readLatency.addNano(latencyNanos);
 +    }
 +
 +    @SuppressWarnings("resource") // we close the created iterator through closing the result of this method (and SingletonUnfilteredPartitionIterator ctor cannot fail)
 +    protected UnfilteredPartitionIterator queryStorage(final ColumnFamilyStore cfs, ReadOrderGroup orderGroup)
 +    {
 +        UnfilteredRowIterator partition = cfs.isRowCacheEnabled()
 +                                        ? getThroughCache(cfs, orderGroup.baseReadOpOrderGroup())
 +                                        : queryMemtableAndDisk(cfs, orderGroup.baseReadOpOrderGroup());
 +        return new SingletonUnfilteredPartitionIterator(partition, isForThrift());
 +    }
 +
 +    /**
 +     * Fetches the rows requested if in cache; if not, reads them from disk and caches them.
 +     * <p>
 +     * If the partition is cached, and the filter given is within its bounds, we return
 +     * from cache, otherwise from disk.
 +     * <p>
 +     * If the partition is not cached, we figure out what filter is "biggest", read
 +     * that from disk, then filter the result and either cache that or return it.
 +     */
 +    private UnfilteredRowIterator getThroughCache(ColumnFamilyStore cfs, OpOrder.Group readOp)
 +    {
 +        assert !cfs.isIndex(); // CASSANDRA-5732
 +        assert cfs.isRowCacheEnabled() : String.format("Row cache is not enabled on table [%s]", cfs.name);
 +
 +        RowCacheKey key = new RowCacheKey(metadata().ksAndCFName, partitionKey());
 +
 +        // Attempt a sentinel-read-cache sequence. If a write invalidates our sentinel, we'll return our
 +        // (now potentially obsolete) data, but won't cache it.
 +        // See CASSANDRA-3862.
 +        // TODO: don't evict entire partitions on writes (#2864)
 +        IRowCacheEntry cached = CacheService.instance.rowCache.get(key);
 +        if (cached != null)
 +        {
 +            if (cached instanceof RowCacheSentinel)
 +            {
 +                // Some other read is trying to cache the value, just do a normal non-caching read
 +                Tracing.trace("Row cache miss (race)");
 +                cfs.metric.rowCacheMiss.inc();
 +                return queryMemtableAndDisk(cfs, readOp);
 +            }
 +
 +            CachedPartition cachedPartition = (CachedPartition)cached;
 +            if (cfs.isFilterFullyCoveredBy(clusteringIndexFilter(), limits(), cachedPartition, nowInSec()))
 +            {
 +                cfs.metric.rowCacheHit.inc();
 +                Tracing.trace("Row cache hit");
 +                UnfilteredRowIterator unfilteredRowIterator = clusteringIndexFilter().getUnfilteredRowIterator(columnFilter(), cachedPartition);
 +                cfs.metric.updateSSTableIterated(0);
 +                return unfilteredRowIterator;
 +            }
 +
 +            cfs.metric.rowCacheHitOutOfRange.inc();
 +            Tracing.trace("Ignoring row cache as cached value could not satisfy query");
 +            return queryMemtableAndDisk(cfs, readOp);
 +        }
 +
 +        cfs.metric.rowCacheMiss.inc();
 +        Tracing.trace("Row cache miss");
 +
 +        boolean cacheFullPartitions = metadata().params.caching.cacheAllRows();
 +
 +        // To be able to cache what we read, what we read must at least cover what the cache holds, that
 +        // is the 'rowsToCache' first rows of the partition. We could read those 'rowsToCache' first rows
 +        // systematically, but we'd have to "extend" that to whatever is needed for the user query that the
 +        // 'rowsToCache' first rows don't cover, and that's not trivial with our existing filters. So currently
 +        // we settle for caching what we read only if the user query does query the head of the partition, since
 +        // that's the common case where we'll be able to use the cache anyway. One exception is if we cache
 +        // full partitions, in which case we just always read it all and cache.
 +        if (cacheFullPartitions || clusteringIndexFilter().isHeadFilter())
 +        {
 +            RowCacheSentinel sentinel = new RowCacheSentinel();
 +            boolean sentinelSuccess = CacheService.instance.rowCache.putIfAbsent(key, sentinel);
 +            boolean sentinelReplaced = false;
 +
 +            try
 +            {
 +                int rowsToCache = metadata().params.caching.rowsPerPartitionToCache();
 +                @SuppressWarnings("resource") // we close on exception or upon closing the result of this method
 +                UnfilteredRowIterator iter = SinglePartitionReadCommand.fullPartitionRead(metadata(), nowInSec(), partitionKey()).queryMemtableAndDisk(cfs, readOp);
 +                try
 +                {
 +                    // We want to cache only rowsToCache rows
 +                    CachedPartition toCache = CachedBTreePartition.create(DataLimits.cqlLimits(rowsToCache).filter(iter, nowInSec()), nowInSec());
 +                    if (sentinelSuccess && !toCache.isEmpty())
 +                    {
 +                        Tracing.trace("Caching {} rows", toCache.rowCount());
 +                        CacheService.instance.rowCache.replace(key, sentinel, toCache);
 +                        // Whether or not the previous replace has worked, our sentinel is not in the cache anymore
 +                        sentinelReplaced = true;
 +                    }
 +
 +                    // We then re-filter out what this query wants.
 +                    // Note that in the case where we don't cache full partitions, it's possible that the current query is interested in more
 +                    // than what we've cached, so we can't just use toCache.
 +                    UnfilteredRowIterator cacheIterator = clusteringIndexFilter().getUnfilteredRowIterator(columnFilter(), toCache);
 +                    if (cacheFullPartitions)
 +                    {
 +                        // Everything is guaranteed to be in 'toCache', we're done with 'iter'
 +                        assert !iter.hasNext();
 +                        iter.close();
 +                        return cacheIterator;
 +                    }
 +                    return UnfilteredRowIterators.concat(cacheIterator, clusteringIndexFilter().filterNotIndexed(columnFilter(), iter));
 +                }
 +                catch (RuntimeException | Error e)
 +                {
 +                    iter.close();
 +                    throw e;
 +                }
 +            }
 +            finally
 +            {
 +                if (sentinelSuccess && !sentinelReplaced)
 +                    cfs.invalidateCachedPartition(key);
 +            }
 +        }
 +
 +        Tracing.trace("Fetching data but not populating cache as query does not query from the start of the partition");
 +        return queryMemtableAndDisk(cfs, readOp);
 +    }
 +
 +    /**
 +     * Queries both memtable and sstables to fetch the result of this query.
 +     * <p>
 +     * Please note that this method:
 +     *   1) does not check the row cache.
 +     *   2) does not apply the query limit, nor the row filter (and so ignores secondary indexes).
 +     *      Those are applied in {@link ReadCommand#executeLocally}.
 +     *   3) does not record some of the read metrics (latency, scanned cells histograms) nor
 +     *      throws TombstoneOverwhelmingException.
 +     * It is publicly exposed because there are a few places where that is exactly what we want,
 +     * but it should be used only where you know you don't need those things.
 +     * <p>
 +     * Also note that one must have "started" an {@code OpOrder.Group} on the queried table, and that is
 +     * why it is required as a parameter, even though it is not explicitly used by the method.
 +     */
 +    public UnfilteredRowIterator queryMemtableAndDisk(ColumnFamilyStore cfs, OpOrder.Group readOp)
 +    {
 +        Tracing.trace("Executing single-partition query on {}", cfs.name);
 +
 +        boolean copyOnHeap = Memtable.MEMORY_POOL.needToCopyOnHeap();
 +        return queryMemtableAndDiskInternal(cfs, copyOnHeap);
 +    }
 +
 +    @Override
 +    protected int oldestUnrepairedTombstone()
 +    {
 +        return oldestUnrepairedTombstone;
 +    }
 +
 +    private UnfilteredRowIterator queryMemtableAndDiskInternal(ColumnFamilyStore cfs, boolean copyOnHeap)
 +    {
 +        /*
 +         * We have 2 main strategies:
 +         *   1) We query memtables and sstables simultaneously. This is our most generic strategy and the one we use
 +         *      unless we have a names filter that we know we can optimize further.
 +         *   2) If we have a names filter (so we query specific rows), we can make a bet: that all columns for all queried rows
 +         *      will have data in the most recent sstable(s), thus saving us from reading older ones. This does imply we
 +         *      have a way to guarantee we have all the data for what is queried, which is only possible for name queries
 +         *      and if we have neither collections nor counters (indeed, for a collection, we can't guarantee an older sstable
 +         *      won't have some elements that weren't in the most recent sstables, and counters are intrinsically a collection
 +         *      of shards so have the same problem).
 +         */
 +        if (clusteringIndexFilter() instanceof ClusteringIndexNamesFilter && queryNeitherCountersNorCollections())
 +            return queryMemtableAndSSTablesInTimestampOrder(cfs, copyOnHeap, (ClusteringIndexNamesFilter)clusteringIndexFilter());
 +
 +        Tracing.trace("Acquiring sstable references");
 +        ColumnFamilyStore.ViewFragment view = cfs.select(View.select(SSTableSet.LIVE, partitionKey()));
 +
 +        List<UnfilteredRowIterator> iterators = new ArrayList<>(Iterables.size(view.memtables) + view.sstables.size());
 +        ClusteringIndexFilter filter = clusteringIndexFilter();
 +
 +        try
 +        {
 +            for (Memtable memtable : view.memtables)
 +            {
 +                Partition partition = memtable.getPartition(partitionKey());
 +                if (partition == null)
 +                    continue;
 +
 +                @SuppressWarnings("resource") // 'iter' is added to iterators which is closed on exception, or through the closing of the final merged iterator
 +                UnfilteredRowIterator iter = filter.getUnfilteredRowIterator(columnFilter(), partition);
 +                @SuppressWarnings("resource") // same as above
 +                UnfilteredRowIterator maybeCopied = copyOnHeap ? UnfilteredRowIterators.cloningIterator(iter, HeapAllocator.instance) : iter;
 +                oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, partition.stats().minLocalDeletionTime);
 +                iterators.add(isForThrift() ? ThriftResultsMerger.maybeWrap(maybeCopied, nowInSec()) : maybeCopied);
 +            }
 +
 +            /*
 +             * We can't eliminate full sstables based on the timestamp of what we've already read like
 +             * in collectTimeOrderedData, but we still want to eliminate sstables whose maxTimestamp < mostRecentTombstone
 +             * we've read. We still rely on the sstable ordering by maxTimestamp since if
 +             *   maxTimestamp_s1 > maxTimestamp_s0,
 +             * we're guaranteed that s1 cannot have a row tombstone such that
 +             *   timestamp(tombstone) > maxTimestamp_s0
 +             * since we necessarily have
 +             *   timestamp(tombstone) <= maxTimestamp_s1.
 +             * In other words, iterating in maxTimestamp order allows us to do our mostRecentPartitionTombstone elimination
 +             * in one pass, and minimize the number of sstables for which we read a partition tombstone.
 +             */
 +            int sstablesIterated = 0;
 +            Collections.sort(view.sstables, SSTableReader.maxTimestampComparator);
 +            List<SSTableReader> skippedSSTables = null;
 +            long mostRecentPartitionTombstone = Long.MIN_VALUE;
 +            long minTimestamp = Long.MAX_VALUE;
 +            int nonIntersectingSSTables = 0;
 +
 +            for (SSTableReader sstable : view.sstables)
 +            {
 +                minTimestamp = Math.min(minTimestamp, sstable.getMinTimestamp());
 +                // if we've already seen a partition tombstone with a timestamp greater
 +                // than the most recent update to this sstable, we can skip it
 +                if (sstable.getMaxTimestamp() < mostRecentPartitionTombstone)
 +                    break;
 +
 +                if (!shouldInclude(sstable))
 +                {
 +                    nonIntersectingSSTables++;
 +                    // sstable contains no tombstone if maxLocalDeletionTime == Integer.MAX_VALUE, so we can safely skip those entirely
 +                    if (sstable.getSSTableMetadata().maxLocalDeletionTime != Integer.MAX_VALUE)
 +                    {
 +                        if (skippedSSTables == null)
 +                            skippedSSTables = new ArrayList<>();
 +                        skippedSSTables.add(sstable);
 +                    }
 +                    continue;
 +                }
 +
 +                sstable.incrementReadCount();
 +                @SuppressWarnings("resource") // 'iter' is added to iterators which is closed on exception, or through the closing of the final merged iterator
 +                UnfilteredRowIterator iter = filter.filter(sstable.iterator(partitionKey(), columnFilter(), filter.isReversed(), isForThrift()));
 +                if (!sstable.isRepaired())
 +                    oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, sstable.getMinLocalDeletionTime());
 +
 +                iterators.add(isForThrift() ? ThriftResultsMerger.maybeWrap(iter, nowInSec()) : iter);
 +                mostRecentPartitionTombstone = Math.max(mostRecentPartitionTombstone, iter.partitionLevelDeletion().markedForDeleteAt());
 +                sstablesIterated++;
 +            }
 +
 +            int includedDueToTombstones = 0;
 +            // Check for partition tombstones in the skipped sstables
 +            if (skippedSSTables != null)
 +            {
 +                for (SSTableReader sstable : skippedSSTables)
 +                {
 +                    if (sstable.getMaxTimestamp() <= minTimestamp)
 +                        continue;
 +
 +                    sstable.incrementReadCount();
 +                    @SuppressWarnings("resource") // 'iter' is either closed right away, or added to iterators which is closed on exception, or through the closing of the final merged iterator
 +                    UnfilteredRowIterator iter = filter.filter(sstable.iterator(partitionKey(), columnFilter(), filter.isReversed(), isForThrift()));
 +                    if (iter.partitionLevelDeletion().markedForDeleteAt() > minTimestamp)
 +                    {
 +                        iterators.add(iter);
 +                        if (!sstable.isRepaired())
 +                            oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, sstable.getMinLocalDeletionTime());
 +                        includedDueToTombstones++;
 +                        sstablesIterated++;
 +                    }
 +                    else
 +                    {
 +                        iter.close();
 +                    }
 +                }
 +            }
 +            if (Tracing.isTracing())
 +                Tracing.trace("Skipped {}/{} non-slice-intersecting sstables, included {} due to tombstones",
 +                              nonIntersectingSSTables, view.sstables.size(), includedDueToTombstones);
 +
 +            cfs.metric.updateSSTableIterated(sstablesIterated);
 +
 +            if (iterators.isEmpty())
 +                return EmptyIterators.unfilteredRow(cfs.metadata, partitionKey(), filter.isReversed());
 +
 +            Tracing.trace("Merging data from memtables and {} sstables", sstablesIterated);
 +
 +            @SuppressWarnings("resource") // Closed through the closing of the result of that method.
 +            UnfilteredRowIterator merged = UnfilteredRowIterators.merge(iterators, nowInSec());
 +            if (!merged.isEmpty())
 +            {
 +                DecoratedKey key = merged.partitionKey();
 +                cfs.metric.samplers.get(TableMetrics.Sampler.READS).addSample(key.getKey(), key.hashCode(), 1);
 +            }
 +
 +            return merged;
 +        }
 +        catch (RuntimeException | Error e)
 +        {
 +            try
 +            {
 +                FBUtilities.closeAll(iterators);
 +            }
 +            catch (Exception suppressed)
 +            {
 +                e.addSuppressed(suppressed);
 +            }
 +            throw e;
 +        }
 +    }
 +
 +    private boolean shouldInclude(SSTableReader sstable)
 +    {
 +        // If some static columns are queried, we should always include the sstable: the clustering values stats of the sstable
 +        // don't tell us if the sstable contains static values in particular.
 +        // TODO: we could record if a sstable contains any static value at all.
 +        if (!columnFilter().fetchedColumns().statics.isEmpty())
 +            return true;
 +
 +        return clusteringIndexFilter().shouldInclude(sstable);
 +    }
 +
 +    private boolean queryNeitherCountersNorCollections()
 +    {
 +        for (ColumnDefinition column : columnFilter().fetchedColumns())
 +        {
 +            if (column.type.isCollection() || column.type.isCounter())
 +                return false;
 +        }
 +        return true;
 +    }
 +
 +    /**
 +     * Does a read by querying the memtable(s) first, and then each relevant sstable sequentially, in order of the
 +     * sstables' max timestamps.
 +     *
 +     * This is used for names queries, in the hope of only having to query the one or two most recent sstables and
 +     * then knowing that nothing more recent could be in the older sstables (which we can only guarantee if we know
 +     * exactly which rows we query, and if no collections or counters are included).
 +     * This method assumes the filter is a {@code ClusteringIndexNamesFilter}.
 +     */
 +    private UnfilteredRowIterator queryMemtableAndSSTablesInTimestampOrder(ColumnFamilyStore cfs, boolean copyOnHeap, ClusteringIndexNamesFilter filter)
 +    {
 +        Tracing.trace("Acquiring sstable references");
 +        ColumnFamilyStore.ViewFragment view = cfs.select(View.select(SSTableSet.LIVE, partitionKey()));
 +
 +        ImmutableBTreePartition result = null;
 +
 +        Tracing.trace("Merging memtable contents");
 +        for (Memtable memtable : view.memtables)
 +        {
 +            Partition partition = memtable.getPartition(partitionKey());
 +            if (partition == null)
 +                continue;
 +
 +            try (UnfilteredRowIterator iter = filter.getUnfilteredRowIterator(columnFilter(), partition))
 +            {
 +                if (iter.isEmpty())
 +                    continue;
 +
 +                UnfilteredRowIterator clonedFilter = copyOnHeap
 +                                                   ? UnfilteredRowIterators.cloningIterator(iter, HeapAllocator.instance)
 +                                                   : iter;
 +                result = add(isForThrift() ? ThriftResultsMerger.maybeWrap(clonedFilter, nowInSec()) : clonedFilter, result, filter, false);
 +            }
 +        }
 +
 +        /* add the SSTables on disk */
 +        Collections.sort(view.sstables, SSTableReader.maxTimestampComparator);
 +        int sstablesIterated = 0;
- 
++        boolean onlyUnrepaired = true;
 +        // read sorted sstables
 +        for (SSTableReader sstable : view.sstables)
 +        {
 +            // if we've already seen a partition tombstone with a timestamp greater
 +            // than the most recent update to this sstable, we're done, since the rest of the sstables
 +            // will also be older
 +            if (result != null && sstable.getMaxTimestamp() < result.partitionLevelDeletion().markedForDeleteAt())
 +                break;
 +
 +            long currentMaxTs = sstable.getMaxTimestamp();
 +            filter = reduceFilter(filter, result, currentMaxTs);
 +            if (filter == null)
 +                break;
 +
 +            if (!shouldInclude(sstable))
 +            {
 +                // This means that nothing queried by the filter can be in the sstable. One exception is the top-level
 +                // partition deletion, however: if it is set, it impacts everything and must be included. Getting that
 +                // top-level partition deletion costs us some seeks in general, however (unless the partition is indexed
 +                // and is in the key cache), so we first check if the sstable has any tombstone at all as a shortcut.
 +                if (sstable.getSSTableMetadata().maxLocalDeletionTime == Integer.MAX_VALUE)
 +                    continue; // Means no tombstone at all, we can skip that sstable
 +
 +                // We need to get the partition deletion and include it if it's live. In any case though, we're done with that sstable.
 +                sstable.incrementReadCount();
 +                try (UnfilteredRowIterator iter = sstable.iterator(partitionKey(), columnFilter(), filter.isReversed(), isForThrift()))
 +                {
 +                    if (iter.partitionLevelDeletion().isLive())
 +                    {
 +                        sstablesIterated++;
 +                        result = add(UnfilteredRowIterators.noRowsIterator(iter.metadata(), iter.partitionKey(), Rows.EMPTY_STATIC_ROW, iter.partitionLevelDeletion(), filter.isReversed()), result, filter, sstable.isRepaired());
 +                    }
 +                }
 +                continue;
 +            }
 +
 +            Tracing.trace("Merging data from sstable {}", sstable.descriptor.generation);
 +            sstable.incrementReadCount();
 +            try (UnfilteredRowIterator iter = filter.filter(sstable.iterator(partitionKey(), columnFilter(), filter.isReversed(), isForThrift())))
 +            {
 +                if (iter.isEmpty())
 +                    continue;
 +
++                if (sstable.isRepaired())
++                    onlyUnrepaired = false;
 +                sstablesIterated++;
 +                result = add(isForThrift() ? ThriftResultsMerger.maybeWrap(iter, nowInSec()) : iter, result, filter, sstable.isRepaired());
 +            }
 +        }
 +
 +        cfs.metric.updateSSTableIterated(sstablesIterated);
 +
 +        if (result == null || result.isEmpty())
 +            return EmptyIterators.unfilteredRow(metadata(), partitionKey(), false);
 +
 +        DecoratedKey key = result.partitionKey();
 +        cfs.metric.samplers.get(TableMetrics.Sampler.READS).addSample(key.getKey(), key.hashCode(), 1);
 +
 +        // "hoist up" the requested data into a more recent sstable
 +        if (sstablesIterated > cfs.getMinimumCompactionThreshold()
++            && onlyUnrepaired
 +            && !cfs.isAutoCompactionDisabled()
 +            && cfs.getCompactionStrategyManager().shouldDefragment())
 +        {
 +            // !!WARNING!!   if we stop copying our data to a heap-managed object,
 +            //               we will need to track the lifetime of this mutation as well
 +            Tracing.trace("Defragmenting requested data");
 +
 +            try (UnfilteredRowIterator iter = result.unfilteredIterator(columnFilter(), Slices.ALL, false))
 +            {
 +                final Mutation mutation = new Mutation(PartitionUpdate.fromIterator(iter));
 +                StageManager.getStage(Stage.MUTATION).execute(new Runnable()
 +                {
 +                    public void run()
 +                    {
 +                        // skipping commitlog and index updates is fine since we're just de-fragmenting existing data
 +                        Keyspace.open(mutation.getKeyspaceName()).apply(mutation, false, false);
 +                    }
 +                });
 +            }
 +        }
 +
 +        return result.unfilteredIterator(columnFilter(), Slices.ALL, clusteringIndexFilter().isReversed());
 +    }
 +
 +    private ImmutableBTreePartition add(UnfilteredRowIterator iter, ImmutableBTreePartition result, ClusteringIndexNamesFilter filter, boolean isRepaired)
 +    {
 +        if (!isRepaired)
 +            oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, iter.stats().minLocalDeletionTime);
 +
 +        int maxRows = Math.max(filter.requestedRows().size(), 1);
 +        if (result == null)
 +            return ImmutableBTreePartition.create(iter, maxRows);
 +
 +        try (UnfilteredRowIterator merged = UnfilteredRowIterators.merge(Arrays.asList(iter, result.unfilteredIterator(columnFilter(), Slices.ALL, filter.isReversed())), nowInSec()))
 +        {
 +            return ImmutableBTreePartition.create(merged, maxRows);
 +        }
 +    }
 +
 +    private ClusteringIndexNamesFilter reduceFilter(ClusteringIndexNamesFilter filter, Partition result, long sstableTimestamp)
 +    {
 +        if (result == null)
 +            return filter;
 +
 +        SearchIterator<Clustering, Row> searchIter = result.searchIterator(columnFilter(), false);
 +
 +        PartitionColumns columns = columnFilter().fetchedColumns();
 +        NavigableSet<Clustering> clusterings = filter.requestedRows();
 +
 +        // We want to remove rows for which we have values for all requested columns. We have to deal with both static and regular rows.
 +        // TODO: we could also remove a selected column if we've found values for every requested row, but we'll leave
 +        // that for later.
 +
 +        boolean removeStatic = false;
 +        if (!columns.statics.isEmpty())
 +        {
 +            Row staticRow = searchIter.next(Clustering.STATIC_CLUSTERING);
 +            removeStatic = staticRow != null && canRemoveRow(staticRow, columns.statics, sstableTimestamp);
 +        }
 +
 +        NavigableSet<Clustering> toRemove = null;
 +        for (Clustering clustering : clusterings)
 +        {
 +            if (!searchIter.hasNext())
 +                break;
 +
 +            Row row = searchIter.next(clustering);
 +            if (row == null || !canRemoveRow(row, columns.regulars, sstableTimestamp))
 +                continue;
 +
 +            if (toRemove == null)
 +                toRemove = new TreeSet<>(result.metadata().comparator);
 +            toRemove.add(clustering);
 +        }
 +
 +        if (!removeStatic && toRemove == null)
 +            return filter;
 +
 +        // Check if we have everything we need
 +        boolean hasNoMoreStatic = columns.statics.isEmpty() || removeStatic;
 +        boolean hasNoMoreClusterings = clusterings.isEmpty() || (toRemove != null && toRemove.size() == clusterings.size());
 +        if (hasNoMoreStatic && hasNoMoreClusterings)
 +            return null;
 +
 +        if (toRemove != null)
 +        {
 +            BTreeSet.Builder<Clustering> newClusterings = BTreeSet.builder(result.metadata().comparator);
 +            newClusterings.addAll(Sets.difference(clusterings, toRemove));
 +            clusterings = newClusterings.build();
 +        }
 +        return new ClusteringIndexNamesFilter(clusterings, filter.isReversed());
 +    }
 +
 +    private boolean canRemoveRow(Row row, Columns requestedColumns, long sstableTimestamp)
 +    {
 +        // We can remove a row if it has data that is more recent than the next sstable to consider for the data that the query
 +        // cares about. And the data we care about is 1) the row timestamp (since every query cares if the row exists or not)
 +        // and 2) the requested columns.
 +        if (row.primaryKeyLivenessInfo().isEmpty() || row.primaryKeyLivenessInfo().timestamp() <= sstableTimestamp)
 +            return false;
 +
 +        for (ColumnDefinition column : requestedColumns)
 +        {
 +            Cell cell = row.getCell(column);
 +            if (cell == null || cell.timestamp() <= sstableTimestamp)
 +                return false;
 +        }
 +        return true;
 +    }
 +
 +    @Override
 +    public String toString()
 +    {
 +        return String.format("Read(%s.%s columns=%s rowFilter=%s limits=%s key=%s filter=%s, nowInSec=%d)",
 +                             metadata().ksName,
 +                             metadata().cfName,
 +                             columnFilter(),
 +                             rowFilter(),
 +                             limits(),
 +                             metadata().getKeyValidator().getString(partitionKey().getKey()),
 +                             clusteringIndexFilter.toString(metadata()),
 +                             nowInSec());
 +    }
 +
 +    public MessageOut<ReadCommand> createMessage(int version)
 +    {
 +        return new MessageOut<>(MessagingService.Verb.READ, this, version < MessagingService.VERSION_30 ? legacyReadCommandSerializer : serializer);
 +    }
 +
 +    protected void appendCQLWhereClause(StringBuilder sb)
 +    {
 +        sb.append(" WHERE ");
 +
 +        sb.append(ColumnDefinition.toCQLString(metadata().partitionKeyColumns())).append(" = ");
 +        DataRange.appendKeyString(sb, metadata().getKeyValidator(), partitionKey().getKey());
 +
 +        // We put the row filter first because the clustering index filter can end by "ORDER BY"
 +        if (!rowFilter().isEmpty())
 +            sb.append(" AND ").append(rowFilter());
 +
 +        String filterString = clusteringIndexFilter().toCQLString(metadata());
 +        if (!filterString.isEmpty())
 +            sb.append(" AND ").append(filterString);
 +    }
 +
 +    protected void serializeSelection(DataOutputPlus out, int version) throws IOException
 +    {
 +        metadata().getKeyValidator().writeValue(partitionKey().getKey(), out);
 +        ClusteringIndexFilter.serializer.serialize(clusteringIndexFilter(), out, version);
 +    }
 +
 +    protected long selectionSerializedSize(int version)
 +    {
 +        return metadata().getKeyValidator().writtenLength(partitionKey().getKey())
 +             + ClusteringIndexFilter.serializer.serializedSize(clusteringIndexFilter(), version);
 +    }
 +
 +    /**
 +     * Groups multiple single partition read commands.
 +     */
 +    public static class Group implements ReadQuery
 +    {
 +        public final List<SinglePartitionReadCommand> commands;
 +        private final DataLimits limits;
 +        private final int nowInSec;
 +
 +        public Group(List<SinglePartitionReadCommand> commands, DataLimits limits)
 +        {
 +            assert !commands.isEmpty();
 +            this.commands = commands;
 +            this.limits = limits;
 +            this.nowInSec = commands.get(0).nowInSec();
 +            for (int i = 1; i < commands.size(); i++)
 +                assert commands.get(i).nowInSec() == nowInSec;
 +        }
 +
 +        public static Group one(SinglePartitionReadCommand command)
 +        {
 +            return new Group(Collections.<SinglePartitionReadCommand>singletonList(command), command.limits());
 +        }
 +
 +        public PartitionIterator execute(ConsistencyLevel consistency, ClientState clientState) throws RequestExecutionException
 +        {
 +            return StorageProxy.read(this, consistency, clientState);
 +        }
 +
 +        public int nowInSec()
 +        {
 +            return nowInSec;
 +        }
 +
 +        public DataLimits limits()
 +        {
 +            return limits;
 +        }
 +
 +        public CFMetaData metadata()
 +        {
 +            return commands.get(0).metadata();
 +        }
 +
 +        public ReadOrderGroup startOrderGroup()
 +        {
 +            // Note that the only difference between the commands in a group must be the partition key they
 +            // apply to. So as far as ReadOrderGroup is concerned, we can use any of the commands to start one.
 +            return commands.get(0).startOrderGroup();
 +        }
 +
 +        public PartitionIterator executeInternal(ReadOrderGroup orderGroup)
 +        {
 +            List<PartitionIterator> partitions = new ArrayList<>(commands.size());
 +            for (SinglePartitionReadCommand cmd : commands)
 +                partitions.add(cmd.executeInternal(orderGroup));
 +
 +            // Because we only enforce the limit per command, we need to enforce it globally.
 +            return limits.filter(PartitionIterators.concat(partitions), nowInSec);
 +        }
 +
 +        public QueryPager getPager(PagingState pagingState, int protocolVersion)
 +        {
 +            if (commands.size() == 1)
 +                return SinglePartitionReadCommand.getPager(commands.get(0), pagingState, protocolVersion);
 +
 +            return new MultiPartitionPager(this, pagingState, protocolVersion);
 +        }
 +
 +        public boolean selectsKey(DecoratedKey key)
 +        {
 +            return Iterables.any(commands, c -> c.selectsKey(key));
 +        }
 +
 +        public boolean selectsClustering(DecoratedKey key, Clustering clustering)
 +        {
 +            return Iterables.any(commands, c -> c.selectsClustering(key, clustering));
 +        }
 +
 +        @Override
 +        public String toString()
 +        {
 +            return commands.toString();
 +        }
 +    }
 +
 +    private static class Deserializer extends SelectionDeserializer
 +    {
 +        public ReadCommand deserialize(DataInputPlus in, int version, boolean isDigest, int digestVersion, boolean isForThrift, CFMetaData metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits, Optional<IndexMetadata> index)
 +        throws IOException
 +        {
 +            DecoratedKey key = metadata.decorateKey(metadata.getKeyValidator().readValue(in));
 +            ClusteringIndexFilter filter = ClusteringIndexFilter.serializer.deserialize(in, version, metadata);
 +            return new SinglePartitionReadCommand(isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, key, filter);
 +        }
 +    }
 +}
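
----------------------------------------------------------------------

The functional change in this merge is CASSANDRA-10342: queryMemtableAndSSTablesInTimestampOrder() now tracks
whether every sstable it actually read was unrepaired (the new onlyUnrepaired flag), and the "hoist up"
defragmentation write is only scheduled when that holds, so repaired data is never rewritten as unrepaired.
Below is a minimal, self-contained sketch of that guard; the Sstable class and MIN_COMPACTION_THRESHOLD are
illustrative stand-ins, not Cassandra's real API.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class DefragGuardSketch
{
    // Simplified stand-in for an sstable: just a max cell timestamp and a repaired flag.
    static final class Sstable
    {
        final long maxTimestamp;
        final boolean repaired;
        Sstable(long maxTimestamp, boolean repaired) { this.maxTimestamp = maxTimestamp; this.repaired = repaired; }
    }

    // Hypothetical stand-in for cfs.getMinimumCompactionThreshold().
    static final int MIN_COMPACTION_THRESHOLD = 4;

    static boolean shouldDefragment(List<Sstable> sstables)
    {
        // Read in descending max-timestamp order, newest first, as the real method does.
        List<Sstable> sorted = new ArrayList<>(sstables);
        sorted.sort(Comparator.comparingLong((Sstable s) -> s.maxTimestamp).reversed());

        int sstablesIterated = 0;
        boolean onlyUnrepaired = true;
        for (Sstable sstable : sorted)
        {
            sstablesIterated++;
            if (sstable.repaired)
                onlyUnrepaired = false; // the CASSANDRA-10342 fix: remember we touched repaired data
        }

        // Only "hoist up" (rewrite the read data into a fresh memtable/sstable) when the read was
        // spread over many sstables AND none of them were repaired: re-writing repaired data as
        // unrepaired would put it back into the unrepaired set.
        return sstablesIterated > MIN_COMPACTION_THRESHOLD && onlyUnrepaired;
    }

    public static void main(String[] args)
    {
        List<Sstable> unrepairedOnly = Arrays.asList(new Sstable(5, false), new Sstable(4, false),
                                                     new Sstable(3, false), new Sstable(2, false),
                                                     new Sstable(1, false));
        List<Sstable> mixed = new ArrayList<>(unrepairedOnly);
        mixed.add(new Sstable(0, true));

        System.out.println(shouldDefragment(unrepairedOnly)); // true: many sstables, all unrepaired
        System.out.println(shouldDefragment(mixed));          // false: a repaired sstable was read
    }
}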
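
----------------------------------------------------------------------

For reference, getThroughCache() above populates the row cache with a sentinel-read-replace sequence: claim the
key with a sentinel, read from memtables/sstables, then atomically swap the sentinel for the real data; if a
concurrent write removed the sentinel in the meantime, the freshly read result is still returned, just not cached.
A simplified sketch of that protocol over a plain ConcurrentMap; the String/Object key and value types and
readFromStorage() are stand-ins:

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class SentinelCacheSketch
{
    static final Object SENTINEL = new Object(); // plays the role of RowCacheSentinel

    final ConcurrentMap<String, Object> cache = new ConcurrentHashMap<>();

    Object readThroughCache(String key)
    {
        Object cached = cache.get(key);
        if (cached != null && cached != SENTINEL)
            return cached; // cache hit

        // Miss, or another read is already populating: claim the slot with a sentinel if it is free.
        boolean sentinelSuccess = cache.putIfAbsent(key, SENTINEL) == null;
        boolean sentinelReplaced = false;
        try
        {
            Object fresh = readFromStorage(key);
            if (sentinelSuccess)
            {
                cache.replace(key, SENTINEL, fresh);
                // Whether or not the replace worked (a concurrent write may have removed our
                // sentinel), the sentinel is no longer ours to clean up.
                sentinelReplaced = true;
            }
            return fresh;
        }
        finally
        {
            // On failure, never leave our sentinel behind.
            if (sentinelSuccess && !sentinelReplaced)
                cache.remove(key, SENTINEL);
        }
    }

    // Stand-in for queryMemtableAndDisk().
    Object readFromStorage(String key)
    {
        return "row-data-for-" + key;
    }

    // A writer invalidates by removing whatever is cached for the key, sentinel included.
    void write(String key)
    {
        cache.remove(key);
    }
}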
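
----------------------------------------------------------------------

The long comment in queryMemtableAndDiskInternal() argues that iterating sstables in descending max-timestamp
order lets a partition-level tombstone eliminate all older sstables in a single pass: once a tombstone with
timestamp t has been seen, any sstable whose maxTimestamp < t can contain nothing that survives it. A compact
sketch of just that elimination loop, with Sstable and its fields as illustrative stand-ins:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class TombstoneEliminationSketch
{
    // Illustrative stand-in: an sstable with a max cell timestamp and (possibly) a partition tombstone.
    static final class Sstable
    {
        final long maxTimestamp;
        final long partitionTombstoneTimestamp; // Long.MIN_VALUE when there is no partition tombstone
        Sstable(long maxTimestamp, long partitionTombstoneTimestamp)
        {
            this.maxTimestamp = maxTimestamp;
            this.partitionTombstoneTimestamp = partitionTombstoneTimestamp;
        }
    }

    static List<Sstable> sstablesToRead(List<Sstable> sstables)
    {
        // Newest first, like SSTableReader.maxTimestampComparator.
        List<Sstable> sorted = new ArrayList<>(sstables);
        sorted.sort(Comparator.comparingLong((Sstable s) -> s.maxTimestamp).reversed());

        List<Sstable> toRead = new ArrayList<>();
        long mostRecentPartitionTombstone = Long.MIN_VALUE;
        for (Sstable sstable : sorted)
        {
            // Everything in this sstable (and in all following, older ones) is older than a
            // partition tombstone we've already seen: it is all shadowed, so stop here.
            if (sstable.maxTimestamp < mostRecentPartitionTombstone)
                break;

            toRead.add(sstable);
            mostRecentPartitionTombstone = Math.max(mostRecentPartitionTombstone, sstable.partitionTombstoneTimestamp);
        }
        return toRead;
    }

    public static void main(String[] args)
    {
        List<Sstable> sstables = Arrays.asList(new Sstable(100, 90),            // carries a partition delete at t=90
                                               new Sstable(95, Long.MIN_VALUE), // still read: 95 >= 90
                                               new Sstable(80, Long.MIN_VALUE), // skipped: shadowed by t=90
                                               new Sstable(50, Long.MIN_VALUE));
        System.out.println(sstablesToRead(sstables).size()); // 2
    }
}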