This is an automated email from the ASF dual-hosted git repository.

paulo pushed a commit to branch cassandra-4.1
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-4.1 by this push:
     new 7c2f97cd29 Do not submit hints when hinted_handoff_enabled=false
7c2f97cd29 is described below

commit 7c2f97cd29486196b50c65a093e92b0fcd9789d9
Author: Paulo Motta <pa...@apache.org>
AuthorDate: Mon Mar 6 18:17:32 2023 -0500

    Do not submit hints when hinted_handoff_enabled=false
    
    - Remove dead WriteCallbackInfo code
    
    Patch by Paulo Motta, Aleksey Yeschenko; Reviewed by Stefan Miklosovic, 
Claude Warren for CASSANDRA-18304
    
    Co-authored-by: Aleksey Yeschenko <alek...@apache.org>
---
 CHANGES.txt                                        |   1 +
 .../apache/cassandra/batchlog/BatchlogManager.java |   2 +-
 .../org/apache/cassandra/net/MessagingService.java |   4 +-
 .../org/apache/cassandra/net/RequestCallbacks.java |  69 +------------
 .../service/AbstractWriteResponseHandler.java      |   2 +-
 .../org/apache/cassandra/service/StorageProxy.java |  10 +-
 .../distributed/test/HintsDisabledTest.java        |  74 ++++++++++++++
 .../cassandra/net/WriteCallbackInfoTest.java       | 107 ---------------------
 8 files changed, 88 insertions(+), 181 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 7c7290f133..1cb56d160f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.1.2
+ * Do not submit hints when hinted_handoff_enabled=false (CASSANDRA-18304)
  * Fix COPY ... TO STDOUT behavior in cqlsh (CASSANDRA-18353)
  * Remove six and Py2SaferScanner merge cruft (CASSANDRA-18354)
  
diff --git a/src/java/org/apache/cassandra/batchlog/BatchlogManager.java 
b/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
index 6ff5aee0cf..6d102b01c0 100644
--- a/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
+++ b/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
@@ -505,7 +505,7 @@ public class BatchlogManager implements BatchlogManagerMBean
             ReplayWriteResponseHandler<Mutation> handler = new 
ReplayWriteResponseHandler<>(replicaPlan, mutation, nanoTime());
             Message<Mutation> message = Message.outWithFlag(MUTATION_REQ, 
mutation, MessageFlag.CALL_BACK_ON_FAILURE);
             for (Replica replica : liveRemoteOnly.all())
-                MessagingService.instance().sendWriteWithCallback(message, 
replica, handler, false);
+                MessagingService.instance().sendWriteWithCallback(message, 
replica, handler);
             return handler;
         }
 
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java 
b/src/java/org/apache/cassandra/net/MessagingService.java
index dab6962f5e..f14835661e 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -357,10 +357,10 @@ public class MessagingService extends 
MessagingServiceMBeanImpl
      * @param handler callback interface which is used to pass the responses or
      *                suggest that a timeout occurred to the invoker of the 
send().
      */
-    public void sendWriteWithCallback(Message message, Replica to, 
AbstractWriteResponseHandler<?> handler, boolean allowHints)
+    public void sendWriteWithCallback(Message message, Replica to, 
AbstractWriteResponseHandler<?> handler)
     {
         assert message.callBackOnFailure();
-        callbacks.addWithExpiration(handler, message, to, 
handler.consistencyLevel(), allowHints);
+        callbacks.addWithExpiration(handler, message, to);
         send(message, to.endpoint(), null);
     }
 
diff --git a/src/java/org/apache/cassandra/net/RequestCallbacks.java 
b/src/java/org/apache/cassandra/net/RequestCallbacks.java
index ae57078964..663126ff02 100644
--- a/src/java/org/apache/cassandra/net/RequestCallbacks.java
+++ b/src/java/org/apache/cassandra/net/RequestCallbacks.java
@@ -31,17 +31,12 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.ConsistencyLevel;
-import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.exceptions.RequestFailureReason;
 import org.apache.cassandra.io.IVersionedAsymmetricSerializer;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.locator.Replica;
 import org.apache.cassandra.metrics.InternodeOutboundMetrics;
 import org.apache.cassandra.service.AbstractWriteResponseHandler;
-import org.apache.cassandra.service.StorageProxy;
-import org.apache.cassandra.service.paxos.Commit;
-import org.apache.cassandra.utils.FBUtilities;
 
 import static java.lang.String.format;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
