This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 4c103447af3c4829e3a1c733bed3952fd059af08
Author: Adam Holmberg <[email protected]>
AuthorDate: Wed Dec 9 10:17:07 2020 -0800

    DigestResolver.getData throws AssertionError since dataResponse is null
    
    patch by Adam Holmberg, Caleb Rackliffe; reviewed by Berenguer Blasi, 
Brandon Williams, Caleb Rackliffe, David Capwell for CASSANDRA-16097
---
 CHANGES.txt                                        |   1 +
 .../service/reads/AbstractReadExecutor.java        |   1 +
 .../cassandra/service/reads/DigestResolver.java    |   5 +-
 .../cassandra/service/reads/ReadCallback.java      |  10 ++-
 .../distributed/test/ReadFailureTest.java          | 100 +++++++++++++++++++++
 .../cassandra/service/reads/ReadExecutorTest.java  |  48 ++++++++++
 6 files changed, 161 insertions(+), 4 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 17f1784..520af90 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -19,6 +19,7 @@
  * Internode messaging catches OOMs and does not rethrow (CASSANDRA-15214)
  * When a table attempts to clean up metrics, it was cleaning up all global 
table metrics (CASSANDRA-16095)
  * Bring back the accepted encryption protocols list as configurable option 
(CASSANDRA-13325)
+ * DigestResolver.getData throws AssertionError since dataResponse is null 
(CASSANDRA-16097)
 Merged from 3.11:
  * SASI's `max_compaction_flush_memory_in_mb` settings over 100GB revert to 
default of 1GB (CASSANDRA-16071)
 Merged from 3.0:
