This is an automated email from the ASF dual-hosted git repository.
dcapwell pushed a commit to branch cassandra-5.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-5.0 by this push:
new 69f5df0b00 Add metrics and logging to repair retries
69f5df0b00 is described below
commit 69f5df0b00cc822f41823901956aeff9fcdd9dbb
Author: David Capwell <[email protected]>
AuthorDate: Mon Oct 30 10:32:30 2023 -0700
Add metrics and logging to repair retries
patch by David Capwell; reviewed by Caleb Rackliffe, Maxim Muzafarov for
CASSANDRA-18952
---
CHANGES.txt | 1 +
.../org/apache/cassandra/gms/GossipShutdown.java | 15 +-
.../apache/cassandra/metrics/RepairMetrics.java | 71 +++++++
.../cassandra/repair/messages/RepairMessage.java | 79 ++++++--
.../org/apache/cassandra/utils/NoSpamLogger.java | 13 +-
.../apache/cassandra/gms/GossipShutdownTest.java | 66 +++++++
.../org/apache/cassandra/repair/FuzzTestBase.java | 24 ++-
.../repair/messages/RepairMessageTest.java | 211 +++++++++++++++++++++
.../cassandra/test/asserts/ExtendedAssertions.java | 84 ++++++++
.../apache/cassandra/utils/NoSpamLoggerTest.java | 2 +-
10 files changed, 525 insertions(+), 41 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 779d6f2d13..672d277058 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
5.0-alpha2
+ * Add metrics and logging to repair retries (CASSANDRA-18952)
* Remove deprecated code in Cassandra 1.x and 2.x (CASSANDRA-18959)
* ClientRequestSize metrics should not treat CONTAINS restrictions as being
equality-based (CASSANDRA-18896)
* Add support for vector search in SAI (CASSANDRA-18715)
diff --git a/src/java/org/apache/cassandra/gms/GossipShutdown.java
b/src/java/org/apache/cassandra/gms/GossipShutdown.java
index 02f0f375c4..713b2c4fe3 100644
--- a/src/java/org/apache/cassandra/gms/GossipShutdown.java
+++ b/src/java/org/apache/cassandra/gms/GossipShutdown.java
@@ -19,7 +19,6 @@
package org.apache.cassandra.gms;
import java.io.IOException;
-
import javax.annotation.Nullable;
import org.apache.cassandra.io.IVersionedSerializer;
@@ -38,29 +37,31 @@ public class GossipShutdown
this.state = state;
}
- public static final class Serializer implements
IVersionedSerializer<GossipShutdown>
+ public static final class Serializer implements
IVersionedSerializer<Object>
{
@Override
- public void serialize(GossipShutdown t, DataOutputPlus out, int
version) throws IOException
+ public void serialize(Object t, DataOutputPlus out, int version)
throws IOException
{
if (version < MessagingService.VERSION_50) return;
- EndpointState.serializer.serialize(t.state, out, version);
+ GossipShutdown shutdown = (GossipShutdown) t;
+ EndpointState.serializer.serialize(shutdown.state, out, version);
}
@Nullable
@Override
- public GossipShutdown deserialize(DataInputPlus in, int version)
throws IOException
+ public Object deserialize(DataInputPlus in, int version) throws
IOException
{
if (version < MessagingService.VERSION_50) return null;
return new GossipShutdown(EndpointState.serializer.deserialize(in,
version));
}
@Override
- public long serializedSize(GossipShutdown t, int version)
+ public long serializedSize(Object t, int version)
{
if (version < MessagingService.VERSION_50) return 0;
- return EndpointState.serializer.serializedSize(t.state, version);
+ GossipShutdown shutdown = (GossipShutdown) t;
+ return EndpointState.serializer.serializedSize(shutdown.state,
version);
}
}
}
diff --git a/src/java/org/apache/cassandra/metrics/RepairMetrics.java
b/src/java/org/apache/cassandra/metrics/RepairMetrics.java
index 5b4f67e230..27dbbd3118 100644
--- a/src/java/org/apache/cassandra/metrics/RepairMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/RepairMetrics.java
@@ -18,7 +18,16 @@
package org.apache.cassandra.metrics;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.Map;
+
+import com.google.common.annotations.VisibleForTesting;
+
import com.codahale.metrics.Counter;
+import com.codahale.metrics.Histogram;
+import org.apache.cassandra.net.Verb;
+import org.apache.cassandra.repair.messages.RepairMessage;
import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
@@ -26,9 +35,71 @@ public class RepairMetrics
{
public static final String TYPE_NAME = "Repair";
public static final Counter previewFailures =
Metrics.counter(DefaultNameFactory.createMetricName(TYPE_NAME,
"PreviewFailures", null));
+ public static final Histogram retries =
Metrics.histogram(DefaultNameFactory.createMetricName(TYPE_NAME, "Retries",
null), false);
+ public static final Map<Verb, Histogram> retriesByVerb;
+ public static final Counter retryTimeout =
Metrics.counter(DefaultNameFactory.createMetricName(TYPE_NAME, "RetryTimeout",
null));
+ public static final Map<Verb, Counter> retryTimeoutByVerb;
+ public static final Counter retryFailure =
Metrics.counter(DefaultNameFactory.createMetricName(TYPE_NAME, "RetryFailure",
null));
+ public static final Map<Verb, Counter> retryFailureByVerb;
+
+ static
+ {
+ Map<Verb, Histogram> retries = new EnumMap<>(Verb.class);
+ Map<Verb, Counter> timeout = new EnumMap<>(Verb.class);
+ Map<Verb, Counter> failure = new EnumMap<>(Verb.class);
+ for (Verb verb : RepairMessage.ALLOWS_RETRY)
+ {
+ retries.put(verb,
Metrics.histogram(DefaultNameFactory.createMetricName(TYPE_NAME, "Retries-" +
verb.name(), null), false));
+ timeout.put(verb,
Metrics.counter(DefaultNameFactory.createMetricName(TYPE_NAME, "RetryTimeout-"
+ verb.name(), null)));
+ failure.put(verb,
Metrics.counter(DefaultNameFactory.createMetricName(TYPE_NAME, "RetryFailure-"
+ verb.name(), null)));
+ }
+ retriesByVerb = Collections.unmodifiableMap(retries);
+ retryTimeoutByVerb = Collections.unmodifiableMap(timeout);
+ retryFailureByVerb = Collections.unmodifiableMap(failure);
+ }
public static void init()
{
// noop
}
+
+ @VisibleForTesting
+ public static void unsafeReset()
+ {
+ reset(previewFailures);
+ reset(retries);
+ retriesByVerb.values().forEach(RepairMetrics::reset);
+ reset(retryTimeout);
+ retryTimeoutByVerb.values().forEach(RepairMetrics::reset);
+ reset(retryFailure);
+ retryFailureByVerb.values().forEach(RepairMetrics::reset);
+ }
+
+ private static void reset(Histogram retries)
+ {
+ ((ClearableHistogram) retries).clear();
+ }
+
+ private static void reset(Counter counter)
+ {
+ counter.dec(counter.getCount());
+ }
+
+ public static void retry(Verb verb, int attempt)
+ {
+ retries.update(attempt);
+ retriesByVerb.get(verb).update(attempt);
+ }
+
+ public static void retryTimeout(Verb verb)
+ {
+ retryTimeout.inc();
+ retryTimeoutByVerb.get(verb).inc();
+ }
+
+ public static void retryFailure(Verb verb)
+ {
+ retryFailure.inc();
+ retryFailureByVerb.get(verb).inc();
+ }
}
diff --git a/src/java/org/apache/cassandra/repair/messages/RepairMessage.java
b/src/java/org/apache/cassandra/repair/messages/RepairMessage.java
index d6d4d9b536..6411d12c78 100644
--- a/src/java/org/apache/cassandra/repair/messages/RepairMessage.java
+++ b/src/java/org/apache/cassandra/repair/messages/RepairMessage.java
@@ -22,16 +22,19 @@ import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import javax.annotation.Nullable;
+import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.RepairRetrySpec;
import org.apache.cassandra.config.RetrySpec;
+import org.apache.cassandra.metrics.RepairMetrics;
import org.apache.cassandra.repair.SharedContext;
import org.apache.cassandra.exceptions.RepairException;
import org.apache.cassandra.exceptions.RequestFailureReason;
@@ -43,6 +46,7 @@ import org.apache.cassandra.repair.RepairJobDesc;
import org.apache.cassandra.streaming.PreviewKind;
import org.apache.cassandra.utils.Backoff;
import org.apache.cassandra.utils.CassandraVersion;
+import org.apache.cassandra.utils.NoSpamLogger;
import org.apache.cassandra.utils.TimeUUID;
import org.apache.cassandra.utils.concurrent.Future;
@@ -56,8 +60,23 @@ import static
org.apache.cassandra.net.MessageFlag.CALL_BACK_ON_FAILURE;
public abstract class RepairMessage
{
private enum ErrorHandling { NONE, TIMEOUT, RETRY }
- private static final CassandraVersion SUPPORTS_RETRY = new
CassandraVersion("5.0.0-alpha2.SNAPSHOT");
+ @VisibleForTesting
+ static final CassandraVersion SUPPORTS_RETRY = new
CassandraVersion("5.0.0-alpha2.SNAPSHOT");
private static final Map<Verb, CassandraVersion> VERB_TIMEOUT_VERSIONS;
+ public static final Set<Verb> ALLOWS_RETRY;
+ private static final Set<Verb> SUPPORTS_RETRY_WITHOUT_VERSION_CHECK =
Collections.unmodifiableSet(EnumSet.of(Verb.CLEANUP_MSG));
+ public static final RequestCallback<Object> NOOP_CALLBACK = new
RequestCallback<>()
+ {
+ @Override
+ public void onResponse(Message<Object> msg)
+ {
+ }
+
+ @Override
+ public void onFailure(InetAddressAndPort from, RequestFailureReason
failureReason)
+ {
+ }
+ };
static
{
@@ -68,10 +87,21 @@ public abstract class RepairMessage
map.put(Verb.VALIDATION_RSP, SUPPORTS_RETRY);
map.put(Verb.SYNC_RSP, SUPPORTS_RETRY);
VERB_TIMEOUT_VERSIONS = Collections.unmodifiableMap(map);
+
+ EnumSet<Verb> allowsRetry = EnumSet.noneOf(Verb.class);
+ allowsRetry.add(Verb.PREPARE_MSG);
+ allowsRetry.add(Verb.VALIDATION_REQ);
+ allowsRetry.add(Verb.VALIDATION_RSP);
+ allowsRetry.add(Verb.SYNC_REQ);
+ allowsRetry.add(Verb.SYNC_RSP);
+ allowsRetry.add(Verb.SNAPSHOT_MSG);
+ allowsRetry.add(Verb.CLEANUP_MSG);
+ ALLOWS_RETRY = Collections.unmodifiableSet(allowsRetry);
}
- private static final Set<Verb> SUPPORTS_RETRY_WITHOUT_VERSION_CHECK =
Collections.unmodifiableSet(EnumSet.of(Verb.CLEANUP_MSG));
private static final Logger logger =
LoggerFactory.getLogger(RepairMessage.class);
+ private static final NoSpamLogger noSpam = NoSpamLogger.getLogger(logger,
1, TimeUnit.MINUTES);
+
@Nullable
public final RepairJobDesc desc;
@@ -104,7 +134,7 @@ public abstract class RepairMessage
return () -> !f.isDone();
}
- private static Supplier<Boolean> always()
+ public static Supplier<Boolean> always()
{
return () -> true;
}
@@ -121,27 +151,20 @@ public abstract class RepairMessage
public static void sendMessageWithRetries(SharedContext ctx, RepairMessage
request, Verb verb, InetAddressAndPort endpoint)
{
- sendMessageWithRetries(ctx, backoff(ctx, verb), always(), request,
verb, endpoint, new RequestCallback<>()
- {
- @Override
- public void onResponse(Message<Object> msg)
- {
- }
-
- @Override
- public void onFailure(InetAddressAndPort from,
RequestFailureReason failureReason)
- {
- }
- }, 0);
+ sendMessageWithRetries(ctx, backoff(ctx, verb), always(), request,
verb, endpoint, NOOP_CALLBACK, 0);
}
- private static <T> void sendMessageWithRetries(SharedContext ctx, Backoff
backoff, Supplier<Boolean> allowRetry, RepairMessage request, Verb verb,
InetAddressAndPort endpoint, RequestCallback<T> finalCallback, int attempt)
+ @VisibleForTesting
+ static <T> void sendMessageWithRetries(SharedContext ctx, Backoff backoff,
Supplier<Boolean> allowRetry, RepairMessage request, Verb verb,
InetAddressAndPort endpoint, RequestCallback<T> finalCallback, int attempt)
{
+ if (!ALLOWS_RETRY.contains(verb))
+ throw new AssertionError("Repair verb " + verb + " does not
support retry, but a request to send with retry was given!");
RequestCallback<T> callback = new RequestCallback<>()
{
@Override
public void onResponse(Message<T> msg)
{
+ maybeRecordRetry(null);
finalCallback.onResponse(msg);
}
@@ -165,6 +188,7 @@ public abstract class RepairMessage
backoff.computeWaitTime(attempt), backoff.unit());
return;
}
+ maybeRecordRetry(failureReason);
finalCallback.onFailure(from, failureReason);
return;
default:
@@ -172,6 +196,29 @@ public abstract class RepairMessage
}
}
+ private void maybeRecordRetry(@Nullable RequestFailureReason
reason)
+ {
+ if (attempt <= 0)
+ return;
+ // we don't know what the preview kind is... so use NONE...
this impacts logPrefix as it will cause us to use "repair" rather than "preview
repair" which may not be correct... but close enough...
+ String prefix =
PreviewKind.NONE.logPrefix(request.parentRepairSession());
+ RepairMetrics.retry(verb, attempt);
+ if (reason == null)
+ {
+ noSpam.info("{} Retry of repair verb " + verb + " was
successful after {} attempts", prefix, attempt);
+ }
+ else if (reason == RequestFailureReason.TIMEOUT)
+ {
+ noSpam.warn("{} Timeout for repair verb " + verb + ";
could not complete within {} attempts", prefix, attempt);
+ RepairMetrics.retryTimeout(verb);
+ }
+ else
+ {
+ noSpam.warn("{} Failure for repair verb " + verb + ";
could not complete within {} attempts", prefix, attempt);
+ RepairMetrics.retryFailure(verb);
+ }
+ }
+
@Override
public boolean invokeOnFailure()
{
diff --git a/src/java/org/apache/cassandra/utils/NoSpamLogger.java
b/src/java/org/apache/cassandra/utils/NoSpamLogger.java
index 9b62e2168b..0a13f6b2a5 100644
--- a/src/java/org/apache/cassandra/utils/NoSpamLogger.java
+++ b/src/java/org/apache/cassandra/utils/NoSpamLogger.java
@@ -51,19 +51,18 @@ public class NoSpamLogger
}
@VisibleForTesting
- static interface Clock
+ public interface Clock
{
long nanoTime();
}
+ private static Clock CLOCK = Global::nanoTime;
+
@VisibleForTesting
- static Clock CLOCK = new Clock()
+ public static void unsafeSetClock(Clock clock)
{
- public long nanoTime()
- {
- return Global.nanoTime();
- }
- };
+ CLOCK = clock;
+ }
public class NoSpamLogStatement extends AtomicLong
{
diff --git a/test/unit/org/apache/cassandra/gms/GossipShutdownTest.java
b/test/unit/org/apache/cassandra/gms/GossipShutdownTest.java
new file mode 100644
index 0000000000..7bfa2ad0d4
--- /dev/null
+++ b/test/unit/org/apache/cassandra/gms/GossipShutdownTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.gms;
+
+import java.io.IOException;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.NoPayload;
+import org.apache.cassandra.net.Verb;
+import org.assertj.core.api.Assertions;
+
+public class GossipShutdownTest
+{
+ private static final int BEFORE_CHANGE =
MessagingService.Version.VERSION_40.value;
+ private static final int AFTER_CHANGE =
MessagingService.Version.VERSION_50.value;
+
+ @BeforeClass
+ public static void beforeClass()
+ {
+ DatabaseDescriptor.clientInitialization();
+ }
+
+ @Test
+ public void mixedMode() throws IOException
+ {
+ Message<GossipShutdown> message = Message.out(Verb.GOSSIP_SHUTDOWN,
new GossipShutdown(new EndpointState(HeartBeatState.empty())));
+
+ Assertions.assertThat(serde(message, BEFORE_CHANGE)).isNull();
+ Assertions.assertThat(serde(message,
AFTER_CHANGE)).isInstanceOf(GossipShutdown.class);
+
+ // got from 4.x peer
+ Assertions.assertThat(serde(Message.out(Verb.GOSSIP_SHUTDOWN,
NoPayload.noPayload), BEFORE_CHANGE)).isNull();
+ }
+
+ private Object serde(Message<?> message, int version) throws IOException
+ {
+ try (DataOutputBuffer out = DataOutputBuffer.scratchBuffer.get())
+ {
+ Message.serializer.serialize(message, out, version);
+ return Message.serializer.deserialize(new
DataInputBuffer(out.unsafeGetBufferAndFlip(), false), message.header,
version).payload;
+ }
+ }
+}
\ No newline at end of file
diff --git a/test/unit/org/apache/cassandra/repair/FuzzTestBase.java
b/test/unit/org/apache/cassandra/repair/FuzzTestBase.java
index ac7cb5260a..d29f35799e 100644
--- a/test/unit/org/apache/cassandra/repair/FuzzTestBase.java
+++ b/test/unit/org/apache/cassandra/repair/FuzzTestBase.java
@@ -101,6 +101,7 @@ import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessageDelivery;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.RequestCallback;
+import org.apache.cassandra.repair.messages.RepairMessage;
import org.apache.cassandra.repair.messages.RepairOption;
import org.apache.cassandra.repair.messages.ValidationResponse;
import org.apache.cassandra.repair.state.Completable;
@@ -133,6 +134,7 @@ import org.apache.cassandra.utils.Generators;
import org.apache.cassandra.utils.MBeanWrapper;
import org.apache.cassandra.utils.MerkleTree;
import org.apache.cassandra.utils.MerkleTrees;
+import org.apache.cassandra.utils.NoSpamLogger;
import org.apache.cassandra.utils.concurrent.AsyncPromise;
import org.apache.cassandra.utils.concurrent.Future;
import org.apache.cassandra.utils.concurrent.ImmediateFuture;
@@ -319,17 +321,13 @@ public abstract class FuzzTestBase extends
CQLTester.InMemory
@Override
public Set<Faults> apply(Cluster.Node node, Message<?> message)
{
+ if (RepairMessage.ALLOWS_RETRY.contains(message.verb()))
+ {
+ allowDrop.add(message.id());
+ return Faults.DROPPED;
+ }
switch (message.verb())
{
- case PREPARE_MSG:
- case VALIDATION_REQ:
- case VALIDATION_RSP:
- case SYNC_REQ:
- case SYNC_RSP:
- case SNAPSHOT_MSG:
- case CLEANUP_MSG:
- allowDrop.add(message.id());
- return Faults.DROPPED;
// these messages are not resilent to ephemeral issues
case PREPARE_CONSISTENT_REQ:
case PREPARE_CONSISTENT_RSP:
@@ -337,7 +335,6 @@ public abstract class FuzzTestBase extends
CQLTester.InMemory
case FINALIZE_PROMISE_MSG:
case FINALIZE_COMMIT_MSG:
case FAILED_SESSION_MSG:
-
noFaults.add(message.id());
return Faults.NONE;
default:
@@ -650,6 +647,13 @@ public abstract class FuzzTestBase extends
CQLTester.InMemory
Stage.ANTI_ENTROPY.unsafeSetExecutor(orderedExecutor);
Stage.INTERNAL_RESPONSE.unsafeSetExecutor(unorderedScheduled);
Mockito.when(failureDetector.isAlive(Mockito.any())).thenReturn(true);
+ Thread expectedThread = Thread.currentThread();
+ NoSpamLogger.unsafeSetClock(() -> {
+ if (Thread.currentThread() != expectedThread)
+ throw new AssertionError("NoSpamLogger.Clock accessed
outside of fuzzing...");
+ return globalExecutor.nanoTime();
+ });
+
int numNodes = rs.nextInt(3, 10);
List<String> dcs =
Gens.lists(IDENTIFIER_GEN).unique().ofSizeBetween(1, Math.min(10,
numNodes)).next(rs);
Map<InetAddressAndPort, Node> nodes =
Maps.newHashMapWithExpectedSize(numNodes);
diff --git
a/test/unit/org/apache/cassandra/repair/messages/RepairMessageTest.java
b/test/unit/org/apache/cassandra/repair/messages/RepairMessageTest.java
new file mode 100644
index 0000000000..e1a6eec3f1
--- /dev/null
+++ b/test/unit/org/apache/cassandra/repair/messages/RepairMessageTest.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair.messages;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.cassandra.concurrent.ScheduledExecutorPlus;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.exceptions.RequestFailureReason;
+import org.apache.cassandra.gms.IGossiper;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.metrics.RepairMetrics;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessageDelivery;
+import org.apache.cassandra.net.RequestCallback;
+import org.apache.cassandra.net.Verb;
+import org.apache.cassandra.repair.SharedContext;
+import org.apache.cassandra.utils.Backoff;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.TimeUUID;
+import org.assertj.core.api.Assertions;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+import org.mockito.stubbing.Answer;
+
+import static org.apache.cassandra.repair.messages.RepairMessage.always;
+import static
org.apache.cassandra.repair.messages.RepairMessage.sendMessageWithRetries;
+import static org.apache.cassandra.test.asserts.ExtendedAssertions.assertThat;
+
+public class RepairMessageTest
+{
+ private static final TimeUUID SESSION = new TimeUUID(0, 0);
+ private static final InetAddressAndPort ADDRESS =
FBUtilities.getBroadcastAddressAndPort();
+ private static final Answer REJECT_ALL = ignore -> {
+ throw new UnsupportedOperationException();
+ };
+ private static final int[] attempts = {1, 2, 10};
+ // Tests may use verb / message pairs that do not make sense... that is
due to the fact that the message sending logic does not validate this and
delegates such validation to messaging, which is mocked within the class...
+ // By using messages with simpler state it makes the test easier to read,
even though the verb -> message mapping is incorrect.
+ private static final Verb VERB = Verb.PREPARE_MSG;
+ private static final RepairMessage PAYLOAD = new CleanupMessage(SESSION);
+ public static final int[] NO_RETRY_ATTEMPTS = { 0 };
+
+ static
+ {
+ DatabaseDescriptor.clientInitialization();
+ RepairMetrics.init();
+ }
+
+ @Before
+ public void before()
+ {
+ RepairMetrics.unsafeReset();
+ }
+
+ @Test
+ public void noRetries()
+ {
+ test(NO_RETRY_ATTEMPTS, (ignore, callback) -> {
+ callback.onResponse(Message.out(VERB, PAYLOAD));
+ assertNoRetries();
+ });
+ }
+
+ @Test
+ public void noRetriesRequestFailed()
+ {
+ test(NO_RETRY_ATTEMPTS, ((ignore, callback) -> {
+ callback.onFailure(ADDRESS, RequestFailureReason.UNKNOWN);
+ assertNoRetries();
+ }));
+ }
+
+ @Test
+ public void retryWithSuccess()
+ {
+ test((maxAttempts, callback) -> {
+ callback.onResponse(Message.out(VERB, PAYLOAD));
+ assertMetrics(maxAttempts, false, false);
+ });
+ }
+
+ @Test
+ public void retryWithTimeout()
+ {
+ test((maxAttempts, callback) -> {
+ callback.onFailure(ADDRESS, RequestFailureReason.TIMEOUT);
+ assertMetrics(maxAttempts, true, false);
+ });
+ }
+
+ @Test
+ public void retryWithFailure()
+ {
+ test((maxAttempts, callback) -> {
+ callback.onFailure(ADDRESS, RequestFailureReason.UNKNOWN);
+ assertMetrics(maxAttempts, false, true);
+ });
+ }
+
+ private void assertNoRetries()
+ {
+ assertMetrics(0, false, false);
+ }
+
+ private void assertMetrics(long attempts, boolean timeout, boolean failure)
+ {
+ if (attempts == 0)
+ {
+ assertThat(RepairMetrics.retries).isEmpty();
+ assertThat(RepairMetrics.retriesByVerb.get(VERB)).isEmpty();
+ assertThat(RepairMetrics.retryTimeout).isEmpty();
+ assertThat(RepairMetrics.retryTimeoutByVerb.get(VERB)).isEmpty();
+ assertThat(RepairMetrics.retryFailure).isEmpty();
+ assertThat(RepairMetrics.retryFailureByVerb.get(VERB)).isEmpty();
+ }
+ else
+ {
+ assertThat(RepairMetrics.retries).hasCount(1).hasMax(attempts);
+
assertThat(RepairMetrics.retriesByVerb.get(VERB)).hasCount(1).hasMax(attempts);
+ assertThat(RepairMetrics.retryTimeout).hasCount(timeout ? 1 : 0);
+
assertThat(RepairMetrics.retryTimeoutByVerb.get(VERB)).hasCount(timeout ? 1 :
0);
+ assertThat(RepairMetrics.retryFailure).hasCount(failure ? 1 : 0);
+
assertThat(RepairMetrics.retryFailureByVerb.get(VERB)).hasCount(failure ? 1 :
0);
+ }
+ }
+
+ private static Backoff backoff(int maxAttempts)
+ {
+ return new Backoff.ExponentialBackoff(maxAttempts, 100, 1000, () ->
.5);
+ }
+
+ private static SharedContext ctx()
+ {
+ SharedContext ctx = Mockito.mock(SharedContext.class, REJECT_ALL);
+ MessageDelivery messaging = Mockito.mock(MessageDelivery.class,
REJECT_ALL);
+ // allow the single method under test
+ Mockito.doNothing().when(messaging).sendWithCallback(Mockito.any(),
Mockito.any(), Mockito.any());
+ IGossiper gossiper = Mockito.mock(IGossiper.class, REJECT_ALL);
+
Mockito.doReturn(RepairMessage.SUPPORTS_RETRY).when(gossiper).getReleaseVersion(Mockito.any());
+ ScheduledExecutorPlus executor =
Mockito.mock(ScheduledExecutorPlus.class, REJECT_ALL);
+ Mockito.doAnswer(invocationOnMock -> {
+ Runnable fn = invocationOnMock.getArgument(0);
+ fn.run();
+ return null;
+ }).when(executor).schedule(Mockito.<Runnable>any(), Mockito.anyLong(),
Mockito.any());
+
+ Mockito.doReturn(messaging).when(ctx).messaging();
+ Mockito.doReturn(gossiper).when(ctx).gossiper();
+ Mockito.doReturn(executor).when(ctx).optionalTasks();
+ return ctx;
+ }
+
+ private static <T extends RepairMessage> RequestCallback<T>
callback(MessageDelivery messaging)
+ {
+ ArgumentCaptor<Message<?>> messageCapture =
ArgumentCaptor.forClass(Message.class);
+ ArgumentCaptor<InetAddressAndPort> endpointCapture =
ArgumentCaptor.forClass(InetAddressAndPort.class);
+ ArgumentCaptor<RequestCallback<T>> callbackCapture =
ArgumentCaptor.forClass(RequestCallback.class);
+
+ Mockito.verify(messaging).sendWithCallback(messageCapture.capture(),
endpointCapture.capture(), callbackCapture.capture());
+ Mockito.clearInvocations(messaging);
+
+ Assertions.assertThat(endpointCapture.getValue()).isEqualTo(ADDRESS);
+
+ return callbackCapture.getValue();
+ }
+
+ private interface TestCase
+ {
+ void test(int maxAttempts, RequestCallback<RepairMessage> callback);
+ }
+
+ private void test(TestCase fn)
+ {
+ test(attempts, fn);
+ }
+
+ private void test(int[] attempts, TestCase fn)
+ {
+ SharedContext ctx = ctx();
+ MessageDelivery messaging = ctx.messaging();
+
+ for (int maxAttempts : attempts)
+ {
+ before();
+
+ sendMessageWithRetries(ctx, backoff(maxAttempts), always(),
PAYLOAD, VERB, ADDRESS, RepairMessage.NOOP_CALLBACK, 0);
+ for (int i = 0; i < maxAttempts; i++)
+ callback(messaging).onFailure(ADDRESS,
RequestFailureReason.TIMEOUT);
+ fn.test(maxAttempts, callback(messaging));
+ Mockito.verifyNoInteractions(messaging);
+ }
+ }
+}
\ No newline at end of file
diff --git
a/test/unit/org/apache/cassandra/test/asserts/ExtendedAssertions.java
b/test/unit/org/apache/cassandra/test/asserts/ExtendedAssertions.java
new file mode 100644
index 0000000000..fc6040e1b9
--- /dev/null
+++ b/test/unit/org/apache/cassandra/test/asserts/ExtendedAssertions.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.test.asserts;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Counting;
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.Snapshot;
+import org.assertj.core.api.AbstractObjectAssert;
+
+public class ExtendedAssertions
+{
+ public static CounterAssert assertThat(Counter counter)
+ {
+ return new CounterAssert(counter);
+ }
+
+ public static HistogramAssert assertThat(Histogram histogram)
+ {
+ return new HistogramAssert(histogram);
+ }
+
+ public static abstract class CountingAssert<T extends Counting, Self
extends CountingAssert<T, Self>> extends AbstractObjectAssert<Self, T>
+ {
+ protected CountingAssert(T t, Class<?> selfType)
+ {
+ super(t, selfType);
+ }
+
+ public Self hasCount(int expected)
+ {
+ isNotNull();
+ if (actual.getCount() != expected)
+ throw failure("%s count was %d, but expected %d",
actual.getClass().getSimpleName(), actual.getCount(), expected);
+ return (Self) this;
+ }
+
+ public Self isEmpty()
+ {
+ return hasCount(0);
+ }
+ }
+
+ public static class CounterAssert extends CountingAssert<Counter,
CounterAssert>
+ {
+ public CounterAssert(Counter counter)
+ {
+ super(counter, CounterAssert.class);
+ }
+ }
+
+ public static class HistogramAssert extends CountingAssert<Histogram,
HistogramAssert>
+ {
+ public HistogramAssert(Histogram histogram)
+ {
+ super(histogram, HistogramAssert.class);
+ }
+
+ public HistogramAssert hasMax(long expected)
+ {
+ isNotNull();
+ Snapshot snapshot = actual.getSnapshot();
+ if (snapshot.getMax() != expected) // compare against the max recorded in the snapshot
+ throw failure("Expected max %d but given %d", expected, snapshot.getMax()); // report the observed max, not the sample count
+ return this;
+ }
+ }
+}
diff --git a/test/unit/org/apache/cassandra/utils/NoSpamLoggerTest.java
b/test/unit/org/apache/cassandra/utils/NoSpamLoggerTest.java
index 73aef09a51..039d72257f 100644
--- a/test/unit/org/apache/cassandra/utils/NoSpamLoggerTest.java
+++ b/test/unit/org/apache/cassandra/utils/NoSpamLoggerTest.java
@@ -80,7 +80,7 @@ public class NoSpamLoggerTest
@BeforeClass
public static void setUpClass() throws Exception
{
- NoSpamLogger.CLOCK = () -> now;
+ NoSpamLogger.unsafeSetClock(() -> now);
}
@Before
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]