Repository: cassandra Updated Branches: refs/heads/trunk d1a552dd7 -> 1657b4fbf
Fix DISTINCT queries w/ limits/paging and tombstoned partitions Patch by Tyler Hobbs; reviewed by Sylvain Lebresne for CASSANDRA-8490 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/dd62f7bf Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/dd62f7bf Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/dd62f7bf Branch: refs/heads/trunk Commit: dd62f7bf7977dd40eedb1c81ab7900b778f84540 Parents: ed54e80 Author: Tyler Hobbs <[email protected]> Authored: Fri Jan 9 11:14:54 2015 -0600 Committer: Tyler Hobbs <[email protected]> Committed: Fri Jan 9 11:14:54 2015 -0600 ---------------------------------------------------------------------- CHANGES.txt | 2 ++ .../cassandra/cql3/statements/SelectStatement.java | 6 +++++- .../org/apache/cassandra/db/AbstractRangeCommand.java | 13 +++++++++++++ .../org/apache/cassandra/db/ColumnFamilyStore.java | 4 +++- src/java/org/apache/cassandra/db/DataRange.java | 12 ++++++++++++ .../org/apache/cassandra/db/filter/ExtendedFilter.java | 6 ++++++ .../apache/cassandra/db/filter/SliceQueryFilter.java | 6 ++++++ .../org/apache/cassandra/service/StorageProxy.java | 13 +++++++------ 8 files changed, 54 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/dd62f7bf/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index adb374a..0c7e9a2 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,6 @@ 2.0.12: + * Fix DISTINCT queries with LIMITs or paging when some partitions + contain only tombstones (CASSANDRA-8490) * Introduce background cache refreshing to permissions cache (CASSANDRA-8194) * Fix race condition in StreamTransferTask that could lead to http://git-wip-us.apache.org/repos/asf/cassandra/blob/dd62f7bf/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java index f08f6b8..19615b6 100644 --- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java @@ -450,7 +450,11 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache // For distinct, we only care about fetching the beginning of each partition. If we don't have // static columns, we in fact only care about the first cell, so we query only that (we don't "group"). // If we do have static columns, we do need to fetch the first full group (to have the static columns values). - return new SliceQueryFilter(ColumnSlice.ALL_COLUMNS_ARRAY, false, 1, selectsStaticColumns ? toGroup : -1); + + // See the comments on IGNORE_TOMBSTONED_PARTITIONS and CASSANDRA-8490 for why we use a special value for + // DISTINCT queries on the partition key only. + toGroup = selectsStaticColumns ? 
toGroup : SliceQueryFilter.IGNORE_TOMBSTONED_PARTITIONS; + return new SliceQueryFilter(ColumnSlice.ALL_COLUMNS_ARRAY, false, 1, toGroup); } else if (isColumnRange()) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/dd62f7bf/src/java/org/apache/cassandra/db/AbstractRangeCommand.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/AbstractRangeCommand.java b/src/java/org/apache/cassandra/db/AbstractRangeCommand.java index 45302e2..4ddcb8d 100644 --- a/src/java/org/apache/cassandra/db/AbstractRangeCommand.java +++ b/src/java/org/apache/cassandra/db/AbstractRangeCommand.java @@ -57,6 +57,19 @@ public abstract class AbstractRangeCommand implements IReadCommand public abstract int limit(); public abstract boolean countCQL3Rows(); + + /** + * Returns true if tombstoned partitions should not be included in results or count towards the limit. + * See CASSANDRA-8490 for more details on why this is needed (and done this way). 
+ * */ + public boolean ignoredTombstonedPartitions() + { + if (!(predicate instanceof SliceQueryFilter)) + return false; + + return ((SliceQueryFilter) predicate).compositesToGroup == SliceQueryFilter.IGNORE_TOMBSTONED_PARTITIONS; + } + public abstract List<Row> executeLocally(); public long getTimeout() http://git-wip-us.apache.org/repos/asf/cassandra/blob/dd62f7bf/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 7bd2a59..e936473 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -1749,6 +1749,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean List<Row> rows = new ArrayList<Row>(); int columnsCount = 0; int total = 0, matched = 0; + boolean ignoreTombstonedPartitions = filter.ignoreTombstonedPartitions(); try { @@ -1784,7 +1785,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean } rows.add(new Row(rawRow.key, data)); - matched++; + if (!ignoreTombstonedPartitions || !data.hasOnlyTombstones(filter.timestamp)) + matched++; if (data != null) columnsCount += filter.lastCounted(data); http://git-wip-us.apache.org/repos/asf/cassandra/blob/dd62f7bf/src/java/org/apache/cassandra/db/DataRange.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/DataRange.java b/src/java/org/apache/cassandra/db/DataRange.java index b8b8daf..774a3aa 100644 --- a/src/java/org/apache/cassandra/db/DataRange.java +++ b/src/java/org/apache/cassandra/db/DataRange.java @@ -87,6 +87,18 @@ public class DataRange return keyRange.right; } + /** + * Returns true if tombstoned partitions should not be included in results or count towards the limit. 
+ * See CASSANDRA-8490 for more details on why this is needed (and done this way). + * */ + public boolean ignoredTombstonedPartitions() + { + if (!(columnFilter instanceof SliceQueryFilter)) + return false; + + return ((SliceQueryFilter) columnFilter).compositesToGroup == SliceQueryFilter.IGNORE_TOMBSTONED_PARTITIONS; + } + // Whether the bounds of this DataRange actually wraps around. public boolean isWrapAround() { http://git-wip-us.apache.org/repos/asf/cassandra/blob/dd62f7bf/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java b/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java index 5c3662b..82e889d 100644 --- a/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java +++ b/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java @@ -127,6 +127,12 @@ public abstract class ExtendedFilter */ public abstract ColumnFamily prune(DecoratedKey key, ColumnFamily data); + /** Returns true if tombstoned partitions should not be included in results or count towards the limit, false otherwise. */ + public boolean ignoreTombstonedPartitions() + { + return dataRange.ignoredTombstonedPartitions(); + } + /** * @return true if the provided data satisfies all the expressions from * the clause of this filter. 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dd62f7bf/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java index 58a0303..858578f 100644 --- a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java +++ b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java @@ -42,6 +42,12 @@ public class SliceQueryFilter implements IDiskAtomFilter private static final Logger logger = LoggerFactory.getLogger(SliceQueryFilter.class); public static final Serializer serializer = new Serializer(); + /** + * A special value for compositesToGroup that indicates that tombstoned partitions should not be included in results + * or count towards the limit. See CASSANDRA-8490 for more details on why this is needed (and done this way). + **/ + public static final int IGNORE_TOMBSTONED_PARTITIONS = -2; + public final ColumnSlice[] slices; public final boolean reversed; public volatile int count; http://git-wip-us.apache.org/repos/asf/cassandra/blob/dd62f7bf/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index 1e1a2a3..45af1c8 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -1499,7 +1499,8 @@ public class StorageProxy implements StorageProxyMBean // now scan until we have enough results try { - int cql3RowCount = 0; + int liveRowCount = 0; + boolean countLiveRows = command.countCQL3Rows() || command.ignoredTombstonedPartitions(); rows = new ArrayList<>(); // when dealing with LocalStrategy keyspaces, we can skip the range splitting and merging (which can be @@ -1594,8
+1595,8 @@ public class StorageProxy implements StorageProxyMBean for (Row row : handler.get()) { rows.add(row); - if (nodeCmd.countCQL3Rows()) - cql3RowCount += row.getLiveCount(command.predicate, command.timestamp); + if (countLiveRows) + liveRowCount += row.getLiveCount(command.predicate, command.timestamp); } FBUtilities.waitOnFutures(resolver.repairResults, DatabaseDescriptor.getWriteRpcTimeout()); } @@ -1636,7 +1637,7 @@ public class StorageProxy implements StorageProxyMBean } // if we're done, great, otherwise, move to the next range - int count = nodeCmd.countCQL3Rows() ? cql3RowCount : rows.size(); + int count = countLiveRows ? liveRowCount : rows.size(); if (count >= nodeCmd.limit()) break; } @@ -1652,8 +1653,8 @@ public class StorageProxy implements StorageProxyMBean private static List<Row> trim(AbstractRangeCommand command, List<Row> rows) { - // When maxIsColumns, we let the caller trim the result. - if (command.countCQL3Rows()) + // for CQL3 queries, let the caller trim the results + if (command.countCQL3Rows() || command.ignoredTombstonedPartitions()) return rows; else return rows.size() > command.limit() ? rows.subList(0, command.limit()) : rows;
