This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit a42a29403fc154155be7d9036e7a8529743455bb
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Mon Feb 24 22:52:08 2025 -0800

    [improve][cli] Support additional msg metadata for V1 topic on peek message 
cmd (#23978)
    
    (cherry picked from commit 626b211f91fd8d1e9821ae8e2b9b29520e72ac63)
---
 .../pulsar/admin/cli/CmdPersistentTopics.java      | 24 +-----
 .../org/apache/pulsar/admin/cli/CmdTopics.java     | 94 ++++++++++++----------
 2 files changed, 52 insertions(+), 66 deletions(-)

diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java
index 3dc0ba7b6f2..7b86e2af7f5 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.admin.cli;
 
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static org.apache.pulsar.admin.cli.CmdTopics.printMessages;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
 import io.netty.buffer.ByteBuf;
@@ -37,8 +38,6 @@ import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.admin.Topics;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.impl.BatchMessageIdImpl;
-import org.apache.pulsar.client.impl.MessageIdImpl;
 import picocli.CommandLine.Command;
 import picocli.CommandLine.Option;
 import picocli.CommandLine.Parameters;
@@ -589,26 +588,7 @@ public class CmdPersistentTopics extends CmdBase {
         void run() throws PulsarAdminException {
             String persistentTopic = validatePersistentTopic(topicName);
             List<Message<byte[]>> messages = 
getPersistentTopics().peekMessages(persistentTopic, subName, numMessages);
-            int position = 0;
-            for (Message<byte[]> msg : messages) {
-                if (++position != 1) {
-                    
System.out.println("-------------------------------------------------------------------------\n");
-                }
-                if (msg.getMessageId() instanceof BatchMessageIdImpl) {
-                    BatchMessageIdImpl msgId = (BatchMessageIdImpl) 
msg.getMessageId();
-                    System.out.println("Batch Message ID: " + 
msgId.getLedgerId() + ":" + msgId.getEntryId() + ":"
-                            + msgId.getBatchIndex());
-                } else {
-                    MessageIdImpl msgId = (MessageIdImpl) msg.getMessageId();
-                    System.out.println("Message ID: " + msgId.getLedgerId() + 
":" + msgId.getEntryId());
-                }
-                if (msg.getProperties().size() > 0) {
-                    System.out.println("Properties:");
-                    print(msg.getProperties());
-                }
-                ByteBuf data = Unpooled.wrappedBuffer(msg.getData());
-                System.out.println(ByteBufUtil.prettyHexDump(data));
-            }
+            printMessages(messages, false, this);
         }
     }
 
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index 6da45725233..f19723ec6f1 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -1117,50 +1117,7 @@ public class CmdTopics extends CmdBase {
             String persistentTopic = validatePersistentTopic(topicName);
             List<Message<byte[]>> messages = 
getTopics().peekMessages(persistentTopic, subName, numMessages,
                     showServerMarker, transactionIsolationLevel);
-            int position = 0;
-            for (Message<byte[]> msg : messages) {
-                MessageImpl message = (MessageImpl) msg;
-                if (++position != 1) {
-                    
System.out.println("-------------------------------------------------------------------------\n");
-                }
-                if (message.getMessageId() instanceof BatchMessageIdImpl) {
-                    BatchMessageIdImpl msgId = (BatchMessageIdImpl) 
message.getMessageId();
-                    System.out.println("Batch Message ID: " + 
msgId.getLedgerId() + ":" + msgId.getEntryId() + ":"
-                            + msgId.getBatchIndex());
-                } else {
-                    MessageIdImpl msgId = (MessageIdImpl) msg.getMessageId();
-                    System.out.println("Message ID: " + msgId.getLedgerId() + 
":" + msgId.getEntryId());
-                }
-
-                System.out.println("Publish time: " + 
message.getPublishTime());
-                System.out.println("Event time: " + message.getEventTime());
-
-                if (message.getDeliverAtTime() != 0) {
-                    System.out.println("Deliver at time: " + 
message.getDeliverAtTime());
-                }
-                MessageMetadata msgMetaData = message.getMessageBuilder();
-                if (showServerMarker && msgMetaData.hasMarkerType()) {
-                    System.out.println("Marker Type: " + 
MarkerType.valueOf(msgMetaData.getMarkerType()));
-                }
-
-                if (message.getBrokerEntryMetadata() != null) {
-                    if (message.getBrokerEntryMetadata().hasBrokerTimestamp()) 
{
-                        System.out.println("Broker entry metadata timestamp: "
-                                + 
message.getBrokerEntryMetadata().getBrokerTimestamp());
-                    }
-                    if (message.getBrokerEntryMetadata().hasIndex()) {
-                        System.out.println("Broker entry metadata index: "
-                                + message.getBrokerEntryMetadata().getIndex());
-                    }
-                }
-
-                if (message.getProperties().size() > 0) {
-                    System.out.println("Properties:");
-                    print(msg.getProperties());
-                }
-                ByteBuf data = Unpooled.wrappedBuffer(msg.getData());
-                System.out.println(ByteBufUtil.prettyHexDump(data));
-            }
+            printMessages(messages, showServerMarker, this);
         }
     }
 
