This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a commit to branch 
4137-updating-mqtt-adapter-can-leave-orphaned-consumer-thread
in repository https://gitbox.apache.org/repos/asf/streampipes.git

commit 85f5e5e39eaa215dc9fd194aecccbfb18200c382
Author: Philipp Zehnder <[email protected]>
AuthorDate: Thu Jan 29 14:50:55 2026 +0100

    fix(#4137): Update MqttConsumer implementation
---
 .../connect/iiot/adapters/oi4/Oi4Adapter.java      |   7 +-
 .../connectors/mqtt/adapter/MqttProtocol.java      |   8 +-
 .../connectors/mqtt/shared/MqttConsumer.java       | 159 +++++++++++++--------
 3 files changed, 110 insertions(+), 64 deletions(-)

diff --git 
a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/oi4/Oi4Adapter.java
 
b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/oi4/Oi4Adapter.java
index b724b43f58..0b280437bd 100644
--- 
a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/oi4/Oi4Adapter.java
+++ 
b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/oi4/Oi4Adapter.java
@@ -160,8 +160,7 @@ public class Oi4Adapter implements StreamPipesAdapter {
         }
     );
 
-    Thread thread = new Thread(this.mqttConsumer);
-    thread.start();
+    this.mqttConsumer.start();
 
     LOG.info("Adapter {} started", ID);
     LOG.info("Adapter with id {} started",
@@ -206,7 +205,9 @@ public class Oi4Adapter implements StreamPipesAdapter {
       IAdapterParameterExtractor extractor,
       IAdapterRuntimeContext adapterRuntimeContext
   ) {
-    mqttConsumer.close();
+    if (mqttConsumer != null) {
+      mqttConsumer.stop();
+    }
   }
 
   @Override
diff --git 
a/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/adapter/MqttProtocol.java
 
b/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/adapter/MqttProtocol.java
index 97959774c3..36dbf81669 100644
--- 
a/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/adapter/MqttProtocol.java
+++ 
b/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/adapter/MqttProtocol.java
@@ -78,8 +78,7 @@ public class MqttProtocol implements StreamPipesAdapter {
         new BrokerEventProcessor(extractor.selectedParser(), collector)
     );
 
-    Thread thread = new Thread(this.mqttConsumer);
-    thread.start();
+    this.mqttConsumer.start();
   }
 
   @Override
@@ -87,7 +86,9 @@ public class MqttProtocol implements StreamPipesAdapter {
       IAdapterParameterExtractor extractor,
       IAdapterRuntimeContext adapterRuntimeContext
   ) {
-    this.mqttConsumer.close();
+    if (this.mqttConsumer != null) {
+      this.mqttConsumer.stop();
+    }
   }
 
   @Override
@@ -115,4 +116,3 @@ public class MqttProtocol implements StreamPipesAdapter {
   }
 
 }
-
diff --git 
a/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/shared/MqttConsumer.java
 
b/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/shared/MqttConsumer.java
index fd9f960f06..1ed4aa2b34 100644
--- 
a/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/shared/MqttConsumer.java
+++ 
b/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/shared/MqttConsumer.java
@@ -17,101 +17,146 @@
  */
 package org.apache.streampipes.extensions.connectors.mqtt.shared;
 
+import org.apache.streampipes.commons.exceptions.connect.AdapterException;
 import org.apache.streampipes.messaging.InternalEventProcessor;
 
 import com.hivemq.client.mqtt.datatypes.MqttQos;
 import com.hivemq.client.mqtt.mqtt3.Mqtt3AsyncClient;
+import com.hivemq.client.mqtt.mqtt3.message.connect.connack.Mqtt3ConnAck;
 import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3Publish;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
 
