This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 68b0ca4abd001fcfef267b6dc18da26a2bbd19ed
Author: Aloys <[email protected]>
AuthorDate: Mon Jul 12 22:23:54 2021 +0800

    Expose broker entry metadata and deliverAtTime to 
peekMessages/getMessageById/examineMessage  (#11279)
    
    
    (cherry picked from commit ea628541950a2cd02746ef0cfce02528b7824fed)
---
 .../broker/admin/impl/PersistentTopicsBase.java    | 14 ++++
 .../broker/service/BrokerEntryMetadataE2ETest.java | 89 +++++++++++++++++++++-
 .../pulsar/client/admin/internal/TopicsImpl.java   | 57 ++++++++++++--
 .../org/apache/pulsar/admin/cli/CmdTopics.java     | 27 ++++++-
 .../org/apache/pulsar/client/impl/MessageImpl.java |  7 ++
 5 files changed, 181 insertions(+), 13 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 234f153..fa3e939 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -89,6 +89,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
+import org.apache.pulsar.common.api.proto.BrokerEntryMetadata;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
 import org.apache.pulsar.common.api.proto.KeyValue;
@@ -2478,6 +2479,7 @@ public class PersistentTopicsBase extends AdminResource {
         PositionImpl pos = (PositionImpl) entry.getPosition();
         ByteBuf metadataAndPayload = entry.getDataBuffer();
 
+        BrokerEntryMetadata brokerEntryMetadata = 
Commands.peekBrokerEntryMetadataIfExist(metadataAndPayload);
         MessageMetadata metadata = 
Commands.parseMessageMetadata(metadataAndPayload);
 
         ResponseBuilder responseBuilder = Response.ok();
@@ -2485,12 +2487,24 @@ public class PersistentTopicsBase extends AdminResource 
{
         for (KeyValue keyValue : metadata.getPropertiesList()) {
             responseBuilder.header("X-Pulsar-PROPERTY-" + keyValue.getKey(), 
keyValue.getValue());
         }
+        if (brokerEntryMetadata != null) {
+            if (brokerEntryMetadata.hasBrokerTimestamp()) {
+                
responseBuilder.header("X-Pulsar-Broker-Entry-METADATA-timestamp",
+                        
DateFormatter.format(brokerEntryMetadata.getBrokerTimestamp()));
+            }
+            if (brokerEntryMetadata.hasIndex()) {
+                responseBuilder.header("X-Pulsar-Broker-Entry-METADATA-index", 
brokerEntryMetadata.getIndex());
+            }
+        }
         if (metadata.hasPublishTime()) {
             responseBuilder.header("X-Pulsar-publish-time", 
DateFormatter.format(metadata.getPublishTime()));
         }
         if (metadata.hasEventTime()) {
             responseBuilder.header("X-Pulsar-event-time", 
DateFormatter.format(metadata.getEventTime()));
         }
+        if (metadata.hasDeliverAtTime()) {
+            responseBuilder.header("X-Pulsar-deliver-at-time", 
DateFormatter.format(metadata.getDeliverAtTime()));
+        }
         if (metadata.hasNumMessagesInBatch()) {
             responseBuilder.header("X-Pulsar-num-batch-message", 
metadata.getNumMessagesInBatch());
             responseBuilder.header("X-Pulsar-batch-size", 
metadataAndPayload.readableBytes()
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java
index e7d98a8..52c8375 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java
@@ -25,6 +25,9 @@ import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.common.api.proto.BrokerEntryMetadata;
 import org.assertj.core.util.Sets;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
@@ -38,6 +41,7 @@ import org.testng.annotations.Test;
 @Test(groups = "broker")
 public class BrokerEntryMetadataE2ETest extends BrokerTestBase {
 
+
     @DataProvider(name = "subscriptionTypes")
     public static Object[] subscriptionTypes() {
         return new Object[] {
@@ -97,17 +101,98 @@ public class BrokerEntryMetadataE2ETest extends 
BrokerTestBase {
     public void testPeekMessage() throws Exception {
         final String topic = newTopicName();
         final String subscription = "my-sub";
+        final long eventTime= 200;
+        final long deliverAtTime = 300;
 
         @Cleanup
         Producer<byte[]> producer = pulsarClient.newProducer()
                 .topic(topic)
                 .create();
-        producer.newMessage().value("hello".getBytes()).send();
+
+        long sendTime = System.currentTimeMillis();
+        producer.newMessage()
+                .eventTime(eventTime)
+                .deliverAt(deliverAtTime)
+                .value("hello".getBytes())
+                .send();
 
         admin.topics().createSubscription(topic, subscription, 
MessageId.earliest);
         final List<Message<byte[]>> messages = 
admin.topics().peekMessages(topic, subscription, 1);
         Assert.assertEquals(messages.size(), 1);
-        Assert.assertEquals(messages.get(0).getData(), "hello".getBytes());
+        MessageImpl message = (MessageImpl) messages.get(0);
+        Assert.assertEquals(message.getData(), "hello".getBytes());
+        Assert.assertEquals(message.getEventTime(), eventTime);
+        Assert.assertEquals(message.getDeliverAtTime(), deliverAtTime);
+        Assert.assertTrue(message.getPublishTime() >= sendTime);
+
+        BrokerEntryMetadata entryMetadata = message.getBrokerEntryMetadata();
+        Assert.assertEquals(entryMetadata.getIndex(), 0);
+        Assert.assertTrue(entryMetadata.getBrokerTimestamp() >= sendTime);
+    }
+
+    @Test(timeOut = 20000)
+    public void testGetMessageById() throws Exception {
+        final String topic = newTopicName();
+        final String subscription = "my-sub";
+        final long eventTime= 200;
+        final long deliverAtTime = 300;
+
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topic)
+                .create();
+
+        long sendTime = System.currentTimeMillis();
+        MessageIdImpl messageId = (MessageIdImpl) producer.newMessage()
+                .eventTime(eventTime)
+                .deliverAt(deliverAtTime)
+                .value("hello".getBytes())
+                .send();
+
+        admin.topics().createSubscription(topic, subscription, 
MessageId.earliest);
+        MessageImpl message = (MessageImpl) admin.topics()
+                .getMessageById(topic, messageId.getLedgerId(), 
messageId.getEntryId());
+        Assert.assertEquals(message.getData(), "hello".getBytes());
+        Assert.assertEquals(message.getEventTime(), eventTime);
+        Assert.assertEquals(message.getDeliverAtTime(), deliverAtTime);
+        Assert.assertTrue(message.getPublishTime() >= sendTime);
+
+        BrokerEntryMetadata entryMetadata = message.getBrokerEntryMetadata();
+        Assert.assertEquals(entryMetadata.getIndex(), 0);
+        Assert.assertTrue(entryMetadata.getBrokerTimestamp() >= sendTime);
+    }
+
+
+    @Test(timeOut = 20000)
+    public void testExamineMessage() throws Exception {
+        final String topic = newTopicName();
+        final String subscription = "my-sub";
+        final long eventTime= 200;
+        final long deliverAtTime = 300;
+
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topic)
+                .create();
+
+        long sendTime = System.currentTimeMillis();
+        producer.newMessage()
+                .eventTime(eventTime)
+                .deliverAt(deliverAtTime)
+                .value("hello".getBytes())
+                .send();
+
+        admin.topics().createSubscription(topic, subscription, 
MessageId.earliest);
+        MessageImpl message =
+                (MessageImpl) admin.topics().examineMessage(topic, "earliest", 
1);
+        Assert.assertEquals(message.getData(), "hello".getBytes());
+        Assert.assertEquals(message.getEventTime(), eventTime);
+        Assert.assertEquals(message.getDeliverAtTime(), deliverAtTime);
+        Assert.assertTrue(message.getPublishTime() >= sendTime);
+
+        BrokerEntryMetadata entryMetadata = message.getBrokerEntryMetadata();
+        Assert.assertEquals(entryMetadata.getIndex(), 0);
+        Assert.assertTrue(entryMetadata.getBrokerTimestamp() >= sendTime);
     }
 
     @Test(timeOut = 20000)
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index 647804f..d46f214 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -58,6 +58,7 @@ import org.apache.pulsar.client.impl.BatchMessageIdImpl;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.MessageImpl;
 import org.apache.pulsar.client.impl.ResetCursorData;
+import org.apache.pulsar.common.api.proto.BrokerEntryMetadata;
 import org.apache.pulsar.common.api.proto.KeyValue;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.apache.pulsar.common.api.proto.SingleMessageMetadata;
@@ -84,6 +85,7 @@ import org.apache.pulsar.common.policies.data.SubscribeRate;
 import org.apache.pulsar.common.policies.data.TopicStats;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.util.Codec;
+import org.apache.pulsar.common.util.DateFormatter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -95,6 +97,10 @@ public class TopicsImpl extends BaseResource implements 
Topics {
     private static final String BATCH_SIZE_HEADER = "X-Pulsar-batch-size";
     private static final String MESSAGE_ID = "X-Pulsar-Message-ID";
     private static final String PUBLISH_TIME = "X-Pulsar-publish-time";
+    private static final String EVENT_TIME = "X-Pulsar-event-time";
+    private static final String DELIVER_AT_TIME = "X-Pulsar-deliver-at-time";
+    private static final String BROKER_ENTRY_TIMESTAMP = 
"X-Pulsar-Broker-Entry-METADATA-timestamp";
+    private static final String BROKER_ENTRY_INDEX =  
"X-Pulsar-Broker-Entry-METADATA-index";
     // CHECKSTYLE.ON: MemberName
 
     public TopicsImpl(WebTarget web, Authentication auth, long readTimeoutMs) {
@@ -1466,6 +1472,24 @@ public class TopicsImpl extends BaseResource implements 
Topics {
         }
 
         String msgId = response.getHeaderString(MESSAGE_ID);
+
+        // build broker entry metadata if exist
+        String brokerEntryTimestamp = 
response.getHeaderString(BROKER_ENTRY_TIMESTAMP);
+        String brokerEntryIndex = response.getHeaderString(BROKER_ENTRY_INDEX);
+        BrokerEntryMetadata brokerEntryMetadata;
+        if (brokerEntryTimestamp == null && brokerEntryIndex == null) {
+            brokerEntryMetadata = null;
+        } else {
+            brokerEntryMetadata = new BrokerEntryMetadata();
+            if (brokerEntryTimestamp != null) {
+                
brokerEntryMetadata.setBrokerTimestamp(DateFormatter.parse(brokerEntryTimestamp.toString()));
+            }
+
+            if (brokerEntryIndex != null) {
+                brokerEntryMetadata.setIndex(Long.parseLong(brokerEntryIndex));
+            }
+        }
+
         MessageMetadata messageMetadata = new MessageMetadata();
         try (InputStream stream = (InputStream) response.getEntity()) {
             byte[] data = new byte[stream.available()];
@@ -1475,7 +1499,17 @@ public class TopicsImpl extends BaseResource implements 
Topics {
             MultivaluedMap<String, Object> headers = response.getHeaders();
             Object tmp = headers.getFirst(PUBLISH_TIME);
             if (tmp != null) {
-                properties.put("publish-time", (String) tmp);
+                
messageMetadata.setPublishTime(DateFormatter.parse(tmp.toString()));
+            }
+
+            tmp = headers.getFirst(EVENT_TIME);
+            if (tmp != null) {
+                
messageMetadata.setEventTime(DateFormatter.parse(tmp.toString()));
+            }
+
+            tmp = headers.getFirst(DELIVER_AT_TIME);
+            if (tmp != null) {
+                
messageMetadata.setDeliverAtTime(DateFormatter.parse(tmp.toString()));
             }
 
             tmp = headers.getFirst("X-Pulsar-null-value");
@@ -1508,16 +1542,21 @@ public class TopicsImpl extends BaseResource implements 
Topics {
             }
 
             if (!isEncrypted && response.getHeaderString(BATCH_HEADER) != 
null) {
-                return getIndividualMsgsFromBatch(topic, msgId, data, 
properties, messageMetadata);
+                return getIndividualMsgsFromBatch(topic, msgId, data, 
properties, messageMetadata, brokerEntryMetadata);
             }
 
-            return Collections.singletonList(new MessageImpl<byte[]>(topic, 
msgId, properties,
-                    Unpooled.wrappedBuffer(data), Schema.BYTES, 
messageMetadata));
+            MessageImpl message = new MessageImpl(topic, msgId, properties,
+                    Unpooled.wrappedBuffer(data), Schema.BYTES, 
messageMetadata);
+            if (brokerEntryMetadata != null) {
+                message.setBrokerEntryMetadata(brokerEntryMetadata);
+            }
+            return Collections.singletonList(message);
         }
     }
 
     private List<Message<byte[]>> getIndividualMsgsFromBatch(String topic, 
String msgId, byte[] data,
-                                 Map<String, String> properties, 
MessageMetadata msgMetadataBuilder) {
+                                 Map<String, String> properties, 
MessageMetadata msgMetadataBuilder,
+                                                             
BrokerEntryMetadata brokerEntryMetadata) {
         List<Message<byte[]>> ret = new ArrayList<>();
         int batchSize = Integer.parseInt(properties.get(BATCH_HEADER));
         ByteBuf buf = Unpooled.wrappedBuffer(data);
@@ -1532,8 +1571,12 @@ public class TopicsImpl extends BaseResource implements 
Topics {
                         properties.put(entry.getKey(), entry.getValue());
                     }
                 }
-                ret.add(new MessageImpl<>(topic, batchMsgId, properties, 
singleMessagePayload,
-                        Schema.BYTES, msgMetadataBuilder));
+                MessageImpl message = new MessageImpl<>(topic, batchMsgId, 
properties, singleMessagePayload,
+                        Schema.BYTES, msgMetadataBuilder);
+                if (brokerEntryMetadata != null) {
+                    message.setBrokerEntryMetadata(brokerEntryMetadata);
+                }
+                ret.add(message);
             } catch (Exception ex) {
                 log.error("Exception occurred while trying to get BatchMsgId: 
{}", batchMsgId, ex);
             }
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index da42686..b751196 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -52,6 +52,7 @@ import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.BatchMessageIdImpl;
 import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.impl.MessageImpl;
 import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
@@ -835,18 +836,36 @@ public class CmdTopics extends CmdBase {
             List<Message<byte[]>> messages = 
getTopics().peekMessages(persistentTopic, subName, numMessages);
             int position = 0;
             for (Message<byte[]> msg : messages) {
+                MessageImpl message = (MessageImpl) msg;
                 if (++position != 1) {
                     
System.out.println("-------------------------------------------------------------------------\n");
                 }
-                if (msg.getMessageId() instanceof BatchMessageIdImpl) {
-                    BatchMessageIdImpl msgId = (BatchMessageIdImpl) 
msg.getMessageId();
+                if (message.getMessageId() instanceof BatchMessageIdImpl) {
+                    BatchMessageIdImpl msgId = (BatchMessageIdImpl) 
message.getMessageId();
                     System.out.println("Batch Message ID: " + 
msgId.getLedgerId() + ":" + msgId.getEntryId() + ":" + msgId.getBatchIndex());
                 } else {
                     MessageIdImpl msgId = (MessageIdImpl) msg.getMessageId();
                     System.out.println("Message ID: " + msgId.getLedgerId() + 
":" + msgId.getEntryId());
                 }
-                if (msg.getProperties().size() > 0) {
-                    System.out.println("Tenants:");
+
+                System.out.println("Publish time: " + 
message.getPublishTime());
+                System.out.println("Event time: " + message.getEventTime());
+
+                if (message.getDeliverAtTime() != 0) {
+                    System.out.println("Deliver at time: " + 
message.getDeliverAtTime());
+                }
+
+                if (message.getBrokerEntryMetadata() != null) {
+                    if (message.getBrokerEntryMetadata().hasBrokerTimestamp()) 
{
+                        System.out.println("Broker entry metadata timestamp: " 
+ message.getBrokerEntryMetadata().getBrokerTimestamp());
+                    }
+                    if (message.getBrokerEntryMetadata().hasIndex()) {
+                        System.out.println("Broker entry metadata index: " + 
message.getBrokerEntryMetadata().getIndex());
+                    }
+                }
+
+                if (message.getProperties().size() > 0) {
+                    System.out.println("Properties:");
                     print(msg.getProperties());
                 }
                 ByteBuf data = Unpooled.wrappedBuffer(msg.getData());
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
index c9370f3..7f0c113 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
@@ -341,6 +341,13 @@ public class MessageImpl<T> implements Message<T> {
         return 0;
     }
 
+    public long getDeliverAtTime() {
+        if (msgMetadata.hasDeliverAtTime()) {
+            return msgMetadata.getDeliverAtTime();
+        }
+        return 0;
+    }
+
     public boolean isExpired(int messageTTLInSeconds) {
         return messageTTLInSeconds != 0 && (brokerEntryMetadata == null || 
!brokerEntryMetadata.hasBrokerTimestamp()
                 ? (System.currentTimeMillis() >

Reply via email to