This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 82086985bd9 [improve] [broker] Add subscription prefix for internal 
reader (#23044)
82086985bd9 is described below

commit 82086985bd99be9574f41c7b0be428aaf2e4d2ef
Author: Hang Chen <[email protected]>
AuthorDate: Thu Jul 25 10:55:28 2024 +0800

    [improve] [broker] Add subscription prefix for internal reader (#23044)
    
    ### Motivation
    We have many system topics, such as
    
    __change_events
    __transaction_buffer_snapshot
    __transaction_buffer_snapshot_indexes
    __transaction_buffer_snapshot_segments
    transaction_coordinator_assign
    _transaction_log
    __transaction_pending_ack
    In the Pulsar broker, we create internal readers to fetch messages from those 
system topics. Because we do not specify a subscription prefix, each reader 
gets a randomly generated subscription name.
    
    In PIP-355, we introduced a broker-level metric named 
pulsar_broker_out_bytes_total, which separates system subscription traffic 
bytes from user subscription traffic bytes. Because the internal readers don't 
have a subscription prefix, their traffic bytes are counted as 
user subscription traffic.
    
    ### Modifications
    In this PR, we introduce a system subscription prefix named __system_reader 
and count the internal readers' traffic as system subscription traffic bytes 
in the metric pulsar_broker_out_bytes_total.
---
 .../service/nonpersistent/NonPersistentTopic.java  |  6 ++
 .../broker/service/persistent/PersistentTopic.java |  5 +-
 .../systopic/TopicPoliciesSystemTopicClient.java   |  2 +
 ...sactionBufferSnapshotBaseSystemTopicClient.java |  2 +
 .../pulsar/broker/stats/PrometheusMetricsTest.java | 70 ++++++++++++++--------
 .../pulsar/common/naming/SystemTopicNames.java     |  5 ++
 6 files changed, 63 insertions(+), 27 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index b5d9e53c8eb..9d2f3588dc9 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -76,6 +76,7 @@ import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
 import org.apache.pulsar.common.api.proto.KeySharedMeta;
+import org.apache.pulsar.common.naming.SystemTopicNames;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.ClusterPolicies.ClusterUrl;
@@ -1213,6 +1214,11 @@ public class NonPersistentTopic extends AbstractTopic 
implements Topic, TopicPol
                 SubscriptionStatsImpl stats = sub.getStats(getStatsOptions);
                 bytesOutFromRemovedSubscriptions.add(stats.bytesOutCounter);
                 msgOutFromRemovedSubscriptions.add(stats.msgOutCounter);
+
+                if (isSystemCursor(subscriptionName)
+                        || 
subscriptionName.startsWith(SystemTopicNames.SYSTEM_READER_PREFIX)) {
+                    
bytesOutFromRemovedSystemSubscriptions.add(stats.bytesOutCounter);
+                }
             }
         }, brokerService.executor());
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index e52a69a1e2f..9cb4f60f535 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1405,7 +1405,8 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
             bytesOutFromRemovedSubscriptions.add(stats.bytesOutCounter);
             msgOutFromRemovedSubscriptions.add(stats.msgOutCounter);
 
-            if (isSystemCursor(subscriptionName)) {
+            if (isSystemCursor(subscriptionName)
+                    || 
subscriptionName.startsWith(SystemTopicNames.SYSTEM_READER_PREFIX)) {
                 
bytesOutFromRemovedSystemSubscriptions.add(stats.bytesOutCounter);
             }
         }
@@ -2627,7 +2628,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
                 topicMetricBean.value += v.value;
             });
 
-            if (isSystemCursor(name)) {
+            if (isSystemCursor(name) || 
name.startsWith(SystemTopicNames.SYSTEM_READER_PREFIX)) {
                 stats.bytesOutInternalCounter += subStats.bytesOutCounter;
             }
         });
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TopicPoliciesSystemTopicClient.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TopicPoliciesSystemTopicClient.java
index b7cff2e08c2..ea3ac507d11 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TopicPoliciesSystemTopicClient.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TopicPoliciesSystemTopicClient.java
@@ -34,6 +34,7 @@ import org.apache.pulsar.client.api.schema.SchemaDefinition;
 import org.apache.pulsar.client.internal.DefaultImplementation;
 import org.apache.pulsar.common.events.ActionType;
 import org.apache.pulsar.common.events.PulsarEvent;
