This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 17ac6bf9103 Manual ack changes for mqtt v3 (#18949)
17ac6bf9103 is described below

commit 17ac6bf9103dfb7f5f81dc6773c189b1b475a9ae
Author: nkokitkar <[email protected]>
AuthorDate: Tue Aug 19 22:30:38 2025 -0700

    Manual ack changes for mqtt v3 (#18949)
---
 .../component/paho/PahoComponentConfigurer.java    |  6 ++
 .../component/paho/PahoEndpointConfigurer.java     |  6 ++
 .../component/paho/PahoEndpointUriFactory.java     |  3 +-
 .../org/apache/camel/component/paho/paho.json      | 50 ++++++++-------
 .../camel/component/paho/PahoConfiguration.java    | 16 +++++
 .../apache/camel/component/paho/PahoConsumer.java  | 23 +++++++
 .../camel/component/paho/PahoManualAcksTest.java   | 73 ++++++++++++++++++++++
 7 files changed, 152 insertions(+), 25 deletions(-)

diff --git 
a/components/camel-paho/src/generated/java/org/apache/camel/component/paho/PahoComponentConfigurer.java
 
b/components/camel-paho/src/generated/java/org/apache/camel/component/paho/PahoComponentConfigurer.java
index 3a5ee75e4c0..12a918c748d 100644
--- 
a/components/camel-paho/src/generated/java/org/apache/camel/component/paho/PahoComponentConfigurer.java
+++ 
b/components/camel-paho/src/generated/java/org/apache/camel/component/paho/PahoComponentConfigurer.java
@@ -58,6 +58,8 @@ public class PahoComponentConfigurer extends 
PropertyConfigurerSupport implement
         case "keepAliveInterval": 
getOrCreateConfiguration(target).setKeepAliveInterval(property(camelContext, 
int.class, value)); return true;
         case "lazystartproducer":
         case "lazyStartProducer": 
target.setLazyStartProducer(property(camelContext, boolean.class, value)); 
return true;
+        case "manualacksenabled":
+        case "manualAcksEnabled": 
getOrCreateConfiguration(target).setManualAcksEnabled(property(camelContext, 
boolean.class, value)); return true;
         case "maxinflight":
         case "maxInflight": 
getOrCreateConfiguration(target).setMaxInflight(property(camelContext, 
int.class, value)); return true;
         case "maxreconnectdelay":
@@ -121,6 +123,8 @@ public class PahoComponentConfigurer extends 
PropertyConfigurerSupport implement
         case "keepAliveInterval": return int.class;
         case "lazystartproducer":
         case "lazyStartProducer": return boolean.class;
+        case "manualacksenabled":
+        case "manualAcksEnabled": return boolean.class;
         case "maxinflight":
         case "maxInflight": return int.class;
         case "maxreconnectdelay":
@@ -185,6 +189,8 @@ public class PahoComponentConfigurer extends 
PropertyConfigurerSupport implement
         case "keepAliveInterval": return 
getOrCreateConfiguration(target).getKeepAliveInterval();
         case "lazystartproducer":
         case "lazyStartProducer": return target.isLazyStartProducer();
+        case "manualacksenabled":
+        case "manualAcksEnabled": return 
getOrCreateConfiguration(target).isManualAcksEnabled();
         case "maxinflight":
         case "maxInflight": return 
getOrCreateConfiguration(target).getMaxInflight();
         case "maxreconnectdelay":
diff --git 
a/components/camel-paho/src/generated/java/org/apache/camel/component/paho/PahoEndpointConfigurer.java
 
b/components/camel-paho/src/generated/java/org/apache/camel/component/paho/PahoEndpointConfigurer.java
index c72189dd0c6..5724f593a15 100644
--- 
a/components/camel-paho/src/generated/java/org/apache/camel/component/paho/PahoEndpointConfigurer.java
+++ 
b/components/camel-paho/src/generated/java/org/apache/camel/component/paho/PahoEndpointConfigurer.java
@@ -52,6 +52,8 @@ public class PahoEndpointConfigurer extends 
PropertyConfigurerSupport implements
         case "keepAliveInterval": 
target.getConfiguration().setKeepAliveInterval(property(camelContext, 
int.class, value)); return true;
         case "lazystartproducer":
         case "lazyStartProducer": 
target.setLazyStartProducer(property(camelContext, boolean.class, value)); 
return true;
+        case "manualacksenabled":
+        case "manualAcksEnabled": 
target.getConfiguration().setManualAcksEnabled(property(camelContext, 
boolean.class, value)); return true;
         case "maxinflight":
         case "maxInflight": 
target.getConfiguration().setMaxInflight(property(camelContext, int.class, 
value)); return true;
         case "maxreconnectdelay":
@@ -116,6 +118,8 @@ public class PahoEndpointConfigurer extends 
PropertyConfigurerSupport implements
         case "keepAliveInterval": return int.class;
         case "lazystartproducer":
         case "lazyStartProducer": return boolean.class;
+        case "manualacksenabled":
+        case "manualAcksEnabled": return boolean.class;
         case "maxinflight":
         case "maxInflight": return int.class;
         case "maxreconnectdelay":
@@ -181,6 +185,8 @@ public class PahoEndpointConfigurer extends 
PropertyConfigurerSupport implements
         case "keepAliveInterval": return 
target.getConfiguration().getKeepAliveInterval();
         case "lazystartproducer":
         case "lazyStartProducer": return target.isLazyStartProducer();
+        case "manualacksenabled":
+        case "manualAcksEnabled": return 
target.getConfiguration().isManualAcksEnabled();
         case "maxinflight":
         case "maxInflight": return target.getConfiguration().getMaxInflight();
         case "maxreconnectdelay":
diff --git 
a/components/camel-paho/src/generated/java/org/apache/camel/component/paho/PahoEndpointUriFactory.java
 
b/components/camel-paho/src/generated/java/org/apache/camel/component/paho/PahoEndpointUriFactory.java
index c5a107312d4..c6324d4b3d3 100644
--- 
a/components/camel-paho/src/generated/java/org/apache/camel/component/paho/PahoEndpointUriFactory.java
+++ 
b/components/camel-paho/src/generated/java/org/apache/camel/component/paho/PahoEndpointUriFactory.java
@@ -23,7 +23,7 @@ public class PahoEndpointUriFactory extends 
org.apache.camel.support.component.E
     private static final Set<String> SECRET_PROPERTY_NAMES;
     private static final Set<String> MULTI_VALUE_PREFIXES;
     static {
-        Set<String> props = new HashSet<>(32);
+        Set<String> props = new HashSet<>(33);
         props.add("automaticReconnect");
         props.add("bridgeErrorHandler");
         props.add("brokerUrl");
@@ -39,6 +39,7 @@ public class PahoEndpointUriFactory extends 
org.apache.camel.support.component.E
         props.add("httpsHostnameVerificationEnabled");
         props.add("keepAliveInterval");
         props.add("lazyStartProducer");
+        props.add("manualAcksEnabled");
         props.add("maxInflight");
         props.add("maxReconnectDelay");
         props.add("mqttVersion");
diff --git 
a/components/camel-paho/src/generated/resources/META-INF/org/apache/camel/component/paho/paho.json
 
b/components/camel-paho/src/generated/resources/META-INF/org/apache/camel/component/paho/paho.json
index bbef5503a4b..628b2380ca9 100644
--- 
a/components/camel-paho/src/generated/resources/META-INF/org/apache/camel/component/paho/paho.json
+++ 
b/components/camel-paho/src/generated/resources/META-INF/org/apache/camel/component/paho/paho.json
@@ -44,17 +44,18 @@
     "willRetained": { "index": 17, "kind": "property", "displayName": "Will 
Retained", "group": "common", "label": "", "required": false, "type": 
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": false, "configurationClass": 
"org.apache.camel.component.paho.PahoConfiguration", "configurationField": 
"configuration", "description": "Sets the Last Will and Testament (LWT) for the 
connection. In the event that this client unexpectedly [...]
     "willTopic": { "index": 18, "kind": "property", "displayName": "Will 
Topic", "group": "common", "label": "", "required": false, "type": "string", 
"javaType": "java.lang.String", "deprecated": false, "autowired": false, 
"secret": false, "configurationClass": 
"org.apache.camel.component.paho.PahoConfiguration", "configurationField": 
"configuration", "description": "Sets the Last Will and Testament (LWT) for the 
connection. In the event that this client unexpectedly loses its connection 
[...]
     "bridgeErrorHandler": { "index": 19, "kind": "property", "displayName": 
"Bridge Error Handler", "group": "consumer", "label": "consumer", "required": 
false, "type": "boolean", "javaType": "boolean", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": false, "description": 
"Allows for bridging the consumer to the Camel routing Error Handler, which 
mean any exceptions (if possible) occurred while the Camel consumer is trying 
to pickup incoming messages, or the lik [...]
-    "lazyStartProducer": { "index": 20, "kind": "property", "displayName": 
"Lazy Start Producer", "group": "producer", "label": "producer", "required": 
false, "type": "boolean", "javaType": "boolean", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": false, "description": 
"Whether the producer should be started lazy (on the first message). By 
starting lazy you can use this to allow CamelContext and routes to startup in 
situations where a producer may otherwise fai [...]
-    "autowiredEnabled": { "index": 21, "kind": "property", "displayName": 
"Autowired Enabled", "group": "advanced", "label": "advanced", "required": 
false, "type": "boolean", "javaType": "boolean", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": true, "description": 
"Whether autowiring is enabled. This is used for automatic autowiring options 
(the option must be marked as autowired) by looking up in the registry to find 
if there is a single instance of matching  [...]
-    "client": { "index": 22, "kind": "property", "displayName": "Client", 
"group": "advanced", "label": "advanced", "required": false, "type": "object", 
"javaType": "org.eclipse.paho.client.mqttv3.MqttClient", "deprecated": false, 
"autowired": false, "secret": false, "description": "To use a shared Paho 
client" },
-    "customWebSocketHeaders": { "index": 23, "kind": "property", 
"displayName": "Custom Web Socket Headers", "group": "advanced", "label": 
"advanced", "required": false, "type": "object", "javaType": 
"java.util.Properties", "deprecated": false, "autowired": false, "secret": 
false, "configurationClass": 
"org.apache.camel.component.paho.PahoConfiguration", "configurationField": 
"configuration", "description": "Sets the Custom WebSocket Headers for the 
WebSocket Connection." },
-    "executorServiceTimeout": { "index": 24, "kind": "property", 
"displayName": "Executor Service Timeout", "group": "advanced", "label": 
"advanced", "required": false, "type": "integer", "javaType": "int", 
"deprecated": false, "autowired": false, "secret": false, "defaultValue": 1, 
"configurationClass": "org.apache.camel.component.paho.PahoConfiguration", 
"configurationField": "configuration", "description": "Set the time in seconds 
that the executor service should wait when terminating [...]
-    "httpsHostnameVerificationEnabled": { "index": 25, "kind": "property", 
"displayName": "Https Hostname Verification Enabled", "group": "security", 
"label": "security", "required": false, "type": "boolean", "javaType": 
"boolean", "deprecated": false, "autowired": false, "secret": false, 
"defaultValue": true, "configurationClass": 
"org.apache.camel.component.paho.PahoConfiguration", "configurationField": 
"configuration", "description": "Whether SSL HostnameVerifier is enabled or 
not. Th [...]
-    "password": { "index": 26, "kind": "property", "displayName": "Password", 
"group": "security", "label": "security", "required": false, "type": "string", 
"javaType": "java.lang.String", "deprecated": false, "deprecationNote": "", 
"autowired": false, "secret": true, "configurationClass": 
"org.apache.camel.component.paho.PahoConfiguration", "configurationField": 
"configuration", "description": "Password to be used for authentication against 
the MQTT broker" },
-    "socketFactory": { "index": 27, "kind": "property", "displayName": "Socket 
Factory", "group": "security", "label": "security", "required": false, "type": 
"object", "javaType": "javax.net.SocketFactory", "deprecated": false, 
"autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.paho.PahoConfiguration", "configurationField": 
"configuration", "description": "Sets the SocketFactory to use. This allows an 
application to apply its own policies around the cr [...]
-    "sslClientProps": { "index": 28, "kind": "property", "displayName": "Ssl 
Client Props", "group": "security", "label": "security", "required": false, 
"type": "object", "javaType": "java.util.Properties", "deprecated": false, 
"autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.paho.PahoConfiguration", "configurationField": 
"configuration", "description": "Sets the SSL properties for the connection. 
Note that these properties are only valid if an imple [...]
-    "sslHostnameVerifier": { "index": 29, "kind": "property", "displayName": 
"Ssl Hostname Verifier", "group": "security", "label": "security", "required": 
false, "type": "object", "javaType": "javax.net.ssl.HostnameVerifier", 
"deprecated": false, "autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.paho.PahoConfiguration", "configurationField": 
"configuration", "description": "Sets the HostnameVerifier for the SSL 
connection. Note that it will be used a [...]
-    "userName": { "index": 30, "kind": "property", "displayName": "User Name", 
"group": "security", "label": "security", "required": false, "type": "string", 
"javaType": "java.lang.String", "deprecated": false, "deprecationNote": "", 
"autowired": false, "secret": true, "configurationClass": 
"org.apache.camel.component.paho.PahoConfiguration", "configurationField": 
"configuration", "description": "Username to be used for authentication against 
the MQTT broker" }
+    "manualAcksEnabled": { "index": 20, "kind": "property", "displayName": 
"Manual Acks Enabled", "group": "consumer", "label": "consumer", "required": 
false, "type": "boolean", "javaType": "boolean", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": false, 
"configurationClass": "org.apache.camel.component.paho.PahoConfiguration", 
"configurationField": "configuration", "description": "Sets whether to use 
manual acknowledgements for the client. By default, this is  [...]
+    "lazyStartProducer": { "index": 21, "kind": "property", "displayName": 
"Lazy Start Producer", "group": "producer", "label": "producer", "required": 
false, "type": "boolean", "javaType": "boolean", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": false, "description": 
"Whether the producer should be started lazy (on the first message). By 
starting lazy you can use this to allow CamelContext and routes to startup in 
situations where a producer may otherwise fai [...]
+    "autowiredEnabled": { "index": 22, "kind": "property", "displayName": 
"Autowired Enabled", "group": "advanced", "label": "advanced", "required": 
false, "type": "boolean", "javaType": "boolean", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": true, "description": 
"Whether autowiring is enabled. This is used for automatic autowiring options 
(the option must be marked as autowired) by looking up in the registry to find 
if there is a single instance of matching  [...]
+    "client": { "index": 23, "kind": "property", "displayName": "Client", 
"group": "advanced", "label": "advanced", "required": false, "type": "object", 
"javaType": "org.eclipse.paho.client.mqttv3.MqttClient", "deprecated": false, 
"autowired": false, "secret": false, "description": "To use a shared Paho 
client" },
+    "customWebSocketHeaders": { "index": 24, "kind": "property", 
"displayName": "Custom Web Socket Headers", "group": "advanced", "label": 
"advanced", "required": false, "type": "object", "javaType": 
"java.util.Properties", "deprecated": false, "autowired": false, "secret": 
false, "configurationClass": 
"org.apache.camel.component.paho.PahoConfiguration", "configurationField": 
"configuration", "description": "Sets the Custom WebSocket Headers for the 
WebSocket Connection." },
+    "executorServiceTimeout": { "index": 25, "kind": "property", 
"displayName": "Executor Service Timeout", "group": "advanced", "label": 
"advanced", "required": false, "type": "integer", "javaType": "int", 
"deprecated": false, "autowired": false, "secret": false, "defaultValue": 1, 
"configurationClass": "org.apache.camel.component.paho.PahoConfiguration", 
"configurationField": "configuration", "description": "Set the time in seconds 
that the executor service should wait when terminating [...]
+    "httpsHostnameVerificationEnabled": { "index": 26, "kind": "property", 
"displayName": "Https Hostname Verification Enabled", "group": "security", 
"label": "security", "required": false, "type": "boolean", "javaType": 
"boolean", "deprecated": false, "autowired": false, "secret": false, 
"defaultValue": true, "configurationClass": 
"org.apache.camel.component.paho.PahoConfiguration", "configurationField": 
"configuration", "description": "Whether SSL HostnameVerifier is enabled or 
not. Th [...]
+    "password": { "index": 27, "kind": "property", "displayName": "Password", 
"group": "security", "label": "security", "required": false, "type": "string", 
"javaType": "java.lang.String", "deprecated": false, "deprecationNote": "", 
"autowired": false, "secret": true, "configurationClass": 
"org.apache.camel.component.paho.PahoConfiguration", "configurationField": 
"configuration", "description": "Password to be used for authentication against 
the MQTT broker" },
+    "socketFactory": { "index": 28, "kind": "property", "displayName": "Socket 
Factory", "group": "security", "label": "security", "required": false, "type": 
"object", "javaType": "javax.net.SocketFactory", "deprecated": false, 
"autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.paho.PahoConfiguration", "configurationField": 
"configuration", "description": "Sets the SocketFactory to use. This allows an 
application to apply its own policies around the cr [...]
+    "sslClientProps": { "index": 29, "kind": "property", "displayName": "Ssl 
Client Props", "group": "security", "label": "security", "required": false, 
"type": "object", "javaType": "java.util.Properties", "deprecated": false, 
"autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.paho.PahoConfiguration", "configurationField": 
"configuration", "description": "Sets the SSL properties for the connection. 
Note that these properties are only valid if an imple [...]
+    "sslHostnameVerifier": { "index": 30, "kind": "property", "displayName": 
"Ssl Hostname Verifier", "group": "security", "label": "security", "required": 
false, "type": "object", "javaType": "javax.net.ssl.HostnameVerifier", 
"deprecated": false, "autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.paho.PahoConfiguration", "configurationField": 
"configuration", "description": "Sets the HostnameVerifier for the SSL 
connection. Note that it will be used a [...]
+    "userName": { "index": 31, "kind": "property", "displayName": "User Name", 
"group": "security", "label": "security", "required": false, "type": "string", 
"javaType": "java.lang.String", "deprecated": false, "deprecationNote": "", 
"autowired": false, "secret": true, "configurationClass": 
"org.apache.camel.component.paho.PahoConfiguration", "configurationField": 
"configuration", "description": "Username to be used for authentication against 
the MQTT broker" }
   },
   "headers": {
     "CamelMqttTopic": { "index": 0, "kind": "header", "displayName": "", 
"group": "consumer", "label": "consumer", "required": false, "javaType": 
"String", "deprecated": false, "deprecationNote": "", "autowired": false, 
"secret": false, "description": "The name of the topic.", "constantName": 
"org.apache.camel.component.paho.PahoConstants#MQTT_TOPIC" },
@@ -83,18 +84,19 @@
     "willQos": { "index": 16, "kind": "parameter", "displayName": "Will Qos", 
"group": "common", "label": "", "required": false, "type": "integer", 
"javaType": "int", "deprecated": false, "autowired": false, "secret": false, 
"configurationClass": "org.apache.camel.component.paho.PahoConfiguration", 
"configurationField": "configuration", "description": "Sets the Last Will and 
Testament (LWT) for the connection. In the event that this client unexpectedly 
loses its connection to the server, [...]
     "willRetained": { "index": 17, "kind": "parameter", "displayName": "Will 
Retained", "group": "common", "label": "", "required": false, "type": 
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": false, "configurationClass": 
"org.apache.camel.component.paho.PahoConfiguration", "configurationField": 
"configuration", "description": "Sets the Last Will and Testament (LWT) for the 
connection. In the event that this client unexpectedl [...]
     "willTopic": { "index": 18, "kind": "parameter", "displayName": "Will 
Topic", "group": "common", "label": "", "required": false, "type": "string", 
"javaType": "java.lang.String", "deprecated": false, "autowired": false, 
"secret": false, "configurationClass": 
"org.apache.camel.component.paho.PahoConfiguration", "configurationField": 
"configuration", "description": "Sets the Last Will and Testament (LWT) for the 
connection. In the event that this client unexpectedly loses its connectio [...]
-    "bridgeErrorHandler": { "index": 19, "kind": "parameter", "displayName": 
"Bridge Error Handler", "group": "consumer (advanced)", "label": 
"consumer,advanced", "required": false, "type": "boolean", "javaType": 
"boolean", "deprecated": false, "autowired": false, "secret": false, 
"defaultValue": false, "description": "Allows for bridging the consumer to the 
Camel routing Error Handler, which mean any exceptions (if possible) occurred 
while the Camel consumer is trying to pickup incoming [...]
-    "exceptionHandler": { "index": 20, "kind": "parameter", "displayName": 
"Exception Handler", "group": "consumer (advanced)", "label": 
"consumer,advanced", "required": false, "type": "object", "javaType": 
"org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.", 
"deprecated": false, "autowired": false, "secret": false, "description": "To 
let the consumer use a custom ExceptionHandler. Notice if the option 
bridgeErrorHandler is enabled then this option is not in use. By de [...]
-    "exchangePattern": { "index": 21, "kind": "parameter", "displayName": 
"Exchange Pattern", "group": "consumer (advanced)", "label": 
"consumer,advanced", "required": false, "type": "object", "javaType": 
"org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut" ], 
"deprecated": false, "autowired": false, "secret": false, "description": "Sets 
the exchange pattern when the consumer creates an exchange." },
-    "lazyStartProducer": { "index": 22, "kind": "parameter", "displayName": 
"Lazy Start Producer", "group": "producer (advanced)", "label": 
"producer,advanced", "required": false, "type": "boolean", "javaType": 
"boolean", "deprecated": false, "autowired": false, "secret": false, 
"defaultValue": false, "description": "Whether the producer should be started 
lazy (on the first message). By starting lazy you can use this to allow 
CamelContext and routes to startup in situations where a produ [...]
-    "client": { "index": 23, "kind": "parameter", "displayName": "Client", 
"group": "advanced", "label": "advanced", "required": false, "type": "object", 
"javaType": "org.eclipse.paho.client.mqttv3.MqttClient", "deprecated": false, 
"autowired": false, "secret": false, "description": "To use an existing mqtt 
client" },
-    "customWebSocketHeaders": { "index": 24, "kind": "parameter", 
"displayName": "Custom Web Socket Headers", "group": "advanced", "label": 
"advanced", "required": false, "type": "object", "javaType": 
"java.util.Properties", "deprecated": false, "autowired": false, "secret": 
false, "configurationClass": 
"org.apache.camel.component.paho.PahoConfiguration", "configurationField": 
"configuration", "description": "Sets the Custom WebSocket Headers for the 
WebSocket Connection." },
-    "executorServiceTimeout": { "index": 25, "kind": "parameter", 
"displayName": "Executor Service Timeout", "group": "advanced", "label": 
"advanced", "required": false, "type": "integer", "javaType": "int", 
"deprecated": false, "autowired": false, "secret": false, "defaultValue": 1, 
"configurationClass": "org.apache.camel.component.paho.PahoConfiguration", 
"configurationField": "configuration", "description": "Set the time in seconds 
that the executor service should wait when terminatin [...]
-    "httpsHostnameVerificationEnabled": { "index": 26, "kind": "parameter", 
"displayName": "Https Hostname Verification Enabled", "group": "security", 
"label": "security", "required": false, "type": "boolean", "javaType": 
"boolean", "deprecated": false, "autowired": false, "secret": false, 
"defaultValue": true, "configurationClass": 
"org.apache.camel.component.paho.PahoConfiguration", "configurationField": 
"configuration", "description": "Whether SSL HostnameVerifier is enabled or 
not. T [...]
-    "password": { "index": 27, "kind": "parameter", "displayName": "Password", 
"group": "security", "label": "security", "required": false, "type": "string", 
"javaType": "java.lang.String", "deprecated": false, "deprecationNote": "", 
"autowired": false, "secret": true, "configurationClass": 
"org.apache.camel.component.paho.PahoConfiguration", "configurationField": 
"configuration", "description": "Password to be used for authentication against 
the MQTT broker" },
-    "socketFactory": { "index": 28, "kind": "parameter", "displayName": 
"Socket Factory", "group": "security", "label": "security", "required": false, 
"type": "object", "javaType": "javax.net.SocketFactory", "deprecated": false, 
"autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.paho.PahoConfiguration", "configurationField": 
"configuration", "description": "Sets the SocketFactory to use. This allows an 
application to apply its own policies around the c [...]
-    "sslClientProps": { "index": 29, "kind": "parameter", "displayName": "Ssl 
Client Props", "group": "security", "label": "security", "required": false, 
"type": "object", "javaType": "java.util.Properties", "deprecated": false, 
"autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.paho.PahoConfiguration", "configurationField": 
"configuration", "description": "Sets the SSL properties for the connection. 
Note that these properties are only valid if an impl [...]
-    "sslHostnameVerifier": { "index": 30, "kind": "parameter", "displayName": 
"Ssl Hostname Verifier", "group": "security", "label": "security", "required": 
false, "type": "object", "javaType": "javax.net.ssl.HostnameVerifier", 
"deprecated": false, "autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.paho.PahoConfiguration", "configurationField": 
"configuration", "description": "Sets the HostnameVerifier for the SSL 
connection. Note that it will be used  [...]
-    "userName": { "index": 31, "kind": "parameter", "displayName": "User 
Name", "group": "security", "label": "security", "required": false, "type": 
"string", "javaType": "java.lang.String", "deprecated": false, 
"deprecationNote": "", "autowired": false, "secret": true, 
"configurationClass": "org.apache.camel.component.paho.PahoConfiguration", 
"configurationField": "configuration", "description": "Username to be used for 
authentication against the MQTT broker" }
+    "manualAcksEnabled": { "index": 19, "kind": "parameter", "displayName": 
"Manual Acks Enabled", "group": "consumer", "label": "consumer", "required": 
false, "type": "boolean", "javaType": "boolean", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": false, 
"configurationClass": "org.apache.camel.component.paho.PahoConfiguration", 
"configurationField": "configuration", "description": "Sets whether to use 
manual acknowledgements for the client. By default, this is [...]
+    "bridgeErrorHandler": { "index": 20, "kind": "parameter", "displayName": 
"Bridge Error Handler", "group": "consumer (advanced)", "label": 
"consumer,advanced", "required": false, "type": "boolean", "javaType": 
"boolean", "deprecated": false, "autowired": false, "secret": false, 
"defaultValue": false, "description": "Allows for bridging the consumer to the 
Camel routing Error Handler, which mean any exceptions (if possible) occurred 
while the Camel consumer is trying to pickup incoming [...]
+    "exceptionHandler": { "index": 21, "kind": "parameter", "displayName": 
"Exception Handler", "group": "consumer (advanced)", "label": 
"consumer,advanced", "required": false, "type": "object", "javaType": 
"org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.", 
"deprecated": false, "autowired": false, "secret": false, "description": "To 
let the consumer use a custom ExceptionHandler. Notice if the option 
bridgeErrorHandler is enabled then this option is not in use. By de [...]
+    "exchangePattern": { "index": 22, "kind": "parameter", "displayName": 
"Exchange Pattern", "group": "consumer (advanced)", "label": 
"consumer,advanced", "required": false, "type": "object", "javaType": 
"org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut" ], 
"deprecated": false, "autowired": false, "secret": false, "description": "Sets 
the exchange pattern when the consumer creates an exchange." },
+    "lazyStartProducer": { "index": 23, "kind": "parameter", "displayName": 
"Lazy Start Producer", "group": "producer (advanced)", "label": 
"producer,advanced", "required": false, "type": "boolean", "javaType": 
"boolean", "deprecated": false, "autowired": false, "secret": false, 
"defaultValue": false, "description": "Whether the producer should be started 
lazy (on the first message). By starting lazy you can use this to allow 
CamelContext and routes to startup in situations where a produ [...]
+    "client": { "index": 24, "kind": "parameter", "displayName": "Client", 
"group": "advanced", "label": "advanced", "required": false, "type": "object", 
"javaType": "org.eclipse.paho.client.mqttv3.MqttClient", "deprecated": false, 
"autowired": false, "secret": false, "description": "To use an existing mqtt 
client" },
+    "customWebSocketHeaders": { "index": 25, "kind": "parameter", 
"displayName": "Custom Web Socket Headers", "group": "advanced", "label": 
"advanced", "required": false, "type": "object", "javaType": 
"java.util.Properties", "deprecated": false, "autowired": false, "secret": 
false, "configurationClass": 
"org.apache.camel.component.paho.PahoConfiguration", "configurationField": 
"configuration", "description": "Sets the Custom WebSocket Headers for the 
WebSocket Connection." },
+    "executorServiceTimeout": { "index": 26, "kind": "parameter", 
"displayName": "Executor Service Timeout", "group": "advanced", "label": 
"advanced", "required": false, "type": "integer", "javaType": "int", 
"deprecated": false, "autowired": false, "secret": false, "defaultValue": 1, 
"configurationClass": "org.apache.camel.component.paho.PahoConfiguration", 
"configurationField": "configuration", "description": "Set the time in seconds 
that the executor service should wait when terminatin [...]
+    "httpsHostnameVerificationEnabled": { "index": 27, "kind": "parameter", 
"displayName": "Https Hostname Verification Enabled", "group": "security", 
"label": "security", "required": false, "type": "boolean", "javaType": 
"boolean", "deprecated": false, "autowired": false, "secret": false, 
"defaultValue": true, "configurationClass": 
"org.apache.camel.component.paho.PahoConfiguration", "configurationField": 
"configuration", "description": "Whether SSL HostnameVerifier is enabled or 
not. T [...]
+    "password": { "index": 28, "kind": "parameter", "displayName": "Password", 
"group": "security", "label": "security", "required": false, "type": "string", 
"javaType": "java.lang.String", "deprecated": false, "deprecationNote": "", 
"autowired": false, "secret": true, "configurationClass": 
"org.apache.camel.component.paho.PahoConfiguration", "configurationField": 
"configuration", "description": "Password to be used for authentication against 
the MQTT broker" },
+    "socketFactory": { "index": 29, "kind": "parameter", "displayName": 
"Socket Factory", "group": "security", "label": "security", "required": false, 
"type": "object", "javaType": "javax.net.SocketFactory", "deprecated": false, 
"autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.paho.PahoConfiguration", "configurationField": 
"configuration", "description": "Sets the SocketFactory to use. This allows an 
application to apply its own policies around the c [...]
+    "sslClientProps": { "index": 30, "kind": "parameter", "displayName": "Ssl 
Client Props", "group": "security", "label": "security", "required": false, 
"type": "object", "javaType": "java.util.Properties", "deprecated": false, 
"autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.paho.PahoConfiguration", "configurationField": 
"configuration", "description": "Sets the SSL properties for the connection. 
Note that these properties are only valid if an impl [...]
+    "sslHostnameVerifier": { "index": 31, "kind": "parameter", "displayName": 
"Ssl Hostname Verifier", "group": "security", "label": "security", "required": 
false, "type": "object", "javaType": "javax.net.ssl.HostnameVerifier", 
"deprecated": false, "autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.paho.PahoConfiguration", "configurationField": 
"configuration", "description": "Sets the HostnameVerifier for the SSL 
connection. Note that it will be used  [...]
+    "userName": { "index": 32, "kind": "parameter", "displayName": "User 
Name", "group": "security", "label": "security", "required": false, "type": 
"string", "javaType": "java.lang.String", "deprecated": false, 
"deprecationNote": "", "autowired": false, "secret": true, 
"configurationClass": "org.apache.camel.component.paho.PahoConfiguration", 
"configurationField": "configuration", "description": "Username to be used for 
authentication against the MQTT broker" }
   }
 }
diff --git 
a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConfiguration.java
 
b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConfiguration.java
index 360088abcf3..ecffc332bb8 100644
--- 
a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConfiguration.java
+++ 
b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConfiguration.java
@@ -84,6 +84,8 @@ public class PahoConfiguration implements Cloneable {
     private Properties customWebSocketHeaders;
     @UriParam(label = "advanced", defaultValue = "1")
     private int executorServiceTimeout = 1;
+    @UriParam(label = "consumer")
+    private boolean manualAcksEnabled;
 
     public String getClientId() {
         return clientId;
@@ -530,4 +532,18 @@ public class PahoConfiguration implements Cloneable {
         }
     }
 
+    /**
+     * Sets whether to use manual acknowledgements for the client.
+     *
+     * By default, this is false and message will be automatically 
acknowledged. If set to true, the acknowledgement is
+     * added in the exchange's completion callback.
+     */
+    public boolean isManualAcksEnabled() {
+        return this.manualAcksEnabled;
+    }
+
+    public void setManualAcksEnabled(boolean manualAcksEnabled) {
+        this.manualAcksEnabled = manualAcksEnabled;
+    }
+
 }
diff --git 
a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java
 
b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java
index 197b066849c..b2544c42e25 100644
--- 
a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java
+++ 
b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java
@@ -20,6 +20,7 @@ import org.apache.camel.AsyncCallback;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
+import org.apache.camel.spi.Synchronization;
 import org.apache.camel.support.DefaultConsumer;
 import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
 import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
@@ -68,6 +69,10 @@ public class PahoConsumer extends DefaultConsumer {
                     clientId,
                     
PahoEndpoint.createMqttClientPersistence(getEndpoint().getConfiguration()));
             LOG.debug("Connecting client: {} to broker: {}", clientId, 
getEndpoint().getConfiguration().getBrokerUrl());
+            if (getEndpoint().getConfiguration().isManualAcksEnabled()) {
+                client.setManualAcks(true);
+
+            }
             client.connect(connectOptions);
         }
 
@@ -142,6 +147,24 @@ public class PahoConsumer extends DefaultConsumer {
         paho.setHeader(PahoConstants.MQTT_QOS, mqttMessage.getQos());
 
         exchange.setIn(paho);
+        if (getEndpoint().getConfiguration().isManualAcksEnabled()) {
+            exchange.getExchangeExtension().addOnCompletion(new 
Synchronization() {
+                @Override
+                public void onComplete(Exchange exchange) {
+                    try {
+                        
PahoConsumer.this.client.messageArrivedComplete(mqttMessage.getId(), 
mqttMessage.getQos());
+                    } catch (MqttException e) {
+                        LOG.warn("Failed to commit message with ID {} due to 
MqttException.", mqttMessage.getId());
+                    }
+                }
+
+                @Override
+                public void onFailure(Exchange exchange) {
+                    LOG.error("Rollback due to error processing Exchange ID: 
{}", exchange.getExchangeId(),
+                            exchange.getException());
+                }
+            });
+        }
         return exchange;
     }
 
diff --git 
a/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoManualAcksTest.java
 
b/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoManualAcksTest.java
new file mode 100644
index 00000000000..f289cc1971b
--- /dev/null
+++ 
b/components/camel-paho/src/test/java/org/apache/camel/component/paho/PahoManualAcksTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.paho;
+
+import org.apache.camel.ConsumerTemplate;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.infra.core.CamelContextExtension;
+import org.apache.camel.test.infra.core.DefaultCamelContextExtension;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+public class PahoManualAcksTest extends PahoTestSupport {
+    @Order(2)
+    @RegisterExtension
+    public static CamelContextExtension camelContextExtension = new 
DefaultCamelContextExtension();
+    protected ProducerTemplate template;
+    protected ConsumerTemplate consumer;
+    @EndpointInject("mock:test")
+    MockEndpoint mock;
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                from("direct:test")
+                        .to("paho:queue?brokerUrl=" + service.serviceAddress() 
+ "&qos=2");
+
+                from("paho:queue?brokerUrl=" + service.serviceAddress() + 
"&qos=2&manualAcksEnabled=true")
+                        .to("mock:test");
+            }
+        };
+    }
+
+    @Test
+    public void testManualAcks() throws Exception {
+        mock.expectedMessageCount(1);
+
+        template.sendBody("direct:test", "Test Message");
+
+        mock.assertIsSatisfied();
+    }
+
+    @Override
+    public CamelContextExtension getCamelContextExtension() {
+        return camelContextExtension;
+    }
+
+    @BeforeEach
+    void setUpRequirements() {
+        template = getCamelContextExtension().getProducerTemplate();
+        consumer = getCamelContextExtension().getConsumerTemplate();
+    }
+}


Reply via email to