This is an automated email from the ASF dual-hosted git repository.

mattisonchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 706bf915893 [improve][cli] Support `getEarliestTimeInBacklog` at `getPartitionedStats` method. (#16388)
706bf915893 is described below

commit 706bf915893ad745fd53cb5d4f1f5fe26629aaad
Author: Qiang Zhao <[email protected]>
AuthorDate: Wed Jul 13 10:41:04 2022 +0800

    [improve][cli] Support `getEarliestTimeInBacklog` at `getPartitionedStats` method. (#16388)
---
 .../apache/pulsar/broker/admin/AdminApi2Test.java  |  6 +++---
 .../apache/pulsar/broker/admin/AdminApiTest.java   | 24 ++++++++++++++++++++++
 .../org/apache/pulsar/client/admin/Topics.java     |  9 ++++----
 .../pulsar/client/admin/internal/TopicsImpl.java   | 11 ++++++----
 .../pulsar/admin/cli/PulsarAdminToolTest.java      |  3 ++-
 .../org/apache/pulsar/admin/cli/CmdTopics.java     |  7 ++++++-
 6 files changed, 47 insertions(+), 13 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index ef335d3eba2..1fedcbf7318 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -1567,7 +1567,7 @@ public class AdminApi2Test extends 
MockedPulsarServiceBaseTest {
         TopicStats topicStats = admin.topics().getPartitionedStats(topic, 
false);
         
assertEquals(topicStats.getSubscriptions().get(subName).getMsgBacklog(), 20);
 
-        topicStats = admin.topics().getPartitionedStats(topic, false, true, 
true);
+        topicStats = admin.topics().getPartitionedStats(topic, false, true, 
true, true);
         
assertEquals(topicStats.getSubscriptions().get(subName).getMsgBacklog(), 1);
         
assertEquals(topicStats.getSubscriptions().get(subName).getBacklogSize(), 40);
     }
@@ -1606,7 +1606,7 @@ public class AdminApi2Test extends 
MockedPulsarServiceBaseTest {
             }
         }
 
-        TopicStats topicStats = admin.topics().getPartitionedStats(topic, 
false, true, true);
+        TopicStats topicStats = admin.topics().getPartitionedStats(topic, 
false, true, true, true);
         
assertEquals(topicStats.getSubscriptions().get(subName).getMsgBacklog(), 10);
         
assertEquals(topicStats.getSubscriptions().get(subName).getBacklogSize(), 440);
         
assertEquals(topicStats.getSubscriptions().get(subName).getMsgBacklogNoDelayed(),
 5);
@@ -1616,7 +1616,7 @@ public class AdminApi2Test extends 
MockedPulsarServiceBaseTest {
         }
         // Wait the ack send.
         Awaitility.await().untilAsserted(() -> {
-            TopicStats topicStats2 = admin.topics().getPartitionedStats(topic, 
false, true, true);
+            TopicStats topicStats2 = admin.topics().getPartitionedStats(topic, 
false, true, true, true);
             
assertEquals(topicStats2.getSubscriptions().get(subName).getMsgBacklog(), 5);
             
assertEquals(topicStats2.getSubscriptions().get(subName).getBacklogSize(), 223);
             
assertEquals(topicStats2.getSubscriptions().get(subName).getMsgBacklogNoDelayed(),
 0);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index 60034257b15..24fba4126a6 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -40,6 +40,7 @@ import com.google.common.hash.HashFunction;
 import com.google.common.hash.Hashing;
 import java.lang.reflect.Field;
 import java.net.URL;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -3360,4 +3361,27 @@ public class AdminApiTest extends 
MockedPulsarServiceBaseTest {
             assertEquals(peekedMessages.get(i).getData(), 
receivedMessages.get(i).getData());
         }
     }
+
+    @Test
+    public void testGetPartitionStatsWithEarliestTimeInBacklog() throws 
PulsarAdminException, PulsarClientException {
+        final String topicName = 
"persistent://prop-xyz/ns1/testPeekEncryptedMessages-" + UUID.randomUUID();
+        final String subName = "my-sub";
+        admin.topics().createPartitionedTopic(topicName, 3);
+        PartitionedTopicStats partitionedStats =
+                admin.topics().getPartitionedStats(topicName, true, true, 
true, true);
+        long value1 = partitionedStats.getEarliestMsgPublishTimeInBacklogs();
+        Assert.assertEquals(value1, 0);
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topicName)
+                .create();
+        producer.send("Test".getBytes(StandardCharsets.UTF_8));
+        @Cleanup
+        Consumer<byte[]> subscribe = pulsarClient.newConsumer()
+                .subscriptionName(subName)
+                .topic(topicName)
+                .subscribe();
+        long value2 = partitionedStats.getEarliestMsgPublishTimeInBacklogs();
+        Assert.assertNotEquals(value2, 0);
+    }
 }
