This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new adae4ae2b06 [monitor][broker] add metrics for BatchMetadataStore (#17072)
adae4ae2b06 is described below
commit adae4ae2b06304eae9ec001357e0bd6ee6ffb053
Author: Tao Jiuming <[email protected]>
AuthorDate: Tue Nov 1 20:22:00 2022 +0800
[monitor][broker] add metrics for BatchMetadataStore (#17072)
---
.../broker/stats/MetadataStoreStatsTest.java | 82 ++++++++++++++++
.../batching/AbstractBatchedMetadataStore.java | 25 ++++-
.../pulsar/metadata/impl/batching/MetadataOp.java | 2 +
.../pulsar/metadata/impl/batching/OpDelete.java | 6 ++
.../pulsar/metadata/impl/batching/OpGet.java | 6 ++
.../metadata/impl/batching/OpGetChildren.java | 6 ++
.../pulsar/metadata/impl/batching/OpPut.java | 6 ++
.../impl/stats/BatchMetadataStoreStats.java | 104 +++++++++++++++++++++
8 files changed, 232 insertions(+), 5 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/MetadataStoreStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/MetadataStoreStatsTest.java
index da60da4601f..3ce735b797f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/MetadataStoreStatsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/MetadataStoreStatsTest.java
@@ -137,4 +137,86 @@ public class MetadataStoreStatsTest extends BrokerTestBase {
}
}
+ @Test
+ public void testBatchMetadataStoreMetrics() throws Exception {
+ String ns = "prop/ns-abc1";
+ admin.namespaces().createNamespace(ns);
+
+ String topic = "persistent://prop/ns-abc1/metadata-store-" + UUID.randomUUID();
+ String subName = "my-sub1";
+
+ @Cleanup
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .topic(topic).create();
+ @Cleanup
+ Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+ .topic(topic).subscriptionName(subName).subscribe();
+
+ for (int i = 0; i < 100; i++) {
+ producer.newMessage().value(UUID.randomUUID().toString()).send();
+ }
+
+ for (;;) {
+ Message<String> message = consumer.receive(10, TimeUnit.SECONDS);
+ if (message == null) {
+ break;
+ }
+ consumer.acknowledge(message);
+ }
+
+ ByteArrayOutputStream output = new ByteArrayOutputStream();
+ PrometheusMetricsGenerator.generate(pulsar, false, false, false, false, output);
+ String metricsStr = output.toString();
+ Multimap<String, PrometheusMetricsTest.Metric> metricsMap = PrometheusMetricsTest.parseMetrics(metricsStr);
+
+ Collection<PrometheusMetricsTest.Metric> executorQueueSize = metricsMap.get("pulsar_batch_metadata_store_executor_queue_size");
+ Collection<PrometheusMetricsTest.Metric> opsWaiting = metricsMap.get("pulsar_batch_metadata_store_queue_wait_time_ms" + "_sum");
+ Collection<PrometheusMetricsTest.Metric> batchExecuteTime = metricsMap.get("pulsar_batch_metadata_store_batch_execute_time_ms" + "_sum");
+ Collection<PrometheusMetricsTest.Metric> opsPerBatch = metricsMap.get("pulsar_batch_metadata_store_batch_size" + "_sum");
+
+ Assert.assertTrue(executorQueueSize.size() > 1);
+ Assert.assertTrue(opsWaiting.size() > 1);
+ Assert.assertTrue(batchExecuteTime.size() > 0);
+ Assert.assertTrue(opsPerBatch.size() > 0);
+
+ for (PrometheusMetricsTest.Metric m : executorQueueSize) {
+ Assert.assertEquals(m.tags.get("cluster"), "test");
+ String metadataStoreName = m.tags.get("name");
+ Assert.assertNotNull(metadataStoreName);
+ Assert.assertTrue(metadataStoreName.equals(MetadataStoreConfig.METADATA_STORE)
+ || metadataStoreName.equals(MetadataStoreConfig.CONFIGURATION_METADATA_STORE)
+ || metadataStoreName.equals(MetadataStoreConfig.STATE_METADATA_STORE));
+ Assert.assertTrue(m.value >= 0);
+ }
+ for (PrometheusMetricsTest.Metric m : opsWaiting) {
+ Assert.assertEquals(m.tags.get("cluster"), "test");
+ String metadataStoreName = m.tags.get("name");
+ Assert.assertNotNull(metadataStoreName);
+ Assert.assertTrue(metadataStoreName.equals(MetadataStoreConfig.METADATA_STORE)
+ || metadataStoreName.equals(MetadataStoreConfig.CONFIGURATION_METADATA_STORE)
+ || metadataStoreName.equals(MetadataStoreConfig.STATE_METADATA_STORE));
+ Assert.assertTrue(m.value >= 0);
+ }
+
+ for (PrometheusMetricsTest.Metric m : batchExecuteTime) {
+ Assert.assertEquals(m.tags.get("cluster"), "test");
+ String metadataStoreName = m.tags.get("name");
+ Assert.assertNotNull(metadataStoreName);
+ Assert.assertTrue(metadataStoreName.equals(MetadataStoreConfig.METADATA_STORE)
+ || metadataStoreName.equals(MetadataStoreConfig.CONFIGURATION_METADATA_STORE)
+ || metadataStoreName.equals(MetadataStoreConfig.STATE_METADATA_STORE));
+ Assert.assertTrue(m.value > 0);
+ }
+
+ for (PrometheusMetricsTest.Metric m : opsPerBatch) {
+ Assert.assertEquals(m.tags.get("cluster"), "test");
+ String metadataStoreName = m.tags.get("name");
+ Assert.assertNotNull(metadataStoreName);
+ Assert.assertTrue(metadataStoreName.equals(MetadataStoreConfig.METADATA_STORE)
+ || metadataStoreName.equals(MetadataStoreConfig.CONFIGURATION_METADATA_STORE)
+ || metadataStoreName.equals(MetadataStoreConfig.STATE_METADATA_STORE));
+ Assert.assertTrue(m.value > 0);
+ }
+ }
+
}
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java
index dcbefb3631a..52cd81abc51 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java
@@ -34,6 +34,7 @@ import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.Stat;
import org.apache.pulsar.metadata.api.extended.CreateOption;
import org.apache.pulsar.metadata.impl.AbstractMetadataStore;
+import org.apache.pulsar.metadata.impl.stats.BatchMetadataStoreStats;
import org.jctools.queues.MessagePassingQueue;
import org.jctools.queues.MpscUnboundedArrayQueue;
@@ -50,7 +51,8 @@ public abstract class AbstractBatchedMetadataStore extends AbstractMetadataStore
private final int maxDelayMillis;
private final int maxOperations;
private final int maxSize;
- private MetadataEventSynchronizer synchronizer;
+ private final MetadataEventSynchronizer synchronizer;
+ private final BatchMetadataStoreStats batchMetadataStoreStats;
protected AbstractBatchedMetadataStore(MetadataStoreConfig conf) {
super(conf.getMetadataStoreName());
@@ -74,6 +76,8 @@ public abstract class AbstractBatchedMetadataStore extends AbstractMetadataStore
// update synchronizer and register sync listener
synchronizer = conf.getSynchronizer();
registerSyncLister(Optional.ofNullable(synchronizer));
+ this.batchMetadataStoreStats =
+ new BatchMetadataStoreStats(metadataStoreName, executor);
}
@Override
@@ -87,13 +91,14 @@ public abstract class AbstractBatchedMetadataStore extends AbstractMetadataStore
scheduledTask.cancel(true);
}
super.close();
+ this.batchMetadataStoreStats.close();
}
private void flush() {
while (!readOps.isEmpty()) {
List<MetadataOp> ops = new ArrayList<>();
readOps.drain(ops::add, maxOperations);
- batchOperation(ops);
+ internalBatchOperation(ops);
}
while (!writeOps.isEmpty()) {
@@ -114,7 +119,7 @@ public abstract class AbstractBatchedMetadataStore extends AbstractMetadataStore
batchSize += op.size();
ops.add(writeOps.poll());
}
- batchOperation(ops);
+ internalBatchOperation(ops);
}
flushInProgress.set(false);
@@ -158,16 +163,26 @@ public abstract class AbstractBatchedMetadataStore extends AbstractMetadataStore
if (enabled) {
if (!queue.offer(op)) {
// Execute individually if we're failing to enqueue
- batchOperation(Collections.singletonList(op));
+ internalBatchOperation(Collections.singletonList(op));
return;
}
if (queue.size() > maxOperations && flushInProgress.compareAndSet(false, true)) {
executor.execute(this::flush);
}
} else {
- batchOperation(Collections.singletonList(op));
+ internalBatchOperation(Collections.singletonList(op));
}
}
+ private void internalBatchOperation(List<MetadataOp> ops) {
+ long now = System.currentTimeMillis();
+ for (MetadataOp op : ops) {
+ this.batchMetadataStoreStats.recordOpWaiting(now - op.created());
+ }
+ this.batchOperation(ops);
+ this.batchMetadataStoreStats.recordOpsInBatch(ops.size());
+ this.batchMetadataStoreStats.recordBatchExecuteTime(System.currentTimeMillis() - now);
+ }
+
protected abstract void batchOperation(List<MetadataOp> ops);
}
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/MetadataOp.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/MetadataOp.java
index eb4bb9e2938..abf60f7b724 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/MetadataOp.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/MetadataOp.java
@@ -34,6 +34,8 @@ public interface MetadataOp {
int size();
+ long created();
+
default OpGet asGet() {
return (OpGet) this;
}
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/OpDelete.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/OpDelete.java
index 42141a2f38f..f773d40fe08 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/OpDelete.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/OpDelete.java
@@ -28,6 +28,7 @@ import lombok.Data;
public class OpDelete implements MetadataOp {
private final String path;
private final Optional<Long> optExpectedVersion;
+ public final long created = System.currentTimeMillis();
private final CompletableFuture<Void> future = new CompletableFuture<>();
@@ -40,4 +41,9 @@ public class OpDelete implements MetadataOp {
public int size() {
return path.length();
}
+
+ @Override
+ public long created() {
+ return this.created;
+ }
}
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/OpGet.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/OpGet.java
index a402c9471ae..8c684b9d661 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/OpGet.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/OpGet.java
@@ -29,6 +29,7 @@ import org.apache.pulsar.metadata.api.GetResult;
public class OpGet implements MetadataOp {
private final String path;
+ public final long created = System.currentTimeMillis();
private final CompletableFuture<Optional<GetResult>> future = new CompletableFuture<>();
@Override
@@ -40,4 +41,9 @@ public class OpGet implements MetadataOp {
public int size() {
return path.length();
}
+
+ @Override
+ public long created() {
+ return this.created;
+ }
}
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/OpGetChildren.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/OpGetChildren.java
index 9338147aa8e..777c8f7d0a3 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/OpGetChildren.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/OpGetChildren.java
@@ -28,6 +28,7 @@ import lombok.Data;
public class OpGetChildren implements MetadataOp {
private final String path;
+ public final long created = System.currentTimeMillis();
private final CompletableFuture<List<String>> future = new CompletableFuture<>();
@Override
@@ -39,4 +40,9 @@ public class OpGetChildren implements MetadataOp {
public int size() {
return path.length();
}
+
+ @Override
+ public long created() {
+ return this.created;
+ }
}
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/OpPut.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/OpPut.java
index 408338159fa..dc29fea9559 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/OpPut.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/OpPut.java
@@ -34,6 +34,7 @@ public class OpPut implements MetadataOp {
private final Optional<Long> optExpectedVersion;
private final EnumSet<CreateOption> options;
+ public final long created = System.currentTimeMillis();
private final CompletableFuture<Stat> future = new CompletableFuture<>();
public boolean isEphemeral() {
@@ -49,4 +50,9 @@ public class OpPut implements MetadataOp {
public int size() {
return path.length() + (data != null ? data.length : 0);
}
+
+ @Override
+ public long created() {
+ return this.created;
+ }
}
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/stats/BatchMetadataStoreStats.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/stats/BatchMetadataStoreStats.java
new file mode 100644
index 00000000000..f87155b9259
--- /dev/null
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/stats/BatchMetadataStoreStats.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.metadata.impl.stats;
+
+import io.prometheus.client.Gauge;
+import io.prometheus.client.Histogram;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public final class BatchMetadataStoreStats implements AutoCloseable {
+ private static final double[] BUCKETS = new double[]{1, 5, 10, 20, 50, 100, 200, 500, 1000};
+ private static final String NAME = "name";
+
+ private static final Gauge EXECUTOR_QUEUE_SIZE = Gauge
+ .build("pulsar_batch_metadata_store_executor_queue_size", "-")
+ .labelNames(NAME)
+ .register();
+ private static final Histogram OPS_WAITING = Histogram
+ .build("pulsar_batch_metadata_store_queue_wait_time", "-")
+ .unit("ms")
+ .labelNames(NAME)
+ .buckets(BUCKETS)
+ .register();
+ private static final Histogram BATCH_EXECUTE_TIME = Histogram
+ .build("pulsar_batch_metadata_store_batch_execute_time", "-")
+ .unit("ms")
+ .labelNames(NAME)
+ .buckets(BUCKETS)
+ .register();
+ private static final Histogram OPS_PER_BATCH = Histogram
+ .build("pulsar_batch_metadata_store_batch_size", "-")
+ .labelNames(NAME)
+ .buckets(BUCKETS)
+ .register();
+
+ private final AtomicBoolean closed = new AtomicBoolean(false);
+ private final ThreadPoolExecutor executor;
+ private final String metadataStoreName;
+
+ private final Histogram.Child batchOpsWaitingChild;
+ private final Histogram.Child batchExecuteTimeChild;
+ private final Histogram.Child opsPerBatchChild;
+
+ public BatchMetadataStoreStats(String metadataStoreName, ExecutorService executor) {
+ if (executor instanceof ThreadPoolExecutor tx) {
+ this.executor = tx;
+ } else {
+ this.executor = null;
+ }
+ this.metadataStoreName = metadataStoreName;
+
+ EXECUTOR_QUEUE_SIZE.setChild(new Gauge.Child() {
+ @Override
+ public double get() {
+ return BatchMetadataStoreStats.this.executor == null ? 0 :
+ BatchMetadataStoreStats.this.executor.getQueue().size();
+ }
+ }, metadataStoreName);
+
+ this.batchOpsWaitingChild = OPS_WAITING.labels(metadataStoreName);
+ this.batchExecuteTimeChild = BATCH_EXECUTE_TIME.labels(metadataStoreName);
+ this.opsPerBatchChild = OPS_PER_BATCH.labels(metadataStoreName);
+
+ }
+
+ public void recordOpWaiting(long millis) {
+ this.batchOpsWaitingChild.observe(millis);
+ }
+
+ public void recordBatchExecuteTime(long millis) {
+ this.batchExecuteTimeChild.observe(millis);
+ }
+
+ public void recordOpsInBatch(int ops) {
+ this.opsPerBatchChild.observe(ops);
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (closed.compareAndSet(false, true)) {
+ EXECUTOR_QUEUE_SIZE.remove(this.metadataStoreName);
+ OPS_WAITING.remove(this.metadataStoreName);
+ BATCH_EXECUTE_TIME.remove(this.metadataStoreName);
+ OPS_PER_BATCH.remove(metadataStoreName);
+ }
+ }
+}