This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 626b211f91f [improve][cli] Support additional msg metadata for V1 topic on peek message cmd (#23978)
626b211f91f is described below
commit 626b211f91fd8d1e9821ae8e2b9b29520e72ac63
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Mon Feb 24 22:52:08 2025 -0800
[improve][cli] Support additional msg metadata for V1 topic on peek message cmd (#23978)
---
.../pulsar/admin/cli/CmdPersistentTopics.java | 24 +-----
.../org/apache/pulsar/admin/cli/CmdTopics.java | 94 ++++++++++++----------
2 files changed, 52 insertions(+), 66 deletions(-)
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java
index 3dc0ba7b6f2..7b86e2af7f5 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.admin.cli;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static org.apache.pulsar.admin.cli.CmdTopics.printMessages;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import io.netty.buffer.ByteBuf;
@@ -37,8 +38,6 @@ import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.Topics;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.impl.BatchMessageIdImpl;
-import org.apache.pulsar.client.impl.MessageIdImpl;
import picocli.CommandLine.Command;
import picocli.CommandLine.Option;
import picocli.CommandLine.Parameters;
@@ -589,26 +588,7 @@ public class CmdPersistentTopics extends CmdBase {
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(topicName);
List<Message<byte[]>> messages =
getPersistentTopics().peekMessages(persistentTopic, subName, numMessages);
- int position = 0;
- for (Message<byte[]> msg : messages) {
- if (++position != 1) {
-
System.out.println("-------------------------------------------------------------------------\n");
- }
- if (msg.getMessageId() instanceof BatchMessageIdImpl) {
- BatchMessageIdImpl msgId = (BatchMessageIdImpl)
msg.getMessageId();
- System.out.println("Batch Message ID: " +
msgId.getLedgerId() + ":" + msgId.getEntryId() + ":"
- + msgId.getBatchIndex());
- } else {
- MessageIdImpl msgId = (MessageIdImpl) msg.getMessageId();
- System.out.println("Message ID: " + msgId.getLedgerId() +
":" + msgId.getEntryId());
- }
- if (msg.getProperties().size() > 0) {
- System.out.println("Properties:");
- print(msg.getProperties());
- }
- ByteBuf data = Unpooled.wrappedBuffer(msg.getData());
- System.out.println(ByteBufUtil.prettyHexDump(data));
- }
+ printMessages(messages, false, this);
}
}
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index 22073b1a89d..ca15e111390 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -1117,50 +1117,7 @@ public class CmdTopics extends CmdBase {
String persistentTopic = validatePersistentTopic(topicName);
List<Message<byte[]>> messages =
getTopics().peekMessages(persistentTopic, subName, numMessages,
showServerMarker, transactionIsolationLevel);
- int position = 0;
- for (Message<byte[]> msg : messages) {
- MessageImpl message = (MessageImpl) msg;
- if (++position != 1) {
-
System.out.println("-------------------------------------------------------------------------\n");
- }
- if (message.getMessageId() instanceof BatchMessageIdImpl) {
- BatchMessageIdImpl msgId = (BatchMessageIdImpl)
message.getMessageId();
- System.out.println("Batch Message ID: " +
msgId.getLedgerId() + ":" + msgId.getEntryId() + ":"
- + msgId.getBatchIndex());
- } else {
- MessageIdImpl msgId = (MessageIdImpl) msg.getMessageId();
- System.out.println("Message ID: " + msgId.getLedgerId() +
":" + msgId.getEntryId());
- }
-
- System.out.println("Publish time: " +
message.getPublishTime());
- System.out.println("Event time: " + message.getEventTime());
-
- if (message.getDeliverAtTime() != 0) {
- System.out.println("Deliver at time: " +
message.getDeliverAtTime());
- }
- MessageMetadata msgMetaData = message.getMessageBuilder();
- if (showServerMarker && msgMetaData.hasMarkerType()) {
- System.out.println("Marker Type: " +
MarkerType.valueOf(msgMetaData.getMarkerType()));
- }
-
- if (message.getBrokerEntryMetadata() != null) {
- if (message.getBrokerEntryMetadata().hasBrokerTimestamp())
{
- System.out.println("Broker entry metadata timestamp: "
- +
message.getBrokerEntryMetadata().getBrokerTimestamp());
- }
- if (message.getBrokerEntryMetadata().hasIndex()) {
- System.out.println("Broker entry metadata index: "
- + message.getBrokerEntryMetadata().getIndex());
- }
- }
-
- if (message.getProperties().size() > 0) {
- System.out.println("Properties:");
- print(msg.getProperties());
- }
- ByteBuf data = Unpooled.wrappedBuffer(msg.getData());
- System.out.println(ByteBufUtil.prettyHexDump(data));
- }
+ printMessages(messages, showServerMarker, this);
}
}
@@ -1379,6 +1336,55 @@ public class CmdTopics extends CmdBase {
return null;
}
+ public static void printMessages(List<Message<byte[]>> messages, boolean
showServerMarker, CliCommand cli) {
+ if (messages == null) {
+ return;
+ }
+ int position = 0;
+ for (Message<byte[]> msg : messages) {
+ MessageImpl message = (MessageImpl) msg;
+ if (++position != 1) {
+
System.out.println("-------------------------------------------------------------------------\n");
+ }
+ if (message.getMessageId() instanceof BatchMessageIdImpl) {
+ BatchMessageIdImpl msgId = (BatchMessageIdImpl)
message.getMessageId();
+ System.out.println("Batch Message ID: " + msgId.getLedgerId()
+ ":" + msgId.getEntryId() + ":"
+ + msgId.getBatchIndex());
+ } else {
+ MessageIdImpl msgId = (MessageIdImpl) msg.getMessageId();
+ System.out.println("Message ID: " + msgId.getLedgerId() + ":"
+ msgId.getEntryId());
+ }
+
+ System.out.println("Publish time: " + message.getPublishTime());
+ System.out.println("Event time: " + message.getEventTime());
+
+ if (message.getDeliverAtTime() != 0) {
+ System.out.println("Deliver at time: " +
message.getDeliverAtTime());
+ }
+ MessageMetadata msgMetaData = message.getMessageBuilder();
+ if (showServerMarker && msgMetaData.hasMarkerType()) {
+ System.out.println("Marker Type: " +
MarkerType.valueOf(msgMetaData.getMarkerType()));
+ }
+
+ if (message.getBrokerEntryMetadata() != null) {
+ if (message.getBrokerEntryMetadata().hasBrokerTimestamp()) {
+ System.out.println("Broker entry metadata timestamp: "
+ +
message.getBrokerEntryMetadata().getBrokerTimestamp());
+ }
+ if (message.getBrokerEntryMetadata().hasIndex()) {
+ System.out.println("Broker entry metadata index: " +
message.getBrokerEntryMetadata().getIndex());
+ }
+ }
+
+ if (message.getProperties().size() > 0) {
+ System.out.println("Properties:");
+ cli.print(msg.getProperties());
+ }
+ ByteBuf data = Unpooled.wrappedBuffer(msg.getData());
+ System.out.println(ByteBufUtil.prettyHexDump(data));
+ }
+ }
+
@Command(description = "Trigger offload of data from a topic to long-term
storage (e.g. Amazon S3)")
private class Offload extends CliCommand {
@Option(names = { "-s", "--size-threshold" },