This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new 82086985bd9 [improve] [broker] Add subscription prefix for internal
reader (#23044)
82086985bd9 is described below
commit 82086985bd99be9574f41c7b0be428aaf2e4d2ef
Author: Hang Chen <[email protected]>
AuthorDate: Thu Jul 25 10:55:28 2024 +0800
[improve] [broker] Add subscription prefix for internal reader (#23044)
### Motivation
We have many system topics, such as
__change_events
__transaction_buffer_snapshot
__transaction_buffer_snapshot_indexes
__transaction_buffer_snapshot_segments
transaction_coordinator_assign
_transaction_log
__transaction_pending_ack
In Pulsar Broker, we create an internal reader to fetch messages from those
system topics. Due to we do not specify the subscription prefix, the reader
will generate a random subscription name for each reader.
In PIP-355, we introduced a broker-level metric named
pulsar_broker_out_bytes_total, which separate the system subscription traffic
bytes and user subscription traffic bytes. Due to the internal readers don't
have a subscription prefix, we group the internal reader's traffic bytes into
user subscription traffic.
### Modifications
In this PR, we introduce a system subscription prefix named __system_reader
and group the internal reader's traffic into system subscription traffic bytes
in metric pulsar_broker_out_bytes_total.
---
.../service/nonpersistent/NonPersistentTopic.java | 6 ++
.../broker/service/persistent/PersistentTopic.java | 5 +-
.../systopic/TopicPoliciesSystemTopicClient.java | 2 +
...sactionBufferSnapshotBaseSystemTopicClient.java | 2 +
.../pulsar/broker/stats/PrometheusMetricsTest.java | 70 ++++++++++++++--------
.../pulsar/common/naming/SystemTopicNames.java | 5 ++
6 files changed, 63 insertions(+), 27 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index b5d9e53c8eb..9d2f3588dc9 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -76,6 +76,7 @@ import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.KeySharedMeta;
+import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.ClusterPolicies.ClusterUrl;
@@ -1213,6 +1214,11 @@ public class NonPersistentTopic extends AbstractTopic
implements Topic, TopicPol
SubscriptionStatsImpl stats = sub.getStats(getStatsOptions);
bytesOutFromRemovedSubscriptions.add(stats.bytesOutCounter);
msgOutFromRemovedSubscriptions.add(stats.msgOutCounter);
+
+ if (isSystemCursor(subscriptionName)
+ ||
subscriptionName.startsWith(SystemTopicNames.SYSTEM_READER_PREFIX)) {
+
bytesOutFromRemovedSystemSubscriptions.add(stats.bytesOutCounter);
+ }
}
}, brokerService.executor());
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index e52a69a1e2f..9cb4f60f535 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1405,7 +1405,8 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
bytesOutFromRemovedSubscriptions.add(stats.bytesOutCounter);
msgOutFromRemovedSubscriptions.add(stats.msgOutCounter);
- if (isSystemCursor(subscriptionName)) {
+ if (isSystemCursor(subscriptionName)
+ ||
subscriptionName.startsWith(SystemTopicNames.SYSTEM_READER_PREFIX)) {
bytesOutFromRemovedSystemSubscriptions.add(stats.bytesOutCounter);
}
}
@@ -2627,7 +2628,7 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
topicMetricBean.value += v.value;
});
- if (isSystemCursor(name)) {
+ if (isSystemCursor(name) ||
name.startsWith(SystemTopicNames.SYSTEM_READER_PREFIX)) {
stats.bytesOutInternalCounter += subStats.bytesOutCounter;
}
});
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TopicPoliciesSystemTopicClient.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TopicPoliciesSystemTopicClient.java
index b7cff2e08c2..ea3ac507d11 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TopicPoliciesSystemTopicClient.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TopicPoliciesSystemTopicClient.java
@@ -34,6 +34,7 @@ import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.internal.DefaultImplementation;
import org.apache.pulsar.common.events.ActionType;
import org.apache.pulsar.common.events.PulsarEvent;
+import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -69,6 +70,7 @@ public class TopicPoliciesSystemTopicClient extends
SystemTopicClientBase<Pulsar
protected CompletableFuture<Reader<PulsarEvent>> newReaderAsyncInternal() {
return client.newReader(avroSchema)
.topic(topicName.toString())
+ .subscriptionRolePrefix(SystemTopicNames.SYSTEM_READER_PREFIX)
.startMessageId(MessageId.earliest)
.readCompacted(true)
.createAsync()
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TransactionBufferSnapshotBaseSystemTopicClient.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TransactionBufferSnapshotBaseSystemTopicClient.java
index 8efa983a64d..4023cd88bef 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TransactionBufferSnapshotBaseSystemTopicClient.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TransactionBufferSnapshotBaseSystemTopicClient.java
@@ -28,6 +28,7 @@ import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicName;
@Slf4j
@@ -201,6 +202,7 @@ public class
TransactionBufferSnapshotBaseSystemTopicClient<T> extends SystemTo
protected CompletableFuture<Reader<T>> newReaderAsyncInternal() {
return client.newReader(Schema.AVRO(schemaType))
.topic(topicName.toString())
+ .subscriptionRolePrefix(SystemTopicNames.SYSTEM_READER_PREFIX)
.startMessageId(MessageId.earliest)
.readCompacted(true)
.createAsync()
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
index 0d7f8eb0aa3..81c0acba440 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
@@ -30,6 +30,7 @@ import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.google.common.base.Splitter;
import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
import io.jsonwebtoken.SignatureAlgorithm;
import io.prometheus.client.Collector;
import java.io.ByteArrayOutputStream;
@@ -87,6 +88,9 @@ import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.naming.SystemTopicNames;
+import org.apache.pulsar.common.policies.data.RetentionPolicies;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.compaction.Compactor;
import org.apache.pulsar.compaction.PulsarCompactionServiceFactory;
import org.apache.zookeeper.CreateMode;
@@ -209,26 +213,31 @@ public class PrometheusMetricsTest extends BrokerTestBase
{
public void testBrokerMetrics() throws Exception {
cleanup();
conf.setAdditionalSystemCursorNames(Set.of("test-cursor"));
+ conf.setTopicLevelPoliciesEnabled(true);
+ conf.setSystemTopicEnabled(true);
setup();
- Producer<byte[]> p1 =
pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1").create();
- Producer<byte[]> p2 =
pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic2").create();
+ admin.tenants().createTenant("test-tenant",
+ new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"),
Sets.newHashSet("test")));
+ admin.namespaces().createNamespace("test-tenant/test-ns", 4);
+ Producer<byte[]> p1 =
pulsarClient.newProducer().topic("persistent://test-tenant/test-ns/my-topic1").create();
+ Producer<byte[]> p2 =
pulsarClient.newProducer().topic("persistent://test-tenant/test-ns/my-topic2").create();
// system topic
- Producer<byte[]> p3 =
pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/__change_events").create();
+ Producer<byte[]> p3 =
pulsarClient.newProducer().topic("persistent://test-tenant/test-ns/__test-topic").create();
Consumer<byte[]> c1 = pulsarClient.newConsumer()
- .topic("persistent://my-property/use/my-ns/my-topic1")
+ .topic("persistent://test-tenant/test-ns/my-topic1")
.subscriptionName("test")
.subscribe();
// additional system cursor
Consumer<byte[]> c2 = pulsarClient.newConsumer()
- .topic("persistent://my-property/use/my-ns/my-topic2")
+ .topic("persistent://test-tenant/test-ns/my-topic2")
.subscriptionName("test-cursor")
.subscribe();
Consumer<byte[]> c3 = pulsarClient.newConsumer()
- .topic("persistent://my-property/use/my-ns/__change_events")
+ .topic("persistent://test-tenant/test-ns/__test-topic")
.subscriptionName("test-v1")
.subscribe();
@@ -250,7 +259,8 @@ public class PrometheusMetricsTest extends BrokerTestBase {
c1.unsubscribe();
c2.unsubscribe();
-
//admin.topics().unload("persistent://my-property/use/my-ns/my-topic1");
+
admin.topicPolicies().setRetention("persistent://test-tenant/test-ns/my-topic2",
+ new RetentionPolicies(60, 1024));
ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
PrometheusMetricsTestUtil.generate(pulsar, true, false, false,
statsOut);
@@ -263,33 +273,43 @@ public class PrometheusMetricsTest extends BrokerTestBase
{
List<Metric> bytesOutTotal = (List<Metric>)
metrics.get("pulsar_broker_out_bytes_total");
List<Metric> bytesInTotal = (List<Metric>)
metrics.get("pulsar_broker_in_bytes_total");
+ List<Metric> topicLevelBytesOutTotal = (List<Metric>)
metrics.get("pulsar_out_bytes_total");
+
assertEquals(bytesOutTotal.size(), 2);
assertEquals(bytesInTotal.size(), 2);
+ assertEquals(topicLevelBytesOutTotal.size(), 3);
double systemOutBytes = 0.0;
double userOutBytes = 0.0;
- switch
(bytesOutTotal.get(0).tags.get("system_subscription").toString()) {
- case "true":
- systemOutBytes = bytesOutTotal.get(0).value;
- userOutBytes = bytesOutTotal.get(1).value;
- case "false":
- systemOutBytes = bytesOutTotal.get(1).value;
- userOutBytes = bytesOutTotal.get(0).value;
- }
-
double systemInBytes = 0.0;
double userInBytes = 0.0;
- switch (bytesInTotal.get(0).tags.get("system_topic").toString()) {
- case "true":
- systemInBytes = bytesInTotal.get(0).value;
- userInBytes = bytesInTotal.get(1).value;
- case "false":
- systemInBytes = bytesInTotal.get(1).value;
- userInBytes = bytesInTotal.get(0).value;
+
+ for (Metric metric : bytesOutTotal) {
+ if (metric.tags.get("system_subscription").equals("true")) {
+ systemOutBytes = metric.value;
+ } else {
+ userOutBytes = metric.value;
+ }
+ }
+
+ for (Metric metric : bytesInTotal) {
+ if (metric.tags.get("system_topic").equals("true")) {
+ systemInBytes = metric.value;
+ } else {
+ userInBytes = metric.value;
+ }
+ }
+
+ double systemCursorOutBytes = 0.0;
+ for (Metric metric : topicLevelBytesOutTotal) {
+ if
(metric.tags.get("subscription").startsWith(SystemTopicNames.SYSTEM_READER_PREFIX)
+ ||
metric.tags.get("subscription").equals(Compactor.COMPACTION_SUBSCRIPTION)) {
+ systemCursorOutBytes = metric.value;
+ }
}
- assertEquals(userOutBytes / 2, systemOutBytes);
- assertEquals(userInBytes / 2, systemInBytes);
+ assertEquals(systemCursorOutBytes, systemInBytes);
+ assertEquals(userOutBytes / 2, systemOutBytes - systemCursorOutBytes);
assertEquals(userOutBytes + systemOutBytes, userInBytes +
systemInBytes);
}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/SystemTopicNames.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/SystemTopicNames.java
index 716d9bc31fa..9a3689912c9 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/SystemTopicNames.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/SystemTopicNames.java
@@ -51,6 +51,11 @@ public class SystemTopicNames {
public static final String PENDING_ACK_STORE_CURSOR_NAME =
"__pending_ack_state";
+ /**
+ * Prefix for the system reader for all the system topics.
+ */
+ public static final String SYSTEM_READER_PREFIX = "__system_reader";
+
/**
* The set of all local topic names declared above.
*/