This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.7 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 0622649b2cdb3d0b8fac943972e025e70f5a3200 Author: lipenghui <[email protected]> AuthorDate: Sat Jun 26 00:27:00 2021 +0800 Print message metadata when getting message by id (#11092) ``` Batch Message ID: 10:1:0 Properties: "X-Pulsar-batch-size 26825" "X-Pulsar-num-batch-message 26" "publish-time 2021-06-25T16:00:40.919+08:00" ``` (cherry picked from commit cadf59d18d6f71f1da06f425709bffa1b10fcf11) --- .../broker/admin/impl/PersistentTopicsBase.java | 2 ++ .../pulsar/client/admin/internal/TopicsImpl.java | 21 ++++++++++++++------- .../java/org/apache/pulsar/admin/cli/CmdTopics.java | 11 +++++++++++ 3 files changed, 27 insertions(+), 7 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 96377de..541a2b9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -2336,6 +2336,8 @@ public class PersistentTopicsBase extends AdminResource { } if (metadata.hasNumMessagesInBatch()) { responseBuilder.header("X-Pulsar-num-batch-message", metadata.getNumMessagesInBatch()); + responseBuilder.header("X-Pulsar-batch-size", metadataAndPayload.readableBytes() + - metadata.getSerializedSize()); } if (metadata.hasNullValue()) { responseBuilder.header("X-Pulsar-null-value", metadata.hasNullValue()); diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java index 550b7d4..9f2a911 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java @@ -91,9 +91,10 @@ public class TopicsImpl extends BaseResource implements Topics { private final WebTarget adminTopics; private final WebTarget adminV2Topics; // CHECKSTYLE.OFF: MemberName - private final String BATCH_HEADER = "X-Pulsar-num-batch-message"; - private final String MESSAGE_ID = "X-Pulsar-Message-ID"; - private final String PUBLISH_TIME = "X-Pulsar-publish-time"; + private static final String BATCH_HEADER = "X-Pulsar-num-batch-message"; + private static final String BATCH_SIZE_HEADER = "X-Pulsar-batch-size"; + private static final String MESSAGE_ID = "X-Pulsar-Message-ID"; + private static final String PUBLISH_TIME = "X-Pulsar-publish-time"; // CHECKSTYLE.ON: MemberName public TopicsImpl(WebTarget web, Authentication auth, long readTimeoutMs) { @@ -1437,11 +1438,11 @@ public class TopicsImpl extends BaseResource implements Topics { messageMetadata.setNullValue(Boolean.parseBoolean(tmp.toString())); } - tmp = headers.getFirst(BATCH_HEADER); - if (response.getHeaderString(BATCH_HEADER) != null) { - properties.put(BATCH_HEADER, (String) tmp); - return getIndividualMsgsFromBatch(topic, msgId, data, properties, messageMetadata); + tmp = headers.getFirst(BATCH_SIZE_HEADER); + if (tmp != null) { + properties.put(BATCH_SIZE_HEADER, (String) tmp); } + for (Entry<String, List<Object>> entry : headers.entrySet()) { String header = entry.getKey(); if (header.contains("X-Pulsar-PROPERTY-")) { @@ -1450,6 +1451,12 @@ public class TopicsImpl extends BaseResource implements Topics { } } + tmp = headers.getFirst(BATCH_HEADER); + if (response.getHeaderString(BATCH_HEADER) != null) { + properties.put(BATCH_HEADER, (String) tmp); + return getIndividualMsgsFromBatch(topic, msgId, data, properties, messageMetadata); + } + return Collections.singletonList(new MessageImpl<byte[]>(topic, msgId, properties, Unpooled.wrappedBuffer(data), Schema.BYTES, messageMetadata)); } diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java index 65679d1..dbc9ff2 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java @@ -884,6 +884,17 @@ public class CmdTopics extends CmdBase { System.out.println("Cannot find any messages based on ledgerId:" + ledgerId + " entryId:" + entryId); } else { + if (message.getMessageId() instanceof BatchMessageIdImpl) { + BatchMessageIdImpl msgId = (BatchMessageIdImpl) message.getMessageId(); + System.out.println("Batch Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId() + ":" + msgId.getBatchIndex()); + } else { + MessageIdImpl msgId = (MessageIdImpl) message.getMessageId(); + System.out.println("Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId()); + } + if (message.getProperties().size() > 0) { + System.out.println("Properties:"); + print(message.getProperties()); + } ByteBuf date = Unpooled.wrappedBuffer(message.getData()); System.out.println(ByteBufUtil.prettyHexDump(date)); }