+import org.apache.pulsar.common.naming.SystemTopicNames;
 import org.apache.pulsar.common.naming.TopicName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -69,6 +70,7 @@ public class TopicPoliciesSystemTopicClient extends 
SystemTopicClientBase<Pulsar
     protected CompletableFuture<Reader<PulsarEvent>> newReaderAsyncInternal() {
         return client.newReader(avroSchema)
                 .topic(topicName.toString())
+                .subscriptionRolePrefix(SystemTopicNames.SYSTEM_READER_PREFIX)
                 .startMessageId(MessageId.earliest)
                 .readCompacted(true)
                 .createAsync()
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TransactionBufferSnapshotBaseSystemTopicClient.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TransactionBufferSnapshotBaseSystemTopicClient.java
index 8efa983a64d..4023cd88bef 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TransactionBufferSnapshotBaseSystemTopicClient.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TransactionBufferSnapshotBaseSystemTopicClient.java
@@ -28,6 +28,7 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.naming.SystemTopicNames;
 import org.apache.pulsar.common.naming.TopicName;
 
 @Slf4j
@@ -201,6 +202,7 @@ public class  
TransactionBufferSnapshotBaseSystemTopicClient<T> extends SystemTo
     protected CompletableFuture<Reader<T>> newReaderAsyncInternal() {
         return client.newReader(Schema.AVRO(schemaType))
                 .topic(topicName.toString())
+                .subscriptionRolePrefix(SystemTopicNames.SYSTEM_READER_PREFIX)
                 .startMessageId(MessageId.earliest)
                 .readCompacted(true)
                 .createAsync()
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
index 0d7f8eb0aa3..81c0acba440 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
@@ -30,6 +30,7 @@ import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 import com.google.common.base.Splitter;
 import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
 import io.jsonwebtoken.SignatureAlgorithm;
 import io.prometheus.client.Collector;
 import java.io.ByteArrayOutputStream;
@@ -87,6 +88,9 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.naming.SystemTopicNames;
+import org.apache.pulsar.common.policies.data.RetentionPolicies;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.compaction.Compactor;
 import org.apache.pulsar.compaction.PulsarCompactionServiceFactory;
 import org.apache.zookeeper.CreateMode;
@@ -209,26 +213,31 @@ public class PrometheusMetricsTest extends BrokerTestBase 
{
     public void testBrokerMetrics() throws Exception {
         cleanup();
         conf.setAdditionalSystemCursorNames(Set.of("test-cursor"));
+        conf.setTopicLevelPoliciesEnabled(true);
+        conf.setSystemTopicEnabled(true);
         setup();
 
-        Producer<byte[]> p1 = 
pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1").create();
-        Producer<byte[]> p2 = 
pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic2").create();
+        admin.tenants().createTenant("test-tenant",
+                new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), 
Sets.newHashSet("test")));
+        admin.namespaces().createNamespace("test-tenant/test-ns", 4);
+        Producer<byte[]> p1 = 
pulsarClient.newProducer().topic("persistent://test-tenant/test-ns/my-topic1").create();
+        Producer<byte[]> p2 = 
pulsarClient.newProducer().topic("persistent://test-tenant/test-ns/my-topic2").create();
         // system topic
-        Producer<byte[]> p3 = 
pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/__change_events").create();
+        Producer<byte[]> p3 = 
pulsarClient.newProducer().topic("persistent://test-tenant/test-ns/__test-topic").create();
 
         Consumer<byte[]> c1 = pulsarClient.newConsumer()
-                .topic("persistent://my-property/use/my-ns/my-topic1")
+                .topic("persistent://test-tenant/test-ns/my-topic1")
                 .subscriptionName("test")
                 .subscribe();
 
         // additional system cursor
         Consumer<byte[]> c2 = pulsarClient.newConsumer()
-                .topic("persistent://my-property/use/my-ns/my-topic2")
+                .topic("persistent://test-tenant/test-ns/my-topic2")
                 .subscriptionName("test-cursor")
                 .subscribe();
 
         Consumer<byte[]> c3 = pulsarClient.newConsumer()
-                .topic("persistent://my-property/use/my-ns/__change_events")
+                .topic("persistent://test-tenant/test-ns/__test-topic")
                 .subscriptionName("test-v1")
                 .subscribe();
 
@@ -250,7 +259,8 @@ public class PrometheusMetricsTest extends BrokerTestBase {
         c1.unsubscribe();
         c2.unsubscribe();
 
-        
//admin.topics().unload("persistent://my-property/use/my-ns/my-topic1");
+        
admin.topicPolicies().setRetention("persistent://test-tenant/test-ns/my-topic2",
+                        new RetentionPolicies(60, 1024));
 
         ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
         PrometheusMetricsTestUtil.generate(pulsar, true, false, false, 
statsOut);
@@ -263,33 +273,43 @@ public class PrometheusMetricsTest extends BrokerTestBase 
{
 
         List<Metric> bytesOutTotal = (List<Metric>) 
metrics.get("pulsar_broker_out_bytes_total");
         List<Metric> bytesInTotal = (List<Metric>) 
metrics.get("pulsar_broker_in_bytes_total");
+        List<Metric> topicLevelBytesOutTotal = (List<Metric>) 
metrics.get("pulsar_out_bytes_total");
+
         assertEquals(bytesOutTotal.size(), 2);
         assertEquals(bytesInTotal.size(), 2);
+        assertEquals(topicLevelBytesOutTotal.size(), 3);
 
         double systemOutBytes = 0.0;
         double userOutBytes = 0.0;
-        switch 
(bytesOutTotal.get(0).tags.get("system_subscription").toString()) {
-            case "true":
-                systemOutBytes = bytesOutTotal.get(0).value;
-                userOutBytes = bytesOutTotal.get(1).value;
-            case "false":
-                systemOutBytes = bytesOutTotal.get(1).value;
-                userOutBytes = bytesOutTotal.get(0).value;
-        }
-
         double systemInBytes = 0.0;
         double userInBytes = 0.0;
-        switch (bytesInTotal.get(0).tags.get("system_topic").toString()) {
-            case "true":
-                systemInBytes = bytesInTotal.get(0).value;
-                userInBytes = bytesInTotal.get(1).value;
-            case "false":
-                systemInBytes = bytesInTotal.get(1).value;
-                userInBytes = bytesInTotal.get(0).value;
+
+        for (Metric metric : bytesOutTotal) {
+            if (metric.tags.get("system_subscription").equals("true")) {
+                systemOutBytes = metric.value;
+            } else {
+                userOutBytes = metric.value;
+            }
+        }
+
+        for (Metric metric : bytesInTotal) {
+            if (metric.tags.get("system_topic").equals("true")) {
+                systemInBytes = metric.value;
+            } else {
+                userInBytes = metric.value;
+            }
+        }
+
+        double systemCursorOutBytes = 0.0;
+        for (Metric metric : topicLevelBytesOutTotal) {
+            if 
(metric.tags.get("subscription").startsWith(SystemTopicNames.SYSTEM_READER_PREFIX)
+                    || 
metric.tags.get("subscription").equals(Compactor.COMPACTION_SUBSCRIPTION)) {
+                systemCursorOutBytes = metric.value;
+            }
         }
 
-        assertEquals(userOutBytes / 2, systemOutBytes);
-        assertEquals(userInBytes / 2, systemInBytes);
+        assertEquals(systemCursorOutBytes, systemInBytes);
+        assertEquals(userOutBytes / 2, systemOutBytes - systemCursorOutBytes);
         assertEquals(userOutBytes + systemOutBytes, userInBytes + 
systemInBytes);
     }
 
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/SystemTopicNames.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/SystemTopicNames.java
index 716d9bc31fa..9a3689912c9 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/SystemTopicNames.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/SystemTopicNames.java
@@ -51,6 +51,11 @@ public class SystemTopicNames {
 
     public static final String PENDING_ACK_STORE_CURSOR_NAME = 
"__pending_ack_state";
 
+    /**
+     * Prefix for the system reader for all the system topics.
+     */
+    public static final String SYSTEM_READER_PREFIX = "__system_reader";
+
     /**
      * The set of all local topic names declared above.
      */

Reply via email to