This is an automated email from the ASF dual-hosted git repository.

adelapena pushed a commit to branch cassandra-3.11
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-3.11 by this push:
     new 86a9261  Fix short read protection for GROUP BY queries
86a9261 is described below

commit 86a9261fd94707283e4ce149f88756099e22fcb6
Author: Andrés de la Peña <[email protected]>
AuthorDate: Tue Jun 16 16:32:22 2020 +0100

    Fix short read protection for GROUP BY queries
    
    patch by Andrés de la Peña; reviewed by Caleb Rackliffe for CASSANDRA-15459
---
 CHANGES.txt                                        |   1 +
 .../org/apache/cassandra/db/filter/DataLimits.java |  68 ++++-----
 .../org/apache/cassandra/service/DataResolver.java |  11 +-
 .../distributed/test/ShortReadProtectionTest.java  | 158 +++++++++++++++++++++
 4 files changed, 200 insertions(+), 38 deletions(-)
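
The crux of CASSANDRA-15459: short read protection (SRP) decides whether to
re-query replicas by comparing how much data it has already seen against the
query limit. For GROUP BY queries the counter's counted() returns groups,
while SRP needs rows, since rows are what replicas actually send over the
wire. The patch therefore renames rowCounted() to rowsCounted() and routes
DataResolver to it. A minimal sketch of that distinction, with simplified,
hypothetical names rather than code from the patch:

    // Simplified shape of the coordinator-side choice:
    interface ResultCounter
    {
        int counted();      // groups for GROUP BY queries, rows otherwise
        int rowsCounted();  // always rows: what SRP must compare to its fetch size
    }

    static int countedRows(ResultCounter counter, boolean isGroupByLimit)
    {
        // Using counted() for a GROUP BY query would make SRP underestimate
        // the data already received from replicas.
        return isGroupByLimit ? counter.rowsCounted() : counter.counted();
    }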

diff --git a/CHANGES.txt b/CHANGES.txt
index 749aba2..5168acb 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.11.8
+ * Fix short read protection for GROUP BY queries (CASSANDRA-15459)
 * Frozen RawTuple is not annotated with frozen in the toString method (CASSANDRA-15857)
 Merged from 3.0:
  * Check for endpoint collision with hibernating nodes (CASSANDRA-14599)
diff --git a/src/java/org/apache/cassandra/db/filter/DataLimits.java b/src/java/org/apache/cassandra/db/filter/DataLimits.java
index cf2bc13..2759932 100644
--- a/src/java/org/apache/cassandra/db/filter/DataLimits.java
+++ b/src/java/org/apache/cassandra/db/filter/DataLimits.java
@@ -257,7 +257,7 @@ public abstract class DataLimits
         private final boolean enforceStrictLiveness;
 
         // false means we do not propagate our stop signals onto the iterator, we only count
-        private boolean enforceLimits = true;
+        protected boolean enforceLimits = true;
 
         protected Counter(int nowInSec, boolean assumeLiveData, boolean enforceStrictLiveness)
         {
@@ -308,14 +308,14 @@ public abstract class DataLimits
          *
          * @return the number of rows counted.
          */
-        public abstract int rowCounted();
+        public abstract int rowsCounted();
 
         /**
          * The number of rows counted in the current partition.
          *
          * @return the number of rows counted in the current partition.
          */
-        public abstract int rowCountedInCurrentPartition();
+        public abstract int rowsCountedInCurrentPartition();
 
         public abstract boolean isDone();
         public abstract boolean isDoneForPartition();
@@ -483,8 +483,8 @@ public abstract class DataLimits
 
         protected class CQLCounter extends Counter
         {
-            protected int rowCounted;
-            protected int rowInCurrentPartition;
+            protected int rowsCounted;
+            protected int rowsInCurrentPartition;
             protected final boolean countPartitionsWithOnlyStaticData;
 
             protected boolean hasLiveStaticRow;
@@ -501,7 +501,7 @@ public abstract class DataLimits
             @Override
             public void applyToPartition(DecoratedKey partitionKey, Row staticRow)
             {
-                rowInCurrentPartition = 0;
+                rowsInCurrentPartition = 0;
                 hasLiveStaticRow = !staticRow.isEmpty() && isLive(staticRow);
             }
 
@@ -519,47 +519,47 @@ public abstract class DataLimits
                 // Normally, we don't count static rows as from a CQL point of view, it will be merge with other
                 // rows in the partition. However, if we only have the static row, it will be returned as one row
                 // so count it.
-                if (countPartitionsWithOnlyStaticData && hasLiveStaticRow && rowInCurrentPartition == 0)
+                if (countPartitionsWithOnlyStaticData && hasLiveStaticRow && rowsInCurrentPartition == 0)
                     incrementRowCount();
                 super.onPartitionClose();
             }
 
             protected void incrementRowCount()
             {
-                if (++rowCounted >= rowLimit)
+                if (++rowsCounted >= rowLimit)
                     stop();
-                if (++rowInCurrentPartition >= perPartitionLimit)
+                if (++rowsInCurrentPartition >= perPartitionLimit)
                     stopInPartition();
             }
 
             public int counted()
             {
-                return rowCounted;
+                return rowsCounted;
             }
 
             public int countedInCurrentPartition()
             {
-                return rowInCurrentPartition;
+                return rowsInCurrentPartition;
             }
 
-            public int rowCounted()
+            public int rowsCounted()
             {
-                return rowCounted;
+                return rowsCounted;
             }
 
-            public int rowCountedInCurrentPartition()
+            public int rowsCountedInCurrentPartition()
             {
-                return rowInCurrentPartition;
+                return rowsInCurrentPartition;
             }
 
             public boolean isDone()
             {
-                return rowCounted >= rowLimit;
+                return rowsCounted >= rowLimit;
             }
 
             public boolean isDoneForPartition()
             {
-                return isDone() || rowInCurrentPartition >= perPartitionLimit;
+                return isDone() || rowsInCurrentPartition >= perPartitionLimit;
             }
         }
 
@@ -639,7 +639,7 @@ public abstract class DataLimits
             {
                 if (partitionKey.getKey().equals(lastReturnedKey))
                 {
-                    rowInCurrentPartition = perPartitionLimit - lastReturnedKeyRemaining;
+                    rowsInCurrentPartition = perPartitionLimit - lastReturnedKeyRemaining;
                     // lastReturnedKey is the last key for which we're returned rows in the first page.
                     // So, since we know we have returned rows, we know we have accounted for the static row
                     // if any already, so force hasLiveStaticRow to false so we make sure to not count it
@@ -825,7 +825,7 @@ public abstract class DataLimits
         @Override
         public boolean isExhausted(Counter counter)
         {
-            return ((GroupByAwareCounter) counter).rowCounted < rowLimit
+            return ((GroupByAwareCounter) counter).rowsCounted < rowLimit
                     && counter.counted() < groupLimit;
         }
 
@@ -843,12 +843,12 @@ public abstract class DataLimits
             /**
              * The number of rows counted so far.
              */
-            protected int rowCounted;
+            protected int rowsCounted;
 
             /**
              * The number of rows counted so far in the current partition.
              */
-            protected int rowCountedInCurrentPartition;
+            protected int rowsCountedInCurrentPartition;
 
             /**
              * The number of groups counted so far. A group is counted only once it is complete
@@ -919,12 +919,12 @@ public abstract class DataLimits
                     hasLiveStaticRow = !staticRow.isEmpty() && isLive(staticRow);
                 }
                 currentPartitionKey = partitionKey;
-                // If we are done we need to preserve the groupInCurrentPartition and rowCountedInCurrentPartition
+                // If we are done we need to preserve the groupInCurrentPartition and rowsCountedInCurrentPartition
                 // because the pager need to retrieve the count associated to the last value it has returned.
                 if (!isDone())
                 {
                     groupInCurrentPartition = 0;
-                    rowCountedInCurrentPartition = 0;
+                    rowsCountedInCurrentPartition = 0;
                 }
             }
 
@@ -934,7 +934,7 @@ public abstract class DataLimits
                 // It's possible that we're "done" if the partition we just 
started bumped the number of groups (in
                 // applyToPartition() above), in which case Transformation 
will still call this method. In that case, we
                 // want to ignore the static row, it should (and will) be 
returned with the next page/group if needs be.
-                if (isDone())
+                if (enforceLimits && isDone())
                 {
                     hasLiveStaticRow = false; // The row has not been returned
                     return Rows.EMPTY_STATIC_ROW;
@@ -960,7 +960,7 @@ public abstract class DataLimits
 
                 // That row may have made us increment the group count, which may mean we're done for this partition, in
                 // which case we shouldn't count this row (it won't be returned).
-                if (isDoneForPartition())
+                if (enforceLimits && isDoneForPartition())
                 {
                     hasGroupStarted = false;
                     return null;
@@ -989,21 +989,21 @@ public abstract class DataLimits
             }
 
             @Override
-            public int rowCounted()
+            public int rowsCounted()
             {
-                return rowCounted;
+                return rowsCounted;
             }
 
             @Override
-            public int rowCountedInCurrentPartition()
+            public int rowsCountedInCurrentPartition()
             {
-                return rowCountedInCurrentPartition;
+                return rowsCountedInCurrentPartition;
             }
 
             protected void incrementRowCount()
             {
-                rowCountedInCurrentPartition++;
-                if (++rowCounted >= rowLimit)
+                rowsCountedInCurrentPartition++;
+                if (++rowsCounted >= rowLimit)
                     stop();
             }
 
@@ -1058,7 +1058,7 @@ public abstract class DataLimits
                 // 2) the end of the data is reached
                 // We know that the end of the data is reached if the group limit has not been reached
                 // and the number of rows counted is smaller than the internal page size.
-                if (hasGroupStarted && groupCounted < groupLimit && rowCounted < rowLimit)
+                if (hasGroupStarted && groupCounted < groupLimit && rowsCounted < rowLimit)
                 {
                     incrementGroupCount();
                     incrementGroupInCurrentPartitionCount();
@@ -1311,12 +1311,12 @@ public abstract class DataLimits
                 return cellsInCurrentPartition;
             }
 
-            public int rowCounted()
+            public int rowsCounted()
             {
                 throw new UnsupportedOperationException();
             }
 
-            public int rowCountedInCurrentPartition()
+            public int rowsCountedInCurrentPartition()
             {
                 throw new UnsupportedOperationException();
             }
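
A note on the enforceLimits guards added in the two hunks above: SRP counts
results with a non-enforcing counter (enforceLimits == false, the "we only
count" mode described at the top of this diff), and such a counter must never
swallow data. Previously GroupByAwareCounter could drop the static row, or the
row completing a group, as soon as isDone()/isDoneForPartition() fired, even
when it was only supposed to count. A simplified sketch of the corrected
behaviour, not the exact patch code:

    // Only a limit-enforcing counter may suppress data; a counting-only
    // counter passes everything through so SRP sees what the client would.
    public Row applyToStatic(Row row)
    {
        if (enforceLimits && isDone())
        {
            hasLiveStaticRow = false; // the row has not been returned
            return Rows.EMPTY_STATIC_ROW;
        }
        return row;
    }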
diff --git a/src/java/org/apache/cassandra/service/DataResolver.java b/src/java/org/apache/cassandra/service/DataResolver.java
index 29fa003..eb2f7b0 100644
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@ -770,11 +770,12 @@ public class DataResolver extends ResponseResolver
             return executeReadCommand(cmd);
         }
 
-        // Counts the number of rows for regular queries and the number of groups for GROUP BY queries
+        /** Returns the number of results counted by the counter */
         private int counted(Counter counter)
         {
+            // We are interested by the number of rows but for GROUP BY queries 'counted' returns the number of groups.
             return command.limits().isGroupByLimit()
-                 ? counter.rowCounted()
+                 ? counter.rowsCounted()
                  : counter.counted();
         }
 
@@ -915,11 +916,13 @@ public class DataResolver extends ResponseResolver
                 return UnfilteredPartitionIterators.getOnlyElement(executeReadCommand(cmd), cmd);
             }
 
-            // Counts the number of rows for regular queries and the number of groups for GROUP BY queries
+            /** Returns the number of results counted in the partition by the counter */
             private int countedInCurrentPartition(Counter counter)
             {
+                // We are interested by the number of rows but for GROUP BY queries 'countedInCurrentPartition' returns
+                // the number of groups in the current partition.
                 return command.limits().isGroupByLimit()
-                     ? counter.rowCountedInCurrentPartition()
+                     ? counter.rowsCountedInCurrentPartition()
                      : counter.countedInCurrentPartition();
             }
 
diff --git a/test/distributed/org/apache/cassandra/distributed/test/ShortReadProtectionTest.java b/test/distributed/org/apache/cassandra/distributed/test/ShortReadProtectionTest.java
new file mode 100644
index 0000000..27390d6
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/ShortReadProtectionTest.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.junit.Test;
+
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.MethodDelegation;
+import net.bytebuddy.implementation.bind.annotation.SuperCall;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ICoordinator;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.transport.messages.ResultMessage;
+
+import static java.lang.String.format;
+import static java.util.Arrays.asList;
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static org.apache.cassandra.distributed.api.ConsistencyLevel.ALL;
+import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows;
+
+/**
+ * Tests short read protection, the mechanism that ensures distributed queries at read consistency levels > ONE/LOCAL_ONE
+ * avoid short reads that might happen when a limit is used and reconciliation accepts less rows than such limit.
+ */
+public class ShortReadProtectionTest extends TestBaseImpl
+{
+    /**
+     * Test GROUP BY with short read protection, particularly when there is a limit and regular row deletions.
+     * <p>
+     * See CASSANDRA-15459
+     */
+    @Test
+    public void testGroupBySRPRegularRow() throws Throwable
+    {
+        testGroupBySRP("CREATE TABLE %s (pk int, ck int, PRIMARY KEY (pk, 
ck))",
+                       asList("INSERT INTO %s (pk, ck) VALUES (1, 1) USING 
TIMESTAMP 0",
+                              "DELETE FROM %s WHERE pk=0 AND ck=0",
+                              "INSERT INTO %s (pk, ck) VALUES (2, 2) USING 
TIMESTAMP 0"),
+                       asList("DELETE FROM %s WHERE pk=1 AND ck=1",
+                              "INSERT INTO %s (pk, ck) VALUES (0, 0) USING 
TIMESTAMP 0",
+                              "DELETE FROM %s WHERE pk=2 AND ck=2"),
+                       asList("SELECT * FROM %s LIMIT 1",
+                              "SELECT * FROM %s LIMIT 10",
+                              "SELECT * FROM %s GROUP BY pk LIMIT 1",
+                              "SELECT * FROM %s GROUP BY pk LIMIT 10",
+                              "SELECT * FROM %s GROUP BY pk, ck LIMIT 1",
+                              "SELECT * FROM %s GROUP BY pk, ck LIMIT 10"));
+    }
+
+    /**
+     * Test GROUP BY with short read protection, particularly when there is a limit and static row deletions.
+     * <p>
+     * See CASSANDRA-15459
+     */
+    @Test
+    public void testGroupBySRPStaticRow() throws Throwable
+    {
+        testGroupBySRP("CREATE TABLE %s (pk int, ck int, s int static, PRIMARY 
KEY (pk, ck))",
+                       asList("INSERT INTO %s (pk, s) VALUES (1, 1) USING 
TIMESTAMP 0",
+                              "INSERT INTO %s (pk, s) VALUES (0, null)",
+                              "INSERT INTO %s (pk, s) VALUES (2, 2) USING 
TIMESTAMP 0"),
+                       asList("INSERT INTO %s (pk, s) VALUES (1, null)",
+                              "INSERT INTO %s (pk, s) VALUES (0, 0) USING 
TIMESTAMP 0",
+                              "INSERT INTO %s (pk, s) VALUES (2, null)"),
+                       asList("SELECT * FROM %s LIMIT 1",
+                              "SELECT * FROM %s LIMIT 10",
+                              "SELECT * FROM %s GROUP BY pk LIMIT 1",
+                              "SELECT * FROM %s GROUP BY pk LIMIT 10",
+                              "SELECT * FROM %s GROUP BY pk, ck LIMIT 1",
+                              "SELECT * FROM %s GROUP BY pk, ck LIMIT 10"));
+    }
+
+    private void testGroupBySRP(String createTable,
+                                List<String> node1Queries,
+                                List<String> node2Queries,
+                                List<String> coordinatorQueries) throws Throwable
+    {
+        try (Cluster cluster = init(Cluster.build()
+                                           .withNodes(2)
+                                           .withConfig(config -> config.set("hinted_handoff_enabled", false))
+                                           .withInstanceInitializer(BBDropMutationsHelper::install)
+                                           .start()))
+        {
+            String table = withKeyspace("%s.t");
+            cluster.schemaChange(format(createTable, table));
+
+            // populate data on node1
+            IInvokableInstance node1 = cluster.get(1);
+            for (String query : node1Queries)
+                node1.executeInternal(format(query, table));
+
+            // populate data on node2
+            IInvokableInstance node2 = cluster.get(2);
+            for (String query : node2Queries)
+                node2.executeInternal(format(query, table));
+
+            // ignore read repair writes
+            node1.runOnInstance(BBDropMutationsHelper::enable);
+            node2.runOnInstance(BBDropMutationsHelper::enable);
+
+            // verify the behaviour of SRP with GROUP BY queries
+            ICoordinator coordinator = cluster.coordinator(1);
+            for (String query : coordinatorQueries)
+                assertRows(coordinator.execute(format(query, table), ALL));
+        }
+    }
+
+    /**
+     * Byte Buddy helper to silently drop mutations.
+     */
+    public static class BBDropMutationsHelper
+    {
+        private static final AtomicBoolean enabled = new AtomicBoolean(false);
+
+        static void enable()
+        {
+            enabled.set(true);
+        }
+
+        static void install(ClassLoader cl, int nodeNumber)
+        {
+            new ByteBuddy().rebase(Mutation.class)
+                           .method(named("apply"))
+                           .intercept(MethodDelegation.to(BBDropMutationsHelper.class))
+                           .make()
+                           .load(cl, ClassLoadingStrategy.Default.INJECTION);
+        }
+
+        public static void execute(@SuperCall Callable<ResultMessage.Rows> r) throws Exception
+        {
+            if (enabled.get())
+                return;
+            r.call();
+        }
+    }
+}
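
For readers tracing the regular-row test: every assertRows(...) call expects an
empty result, because each row is written with TIMESTAMP 0 on one node and
deleted with a newer write time on the other, so reconciliation at ALL removes
everything. Derived from the statements in testGroupBySRPRegularRow:

    // pk=0: node1 holds a tombstone, node2 holds (0, 0)@t0 -> merges to nothing
    // pk=1: node1 holds (1, 1)@t0,   node2 holds a tombstone -> merges to nothing
    // pk=2: node1 holds (2, 2)@t0,   node2 holds a tombstone -> merges to nothing

Because the Byte Buddy helper drops read-repair mutations, the nodes keep these
divergent views across queries, so each limited page must be completed by SRP;
before this patch the GROUP BY variants could miscount and return incorrect
results.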


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
