LeBW commented on a change in pull request #11553:
URL: https://github.com/apache/pulsar/pull/11553#discussion_r687458847



##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java
##########
@@ -273,6 +274,78 @@ public void testGetLastMessageId() throws Exception {
                 .subscriptionType(SubscriptionType.Exclusive)
                 .subscriptionName(subscription)
                 .subscribe();
-        consumer.getLastMessageId();
+    }
+
+    @Test(timeOut = 20000)
+    public void testConsumerGetBrokerEntryMetadataForIndividualMessage() 
throws Exception {
+        final String topic = newTopicName();
+        final String subscription = "my-sub";
+
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topic)
+                .enableBatching(false)
+                .create();
+        @Cleanup
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionType(SubscriptionType.Exclusive)
+                .subscriptionName(subscription)
+                .subscribe();
+
+        long sendTime = System.currentTimeMillis();
+
+        final int messages = 10;
+        for (int i = 0; i < messages; i++) {
+            producer.send(String.valueOf(i).getBytes());
+        }
+
+        for (int i = 0; i < messages; i++) {
+            Message<byte[]> received = consumer.receive();
+            Assert.assertTrue(
+                    received.hasBrokerPublishTime() && 
received.getBrokerPublishTime().orElse(-1L) >= sendTime);
+            Assert.assertTrue(received.hasIndex() && 
received.getIndex().orElse(-1L) == i);
+        }
+        producer.close();
+        consumer.close();
+    }
+
+    @Test(timeOut = 20000)
+    public void testConsumerGetBrokerEntryMetadataForBatchMessage() throws 
Exception {
+        final String topic = newTopicName();
+        final String subscription = "my-sub";
+
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topic)
+                .enableBatching(true)

Review comment:
       Got it. Fixed.

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
##########
@@ -663,6 +664,36 @@ public void release() {
         }
     }
 
+    @Override
+    public boolean hasBrokerPublishTime() {
+        return brokerEntryMetadata != null && 
brokerEntryMetadata.hasBrokerTimestamp();
+    }
+
+    @Override
+    public Optional<Long> getBrokerPublishTime() {
+        if (brokerEntryMetadata != null && 
brokerEntryMetadata.hasBrokerTimestamp()) {
+            return Optional.of(brokerEntryMetadata.getBrokerTimestamp());
+        }
+        return Optional.empty();
+    }
+
+    @Override
+    public boolean hasIndex() {
+        return brokerEntryMetadata != null && brokerEntryMetadata.hasIndex();
+    }
+
+    @Override
+    public Optional<Long> getIndex() {
+        if (brokerEntryMetadata != null && brokerEntryMetadata.hasIndex()) {
+            if (msgMetadata.hasNumMessagesInBatch() && messageId instanceof 
BatchMessageIdImpl) {
+                int batchIndex = ((BatchMessageIdImpl) 
messageId).getBatchIndex();
+                return Optional.of(brokerEntryMetadata.getIndex() - 
batchIndex);

Review comment:
       Fixed




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to