Repository: cassandra Updated Branches: refs/heads/trunk 2fe34badb -> b11fba750
Optimize disk seek using min/max column name meta data when the LIMIT clause is used patch by Stefania Alborghetti; reviewed by blambov for CASSANDRA-8180 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b11fba75 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b11fba75 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b11fba75 Branch: refs/heads/trunk Commit: b11fba750c610de5e97acba070cc571cf0a96416 Parents: 2fe34ba Author: Stefania Alborghetti <[email protected]> Authored: Fri Mar 20 08:02:39 2015 +0800 Committer: Aleksey Yeschenko <[email protected]> Committed: Wed Feb 10 13:14:56 2016 +0000 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../cassandra/config/DatabaseDescriptor.java | 6 + .../db/SinglePartitionReadCommand.java | 144 +++--- .../columniterator/AbstractSSTableIterator.java | 5 +- .../LazilyInitializedUnfilteredRowIterator.java | 7 +- .../UnfilteredRowIteratorWithLowerBound.java | 212 +++++++++ .../io/sstable/format/SSTableReader.java | 16 + .../apache/cassandra/utils/IMergeIterator.java | 1 + .../cassandra/utils/IteratorWithLowerBound.java | 24 + .../apache/cassandra/utils/MergeIterator.java | 47 +- .../org/apache/cassandra/cql3/CQLTester.java | 23 +- .../validation/entities/StaticColumnsTest.java | 10 + .../miscellaneous/SSTablesIteratedTest.java | 455 +++++++++++++++++++ .../cql3/validation/operations/DeleteTest.java | 60 ++- .../cql3/validation/operations/InsertTest.java | 6 - .../cql3/validation/operations/UpdateTest.java | 6 - 16 files changed, 929 insertions(+), 95 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/b11fba75/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 6cd4cf5..e6067a6 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,6
@@ 3.4 + * Optimize disk seek using min/max column name meta data when the LIMIT clause is used + (CASSANDRA-8180) * Add LIKE support to CQL3 (CASSANDRA-11067) * Generic Java UDF types (CASSANDRA-10819) * cqlsh: Include sub-second precision in timestamps by default (CASSANDRA-10428) http://git-wip-us.apache.org/repos/asf/cassandra/blob/b11fba75/src/java/org/apache/cassandra/config/DatabaseDescriptor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index b09605f..5c2a5c9 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -885,6 +885,12 @@ public class DatabaseDescriptor return conf.column_index_size_in_kb * 1024; } + @VisibleForTesting + public static void setColumnIndexSize(int val) + { + conf.column_index_size_in_kb = val; + } + public static int getBatchSizeWarnThreshold() { return conf.batch_size_warn_threshold_in_kb * 1024; http://git-wip-us.apache.org/repos/asf/cassandra/blob/b11fba75/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java index 680b4b5..1a0b400 100644 --- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java +++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java @@ -36,6 +36,7 @@ import org.apache.cassandra.db.lifecycle.*; import org.apache.cassandra.db.filter.*; import org.apache.cassandra.db.partitions.*; import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.db.transform.Transformation; import org.apache.cassandra.exceptions.RequestExecutionException; import org.apache.cassandra.io.sstable.format.SSTableReader; import 
org.apache.cassandra.io.util.DataInputPlus; @@ -54,7 +55,6 @@ import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.SearchIterator; import org.apache.cassandra.utils.btree.BTreeSet; import org.apache.cassandra.utils.concurrent.OpOrder; -import org.apache.cassandra.utils.memory.HeapAllocator; /** @@ -487,9 +487,9 @@ public class SinglePartitionReadCommand extends ReadCommand Tracing.trace("Acquiring sstable references"); ColumnFamilyStore.ViewFragment view = cfs.select(View.select(SSTableSet.LIVE, partitionKey())); - List<UnfilteredRowIterator> iterators = new ArrayList<>(Iterables.size(view.memtables) + view.sstables.size()); ClusteringIndexFilter filter = clusteringIndexFilter(); + long minTimestamp = Long.MAX_VALUE; try { @@ -499,11 +499,14 @@ public class SinglePartitionReadCommand extends ReadCommand if (partition == null) continue; + minTimestamp = Math.min(minTimestamp, memtable.getMinTimestamp()); + @SuppressWarnings("resource") // 'iter' is added to iterators which is closed on exception, or through the closing of the final merged iterator UnfilteredRowIterator iter = filter.getUnfilteredRowIterator(columnFilter(), partition); oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, partition.stats().minLocalDeletionTime); iterators.add(isForThrift() ? ThriftResultsMerger.maybeWrap(iter, nowInSec()) : iter); } + /* * We can't eliminate full sstables based on the timestamp of what we've already read like * in collectTimeOrderedData, but we still want to eliminate sstable whose maxTimestamp < mostRecentTombstone @@ -516,16 +519,13 @@ public class SinglePartitionReadCommand extends ReadCommand * In other words, iterating in maxTimestamp order allow to do our mostRecentPartitionTombstone elimination * in one pass, and minimize the number of sstables for which we read a partition tombstone. 
*/ - int sstablesIterated = 0; Collections.sort(view.sstables, SSTableReader.maxTimestampComparator); - List<SSTableReader> skippedSSTables = null; long mostRecentPartitionTombstone = Long.MIN_VALUE; - long minTimestamp = Long.MAX_VALUE; int nonIntersectingSSTables = 0; + List<SSTableReader> skippedSSTablesWithTombstones = null; for (SSTableReader sstable : view.sstables) { - minTimestamp = Math.min(minTimestamp, sstable.getMinTimestamp()); // if we've already seen a partition tombstone with a timestamp greater // than the most recent update to this sstable, we can skip it if (sstable.getMaxTimestamp() < mostRecentPartitionTombstone) @@ -534,73 +534,55 @@ public class SinglePartitionReadCommand extends ReadCommand if (!shouldInclude(sstable)) { nonIntersectingSSTables++; - // sstable contains no tombstone if maxLocalDeletionTime == Integer.MAX_VALUE, so we can safely skip those entirely - if (sstable.getSSTableMetadata().maxLocalDeletionTime != Integer.MAX_VALUE) - { - if (skippedSSTables == null) - skippedSSTables = new ArrayList<>(); - skippedSSTables.add(sstable); + if (sstable.hasTombstones()) + { // if sstable has tombstones we need to check after one pass if it can be safely skipped + if (skippedSSTablesWithTombstones == null) + skippedSSTablesWithTombstones = new ArrayList<>(); + skippedSSTablesWithTombstones.add(sstable); } continue; } - sstable.incrementReadCount(); - @SuppressWarnings("resource") // 'iter' is added to iterators which is closed on exception, or through the closing of the final merged iterator - UnfilteredRowIterator iter = sstable.iterator(partitionKey(), filter.getSlices(metadata()), columnFilter(), filter.isReversed(), isForThrift()); + minTimestamp = Math.min(minTimestamp, sstable.getMinTimestamp()); + + @SuppressWarnings("resource") // 'iter' is added to iterators which is closed on exception, + // or through the closing of the final merged iterator + UnfilteredRowIteratorWithLowerBound iter = makeIterator(sstable, true); if 
(!sstable.isRepaired()) oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, sstable.getMinLocalDeletionTime()); - iterators.add(isForThrift() ? ThriftResultsMerger.maybeWrap(iter, nowInSec()) : iter); - mostRecentPartitionTombstone = Math.max(mostRecentPartitionTombstone, iter.partitionLevelDeletion().markedForDeleteAt()); - sstablesIterated++; + iterators.add(iter); + mostRecentPartitionTombstone = Math.max(mostRecentPartitionTombstone, + iter.partitionLevelDeletion().markedForDeleteAt()); } int includedDueToTombstones = 0; - // Check for partition tombstones in the skipped sstables - if (skippedSSTables != null) + // Check for sstables with tombstones that are not expired + if (skippedSSTablesWithTombstones != null) { - for (SSTableReader sstable : skippedSSTables) + for (SSTableReader sstable : skippedSSTablesWithTombstones) { if (sstable.getMaxTimestamp() <= minTimestamp) continue; - sstable.incrementReadCount(); - @SuppressWarnings("resource") // 'iter' is either closed right away, or added to iterators which is close on exception, or through the closing of the final merged iterator - UnfilteredRowIterator iter = sstable.iterator(partitionKey(), filter.getSlices(metadata()), columnFilter(), filter.isReversed(), isForThrift()); - if (iter.partitionLevelDeletion().markedForDeleteAt() > minTimestamp) - { - iterators.add(iter); - if (!sstable.isRepaired()) - oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, sstable.getMinLocalDeletionTime()); - includedDueToTombstones++; - sstablesIterated++; - } - else - { - iter.close(); - } + @SuppressWarnings("resource") // 'iter' is added to iterators which is closed on exception, + // or through the closing of the final merged iterator + UnfilteredRowIteratorWithLowerBound iter = makeIterator(sstable, false); + if (!sstable.isRepaired()) + oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, sstable.getMinLocalDeletionTime()); + + iterators.add(iter); + includedDueToTombstones++; } } if
(Tracing.isTracing()) Tracing.trace("Skipped {}/{} non-slice-intersecting sstables, included {} due to tombstones", - nonIntersectingSSTables, view.sstables.size(), includedDueToTombstones); - - cfs.metric.updateSSTableIterated(sstablesIterated); + nonIntersectingSSTables, view.sstables.size(), includedDueToTombstones); if (iterators.isEmpty()) return EmptyIterators.unfilteredRow(cfs.metadata, partitionKey(), filter.isReversed()); - Tracing.trace("Merging data from memtables and {} sstables", sstablesIterated); - - @SuppressWarnings("resource") // Closed through the closing of the result of that method. - UnfilteredRowIterator merged = UnfilteredRowIterators.merge(iterators, nowInSec()); - if (!merged.isEmpty()) - { - DecoratedKey key = merged.partitionKey(); - cfs.metric.samplers.get(TableMetrics.Sampler.READS).addSample(key.getKey(), key.hashCode(), 1); - } - - return withStateTracking(merged); + return withStateTracking(withSSTablesIterated(iterators, cfs.metric)); } catch (RuntimeException | Error e) { @@ -627,6 +609,50 @@ public class SinglePartitionReadCommand extends ReadCommand return clusteringIndexFilter().shouldInclude(sstable); } + private UnfilteredRowIteratorWithLowerBound makeIterator(final SSTableReader sstable, boolean applyThriftTransformation) + { + return new UnfilteredRowIteratorWithLowerBound(partitionKey(), + sstable, + clusteringIndexFilter(), + columnFilter(), + isForThrift(), + nowInSec(), + applyThriftTransformation); + } + + /** + * Return a merged iterator that records a READ sample up front (if the merged partition is not empty) and + * updates the sstables iterated metric when the partition is closed. The Transformation is applied to the merged + * iterator only: transformations greedily get the static row, so wrapping each sstable iterator would cause all iterators to be initialized and hence all sstables to be accessed.
+ */ + private UnfilteredRowIterator withSSTablesIterated(List<UnfilteredRowIterator> iterators, + TableMetrics metrics) + { + @SuppressWarnings("resource") // Closed through the closing of the result of the caller method. + UnfilteredRowIterator merged = UnfilteredRowIterators.merge(iterators, nowInSec()); + + if (!merged.isEmpty()) + { + DecoratedKey key = merged.partitionKey(); + metrics.samplers.get(TableMetrics.Sampler.READS).addSample(key.getKey(), key.hashCode(), 1); + } + + class UpdateSstablesIterated extends Transformation + { + public void onPartitionClose() + { + int sstablesIterated = (int)iterators.stream() + .filter(it -> it instanceof LazilyInitializedUnfilteredRowIterator) + .filter(it -> ((LazilyInitializedUnfilteredRowIterator)it).initialized()) + .count(); + + metrics.updateSSTableIterated(sstablesIterated); + Tracing.trace("Merged data from memtables and {} sstables", sstablesIterated); + } + }; + return Transformation.apply(merged, new UpdateSstablesIterated()); + } + private boolean queryNeitherCountersNorCollections() { for (ColumnDefinition column : columnFilter().fetchedColumns()) @@ -693,8 +719,8 @@ public class SinglePartitionReadCommand extends ReadCommand // however: if it is set, it impacts everything and must be included. Getting that top-level partition deletion costs us // some seek in general however (unless the partition is indexed and is in the key cache), so we first check if the sstable // has any tombstone at all as a shortcut. - if (sstable.getSSTableMetadata().maxLocalDeletionTime == Integer.MAX_VALUE) - continue; // Means no tombstone at all, we can skip that sstable + if (!sstable.hasTombstones()) + continue; // no tombstone at all, we can skip that sstable // We need to get the partition deletion and include it if it's live. In any case though, we're done with that sstable. 
sstable.incrementReadCount(); @@ -711,7 +737,7 @@ public class SinglePartitionReadCommand extends ReadCommand Tracing.trace("Merging data from sstable {}", sstable.descriptor.generation); sstable.incrementReadCount(); - try (UnfilteredRowIterator iter = sstable.iterator(partitionKey(), filter.getSlices(metadata()), columnFilter(), filter.isReversed(), isForThrift());) + try (UnfilteredRowIterator iter = sstable.iterator(partitionKey(), filter.getSlices(metadata()), columnFilter(), filter.isReversed(), isForThrift())) { if (iter.isEmpty()) continue; @@ -741,13 +767,9 @@ public class SinglePartitionReadCommand extends ReadCommand try (UnfilteredRowIterator iter = result.unfilteredIterator(columnFilter(), Slices.ALL, false)) { final Mutation mutation = new Mutation(PartitionUpdate.fromIterator(iter, columnFilter())); - StageManager.getStage(Stage.MUTATION).execute(new Runnable() - { - public void run() - { - // skipping commitlog and index updates is fine since we're just de-fragmenting existing data - Keyspace.open(mutation.getKeyspaceName()).apply(mutation, false, false); - } + StageManager.getStage(Stage.MUTATION).execute(() -> { + // skipping commitlog and index updates is fine since we're just de-fragmenting existing data + Keyspace.open(mutation.getKeyspaceName()).apply(mutation, false, false); }); } } @@ -909,7 +931,7 @@ public class SinglePartitionReadCommand extends ReadCommand public static Group one(SinglePartitionReadCommand command) { - return new Group(Collections.<SinglePartitionReadCommand>singletonList(command), command.limits()); + return new Group(Collections.singletonList(command), command.limits()); } public PartitionIterator execute(ConsistencyLevel consistency, ClientState clientState) throws RequestExecutionException http://git-wip-us.apache.org/repos/asf/cassandra/blob/b11fba75/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java ---------------------------------------------------------------------- diff --git 
a/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java b/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java index 792f5ad..d55161b 100644 --- a/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java +++ b/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java @@ -18,7 +18,6 @@ package org.apache.cassandra.db.columniterator; import java.io.IOException; -import java.util.Collections; import java.util.Iterator; import java.util.List; @@ -217,9 +216,7 @@ abstract class AbstractSSTableIterator implements UnfilteredRowIterator public EncodingStats stats() { - // We could return sstable.header.stats(), but this may not be as accurate than the actual sstable stats (see - // SerializationHeader.make() for details) so we use the latter instead. - return new EncodingStats(sstable.getMinTimestamp(), sstable.getMinLocalDeletionTime(), sstable.getMinTTL()); + return sstable.stats(); } public boolean hasNext() http://git-wip-us.apache.org/repos/asf/cassandra/blob/b11fba75/src/java/org/apache/cassandra/db/rows/LazilyInitializedUnfilteredRowIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/LazilyInitializedUnfilteredRowIterator.java b/src/java/org/apache/cassandra/db/rows/LazilyInitializedUnfilteredRowIterator.java index 1bf78dd..fc5bdbe 100644 --- a/src/java/org/apache/cassandra/db/rows/LazilyInitializedUnfilteredRowIterator.java +++ b/src/java/org/apache/cassandra/db/rows/LazilyInitializedUnfilteredRowIterator.java @@ -42,12 +42,17 @@ public abstract class LazilyInitializedUnfilteredRowIterator extends AbstractIte protected abstract UnfilteredRowIterator initializeIterator(); - private void maybeInit() + protected void maybeInit() { if (iterator == null) iterator = initializeIterator(); } + public boolean initialized() + { + return iterator != null; + } + public CFMetaData metadata() { maybeInit(); 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b11fba75/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorWithLowerBound.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorWithLowerBound.java b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorWithLowerBound.java new file mode 100644 index 0000000..4f55677 --- /dev/null +++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorWithLowerBound.java @@ -0,0 +1,212 @@ +package org.apache.cassandra.db.rows; + +import java.nio.ByteBuffer; +import java.util.Comparator; +import java.util.List; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.filter.ClusteringIndexFilter; +import org.apache.cassandra.db.filter.ColumnFilter; +import org.apache.cassandra.io.sstable.IndexHelper; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.sstable.metadata.StatsMetadata; +import org.apache.cassandra.thrift.ThriftResultsMerger; +import org.apache.cassandra.utils.IteratorWithLowerBound; + +/** + * An unfiltered row iterator with a lower bound retrieved from either the global + * sstable statistics or the row index lower bounds (if available in the cache). + * Before initializing the sstable unfiltered row iterator, we return a range tombstone bound marker + * with a live deletion time, positioned at the lower bound. The merge iterator discards this marker, so + * if we don't need to access this sstable, e.g. due to the LIMIT condition, + * then we will not. See CASSANDRA-8180 for examples of why this is useful.
+ */ +public class UnfilteredRowIteratorWithLowerBound extends LazilyInitializedUnfilteredRowIterator implements IteratorWithLowerBound<Unfiltered> +{ + private final SSTableReader sstable; + private final ClusteringIndexFilter filter; + private final ColumnFilter selectedColumns; + private final boolean isForThrift; + private final int nowInSec; + private final boolean applyThriftTransformation; + private RangeTombstone.Bound lowerBound; + private boolean firstItemRetrieved; + + public UnfilteredRowIteratorWithLowerBound(DecoratedKey partitionKey, + SSTableReader sstable, + ClusteringIndexFilter filter, + ColumnFilter selectedColumns, + boolean isForThrift, + int nowInSec, + boolean applyThriftTransformation) + { + super(partitionKey); + this.sstable = sstable; + this.filter = filter; + this.selectedColumns = selectedColumns; + this.isForThrift = isForThrift; + this.nowInSec = nowInSec; + this.applyThriftTransformation = applyThriftTransformation; + this.lowerBound = null; + this.firstItemRetrieved = false; + } + + public Unfiltered lowerBound() + { + if (lowerBound != null) + return makeBound(lowerBound); + + // The partition index lower bound is more accurate than the sstable metadata lower bound but it is only + // present if the iterator has already been initialized, which we only do when there are tombstones since in + // this case we cannot use the sstable metadata clustering values + RangeTombstone.Bound ret = getPartitionIndexLowerBound(); + return ret != null ? 
makeBound(ret) : makeBound(getMetadataLowerBound()); + } + + private Unfiltered makeBound(RangeTombstone.Bound bound) + { + if (bound == null) + return null; + + if (lowerBound != bound) + lowerBound = bound; + + return new RangeTombstoneBoundMarker(lowerBound, DeletionTime.LIVE); + } + + @Override + protected UnfilteredRowIterator initializeIterator() + { + sstable.incrementReadCount(); + + @SuppressWarnings("resource") // 'iter' is added to iterators which is closed on exception, or through the closing of the final merged iterator + UnfilteredRowIterator iter = sstable.iterator(partitionKey(), filter.getSlices(metadata()), selectedColumns, filter.isReversed(), isForThrift); + return isForThrift && applyThriftTransformation + ? ThriftResultsMerger.maybeWrap(iter, nowInSec) + : iter; + } + + @Override + protected Unfiltered computeNext() + { + Unfiltered ret = super.computeNext(); + if (firstItemRetrieved) + return ret; + + // Check that the lower bound is not bigger than the first item retrieved + firstItemRetrieved = true; + if (lowerBound != null && ret != null) + assert comparator().compare(lowerBound, ret.clustering()) <= 0 + : String.format("Lower bound [%s ]is bigger than first returned value [%s] for sstable %s", + lowerBound.toString(sstable.metadata), + ret.toString(sstable.metadata), + sstable.getFilename()); + + return ret; + } + + private Comparator<Clusterable> comparator() + { + return filter.isReversed() ? 
sstable.metadata.comparator.reversed() : sstable.metadata.comparator; + } + + @Override + public CFMetaData metadata() + { + return sstable.metadata; + } + + @Override + public boolean isReverseOrder() + { + return filter.isReversed(); + } + + @Override + public PartitionColumns columns() + { + return selectedColumns.fetchedColumns(); + } + + @Override + public EncodingStats stats() + { + return sstable.stats(); + } + + @Override + public DeletionTime partitionLevelDeletion() + { + if (!sstable.hasTombstones()) + return DeletionTime.LIVE; + + return super.partitionLevelDeletion(); + } + + @Override + public Row staticRow() + { + if (columns().statics.isEmpty()) + return Rows.EMPTY_STATIC_ROW; + + return super.staticRow(); + } + + /** + * @return the lower bound stored on the index entry for this partition, if available. + */ + private RangeTombstone.Bound getPartitionIndexLowerBound() + { + // Creating the iterator ensures that rowIndexEntry is loaded if available (partitions bigger than + // DatabaseDescriptor.column_index_size_in_kb) + if (!canUseMetadataLowerBound()) + maybeInit(); + + RowIndexEntry rowIndexEntry = sstable.getCachedPosition(partitionKey(), false); + if (rowIndexEntry == null) + return null; + + List<IndexHelper.IndexInfo> columns = rowIndexEntry.columnsIndex(); + if (columns.size() == 0) + return null; + + IndexHelper.IndexInfo column = columns.get(filter.isReversed() ? columns.size() - 1 : 0); + ClusteringPrefix lowerBoundPrefix = filter.isReversed() ? 
column.lastName : column.firstName; + assert lowerBoundPrefix.getRawValues().length <= sstable.metadata.comparator.size() : + String.format("Unexpected number of clustering values %d, expected %d or fewer for %s", + lowerBoundPrefix.getRawValues().length, + sstable.metadata.comparator.size(), + sstable.getFilename()); + return RangeTombstone.Bound.inclusiveOpen(filter.isReversed(), lowerBoundPrefix.getRawValues()); + } + + /** + * @return true if we can use the clustering values in the stats of the sstable: + * - we need the latest stats file format (or else the clustering values create clusterings with the wrong size) + * - we cannot create tombstone bounds from these values only and so we rule out sstables with tombstones + */ + private boolean canUseMetadataLowerBound() + { + return !sstable.hasTombstones() && sstable.descriptor.version.hasNewStatsFile(); + } + + /** + * @return a global lower bound made from the clustering values stored in the sstable metadata, note that + * this currently does not correctly compare tombstone bounds, especially ranges. + */ + private RangeTombstone.Bound getMetadataLowerBound() + { + if (!canUseMetadataLowerBound()) + return null; + + final StatsMetadata m = sstable.getSSTableMetadata(); + List<ByteBuffer> vals = filter.isReversed() ? 
m.maxClusteringValues : m.minClusteringValues; + assert vals.size() <= sstable.metadata.comparator.size() : + String.format("Unexpected number of clustering values %d, expected %d or fewer for %s", + vals.size(), + sstable.metadata.comparator.size(), + sstable.getFilename()); + return RangeTombstone.Bound.inclusiveOpen(filter.isReversed(), vals.toArray(new ByteBuffer[vals.size()])); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/b11fba75/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java index e152540..495d831 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java +++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java @@ -47,6 +47,7 @@ import org.apache.cassandra.config.Schema; import org.apache.cassandra.db.*; import org.apache.cassandra.db.commitlog.ReplayPosition; import org.apache.cassandra.db.filter.ColumnFilter; +import org.apache.cassandra.db.rows.EncodingStats; import org.apache.cassandra.db.rows.UnfilteredRowIterator; import org.apache.cassandra.dht.AbstractBounds; import org.apache.cassandra.dht.Range; @@ -1922,6 +1923,14 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS return sstableMetadata.maxLocalDeletionTime; } + /** sstable contains no tombstones if minLocalDeletionTime == Integer.MAX_VALUE */ + public boolean hasTombstones() + { + // sstable contains no tombstone if minLocalDeletionTime is still set to the default value Integer.MAX_VALUE + // which is bigger than any valid deletion time + return getMinLocalDeletionTime() != Integer.MAX_VALUE; + } + public int getMinTTL() { return sstableMetadata.minTTL; @@ -2072,6 +2081,13 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS } } + public
EncodingStats stats() + { + // We could return sstable.header.stats(), but this may not be as accurate as the actual sstable stats (see + // SerializationHeader.make() for details) so we use the latter instead. + return new EncodingStats(getMinTimestamp(), getMinLocalDeletionTime(), getMinTTL()); + } + public Ref<SSTableReader> tryRef() { return selfRef.tryRef(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/b11fba75/src/java/org/apache/cassandra/utils/IMergeIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/IMergeIterator.java b/src/java/org/apache/cassandra/utils/IMergeIterator.java index deddc4c..e45b897 100644 --- a/src/java/org/apache/cassandra/utils/IMergeIterator.java +++ b/src/java/org/apache/cassandra/utils/IMergeIterator.java @@ -21,5 +21,6 @@ import java.util.Iterator; public interface IMergeIterator<In, Out> extends CloseableIterator<Out> { + Iterable<? extends Iterator<In>> iterators(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/b11fba75/src/java/org/apache/cassandra/utils/IteratorWithLowerBound.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/IteratorWithLowerBound.java b/src/java/org/apache/cassandra/utils/IteratorWithLowerBound.java new file mode 100644 index 0000000..85eeede --- /dev/null +++ b/src/java/org/apache/cassandra/utils/IteratorWithLowerBound.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.utils; + +public interface IteratorWithLowerBound<In> +{ + In lowerBound(); +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/b11fba75/src/java/org/apache/cassandra/utils/MergeIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/MergeIterator.java b/src/java/org/apache/cassandra/utils/MergeIterator.java index 70daad9..c9e445b 100644 --- a/src/java/org/apache/cassandra/utils/MergeIterator.java +++ b/src/java/org/apache/cassandra/utils/MergeIterator.java @@ -200,7 +200,7 @@ public abstract class MergeIterator<In,Out> extends AbstractIterator<Out> implem reducer.onKeyChange(); assert !heap[0].equalParent; - reducer.reduce(heap[0].idx, heap[0].consume()); + heap[0].consume(reducer); final int size = this.size; final int sortedSectionSize = Math.min(size, SORTED_SECTION_SIZE); int i; @@ -209,7 +209,7 @@ public abstract class MergeIterator<In,Out> extends AbstractIterator<Out> implem { if (!heap[i].equalParent) break consume; - reducer.reduce(heap[i].idx, heap[i].consume()); + heap[i].consume(reducer); } i = Math.max(i, consumeHeap(i) + 1); } @@ -227,7 +227,7 @@ public abstract class MergeIterator<In,Out> extends AbstractIterator<Out> implem if (idx >= size || !heap[idx].equalParent) return -1; - reducer.reduce(heap[idx].idx, heap[idx].consume()); + heap[idx].consume(reducer); int nextIdx = (idx << 1) - (SORTED_SECTION_SIZE - 1); return Math.max(idx, Math.max(consumeHeap(nextIdx), consumeHeap(nextIdx + 1))); } @@ -351,6 +351,7 @@ 
public abstract class MergeIterator<In,Out> extends AbstractIterator<Out> implem private final Comparator<? super In> comp; private final int idx; private In item; + private In lowerBound; boolean equalParent; public Candidate(int idx, Iterator<? extends In> iter, Comparator<? super In> comp) @@ -358,29 +359,55 @@ public abstract class MergeIterator<In,Out> extends AbstractIterator<Out> implem this.iter = iter; this.comp = comp; this.idx = idx; + this.lowerBound = iter instanceof IteratorWithLowerBound ? ((IteratorWithLowerBound<In>)iter).lowerBound() : null; } /** @return this if our iterator had an item, and it is now available, otherwise null */ protected Candidate<In> advance() { + if (lowerBound != null) + { + item = lowerBound; + return this; + } + if (!iter.hasNext()) return null; + item = iter.next(); return this; } public int compareTo(Candidate<In> that) { - assert item != null && that.item != null; - return comp.compare(this.item, that.item); + assert this.item != null && that.item != null; + int ret = comp.compare(this.item, that.item); + if (ret == 0 && (this.isLowerBound() ^ that.isLowerBound())) + { // if the items are equal and one of them is a lower bound (but not the other one) + // then ensure the lower bound is less than the real item so we can safely + // skip lower bounds when consuming + return this.isLowerBound() ? 
-1 : 1; + } + return ret; + } + + private boolean isLowerBound() + { + return item == lowerBound; } - public In consume() + public void consume(Reducer reducer) { - In temp = item; - item = null; - assert temp != null; - return temp; + if (isLowerBound()) + { + item = null; + lowerBound = null; + } + else + { + reducer.reduce(idx, item); + item = null; + } } public boolean needsAdvance() http://git-wip-us.apache.org/repos/asf/cassandra/blob/b11fba75/test/unit/org/apache/cassandra/cql3/CQLTester.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cql3/CQLTester.java b/test/unit/org/apache/cassandra/cql3/CQLTester.java index 71bc238..3c9cbbd 100644 --- a/test/unit/org/apache/cassandra/cql3/CQLTester.java +++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java @@ -367,6 +367,12 @@ public abstract class CQLTester : Keyspace.open(KEYSPACE).getColumnFamilyStore(currentTable); } + public void flush(boolean forceFlush) + { + if (forceFlush) + flush(); + } + public void flush() { ColumnFamilyStore store = getCurrentColumnFamilyStore(); @@ -374,6 +380,12 @@ public abstract class CQLTester store.forceBlockingFlush(); } + public void disableCompaction() + { + ColumnFamilyStore store = getCurrentColumnFamilyStore(); + store.disableAutoCompaction(); + } + public void compact() { try @@ -809,8 +821,17 @@ public abstract class CQLTester { while (iter.hasNext()) { - iter.next(); + UntypedResultSet.Row actual = iter.next(); i++; + + StringBuilder str = new StringBuilder(); + for (int j = 0; j < meta.size(); j++) + { + ColumnSpecification column = meta.get(j); + ByteBuffer actualValue = actual.getBytes(column.name.toString()); + str.append(String.format("%s=%s ", column.name, formatValue(actualValue, column.type))); + } + logger.info("Extra row num {}: {}", i, str.toString()); } Assert.fail(String.format("Got more rows than expected. 
Expected %d but got %d.", rows.length, i)); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/b11fba75/test/unit/org/apache/cassandra/cql3/validation/entities/StaticColumnsTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/StaticColumnsTest.java b/test/unit/org/apache/cassandra/cql3/validation/entities/StaticColumnsTest.java index cef6f1f..db7487e 100644 --- a/test/unit/org/apache/cassandra/cql3/validation/entities/StaticColumnsTest.java +++ b/test/unit/org/apache/cassandra/cql3/validation/entities/StaticColumnsTest.java @@ -37,9 +37,16 @@ public class StaticColumnsTest extends CQLTester @Test public void testStaticColumns() throws Throwable { + testStaticColumns(false); + testStaticColumns(true); + } + + private void testStaticColumns(boolean forceFlush) throws Throwable + { createTable("CREATE TABLE %s ( k int, p int, s int static, v int, PRIMARY KEY (k, p))"); execute("INSERT INTO %s(k, s) VALUES (0, 42)"); + flush(forceFlush); assertRows(execute("SELECT * FROM %s"), row(0, null, 42, null)); @@ -51,6 +58,7 @@ public class StaticColumnsTest extends CQLTester execute("INSERT INTO %s (k, p, s, v) VALUES (0, 0, 12, 0)"); execute("INSERT INTO %s (k, p, s, v) VALUES (0, 1, 24, 1)"); + flush(forceFlush); // Check the static columns in indeed "static" assertRows(execute("SELECT * FROM %s"), row(0, 0, 24, 0), row(0, 1, 24, 1)); @@ -81,10 +89,12 @@ public class StaticColumnsTest extends CQLTester // Check that deleting a row don't implicitely deletes statics execute("DELETE FROM %s WHERE k=0 AND p=0"); + flush(forceFlush); assertRows(execute("SELECT * FROM %s"),row(0, 1, 24, 1)); // But that explicitely deleting the static column does remove it execute("DELETE s FROM %s WHERE k=0"); + flush(forceFlush); assertRows(execute("SELECT * FROM %s"), row(0, 1, null, 1)); // Check we can add a static column ... 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b11fba75/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/SSTablesIteratedTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/SSTablesIteratedTest.java b/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/SSTablesIteratedTest.java new file mode 100644 index 0000000..720108a --- /dev/null +++ b/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/SSTablesIteratedTest.java @@ -0,0 +1,455 @@ +package org.apache.cassandra.cql3.validation.miscellaneous; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.cql3.CQLTester; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.metrics.ClearableHistogram; + +/** + * Tests for checking how many sstables we access during cql queries with LIMIT specified, + * see CASSANDRA-8180. + */ +public class SSTablesIteratedTest extends CQLTester +{ + private void executeAndCheck(String query, int numSSTables, Object[]... 
rows) throws Throwable + { + ColumnFamilyStore cfs = getCurrentColumnFamilyStore(); + + ((ClearableHistogram) cfs.metric.sstablesPerReadHistogram.cf).clear(); // resets counts + + assertRows(execute(query), rows); + + assertEquals(numSSTables, cfs.metric.sstablesPerReadHistogram.cf.getSnapshot().getMax()); // max sstables read + } + + @Override + protected String createTable(String query) + { + String ret = super.createTable(query); + disableCompaction(); + return ret; + } + + @Test + public void testSSTablesOnlyASC() throws Throwable + { + createTable("CREATE TABLE %s (id int, col int, val text, PRIMARY KEY (id, col)) WITH CLUSTERING ORDER BY (col ASC)"); + + execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 10, "10"); + flush(); + + execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 20, "20"); + flush(); + + execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 30, "30"); + flush(); + + executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 1", 1, row(1, 10, "10")); + executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 2", 2, row(1, 10, "10"), row(1, 20, "20")); + executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 3", 3, row(1, 10, "10"), row(1, 20, "20"), row(1, 30, "30")); + executeAndCheck("SELECT * FROM %s WHERE id=1", 3, row(1, 10, "10"), row(1, 20, "20"), row(1, 30, "30")); + + executeAndCheck("SELECT * FROM %s WHERE id=1 AND col > 25 LIMIT 1", 1, row(1, 30, "30")); + executeAndCheck("SELECT * FROM %s WHERE id=1 AND col < 40 LIMIT 1", 1, row(1, 10, "10")); + } + + @Test + public void testMixedMemtableSStablesASC() throws Throwable + { + createTable("CREATE TABLE %s (id int, col int, val text, PRIMARY KEY (id, col)) WITH CLUSTERING ORDER BY (col ASC)"); + + execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 30, "30"); + flush(); + + execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 20, "20"); + flush(); + + execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 10, "10"); + + executeAndCheck("SELECT * FROM %s 
WHERE id=1 LIMIT 1", 0, row(1, 10, "10")); + executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 2", 1, row(1, 10, "10"), row(1, 20, "20")); + executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 3", 2, row(1, 10, "10"), row(1, 20, "20"), row(1, 30, "30")); + executeAndCheck("SELECT * FROM %s WHERE id=1", 2, row(1, 10, "10"), row(1, 20, "20"), row(1, 30, "30")); + + executeAndCheck("SELECT * FROM %s WHERE id=1 AND col > 25 LIMIT 1", 1, row(1, 30, "30")); + executeAndCheck("SELECT * FROM %s WHERE id=1 AND col < 40 LIMIT 1", 0, row(1, 10, "10")); + } + + @Test + public void testOverlappingSStablesASC() throws Throwable + { + createTable("CREATE TABLE %s (id int, col int, val text, PRIMARY KEY (id, col)) WITH CLUSTERING ORDER BY (col ASC)"); + + execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 10, "10"); + execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 30, "30"); + flush(); + + execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 20, "20"); + flush(); + + executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 1", 1, row(1, 10, "10")); + executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 2", 2, row(1, 10, "10"), row(1, 20, "20")); + executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 3", 2, row(1, 10, "10"), row(1, 20, "20"), row(1, 30, "30")); + executeAndCheck("SELECT * FROM %s WHERE id=1", 2, row(1, 10, "10"), row(1, 20, "20"), row(1, 30, "30")); + + executeAndCheck("SELECT * FROM %s WHERE id=1 AND col > 25 LIMIT 1", 1, row(1, 30, "30")); + executeAndCheck("SELECT * FROM %s WHERE id=1 AND col < 40 LIMIT 1", 1, row(1, 10, "10")); + } + + @Test + public void testSSTablesOnlyDESC() throws Throwable + { + createTable("CREATE TABLE %s (id int, col int, val text, PRIMARY KEY (id, col)) WITH CLUSTERING ORDER BY (col DESC)"); + + execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 10, "10"); + flush(); + + execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 20, "20"); + flush(); + + execute("INSERT INTO %s (id, col, val) VALUES 
(?, ?, ?)", 1, 30, "30"); + flush(); + + executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 1", 1, row(1, 30, "30")); + executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 2", 2, row(1, 30, "30"), row(1, 20, "20")); + executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 3", 3, row(1, 30, "30"), row(1, 20, "20"), row(1, 10, "10")); + executeAndCheck("SELECT * FROM %s WHERE id=1", 3, row(1, 30, "30"), row(1, 20, "20"), row(1, 10, "10")); + + executeAndCheck("SELECT * FROM %s WHERE id=1 AND col > 25 LIMIT 1", 1, row(1, 30, "30")); + executeAndCheck("SELECT * FROM %s WHERE id=1 AND col < 40 LIMIT 1", 1, row(1, 30, "30")); + } + + @Test + public void testMixedMemtableSStablesDESC() throws Throwable + { + createTable("CREATE TABLE %s (id int, col int, val text, PRIMARY KEY (id, col)) WITH CLUSTERING ORDER BY (col DESC)"); + + execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 10, "10"); + flush(); + + execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 20, "20"); + flush(); + + execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 30, "30"); + + executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 1", 0, row(1, 30, "30")); + executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 2", 1, row(1, 30, "30"), row(1, 20, "20")); + executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 3", 2, row(1, 30, "30"), row(1, 20, "20"), row(1, 10, "10")); + executeAndCheck("SELECT * FROM %s WHERE id=1", 2, row(1, 30, "30"), row(1, 20, "20"), row(1, 10, "10")); + + executeAndCheck("SELECT * FROM %s WHERE id=1 AND col > 25 LIMIT 1", 0, row(1, 30, "30")); + executeAndCheck("SELECT * FROM %s WHERE id=1 AND col < 40 LIMIT 1", 0, row(1, 30, "30")); + } + + @Test + public void testOverlappingSStablesDESC() throws Throwable + { + createTable("CREATE TABLE %s (id int, col int, val text, PRIMARY KEY (id, col)) WITH CLUSTERING ORDER BY (col DESC)"); + + execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 10, "10"); + execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 
30, "30"); + flush(); + + execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 20, "20"); + flush(); + + executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 1", 1, row(1, 30, "30")); + executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 2", 2, row(1, 30, "30"), row(1, 20, "20")); + executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 3", 2, row(1, 30, "30"), row(1, 20, "20"), row(1, 10, "10")); + executeAndCheck("SELECT * FROM %s WHERE id=1", 2, row(1, 30, "30"), row(1, 20, "20"), row(1, 10, "10")); + + executeAndCheck("SELECT * FROM %s WHERE id=1 AND col > 25 LIMIT 1", 1, row(1, 30, "30")); + executeAndCheck("SELECT * FROM %s WHERE id=1 AND col < 40 LIMIT 1", 1, row(1, 30, "30")); + } + + @Test + public void testDeletionOnDifferentSSTables() throws Throwable + { + createTable("CREATE TABLE %s (id int, col int, val text, PRIMARY KEY (id, col)) WITH CLUSTERING ORDER BY (col DESC)"); + + execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 10, "10"); + flush(); + + execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 20, "20"); + flush(); + + execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 30, "30"); + flush(); + + execute("DELETE FROM %s WHERE id=1 and col=30"); + flush(); + + executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 1", 3, row(1, 20, "20")); + executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 2", 4, row(1, 20, "20"), row(1, 10, "10")); + executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 3", 4, row(1, 20, "20"), row(1, 10, "10")); + executeAndCheck("SELECT * FROM %s WHERE id=1", 4, row(1, 20, "20"), row(1, 10, "10")); + + executeAndCheck("SELECT * FROM %s WHERE id=1 AND col > 25 LIMIT 1", 2); + executeAndCheck("SELECT * FROM %s WHERE id=1 AND col < 40 LIMIT 1", 3, row(1, 20, "20")); + } + + @Test + public void testDeletionOnSameSSTable() throws Throwable + { + createTable("CREATE TABLE %s (id int, col int, val text, PRIMARY KEY (id, col)) WITH CLUSTERING ORDER BY (col DESC)"); + + execute("INSERT INTO %s (id, col, val) 
VALUES (?, ?, ?)", 1, 10, "10"); + flush(); + + execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 20, "20"); + flush(); + + execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 30, "30"); + execute("DELETE FROM %s WHERE id=1 and col=30"); + flush(); + + executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 1", 2, row(1, 20, "20")); + executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 2", 3, row(1, 20, "20"), row(1, 10, "10")); + executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 3", 3, row(1, 20, "20"), row(1, 10, "10")); + executeAndCheck("SELECT * FROM %s WHERE id=1", 3, row(1, 20, "20"), row(1, 10, "10")); + + executeAndCheck("SELECT * FROM %s WHERE id=1 AND col > 25 LIMIT 1", 1); + executeAndCheck("SELECT * FROM %s WHERE id=1 AND col < 40 LIMIT 1", 2, row(1, 20, "20")); + } + + @Test + public void testDeletionOnMemTable() throws Throwable + { + createTable("CREATE TABLE %s (id int, col int, val text, PRIMARY KEY (id, col)) WITH CLUSTERING ORDER BY (col DESC)"); + + execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 10, "10"); + flush(); + + execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 20, "20"); + flush(); + + execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 30, "30"); + execute("DELETE FROM %s WHERE id=1 and col=30"); + + executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 1", 1, row(1, 20, "20")); + executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 2", 2, row(1, 20, "20"), row(1, 10, "10")); + executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 3", 2, row(1, 20, "20"), row(1, 10, "10")); + executeAndCheck("SELECT * FROM %s WHERE id=1", 2, row(1, 20, "20"), row(1, 10, "10")); + + executeAndCheck("SELECT * FROM %s WHERE id=1 AND col > 25 LIMIT 1", 0); + executeAndCheck("SELECT * FROM %s WHERE id=1 AND col < 40 LIMIT 1", 1, row(1, 20, "20")); + } + + @Test + public void testDeletionOnIndexedSSTableDESC() throws Throwable + { + testDeletionOnIndexedSSTableDESC(true); + testDeletionOnIndexedSSTableDESC(false); + 
} + + private void testDeletionOnIndexedSSTableDESC(boolean deleteWithRange) throws Throwable + { + // reduce the column index size so that columns get indexed during flush + DatabaseDescriptor.setColumnIndexSize(1); + + createTable("CREATE TABLE %s (id int, col int, val text, PRIMARY KEY (id, col)) WITH CLUSTERING ORDER BY (col DESC)"); + + for (int i = 1; i <= 1000; i++) + { + execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, i, Integer.toString(i)); + } + flush(); + + Object[][] allRows = new Object[1000][]; + for (int i = 1001; i <= 2000; i++) + { + execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, i, Integer.toString(i)); + allRows[2000 - i] = row(1, i, Integer.toString(i)); + } + + if (deleteWithRange) + { + execute("DELETE FROM %s WHERE id=1 and col <= ?", 1000); + } + else + { + for (int i = 1; i <= 1000; i++) + execute("DELETE FROM %s WHERE id=1 and col = ?", i); + } + flush(); + + executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 1", 1, row(1, 2000, "2000")); + executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 2", 1, row(1, 2000, "2000"), row(1, 1999, "1999")); + + executeAndCheck("SELECT * FROM %s WHERE id=1", 2, allRows); + executeAndCheck("SELECT * FROM %s WHERE id=1 AND col > 1000 LIMIT 1", 1, row(1, 2000, "2000")); + executeAndCheck("SELECT * FROM %s WHERE id=1 AND col <= 2000 LIMIT 1", 1, row(1, 2000, "2000")); + executeAndCheck("SELECT * FROM %s WHERE id=1 AND col > 1000", 1, allRows); + executeAndCheck("SELECT * FROM %s WHERE id=1 AND col <= 2000", 2, allRows); + } + + @Test + public void testDeletionOnIndexedSSTableASC() throws Throwable + { + testDeletionOnIndexedSSTableASC(true); + testDeletionOnIndexedSSTableASC(false); + } + + private void testDeletionOnIndexedSSTableASC(boolean deleteWithRange) throws Throwable + { + // reduce the column index size so that columns get indexed during flush + DatabaseDescriptor.setColumnIndexSize(1); + + createTable("CREATE TABLE %s (id int, col int, val text, PRIMARY KEY (id, col)) 
WITH CLUSTERING ORDER BY (col ASC)"); + + for (int i = 1; i <= 1000; i++) + { + execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, i, Integer.toString(i)); + } + flush(); + + Object[][] allRows = new Object[1000][]; + for (int i = 1001; i <= 2000; i++) + { + execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, i, Integer.toString(i)); + allRows[i - 1001] = row(1, i, Integer.toString(i)); + } + flush(); + + if (deleteWithRange) + { + execute("DELETE FROM %s WHERE id =1 and col <= ?", 1000); + } + else + { + for (int i = 1; i <= 1000; i++) + execute("DELETE FROM %s WHERE id=1 and col = ?", i); + } + flush(); + + executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 1", 3, row(1, 1001, "1001")); + executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 2", 3, row(1, 1001, "1001"), row(1, 1002, "1002")); + + executeAndCheck("SELECT * FROM %s WHERE id=1", 3, allRows); + executeAndCheck("SELECT * FROM %s WHERE id=1 AND col > 1000 LIMIT 1", 2, row(1, 1001, "1001")); + executeAndCheck("SELECT * FROM %s WHERE id=1 AND col <= 2000 LIMIT 1", 3, row(1, 1001, "1001")); + executeAndCheck("SELECT * FROM %s WHERE id=1 AND col > 1000", 2, allRows); + executeAndCheck("SELECT * FROM %s WHERE id=1 AND col <= 2000", 3, allRows); + } + + @Test + public void testDeletionOnOverlappingIndexedSSTable() throws Throwable + { + testDeletionOnOverlappingIndexedSSTable(true); + testDeletionOnOverlappingIndexedSSTable(false); + } + + private void testDeletionOnOverlappingIndexedSSTable(boolean deleteWithRange) throws Throwable + { + // reduce the column index size so that columns get indexed during flush + DatabaseDescriptor.setColumnIndexSize(1); + + createTable("CREATE TABLE %s (id int, col int, val1 text, val2 text, PRIMARY KEY (id, col)) WITH CLUSTERING ORDER BY (col ASC)"); + + for (int i = 1; i <= 500; i++) + { + if (i % 2 == 0) + execute("INSERT INTO %s (id, col, val1) VALUES (?, ?, ?)", 1, i, Integer.toString(i)); + else + execute("INSERT INTO %s (id, col, val1, val2) VALUES (?, 
?, ?, ?)", 1, i, Integer.toString(i), Integer.toString(i)); + } + + for (int i = 1001; i <= 1500; i++) + { + if (i % 2 == 0) + execute("INSERT INTO %s (id, col, val1) VALUES (?, ?, ?)", 1, i, Integer.toString(i)); + else + execute("INSERT INTO %s (id, col, val1, val2) VALUES (?, ?, ?, ?)", 1, i, Integer.toString(i), Integer.toString(i)); + } + + flush(); + + for (int i = 501; i <= 1000; i++) + { + if (i % 2 == 0) + execute("INSERT INTO %s (id, col, val1) VALUES (?, ?, ?)", 1, i, Integer.toString(i)); + else + execute("INSERT INTO %s (id, col, val1, val2) VALUES (?, ?, ?, ?)", 1, i, Integer.toString(i), Integer.toString(i)); + } + + for (int i = 1501; i <= 2000; i++) + { + if (i % 2 == 0) + execute("INSERT INTO %s (id, col, val1) VALUES (?, ?, ?)", 1, i, Integer.toString(i)); + else + execute("INSERT INTO %s (id, col, val1, val2) VALUES (?, ?, ?, ?)", 1, i, Integer.toString(i), Integer.toString(i)); + } + + if (deleteWithRange) + { + execute("DELETE FROM %s WHERE id=1 and col > ? and col <= ?", 250, 750); + } + else + { + for (int i = 251; i <= 750; i++) + execute("DELETE FROM %s WHERE id=1 and col = ?", i); + } + + flush(); + + Object[][] allRows = new Object[1500][]; // non deleted rows + for (int i = 1; i <= 2000; i++) + { + if (i > 250 && i <= 750) + continue; // skip deleted records + + int idx = (i <= 250 ? 
i - 1 : i - 501); + + if (i % 2 == 0) + allRows[idx] = row(1, i, Integer.toString(i), null); + else + allRows[idx] = row(1, i, Integer.toString(i), Integer.toString(i)); + } + + executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 1", 2, row(1, 1, "1", "1")); + executeAndCheck("SELECT * FROM %s WHERE id=1 LIMIT 2", 2, row(1, 1, "1", "1"), row(1, 2, "2", null)); + + executeAndCheck("SELECT * FROM %s WHERE id=1", 2, allRows); + executeAndCheck("SELECT * FROM %s WHERE id=1 AND col > 1000 LIMIT 1", 2, row(1, 1001, "1001", "1001")); + executeAndCheck("SELECT * FROM %s WHERE id=1 AND col <= 2000 LIMIT 1", 2, row(1, 1, "1", "1")); + executeAndCheck("SELECT * FROM %s WHERE id=1 AND col > 500 LIMIT 1", 2, row(1, 751, "751", "751")); + executeAndCheck("SELECT * FROM %s WHERE id=1 AND col <= 500 LIMIT 1", 2, row(1, 1, "1", "1")); + } + + @Test + public void testMultiplePartitionsDESC() throws Throwable + { + createTable("CREATE TABLE %s (id int, col int, val text, PRIMARY KEY (id, col)) WITH CLUSTERING ORDER BY (col DESC)"); + + execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 10, "10"); + execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 2, 10, "10"); + execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 3, 10, "10"); + flush(); + + execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 20, "20"); + execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 2, 20, "20"); + execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 3, 20, "20"); + flush(); + + execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 1, 30, "30"); + execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 2, 30, "30"); + execute("INSERT INTO %s (id, col, val) VALUES (?, ?, ?)", 3, 30, "30"); + flush(); + + for (int i = 1; i <= 3; i++) + { + String base = "SELECT * FROM %s "; + + executeAndCheck(base + String.format("WHERE id=%d LIMIT 1", i), 1, row(i, 30, "30")); + executeAndCheck(base + String.format("WHERE id=%d LIMIT 2", i), 2, row(i, 30, "30"), row(i, 20, 
"20")); + executeAndCheck(base + String.format("WHERE id=%d LIMIT 3", i), 3, row(i, 30, "30"), row(i, 20, "20"), row(i, 10, "10")); + executeAndCheck(base + String.format("WHERE id=%d", i), 3, row(i, 30, "30"), row(i, 20, "20"), row(i, 10, "10")); + + executeAndCheck(base + String.format("WHERE id=%d AND col > 25 LIMIT 1", i), 1, row(i, 30, "30")); + executeAndCheck(base + String.format("WHERE id=%d AND col < 40 LIMIT 1", i), 1, row(i, 30, "30")); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/cassandra/blob/b11fba75/test/unit/org/apache/cassandra/cql3/validation/operations/DeleteTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cql3/validation/operations/DeleteTest.java b/test/unit/org/apache/cassandra/cql3/validation/operations/DeleteTest.java index da0bc33..170f85f 100644 --- a/test/unit/org/apache/cassandra/cql3/validation/operations/DeleteTest.java +++ b/test/unit/org/apache/cassandra/cql3/validation/operations/DeleteTest.java @@ -326,6 +326,27 @@ public class DeleteTest extends CQLTester assertEmpty(execute("select * from %s where a=1 and b=1")); } + + /** Test that two deleted rows for the same partition but on different sstables do not resurface */ + @Test + public void testDeletedRowsDoNotResurface() throws Throwable + { + createTable("CREATE TABLE %s (a int, b int, c text, primary key (a, b))"); + execute("INSERT INTO %s (a, b, c) VALUES(1, 1, '1')"); + execute("INSERT INTO %s (a, b, c) VALUES(1, 2, '2')"); + execute("INSERT INTO %s (a, b, c) VALUES(1, 3, '3')"); + flush(); + + execute("DELETE FROM %s where a=1 and b = 1"); + flush(); + + execute("DELETE FROM %s where a=1 and b = 2"); + flush(); + + assertRows(execute("SELECT * FROM %s WHERE a = ?", 1), + row(1, 3, "3")); + } + @Test public void testDeleteWithNoClusteringColumns() throws Throwable { @@ -624,6 +645,39 @@ public class DeleteTest extends CQLTester } @Test + public void 
testDeleteWithNonoverlappingRange() throws Throwable + { + createTable("CREATE TABLE %s (a int, b int, c text, primary key (a, b))"); + + for (int i = 0; i < 10; i++) + execute("INSERT INTO %s (a, b, c) VALUES(1, ?, 'abc')", i); + flush(); + + execute("DELETE FROM %s WHERE a=1 and b <= 3"); + flush(); + + // this query does not overlap the tombstone range above and caused the rows to be resurrected + assertEmpty(execute("SELECT * FROM %s WHERE a=1 and b <= 2")); + } + + @Test + public void testDeleteWithIntermediateRangeAndOneClusteringColumn() throws Throwable + { + createTable("CREATE TABLE %s (a int, b int, c text, primary key (a, b))"); + execute("INSERT INTO %s (a, b, c) VALUES(1, 1, '1')"); + execute("INSERT INTO %s (a, b, c) VALUES(1, 3, '3')"); + execute("DELETE FROM %s where a=1 and b >= 2 and b <= 3"); + execute("INSERT INTO %s (a, b, c) VALUES(1, 2, '2')"); + flush(); + + execute("DELETE FROM %s where a=1 and b >= 2 and b <= 3"); + flush(); + + assertRows(execute("SELECT * FROM %s WHERE a = ?", 1), + row(1, 1, "1")); + } + + @Test public void testDeleteWithRangeAndOneClusteringColumn() throws Throwable { testDeleteWithRangeAndOneClusteringColumn(false); @@ -1057,10 +1111,4 @@ public class DeleteTest extends CQLTester compact(); assertRows(execute("SELECT * FROM %s"), row(0, null)); } - - private void flush(boolean forceFlush) - { - if (forceFlush) - flush(); - } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/b11fba75/test/unit/org/apache/cassandra/cql3/validation/operations/InsertTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cql3/validation/operations/InsertTest.java b/test/unit/org/apache/cassandra/cql3/validation/operations/InsertTest.java index 3c49989..ff98f6b 100644 --- a/test/unit/org/apache/cassandra/cql3/validation/operations/InsertTest.java +++ b/test/unit/org/apache/cassandra/cql3/validation/operations/InsertTest.java @@ -283,10 +283,4 @@ public class 
InsertTest extends CQLTester assertInvalidMessage("Some clustering keys are missing: clustering_1", "INSERT INTO %s (partitionKey, clustering_2, staticValue) VALUES (0, 0, 'A')"); } - - private void flush(boolean forceFlush) - { - if (forceFlush) - flush(); - } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/b11fba75/test/unit/org/apache/cassandra/cql3/validation/operations/UpdateTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cql3/validation/operations/UpdateTest.java b/test/unit/org/apache/cassandra/cql3/validation/operations/UpdateTest.java index b939b7f..2df6fd6 100644 --- a/test/unit/org/apache/cassandra/cql3/validation/operations/UpdateTest.java +++ b/test/unit/org/apache/cassandra/cql3/validation/operations/UpdateTest.java @@ -524,10 +524,4 @@ public class UpdateTest extends CQLTester assertRows(execute("SELECT l FROM %s WHERE k = 0"), row(list("v1", "v4", "v3"))); } - - private void flush(boolean forceFlush) - { - if (forceFlush) - flush(); - } }
