This is an automated email from the ASF dual-hosted git repository.

samt pushed a commit to branch cep-21-tcm
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 5a5568eb60a7cc6e4341bc6c91235b463fa41685
Author: Sam Tunnicliffe <[email protected]>
AuthorDate: Fri Mar 24 09:46:29 2023 +0000

    [CEP-21] Add invalid routing exception
    
    patch by Sam Tunnicliffe; reviewed by Alex Petrov and Marcus Eriksson
    for CASSANDRA-18413
---
 .../cassandra/db/ReadCommandVerbHandler.java       |  9 ++--
 .../exceptions/InvalidRoutingException.java        | 51 ++++++++++++++++++++++
 .../cassandra/exceptions/RequestFailureReason.java |  4 ++
 .../cassandra/distributed/test/ReadRepairTest.java | 45 +++++++++++++++----
 4 files changed, 95 insertions(+), 14 deletions(-)

diff --git a/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java b/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
index 0aa295422f..119cfa4332 100644
--- a/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
@@ -22,10 +22,11 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.exceptions.InvalidRoutingException;
+import org.apache.cassandra.exceptions.QueryCancelledException;
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.InvalidRequestException;
-import org.apache.cassandra.exceptions.QueryCancelledException;
 import org.apache.cassandra.locator.Replica;
 import org.apache.cassandra.tcm.ClusterMetadataService;
 import org.apache.cassandra.net.IVerbHandler;
@@ -121,8 +122,7 @@ public class ReadCommandVerbHandler implements IVerbHandler<ReadCommand>
             Replica localReplica = getLocalReplica(metadata, token, command.metadata().keyspace);
             if (localReplica == null)
             {
-                throw new InvalidRequestException(String.format("Received a read request from %s for a token %s that is not owned by the current replica as of %s: %s.",
-                                                                message.from(), token, metadata.epoch, message.payload));
+                throw InvalidRoutingException.forTokenRead(message.from(), token, metadata.epoch, message.payload);
             }
 
             if (!command.acceptsTransient() && localReplica.isTransient())
@@ -142,8 +142,7 @@ public class ReadCommandVerbHandler implements IVerbHandler<ReadCommand>
             Replica maxTokenLocalReplica = getLocalReplica(metadata, range.right.getToken(), command.metadata().keyspace);
             if (maxTokenLocalReplica == null)
             {
-                throw new InvalidRequestException(String.format("Received a read request from %s for a range [%s,%s] that is not owned by the current replica as of %s: %s.",
-                                                                message.from(), range.left, range.right, metadata.epoch, message.payload));
+                throw InvalidRoutingException.forRangeRead(message.from(), range, metadata.epoch, message.payload);
             }
 
             // TODO: preexisting issue: we should change the whole range for transient-ness, not just the right token
diff --git a/src/java/org/apache/cassandra/exceptions/InvalidRoutingException.java b/src/java/org/apache/cassandra/exceptions/InvalidRoutingException.java
new file mode 100644
index 0000000000..fa3f0f8c9f
--- /dev/null
+++ b/src/java/org/apache/cassandra/exceptions/InvalidRoutingException.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.exceptions;
+
+import org.apache.cassandra.db.ReadCommand;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.tcm.Epoch;
+
+public class InvalidRoutingException extends InvalidRequestException
+{
+    public static final String TOKEN_TEMPLATE = "Received a read request from %s for a token %s that is not owned by the current replica as of %s: %s.";
+    public static final String RANGE_TEMPLATE = "Received a read request from %s for a range [%s,%s] that is not owned by the current replica as of %s: %s.";
+    private InvalidRoutingException(String msg)
+    {
+        super(msg);
+    }
+
+    public static InvalidRoutingException forTokenRead(InetAddressAndPort from,
+                                                       Token token,
+                                                       Epoch epoch,
+                                                       ReadCommand command)
+    {
+        return new InvalidRoutingException(String.format(TOKEN_TEMPLATE, from, token, epoch, command));
+    }
+
+    public static InvalidRoutingException forRangeRead(InetAddressAndPort from,
+                                                       AbstractBounds<?> range,
+                                                       Epoch epoch,
+                                                       ReadCommand command)
+    {
+        return new InvalidRoutingException(String.format(RANGE_TEMPLATE, from, range.left, range.right, epoch, command));
+    }
+}
diff --git a/src/java/org/apache/cassandra/exceptions/RequestFailureReason.java b/src/java/org/apache/cassandra/exceptions/RequestFailureReason.java
index b0f35c97aa..bc8800d4c2 100644
--- a/src/java/org/apache/cassandra/exceptions/RequestFailureReason.java
+++ b/src/java/org/apache/cassandra/exceptions/RequestFailureReason.java
@@ -38,6 +38,7 @@ public enum RequestFailureReason
     READ_SIZE                (4),
     NODE_DOWN                (5),
     NOT_CMS                  (6),
