Fixed query monitoring for range queries

patch by Stefania Alborghetti; reviewed by Alex Petrov for CASSANDRA-13050
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/84b1725f Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/84b1725f Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/84b1725f Branch: refs/heads/trunk Commit: 84b1725fb4c4cba4fdb94f2abdb66656a4c66ae1 Parents: eaced9a Author: Stefania Alborghetti <[email protected]> Authored: Fri Dec 16 16:23:32 2016 +0800 Committer: Stefania Alborghetti <[email protected]> Committed: Fri Dec 23 09:39:05 2016 +0800 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/cassandra/db/ReadCommand.java | 40 ++++++++++++-------- .../db/SinglePartitionReadCommand.java | 4 +- .../apache/cassandra/db/filter/RowFilter.java | 11 +++++- .../db/RepairedDataTombstonesTest.java | 8 ++-- 5 files changed, 42 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/84b1725f/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 74a9808..0c215a2 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.10 + * Fixed query monitoring for range queries (CASSANDRA-13050) * Remove outboundBindAny configuration property (CASSANDRA-12673) * Use correct bounds for all-data range when filtering (CASSANDRA-12666) * Remove timing window in test case (CASSANDRA-12875) http://git-wip-us.apache.org/repos/asf/cassandra/blob/84b1725f/src/java/org/apache/cassandra/db/ReadCommand.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java index d8051fe..405b0fc 100644 --- a/src/java/org/apache/cassandra/db/ReadCommand.java +++ b/src/java/org/apache/cassandra/db/ReadCommand.java @@ -29,6 +29,7 @@ import 
org.slf4j.LoggerFactory; import org.apache.cassandra.config.*; import org.apache.cassandra.cql3.Operator; import org.apache.cassandra.db.filter.*; +import org.apache.cassandra.db.monitoring.ApproximateTime; import org.apache.cassandra.db.monitoring.MonitorableImpl; import org.apache.cassandra.db.partitions.*; import org.apache.cassandra.db.rows.*; @@ -528,9 +529,11 @@ public abstract class ReadCommand extends MonitorableImpl implements ReadQuery return Transformation.apply(iter, new MetricRecording()); } - protected class CheckForAbort extends StoppingTransformation<BaseRowIterator<?>> + protected class CheckForAbort extends StoppingTransformation<UnfilteredRowIterator> { - protected BaseRowIterator<?> applyToPartition(BaseRowIterator<?> partition) + long lastChecked = 0; + + protected UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition) { if (maybeAbort()) { @@ -538,18 +541,28 @@ public abstract class ReadCommand extends MonitorableImpl implements ReadQuery return null; } - return partition; + return Transformation.apply(partition, this); } protected Row applyToRow(Row row) { + if (TEST_ITERATION_DELAY_MILLIS > 0) + maybeDelayForTesting(); + return maybeAbort() ? null : row; } private boolean maybeAbort() { - if (TEST_ITERATION_DELAY_MILLIS > 0) - maybeDelayForTesting(); + /** + * The value returned by ApproximateTime.currentTimeMillis() is updated only every + * {@link ApproximateTime.CHECK_INTERVAL_MS}, by default 10 millis. Since MonitorableImpl + * relies on ApproximateTime, we don't need to check unless the approximate time has elapsed. 
+ */ + if (lastChecked == ApproximateTime.currentTimeMillis()) + return false; + + lastChecked = ApproximateTime.currentTimeMillis(); if (isAborted()) { @@ -559,24 +572,19 @@ public abstract class ReadCommand extends MonitorableImpl implements ReadQuery return false; } - } - protected UnfilteredPartitionIterator withStateTracking(UnfilteredPartitionIterator iter) - { - return Transformation.apply(iter, new CheckForAbort()); + private void maybeDelayForTesting() + { + if (!metadata.ksName.startsWith("system")) + FBUtilities.sleepQuietly(TEST_ITERATION_DELAY_MILLIS); + } } - protected UnfilteredRowIterator withStateTracking(UnfilteredRowIterator iter) + protected UnfilteredPartitionIterator withStateTracking(UnfilteredPartitionIterator iter) { return Transformation.apply(iter, new CheckForAbort()); } - private void maybeDelayForTesting() - { - if (!metadata.ksName.startsWith("system")) - FBUtilities.sleepQuietly(TEST_ITERATION_DELAY_MILLIS); - } - /** * Creates a message for this command. 
*/ http://git-wip-us.apache.org/repos/asf/cassandra/blob/84b1725f/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java index d87d277..f6d10f5 100644 --- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java +++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java @@ -634,7 +634,7 @@ public class SinglePartitionReadCommand extends ReadCommand return EmptyIterators.unfilteredRow(cfs.metadata, partitionKey(), filter.isReversed()); StorageHook.instance.reportRead(cfs.metadata.cfId, partitionKey()); - return withStateTracking(withSSTablesIterated(iterators, cfs.metric)); + return withSSTablesIterated(iterators, cfs.metric); } catch (RuntimeException | Error e) { @@ -832,7 +832,7 @@ public class SinglePartitionReadCommand extends ReadCommand } } - return withStateTracking(result.unfilteredIterator(columnFilter(), Slices.ALL, clusteringIndexFilter().isReversed())); + return result.unfilteredIterator(columnFilter(), Slices.ALL, clusteringIndexFilter().isReversed()); } private ImmutableBTreePartition add(UnfilteredRowIterator iter, ImmutableBTreePartition result, ClusteringIndexNamesFilter filter, boolean isRepaired) http://git-wip-us.apache.org/repos/asf/cassandra/blob/84b1725f/src/java/org/apache/cassandra/db/filter/RowFilter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/filter/RowFilter.java b/src/java/org/apache/cassandra/db/filter/RowFilter.java index 4c0608f..1c934e7 100644 --- a/src/java/org/apache/cassandra/db/filter/RowFilter.java +++ b/src/java/org/apache/cassandra/db/filter/RowFilter.java @@ -299,10 +299,19 @@ public abstract class RowFilter implements Iterable<RowFilter.Expression> // Short-circuit all partitions that won't match based 
on static and partition keys for (Expression e : partitionLevelExpressions) if (!e.isSatisfiedBy(metadata, partition.partitionKey(), partition.staticRow())) + { + partition.close(); return null; + } UnfilteredRowIterator iterator = Transformation.apply(partition, this); - return (filterNonStaticColumns && !iterator.hasNext()) ? null : iterator; + if (filterNonStaticColumns && !iterator.hasNext()) + { + iterator.close(); + return null; + } + + return iterator; } public Row applyToRow(Row row) http://git-wip-us.apache.org/repos/asf/cassandra/blob/84b1725f/test/unit/org/apache/cassandra/db/RepairedDataTombstonesTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/RepairedDataTombstonesTest.java b/test/unit/org/apache/cassandra/db/RepairedDataTombstonesTest.java index ea9f88a..b814ea6 100644 --- a/test/unit/org/apache/cassandra/db/RepairedDataTombstonesTest.java +++ b/test/unit/org/apache/cassandra/db/RepairedDataTombstonesTest.java @@ -182,9 +182,11 @@ public class RepairedDataTombstonesTest extends CQLTester while (iterator.hasNext()) { partitionsFound++; - UnfilteredRowIterator rowIter = iterator.next(); - int val = ByteBufferUtil.toInt(rowIter.partitionKey().getKey()); - assertTrue("val=" + val, val >= 10 && val < 20); + try (UnfilteredRowIterator rowIter = iterator.next()) + { + int val = ByteBufferUtil.toInt(rowIter.partitionKey().getKey()); + assertTrue("val=" + val, val >= 10 && val < 20); + } } } assertEquals(10, partitionsFound);
