This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new c83428fa328 [fix][client] Copy orderingKey to retry letter topic and DLQ messages and fix bug in copying (#23182)
c83428fa328 is described below

commit c83428fa328b327c9e7e1ae48878e8ddcd99a5db
Author: Lari Hotari <[email protected]>
AuthorDate: Fri Aug 16 06:33:18 2024 +0300

    [fix][client] Copy orderingKey to retry letter topic and DLQ messages and fix bug in copying (#23182)
    
    Fixes #23173
    Fixes #23181
    
    See #23173 and #23181
    
    - copy ordering key to messages sent to retry letter topic and DLQ topic
    
    (cherry picked from commit 67fc5b9f5342bd35d3fdacf37cf172a629ee15f9)
---
 .../pulsar/client/api/DeadLetterTopicTest.java     | 60 ++++++++++++++++++++++
 .../apache/pulsar/client/api/RetryTopicTest.java   | 17 +++++-
 .../apache/pulsar/client/impl/ConsumerImpl.java    | 10 ++--
 3 files changed, 83 insertions(+), 4 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
index 143b463fd3b..dd36d4fdc4d 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
@@ -197,6 +197,66 @@ public class DeadLetterTopicTest extends 
ProducerConsumerBase {
         consumer.close();
     }
 
+    @Test
+    public void testDeadLetterTopicMessagesWithOrderingKey() throws Exception {
+        final String topic = 
"persistent://my-property/my-ns/dead-letter-topic";
+
+        final int maxRedeliveryCount = 1;
+
+        final int sendMessages = 100;
+
+        Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
+                .topic(topic)
+                .subscriptionName("my-subscription")
+                .subscriptionType(SubscriptionType.Shared)
+                .ackTimeout(1, TimeUnit.SECONDS)
+                
.deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).build())
+                .receiverQueueSize(100)
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscribe();
+
+        @Cleanup
+        PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 
0);// Creates new client connection
+        Consumer<byte[]> deadLetterConsumer = 
newPulsarClient.newConsumer(Schema.BYTES)
+                
.topic("persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ")
+                .subscriptionName("my-subscription")
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscribe();
+
+        Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
+                .topic(topic)
+                .create();
+
+        byte[] key = new byte[]{1, 2, 3, 4};
+        for (int i = 0; i < sendMessages; i++) {
+            producer.newMessage()
+                    .orderingKey(key)
+                    .value(String.format("Hello Pulsar [%d]", i).getBytes())
+                    .send();
+        }
+
+        producer.close();
+
+        int totalReceived = 0;
+        do {
+            Message<byte[]> message = consumer.receive();
+            log.info("consumer received message : {} {}", 
message.getMessageId(), new String(message.getData()));
+            totalReceived++;
+        } while (totalReceived < sendMessages * (maxRedeliveryCount + 1));
+
+        int totalInDeadLetter = 0;
+        do {
+            Message message = deadLetterConsumer.receive();
+            assertEquals(message.getOrderingKey(), key);
+            log.info("dead letter consumer received message : {} {}", 
message.getMessageId(), new String(message.getData()));
+            deadLetterConsumer.acknowledge(message);
+            totalInDeadLetter++;
+        } while (totalInDeadLetter < sendMessages);
+
+        deadLetterConsumer.close();
+        consumer.close();
+    }
+
     @DataProvider(name = "produceLargeMessages")
     public Object[][] produceLargeMessages() {
         return new Object[][] { { false }, { true } };
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java
index 2ccae721434..9cb82fde041 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java
@@ -257,6 +257,9 @@ public class RetryTopicTest extends ProducerConsumerBase {
     public void testRetryTopicProperties() throws Exception {
         final String topic = "persistent://my-property/my-ns/retry-topic";
 
+        byte[] key = "key".getBytes();
+        byte[] orderingKey = "orderingKey".getBytes();
+
         final int maxRedeliveryCount = 3;
 
         final int sendMessages = 10;
@@ -285,7 +288,11 @@ public class RetryTopicTest extends ProducerConsumerBase {
 
         Set<String> originMessageIds = new HashSet<>();
         for (int i = 0; i < sendMessages; i++) {
-            MessageId msgId = producer.send(String.format("Hello Pulsar [%d]", 
i).getBytes());
+            MessageId msgId = producer.newMessage()
+                    .value(String.format("Hello Pulsar [%d]", i).getBytes())
+                    .keyBytes(key)
+                    .orderingKey(orderingKey)
+                    .send();
             originMessageIds.add(msgId.toString());
         }
 
@@ -298,6 +305,10 @@ public class RetryTopicTest extends ProducerConsumerBase {
             if 
(message.hasProperty(RetryMessageUtil.SYSTEM_PROPERTY_RECONSUMETIMES)) {
                 // check the REAL_TOPIC property
                 
assertEquals(message.getProperty(RetryMessageUtil.SYSTEM_PROPERTY_REAL_TOPIC), 
topic);
+                assertTrue(message.hasKey());
+                assertEquals(message.getKeyBytes(), key);
+                assertTrue(message.hasOrderingKey());
+                assertEquals(message.getOrderingKey(), orderingKey);
                 
retryMessageIds.add(message.getProperty(RetryMessageUtil.SYSTEM_PROPERTY_ORIGIN_MESSAGE_ID));
             }
             consumer.reconsumeLater(message, 1, TimeUnit.SECONDS);
@@ -317,6 +328,10 @@ public class RetryTopicTest extends ProducerConsumerBase {
             if 
(message.hasProperty(RetryMessageUtil.SYSTEM_PROPERTY_RECONSUMETIMES)) {
                 // check the REAL_TOPIC property
                 
assertEquals(message.getProperty(RetryMessageUtil.SYSTEM_PROPERTY_REAL_TOPIC), 
topic);
+                assertTrue(message.hasKey());
+                assertEquals(message.getKeyBytes(), key);
+                assertTrue(message.hasOrderingKey());
+                assertEquals(message.getOrderingKey(), orderingKey);
                 
deadLetterMessageIds.add(message.getProperty(RetryMessageUtil.SYSTEM_PROPERTY_ORIGIN_MESSAGE_ID));
             }
             deadLetterConsumer.acknowledge(message);
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 8121cc61e62..022b194072d 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -588,7 +588,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         }
     }
 
-    private static void copyMessageKeyIfNeeded(Message<?> message, 
TypedMessageBuilder<?> typedMessageBuilderNew) {
+    private static void copyMessageKeysIfNeeded(Message<?> message, 
TypedMessageBuilder<?> typedMessageBuilderNew) {
         if (message.hasKey()) {
             if (message.hasBase64EncodedKey()) {
                 typedMessageBuilderNew.keyBytes(message.getKeyBytes());
@@ -596,6 +596,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                 typedMessageBuilderNew.key(message.getKey());
             }
         }
+        if (message.hasOrderingKey()) {
+            typedMessageBuilderNew.orderingKey(message.getOrderingKey());
+        }
     }
 
     @SuppressWarnings("unchecked")
@@ -653,6 +656,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                                 
dlqProducer.newMessage(Schema.AUTO_PRODUCE_BYTES(retryMessage.getReaderSchema().get()))
                                         .value(retryMessage.getData())
                                         .properties(propertiesMap);
+                        copyMessageKeysIfNeeded(message, 
typedMessageBuilderNew);
                         typedMessageBuilderNew.sendAsync().thenAccept(msgId -> 
{
                             doAcknowledge(finalMessageId, ackType, 
Collections.emptyMap(), null).thenAccept(v -> {
                                 result.complete(null);
@@ -679,7 +683,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                         if (delayTime > 0) {
                             typedMessageBuilderNew.deliverAfter(delayTime, 
unit);
                         }
-                        copyMessageKeyIfNeeded(message, 
typedMessageBuilderNew);
+                        copyMessageKeysIfNeeded(message, 
typedMessageBuilderNew);
                         typedMessageBuilderNew.sendAsync()
                                 .thenCompose(__ -> 
doAcknowledge(finalMessageId, ackType, Collections.emptyMap(), null))
                                 .thenAccept(v -> result.complete(null))
@@ -2128,7 +2132,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                             
producerDLQ.newMessage(Schema.AUTO_PRODUCE_BYTES(message.getReaderSchema().get()))
                             .value(message.getData())
                             .properties(getPropertiesMap(message, 
originMessageIdStr, originTopicNameStr));
-                    copyMessageKeyIfNeeded(message, typedMessageBuilderNew);
+                    copyMessageKeysIfNeeded(message, typedMessageBuilderNew);
                     typedMessageBuilderNew.sendAsync()
                             .thenAccept(messageIdInDLQ -> {
                                 
possibleSendToDeadLetterTopicMessages.remove(messageId);

Reply via email to