This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 73e6659bbf2c63f48566b84d77623196943c82a8
Author: Andrey Yegorov <[email protected]>
AuthorDate: Thu Aug 15 10:33:56 2024 -0700

    [fix] DLQ to handle bytes key properly (#23172)
    
    (cherry picked from commit 46c25ac73427312db7f38e150cd797a8cee23f28)
---
 .../pulsar/client/api/DeadLetterTopicTest.java     | 60 ++++++++++++++++++++++
 .../apache/pulsar/client/impl/ConsumerImpl.java    | 56 +++++++++++---------
 2 files changed, 92 insertions(+), 24 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
index 2a0cb3187d2..143b463fd3b 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
@@ -137,6 +137,66 @@ public class DeadLetterTopicTest extends 
ProducerConsumerBase {
         consumer.close();
     }
 
+    @Test
+    public void testDeadLetterTopicWithBinaryMessageKey() throws Exception {
+        final String topic = 
"persistent://my-property/my-ns/dead-letter-topic";
+
+        final int maxRedeliveryCount = 1;
+
+        final int sendMessages = 100;
+
+        Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
+                .topic(topic)
+                .subscriptionName("my-subscription")
+                .subscriptionType(SubscriptionType.Shared)
+                .ackTimeout(1, TimeUnit.SECONDS)
+                
.deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).build())
+                .receiverQueueSize(100)
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscribe();
+
+        @Cleanup
+        PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 
0);// Creates new client connection
+        Consumer<byte[]> deadLetterConsumer = 
newPulsarClient.newConsumer(Schema.BYTES)
+                
.topic("persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ")
+                .subscriptionName("my-subscription")
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscribe();
+
+        Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
+                .topic(topic)
+                .create();
+
+        byte[] key = new byte[]{1, 2, 3, 4};
+        for (int i = 0; i < sendMessages; i++) {
+            producer.newMessage()
+                    .keyBytes(key)
+                    .value(String.format("Hello Pulsar [%d]", i).getBytes())
+                    .send();
+        }
+
+        producer.close();
+
+        int totalReceived = 0;
+        do {
+            Message<byte[]> message = consumer.receive();
+            log.info("consumer received message : {} {}", 
message.getMessageId(), new String(message.getData()));
+            totalReceived++;
+        } while (totalReceived < sendMessages * (maxRedeliveryCount + 1));
+
+        int totalInDeadLetter = 0;
+        do {
+            Message message = deadLetterConsumer.receive();
+            assertEquals(message.getKeyBytes(), key);
+            log.info("dead letter consumer received message : {} {}", 
message.getMessageId(), new String(message.getData()));
+            deadLetterConsumer.acknowledge(message);
+            totalInDeadLetter++;
+        } while (totalInDeadLetter < sendMessages);
+
+        deadLetterConsumer.close();
+        consumer.close();
+    }
+
     @DataProvider(name = "produceLargeMessages")
     public Object[][] produceLargeMessages() {
         return new Object[][] { { false }, { true } };
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 58e2692fea5..e5abf769297 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -215,6 +215,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
 
     private final AtomicReference<ClientCnx> 
clientCnxUsedForConsumerRegistration = new AtomicReference<>();
     private final List<Throwable> previousExceptions = new 
CopyOnWriteArrayList<Throwable>();
+
     static <T> ConsumerImpl<T> newConsumerImpl(PulsarClientImpl client,
                                                String topic,
                                                ConsumerConfigurationData<T> 
conf,
@@ -258,10 +259,12 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
     }
 
     protected ConsumerImpl(PulsarClientImpl client, String topic, 
ConsumerConfigurationData<T> conf,
-           ExecutorProvider executorProvider, int partitionIndex, boolean 
hasParentConsumer,
-           boolean parentConsumerHasListener, CompletableFuture<Consumer<T>> 
subscribeFuture, MessageId startMessageId,
-           long startMessageRollbackDurationInSec, Schema<T> schema, 
ConsumerInterceptors<T> interceptors,
-           boolean createTopicIfDoesNotExist) {
+                           ExecutorProvider executorProvider, int 
partitionIndex, boolean hasParentConsumer,
+                           boolean parentConsumerHasListener, 
CompletableFuture<Consumer<T>> subscribeFuture,
+                           MessageId startMessageId,
+                           long startMessageRollbackDurationInSec, Schema<T> 
schema,
+                           ConsumerInterceptors<T> interceptors,
+                           boolean createTopicIfDoesNotExist) {
         super(client, topic, conf, conf.getReceiverQueueSize(), 
executorProvider, subscribeFuture, schema,
                 interceptors);
         this.consumerId = client.newConsumerId();
@@ -333,21 +336,21 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         }
 
         this.connectionHandler = new ConnectionHandler(this,
-                        new BackoffBuilder()
-                                
.setInitialTime(client.getConfiguration().getInitialBackoffIntervalNanos(),
-                                        TimeUnit.NANOSECONDS)
-                                
.setMax(client.getConfiguration().getMaxBackoffIntervalNanos(), 
TimeUnit.NANOSECONDS)
-                                .setMandatoryStop(0, TimeUnit.MILLISECONDS)
-                                .create(),
+                new BackoffBuilder()
+                        
.setInitialTime(client.getConfiguration().getInitialBackoffIntervalNanos(),
+                                TimeUnit.NANOSECONDS)
+                        
.setMax(client.getConfiguration().getMaxBackoffIntervalNanos(), 
TimeUnit.NANOSECONDS)
+                        .setMandatoryStop(0, TimeUnit.MILLISECONDS)
+                        .create(),
                 this);
 
         this.topicName = TopicName.get(topic);
         if (this.topicName.isPersistent()) {
             this.acknowledgmentsGroupingTracker =
-                new PersistentAcknowledgmentsGroupingTracker(this, conf, 
client.eventLoopGroup());
+                    new PersistentAcknowledgmentsGroupingTracker(this, conf, 
client.eventLoopGroup());
         } else {
             this.acknowledgmentsGroupingTracker =
-                NonPersistentAcknowledgmentGroupingTracker.of();
+                    NonPersistentAcknowledgmentGroupingTracker.of();
         }
 
         if (conf.getDeadLetterPolicy() != null) {
@@ -425,16 +428,16 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                 log.error("[{}][{}] Failed to unsubscribe: {}", topic, 
subscription, e.getCause().getMessage());
                 setState(State.Ready);
                 unsubscribeFuture.completeExceptionally(
-                    PulsarClientException.wrap(e.getCause(),
-                        String.format("Failed to unsubscribe the subscription 
%s of topic %s",
-                                subscription, topicName.toString())));
+                        PulsarClientException.wrap(e.getCause(),
+                                String.format("Failed to unsubscribe the 
subscription %s of topic %s",
+                                        subscription, topicName.toString())));
                 return null;
             });
         } else {
             unsubscribeFuture.completeExceptionally(
-                new PulsarClientException(
-                    String.format("The client is not connected to the broker 
when unsubscribing the "
-                            + "subscription %s of the topic %s", subscription, 
topicName.toString())));
+                    new PulsarClientException(
+                            String.format("The client is not connected to the 
broker when unsubscribing the "
+                                    + "subscription %s of the topic %s", 
subscription, topicName.toString())));
         }
         return unsubscribeFuture;
     }
