This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new adae4ae2b06 [monitor][broker] add metrics for BatchMetadataStore (#17072)
adae4ae2b06 is described below

commit adae4ae2b06304eae9ec001357e0bd6ee6ffb053
Author: Tao Jiuming <[email protected]>
AuthorDate: Tue Nov 1 20:22:00 2022 +0800

    [monitor][broker] add metrics for BatchMetadataStore (#17072)
---
 .../broker/stats/MetadataStoreStatsTest.java       |  82 ++++++++++++++++
 .../batching/AbstractBatchedMetadataStore.java     |  25 ++++-
 .../pulsar/metadata/impl/batching/MetadataOp.java  |   2 +
 .../pulsar/metadata/impl/batching/OpDelete.java    |   6 ++
 .../pulsar/metadata/impl/batching/OpGet.java       |   6 ++
 .../metadata/impl/batching/OpGetChildren.java      |   6 ++
 .../pulsar/metadata/impl/batching/OpPut.java       |   6 ++
 .../impl/stats/BatchMetadataStoreStats.java        | 104 +++++++++++++++++++++
 8 files changed, 232 insertions(+), 5 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/MetadataStoreStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/MetadataStoreStatsTest.java
index da60da4601f..3ce735b797f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/MetadataStoreStatsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/MetadataStoreStatsTest.java
@@ -137,4 +137,86 @@ public class MetadataStoreStatsTest extends BrokerTestBase {
         }
     }
 
+    @Test
+    public void testBatchMetadataStoreMetrics() throws Exception {
+        String ns = "prop/ns-abc1";
+        admin.namespaces().createNamespace(ns);
+
+        String topic = "persistent://prop/ns-abc1/metadata-store-" + UUID.randomUUID();
+        String subName = "my-sub1";
+
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topic).create();
+        @Cleanup
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topic).subscriptionName(subName).subscribe();
+
+        for (int i = 0; i < 100; i++) {
+            producer.newMessage().value(UUID.randomUUID().toString()).send();
+        }
+
+        for (;;) {
+            Message<String> message = consumer.receive(10, TimeUnit.SECONDS);
+            if (message == null) {
+                break;
+            }
+            consumer.acknowledge(message);
+        }
+
+        ByteArrayOutputStream output = new ByteArrayOutputStream();
+        PrometheusMetricsGenerator.generate(pulsar, false, false, false, false, output);
+        String metricsStr = output.toString();
+        Multimap<String, PrometheusMetricsTest.Metric> metricsMap = PrometheusMetricsTest.parseMetrics(metricsStr);
+
+        Collection<PrometheusMetricsTest.Metric> executorQueueSize = metricsMap.get("pulsar_batch_metadata_store_executor_queue_size");
+        Collection<PrometheusMetricsTest.Metric> opsWaiting = metricsMap.get("pulsar_batch_metadata_store_queue_wait_time_ms" + "_sum");
+        Collection<PrometheusMetricsTest.Metric> batchExecuteTime = metricsMap.get("pulsar_batch_metadata_store_batch_execute_time_ms" + "_sum");
+        Collection<PrometheusMetricsTest.Metric> opsPerBatch = metricsMap.get("pulsar_batch_metadata_store_batch_size" + "_sum");
+
+        Assert.assertTrue(executorQueueSize.size() > 1);
+        Assert.assertTrue(opsWaiting.size() > 1);
+        Assert.assertTrue(batchExecuteTime.size() > 0);
+        Assert.assertTrue(opsPerBatch.size() > 0);
+
+        for (PrometheusMetricsTest.Metric m : executorQueueSize) {
+            Assert.assertEquals(m.tags.get("cluster"), "test");
+            String metadataStoreName = m.tags.get("name");
+            Assert.assertNotNull(metadataStoreName);
+            Assert.assertTrue(metadataStoreName.equals(MetadataStoreConfig.METADATA_STORE)
+                    || metadataStoreName.equals(MetadataStoreConfig.CONFIGURATION_METADATA_STORE)
+                    || metadataStoreName.equals(MetadataStoreConfig.STATE_METADATA_STORE));
+            Assert.assertTrue(m.value >= 0);
+        }
+        for (PrometheusMetricsTest.Metric m : opsWaiting) {
+            Assert.assertEquals(m.tags.get("cluster"), "test");
+            String metadataStoreName = m.tags.get("name");
+            Assert.assertNotNull(metadataStoreName);
+            Assert.assertTrue(metadataStoreName.equals(MetadataStoreConfig.METADATA_STORE)
+                    || metadataStoreName.equals(MetadataStoreConfig.CONFIGURATION_METADATA_STORE)
+                    || metadataStoreName.equals(MetadataStoreConfig.STATE_METADATA_STORE));
+            Assert.assertTrue(m.value >= 0);
+        }
+
+        for (PrometheusMetricsTest.Metric m : batchExecuteTime) {
+            Assert.assertEquals(m.tags.get("cluster"), "test");
+            String metadataStoreName = m.tags.get("name");
+            Assert.assertNotNull(metadataStoreName);
+            Assert.assertTrue(metadataStoreName.equals(MetadataStoreConfig.METADATA_STORE)
+                    || metadataStoreName.equals(MetadataStoreConfig.CONFIGURATION_METADATA_STORE)
+                    || metadataStoreName.equals(MetadataStoreConfig.STATE_METADATA_STORE));
+            Assert.assertTrue(m.value > 0);
+        }
+
+        for (PrometheusMetricsTest.Metric m : opsPerBatch) {
+            Assert.assertEquals(m.tags.get("cluster"), "test");
+            String metadataStoreName = m.tags.get("name");
+            Assert.assertNotNull(metadataStoreName);
+            Assert.assertTrue(metadataStoreName.equals(MetadataStoreConfig.METADATA_STORE)
+                    || metadataStoreName.equals(MetadataStoreConfig.CONFIGURATION_METADATA_STORE)
+                    || metadataStoreName.equals(MetadataStoreConfig.STATE_METADATA_STORE));
+            Assert.assertTrue(m.value > 0);
+        }
+    }
+
 }
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java
index dcbefb3631a..52cd81abc51 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java
@@ -34,6 +34,7 @@ import org.apache.pulsar.metadata.api.MetadataStoreConfig;
 import org.apache.pulsar.metadata.api.Stat;
 import org.apache.pulsar.metadata.api.extended.CreateOption;
 import org.apache.pulsar.metadata.impl.AbstractMetadataStore;
