This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
commit 2e50f4a7d3e07689ac4eb9b7a7f8996c2fef7a79
Author: Claus Ibsen <[email protected]>
AuthorDate: Sun Jun 6 09:24:17 2021 +0200

    camel-paho - Use async producer
---
 .../apache/camel/component/paho/PahoProducer.java | 51 ++++++++++++++--------
 1 file changed, 34 insertions(+), 17 deletions(-)

diff --git a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoProducer.java b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoProducer.java
index 718b381..22f2cc43 100644
--- a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoProducer.java
+++ b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoProducer.java
@@ -16,15 +16,19 @@
  */
 package org.apache.camel.component.paho;
 
+import java.util.concurrent.RejectedExecutionException;
+
+import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
-import org.apache.camel.support.DefaultProducer;
+import org.apache.camel.support.DefaultAsyncProducer;
 import org.eclipse.paho.client.mqttv3.MqttClient;
 import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttException;
 import org.eclipse.paho.client.mqttv3.MqttMessage;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class PahoProducer extends DefaultProducer {
+public class PahoProducer extends DefaultAsyncProducer {
 
     private static final Logger LOG = LoggerFactory.getLogger(PahoProducer.class);
 
@@ -38,21 +42,34 @@ public class PahoProducer extends DefaultProducer {
     }
 
     @Override
-    public void process(Exchange exchange) throws Exception {
-        String topic
-                = exchange.getIn().getHeader(PahoConstants.CAMEL_PAHO_OVERRIDE_TOPIC, getEndpoint().getTopic(), String.class);
-        int qos = exchange.getIn().getHeader(PahoConstants.CAMEL_PAHO_MSG_QOS, getEndpoint().getConfiguration().getQos(),
-                Integer.class);
-        boolean retained = exchange.getIn().getHeader(PahoConstants.CAMEL_PAHO_MSG_RETAINED,
-                getEndpoint().getConfiguration().isRetained(), Boolean.class);
-        byte[] payload = exchange.getIn().getBody(byte[].class);
-
-        MqttMessage message = new MqttMessage(payload);
-        message.setQos(qos);
-        message.setRetained(retained);
-
-        LOG.debug("Publishing to topic: {}, qos: {}, retrained: {}", topic, qos, retained);
-        client.publish(topic, message);
+    public boolean process(Exchange exchange, AsyncCallback callback) {
+        if (!isRunAllowed()) {
+            exchange.setException(new RejectedExecutionException());
+        } else {
+            try {
+                String topic
+                        = exchange.getIn().getHeader(PahoConstants.CAMEL_PAHO_OVERRIDE_TOPIC, getEndpoint().getTopic(),
+                                String.class);
+                int qos = exchange.getIn().getHeader(PahoConstants.CAMEL_PAHO_MSG_QOS,
+                        getEndpoint().getConfiguration().getQos(),
+                        Integer.class);
+                boolean retained = exchange.getIn().getHeader(PahoConstants.CAMEL_PAHO_MSG_RETAINED,
+                        getEndpoint().getConfiguration().isRetained(), Boolean.class);
+                byte[] payload = exchange.getIn().getBody(byte[].class);
+
+                MqttMessage message = new MqttMessage(payload);
+                message.setQos(qos);
+                message.setRetained(retained);
+
+                LOG.debug("Publishing to topic: {}, qos: {}, retrained: {}", topic, qos, retained);
+                client.publish(topic, message);
+            } catch (MqttException e) {
+                exchange.setException(e);
+            }
+        }
+
+        callback.done(true);
+        return true;
     }
 
     @Override
