This is an automated email from the ASF dual-hosted git repository.

paulo pushed a commit to branch cassandra-4.1
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-4.1 by this push: new 7c2f97cd29 Do not submit hints when hinted_handoff_enabled=false 7c2f97cd29 is described below commit 7c2f97cd29486196b50c65a093e92b0fcd9789d9 Author: Paulo Motta <pa...@apache.org> AuthorDate: Mon Mar 6 18:17:32 2023 -0500 Do not submit hints when hinted_handoff_enabled=false - Remove dead WriteCallbackInfo code Patch by Paulo Motta, Aleksey Yeschenko; Reviewed by Stefan Miklosovic, Claude Warren for CASSANDRA-18304 Co-authored-by: Aleksey Yeschenko <alek...@apache.org> --- CHANGES.txt | 1 + .../apache/cassandra/batchlog/BatchlogManager.java | 2 +- .../org/apache/cassandra/net/MessagingService.java | 4 +- .../org/apache/cassandra/net/RequestCallbacks.java | 69 +------------ .../service/AbstractWriteResponseHandler.java | 2 +- .../org/apache/cassandra/service/StorageProxy.java | 10 +- .../distributed/test/HintsDisabledTest.java | 74 ++++++++++++++ .../cassandra/net/WriteCallbackInfoTest.java | 107 --------------------- 8 files changed, 88 insertions(+), 181 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 7c7290f133..1cb56d160f 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.1.2 + * Do not submit hints when hinted_handoff_enabled=false (CASSANDRA-18304) * Fix COPY ... 
TO STDOUT behavior in cqlsh (CASSANDRA-18353) * Remove six and Py2SaferScanner merge cruft (CASSANDRA-18354) diff --git a/src/java/org/apache/cassandra/batchlog/BatchlogManager.java b/src/java/org/apache/cassandra/batchlog/BatchlogManager.java index 6ff5aee0cf..6d102b01c0 100644 --- a/src/java/org/apache/cassandra/batchlog/BatchlogManager.java +++ b/src/java/org/apache/cassandra/batchlog/BatchlogManager.java @@ -505,7 +505,7 @@ public class BatchlogManager implements BatchlogManagerMBean ReplayWriteResponseHandler<Mutation> handler = new ReplayWriteResponseHandler<>(replicaPlan, mutation, nanoTime()); Message<Mutation> message = Message.outWithFlag(MUTATION_REQ, mutation, MessageFlag.CALL_BACK_ON_FAILURE); for (Replica replica : liveRemoteOnly.all()) - MessagingService.instance().sendWriteWithCallback(message, replica, handler, false); + MessagingService.instance().sendWriteWithCallback(message, replica, handler); return handler; } diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java index dab6962f5e..f14835661e 100644 --- a/src/java/org/apache/cassandra/net/MessagingService.java +++ b/src/java/org/apache/cassandra/net/MessagingService.java @@ -357,10 +357,10 @@ public class MessagingService extends MessagingServiceMBeanImpl * @param handler callback interface which is used to pass the responses or * suggest that a timeout occurred to the invoker of the send(). 
*/ - public void sendWriteWithCallback(Message message, Replica to, AbstractWriteResponseHandler<?> handler, boolean allowHints) + public void sendWriteWithCallback(Message message, Replica to, AbstractWriteResponseHandler<?> handler) { assert message.callBackOnFailure(); - callbacks.addWithExpiration(handler, message, to, handler.consistencyLevel(), allowHints); + callbacks.addWithExpiration(handler, message, to); send(message, to.endpoint(), null); } diff --git a/src/java/org/apache/cassandra/net/RequestCallbacks.java b/src/java/org/apache/cassandra/net/RequestCallbacks.java index ae57078964..663126ff02 100644 --- a/src/java/org/apache/cassandra/net/RequestCallbacks.java +++ b/src/java/org/apache/cassandra/net/RequestCallbacks.java @@ -31,17 +31,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.db.ConsistencyLevel; -import org.apache.cassandra.db.Mutation; import org.apache.cassandra.exceptions.RequestFailureReason; import org.apache.cassandra.io.IVersionedAsymmetricSerializer; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.locator.Replica; import org.apache.cassandra.metrics.InternodeOutboundMetrics; import org.apache.cassandra.service.AbstractWriteResponseHandler; -import org.apache.cassandra.service.StorageProxy; -import org.apache.cassandra.service.paxos.Commit; -import org.apache.cassandra.utils.FBUtilities; import static java.lang.String.format; import static java.util.concurrent.TimeUnit.MILLISECONDS; @@ -99,22 +94,18 @@ public class RequestCallbacks implements OutboundMessageCallbacks /** * Register the provided {@link RequestCallback}, inferring expiry and id from the provided {@link Message}. 
*/ - void addWithExpiration(RequestCallback cb, Message message, InetAddressAndPort to) + public void addWithExpiration(RequestCallback<?> cb, Message<?> message, InetAddressAndPort to) { - // mutations need to call the overload with a ConsistencyLevel + // mutations need to call the overload assert message.verb() != Verb.MUTATION_REQ && message.verb() != Verb.COUNTER_MUTATION_REQ; CallbackInfo previous = callbacks.put(key(message.id(), to), new CallbackInfo(message, to, cb)); assert previous == null : format("Callback already exists for id %d/%s! (%s)", message.id(), to, previous); } - public void addWithExpiration(AbstractWriteResponseHandler<?> cb, - Message<?> message, - Replica to, - ConsistencyLevel consistencyLevel, - boolean allowHints) + public void addWithExpiration(AbstractWriteResponseHandler<?> cb, Message<?> message, Replica to) { assert message.verb() == Verb.MUTATION_REQ || message.verb() == Verb.COUNTER_MUTATION_REQ || message.verb() == Verb.PAXOS_COMMIT_REQ; - CallbackInfo previous = callbacks.put(key(message.id(), to.endpoint()), new WriteCallbackInfo(message, to, cb, consistencyLevel, allowHints)); + CallbackInfo previous = callbacks.put(key(message.id(), to.endpoint()), new CallbackInfo(message, to.endpoint(), cb)); assert previous == null : format("Callback already exists for id %d/%s! (%s)", message.id(), to.endpoint(), previous); } @@ -274,11 +265,6 @@ public class RequestCallbacks implements OutboundMessageCallbacks return atNano > expiresAtNanos; } - boolean shouldHint() - { - return false; - } - boolean invokeOnFailure() { return callback.invokeOnFailure(); @@ -290,53 +276,6 @@ public class RequestCallbacks implements OutboundMessageCallbacks } } - // FIXME: shouldn't need a specialized container for write callbacks; hinting should be part of - // AbstractWriteResponseHandler implementation. 
- static class WriteCallbackInfo extends CallbackInfo - { - // either a Mutation, or a Paxos Commit (MessageOut) - private final Object mutation; - private final Replica replica; - - @VisibleForTesting - WriteCallbackInfo(Message message, Replica replica, RequestCallback<?> callback, ConsistencyLevel consistencyLevel, boolean allowHints) - { - super(message, replica.endpoint(), callback); - this.mutation = shouldHint(allowHints, message, consistencyLevel) ? message.payload : null; - //Local writes shouldn't go through messaging service (https://issues.apache.org/jira/browse/CASSANDRA-10477) - //noinspection AssertWithSideEffects - assert !peer.equals(FBUtilities.getBroadcastAddressAndPort()); - this.replica = replica; - } - - public boolean shouldHint() - { - return mutation != null && StorageProxy.shouldHint(replica); - } - - public Replica getReplica() - { - return replica; - } - - public Mutation mutation() - { - return getMutation(mutation); - } - - private static Mutation getMutation(Object object) - { - assert object instanceof Commit || object instanceof Mutation : object; - return object instanceof Commit ? 
((Commit) object).makeMutation() - : (Mutation) object; - } - - private static boolean shouldHint(boolean allowHints, Message sentMessage, ConsistencyLevel consistencyLevel) - { - return allowHints && sentMessage.verb() != Verb.COUNTER_MUTATION_REQ && consistencyLevel != ConsistencyLevel.ANY; - } - } - @Override public void onOverloaded(Message<?> message, InetAddressAndPort peer) { diff --git a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java index 313f714820..ce282661a8 100644 --- a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java +++ b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java @@ -282,7 +282,7 @@ public abstract class AbstractWriteResponseHandler<T> implements RequestCallback if (blockFor() + n > candidateReplicaCount()) signal(); - if (hintOnFailure != null) + if (hintOnFailure != null && StorageProxy.shouldHint(replicaPlan.lookup(from))) StorageProxy.submitHint(hintOnFailure.get(), replicaPlan.lookup(from), null); } diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index 8499d625a4..2a0ccab793 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -779,7 +779,7 @@ public class StorageProxy implements StorageProxyMBean if (replica.isSelf()) commitPaxosLocal(replica, message, responseHandler); else - MessagingService.instance().sendWriteWithCallback(message, replica, responseHandler, allowHints && shouldHint(replica)); + MessagingService.instance().sendWriteWithCallback(message, replica, responseHandler); } else { @@ -1529,7 +1529,7 @@ public class StorageProxy implements StorageProxyMBean if (localDc != null) { for (Replica destination : localDc) - MessagingService.instance().sendWriteWithCallback(message, destination, responseHandler, true); + 
MessagingService.instance().sendWriteWithCallback(message, destination, responseHandler); } if (dcGroups != null) { @@ -1571,7 +1571,7 @@ public class StorageProxy implements StorageProxyMBean for (Replica replica : forwardToReplicas) { - MessagingService.instance().callbacks.addWithExpiration(handler, message, replica, handler.replicaPlan.consistencyLevel(), true); + MessagingService.instance().callbacks.addWithExpiration(handler, message, replica); logger.trace("Adding FWD message to {}@{}", message.id(), replica); } @@ -1586,7 +1586,7 @@ public class StorageProxy implements StorageProxyMBean target = targets.get(0); } - MessagingService.instance().sendWriteWithCallback(message, target, handler, true); + MessagingService.instance().sendWriteWithCallback(message, target, handler); logger.trace("Sending message to {}@{}", message.id(), target); } @@ -1684,7 +1684,7 @@ public class StorageProxy implements StorageProxyMBean Tracing.trace("Enqueuing counter update to {}", replica); Message message = Message.outWithFlag(Verb.COUNTER_MUTATION_REQ, cm, MessageFlag.CALL_BACK_ON_FAILURE); - MessagingService.instance().sendWriteWithCallback(message, replica, responseHandler, false); + MessagingService.instance().sendWriteWithCallback(message, replica, responseHandler); return responseHandler; } } diff --git a/test/distributed/org/apache/cassandra/distributed/test/HintsDisabledTest.java b/test/distributed/org/apache/cassandra/distributed/test/HintsDisabledTest.java new file mode 100644 index 0000000000..64d023fa50 --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/HintsDisabledTest.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.test; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +import org.junit.Test; + +import com.google.monitoring.runtime.instrumentation.common.util.concurrent.Uninterruptibles; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.ConsistencyLevel; +import org.apache.cassandra.distributed.api.IMessageFilters; +import org.apache.cassandra.metrics.StorageMetrics; + +import static org.apache.cassandra.distributed.api.Feature.GOSSIP; +import static org.apache.cassandra.distributed.api.Feature.NETWORK; +import static org.apache.cassandra.net.Verb.MUTATION_REQ; +import static org.assertj.core.api.Assertions.assertThat; + +public class HintsDisabledTest extends TestBaseImpl +{ + @Test + public void testHintedHandoffDisabled() throws IOException + { + try (Cluster cluster = init(Cluster.build(2) + .withDataDirCount(1) + .withConfig(config -> config.with(NETWORK, GOSSIP) + .set("write_request_timeout", "10ms") + .set("hinted_handoff_enabled", false)) + .start(), 2)) + { + String createTableStatement = String.format("CREATE TABLE %s.cf (k text PRIMARY KEY, c1 text) " + + "WITH compaction = {'class': 'SizeTieredCompactionStrategy', 'enabled': 'false'} ", KEYSPACE); + cluster.schemaChange(createTableStatement); + + // Drop all messages from 
node1 to node2 so hints should be created + IMessageFilters.Filter drop1to2 = cluster.filters().verbs(MUTATION_REQ.id).from(1).to(2).drop(); + + cluster.coordinator(1).execute(withKeyspace("INSERT INTO %s.cf (k, c1) VALUES (?, ?) USING TIMESTAMP 1;"), + ConsistencyLevel.ONE, + String.valueOf(1), + String.valueOf(1)); + + // Wait 15ms for write to timeout (write_request_timeout=10ms) + Uninterruptibles.sleepUninterruptibly(15, TimeUnit.MILLISECONDS); + + // Check that no hints were created on node1 + assertThat(cluster.get(1).callOnInstance(() -> Long.valueOf(StorageMetrics.totalHints.getCount()))).isEqualTo(0L); + } + } + + private static int getNumberOfSSTables(Cluster cluster, int node) + { + return cluster.get(node).callOnInstance(() -> ColumnFamilyStore.getIfExists(KEYSPACE, "cf").getLiveSSTables().size()); + } +} diff --git a/test/unit/org/apache/cassandra/net/WriteCallbackInfoTest.java b/test/unit/org/apache/cassandra/net/WriteCallbackInfoTest.java deleted file mode 100644 index 2bc24dc840..0000000000 --- a/test/unit/org/apache/cassandra/net/WriteCallbackInfoTest.java +++ /dev/null @@ -1,107 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. 
-*/ -package org.apache.cassandra.net; - -import java.util.UUID; - -import org.junit.After; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; - -import org.junit.Assert; -import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.db.ConsistencyLevel; -import org.apache.cassandra.db.Mutation; -import org.apache.cassandra.db.RegularAndStaticColumns; -import org.apache.cassandra.db.partitions.PartitionUpdate; -import org.apache.cassandra.locator.InetAddressAndPort; -import org.apache.cassandra.schema.MockSchema; -import org.apache.cassandra.schema.TableMetadata; -import org.apache.cassandra.service.StorageService; -import org.apache.cassandra.service.paxos.Commit; -import org.apache.cassandra.utils.ByteBufferUtil; - -import static org.apache.cassandra.locator.ReplicaUtils.full; -import static org.apache.cassandra.service.paxos.Ballot.Flag.NONE; -import static org.apache.cassandra.service.paxos.BallotGenerator.Global.nextBallot; - -public class WriteCallbackInfoTest -{ - private InetAddressAndPort testEp; - - @BeforeClass - public static void initDD() - { - DatabaseDescriptor.daemonInitialization(); - } - - @Before - public void setup() throws Exception - { - testEp = InetAddressAndPort.getByName("192.168.1.1"); - StorageService.instance.getTokenMetadata().updateHostId(UUID.randomUUID(), testEp); - } - - @After - public void teardown() - { - StorageService.instance.getTokenMetadata().removeEndpoint(testEp); - } - - @Test - public void testShouldHint() throws Exception - { - testShouldHint(Verb.COUNTER_MUTATION_REQ, ConsistencyLevel.ALL, true, false); - for (Verb verb : new Verb[] { Verb.PAXOS_COMMIT_REQ, Verb.MUTATION_REQ }) - { - testShouldHint(verb, ConsistencyLevel.ALL, true, true); - testShouldHint(verb, ConsistencyLevel.ANY, true, false); - testShouldHint(verb, ConsistencyLevel.ALL, false, false); - } - } - - private void testShouldHint(Verb verb, ConsistencyLevel cl, boolean allowHints, boolean expectHint) - 
{ - TableMetadata metadata = MockSchema.newTableMetadata("", ""); - Object payload = verb == Verb.PAXOS_COMMIT_REQ - ? new Commit(nextBallot(NONE), new PartitionUpdate.Builder(metadata, ByteBufferUtil.EMPTY_BYTE_BUFFER, RegularAndStaticColumns.NONE, 1).build()) - : new Mutation(PartitionUpdate.simpleBuilder(metadata, "").build()); - - RequestCallbacks.WriteCallbackInfo wcbi = new RequestCallbacks.WriteCallbackInfo(Message.out(verb, payload), full(testEp), null, cl, allowHints); - Assert.assertEquals(expectHint, wcbi.shouldHint()); - if (expectHint) - { - Assert.assertNotNull(wcbi.mutation()); - } - else - { - boolean fail = false; - try - { - wcbi.mutation(); - } - catch (Throwable t) - { - fail = true; - } - Assert.assertTrue(fail); - } - } -} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org