This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new ea9b7c265a0 [fix][client] Copy orderingKey to retry letter topic and
DLQ messages and fix bug in copying (#23182)
ea9b7c265a0 is described below
commit ea9b7c265a0c16e1918b32c05425e2c44e16aac8
Author: Lari Hotari <[email protected]>
AuthorDate: Fri Aug 16 06:33:18 2024 +0300
[fix][client] Copy orderingKey to retry letter topic and DLQ messages and
fix bug in copying (#23182)
Fixes #23173
Fixes #23181
See #23173 and #23181
- copy ordering key to messages sent to retry letter topic and DLQ topic
(cherry picked from commit 67fc5b9f5342bd35d3fdacf37cf172a629ee15f9)
---
.../pulsar/client/api/DeadLetterTopicTest.java | 60 ++++++++++++++++++++++
.../apache/pulsar/client/api/RetryTopicTest.java | 17 +++++-
.../apache/pulsar/client/impl/ConsumerImpl.java | 10 ++--
3 files changed, 83 insertions(+), 4 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
index 143b463fd3b..dd36d4fdc4d 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
@@ -197,6 +197,66 @@ public class DeadLetterTopicTest extends
ProducerConsumerBase {
consumer.close();
}
+ @Test
+ public void testDeadLetterTopicMessagesWithOrderingKey() throws Exception {
+ final String topic =
"persistent://my-property/my-ns/dead-letter-topic";
+
+ final int maxRedeliveryCount = 1;
+
+ final int sendMessages = 100;
+
+ Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
+ .topic(topic)
+ .subscriptionName("my-subscription")
+ .subscriptionType(SubscriptionType.Shared)
+ .ackTimeout(1, TimeUnit.SECONDS)
+
.deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).build())
+ .receiverQueueSize(100)
+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscribe();
+
+ @Cleanup
+ PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(),
0);// Creates new client connection
+ Consumer<byte[]> deadLetterConsumer =
newPulsarClient.newConsumer(Schema.BYTES)
+
.topic("persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ")
+ .subscriptionName("my-subscription")
+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscribe();
+
+ Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
+ .topic(topic)
+ .create();
+
+ byte[] key = new byte[]{1, 2, 3, 4};
+ for (int i = 0; i < sendMessages; i++) {
+ producer.newMessage()
+ .orderingKey(key)
+ .value(String.format("Hello Pulsar [%d]", i).getBytes())
+ .send();
+ }
+
+ producer.close();
+
+ int totalReceived = 0;
+ do {
+ Message<byte[]> message = consumer.receive();
+ log.info("consumer received message : {} {}",
message.getMessageId(), new String(message.getData()));
+ totalReceived++;
+ } while (totalReceived < sendMessages * (maxRedeliveryCount + 1));
+
+ int totalInDeadLetter = 0;
+ do {
+ Message message = deadLetterConsumer.receive();
+ assertEquals(message.getOrderingKey(), key);
+ log.info("dead letter consumer received message : {} {}",
message.getMessageId(), new String(message.getData()));
+ deadLetterConsumer.acknowledge(message);
+ totalInDeadLetter++;
+ } while (totalInDeadLetter < sendMessages);
+
+ deadLetterConsumer.close();
+ consumer.close();
+ }
+
@DataProvider(name = "produceLargeMessages")
public Object[][] produceLargeMessages() {
return new Object[][] { { false }, { true } };
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java
index 2ccae721434..9cb82fde041 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java
@@ -257,6 +257,9 @@ public class RetryTopicTest extends ProducerConsumerBase {
public void testRetryTopicProperties() throws Exception {
final String topic = "persistent://my-property/my-ns/retry-topic";
+ byte[] key = "key".getBytes();
+ byte[] orderingKey = "orderingKey".getBytes();
+
final int maxRedeliveryCount = 3;
final int sendMessages = 10;
@@ -285,7 +288,11 @@ public class RetryTopicTest extends ProducerConsumerBase {
Set<String> originMessageIds = new HashSet<>();
for (int i = 0; i < sendMessages; i++) {
- MessageId msgId = producer.send(String.format("Hello Pulsar [%d]",
i).getBytes());
+ MessageId msgId = producer.newMessage()
+ .value(String.format("Hello Pulsar [%d]", i).getBytes())
+ .keyBytes(key)
+ .orderingKey(orderingKey)
+ .send();
originMessageIds.add(msgId.toString());
}
@@ -298,6 +305,10 @@ public class RetryTopicTest extends ProducerConsumerBase {
if
(message.hasProperty(RetryMessageUtil.SYSTEM_PROPERTY_RECONSUMETIMES)) {
// check the REAL_TOPIC property
assertEquals(message.getProperty(RetryMessageUtil.SYSTEM_PROPERTY_REAL_TOPIC),
topic);
+ assertTrue(message.hasKey());
+ assertEquals(message.getKeyBytes(), key);
+ assertTrue(message.hasOrderingKey());
+ assertEquals(message.getOrderingKey(), orderingKey);
retryMessageIds.add(message.getProperty(RetryMessageUtil.SYSTEM_PROPERTY_ORIGIN_MESSAGE_ID));
}
consumer.reconsumeLater(message, 1, TimeUnit.SECONDS);
@@ -317,6 +328,10 @@ public class RetryTopicTest extends ProducerConsumerBase {
if
(message.hasProperty(RetryMessageUtil.SYSTEM_PROPERTY_RECONSUMETIMES)) {
// check the REAL_TOPIC property
assertEquals(message.getProperty(RetryMessageUtil.SYSTEM_PROPERTY_REAL_TOPIC),
topic);
+ assertTrue(message.hasKey());
+ assertEquals(message.getKeyBytes(), key);
+ assertTrue(message.hasOrderingKey());
+ assertEquals(message.getOrderingKey(), orderingKey);
deadLetterMessageIds.add(message.getProperty(RetryMessageUtil.SYSTEM_PROPERTY_ORIGIN_MESSAGE_ID));
}
deadLetterConsumer.acknowledge(message);
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index e5abf769297..17c13726d09 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -590,7 +590,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
}
}
- private static void copyMessageKeyIfNeeded(Message<?> message,
TypedMessageBuilder<?> typedMessageBuilderNew) {
+ private static void copyMessageKeysIfNeeded(Message<?> message,
TypedMessageBuilder<?> typedMessageBuilderNew) {
if (message.hasKey()) {
if (message.hasBase64EncodedKey()) {
typedMessageBuilderNew.keyBytes(message.getKeyBytes());
@@ -598,6 +598,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
typedMessageBuilderNew.key(message.getKey());
}
}
+ if (message.hasOrderingKey()) {
+ typedMessageBuilderNew.orderingKey(message.getOrderingKey());
+ }
}
@SuppressWarnings("unchecked")
@@ -655,6 +658,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
dlqProducer.newMessage(Schema.AUTO_PRODUCE_BYTES(retryMessage.getReaderSchema().get()))
.value(retryMessage.getData())
.properties(propertiesMap);
+ copyMessageKeysIfNeeded(message,
typedMessageBuilderNew);
typedMessageBuilderNew.sendAsync().thenAccept(msgId ->
{
doAcknowledge(finalMessageId, ackType,
Collections.emptyMap(), null).thenAccept(v -> {
result.complete(null);
@@ -681,7 +685,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
if (delayTime > 0) {
typedMessageBuilderNew.deliverAfter(delayTime,
unit);
}
- copyMessageKeyIfNeeded(message,
typedMessageBuilderNew);
+ copyMessageKeysIfNeeded(message,
typedMessageBuilderNew);
typedMessageBuilderNew.sendAsync()
.thenCompose(__ ->
doAcknowledge(finalMessageId, ackType, Collections.emptyMap(), null))
.thenAccept(v -> result.complete(null))
@@ -2121,7 +2125,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
producerDLQ.newMessage(Schema.AUTO_PRODUCE_BYTES(message.getReaderSchema().get()))
.value(message.getData())
.properties(getPropertiesMap(message,
originMessageIdStr, originTopicNameStr));
- copyMessageKeyIfNeeded(message, typedMessageBuilderNew);
+ copyMessageKeysIfNeeded(message, typedMessageBuilderNew);
typedMessageBuilderNew.sendAsync()
.thenAccept(messageIdInDLQ -> {
possibleSendToDeadLetterTopicMessages.remove(messageId);