This is an automated email from the ASF dual-hosted git repository.

chenhang pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 8ce9e2dfd241edbb92a3b2874c43e54a74198ba6
Author: Marvin Cai <[email protected]>
AuthorDate: Sun Aug 8 17:36:19 2021 -0700

    Fix MsgDropRate missing from NonPersistentTopics stats output. (#11119)
    
    Fixes https://github.com/apache/pulsar/issues/10495
    
    ### Motivation
    MsgDropRate info has been missing from the stats output since the
    NonPersistentTopics admin API was merged with the Topics admin API.
    This PR fixes that.
    
    ### Modifications
    It seems that, because of the API merge, the data is not properly
    deserialized in the admin client.
    In addition, with the added TopicStats interface, field hiding causes
    unexpected behavior with Jackson, so the fields in
    NonPersistentTopicStatsImpl that were intended to hide the superclass's
    fields are not shown in the output.
    
    The fix is to stop reusing the superclass's field names to hide its
    fields, and to use @JsonIgnore to hide them from the output instead.
    New fields are added to store the subscription/publisher/replicator info
    for NonPersistentTopic.
    This does change the output names of that info, but only in the CLI
    output; for the admin client, the old
    getPublishers/getSubscriptions/getReplication will still work.
    
    (cherry picked from commit 0aca5f9153afc7804a3ae9b169346a06ee9811d9)
---
 .../broker/admin/impl/PersistentTopicsBase.java    |   5 +-
 .../broker/admin/v2/NonPersistentTopics.java       | 116 +++++++++++++++++++++
 .../apache/pulsar/broker/admin/AdminApiTest.java   |  14 +--
 .../apache/pulsar/broker/admin/AdminApiTest2.java  |  20 ++--
 ...ava => NonPersistentPartitionedTopicStats.java} |  12 +--
 .../policies/data/PartitionedTopicStats.java       |   4 +-
 .../pulsar/client/admin/internal/TopicsImpl.java   |  94 ++++++++++++-----
 .../pulsar/admin/cli/PulsarAdminToolTest.java      |  30 +++---
 .../NonPersistentPartitionedTopicStatsImpl.java    |  63 +++++++++++
 .../data/stats/NonPersistentTopicStatsImpl.java    | 113 ++++++++++++++++----
 .../pulsar/common/util/ObjectMapperFactory.java    |   3 +
 .../NonPersistentPartitionedTopicStatsTest.java    |  61 +++++++++++
 12 files changed, 452 insertions(+), 83 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 0c1a410..7540206 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -1261,7 +1261,8 @@ public class PersistentTopicsBase extends AdminResource {
                         try {
                             stats.add(statFuture.get());
                             if (perPartition) {
-                                
stats.partitions.put(topicName.getPartition(i).toString(), statFuture.get());
+                                
stats.getPartitions().put(topicName.getPartition(i).toString(),
+                                        (TopicStatsImpl) statFuture.get());
                             }
                         } catch (Exception e) {
                             asyncResponse.resume(new RestException(e));
@@ -1274,7 +1275,7 @@ public class PersistentTopicsBase extends AdminResource {
                     try {
                         boolean zkPathExists = 
namespaceResources().getPartitionedTopicResources().exists(path);
                         if (zkPathExists) {
-                            stats.partitions.put(topicName.toString(), new 
TopicStatsImpl());
+                            stats.getPartitions().put(topicName.toString(), 
new TopicStatsImpl());
                         } else {
                             asyncResponse.resume(
                                     new RestException(Status.NOT_FOUND,
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
index 9755b93..2cfb20f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
@@ -46,6 +46,7 @@ import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
 import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.admin.ZkAdminPaths;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
 import org.apache.pulsar.broker.web.RestException;
@@ -57,6 +58,9 @@ import 
org.apache.pulsar.common.policies.data.NonPersistentTopicStats;
 import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.TopicOperation;
+import org.apache.pulsar.common.policies.data.TopicStats;
+import 
org.apache.pulsar.common.policies.data.stats.NonPersistentPartitionedTopicStatsImpl;
+import 
org.apache.pulsar.common.policies.data.stats.NonPersistentTopicStatsImpl;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.slf4j.Logger;
@@ -202,6 +206,118 @@ public class NonPersistentTopics extends PersistentTopics 
{
         }
     }
 
+
+    @GET
+    @Path("{tenant}/{namespace}/{topic}/partitioned-stats")
+    @ApiOperation(value = "Get the stats for the partitioned topic.")
+    @ApiResponses(value = {
+            @ApiResponse(code = 307, message = "Current broker doesn't serve 
the namespace of this topic"),
+            @ApiResponse(code = 401, message = "Don't have permission to 
administrate resources on this tenant"),
+            @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Topic does not exist"),
+            @ApiResponse(code = 412, message = "Partitioned topic name is 
invalid"),
+            @ApiResponse(code = 500, message = "Internal server error"),
+            @ApiResponse(code = 503, message = "Failed to validate global 
cluster configuration")
+    })
+    public void getPartitionedStats(
+            @Suspended final AsyncResponse asyncResponse,
+            @ApiParam(value = "Specify the tenant", required = true)
+            @PathParam("tenant") String tenant,
+            @ApiParam(value = "Specify the namespace", required = true)
+            @PathParam("namespace") String namespace,
+            @ApiParam(value = "Specify topic name", required = true)
+            @PathParam("topic") @Encoded String encodedTopic,
+            @ApiParam(value = "Get per partition stats")
+            @QueryParam("perPartition") @DefaultValue("true") boolean 
perPartition,
+            @ApiParam(value = "Is authentication required to perform this 
operation")
+            @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative,
+            @ApiParam(value = "If return precise backlog or imprecise backlog")
+            @QueryParam("getPreciseBacklog") @DefaultValue("false") boolean 
getPreciseBacklog,
+            @ApiParam(value = "If return backlog size for each subscription, 
require locking on ledger so be careful "
+                    + "not to use when there's heavy traffic.")
+            @QueryParam("subscriptionBacklogSize") @DefaultValue("false") 
boolean subscriptionBacklogSize) {
+        try {
+            validatePartitionedTopicName(tenant, namespace, encodedTopic);
+            if (topicName.isGlobal()) {
+                try {
+                    validateGlobalNamespaceOwnership(namespaceName);
+                } catch (Exception e) {
+                    log.error("[{}] Failed to get partitioned stats for {}", 
clientAppId(), topicName, e);
+                    resumeAsyncResponseExceptionally(asyncResponse, e);
+                    return;
+                }
+            }
+            getPartitionedTopicMetadataAsync(topicName,
+                    authoritative, false).thenAccept(partitionMetadata -> {
+                if (partitionMetadata.partitions == 0) {
+                    asyncResponse.resume(new RestException(Status.NOT_FOUND, 
"Partitioned Topic not found"));
+                    return;
+                }
+                NonPersistentPartitionedTopicStatsImpl stats =
+                        new 
NonPersistentPartitionedTopicStatsImpl(partitionMetadata);
+                List<CompletableFuture<TopicStats>> topicStatsFutureList = 
Lists.newArrayList();
+                for (int i = 0; i < partitionMetadata.partitions; i++) {
+                    try {
+                        topicStatsFutureList
+                                
.add(pulsar().getAdminClient().topics().getStatsAsync(
+                                        
(topicName.getPartition(i).toString()), getPreciseBacklog,
+                                        subscriptionBacklogSize));
+                    } catch (PulsarServerException e) {
+                        asyncResponse.resume(new RestException(e));
+                        return;
+                    }
+                }
+
+                FutureUtil.waitForAll(topicStatsFutureList).handle((result, 
exception) -> {
+                    CompletableFuture<TopicStats> statFuture = null;
+                    for (int i = 0; i < topicStatsFutureList.size(); i++) {
+                        statFuture = topicStatsFutureList.get(i);
+                        if (statFuture.isDone() && 
!statFuture.isCompletedExceptionally()) {
+                            try {
+                                stats.add((NonPersistentTopicStatsImpl) 
statFuture.get());
+                                if (perPartition) {
+                                    
stats.getPartitions().put(topicName.getPartition(i).toString(),
+                                            (NonPersistentTopicStatsImpl) 
statFuture.get());
+                                }
+                            } catch (Exception e) {
+                                asyncResponse.resume(new RestException(e));
+                                return null;
+                            }
+                        }
+                    }
+                    if (perPartition && stats.partitions.isEmpty()) {
+                        String path = 
ZkAdminPaths.partitionedTopicPath(topicName);
+                        try {
+                            boolean zkPathExists = 
namespaceResources().getPartitionedTopicResources().exists(path);
+                            if (zkPathExists) {
+                                
stats.getPartitions().put(topicName.toString(), new 
NonPersistentTopicStatsImpl());
+                            } else {
+                                asyncResponse.resume(
+                                        new RestException(Status.NOT_FOUND,
+                                                "Internal topics have not been 
generated yet"));
+                                return null;
+                            }
+                        } catch (Exception e) {
+                            asyncResponse.resume(new RestException(e));
+                            return null;
+                        }
+                    }
+                    asyncResponse.resume(stats);
+                    return null;
+                });
+            }).exceptionally(ex -> {
+                log.error("[{}] Failed to get partitioned stats for {}", 
clientAppId(), topicName, ex);
+                resumeAsyncResponseExceptionally(asyncResponse, ex);
+                return null;
+            });
+        } catch (WebApplicationException wae) {
+            asyncResponse.resume(wae);
+        } catch (Exception e) {
+            asyncResponse.resume(new RestException(e));
+        }
+    }
+
+
     @PUT
     @Path("/{tenant}/{namespace}/{topic}/unload")
     @ApiOperation(value = "Unload a topic")
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index 82d7b1c..5fa6b46 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -921,12 +921,14 @@ public class AdminApiTest extends 
MockedPulsarServiceBaseTest {
             fail("getPartitionedTopicMetadata of " + anotherTopic + " should 
not succeed");
         } catch (NotFoundException expected) {
         }
-        // check the getPartitionedStats for PartitionedTopic returns only 
partitions metadata, and no partitions info
+
+        PartitionedTopicStats topicStats = 
admin.topics().getPartitionedStats(partitionedTopicName,false);
+
+                // check the getPartitionedStats for PartitionedTopic returns 
only partitions metadata, and no partitions info
         
assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions,
-                
admin.topics().getPartitionedStats(partitionedTopicName,false).getMetadata().partitions);
+                topicStats.getMetadata().partitions);
 
-        assertEquals(admin.topics().getPartitionedStats(partitionedTopicName, 
false).getPartitions().size(),
-                0);
+        assertEquals(topicStats.getPartitions().size(), 0);
 
         List<String> subscriptions = 
admin.topics().getSubscriptions(partitionedTopicName);
         assertEquals(subscriptions.size(), 0);
@@ -988,7 +990,7 @@ public class AdminApiTest extends 
MockedPulsarServiceBaseTest {
                         partitionedTopicName + "-partition-2", 
partitionedTopicName + "-partition-3"));
 
         // test cumulative stats for partitioned topic
-        PartitionedTopicStats topicStats = 
admin.topics().getPartitionedStats(partitionedTopicName, false);
+        topicStats = 
admin.topics().getPartitionedStats(partitionedTopicName,false);
         if (isPersistent) {
             // TODO: for non-persistent topics, the subscription doesn't exist
             assertEquals(topicStats.getSubscriptions().keySet(), 
Sets.newTreeSet(Lists.newArrayList("my-sub")));
@@ -999,7 +1001,7 @@ public class AdminApiTest extends 
MockedPulsarServiceBaseTest {
         assertEquals(topicStats.getPartitions(), Maps.newHashMap());
 
         // test per partition stats for partitioned topic
-        topicStats = admin.topics().getPartitionedStats(partitionedTopicName, 
true);
+        topicStats = 
admin.topics().getPartitionedStats(partitionedTopicName,true);
         assertEquals(topicStats.getMetadata().partitions, 4);
         assertEquals(topicStats.getPartitions().keySet(),
                 Sets.newHashSet(partitionedTopicName + "-partition-0", 
partitionedTopicName + "-partition-1",
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
index 864e75d..3eac264 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
@@ -88,6 +88,7 @@ import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.ConsumerStats;
 import org.apache.pulsar.common.policies.data.FailureDomain;
 import org.apache.pulsar.common.policies.data.NamespaceIsolationData;
+import org.apache.pulsar.common.policies.data.NonPersistentTopicStats;
 import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
 import org.apache.pulsar.common.policies.data.PersistencePolicies;
 import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
@@ -323,9 +324,9 @@ public class AdminApiTest2 extends 
MockedPulsarServiceBaseTest {
     public void nonPersistentTopics() throws Exception {
         final String topicName = "nonPersistentTopic";
 
-        final String persistentTopicName = "non-persistent://prop-xyz/ns1/" + 
topicName;
+        final String nonPersistentTopicName = "non-persistent://prop-xyz/ns1/" 
+ topicName;
         // Force to create a topic
-        publishMessagesOnTopic("non-persistent://prop-xyz/ns1/" + topicName, 
0, 0);
+        publishMessagesOnTopic(nonPersistentTopicName, 0, 0);
 
         // create consumer and subscription
         @Cleanup
@@ -333,28 +334,27 @@ public class AdminApiTest2 extends 
MockedPulsarServiceBaseTest {
                 .serviceUrl(pulsar.getWebServiceAddress())
                 .statsInterval(0, TimeUnit.SECONDS)
                 .build();
-        Consumer<byte[]> consumer = 
client.newConsumer().topic(persistentTopicName).subscriptionName("my-sub")
+        Consumer<byte[]> consumer = 
client.newConsumer().topic(nonPersistentTopicName).subscriptionName("my-sub")
                 .subscribe();
 
-        publishMessagesOnTopic("non-persistent://prop-xyz/ns1/" + topicName, 
10, 0);
+        publishMessagesOnTopic(nonPersistentTopicName, 10, 0);
 
-        TopicStats topicStats = admin.topics().getStats(persistentTopicName);
+        NonPersistentTopicStats topicStats = (NonPersistentTopicStats) 
admin.topics().getStats(nonPersistentTopicName);
         assertEquals(topicStats.getSubscriptions().keySet(), 
Sets.newTreeSet(Lists.newArrayList("my-sub")));
         
assertEquals(topicStats.getSubscriptions().get("my-sub").getConsumers().size(), 
1);
+        
assertEquals(topicStats.getSubscriptions().get("my-sub").getMsgDropRate(), 0);
         assertEquals(topicStats.getPublishers().size(), 0);
+        assertEquals(topicStats.getMsgDropRate(), 0);
 
-        PersistentTopicInternalStats internalStats = 
admin.topics().getInternalStats(persistentTopicName, false);
+        PersistentTopicInternalStats internalStats = 
admin.topics().getInternalStats(nonPersistentTopicName, false);
         assertEquals(internalStats.cursors.keySet(), 
Sets.newTreeSet(Lists.newArrayList("my-sub")));
 
         consumer.close();
-
-        topicStats = admin.topics().getStats(persistentTopicName);
+        topicStats = (NonPersistentTopicStats) 
admin.topics().getStats(nonPersistentTopicName);
         assertTrue(topicStats.getSubscriptions().containsKey("my-sub"));
         assertEquals(topicStats.getPublishers().size(), 0);
-
         // test partitioned-topic
         final String partitionedTopicName = 
"non-persistent://prop-xyz/ns1/paritioned";
-        
assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions,
 0);
         admin.topics().createPartitionedTopic(partitionedTopicName, 5);
         
assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions,
 5);
     }
diff --git 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/PartitionedTopicStats.java
 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/NonPersistentPartitionedTopicStats.java
similarity index 76%
copy from 
pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/PartitionedTopicStats.java
copy to 
pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/NonPersistentPartitionedTopicStats.java
index 76ed0e3..68be731 100644
--- 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/PartitionedTopicStats.java
+++ 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/NonPersistentPartitionedTopicStats.java
@@ -19,14 +19,12 @@
 package org.apache.pulsar.common.policies.data;
 
 import java.util.Map;
-import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 
 /**
- * Statistics for a partitioned topic.
+ * Statistics for a non-persistent partitioned topic.
  */
-public interface PartitionedTopicStats extends TopicStats {
+public interface NonPersistentPartitionedTopicStats extends 
PartitionedTopicStats{
+    Map<String, ? extends NonPersistentTopicStats> getPartitions();
 
-    PartitionedTopicMetadata getMetadata();
-
-    Map<String, TopicStats> getPartitions();
-}
\ No newline at end of file
+    NonPersistentTopicStats add(NonPersistentTopicStats ts);
+}
diff --git 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/PartitionedTopicStats.java
 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/PartitionedTopicStats.java
index 76ed0e3..f5ed8b2 100644
--- 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/PartitionedTopicStats.java
+++ 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/PartitionedTopicStats.java
@@ -28,5 +28,7 @@ public interface PartitionedTopicStats extends TopicStats {
 
     PartitionedTopicMetadata getMetadata();
 
-    Map<String, TopicStats> getPartitions();
+    Map<String, ? extends TopicStats> getPartitions();
+
+    TopicStats add(TopicStats ts);
 }
\ No newline at end of file
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index 8ddc75d..ff5a8f3 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -76,6 +76,8 @@ import 
org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
 import org.apache.pulsar.common.policies.data.DispatchRate;
 import org.apache.pulsar.common.policies.data.ErrorData;
 import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
+import 
org.apache.pulsar.common.policies.data.NonPersistentPartitionedTopicStats;
+import org.apache.pulsar.common.policies.data.NonPersistentTopicStats;
 import org.apache.pulsar.common.policies.data.OffloadPolicies;
 import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
 import org.apache.pulsar.common.policies.data.PartitionedTopicInternalStats;
@@ -707,19 +709,38 @@ public class TopicsImpl extends BaseResource implements 
Topics {
                 .queryParam("getPreciseBacklog", getPreciseBacklog)
                 .queryParam("subscriptionBacklogSize", 
subscriptionBacklogSize);
         final CompletableFuture<TopicStats> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<TopicStats>() {
 
-                    @Override
-                    public void completed(TopicStats response) {
-                        future.complete(response);
-                    }
+        InvocationCallback<TopicStats> persistentCB = new 
InvocationCallback<TopicStats>() {
+            @Override
+            public void completed(TopicStats response) {
+                future.complete(response);
+            }
+
+            @Override
+            public void failed(Throwable throwable) {
+                
future.completeExceptionally(getApiException(throwable.getCause()));
+            }
+        };
+
+        InvocationCallback<NonPersistentTopicStats> nonpersistentCB =
+                new InvocationCallback<NonPersistentTopicStats>() {
+            @Override
+            public void completed(NonPersistentTopicStats response) {
+                future.complete(response);
+            }
+
+            @Override
+            public void failed(Throwable throwable) {
+                
future.completeExceptionally(getApiException(throwable.getCause()));
+            }
+        };
+
+        if (topic.startsWith(TopicDomain.non_persistent.value())) {
+            asyncGetRequest(path, nonpersistentCB);
+        } else {
+            asyncGetRequest(path, persistentCB);
+        }
 
-                    @Override
-                    public void failed(Throwable throwable) {
-                        
future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
         return future;
     }
 
@@ -829,22 +850,45 @@ public class TopicsImpl extends BaseResource implements 
Topics {
                 .queryParam("getPreciseBacklog", getPreciseBacklog)
                 .queryParam("subscriptionBacklogSize", 
subscriptionBacklogSize);
         final CompletableFuture<PartitionedTopicStats> future = new 
CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<PartitionedTopicStats>() {
 
-                    @Override
-                    public void completed(PartitionedTopicStats response) {
-                        if (!perPartition) {
-                            response.getPartitions().clear();
-                        }
-                        future.complete(response);
-                    }
+        InvocationCallback<NonPersistentPartitionedTopicStats> nonpersistentCB 
=
+                new InvocationCallback<NonPersistentPartitionedTopicStats>() {
 
-                    @Override
-                    public void failed(Throwable throwable) {
-                        
future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
+            @Override
+            public void completed(NonPersistentPartitionedTopicStats response) 
{
+                if (!perPartition) {
+                    response.getPartitions().clear();
+                }
+                future.complete(response);
+            }
+
+            @Override
+            public void failed(Throwable throwable) {
+                
future.completeExceptionally(getApiException(throwable.getCause()));
+            }
+        };
+
+        InvocationCallback<PartitionedTopicStats> persistentCB = new 
InvocationCallback<PartitionedTopicStats>() {
+
+            @Override
+            public void completed(PartitionedTopicStats response) {
+                if (!perPartition) {
+                    response.getPartitions().clear();
+                }
+                future.complete(response);
+            }
+
+            @Override
+            public void failed(Throwable throwable) {
+                
future.completeExceptionally(getApiException(throwable.getCause()));
+            }
+        };
+
+        if (topic.startsWith(TopicDomain.non_persistent.value())) {
+            asyncGetRequest(path, nonpersistentCB);
+        } else {
+            asyncGetRequest(path, persistentCB);
+        }
         return future;
     }
 
diff --git 
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
 
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index b0810c2..5d27c7f 100644
--- 
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ 
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -39,6 +39,7 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+
 import org.apache.pulsar.client.admin.Bookies;
 import org.apache.pulsar.client.admin.BrokerStats;
 import org.apache.pulsar.client.admin.Brokers;
@@ -1370,26 +1371,29 @@ public class PulsarAdminToolTest {
     @Test
     public void nonPersistentTopics() throws Exception {
         PulsarAdmin admin = Mockito.mock(PulsarAdmin.class);
-        NonPersistentTopics mockTopics = mock(NonPersistentTopics.class);
-        when(admin.nonPersistentTopics()).thenReturn(mockTopics);
+        Topics mockTopics = mock(Topics.class);
+        when(admin.topics()).thenReturn(mockTopics);
 
-        CmdNonPersistentTopics topics = new CmdNonPersistentTopics(() -> 
admin);
+        CmdTopics topics = new CmdTopics(() -> admin);
 
-        topics.run(split("stats non-persistent://myprop/clust/ns1/ds1"));
-        verify(mockTopics).getStats("non-persistent://myprop/clust/ns1/ds1");
+        topics.run(split("stats non-persistent://myprop/ns1/ds1"));
+        verify(mockTopics).getStats("non-persistent://myprop/ns1/ds1", false, 
false);
 
-        topics.run(split("stats-internal 
non-persistent://myprop/clust/ns1/ds1"));
-        
verify(mockTopics).getInternalStats("non-persistent://myprop/clust/ns1/ds1");
+        topics.run(split("stats-internal non-persistent://myprop/ns1/ds1"));
+        verify(mockTopics).getInternalStats("non-persistent://myprop/ns1/ds1", 
false);
 
-        topics.run(split("create-partitioned-topic 
non-persistent://myprop/clust/ns1/ds1 --partitions 32"));
-        
verify(mockTopics).createPartitionedTopic("non-persistent://myprop/clust/ns1/ds1",
 32);
+        topics.run(split("create-partitioned-topic 
non-persistent://myprop/ns1/ds1 --partitions 32"));
+        
verify(mockTopics).createPartitionedTopic("non-persistent://myprop/ns1/ds1", 
32);
 
-        topics.run(split("list myprop/clust/ns1"));
-        verify(mockTopics).getList("myprop/clust/ns1");
+        topics.run(split("list myprop/ns1"));
+        verify(mockTopics).getList("myprop/ns1", null);
 
-        topics.run(split("list-in-bundle myprop/clust/ns1 --bundle 
0x23d70a30_0x26666658"));
-        verify(mockTopics).getListInBundle("myprop/clust/ns1", 
"0x23d70a30_0x26666658");
+        NonPersistentTopics mockNonPersistentTopics = 
mock(NonPersistentTopics.class);
+        when(admin.nonPersistentTopics()).thenReturn(mockNonPersistentTopics);
 
+        CmdNonPersistentTopics nonPersistentTopics = new 
CmdNonPersistentTopics(() -> admin);
+        nonPersistentTopics.run(split("list-in-bundle myprop/clust/ns1 
--bundle 0x23d70a30_0x26666658"));
+        verify(mockNonPersistentTopics).getListInBundle("myprop/clust/ns1", 
"0x23d70a30_0x26666658");
     }
 
     @Test
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/NonPersistentPartitionedTopicStatsImpl.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/NonPersistentPartitionedTopicStatsImpl.java
new file mode 100644
index 0000000..748e634
--- /dev/null
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/NonPersistentPartitionedTopicStatsImpl.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.policies.data.stats;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import lombok.Getter;
+import lombok.ToString;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
+import 
org.apache.pulsar.common.policies.data.NonPersistentPartitionedTopicStats;
+import org.apache.pulsar.common.policies.data.NonPersistentTopicStats;
+import org.apache.pulsar.common.policies.data.TopicStats;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Statistics for a non-persistent partitioned topic.
+ */
+@SuppressFBWarnings("EQ_DOESNT_OVERRIDE_EQUALS")
+public class NonPersistentPartitionedTopicStatsImpl extends 
NonPersistentTopicStatsImpl
+        implements NonPersistentPartitionedTopicStats {
+
+    @Getter
+    public PartitionedTopicMetadata metadata;
+
+    @Getter
+    public Map<String, NonPersistentTopicStatsImpl> partitions;
+
+    public NonPersistentPartitionedTopicStatsImpl() {
+        super();
+        metadata = new PartitionedTopicMetadata();
+        partitions = new HashMap<>();
+    }
+
+    public NonPersistentPartitionedTopicStatsImpl(PartitionedTopicMetadata 
metadata) {
+        this();
+        this.metadata = metadata;
+    }
+
+    @Override
+    public void reset() {
+        super.reset();
+        partitions.clear();
+        metadata.partitions = 0;
+    }
+
+}
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/NonPersistentTopicStatsImpl.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/NonPersistentTopicStatsImpl.java
index 06f59cb..51f350d 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/NonPersistentTopicStatsImpl.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/NonPersistentTopicStatsImpl.java
@@ -18,15 +18,14 @@
  */
 package org.apache.pulsar.common.policies.data.stats;
 
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
-import lombok.Data;
 import lombok.Getter;
 import org.apache.pulsar.common.policies.data.NonPersistentPublisherStats;
 import org.apache.pulsar.common.policies.data.NonPersistentReplicatorStats;
 import org.apache.pulsar.common.policies.data.NonPersistentSubscriptionStats;
 import org.apache.pulsar.common.policies.data.NonPersistentTopicStats;
-import org.apache.pulsar.common.policies.data.PublisherStats;
-import org.apache.pulsar.common.policies.data.ReplicatorStats;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -47,31 +46,52 @@ public class NonPersistentTopicStatsImpl extends TopicStatsImpl implements NonPe
     @Getter
     public double msgDropRate;
 
-    /** List of connected publishers on this topic w/ their stats. */
-    @Getter
-    public List<? extends NonPersistentPublisherStats> publishers;
+    @JsonIgnore
+    public List<PublisherStatsImpl> publishers;
 
-    /** Map of subscriptions with their individual statistics. */
-    @Getter
-    public Map<String, ? extends NonPersistentSubscriptionStats> subscriptions;
+    @JsonIgnore
+    public Map<String, SubscriptionStatsImpl> subscriptions;
 
-    /** Map of replication statistics by remote cluster context. */
-    @Getter
-    public Map<String, ? extends NonPersistentReplicatorStats> replication;
+    @JsonIgnore
+    public Map<String, ReplicatorStatsImpl> replication;
+
+    @JsonProperty("publishers") // Jackson property name used for this getter
+    public List<NonPersistentPublisherStats> getNonPersistentPublishers() {
+        return (List<NonPersistentPublisherStats>) nonPersistentPublishers; // unchecked cast from List<? extends NonPersistentPublisherStats>; returns the live list, not a copy
+    }
+
+    @JsonProperty("subscriptions") // Jackson property name used for this getter
+    public Map<String, NonPersistentSubscriptionStats> 
getNonPersistentSubscriptions() {
+        return (Map<String, NonPersistentSubscriptionStats>) 
nonPersistentSubscriptions; // unchecked cast from the wildcard-typed field; returns the live map, not a copy
+    }
+
+    @JsonProperty("replication") // Jackson property name used for this getter
+    public Map<String, NonPersistentReplicatorStats> 
getNonPersistentReplicators() {
+        return (Map<String, NonPersistentReplicatorStats>) 
nonPersistentReplicators; // unchecked cast from the wildcard-typed field; returns the live map, not a copy
+    }
+
+    /** List of connected publishers on this non-persistent topic w/ their stats. */
+    public List<? extends NonPersistentPublisherStats> nonPersistentPublishers;
+
+    /** Map of non-persistent subscriptions with their individual statistics. */
+    public Map<String, ? extends NonPersistentSubscriptionStats> 
nonPersistentSubscriptions;
+
+    /** Map of non-persistent replication statistics by remote cluster context. */
+    public Map<String, ? extends NonPersistentReplicatorStats> 
nonPersistentReplicators;
 
     @SuppressFBWarnings(value = "MF_CLASS_MASKS_FIELD", justification = 
"expected to override")
     public List<NonPersistentPublisherStats> getPublishers() {
-        return (List<NonPersistentPublisherStats>) publishers;
+        return (List<NonPersistentPublisherStats>) nonPersistentPublishers;
     }
 
     @SuppressFBWarnings(value = "MF_CLASS_MASKS_FIELD", justification = 
"expected to override")
     public Map<String, NonPersistentSubscriptionStats> getSubscriptions() {
-        return (Map<String, NonPersistentSubscriptionStats>) subscriptions;
+        return (Map<String, NonPersistentSubscriptionStats>) 
nonPersistentSubscriptions;
     }
 
     @SuppressFBWarnings(value = "MF_CLASS_MASKS_FIELD", justification = 
"expected to override")
     public Map<String, NonPersistentReplicatorStats> getReplication() {
-        return (Map<String, NonPersistentReplicatorStats>) replication;
+        return (Map<String, NonPersistentReplicatorStats>) 
nonPersistentReplicators;
     }
 
     @Override
@@ -80,22 +100,77 @@ public class NonPersistentTopicStatsImpl extends TopicStatsImpl implements NonPe
     }
 
     public NonPersistentTopicStatsImpl() {
-        this.publishers = new ArrayList<>();
-        this.subscriptions = new HashMap<>();
-        this.replication = new TreeMap<>();
+        this.nonPersistentPublishers = new ArrayList<>();
+        this.nonPersistentSubscriptions = new HashMap<>();
+        this.nonPersistentReplicators = new TreeMap<>();
     }
 
     public void reset() {
         super.reset();
+        this.nonPersistentPublishers.clear();
+        this.nonPersistentSubscriptions.clear();
+        this.nonPersistentReplicators.clear();
         this.msgDropRate = 0;
     }
 
     // if the stats are added for the 1st time, we will need to make a copy of these stats and add it to the current
     // stats.
-    public NonPersistentTopicStatsImpl add(NonPersistentTopicStatsImpl stats) {
+    public NonPersistentTopicStatsImpl add(NonPersistentTopicStats ts) { // folds ts into this object and returns this
+        NonPersistentTopicStatsImpl stats = (NonPersistentTopicStatsImpl) ts; // ClassCastException for any other implementation; null survives the cast and is rejected by requireNonNull below
         Objects.requireNonNull(stats);
         super.add(stats);
         this.msgDropRate += stats.msgDropRate;
+        // Publishers: positional merge when both lists have the same size, otherwise append.
+        if (this.getNonPersistentPublishers().size() != 
stats.getNonPersistentPublishers().size()) { // sizes differ: one new entry is appended per incoming publisher; existing entries are kept
+            for (int i = 0; i < stats.getNonPersistentPublishers().size(); 
i++) {
+                NonPersistentPublisherStatsImpl publisherStats = new 
NonPersistentPublisherStatsImpl();
+                this.getNonPersistentPublishers().add(publisherStats
+                        .add((NonPersistentPublisherStatsImpl) 
stats.getNonPersistentPublishers().get(i)));
+            }
+        } else { // same size: accumulate index by index
+            for (int i = 0; i < stats.getNonPersistentPublishers().size(); 
i++) {
+                ((NonPersistentPublisherStatsImpl) 
this.getNonPersistentPublishers().get(i))
+                        .add((NonPersistentPublisherStatsImpl) 
stats.getNonPersistentPublishers().get(i));
+            }
+        }
+        // Subscriptions: merged by subscription name.
+        if (this.getNonPersistentSubscriptions().size() != 
stats.getNonPersistentSubscriptions().size()) { // sizes differ: put(...) stores a new object per incoming name, replacing any entry already under that name
+            for (String subscription : 
stats.getNonPersistentSubscriptions().keySet()) {
+                NonPersistentSubscriptionStatsImpl subscriptionStats = new 
NonPersistentSubscriptionStatsImpl();
+                this.getNonPersistentSubscriptions().put(subscription, 
subscriptionStats
+                        .add((NonPersistentSubscriptionStatsImpl) 
stats.getNonPersistentSubscriptions().get(subscription)));
+            }
+        } else { // same size: accumulate into names already present, create entries for names that are missing
+            for (String subscription : 
stats.getNonPersistentSubscriptions().keySet()) {
+                if (this.getNonPersistentSubscriptions().get(subscription) != 
null) {
+                    ((NonPersistentSubscriptionStatsImpl) 
this.getNonPersistentSubscriptions().get(subscription))
+                          .add((NonPersistentSubscriptionStatsImpl) 
stats.getNonPersistentSubscriptions().get(subscription));
+                } else {
+                    NonPersistentSubscriptionStatsImpl subscriptionStats = new 
NonPersistentSubscriptionStatsImpl();
+                    this.getNonPersistentSubscriptions().put(subscription, 
subscriptionStats
+                         .add((NonPersistentSubscriptionStatsImpl) 
stats.getNonPersistentSubscriptions().get(subscription)));
+                }
+            }
+        }
+        // Replicators: same scheme as subscriptions, keyed by the replication map's key.
+        if (this.getNonPersistentReplicators().size() != 
stats.getNonPersistentReplicators().size()) { // sizes differ: put(...) stores a new object per incoming key, replacing any entry already under that key
+            for (String repl : stats.getNonPersistentReplicators().keySet()) {
+                NonPersistentReplicatorStatsImpl replStats = new 
NonPersistentReplicatorStatsImpl();
+                this.getNonPersistentReplicators().put(repl, replStats
+                        .add((NonPersistentReplicatorStatsImpl) 
stats.getNonPersistentReplicators().get(repl)));
+            }
+        } else { // same size: accumulate into keys already present, create entries for keys that are missing
+            for (String repl : stats.getNonPersistentReplicators().keySet()) {
+                if (this.getNonPersistentReplicators().get(repl) != null) {
+                    ((NonPersistentReplicatorStatsImpl) 
this.getNonPersistentReplicators().get(repl))
+                            .add((NonPersistentReplicatorStatsImpl) 
stats.getNonPersistentReplicators().get(repl));
+                } else {
+                    NonPersistentReplicatorStatsImpl replStats = new 
NonPersistentReplicatorStatsImpl();
+                    this.getNonPersistentReplicators().put(repl, replStats
+                            .add((NonPersistentReplicatorStatsImpl) 
stats.getNonPersistentReplicators().get(repl)));
+                }
+            }
+        }
         return this;
     }
 
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/ObjectMapperFactory.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/ObjectMapperFactory.java
index ffab615..94e1b7a 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/ObjectMapperFactory.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/ObjectMapperFactory.java
@@ -63,6 +63,7 @@ import org.apache.pulsar.common.policies.data.FunctionStatsImpl;
 import org.apache.pulsar.common.policies.data.FunctionStats;
 import org.apache.pulsar.common.policies.data.NamespaceIsolationDataImpl;
 import org.apache.pulsar.common.policies.data.NamespaceIsolationData;
+import 
org.apache.pulsar.common.policies.data.NonPersistentPartitionedTopicStats;
 import org.apache.pulsar.common.policies.data.NonPersistentPublisherStats;
 import org.apache.pulsar.common.policies.data.NonPersistentReplicatorStats;
 import org.apache.pulsar.common.policies.data.NonPersistentSubscriptionStats;
@@ -90,6 +91,7 @@ import org.apache.pulsar.common.policies.data.impl.BundlesDataImpl;
 import org.apache.pulsar.common.policies.data.impl.DelayedDeliveryPoliciesImpl;
 import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl;
 import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl;
+import 
org.apache.pulsar.common.policies.data.stats.NonPersistentPartitionedTopicStatsImpl;
 import 
org.apache.pulsar.common.policies.data.stats.NonPersistentPublisherStatsImpl;
 import 
org.apache.pulsar.common.policies.data.stats.NonPersistentReplicatorStatsImpl;
 import 
org.apache.pulsar.common.policies.data.stats.NonPersistentSubscriptionStatsImpl;
@@ -183,6 +185,7 @@ public class ObjectMapperFactory {
         resolver.addMapping(NonPersistentSubscriptionStats.class, 
NonPersistentSubscriptionStatsImpl.class);
         resolver.addMapping(NonPersistentTopicStats.class, 
NonPersistentTopicStatsImpl.class);
         resolver.addMapping(PartitionedTopicStats.class, 
PartitionedTopicStatsImpl.class);
+        resolver.addMapping(NonPersistentPartitionedTopicStats.class, 
NonPersistentPartitionedTopicStatsImpl.class);
         resolver.addMapping(PublisherStats.class, PublisherStatsImpl.class);
         resolver.addMapping(ReplicatorStats.class, ReplicatorStatsImpl.class);
         resolver.addMapping(SubscriptionStats.class, 
SubscriptionStatsImpl.class);
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/NonPersistentPartitionedTopicStatsTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/NonPersistentPartitionedTopicStatsTest.java
new file mode 100644
index 0000000..5135acf
--- /dev/null
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/NonPersistentPartitionedTopicStatsTest.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.policies.data;
+
+import 
org.apache.pulsar.common.policies.data.stats.NonPersistentPartitionedTopicStatsImpl;
+import 
org.apache.pulsar.common.policies.data.stats.NonPersistentPublisherStatsImpl;
+import 
org.apache.pulsar.common.policies.data.stats.NonPersistentReplicatorStatsImpl;
+import 
org.apache.pulsar.common.policies.data.stats.NonPersistentSubscriptionStatsImpl;
+import org.apache.pulsar.common.policies.data.stats.PublisherStatsImpl;
+import org.apache.pulsar.common.policies.data.stats.ReplicatorStatsImpl;
+import org.apache.pulsar.common.policies.data.stats.SubscriptionStatsImpl;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+public class NonPersistentPartitionedTopicStatsTest { // unit test for NonPersistentPartitionedTopicStatsImpl#reset()
+
+    @Test
+    public void testPartitionedTopicStats() { // populate every stat, call reset(), then expect zeros and empty collections
+        NonPersistentPartitionedTopicStatsImpl 
nonPersistentPartitionedTopicStats = new 
NonPersistentPartitionedTopicStatsImpl();
+        nonPersistentPartitionedTopicStats.msgRateIn = 1;
+        nonPersistentPartitionedTopicStats.msgThroughputIn = 1;
+        nonPersistentPartitionedTopicStats.msgRateOut = 1;
+        nonPersistentPartitionedTopicStats.msgThroughputOut = 1;
+        nonPersistentPartitionedTopicStats.averageMsgSize = 1;
+        nonPersistentPartitionedTopicStats.storageSize = 1;
+        nonPersistentPartitionedTopicStats.getPublishers().add(new 
NonPersistentPublisherStatsImpl()); // one entry in each non-persistent collection so the size checks below are meaningful
+        nonPersistentPartitionedTopicStats.getSubscriptions().put("test_ns", 
new NonPersistentSubscriptionStatsImpl());
+        nonPersistentPartitionedTopicStats.getReplication().put("test_ns", new 
NonPersistentReplicatorStatsImpl());
+        nonPersistentPartitionedTopicStats.metadata.partitions = 1;
+        nonPersistentPartitionedTopicStats.partitions.put("test", 
nonPersistentPartitionedTopicStats); // the object is registered as its own partition entry
+        nonPersistentPartitionedTopicStats.reset(); // call under test
+        assertEquals(nonPersistentPartitionedTopicStats.msgRateIn, 0.0);
+        assertEquals(nonPersistentPartitionedTopicStats.msgThroughputIn, 0.0);
+        assertEquals(nonPersistentPartitionedTopicStats.msgRateOut, 0.0);
+        assertEquals(nonPersistentPartitionedTopicStats.msgThroughputOut, 0.0);
+        assertEquals(nonPersistentPartitionedTopicStats.averageMsgSize, 0.0);
+        assertEquals(nonPersistentPartitionedTopicStats.storageSize, 0);
+        
assertEquals(nonPersistentPartitionedTopicStats.getPublishers().size(), 0); // the three collections filled above must be empty after reset()
+        
assertEquals(nonPersistentPartitionedTopicStats.getSubscriptions().size(), 0);
+        
assertEquals(nonPersistentPartitionedTopicStats.getReplication().size(), 0);
+        assertEquals(nonPersistentPartitionedTopicStats.metadata.partitions, 
0);
+        assertEquals(nonPersistentPartitionedTopicStats.partitions.size(), 0);
+    }
+}

Reply via email to