This is an automated email from the ASF dual-hosted git repository.

chenhang pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 349f5b9f6c19b81766af054b19034830d7f2c41d
Author: GuoJiwei <[email protected]>
AuthorDate: Tue Aug 10 18:39:32 2021 +0800

    Add metrics for writing or reading size of cursor (#11500)
    
    ### Motivation
    Currently, there is no visibility about the following activities:
    - How many bytes are written from a cursor update?
    - How many bytes are read from loading a cursor?
    So when the bookkeeper cluster is having heavy traffic, it is hard to tell 
which topic or namespace contributes most of the traffic.
    
    Add metrics at the broker about how many bytes are written and read per 
cursor/namespace.
    
    ### Modifications
    Add metrics `writeLedgerSize`, `writeLedgerLogicalSize`, `readLedgerSize`.
    
    (cherry picked from commit bfae8f6d32c099740342767e77a777469ef6bd5f)
---
 .../bookkeeper/mledger/ManagedCursorMXBean.java    | 34 +++++++++++-
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |  5 +-
 .../mledger/impl/ManagedCursorMXBeanImpl.java      | 30 +++++++++++
 .../broker/stats/metrics/ManagedCursorMetrics.java |  3 ++
 .../broker/stats/ManagedCursorMetricsTest.java     | 63 ++++++++++++++++++++++
 site2/docs/reference-metrics.md                    |  3 ++
 6 files changed, 135 insertions(+), 3 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursorMXBean.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursorMXBean.java
index ffc0af2..9abe50d 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursorMXBean.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursorMXBean.java
@@ -39,13 +39,13 @@ public interface ManagedCursorMXBean {
     String getLedgerName();
 
     /**
-     * persist cursor by ledger
+     * persist cursor by ledger.
      * @param success
      */
     void persistToLedger(boolean success);
 
     /**
-     * persist cursor by zookeeper
+     * persist cursor by zookeeper.
      * @param success
      */
     void persistToZookeeper(boolean success);
@@ -70,4 +70,34 @@ public interface ManagedCursorMXBean {
      */
     long getPersistZookeeperErrors();
 
+    /**
+     * Add write data to a ledger of a cursor (in bytes).
+     * This will update writeCursorLedgerLogicalSize and writeCursorLedgerSize.
+     *
+     * @param size Size of data written to cursor (in bytes)
+     */
+    void addWriteCursorLedgerSize(long size);
+
+    /**
+     * Add read data from a ledger of a cursor (in bytes).
+     *
+     * @param size Size of data read from cursor (in bytes)
+     */
+    void addReadCursorLedgerSize(long size);
+
+    /**
+     * @return the size of data written to cursor (in bytes)
+     */
+    long getWriteCursorLedgerSize();
+
+    /**
+     * @return the size of data written to cursor without replicas (in bytes)
+     */
+    long getWriteCursorLedgerLogicalSize();
+
+    /**
+     * @return the size of data read from cursor (in bytes)
+     */
+    long getReadCursorLedgerSize();
+
 }
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index da93df2..82b4cfc 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -421,6 +421,7 @@ public class ManagedCursorImpl implements ManagedCursor {
                 }
 
                 LedgerEntry entry = seq.nextElement();
+                mbean.addReadCursorLedgerSize(entry.getLength());
                 PositionInfo positionInfo;
                 try {
                     positionInfo = PositionInfo.parseFrom(entry.getEntry());
@@ -2599,7 +2600,8 @@ public class ManagedCursorImpl implements ManagedCursor {
         }
 
         checkNotNull(lh);
-        lh.asyncAddEntry(pi.toByteArray(), (rc, lh1, entryId, ctx) -> {
+        byte[] data = pi.toByteArray();
+        lh.asyncAddEntry(data, (rc, lh1, entryId, ctx) -> {
             if (rc == BKException.Code.OK) {
                 if (log.isDebugEnabled()) {
                     log.debug("[{}] Updated cursor {} position {} in 
meta-ledger {}", ledger.getName(), name, position,
@@ -2614,6 +2616,7 @@ public class ManagedCursorImpl implements ManagedCursor {
                 }
 
                 mbean.persistToLedger(true);
+                mbean.addWriteCursorLedgerSize(data.length);
                 callback.operationComplete();
             } else {
                 log.warn("[{}] Error updating cursor {} position {} in 
meta-ledger {}: {}", ledger.getName(), name,
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorMXBeanImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorMXBeanImpl.java
index 56a2300..fd9dcc1 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorMXBeanImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorMXBeanImpl.java
@@ -30,6 +30,10 @@ public class ManagedCursorMXBeanImpl implements 
ManagedCursorMXBean {
     private final LongAdder persistZookeeperSucceed = new LongAdder();
     private final LongAdder persistZookeeperFailed = new LongAdder();
 
+    private final LongAdder writeCursorLedgerSize = new LongAdder();
+    private final LongAdder writeCursorLedgerLogicalSize = new LongAdder();
+    private final LongAdder readCursorLedgerSize = new LongAdder();
+
     private final ManagedCursor managedCursor;
 
     public ManagedCursorMXBeanImpl(ManagedCursor managedCursor) {
@@ -83,4 +87,30 @@ public class ManagedCursorMXBeanImpl implements 
ManagedCursorMXBean {
     public long getPersistZookeeperErrors() {
         return persistZookeeperFailed.longValue();
     }
+
+    @Override
+    public void addWriteCursorLedgerSize(final long size) {
+        writeCursorLedgerSize.add(size * ((ManagedCursorImpl) 
managedCursor).config.getWriteQuorumSize());
+        writeCursorLedgerLogicalSize.add(size);
+    }
+
+    @Override
+    public void addReadCursorLedgerSize(final long size) {
+        readCursorLedgerSize.add(size);
+    }
+
+    @Override
+    public long getWriteCursorLedgerSize() {
+        return writeCursorLedgerSize.longValue();
+    }
+
+    @Override
+    public long getWriteCursorLedgerLogicalSize() {
+        return writeCursorLedgerLogicalSize.longValue();
+    }
+
+    @Override
+    public long getReadCursorLedgerSize() {
+        return readCursorLedgerSize.longValue();
+    }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedCursorMetrics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedCursorMetrics.java
index 7000aae..17fbc27 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedCursorMetrics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedCursorMetrics.java
@@ -77,6 +77,9 @@ public class ManagedCursorMetrics extends AbstractMetrics {
                 metrics.put("brk_ml_cursor_persistLedgerErrors", 
cStats.getPersistLedgerErrors());
                 metrics.put("brk_ml_cursor_persistZookeeperSucceed", 
cStats.getPersistZookeeperSucceed());
                 metrics.put("brk_ml_cursor_persistZookeeperErrors", 
cStats.getPersistZookeeperErrors());
+                metrics.put("brk_ml_cursor_writeLedgerSize", 
cStats.getWriteCursorLedgerSize());
+                metrics.put("brk_ml_cursor_writeLedgerLogicalSize", 
cStats.getWriteCursorLedgerLogicalSize());
+                metrics.put("brk_ml_cursor_readLedgerSize", 
cStats.getReadCursorLedgerSize());
                 metricsCollection.add(metrics);
             }
         }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java
index ff35a4a..75b9f9b 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.stats;
 
+import lombok.Cleanup;
 import org.apache.bookkeeper.client.PulsarMockLedgerHandle;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.stats.metrics.ManagedCursorMetrics;
@@ -90,4 +91,66 @@ public class ManagedCursorMetricsTest extends 
MockedPulsarServiceBaseTest {
         
Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_nonContiguousDeletedMessagesRange"),
 0L);
     }
 
+    @Test
+    public void testCursorReadWriteMetrics() throws Exception {
+        final String subName = "read-write";
+        final String topicName = 
"persistent://my-namespace/use/my-ns/read-write";
+        final int messageSize = 10;
+
+        ManagedCursorMetrics metrics = new ManagedCursorMetrics(pulsar);
+
+        List<Metrics> metricsList = metrics.generate();
+        Assert.assertTrue(metricsList.isEmpty());
+
+        metricsList = metrics.generate();
+        Assert.assertTrue(metricsList.isEmpty());
+
+        @Cleanup
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(topicName)
+                .subscriptionType(SubscriptionType.Shared)
+                .ackTimeout(1, TimeUnit.SECONDS)
+                .subscriptionName(subName)
+                .subscribe();
+
+        @Cleanup
+        Consumer<byte[]> consumer2 = pulsarClient.newConsumer()
+                .topic(topicName)
+                .subscriptionType(SubscriptionType.Shared)
+                .ackTimeout(1, TimeUnit.SECONDS)
+                .subscriptionName(subName + "-2")
+                .subscribe();
+
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topicName)
+                .create();
+
+        for (PulsarMockLedgerHandle ledgerHandle : 
mockBookKeeper.getLedgerMap().values()) {
+            ledgerHandle.close();
+        }
+
+        for (int i = 0; i < messageSize; i++) {
+            String message = "my-message-" + i;
+            producer.send(message.getBytes());
+            if (i % 2 == 0) {
+                consumer.acknowledge(consumer.receive().getMessageId());
+            } else {
+                consumer2.acknowledge(consumer.receive().getMessageId());
+            }
+        }
+        metricsList = metrics.generate();
+        Assert.assertEquals(metricsList.size(), 3);
+        
Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_writeLedgerSize"),
 26L);
+        
Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_writeLedgerLogicalSize"),
 13L);
+        
Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_readLedgerSize"),
 0L);
+
+        
Assert.assertEquals(metricsList.get(1).getMetrics().get("brk_ml_cursor_writeLedgerSize"),
 26L);
+        
Assert.assertEquals(metricsList.get(1).getMetrics().get("brk_ml_cursor_writeLedgerLogicalSize"),
 13L);
+        
Assert.assertEquals(metricsList.get(1).getMetrics().get("brk_ml_cursor_readLedgerSize"),
 0L);
+
+        
Assert.assertEquals(metricsList.get(2).getMetrics().get("brk_ml_cursor_writeLedgerSize"),
 52L);
+        
Assert.assertEquals(metricsList.get(2).getMetrics().get("brk_ml_cursor_writeLedgerLogicalSize"),
 26L);
+        
Assert.assertEquals(metricsList.get(2).getMetrics().get("brk_ml_cursor_readLedgerSize"),
 0L);
+    }
 }
diff --git a/site2/docs/reference-metrics.md b/site2/docs/reference-metrics.md
index aa69b79..839029b 100644
--- a/site2/docs/reference-metrics.md
+++ b/site2/docs/reference-metrics.md
@@ -286,6 +286,9 @@ brk_ml_cursor_persistLedgerErrors(namespace="", 
ledger_name="", cursor_name:"")|
 brk_ml_cursor_persistZookeeperSucceed(namespace="", ledger_name="", 
cursor_name:"")|Gauge|The number of acknowledgment states that is persistent to 
ZooKeeper.
 brk_ml_cursor_persistZookeeperErrors(namespace="", ledger_name="", 
cursor_name:"")|Gauge|The number of ledger errors occurred when acknowledgment 
states fail to be persistent to ZooKeeper.
 brk_ml_cursor_nonContiguousDeletedMessagesRange(namespace="", ledger_name="", 
cursor_name:"")|Gauge|The number of non-contiguous deleted messages ranges.
+brk_ml_cursor_writeLedgerSize(namespace="", ledger_name="", 
cursor_name:"")|Gauge|The size of write to ledger.
+brk_ml_cursor_writeLedgerLogicalSize(namespace="", ledger_name="", 
cursor_name:"")|Gauge|The size of write to ledger (accounting for without 
replicas).
+brk_ml_cursor_readLedgerSize(namespace="", ledger_name="", 
cursor_name:"")|Gauge|The size of read from ledger.
 
 ### LoadBalancing metrics
 All the loadbalancing metrics are labelled with the following labels:

Reply via email to