This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 6e1bdb2955f CAMEL-18376: camel-pulsar: add redelivery backoff for ack
timeout, nack (#8147)
6e1bdb2955f is described below
commit 6e1bdb2955fb6de483d5ef95042ac33377865600
Author: Anthony Sam Wu <[email protected]>
AuthorDate: Sat Aug 13 04:33:22 2022 -0400
CAMEL-18376: camel-pulsar: add redelivery backoff for ack timeout, nack
(#8147)
* CAMEL-18376: camel-pulsar: add redelivery backoff for ack timeout,
negative ack
* Fix import order/linebreak
* Apply formatting
* Provide ack timeout in negative ack test
* Rebind in test
* Set backoff not in endpoint query param
* Fix import
* Fix tests
---
.../pulsar/PulsarComponentConfigurer.java | 12 +++++++
.../component/pulsar/PulsarEndpointConfigurer.java | 12 +++++++
.../component/pulsar/PulsarEndpointUriFactory.java | 4 ++-
.../org/apache/camel/component/pulsar/pulsar.json | 4 +++
.../component/pulsar/PulsarConfiguration.java | 27 +++++++++++++++
.../consumers/CommonCreationStrategyImpl.java | 10 ++++++
.../PulsarConsumerDeadLetterPolicyIT.java | 6 ++--
...> PulsarConsumerNegativeAcknowledgementIT.java} | 38 +++++++++++++++-------
.../PulsarConsumerNoAcknowledgementIT.java | 29 +++++++++++++++--
9 files changed, 124 insertions(+), 18 deletions(-)
diff --git
a/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarComponentConfigurer.java
b/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarComponentConfigurer.java
index 86afa21c75a..97dab737df9 100644
---
a/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarComponentConfigurer.java
+++
b/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarComponentConfigurer.java
@@ -32,6 +32,8 @@ public class PulsarComponentConfigurer extends
PropertyConfigurerSupport impleme
case "ackGroupTimeMillis":
getOrCreateConfiguration(target).setAckGroupTimeMillis(property(camelContext,
long.class, value)); return true;
case "acktimeoutmillis":
case "ackTimeoutMillis":
getOrCreateConfiguration(target).setAckTimeoutMillis(property(camelContext,
long.class, value)); return true;
+ case "acktimeoutredeliverybackoff":
+ case "ackTimeoutRedeliveryBackoff":
getOrCreateConfiguration(target).setAckTimeoutRedeliveryBackoff(property(camelContext,
org.apache.pulsar.client.api.RedeliveryBackoff.class, value)); return true;
case "allowmanualacknowledgement":
case "allowManualAcknowledgement":
getOrCreateConfiguration(target).setAllowManualAcknowledgement(property(camelContext,
boolean.class, value)); return true;
case "authenticationclass":
@@ -81,6 +83,8 @@ public class PulsarComponentConfigurer extends
PropertyConfigurerSupport impleme
case "messageRouter":
getOrCreateConfiguration(target).setMessageRouter(property(camelContext,
org.apache.pulsar.client.api.MessageRouter.class, value)); return true;
case "messageroutingmode":
case "messageRoutingMode":
getOrCreateConfiguration(target).setMessageRoutingMode(property(camelContext,
org.apache.pulsar.client.api.MessageRoutingMode.class, value)); return true;
+ case "negativeackredeliverybackoff":
+ case "negativeAckRedeliveryBackoff":
getOrCreateConfiguration(target).setNegativeAckRedeliveryBackoff(property(camelContext,
org.apache.pulsar.client.api.RedeliveryBackoff.class, value)); return true;
case "negativeackredeliverydelaymicros":
case "negativeAckRedeliveryDelayMicros":
getOrCreateConfiguration(target).setNegativeAckRedeliveryDelayMicros(property(camelContext,
long.class, value)); return true;
case "numberofconsumerthreads":
@@ -125,6 +129,8 @@ public class PulsarComponentConfigurer extends
PropertyConfigurerSupport impleme
case "ackGroupTimeMillis": return long.class;
case "acktimeoutmillis":
case "ackTimeoutMillis": return long.class;
+ case "acktimeoutredeliverybackoff":
+ case "ackTimeoutRedeliveryBackoff": return
org.apache.pulsar.client.api.RedeliveryBackoff.class;
case "allowmanualacknowledgement":
case "allowManualAcknowledgement": return boolean.class;
case "authenticationclass":
@@ -174,6 +180,8 @@ public class PulsarComponentConfigurer extends
PropertyConfigurerSupport impleme
case "messageRouter": return
org.apache.pulsar.client.api.MessageRouter.class;
case "messageroutingmode":
case "messageRoutingMode": return
org.apache.pulsar.client.api.MessageRoutingMode.class;
+ case "negativeackredeliverybackoff":
+ case "negativeAckRedeliveryBackoff": return
org.apache.pulsar.client.api.RedeliveryBackoff.class;
case "negativeackredeliverydelaymicros":
case "negativeAckRedeliveryDelayMicros": return long.class;
case "numberofconsumerthreads":
@@ -214,6 +222,8 @@ public class PulsarComponentConfigurer extends
PropertyConfigurerSupport impleme
case "ackGroupTimeMillis": return
getOrCreateConfiguration(target).getAckGroupTimeMillis();
case "acktimeoutmillis":
case "ackTimeoutMillis": return
getOrCreateConfiguration(target).getAckTimeoutMillis();
+ case "acktimeoutredeliverybackoff":
+ case "ackTimeoutRedeliveryBackoff": return
getOrCreateConfiguration(target).getAckTimeoutRedeliveryBackoff();
case "allowmanualacknowledgement":
case "allowManualAcknowledgement": return
getOrCreateConfiguration(target).isAllowManualAcknowledgement();
case "authenticationclass":
@@ -263,6 +273,8 @@ public class PulsarComponentConfigurer extends
PropertyConfigurerSupport impleme
case "messageRouter": return
getOrCreateConfiguration(target).getMessageRouter();
case "messageroutingmode":
case "messageRoutingMode": return
getOrCreateConfiguration(target).getMessageRoutingMode();
+ case "negativeackredeliverybackoff":
+ case "negativeAckRedeliveryBackoff": return
getOrCreateConfiguration(target).getNegativeAckRedeliveryBackoff();
case "negativeackredeliverydelaymicros":
case "negativeAckRedeliveryDelayMicros": return
getOrCreateConfiguration(target).getNegativeAckRedeliveryDelayMicros();
case "numberofconsumerthreads":
diff --git
a/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarEndpointConfigurer.java
b/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarEndpointConfigurer.java
index f19746e95f0..08be79b9ea5 100644
---
a/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarEndpointConfigurer.java
+++
b/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarEndpointConfigurer.java
@@ -25,6 +25,8 @@ public class PulsarEndpointConfigurer extends
PropertyConfigurerSupport implemen
case "ackGroupTimeMillis":
target.getPulsarConfiguration().setAckGroupTimeMillis(property(camelContext,
long.class, value)); return true;
case "acktimeoutmillis":
case "ackTimeoutMillis":
target.getPulsarConfiguration().setAckTimeoutMillis(property(camelContext,
long.class, value)); return true;
+ case "acktimeoutredeliverybackoff":
+ case "ackTimeoutRedeliveryBackoff":
target.getPulsarConfiguration().setAckTimeoutRedeliveryBackoff(property(camelContext,
org.apache.pulsar.client.api.RedeliveryBackoff.class, value)); return true;
case "allowmanualacknowledgement":
case "allowManualAcknowledgement":
target.getPulsarConfiguration().setAllowManualAcknowledgement(property(camelContext,
boolean.class, value)); return true;
case "authenticationclass":
@@ -73,6 +75,8 @@ public class PulsarEndpointConfigurer extends
PropertyConfigurerSupport implemen
case "messageRouter":
target.getPulsarConfiguration().setMessageRouter(property(camelContext,
org.apache.pulsar.client.api.MessageRouter.class, value)); return true;
case "messageroutingmode":
case "messageRoutingMode":
target.getPulsarConfiguration().setMessageRoutingMode(property(camelContext,
org.apache.pulsar.client.api.MessageRoutingMode.class, value)); return true;
+ case "negativeackredeliverybackoff":
+ case "negativeAckRedeliveryBackoff":
target.getPulsarConfiguration().setNegativeAckRedeliveryBackoff(property(camelContext,
org.apache.pulsar.client.api.RedeliveryBackoff.class, value)); return true;
case "negativeackredeliverydelaymicros":
case "negativeAckRedeliveryDelayMicros":
target.getPulsarConfiguration().setNegativeAckRedeliveryDelayMicros(property(camelContext,
long.class, value)); return true;
case "numberofconsumerthreads":
@@ -108,6 +112,8 @@ public class PulsarEndpointConfigurer extends
PropertyConfigurerSupport implemen
case "ackGroupTimeMillis": return long.class;
case "acktimeoutmillis":
case "ackTimeoutMillis": return long.class;
+ case "acktimeoutredeliverybackoff":
+ case "ackTimeoutRedeliveryBackoff": return
org.apache.pulsar.client.api.RedeliveryBackoff.class;
case "allowmanualacknowledgement":
case "allowManualAcknowledgement": return boolean.class;
case "authenticationclass":
@@ -156,6 +162,8 @@ public class PulsarEndpointConfigurer extends
PropertyConfigurerSupport implemen
case "messageRouter": return
org.apache.pulsar.client.api.MessageRouter.class;
case "messageroutingmode":
case "messageRoutingMode": return
org.apache.pulsar.client.api.MessageRoutingMode.class;
+ case "negativeackredeliverybackoff":
+ case "negativeAckRedeliveryBackoff": return
org.apache.pulsar.client.api.RedeliveryBackoff.class;
case "negativeackredeliverydelaymicros":
case "negativeAckRedeliveryDelayMicros": return long.class;
case "numberofconsumerthreads":
@@ -192,6 +200,8 @@ public class PulsarEndpointConfigurer extends
PropertyConfigurerSupport implemen
case "ackGroupTimeMillis": return
target.getPulsarConfiguration().getAckGroupTimeMillis();
case "acktimeoutmillis":
case "ackTimeoutMillis": return
target.getPulsarConfiguration().getAckTimeoutMillis();
+ case "acktimeoutredeliverybackoff":
+ case "ackTimeoutRedeliveryBackoff": return
target.getPulsarConfiguration().getAckTimeoutRedeliveryBackoff();
case "allowmanualacknowledgement":
case "allowManualAcknowledgement": return
target.getPulsarConfiguration().isAllowManualAcknowledgement();
case "authenticationclass":
@@ -240,6 +250,8 @@ public class PulsarEndpointConfigurer extends
PropertyConfigurerSupport implemen
case "messageRouter": return
target.getPulsarConfiguration().getMessageRouter();
case "messageroutingmode":
case "messageRoutingMode": return
target.getPulsarConfiguration().getMessageRoutingMode();
+ case "negativeackredeliverybackoff":
+ case "negativeAckRedeliveryBackoff": return
target.getPulsarConfiguration().getNegativeAckRedeliveryBackoff();
case "negativeackredeliverydelaymicros":
case "negativeAckRedeliveryDelayMicros": return
target.getPulsarConfiguration().getNegativeAckRedeliveryDelayMicros();
case "numberofconsumerthreads":
diff --git
a/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarEndpointUriFactory.java
b/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarEndpointUriFactory.java
index 87e2015e5be..3bb04d4d8c6 100644
---
a/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarEndpointUriFactory.java
+++
b/components/camel-pulsar/src/generated/java/org/apache/camel/component/pulsar/PulsarEndpointUriFactory.java
@@ -21,9 +21,10 @@ public class PulsarEndpointUriFactory extends
org.apache.camel.support.component
private static final Set<String> SECRET_PROPERTY_NAMES;
private static final Set<String> MULTI_VALUE_PREFIXES;
static {
- Set<String> props = new HashSet<>(42);
+ Set<String> props = new HashSet<>(44);
props.add("ackGroupTimeMillis");
props.add("ackTimeoutMillis");
+ props.add("ackTimeoutRedeliveryBackoff");
props.add("allowManualAcknowledgement");
props.add("authenticationClass");
props.add("authenticationParams");
@@ -49,6 +50,7 @@ public class PulsarEndpointUriFactory extends
org.apache.camel.support.component
props.add("messageRouter");
props.add("messageRoutingMode");
props.add("namespace");
+ props.add("negativeAckRedeliveryBackoff");
props.add("negativeAckRedeliveryDelayMicros");
props.add("numberOfConsumerThreads");
props.add("numberOfConsumers");
diff --git
a/components/camel-pulsar/src/generated/resources/org/apache/camel/component/pulsar/pulsar.json
b/components/camel-pulsar/src/generated/resources/org/apache/camel/component/pulsar/pulsar.json
index c28d2bb37a1..d2302601763 100644
---
a/components/camel-pulsar/src/generated/resources/org/apache/camel/component/pulsar/pulsar.json
+++
b/components/camel-pulsar/src/generated/resources/org/apache/camel/component/pulsar/pulsar.json
@@ -28,6 +28,7 @@
"serviceUrl": { "kind": "property", "displayName": "Service Url", "group":
"common", "label": "common", "required": false, "type": "string", "javaType":
"java.lang.String", "deprecated": false, "autowired": false, "secret": false,
"configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration",
"configurationField": "configuration", "description": "The Pulsar Service URL
to point while creating the client from URI" },
"ackGroupTimeMillis": { "kind": "property", "displayName": "Ack Group Time
Millis", "group": "consumer", "label": "consumer", "required": false, "type":
"integer", "javaType": "long", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": 100, "configurationClass":
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField":
"configuration", "description": "Group the consumer acknowledgments for the
specified time in milliseconds - defaults to 100" },
"ackTimeoutMillis": { "kind": "property", "displayName": "Ack Timeout
Millis", "group": "consumer", "label": "consumer", "required": false, "type":
"integer", "javaType": "long", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": 10000, "configurationClass":
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField":
"configuration", "description": "Timeout for unacknowledged messages in
milliseconds - defaults to 10000" },
+ "ackTimeoutRedeliveryBackoff": { "kind": "property", "displayName": "Ack
Timeout Redelivery Backoff", "group": "consumer", "label": "consumer",
"required": false, "type": "object", "javaType":
"org.apache.pulsar.client.api.RedeliveryBackoff", "deprecated": false,
"autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField":
"configuration", "description": "RedeliveryBackoff to use for ack timeout
redelivery b [...]
"allowManualAcknowledgement": { "kind": "property", "displayName": "Allow
Manual Acknowledgement", "group": "consumer", "label": "consumer", "required":
false, "type": "boolean", "javaType": "boolean", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": false,
"configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration",
"configurationField": "configuration", "description": "Whether to allow manual
message acknowledgements. If this option is ena [...]
"bridgeErrorHandler": { "kind": "property", "displayName": "Bridge Error
Handler", "group": "consumer", "label": "consumer", "required": false, "type":
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": false, "description": "Allows for bridging the
consumer to the Camel routing Error Handler, which mean any exceptions occurred
while the consumer is trying to pickup incoming messages, or the likes, will
now be processed as a me [...]
"consumerName": { "kind": "property", "displayName": "Consumer Name",
"group": "consumer", "label": "consumer", "required": false, "type": "string",
"javaType": "java.lang.String", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": "sole-consumer", "configurationClass":
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField":
"configuration", "description": "Name of the consumer when subscription is
EXCLUSIVE" },
@@ -36,6 +37,7 @@
"deadLetterTopic": { "kind": "property", "displayName": "Dead Letter
Topic", "group": "consumer", "label": "consumer", "required": false, "type":
"string", "javaType": "java.lang.String", "deprecated": false, "autowired":
false, "secret": false, "configurationClass":
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField":
"configuration", "description": "Name of the topic where the messages which
fail maxRedeliverCount times will be sent. Note: if not set, defa [...]
"maxRedeliverCount": { "kind": "property", "displayName": "Max Redeliver
Count", "group": "consumer", "label": "consumer", "required": false, "type":
"integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired":
false, "secret": false, "configurationClass":
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField":
"configuration", "description": "Maximum number of times that a message will be
redelivered before being sent to the dead letter queue. [...]
"messageListener": { "kind": "property", "displayName": "Message
Listener", "group": "consumer", "label": "consumer", "required": false, "type":
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": true, "configurationClass":
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField":
"configuration", "description": "Whether to use the messageListener interface,
or to receive messages using a separate thread pool" },
+ "negativeAckRedeliveryBackoff": { "kind": "property", "displayName":
"Negative Ack Redelivery Backoff", "group": "consumer", "label": "consumer",
"required": false, "type": "object", "javaType":
"org.apache.pulsar.client.api.RedeliveryBackoff", "deprecated": false,
"autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField":
"configuration", "description": "RedeliveryBackoff to use for negative ack
redeliver [...]
"negativeAckRedeliveryDelayMicros": { "kind": "property", "displayName":
"Negative Ack Redelivery Delay Micros", "group": "consumer", "label":
"consumer", "required": false, "type": "integer", "javaType": "long",
"deprecated": false, "autowired": false, "secret": false, "defaultValue":
60000000, "configurationClass":
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField":
"configuration", "description": "Set the negative acknowledgement delay" },
"numberOfConsumers": { "kind": "property", "displayName": "Number Of
Consumers", "group": "consumer", "label": "consumer", "required": false,
"type": "integer", "javaType": "int", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": 1, "configurationClass":
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField":
"configuration", "description": "Number of consumers - defaults to 1" },
"numberOfConsumerThreads": { "kind": "property", "displayName": "Number Of
Consumer Threads", "group": "consumer", "label": "consumer", "required": false,
"type": "integer", "javaType": "int", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": 1, "configurationClass":
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField":
"configuration", "description": "Number of threads to receive and handle
messages when using a separate thread pool" },
@@ -90,6 +92,7 @@
"serviceUrl": { "kind": "parameter", "displayName": "Service Url",
"group": "common", "label": "common", "required": false, "type": "string",
"javaType": "java.lang.String", "deprecated": false, "autowired": false,
"secret": false, "configurationClass":
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField":
"pulsarConfiguration", "description": "The Pulsar Service URL to point while
creating the client from URI" },
"ackGroupTimeMillis": { "kind": "parameter", "displayName": "Ack Group
Time Millis", "group": "consumer", "label": "consumer", "required": false,
"type": "integer", "javaType": "long", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": 100, "configurationClass":
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField":
"pulsarConfiguration", "description": "Group the consumer acknowledgments for
the specified time in milliseconds - defaults [...]
"ackTimeoutMillis": { "kind": "parameter", "displayName": "Ack Timeout
Millis", "group": "consumer", "label": "consumer", "required": false, "type":
"integer", "javaType": "long", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": 10000, "configurationClass":
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField":
"pulsarConfiguration", "description": "Timeout for unacknowledged messages in
milliseconds - defaults to 10000" },
+ "ackTimeoutRedeliveryBackoff": { "kind": "parameter", "displayName": "Ack
Timeout Redelivery Backoff", "group": "consumer", "label": "consumer",
"required": false, "type": "object", "javaType":
"org.apache.pulsar.client.api.RedeliveryBackoff", "deprecated": false,
"autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField":
"pulsarConfiguration", "description": "RedeliveryBackoff to use for ack timeout
redel [...]
"allowManualAcknowledgement": { "kind": "parameter", "displayName": "Allow
Manual Acknowledgement", "group": "consumer", "label": "consumer", "required":
false, "type": "boolean", "javaType": "boolean", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": false,
"configurationClass": "org.apache.camel.component.pulsar.PulsarConfiguration",
"configurationField": "pulsarConfiguration", "description": "Whether to allow
manual message acknowledgements. If this option [...]
"consumerName": { "kind": "parameter", "displayName": "Consumer Name",
"group": "consumer", "label": "consumer", "required": false, "type": "string",
"javaType": "java.lang.String", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": "sole-consumer", "configurationClass":
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField":
"pulsarConfiguration", "description": "Name of the consumer when subscription
is EXCLUSIVE" },
"consumerNamePrefix": { "kind": "parameter", "displayName": "Consumer Name
Prefix", "group": "consumer", "label": "consumer", "required": false, "type":
"string", "javaType": "java.lang.String", "deprecated": false, "autowired":
false, "secret": false, "defaultValue": "cons", "configurationClass":
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField":
"pulsarConfiguration", "description": "Prefix to add to consumer names when a
SHARED or FAILOVER subscription [...]
@@ -97,6 +100,7 @@
"deadLetterTopic": { "kind": "parameter", "displayName": "Dead Letter
Topic", "group": "consumer", "label": "consumer", "required": false, "type":
"string", "javaType": "java.lang.String", "deprecated": false, "autowired":
false, "secret": false, "configurationClass":
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField":
"pulsarConfiguration", "description": "Name of the topic where the messages
which fail maxRedeliverCount times will be sent. Note: if not se [...]
"maxRedeliverCount": { "kind": "parameter", "displayName": "Max Redeliver
Count", "group": "consumer", "label": "consumer", "required": false, "type":
"integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired":
false, "secret": false, "configurationClass":
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField":
"pulsarConfiguration", "description": "Maximum number of times that a message
will be redelivered before being sent to the dead letter [...]
"messageListener": { "kind": "parameter", "displayName": "Message
Listener", "group": "consumer", "label": "consumer", "required": false, "type":
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": true, "configurationClass":
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField":
"pulsarConfiguration", "description": "Whether to use the messageListener
interface, or to receive messages using a separate th [...]
+ "negativeAckRedeliveryBackoff": { "kind": "parameter", "displayName":
"Negative Ack Redelivery Backoff", "group": "consumer", "label": "consumer",
"required": false, "type": "object", "javaType":
"org.apache.pulsar.client.api.RedeliveryBackoff", "deprecated": false,
"autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField":
"pulsarConfiguration", "description": "RedeliveryBackoff to use for negative
ack re [...]
"negativeAckRedeliveryDelayMicros": { "kind": "parameter", "displayName":
"Negative Ack Redelivery Delay Micros", "group": "consumer", "label":
"consumer", "required": false, "type": "integer", "javaType": "long",
"deprecated": false, "autowired": false, "secret": false, "defaultValue":
60000000, "configurationClass":
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField":
"pulsarConfiguration", "description": "Set the negative acknowledgement delay"
},
"numberOfConsumers": { "kind": "parameter", "displayName": "Number Of
Consumers", "group": "consumer", "label": "consumer", "required": false,
"type": "integer", "javaType": "int", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": 1, "configurationClass":
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField":
"pulsarConfiguration", "description": "Number of consumers - defaults to 1" },
"numberOfConsumerThreads": { "kind": "parameter", "displayName": "Number
Of Consumer Threads", "group": "consumer", "label": "consumer", "required":
false, "type": "integer", "javaType": "int", "deprecated": false, "autowired":
false, "secret": false, "defaultValue": 1, "configurationClass":
"org.apache.camel.component.pulsar.PulsarConfiguration", "configurationField":
"pulsarConfiguration", "description": "Number of threads to receive and handle
messages when using a separate thread [...]
diff --git
a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarConfiguration.java
b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarConfiguration.java
index 5f22892bc5c..1d9aa490e71 100644
---
a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarConfiguration.java
+++
b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarConfiguration.java
@@ -27,6 +27,7 @@ import org.apache.pulsar.client.api.BatcherBuilder;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.MessageRouter;
import org.apache.pulsar.client.api.MessageRoutingMode;
+import org.apache.pulsar.client.api.RedeliveryBackoff;
import org.apache.pulsar.client.api.RegexSubscriptionMode;
import static
org.apache.camel.component.pulsar.utils.consumers.SubscriptionInitialPosition.LATEST;
@@ -65,6 +66,10 @@ public class PulsarConfiguration implements Cloneable {
private long ackTimeoutMillis = 10000;
@UriParam(label = "consumer", defaultValue = "60000000")
private long negativeAckRedeliveryDelayMicros = 60000000;
+ @UriParam(label = "consumer", description = "RedeliveryBackoff to use for
ack timeout redelivery backoff.")
+ private RedeliveryBackoff ackTimeoutRedeliveryBackoff;
+ @UriParam(label = "consumer", description = "RedeliveryBackoff to use for
negative ack redelivery backoff.")
+ private RedeliveryBackoff negativeAckRedeliveryBackoff;
@UriParam(label = "consumer", defaultValue = "100")
private long ackGroupTimeMillis = 100;
@UriParam(label = "consumer", defaultValue = "LATEST")
@@ -455,6 +460,28 @@ public class PulsarConfiguration implements Cloneable {
this.negativeAckRedeliveryDelayMicros =
negativeAckRedeliveryDelayMicros;
}
+ public RedeliveryBackoff getAckTimeoutRedeliveryBackoff() {
+ return ackTimeoutRedeliveryBackoff;
+ }
+
+ /**
+ * Set a RedeliveryBackoff to use for ack timeout redelivery backoff.
+ */
+ public void setAckTimeoutRedeliveryBackoff(RedeliveryBackoff
redeliveryBackoff) {
+ this.ackTimeoutRedeliveryBackoff = redeliveryBackoff;
+ }
+
+ public RedeliveryBackoff getNegativeAckRedeliveryBackoff() {
+ return negativeAckRedeliveryBackoff;
+ }
+
+ /**
+ * Set a RedeliveryBackoff to use for negative ack redelivery backoff.
+ */
+ public void setNegativeAckRedeliveryBackoff(RedeliveryBackoff
redeliveryBackoff) {
+ this.negativeAckRedeliveryBackoff = redeliveryBackoff;
+ }
+
public Integer getMaxRedeliverCount() {
return maxRedeliverCount;
}
diff --git
a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/consumers/CommonCreationStrategyImpl.java
b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/consumers/CommonCreationStrategyImpl.java
index 2cdfa62625b..32446880a28 100644
---
a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/consumers/CommonCreationStrategyImpl.java
+++
b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/consumers/CommonCreationStrategyImpl.java
@@ -22,6 +22,7 @@ import org.apache.camel.component.pulsar.PulsarConfiguration;
import org.apache.camel.component.pulsar.PulsarConsumer;
import org.apache.camel.component.pulsar.PulsarEndpoint;
import org.apache.camel.component.pulsar.PulsarMessageListener;
+import org.apache.camel.util.ObjectHelper;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.DeadLetterPolicy.DeadLetterPolicyBuilder;
@@ -66,6 +67,15 @@ public final class CommonCreationStrategyImpl {
builder.deadLetterPolicy(policy.build());
}
+
+ if
(ObjectHelper.isNotEmpty(endpointConfiguration.getAckTimeoutRedeliveryBackoff()))
{
+
builder.ackTimeoutRedeliveryBackoff(endpointConfiguration.getAckTimeoutRedeliveryBackoff());
+ }
+
+ if
(ObjectHelper.isNotEmpty(endpointConfiguration.getNegativeAckRedeliveryBackoff()))
{
+
builder.negativeAckRedeliveryBackoff(endpointConfiguration.getNegativeAckRedeliveryBackoff());
+ }
+
return builder;
}
}
diff --git
a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/integration/PulsarConsumerDeadLetterPolicyIT.java
b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/integration/PulsarConsumerDeadLetterPolicyIT.java
index 2d45cb44503..8b2223a5796 100644
---
a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/integration/PulsarConsumerDeadLetterPolicyIT.java
+++
b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/integration/PulsarConsumerDeadLetterPolicyIT.java
@@ -111,7 +111,7 @@ public class PulsarConsumerDeadLetterPolicyIT extends
PulsarITSupport {
}
@Test
- public void
givenMaxRedeliverCountverifyMessageGetsSentToDefaultDeadLetterTopicAfterCountExceeded()
+ public void
givenMaxRedeliverCountVerifyMessageGetsSentToDefaultDeadLetterTopicAfterCountExceeded()
throws Exception {
PulsarComponent component = context.getComponent("pulsar",
PulsarComponent.class);
@@ -136,7 +136,7 @@ public class PulsarConsumerDeadLetterPolicyIT extends
PulsarITSupport {
}
@Test
- public void
givenMaxRedeliverCountAndDeadLetterTopicverifyMessageGetsSentToSpecifiedDeadLetterTopicAfterCountExceeded()
+ public void
givenMaxRedeliverCountAndDeadLetterTopicVerifyMessageGetsSentToSpecifiedDeadLetterTopicAfterCountExceeded()
throws Exception {
PulsarComponent component = context.getComponent("pulsar",
PulsarComponent.class);
@@ -162,7 +162,7 @@ public class PulsarConsumerDeadLetterPolicyIT extends
PulsarITSupport {
}
@Test
- public void
givenOnlyDeadLetterTopicverifyMessageDoesNotGetSentToSpecifiedTopic() throws
Exception {
+ public void
givenOnlyDeadLetterTopicVerifyMessageDoesNotGetSentToSpecifiedTopic() throws
Exception {
PulsarComponent component = context.getComponent("pulsar",
PulsarComponent.class);
PulsarEndpoint from = (PulsarEndpoint) component
diff --git
a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/integration/PulsarConsumerNoAcknowledgementIT.java
b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/integration/PulsarConsumerNegativeAcknowledgementIT.java
similarity index 67%
copy from
components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/integration/PulsarConsumerNoAcknowledgementIT.java
copy to
components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/integration/PulsarConsumerNegativeAcknowledgementIT.java
index 026e0d2d41e..f5bb919fe0b 100644
---
a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/integration/PulsarConsumerNoAcknowledgementIT.java
+++
b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/integration/PulsarConsumerNegativeAcknowledgementIT.java
@@ -23,7 +23,9 @@ import org.apache.camel.EndpointInject;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.component.pulsar.PulsarComponent;
+import org.apache.camel.component.pulsar.PulsarMessageReceipt;
import org.apache.camel.component.pulsar.utils.AutoConfiguration;
+import org.apache.camel.component.pulsar.utils.message.PulsarMessageHeaders;
import org.apache.camel.spi.Registry;
import org.apache.camel.support.SimpleRegistry;
import org.apache.pulsar.client.api.Producer;
@@ -31,16 +33,16 @@ import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.ClientBuilderImpl;
+import org.apache.pulsar.client.impl.MultiplierRedeliveryBackoff;
import org.junit.jupiter.api.Test;
-public class PulsarConsumerNoAcknowledgementIT extends PulsarITSupport {
+public class PulsarConsumerNegativeAcknowledgementIT extends PulsarITSupport {
- private static final String TOPIC_URI =
"persistent://public/default/camel-topic";
+ private static final String TOPIC_URI =
"persistent://public/default/camel-topic-negative-ack";
private static final String PRODUCER = "camel-producer-1";
- @EndpointInject("pulsar:" + TOPIC_URI +
"?numberOfConsumers=1&subscriptionType=Exclusive"
- +
"&subscriptionName=camel-subscription&consumerQueueSize=1&consumerName=camel-consumer"
- + "&ackTimeoutMillis=1000")
+ @EndpointInject("pulsar:" + TOPIC_URI +
"?numberOfConsumers=1&subscriptionType=Exclusive&batchingEnabled=false"
+ +
"&subscriptionName=camel-subscription&consumerQueueSize=1&consumerName=camel-consumer")
private Endpoint from;
@EndpointInject("mock:result")
@@ -51,8 +53,12 @@ public class PulsarConsumerNoAcknowledgementIT extends
PulsarITSupport {
return new RouteBuilder() {
@Override
public void configure() {
- // Nothing in the route will ack the message.
- from(from).to(to);
+ // This route will explicitly negative acknowledge the message.
+ from(from)
+ .process(exchange -> exchange.getIn()
+
.getHeader(PulsarMessageHeaders.MESSAGE_RECEIPT, PulsarMessageReceipt.class)
+ .negativeAcknowledge())
+ .to(to);
}
};
}
@@ -74,7 +80,16 @@ public class PulsarConsumerNoAcknowledgementIT extends
PulsarITSupport {
PulsarComponent comp = new PulsarComponent(context);
comp.setAutoConfiguration(autoConfiguration);
comp.setPulsarClient(pulsarClient);
- comp.getConfiguration().setAllowManualAcknowledgement(true); // Set to
true here instead of the endpoint query parameter.
+ comp.getConfiguration()
+ .setAllowManualAcknowledgement(true); // Set to true here
instead of the endpoint query parameter.
+ comp.getConfiguration().setAckTimeoutMillis(60_000L);
+ // Given relevant millis=1000 redeliveries will occur at 1s + 0.01s,
1s + 1s, 1s + 100s, 1s + 100s, 1s + 100s...
+
comp.getConfiguration().setNegativeAckRedeliveryDelayMicros(1_000_000L);
+
comp.getConfiguration().setNegativeAckRedeliveryBackoff(MultiplierRedeliveryBackoff.builder()
+ .minDelayMs(10L)
+ .maxDelayMs(100_000L)
+ .multiplier(100.0)
+ .build());
registry.bind("pulsar", comp);
}
@@ -83,8 +98,8 @@ public class PulsarConsumerNoAcknowledgementIT extends
PulsarITSupport {
}
@Test
- public void testAMessageIsConsumedMultipleTimes() throws Exception {
- to.expectedMinimumMessageCount(2);
+ public void testAMessageIsConsumedMultipleTimesWithNegativeAckBackoff()
throws Exception {
+ to.expectedMessageCount(3);
Producer<String> producer
=
givenPulsarClient().newProducer(Schema.STRING).producerName(PRODUCER).topic(TOPIC_URI).create();
@@ -92,6 +107,7 @@ public class PulsarConsumerNoAcknowledgementIT extends
PulsarITSupport {
producer.send("Hello World!");
MockEndpoint.assertIsSatisfied(10, TimeUnit.SECONDS, to);
- }
+ producer.close();
+ }
}
diff --git
a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/integration/PulsarConsumerNoAcknowledgementIT.java
b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/integration/PulsarConsumerNoAcknowledgementIT.java
index 026e0d2d41e..aed3a199f77 100644
---
a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/integration/PulsarConsumerNoAcknowledgementIT.java
+++
b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/integration/PulsarConsumerNoAcknowledgementIT.java
@@ -31,6 +31,7 @@ import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.ClientBuilderImpl;
+import org.apache.pulsar.client.impl.MultiplierRedeliveryBackoff;
import org.junit.jupiter.api.Test;
public class PulsarConsumerNoAcknowledgementIT extends PulsarITSupport {
@@ -39,8 +40,7 @@ public class PulsarConsumerNoAcknowledgementIT extends
PulsarITSupport {
private static final String PRODUCER = "camel-producer-1";
@EndpointInject("pulsar:" + TOPIC_URI +
"?numberOfConsumers=1&subscriptionType=Exclusive"
- +
"&subscriptionName=camel-subscription&consumerQueueSize=1&consumerName=camel-consumer"
- + "&ackTimeoutMillis=1000")
+ +
"&subscriptionName=camel-subscription&consumerQueueSize=1&consumerName=camel-consumer")
private Endpoint from;
@EndpointInject("mock:result")
@@ -74,7 +74,15 @@ public class PulsarConsumerNoAcknowledgementIT extends
PulsarITSupport {
PulsarComponent comp = new PulsarComponent(context);
comp.setAutoConfiguration(autoConfiguration);
comp.setPulsarClient(pulsarClient);
- comp.getConfiguration().setAllowManualAcknowledgement(true); // Set to
true here instead of the endpoint query parameter.
+ comp.getConfiguration()
+ .setAllowManualAcknowledgement(true); // Set to true here
instead of the endpoint query parameter.
+ // Given relevant millis=1000 redeliveries will occur at 1s + 0.01s,
1s + 1s, 1s + 100s, 1s + 100s, 1s + 100s...
+ comp.getConfiguration().setAckTimeoutMillis(1_000L);
+
comp.getConfiguration().setAckTimeoutRedeliveryBackoff(MultiplierRedeliveryBackoff.builder()
+ .minDelayMs(10L)
+ .maxDelayMs(100_000L)
+ .multiplier(100.0)
+ .build());
registry.bind("pulsar", comp);
}
@@ -92,6 +100,21 @@ public class PulsarConsumerNoAcknowledgementIT extends
PulsarITSupport {
producer.send("Hello World!");
MockEndpoint.assertIsSatisfied(10, TimeUnit.SECONDS, to);
+
+ producer.close();
}
+ @Test
+ public void testAMessageIsConsumedMultipleTimesWithAckTimeoutBackoff()
throws Exception {
+ to.expectedMessageCount(3);
+
+ Producer<String> producer
+ =
givenPulsarClient().newProducer(Schema.STRING).producerName(PRODUCER).topic(TOPIC_URI).create();
+
+ producer.send("Hello World!");
+
+ MockEndpoint.assertIsSatisfied(10, TimeUnit.SECONDS, to);
+
+ producer.close();
+ }
}