Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.11 eaced9a54 -> 84b1725fb
  refs/heads/cassandra-3.X 0d4fdadbf -> cc16ff11c
  refs/heads/trunk da4147628 -> 554d6beb0


Fixed query monitoring for range queries

patch by Stefania Alborghetti; reviewed by Alex Petrov for CASSANDRA-13050


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/84b1725f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/84b1725f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/84b1725f

Branch: refs/heads/cassandra-3.11
Commit: 84b1725fb4c4cba4fdb94f2abdb66656a4c66ae1
Parents: eaced9a
Author: Stefania Alborghetti <[email protected]>
Authored: Fri Dec 16 16:23:32 2016 +0800
Committer: Stefania Alborghetti <[email protected]>
Committed: Fri Dec 23 09:39:05 2016 +0800

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../org/apache/cassandra/db/ReadCommand.java    | 40 ++++++++++++--------
 .../db/SinglePartitionReadCommand.java          |  4 +-
 .../apache/cassandra/db/filter/RowFilter.java   | 11 +++++-
 .../db/RepairedDataTombstonesTest.java          |  8 ++--
 5 files changed, 42 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/84b1725f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 74a9808..0c215a2 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.10
+ * Fixed query monitoring for range queries (CASSANDRA-13050)
  * Remove outboundBindAny configuration property (CASSANDRA-12673)
  * Use correct bounds for all-data range when filtering (CASSANDRA-12666)
  * Remove timing window in test case (CASSANDRA-12875)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/84b1725f/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java
index d8051fe..405b0fc 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -29,6 +29,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.config.*;
 import org.apache.cassandra.cql3.Operator;
 import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.db.monitoring.ApproximateTime;
 import org.apache.cassandra.db.monitoring.MonitorableImpl;
 import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.db.rows.*;
@@ -528,9 +529,11 @@ public abstract class ReadCommand extends MonitorableImpl implements ReadQuery
         return Transformation.apply(iter, new MetricRecording());
     }
 
-    protected class CheckForAbort extends StoppingTransformation<BaseRowIterator<?>>
+    protected class CheckForAbort extends StoppingTransformation<UnfilteredRowIterator>
     {
-        protected BaseRowIterator<?> applyToPartition(BaseRowIterator<?> partition)
+        long lastChecked = 0;
+
+        protected UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition)
         {
             if (maybeAbort())
             {
@@ -538,18 +541,28 @@ public abstract class ReadCommand extends MonitorableImpl implements ReadQuery
                 return null;
             }
 
-            return partition;
+            return Transformation.apply(partition, this);
         }
 
         protected Row applyToRow(Row row)
         {
+            if (TEST_ITERATION_DELAY_MILLIS > 0)
+                maybeDelayForTesting();
+
             return maybeAbort() ? null : row;
         }
 
         private boolean maybeAbort()
         {
-            if (TEST_ITERATION_DELAY_MILLIS > 0)
-                maybeDelayForTesting();
+            /**
+             * The value returned by ApproximateTime.currentTimeMillis() is updated only every
+             * {@link ApproximateTime.CHECK_INTERVAL_MS}, by default 10 millis. Since MonitorableImpl
+             * relies on ApproximateTime, we don't need to check unless the approximate time has elapsed.
+             */
+            if (lastChecked == ApproximateTime.currentTimeMillis())
+                return false;
+
+            lastChecked = ApproximateTime.currentTimeMillis();
 
             if (isAborted())
             {
@@ -559,24 +572,19 @@ public abstract class ReadCommand extends MonitorableImpl implements ReadQuery
 
             return false;
         }
-    }
 
-    protected UnfilteredPartitionIterator withStateTracking(UnfilteredPartitionIterator iter)
-    {
-        return Transformation.apply(iter, new CheckForAbort());
+        private void maybeDelayForTesting()
+        {
+            if (!metadata.ksName.startsWith("system"))
+                FBUtilities.sleepQuietly(TEST_ITERATION_DELAY_MILLIS);
+        }
     }
 
