This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 6af88ae3fa4c927f749a9ce85b9e26924d11cc5b
Author: Santanu Kar <kar.santan...@gmail.com>
AuthorDate: Thu Mar 6 00:56:16 2025 +0530

    [fix][client] Copy eventTime to retry letter topic and DLQ messages (#24059)
    
    (cherry picked from commit 8303b96c4d44681f81279c87495b95ee50e32013)
---
 .../pulsar/client/api/DeadLetterTopicTest.java     | 62 ++++++++++++++++++++++
 .../apache/pulsar/client/api/RetryTopicTest.java   |  5 ++
 .../apache/pulsar/client/impl/ConsumerImpl.java    |  8 +++
 .../client/impl/TypedMessageBuilderImpl.java       |  1 -
 4 files changed, 75 insertions(+), 1 deletion(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
index ab26949c04f..f624b010534 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
@@ -25,6 +25,7 @@ import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 import java.time.Duration;
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -261,6 +262,67 @@ public class DeadLetterTopicTest extends 
ProducerConsumerBase {
         consumer.close();
     }
 
+    @Test
+    public void testDeadLetterTopicMessagesWithEventTime() throws Exception {
+        final String topic = 
"persistent://my-property/my-ns/dead-letter-topic";
+
+        final int maxRedeliveryCount = 1;
+
+        final int sendMessages = 100;
+
+        Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
+                .topic(topic)
+                .subscriptionName("my-subscription")
+                .subscriptionType(SubscriptionType.Shared)
+                .ackTimeout(1, TimeUnit.SECONDS)
+                
.deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).build())
+                .receiverQueueSize(100)
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscribe();
+
+        @Cleanup
+        PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 
0);// Creates new client connection
+        Consumer<byte[]> deadLetterConsumer = 
newPulsarClient.newConsumer(Schema.BYTES)
+                
.topic("persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ")
+                .subscriptionName("my-subscription")
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscribe();
+
+        Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
+                .topic(topic)
+                .create();
+
+        long testEventTime = Instant.now().toEpochMilli();
+        for (int i = 0; i < sendMessages; i++) {
+            producer.newMessage()
+                    .eventTime(testEventTime)
+                    .value(String.format("Hello Pulsar, eventTime: [%d]", 
testEventTime).getBytes())
+                    .send();
+        }
+
+        producer.close();
+
+        int totalReceived = 0;
+        do {
+            Message<byte[]> message = consumer.receive();
+            log.info("consumer received message : {} {}", 
message.getMessageId(),
+                    new String(message.getData()));
+            totalReceived++;
+        } while (totalReceived < sendMessages * (maxRedeliveryCount + 1));
+
+        int totalInDeadLetter = 0;
+        do {
+            Message<byte[]> message = deadLetterConsumer.receive();
+            assertEquals(message.getEventTime(), testEventTime);
+            log.info("dead letter consumer received message : {} {}", 
message.getMessageId(), new String(message.getData()));
+            deadLetterConsumer.acknowledge(message);
+            totalInDeadLetter++;
+        } while (totalInDeadLetter < sendMessages);
+
+        deadLetterConsumer.close();
+        consumer.close();
+    }
+
     public void testDeadLetterTopicWithProducerName() throws Exception {
         final String topic = 
"persistent://my-property/my-ns/dead-letter-topic";
         final String subscription = "my-subscription";
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java
index 2b897760b6f..8cb595a6854 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java
@@ -24,6 +24,7 @@ import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
+import java.time.Instant;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -300,6 +301,7 @@ public class RetryTopicTest extends ProducerConsumerBase {
 
         byte[] key = "key".getBytes();
         byte[] orderingKey = "orderingKey".getBytes();
+        long eventTime = Instant.now().toEpochMilli();
 
         final int maxRedeliveryCount = 3;
 
@@ -333,6 +335,7 @@ public class RetryTopicTest extends ProducerConsumerBase {
                     .value(String.format("Hello Pulsar [%d]", i).getBytes())
                     .keyBytes(key)
                     .orderingKey(orderingKey)
+                    .eventTime(eventTime)
                     .send();
             originMessageIds.add(msgId.toString());
         }
@@ -350,6 +353,7 @@ public class RetryTopicTest extends ProducerConsumerBase {
                 assertEquals(message.getKeyBytes(), key);
                 assertTrue(message.hasOrderingKey());
                 assertEquals(message.getOrderingKey(), orderingKey);
+                assertEquals(message.getEventTime(), eventTime);
                 
retryMessageIds.add(message.getProperty(RetryMessageUtil.SYSTEM_PROPERTY_ORIGIN_MESSAGE_ID));
             }
             consumer.reconsumeLater(message, 1, TimeUnit.SECONDS);
@@ -373,6 +377,7 @@ public class RetryTopicTest extends ProducerConsumerBase {
                 assertEquals(message.getKeyBytes(), key);
                 assertTrue(message.hasOrderingKey());
                 assertEquals(message.getOrderingKey(), orderingKey);
+                assertEquals(message.getEventTime(), eventTime);
                 
deadLetterMessageIds.add(message.getProperty(RetryMessageUtil.SYSTEM_PROPERTY_ORIGIN_MESSAGE_ID));
             }
             deadLetterConsumer.acknowledge(message);
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 7186bfd3fb1..8de4ad7cb5e 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -713,6 +713,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                                             .value(retryMessage.getData())
                                             .properties(propertiesMap);
                             copyMessageKeysIfNeeded(message, 
typedMessageBuilderNew);
+                            copyMessageEventTime(message, 
typedMessageBuilderNew);
                             
typedMessageBuilderNew.sendAsync().thenAccept(msgId -> {
                                 consumerDlqMessagesCounter.increment();
 
@@ -745,6 +746,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                                 typedMessageBuilderNew.deliverAfter(delayTime, 
unit);
                             }
                             copyMessageKeysIfNeeded(message, 
typedMessageBuilderNew);
+                            copyMessageEventTime(message, 
typedMessageBuilderNew);
                             typedMessageBuilderNew.sendAsync()
                                     .thenCompose(
                                             __ -> 
doAcknowledge(finalMessageId, ackType, Collections.emptyMap(), null))
@@ -820,6 +822,11 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         return null;
     }
 
+    private static void copyMessageEventTime(Message<?> message,
+                                             TypedMessageBuilder<byte[]> 
typedMessageBuilderNew) {
+        typedMessageBuilderNew.eventTime(message.getEventTime());
+    }
+
     @Override
     public void negativeAcknowledge(MessageId messageId) {
         consumerNacksCounter.increment();
@@ -2221,6 +2228,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                                         .value(message.getData())
                                         .properties(getPropertiesMap(message, 
originMessageIdStr, originTopicNameStr));
                         copyMessageKeysIfNeeded(message, 
typedMessageBuilderNew);
+                        copyMessageEventTime(message, typedMessageBuilderNew);
                         typedMessageBuilderNew.sendAsync()
                                 .thenAccept(messageIdInDLQ -> {
                                     
possibleSendToDeadLetterTopicMessages.remove(messageId);
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java
index d90c2e88283..8ef9079091a 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java
@@ -186,7 +186,6 @@ public class TypedMessageBuilderImpl<T> implements 
TypedMessageBuilder<T> {
 
     @Override
     public TypedMessageBuilder<T> eventTime(long timestamp) {
-        checkArgument(timestamp > 0, "Invalid timestamp : '%s'", timestamp);
         msgMetadata.setEventTime(timestamp);
         return this;
     }

Reply via email to