diff --git 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
index a4486d846b1..8f6f50445d8 100644
--- 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++ 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -1304,11 +1304,11 @@ public interface Topics {
      *
      */
     PartitionedTopicStats getPartitionedStats(String topic, boolean 
perPartition, boolean getPreciseBacklog,
-                                              boolean subscriptionBacklogSize)
+                                              boolean subscriptionBacklogSize, 
boolean getEarliestTimeInBacklog)
             throws PulsarAdminException;
 
     default PartitionedTopicStats getPartitionedStats(String topic, boolean 
perPartition) throws PulsarAdminException {
-        return getPartitionedStats(topic, perPartition, false, false);
+        return getPartitionedStats(topic, perPartition, false, false, false);
     }
 
     /**
@@ -1325,10 +1325,11 @@ public interface Topics {
      * @return a future that can be used to track when the partitioned topic 
statistics are returned
      */
     CompletableFuture<PartitionedTopicStats> getPartitionedStatsAsync(
-            String topic, boolean perPartition, boolean getPreciseBacklog, 
boolean subscriptionBacklogSize);
+            String topic, boolean perPartition, boolean getPreciseBacklog, 
boolean subscriptionBacklogSize,
+            boolean getEarliestTimeInBacklog);
 
     default CompletableFuture<PartitionedTopicStats> 
getPartitionedStatsAsync(String topic, boolean perPartition) {
-        return getPartitionedStatsAsync(topic, perPartition, false, false);
+        return getPartitionedStatsAsync(topic, perPartition, false, false, 
false);
     }
 
     /**
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index 909b8f74ea5..d7363597459 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -785,19 +785,22 @@ public class TopicsImpl extends BaseResource implements 
Topics {
 
     @Override
     public PartitionedTopicStats getPartitionedStats(String topic, boolean 
perPartition, boolean getPreciseBacklog,
-                                                     boolean 
subscriptionBacklogSize)
+                                                     boolean 
subscriptionBacklogSize, boolean getEarliestTimeInBacklog)
             throws PulsarAdminException {
-        return sync(() -> getPartitionedStatsAsync(topic, perPartition, 
getPreciseBacklog, subscriptionBacklogSize));
+        return sync(() -> getPartitionedStatsAsync(topic, perPartition, 
getPreciseBacklog,
+                subscriptionBacklogSize, getEarliestTimeInBacklog));
     }
 
     @Override
     public CompletableFuture<PartitionedTopicStats> 
getPartitionedStatsAsync(String topic,
-            boolean perPartition, boolean getPreciseBacklog, boolean 
subscriptionBacklogSize) {
+            boolean perPartition, boolean getPreciseBacklog, boolean 
subscriptionBacklogSize,
+                                                                             
boolean getEarliestTimeInBacklog) {
         TopicName tn = validateTopic(topic);
         WebTarget path = topicPath(tn, "partitioned-stats");
         path = path.queryParam("perPartition", perPartition)
                 .queryParam("getPreciseBacklog", getPreciseBacklog)
-                .queryParam("subscriptionBacklogSize", 
subscriptionBacklogSize);
+                .queryParam("subscriptionBacklogSize", subscriptionBacklogSize)
+                .queryParam("getEarliestTimeInBacklog", 
getEarliestTimeInBacklog);
         final CompletableFuture<PartitionedTopicStats> future = new 
CompletableFuture<>();
 
         InvocationCallback<NonPersistentPartitionedTopicStats> nonpersistentCB 
=
diff --git 
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
 
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 6baa2e7e5e7..6fd2efc31d9 100644
--- 
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ 
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -1440,7 +1440,8 @@ public class PulsarAdminToolTest {
         
verify(mockTopics).getInternalInfo("persistent://myprop/clust/ns1/ds1");
 
         cmdTopics.run(split("partitioned-stats 
persistent://myprop/clust/ns1/ds1 --per-partition"));
-        
verify(mockTopics).getPartitionedStats("persistent://myprop/clust/ns1/ds1", 
true, false, false);
+        
verify(mockTopics).getPartitionedStats("persistent://myprop/clust/ns1/ds1",
+                true, false, false, false);
 
         cmdTopics.run(split("partitioned-stats-internal 
persistent://myprop/clust/ns1/ds1"));
         
verify(mockTopics).getPartitionedInternalStats("persistent://myprop/clust/ns1/ds1");
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index 23cffd4e3a2..8bcc865efbb 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -808,10 +808,15 @@ public class CmdTopics extends CmdBase {
                 + ", locking required.")
         private boolean subscriptionBacklogSize = false;
 
+        @Parameter(names = { "-etb",
+                "--get-earliest-time-in-backlog" }, description = "Set true to 
get earliest time in backlog")
+        private boolean getEarliestTimeInBacklog = false;
+
         @Override
         void run() throws Exception {
             String topic = validateTopicName(params);
-            print(getTopics().getPartitionedStats(topic, perPartition, 
getPreciseBacklog, subscriptionBacklogSize));
+            print(getTopics().getPartitionedStats(topic, perPartition, 
getPreciseBacklog,
+                    subscriptionBacklogSize, getEarliestTimeInBacklog));
         }
     }
 

Reply via email to