asafm commented on code in PR #17041:
URL: https://github.com/apache/pulsar/pull/17041#discussion_r943389948
##########
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java:
##########
@@ -126,6 +128,8 @@ public CompletableFuture<Boolean> asyncReload(String key,
Boolean oldValue,
}
}
});
+
+ this.stats = MetadataStoreStats.create();
Review Comment:
Why `this.`? Why not just `stats = `? There is no other `stats` variable here that needs disambiguating.
##########
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/stats/MetadataStoreStats.java:
##########
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.metadata.impl.stats;
+
+import io.prometheus.client.Counter;
+import io.prometheus.client.Histogram;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public final class MetadataStoreStats {
+ private static final AtomicBoolean INITIALIZED = new AtomicBoolean(false);
+ private static final double[] BUCKETS = new double[]{1, 3, 5, 10, 20, 50,
100, 200, 500, 1000, 2000, 5000};
+
+ private final Histogram getOpsLatency;
+ private final Histogram delOpsLatency;
+ private final Histogram putOpsLatency;
+ private final Counter getFailedCounter;
+ private final Counter delFailedCounter;
+ private final Counter putFailedCounter;
+ private final Counter putBytesCounter;
+
+ private MetadataStoreStats() {
+ getOpsLatency =
Histogram.build("pulsar_metadata_store_get_ops_latency", "-")
Review Comment:
The metric name is missing its unit: rename `pulsar_metadata_store_get_ops_latency` to
`pulsar_metadata_store_get_ops_latency_ms`.
##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java:
##########
@@ -1522,6 +1517,83 @@ public void testSplitTopicAndPartitionLabel() throws
Exception {
consumer2.close();
}
+
+ @Test
+ public void testMetadataStoreStats() throws Exception {
+ String ns = "prop/ns-abc1";
+ admin.namespaces().createNamespace(ns);
+
+ String topic = "persistent://prop/ns-abc1/metadata-store-" +
UUID.randomUUID();
+ String subName = "my-sub1";
+
+ @Cleanup
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .topic(topic).create();
+ @Cleanup
+ Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+ .topic(topic).subscriptionName(subName).subscribe();
+
+ for (int i = 0; i < 100; i++) {
+ producer.newMessage().value(UUID.randomUUID().toString()).send();
+ }
+
+ for (;;) {
+ Message<String> message = consumer.receive(10, TimeUnit.SECONDS);
+ if (message == null) {
+ break;
+ }
+ consumer.acknowledge(message);
+ }
+
+ ByteArrayOutputStream output = new ByteArrayOutputStream();
+ PrometheusMetricsGenerator.generate(pulsar, false, false, false,
false, output);
+ String metricsStr = output.toString();
+ Multimap<String, Metric> metricsMap = parseMetrics(metricsStr);
+
+ Collection<Metric> getOpsFailed =
metricsMap.get("pulsar_metadata_store_get_ops_failed" + "_total");
+ Collection<Metric> delOpsFailed =
metricsMap.get("pulsar_metadata_store_del_ops_failed" + "_total");
+ Collection<Metric> putOpsFailed =
metricsMap.get("pulsar_metadata_store_put_ops_failed" + "_total");
+
+ Collection<Metric> getOpsLatency =
metricsMap.get("pulsar_metadata_store_get_ops_latency" + "_sum");
+ Collection<Metric> delOpsLatency =
metricsMap.get("pulsar_metadata_store_del_ops_latency" + "_sum");
+ Collection<Metric> putOpsLatency =
metricsMap.get("pulsar_metadata_store_put_ops_latency" + "_sum");
+
+ Collection<Metric> putBytes =
metricsMap.get("pulsar_metadata_store_put_bytes" + "_total");
+
+ Assert.assertTrue(getOpsFailed.size() > 0);
+ Assert.assertTrue(delOpsFailed.size() > 0);
+ Assert.assertTrue(putOpsFailed.size() > 0);
+ Assert.assertTrue(getOpsLatency.size() > 0);
+ Assert.assertTrue(delOpsLatency.size() > 0);
+ Assert.assertTrue(putOpsLatency.size() > 0);
+ Assert.assertTrue(putBytes.size() > 0);
+
+ for (Metric m : getOpsFailed) {
+ Assert.assertEquals(m.tags.get("cluster"), "test");
Review Comment:
Why are you not checking the value of this sample?
##########
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java:
##########
@@ -235,10 +239,20 @@ public <T> MetadataCache<T>
getMetadataCache(MetadataSerde<T> serde) {
@Override
public CompletableFuture<Optional<GetResult>> get(String path) {
+ long start = System.currentTimeMillis();
if (!isValidPath(path)) {
- return FutureUtil.failedFuture(new
MetadataStoreException.InvalidPathException(path));
+ stats.recordGetOpsFailed();
+ return FutureUtil
+ .failedFuture(new
MetadataStoreException.InvalidPathException(path));
}
- return storeGet(path);
+ return storeGet(path)
+ .whenComplete((v, t) -> {
+ if (t != null) {
+ stats.recordGetOpsFailed();
+ } else {
+ stats.recordGetOpsLatency(System.currentTimeMillis() -
start);
Review Comment:
This method name is inconsistent with the failure method (`recordGetOpsLatency` vs. `recordGetOpsFailed`).
I suggest:
```
stats.recordGetOpsFailed(opLatency);
stats.recordGetOpsSucceeded(opLatency);
```
1. Naming should be consistent across the failure and success paths.
2. Latency should be recorded for failed ops too. Failures are sometimes the root cause,
and a spike in their latency can explain the problem.
##########
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/stats/MetadataStoreStats.java:
##########
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.metadata.impl.stats;
+
+import io.prometheus.client.Counter;
+import io.prometheus.client.Histogram;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public final class MetadataStoreStats {
+ private static final AtomicBoolean INITIALIZED = new AtomicBoolean(false);
+ private static final double[] BUCKETS = new double[]{1, 3, 5, 10, 20, 50,
100, 200, 500, 1000, 2000, 5000};
+
+ private final Histogram getOpsLatency;
+ private final Histogram delOpsLatency;
+ private final Histogram putOpsLatency;
+ private final Counter getFailedCounter;
+ private final Counter delFailedCounter;
+ private final Counter putFailedCounter;
+ private final Counter putBytesCounter;
+
+ private MetadataStoreStats() {
+ getOpsLatency =
Histogram.build("pulsar_metadata_store_get_ops_latency", "-")
+ .buckets(BUCKETS)
+ .register();
+ delOpsLatency =
Histogram.build("pulsar_metadata_store_del_ops_latency", "-")
+ .buckets(BUCKETS)
+ .register();
+ putOpsLatency =
Histogram.build("pulsar_metadata_store_put_ops_latency", "-")
+ .buckets(BUCKETS)
+ .register();
+
+ getFailedCounter =
Counter.build("pulsar_metadata_store_get_ops_failed", "-")
+ .register();
+ delFailedCounter =
Counter.build("pulsar_metadata_store_del_ops_failed", "-")
+ .register();
+ putFailedCounter =
Counter.build("pulsar_metadata_store_put_ops_failed", "-")
+ .register();
+ putBytesCounter = Counter.build("pulsar_metadata_store_put_bytes", "-")
+ .register();
+ }
+
+ public void recordGetOpsLatency(long millis) {
+ this.getOpsLatency.observe(millis);
Review Comment:
```suggestion
getOpsLatency.observe(millis);
```
##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java:
##########
@@ -1522,6 +1517,83 @@ public void testSplitTopicAndPartitionLabel() throws
Exception {
consumer2.close();
}
+
+ @Test
+ public void testMetadataStoreStats() throws Exception {
+ String ns = "prop/ns-abc1";
+ admin.namespaces().createNamespace(ns);
+
+ String topic = "persistent://prop/ns-abc1/metadata-store-" +
UUID.randomUUID();
+ String subName = "my-sub1";
+
+ @Cleanup
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .topic(topic).create();
+ @Cleanup
+ Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+ .topic(topic).subscriptionName(subName).subscribe();
+
+ for (int i = 0; i < 100; i++) {
+ producer.newMessage().value(UUID.randomUUID().toString()).send();
+ }
+
+ for (;;) {
+ Message<String> message = consumer.receive(10, TimeUnit.SECONDS);
+ if (message == null) {
+ break;
+ }
+ consumer.acknowledge(message);
+ }
+
+ ByteArrayOutputStream output = new ByteArrayOutputStream();
+ PrometheusMetricsGenerator.generate(pulsar, false, false, false,
false, output);
+ String metricsStr = output.toString();
+ Multimap<String, Metric> metricsMap = parseMetrics(metricsStr);
+
+ Collection<Metric> getOpsFailed =
metricsMap.get("pulsar_metadata_store_get_ops_failed" + "_total");
+ Collection<Metric> delOpsFailed =
metricsMap.get("pulsar_metadata_store_del_ops_failed" + "_total");
+ Collection<Metric> putOpsFailed =
metricsMap.get("pulsar_metadata_store_put_ops_failed" + "_total");
+
+ Collection<Metric> getOpsLatency =
metricsMap.get("pulsar_metadata_store_get_ops_latency" + "_sum");
+ Collection<Metric> delOpsLatency =
metricsMap.get("pulsar_metadata_store_del_ops_latency" + "_sum");
+ Collection<Metric> putOpsLatency =
metricsMap.get("pulsar_metadata_store_put_ops_latency" + "_sum");
+
+ Collection<Metric> putBytes =
metricsMap.get("pulsar_metadata_store_put_bytes" + "_total");
+
+ Assert.assertTrue(getOpsFailed.size() > 0);
Review Comment:
Use static imports for the `Assert` methods (e.g. `assertTrue(...)` instead of `Assert.assertTrue(...)`).
##########
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/stats/MetadataStoreStats.java:
##########
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.metadata.impl.stats;
+
+import io.prometheus.client.Counter;
+import io.prometheus.client.Histogram;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public final class MetadataStoreStats {
+ private static final AtomicBoolean INITIALIZED = new AtomicBoolean(false);
+ private static final double[] BUCKETS = new double[]{1, 3, 5, 10, 20, 50,
100, 200, 500, 1000, 2000, 5000};
+
+ private final Histogram getOpsLatency;
+ private final Histogram delOpsLatency;
+ private final Histogram putOpsLatency;
+ private final Counter getFailedCounter;
+ private final Counter delFailedCounter;
+ private final Counter putFailedCounter;
+ private final Counter putBytesCounter;
+
+ private MetadataStoreStats() {
+ getOpsLatency =
Histogram.build("pulsar_metadata_store_get_ops_latency", "-")
+ .buckets(BUCKETS)
+ .register();
+ delOpsLatency =
Histogram.build("pulsar_metadata_store_del_ops_latency", "-")
+ .buckets(BUCKETS)
+ .register();
+ putOpsLatency =
Histogram.build("pulsar_metadata_store_put_ops_latency", "-")
+ .buckets(BUCKETS)
+ .register();
+
+ getFailedCounter =
Counter.build("pulsar_metadata_store_get_ops_failed", "-")
+ .register();
+ delFailedCounter =
Counter.build("pulsar_metadata_store_del_ops_failed", "-")
+ .register();
+ putFailedCounter =
Counter.build("pulsar_metadata_store_put_ops_failed", "-")
+ .register();
+ putBytesCounter = Counter.build("pulsar_metadata_store_put_bytes", "-")
+ .register();
+ }
+
+ public void recordGetOpsLatency(long millis) {
+ this.getOpsLatency.observe(millis);
+ }
+
+ public void recordDelOpsLatency(long millis) {
+ this.delOpsLatency.observe(millis);
+ }
+
+ public void recordPutOpsLatency(long millis, int bytes) {
+ this.putOpsLatency.observe(millis);
+ this.putBytesCounter.inc(bytes);
+ }
+
+ public void recordGetOpsFailed() {
+ this.getFailedCounter.inc();
+ }
+
+ public void recordDelOpsFailed() {
+ this.delFailedCounter.inc();
+ }
+
+ public void recordPutOpsFailed() {
+ this.putFailedCounter.inc();
+ }
+
+ private static MetadataStoreStats instance;
Review Comment:
1. Static members should be declared at the beginning of the class.
2. Static final constants should be named in uppercase.
3. There is a simpler approach: eager initialization.
```java
private static final MetadataStoreStats INSTANCE = new MetadataStoreStats();
//
//...
//
public static MetadataStoreStats getInstance() {
return INSTANCE;
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]