This is an automated email from the ASF dual-hosted git repository. dcapwell pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 84d1a3a79b76b7f3dc1938b8346c8446ed4922af Author: David Capwell <[email protected]> AuthorDate: Wed Dec 9 10:16:39 2020 -0800 Revert "DigestResolver.getData throws AssertionError since dataResponse is null" to fix commit message This reverts commit 3436c3efc0ff785137ac299e8e09085ffa526f5c. --- CHANGES.txt | 1 - .../service/reads/AbstractReadExecutor.java | 1 - .../cassandra/service/reads/DigestResolver.java | 5 +- .../cassandra/service/reads/ReadCallback.java | 10 +-- .../distributed/test/ReadFailureTest.java | 100 --------------------- .../cassandra/service/reads/ReadExecutorTest.java | 48 ---------- 6 files changed, 4 insertions(+), 161 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 520af90..17f1784 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -19,7 +19,6 @@ * Internode messaging catches OOMs and does not rethrow (CASSANDRA-15214) * When a table attempts to clean up metrics, it was cleaning up all global table metrics (CASSANDRA-16095) * Bring back the accepted encryption protocols list as configurable option (CASSANDRA-13325) - * DigestResolver.getData throws AssertionError since dataResponse is null (CASSANDRA-16097) Merged from 3.11: * SASI's `max_compaction_flush_memory_in_mb` settings over 100GB revert to default of 1GB (CASSANDRA-16071) Merged from 3.0: diff --git a/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java b/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java index ae09417..8907e74 100644 --- a/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java +++ b/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java @@ -371,7 +371,6 @@ public abstract class AbstractReadExecutor try { handler.awaitResults(); - assert digestResolver.isDataPresent() : "awaitResults returned with no data present."; } catch (ReadTimeoutException e) { diff --git a/src/java/org/apache/cassandra/service/reads/DigestResolver.java b/src/java/org/apache/cassandra/service/reads/DigestResolver.java index 475c8c2..dbb761b 100644 --- a/src/java/org/apache/cassandra/service/reads/DigestResolver.java +++ b/src/java/org/apache/cassandra/service/reads/DigestResolver.java @@ -74,6 +74,8 @@ public class DigestResolver<E extends Endpoints<E>, P extends ReplicaPlan.ForRea public PartitionIterator getData() { + assert isDataPresent(); + Collection<Message<ReadResponse>> responses = this.responses.snapshot(); if (!hasTransientResponse(responses)) @@ -107,8 +109,7 @@ public class DigestResolver<E extends Endpoints<E>, P extends ReplicaPlan.ForRea // validate digests against each other; return false immediately on mismatch. ByteBuffer digest = null; Collection<Message<ReadResponse>> snapshot = responses.snapshot(); - assert snapshot.size() > 0 : "Attempted response match comparison while no responses have been received."; - if (snapshot.size() == 1) + if (snapshot.size() <= 1) return true; // TODO: should also not calculate if only one full node diff --git a/src/java/org/apache/cassandra/service/reads/ReadCallback.java b/src/java/org/apache/cassandra/service/reads/ReadCallback.java index b7ee18c..2968dbc 100644 --- a/src/java/org/apache/cassandra/service/reads/ReadCallback.java +++ b/src/java/org/apache/cassandra/service/reads/ReadCallback.java @@ -99,15 +99,7 @@ public class ReadCallback<E extends Endpoints<E>, P extends ReplicaPlan.ForRead< public void awaitResults() throws ReadFailureException, ReadTimeoutException { boolean signaled = await(command.getTimeout(MILLISECONDS), TimeUnit.MILLISECONDS); - /** - * Here we are checking isDataPresent in addition to the responses size because there is a possibility - * that an asynchronous speculative execution request could be returning after a local failure already - * signaled. Responses may have been set while the data reference is not yet. - * See {@link DigestResolver#preprocess(Message)} - * CASSANDRA-16097 - */ - boolean failed = failures > 0 && - (blockFor > resolver.responses.size() || !resolver.isDataPresent()); + boolean failed = failures > 0 && blockFor + failures > replicaPlan().contacts().size(); if (signaled && !failed) return; diff --git a/test/distributed/org/apache/cassandra/distributed/test/ReadFailureTest.java b/test/distributed/org/apache/cassandra/distributed/test/ReadFailureTest.java deleted file mode 100644 index be8db6c..0000000 --- a/test/distributed/org/apache/cassandra/distributed/test/ReadFailureTest.java +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.cassandra.distributed.test; - -import org.apache.commons.lang3.exception.ExceptionUtils; -import org.junit.Test; - -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import org.apache.cassandra.db.ColumnFamilyStore; -import org.apache.cassandra.db.Keyspace; -import org.apache.cassandra.distributed.api.ICluster; -import org.apache.cassandra.distributed.Cluster; -import org.apache.cassandra.distributed.api.ConsistencyLevel; -import org.apache.cassandra.exceptions.RequestFailureReason; - -public class ReadFailureTest extends TestBaseImpl -{ - static final int TOMBSTONE_FAIL_THRESHOLD = 20; - static final int TOMBSTONE_FAIL_KEY = 100001; - static final String TABLE = "t"; - - /** - * This test attempts to create a race condition with speculative executions that would previously cause an AssertionError. - * N=2, RF=2, read ONE - * The read will fail on the local node due to tombstone read threshold. At the same time, a spec exec is triggered - * reading from the other node. - * <p> - * See CASSANDRA-16097 for further details. - */ - @Test - public void testSpecExecRace() throws Throwable - { - try (Cluster cluster = init(Cluster.build().withNodes(2).withConfig(config -> config.set("tombstone_failure_threshold", TOMBSTONE_FAIL_THRESHOLD)).start())) - { - // Create a table with the spec exec policy set to a low percentile so it's more likely to produce a spec exec racing with the local request. - // Not using 'Always' because that actually uses a different class/mechanism and doesn't exercise the bug - // we're trying to produce. - cluster.schemaChange(String.format("CREATE TABLE %s.%s (k int, c int, v int, PRIMARY KEY (k,c)) WITH speculative_retry = '5p';", KEYSPACE, TABLE)); - - // Create a partition with enough tombstones to create a read failure according to the configured threshold - for (int i = 0; i <= TOMBSTONE_FAIL_THRESHOLD; ++i) - cluster.coordinator(1).execute(String.format("DELETE FROM %s.t WHERE k=%d AND c=%d", KEYSPACE, TOMBSTONE_FAIL_KEY, i), - ConsistencyLevel.TWO); - - // Create a bunch of latency samples for this failed operation. - loopFailStatement(cluster, 5000); - // Update the spec exec threshold based on the above samples. - // This would normally be done by the periodic task CassandraDaemon.SPECULATION_THRESHOLD_UPDATER. - cluster.get(1).runOnInstance(() -> - { - ColumnFamilyStore cfs = Keyspace.open(KEYSPACE) - .getColumnFamilyStore(TABLE); - cfs.updateSpeculationThreshold(); - }); - - // Run the request a bunch of times under racy conditions. - loopFailStatement(cluster, 5000); - } - } - - private void loopFailStatement(ICluster cluster, int iterations) - { - final String query = String.format("SELECT k FROM %s.t WHERE k=%d", KEYSPACE, TOMBSTONE_FAIL_KEY); - for (int i = 0; i < iterations; ++i) - { - try - { - cluster.coordinator(1).execute(query, ConsistencyLevel.ONE); - fail("Request did not throw a ReadFailureException as expected."); - } - catch (Throwable t) // Throwable because the raised ReadFailure is loaded from a different classloader and doesn't match "ours" - { - String onFail = String.format("Did not receive expected ReadFailureException. Instead caught %s\n%s", - t, ExceptionUtils.getStackTrace(t)); - assertNotNull(onFail, t.getMessage()); - assertTrue(onFail, t.getMessage().contains(RequestFailureReason.READ_TOO_MANY_TOMBSTONES.name())); - } - } - } -} - diff --git a/test/unit/org/apache/cassandra/service/reads/ReadExecutorTest.java b/test/unit/org/apache/cassandra/service/reads/ReadExecutorTest.java index 3eb9b2e..e0a5927 100644 --- a/test/unit/org/apache/cassandra/service/reads/ReadExecutorTest.java +++ b/test/unit/org/apache/cassandra/service/reads/ReadExecutorTest.java @@ -20,8 +20,6 @@ package org.apache.cassandra.service.reads; import java.util.concurrent.TimeUnit; -import org.apache.commons.lang3.exception.ExceptionUtils; - import org.apache.cassandra.dht.Murmur3Partitioner; import org.apache.cassandra.dht.Token; import org.apache.cassandra.locator.ReplicaPlan; @@ -48,8 +46,6 @@ import org.apache.cassandra.schema.KeyspaceParams; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.apache.cassandra.locator.ReplicaUtils.full; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertSame; -import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; public class ReadExecutorTest @@ -191,50 +187,6 @@ public class ReadExecutorTest assertEquals(1, ks.metric.speculativeFailedRetries.getCount()); } - /** - * Test that an async speculative execution racing with a local errored request does not violate assertions. - * CASSANDRA-16097 - */ - @Test - public void testRaceWithNonSpeculativeFailure() - { - MockSinglePartitionReadCommand command = new MockSinglePartitionReadCommand(TimeUnit.DAYS.toMillis(365)); - ReplicaPlan.ForTokenRead plan = plan(ConsistencyLevel.LOCAL_ONE, targets, targets.subList(0, 1)); - AbstractReadExecutor executor = new AbstractReadExecutor.SpeculatingReadExecutor(cfs, command, plan, System.nanoTime()); - - // Issue an initial request against the first endpoint... - executor.executeAsync(); - - // ...and then force a speculative retry against another endpoint. - cfs.sampleReadLatencyNanos = 0L; - executor.maybeTryAdditionalReplicas(); - - new Thread(() -> - { - // Fail the first request. When this fails the number of contacts has already been increased - // to 2, so the failure won't actally signal. However... - executor.handler.onFailure(targets.get(0).endpoint(), RequestFailureReason.READ_TOO_MANY_TOMBSTONES); - - // ...speculative retries are fired after a short wait, and it is possible for the failure to - // reach the handler just before one is fired and the number of contacts incremented... - executor.handler.condition.signalAll(); - }).start(); - - try - { - // ...but by the time we await for results, the number of contacts may already have been incremented. - // If we rely only on the number of failures and the number of nodes blocked for, compared to the number - // of contacts, we may not recognize that the query has failed. - executor.awaitResponses(); - fail("Awaiting responses did not throw a ReadFailureException as expected."); - } - catch (Throwable t) - { - assertSame(ExceptionUtils.getStackTrace(t), ReadFailureException.class, t.getClass()); - assertTrue(t.getMessage().contains(RequestFailureReason.READ_TOO_MANY_TOMBSTONES.name())); - } - } - public static class MockSinglePartitionReadCommand extends SinglePartitionReadCommand { private final long timeout; --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
