This is an automated email from the ASF dual-hosted git repository. samt pushed a commit to branch cep-21-tcm in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 5a5568eb60a7cc6e4341bc6c91235b463fa41685 Author: Sam Tunnicliffe <[email protected]> AuthorDate: Fri Mar 24 09:46:29 2023 +0000 [CEP-21] Add invalid routing exception patch by Sam Tunnicliffe; reviewed by Alex Petrov and Marcus Eriksson for CASSANDRA-18413 --- .../cassandra/db/ReadCommandVerbHandler.java | 9 ++-- .../exceptions/InvalidRoutingException.java | 51 ++++++++++++++++++++++ .../cassandra/exceptions/RequestFailureReason.java | 4 ++ .../cassandra/distributed/test/ReadRepairTest.java | 45 +++++++++++++++---- 4 files changed, 95 insertions(+), 14 deletions(-) diff --git a/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java b/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java index 0aa295422f..119cfa4332 100644 --- a/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java +++ b/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java @@ -22,10 +22,11 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; +import org.apache.cassandra.exceptions.InvalidRoutingException; +import org.apache.cassandra.exceptions.QueryCancelledException; import org.apache.cassandra.dht.AbstractBounds; import org.apache.cassandra.dht.Token; import org.apache.cassandra.exceptions.InvalidRequestException; -import org.apache.cassandra.exceptions.QueryCancelledException; import org.apache.cassandra.locator.Replica; import org.apache.cassandra.tcm.ClusterMetadataService; import org.apache.cassandra.net.IVerbHandler; @@ -121,8 +122,7 @@ public class ReadCommandVerbHandler implements IVerbHandler<ReadCommand> Replica localReplica = getLocalReplica(metadata, token, command.metadata().keyspace); if (localReplica == null) { - throw new InvalidRequestException(String.format("Received a read request from %s for a token %s that is not owned by the current replica as of %s: %s.", - message.from(), token, metadata.epoch, message.payload)); + throw InvalidRoutingException.forTokenRead(message.from(), token, metadata.epoch, message.payload); } if (!command.acceptsTransient() && localReplica.isTransient()) @@ -142,8 +142,7 @@ public class ReadCommandVerbHandler implements IVerbHandler<ReadCommand> Replica maxTokenLocalReplica = getLocalReplica(metadata, range.right.getToken(), command.metadata().keyspace); if (maxTokenLocalReplica == null) { - throw new InvalidRequestException(String.format("Received a read request from %s for a range [%s,%s] that is not owned by the current replica as of %s: %s.", - message.from(), range.left, range.right, metadata.epoch, message.payload)); + throw InvalidRoutingException.forRangeRead(message.from(), range, metadata.epoch, message.payload); } // TODO: preexisting issue: we should change the whole range for transient-ness, not just the right token diff --git a/src/java/org/apache/cassandra/exceptions/InvalidRoutingException.java b/src/java/org/apache/cassandra/exceptions/InvalidRoutingException.java new file mode 100644 index 0000000000..fa3f0f8c9f --- /dev/null +++ b/src/java/org/apache/cassandra/exceptions/InvalidRoutingException.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.exceptions; + +import org.apache.cassandra.db.ReadCommand; +import org.apache.cassandra.dht.AbstractBounds; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.tcm.Epoch; + +public class InvalidRoutingException extends InvalidRequestException +{ + public static final String TOKEN_TEMPLATE = "Received a read request from %s for a token %s that is not owned by the current replica as of %s: %s."; + public static final String RANGE_TEMPLATE = "Received a read request from %s for a range [%s,%s] that is not owned by the current replica as of %s: %s."; + private InvalidRoutingException(String msg) + { + super(msg); + } + + public static InvalidRoutingException forTokenRead(InetAddressAndPort from, + Token token, + Epoch epoch, + ReadCommand command) + { + return new InvalidRoutingException(String.format(TOKEN_TEMPLATE, from, token, epoch, command)); + } + + public static InvalidRoutingException forRangeRead(InetAddressAndPort from, + AbstractBounds<?> range, + Epoch epoch, + ReadCommand command) + { + return new InvalidRoutingException(String.format(RANGE_TEMPLATE, from, range.left, range.right, epoch, command)); + } +} diff --git a/src/java/org/apache/cassandra/exceptions/RequestFailureReason.java b/src/java/org/apache/cassandra/exceptions/RequestFailureReason.java index b0f35c97aa..bc8800d4c2 100644 --- a/src/java/org/apache/cassandra/exceptions/RequestFailureReason.java +++ b/src/java/org/apache/cassandra/exceptions/RequestFailureReason.java @@ -38,6 +38,7 @@ public enum RequestFailureReason READ_SIZE (4), NODE_DOWN (5), NOT_CMS (6), + INVALID_ROUTING (7) ; public static final Serializer serializer = new Serializer(); @@ -91,6 +92,9 @@ public enum RequestFailureReason if (t instanceof NotCMSException) return NOT_CMS; + if (t instanceof InvalidRoutingException) + return INVALID_ROUTING; + return UNKNOWN; } diff --git a/test/distributed/org/apache/cassandra/distributed/test/ReadRepairTest.java b/test/distributed/org/apache/cassandra/distributed/test/ReadRepairTest.java index 46fc9b376a..24e47d97a8 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/ReadRepairTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/ReadRepairTest.java @@ -18,20 +18,23 @@ package org.apache.cassandra.distributed.test; +import java.io.IOException; import java.util.List; import java.util.Map; import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; +import java.util.concurrent.ExecutionException; +import com.google.common.util.concurrent.FutureCallback; import org.junit.Assert; +import org.junit.Ignore; import org.junit.Test; import net.bytebuddy.ByteBuddy; import net.bytebuddy.dynamic.loading.ClassLoadingStrategy; import net.bytebuddy.implementation.MethodDelegation; import net.bytebuddy.implementation.bind.annotation.SuperCall; +import org.apache.cassandra.concurrent.ExecutorFactory; +import org.apache.cassandra.concurrent.ExecutorPlus; import org.apache.cassandra.config.Config; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.DecoratedKey; @@ -52,6 +55,8 @@ import org.apache.cassandra.locator.ReplicaPlan; import org.apache.cassandra.service.reads.repair.BlockingReadRepair; import org.apache.cassandra.service.reads.repair.ReadRepairStrategy; import org.apache.cassandra.utils.concurrent.Condition; +import org.apache.cassandra.utils.concurrent.Future; +import org.checkerframework.checker.nullness.qual.Nullable; import static net.bytebuddy.matcher.ElementMatchers.named; import static org.apache.cassandra.db.Keyspace.open; @@ -172,9 +177,10 @@ public class ReadRepairTest extends TestBaseImpl } } - @Test + @Test @Ignore public void movingTokenReadRepairTest() throws Throwable { + // TODO: rewrite using FuzzTestBase to control progress through decommission // TODO: fails with vnode enabled try (Cluster cluster = init(Cluster.build(4).withoutVNodes().start(), 3)) { @@ -345,9 +351,9 @@ public class ReadRepairTest extends TestBaseImpl } @Test - public void readRepairRTRangeMovementTest() throws Throwable + public void readRepairRTRangeMovementTest() throws IOException { - ExecutorService es = Executors.newFixedThreadPool(1); + ExecutorPlus es = ExecutorFactory.Global.executorFactory().sequential("query-executor"); String key = "test1"; try (Cluster cluster = init(Cluster.build() .withConfig(config -> config.with(Feature.GOSSIP, Feature.NETWORK) @@ -390,9 +396,20 @@ public class ReadRepairTest extends TestBaseImpl } return false; }).drop(); - Future<Object[][]> read = es.submit(() -> cluster.coordinator(3) - .execute("SELECT * FROM distributed_test_keyspace.tbl WHERE key=? and column1 >= ? and column1 <= ?", - ALL, key, 20, 40)); + + String query = "SELECT * FROM distributed_test_keyspace.tbl WHERE key=? and column1 >= ? and column1 <= ?"; + Future<Object[][]> read = es.submit(() -> cluster.coordinator(3).execute(query, ALL, key, 20, 40)); + read.addCallback(new FutureCallback<Object[][]>() + { + @Override + public void onSuccess(Object @Nullable [][] objects) + { + Assert.fail("Expected read failure because replica placements have become incompatible during execution"); + } + + @Override + public void onFailure(Throwable t) {} + }); readStarted.await(); IInstanceConfig config = cluster.newInstanceConfig(); config.set("auto_bootstrap", true); @@ -400,6 +417,16 @@ public class ReadRepairTest extends TestBaseImpl continueRead.signalAll(); read.get(); } + catch (ExecutionException e) + { + Throwable cause = e.getCause(); + Assert.assertTrue("Expected a different error message, but got " + cause.getMessage(), + cause.getMessage().contains("Operation failed - received 1 responses and 1 failures: INVALID_ROUTING from /127.0.0.2:7012")); + } + catch (InterruptedException e) + { + Assert.fail("Unexpected exception"); + } finally { es.shutdown(); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
