This is an automated email from the ASF dual-hosted git repository.
mattisonchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 706bf915893 [improve][cli] Support `getEarliestTimeInBacklog` at
`getPartitionedStats` method. (#16388)
706bf915893 is described below
commit 706bf915893ad745fd53cb5d4f1f5fe26629aaad
Author: Qiang Zhao <[email protected]>
AuthorDate: Wed Jul 13 10:41:04 2022 +0800
[improve][cli] Support `getEarliestTimeInBacklog` at `getPartitionedStats`
method. (#16388)
---
.../apache/pulsar/broker/admin/AdminApi2Test.java | 6 +++---
.../apache/pulsar/broker/admin/AdminApiTest.java | 24 ++++++++++++++++++++++
.../org/apache/pulsar/client/admin/Topics.java | 9 ++++----
.../pulsar/client/admin/internal/TopicsImpl.java | 11 ++++++----
.../pulsar/admin/cli/PulsarAdminToolTest.java | 3 ++-
.../org/apache/pulsar/admin/cli/CmdTopics.java | 7 ++++++-
6 files changed, 47 insertions(+), 13 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index ef335d3eba2..1fedcbf7318 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -1567,7 +1567,7 @@ public class AdminApi2Test extends MockedPulsarServiceBaseTest {
TopicStats topicStats = admin.topics().getPartitionedStats(topic,
false);
assertEquals(topicStats.getSubscriptions().get(subName).getMsgBacklog(), 20);
- topicStats = admin.topics().getPartitionedStats(topic, false, true,
true);
+ topicStats = admin.topics().getPartitionedStats(topic, false, true,
true, true);
assertEquals(topicStats.getSubscriptions().get(subName).getMsgBacklog(), 1);
assertEquals(topicStats.getSubscriptions().get(subName).getBacklogSize(), 40);
}
@@ -1606,7 +1606,7 @@ public class AdminApi2Test extends MockedPulsarServiceBaseTest {
}
}
- TopicStats topicStats = admin.topics().getPartitionedStats(topic,
false, true, true);
+ TopicStats topicStats = admin.topics().getPartitionedStats(topic,
false, true, true, true);
assertEquals(topicStats.getSubscriptions().get(subName).getMsgBacklog(), 10);
assertEquals(topicStats.getSubscriptions().get(subName).getBacklogSize(), 440);
assertEquals(topicStats.getSubscriptions().get(subName).getMsgBacklogNoDelayed(),
5);
@@ -1616,7 +1616,7 @@ public class AdminApi2Test extends MockedPulsarServiceBaseTest {
}
// Wait the ack send.
Awaitility.await().untilAsserted(() -> {
- TopicStats topicStats2 = admin.topics().getPartitionedStats(topic,
false, true, true);
+ TopicStats topicStats2 = admin.topics().getPartitionedStats(topic,
false, true, true, true);
assertEquals(topicStats2.getSubscriptions().get(subName).getMsgBacklog(), 5);
assertEquals(topicStats2.getSubscriptions().get(subName).getBacklogSize(), 223);
assertEquals(topicStats2.getSubscriptions().get(subName).getMsgBacklogNoDelayed(),
0);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index 60034257b15..24fba4126a6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -40,6 +40,7 @@ import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import java.lang.reflect.Field;
import java.net.URL;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -3360,4 +3361,27 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
assertEquals(peekedMessages.get(i).getData(),
receivedMessages.get(i).getData());
}
}
+
+ @Test
+ public void testGetPartitionStatsWithEarliestTimeInBacklog() throws
PulsarAdminException, PulsarClientException {
+ final String topicName =
"persistent://prop-xyz/ns1/testPeekEncryptedMessages-" + UUID.randomUUID();
+ final String subName = "my-sub";
+ admin.topics().createPartitionedTopic(topicName, 3);
+ PartitionedTopicStats partitionedStats =
+ admin.topics().getPartitionedStats(topicName, true, true,
true, true);
+ long value1 = partitionedStats.getEarliestMsgPublishTimeInBacklogs();
+ Assert.assertEquals(value1, 0);
+ @Cleanup
+ Producer<byte[]> producer = pulsarClient.newProducer()
+ .topic(topicName)
+ .create();
+ producer.send("Test".getBytes(StandardCharsets.UTF_8));
+ @Cleanup
+ Consumer<byte[]> subscribe = pulsarClient.newConsumer()
+ .subscriptionName(subName)
+ .topic(topicName)
+ .subscribe();
+ long value2 = partitionedStats.getEarliestMsgPublishTimeInBacklogs();
+ Assert.assertNotEquals(value2, 0);
+ }
}
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
index a4486d846b1..8f6f50445d8 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -1304,11 +1304,11 @@ public interface Topics {
*
*/
PartitionedTopicStats getPartitionedStats(String topic, boolean
perPartition, boolean getPreciseBacklog,
- boolean subscriptionBacklogSize)
+ boolean subscriptionBacklogSize,
boolean getEarliestTimeInBacklog)
throws PulsarAdminException;
default PartitionedTopicStats getPartitionedStats(String topic, boolean
perPartition) throws PulsarAdminException {
- return getPartitionedStats(topic, perPartition, false, false);
+ return getPartitionedStats(topic, perPartition, false, false, false);
}
/**
@@ -1325,10 +1325,11 @@ public interface Topics {
* @return a future that can be used to track when the partitioned topic
statistics are returned
*/
CompletableFuture<PartitionedTopicStats> getPartitionedStatsAsync(
- String topic, boolean perPartition, boolean getPreciseBacklog,
boolean subscriptionBacklogSize);
+ String topic, boolean perPartition, boolean getPreciseBacklog,
boolean subscriptionBacklogSize,
+ boolean getEarliestTimeInBacklog);
default CompletableFuture<PartitionedTopicStats>
getPartitionedStatsAsync(String topic, boolean perPartition) {
- return getPartitionedStatsAsync(topic, perPartition, false, false);
+ return getPartitionedStatsAsync(topic, perPartition, false, false,
false);
}
/**
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index 909b8f74ea5..d7363597459 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -785,19 +785,22 @@ public class TopicsImpl extends BaseResource implements Topics {
@Override
public PartitionedTopicStats getPartitionedStats(String topic, boolean
perPartition, boolean getPreciseBacklog,
- boolean
subscriptionBacklogSize)
+ boolean
subscriptionBacklogSize, boolean getEarliestTimeInBacklog)
throws PulsarAdminException {
- return sync(() -> getPartitionedStatsAsync(topic, perPartition,
getPreciseBacklog, subscriptionBacklogSize));
+ return sync(() -> getPartitionedStatsAsync(topic, perPartition,
getPreciseBacklog,
+ subscriptionBacklogSize, getEarliestTimeInBacklog));
}
@Override
public CompletableFuture<PartitionedTopicStats>
getPartitionedStatsAsync(String topic,
- boolean perPartition, boolean getPreciseBacklog, boolean
subscriptionBacklogSize) {
+ boolean perPartition, boolean getPreciseBacklog, boolean
subscriptionBacklogSize,
+
boolean getEarliestTimeInBacklog) {
TopicName tn = validateTopic(topic);
WebTarget path = topicPath(tn, "partitioned-stats");
path = path.queryParam("perPartition", perPartition)
.queryParam("getPreciseBacklog", getPreciseBacklog)
- .queryParam("subscriptionBacklogSize",
subscriptionBacklogSize);
+ .queryParam("subscriptionBacklogSize", subscriptionBacklogSize)
+ .queryParam("getEarliestTimeInBacklog",
getEarliestTimeInBacklog);
final CompletableFuture<PartitionedTopicStats> future = new
CompletableFuture<>();
InvocationCallback<NonPersistentPartitionedTopicStats> nonpersistentCB
=
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 6baa2e7e5e7..6fd2efc31d9 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -1440,7 +1440,8 @@ public class PulsarAdminToolTest {
verify(mockTopics).getInternalInfo("persistent://myprop/clust/ns1/ds1");
cmdTopics.run(split("partitioned-stats
persistent://myprop/clust/ns1/ds1 --per-partition"));
-
verify(mockTopics).getPartitionedStats("persistent://myprop/clust/ns1/ds1",
true, false, false);
+
verify(mockTopics).getPartitionedStats("persistent://myprop/clust/ns1/ds1",
+ true, false, false, false);
cmdTopics.run(split("partitioned-stats-internal
persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).getPartitionedInternalStats("persistent://myprop/clust/ns1/ds1");
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index 23cffd4e3a2..8bcc865efbb 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -808,10 +808,15 @@ public class CmdTopics extends CmdBase {
+ ", locking required.")
private boolean subscriptionBacklogSize = false;
+ @Parameter(names = { "-etb",
+ "--get-earliest-time-in-backlog" }, description = "Set true to
get earliest time in backlog")
+ private boolean getEarliestTimeInBacklog = false;
+
@Override
void run() throws Exception {
String topic = validateTopicName(params);
- print(getTopics().getPartitionedStats(topic, perPartition,
getPreciseBacklog, subscriptionBacklogSize));
+ print(getTopics().getPartitionedStats(topic, perPartition,
getPreciseBacklog,
+ subscriptionBacklogSize, getEarliestTimeInBacklog));
}
}