This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new f3e52b568ec [improve][pip] PIP-347: Add role field in consumer's stat
(#22562)
f3e52b568ec is described below
commit f3e52b568ec7e86e7582bdc425321fe172bc4deb
Author: Wenzhi Feng <[email protected]>
AuthorDate: Thu May 16 13:29:26 2024 +0800
[improve][pip] PIP-347: Add role field in consumer's stat (#22562)
---
pip/pip-347.md | 2 +-
.../org/apache/pulsar/broker/service/Consumer.java | 1 +
.../stats/AuthenticatedConsumerStatsTest.java | 169 +++++++++++++++++++++
.../pulsar/broker/stats/ConsumerStatsTest.java | 2 +-
.../pulsar/common/policies/data/ConsumerStats.java | 3 +
.../policies/data/stats/ConsumerStatsImpl.java | 3 +
6 files changed, 178 insertions(+), 2 deletions(-)
diff --git a/pip/pip-347.md b/pip/pip-347.md
index 5326fed3533..a5d5d76ae17 100644
--- a/pip/pip-347.md
+++ b/pip/pip-347.md
@@ -34,4 +34,4 @@ Fully compatible.
Updated afterwards
-->
* Mailing List discussion thread:
https://lists.apache.org/thread/p9y9r8pb7ygk8f0jd121c1121phvzd09
-* Mailing List voting thread:
+* Mailing List voting thread:
https://lists.apache.org/thread/sfv0vq498dnjx6k6zdrnn0cw8f22tz05
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index fe9fbe6a400..c9f417c4bc4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -208,6 +208,7 @@ public class Consumer {
stats = new ConsumerStatsImpl();
stats.setAddress(cnx.clientSourceAddressAndPort());
stats.consumerName = consumerName;
+ stats.appId = appId;
stats.setConnectedSince(DateFormatter.format(connectedSince));
stats.setClientVersion(cnx.getClientVersion());
stats.metadata = this.metadata;
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/AuthenticatedConsumerStatsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/AuthenticatedConsumerStatsTest.java
new file mode 100644
index 00000000000..e8cadb72e1e
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/AuthenticatedConsumerStatsTest.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.stats;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Sets;
+import io.jsonwebtoken.Jwts;
+import io.jsonwebtoken.SignatureAlgorithm;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
+import org.apache.pulsar.client.admin.PulsarAdminBuilder;
+import org.apache.pulsar.client.api.AuthenticationFactory;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
+import org.apache.pulsar.common.policies.data.ConsumerStats;
+import org.apache.pulsar.common.policies.data.TopicStats;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.security.KeyPair;
+import java.security.KeyPairGenerator;
+import java.security.NoSuchAlgorithmException;
+import java.security.PrivateKey;
+import java.time.Duration;
+import java.util.Base64;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Properties;
+import java.util.Set;
+
+public class AuthenticatedConsumerStatsTest extends ConsumerStatsTest{
+ private final String ADMIN_TOKEN;
+ private final String TOKEN_PUBLIC_KEY;
+ private final KeyPair kp;
+
+ AuthenticatedConsumerStatsTest() throws NoSuchAlgorithmException {
+ KeyPairGenerator kpg = KeyPairGenerator.getInstance("RSA");
+ kp = kpg.generateKeyPair();
+
+ byte[] encodedPublicKey = kp.getPublic().getEncoded();
+ TOKEN_PUBLIC_KEY = "data:;base64," +
Base64.getEncoder().encodeToString(encodedPublicKey);
+ ADMIN_TOKEN = generateToken(kp, "admin");
+ }
+
+
+ private String generateToken(KeyPair kp, String subject) {
+ PrivateKey pkey = kp.getPrivate();
+ long expMillis = System.currentTimeMillis() +
Duration.ofHours(1).toMillis();
+ Date exp = new Date(expMillis);
+
+ return Jwts.builder()
+ .setSubject(subject)
+ .setExpiration(exp)
+ .signWith(pkey, SignatureAlgorithm.forSigningKey(pkey))
+ .compact();
+ }
+
+ @Override
+ protected void customizeNewPulsarClientBuilder(ClientBuilder
clientBuilder) {
+ clientBuilder.authentication(AuthenticationFactory.token(ADMIN_TOKEN));
+ }
+
+ @Override
+ protected void customizeNewPulsarAdminBuilder(PulsarAdminBuilder
pulsarAdminBuilder) {
+
pulsarAdminBuilder.authentication(AuthenticationFactory.token(ADMIN_TOKEN));
+ }
+
+ @BeforeMethod
+ @Override
+ protected void setup() throws Exception {
+ conf.setAuthenticationEnabled(true);
+ conf.setAuthorizationEnabled(true);
+
+ Set<String> superUserRoles = new HashSet<>();
+ superUserRoles.add("admin");
+ conf.setSuperUserRoles(superUserRoles);
+
+ Set<String> providers = new HashSet<>();
+ providers.add(AuthenticationProviderToken.class.getName());
+ conf.setAuthenticationProviders(providers);
+
conf.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName());
+ conf.setBrokerClientAuthenticationParameters("token:" + ADMIN_TOKEN);
+
+ conf.setClusterName("test");
+
+ // Set provider domain name
+ Properties properties = new Properties();
+ properties.setProperty("tokenPublicKey", TOKEN_PUBLIC_KEY);
+ conf.setProperties(properties);
+
+ super.internalSetup();
+ super.producerBaseSetup();
+ }
+
+ @Test
+ public void testConsumerStatsOutput() throws Exception {
+ Set<String> allowedFields = Sets.newHashSet(
+ "msgRateOut",
+ "msgThroughputOut",
+ "bytesOutCounter",
+ "msgOutCounter",
+ "messageAckRate",
+ "msgRateRedeliver",
+ "chunkedMessageRate",
+ "consumerName",
+ "availablePermits",
+ "unackedMessages",
+ "avgMessagesPerEntry",
+ "blockedConsumerOnUnackedMsgs",
+ "readPositionWhenJoining",
+ "lastAckedTime",
+ "lastAckedTimestamp",
+ "lastConsumedTime",
+ "lastConsumedTimestamp",
+ "lastConsumedFlowTimestamp",
+ "keyHashRanges",
+ "metadata",
+ "address",
+ "connectedSince",
+ "clientVersion",
+ "appId");
+
+ final String topicName =
"persistent://public/default/testConsumerStatsOutput";
+ final String subName = "my-subscription";
+
+ Consumer<byte[]> consumer = pulsarClient.newConsumer()
+ .topic(topicName)
+ .subscriptionType(SubscriptionType.Shared)
+ .subscriptionName(subName)
+ .subscribe();
+
+ TopicStats stats = admin.topics().getStats(topicName);
+ ObjectMapper mapper = ObjectMapperFactory.create();
+ ConsumerStats consumerStats = stats.getSubscriptions()
+ .get(subName).getConsumers().get(0);
+ Assert.assertTrue(consumerStats.getLastConsumedFlowTimestamp() > 0);
+ JsonNode node =
mapper.readTree(mapper.writer().writeValueAsString(consumerStats));
+ Iterator<String> itr = node.fieldNames();
+ while (itr.hasNext()) {
+ String field = itr.next();
+ Assert.assertTrue(allowedFields.contains(field), field + " should
not be exposed");
+ }
+ // assert that role is exposed
+ Assert.assertEquals(consumerStats.getAppId(), "admin");
+ consumer.close();
+ }
+
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
index 512a5cfcab6..024d8582fa2 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
@@ -195,7 +195,7 @@ public class ConsumerStatsTest extends ProducerConsumerBase
{
@Test
public void testUpdateStatsForActiveConsumerAndSubscription() throws
Exception {
- final String topicName =
"persistent://prop/use/ns-abc/testUpdateStatsForActiveConsumerAndSubscription";
+ final String topicName =
"persistent://public/default/testUpdateStatsForActiveConsumerAndSubscription";
pulsarClient.newConsumer()
.topic(topicName)
.subscriptionType(SubscriptionType.Shared)
diff --git
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java
index 8c9a615d6d0..d2d3600df96 100644
---
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java
+++
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java
@@ -25,6 +25,9 @@ import java.util.Map;
* Consumer statistics.
*/
public interface ConsumerStats {
+ /** the app id. */
+ String getAppId();
+
/** Total rate of messages delivered to the consumer (msg/s). */
double getMsgRateOut();
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/ConsumerStatsImpl.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/ConsumerStatsImpl.java
index 548abdc9ada..de36b330b7f 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/ConsumerStatsImpl.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/ConsumerStatsImpl.java
@@ -30,6 +30,9 @@ import org.apache.pulsar.common.util.DateFormatter;
*/
@Data
public class ConsumerStatsImpl implements ConsumerStats {
+ /** the app id. */
+ public String appId;
+
/** Total rate of messages delivered to the consumer (msg/s). */
public double msgRateOut;