This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a commit to branch 4137-updating-mqtt-adapter-can-leave-orphaned-consumer-thread
in repository https://gitbox.apache.org/repos/asf/streampipes.git

commit 0fb0f273b220c9328726fb91f2b01c780a0f8086
Author: Philipp Zehnder <[email protected]>
AuthorDate: Thu Jan 29 13:56:37 2026 +0100

    fix(#4137): Improve logic to get single event from mqtt
---
 .../connect/iiot/adapters/oi4/Oi4Adapter.java      | 160 +++++----------------
 .../connectors/mqtt/adapter/MqttProtocol.java      |  92 +++++-------
 .../connectors/mqtt/shared/MqttConsumer.java       |  24 +---
 .../connectors/mqtt/shared/MqttPublisher.java      |   8 +-
 .../mqtt/shared/MqttSingleMessageReceiver.java     | 129 +++++++++++++++++
 5 files changed, 208 insertions(+), 205 deletions(-)

diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/oi4/Oi4Adapter.java b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/oi4/Oi4Adapter.java
index 06e0c76b84..b724b43f58 100644
--- a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/oi4/Oi4Adapter.java
+++ b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/oi4/Oi4Adapter.java
@@ -32,13 +32,11 @@ import 
org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor;
 import org.apache.streampipes.extensions.connectors.mqtt.shared.MqttConfig;
 import 
org.apache.streampipes.extensions.connectors.mqtt.shared.MqttConnectUtils;
 import org.apache.streampipes.extensions.connectors.mqtt.shared.MqttConsumer;
+import 
org.apache.streampipes.extensions.connectors.mqtt.shared.MqttSingleMessageReceiver;
 import 
org.apache.streampipes.extensions.management.connect.adapter.parser.JsonParsers;
 import 
org.apache.streampipes.extensions.management.connect.adapter.parser.json.JsonObjectParser;
-import org.apache.streampipes.messaging.InternalEventProcessor;
-import org.apache.streampipes.model.connect.guess.GuessSchema;
 import org.apache.streampipes.model.connect.guess.SampleData;
 import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.schema.EventSchema;
 import org.apache.streampipes.sdk.StaticProperties;
 import org.apache.streampipes.sdk.builder.adapter.AdapterConfigurationBuilder;
 import org.apache.streampipes.sdk.helpers.Alternatives;
@@ -62,12 +60,8 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
-import static 
org.apache.streampipes.sdk.helpers.EpProperties.timestampProperty;
-
 /**
  * Adapter to connect to an Open Industry 4.0 (OI4) compatible device.
  *
@@ -79,8 +73,6 @@ public class Oi4Adapter implements StreamPipesAdapter {
   public static final String ID = 
"org.apache.streampipes.connect.iiot.adapters.oi4";
 
   private static final Logger LOG = LoggerFactory.getLogger(Oi4Adapter.class);
-  private static final long ReceiveSchemaSleepTime = 100;
-  private static final long ReceiveSchemaMaxTimeout = 5000;
 
   // Information about the topic structure can be found at page 57 of the 
above-mentioned development guide
   // The app id (missing here) needs to be provided by the user
@@ -95,7 +87,7 @@ public class Oi4Adapter implements StreamPipesAdapter {
 
   public Oi4Adapter() {
     mapper = JacksonSerializer.getObjectMapper(Map.of(
-      DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, true
+        DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, true
     ));
   }
 
@@ -107,7 +99,8 @@ public class Oi4Adapter implements StreamPipesAdapter {
         .withLocales(Locales.EN)
         .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
         .requiredTextParameter(MqttConnectUtils.getBrokerUrlLabel())
-        .requiredAlternatives(MqttConnectUtils.getAccessModeLabel(), 
MqttConnectUtils.getAnonymousAccess(),
+        .requiredAlternatives(
+            MqttConnectUtils.getAccessModeLabel(), 
MqttConnectUtils.getAnonymousAccess(),
             MqttConnectUtils.getUsernameAccess(), 
MqttConnectUtils.getClientCertAccess()
         )
         .requiredAlternatives(
@@ -144,7 +137,10 @@ public class Oi4Adapter implements StreamPipesAdapter {
       IAdapterRuntimeContext adapterRuntimeContext
   ) throws AdapterException {
     LOG.info("Adapter type {} starting", ID);
-    LOG.info("Adapter with id {} starting", 
extractor.getAdapterDescription().getElementId());
+    LOG.info("Adapter with id {} starting",
+             extractor.getAdapterDescription()
+                      .getElementId()
+    );
 
     this.applyConfiguration(extractor.getStaticPropertyExtractor());
 
@@ -168,7 +164,10 @@ public class Oi4Adapter implements StreamPipesAdapter {
     thread.start();
 
     LOG.info("Adapter {} started", ID);
-    LOG.info("Adapter with id {} started", 
extractor.getAdapterDescription().getElementId());
+    LOG.info("Adapter with id {} started",
+             extractor.getAdapterDescription()
+                      .getElementId()
+    );
   }
 
   private InputStream convertByte(byte[] event) {
@@ -191,7 +190,7 @@ public class Oi4Adapter implements StreamPipesAdapter {
     } else {
       var selectedSensorsText = 
extractor.textParameter(OI4AdapterLabels.LABEL_KEY_SENSORS_LIST_INPUT);
       selectedSensors = Arrays.stream(selectedSensorsText.split(","))
-          .toList();
+                              .toList();
     }
 
     if 
(selectedAlternativeSensorDescription.equals(OI4AdapterLabels.LABEL_KEY_SENSOR_TYPE_ALTERNATIVE))
 {
@@ -215,48 +214,12 @@ public class Oi4Adapter implements StreamPipesAdapter {
       IAdapterParameterExtractor extractor,
       IAdapterGuessSchemaContext adapterGuessSchemaContext
   ) throws AdapterException {
-    try {
-      this.applyConfiguration(extractor.getStaticPropertyExtractor());
-
-      var sampleMessage = getSampleMessage();
-      var sampleData = transformToSampleData(sampleMessage);
-
-      return sampleData;
-    } catch (RuntimeException e) {
-      throw new AdapterException(e.getMessage(), e);
-    }
-  }
-
-  /**
-   * Updates the timestamp property in the given GuessSchema if it exists as 
it is not correctly guessed.
-   * If the timestamp property exists, it is replaced with a proper timestamp 
property.
-   *
-   * @param guessSchema The GuessSchema to update.
-   */
-  private void updateTimestampPropertyIfExists(GuessSchema guessSchema) {
-    var eventProperties = guessSchema.getEventSchema()
-        .getEventProperties();
-
-    var timestampPropertyOpt = eventProperties.stream()
-        .filter(eventProperty ->
-            eventProperty.getRuntimeName()
-                .equals(OI4AdapterLabels.EVENT_KEY_TIMESTAMP)
-        )
-        .findFirst();
-
-    var newTimestampProperty = 
timestampProperty(OI4AdapterLabels.EVENT_KEY_TIMESTAMP);
-
-    // If the timestamp property exists, replace it with the new timestamp 
property
-    timestampPropertyOpt.ifPresent(prop -> {
-      eventProperties.removeIf(eventProperty -> eventProperty.getRuntimeName()
-          .equals(OI4AdapterLabels.EVENT_KEY_TIMESTAMP));
-      eventProperties.add(newTimestampProperty);
-    });
-
-    guessSchema.setEventSchema(new EventSchema(eventProperties));
+    this.applyConfiguration(extractor.getStaticPropertyExtractor());
+    var sampleMessage = getSampleMessage();
+    return transformToSampleData(sampleMessage);
   }
 
-  private SampleData transformToSampleData(byte[] sampleMessage) {
+  private SampleData transformToSampleData(byte[] sampleMessage) throws 
AdapterException {
     try {
       var networkMessage = mapper.readValue(sampleMessage, 
NetworkMessage.class);
       var payload = extractPayload(networkMessage);
@@ -265,68 +228,14 @@ public class Oi4Adapter implements StreamPipesAdapter {
       return new JsonParsers(new JsonObjectParser())
           
.getSampleData(convertByte(plainPayload.getBytes(StandardCharsets.UTF_8)));
     } catch (IOException e) {
-      LOG.error("Error while reading sample message: {}", sampleMessage);
-      throw new RuntimeException(e);
+      throw new AdapterException("Error while reading sample message", e);
     }
   }
 
-  private byte[] getSampleMessage() throws AdapterException {
-    List<byte[]> sampleMessages = new ArrayList<>();
-    long timeElapsed = 0;
-    AtomicReference<Throwable> exceptionRef = new AtomicReference<>();
-    MqttConsumer guessConsumer = getGuessMqttConsumer(sampleMessages);
-
-    var thread = new Thread(guessConsumer);
-    thread.setUncaughtExceptionHandler((t, e) -> 
exceptionRef.set(e.getCause()));
-    thread.start();
-
-    while (sampleMessages.isEmpty() && exceptionRef.get() == null && 
timeElapsed < ReceiveSchemaMaxTimeout) {
-      try {
-        TimeUnit.MILLISECONDS.sleep(ReceiveSchemaSleepTime);
-        timeElapsed += ReceiveSchemaSleepTime;
-      } catch (InterruptedException e) {
-        LOG.error("Schema guessing failed during waiting for an incoming 
event: {}", e.getMessage());
-        break;
-      }
-    }
-    guessConsumer.close();
-
-    Throwable threadException = exceptionRef.get();
-    if (threadException != null) {
-      throw new AdapterException(threadException.getMessage(), 
threadException);
-    }
-
-    if (!sampleMessages.isEmpty()) {
-      return sampleMessages.get(0);
-    } else {
-      throw new AdapterException("No messages received");
-    }
-  }
 
-  /**
-   * Obtain a specialized MQTT consumer designed to infer the event schema 
based on provided sampleMessages.
-   *
-   * @param sampleMessages A list of byte arrays representing MQTT message 
payloads.
-   * @return A customized MqttConsumer instance.
-   */
-  private MqttConsumer getGuessMqttConsumer(List<byte[]> sampleMessages) {
-    // Define a specialized event processor that adds an event to the 
sampleMessages array
-    // only if it meets certain expectations, as verified by extractPayload.
-    InternalEventProcessor<byte[]> eventProcessor = event -> {
-      InputStream in = convertByte(event);
-      NetworkMessage networkMessage;
-      try {
-        networkMessage = mapper.readValue(in, NetworkMessage.class);
-      } catch (IOException e) {
-        LOG.error("Error during parsing of incoming MQTT event.");
-        throw new RuntimeException(e);
-      }
-      // Attempt to extract payload from the NetworkMessage
-      extractPayload(networkMessage);
-      // If successful, add the event to the sampleMessages array
-      sampleMessages.add(event);
-    };
-    return new MqttConsumer(this.mqttConfig, eventProcessor);
+  private byte[] getSampleMessage() throws AdapterException {
+    var receiver = new MqttSingleMessageReceiver(this.mqttConfig, 10);
+    return receiver.receiveSingleMessage();
   }
 
   private List<Map<String, Object>> extractPayload(NetworkMessage message) 
throws ParseException {
@@ -343,7 +252,7 @@ public class Oi4Adapter implements StreamPipesAdapter {
         // Verify that the message corresponds to the designated sensor type.
         // This validation relies on the assumption that the source 
information includes the sensor type.
         if (dataMessage.source()
-            .contains(givenSensorType)) {
+                       .contains(givenSensorType)) {
 
           // an empty list of selected sensors means that we want to collect 
data from all sensors available
           if (selectedSensors.isEmpty() || selectedSensors.contains(sensorId)) 
{
@@ -361,10 +270,10 @@ public class Oi4Adapter implements StreamPipesAdapter {
 
   private List<DataSetMessage> findProcessDataInputMessage(NetworkMessage 
message) {
     return message.messages()
-        .stream()
-        .filter(msg -> msg.filter()
-            .equals(OI4AdapterLabels.MESSAGE_VALUE_FILTER))
-        .toList();
+                  .stream()
+                  .filter(msg -> msg.filter()
+                                    
.equals(OI4AdapterLabels.MESSAGE_VALUE_FILTER))
+                  .toList();
   }
 
   private Map<String, Object> extractAndEnrichMessagePayload(DataSetMessage 
dataSetMessage, String sensorId) {
@@ -388,16 +297,19 @@ public class Oi4Adapter implements StreamPipesAdapter {
 
   private Map<String, Object> replaceSpecialChars(Map<String, Object> 
originalMap) {
     return originalMap.entrySet()
-        .stream()
-        .collect(Collectors.toMap(
-            entry -> entry.getKey().replace("-", ""),
-            Map.Entry::getValue,
-            (oldValue, newValue) -> oldValue)
-        );
+                      .stream()
+                      .collect(Collectors.toMap(
+                                   entry -> entry.getKey()
+                                                 .replace("-", ""),
+                                   Map.Entry::getValue,
+                                   (oldValue, newValue) -> oldValue
+                               )
+                      );
   }
 
   private long parseDate(String timestamp) throws DateTimeParseException {
-    return Instant.parse(timestamp).toEpochMilli();
+    return Instant.parse(timestamp)
+                  .toEpochMilli();
   }
 
   /**
diff --git a/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/adapter/MqttProtocol.java b/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/adapter/MqttProtocol.java
index d3816c4f44..97959774c3 100644
--- a/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/adapter/MqttProtocol.java
+++ b/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/adapter/MqttProtocol.java
@@ -28,27 +28,18 @@ import 
org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor;
 import org.apache.streampipes.extensions.connectors.mqtt.shared.MqttConfig;
 import 
org.apache.streampipes.extensions.connectors.mqtt.shared.MqttConnectUtils;
 import org.apache.streampipes.extensions.connectors.mqtt.shared.MqttConsumer;
+import 
org.apache.streampipes.extensions.connectors.mqtt.shared.MqttSingleMessageReceiver;
 import 
org.apache.streampipes.extensions.management.connect.adapter.BrokerEventProcessor;
 import 
org.apache.streampipes.extensions.management.connect.adapter.parser.Parsers;
-import org.apache.streampipes.messaging.InternalEventProcessor;
 import org.apache.streampipes.model.connect.guess.SampleData;
 import org.apache.streampipes.model.extensions.ExtensionAssetType;
 import org.apache.streampipes.sdk.builder.adapter.AdapterConfigurationBuilder;
 import org.apache.streampipes.sdk.helpers.Locales;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.io.ByteArrayInputStream;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
 
 public class MqttProtocol implements StreamPipesAdapter {
 
-  private static final Logger LOG = 
LoggerFactory.getLogger(MqttProtocol.class);
-
   public static final String ID = 
"org.apache.streampipes.connect.iiot.protocol.stream.mqtt";
 
   private MqttConsumer mqttConsumer;
@@ -57,9 +48,6 @@ public class MqttProtocol implements StreamPipesAdapter {
   public MqttProtocol() {
   }
 
-  public void applyConfiguration(IStaticPropertyExtractor extractor) {
-    this.mqttConfig = MqttConnectUtils.getMqttConfig(extractor);
-  }
 
   @Override
   public IAdapterConfiguration declareConfig() {
@@ -69,18 +57,22 @@ public class MqttProtocol implements StreamPipesAdapter {
         .withLocales(Locales.EN)
         .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
         .requiredTextParameter(MqttConnectUtils.getBrokerUrlLabel())
-        .requiredAlternatives(MqttConnectUtils.getAccessModeLabel(), 
MqttConnectUtils.getAnonymousAccess(),
-            MqttConnectUtils.getUsernameAccess(),  
MqttConnectUtils.getClientCertAccess())
+        .requiredAlternatives(
+            MqttConnectUtils.getAccessModeLabel(), 
MqttConnectUtils.getAnonymousAccess(),
+            MqttConnectUtils.getUsernameAccess(), 
MqttConnectUtils.getClientCertAccess()
+        )
         .requiredTextParameter(MqttConnectUtils.getTopicLabel())
         .buildConfiguration();
   }
 
   @Override
-  public void onAdapterStarted(IAdapterParameterExtractor extractor,
-                               IEventCollector collector,
-                               IAdapterRuntimeContext adapterRuntimeContext) 
throws AdapterException {
+  public void onAdapterStarted(
+      IAdapterParameterExtractor extractor,
+      IEventCollector collector,
+      IAdapterRuntimeContext adapterRuntimeContext
+  ) throws AdapterException {
 
-    this.applyConfiguration(extractor.getStaticPropertyExtractor());
+    this.initializeMqttConfig(extractor.getStaticPropertyExtractor());
     this.mqttConsumer = new MqttConsumer(
         this.mqttConfig,
         new BrokerEventProcessor(extractor.selectedParser(), collector)
@@ -91,44 +83,36 @@ public class MqttProtocol implements StreamPipesAdapter {
   }
 
   @Override
-  public void onAdapterStopped(IAdapterParameterExtractor extractor,
-                               IAdapterRuntimeContext adapterRuntimeContext) 
throws AdapterException {
+  public void onAdapterStopped(
+      IAdapterParameterExtractor extractor,
+      IAdapterRuntimeContext adapterRuntimeContext
+  ) {
     this.mqttConsumer.close();
   }
 
   @Override
-  public SampleData onSampleDataRequested(IAdapterParameterExtractor extractor,
-                                      IAdapterGuessSchemaContext 
adapterGuessSchemaContext) throws AdapterException {
-    try {
-      AtomicReference<Throwable> exceptionRef = new AtomicReference<>();
-      this.applyConfiguration(extractor.getStaticPropertyExtractor());
-      List<byte[]> elements = new ArrayList<>();
-      InternalEventProcessor<byte[]> eventProcessor = elements::add;
-
-
-      MqttConsumer consumer = new MqttConsumer(this.mqttConfig, 
eventProcessor);
-
-      Thread thread = new Thread(consumer);
-      thread.setUncaughtExceptionHandler((t, e) -> 
exceptionRef.set(e.getCause()));
-      thread.start();
-
-      while (consumer.getMessageCount() < 1 && exceptionRef.get() == null) {
-        try {
-          TimeUnit.MILLISECONDS.sleep(100);
-        } catch (InterruptedException e) {
-          break;
-        }
-      }
-      consumer.close();
-
-      Throwable threadException = exceptionRef.get();
-      if (threadException != null) {
-        throw new AdapterException(threadException.getMessage(), 
threadException);
-      }
-
-      return extractor.selectedParser().getSampleData(new 
ByteArrayInputStream(elements.get(0)));
-    } catch (Exception e) {
-      throw new AdapterException(e.getMessage(), e);
-    }
+  public SampleData onSampleDataRequested(
+      IAdapterParameterExtractor extractor,
+      IAdapterGuessSchemaContext adapterGuessSchemaContext
+  ) throws AdapterException {
+
+    this.initializeMqttConfig(extractor.getStaticPropertyExtractor());
+
+    var payload = getSampleEventAsByte();
+
+    return extractor.selectedParser()
+                    .getSampleData(new ByteArrayInputStream(payload));
+
   }
+
+  private byte[] getSampleEventAsByte() throws AdapterException {
+    var receiver = new MqttSingleMessageReceiver(this.mqttConfig, 10);
+    return receiver.receiveSingleMessage();
+  }
+
+  public void initializeMqttConfig(IStaticPropertyExtractor extractor) {
+    this.mqttConfig = MqttConnectUtils.getMqttConfig(extractor);
+  }
+
 }
+
diff --git a/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/shared/MqttConsumer.java b/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/shared/MqttConsumer.java
index 24b0a6cb93..fd9f960f06 100644
--- a/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/shared/MqttConsumer.java
+++ b/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/shared/MqttConsumer.java
@@ -30,25 +30,18 @@ import java.util.concurrent.ExecutionException;
 
 public class MqttConsumer extends MqttBase implements Runnable {
 
+    private static final Logger LOG = 
LoggerFactory.getLogger(MqttConsumer.class);
+
     private final InternalEventProcessor<byte[]> consumer;
     private boolean running;
-    private int maxElementsToReceive = -1;
-    private int messageCount = 0;
 
     private Mqtt3AsyncClient client;
 
-    private static final Logger LOG = 
LoggerFactory.getLogger(MqttConsumer.class);
-
     public MqttConsumer(MqttConfig mqttConfig, InternalEventProcessor<byte[]> 
consumer) {
         super(mqttConfig);
         this.consumer = consumer;
     }
 
-    public MqttConsumer(MqttConfig mqttConfig, InternalEventProcessor<byte[]> 
consumer, int maxElementsToReceive) {
-        this(mqttConfig, consumer);
-        this.maxElementsToReceive = maxElementsToReceive;
-    }
-
     @Override
     public void run() {
         this.running = true;
@@ -103,13 +96,6 @@ public class MqttConsumer extends MqttBase implements 
Runnable {
         try {
             byte[] payload = publish.getPayloadAsBytes();
             consumer.onEvent(payload);
-            messageCount++;
-
-            if (maxElementsToReceive != -1 && messageCount >= 
maxElementsToReceive) {
-                LOG.info("Max elements ({}) received. Stopping consumer.", 
maxElementsToReceive);
-                this.running = false;
-            }
-
         } catch (Exception e) {
             LOG.error("Error processing MQTT message", e);
         }
@@ -128,8 +114,4 @@ public class MqttConsumer extends MqttBase implements 
Runnable {
         }
 
     }
-
-    public Integer getMessageCount() {
-        return messageCount;
-    }
-}
\ No newline at end of file
+}
diff --git a/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/shared/MqttPublisher.java b/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/shared/MqttPublisher.java
index 002b5fe54d..d11cad2e4a 100644
--- a/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/shared/MqttPublisher.java
+++ b/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/shared/MqttPublisher.java
@@ -28,12 +28,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
-import java.net.URI;
-
-
 public class MqttPublisher extends MqttBase {
 
-  private URI uri;
   private Mqtt3AsyncClient client;
 
   private static final Logger LOG = 
LoggerFactory.getLogger(MqttPublisher.class);
@@ -89,7 +85,7 @@ public class MqttPublisher extends MqttBase {
           });
     } catch (Exception e) {
       throw new SpRuntimeException("Could not publish to MQTT broker: "
-          + uri.toString() + ", " + e.getMessage(), e);
+          + super.mqttConfig.getUrl() + ", " + e.getMessage(), e);
     }
   }
 
@@ -99,7 +95,7 @@ public class MqttPublisher extends MqttBase {
           .get(); // block until disconnected
     } catch (Exception e) {
       throw new SpRuntimeException("Could not disconnect from MQTT broker: "
-          + uri.toString() + ", " + e.getMessage(), e);
+          + super.mqttConfig.getUrl() + ", " + e.getMessage(), e);
     }
   }
 
diff --git a/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/shared/MqttSingleMessageReceiver.java b/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/shared/MqttSingleMessageReceiver.java
new file mode 100644
index 0000000000..f00810056f
--- /dev/null
+++ b/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/shared/MqttSingleMessageReceiver.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.extensions.connectors.mqtt.shared;
+
+import org.apache.streampipes.commons.exceptions.connect.AdapterException;
+
+import com.hivemq.client.mqtt.datatypes.MqttQos;
+import com.hivemq.client.mqtt.mqtt3.Mqtt3AsyncClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class MqttSingleMessageReceiver extends MqttBase {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(MqttSingleMessageReceiver.class);
+  private final int timeoutInSeconds;
+
+  public MqttSingleMessageReceiver(MqttConfig mqttConfig, int 
timeoutInSeconds) {
+    super(mqttConfig);
+    this.timeoutInSeconds = timeoutInSeconds;
+  }
+
+  /**
+   * Receives a single MQTT message and blocks until a message arrives or the 
timeout elapses.
+   *
+   * @return the payload bytes of the first received message
+   * @throws AdapterException when receiving fails or the thread is interrupted
+   */
+  public byte[] receiveSingleMessage() throws AdapterException {
+    try {
+      return receiveSingleMessageAsync().get();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new AdapterException("Interrupted while waiting for an MQTT 
message", e);
+    } catch (ExecutionException e) {
+      if (e.getCause() instanceof TimeoutException) {
+        throw new AdapterException(
+            String.format("Timed out after %d seconds while waiting for an 
MQTT message", timeoutInSeconds),
+            e.getCause()
+        );
+      }
+      throw new AdapterException("Failed to receive MQTT message", 
e.getCause());
+    } catch (Exception e) {
+      throw new AdapterException("Failed to receive MQTT message", e);
+    }
+  }
+
+  private CompletableFuture<byte[]> receiveSingleMessageAsync() {
+    var result = new CompletableFuture<byte[]>();
+    var client = setupClientOrFail(result);
+    if (client == null) {
+      return result;
+    }
+    registerCleanup(result, client);
+    subscribeForSingleMessage(result, client);
+    applyTimeout(result);
+    return result;
+  }
+
+  private Mqtt3AsyncClient setupClientOrFail(CompletableFuture<byte[]> result) 
{
+    try {
+      return super.setupMqttClient();
+    } catch (Exception e) {
+      result.completeExceptionally(e);
+      return null;
+    }
+  }
+
+  private void registerCleanup(CompletableFuture<byte[]> result, 
Mqtt3AsyncClient client) {
+    result.whenComplete((payload, throwable) -> disconnectQuietly(client));
+  }
+
+  private void subscribeForSingleMessage(CompletableFuture<byte[]> result, 
Mqtt3AsyncClient client) {
+    client.connect()
+          .thenCompose(v -> client.subscribeWith()
+                                  .topicFilter(mqttConfig.getTopic())
+                                  .qos(MqttQos.AT_LEAST_ONCE)
+                                  .callback(publish -> completeIfEmpty(result, 
publish.getPayloadAsBytes()))
+                                  .send()
+          )
+          .exceptionally(ex -> {
+            result.completeExceptionally(ex);
+            return null;
+          });
+  }
+
+  private void completeIfEmpty(CompletableFuture<byte[]> result, byte[] 
payload) {
+    if (!result.isDone()) {
+      result.complete(payload);
+    }
+  }
+
+  private void applyTimeout(CompletableFuture<byte[]> result) {
+    if (this.timeoutInSeconds > 0) {
+      result.orTimeout(this.timeoutInSeconds, TimeUnit.SECONDS);
+    }
+  }
+
+  private void disconnectQuietly(Mqtt3AsyncClient client) {
+    try {
+      client.disconnect().whenComplete((result, throwable) -> {
+        if (throwable != null) {
+          LOG.warn("Failed to disconnect MQTT client after receiving sample", 
throwable);
+        }
+      });
+    } catch (Exception e) {
+      LOG.warn("Failed to disconnect MQTT client after receiving sample", e);
+    }
+  }
+}

Reply via email to