@@ -1379,6 +1336,55 @@ public class CmdTopics extends CmdBase {
         return null;
     }
 
+    public static void printMessages(List<Message<byte[]>> messages, boolean 
showServerMarker, CliCommand cli) {
+        if (messages == null) {
+            return;
+        }
+        int position = 0;
+        for (Message<byte[]> msg : messages) {
+            MessageImpl message = (MessageImpl) msg;
+            if (++position != 1) {
+                
System.out.println("-------------------------------------------------------------------------\n");
+            }
+            if (message.getMessageId() instanceof BatchMessageIdImpl) {
+                BatchMessageIdImpl msgId = (BatchMessageIdImpl) 
message.getMessageId();
+                System.out.println("Batch Message ID: " + msgId.getLedgerId() 
+ ":" + msgId.getEntryId() + ":"
+                        + msgId.getBatchIndex());
+            } else {
+                MessageIdImpl msgId = (MessageIdImpl) msg.getMessageId();
+                System.out.println("Message ID: " + msgId.getLedgerId() + ":" 
+ msgId.getEntryId());
+            }
+
+            System.out.println("Publish time: " + message.getPublishTime());
+            System.out.println("Event time: " + message.getEventTime());
+
+            if (message.getDeliverAtTime() != 0) {
+                System.out.println("Deliver at time: " + 
message.getDeliverAtTime());
+            }
+            MessageMetadata msgMetaData = message.getMessageBuilder();
+            if (showServerMarker && msgMetaData.hasMarkerType()) {
+                System.out.println("Marker Type: " + 
MarkerType.valueOf(msgMetaData.getMarkerType()));
+            }
+
+            if (message.getBrokerEntryMetadata() != null) {
+                if (message.getBrokerEntryMetadata().hasBrokerTimestamp()) {
+                    System.out.println("Broker entry metadata timestamp: "
+                            + 
message.getBrokerEntryMetadata().getBrokerTimestamp());
+                }
+                if (message.getBrokerEntryMetadata().hasIndex()) {
+                    System.out.println("Broker entry metadata index: " + 
message.getBrokerEntryMetadata().getIndex());
+                }
+            }
+
+            if (message.getProperties().size() > 0) {
+                System.out.println("Properties:");
+                cli.print(msg.getProperties());
+            }
+            ByteBuf data = Unpooled.wrappedBuffer(msg.getData());
+            System.out.println(ByteBufUtil.prettyHexDump(data));
+        }
+    }
+
     @Command(description = "Trigger offload of data from a topic to long-term 
storage (e.g. Amazon S3)")
     private class Offload extends CliCommand {
         @Option(names = { "-s", "--size-threshold" },

Reply via email to