@@ -99,22 +94,18 @@ public class RequestCallbacks implements 
OutboundMessageCallbacks
     /**
      * Register the provided {@link RequestCallback}, inferring expiry and id 
from the provided {@link Message}.
      */
-    void addWithExpiration(RequestCallback cb, Message message, 
InetAddressAndPort to)
+    public void addWithExpiration(RequestCallback<?> cb, Message<?> message, 
InetAddressAndPort to)
     {
-        // mutations need to call the overload with a ConsistencyLevel
+        // mutations need to call the overload
         assert message.verb() != Verb.MUTATION_REQ && message.verb() != 
Verb.COUNTER_MUTATION_REQ;
         CallbackInfo previous = callbacks.put(key(message.id(), to), new 
CallbackInfo(message, to, cb));
         assert previous == null : format("Callback already exists for id 
%d/%s! (%s)", message.id(), to, previous);
     }
 
-    public void addWithExpiration(AbstractWriteResponseHandler<?> cb,
-                                  Message<?> message,
-                                  Replica to,
-                                  ConsistencyLevel consistencyLevel,
-                                  boolean allowHints)
+    public void addWithExpiration(AbstractWriteResponseHandler<?> cb, 
Message<?> message, Replica to)
     {
         assert message.verb() == Verb.MUTATION_REQ || message.verb() == 
Verb.COUNTER_MUTATION_REQ || message.verb() == Verb.PAXOS_COMMIT_REQ;
-        CallbackInfo previous = callbacks.put(key(message.id(), 
to.endpoint()), new WriteCallbackInfo(message, to, cb, consistencyLevel, 
allowHints));
+        CallbackInfo previous = callbacks.put(key(message.id(), 
to.endpoint()), new CallbackInfo(message, to.endpoint(), cb));
         assert previous == null : format("Callback already exists for id 
%d/%s! (%s)", message.id(), to.endpoint(), previous);
     }
 
@@ -274,11 +265,6 @@ public class RequestCallbacks implements 
OutboundMessageCallbacks
             return atNano > expiresAtNanos;
         }
 
-        boolean shouldHint()
-        {
-            return false;
-        }
-
         boolean invokeOnFailure()
         {
             return callback.invokeOnFailure();
@@ -290,53 +276,6 @@ public class RequestCallbacks implements 
OutboundMessageCallbacks
         }
     }
 
