This is an automated email from the ASF dual-hosted git repository.
adelapena pushed a commit to branch cassandra-4.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-4.0 by this push:
new ce21eb5 Add tests for the Hint service metrics
ce21eb5 is described below
commit ce21eb5fac385098b7ed19c77167a38b5dee230a
Author: Andrés de la Peña <[email protected]>
AuthorDate: Tue Aug 17 13:43:29 2021 +0100
Add tests for the Hint service metrics
patch by Andrés de la Peña; reviewed by Benjamin Lerer for CASSANDRA-16189
---
.../distributed/impl/InstanceMetrics.java | 15 +-
.../test/metrics/HintsServiceMetricsTest.java | 231 +++++++++++++++++++++
2 files changed, 241 insertions(+), 5 deletions(-)
diff --git
a/test/distributed/org/apache/cassandra/distributed/impl/InstanceMetrics.java
b/test/distributed/org/apache/cassandra/distributed/impl/InstanceMetrics.java
index 3bd5894..939691d 100644
---
a/test/distributed/org/apache/cassandra/distributed/impl/InstanceMetrics.java
+++
b/test/distributed/org/apache/cassandra/distributed/impl/InstanceMetrics.java
@@ -69,10 +69,7 @@ class InstanceMetrics implements Metrics
public double getHistogram(String name, MetricValue value)
{
Histogram histogram = metricsRegistry.getHistograms().get(name);
- if (value == MetricValue.COUNT)
- return histogram.getCount();
-
- return getValue(histogram.getSnapshot(), value);
+ return getValue(histogram, value);
}
public Map<String, Double> getHistograms(Predicate<String> filter,
MetricValue value)
@@ -81,7 +78,7 @@ class InstanceMetrics implements Metrics
for (Map.Entry<String, Histogram> e :
metricsRegistry.getHistograms().entrySet())
{
if (filter.test(e.getKey()))
- values.put(e.getKey(), getValue(e.getValue().getSnapshot(),
value));
+ values.put(e.getKey(), getValue(e.getValue(), value));
}
return values;
}
@@ -135,6 +132,14 @@ class InstanceMetrics implements Metrics
return values;
}
+ static double getValue(Histogram histogram, MetricValue value)
+ {
+ if (value == MetricValue.COUNT)
+ return histogram.getCount();
+
+ return getValue(histogram.getSnapshot(), value);
+ }
+
static double getValue(Snapshot snapshot, MetricValue value)
{
switch (value)
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/metrics/HintsServiceMetricsTest.java
b/test/distributed/org/apache/cassandra/distributed/test/metrics/HintsServiceMetricsTest.java
new file mode 100644
index 0000000..a47c782
--- /dev/null
+++
b/test/distributed/org/apache/cassandra/distributed/test/metrics/HintsServiceMetricsTest.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.metrics;
+
+import java.util.Arrays;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.Test;
+
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.MethodDelegation;
+import net.bytebuddy.implementation.bind.annotation.SuperCall;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ICoordinator;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.shared.Metrics;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.hints.Hint;
+import org.apache.cassandra.metrics.HintsServiceMetrics;
+import org.apache.cassandra.net.Verb;
+import org.awaitility.core.ThrowingRunnable;
+
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
+import static org.apache.cassandra.distributed.api.ConsistencyLevel.QUORUM;
+import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL;
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+import static org.awaitility.Awaitility.await;
+
+/**
+ * Tests {@link HintsServiceMetrics}.
+ */
+public class HintsServiceMetricsTest extends TestBaseImpl
+{
+ private static final int NUM_ROWS = 100;
+ private static final int NUM_FAILURES_PER_NODE = 5;
+ private static final int NUM_TIMEOUTS_PER_NODE = 3;
+
+ @Test
+ public void testHintsServiceMetrics() throws Exception
+ {
+ // setup a 3-node cluster with a bytebuddy injection that makes the
writing of some hints fail
+ try (Cluster cluster = builder().withNodes(3)
+ .withConfig(config ->
config.with(NETWORK, GOSSIP, NATIVE_PROTOCOL))
+
.withInstanceInitializer(FailHints::install)
+ .start())
+ {
+ // setup a message filter to drop some of the hint request
messages from node1
+ AtomicInteger hintsNode2 = new AtomicInteger();
+ AtomicInteger hintsNode3 = new AtomicInteger();
+ cluster.filters()
+ .verbs(Verb.HINT_REQ.id)
+ .from(1)
+ .messagesMatching((from, to, message) ->
+ (to == 2 && hintsNode2.incrementAndGet()
<= NUM_TIMEOUTS_PER_NODE) ||
+ (to == 3 && hintsNode3.incrementAndGet()
<= NUM_TIMEOUTS_PER_NODE))
+ .drop();
+
+ // setup a message filter to drop mutation requests from node1,
so it creates hints for those mutations
+ AtomicBoolean dropWritesForNode2 = new AtomicBoolean(false);
+ AtomicBoolean dropWritesForNode3 = new AtomicBoolean(false);
+ cluster.filters()
+ .verbs(Verb.MUTATION_REQ.id)
+ .from(1)
+ .messagesMatching((from, to, message) ->
+ (to == 2 && dropWritesForNode2.get()) ||
+ (to == 3 && dropWritesForNode3.get()))
+ .drop();
+
+ // fix under-replicated keyspaces so they don't produce hint
requests while we are dropping mutations
+ fixDistributedSchemas(cluster);
+
+ cluster.schemaChange(withKeyspace("CREATE KEYSPACE %s WITH
replication = {'class': 'SimpleStrategy', 'replication_factor': 3}"));
+ cluster.schemaChange(withKeyspace("CREATE TABLE %s.t (k int
PRIMARY KEY, v int)"));
+
+ ICoordinator coordinator = cluster.coordinator(1);
+ IInvokableInstance node1 = cluster.get(1);
+ IInvokableInstance node2 = cluster.get(2);
+ IInvokableInstance node3 = cluster.get(3);
+
+ // write the first half of the rows with the second node dropping
mutation requests,
+ // so some hints will be created for that node
+ dropWritesForNode2.set(true);
+ for (int i = 0; i < NUM_ROWS / 2; i++)
+ coordinator.execute(withKeyspace("INSERT INTO %s.t (k, v)
VALUES (?, ?)"), QUORUM, i, i);
+ dropWritesForNode2.set(false);
+
+ // write the second half of the rows with the third node dropping
mutation requests,
+ // so some hints will be created for that node
+ dropWritesForNode3.set(true);
+ for (int i = NUM_ROWS / 2; i < NUM_ROWS; i++)
+ coordinator.execute(withKeyspace("INSERT INTO %s.t (k, v)
VALUES (?, ?)"), QUORUM, i, i);
+ dropWritesForNode3.set(false);
+
+ // wait until all the hints have been successfully applied to the
nodes that have been dropping mutations
+ waitUntilAsserted(() ->
assertThat(countRows(node2)).isEqualTo(countRows(node3)).isEqualTo(NUM_ROWS));
+
+ // Verify the metrics for the coordinator node, which is the only
one actually sending hints.
+ // The hint delivery errors that we have injected should have made
the service try to send them again.
+ // These retries are done periodically and in pages, so a
retry may resend some of the hints that
+ // were already successfully sent. As a result, there may be more
succeeded hints than actual hints/rows.
+ waitUntilAsserted(() ->
assertThat(countHintsSucceeded(node1)).isGreaterThanOrEqualTo(NUM_ROWS));
+ waitUntilAsserted(() ->
assertThat(countHintsFailed(node1)).isEqualTo(NUM_FAILURES_PER_NODE * 2));
+ waitUntilAsserted(() ->
assertThat(countHintsTimedOut(node1)).isEqualTo(NUM_TIMEOUTS_PER_NODE * 2));
+
+ // verify delay metrics
+ long numGlobalDelays = countGlobalDelays(node1);
+ assertThat(numGlobalDelays).isGreaterThanOrEqualTo(NUM_ROWS);
+ assertThat(countEndpointDelays(node1, node1)).isEqualTo(0);
+ assertThat(countEndpointDelays(node1,
node2)).isGreaterThan(0).isLessThanOrEqualTo(numGlobalDelays);
+ assertThat(countEndpointDelays(node1,
node3)).isGreaterThan(0).isLessThanOrEqualTo(numGlobalDelays);
+ assertThat(countEndpointDelays(node1, node2) +
countEndpointDelays(node1, node3)).isGreaterThanOrEqualTo(numGlobalDelays);
+
+ // verify that the metrics for the non-coordinator nodes are zero
+ for (IInvokableInstance node : Arrays.asList(node2, node3))
+ {
+ assertThat(countHintsSucceeded(node)).isEqualTo(0);
+ assertThat(countHintsFailed(node)).isEqualTo(0);
+ assertThat(countHintsTimedOut(node)).isEqualTo(0);
+ assertThat(countGlobalDelays(node)).isEqualTo(0);
+ cluster.forEach(target -> assertThat(countEndpointDelays(node,
target)).isEqualTo(0));
+ }
+ }
+ }
+
+ private static void waitUntilAsserted(ThrowingRunnable assertion)
+ {
+ await().atMost(5, MINUTES)
+ .pollDelay(0, SECONDS)
+ .pollInterval(1, SECONDS)
+ .untilAsserted(assertion);
+ }
+
+ private static int countRows(IInvokableInstance node)
+ {
+ return node.executeInternal(withKeyspace("SELECT * FROM %s.t")).length;
+ }
+
+ @SuppressWarnings("Convert2MethodRef")
+ private static Long countHintsSucceeded(IInvokableInstance node)
+ {
+ return node.callOnInstance(() ->
HintsServiceMetrics.hintsSucceeded.getCount());
+ }
+
+ @SuppressWarnings("Convert2MethodRef")
+ private static Long countHintsFailed(IInvokableInstance node)
+ {
+ return node.callOnInstance(() ->
HintsServiceMetrics.hintsFailed.getCount());
+ }
+
+ @SuppressWarnings("Convert2MethodRef")
+ private static Long countHintsTimedOut(IInvokableInstance node)
+ {
+ return node.callOnInstance(() ->
HintsServiceMetrics.hintsTimedOut.getCount());
+ }
+
+ private static Long countGlobalDelays(IInvokableInstance node)
+ {
+ return getHistogramCount(node,
"org.apache.cassandra.metrics.HintsService.Hint_delays");
+ }
+
+ private static Long countEndpointDelays(IInvokableInstance node,
IInvokableInstance target)
+ {
+ return getHistogramCount(node,
String.format("org.apache.cassandra.metrics.HintsService.Hint_delays-%s.%d",
+
target.broadcastAddress().getAddress(),
+
target.broadcastAddress().getPort()));
+ }
+
+ private static long getHistogramCount(IInvokableInstance node, String name)
+ {
+ return node.metrics()
+ .getHistograms(s -> s.equals(name),
Metrics.MetricValue.COUNT)
+ .values()
+ .stream()
+ .findFirst()
+ .map(Math::round)
+ .orElse(0L);
+ }
+
+ /**
+ * Bytebuddy injection that makes the application of hints fail on the
destination node.
+ */
+ public static class FailHints
+ {
+ private static final AtomicInteger numHints = new AtomicInteger(0);
+
+ private static void install(ClassLoader cl, int nodeNumber)
+ {
+ // we can ignore the coordinator node
+ if (nodeNumber == 1)
+ return;
+
+ new ByteBuddy().rebase(Hint.class)
+ .method(named("applyFuture").and(takesArguments(0)))
+ .intercept(MethodDelegation.to(FailHints.class))
+ .make()
+ .load(cl, ClassLoadingStrategy.Default.INJECTION);
+ }
+
+ public static CompletableFuture<?> execute(@SuperCall
Callable<CompletableFuture<?>> r) throws Exception
+ {
+ if (numHints.incrementAndGet() <= NUM_FAILURES_PER_NODE)
+ throw new RuntimeException("Injected failure");
+ return r.call();
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]