This is an automated email from the ASF dual-hosted git repository.
houston pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr.git
The following commit(s) were added to refs/heads/main by this push:
new 2b8f933529f SOLR-17547: Collect ZK Metrics via Curator (#2850)
2b8f933529f is described below
commit 2b8f933529fa736fe5fd2a9b0c751bedf352f0c7
Author: Houston Putman <[email protected]>
AuthorDate: Wed Mar 19 15:25:09 2025 -0500
SOLR-17547: Collect ZK Metrics via Curator (#2850)
Watches still don't get monitored by Curator, so we have to collect the
"watchesFired" metric manually
---
.../solr/metrics/SolrMetricsIntegrationTest.java | 13 +-
.../solr/common/cloud/SolrZKMetricsListener.java | 146 +++++++++++++++++++++
.../org/apache/solr/common/cloud/SolrZkClient.java | 94 ++-----------
3 files changed, 165 insertions(+), 88 deletions(-)
diff --git
a/solr/core/src/test/org/apache/solr/metrics/SolrMetricsIntegrationTest.java
b/solr/core/src/test/org/apache/solr/metrics/SolrMetricsIntegrationTest.java
index ea03399c2c6..39dba55365c 100644
--- a/solr/core/src/test/org/apache/solr/metrics/SolrMetricsIntegrationTest.java
+++ b/solr/core/src/test/org/apache/solr/metrics/SolrMetricsIntegrationTest.java
@@ -44,6 +44,7 @@ import org.apache.solr.embedded.JettySolrRunner;
import org.apache.solr.metrics.reporters.MockMetricReporter;
import org.apache.solr.util.JmxUtil;
import org.apache.solr.util.TestHarness;
+import org.hamcrest.number.OrderingComparison;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -273,9 +274,15 @@ public class SolrMetricsIntegrationTest extends
SolrTestCaseJ4 {
false,
List.of("metrics", "solr.node:CONTAINER.zkClient"));
- assertTrue(findDelta(zkMmetrics, zkMmetricsNew, "childFetches") >= 1);
- assertTrue(findDelta(zkMmetrics, zkMmetricsNew,
"cumulativeChildrenFetched") >= 3);
- assertTrue(findDelta(zkMmetrics, zkMmetricsNew, "existsChecks") >= 4);
+ assertThat(
+ findDelta(zkMmetrics, zkMmetricsNew, "childFetches"),
+ OrderingComparison.greaterThanOrEqualTo(1L));
+ assertThat(
+ findDelta(zkMmetrics, zkMmetricsNew, "cumulativeChildrenFetched"),
+ OrderingComparison.greaterThanOrEqualTo(3L));
+ assertThat(
+ findDelta(zkMmetrics, zkMmetricsNew, "existsChecks"),
+ OrderingComparison.greaterThanOrEqualTo(4L));
}
} finally {
cluster.shutdown();
diff --git
a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/SolrZKMetricsListener.java
b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/SolrZKMetricsListener.java
new file mode 100644
index 00000000000..e6375b9348a
--- /dev/null
+++
b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/SolrZKMetricsListener.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.common.cloud;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.LongAdder;
+import org.apache.curator.drivers.AdvancedTracerDriver;
+import org.apache.curator.drivers.EventTrace;
+import org.apache.curator.drivers.OperationTrace;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.api.CuratorListener;
+import org.apache.solr.common.MapWriter;
+import org.apache.solr.common.annotation.JsonProperty;
+import org.apache.solr.common.util.ReflectMapWriter;
+
+public class SolrZKMetricsListener extends AdvancedTracerDriver
+ implements ReflectMapWriter, CuratorListener {
+ // all fields of this class are public because ReflectMapWriter requires
them to be.
+ // however the object itself is private and only this class can modify it
+
+ @JsonProperty public final LongAdder watchesFired = new LongAdder();
+ @JsonProperty public final LongAdder reads = new LongAdder();
+ @JsonProperty public final LongAdder writes = new LongAdder();
+ @JsonProperty public final LongAdder bytesRead = new LongAdder();
+ @JsonProperty public final LongAdder bytesWritten = new LongAdder();
+
+ @JsonProperty public final LongAdder multiOps = new LongAdder();
+
+ @JsonProperty public final LongAdder cumulativeMultiOps = new LongAdder();
+
+ @JsonProperty public final LongAdder childFetches = new LongAdder();
+
+ @JsonProperty public final LongAdder cumulativeChildrenFetched = new
LongAdder();
+
+ @JsonProperty public final LongAdder existsChecks = new LongAdder();
+
+ @JsonProperty public final LongAdder deletes = new LongAdder();
+
+ /*
+ This is used by curator for all operations, but we will only use it for
Foreground operations.
+ Background operations are handled by the eventReceived() method instead.
+ */
+ @Override
+ public void addTrace(OperationTrace trace) {
+ switch (trace.getName()) {
+ case "CreateBuilderImpl-Foreground", "SetDataBuilderImpl-Foreground" -> {
+ writes.increment();
+ bytesWritten.add(trace.getRequestBytesLength());
+ }
+ case "DeleteBuilderImpl-Foreground" -> deletes.increment();
+ case "ExistsBuilderImpl-Foreground" -> existsChecks.increment();
+ case "GetDataBuilderImpl-Foreground" -> {
+ reads.increment();
+ bytesRead.add(trace.getResponseBytesLength());
+ }
+ case "GetChildrenBuilderImpl-Foreground" -> {
+ childFetches.increment();
+ cumulativeChildrenFetched.add(trace.getResponseChildrenCount());
+ }
+ case "CuratorMultiTransactionImpl-Foreground" -> {
+ multiOps.increment();
+ cumulativeMultiOps.add(trace.getRequestTransactionCount());
+ }
+ default -> {
+ // NO-OP - We do not currently track metrics for these
+ }
+ }
+ }
+
+ /*
+ This is used by Zookeeper for ConnectionState changes and retries.
+ We currently do not record metrics for either.
+ */
+ @Override
+ public void addEvent(EventTrace trace) {}
+
+ /*
+ This is used for Background operations and Watch firing.
+ */
+ @Override
+ public void eventReceived(CuratorFramework client, CuratorEvent event) {
+ switch (event.getType()) {
+ case CREATE, SET_DATA -> {
+ writes.increment();
+ if (event.getData() != null) {
+ bytesWritten.add(event.getData().length);
+ }
+ }
+ case DELETE -> deletes.increment();
+ case EXISTS -> existsChecks.increment();
+ case GET_DATA -> {
+ reads.increment();
+ if (event.getData() != null) {
+ bytesRead.add(event.getData().length);
+ }
+ }
+ case CHILDREN -> {
+ childFetches.increment();
+ if (event.getChildren() != null) {
+ cumulativeChildrenFetched.add(event.getChildren().size());
+ }
+ }
+ case TRANSACTION -> {
+ multiOps.increment();
+ if (event.getOpResults() != null) {
+ cumulativeMultiOps.add(event.getOpResults().size());
+ }
+ }
+ case WATCHED -> watchesFired.increment();
+ default -> {
+ // NO-OP - We do not currently track metrics for these
+ }
+ }
+ }
+
+ @Override
+ public void writeMap(MapWriter.EntryWriter ew) throws IOException {
+ ReflectMapWriter.super.writeMap(
+ new MapWriter.EntryWriter() {
+ @Override
+ public MapWriter.EntryWriter put(CharSequence k, Object v) throws
IOException {
+ if (v instanceof LongAdder) {
+ ew.put(k, ((LongAdder) v).longValue());
+ } else {
+ ew.put(k, v);
+ }
+ return this;
+ }
+ });
+ }
+}
diff --git
a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/SolrZkClient.java
b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/SolrZkClient.java
index 72cecda6567..14ec233981d 100644
---
a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/SolrZkClient.java
+++
b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/SolrZkClient.java
@@ -30,7 +30,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.LongAdder;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;
@@ -50,12 +49,10 @@ import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.SolrZkClientTimeout;
import org.apache.solr.common.MapWriter;
import org.apache.solr.common.SolrException;
-import org.apache.solr.common.annotation.JsonProperty;
import org.apache.solr.common.util.Compressor;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.IOUtils;
import org.apache.solr.common.util.ObjectReleaseTracker;
-import org.apache.solr.common.util.ReflectMapWriter;
import org.apache.solr.common.util.SolrNamedThreadFactory;
import org.apache.solr.common.util.StrUtils;
import org.apache.zookeeper.CreateMode;
@@ -83,16 +80,14 @@ public class SolrZkClient implements Closeable {
private ExecutorService curatorSafeServiceExecutor;
private CuratorFramework client;
- private final ZkMetrics metrics = new ZkMetrics();
+ private static final SolrZKMetricsListener metricsListener = new
SolrZKMetricsListener();
private Compressor compressor;
private int minStateByteLenForCompression;
- // Allow method reference to return a reference to a functional interface
(Mapwriter),
- // rather than a reference to a ZkMetrics object
- @SuppressWarnings("UnnecessaryMethodReference")
+ // The metrics collector is shared across all SolrZkClient objects
public MapWriter getMetrics() {
- return metrics::writeMap;
+ return metricsListener;
}
private final ExecutorService zkCallbackExecutor =
@@ -198,6 +193,10 @@ public class SolrZkClient implements Closeable {
new SolrZkCompressionProvider(compressor,
minStateByteLenForCompression))
.enableCompression()
.build();
+ // This will collect metrics for foreground curator commands
+ client.getZookeeperClient().setTracerDriver(metricsListener);
+ // This will collect metrics for background curator commands
+ client.getCuratorListenable().addListener(metricsListener);
client.start();
try {
if (!client.blockUntilConnected(clientConnectTimeout,
TimeUnit.MILLISECONDS)) {
@@ -323,7 +322,6 @@ public class SolrZkClient implements Closeable {
throws InterruptedException, KeeperException {
runWithCorrectThrows(
"deleting znode", () ->
client.delete().withVersion(version).forPath(path));
- metrics.deletes.increment();
}
/**
@@ -358,7 +356,6 @@ public class SolrZkClient implements Closeable {
runWithCorrectThrows(
"checking exists",
() ->
client.checkExists().usingWatcher(wrapWatcher(watcher)).forPath(path));
- metrics.existsChecks.increment();
return result;
}
@@ -367,7 +364,6 @@ public class SolrZkClient implements Closeable {
throws KeeperException, InterruptedException {
Boolean result =
runWithCorrectThrows("checking exists", () ->
client.checkExists().forPath(path) != null);
- metrics.existsChecks.increment();
return result;
}
@@ -378,11 +374,6 @@ public class SolrZkClient implements Closeable {
runWithCorrectThrows(
"getting children",
() ->
client.getChildren().usingWatcher(wrapWatcher(watcher)).forPath(path));
-
- metrics.childFetches.increment();
- if (result != null) {
- metrics.cumulativeChildrenFetched.add(result.size());
- }
return result;
}
@@ -399,11 +390,6 @@ public class SolrZkClient implements Closeable {
.storingStatIn(stat)
.usingWatcher(wrapWatcher(watcher))
.forPath(path));
-
- metrics.childFetches.increment();
- if (result != null) {
- metrics.cumulativeChildrenFetched.add(result.size());
- }
return result;
}
@@ -420,10 +406,6 @@ public class SolrZkClient implements Closeable {
.storingStatIn(stat)
.usingWatcher(wrapWatcher(watcher))
.forPath(path));
- metrics.reads.increment();
- if (result != null) {
- metrics.bytesRead.add(result.length);
- }
return result;
}
@@ -440,10 +422,6 @@ public class SolrZkClient implements Closeable {
Stat result =
runWithCorrectThrows(
"setting data", () ->
client.setData().withVersion(version).forPath(path, data));
- metrics.writes.increment();
- if (data != null) {
- metrics.bytesWritten.add(data.length);
- }
return result;
}
@@ -490,10 +468,6 @@ public class SolrZkClient implements Closeable {
String result =
runWithCorrectThrows(
"creating znode", () ->
client.create().withMode(createMode).forPath(path, data));
- metrics.writes.increment();
- if (data != null) {
- metrics.bytesWritten.add(data.length);
- }
return result;
}
@@ -513,10 +487,6 @@ public class SolrZkClient implements Closeable {
runWithCorrectThrows(
"creating znode",
() ->
client.create().storingStatIn(stat).withMode(createMode).forPath(path, data));
- metrics.writes.increment();
- if (data != null) {
- metrics.bytesWritten.add(data.length);
- }
return result;
}
@@ -635,11 +605,6 @@ public class SolrZkClient implements Closeable {
createBuilder.orSetData();
}
- metrics.writes.increment();
- if (data != null) {
- metrics.bytesWritten.add(data.length);
- }
-
if (path.startsWith("/")) {
path = path.substring(1);
}
@@ -828,10 +793,6 @@ public class SolrZkClient implements Closeable {
.map(op ->
op.buildWithoutThrows(client.transactionOp()))
.collect(Collectors.toList())));
- metrics.multiOps.increment();
- if (result != null) {
- metrics.cumulativeMultiOps.add(result.size());
- }
return result;
}
@@ -1121,7 +1082,8 @@ public class SolrZkClient implements Closeable {
try {
zkCallbackExecutor.execute(
() -> {
- metrics.watchesFired.increment();
+ // Curator still does not have a way of counting fired watches
natively
+ metricsListener.watchesFired.increment();
watcher.process(event);
});
} catch (RejectedExecutionException e) {
@@ -1152,44 +1114,6 @@ public class SolrZkClient implements Closeable {
}
}
- // all fields of this class are public because ReflectMapWriter requires
them to be.
- // however the object itself is private and only this class can modify it
- public static class ZkMetrics implements ReflectMapWriter {
- @JsonProperty public final LongAdder watchesFired = new LongAdder();
- @JsonProperty public final LongAdder reads = new LongAdder();
- @JsonProperty public final LongAdder writes = new LongAdder();
- @JsonProperty public final LongAdder bytesRead = new LongAdder();
- @JsonProperty public final LongAdder bytesWritten = new LongAdder();
-
- @JsonProperty public final LongAdder multiOps = new LongAdder();
-
- @JsonProperty public final LongAdder cumulativeMultiOps = new LongAdder();
-
- @JsonProperty public final LongAdder childFetches = new LongAdder();
-
- @JsonProperty public final LongAdder cumulativeChildrenFetched = new
LongAdder();
-
- @JsonProperty public final LongAdder existsChecks = new LongAdder();
-
- @JsonProperty public final LongAdder deletes = new LongAdder();
-
- @Override
- public void writeMap(EntryWriter ew) throws IOException {
- ReflectMapWriter.super.writeMap(
- new EntryWriter() {
- @Override
- public EntryWriter put(CharSequence k, Object v) throws
IOException {
- if (v instanceof LongAdder) {
- ew.put(k, ((LongAdder) v).longValue());
- } else {
- ew.put(k, v);
- }
- return this;
- }
- });
- }
- }
-
public static class NodeData {
public final Stat stat;