This is an automated email from the ASF dual-hosted git repository.
zehnder pushed a commit to branch
4157-mqtt-consumer-connection-is-not-restored-after-broker-restarts
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to
refs/heads/4157-mqtt-consumer-connection-is-not-restored-after-broker-restarts
by this push:
new 9cdd51522d fix(#4157): Enable MQTT reconnect logging and resubscribe
after outages
9cdd51522d is described below
commit 9cdd51522dad484e563bdb58ce7f2b0028dc6e85
Author: Philipp Zehnder <[email protected]>
AuthorDate: Mon Feb 9 14:35:26 2026 +0100
fix(#4157): Enable MQTT reconnect logging and resubscribe after outages
---
.../extensions/connectors/mqtt/shared/MqttBase.java | 18 ++++++++++++++++--
1 file changed, 16 insertions(+), 2 deletions(-)
diff --git a/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/shared/MqttBase.java b/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/shared/MqttBase.java
index 3c671425d8..ab0d3580d4 100644
--- a/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/shared/MqttBase.java
+++ b/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/shared/MqttBase.java
@@ -36,12 +36,14 @@ import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.cert.CertificateException;
import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
public class MqttBase {
protected final MqttConfig mqttConfig;
private static final Logger LOG = LoggerFactory.getLogger(MqttBase.class);
+ private final AtomicBoolean disconnectedLogged = new AtomicBoolean(false);
public MqttBase(MqttConfig mqttConfig) {
this.mqttConfig = mqttConfig;
@@ -55,6 +57,9 @@ public class MqttBase {
.identifier(UUID.randomUUID().toString())
.serverHost(brokerUri.getHost())
.serverPort(resolvePort(brokerUri))
+ .automaticReconnectWithDefaultConfig()
+ .addConnectedListener(context -> logConnected())
+ .addDisconnectedListener(context -> logDisconnected(context.getCause()))
.useMqttVersion3();
if (mqttConfig.getAuthenticated()) {
@@ -69,9 +74,18 @@ public class MqttBase {
builder.sslConfig(sslContext);
}
- Mqtt3AsyncClient client = builder.buildAsync();
+ return builder.buildAsync();
+ }
+
+ private void logConnected() {
+ disconnectedLogged.set(false);
+ LOG.info("MQTT connected to broker {} (topic: {})", mqttConfig.getUrl(), mqttConfig.getTopic());
+ }
- return client;
+ private void logDisconnected(Throwable cause) {
+ if (disconnectedLogged.compareAndSet(false, true)) {
+ LOG.warn("MQTT disconnected from broker {} (topic: {})", mqttConfig.getUrl(), mqttConfig.getTopic(), cause);
+ }
}
private int resolvePort(URI uri) {