+import org.apache.pulsar.metadata.impl.stats.BatchMetadataStoreStats;
 import org.jctools.queues.MessagePassingQueue;
 import org.jctools.queues.MpscUnboundedArrayQueue;
 
@@ -50,7 +51,8 @@ public abstract class AbstractBatchedMetadataStore extends AbstractMetadataStore
     private final int maxDelayMillis;
     private final int maxOperations;
     private final int maxSize;
-    private MetadataEventSynchronizer synchronizer;
+    private final MetadataEventSynchronizer synchronizer;
+    private final BatchMetadataStoreStats batchMetadataStoreStats;
 
     protected AbstractBatchedMetadataStore(MetadataStoreConfig conf) {
         super(conf.getMetadataStoreName());
@@ -74,6 +76,8 @@ public abstract class AbstractBatchedMetadataStore extends AbstractMetadataStore
         // update synchronizer and register sync listener
         synchronizer = conf.getSynchronizer();
         registerSyncLister(Optional.ofNullable(synchronizer));
+        this.batchMetadataStoreStats =
+                new BatchMetadataStoreStats(metadataStoreName, executor);
     }
 
     @Override
@@ -87,13 +91,14 @@ public abstract class AbstractBatchedMetadataStore extends AbstractMetadataStore
             scheduledTask.cancel(true);
         }
         super.close();
