This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 2e50f4a7d3e07689ac4eb9b7a7f8996c2fef7a79
Author: Claus Ibsen <[email protected]>
AuthorDate: Sun Jun 6 09:24:17 2021 +0200

    camel-paho - Use async producer
---
 .../apache/camel/component/paho/PahoProducer.java  | 51 ++++++++++++++--------
 1 file changed, 34 insertions(+), 17 deletions(-)

diff --git 
a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoProducer.java
 
b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoProducer.java
index 718b381..22f2cc43 100644
--- 
a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoProducer.java
+++ 
b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoProducer.java
@@ -16,15 +16,19 @@
  */
 package org.apache.camel.component.paho;
 
+import java.util.concurrent.RejectedExecutionException;
+
+import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
-import org.apache.camel.support.DefaultProducer;
+import org.apache.camel.support.DefaultAsyncProducer;
 import org.eclipse.paho.client.mqttv3.MqttClient;
 import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttException;
 import org.eclipse.paho.client.mqttv3.MqttMessage;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class PahoProducer extends DefaultProducer {
+public class PahoProducer extends DefaultAsyncProducer {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(PahoProducer.class);
 
@@ -38,21 +42,34 @@ public class PahoProducer extends DefaultProducer {
     }
 
     @Override
-    public void process(Exchange exchange) throws Exception {
-        String topic
-                = 
exchange.getIn().getHeader(PahoConstants.CAMEL_PAHO_OVERRIDE_TOPIC, 
getEndpoint().getTopic(), String.class);
-        int qos = exchange.getIn().getHeader(PahoConstants.CAMEL_PAHO_MSG_QOS, 
getEndpoint().getConfiguration().getQos(),
-                Integer.class);
-        boolean retained = 
exchange.getIn().getHeader(PahoConstants.CAMEL_PAHO_MSG_RETAINED,
-                getEndpoint().getConfiguration().isRetained(), Boolean.class);
-        byte[] payload = exchange.getIn().getBody(byte[].class);
-
-        MqttMessage message = new MqttMessage(payload);
-        message.setQos(qos);
-        message.setRetained(retained);
-
-        LOG.debug("Publishing to topic: {}, qos: {}, retrained: {}", topic, 
qos, retained);
-        client.publish(topic, message);
+    public boolean process(Exchange exchange, AsyncCallback callback) {
+        if (!isRunAllowed()) {
+            exchange.setException(new RejectedExecutionException());
+        } else {
+            try {
+                String topic
+                        = 
exchange.getIn().getHeader(PahoConstants.CAMEL_PAHO_OVERRIDE_TOPIC, 
getEndpoint().getTopic(),
+                                String.class);
+                int qos = 
exchange.getIn().getHeader(PahoConstants.CAMEL_PAHO_MSG_QOS,
+                        getEndpoint().getConfiguration().getQos(),
+                        Integer.class);
+                boolean retained = 
exchange.getIn().getHeader(PahoConstants.CAMEL_PAHO_MSG_RETAINED,
+                        getEndpoint().getConfiguration().isRetained(), 
Boolean.class);
+                byte[] payload = exchange.getIn().getBody(byte[].class);
+
+                MqttMessage message = new MqttMessage(payload);
+                message.setQos(qos);
+                message.setRetained(retained);
+
+                LOG.debug("Publishing to topic: {}, qos: {}, retrained: {}", 
topic, qos, retained);
+                client.publish(topic, message);
+            } catch (MqttException e) {
+                exchange.setException(e);
+            }
+        }
+
+        callback.done(true);
+        return true;
     }
 
     @Override

Reply via email to