This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 79cae0a5c67 [fix] DLQ to handle bytes key properly (#23172)
79cae0a5c67 is described below
commit 79cae0a5c678c5cb599b0572399039039877ca91
Author: Andrey Yegorov <[email protected]>
AuthorDate: Thu Aug 15 10:33:56 2024 -0700
[fix] DLQ to handle bytes key properly (#23172)
(cherry picked from commit 46c25ac73427312db7f38e150cd797a8cee23f28)
---
.../pulsar/client/api/DeadLetterTopicTest.java | 60 ++++++++++++++++++++++
.../apache/pulsar/client/impl/ConsumerImpl.java | 56 +++++++++++---------
2 files changed, 92 insertions(+), 24 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
index 2a0cb3187d2..143b463fd3b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
@@ -137,6 +137,66 @@ public class DeadLetterTopicTest extends ProducerConsumerBase {
consumer.close();
}
+ @Test
+ public void testDeadLetterTopicWithBinaryMessageKey() throws Exception {
+ final String topic =
"persistent://my-property/my-ns/dead-letter-topic";
+
+ final int maxRedeliveryCount = 1;
+
+ final int sendMessages = 100;
+
+ Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
+ .topic(topic)
+ .subscriptionName("my-subscription")
+ .subscriptionType(SubscriptionType.Shared)
+ .ackTimeout(1, TimeUnit.SECONDS)
+
.deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).build())
+ .receiverQueueSize(100)
+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscribe();
+
+ @Cleanup
+ PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(),
0);// Creates new client connection
+ Consumer<byte[]> deadLetterConsumer =
newPulsarClient.newConsumer(Schema.BYTES)
+
.topic("persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ")
+ .subscriptionName("my-subscription")
+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscribe();
+
+ Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
+ .topic(topic)
+ .create();
+
+ byte[] key = new byte[]{1, 2, 3, 4};
+ for (int i = 0; i < sendMessages; i++) {
+ producer.newMessage()
+ .keyBytes(key)
+ .value(String.format("Hello Pulsar [%d]", i).getBytes())
+ .send();
+ }
+
+ producer.close();
+
+ int totalReceived = 0;
+ do {
+ Message<byte[]> message = consumer.receive();
+ log.info("consumer received message : {} {}",
message.getMessageId(), new String(message.getData()));
+ totalReceived++;
+ } while (totalReceived < sendMessages * (maxRedeliveryCount + 1));
+
+ int totalInDeadLetter = 0;
+ do {
+ Message message = deadLetterConsumer.receive();
+ assertEquals(message.getKeyBytes(), key);
+ log.info("dead letter consumer received message : {} {}",
message.getMessageId(), new String(message.getData()));
+ deadLetterConsumer.acknowledge(message);
+ totalInDeadLetter++;
+ } while (totalInDeadLetter < sendMessages);
+
+ deadLetterConsumer.close();
+ consumer.close();
+ }
+
@DataProvider(name = "produceLargeMessages")
public Object[][] produceLargeMessages() {
return new Object[][] { { false }, { true } };
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 5e80ec5a8d4..8121cc61e62 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -220,6 +220,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
private final AtomicReference<ClientCnx>
clientCnxUsedForConsumerRegistration = new AtomicReference<>();
private final List<Throwable> previousExceptions = new
CopyOnWriteArrayList<Throwable>();
private volatile boolean hasSoughtByTimestamp = false;
+
static <T> ConsumerImpl<T> newConsumerImpl(PulsarClientImpl client,
String topic,
ConsumerConfigurationData<T>
conf,
@@ -263,10 +264,12 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
}
protected ConsumerImpl(PulsarClientImpl client, String topic,
ConsumerConfigurationData<T> conf,
- ExecutorProvider executorProvider, int partitionIndex, boolean
hasParentConsumer,
- boolean parentConsumerHasListener, CompletableFuture<Consumer<T>>
subscribeFuture, MessageId startMessageId,
- long startMessageRollbackDurationInSec, Schema<T> schema,
ConsumerInterceptors<T> interceptors,
- boolean createTopicIfDoesNotExist) {
+ ExecutorProvider executorProvider, int
partitionIndex, boolean hasParentConsumer,
+ boolean parentConsumerHasListener,
CompletableFuture<Consumer<T>> subscribeFuture,
+ MessageId startMessageId,
+ long startMessageRollbackDurationInSec, Schema<T>
schema,
+ ConsumerInterceptors<T> interceptors,
+ boolean createTopicIfDoesNotExist) {
super(client, topic, conf, conf.getReceiverQueueSize(),
executorProvider, subscribeFuture, schema,
interceptors);
this.consumerId = client.newConsumerId();
@@ -331,21 +334,21 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
}
this.connectionHandler = new ConnectionHandler(this,
- new BackoffBuilder()
-
.setInitialTime(client.getConfiguration().getInitialBackoffIntervalNanos(),
- TimeUnit.NANOSECONDS)
-
.setMax(client.getConfiguration().getMaxBackoffIntervalNanos(),
TimeUnit.NANOSECONDS)
- .setMandatoryStop(0, TimeUnit.MILLISECONDS)
- .create(),
+ new BackoffBuilder()
+
.setInitialTime(client.getConfiguration().getInitialBackoffIntervalNanos(),
+ TimeUnit.NANOSECONDS)
+
.setMax(client.getConfiguration().getMaxBackoffIntervalNanos(),
TimeUnit.NANOSECONDS)
+ .setMandatoryStop(0, TimeUnit.MILLISECONDS)
+ .create(),
this);
this.topicName = TopicName.get(topic);
if (this.topicName.isPersistent()) {
this.acknowledgmentsGroupingTracker =
- new PersistentAcknowledgmentsGroupingTracker(this, conf,
client.eventLoopGroup());
+ new PersistentAcknowledgmentsGroupingTracker(this, conf,
client.eventLoopGroup());
} else {
this.acknowledgmentsGroupingTracker =
- NonPersistentAcknowledgmentGroupingTracker.of();
+ NonPersistentAcknowledgmentGroupingTracker.of();
}
if (conf.getDeadLetterPolicy() != null) {
@@ -423,16 +426,16 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
log.error("[{}][{}] Failed to unsubscribe: {}", topic,
subscription, e.getCause().getMessage());
setState(State.Ready);
unsubscribeFuture.completeExceptionally(
- PulsarClientException.wrap(e.getCause(),
- String.format("Failed to unsubscribe the subscription
%s of topic %s",
- subscription, topicName.toString())));
+ PulsarClientException.wrap(e.getCause(),
+ String.format("Failed to unsubscribe the
subscription %s of topic %s",
+ subscription, topicName.toString())));
return null;
});
} else {
unsubscribeFuture.completeExceptionally(
- new PulsarClientException(
- String.format("The client is not connected to the broker
when unsubscribing the "
- + "subscription %s of the topic %s", subscription,
topicName.toString())));
+ new PulsarClientException(
+ String.format("The client is not connected to the
broker when unsubscribing the "
+ + "subscription %s of the topic %s",
subscription, topicName.toString())));
}
return unsubscribeFuture;
}
@@ -585,6 +588,15 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
}
}
+ private static void copyMessageKeyIfNeeded(Message<?> message,
TypedMessageBuilder<?> typedMessageBuilderNew) {
+ if (message.hasKey()) {
+ if (message.hasBase64EncodedKey()) {
+ typedMessageBuilderNew.keyBytes(message.getKeyBytes());
+ } else {
+ typedMessageBuilderNew.key(message.getKey());
+ }
+ }
+ }
@SuppressWarnings("unchecked")
@Override
@@ -667,9 +679,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
if (delayTime > 0) {
typedMessageBuilderNew.deliverAfter(delayTime,
unit);
}
- if (message.hasKey()) {
- typedMessageBuilderNew.key(message.getKey());
- }
+ copyMessageKeyIfNeeded(message,
typedMessageBuilderNew);
typedMessageBuilderNew.sendAsync()
.thenCompose(__ ->
doAcknowledge(finalMessageId, ackType, Collections.emptyMap(), null))
.thenAccept(v -> result.complete(null))
@@ -2118,9 +2128,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
producerDLQ.newMessage(Schema.AUTO_PRODUCE_BYTES(message.getReaderSchema().get()))
.value(message.getData())
.properties(getPropertiesMap(message,
originMessageIdStr, originTopicNameStr));
- if (message.hasKey()) {
- typedMessageBuilderNew.key(message.getKey());
- }
+ copyMessageKeyIfNeeded(message, typedMessageBuilderNew);
typedMessageBuilderNew.sendAsync()
.thenAccept(messageIdInDLQ -> {
possibleSendToDeadLetterTopicMessages.remove(messageId);