This is an automated email from the ASF dual-hosted git repository. dcapwell pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 4c103447af3c4829e3a1c733bed3952fd059af08 Author: Adam Holmberg <[email protected]> AuthorDate: Wed Dec 9 10:17:07 2020 -0800 DigestResolver.getData throws AssertionError since dataResponse is null patch by Adam Holmberg, Caleb Rackliffe; reviewed by Berenguer Blasi, Brandon Williams, Caleb Rackliffe, David Capwell for CASSANDRA-16097 --- CHANGES.txt | 1 + .../service/reads/AbstractReadExecutor.java | 1 + .../cassandra/service/reads/DigestResolver.java | 5 +- .../cassandra/service/reads/ReadCallback.java | 10 ++- .../distributed/test/ReadFailureTest.java | 100 +++++++++++++++++++++ .../cassandra/service/reads/ReadExecutorTest.java | 48 ++++++++++ 6 files changed, 161 insertions(+), 4 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 17f1784..520af90 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -19,6 +19,7 @@ * Internode messaging catches OOMs and does not rethrow (CASSANDRA-15214) * When a table attempts to clean up metrics, it was cleaning up all global table metrics (CASSANDRA-16095) * Bring back the accepted encryption protocols list as configurable option (CASSANDRA-13325) + * DigestResolver.getData throws AssertionError since dataResponse is null (CASSANDRA-16097) Merged from 3.11: * SASI's `max_compaction_flush_memory_in_mb` settings over 100GB revert to default of 1GB (CASSANDRA-16071) Merged from 3.0: diff --git a/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java b/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java index 8907e74..ae09417 100644 --- a/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java +++ b/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java @@ -371,6 +371,7 @@ public abstract class AbstractReadExecutor try { handler.awaitResults(); + assert digestResolver.isDataPresent() : "awaitResults returned with no data present."; } catch (ReadTimeoutException e) { diff --git a/src/java/org/apache/cassandra/service/reads/DigestResolver.java 
b/src/java/org/apache/cassandra/service/reads/DigestResolver.java index dbb761b..475c8c2 100644 --- a/src/java/org/apache/cassandra/service/reads/DigestResolver.java +++ b/src/java/org/apache/cassandra/service/reads/DigestResolver.java @@ -74,8 +74,6 @@ public class DigestResolver<E extends Endpoints<E>, P extends ReplicaPlan.ForRea public PartitionIterator getData() { - assert isDataPresent(); - Collection<Message<ReadResponse>> responses = this.responses.snapshot(); if (!hasTransientResponse(responses)) @@ -109,7 +107,8 @@ public class DigestResolver<E extends Endpoints<E>, P extends ReplicaPlan.ForRea // validate digests against each other; return false immediately on mismatch. ByteBuffer digest = null; Collection<Message<ReadResponse>> snapshot = responses.snapshot(); - if (snapshot.size() <= 1) + assert snapshot.size() > 0 : "Attempted response match comparison while no responses have been received."; + if (snapshot.size() == 1) return true; // TODO: should also not calculate if only one full node diff --git a/src/java/org/apache/cassandra/service/reads/ReadCallback.java b/src/java/org/apache/cassandra/service/reads/ReadCallback.java index 2968dbc..b7ee18c 100644 --- a/src/java/org/apache/cassandra/service/reads/ReadCallback.java +++ b/src/java/org/apache/cassandra/service/reads/ReadCallback.java @@ -99,7 +99,15 @@ public class ReadCallback<E extends Endpoints<E>, P extends ReplicaPlan.ForRead< public void awaitResults() throws ReadFailureException, ReadTimeoutException { boolean signaled = await(command.getTimeout(MILLISECONDS), TimeUnit.MILLISECONDS); - boolean failed = failures > 0 && blockFor + failures > replicaPlan().contacts().size(); + /** + * Here we are checking isDataPresent in addition to the responses size because there is a possibility + * that an asynchronous speculative execution request could be returning after a local failure has already + * signaled. A response may have been recorded while the data reference has not yet been set.
+ * See {@link DigestResolver#preprocess(Message)} + * CASSANDRA-16097 + */ + boolean failed = failures > 0 && + (blockFor > resolver.responses.size() || !resolver.isDataPresent()); if (signaled && !failed) return; diff --git a/test/distributed/org/apache/cassandra/distributed/test/ReadFailureTest.java b/test/distributed/org/apache/cassandra/distributed/test/ReadFailureTest.java new file mode 100644 index 0000000..be8db6c --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/ReadFailureTest.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.distributed.test; + +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.junit.Test; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.distributed.api.ICluster; +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.ConsistencyLevel; +import org.apache.cassandra.exceptions.RequestFailureReason; + +public class ReadFailureTest extends TestBaseImpl +{ + static final int TOMBSTONE_FAIL_THRESHOLD = 20; + static final int TOMBSTONE_FAIL_KEY = 100001; + static final String TABLE = "t"; + + /** + * This test attempts to create a race condition with speculative executions that would previously cause an AssertionError. + * N=2, RF=2, read ONE + * The read will fail on the local node due to tombstone read threshold. At the same time, a spec exec is triggered + * reading from the other node. + * <p> + * See CASSANDRA-16097 for further details. + */ + @Test + public void testSpecExecRace() throws Throwable + { + try (Cluster cluster = init(Cluster.build().withNodes(2).withConfig(config -> config.set("tombstone_failure_threshold", TOMBSTONE_FAIL_THRESHOLD)).start())) + { + // Create a table with the spec exec policy set to a low percentile so it's more likely to produce a spec exec racing with the local request. + // Not using 'Always' because that actually uses a different class/mechanism and doesn't exercise the bug + // we're trying to produce. 
+ cluster.schemaChange(String.format("CREATE TABLE %s.%s (k int, c int, v int, PRIMARY KEY (k,c)) WITH speculative_retry = '5p';", KEYSPACE, TABLE)); + + // Create a partition with enough tombstones to create a read failure according to the configured threshold + for (int i = 0; i <= TOMBSTONE_FAIL_THRESHOLD; ++i) + cluster.coordinator(1).execute(String.format("DELETE FROM %s.t WHERE k=%d AND c=%d", KEYSPACE, TOMBSTONE_FAIL_KEY, i), + ConsistencyLevel.TWO); + + // Create a bunch of latency samples for this failed operation. + loopFailStatement(cluster, 5000); + // Update the spec exec threshold based on the above samples. + // This would normally be done by the periodic task CassandraDaemon.SPECULATION_THRESHOLD_UPDATER. + cluster.get(1).runOnInstance(() -> + { + ColumnFamilyStore cfs = Keyspace.open(KEYSPACE) + .getColumnFamilyStore(TABLE); + cfs.updateSpeculationThreshold(); + }); + + // Run the request a bunch of times under racy conditions. + loopFailStatement(cluster, 5000); + } + } + + private void loopFailStatement(ICluster cluster, int iterations) + { + final String query = String.format("SELECT k FROM %s.t WHERE k=%d", KEYSPACE, TOMBSTONE_FAIL_KEY); + for (int i = 0; i < iterations; ++i) + { + try + { + cluster.coordinator(1).execute(query, ConsistencyLevel.ONE); + fail("Request did not throw a ReadFailureException as expected."); + } + catch (Throwable t) // Throwable because the raised ReadFailure is loaded from a different classloader and doesn't match "ours" + { + String onFail = String.format("Did not receive expected ReadFailureException. 
Instead caught %s\n%s", + t, ExceptionUtils.getStackTrace(t)); + assertNotNull(onFail, t.getMessage()); + assertTrue(onFail, t.getMessage().contains(RequestFailureReason.READ_TOO_MANY_TOMBSTONES.name())); + } + } + } +} + diff --git a/test/unit/org/apache/cassandra/service/reads/ReadExecutorTest.java b/test/unit/org/apache/cassandra/service/reads/ReadExecutorTest.java index e0a5927..3eb9b2e 100644 --- a/test/unit/org/apache/cassandra/service/reads/ReadExecutorTest.java +++ b/test/unit/org/apache/cassandra/service/reads/ReadExecutorTest.java @@ -20,6 +20,8 @@ package org.apache.cassandra.service.reads; import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.exception.ExceptionUtils; + import org.apache.cassandra.dht.Murmur3Partitioner; import org.apache.cassandra.dht.Token; import org.apache.cassandra.locator.ReplicaPlan; @@ -46,6 +48,8 @@ import org.apache.cassandra.schema.KeyspaceParams; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.apache.cassandra.locator.ReplicaUtils.full; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; public class ReadExecutorTest @@ -187,6 +191,50 @@ public class ReadExecutorTest assertEquals(1, ks.metric.speculativeFailedRetries.getCount()); } + /** + * Test that an async speculative execution racing with a local errored request does not violate assertions. + * CASSANDRA-16097 + */ + @Test + public void testRaceWithNonSpeculativeFailure() + { + MockSinglePartitionReadCommand command = new MockSinglePartitionReadCommand(TimeUnit.DAYS.toMillis(365)); + ReplicaPlan.ForTokenRead plan = plan(ConsistencyLevel.LOCAL_ONE, targets, targets.subList(0, 1)); + AbstractReadExecutor executor = new AbstractReadExecutor.SpeculatingReadExecutor(cfs, command, plan, System.nanoTime()); + + // Issue an initial request against the first endpoint... 
+ executor.executeAsync(); + + // ...and then force a speculative retry against another endpoint. + cfs.sampleReadLatencyNanos = 0L; + executor.maybeTryAdditionalReplicas(); + + new Thread(() -> + { + // Fail the first request. When this fails the number of contacts has already been increased + // to 2, so the failure won't actually signal. However... + executor.handler.onFailure(targets.get(0).endpoint(), RequestFailureReason.READ_TOO_MANY_TOMBSTONES); + + // ...speculative retries are fired after a short wait, and it is possible for the failure to + // reach the handler just before one is fired and the number of contacts incremented... + executor.handler.condition.signalAll(); + }).start(); + + try + { + // ...but by the time we await results, the number of contacts may already have been incremented. + // If we rely only on the number of failures and the number of nodes blocked for, compared to the number + // of contacts, we may not recognize that the query has failed. + executor.awaitResponses(); + fail("Awaiting responses did not throw a ReadFailureException as expected."); + } + catch (Throwable t) + { + assertSame(ExceptionUtils.getStackTrace(t), ReadFailureException.class, t.getClass()); + assertTrue(t.getMessage().contains(RequestFailureReason.READ_TOO_MANY_TOMBSTONES.name())); + } + } + public static class MockSinglePartitionReadCommand extends SinglePartitionReadCommand { private final long timeout; --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