-    // FIXME: shouldn't need a specialized container for write callbacks; 
hinting should be part of
-    //        AbstractWriteResponseHandler implementation.
-    static class WriteCallbackInfo extends CallbackInfo
-    {
-        // either a Mutation, or a Paxos Commit (MessageOut)
-        private final Object mutation;
-        private final Replica replica;
-
-        @VisibleForTesting
-        WriteCallbackInfo(Message message, Replica replica, RequestCallback<?> 
callback, ConsistencyLevel consistencyLevel, boolean allowHints)
-        {
-            super(message, replica.endpoint(), callback);
-            this.mutation = shouldHint(allowHints, message, consistencyLevel) 
? message.payload : null;
-            //Local writes shouldn't go through messaging service 
(https://issues.apache.org/jira/browse/CASSANDRA-10477)
-            //noinspection AssertWithSideEffects
-            assert !peer.equals(FBUtilities.getBroadcastAddressAndPort());
-            this.replica = replica;
-        }
-
-        public boolean shouldHint()
-        {
-            return mutation != null && StorageProxy.shouldHint(replica);
-        }
-
-        public Replica getReplica()
-        {
-            return replica;
-        }
-
-        public Mutation mutation()
-        {
-            return getMutation(mutation);
-        }
-
-        private static Mutation getMutation(Object object)
-        {
-            assert object instanceof Commit || object instanceof Mutation : 
object;
-            return object instanceof Commit ? ((Commit) object).makeMutation()
-                                            : (Mutation) object;
-        }
-
-        private static boolean shouldHint(boolean allowHints, Message 
sentMessage, ConsistencyLevel consistencyLevel)
-        {
-            return allowHints && sentMessage.verb() != 
Verb.COUNTER_MUTATION_REQ && consistencyLevel != ConsistencyLevel.ANY;
-        }
-    }
-
     @Override
     public void onOverloaded(Message<?> message, InetAddressAndPort peer)
     {
diff --git 
a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java 
b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
index 313f714820..ce282661a8 100644
--- a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
@@ -282,7 +282,7 @@ public abstract class AbstractWriteResponseHandler<T> 
implements RequestCallback
         if (blockFor() + n > candidateReplicaCount())
             signal();
 
-        if (hintOnFailure != null)
+        if (hintOnFailure != null && 
StorageProxy.shouldHint(replicaPlan.lookup(from)))
             StorageProxy.submitHint(hintOnFailure.get(), 
replicaPlan.lookup(from), null);
     }
 
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java 
b/src/java/org/apache/cassandra/service/StorageProxy.java
index 8499d625a4..2a0ccab793 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -779,7 +779,7 @@ public class StorageProxy implements StorageProxyMBean
                     if (replica.isSelf())
                         commitPaxosLocal(replica, message, responseHandler);
                     else
-                        
MessagingService.instance().sendWriteWithCallback(message, replica, 
responseHandler, allowHints && shouldHint(replica));
+                        
MessagingService.instance().sendWriteWithCallback(message, replica, 
responseHandler);
                 }
                 else
                 {
@@ -1529,7 +1529,7 @@ public class StorageProxy implements StorageProxyMBean
         if (localDc != null)
         {
             for (Replica destination : localDc)
-                MessagingService.instance().sendWriteWithCallback(message, 
destination, responseHandler, true);
+                MessagingService.instance().sendWriteWithCallback(message, 
destination, responseHandler);
         }
         if (dcGroups != null)
         {
@@ -1571,7 +1571,7 @@ public class StorageProxy implements StorageProxyMBean
 
             for (Replica replica : forwardToReplicas)
             {
-                
MessagingService.instance().callbacks.addWithExpiration(handler, message, 
replica, handler.replicaPlan.consistencyLevel(), true);
+                
MessagingService.instance().callbacks.addWithExpiration(handler, message, 
replica);
                 logger.trace("Adding FWD message to {}@{}", message.id(), 
replica);
             }
 
@@ -1586,7 +1586,7 @@ public class StorageProxy implements StorageProxyMBean
             target = targets.get(0);
         }
 
-        MessagingService.instance().sendWriteWithCallback(message, target, 
handler, true);
+        MessagingService.instance().sendWriteWithCallback(message, target, 
handler);
         logger.trace("Sending message to {}@{}", message.id(), target);
     }
 