-public class MqttConsumer extends MqttBase implements Runnable {
+public class MqttConsumer extends MqttBase {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(MqttConsumer.class);
+    private static final int KEEP_ALIVE_SECONDS = 30;
 
-    private final InternalEventProcessor<byte[]> consumer;
-    private boolean running;
-
+    private final InternalEventProcessor<byte[]> eventProcessor;
+    private final AtomicBoolean running;
     private Mqtt3AsyncClient client;
 
     public MqttConsumer(MqttConfig mqttConfig, InternalEventProcessor<byte[]> 
consumer) {
         super(mqttConfig);
-        this.consumer = consumer;
+        this.eventProcessor = consumer;
+        this.running = new AtomicBoolean(false);
     }
 
-    @Override
-    public void run() {
-        this.running = true;
+    /**
+     * Starts the MQTT consumer and subscribes to the configured topic.
+     *
+     * @throws AdapterException when the connection or subscription fails
+     */
+    public void start() throws AdapterException {
+        if (!this.running.compareAndSet(false, true)) {
+            return;
+        }
         try {
-            this.client = super.setupMqttClient();
-            client.connectWith()
-                    .keepAlive(30)
-                    .send()
-                    .whenComplete((cAck, throwable) -> {
-                        if (throwable != null) {
-                            LOG.error("MQTT connection failed", throwable);
-                        } else {
-                            LOG.info("MQTT connection established");
-                        }
-                    })
-                    .get();
-
-            subscribe(client);
-
-        } catch (Exception e) {
-            LOG.error("Error in MQTT consumer: ", e);
-            throw new RuntimeException("Error when receiving data from MQTT", 
e);
+            this.client = connectClient();
+            subscribeToTopic(this.client);
+        } catch (InterruptedException e) {
+            this.running.set(false);
+            Thread.currentThread().interrupt();
+            throw new AdapterException("Interrupted while starting MQTT 
consumer", e);
+        } catch (AdapterException e) {
+            this.running.set(false);
+            throw e;
         }
     }
 
-    private void subscribe(Mqtt3AsyncClient client) throws Exception {
-
-        CountDownLatch subscribed = new CountDownLatch(1);
-
-        client.subscribeWith()
-                .topicFilter(super.mqttConfig.getTopic())
-                .qos(MqttQos.AT_LEAST_ONCE)
-                .callback(this::handleMessage)
-                .send()
-                .whenComplete((subAck, throwable) -> {
-                    if (throwable != null) {
-                        LOG.error("MQTT subscribe failed", throwable);
-                    } else {
-                        LOG.info("Successfully subscribed to topic {}", 
super.mqttConfig.getTopic());
-                    }
-                    subscribed.countDown();
-                });
-
-        subscribed.await();
+    /**
+     * Stops the MQTT consumer and disconnects from the broker.
+     */
+    public void stop() {
+        this.running.set(false);
+        if (this.client == null) {
+            return;
+        }
+        disconnectSafely();
     }
 
     private void handleMessage(Mqtt3Publish publish) {
-        if (!this.running) {
+        if (!this.running.get()) {
             return;
         }
-
         try {
-            byte[] payload = publish.getPayloadAsBytes();
-            consumer.onEvent(payload);
-        } catch (Exception e) {
+            processPayload(publish.getPayloadAsBytes());
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            LOG.warn("Interrupted while processing MQTT message", e);
+        } catch (RuntimeException e) {
             LOG.error("Error processing MQTT message", e);
         }
     }
 
-    public void close() {
-        this.running = false;
+
+    private Mqtt3AsyncClient connectClient() throws AdapterException, 
InterruptedException {
         try {
+            var mqttClient = super.setupMqttClient();
+            mqttClient.connectWith()
+                      .keepAlive(KEEP_ALIVE_SECONDS)
+                      .send()
+                      .whenComplete(this::logConnect)
+                      .get();
+            return mqttClient;
+        } catch (InterruptedException e) {
+            throw e;
+        } catch (Exception e) {
+            throw new AdapterException("Error while connecting to MQTT 
broker", e);
+        }
+    }
 
-            this.client.disconnect().get();
+    private void logConnect(Mqtt3ConnAck cAck, Throwable throwable) {
+        if (throwable != null) {
+            LOG.error("MQTT connection failed", throwable);
+        } else {
+            LOG.info("MQTT connection established");
+        }
+    }
 
+    private void subscribeToTopic(Mqtt3AsyncClient client) throws 
AdapterException, InterruptedException {
+        CountDownLatch subscribed = new CountDownLatch(1);
+        client.subscribeWith()
+              .topicFilter(super.mqttConfig.getTopic())
+              .qos(MqttQos.AT_LEAST_ONCE)
+              .callback(this::handleMessage)
+              .send()
+              .whenComplete((subAck, throwable) -> {
+                  logSubscribe(throwable);
+                  subscribed.countDown();
+              });
+        try {
+            subscribed.await();
         } catch (InterruptedException e) {
-            LOG.error("Error disconnecting from MQTT due to thread 
interruption", e);
-        } catch (ExecutionException e) {
-            LOG.error("Error disconnecting from MQTT", e.getCause());
+            Thread.currentThread().interrupt();
+            throw e;
         }
+    }
+
+    private void logSubscribe(Throwable throwable) {
+        if (throwable != null) {
+            LOG.error("MQTT subscribe failed", throwable);
+        } else {
+            LOG.info("Successfully subscribed to topic {}", 
super.mqttConfig.getTopic());
+        }
+    }
 
+    private void processPayload(byte[] payload) throws InterruptedException{
+        eventProcessor.onEvent(payload);
+    }
+
+    private void disconnectSafely() {
+        try {
+            this.client.disconnect().whenComplete((result, throwable) -> {
+                if (throwable != null) {
+                    LOG.error("Error disconnecting from MQTT", throwable);
+                }
+            });
+        } catch (RuntimeException e) {
+            LOG.error("Error disconnecting from MQTT", e);
+        }
     }
 }

Reply via email to