This is an automated email from the ASF dual-hosted git repository.
zehnder pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to refs/heads/dev by this push:
new b79ab90914 4137 updating mqtt adapter can leave orphaned consumer thread (#4138)
b79ab90914 is described below
commit b79ab90914383f75b5770ef37334047dbd310611
Author: Philipp Zehnder <[email protected]>
AuthorDate: Fri Jan 30 08:10:27 2026 +0100
4137 updating mqtt adapter can leave orphaned consumer thread (#4138)
---
.../connect/iiot/adapters/oi4/Oi4Adapter.java | 167 +++++--------------
.../connectors/mqtt/adapter/MqttProtocol.java | 98 +++++-------
.../connectors/mqtt/shared/MqttConsumer.java | 177 ++++++++++++---------
.../connectors/mqtt/shared/MqttPublisher.java | 8 +-
.../mqtt/shared/MqttSingleMessageReceiver.java | 129 +++++++++++++++
.../integration/adapters/MQTTPublisherUtils.java | 5 -
6 files changed, 314 insertions(+), 270 deletions(-)
diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/oi4/Oi4Adapter.java b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/oi4/Oi4Adapter.java
index 06e0c76b84..0b280437bd 100644
--- a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/oi4/Oi4Adapter.java
+++ b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/oi4/Oi4Adapter.java
@@ -32,13 +32,11 @@ import
org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor;
import org.apache.streampipes.extensions.connectors.mqtt.shared.MqttConfig;
import
org.apache.streampipes.extensions.connectors.mqtt.shared.MqttConnectUtils;
import org.apache.streampipes.extensions.connectors.mqtt.shared.MqttConsumer;
+import
org.apache.streampipes.extensions.connectors.mqtt.shared.MqttSingleMessageReceiver;
import
org.apache.streampipes.extensions.management.connect.adapter.parser.JsonParsers;
import
org.apache.streampipes.extensions.management.connect.adapter.parser.json.JsonObjectParser;
-import org.apache.streampipes.messaging.InternalEventProcessor;
-import org.apache.streampipes.model.connect.guess.GuessSchema;
import org.apache.streampipes.model.connect.guess.SampleData;
import org.apache.streampipes.model.extensions.ExtensionAssetType;
-import org.apache.streampipes.model.schema.EventSchema;
import org.apache.streampipes.sdk.StaticProperties;
import org.apache.streampipes.sdk.builder.adapter.AdapterConfigurationBuilder;
import org.apache.streampipes.sdk.helpers.Alternatives;
@@ -62,12 +60,8 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
-import static
org.apache.streampipes.sdk.helpers.EpProperties.timestampProperty;
-
/**
* Adapter to connect to an Open Industry 4.0 (OI4) compatible device.
*
@@ -79,8 +73,6 @@ public class Oi4Adapter implements StreamPipesAdapter {
public static final String ID =
"org.apache.streampipes.connect.iiot.adapters.oi4";
private static final Logger LOG = LoggerFactory.getLogger(Oi4Adapter.class);
- private static final long ReceiveSchemaSleepTime = 100;
- private static final long ReceiveSchemaMaxTimeout = 5000;
// Information about the topic structure can be found at page 57 of the
above-mentioned development guide
// The app id (missing here) needs to be provided by the user
@@ -95,7 +87,7 @@ public class Oi4Adapter implements StreamPipesAdapter {
public Oi4Adapter() {
mapper = JacksonSerializer.getObjectMapper(Map.of(
- DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, true
+ DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, true
));
}
@@ -107,7 +99,8 @@ public class Oi4Adapter implements StreamPipesAdapter {
.withLocales(Locales.EN)
.withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
.requiredTextParameter(MqttConnectUtils.getBrokerUrlLabel())
- .requiredAlternatives(MqttConnectUtils.getAccessModeLabel(),
MqttConnectUtils.getAnonymousAccess(),
+ .requiredAlternatives(
+ MqttConnectUtils.getAccessModeLabel(),
MqttConnectUtils.getAnonymousAccess(),
MqttConnectUtils.getUsernameAccess(),
MqttConnectUtils.getClientCertAccess()
)
.requiredAlternatives(
@@ -144,7 +137,10 @@ public class Oi4Adapter implements StreamPipesAdapter {
IAdapterRuntimeContext adapterRuntimeContext
) throws AdapterException {
LOG.info("Adapter type {} starting", ID);
- LOG.info("Adapter with id {} starting",
extractor.getAdapterDescription().getElementId());
+ LOG.info("Adapter with id {} starting",
+ extractor.getAdapterDescription()
+ .getElementId()
+ );
this.applyConfiguration(extractor.getStaticPropertyExtractor());
@@ -164,11 +160,13 @@ public class Oi4Adapter implements StreamPipesAdapter {
}
);
- Thread thread = new Thread(this.mqttConsumer);
- thread.start();
+ this.mqttConsumer.start();
LOG.info("Adapter {} started", ID);
- LOG.info("Adapter with id {} started",
extractor.getAdapterDescription().getElementId());
+ LOG.info("Adapter with id {} started",
+ extractor.getAdapterDescription()
+ .getElementId()
+ );
}
private InputStream convertByte(byte[] event) {
@@ -191,7 +189,7 @@ public class Oi4Adapter implements StreamPipesAdapter {
} else {
var selectedSensorsText =
extractor.textParameter(OI4AdapterLabels.LABEL_KEY_SENSORS_LIST_INPUT);
selectedSensors = Arrays.stream(selectedSensorsText.split(","))
- .toList();
+ .toList();
}
if
(selectedAlternativeSensorDescription.equals(OI4AdapterLabels.LABEL_KEY_SENSOR_TYPE_ALTERNATIVE))
{
@@ -207,7 +205,9 @@ public class Oi4Adapter implements StreamPipesAdapter {
IAdapterParameterExtractor extractor,
IAdapterRuntimeContext adapterRuntimeContext
) {
- mqttConsumer.close();
+ if (mqttConsumer != null) {
+ mqttConsumer.stop();
+ }
}
@Override
@@ -215,48 +215,12 @@ public class Oi4Adapter implements StreamPipesAdapter {
IAdapterParameterExtractor extractor,
IAdapterGuessSchemaContext adapterGuessSchemaContext
) throws AdapterException {
- try {
- this.applyConfiguration(extractor.getStaticPropertyExtractor());
-
- var sampleMessage = getSampleMessage();
- var sampleData = transformToSampleData(sampleMessage);
-
- return sampleData;
- } catch (RuntimeException e) {
- throw new AdapterException(e.getMessage(), e);
- }
- }
-
- /**
- * Updates the timestamp property in the given GuessSchema if it exists as it is not correctly guessed.
- * If the timestamp property exists, it is replaced with a proper timestamp property.
- *
- * @param guessSchema The GuessSchema to update.
- */
- private void updateTimestampPropertyIfExists(GuessSchema guessSchema) {
- var eventProperties = guessSchema.getEventSchema()
- .getEventProperties();
-
- var timestampPropertyOpt = eventProperties.stream()
- .filter(eventProperty ->
- eventProperty.getRuntimeName()
- .equals(OI4AdapterLabels.EVENT_KEY_TIMESTAMP)
- )
- .findFirst();
-
- var newTimestampProperty =
timestampProperty(OI4AdapterLabels.EVENT_KEY_TIMESTAMP);
-
- // If the timestamp property exists, replace it with the new timestamp
property
- timestampPropertyOpt.ifPresent(prop -> {
- eventProperties.removeIf(eventProperty -> eventProperty.getRuntimeName()
- .equals(OI4AdapterLabels.EVENT_KEY_TIMESTAMP));
- eventProperties.add(newTimestampProperty);
- });
-
- guessSchema.setEventSchema(new EventSchema(eventProperties));
+ this.applyConfiguration(extractor.getStaticPropertyExtractor());
+ var sampleMessage = getSampleMessage();
+ return transformToSampleData(sampleMessage);
}
- private SampleData transformToSampleData(byte[] sampleMessage) {
+ private SampleData transformToSampleData(byte[] sampleMessage) throws
AdapterException {
try {
var networkMessage = mapper.readValue(sampleMessage,
NetworkMessage.class);
var payload = extractPayload(networkMessage);
@@ -265,68 +229,14 @@ public class Oi4Adapter implements StreamPipesAdapter {
return new JsonParsers(new JsonObjectParser())
.getSampleData(convertByte(plainPayload.getBytes(StandardCharsets.UTF_8)));
} catch (IOException e) {
- LOG.error("Error while reading sample message: {}", sampleMessage);
- throw new RuntimeException(e);
+ throw new AdapterException("Error while reading sample message", e);
}
}
- private byte[] getSampleMessage() throws AdapterException {
- List<byte[]> sampleMessages = new ArrayList<>();
- long timeElapsed = 0;
- AtomicReference<Throwable> exceptionRef = new AtomicReference<>();
- MqttConsumer guessConsumer = getGuessMqttConsumer(sampleMessages);
-
- var thread = new Thread(guessConsumer);
- thread.setUncaughtExceptionHandler((t, e) ->
exceptionRef.set(e.getCause()));
- thread.start();
-
- while (sampleMessages.isEmpty() && exceptionRef.get() == null &&
timeElapsed < ReceiveSchemaMaxTimeout) {
- try {
- TimeUnit.MILLISECONDS.sleep(ReceiveSchemaSleepTime);
- timeElapsed += ReceiveSchemaSleepTime;
- } catch (InterruptedException e) {
- LOG.error("Schema guessing failed during waiting for an incoming event: {}", e.getMessage());
- break;
- }
- }
- guessConsumer.close();
-
- Throwable threadException = exceptionRef.get();
- if (threadException != null) {
- throw new AdapterException(threadException.getMessage(),
threadException);
- }
- if (!sampleMessages.isEmpty()) {
- return sampleMessages.get(0);
- } else {
- throw new AdapterException("No messages received");
- }
- }
-
- /**
- * Obtain a specialized MQTT consumer designed to infer the event schema
based on provided sampleMessages.
- *
- * @param sampleMessages A list of byte arrays representing MQTT message
payloads.
- * @return A customized MqttConsumer instance.
- */
- private MqttConsumer getGuessMqttConsumer(List<byte[]> sampleMessages) {
- // Define a specialized event processor that adds an event to the
sampleMessages array
- // only if it meets certain expectations, as verified by extractPayload.
- InternalEventProcessor<byte[]> eventProcessor = event -> {
- InputStream in = convertByte(event);
- NetworkMessage networkMessage;
- try {
- networkMessage = mapper.readValue(in, NetworkMessage.class);
- } catch (IOException e) {
- LOG.error("Error during parsing of incoming MQTT event.");
- throw new RuntimeException(e);
- }
- // Attempt to extract payload from the NetworkMessage
- extractPayload(networkMessage);
- // If successful, add the event to the sampleMessages array
- sampleMessages.add(event);
- };
- return new MqttConsumer(this.mqttConfig, eventProcessor);
+ private byte[] getSampleMessage() throws AdapterException {
+ var receiver = new MqttSingleMessageReceiver(this.mqttConfig, 10);
+ return receiver.receiveSingleMessage();
}
private List<Map<String, Object>> extractPayload(NetworkMessage message)
throws ParseException {
@@ -343,7 +253,7 @@ public class Oi4Adapter implements StreamPipesAdapter {
// Verify that the message corresponds to the designated sensor type.
// This validation relies on the assumption that the source
information includes the sensor type.
if (dataMessage.source()
- .contains(givenSensorType)) {
+ .contains(givenSensorType)) {
// an empty list of selected sensors means that we want to collect
data from all sensors available
if (selectedSensors.isEmpty() || selectedSensors.contains(sensorId))
{
@@ -361,10 +271,10 @@ public class Oi4Adapter implements StreamPipesAdapter {
private List<DataSetMessage> findProcessDataInputMessage(NetworkMessage
message) {
return message.messages()
- .stream()
- .filter(msg -> msg.filter()
- .equals(OI4AdapterLabels.MESSAGE_VALUE_FILTER))
- .toList();
+ .stream()
+ .filter(msg -> msg.filter()
+ .equals(OI4AdapterLabels.MESSAGE_VALUE_FILTER))
+ .toList();
}
private Map<String, Object> extractAndEnrichMessagePayload(DataSetMessage
dataSetMessage, String sensorId) {
@@ -388,16 +298,19 @@ public class Oi4Adapter implements StreamPipesAdapter {
private Map<String, Object> replaceSpecialChars(Map<String, Object>
originalMap) {
return originalMap.entrySet()
- .stream()
- .collect(Collectors.toMap(
- entry -> entry.getKey().replace("-", ""),
- Map.Entry::getValue,
- (oldValue, newValue) -> oldValue)
- );
+ .stream()
+ .collect(Collectors.toMap(
+ entry -> entry.getKey()
+ .replace("-", ""),
+ Map.Entry::getValue,
+ (oldValue, newValue) -> oldValue
+ )
+ );
}
private long parseDate(String timestamp) throws DateTimeParseException {
- return Instant.parse(timestamp).toEpochMilli();
+ return Instant.parse(timestamp)
+ .toEpochMilli();
}
/**
diff --git a/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/adapter/MqttProtocol.java b/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/adapter/MqttProtocol.java
index d3816c4f44..36dbf81669 100644
--- a/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/adapter/MqttProtocol.java
+++ b/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/adapter/MqttProtocol.java
@@ -28,27 +28,18 @@ import
org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor;
import org.apache.streampipes.extensions.connectors.mqtt.shared.MqttConfig;
import
org.apache.streampipes.extensions.connectors.mqtt.shared.MqttConnectUtils;
import org.apache.streampipes.extensions.connectors.mqtt.shared.MqttConsumer;
+import
org.apache.streampipes.extensions.connectors.mqtt.shared.MqttSingleMessageReceiver;
import
org.apache.streampipes.extensions.management.connect.adapter.BrokerEventProcessor;
import
org.apache.streampipes.extensions.management.connect.adapter.parser.Parsers;
-import org.apache.streampipes.messaging.InternalEventProcessor;
import org.apache.streampipes.model.connect.guess.SampleData;
import org.apache.streampipes.model.extensions.ExtensionAssetType;
import org.apache.streampipes.sdk.builder.adapter.AdapterConfigurationBuilder;
import org.apache.streampipes.sdk.helpers.Locales;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.io.ByteArrayInputStream;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
public class MqttProtocol implements StreamPipesAdapter {
- private static final Logger LOG =
LoggerFactory.getLogger(MqttProtocol.class);
-
public static final String ID =
"org.apache.streampipes.connect.iiot.protocol.stream.mqtt";
private MqttConsumer mqttConsumer;
@@ -57,9 +48,6 @@ public class MqttProtocol implements StreamPipesAdapter {
public MqttProtocol() {
}
- public void applyConfiguration(IStaticPropertyExtractor extractor) {
- this.mqttConfig = MqttConnectUtils.getMqttConfig(extractor);
- }
@Override
public IAdapterConfiguration declareConfig() {
@@ -69,66 +57,62 @@ public class MqttProtocol implements StreamPipesAdapter {
.withLocales(Locales.EN)
.withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
.requiredTextParameter(MqttConnectUtils.getBrokerUrlLabel())
- .requiredAlternatives(MqttConnectUtils.getAccessModeLabel(),
MqttConnectUtils.getAnonymousAccess(),
- MqttConnectUtils.getUsernameAccess(),
MqttConnectUtils.getClientCertAccess())
+ .requiredAlternatives(
+ MqttConnectUtils.getAccessModeLabel(),
MqttConnectUtils.getAnonymousAccess(),
+ MqttConnectUtils.getUsernameAccess(),
MqttConnectUtils.getClientCertAccess()
+ )
.requiredTextParameter(MqttConnectUtils.getTopicLabel())
.buildConfiguration();
}
@Override
- public void onAdapterStarted(IAdapterParameterExtractor extractor,
- IEventCollector collector,
- IAdapterRuntimeContext adapterRuntimeContext)
throws AdapterException {
+ public void onAdapterStarted(
+ IAdapterParameterExtractor extractor,
+ IEventCollector collector,
+ IAdapterRuntimeContext adapterRuntimeContext
+ ) throws AdapterException {
- this.applyConfiguration(extractor.getStaticPropertyExtractor());
+ this.initializeMqttConfig(extractor.getStaticPropertyExtractor());
this.mqttConsumer = new MqttConsumer(
this.mqttConfig,
new BrokerEventProcessor(extractor.selectedParser(), collector)
);
- Thread thread = new Thread(this.mqttConsumer);
- thread.start();
+ this.mqttConsumer.start();
}
@Override
- public void onAdapterStopped(IAdapterParameterExtractor extractor,
- IAdapterRuntimeContext adapterRuntimeContext)
throws AdapterException {
- this.mqttConsumer.close();
+ public void onAdapterStopped(
+ IAdapterParameterExtractor extractor,
+ IAdapterRuntimeContext adapterRuntimeContext
+ ) {
+ if (this.mqttConsumer != null) {
+ this.mqttConsumer.stop();
+ }
}
@Override
- public SampleData onSampleDataRequested(IAdapterParameterExtractor extractor,
- IAdapterGuessSchemaContext
adapterGuessSchemaContext) throws AdapterException {
- try {
- AtomicReference<Throwable> exceptionRef = new AtomicReference<>();
- this.applyConfiguration(extractor.getStaticPropertyExtractor());
- List<byte[]> elements = new ArrayList<>();
- InternalEventProcessor<byte[]> eventProcessor = elements::add;
-
-
- MqttConsumer consumer = new MqttConsumer(this.mqttConfig,
eventProcessor);
-
- Thread thread = new Thread(consumer);
- thread.setUncaughtExceptionHandler((t, e) ->
exceptionRef.set(e.getCause()));
- thread.start();
-
- while (consumer.getMessageCount() < 1 && exceptionRef.get() == null) {
- try {
- TimeUnit.MILLISECONDS.sleep(100);
- } catch (InterruptedException e) {
- break;
- }
- }
- consumer.close();
-
- Throwable threadException = exceptionRef.get();
- if (threadException != null) {
- throw new AdapterException(threadException.getMessage(),
threadException);
- }
-
- return extractor.selectedParser().getSampleData(new
ByteArrayInputStream(elements.get(0)));
- } catch (Exception e) {
- throw new AdapterException(e.getMessage(), e);
- }
+ public SampleData onSampleDataRequested(
+ IAdapterParameterExtractor extractor,
+ IAdapterGuessSchemaContext adapterGuessSchemaContext
+ ) throws AdapterException {
+
+ this.initializeMqttConfig(extractor.getStaticPropertyExtractor());
+
+ var payload = getSampleEventAsByte();
+
+ return extractor.selectedParser()
+ .getSampleData(new ByteArrayInputStream(payload));
+
+ }
+
+ private byte[] getSampleEventAsByte() throws AdapterException {
+ var receiver = new MqttSingleMessageReceiver(this.mqttConfig, 10);
+ return receiver.receiveSingleMessage();
}
+
+ public void initializeMqttConfig(IStaticPropertyExtractor extractor) {
+ this.mqttConfig = MqttConnectUtils.getMqttConfig(extractor);
+ }
+
}
diff --git a/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/shared/MqttConsumer.java b/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/shared/MqttConsumer.java
index 24b0a6cb93..1ed4aa2b34 100644
--- a/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/shared/MqttConsumer.java
+++ b/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/shared/MqttConsumer.java
@@ -17,119 +17,146 @@
*/
package org.apache.streampipes.extensions.connectors.mqtt.shared;
+import org.apache.streampipes.commons.exceptions.connect.AdapterException;
import org.apache.streampipes.messaging.InternalEventProcessor;
import com.hivemq.client.mqtt.datatypes.MqttQos;
import com.hivemq.client.mqtt.mqtt3.Mqtt3AsyncClient;
+import com.hivemq.client.mqtt.mqtt3.message.connect.connack.Mqtt3ConnAck;
import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3Publish;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
-public class MqttConsumer extends MqttBase implements Runnable {
+public class MqttConsumer extends MqttBase {
- private final InternalEventProcessor<byte[]> consumer;
- private boolean running;
- private int maxElementsToReceive = -1;
- private int messageCount = 0;
+ private static final Logger LOG =
LoggerFactory.getLogger(MqttConsumer.class);
+ private static final int KEEP_ALIVE_SECONDS = 30;
+ private final InternalEventProcessor<byte[]> eventProcessor;
+ private final AtomicBoolean running;
private Mqtt3AsyncClient client;
- private static final Logger LOG =
LoggerFactory.getLogger(MqttConsumer.class);
-
public MqttConsumer(MqttConfig mqttConfig, InternalEventProcessor<byte[]>
consumer) {
super(mqttConfig);
- this.consumer = consumer;
- }
-
- public MqttConsumer(MqttConfig mqttConfig, InternalEventProcessor<byte[]>
consumer, int maxElementsToReceive) {
- this(mqttConfig, consumer);
- this.maxElementsToReceive = maxElementsToReceive;
+ this.eventProcessor = consumer;
+ this.running = new AtomicBoolean(false);
}
- @Override
- public void run() {
- this.running = true;
+ /**
+ * Starts the MQTT consumer and subscribes to the configured topic.
+ *
+ * @throws AdapterException when the connection or subscription fails
+ */
+ public void start() throws AdapterException {
+ if (!this.running.compareAndSet(false, true)) {
+ return;
+ }
try {
- this.client = super.setupMqttClient();
- client.connectWith()
- .keepAlive(30)
- .send()
- .whenComplete((cAck, throwable) -> {
- if (throwable != null) {
- LOG.error("MQTT connection failed", throwable);
- } else {
- LOG.info("MQTT connection established");
- }
- })
- .get();
-
- subscribe(client);
-
- } catch (Exception e) {
- LOG.error("Error in MQTT consumer: ", e);
- throw new RuntimeException("Error when receiving data from MQTT",
e);
+ this.client = connectClient();
+ subscribeToTopic(this.client);
+ } catch (InterruptedException e) {
+ this.running.set(false);
+ Thread.currentThread().interrupt();
+ throw new AdapterException("Interrupted while starting MQTT consumer", e);
+ } catch (AdapterException e) {
+ this.running.set(false);
+ throw e;
}
}
- private void subscribe(Mqtt3AsyncClient client) throws Exception {
-
- CountDownLatch subscribed = new CountDownLatch(1);
-
- client.subscribeWith()
- .topicFilter(super.mqttConfig.getTopic())
- .qos(MqttQos.AT_LEAST_ONCE)
- .callback(this::handleMessage)
- .send()
- .whenComplete((subAck, throwable) -> {
- if (throwable != null) {
- LOG.error("MQTT subscribe failed", throwable);
- } else {
- LOG.info("Successfully subscribed to topic {}",
super.mqttConfig.getTopic());
- }
- subscribed.countDown();
- });
-
- subscribed.await();
+ /**
+ * Stops the MQTT consumer and disconnects from the broker.
+ */
+ public void stop() {
+ this.running.set(false);
+ if (this.client == null) {
+ return;
+ }
+ disconnectSafely();
}
private void handleMessage(Mqtt3Publish publish) {
- if (!this.running) {
+ if (!this.running.get()) {
return;
}
-
try {
- byte[] payload = publish.getPayloadAsBytes();
- consumer.onEvent(payload);
- messageCount++;
-
- if (maxElementsToReceive != -1 && messageCount >=
maxElementsToReceive) {
- LOG.info("Max elements ({}) received. Stopping consumer.",
maxElementsToReceive);
- this.running = false;
- }
-
- } catch (Exception e) {
+ processPayload(publish.getPayloadAsBytes());
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOG.warn("Interrupted while processing MQTT message", e);
+ } catch (RuntimeException e) {
LOG.error("Error processing MQTT message", e);
}
}
- public void close() {
- this.running = false;
+
+ private Mqtt3AsyncClient connectClient() throws AdapterException,
InterruptedException {
try {
+ var mqttClient = super.setupMqttClient();
+ mqttClient.connectWith()
+ .keepAlive(KEEP_ALIVE_SECONDS)
+ .send()
+ .whenComplete(this::logConnect)
+ .get();
+ return mqttClient;
+ } catch (InterruptedException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new AdapterException("Error while connecting to MQTT broker", e);
+ }
+ }
- this.client.disconnect().get();
+ private void logConnect(Mqtt3ConnAck cAck, Throwable throwable) {
+ if (throwable != null) {
+ LOG.error("MQTT connection failed", throwable);
+ } else {
+ LOG.info("MQTT connection established");
+ }
+ }
+ private void subscribeToTopic(Mqtt3AsyncClient client) throws
AdapterException, InterruptedException {
+ CountDownLatch subscribed = new CountDownLatch(1);
+ client.subscribeWith()
+ .topicFilter(super.mqttConfig.getTopic())
+ .qos(MqttQos.AT_LEAST_ONCE)
+ .callback(this::handleMessage)
+ .send()
+ .whenComplete((subAck, throwable) -> {
+ logSubscribe(throwable);
+ subscribed.countDown();
+ });
+ try {
+ subscribed.await();
} catch (InterruptedException e) {
- LOG.error("Error disconnecting from MQTT due to thread
interruption", e);
- } catch (ExecutionException e) {
- LOG.error("Error disconnecting from MQTT", e.getCause());
+ Thread.currentThread().interrupt();
+ throw e;
}
+ }
+ private void logSubscribe(Throwable throwable) {
+ if (throwable != null) {
+ LOG.error("MQTT subscribe failed", throwable);
+ } else {
+ LOG.info("Successfully subscribed to topic {}",
super.mqttConfig.getTopic());
+ }
}
- public Integer getMessageCount() {
- return messageCount;
+ private void processPayload(byte[] payload) throws InterruptedException{
+ eventProcessor.onEvent(payload);
+ }
+
+ private void disconnectSafely() {
+ try {
+ this.client.disconnect().whenComplete((result, throwable) -> {
+ if (throwable != null) {
+ LOG.error("Error disconnecting from MQTT", throwable);
+ }
+ });
+ } catch (RuntimeException e) {
+ LOG.error("Error disconnecting from MQTT", e);
+ }
}
-}
\ No newline at end of file
+}
diff --git a/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/shared/MqttPublisher.java b/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/shared/MqttPublisher.java
index 002b5fe54d..d11cad2e4a 100644
--- a/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/shared/MqttPublisher.java
+++ b/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/shared/MqttPublisher.java
@@ -28,12 +28,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.net.URI;
-
-
public class MqttPublisher extends MqttBase {
- private URI uri;
private Mqtt3AsyncClient client;
private static final Logger LOG =
LoggerFactory.getLogger(MqttPublisher.class);
@@ -89,7 +85,7 @@ public class MqttPublisher extends MqttBase {
});
} catch (Exception e) {
throw new SpRuntimeException("Could not publish to MQTT broker: "
- + uri.toString() + ", " + e.getMessage(), e);
+ + super.mqttConfig.getUrl() + ", " + e.getMessage(), e);
}
}
@@ -99,7 +95,7 @@ public class MqttPublisher extends MqttBase {
.get(); // block until disconnected
} catch (Exception e) {
throw new SpRuntimeException("Could not disconnect from MQTT broker: "
- + uri.toString() + ", " + e.getMessage(), e);
+ + super.mqttConfig.getUrl() + ", " + e.getMessage(), e);
}
}
diff --git a/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/shared/MqttSingleMessageReceiver.java b/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/shared/MqttSingleMessageReceiver.java
new file mode 100644
index 0000000000..f00810056f
--- /dev/null
+++ b/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/shared/MqttSingleMessageReceiver.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.extensions.connectors.mqtt.shared;
+
+import org.apache.streampipes.commons.exceptions.connect.AdapterException;
+
+import com.hivemq.client.mqtt.datatypes.MqttQos;
+import com.hivemq.client.mqtt.mqtt3.Mqtt3AsyncClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class MqttSingleMessageReceiver extends MqttBase {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(MqttSingleMessageReceiver.class);
+ private final int timeoutInSeconds;
+
+ public MqttSingleMessageReceiver(MqttConfig mqttConfig, int
timeoutInSeconds) {
+ super(mqttConfig);
+ this.timeoutInSeconds = timeoutInSeconds;
+ }
+
+ /**
+ * Receives a single MQTT message and blocks until a message arrives or the timeout elapses.
+ *
+ * @return the payload bytes of the first received message
+ * @throws AdapterException when receiving fails or the thread is interrupted
+ */
+ public byte[] receiveSingleMessage() throws AdapterException {
+ try {
+ return receiveSingleMessageAsync().get();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new AdapterException("Interrupted while waiting for an MQTT message", e);
+ } catch (ExecutionException e) {
+ if (e.getCause() instanceof TimeoutException) {
+ throw new AdapterException(
+ String.format("Timed out after %d seconds while waiting for an MQTT message", timeoutInSeconds),
+ e.getCause()
+ );
+ }
+ throw new AdapterException("Failed to receive MQTT message",
e.getCause());
+ } catch (Exception e) {
+ throw new AdapterException("Failed to receive MQTT message", e);
+ }
+ }
+
+ private CompletableFuture<byte[]> receiveSingleMessageAsync() {
+ var result = new CompletableFuture<byte[]>();
+ var client = setupClientOrFail(result);
+ if (client == null) {
+ return result;
+ }
+ registerCleanup(result, client);
+ subscribeForSingleMessage(result, client);
+ applyTimeout(result);
+ return result;
+ }
+
+ private Mqtt3AsyncClient setupClientOrFail(CompletableFuture<byte[]> result)
{
+ try {
+ return super.setupMqttClient();
+ } catch (Exception e) {
+ result.completeExceptionally(e);
+ return null;
+ }
+ }
+
+ private void registerCleanup(CompletableFuture<byte[]> result,
Mqtt3AsyncClient client) {
+ result.whenComplete((payload, throwable) -> disconnectQuietly(client));
+ }
+
+ private void subscribeForSingleMessage(CompletableFuture<byte[]> result,
Mqtt3AsyncClient client) {
+ client.connect()
+ .thenCompose(v -> client.subscribeWith()
+ .topicFilter(mqttConfig.getTopic())
+ .qos(MqttQos.AT_LEAST_ONCE)
+ .callback(publish -> completeIfEmpty(result,
publish.getPayloadAsBytes()))
+ .send()
+ )
+ .exceptionally(ex -> {
+ result.completeExceptionally(ex);
+ return null;
+ });
+ }
+
+ private void completeIfEmpty(CompletableFuture<byte[]> result, byte[]
payload) {
+ if (!result.isDone()) {
+ result.complete(payload);
+ }
+ }
+
+ private void applyTimeout(CompletableFuture<byte[]> result) {
+ if (this.timeoutInSeconds > 0) {
+ result.orTimeout(this.timeoutInSeconds, TimeUnit.SECONDS);
+ }
+ }
+
+ private void disconnectQuietly(Mqtt3AsyncClient client) {
+ try {
+ client.disconnect().whenComplete((result, throwable) -> {
+ if (throwable != null) {
+ LOG.warn("Failed to disconnect MQTT client after receiving sample",
throwable);
+ }
+ });
+ } catch (Exception e) {
+ LOG.warn("Failed to disconnect MQTT client after receiving sample", e);
+ }
+ }
+}
diff --git a/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/MQTTPublisherUtils.java b/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/MQTTPublisherUtils.java
index 87ce58e814..c273982f09 100644
--- a/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/MQTTPublisherUtils.java
+++ b/streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/adapters/MQTTPublisherUtils.java
@@ -57,9 +57,4 @@ public class MQTTPublisherUtils {
publisher.connect();
return publisher;
}
-
- public static void closeConnection(MqttPublisher publisher) {
- publisher.disconnect();
- }
-
}