+        this.batchMetadataStoreStats.close();
     }
 
     private void flush() {
         while (!readOps.isEmpty()) {
             List<MetadataOp> ops = new ArrayList<>();
             readOps.drain(ops::add, maxOperations);
-            batchOperation(ops);
+            internalBatchOperation(ops);
         }
 
         while (!writeOps.isEmpty()) {
@@ -114,7 +119,7 @@ public abstract class AbstractBatchedMetadataStore extends AbstractMetadataStore
                 batchSize += op.size();
                 ops.add(writeOps.poll());
             }
-            batchOperation(ops);
+            internalBatchOperation(ops);
         }
 
         flushInProgress.set(false);
@@ -158,16 +163,26 @@ public abstract class AbstractBatchedMetadataStore extends AbstractMetadataStore
         if (enabled) {
             if (!queue.offer(op)) {
                 // Execute individually if we're failing to enqueue
-                batchOperation(Collections.singletonList(op));
+                internalBatchOperation(Collections.singletonList(op));
                 return;
             }
             if (queue.size() > maxOperations && flushInProgress.compareAndSet(false, true)) {
                 executor.execute(this::flush);
             }
         } else {
-            batchOperation(Collections.singletonList(op));
+            internalBatchOperation(Collections.singletonList(op));
         }
     }
 