+    INVALID_ROUTING          (7)
     ;
 
     public static final Serializer serializer = new Serializer();
@@ -91,6 +92,9 @@ public enum RequestFailureReason
         if (t instanceof NotCMSException)
             return NOT_CMS;
 
+        if (t instanceof InvalidRoutingException)
+            return INVALID_ROUTING;
+
         return UNKNOWN;
     }
 
diff --git a/test/distributed/org/apache/cassandra/distributed/test/ReadRepairTest.java b/test/distributed/org/apache/cassandra/distributed/test/ReadRepairTest.java
index 46fc9b376a..24e47d97a8 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/ReadRepairTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/ReadRepairTest.java
@@ -18,20 +18,23 @@
 
 package org.apache.cassandra.distributed.test;
 
+import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
+import java.util.concurrent.ExecutionException;
 
+import com.google.common.util.concurrent.FutureCallback;
 import org.junit.Assert;
+import org.junit.Ignore;
 import org.junit.Test;
 
 import net.bytebuddy.ByteBuddy;
 import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
 import net.bytebuddy.implementation.MethodDelegation;
 import net.bytebuddy.implementation.bind.annotation.SuperCall;
+import org.apache.cassandra.concurrent.ExecutorFactory;
+import org.apache.cassandra.concurrent.ExecutorPlus;
 import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.DecoratedKey;
@@ -52,6 +55,8 @@ import org.apache.cassandra.locator.ReplicaPlan;
 import org.apache.cassandra.service.reads.repair.BlockingReadRepair;
 import org.apache.cassandra.service.reads.repair.ReadRepairStrategy;
 import org.apache.cassandra.utils.concurrent.Condition;
+import org.apache.cassandra.utils.concurrent.Future;
+import org.checkerframework.checker.nullness.qual.Nullable;
 
 import static net.bytebuddy.matcher.ElementMatchers.named;
 import static org.apache.cassandra.db.Keyspace.open;
@@ -172,9 +177,10 @@ public class ReadRepairTest extends TestBaseImpl
         }
     }
 
-    @Test
+    @Test @Ignore
     public void movingTokenReadRepairTest() throws Throwable
     {
+        // TODO: rewrite using FuzzTestBase to control progress through decommission
         // TODO: fails with vnode enabled
         try (Cluster cluster = init(Cluster.build(4).withoutVNodes().start(), 3))
         {
@@ -345,9 +351,9 @@ public class ReadRepairTest extends TestBaseImpl
     }
 
     @Test
-    public void readRepairRTRangeMovementTest() throws Throwable
+    public void readRepairRTRangeMovementTest() throws IOException
     {
-        ExecutorService es = Executors.newFixedThreadPool(1);
+        ExecutorPlus es = ExecutorFactory.Global.executorFactory().sequential("query-executor");
         String key = "test1";
         try (Cluster cluster = init(Cluster.build()
                                            .withConfig(config -> config.with(Feature.GOSSIP, Feature.NETWORK)
@@ -390,9 +396,20 @@ public class ReadRepairTest extends TestBaseImpl
                 }
                 return false;
             }).drop();
-            Future<Object[][]> read = es.submit(() -> cluster.coordinator(3)
-                                                          .execute("SELECT * FROM distributed_test_keyspace.tbl WHERE key=? and column1 >= ? and column1 <= ?",
-                                                                   ALL, key, 20, 40));
+
+            String query = "SELECT * FROM distributed_test_keyspace.tbl WHERE key=? and column1 >= ? and column1 <= ?";
+            Future<Object[][]> read = es.submit(() -> cluster.coordinator(3).execute(query, ALL, key, 20, 40));
+            read.addCallback(new FutureCallback<Object[][]>()
+            {
+                @Override
+                public void onSuccess(Object @Nullable [][] objects)
+                {
+                    Assert.fail("Expected read failure because replica placements have become incompatible during execution");
+                }
+
+                @Override
+                public void onFailure(Throwable t) {}
+            });
             readStarted.await();
             IInstanceConfig config = cluster.newInstanceConfig();
             config.set("auto_bootstrap", true);
@@ -400,6 +417,16 @@ public class ReadRepairTest extends TestBaseImpl
             continueRead.signalAll();
             read.get();
         }
+        catch (ExecutionException e)
+        {
+            Throwable cause = e.getCause();
+            Assert.assertTrue("Expected a different error message, but got " + cause.getMessage(),
+                              cause.getMessage().contains("Operation failed - received 1 responses and 1 failures: INVALID_ROUTING from /127.0.0.2:7012"));
+        }
+        catch (InterruptedException e)
+        {
+            Assert.fail("Unexpected exception");
+        }
         finally
         {
             es.shutdown();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to