This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 94a9423 cli support for get last message id of a topic (#3251)
94a9423 is described below
commit 94a942384b4d1e07fba4dfb1fe2cc0c06bc0a2ea
Author: legendtkl <[email protected]>
AuthorDate: Thu Dec 27 00:14:50 2018 +0800
cli support for get last message id of a topic (#3251)
This PR adds CLI support for getting the last message ID of a topic.
1. Add CLI support for the admin REST API endpoint:
/admin/v2/persistent/{tenant}/{namespace}/{topic}/lastMessageId
2. The CLI command looks like: ./pulsar-admin topics last-message-id topic-name
Test Result:
```bash
➜ bin ./pulsar-admin topics last-message-id
persistent://public/default/my-topic
{
"ledgerId" : 10,
"entryId" : -1,
"partitionIndex" : -1
}
➜ bin ./pulsar-admin topics last-message-id
persistent://public/default/my-topi
Topic not found
Reason: Topic not found
```
---
.../org/apache/pulsar/client/admin/Topics.java | 9 ++++++
.../pulsar/client/admin/internal/TopicsImpl.java | 32 ++++++++++++++++++++++
.../org/apache/pulsar/admin/cli/CmdTopics.java | 13 +++++++++
3 files changed, 54 insertions(+)
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
index f3f95ac..9c442c7 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -1017,4 +1017,13 @@ public interface Topics {
* @return the status of the offload operation
*/
OffloadProcessStatus offloadStatus(String topic) throws
PulsarAdminException;
+
+ /**
+ * Get the ID of the last committed message of a topic.
+ *
+ * @param topic the topic name
+ * @return the last message ID of the topic
+ * @throws PulsarAdminException if the admin request fails
+ */
+ MessageId getLastMessageId(String topic) throws PulsarAdminException;
}
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index 216275d..5dc84fc 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -950,5 +950,37 @@ public class TopicsImpl extends BaseResource implements
Topics, PersistentTopics
return ret;
}
+ @Override
+ public MessageId getLastMessageId(String topic) throws
PulsarAdminException {
+ try {
+ return (MessageIdImpl) getLastMessageIdAsync(topic).get();
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e.getCause());
+ }
+ }
+
+ public CompletableFuture<MessageId> getLastMessageIdAsync(String topic) {
+ TopicName tn = validateTopic(topic);
+ WebTarget path = topicPath(tn, "lastMessageId");
+ final CompletableFuture<MessageId> future = new CompletableFuture<>();
+ asyncGetRequest(path,
+ new InvocationCallback<MessageIdImpl>() {
+
+ @Override
+ public void completed(MessageIdImpl response) {
+ future.complete(response);
+ }
+
+ @Override
+ public void failed(Throwable throwable) {
+
future.completeExceptionally(getApiException(throwable.getCause()));
+ }
+ });
+ return future;
+ }
+
private static final Logger log =
LoggerFactory.getLogger(TopicsImpl.class);
}
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index 5014f14..ceaebf4 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -93,6 +93,7 @@ public class CmdTopics extends CmdBase {
jcommander.addCommand("compaction-status", new CompactionStatusCmd());
jcommander.addCommand("offload", new Offload());
jcommander.addCommand("offload-status", new OffloadStatusCmd());
+ jcommander.addCommand("last-message-id", new GetLastMessageId());
}
@Parameters(commandDescription = "Get the list of topics under a
namespace.")
@@ -708,4 +709,16 @@ public class CmdTopics extends CmdBase {
}
}
}
+
+ @Parameters(commandDescription = "get the last commit message id of topic")
+ private class GetLastMessageId extends CliCommand {
+ @Parameter(description = "persistent://tenant/namespace/topic",
required = true)
+ private java.util.List<String> params;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String persistentTopic = validatePersistentTopic(params);
+ print(topics.getLastMessageId(persistentTopic));
+ }
+ }
}