This is an automated email from the ASF dual-hosted git repository.

adelapena pushed a commit to branch cassandra-4.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-4.0 by this push:
     new ce21eb5  Add tests for the Hint service metrics
ce21eb5 is described below

commit ce21eb5fac385098b7ed19c77167a38b5dee230a
Author: Andrés de la Peña <[email protected]>
AuthorDate: Tue Aug 17 13:43:29 2021 +0100

    Add tests for the Hint service metrics
    
    patch by Andrés de la Peña; reviewed by Benjamin Lerer for CASSANDRA-16189
---
 .../distributed/impl/InstanceMetrics.java          |  15 +-
 .../test/metrics/HintsServiceMetricsTest.java      | 231 +++++++++++++++++++++
 2 files changed, 241 insertions(+), 5 deletions(-)

diff --git 
a/test/distributed/org/apache/cassandra/distributed/impl/InstanceMetrics.java 
b/test/distributed/org/apache/cassandra/distributed/impl/InstanceMetrics.java
index 3bd5894..939691d 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/impl/InstanceMetrics.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/impl/InstanceMetrics.java
@@ -69,10 +69,7 @@ class InstanceMetrics implements Metrics
     public double getHistogram(String name, MetricValue value)
     {
         Histogram histogram = metricsRegistry.getHistograms().get(name);
-        if (value == MetricValue.COUNT)
-            return histogram.getCount();
-
-        return getValue(histogram.getSnapshot(), value);
+        return getValue(histogram, value);
     }
 
     public Map<String, Double> getHistograms(Predicate<String> filter, 
MetricValue value)
@@ -81,7 +78,7 @@ class InstanceMetrics implements Metrics
         for (Map.Entry<String, Histogram> e : 
metricsRegistry.getHistograms().entrySet())
         {
             if (filter.test(e.getKey()))
-                values.put(e.getKey(), getValue(e.getValue().getSnapshot(), 
value));
+                values.put(e.getKey(), getValue(e.getValue(), value));
         }
         return values;
     }
@@ -135,6 +132,14 @@ class InstanceMetrics implements Metrics
         return values;
     }
 
