This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new 60fb3bf CAMEL-14608: add support for dead letter policies to
camel-pulsar (#3595)
60fb3bf is described below
commit 60fb3bfd056e7aab4545b122ae1bc4034ccc9a9c
Author: Connor McAuliffe <[email protected]>
AuthorDate: Tue Feb 25 02:15:03 2020 -0500
CAMEL-14608: add support for dead letter policies to camel-pulsar (#3595)
* CAMEL-14608: add support for dead letter policies to camel-pulsar
* CAMEL-14608: address documentation issues
---
.../component/pulsar/PulsarEndpointConfigurer.java | 4 +
.../org/apache/camel/component/pulsar/pulsar.json | 2 +
.../src/main/docs/pulsar-component.adoc | 4 +-
.../pulsar/configuration/PulsarConfiguration.java | 19 +++
.../consumers/CommonCreationStrategyImpl.java | 15 +-
.../pulsar/PulsarConsumerDeadLetterPolicyTest.java | 169 +++++++++++++++++++++
6 files changed, 211 insertions(+), 2 deletions(-)
diff --git
a/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarEndpointConfigurer.java
b/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarEndpointConfigurer.java
index 90e004b..3f6dd0c 100644
---
a/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarEndpointConfigurer.java
+++
b/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarEndpointConfigurer.java
@@ -31,6 +31,10 @@ public class PulsarEndpointConfigurer extends
PropertyConfigurerSupport implemen
case "consumerQueueSize":
target.getPulsarConfiguration().setConsumerQueueSize(property(camelContext,
int.class, value)); return true;
case "negativeackredeliverydelaymicros":
case "negativeAckRedeliveryDelayMicros":
target.getPulsarConfiguration().setNegativeAckRedeliveryDelayMicros(property(camelContext,
long.class, value)); return true;
+ case "deadlettertopic":
+ case "deadLetterTopic":
target.getPulsarConfiguration().setDeadLetterTopic(property(camelContext,
java.lang.String.class, value)); return true;
+ case "maxredelivercount":
+ case "maxRedeliverCount":
target.getPulsarConfiguration().setMaxRedeliverCount(property(camelContext,
java.lang.Integer.class, value)); return true;
case "numberofconsumers":
case "numberOfConsumers":
target.getPulsarConfiguration().setNumberOfConsumers(property(camelContext,
int.class, value)); return true;
case "subscriptioninitialposition":
diff --git
a/components/camel-pulsar/src/generated/resources/org/apache/camel/component/pulsar/pulsar.json
b/components/camel-pulsar/src/generated/resources/org/apache/camel/component/pulsar/pulsar.json
index 029a1c2..04f8db9 100644
---
a/components/camel-pulsar/src/generated/resources/org/apache/camel/component/pulsar/pulsar.json
+++
b/components/camel-pulsar/src/generated/resources/org/apache/camel/component/pulsar/pulsar.json
@@ -40,6 +40,8 @@
"consumerNamePrefix": { "kind": "parameter", "displayName": "Consumer Name
Prefix", "group": "consumer", "label": "consumer", "required": false, "type":
"string", "javaType": "java.lang.String", "deprecated": false, "secret": false,
"defaultValue": "cons", "configurationClass":
"org.apache.camel.component.pulsar.configuration.PulsarConfiguration",
"configurationField": "pulsarConfiguration", "description": "Prefix to add to
consumer names when a SHARED or FAILOVER subscription is used" },
"consumerQueueSize": { "kind": "parameter", "displayName": "Consumer Queue
Size", "group": "consumer", "label": "consumer", "required": false, "type":
"integer", "javaType": "int", "deprecated": false, "secret": false,
"defaultValue": "10", "configurationClass":
"org.apache.camel.component.pulsar.configuration.PulsarConfiguration",
"configurationField": "pulsarConfiguration", "description": "Size of the
consumer queue - defaults to 10" },
"negativeAckRedeliveryDelayMicros": { "kind": "parameter", "displayName":
"Negative Ack Redelivery Delay Micros", "group": "consumer", "label":
"consumer", "required": false, "type": "integer", "javaType": "long",
"deprecated": false, "secret": false, "defaultValue": "60000000",
"configurationClass":
"org.apache.camel.component.pulsar.configuration.PulsarConfiguration",
"configurationField": "pulsarConfiguration", "description": "Set the negative
acknowledgement delay" },
+ "deadLetterTopic": { "kind": "parameter", "displayName": "Dead Letter
Topic", "group": "consumer", "label": "consumer", "required": false, "type":
"string", "javaType": "java.lang.String", "deprecated": false, "secret": false,
"configurationClass":
"org.apache.camel.component.pulsar.configuration.PulsarConfiguration",
"configurationField": "pulsarConfiguration", "description": "Name of the topic
where the messages which fail maxRedeliverCount times will be sent. Note: if
not set, def [...]
+ "maxRedeliverCount": { "kind": "parameter", "displayName": "Max Redeliver
Count", "group": "consumer", "label": "consumer", "required": false, "type":
"integer", "javaType": "java.lang.Integer", "deprecated": false, "secret":
false, "configurationClass":
"org.apache.camel.component.pulsar.configuration.PulsarConfiguration",
"configurationField": "pulsarConfiguration", "description": "Maximum number of
times that a message will be redelivered before being sent to the dead letter
queue [...]
"numberOfConsumers": { "kind": "parameter", "displayName": "Number Of
Consumers", "group": "consumer", "label": "consumer", "required": false,
"type": "integer", "javaType": "int", "deprecated": false, "secret": false,
"defaultValue": "1", "configurationClass":
"org.apache.camel.component.pulsar.configuration.PulsarConfiguration",
"configurationField": "pulsarConfiguration", "description": "Number of
consumers - defaults to 1" },
"subscriptionInitialPosition": { "kind": "parameter", "displayName":
"Subscription Initial Position", "group": "consumer", "label": "consumer",
"required": false, "type": "object", "javaType":
"org.apache.camel.component.pulsar.utils.consumers.SubscriptionInitialPosition",
"enum": [ "EARLIEST", "LATEST" ], "deprecated": false, "secret": false,
"defaultValue": "LATEST", "configurationClass":
"org.apache.camel.component.pulsar.configuration.PulsarConfiguration",
"configurationField": " [...]
"subscriptionName": { "kind": "parameter", "displayName": "Subscription
Name", "group": "consumer", "label": "consumer", "required": false, "type":
"string", "javaType": "java.lang.String", "deprecated": false, "secret": false,
"defaultValue": "subs", "configurationClass":
"org.apache.camel.component.pulsar.configuration.PulsarConfiguration",
"configurationField": "pulsarConfiguration", "description": "Name of the
subscription to use" },
diff --git a/components/camel-pulsar/src/main/docs/pulsar-component.adoc
b/components/camel-pulsar/src/main/docs/pulsar-component.adoc
index 2be30b4..517af8e 100644
--- a/components/camel-pulsar/src/main/docs/pulsar-component.adoc
+++ b/components/camel-pulsar/src/main/docs/pulsar-component.adoc
@@ -74,7 +74,7 @@ with the following path and query parameters:
|===
-=== Query Parameters (29 parameters):
+=== Query Parameters (31 parameters):
[width="100%",cols="2,5,^1,2",options="header"]
@@ -88,6 +88,8 @@ with the following path and query parameters:
| *consumerNamePrefix* (consumer) | Prefix to add to consumer names when a
SHARED or FAILOVER subscription is used | cons | String
| *consumerQueueSize* (consumer) | Size of the consumer queue - defaults to 10
| 10 | int
| *negativeAckRedeliveryDelay Micros* (consumer) | Set the negative
acknowledgement delay | 60000000 | long
+| *deadLetterTopic* (consumer) | Name of the topic where the messages which
fail maxRedeliverCount times will be sent. Note: if not set, default topic name
will be topicName-subscriptionName-DLQ | | String
+| *maxRedeliverCount* (consumer) | Maximum number of times that a message will
be redelivered before being sent to the dead letter queue. If this value is not
set, no Dead Letter Policy will be created | | Integer
| *numberOfConsumers* (consumer) | Number of consumers - defaults to 1 | 1 |
int
| *subscriptionInitialPosition* (consumer) | Control the initial position in
the topic of a newly created subscription. Default is latest message. The value
can be one of: EARLIEST, LATEST | LATEST | SubscriptionInitialPosition
| *subscriptionName* (consumer) | Name of the subscription to use | subs |
String
diff --git
a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java
b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java
index 4ecfd7d..c73fd17 100644
---
a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java
+++
b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java
@@ -57,6 +57,10 @@ public class PulsarConfiguration {
private long ackGroupTimeMillis = 100;
@UriParam(label = "consumer", defaultValue = "LATEST")
private SubscriptionInitialPosition subscriptionInitialPosition = LATEST;
+ @UriParam(label = "consumer", description = "Maximum number of times that
a message will be redelivered before being sent to the dead letter queue. If
this value is not set, no Dead Letter Policy will be created")
+ private Integer maxRedeliverCount;
+ @UriParam(label = "consumer", description = "Name of the topic where the
messages which fail maxRedeliverCount times will be sent. Note: if not set,
default topic name will be topicName-subscriptionName-DLQ")
+ private String deadLetterTopic;
@UriParam(label = "producer", description = "Send timeout in
milliseconds", defaultValue = "30000")
private int sendTimeoutMs = 30000;
@UriParam(label = "producer", description = "Whether to block the
producing thread if pending messages queue is full or to throw a
ProducerQueueIsFullError", defaultValue = "false")
@@ -366,4 +370,19 @@ public class PulsarConfiguration {
public void setNegativeAckRedeliveryDelayMicros(long
negativeAckRedeliveryDelayMicros) {
this.negativeAckRedeliveryDelayMicros =
negativeAckRedeliveryDelayMicros;
}
+ public Integer getMaxRedeliverCount() {
+ return maxRedeliverCount;
+ }
+
+ public void setMaxRedeliverCount(Integer maxRedeliverCount) {
+ this.maxRedeliverCount = maxRedeliverCount;
+ }
+
+ public String getDeadLetterTopic() {
+ return deadLetterTopic;
+ }
+
+ public void setDeadLetterTopic(String deadLetterTopic) {
+ this.deadLetterTopic = deadLetterTopic;
+ }
}
diff --git
a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/consumers/CommonCreationStrategyImpl.java
b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/consumers/CommonCreationStrategyImpl.java
index b5c72bc..3d4913e 100644
---
a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/consumers/CommonCreationStrategyImpl.java
+++
b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/consumers/CommonCreationStrategyImpl.java
@@ -23,6 +23,8 @@ import org.apache.camel.component.pulsar.PulsarEndpoint;
import org.apache.camel.component.pulsar.PulsarMessageListener;
import org.apache.camel.component.pulsar.configuration.PulsarConfiguration;
import org.apache.pulsar.client.api.ConsumerBuilder;
+import org.apache.pulsar.client.api.DeadLetterPolicy;
+import org.apache.pulsar.client.api.DeadLetterPolicy.DeadLetterPolicyBuilder;
public final class CommonCreationStrategyImpl {
@@ -32,11 +34,22 @@ public final class CommonCreationStrategyImpl {
public static ConsumerBuilder<byte[]> create(final String name, final
PulsarEndpoint pulsarEndpoint, final PulsarConsumer pulsarConsumer) {
final PulsarConfiguration endpointConfiguration =
pulsarEndpoint.getPulsarConfiguration();
- return
pulsarEndpoint.getPulsarClient().newConsumer().topic(pulsarEndpoint.getUri()).subscriptionName(endpointConfiguration.getSubscriptionName())
+ ConsumerBuilder<byte[]> builder =
pulsarEndpoint.getPulsarClient().newConsumer().topic(pulsarEndpoint.getUri()).subscriptionName(endpointConfiguration.getSubscriptionName())
.receiverQueueSize(endpointConfiguration.getConsumerQueueSize()).consumerName(name).ackTimeout(endpointConfiguration.getAckTimeoutMillis(),
TimeUnit.MILLISECONDS)
.subscriptionInitialPosition(endpointConfiguration.getSubscriptionInitialPosition().toPulsarSubscriptionInitialPosition())
.acknowledgmentGroupTime(endpointConfiguration.getAckGroupTimeMillis(),
TimeUnit.MILLISECONDS)
.negativeAckRedeliveryDelay(endpointConfiguration.getNegativeAckRedeliveryDelayMicros(),
TimeUnit.MICROSECONDS)
.messageListener(new PulsarMessageListener(pulsarEndpoint,
pulsarConsumer.getExceptionHandler(), pulsarConsumer.getProcessor()));
+
+ if (endpointConfiguration.getMaxRedeliverCount() != null) {
+ DeadLetterPolicyBuilder policy = DeadLetterPolicy.builder()
+
.maxRedeliverCount(endpointConfiguration.getMaxRedeliverCount());
+ if (endpointConfiguration.getDeadLetterTopic() != null) {
+
policy.deadLetterTopic(endpointConfiguration.getDeadLetterTopic());
+ }
+
+ builder.deadLetterPolicy(policy.build());
+ }
+ return builder;
}
}
diff --git
a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerDeadLetterPolicyTest.java
b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerDeadLetterPolicyTest.java
new file mode 100644
index 0000000..9c171a0
--- /dev/null
+++
b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerDeadLetterPolicyTest.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.pulsar;
+
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.EndpointInject;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.pulsar.utils.AutoConfiguration;
+import org.apache.camel.component.pulsar.utils.message.PulsarMessageHeaders;
+import org.apache.camel.spi.Registry;
+import org.apache.camel.support.SimpleRegistry;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.ClientBuilderImpl;
+import org.junit.Before;
+import org.junit.Test;
+
+public class PulsarConsumerDeadLetterPolicyTest extends PulsarTestSupport {
+
+ private static final String TOPIC_URI =
"persistent://public/default/camel-topic";
+ private static final String PRODUCER = "camel-producer-1";
+
+ @EndpointInject("mock:result")
+ private MockEndpoint to;
+
+ @EndpointInject("mock:deadLetter")
+ private MockEndpoint deadLetter;
+
+ private Producer<String> producer;
+
+ @Override
+ protected Registry createCamelRegistry() throws Exception {
+ Registry registry = new SimpleRegistry();
+
+ registerPulsarBeans(registry);
+
+ return registry;
+ }
+
+ private void registerPulsarBeans(final Registry registry) throws
PulsarClientException {
+ PulsarClient pulsarClient = givenPulsarClient();
+ AutoConfiguration autoConfiguration = new AutoConfiguration(null,
null);
+
+ registry.bind("pulsarClient", pulsarClient);
+ PulsarComponent comp = new PulsarComponent(context);
+ comp.setAutoConfiguration(autoConfiguration);
+ comp.setPulsarClient(pulsarClient);
+ registry.bind("pulsar", comp);
+ }
+
+ @Before
+ public void buildProducer() throws PulsarClientException {
+ try {
+ context.removeRoute("myRoute");
+ context.removeRoute("myDeadLetterRoute");
+ } catch (Exception ignored) {
+
+ }
+ producer =
givenPulsarClient().newProducer(Schema.STRING).producerName(PRODUCER).topic(TOPIC_URI).create();
+
+ }
+
+ @Test
+ public void
givenNoMaxRedeliverCountAndDeadLetterTopicverifyValuesAreNull() throws
Exception {
+ PulsarComponent component = context.getComponent("pulsar",
PulsarComponent.class);
+
+ PulsarEndpoint endpoint = (PulsarEndpoint)
component.createEndpoint("pulsar:" + TOPIC_URI);
+
+ assertNull(endpoint.getPulsarConfiguration().getMaxRedeliverCount());
+ assertNull(endpoint.getPulsarConfiguration().getDeadLetterTopic());
+ }
+
+ @Test
+ public void
givenMaxRedeliverCountverifyMessageGetsSentToDefaultDeadLetterTopicAfterCountExceeded()
+ throws Exception {
+ PulsarComponent component = context.getComponent("pulsar",
PulsarComponent.class);
+
+ PulsarEndpoint from = (PulsarEndpoint)
component.createEndpoint("pulsar:" + TOPIC_URI +
"?maxRedeliverCount=5&subscriptionType=Shared&allowManualAcknowledgement=true&ackTimeoutMillis=1000");
+ PulsarEndpoint deadLetterFrom = (PulsarEndpoint)
component.createEndpoint("pulsar:" + TOPIC_URI + "-subs-DLQ");
+
+ to.expectedMessageCount(5);
+ deadLetter.expectedMessageCount(1);
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() {
+ from(from).routeId("myRoute").to(to);
+
+
from(deadLetterFrom).routeId("myDeadLetterRoute").to(deadLetter);
+ }
+ });
+ producer.send("Hello World!");
+
+ assertMockEndpointsSatisfied(10, TimeUnit.SECONDS);
+ }
+
+ @Test
+ public void
givenMaxRedeliverCountAndDeadLetterTopicverifyMessageGetsSentToSpecifiedDeadLetterTopicAfterCountExceeded()
throws Exception {
+ PulsarComponent component = context.getComponent("pulsar",
PulsarComponent.class);
+
+ PulsarEndpoint from = (PulsarEndpoint)
component.createEndpoint("pulsar:" + TOPIC_URI +
"?maxRedeliverCount=5&deadLetterTopic=customTopic&subscriptionType=Shared&allowManualAcknowledgement=true&ackTimeoutMillis=1000");
+ PulsarEndpoint deadLetterFrom = (PulsarEndpoint)
component.createEndpoint("pulsar:persistent://public/default/customTopic");
+
+ to.expectedMessageCount(5);
+ deadLetter.expectedMessageCount(1);
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() {
+ from(from).routeId("myRoute").to(to);
+
+
from(deadLetterFrom).routeId("myDeadLetterRoute").to(deadLetter);
+ }
+ });
+
+ producer.send("Hello World!");
+ assertMockEndpointsSatisfied(10, TimeUnit.SECONDS);
+ }
+
+ @Test
+ public void
givenOnlyDeadLetterTopicverifyMessageDoesNotGetSentToSpecifiedTopic() throws
Exception {
+ PulsarComponent component = context.getComponent("pulsar",
PulsarComponent.class);
+ // Only deadLetterTopic is set; without maxRedeliverCount no policy is built.
+ PulsarEndpoint from = (PulsarEndpoint)
component.createEndpoint("pulsar:" + TOPIC_URI +
"?deadLetterTopic=customTopic&subscriptionType=Shared&allowManualAcknowledgement=true&ackTimeoutMillis=1000");
+ PulsarEndpoint deadLetterFrom = (PulsarEndpoint)
component.createEndpoint("pulsar:persistent://public/default/customTopic");
+ // Expect six deliveries on the source route and none on customTopic.
+ to.expectedMessageCount(6);
+ deadLetter.expectedMessageCount(0);
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() {
+ from(from).routeId("myRoute").to(to).process(exchange -> {
+ Integer tries = exchange.getProperty("retryCount", 1,
Integer.class);
+ if (tries >= 6) {
+ PulsarMessageReceipt receipt = (PulsarMessageReceipt)
exchange.getIn().getHeader(PulsarMessageHeaders.MESSAGE_RECEIPT);
+ receipt.acknowledge();
+ }
+ exchange.setProperty("retryCount", tries + 1);
+ });
+ // NOTE(review): confirm retryCount survives redelivery (new Exchange?).
+
from(deadLetterFrom).routeId("myDeadLetterRoute").to(deadLetter);
+ }
+ });
+
+ producer.send("Hello World!");
+ assertMockEndpointsSatisfied(10, TimeUnit.SECONDS);
+ }
+
+ private PulsarClient givenPulsarClient() throws PulsarClientException {
+ return new
ClientBuilderImpl().serviceUrl(getPulsarBrokerUrl()).ioThreads(1).listenerThreads(1).build();
+ }
+}