This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a commit to branch 4157-mqtt-consumer-connection-is-not-restored-after-broker-restarts
in repository https://gitbox.apache.org/repos/asf/streampipes.git


The following commit(s) were added to refs/heads/4157-mqtt-consumer-connection-is-not-restored-after-broker-restarts by this push:
     new 9cdd51522d fix(#4157): Enable MQTT reconnect logging and resubscribe after outages
9cdd51522d is described below

commit 9cdd51522dad484e563bdb58ce7f2b0028dc6e85
Author: Philipp Zehnder <[email protected]>
AuthorDate: Mon Feb 9 14:35:26 2026 +0100

    fix(#4157): Enable MQTT reconnect logging and resubscribe after outages
---
 .../extensions/connectors/mqtt/shared/MqttBase.java    | 18 ++++++++++++++++--
 1 file changed, 16 insertions(+), 2 deletions(-)

diff --git a/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/shared/MqttBase.java b/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/shared/MqttBase.java
index 3c671425d8..ab0d3580d4 100644
--- a/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/shared/MqttBase.java
+++ b/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/shared/MqttBase.java
@@ -36,12 +36,14 @@ import java.security.KeyStoreException;
 import java.security.NoSuchAlgorithmException;
 import java.security.cert.CertificateException;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 public class MqttBase {
 
     protected final MqttConfig mqttConfig;
 
     private static final Logger LOG = LoggerFactory.getLogger(MqttBase.class);
+    private final AtomicBoolean disconnectedLogged = new AtomicBoolean(false);
 
     public MqttBase(MqttConfig mqttConfig) {
         this.mqttConfig = mqttConfig;
@@ -55,6 +57,9 @@ public class MqttBase {
                 .identifier(UUID.randomUUID().toString())
                 .serverHost(brokerUri.getHost())
                 .serverPort(resolvePort(brokerUri))
+                .automaticReconnectWithDefaultConfig()
+                .addConnectedListener(context -> logConnected())
+                .addDisconnectedListener(context -> logDisconnected(context.getCause()))
                 .useMqttVersion3();
 
         if (mqttConfig.getAuthenticated()) {
@@ -69,9 +74,18 @@ public class MqttBase {
             builder.sslConfig(sslContext);
         }
 
-        Mqtt3AsyncClient client = builder.buildAsync();
+        return builder.buildAsync();
+    }
+
+    private void logConnected() {
+        disconnectedLogged.set(false);
+        LOG.info("MQTT connected to broker {} (topic: {})", mqttConfig.getUrl(), mqttConfig.getTopic());
+    }
 
-        return client;
+    private void logDisconnected(Throwable cause) {
+        if (disconnectedLogged.compareAndSet(false, true)) {
+            LOG.warn("MQTT disconnected from broker {} (topic: {})", mqttConfig.getUrl(), mqttConfig.getTopic(), cause);
+        }
     }
 
     private int resolvePort(URI uri) {

Reply via email to