+    static double getValue(Histogram histogram, MetricValue value)
+    {
+        if (value == MetricValue.COUNT)
+            return histogram.getCount();
+
+        return getValue(histogram.getSnapshot(), value);
+    }
+
     static double getValue(Snapshot snapshot, MetricValue value)
     {
         switch (value)
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/metrics/HintsServiceMetricsTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/metrics/HintsServiceMetricsTest.java
new file mode 100644
index 0000000..a47c782
--- /dev/null
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/metrics/HintsServiceMetricsTest.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.metrics;
+
+import java.util.Arrays;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.Test;
+
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.MethodDelegation;
+import net.bytebuddy.implementation.bind.annotation.SuperCall;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ICoordinator;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.shared.Metrics;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.hints.Hint;
+import org.apache.cassandra.metrics.HintsServiceMetrics;
+import org.apache.cassandra.net.Verb;
+import org.awaitility.core.ThrowingRunnable;
+
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
+import static org.apache.cassandra.distributed.api.ConsistencyLevel.QUORUM;
+import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL;
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+import static org.awaitility.Awaitility.await;
+
+/**
+ * Tests {@link HintsServiceMetrics}.
+ */
+public class HintsServiceMetricsTest extends TestBaseImpl
+{
+    private static final int NUM_ROWS = 100;
+    private static final int NUM_FAILURES_PER_NODE = 5;
+    private static final int NUM_TIMEOUTS_PER_NODE = 3;
+
+    @Test
+    public void testHintsServiceMetrics() throws Exception
+    {
+        // setup a 3-node cluster with a bytebuddy injection that makes the 
writting of some hints to fail
+        try (Cluster cluster = builder().withNodes(3)
+                                        .withConfig(config -> 
config.with(NETWORK, GOSSIP, NATIVE_PROTOCOL))
+                                        
.withInstanceInitializer(FailHints::install)
+                                        .start())
+        {
+            // setup a message filter to drop some of the hint request 
messages from node1
+            AtomicInteger hintsNode2 = new AtomicInteger();
+            AtomicInteger hintsNode3 = new AtomicInteger();
+            cluster.filters()
+                   .verbs(Verb.HINT_REQ.id)
+                   .from(1)
+                   .messagesMatching((from, to, message) ->
+                                     (to == 2 && hintsNode2.incrementAndGet() 
<= NUM_TIMEOUTS_PER_NODE) ||
+                                     (to == 3 && hintsNode3.incrementAndGet() 
<= NUM_TIMEOUTS_PER_NODE))
+                   .drop();
+
+            // setup a message filter to drop mutations requests from node1, 
so it creates hints for those mutations
+            AtomicBoolean dropWritesForNode2 = new AtomicBoolean(false);
+            AtomicBoolean dropWritesForNode3 = new AtomicBoolean(false);
+            cluster.filters()
+                   .verbs(Verb.MUTATION_REQ.id)
+                   .from(1)
+                   .messagesMatching((from, to, message) ->
+                                     (to == 2 && dropWritesForNode2.get()) ||
+                                     (to == 3 && dropWritesForNode3.get()))
+                   .drop();
+
+            // fix under replicated keyspaces so they don't produce hint 
requests while we are dropping mutations
+            fixDistributedSchemas(cluster);
+
+            cluster.schemaChange(withKeyspace("CREATE KEYSPACE %s WITH 
replication = {'class': 'SimpleStrategy', 'replication_factor': 3}"));
+            cluster.schemaChange(withKeyspace("CREATE TABLE %s.t (k int 
PRIMARY KEY, v int)"));
+
+            ICoordinator coordinator = cluster.coordinator(1);
+            IInvokableInstance node1 = cluster.get(1);
+            IInvokableInstance node2 = cluster.get(2);
+            IInvokableInstance node3 = cluster.get(3);
+
+            // write the first half of the rows with the second node dropping 
mutation requests,
+            // so some hints will be created for that node
+            dropWritesForNode2.set(true);
+            for (int i = 0; i < NUM_ROWS / 2; i++)
+                coordinator.execute(withKeyspace("INSERT INTO %s.t (k, v) 
VALUES (?, ?)"), QUORUM, i, i);
+            dropWritesForNode2.set(false);
+
+            // write the second half of the rows with the third node dropping 
mutations requests,
+            // so some hints will be created for that node
+            dropWritesForNode3.set(true);
+            for (int i = NUM_ROWS / 2; i < NUM_ROWS; i++)
+                coordinator.execute(withKeyspace("INSERT INTO %s.t (k, v) 
VALUES (?, ?)"), QUORUM, i, i);
+            dropWritesForNode3.set(false);
+
+            // wait until all the hints have been successfully applied to the 
nodes that have been dropping mutations
+            waitUntilAsserted(() -> 
assertThat(countRows(node2)).isEqualTo(countRows(node3)).isEqualTo(NUM_ROWS));
+
+            // Verify the metrics for the coordinator node, which is the only 
one actually sending hints.
+            // The hint delivery errors that we have injected should have made 
the service try to send them again.
+            // These retries are done periodically and in pages, so the 
retries may send again some of the hints that
+            // were already successfully sent. This way, there may be more 
succeeded hints than actual hints/rows.
+            waitUntilAsserted(() -> 
assertThat(countHintsSucceeded(node1)).isGreaterThanOrEqualTo(NUM_ROWS));
+            waitUntilAsserted(() -> 
assertThat(countHintsFailed(node1)).isEqualTo(NUM_FAILURES_PER_NODE * 2));
+            waitUntilAsserted(() -> 
assertThat(countHintsTimedOut(node1)).isEqualTo(NUM_TIMEOUTS_PER_NODE * 2));
+
+            // verify delay metrics
+            long numGlobalDelays = countGlobalDelays(node1);
+            assertThat(numGlobalDelays).isGreaterThanOrEqualTo(NUM_ROWS);
+            assertThat(countEndpointDelays(node1, node1)).isEqualTo(0);
+            assertThat(countEndpointDelays(node1, 
node2)).isGreaterThan(0).isLessThanOrEqualTo(numGlobalDelays);
+            assertThat(countEndpointDelays(node1, 
node3)).isGreaterThan(0).isLessThanOrEqualTo(numGlobalDelays);
+            assertThat(countEndpointDelays(node1, node2) + 
countEndpointDelays(node1, node3)).isGreaterThanOrEqualTo(numGlobalDelays);
+
+            // verify that the metrics for the not-coordinator nodes are zero
+            for (IInvokableInstance node : Arrays.asList(node2, node3))
+            {
+                assertThat(countHintsSucceeded(node)).isEqualTo(0);
+                assertThat(countHintsFailed(node)).isEqualTo(0);
+                assertThat(countHintsTimedOut(node)).isEqualTo(0);
+                assertThat(countGlobalDelays(node)).isEqualTo(0);
+                cluster.forEach(target -> assertThat(countEndpointDelays(node, 
target)).isEqualTo(0));
+            }
+        }
+    }
+
+    private static void waitUntilAsserted(ThrowingRunnable assertion)
+    {
+        await().atMost(5, MINUTES)
+               .pollDelay(0, SECONDS)
+               .pollInterval(1, SECONDS)
+               .untilAsserted(assertion);
+    }
+
+    private static int countRows(IInvokableInstance node)
+    {
+        return node.executeInternal(withKeyspace("SELECT * FROM %s.t")).length;
+    }
+
+    @SuppressWarnings("Convert2MethodRef")
+    private static Long countHintsSucceeded(IInvokableInstance node)
+    {
+        return node.callOnInstance(() -> 
HintsServiceMetrics.hintsSucceeded.getCount());
+    }
+
+    @SuppressWarnings("Convert2MethodRef")
+    private static Long countHintsFailed(IInvokableInstance node)
+    {
+        return node.callOnInstance(() -> 
HintsServiceMetrics.hintsFailed.getCount());
+    }
+
+    @SuppressWarnings("Convert2MethodRef")
+    private static Long countHintsTimedOut(IInvokableInstance node)
+    {
+        return node.callOnInstance(() -> 
HintsServiceMetrics.hintsTimedOut.getCount());
+    }
+
+    private static Long countGlobalDelays(IInvokableInstance node)
+    {
+        return getHistogramCount(node, 
"org.apache.cassandra.metrics.HintsService.Hint_delays");
+    }
+
+    private static Long countEndpointDelays(IInvokableInstance node, 
IInvokableInstance target)
+    {
+        return getHistogramCount(node, 
String.format("org.apache.cassandra.metrics.HintsService.Hint_delays-%s.%d",
+                                                     
target.broadcastAddress().getAddress(),
+                                                     
target.broadcastAddress().getPort()));
+    }
+
+    private static long getHistogramCount(IInvokableInstance node, String name)
+    {
+        return node.metrics()
+                   .getHistograms(s -> s.equals(name), 
Metrics.MetricValue.COUNT)
+                   .values()
+                   .stream()
+                   .findFirst()
+                   .map(Math::round)
+                   .orElse(0L);
+    }
+
+    /**
+     * Bytebuddy injection to make the application of hints to fail on the 
destination node.
+     */
+    public static class FailHints
+    {
+        private static final AtomicInteger numHints = new AtomicInteger(0);
+
+        private static void install(ClassLoader cl, int nodeNumber)
+        {
+            // we can ignore the coordinator node
+            if (nodeNumber == 1)
+                return;
+
+            new ByteBuddy().rebase(Hint.class)
+                           .method(named("applyFuture").and(takesArguments(0)))
+                           .intercept(MethodDelegation.to(FailHints.class))
+                           .make()
+                           .load(cl, ClassLoadingStrategy.Default.INJECTION);
+        }
+
+        public static CompletableFuture<?> execute(@SuperCall 
Callable<CompletableFuture<?>> r) throws Exception
+        {
+            if (numHints.incrementAndGet() <= NUM_FAILURES_PER_NODE)
+                throw new RuntimeException("Injected failure");
+            return r.call();
+        }
+    }
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to