This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new f3e52b568ec [improve][pip] PIP-347: Add role field in consumer's stat (#22562)
f3e52b568ec is described below

commit f3e52b568ec7e86e7582bdc425321fe172bc4deb
Author: Wenzhi Feng <[email protected]>
AuthorDate: Thu May 16 13:29:26 2024 +0800

    [improve][pip] PIP-347: Add role field in consumer's stat (#22562)
---
 pip/pip-347.md                                     |   2 +-
 .../org/apache/pulsar/broker/service/Consumer.java |   1 +
 .../stats/AuthenticatedConsumerStatsTest.java      | 169 +++++++++++++++++++++
 .../pulsar/broker/stats/ConsumerStatsTest.java     |   2 +-
 .../pulsar/common/policies/data/ConsumerStats.java |   3 +
 .../policies/data/stats/ConsumerStatsImpl.java     |   3 +
 6 files changed, 178 insertions(+), 2 deletions(-)

diff --git a/pip/pip-347.md b/pip/pip-347.md
index 5326fed3533..a5d5d76ae17 100644
--- a/pip/pip-347.md
+++ b/pip/pip-347.md
@@ -34,4 +34,4 @@ Fully compatible.
 Updated afterwards
 -->
 * Mailing List discussion thread: https://lists.apache.org/thread/p9y9r8pb7ygk8f0jd121c1121phvzd09