diff --git 
a/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java 
b/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
index 8907e74..ae09417 100644
--- a/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
+++ b/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
@@ -371,6 +371,7 @@ public abstract class AbstractReadExecutor
         try
         {
             handler.awaitResults();
+            assert digestResolver.isDataPresent() : "awaitResults returned 
with no data present.";
         }
         catch (ReadTimeoutException e)
         {
diff --git a/src/java/org/apache/cassandra/service/reads/DigestResolver.java 
b/src/java/org/apache/cassandra/service/reads/DigestResolver.java
index dbb761b..475c8c2 100644
--- a/src/java/org/apache/cassandra/service/reads/DigestResolver.java
+++ b/src/java/org/apache/cassandra/service/reads/DigestResolver.java
@@ -74,8 +74,6 @@ public class DigestResolver<E extends Endpoints<E>, P extends 
ReplicaPlan.ForRea
 
     public PartitionIterator getData()
     {
-        assert isDataPresent();
-
         Collection<Message<ReadResponse>> responses = 
this.responses.snapshot();
 
         if (!hasTransientResponse(responses))
@@ -109,7 +107,8 @@ public class DigestResolver<E extends Endpoints<E>, P 
extends ReplicaPlan.ForRea
         // validate digests against each other; return false immediately on 
mismatch.
         ByteBuffer digest = null;
         Collection<Message<ReadResponse>> snapshot = responses.snapshot();
-        if (snapshot.size() <= 1)
+        assert snapshot.size() > 0 : "Attempted response match comparison 
while no responses have been received.";
+        if (snapshot.size() == 1)
             return true;
 
         // TODO: should also not calculate if only one full node
diff --git a/src/java/org/apache/cassandra/service/reads/ReadCallback.java 
b/src/java/org/apache/cassandra/service/reads/ReadCallback.java
index 2968dbc..b7ee18c 100644
--- a/src/java/org/apache/cassandra/service/reads/ReadCallback.java
+++ b/src/java/org/apache/cassandra/service/reads/ReadCallback.java
@@ -99,7 +99,15 @@ public class ReadCallback<E extends Endpoints<E>, P extends 
ReplicaPlan.ForRead<
     public void awaitResults() throws ReadFailureException, 
ReadTimeoutException
     {
         boolean signaled = await(command.getTimeout(MILLISECONDS), 
TimeUnit.MILLISECONDS);
-        boolean failed = failures > 0 && blockFor + failures > 
replicaPlan().contacts().size();
+        /**
+         * Here we are checking isDataPresent in addition to the responses 
size because there is a possibility
+         * that an asynchronous speculative execution request could be 
returning after a local failure already
+         * signaled. Responses may have been set while the data reference is 
not yet.
+         * See {@link DigestResolver#preprocess(Message)}
+         * CASSANDRA-16097
+         */
+        boolean failed = failures > 0 &&
+                         (blockFor > resolver.responses.size() || 
!resolver.isDataPresent());
         if (signaled && !failed)
             return;
 
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/ReadFailureTest.java 
b/test/distributed/org/apache/cassandra/distributed/test/ReadFailureTest.java
new file mode 100644
index 0000000..be8db6c
--- /dev/null
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/ReadFailureTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.junit.Test;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.distributed.api.ICluster;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.exceptions.RequestFailureReason;
+
+public class ReadFailureTest extends TestBaseImpl
+{
+    static final int TOMBSTONE_FAIL_THRESHOLD = 20;
+    static final int TOMBSTONE_FAIL_KEY = 100001;
+    static final String TABLE = "t";
+
+    /**
+     * This test attempts to create a race condition with speculative 
executions that would previously cause an AssertionError.
+     * N=2, RF=2, read ONE
+     * The read will fail on the local node due to tombstone read threshold. 
At the same time, a spec exec is triggered
+     * reading from the other node.
+     * <p>
+     * See CASSANDRA-16097 for further details.
+     */
+    @Test
+    public void testSpecExecRace() throws Throwable
+    {
+        try (Cluster cluster = 
init(Cluster.build().withNodes(2).withConfig(config -> 
config.set("tombstone_failure_threshold", TOMBSTONE_FAIL_THRESHOLD)).start()))
+        {
+            // Create a table with the spec exec policy set to a low 
percentile so it's more likely to produce a spec exec racing with the local 
request.
+            // Not using 'Always' because that actually uses a different 
class/mechanism and doesn't exercise the bug
+            // we're trying to produce.
+            cluster.schemaChange(String.format("CREATE TABLE %s.%s (k int, c 
int, v int, PRIMARY KEY (k,c)) WITH speculative_retry = '5p';", KEYSPACE, 
TABLE));
+
+            // Create a partition with enough tombstones to create a read 
failure according to the configured threshold
+            for (int i = 0; i <= TOMBSTONE_FAIL_THRESHOLD; ++i)
+                cluster.coordinator(1).execute(String.format("DELETE FROM %s.t 
WHERE k=%d AND c=%d", KEYSPACE, TOMBSTONE_FAIL_KEY, i),
+                                               ConsistencyLevel.TWO);
+
+            // Create a bunch of latency samples for this failed operation.
+            loopFailStatement(cluster, 5000);
+            // Update the spec exec threshold based on the above samples.
+            // This would normally be done by the periodic task 
CassandraDaemon.SPECULATION_THRESHOLD_UPDATER.
+            cluster.get(1).runOnInstance(() ->
+                                         {
+                                             ColumnFamilyStore cfs = 
Keyspace.open(KEYSPACE)
+                                                                             
.getColumnFamilyStore(TABLE);
+                                             cfs.updateSpeculationThreshold();
+                                         });
+
+            // Run the request a bunch of times under racy conditions.
+            loopFailStatement(cluster, 5000);
+        }
+    }
+
+    private void loopFailStatement(ICluster cluster, int iterations)
+    {
+        final String query = String.format("SELECT k FROM %s.t WHERE k=%d", 
KEYSPACE, TOMBSTONE_FAIL_KEY);
+        for (int i = 0; i < iterations; ++i)
+        {
+            try
+            {
+                cluster.coordinator(1).execute(query, ConsistencyLevel.ONE);
+                fail("Request did not throw a ReadFailureException as 
expected.");
+            }
+            catch (Throwable t) // Throwable because the raised ReadFailure is 
loaded from a different classloader and doesn't match "ours"
+            {
+                String onFail = String.format("Did not receive expected 
ReadFailureException. Instead caught %s\n%s",
+                                              t, 
ExceptionUtils.getStackTrace(t));
+                assertNotNull(onFail, t.getMessage());
+                assertTrue(onFail, 
t.getMessage().contains(RequestFailureReason.READ_TOO_MANY_TOMBSTONES.name()));
+            }
+        }
+    }
+}
+
diff --git a/test/unit/org/apache/cassandra/service/reads/ReadExecutorTest.java 
b/test/unit/org/apache/cassandra/service/reads/ReadExecutorTest.java
index e0a5927..3eb9b2e 100644
--- a/test/unit/org/apache/cassandra/service/reads/ReadExecutorTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/ReadExecutorTest.java
@@ -20,6 +20,8 @@ package org.apache.cassandra.service.reads;
 
 import java.util.concurrent.TimeUnit;
 
+import org.apache.commons.lang3.exception.ExceptionUtils;
+
 import org.apache.cassandra.dht.Murmur3Partitioner;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.locator.ReplicaPlan;
@@ -46,6 +48,8 @@ import org.apache.cassandra.schema.KeyspaceParams;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.apache.cassandra.locator.ReplicaUtils.full;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 public class ReadExecutorTest
@@ -187,6 +191,50 @@ public class ReadExecutorTest
         assertEquals(1, ks.metric.speculativeFailedRetries.getCount());
     }
 
+    /**
+     * Test that an async speculative execution racing with a local errored 
request does not violate assertions.
+     * CASSANDRA-16097
+     */
+    @Test
+    public void testRaceWithNonSpeculativeFailure()
+    {
+        MockSinglePartitionReadCommand command = new 
MockSinglePartitionReadCommand(TimeUnit.DAYS.toMillis(365));
+        ReplicaPlan.ForTokenRead plan = plan(ConsistencyLevel.LOCAL_ONE, 
targets, targets.subList(0, 1));
+        AbstractReadExecutor executor = new 
AbstractReadExecutor.SpeculatingReadExecutor(cfs, command, plan, 
System.nanoTime());
+
+        // Issue an initial request against the first endpoint...
+        executor.executeAsync();
+
+        // ...and then force a speculative retry against another endpoint.
+        cfs.sampleReadLatencyNanos = 0L;
+        executor.maybeTryAdditionalReplicas();
+
+        new Thread(() ->
+                   {
+                       // Fail the first request. When this fails the number 
of contacts has already been increased
+                       // to 2, so the failure won't actually signal. However...
+                       executor.handler.onFailure(targets.get(0).endpoint(), 
RequestFailureReason.READ_TOO_MANY_TOMBSTONES);
+
+                       // ...speculative retries are fired after a short wait, 
and it is possible for the failure to
+                       // reach the handler just before one is fired and the 
number of contacts incremented...
+                       executor.handler.condition.signalAll();
+                   }).start();
+
+        try
+        {
+            // ...but by the time we await for results, the number of contacts 
may already have been incremented.
+            // If we rely only on the number of failures and the number of 
nodes blocked for, compared to the number
+            // of contacts, we may not recognize that the query has failed.
+            executor.awaitResponses();
+            fail("Awaiting responses did not throw a ReadFailureException as 
expected.");
+        }
+        catch (Throwable t)
+        {
+            assertSame(ExceptionUtils.getStackTrace(t), 
ReadFailureException.class, t.getClass());
+            
assertTrue(t.getMessage().contains(RequestFailureReason.READ_TOO_MANY_TOMBSTONES.name()));
+        }
+    }
+
     public static class MockSinglePartitionReadCommand extends 
SinglePartitionReadCommand
     {
         private final long timeout;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to