This is an automated email from the ASF dual-hosted git repository.

adelapena pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit ae51f0fd12820707927803fdbe63581f33111d4b
Merge: 528e3ad 86a9261
Author: Andrés de la Peña <[email protected]>
AuthorDate: Thu Aug 6 13:26:39 2020 +0100

    Merge branch 'cassandra-3.11' into trunk
    
    # Conflicts:
    #   CHANGES.txt
    #   src/java/org/apache/cassandra/db/filter/DataLimits.java
    #   src/java/org/apache/cassandra/service/DataResolver.java

 CHANGES.txt                                        |   1 +
 .../org/apache/cassandra/db/filter/DataLimits.java |  66 ++++++------
 .../reads/ShortReadPartitionsProtection.java       |  10 +-
 .../service/reads/ShortReadRowsProtection.java     |  14 +--
 .../distributed/test/ShortReadProtectionTest.java  | 116 +++++++++++++++++++++
 5 files changed, 154 insertions(+), 53 deletions(-)
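
Note: the functional change this merge carries forward from cassandra-3.11 is CASSANDRA-15459 (short read protection for GROUP BY queries); on trunk the conflicts are resolved by calling DataLimits.Counter#rowsCounted() and #rowsCountedInCurrentPartition() directly instead of the per-query helpers removed in the hunks below. The sketch that follows is plain, self-contained Java with invented names, not Cassandra code; it only illustrates why short read protection has to reason about rows rather than groups once a GROUP BY limit is involved.

    import java.util.List;

    // Toy model, not Cassandra code: a replica answers "SELECT * FROM t GROUP BY pk LIMIT 1"
    // with a single row that another replica has already deleted (a shadowing tombstone).
    public class GroupByCountingSketch
    {
        static final class Row
        {
            final int pk;
            final int ck;
            final boolean shadowed; // deleted by a tombstone held on another replica

            Row(int pk, int ck, boolean shadowed) { this.pk = pk; this.ck = ck; this.shadowed = shadowed; }
        }

        public static void main(String[] args)
        {
            List<Row> fromReplica = List.of(new Row(1, 1, true));

            long groupsSeen = fromReplica.stream().map(r -> r.pk).distinct().count(); // 1 -> "limit reached"
            long rowsSeen   = fromReplica.size();                                     // 1 row fetched so far
            long rowsMerged = fromReplica.stream().filter(r -> !r.shadowed).count();  // 0 survive reconciliation

            // Judged by groups, the coordinator believes the replica has already produced enough and
            // never asks it for more, so the client gets a short (empty) result.  Judged by rows, short
            // read protection can see that fewer rows survived the merge than were requested and that
            // it should go back to the replica for more.
            System.out.printf("groups=%d rows=%d rows surviving merge=%d%n", groupsSeen, rowsSeen, rowsMerged);
        }
    }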

diff --cc CHANGES.txt
index dcefef0,5168acb..7b4cc98
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,13 -1,5 +1,14 @@@
 -3.11.8
 +4.0-beta2
 + * Improve cassandra-stress logging when using a profile file that doesn't exist (CASSANDRA-14425)
 + * Improve logging for socket connection/disconnection (CASSANDRA-15980)
 + * Throw FSWriteError upon write failures in order to apply DiskFailurePolicy (CASSANDRA-15928)
 + * Forbid altering UDTs used in partition keys (CASSANDRA-15933)
 + * Fix version parsing logic when upgrading from 3.0 (CASSANDRA-15973)
 + * Optimize NoSpamLogger use in hot paths (CASSANDRA-15766)
 + * Verify sstable components on startup (CASSANDRA-15945)
 +Merged from 3.11:
+  * Fix short read protection for GROUP BY queries (CASSANDRA-15459)
 + * stop_paranoid disk failure policy is ignored on CorruptSSTableException after node is up (CASSANDRA-15191)
   * Frozen RawTuple is not annotated with frozen in the toString method (CASSANDRA-15857)
  Merged from 3.0:
   * Check for endpoint collision with hibernating nodes (CASSANDRA-14599)
diff --cc src/java/org/apache/cassandra/db/filter/DataLimits.java
index 3a766e0,2759932..e6495e8
--- a/src/java/org/apache/cassandra/db/filter/DataLimits.java
+++ b/src/java/org/apache/cassandra/db/filter/DataLimits.java
@@@ -161,7 -191,7 +161,7 @@@ public abstract class DataLimit
      * @param countPartitionsWithOnlyStaticData if {@code true} the partitions with only static data should be counted
      * as 1 valid row.
      * @param enforceStrictLiveness whether the row should be purged if there is no PK liveness info,
--     *                              normally retrieved from {@link CFMetaData#enforceStrictLiveness()}
++     * normally retrieved from {@link org.apache.cassandra.schema.TableMetadata#enforceStrictLiveness()}
       * @return a new {@code Counter} for this limits.
       */
      public abstract Counter newCounter(int nowInSec,
diff --cc src/java/org/apache/cassandra/service/reads/ShortReadPartitionsProtection.java
index 42676b6,0000000..6c4dc68
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/service/reads/ShortReadPartitionsProtection.java
+++ b/src/java/org/apache/cassandra/service/reads/ShortReadPartitionsProtection.java
@@@ -1,207 -1,0 +1,199 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.service.reads;
 +
 +import org.apache.cassandra.locator.Endpoints;
 +import org.apache.cassandra.locator.ReplicaPlan;
 +import org.apache.cassandra.locator.ReplicaPlans;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.concurrent.Stage;
 +import org.apache.cassandra.db.ColumnFamilyStore;
 +import org.apache.cassandra.db.DataRange;
 +import org.apache.cassandra.db.DecoratedKey;
 +import org.apache.cassandra.db.Keyspace;
 +import org.apache.cassandra.db.PartitionPosition;
 +import org.apache.cassandra.db.PartitionRangeReadCommand;
 +import org.apache.cassandra.db.ReadCommand;
 +import org.apache.cassandra.db.filter.DataLimits;
 +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
 +import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 +import org.apache.cassandra.db.transform.MorePartitions;
 +import org.apache.cassandra.db.transform.MoreRows;
 +import org.apache.cassandra.db.transform.Transformation;
 +import org.apache.cassandra.dht.AbstractBounds;
 +import org.apache.cassandra.dht.ExcludingBounds;
 +import org.apache.cassandra.dht.Range;
 +import org.apache.cassandra.locator.Replica;
 +import org.apache.cassandra.net.MessagingService;
 +import org.apache.cassandra.service.reads.repair.NoopReadRepair;
 +import org.apache.cassandra.service.StorageProxy;
 +import org.apache.cassandra.tracing.Tracing;
 +
 +public class ShortReadPartitionsProtection extends Transformation<UnfilteredRowIterator> implements MorePartitions<UnfilteredPartitionIterator>
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(ShortReadPartitionsProtection.class);
 +    private final ReadCommand command;
 +    private final Replica source;
 +
 +    private final Runnable preFetchCallback; // called immediately before fetching more contents
 +
 +    private final DataLimits.Counter singleResultCounter; // unmerged per-source counter
 +    private final DataLimits.Counter mergedResultCounter; // merged end-result counter
 +
 +    private DecoratedKey lastPartitionKey; // key of the last observed partition
 +
 +    private boolean partitionsFetched; // whether we've seen any new partitions since iteration start or last moreContents() call
 +
 +    private final long queryStartNanoTime;
 +
 +    public ShortReadPartitionsProtection(ReadCommand command,
 +                                         Replica source,
 +                                         Runnable preFetchCallback,
 +                                         DataLimits.Counter singleResultCounter,
 +                                         DataLimits.Counter mergedResultCounter,
 +                                         long queryStartNanoTime)
 +    {
 +        this.command = command;
 +        this.source = source;
 +        this.preFetchCallback = preFetchCallback;
 +        this.singleResultCounter = singleResultCounter;
 +        this.mergedResultCounter = mergedResultCounter;
 +        this.queryStartNanoTime = queryStartNanoTime;
 +    }
 +
 +    @Override
 +    public UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition)
 +    {
 +        partitionsFetched = true;
 +
 +        lastPartitionKey = partition.partitionKey();
 +
 +        /*
 +         * Extend for moreContents() then apply protection to track lastClustering by applyToRow().
 +         *
 +         * If we don't apply the transformation *after* extending the partition with MoreRows,
 +         * applyToRow() method of protection will not be called on the first row of the new extension iterator.
 +         */
 +        ReplicaPlan.ForTokenRead replicaPlan = ReplicaPlans.forSingleReplicaRead(Keyspace.open(command.metadata().keyspace), partition.partitionKey().getToken(), source);
 +        ReplicaPlan.SharedForTokenRead sharedReplicaPlan = ReplicaPlan.shared(replicaPlan);
 +        ShortReadRowsProtection protection = new ShortReadRowsProtection(partition.partitionKey(),
 +                                                                         command, source,
 +                                                                         (cmd) -> executeReadCommand(cmd, sharedReplicaPlan),
 +                                                                         singleResultCounter,
 +                                                                         mergedResultCounter);
 +        return Transformation.apply(MoreRows.extend(partition, protection), protection);
 +    }
 +
 +    /*
 +     * We only get here once all the rows and partitions in this iterator have been iterated over, and so
 +     * if the node had returned the requested number of rows but we still get here, then some results were
 +     * skipped during reconciliation.
 +     */
 +    public UnfilteredPartitionIterator moreContents()
 +    {
 +        // never try to request additional partitions from replicas if our reconciled partitions are already filled to the limit
 +        assert !mergedResultCounter.isDone();
 +
 +        // we do not apply short read protection when we have no limits at all
 +        assert !command.limits().isUnlimited();
 +
 +        /*
 +         * If this is a single partition read command or an (indexed) partition range read command with
 +         * a partition key specified, then we can't and shouldn't try fetch more partitions.
 +         */
 +        assert !command.isLimitedToOnePartition();
 +
 +        /*
 +         * If the returned result doesn't have enough rows/partitions to satisfy even the original limit, don't ask for more.
 +         *
 +         * Can only take the short cut if there is no per partition limit set. Otherwise it's possible to hit false
 +         * positives due to some rows being uncounted for in certain scenarios (see CASSANDRA-13911).
 +         */
 +        if (!singleResultCounter.isDone() && command.limits().perPartitionCount() == DataLimits.NO_LIMIT)
 +            return null;
 +
 +        /*
 +         * Either we had an empty iterator as the initial response, or our moreContents() call got us an empty iterator.
 +         * There is no point to ask the replica for more rows - it has no more in the requested range.
 +         */
 +        if (!partitionsFetched)
 +            return null;
 +        partitionsFetched = false;
 +
 +        /*
 +         * We are going to fetch one partition at a time for thrift and potentially more for CQL.
 +         * The row limit will either be set to the per partition limit - if the command has no total row limit set, or
 +         * the total # of rows remaining - if it has some. If we don't grab enough rows in some of the partitions,
 +         * then future ShortReadRowsProtection.moreContents() calls will fetch the missing ones.
 +         */
 +        int toQuery = command.limits().count() != DataLimits.NO_LIMIT
-                       ? command.limits().count() - counted(mergedResultCounter)
++                      ? command.limits().count() - mergedResultCounter.rowsCounted()
 +                      : command.limits().perPartitionCount();
 +
 +        ColumnFamilyStore.metricsFor(command.metadata().id).shortReadProtectionRequests.mark();
 +        Tracing.trace("Requesting {} extra rows from {} for short read protection", toQuery, source);
 +        logger.info("Requesting {} extra rows from {} for short read protection", toQuery, source);
 +
 +        // If we've arrived here, all responses have been consumed, and we're about to request more.
 +        preFetchCallback.run();
 +
 +        return makeAndExecuteFetchAdditionalPartitionReadCommand(toQuery);
 +    }
 +
-     // Counts the number of rows for regular queries and the number of groups for GROUP BY queries
-     private int counted(DataLimits.Counter counter)
-     {
-         return command.limits().isGroupByLimit()
-                ? counter.rowCounted()
-                : counter.counted();
-     }
- 
 +    private UnfilteredPartitionIterator makeAndExecuteFetchAdditionalPartitionReadCommand(int toQuery)
 +    {
 +        PartitionRangeReadCommand cmd = (PartitionRangeReadCommand) command;
 +
 +        DataLimits newLimits = cmd.limits().forShortReadRetry(toQuery);
 +
 +        AbstractBounds<PartitionPosition> bounds = cmd.dataRange().keyRange();
 +        AbstractBounds<PartitionPosition> newBounds = bounds.inclusiveRight()
 +                                                      ? new Range<>(lastPartitionKey, bounds.right)
 +                                                      : new ExcludingBounds<>(lastPartitionKey, bounds.right);
 +        DataRange newDataRange = cmd.dataRange().forSubRange(newBounds);
 +
 +        ReplicaPlan.ForRangeRead replicaPlan = ReplicaPlans.forSingleReplicaRead(Keyspace.open(command.metadata().keyspace), cmd.dataRange().keyRange(), source, 1);
 +        return executeReadCommand(cmd.withUpdatedLimitsAndDataRange(newLimits, newDataRange), ReplicaPlan.shared(replicaPlan));
 +    }
 +
 +    private <E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>>
 +    UnfilteredPartitionIterator executeReadCommand(ReadCommand cmd, ReplicaPlan.Shared<E, P> replicaPlan)
 +    {
 +        DataResolver<E, P> resolver = new DataResolver<>(cmd, replicaPlan, (NoopReadRepair<E, P>)NoopReadRepair.instance, queryStartNanoTime);
 +        ReadCallback<E, P> handler = new ReadCallback<>(resolver, cmd, replicaPlan, queryStartNanoTime);
 +
 +        if (source.isSelf())
 +        {
 +            Stage.READ.maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(cmd, handler));
 +        }
 +        else
 +        {
 +            if (source.isTransient())
 +                cmd = cmd.copyAsTransientQuery(source);
 +            MessagingService.instance().sendWithCallback(cmd.createMessage(false), source.endpoint(), handler);
 +        }
 +
 +        // We don't call handler.get() because we want to preserve tombstones since we're still in the middle of merging node results.
 +        handler.awaitResults();
 +        assert resolver.getMessages().size() == 1;
 +        return resolver.getMessages().get(0).payload.makeIterator(command);
 +    }
 +}
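
Note: a standalone sketch of the toQuery computation in moreContents() above, with invented numbers; NO_LIMIT here is only a stand-in for DataLimits.NO_LIMIT, and just the two-branch rule mirrors the hunk above.

    // Standalone illustration; none of this is Cassandra code.
    public class ToQuerySketch
    {
        static final int NO_LIMIT = Integer.MAX_VALUE; // stand-in for DataLimits.NO_LIMIT

        static int toQuery(int limitCount, int perPartitionLimit, int rowsAlreadyMerged)
        {
            return limitCount != NO_LIMIT
                   ? limitCount - rowsAlreadyMerged // e.g. LIMIT 100 with 97 merged rows -> ask for 3 more
                   : perPartitionLimit;             // no total limit -> fall back to the per-partition limit
        }

        public static void main(String[] args)
        {
            System.out.println(toQuery(100, NO_LIMIT, 97)); // 3
            System.out.println(toQuery(NO_LIMIT, 10, 97));  // 10
        }
    }
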
diff --cc src/java/org/apache/cassandra/service/reads/ShortReadRowsProtection.java
index 8dc7fc7,0000000..f32502d
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/service/reads/ShortReadRowsProtection.java
+++ b/src/java/org/apache/cassandra/service/reads/ShortReadRowsProtection.java
@@@ -1,197 -1,0 +1,189 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.service.reads;
 +
 +import java.util.function.Function;
 +
 +import org.apache.cassandra.db.Clustering;
 +import org.apache.cassandra.db.ColumnFamilyStore;
 +import org.apache.cassandra.db.DecoratedKey;
 +import org.apache.cassandra.db.ReadCommand;
 +import org.apache.cassandra.db.SinglePartitionReadCommand;
 +import org.apache.cassandra.db.filter.ClusteringIndexFilter;
 +import org.apache.cassandra.db.filter.DataLimits;
 +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
 +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
 +import org.apache.cassandra.db.rows.Row;
 +import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 +import org.apache.cassandra.db.transform.MoreRows;
 +import org.apache.cassandra.db.transform.Transformation;
 +import org.apache.cassandra.locator.Replica;
 +import org.apache.cassandra.schema.TableMetadata;
 +import org.apache.cassandra.tracing.Tracing;
 +
 +class ShortReadRowsProtection extends Transformation implements MoreRows<UnfilteredRowIterator>
 +{
 +    private final ReadCommand command;
 +    private final Replica source;
 +    private final DataLimits.Counter singleResultCounter; // unmerged per-source counter
 +    private final DataLimits.Counter mergedResultCounter; // merged end-result counter
 +    private final Function<ReadCommand, UnfilteredPartitionIterator> commandExecutor;
 +    private final TableMetadata metadata;
 +    private final DecoratedKey partitionKey;
 +
 +    private Clustering lastClustering; // clustering of the last observed row
 +
 +    private int lastCounted = 0; // last seen recorded # before attempting to fetch more rows
 +    private int lastFetched = 0; // # rows returned by last attempt to get more (or by the original read command)
 +    private int lastQueried = 0; // # extra rows requested from the replica last time
 +
 +    ShortReadRowsProtection(DecoratedKey partitionKey, ReadCommand command, Replica source,
 +                            Function<ReadCommand, UnfilteredPartitionIterator> commandExecutor,
 +                            DataLimits.Counter singleResultCounter, DataLimits.Counter mergedResultCounter)
 +    {
 +        this.command = command;
 +        this.source = source;
 +        this.commandExecutor = commandExecutor;
 +        this.singleResultCounter = singleResultCounter;
 +        this.mergedResultCounter = mergedResultCounter;
 +        this.metadata = command.metadata();
 +        this.partitionKey = partitionKey;
 +    }
 +
 +    @Override
 +    public Row applyToRow(Row row)
 +    {
 +        lastClustering = row.clustering();
 +        return row;
 +    }
 +
 +    /*
 +     * We only get here once all the rows in this iterator have been iterated over, and so if the node
 +     * had returned the requested number of rows but we still get here, then some results were skipped
 +     * during reconciliation.
 +     */
 +    public UnfilteredRowIterator moreContents()
 +    {
 +        // never try to request additional rows from replicas if our reconciled partition is already filled to the limit
 +        assert !mergedResultCounter.isDoneForPartition();
 +
 +        // we do not apply short read protection when we have no limits at all
 +        assert !command.limits().isUnlimited();
 +
 +        /*
 +         * If the returned partition doesn't have enough rows to satisfy even the original limit, don't ask for more.
 +         *
 +         * Can only take the short cut if there is no per partition limit set. Otherwise it's possible to hit false
 +         * positives due to some rows being uncounted for in certain scenarios (see CASSANDRA-13911).
 +         */
 +        if (!singleResultCounter.isDoneForPartition() && command.limits().perPartitionCount() == DataLimits.NO_LIMIT)
 +            return null;
 +
 +        /*
 +         * If the replica has no live rows in the partition, don't try to fetch more.
 +         *
 +         * Note that the previous branch [if (!singleResultCounter.isDoneForPartition()) return null] doesn't
 +         * always cover this scenario:
 +         * isDoneForPartition() is defined as [isDone() || rowInCurrentPartition >= perPartitionLimit],
 +         * and will return true if isDone() returns true, even if there are 0 rows counted in the current partition.
 +         *
 +         * This can happen with a range read if after 1+ rounds of short read protection requests we managed to fetch
 +         * enough extra rows for other partitions to satisfy the singleResultCounter's total row limit, but only
 +         * have tombstones in the current partition.
 +         *
 +         * One other way we can hit this condition is when the partition only has a live static row and no regular
 +         * rows. In that scenario the counter will remain at 0 until the partition is closed - which happens after
 +         * the moreContents() call.
 +         */
-         if (countedInCurrentPartition(singleResultCounter) == 0)
++        if (singleResultCounter.rowsCountedInCurrentPartition() == 0)
 +            return null;
 +
 +        /*
 +         * This is a table with no clustering columns, and has at most one row per partition - with EMPTY clustering.
 +         * We already have the row, so there is no point in asking for more from the partition.
 +         */
 +        if (Clustering.EMPTY == lastClustering)
 +            return null;
 +
-         lastFetched = countedInCurrentPartition(singleResultCounter) - lastCounted;
-         lastCounted = countedInCurrentPartition(singleResultCounter);
++        lastFetched = singleResultCounter.rowsCountedInCurrentPartition() - lastCounted;
++        lastCounted = singleResultCounter.rowsCountedInCurrentPartition();
 +
 +        // getting back fewer rows than we asked for means the partition on the replica has been fully consumed
 +        if (lastQueried > 0 && lastFetched < lastQueried)
 +            return null;
 +
 +        /*
 +         * At this point we know that:
 +         *     1. the replica returned [repeatedly?] as many rows as we asked for and potentially has more
 +         *        rows in the partition
 +         *     2. at least one of those returned rows was shadowed by a tombstone returned from another
 +         *        replica
 +         *     3. we haven't satisfied the client's limits yet, and should attempt to query for more rows to
 +         *        avoid a short read
 +         *
 +         * In the ideal scenario, we would get exactly min(a, b) or fewer rows from the next request, where a and b
 +         * are defined as follows:
 +         *     [a] limits.count() - mergedResultCounter.counted()
 +         *     [b] limits.perPartitionCount() - mergedResultCounter.countedInCurrentPartition()
 +         *
 +         * It would be naive to query for exactly that many rows, as it's possible and not unlikely
 +         * that some of the returned rows would also be shadowed by tombstones from other hosts.
 +         *
 +         * Note: we don't know, nor do we care, how many rows from the replica made it into the reconciled result;
 +         * we can only tell how many in total we queried for, and that [0, mrc.countedInCurrentPartition()) made it.
 +         *
 +         * In general, our goal should be to minimise the number of extra requests - *not* to minimise the number
 +         * of rows fetched: there is a high transactional cost for every individual request, but a relatively low
 +         * marginal cost for each extra row requested.
 +         *
 +         * As such it's better to overfetch than to underfetch extra rows from a host; but at the same
 +         * time we want to respect paging limits and not blow up spectacularly.
 +         *
 +         * Note: it's ok to retrieve more rows than necessary since singleResultCounter is not stopping and only
 +         * counts.
 +         *
 +         * With that in mind, we'll just request the minimum of (count(), perPartitionCount()) limits.
 +         *
 +         * See CASSANDRA-13794 for more details.
 +         */
 +        lastQueried = Math.min(command.limits().count(), command.limits().perPartitionCount());
 +
 +        ColumnFamilyStore.metricsFor(metadata.id).shortReadProtectionRequests.mark();
 +        Tracing.trace("Requesting {} extra rows from {} for short read protection", lastQueried, source);
 +
 +        SinglePartitionReadCommand cmd = makeFetchAdditionalRowsReadCommand(lastQueried);
 +        return UnfilteredPartitionIterators.getOnlyElement(commandExecutor.apply(cmd), cmd);
 +    }
 +
-     // Counts the number of rows for regular queries and the number of groups for GROUP BY queries
-     private int countedInCurrentPartition(DataLimits.Counter counter)
-     {
-         return command.limits().isGroupByLimit()
-                ? counter.rowCountedInCurrentPartition()
-                : counter.countedInCurrentPartition();
-     }
- 
 +    private SinglePartitionReadCommand makeFetchAdditionalRowsReadCommand(int toQuery)
 +    {
 +        ClusteringIndexFilter filter = command.clusteringIndexFilter(partitionKey);
 +        if (null != lastClustering)
 +            filter = filter.forPaging(metadata.comparator, lastClustering, false);
 +
 +        return SinglePartitionReadCommand.create(command.metadata(),
 +                                                 command.nowInSec(),
 +                                                 command.columnFilter(),
 +                                                 command.rowFilter(),
 +                                                 command.limits().forShortReadRetry(toQuery),
 +                                                 partitionKey,
 +                                                 filter,
 +                                                 command.indexMetadata());
 +    }
 +}
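
Note: the long comment in moreContents() above defines the ideal bounds [a] and [b] and then deliberately over-fetches with min(count(), perPartitionCount()). A worked example with made-up numbers (again not Cassandra code):

    // Concrete numbers for the over-fetch rule described in the comment above.
    public class RowsFetchSizeSketch
    {
        public static void main(String[] args)
        {
            int limitCount        = 50; // limits.count()
            int perPartitionLimit = 10; // limits.perPartitionCount()
            int mergedTotal       = 47; // merged rows produced so far across the query
            int mergedInPartition = 8;  // merged rows produced so far in the current partition

            int a = limitCount - mergedTotal;              // 3 -> rows still owed to the query
            int b = perPartitionLimit - mergedInPartition; // 2 -> rows still owed in this partition
            System.out.println("ideal extra rows = " + Math.min(a, b)); // 2, if nothing else were shadowed

            // What the code requests instead, accepting over-fetch to keep the number of round trips down:
            System.out.println("lastQueried      = " + Math.min(limitCount, perPartitionLimit)); // 10
        }
    }
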
diff --cc test/distributed/org/apache/cassandra/distributed/test/ShortReadProtectionTest.java
index 0000000,27390d6..23b5c14
mode 000000,100644..100644
--- a/test/distributed/org/apache/cassandra/distributed/test/ShortReadProtectionTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/ShortReadProtectionTest.java
@@@ -1,0 -1,158 +1,116 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.cassandra.distributed.test;
+ 
+ import java.util.List;
 -import java.util.concurrent.Callable;
 -import java.util.concurrent.atomic.AtomicBoolean;
+ 
+ import org.junit.Test;
+ 
 -import net.bytebuddy.ByteBuddy;
 -import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
 -import net.bytebuddy.implementation.MethodDelegation;
 -import net.bytebuddy.implementation.bind.annotation.SuperCall;
 -import org.apache.cassandra.db.Mutation;
+ import org.apache.cassandra.distributed.Cluster;
+ import org.apache.cassandra.distributed.api.ICoordinator;
+ import org.apache.cassandra.distributed.api.IInvokableInstance;
 -import org.apache.cassandra.transport.messages.ResultMessage;
+ 
+ import static java.lang.String.format;
+ import static java.util.Arrays.asList;
 -import static net.bytebuddy.matcher.ElementMatchers.named;
+ import static org.apache.cassandra.distributed.api.ConsistencyLevel.ALL;
+ import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows;
+ 
+ /**
+  * Tests short read protection, the mechanism that ensures distributed queries at read consistency levels > ONE/LOCAL_ONE
+  * avoid short reads that might happen when a limit is used and reconciliation accepts fewer rows than that limit.
+  */
+ public class ShortReadProtectionTest extends TestBaseImpl
+ {
+     /**
+      * Test GROUP BY with short read protection, particularly when there is a limit and regular row deletions.
+      * <p>
+      * See CASSANDRA-15459
+      */
+     @Test
+     public void testGroupBySRPRegularRow() throws Throwable
+     {
+         testGroupBySRP("CREATE TABLE %s (pk int, ck int, PRIMARY KEY (pk, ck))",
+                        asList("INSERT INTO %s (pk, ck) VALUES (1, 1) USING TIMESTAMP 0",
+                               "DELETE FROM %s WHERE pk=0 AND ck=0",
+                               "INSERT INTO %s (pk, ck) VALUES (2, 2) USING TIMESTAMP 0"),
+                        asList("DELETE FROM %s WHERE pk=1 AND ck=1",
+                               "INSERT INTO %s (pk, ck) VALUES (0, 0) USING TIMESTAMP 0",
+                               "DELETE FROM %s WHERE pk=2 AND ck=2"),
+                        asList("SELECT * FROM %s LIMIT 1",
+                               "SELECT * FROM %s LIMIT 10",
+                               "SELECT * FROM %s GROUP BY pk LIMIT 1",
+                               "SELECT * FROM %s GROUP BY pk LIMIT 10",
+                               "SELECT * FROM %s GROUP BY pk, ck LIMIT 1",
+                               "SELECT * FROM %s GROUP BY pk, ck LIMIT 10"));
+     }
+ 
+     /**
+      * Test GROUP BY with short read protection, particularly when there is a limit and static row deletions.
+      * <p>
+      * See CASSANDRA-15459
+      */
+     @Test
+     public void testGroupBySRPStaticRow() throws Throwable
+     {
+         testGroupBySRP("CREATE TABLE %s (pk int, ck int, s int static, PRIMARY KEY (pk, ck))",
+                        asList("INSERT INTO %s (pk, s) VALUES (1, 1) USING TIMESTAMP 0",
+                               "INSERT INTO %s (pk, s) VALUES (0, null)",
+                               "INSERT INTO %s (pk, s) VALUES (2, 2) USING TIMESTAMP 0"),
+                        asList("INSERT INTO %s (pk, s) VALUES (1, null)",
+                               "INSERT INTO %s (pk, s) VALUES (0, 0) USING TIMESTAMP 0",
+                               "INSERT INTO %s (pk, s) VALUES (2, null)"),
+                        asList("SELECT * FROM %s LIMIT 1",
+                               "SELECT * FROM %s LIMIT 10",
+                               "SELECT * FROM %s GROUP BY pk LIMIT 1",
+                               "SELECT * FROM %s GROUP BY pk LIMIT 10",
+                               "SELECT * FROM %s GROUP BY pk, ck LIMIT 1",
+                               "SELECT * FROM %s GROUP BY pk, ck LIMIT 10"));
+     }
+ 
+     private void testGroupBySRP(String createTable,
+                                 List<String> node1Queries,
+                                 List<String> node2Queries,
+                                 List<String> coordinatorQueries) throws Throwable
+     {
+         try (Cluster cluster = init(Cluster.build()
+                                            .withNodes(2)
+                                            .withConfig(config -> config.set("hinted_handoff_enabled", false))
 -                                           .withInstanceInitializer(BBDropMutationsHelper::install)
+                                            .start()))
+         {
++            // create the table with read repair disabled
+             String table = withKeyspace("%s.t");
 -            cluster.schemaChange(format(createTable, table));
++            cluster.schemaChange(format(createTable + " WITH read_repair='NONE'", table));
+ 
+             // populate data on node1
+             IInvokableInstance node1 = cluster.get(1);
+             for (String query : node1Queries)
+                 node1.executeInternal(format(query, table));
+ 
+             // populate data on node2
+             IInvokableInstance node2 = cluster.get(2);
+             for (String query : node2Queries)
+                 node2.executeInternal(format(query, table));
+ 
 -            // ignore read repair writes
 -            node1.runOnInstance(BBDropMutationsHelper::enable);
 -            node2.runOnInstance(BBDropMutationsHelper::enable);
 -
+             // verify the behaviour of SRP with GROUP BY queries
+             ICoordinator coordinator = cluster.coordinator(1);
+             for (String query : coordinatorQueries)
+                 assertRows(coordinator.execute(format(query, table), ALL));
+         }
+     }
 -
 -    /**
 -     * Byte Buddy helper to silently drop mutations.
 -     */
 -    public static class BBDropMutationsHelper
 -    {
 -        private static final AtomicBoolean enabled = new AtomicBoolean(false);
 -
 -        static void enable()
 -        {
 -            enabled.set(true);
 -        }
 -
 -        static void install(ClassLoader cl, int nodeNumber)
 -        {
 -            new ByteBuddy().rebase(Mutation.class)
 -                           .method(named("apply"))
 -                           .intercept(MethodDelegation.to(BBDropMutationsHelper.class))
 -                           .make()
 -                           .load(cl, ClassLoadingStrategy.Default.INJECTION);
 -        }
 -
 -        public static void execute(@SuperCall Callable<ResultMessage.Rows> r) throws Exception
 -        {
 -            if (enabled.get())
 -                return;
 -            r.call();
 -        }
 -    }
 -}
++}