@@ -587,6 +590,15 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         }
     }
 
+    private static void copyMessageKeyIfNeeded(Message<?> message, 
TypedMessageBuilder<?> typedMessageBuilderNew) {
+        if (message.hasKey()) {
+            if (message.hasBase64EncodedKey()) {
+                typedMessageBuilderNew.keyBytes(message.getKeyBytes());
+            } else {
+                typedMessageBuilderNew.key(message.getKey());
+            }
+        }
+    }
 
     @SuppressWarnings("unchecked")
     @Override
@@ -669,9 +681,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                         if (delayTime > 0) {
                             typedMessageBuilderNew.deliverAfter(delayTime, 
unit);
                         }
-                        if (message.hasKey()) {
-                            typedMessageBuilderNew.key(message.getKey());
-                        }
+                        copyMessageKeyIfNeeded(message, 
typedMessageBuilderNew);
                         typedMessageBuilderNew.sendAsync()
                                 .thenCompose(__ -> 
doAcknowledge(finalMessageId, ackType, Collections.emptyMap(), null))
                                 .thenAccept(v -> result.complete(null))
@@ -2111,9 +2121,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                             
producerDLQ.newMessage(Schema.AUTO_PRODUCE_BYTES(message.getReaderSchema().get()))
                             .value(message.getData())
                             .properties(getPropertiesMap(message, 
originMessageIdStr, originTopicNameStr));
-                    if (message.hasKey()) {
-                        typedMessageBuilderNew.key(message.getKey());
-                    }
+                    copyMessageKeyIfNeeded(message, typedMessageBuilderNew);
                     typedMessageBuilderNew.sendAsync()
                             .thenAccept(messageIdInDLQ -> {
                                 
possibleSendToDeadLetterTopicMessages.remove(messageId);

Reply via email to