This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.1 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 73e6659bbf2c63f48566b84d77623196943c82a8 Author: Andrey Yegorov <[email protected]> AuthorDate: Thu Aug 15 10:33:56 2024 -0700 [fix] DLQ to handle bytes key properly (#23172) (cherry picked from commit 46c25ac73427312db7f38e150cd797a8cee23f28) --- .../pulsar/client/api/DeadLetterTopicTest.java | 60 ++++++++++++++++++++++ .../apache/pulsar/client/impl/ConsumerImpl.java | 56 +++++++++++--------- 2 files changed, 92 insertions(+), 24 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java index 2a0cb3187d2..143b463fd3b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java @@ -137,6 +137,66 @@ public class DeadLetterTopicTest extends ProducerConsumerBase { consumer.close(); } + @Test + public void testDeadLetterTopicWithBinaryMessageKey() throws Exception { + final String topic = "persistent://my-property/my-ns/dead-letter-topic"; + + final int maxRedeliveryCount = 1; + + final int sendMessages = 100; + + Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES) + .topic(topic) + .subscriptionName("my-subscription") + .subscriptionType(SubscriptionType.Shared) + .ackTimeout(1, TimeUnit.SECONDS) + .deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).build()) + .receiverQueueSize(100) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); + + @Cleanup + PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection + Consumer<byte[]> deadLetterConsumer = newPulsarClient.newConsumer(Schema.BYTES) + .topic("persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ") + .subscriptionName("my-subscription") + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); + + Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES) + .topic(topic) + .create(); + + byte[] key = new byte[]{1, 2, 3, 4}; + for (int i = 0; i < sendMessages; i++) { + producer.newMessage() + .keyBytes(key) + .value(String.format("Hello Pulsar [%d]", i).getBytes()) + .send(); + } + + producer.close(); + + int totalReceived = 0; + do { + Message<byte[]> message = consumer.receive(); + log.info("consumer received message : {} {}", message.getMessageId(), new String(message.getData())); + totalReceived++; + } while (totalReceived < sendMessages * (maxRedeliveryCount + 1)); + + int totalInDeadLetter = 0; + do { + Message message = deadLetterConsumer.receive(); + assertEquals(message.getKeyBytes(), key); + log.info("dead letter consumer received message : {} {}", message.getMessageId(), new String(message.getData())); + deadLetterConsumer.acknowledge(message); + totalInDeadLetter++; + } while (totalInDeadLetter < sendMessages); + + deadLetterConsumer.close(); + consumer.close(); + } + @DataProvider(name = "produceLargeMessages") public Object[][] produceLargeMessages() { return new Object[][] { { false }, { true } }; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 58e2692fea5..e5abf769297 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -215,6 +215,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle private final AtomicReference<ClientCnx> clientCnxUsedForConsumerRegistration = new AtomicReference<>(); private final List<Throwable> previousExceptions = new CopyOnWriteArrayList<Throwable>(); + static <T> ConsumerImpl<T> newConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf, @@ -258,10 +259,12 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle } protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf, - ExecutorProvider executorProvider, int partitionIndex, boolean hasParentConsumer, - boolean parentConsumerHasListener, CompletableFuture<Consumer<T>> subscribeFuture, MessageId startMessageId, - long startMessageRollbackDurationInSec, Schema<T> schema, ConsumerInterceptors<T> interceptors, - boolean createTopicIfDoesNotExist) { + ExecutorProvider executorProvider, int partitionIndex, boolean hasParentConsumer, + boolean parentConsumerHasListener, CompletableFuture<Consumer<T>> subscribeFuture, + MessageId startMessageId, + long startMessageRollbackDurationInSec, Schema<T> schema, + ConsumerInterceptors<T> interceptors, + boolean createTopicIfDoesNotExist) { super(client, topic, conf, conf.getReceiverQueueSize(), executorProvider, subscribeFuture, schema, interceptors); this.consumerId = client.newConsumerId(); @@ -333,21 +336,21 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle } this.connectionHandler = new ConnectionHandler(this, - new BackoffBuilder() - .setInitialTime(client.getConfiguration().getInitialBackoffIntervalNanos(), - TimeUnit.NANOSECONDS) - .setMax(client.getConfiguration().getMaxBackoffIntervalNanos(), TimeUnit.NANOSECONDS) - .setMandatoryStop(0, TimeUnit.MILLISECONDS) - .create(), + new BackoffBuilder() + .setInitialTime(client.getConfiguration().getInitialBackoffIntervalNanos(), + TimeUnit.NANOSECONDS) + .setMax(client.getConfiguration().getMaxBackoffIntervalNanos(), TimeUnit.NANOSECONDS) + .setMandatoryStop(0, TimeUnit.MILLISECONDS) + .create(), this); this.topicName = TopicName.get(topic); if (this.topicName.isPersistent()) { this.acknowledgmentsGroupingTracker = - new PersistentAcknowledgmentsGroupingTracker(this, conf, client.eventLoopGroup()); + new PersistentAcknowledgmentsGroupingTracker(this, conf, client.eventLoopGroup()); } else { this.acknowledgmentsGroupingTracker = - NonPersistentAcknowledgmentGroupingTracker.of(); + NonPersistentAcknowledgmentGroupingTracker.of(); } if (conf.getDeadLetterPolicy() != null) { @@ -425,16 +428,16 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle log.error("[{}][{}] Failed to unsubscribe: {}", topic, subscription, e.getCause().getMessage()); setState(State.Ready); unsubscribeFuture.completeExceptionally( - PulsarClientException.wrap(e.getCause(), - String.format("Failed to unsubscribe the subscription %s of topic %s", - subscription, topicName.toString()))); + PulsarClientException.wrap(e.getCause(), + String.format("Failed to unsubscribe the subscription %s of topic %s", + subscription, topicName.toString()))); return null; }); } else { unsubscribeFuture.completeExceptionally( - new PulsarClientException( - String.format("The client is not connected to the broker when unsubscribing the " - + "subscription %s of the topic %s", subscription, topicName.toString()))); + new PulsarClientException( + String.format("The client is not connected to the broker when unsubscribing the " + + "subscription %s of the topic %s", subscription, topicName.toString()))); } return unsubscribeFuture; } @@ -587,6 +590,15 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle } } + private static void copyMessageKeyIfNeeded(Message<?> message, TypedMessageBuilder<?> typedMessageBuilderNew) { + if (message.hasKey()) { + if (message.hasBase64EncodedKey()) { + typedMessageBuilderNew.keyBytes(message.getKeyBytes()); + } else { + typedMessageBuilderNew.key(message.getKey()); + } + } + } @SuppressWarnings("unchecked") @Override @@ -669,9 +681,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle if (delayTime > 0) { typedMessageBuilderNew.deliverAfter(delayTime, unit); } - if (message.hasKey()) { - typedMessageBuilderNew.key(message.getKey()); - } + copyMessageKeyIfNeeded(message, typedMessageBuilderNew); typedMessageBuilderNew.sendAsync() .thenCompose(__ -> doAcknowledge(finalMessageId, ackType, Collections.emptyMap(), null)) .thenAccept(v -> result.complete(null)) @@ -2111,9 +2121,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle producerDLQ.newMessage(Schema.AUTO_PRODUCE_BYTES(message.getReaderSchema().get())) .value(message.getData()) .properties(getPropertiesMap(message, originMessageIdStr, originTopicNameStr)); - if (message.hasKey()) { - typedMessageBuilderNew.key(message.getKey()); - } + copyMessageKeyIfNeeded(message, typedMessageBuilderNew); typedMessageBuilderNew.sendAsync() .thenAccept(messageIdInDLQ -> { possibleSendToDeadLetterTopicMessages.remove(messageId);