-* Mailing List voting thread:
+* Mailing List voting thread: https://lists.apache.org/thread/sfv0vq498dnjx6k6zdrnn0cw8f22tz05
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index fe9fbe6a400..c9f417c4bc4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -208,6 +208,7 @@ public class Consumer {
         stats = new ConsumerStatsImpl();
         stats.setAddress(cnx.clientSourceAddressAndPort());
         stats.consumerName = consumerName;
+        stats.appId = appId;
         stats.setConnectedSince(DateFormatter.format(connectedSince));
         stats.setClientVersion(cnx.getClientVersion());
         stats.metadata = this.metadata;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/AuthenticatedConsumerStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/AuthenticatedConsumerStatsTest.java
new file mode 100644
index 00000000000..e8cadb72e1e
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/AuthenticatedConsumerStatsTest.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.stats;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Sets;
+import io.jsonwebtoken.Jwts;
+import io.jsonwebtoken.SignatureAlgorithm;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
+import org.apache.pulsar.client.admin.PulsarAdminBuilder;
+import org.apache.pulsar.client.api.AuthenticationFactory;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
+import org.apache.pulsar.common.policies.data.ConsumerStats;
+import org.apache.pulsar.common.policies.data.TopicStats;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.security.KeyPair;
+import java.security.KeyPairGenerator;
+import java.security.NoSuchAlgorithmException;
+import java.security.PrivateKey;
+import java.time.Duration;
+import java.util.Base64;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Properties;
+import java.util.Set;
+
+public class AuthenticatedConsumerStatsTest extends ConsumerStatsTest{
+    private final String ADMIN_TOKEN;
+    private final String TOKEN_PUBLIC_KEY;
+    private final KeyPair kp;
+
+    AuthenticatedConsumerStatsTest() throws NoSuchAlgorithmException {
+        KeyPairGenerator kpg = KeyPairGenerator.getInstance("RSA");
+        kp = kpg.generateKeyPair();
+
+        byte[] encodedPublicKey = kp.getPublic().getEncoded();
+        TOKEN_PUBLIC_KEY = "data:;base64," + Base64.getEncoder().encodeToString(encodedPublicKey);
+        ADMIN_TOKEN = generateToken(kp, "admin");
+    }
+
+
+    private String generateToken(KeyPair kp, String subject) {
+        PrivateKey pkey = kp.getPrivate();
+        long expMillis = System.currentTimeMillis() + Duration.ofHours(1).toMillis();
+        Date exp = new Date(expMillis);
+
+        return Jwts.builder()
+                .setSubject(subject)
+                .setExpiration(exp)
+                .signWith(pkey, SignatureAlgorithm.forSigningKey(pkey))
+                .compact();
+    }
+
+    @Override
+    protected void customizeNewPulsarClientBuilder(ClientBuilder clientBuilder) {
+        clientBuilder.authentication(AuthenticationFactory.token(ADMIN_TOKEN));
+    }
+
+    @Override
+    protected void customizeNewPulsarAdminBuilder(PulsarAdminBuilder pulsarAdminBuilder) {
+        pulsarAdminBuilder.authentication(AuthenticationFactory.token(ADMIN_TOKEN));
+    }
+
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        conf.setAuthenticationEnabled(true);
+        conf.setAuthorizationEnabled(true);
+
+        Set<String> superUserRoles = new HashSet<>();
+        superUserRoles.add("admin");
+        conf.setSuperUserRoles(superUserRoles);
+
+        Set<String> providers = new HashSet<>();
+        providers.add(AuthenticationProviderToken.class.getName());
+        conf.setAuthenticationProviders(providers);
+        conf.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName());
+        conf.setBrokerClientAuthenticationParameters("token:" + ADMIN_TOKEN);
+
+        conf.setClusterName("test");
+
+        // Set provider domain name
+        Properties properties = new Properties();
+        properties.setProperty("tokenPublicKey", TOKEN_PUBLIC_KEY);
+        conf.setProperties(properties);
+
+        super.internalSetup();
+        super.producerBaseSetup();
+    }
+
+    @Test
+    public void testConsumerStatsOutput() throws Exception {
+        Set<String> allowedFields = Sets.newHashSet(
+                "msgRateOut",
+                "msgThroughputOut",
+                "bytesOutCounter",
+                "msgOutCounter",
+                "messageAckRate",
+                "msgRateRedeliver",
+                "chunkedMessageRate",
+                "consumerName",
+                "availablePermits",
+                "unackedMessages",
+                "avgMessagesPerEntry",
+                "blockedConsumerOnUnackedMsgs",
+                "readPositionWhenJoining",
+                "lastAckedTime",
+                "lastAckedTimestamp",
+                "lastConsumedTime",
+                "lastConsumedTimestamp",
+                "lastConsumedFlowTimestamp",
+                "keyHashRanges",
+                "metadata",
+                "address",
+                "connectedSince",
+                "clientVersion",
+                "appId");
+
+        final String topicName = "persistent://public/default/testConsumerStatsOutput";
+        final String subName = "my-subscription";
+
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(topicName)
+                .subscriptionType(SubscriptionType.Shared)
+                .subscriptionName(subName)
+                .subscribe();
+
+        TopicStats stats = admin.topics().getStats(topicName);
+        ObjectMapper mapper = ObjectMapperFactory.create();
+        ConsumerStats consumerStats = stats.getSubscriptions()
+                .get(subName).getConsumers().get(0);
+        Assert.assertTrue(consumerStats.getLastConsumedFlowTimestamp() > 0);
+        JsonNode node = mapper.readTree(mapper.writer().writeValueAsString(consumerStats));
+        Iterator<String> itr = node.fieldNames();
+        while (itr.hasNext()) {
+            String field = itr.next();
+            Assert.assertTrue(allowedFields.contains(field), field + " should not be exposed");
+        }
+        // assert that role is exposed
+        Assert.assertEquals(consumerStats.getAppId(), "admin");
+        consumer.close();
+    }
+
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
index 512a5cfcab6..024d8582fa2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
@@ -195,7 +195,7 @@ public class ConsumerStatsTest extends ProducerConsumerBase {
 
     @Test
     public void testUpdateStatsForActiveConsumerAndSubscription() throws Exception {
-        final String topicName = "persistent://prop/use/ns-abc/testUpdateStatsForActiveConsumerAndSubscription";
+        final String topicName = "persistent://public/default/testUpdateStatsForActiveConsumerAndSubscription";
         pulsarClient.newConsumer()
                 .topic(topicName)
                 .subscriptionType(SubscriptionType.Shared)
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java
index 8c9a615d6d0..d2d3600df96 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java
@@ -25,6 +25,9 @@ import java.util.Map;
  * Consumer statistics.
  */
 public interface ConsumerStats {
+    /** the app id. */
+    String getAppId();
+
     /** Total rate of messages delivered to the consumer (msg/s). */
     double getMsgRateOut();
 
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/ConsumerStatsImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/ConsumerStatsImpl.java
index 548abdc9ada..de36b330b7f 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/ConsumerStatsImpl.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/ConsumerStatsImpl.java
@@ -30,6 +30,9 @@ import org.apache.pulsar.common.util.DateFormatter;
  */
 @Data
 public class ConsumerStatsImpl implements ConsumerStats {
+    /** the app id. */
+    public String appId;
+
     /** Total rate of messages delivered to the consumer (msg/s). */
     public double msgRateOut;
 

Reply via email to