This is an automated email from the ASF dual-hosted git repository.

houston pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr.git


The following commit(s) were added to refs/heads/main by this push:
     new 2b8f933529f SOLR-17547: Collect ZK Metrics via Curator (#2850)
2b8f933529f is described below

commit 2b8f933529fa736fe5fd2a9b0c751bedf352f0c7
Author: Houston Putman <[email protected]>
AuthorDate: Wed Mar 19 15:25:09 2025 -0500

    SOLR-17547: Collect ZK Metrics via Curator (#2850)
    
    Watches still don't get monitored by Curator, so we have to collect the 
"watchesFired" metric manually
---
 .../solr/metrics/SolrMetricsIntegrationTest.java   |  13 +-
 .../solr/common/cloud/SolrZKMetricsListener.java   | 146 +++++++++++++++++++++
 .../org/apache/solr/common/cloud/SolrZkClient.java |  94 ++-----------
 3 files changed, 165 insertions(+), 88 deletions(-)

diff --git 
a/solr/core/src/test/org/apache/solr/metrics/SolrMetricsIntegrationTest.java 
b/solr/core/src/test/org/apache/solr/metrics/SolrMetricsIntegrationTest.java
index ea03399c2c6..39dba55365c 100644
--- a/solr/core/src/test/org/apache/solr/metrics/SolrMetricsIntegrationTest.java
+++ b/solr/core/src/test/org/apache/solr/metrics/SolrMetricsIntegrationTest.java
@@ -44,6 +44,7 @@ import org.apache.solr.embedded.JettySolrRunner;
 import org.apache.solr.metrics.reporters.MockMetricReporter;
 import org.apache.solr.util.JmxUtil;
 import org.apache.solr.util.TestHarness;
+import org.hamcrest.number.OrderingComparison;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -273,9 +274,15 @@ public class SolrMetricsIntegrationTest extends 
SolrTestCaseJ4 {
                     false,
                     List.of("metrics", "solr.node:CONTAINER.zkClient"));
 
-        assertTrue(findDelta(zkMmetrics, zkMmetricsNew, "childFetches") >= 1);
-        assertTrue(findDelta(zkMmetrics, zkMmetricsNew, 
"cumulativeChildrenFetched") >= 3);
-        assertTrue(findDelta(zkMmetrics, zkMmetricsNew, "existsChecks") >= 4);
+        assertThat(
+            findDelta(zkMmetrics, zkMmetricsNew, "childFetches"),
+            OrderingComparison.greaterThanOrEqualTo(1L));
+        assertThat(
+            findDelta(zkMmetrics, zkMmetricsNew, "cumulativeChildrenFetched"),
+            OrderingComparison.greaterThanOrEqualTo(3L));
+        assertThat(
+            findDelta(zkMmetrics, zkMmetricsNew, "existsChecks"),
+            OrderingComparison.greaterThanOrEqualTo(4L));
       }
     } finally {
       cluster.shutdown();
diff --git 
a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/SolrZKMetricsListener.java
 
b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/SolrZKMetricsListener.java
new file mode 100644
index 00000000000..e6375b9348a
--- /dev/null
+++ 
b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/SolrZKMetricsListener.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.common.cloud;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.LongAdder;
+import org.apache.curator.drivers.AdvancedTracerDriver;
+import org.apache.curator.drivers.EventTrace;
+import org.apache.curator.drivers.OperationTrace;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.api.CuratorListener;
+import org.apache.solr.common.MapWriter;
+import org.apache.solr.common.annotation.JsonProperty;
+import org.apache.solr.common.util.ReflectMapWriter;
+
+public class SolrZKMetricsListener extends AdvancedTracerDriver
+    implements ReflectMapWriter, CuratorListener {
+  // all fields of this class are public because ReflectMapWriter requires 
them to be.
+  // however the object itself is private and only this class can modify it
+
+  @JsonProperty public final LongAdder watchesFired = new LongAdder();
+  @JsonProperty public final LongAdder reads = new LongAdder();
+  @JsonProperty public final LongAdder writes = new LongAdder();
+  @JsonProperty public final LongAdder bytesRead = new LongAdder();
+  @JsonProperty public final LongAdder bytesWritten = new LongAdder();
+
+  @JsonProperty public final LongAdder multiOps = new LongAdder();
+
+  @JsonProperty public final LongAdder cumulativeMultiOps = new LongAdder();
+
+  @JsonProperty public final LongAdder childFetches = new LongAdder();
+
+  @JsonProperty public final LongAdder cumulativeChildrenFetched = new 
LongAdder();
+
+  @JsonProperty public final LongAdder existsChecks = new LongAdder();
+
+  @JsonProperty public final LongAdder deletes = new LongAdder();
+
+  /*
+  This is used by curator for all operations, but we will only use it for 
Foreground operations.
+  Background operations are handled by the eventReceived() method instead.
+   */
+  @Override
+  public void addTrace(OperationTrace trace) {
+    switch (trace.getName()) {
+      case "CreateBuilderImpl-Foreground", "SetDataBuilderImpl-Foreground" -> {
+        writes.increment();
+        bytesWritten.add(trace.getRequestBytesLength());
+      }
+      case "DeleteBuilderImpl-Foreground" -> deletes.increment();
+      case "ExistsBuilderImpl-Foreground" -> existsChecks.increment();
+      case "GetDataBuilderImpl-Foreground" -> {
+        reads.increment();
+        bytesRead.add(trace.getResponseBytesLength());
+      }
+      case "GetChildrenBuilderImpl-Foreground" -> {
+        childFetches.increment();
+        cumulativeChildrenFetched.add(trace.getResponseChildrenCount());
+      }
+      case "CuratorMultiTransactionImpl-Foreground" -> {
+        multiOps.increment();
+        cumulativeMultiOps.add(trace.getRequestTransactionCount());
+      }
+      default -> {
+        // NO-OP - We do not currently track metrics for these
+      }
+    }
+  }
+
+  /*
+  This is used by Zookeeper for ConnectionState changes and retries.
+  We currently do not record metrics for either.
+   */
+  @Override
+  public void addEvent(EventTrace trace) {}
+
+  /*
+  This is used for Background operations and Watch firing.
+   */
+  @Override
+  public void eventReceived(CuratorFramework client, CuratorEvent event) {
+    switch (event.getType()) {
+      case CREATE, SET_DATA -> {
+        writes.increment();
+        if (event.getData() != null) {
+          bytesWritten.add(event.getData().length);
+        }
+      }
+      case DELETE -> deletes.increment();
+      case EXISTS -> existsChecks.increment();
+      case GET_DATA -> {
+        reads.increment();
+        if (event.getData() != null) {
+          bytesRead.add(event.getData().length);
+        }
+      }
+      case CHILDREN -> {
+        childFetches.increment();
+        if (event.getChildren() != null) {
+          cumulativeChildrenFetched.add(event.getChildren().size());
+        }
+      }
+      case TRANSACTION -> {
+        multiOps.increment();
+        if (event.getOpResults() != null) {
+          cumulativeMultiOps.add(event.getOpResults().size());
+        }
+      }
+      case WATCHED -> watchesFired.increment();
+      default -> {
+        // NO-OP - We do not currently track metrics for these
+      }
+    }
+  }
+
+  @Override
+  public void writeMap(MapWriter.EntryWriter ew) throws IOException {
+    ReflectMapWriter.super.writeMap(
+        new MapWriter.EntryWriter() {
+          @Override
+          public MapWriter.EntryWriter put(CharSequence k, Object v) throws 
IOException {
+            if (v instanceof LongAdder) {
+              ew.put(k, ((LongAdder) v).longValue());
+            } else {
+              ew.put(k, v);
+            }
+            return this;
+          }
+        });
+  }
+}
diff --git 
a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/SolrZkClient.java 
b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/SolrZkClient.java
index 72cecda6567..14ec233981d 100644
--- 
a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/SolrZkClient.java
+++ 
b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/SolrZkClient.java
@@ -30,7 +30,6 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.LongAdder;
 import java.util.function.BiFunction;
 import java.util.function.Function;
 import java.util.function.Predicate;
@@ -50,12 +49,10 @@ import org.apache.solr.client.solrj.SolrServerException;
 import org.apache.solr.client.solrj.impl.SolrZkClientTimeout;
 import org.apache.solr.common.MapWriter;
 import org.apache.solr.common.SolrException;
-import org.apache.solr.common.annotation.JsonProperty;
 import org.apache.solr.common.util.Compressor;
 import org.apache.solr.common.util.ExecutorUtil;
 import org.apache.solr.common.util.IOUtils;
 import org.apache.solr.common.util.ObjectReleaseTracker;
-import org.apache.solr.common.util.ReflectMapWriter;
 import org.apache.solr.common.util.SolrNamedThreadFactory;
 import org.apache.solr.common.util.StrUtils;
 import org.apache.zookeeper.CreateMode;
@@ -83,16 +80,14 @@ public class SolrZkClient implements Closeable {
   private ExecutorService curatorSafeServiceExecutor;
   private CuratorFramework client;
 
-  private final ZkMetrics metrics = new ZkMetrics();
+  private static final SolrZKMetricsListener metricsListener = new 
SolrZKMetricsListener();
 
   private Compressor compressor;
   private int minStateByteLenForCompression;
 
-  // Allow method reference to return a reference to a functional interface 
(Mapwriter),
-  // rather than a reference to a ZkMetrics object
-  @SuppressWarnings("UnnecessaryMethodReference")
+  // The metrics collector is shared across all SolrZkClient objects
   public MapWriter getMetrics() {
-    return metrics::writeMap;
+    return metricsListener;
   }
 
   private final ExecutorService zkCallbackExecutor =
@@ -198,6 +193,10 @@ public class SolrZkClient implements Closeable {
                 new SolrZkCompressionProvider(compressor, 
minStateByteLenForCompression))
             .enableCompression()
             .build();
+    // This will collect metrics for foreground curator commands
+    client.getZookeeperClient().setTracerDriver(metricsListener);
+    // This will collect metrics for background curator commands
+    client.getCuratorListenable().addListener(metricsListener);
     client.start();
     try {
       if (!client.blockUntilConnected(clientConnectTimeout, 
TimeUnit.MILLISECONDS)) {
@@ -323,7 +322,6 @@ public class SolrZkClient implements Closeable {
       throws InterruptedException, KeeperException {
     runWithCorrectThrows(
         "deleting znode", () -> 
client.delete().withVersion(version).forPath(path));
-    metrics.deletes.increment();
   }
 
   /**
@@ -358,7 +356,6 @@ public class SolrZkClient implements Closeable {
         runWithCorrectThrows(
             "checking exists",
             () -> 
client.checkExists().usingWatcher(wrapWatcher(watcher)).forPath(path));
-    metrics.existsChecks.increment();
     return result;
   }
 
@@ -367,7 +364,6 @@ public class SolrZkClient implements Closeable {
       throws KeeperException, InterruptedException {
     Boolean result =
         runWithCorrectThrows("checking exists", () -> 
client.checkExists().forPath(path) != null);
-    metrics.existsChecks.increment();
     return result;
   }
 
@@ -378,11 +374,6 @@ public class SolrZkClient implements Closeable {
         runWithCorrectThrows(
             "getting children",
             () -> 
client.getChildren().usingWatcher(wrapWatcher(watcher)).forPath(path));
-
-    metrics.childFetches.increment();
-    if (result != null) {
-      metrics.cumulativeChildrenFetched.add(result.size());
-    }
     return result;
   }
 
@@ -399,11 +390,6 @@ public class SolrZkClient implements Closeable {
                     .storingStatIn(stat)
                     .usingWatcher(wrapWatcher(watcher))
                     .forPath(path));
-
-    metrics.childFetches.increment();
-    if (result != null) {
-      metrics.cumulativeChildrenFetched.add(result.size());
-    }
     return result;
   }
 
@@ -420,10 +406,6 @@ public class SolrZkClient implements Closeable {
                     .storingStatIn(stat)
                     .usingWatcher(wrapWatcher(watcher))
                     .forPath(path));
-    metrics.reads.increment();
-    if (result != null) {
-      metrics.bytesRead.add(result.length);
-    }
     return result;
   }
 
@@ -440,10 +422,6 @@ public class SolrZkClient implements Closeable {
     Stat result =
         runWithCorrectThrows(
             "setting data", () -> 
client.setData().withVersion(version).forPath(path, data));
-    metrics.writes.increment();
-    if (data != null) {
-      metrics.bytesWritten.add(data.length);
-    }
     return result;
   }
 
@@ -490,10 +468,6 @@ public class SolrZkClient implements Closeable {
     String result =
         runWithCorrectThrows(
             "creating znode", () -> 
client.create().withMode(createMode).forPath(path, data));
-    metrics.writes.increment();
-    if (data != null) {
-      metrics.bytesWritten.add(data.length);
-    }
     return result;
   }
 
@@ -513,10 +487,6 @@ public class SolrZkClient implements Closeable {
         runWithCorrectThrows(
             "creating znode",
             () -> 
client.create().storingStatIn(stat).withMode(createMode).forPath(path, data));
-    metrics.writes.increment();
-    if (data != null) {
-      metrics.bytesWritten.add(data.length);
-    }
     return result;
   }
 
@@ -635,11 +605,6 @@ public class SolrZkClient implements Closeable {
       createBuilder.orSetData();
     }
 
-    metrics.writes.increment();
-    if (data != null) {
-      metrics.bytesWritten.add(data.length);
-    }
-
     if (path.startsWith("/")) {
       path = path.substring(1);
     }
@@ -828,10 +793,6 @@ public class SolrZkClient implements Closeable {
                             .map(op -> 
op.buildWithoutThrows(client.transactionOp()))
                             .collect(Collectors.toList())));
 
-    metrics.multiOps.increment();
-    if (result != null) {
-      metrics.cumulativeMultiOps.add(result.size());
-    }
     return result;
   }
 
@@ -1121,7 +1082,8 @@ public class SolrZkClient implements Closeable {
       try {
         zkCallbackExecutor.execute(
             () -> {
-              metrics.watchesFired.increment();
+              // Curator still does not have a way of counting fired watches 
natively
+              metricsListener.watchesFired.increment();
               watcher.process(event);
             });
       } catch (RejectedExecutionException e) {
@@ -1152,44 +1114,6 @@ public class SolrZkClient implements Closeable {
     }
   }
 
-  // all fields of this class are public because ReflectMapWriter requires 
them to be.
-  // however the object itself is private and only this class can modify it
-  public static class ZkMetrics implements ReflectMapWriter {
-    @JsonProperty public final LongAdder watchesFired = new LongAdder();
-    @JsonProperty public final LongAdder reads = new LongAdder();
-    @JsonProperty public final LongAdder writes = new LongAdder();
-    @JsonProperty public final LongAdder bytesRead = new LongAdder();
-    @JsonProperty public final LongAdder bytesWritten = new LongAdder();
-
-    @JsonProperty public final LongAdder multiOps = new LongAdder();
-
-    @JsonProperty public final LongAdder cumulativeMultiOps = new LongAdder();
-
-    @JsonProperty public final LongAdder childFetches = new LongAdder();
-
-    @JsonProperty public final LongAdder cumulativeChildrenFetched = new 
LongAdder();
-
-    @JsonProperty public final LongAdder existsChecks = new LongAdder();
-
-    @JsonProperty public final LongAdder deletes = new LongAdder();
-
-    @Override
-    public void writeMap(EntryWriter ew) throws IOException {
-      ReflectMapWriter.super.writeMap(
-          new EntryWriter() {
-            @Override
-            public EntryWriter put(CharSequence k, Object v) throws 
IOException {
-              if (v instanceof LongAdder) {
-                ew.put(k, ((LongAdder) v).longValue());
-              } else {
-                ew.put(k, v);
-              }
-              return this;
-            }
-          });
-    }
-  }
-
   public static class NodeData {
 
     public final Stat stat;

Reply via email to