This is an automated email from the ASF dual-hosted git repository. zehnder pushed a commit to branch 4137-updating-mqtt-adapter-can-leave-orphaned-consumer-thread in repository https://gitbox.apache.org/repos/asf/streampipes.git
commit 85f5e5e39eaa215dc9fd194aecccbfb18200c382 Author: Philipp Zehnder <[email protected]> AuthorDate: Thu Jan 29 14:50:55 2026 +0100 fix(#4137): Update MqttConsumer implementation --- .../connect/iiot/adapters/oi4/Oi4Adapter.java | 7 +- .../connectors/mqtt/adapter/MqttProtocol.java | 8 +- .../connectors/mqtt/shared/MqttConsumer.java | 159 +++++++++++++-------- 3 files changed, 110 insertions(+), 64 deletions(-) diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/oi4/Oi4Adapter.java b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/oi4/Oi4Adapter.java index b724b43f58..0b280437bd 100644 --- a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/oi4/Oi4Adapter.java +++ b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/oi4/Oi4Adapter.java @@ -160,8 +160,7 @@ public class Oi4Adapter implements StreamPipesAdapter { } ); - Thread thread = new Thread(this.mqttConsumer); - thread.start(); + this.mqttConsumer.start(); LOG.info("Adapter {} started", ID); LOG.info("Adapter with id {} started", @@ -206,7 +205,9 @@ public class Oi4Adapter implements StreamPipesAdapter { IAdapterParameterExtractor extractor, IAdapterRuntimeContext adapterRuntimeContext ) { - mqttConsumer.close(); + if (mqttConsumer != null) { + mqttConsumer.stop(); + } } @Override diff --git a/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/adapter/MqttProtocol.java b/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/adapter/MqttProtocol.java index 97959774c3..36dbf81669 100644 --- a/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/adapter/MqttProtocol.java +++ b/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/adapter/MqttProtocol.java @@ -78,8 +78,7 @@ public class MqttProtocol implements StreamPipesAdapter { new BrokerEventProcessor(extractor.selectedParser(), collector) ); - Thread thread = new Thread(this.mqttConsumer); - thread.start(); + this.mqttConsumer.start(); } @Override @@ -87,7 +86,9 @@ public class MqttProtocol implements StreamPipesAdapter { IAdapterParameterExtractor extractor, IAdapterRuntimeContext adapterRuntimeContext ) { - this.mqttConsumer.close(); + if (this.mqttConsumer != null) { + this.mqttConsumer.stop(); + } } @Override @@ -115,4 +116,3 @@ public class MqttProtocol implements StreamPipesAdapter { } } - diff --git a/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/shared/MqttConsumer.java b/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/shared/MqttConsumer.java index fd9f960f06..1ed4aa2b34 100644 --- a/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/shared/MqttConsumer.java +++ b/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/shared/MqttConsumer.java @@ -17,101 +17,146 @@ */ package org.apache.streampipes.extensions.connectors.mqtt.shared; +import org.apache.streampipes.commons.exceptions.connect.AdapterException; import org.apache.streampipes.messaging.InternalEventProcessor; import com.hivemq.client.mqtt.datatypes.MqttQos; import com.hivemq.client.mqtt.mqtt3.Mqtt3AsyncClient; +import com.hivemq.client.mqtt.mqtt3.message.connect.connack.Mqtt3ConnAck; import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3Publish; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicBoolean; -public class MqttConsumer extends MqttBase implements Runnable { +public class MqttConsumer extends MqttBase { private static final Logger LOG = LoggerFactory.getLogger(MqttConsumer.class); + private static final int KEEP_ALIVE_SECONDS = 30; - private final InternalEventProcessor<byte[]> consumer; - private boolean running; - + private final InternalEventProcessor<byte[]> eventProcessor; + private final AtomicBoolean running; private Mqtt3AsyncClient client; public MqttConsumer(MqttConfig mqttConfig, InternalEventProcessor<byte[]> consumer) { super(mqttConfig); - this.consumer = consumer; + this.eventProcessor = consumer; + this.running = new AtomicBoolean(false); } - @Override - public void run() { - this.running = true; + /** + * Starts the MQTT consumer and subscribes to the configured topic. + * + * @throws AdapterException when the connection or subscription fails + */ + public void start() throws AdapterException { + if (!this.running.compareAndSet(false, true)) { + return; + } try { - this.client = super.setupMqttClient(); - client.connectWith() - .keepAlive(30) - .send() - .whenComplete((cAck, throwable) -> { - if (throwable != null) { - LOG.error("MQTT connection failed", throwable); - } else { - LOG.info("MQTT connection established"); - } - }) - .get(); - - subscribe(client); - - } catch (Exception e) { - LOG.error("Error in MQTT consumer: ", e); - throw new RuntimeException("Error when receiving data from MQTT", e); + this.client = connectClient(); + subscribeToTopic(this.client); + } catch (InterruptedException e) { + this.running.set(false); + Thread.currentThread().interrupt(); + throw new AdapterException("Interrupted while starting MQTT consumer", e); + } catch (AdapterException e) { + this.running.set(false); + throw e; } } - private void subscribe(Mqtt3AsyncClient client) throws Exception { - - CountDownLatch subscribed = new CountDownLatch(1); - - client.subscribeWith() - .topicFilter(super.mqttConfig.getTopic()) - .qos(MqttQos.AT_LEAST_ONCE) - .callback(this::handleMessage) - .send() - .whenComplete((subAck, throwable) -> { - if (throwable != null) { - LOG.error("MQTT subscribe failed", throwable); - } else { - LOG.info("Successfully subscribed to topic {}", super.mqttConfig.getTopic()); - } - subscribed.countDown(); - }); - - subscribed.await(); + /** + * Stops the MQTT consumer and disconnects from the broker. + */ + public void stop() { + this.running.set(false); + if (this.client == null) { + return; + } + disconnectSafely(); } private void handleMessage(Mqtt3Publish publish) { - if (!this.running) { + if (!this.running.get()) { return; } - try { - byte[] payload = publish.getPayloadAsBytes(); - consumer.onEvent(payload); - } catch (Exception e) { + processPayload(publish.getPayloadAsBytes()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.warn("Interrupted while processing MQTT message", e); + } catch (RuntimeException e) { LOG.error("Error processing MQTT message", e); } } - public void close() { - this.running = false; + + private Mqtt3AsyncClient connectClient() throws AdapterException, InterruptedException { try { + var mqttClient = super.setupMqttClient(); + mqttClient.connectWith() + .keepAlive(KEEP_ALIVE_SECONDS) + .send() + .whenComplete(this::logConnect) + .get(); + return mqttClient; + } catch (InterruptedException e) { + throw e; + } catch (Exception e) { + throw new AdapterException("Error while connecting to MQTT broker", e); + } + } - this.client.disconnect().get(); + private void logConnect(Mqtt3ConnAck cAck, Throwable throwable) { + if (throwable != null) { + LOG.error("MQTT connection failed", throwable); + } else { + LOG.info("MQTT connection established"); + } + } + private void subscribeToTopic(Mqtt3AsyncClient client) throws AdapterException, InterruptedException { + CountDownLatch subscribed = new CountDownLatch(1); + client.subscribeWith() + .topicFilter(super.mqttConfig.getTopic()) + .qos(MqttQos.AT_LEAST_ONCE) + .callback(this::handleMessage) + .send() + .whenComplete((subAck, throwable) -> { + logSubscribe(throwable); + subscribed.countDown(); + }); + try { + subscribed.await(); } catch (InterruptedException e) { - LOG.error("Error disconnecting from MQTT due to thread interruption", e); - } catch (ExecutionException e) { - LOG.error("Error disconnecting from MQTT", e.getCause()); + Thread.currentThread().interrupt(); + throw e; } + } + + private void logSubscribe(Throwable throwable) { + if (throwable != null) { + LOG.error("MQTT subscribe failed", throwable); + } else { + LOG.info("Successfully subscribed to topic {}", super.mqttConfig.getTopic()); + } + } + private void processPayload(byte[] payload) throws InterruptedException{ + eventProcessor.onEvent(payload); + } + + private void disconnectSafely() { + try { + this.client.disconnect().whenComplete((result, throwable) -> { + if (throwable != null) { + LOG.error("Error disconnecting from MQTT", throwable); + } + }); + } catch (RuntimeException e) { + LOG.error("Error disconnecting from MQTT", e); + } } }