@@ -1684,7 +1684,7 @@ public class StorageProxy implements StorageProxyMBean
 
             Tracing.trace("Enqueuing counter update to {}", replica);
             Message message = Message.outWithFlag(Verb.COUNTER_MUTATION_REQ, 
cm, MessageFlag.CALL_BACK_ON_FAILURE);
-            MessagingService.instance().sendWriteWithCallback(message, 
replica, responseHandler, false);
+            MessagingService.instance().sendWriteWithCallback(message, 
replica, responseHandler);
             return responseHandler;
         }
     }
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/HintsDisabledTest.java 
b/test/distributed/org/apache/cassandra/distributed/test/HintsDisabledTest.java
new file mode 100644
index 0000000000..64d023fa50
--- /dev/null
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/HintsDisabledTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Test;
+
+import 
com.google.monitoring.runtime.instrumentation.common.util.concurrent.Uninterruptibles;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.IMessageFilters;
+import org.apache.cassandra.metrics.StorageMetrics;
+
+import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+import static org.apache.cassandra.net.Verb.MUTATION_REQ;
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class HintsDisabledTest extends TestBaseImpl
+{
+    @Test
+    public void testHintedHandoffDisabled() throws IOException
+    {
+        try (Cluster cluster = init(Cluster.build(2)
+                                           .withDataDirCount(1)
+                                           .withConfig(config -> 
config.with(NETWORK, GOSSIP)
+                                                                       
.set("write_request_timeout", "10ms")
+                                                                       
.set("hinted_handoff_enabled", false))
+                                           .start(), 2))
+        {
+            String createTableStatement = String.format("CREATE TABLE %s.cf (k 
text PRIMARY KEY, c1 text) " +
+                                                        "WITH compaction = 
{'class': 'SizeTieredCompactionStrategy', 'enabled': 'false'} ", KEYSPACE);
+            cluster.schemaChange(createTableStatement);
+
+            // Drop all messages from node1 to node2 so hints should be created
+            IMessageFilters.Filter drop1to2 = 
cluster.filters().verbs(MUTATION_REQ.id).from(1).to(2).drop();
+
+            cluster.coordinator(1).execute(withKeyspace("INSERT INTO %s.cf (k, 
c1) VALUES (?, ?) USING TIMESTAMP 1;"),
+                                           ConsistencyLevel.ONE,
+                                           String.valueOf(1),
+                                           String.valueOf(1));
+
+            // Wait 15ms for write to timeout (write_request_timeout=10ms)
+            Uninterruptibles.sleepUninterruptibly(15, TimeUnit.MILLISECONDS);
+
+            // Check that no hints were created on node1
+            assertThat(cluster.get(1).callOnInstance(() -> 
Long.valueOf(StorageMetrics.totalHints.getCount()))).isEqualTo(0L);
+        }
+    }
+
+    private static int getNumberOfSSTables(Cluster cluster, int node)
+    {
+        return cluster.get(node).callOnInstance(() -> 
ColumnFamilyStore.getIfExists(KEYSPACE, "cf").getLiveSSTables().size());
+    }
+}
diff --git a/test/unit/org/apache/cassandra/net/WriteCallbackInfoTest.java 
b/test/unit/org/apache/cassandra/net/WriteCallbackInfoTest.java
deleted file mode 100644
index 2bc24dc840..0000000000
--- a/test/unit/org/apache/cassandra/net/WriteCallbackInfoTest.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*    http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-package org.apache.cassandra.net;
-
-import java.util.UUID;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import org.junit.Assert;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.ConsistencyLevel;
-import org.apache.cassandra.db.Mutation;
-import org.apache.cassandra.db.RegularAndStaticColumns;
-import org.apache.cassandra.db.partitions.PartitionUpdate;
-import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.schema.MockSchema;
-import org.apache.cassandra.schema.TableMetadata;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.service.paxos.Commit;
-import org.apache.cassandra.utils.ByteBufferUtil;
-
-import static org.apache.cassandra.locator.ReplicaUtils.full;
-import static org.apache.cassandra.service.paxos.Ballot.Flag.NONE;
-import static 
org.apache.cassandra.service.paxos.BallotGenerator.Global.nextBallot;
-
-public class WriteCallbackInfoTest
-{
-    private InetAddressAndPort testEp;
-
-    @BeforeClass
-    public static void initDD()
-    {
-        DatabaseDescriptor.daemonInitialization();
-    }
-
-    @Before
-    public void setup() throws Exception
-    {
-        testEp = InetAddressAndPort.getByName("192.168.1.1");
-        
StorageService.instance.getTokenMetadata().updateHostId(UUID.randomUUID(), 
testEp);
-    }
-
-    @After
-    public void teardown()
-    {
-        StorageService.instance.getTokenMetadata().removeEndpoint(testEp);
-    }
-
-    @Test
-    public void testShouldHint() throws Exception
-    {
-        testShouldHint(Verb.COUNTER_MUTATION_REQ, ConsistencyLevel.ALL, true, 
false);
-        for (Verb verb : new Verb[] { Verb.PAXOS_COMMIT_REQ, Verb.MUTATION_REQ 
})
-        {
-            testShouldHint(verb, ConsistencyLevel.ALL, true, true);
-            testShouldHint(verb, ConsistencyLevel.ANY, true, false);
-            testShouldHint(verb, ConsistencyLevel.ALL, false, false);
-        }
-    }
-
-    private void testShouldHint(Verb verb, ConsistencyLevel cl, boolean 
allowHints, boolean expectHint)
-    {
-        TableMetadata metadata = MockSchema.newTableMetadata("", "");
-        Object payload = verb == Verb.PAXOS_COMMIT_REQ
-                         ? new Commit(nextBallot(NONE), new 
PartitionUpdate.Builder(metadata, ByteBufferUtil.EMPTY_BYTE_BUFFER, 
RegularAndStaticColumns.NONE, 1).build())
-                         : new 
Mutation(PartitionUpdate.simpleBuilder(metadata, "").build());
-
-        RequestCallbacks.WriteCallbackInfo wcbi = new 
RequestCallbacks.WriteCallbackInfo(Message.out(verb, payload), full(testEp), 
null, cl, allowHints);
-        Assert.assertEquals(expectHint, wcbi.shouldHint());
-        if (expectHint)
-        {
-            Assert.assertNotNull(wcbi.mutation());
-        }
-        else
-        {
-            boolean fail = false;
-            try
-            {
-                wcbi.mutation();
-            }
-            catch (Throwable t)
-            {
-                fail = true;
-            }
-            Assert.assertTrue(fail);
-        }
-    }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to