This is an automated email from the ASF dual-hosted git repository.

adelapena pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit ae51f0fd12820707927803fdbe63581f33111d4b Merge: 528e3ad 86a9261 Author: Andrés de la Peña <[email protected]> AuthorDate: Thu Aug 6 13:26:39 2020 +0100 Merge branch 'cassandra-3.11' into trunk # Conflicts: # CHANGES.txt # src/java/org/apache/cassandra/db/filter/DataLimits.java # src/java/org/apache/cassandra/service/DataResolver.java CHANGES.txt | 1 + .../org/apache/cassandra/db/filter/DataLimits.java | 66 ++++++------ .../reads/ShortReadPartitionsProtection.java | 10 +- .../service/reads/ShortReadRowsProtection.java | 14 +-- .../distributed/test/ShortReadProtectionTest.java | 116 +++++++++++++++++++++ 5 files changed, 154 insertions(+), 53 deletions(-) diff --cc CHANGES.txt index dcefef0,5168acb..7b4cc98 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,13 -1,5 +1,14 @@@ -3.11.8 +4.0-beta2 + * Improve cassandra-stress logging when using a profile file that doesn't exist (CASSANDRA-14425) + * Improve logging for socket connection/disconnection (CASSANDRA-15980) + * Throw FSWriteError upon write failures in order to apply DiskFailurePolicy (CASSANDRA-15928) + * Forbid altering UDTs used in partition keys (CASSANDRA-15933) + * Fix version parsing logic when upgrading from 3.0 (CASSANDRA-15973) + * Optimize NoSpamLogger use in hot paths (CASSANDRA-15766) + * Verify sstable components on startup (CASSANDRA-15945) +Merged from 3.11: + * Fix short read protection for GROUP BY queries (CASSANDRA-15459) + * stop_paranoid disk failure policy is ignored on CorruptSSTableException after node is up (CASSANDRA-15191) * Frozen RawTuple is not annotated with frozen in the toString method (CASSANDRA-15857) Merged from 3.0: * Check for endpoint collision with hibernating nodes (CASSANDRA-14599) diff --cc src/java/org/apache/cassandra/db/filter/DataLimits.java index 3a766e0,2759932..e6495e8 --- a/src/java/org/apache/cassandra/db/filter/DataLimits.java +++ b/src/java/org/apache/cassandra/db/filter/DataLimits.java @@@ -161,7 -191,7 +161,7 @@@ public abstract class DataLimit * @param countPartitionsWithOnlyStaticData if {@code true} the partitions with only static data should be counted * as 1 valid row. * @param enforceStrictLiveness whether the row should be purged if there is no PK liveness info, -- * normally retrieved from {@link CFMetaData#enforceStrictLiveness()} ++ * normally retrieved from {@link org.apache.cassandra.schema.TableMetadata#enforceStrictLiveness()} * @return a new {@code Counter} for this limits. */ public abstract Counter newCounter(int nowInSec, diff --cc src/java/org/apache/cassandra/service/reads/ShortReadPartitionsProtection.java index 42676b6,0000000..6c4dc68 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/service/reads/ShortReadPartitionsProtection.java +++ b/src/java/org/apache/cassandra/service/reads/ShortReadPartitionsProtection.java @@@ -1,207 -1,0 +1,199 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.service.reads; + +import org.apache.cassandra.locator.Endpoints; +import org.apache.cassandra.locator.ReplicaPlan; +import org.apache.cassandra.locator.ReplicaPlans; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.concurrent.Stage; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.DataRange; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.PartitionPosition; +import org.apache.cassandra.db.PartitionRangeReadCommand; +import org.apache.cassandra.db.ReadCommand; +import org.apache.cassandra.db.filter.DataLimits; +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; +import org.apache.cassandra.db.transform.MorePartitions; +import org.apache.cassandra.db.transform.MoreRows; +import org.apache.cassandra.db.transform.Transformation; +import org.apache.cassandra.dht.AbstractBounds; +import org.apache.cassandra.dht.ExcludingBounds; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.locator.Replica; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.service.reads.repair.NoopReadRepair; +import org.apache.cassandra.service.StorageProxy; +import org.apache.cassandra.tracing.Tracing; + +public class ShortReadPartitionsProtection extends Transformation<UnfilteredRowIterator> implements MorePartitions<UnfilteredPartitionIterator> +{ + private static final Logger logger = LoggerFactory.getLogger(ShortReadPartitionsProtection.class); + private final ReadCommand command; + private final Replica source; + + private final Runnable preFetchCallback; // called immediately before fetching more contents + + private final DataLimits.Counter singleResultCounter; // unmerged per-source counter + private final DataLimits.Counter mergedResultCounter; // merged end-result counter + + private DecoratedKey lastPartitionKey; // key of the last observed partition + + private boolean partitionsFetched; // whether we've seen any new partitions since iteration start or last moreContents() call + + private final long queryStartNanoTime; + + public ShortReadPartitionsProtection(ReadCommand command, + Replica source, + Runnable preFetchCallback, + DataLimits.Counter singleResultCounter, + DataLimits.Counter mergedResultCounter, + long queryStartNanoTime) + { + this.command = command; + this.source = source; + this.preFetchCallback = preFetchCallback; + this.singleResultCounter = singleResultCounter; + this.mergedResultCounter = mergedResultCounter; + this.queryStartNanoTime = queryStartNanoTime; + } + + @Override + public UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition) + { + partitionsFetched = true; + + lastPartitionKey = partition.partitionKey(); + + /* + * Extend for moreContents() then apply protection to track lastClustering by applyToRow(). + * + * If we don't apply the transformation *after* extending the partition with MoreRows, + * applyToRow() method of protection will not be called on the first row of the new extension iterator. 
+ */ + ReplicaPlan.ForTokenRead replicaPlan = ReplicaPlans.forSingleReplicaRead(Keyspace.open(command.metadata().keyspace), partition.partitionKey().getToken(), source); + ReplicaPlan.SharedForTokenRead sharedReplicaPlan = ReplicaPlan.shared(replicaPlan); + ShortReadRowsProtection protection = new ShortReadRowsProtection(partition.partitionKey(), + command, source, + (cmd) -> executeReadCommand(cmd, sharedReplicaPlan), + singleResultCounter, + mergedResultCounter); + return Transformation.apply(MoreRows.extend(partition, protection), protection); + } + + /* + * We only get here once all the rows and partitions in this iterator have been iterated over, and so + * if the node had returned the requested number of rows but we still get here, then some results were + * skipped during reconciliation. + */ + public UnfilteredPartitionIterator moreContents() + { + // never try to request additional partitions from replicas if our reconciled partitions are already filled to the limit + assert !mergedResultCounter.isDone(); + + // we do not apply short read protection when we have no limits at all + assert !command.limits().isUnlimited(); + + /* + * If this is a single partition read command or an (indexed) partition range read command with + * a partition key specified, then we can't and shouldn't try fetch more partitions. + */ + assert !command.isLimitedToOnePartition(); + + /* + * If the returned result doesn't have enough rows/partitions to satisfy even the original limit, don't ask for more. + * + * Can only take the short cut if there is no per partition limit set. Otherwise it's possible to hit false + * positives due to some rows being uncounted for in certain scenarios (see CASSANDRA-13911). + */ + if (!singleResultCounter.isDone() && command.limits().perPartitionCount() == DataLimits.NO_LIMIT) + return null; + + /* + * Either we had an empty iterator as the initial response, or our moreContents() call got us an empty iterator. + * There is no point to ask the replica for more rows - it has no more in the requested range. + */ + if (!partitionsFetched) + return null; + partitionsFetched = false; + + /* + * We are going to fetch one partition at a time for thrift and potentially more for CQL. + * The row limit will either be set to the per partition limit - if the command has no total row limit set, or + * the total # of rows remaining - if it has some. If we don't grab enough rows in some of the partitions, + * then future ShortReadRowsProtection.moreContents() calls will fetch the missing ones. + */ + int toQuery = command.limits().count() != DataLimits.NO_LIMIT - ? command.limits().count() - counted(mergedResultCounter) ++ ? command.limits().count() - mergedResultCounter.rowsCounted() + : command.limits().perPartitionCount(); + + ColumnFamilyStore.metricsFor(command.metadata().id).shortReadProtectionRequests.mark(); + Tracing.trace("Requesting {} extra rows from {} for short read protection", toQuery, source); + logger.info("Requesting {} extra rows from {} for short read protection", toQuery, source); + + // If we've arrived here, all responses have been consumed, and we're about to request more. + preFetchCallback.run(); + + return makeAndExecuteFetchAdditionalPartitionReadCommand(toQuery); + } + - // Counts the number of rows for regular queries and the number of groups for GROUP BY queries - private int counted(DataLimits.Counter counter) - { - return command.limits().isGroupByLimit() - ? 
counter.rowCounted() - : counter.counted(); - } - + private UnfilteredPartitionIterator makeAndExecuteFetchAdditionalPartitionReadCommand(int toQuery) + { + PartitionRangeReadCommand cmd = (PartitionRangeReadCommand) command; + + DataLimits newLimits = cmd.limits().forShortReadRetry(toQuery); + + AbstractBounds<PartitionPosition> bounds = cmd.dataRange().keyRange(); + AbstractBounds<PartitionPosition> newBounds = bounds.inclusiveRight() + ? new Range<>(lastPartitionKey, bounds.right) + : new ExcludingBounds<>(lastPartitionKey, bounds.right); + DataRange newDataRange = cmd.dataRange().forSubRange(newBounds); + + ReplicaPlan.ForRangeRead replicaPlan = ReplicaPlans.forSingleReplicaRead(Keyspace.open(command.metadata().keyspace), cmd.dataRange().keyRange(), source, 1); + return executeReadCommand(cmd.withUpdatedLimitsAndDataRange(newLimits, newDataRange), ReplicaPlan.shared(replicaPlan)); + } + + private <E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>> + UnfilteredPartitionIterator executeReadCommand(ReadCommand cmd, ReplicaPlan.Shared<E, P> replicaPlan) + { + DataResolver<E, P> resolver = new DataResolver<>(cmd, replicaPlan, (NoopReadRepair<E, P>)NoopReadRepair.instance, queryStartNanoTime); + ReadCallback<E, P> handler = new ReadCallback<>(resolver, cmd, replicaPlan, queryStartNanoTime); + + if (source.isSelf()) + { + Stage.READ.maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(cmd, handler)); + } + else + { + if (source.isTransient()) + cmd = cmd.copyAsTransientQuery(source); + MessagingService.instance().sendWithCallback(cmd.createMessage(false), source.endpoint(), handler); + } + + // We don't call handler.get() because we want to preserve tombstones since we're still in the middle of merging node results. + handler.awaitResults(); + assert resolver.getMessages().size() == 1; + return resolver.getMessages().get(0).payload.makeIterator(command); + } +} diff --cc src/java/org/apache/cassandra/service/reads/ShortReadRowsProtection.java index 8dc7fc7,0000000..f32502d mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/service/reads/ShortReadRowsProtection.java +++ b/src/java/org/apache/cassandra/service/reads/ShortReadRowsProtection.java @@@ -1,197 -1,0 +1,189 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.service.reads; + +import java.util.function.Function; + +import org.apache.cassandra.db.Clustering; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.ReadCommand; +import org.apache.cassandra.db.SinglePartitionReadCommand; +import org.apache.cassandra.db.filter.ClusteringIndexFilter; +import org.apache.cassandra.db.filter.DataLimits; +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators; +import org.apache.cassandra.db.rows.Row; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; +import org.apache.cassandra.db.transform.MoreRows; +import org.apache.cassandra.db.transform.Transformation; +import org.apache.cassandra.locator.Replica; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.tracing.Tracing; + +class ShortReadRowsProtection extends Transformation implements MoreRows<UnfilteredRowIterator> +{ + private final ReadCommand command; + private final Replica source; + private final DataLimits.Counter singleResultCounter; // unmerged per-source counter + private final DataLimits.Counter mergedResultCounter; // merged end-result counter + private final Function<ReadCommand, UnfilteredPartitionIterator> commandExecutor; + private final TableMetadata metadata; + private final DecoratedKey partitionKey; + + private Clustering lastClustering; // clustering of the last observed row + + private int lastCounted = 0; // last seen recorded # before attempting to fetch more rows + private int lastFetched = 0; // # rows returned by last attempt to get more (or by the original read command) + private int lastQueried = 0; // # extra rows requested from the replica last time + + ShortReadRowsProtection(DecoratedKey partitionKey, ReadCommand command, Replica source, + Function<ReadCommand, UnfilteredPartitionIterator> commandExecutor, + DataLimits.Counter singleResultCounter, DataLimits.Counter mergedResultCounter) + { + this.command = command; + this.source = source; + this.commandExecutor = commandExecutor; + this.singleResultCounter = singleResultCounter; + this.mergedResultCounter = mergedResultCounter; + this.metadata = command.metadata(); + this.partitionKey = partitionKey; + } + + @Override + public Row applyToRow(Row row) + { + lastClustering = row.clustering(); + return row; + } + + /* + * We only get here once all the rows in this iterator have been iterated over, and so if the node + * had returned the requested number of rows but we still get here, then some results were skipped + * during reconciliation. + */ + public UnfilteredRowIterator moreContents() + { + // never try to request additional rows from replicas if our reconciled partition is already filled to the limit + assert !mergedResultCounter.isDoneForPartition(); + + // we do not apply short read protection when we have no limits at all + assert !command.limits().isUnlimited(); + + /* + * If the returned partition doesn't have enough rows to satisfy even the original limit, don't ask for more. + * + * Can only take the short cut if there is no per partition limit set. Otherwise it's possible to hit false + * positives due to some rows being uncounted for in certain scenarios (see CASSANDRA-13911). 
+ */ + if (!singleResultCounter.isDoneForPartition() && command.limits().perPartitionCount() == DataLimits.NO_LIMIT) + return null; + + /* + * If the replica has no live rows in the partition, don't try to fetch more. + * + * Note that the previous branch [if (!singleResultCounter.isDoneForPartition()) return null] doesn't + * always cover this scenario: + * isDoneForPartition() is defined as [isDone() || rowInCurrentPartition >= perPartitionLimit], + * and will return true if isDone() returns true, even if there are 0 rows counted in the current partition. + * + * This can happen with a range read if after 1+ rounds of short read protection requests we managed to fetch + * enough extra rows for other partitions to satisfy the singleResultCounter's total row limit, but only + * have tombstones in the current partition. + * + * One other way we can hit this condition is when the partition only has a live static row and no regular + * rows. In that scenario the counter will remain at 0 until the partition is closed - which happens after + * the moreContents() call. + */ - if (countedInCurrentPartition(singleResultCounter) == 0) ++ if (singleResultCounter.rowsCountedInCurrentPartition() == 0) + return null; + + /* + * This is a table with no clustering columns, and has at most one row per partition - with EMPTY clustering. + * We already have the row, so there is no point in asking for more from the partition. + */ + if (Clustering.EMPTY == lastClustering) + return null; + - lastFetched = countedInCurrentPartition(singleResultCounter) - lastCounted; - lastCounted = countedInCurrentPartition(singleResultCounter); ++ lastFetched = singleResultCounter.rowsCountedInCurrentPartition() - lastCounted; ++ lastCounted = singleResultCounter.rowsCountedInCurrentPartition(); + + // getting back fewer rows than we asked for means the partition on the replica has been fully consumed + if (lastQueried > 0 && lastFetched < lastQueried) + return null; + + /* + * At this point we know that: + * 1. the replica returned [repeatedly?] as many rows as we asked for and potentially has more + * rows in the partition + * 2. at least one of those returned rows was shadowed by a tombstone returned from another + * replica + * 3. we haven't satisfied the client's limits yet, and should attempt to query for more rows to + * avoid a short read + * + * In the ideal scenario, we would get exactly min(a, b) or fewer rows from the next request, where a and b + * are defined as follows: + * [a] limits.count() - mergedResultCounter.counted() + * [b] limits.perPartitionCount() - mergedResultCounter.countedInCurrentPartition() + * + * It would be naive to query for exactly that many rows, as it's possible and not unlikely + * that some of the returned rows would also be shadowed by tombstones from other hosts. + * + * Note: we don't know, nor do we care, how many rows from the replica made it into the reconciled result; + * we can only tell how many in total we queried for, and that [0, mrc.countedInCurrentPartition()) made it. + * + * In general, our goal should be to minimise the number of extra requests - *not* to minimise the number + * of rows fetched: there is a high transactional cost for every individual request, but a relatively low + * marginal cost for each extra row requested. + * + * As such it's better to overfetch than to underfetch extra rows from a host; but at the same + * time we want to respect paging limits and not blow up spectacularly. 
+ * + * Note: it's ok to retrieve more rows that necessary since singleResultCounter is not stopping and only + * counts. + * + * With that in mind, we'll just request the minimum of (count(), perPartitionCount()) limits. + * + * See CASSANDRA-13794 for more details. + */ + lastQueried = Math.min(command.limits().count(), command.limits().perPartitionCount()); + + ColumnFamilyStore.metricsFor(metadata.id).shortReadProtectionRequests.mark(); + Tracing.trace("Requesting {} extra rows from {} for short read protection", lastQueried, source); + + SinglePartitionReadCommand cmd = makeFetchAdditionalRowsReadCommand(lastQueried); + return UnfilteredPartitionIterators.getOnlyElement(commandExecutor.apply(cmd), cmd); + } + - // Counts the number of rows for regular queries and the number of groups for GROUP BY queries - private int countedInCurrentPartition(DataLimits.Counter counter) - { - return command.limits().isGroupByLimit() - ? counter.rowCountedInCurrentPartition() - : counter.countedInCurrentPartition(); - } - + private SinglePartitionReadCommand makeFetchAdditionalRowsReadCommand(int toQuery) + { + ClusteringIndexFilter filter = command.clusteringIndexFilter(partitionKey); + if (null != lastClustering) + filter = filter.forPaging(metadata.comparator, lastClustering, false); + + return SinglePartitionReadCommand.create(command.metadata(), + command.nowInSec(), + command.columnFilter(), + command.rowFilter(), + command.limits().forShortReadRetry(toQuery), + partitionKey, + filter, + command.indexMetadata()); + } +} diff --cc test/distributed/org/apache/cassandra/distributed/test/ShortReadProtectionTest.java index 0000000,27390d6..23b5c14 mode 000000,100644..100644 --- a/test/distributed/org/apache/cassandra/distributed/test/ShortReadProtectionTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/ShortReadProtectionTest.java @@@ -1,0 -1,158 +1,116 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + package org.apache.cassandra.distributed.test; + + import java.util.List; -import java.util.concurrent.Callable; -import java.util.concurrent.atomic.AtomicBoolean; + + import org.junit.Test; + -import net.bytebuddy.ByteBuddy; -import net.bytebuddy.dynamic.loading.ClassLoadingStrategy; -import net.bytebuddy.implementation.MethodDelegation; -import net.bytebuddy.implementation.bind.annotation.SuperCall; -import org.apache.cassandra.db.Mutation; + import org.apache.cassandra.distributed.Cluster; + import org.apache.cassandra.distributed.api.ICoordinator; + import org.apache.cassandra.distributed.api.IInvokableInstance; -import org.apache.cassandra.transport.messages.ResultMessage; + + import static java.lang.String.format; + import static java.util.Arrays.asList; -import static net.bytebuddy.matcher.ElementMatchers.named; + import static org.apache.cassandra.distributed.api.ConsistencyLevel.ALL; + import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows; + + /** + * Tests short read protection, the mechanism that ensures distributed queries at read consistency levels > ONE/LOCAL_ONE + * avoid short reads that might happen when a limit is used and reconciliation accepts less rows than such limit. + */ + public class ShortReadProtectionTest extends TestBaseImpl + { + /** + * Test GROUP BY with short read protection, particularly when there is a limit and regular row deletions. + * <p> + * See CASSANDRA-15459 + */ + @Test + public void testGroupBySRPRegularRow() throws Throwable + { + testGroupBySRP("CREATE TABLE %s (pk int, ck int, PRIMARY KEY (pk, ck))", + asList("INSERT INTO %s (pk, ck) VALUES (1, 1) USING TIMESTAMP 0", + "DELETE FROM %s WHERE pk=0 AND ck=0", + "INSERT INTO %s (pk, ck) VALUES (2, 2) USING TIMESTAMP 0"), + asList("DELETE FROM %s WHERE pk=1 AND ck=1", + "INSERT INTO %s (pk, ck) VALUES (0, 0) USING TIMESTAMP 0", + "DELETE FROM %s WHERE pk=2 AND ck=2"), + asList("SELECT * FROM %s LIMIT 1", + "SELECT * FROM %s LIMIT 10", + "SELECT * FROM %s GROUP BY pk LIMIT 1", + "SELECT * FROM %s GROUP BY pk LIMIT 10", + "SELECT * FROM %s GROUP BY pk, ck LIMIT 1", + "SELECT * FROM %s GROUP BY pk, ck LIMIT 10")); + } + + /** + * Test GROUP BY with short read protection, particularly when there is a limit and static row deletions. 
+ * <p> + * See CASSANDRA-15459 + */ + @Test + public void testGroupBySRPStaticRow() throws Throwable + { + testGroupBySRP("CREATE TABLE %s (pk int, ck int, s int static, PRIMARY KEY (pk, ck))", + asList("INSERT INTO %s (pk, s) VALUES (1, 1) USING TIMESTAMP 0", + "INSERT INTO %s (pk, s) VALUES (0, null)", + "INSERT INTO %s (pk, s) VALUES (2, 2) USING TIMESTAMP 0"), + asList("INSERT INTO %s (pk, s) VALUES (1, null)", + "INSERT INTO %s (pk, s) VALUES (0, 0) USING TIMESTAMP 0", + "INSERT INTO %s (pk, s) VALUES (2, null)"), + asList("SELECT * FROM %s LIMIT 1", + "SELECT * FROM %s LIMIT 10", + "SELECT * FROM %s GROUP BY pk LIMIT 1", + "SELECT * FROM %s GROUP BY pk LIMIT 10", + "SELECT * FROM %s GROUP BY pk, ck LIMIT 1", + "SELECT * FROM %s GROUP BY pk, ck LIMIT 10")); + } + + private void testGroupBySRP(String createTable, + List<String> node1Queries, + List<String> node2Queries, + List<String> coordinatorQueries) throws Throwable + { + try (Cluster cluster = init(Cluster.build() + .withNodes(2) + .withConfig(config -> config.set("hinted_handoff_enabled", false)) - .withInstanceInitializer(BBDropMutationsHelper::install) + .start())) + { ++ // create the table with read repair disabled + String table = withKeyspace("%s.t"); - cluster.schemaChange(format(createTable, table)); ++ cluster.schemaChange(format(createTable + " WITH read_repair='NONE'", table)); + + // populate data on node1 + IInvokableInstance node1 = cluster.get(1); + for (String query : node1Queries) + node1.executeInternal(format(query, table)); + + // populate data on node2 + IInvokableInstance node2 = cluster.get(2); + for (String query : node2Queries) + node2.executeInternal(format(query, table)); + - // ignore read repair writes - node1.runOnInstance(BBDropMutationsHelper::enable); - node2.runOnInstance(BBDropMutationsHelper::enable); - + // verify the behaviour of SRP with GROUP BY queries + ICoordinator coordinator = cluster.coordinator(1); + for (String query : coordinatorQueries) + assertRows(coordinator.execute(format(query, table), ALL)); + } + } - - /** - * Byte Buddy helper to silently drop mutations. - */ - public static class BBDropMutationsHelper - { - private static final AtomicBoolean enabled = new AtomicBoolean(false); - - static void enable() - { - enabled.set(true); - } - - static void install(ClassLoader cl, int nodeNumber) - { - new ByteBuddy().rebase(Mutation.class) - .method(named("apply")) - .intercept(MethodDelegation.to(BBDropMutationsHelper.class)) - .make() - .load(cl, ClassLoadingStrategy.Default.INJECTION); - } - - public static void execute(@SuperCall Callable<ResultMessage.Rows> r) throws Exception - { - if (enabled.get()) - return; - r.call(); - } - } -} ++} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
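
For readers skimming the diff above: the CASSANDRA-15459 change removes the counted()/countedInCurrentPartition() helpers and has short read protection work directly with the counters' row counts (rowsCounted() / rowsCountedInCurrentPartition()), because for GROUP BY queries the number of counted groups and the number of counted rows can differ. The toy class below is a minimal, self-contained sketch of that distinction only; the names (GroupByCountingSketch, Row, groupsCounted) are illustrative and do not mirror the real DataLimits.Counter API.

import java.util.List;
import java.util.Objects;

final class GroupByCountingSketch
{
    // A row identified by its partition key and clustering key (hypothetical model).
    static final class Row
    {
        final int pk;
        final int ck;
        Row(int pk, int ck) { this.pk = pk; this.ck = ck; }
    }

    private int rows;        // every live row seen by this counter
    private int groups;      // distinct pk values seen so far (what a GROUP BY pk limit counts)
    private Integer lastPk;  // previous partition key, used to detect group boundaries

    void count(Row row)
    {
        rows++;
        if (!Objects.equals(lastPk, row.pk))
        {
            groups++;
            lastPk = row.pk;
        }
    }

    int rowsCounted()   { return rows; }
    int groupsCounted() { return groups; }

    public static void main(String[] args)
    {
        // Three rows across two partitions: a GROUP BY pk limit sees 2 groups,
        // but an extra-row fetch for short read protection has to be budgeted in
        // rows (3 here), since replicas return rows, not groups.
        List<Row> reconciled = List.of(new Row(1, 1), new Row(1, 2), new Row(2, 1));
        GroupByCountingSketch counter = new GroupByCountingSketch();
        reconciled.forEach(counter::count);
        System.out.println("rows counted   = " + counter.rowsCounted());   // 3
        System.out.println("groups counted = " + counter.groupsCounted()); // 2
    }
}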