-    protected UnfilteredRowIterator withStateTracking(UnfilteredRowIterator iter)
+    protected UnfilteredPartitionIterator withStateTracking(UnfilteredPartitionIterator iter)
     {
         return Transformation.apply(iter, new CheckForAbort());
     }
 
-    private void maybeDelayForTesting()
-    {
-        if (!metadata.ksName.startsWith("system"))
-            FBUtilities.sleepQuietly(TEST_ITERATION_DELAY_MILLIS);
-    }
-
     /**
      * Creates a message for this command.
      */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/84b1725f/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index d87d277..f6d10f5 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@ -634,7 +634,7 @@ public class SinglePartitionReadCommand extends ReadCommand
                 return EmptyIterators.unfilteredRow(cfs.metadata, partitionKey(), filter.isReversed());
 
             StorageHook.instance.reportRead(cfs.metadata.cfId, partitionKey());
-            return withStateTracking(withSSTablesIterated(iterators, cfs.metric));
+            return withSSTablesIterated(iterators, cfs.metric);
         }
         catch (RuntimeException | Error e)
         {
@@ -832,7 +832,7 @@ public class SinglePartitionReadCommand extends ReadCommand
             }
         }
 
-        return withStateTracking(result.unfilteredIterator(columnFilter(), Slices.ALL, clusteringIndexFilter().isReversed()));
+        return result.unfilteredIterator(columnFilter(), Slices.ALL, clusteringIndexFilter().isReversed());
     }
 
     private ImmutableBTreePartition add(UnfilteredRowIterator iter, ImmutableBTreePartition result, ClusteringIndexNamesFilter filter, boolean isRepaired)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/84b1725f/src/java/org/apache/cassandra/db/filter/RowFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/RowFilter.java b/src/java/org/apache/cassandra/db/filter/RowFilter.java
index 4c0608f..1c934e7 100644
--- a/src/java/org/apache/cassandra/db/filter/RowFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/RowFilter.java
@@ -299,10 +299,19 @@ public abstract class RowFilter implements Iterable<RowFilter.Expression>
                     // Short-circuit all partitions that won't match based on static and partition keys
                     for (Expression e : partitionLevelExpressions)
                         if (!e.isSatisfiedBy(metadata, partition.partitionKey(), partition.staticRow()))
+                        {
+                            partition.close();
                             return null;
+                        }
 
                     UnfilteredRowIterator iterator = Transformation.apply(partition, this);
-                    return (filterNonStaticColumns && !iterator.hasNext()) ? null : iterator;
+                    if (filterNonStaticColumns && !iterator.hasNext())
+                    {
+                        iterator.close();
+                        return null;
+                    }
+
+                    return iterator;
                 }
 
                 public Row applyToRow(Row row)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/84b1725f/test/unit/org/apache/cassandra/db/RepairedDataTombstonesTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RepairedDataTombstonesTest.java b/test/unit/org/apache/cassandra/db/RepairedDataTombstonesTest.java
index ea9f88a..b814ea6 100644
--- a/test/unit/org/apache/cassandra/db/RepairedDataTombstonesTest.java
+++ b/test/unit/org/apache/cassandra/db/RepairedDataTombstonesTest.java
@@ -182,9 +182,11 @@ public class RepairedDataTombstonesTest extends CQLTester
             while (iterator.hasNext())
             {
                 partitionsFound++;
-                UnfilteredRowIterator rowIter = iterator.next();
-                int val = ByteBufferUtil.toInt(rowIter.partitionKey().getKey());
-                assertTrue("val=" + val, val >= 10 && val < 20);
+                try (UnfilteredRowIterator rowIter = iterator.next())
+                {
+                    int val = ByteBufferUtil.toInt(rowIter.partitionKey().getKey());
+                    assertTrue("val=" + val, val >= 10 && val < 20);
+                }
             }
         }
         assertEquals(10, partitionsFound);

Reply via email to