+    private void internalBatchOperation(List<MetadataOp> ops) {
+        long now = System.currentTimeMillis();
+        for (MetadataOp op : ops) {
+            this.batchMetadataStoreStats.recordOpWaiting(now - op.created());
+        }
+        this.batchOperation(ops);
+        this.batchMetadataStoreStats.recordOpsInBatch(ops.size());
+        this.batchMetadataStoreStats.recordBatchExecuteTime(System.currentTimeMillis() - now);
+    }
+
     protected abstract void batchOperation(List<MetadataOp> ops);
 }
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/MetadataOp.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/MetadataOp.java
index eb4bb9e2938..abf60f7b724 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/MetadataOp.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/MetadataOp.java
@@ -34,6 +34,8 @@ public interface MetadataOp {
 
     int size();
 
+    long created();
+
     default OpGet asGet() {
         return (OpGet) this;
     }
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/OpDelete.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/OpDelete.java
index 42141a2f38f..f773d40fe08 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/OpDelete.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/OpDelete.java
@@ -28,6 +28,7 @@ import lombok.Data;
 public class OpDelete implements MetadataOp {
     private final String path;
     private final Optional<Long> optExpectedVersion;
+    public final long created = System.currentTimeMillis();
 
     private final CompletableFuture<Void> future = new CompletableFuture<>();
 
@@ -40,4 +41,9 @@ public class OpDelete implements MetadataOp {
     public int size() {
         return path.length();
     }
+
+    @Override
+    public long created() {
+        return this.created;
+    }
 }
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/OpGet.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/OpGet.java
index a402c9471ae..8c684b9d661 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/OpGet.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/OpGet.java
@@ -29,6 +29,7 @@ import org.apache.pulsar.metadata.api.GetResult;
 public class OpGet implements MetadataOp {
 
     private final String path;
+    public final long created = System.currentTimeMillis();
     private final CompletableFuture<Optional<GetResult>> future = new CompletableFuture<>();
 
     @Override
@@ -40,4 +41,9 @@ public class OpGet implements MetadataOp {
     public int size() {
         return path.length();
     }
+
+    @Override
+    public long created() {
+        return this.created;
+    }
 }
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/OpGetChildren.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/OpGetChildren.java
index 9338147aa8e..777c8f7d0a3 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/OpGetChildren.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/OpGetChildren.java
@@ -28,6 +28,7 @@ import lombok.Data;
 public class OpGetChildren implements MetadataOp {
 
     private final String path;
+    public final long created = System.currentTimeMillis();
     private final CompletableFuture<List<String>> future = new CompletableFuture<>();
 
     @Override
@@ -39,4 +40,9 @@ public class OpGetChildren implements MetadataOp {
     public int size() {
         return path.length();
     }
+
+    @Override
+    public long created() {
+        return this.created;
+    }
 }
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/OpPut.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/OpPut.java
index 408338159fa..dc29fea9559 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/OpPut.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/OpPut.java
@@ -34,6 +34,7 @@ public class OpPut implements MetadataOp {
     private final Optional<Long> optExpectedVersion;
     private final EnumSet<CreateOption> options;
 
+    public final long created = System.currentTimeMillis();
     private final CompletableFuture<Stat> future = new CompletableFuture<>();
 
     public boolean isEphemeral() {
@@ -49,4 +50,9 @@ public class OpPut implements MetadataOp {
     public int size() {
         return path.length() + (data != null ? data.length : 0);
     }
+
+    @Override
+    public long created() {
+        return this.created;
+    }
 }
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/stats/BatchMetadataStoreStats.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/stats/BatchMetadataStoreStats.java
new file mode 100644
index 00000000000..f87155b9259
--- /dev/null
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/stats/BatchMetadataStoreStats.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.metadata.impl.stats;
+
+import io.prometheus.client.Gauge;
+import io.prometheus.client.Histogram;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public final class BatchMetadataStoreStats implements AutoCloseable {
+    private static final double[] BUCKETS = new double[]{1, 5, 10, 20, 50, 100, 200, 500, 1000};
+    private static final String NAME = "name";
+
+    private static final Gauge EXECUTOR_QUEUE_SIZE = Gauge
+            .build("pulsar_batch_metadata_store_executor_queue_size", "-")
+            .labelNames(NAME)
+            .register();
+    private static final Histogram OPS_WAITING = Histogram
+            .build("pulsar_batch_metadata_store_queue_wait_time", "-")
+            .unit("ms")
+            .labelNames(NAME)
+            .buckets(BUCKETS)
+            .register();
+    private static final Histogram BATCH_EXECUTE_TIME = Histogram
+            .build("pulsar_batch_metadata_store_batch_execute_time", "-")
+            .unit("ms")
+            .labelNames(NAME)
+            .buckets(BUCKETS)
+            .register();
+    private static final Histogram OPS_PER_BATCH = Histogram
+            .build("pulsar_batch_metadata_store_batch_size", "-")
+            .labelNames(NAME)
+            .buckets(BUCKETS)
+            .register();
+
+    private final AtomicBoolean closed = new AtomicBoolean(false);
+    private final ThreadPoolExecutor executor;
+    private final String metadataStoreName;
+
+    private final Histogram.Child batchOpsWaitingChild;
+    private final Histogram.Child batchExecuteTimeChild;
+    private final Histogram.Child opsPerBatchChild;
+
+    public BatchMetadataStoreStats(String metadataStoreName, ExecutorService executor) {
+        if (executor instanceof ThreadPoolExecutor tx) {
+            this.executor = tx;
+        } else {
+            this.executor = null;
+        }
+        this.metadataStoreName = metadataStoreName;
+
+        EXECUTOR_QUEUE_SIZE.setChild(new Gauge.Child() {
+            @Override
+            public double get() {
+                return BatchMetadataStoreStats.this.executor == null ? 0 :
+                        BatchMetadataStoreStats.this.executor.getQueue().size();
+            }
+        }, metadataStoreName);
+
+        this.batchOpsWaitingChild = OPS_WAITING.labels(metadataStoreName);
+        this.batchExecuteTimeChild = BATCH_EXECUTE_TIME.labels(metadataStoreName);
+        this.opsPerBatchChild = OPS_PER_BATCH.labels(metadataStoreName);
+
+    }
+
+    public void recordOpWaiting(long millis) {
+        this.batchOpsWaitingChild.observe(millis);
+    }
+
+    public void recordBatchExecuteTime(long millis) {
+        this.batchExecuteTimeChild.observe(millis);
+    }
+
+    public void recordOpsInBatch(int ops) {
+        this.opsPerBatchChild.observe(ops);
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (closed.compareAndSet(false, true)) {
+            EXECUTOR_QUEUE_SIZE.remove(this.metadataStoreName);
+            OPS_WAITING.remove(this.metadataStoreName);
+            BATCH_EXECUTE_TIME.remove(this.metadataStoreName);
+            OPS_PER_BATCH.remove(metadataStoreName);
+        }
+    }
+}

Reply via email to