This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 6af88ae3fa4c927f749a9ce85b9e26924d11cc5b Author: Santanu Kar <kar.santan...@gmail.com> AuthorDate: Thu Mar 6 00:56:16 2025 +0530 [fix][client] Copy eventTime to retry letter topic and DLQ messages (#24059) (cherry picked from commit 8303b96c4d44681f81279c87495b95ee50e32013) --- .../pulsar/client/api/DeadLetterTopicTest.java | 62 ++++++++++++++++++++++ .../apache/pulsar/client/api/RetryTopicTest.java | 5 ++ .../apache/pulsar/client/impl/ConsumerImpl.java | 8 +++ .../client/impl/TypedMessageBuilderImpl.java | 1 - 4 files changed, 75 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java index ab26949c04f..f624b010534 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java @@ -25,6 +25,7 @@ import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import java.time.Duration; +import java.time.Instant; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -261,6 +262,67 @@ public class DeadLetterTopicTest extends ProducerConsumerBase { consumer.close(); } + @Test + public void testDeadLetterTopicMessagesWithEventTime() throws Exception { + final String topic = "persistent://my-property/my-ns/dead-letter-topic"; + + final int maxRedeliveryCount = 1; + + final int sendMessages = 100; + + Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES) + .topic(topic) + .subscriptionName("my-subscription") + .subscriptionType(SubscriptionType.Shared) + .ackTimeout(1, TimeUnit.SECONDS) + .deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).build()) + .receiverQueueSize(100) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); + + @Cleanup + PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection + Consumer<byte[]> deadLetterConsumer = newPulsarClient.newConsumer(Schema.BYTES) + .topic("persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ") + .subscriptionName("my-subscription") + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); + + Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES) + .topic(topic) + .create(); + + long testEventTime = Instant.now().toEpochMilli(); + for (int i = 0; i < sendMessages; i++) { + producer.newMessage() + .eventTime(testEventTime) + .value(String.format("Hello Pulsar, eventTime: [%d]", testEventTime).getBytes()) + .send(); + } + + producer.close(); + + int totalReceived = 0; + do { + Message<byte[]> message = consumer.receive(); + log.info("consumer received message : {} {}", message.getMessageId(), + new String(message.getData())); + totalReceived++; + } while (totalReceived < sendMessages * (maxRedeliveryCount + 1)); + + int totalInDeadLetter = 0; + do { + Message<byte[]> message = deadLetterConsumer.receive(); + assertEquals(message.getEventTime(), testEventTime); + log.info("dead letter consumer received message : {} {}", message.getMessageId(), new String(message.getData())); + deadLetterConsumer.acknowledge(message); + totalInDeadLetter++; + } while (totalInDeadLetter < sendMessages); + + deadLetterConsumer.close(); + consumer.close(); + } + public void testDeadLetterTopicWithProducerName() throws Exception { final String topic = "persistent://my-property/my-ns/dead-letter-topic"; final String subscription = "my-subscription"; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java index 2b897760b6f..8cb595a6854 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java @@ -24,6 +24,7 @@ import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; +import java.time.Instant; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -300,6 +301,7 @@ public class RetryTopicTest extends ProducerConsumerBase { byte[] key = "key".getBytes(); byte[] orderingKey = "orderingKey".getBytes(); + long eventTime = Instant.now().toEpochMilli(); final int maxRedeliveryCount = 3; @@ -333,6 +335,7 @@ public class RetryTopicTest extends ProducerConsumerBase { .value(String.format("Hello Pulsar [%d]", i).getBytes()) .keyBytes(key) .orderingKey(orderingKey) + .eventTime(eventTime) .send(); originMessageIds.add(msgId.toString()); } @@ -350,6 +353,7 @@ public class RetryTopicTest extends ProducerConsumerBase { assertEquals(message.getKeyBytes(), key); assertTrue(message.hasOrderingKey()); assertEquals(message.getOrderingKey(), orderingKey); + assertEquals(message.getEventTime(), eventTime); retryMessageIds.add(message.getProperty(RetryMessageUtil.SYSTEM_PROPERTY_ORIGIN_MESSAGE_ID)); } consumer.reconsumeLater(message, 1, TimeUnit.SECONDS); @@ -373,6 +377,7 @@ public class RetryTopicTest extends ProducerConsumerBase { assertEquals(message.getKeyBytes(), key); assertTrue(message.hasOrderingKey()); assertEquals(message.getOrderingKey(), orderingKey); + assertEquals(message.getEventTime(), eventTime); deadLetterMessageIds.add(message.getProperty(RetryMessageUtil.SYSTEM_PROPERTY_ORIGIN_MESSAGE_ID)); } deadLetterConsumer.acknowledge(message); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 7186bfd3fb1..8de4ad7cb5e 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -713,6 +713,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle .value(retryMessage.getData()) .properties(propertiesMap); copyMessageKeysIfNeeded(message, typedMessageBuilderNew); + copyMessageEventTime(message, typedMessageBuilderNew); typedMessageBuilderNew.sendAsync().thenAccept(msgId -> { consumerDlqMessagesCounter.increment(); @@ -745,6 +746,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle typedMessageBuilderNew.deliverAfter(delayTime, unit); } copyMessageKeysIfNeeded(message, typedMessageBuilderNew); + copyMessageEventTime(message, typedMessageBuilderNew); typedMessageBuilderNew.sendAsync() .thenCompose( __ -> doAcknowledge(finalMessageId, ackType, Collections.emptyMap(), null)) @@ -820,6 +822,11 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle return null; } + private static void copyMessageEventTime(Message<?> message, + TypedMessageBuilder<byte[]> typedMessageBuilderNew) { + typedMessageBuilderNew.eventTime(message.getEventTime()); + } + @Override public void negativeAcknowledge(MessageId messageId) { consumerNacksCounter.increment(); @@ -2221,6 +2228,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle .value(message.getData()) .properties(getPropertiesMap(message, originMessageIdStr, originTopicNameStr)); copyMessageKeysIfNeeded(message, typedMessageBuilderNew); + copyMessageEventTime(message, typedMessageBuilderNew); typedMessageBuilderNew.sendAsync() .thenAccept(messageIdInDLQ -> { possibleSendToDeadLetterTopicMessages.remove(messageId); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java index d90c2e88283..8ef9079091a 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java @@ -186,7 +186,6 @@ public class TypedMessageBuilderImpl<T> implements TypedMessageBuilder<T> { @Override public TypedMessageBuilder<T> eventTime(long timestamp) { - checkArgument(timestamp > 0, "Invalid timestamp : '%s'", timestamp); msgMetadata.setEventTime(timestamp); return this; }