This is an automated email from the ASF dual-hosted git repository.
adelapena pushed a commit to branch cassandra-3.11
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-3.11 by this push:
new 86a9261 Fix short read protection for GROUP BY queries
86a9261 is described below
commit 86a9261fd94707283e4ce149f88756099e22fcb6
Author: Andrés de la Peña <[email protected]>
AuthorDate: Tue Jun 16 16:32:22 2020 +0100
Fix short read protection for GROUP BY queries
patch by Andrés de la Peña; reviewed by Caleb Rackliffe for CASSANDRA-15459
---
CHANGES.txt | 1 +
.../org/apache/cassandra/db/filter/DataLimits.java | 68 ++++-----
.../org/apache/cassandra/service/DataResolver.java | 11 +-
.../distributed/test/ShortReadProtectionTest.java | 158 +++++++++++++++++++++
4 files changed, 200 insertions(+), 38 deletions(-)
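For context, the new distributed test at the end of this patch reproduces the problem on a two-node cluster with hinted handoff disabled: each replica applies a different mix of inserts and deletions, read-repair mutations are dropped, and the coordinator then queries at CL=ALL. A minimal sketch of the affected query shape, with illustrative keyspace/table names (not part of the patch):

    CREATE KEYSPACE ks WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 2};
    CREATE TABLE ks.t (pk int, ck int, PRIMARY KEY (pk, ck));
    -- with the replicas out of sync, short read protection must fetch additional rows
    -- (not additional groups) for queries such as:
    SELECT * FROM ks.t GROUP BY pk LIMIT 1;
    SELECT * FROM ks.t GROUP BY pk, ck LIMIT 10;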
diff --git a/CHANGES.txt b/CHANGES.txt
index 749aba2..5168acb 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.11.8
+ * Fix short read protection for GROUP BY queries (CASSANDRA-15459)
* Frozen RawTuple is not annotated with frozen in the toString method (CASSANDRA-15857)
Merged from 3.0:
* Check for endpoint collision with hibernating nodes (CASSANDRA-14599)
diff --git a/src/java/org/apache/cassandra/db/filter/DataLimits.java b/src/java/org/apache/cassandra/db/filter/DataLimits.java
index cf2bc13..2759932 100644
--- a/src/java/org/apache/cassandra/db/filter/DataLimits.java
+++ b/src/java/org/apache/cassandra/db/filter/DataLimits.java
@@ -257,7 +257,7 @@ public abstract class DataLimits
private final boolean enforceStrictLiveness;
// false means we do not propagate our stop signals onto the iterator, we only count
- private boolean enforceLimits = true;
+ protected boolean enforceLimits = true;
protected Counter(int nowInSec, boolean assumeLiveData, boolean enforceStrictLiveness)
{
@@ -308,14 +308,14 @@ public abstract class DataLimits
*
* @return the number of rows counted.
*/
- public abstract int rowCounted();
+ public abstract int rowsCounted();
/**
* The number of rows counted in the current partition.
*
* @return the number of rows counted in the current partition.
*/
- public abstract int rowCountedInCurrentPartition();
+ public abstract int rowsCountedInCurrentPartition();
public abstract boolean isDone();
public abstract boolean isDoneForPartition();
@@ -483,8 +483,8 @@ public abstract class DataLimits
protected class CQLCounter extends Counter
{
- protected int rowCounted;
- protected int rowInCurrentPartition;
+ protected int rowsCounted;
+ protected int rowsInCurrentPartition;
protected final boolean countPartitionsWithOnlyStaticData;
protected boolean hasLiveStaticRow;
@@ -501,7 +501,7 @@ public abstract class DataLimits
@Override
public void applyToPartition(DecoratedKey partitionKey, Row staticRow)
{
- rowInCurrentPartition = 0;
+ rowsInCurrentPartition = 0;
hasLiveStaticRow = !staticRow.isEmpty() && isLive(staticRow);
}
@@ -519,47 +519,47 @@ public abstract class DataLimits
// Normally, we don't count static rows as from a CQL point of view, it will be merge with other
// rows in the partition. However, if we only have the static row, it will be returned as one row
// so count it.
- if (countPartitionsWithOnlyStaticData && hasLiveStaticRow && rowInCurrentPartition == 0)
+ if (countPartitionsWithOnlyStaticData && hasLiveStaticRow && rowsInCurrentPartition == 0)
incrementRowCount();
super.onPartitionClose();
}
protected void incrementRowCount()
{
- if (++rowCounted >= rowLimit)
+ if (++rowsCounted >= rowLimit)
stop();
- if (++rowInCurrentPartition >= perPartitionLimit)
+ if (++rowsInCurrentPartition >= perPartitionLimit)
stopInPartition();
}
public int counted()
{
- return rowCounted;
+ return rowsCounted;
}
public int countedInCurrentPartition()
{
- return rowInCurrentPartition;
+ return rowsInCurrentPartition;
}
- public int rowCounted()
+ public int rowsCounted()
{
- return rowCounted;
+ return rowsCounted;
}
- public int rowCountedInCurrentPartition()
+ public int rowsCountedInCurrentPartition()
{
- return rowInCurrentPartition;
+ return rowsInCurrentPartition;
}
public boolean isDone()
{
- return rowCounted >= rowLimit;
+ return rowsCounted >= rowLimit;
}
public boolean isDoneForPartition()
{
- return isDone() || rowInCurrentPartition >= perPartitionLimit;
+ return isDone() || rowsInCurrentPartition >= perPartitionLimit;
}
}
@@ -639,7 +639,7 @@ public abstract class DataLimits
{
if (partitionKey.getKey().equals(lastReturnedKey))
{
- rowInCurrentPartition = perPartitionLimit - lastReturnedKeyRemaining;
+ rowsInCurrentPartition = perPartitionLimit - lastReturnedKeyRemaining;
// lastReturnedKey is the last key for which we're returned rows in the first page.
// So, since we know we have returned rows, we know we have accounted for the static row
// if any already, so force hasLiveStaticRow to false so we make sure to not count it
@@ -825,7 +825,7 @@ public abstract class DataLimits
@Override
public boolean isExhausted(Counter counter)
{
- return ((GroupByAwareCounter) counter).rowCounted < rowLimit
+ return ((GroupByAwareCounter) counter).rowsCounted < rowLimit
&& counter.counted() < groupLimit;
}
@@ -843,12 +843,12 @@ public abstract class DataLimits
/**
* The number of rows counted so far.
*/
- protected int rowCounted;
+ protected int rowsCounted;
/**
* The number of rows counted so far in the current partition.
*/
- protected int rowCountedInCurrentPartition;
+ protected int rowsCountedInCurrentPartition;
/**
* The number of groups counted so far. A group is counted only once it is complete
@@ -919,12 +919,12 @@ public abstract class DataLimits
hasLiveStaticRow = !staticRow.isEmpty() && isLive(staticRow);
}
currentPartitionKey = partitionKey;
- // If we are done we need to preserve the groupInCurrentPartition and rowCountedInCurrentPartition
+ // If we are done we need to preserve the groupInCurrentPartition and rowsCountedInCurrentPartition
// because the pager need to retrieve the count associated to the last value it has returned.
if (!isDone())
{
groupInCurrentPartition = 0;
- rowCountedInCurrentPartition = 0;
+ rowsCountedInCurrentPartition = 0;
}
}
@@ -934,7 +934,7 @@ public abstract class DataLimits
// It's possible that we're "done" if the partition we just started bumped the number of groups (in
// applyToPartition() above), in which case Transformation will still call this method. In that case, we
// want to ignore the static row, it should (and will) be returned with the next page/group if needs be.
- if (isDone())
+ if (enforceLimits && isDone())
{
hasLiveStaticRow = false; // The row has not been returned
return Rows.EMPTY_STATIC_ROW;
@@ -960,7 +960,7 @@ public abstract class DataLimits
// That row may have made us increment the group count, which may mean we're done for this partition, in
// which case we shouldn't count this row (it won't be returned).
- if (isDoneForPartition())
+ if (enforceLimits && isDoneForPartition())
{
hasGroupStarted = false;
return null;
@@ -989,21 +989,21 @@ public abstract class DataLimits
}
@Override
- public int rowCounted()
+ public int rowsCounted()
{
- return rowCounted;
+ return rowsCounted;
}
@Override
- public int rowCountedInCurrentPartition()
+ public int rowsCountedInCurrentPartition()
{
- return rowCountedInCurrentPartition;
+ return rowsCountedInCurrentPartition;
}
protected void incrementRowCount()
{
- rowCountedInCurrentPartition++;
- if (++rowCounted >= rowLimit)
+ rowsCountedInCurrentPartition++;
+ if (++rowsCounted >= rowLimit)
stop();
}
@@ -1058,7 +1058,7 @@ public abstract class DataLimits
// 2) the end of the data is reached
// We know that the end of the data is reached if the group limit has not been reached
// and the number of rows counted is smaller than the internal page size.
- if (hasGroupStarted && groupCounted < groupLimit && rowCounted < rowLimit)
+ if (hasGroupStarted && groupCounted < groupLimit && rowsCounted < rowLimit)
{
incrementGroupCount();
incrementGroupInCurrentPartitionCount();
@@ -1311,12 +1311,12 @@ public abstract class DataLimits
return cellsInCurrentPartition;
}
- public int rowCounted()
+ public int rowsCounted()
{
throw new UnsupportedOperationException();
}
- public int rowCountedInCurrentPartition()
+ public int rowsCountedInCurrentPartition()
{
throw new UnsupportedOperationException();
}
diff --git a/src/java/org/apache/cassandra/service/DataResolver.java b/src/java/org/apache/cassandra/service/DataResolver.java
index 29fa003..eb2f7b0 100644
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@ -770,11 +770,12 @@ public class DataResolver extends ResponseResolver
return executeReadCommand(cmd);
}
- // Counts the number of rows for regular queries and the number of groups for GROUP BY queries
+ /** Returns the number of results counted by the counter */
private int counted(Counter counter)
{
+ // We are interested by the number of rows but for GROUP BY queries 'counted' returns the number of groups.
return command.limits().isGroupByLimit()
- ? counter.rowCounted()
+ ? counter.rowsCounted()
: counter.counted();
}
@@ -915,11 +916,13 @@ public class DataResolver extends ResponseResolver
return UnfilteredPartitionIterators.getOnlyElement(executeReadCommand(cmd), cmd);
}
- // Counts the number of rows for regular queries and the number of groups for GROUP BY queries
+ /** Returns the number of results counted in the partition by the counter */
private int countedInCurrentPartition(Counter counter)
{
+ // We are interested by the number of rows but for GROUP BY queries 'countedInCurrentPartition' returns
+ // the number of groups in the current partition.
return command.limits().isGroupByLimit()
- ? counter.rowCountedInCurrentPartition()
+ ? counter.rowsCountedInCurrentPartition()
: counter.countedInCurrentPartition();
}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/ShortReadProtectionTest.java b/test/distributed/org/apache/cassandra/distributed/test/ShortReadProtectionTest.java
new file mode 100644
index 0000000..27390d6
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/ShortReadProtectionTest.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.junit.Test;
+
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.MethodDelegation;
+import net.bytebuddy.implementation.bind.annotation.SuperCall;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ICoordinator;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.transport.messages.ResultMessage;
+
+import static java.lang.String.format;
+import static java.util.Arrays.asList;
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static org.apache.cassandra.distributed.api.ConsistencyLevel.ALL;
+import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows;
+
+/**
+ * Tests short read protection, the mechanism that ensures distributed queries at read consistency levels > ONE/LOCAL_ONE
+ * avoid short reads that might happen when a limit is used and reconciliation accepts less rows than such limit.
+ */
+public class ShortReadProtectionTest extends TestBaseImpl
+{
+ /**
+ * Test GROUP BY with short read protection, particularly when there is a limit and regular row deletions.
+ * <p>
+ * See CASSANDRA-15459
+ */
+ @Test
+ public void testGroupBySRPRegularRow() throws Throwable
+ {
+ testGroupBySRP("CREATE TABLE %s (pk int, ck int, PRIMARY KEY (pk,
ck))",
+ asList("INSERT INTO %s (pk, ck) VALUES (1, 1) USING
TIMESTAMP 0",
+ "DELETE FROM %s WHERE pk=0 AND ck=0",
+ "INSERT INTO %s (pk, ck) VALUES (2, 2) USING
TIMESTAMP 0"),
+ asList("DELETE FROM %s WHERE pk=1 AND ck=1",
+ "INSERT INTO %s (pk, ck) VALUES (0, 0) USING
TIMESTAMP 0",
+ "DELETE FROM %s WHERE pk=2 AND ck=2"),
+ asList("SELECT * FROM %s LIMIT 1",
+ "SELECT * FROM %s LIMIT 10",
+ "SELECT * FROM %s GROUP BY pk LIMIT 1",
+ "SELECT * FROM %s GROUP BY pk LIMIT 10",
+ "SELECT * FROM %s GROUP BY pk, ck LIMIT 1",
+ "SELECT * FROM %s GROUP BY pk, ck LIMIT 10"));
+ }
+
+ /**
+ * Test GROUP BY with short read protection, particularly when there is a limit and static row deletions.
+ * <p>
+ * See CASSANDRA-15459
+ */
+ @Test
+ public void testGroupBySRPStaticRow() throws Throwable
+ {
+ testGroupBySRP("CREATE TABLE %s (pk int, ck int, s int static, PRIMARY
KEY (pk, ck))",
+ asList("INSERT INTO %s (pk, s) VALUES (1, 1) USING
TIMESTAMP 0",
+ "INSERT INTO %s (pk, s) VALUES (0, null)",
+ "INSERT INTO %s (pk, s) VALUES (2, 2) USING
TIMESTAMP 0"),
+ asList("INSERT INTO %s (pk, s) VALUES (1, null)",
+ "INSERT INTO %s (pk, s) VALUES (0, 0) USING
TIMESTAMP 0",
+ "INSERT INTO %s (pk, s) VALUES (2, null)"),
+ asList("SELECT * FROM %s LIMIT 1",
+ "SELECT * FROM %s LIMIT 10",
+ "SELECT * FROM %s GROUP BY pk LIMIT 1",
+ "SELECT * FROM %s GROUP BY pk LIMIT 10",
+ "SELECT * FROM %s GROUP BY pk, ck LIMIT 1",
+ "SELECT * FROM %s GROUP BY pk, ck LIMIT 10"));
+ }
+
+ private void testGroupBySRP(String createTable,
+ List<String> node1Queries,
+ List<String> node2Queries,
+ List<String> coordinatorQueries) throws Throwable
+ {
+ try (Cluster cluster = init(Cluster.build()
+ .withNodes(2)
+ .withConfig(config -> config.set("hinted_handoff_enabled", false))
+ .withInstanceInitializer(BBDropMutationsHelper::install)
+ .start()))
+ {
+ String table = withKeyspace("%s.t");
+ cluster.schemaChange(format(createTable, table));
+
+ // populate data on node1
+ IInvokableInstance node1 = cluster.get(1);
+ for (String query : node1Queries)
+ node1.executeInternal(format(query, table));
+
+ // populate data on node2
+ IInvokableInstance node2 = cluster.get(2);
+ for (String query : node2Queries)
+ node2.executeInternal(format(query, table));
+
+ // ignore read repair writes
+ node1.runOnInstance(BBDropMutationsHelper::enable);
+ node2.runOnInstance(BBDropMutationsHelper::enable);
+
+ // verify the behaviour of SRP with GROUP BY queries
+ ICoordinator coordinator = cluster.coordinator(1);
+ for (String query : coordinatorQueries)
+ assertRows(coordinator.execute(format(query, table), ALL));
+ }
+ }
+
+ /**
+ * Byte Buddy helper to silently drop mutations.
+ */
+ public static class BBDropMutationsHelper
+ {
+ private static final AtomicBoolean enabled = new AtomicBoolean(false);
+
+ static void enable()
+ {
+ enabled.set(true);
+ }
+
+ static void install(ClassLoader cl, int nodeNumber)
+ {
+ new ByteBuddy().rebase(Mutation.class)
+ .method(named("apply"))
+ .intercept(MethodDelegation.to(BBDropMutationsHelper.class))
+ .make()
+ .load(cl, ClassLoadingStrategy.Default.INJECTION);
+ }
+
+ public static void execute(@SuperCall Callable<ResultMessage.Rows> r) throws Exception
+ {
+ if (enabled.get())
+ return;
+ r.call();
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]