This is an automated email from the ASF dual-hosted git repository. zehnder pushed a commit to branch 4137-updating-mqtt-adapter-can-leave-orphaned-consumer-thread in repository https://gitbox.apache.org/repos/asf/streampipes.git
commit 0fb0f273b220c9328726fb91f2b01c780a0f8086 Author: Philipp Zehnder <[email protected]> AuthorDate: Thu Jan 29 13:56:37 2026 +0100 fix(#4137): Improve logic to get single event from mqtt --- .../connect/iiot/adapters/oi4/Oi4Adapter.java | 160 +++++---------------- .../connectors/mqtt/adapter/MqttProtocol.java | 92 +++++------- .../connectors/mqtt/shared/MqttConsumer.java | 24 +--- .../connectors/mqtt/shared/MqttPublisher.java | 8 +- .../mqtt/shared/MqttSingleMessageReceiver.java | 129 +++++++++++++++++ 5 files changed, 208 insertions(+), 205 deletions(-) diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/oi4/Oi4Adapter.java b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/oi4/Oi4Adapter.java index 06e0c76b84..b724b43f58 100644 --- a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/oi4/Oi4Adapter.java +++ b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/oi4/Oi4Adapter.java @@ -32,13 +32,11 @@ import org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor; import org.apache.streampipes.extensions.connectors.mqtt.shared.MqttConfig; import org.apache.streampipes.extensions.connectors.mqtt.shared.MqttConnectUtils; import org.apache.streampipes.extensions.connectors.mqtt.shared.MqttConsumer; +import org.apache.streampipes.extensions.connectors.mqtt.shared.MqttSingleMessageReceiver; import org.apache.streampipes.extensions.management.connect.adapter.parser.JsonParsers; import org.apache.streampipes.extensions.management.connect.adapter.parser.json.JsonObjectParser; -import org.apache.streampipes.messaging.InternalEventProcessor; -import org.apache.streampipes.model.connect.guess.GuessSchema; import org.apache.streampipes.model.connect.guess.SampleData; import org.apache.streampipes.model.extensions.ExtensionAssetType; -import org.apache.streampipes.model.schema.EventSchema; import org.apache.streampipes.sdk.StaticProperties; import org.apache.streampipes.sdk.builder.adapter.AdapterConfigurationBuilder; import org.apache.streampipes.sdk.helpers.Alternatives; @@ -62,12 +60,8 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Map; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; -import static org.apache.streampipes.sdk.helpers.EpProperties.timestampProperty; - /** * Adapter to connect to an Open Industry 4.0 (OI4) compatible device. * @@ -79,8 +73,6 @@ public class Oi4Adapter implements StreamPipesAdapter { public static final String ID = "org.apache.streampipes.connect.iiot.adapters.oi4"; private static final Logger LOG = LoggerFactory.getLogger(Oi4Adapter.class); - private static final long ReceiveSchemaSleepTime = 100; - private static final long ReceiveSchemaMaxTimeout = 5000; // Information about the topic structure can be found at page 57 of the above-mentioned development guide // The app id (missing here) needs to be provided by the user @@ -95,7 +87,7 @@ public class Oi4Adapter implements StreamPipesAdapter { public Oi4Adapter() { mapper = JacksonSerializer.getObjectMapper(Map.of( - DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, true + DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, true )); } @@ -107,7 +99,8 @@ public class Oi4Adapter implements StreamPipesAdapter { .withLocales(Locales.EN) .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON) .requiredTextParameter(MqttConnectUtils.getBrokerUrlLabel()) - .requiredAlternatives(MqttConnectUtils.getAccessModeLabel(), MqttConnectUtils.getAnonymousAccess(), + .requiredAlternatives( + MqttConnectUtils.getAccessModeLabel(), MqttConnectUtils.getAnonymousAccess(), MqttConnectUtils.getUsernameAccess(), MqttConnectUtils.getClientCertAccess() ) .requiredAlternatives( @@ -144,7 +137,10 @@ public class Oi4Adapter implements StreamPipesAdapter { IAdapterRuntimeContext adapterRuntimeContext ) throws AdapterException { LOG.info("Adapter type {} starting", ID); - LOG.info("Adapter with id {} starting", extractor.getAdapterDescription().getElementId()); + LOG.info("Adapter with id {} starting", + extractor.getAdapterDescription() + .getElementId() + ); this.applyConfiguration(extractor.getStaticPropertyExtractor()); @@ -168,7 +164,10 @@ public class Oi4Adapter implements StreamPipesAdapter { thread.start(); LOG.info("Adapter {} started", ID); - LOG.info("Adapter with id {} started", extractor.getAdapterDescription().getElementId()); + LOG.info("Adapter with id {} started", + extractor.getAdapterDescription() + .getElementId() + ); } private InputStream convertByte(byte[] event) { @@ -191,7 +190,7 @@ public class Oi4Adapter implements StreamPipesAdapter { } else { var selectedSensorsText = extractor.textParameter(OI4AdapterLabels.LABEL_KEY_SENSORS_LIST_INPUT); selectedSensors = Arrays.stream(selectedSensorsText.split(",")) - .toList(); + .toList(); } if (selectedAlternativeSensorDescription.equals(OI4AdapterLabels.LABEL_KEY_SENSOR_TYPE_ALTERNATIVE)) { @@ -215,48 +214,12 @@ public class Oi4Adapter implements StreamPipesAdapter { IAdapterParameterExtractor extractor, IAdapterGuessSchemaContext adapterGuessSchemaContext ) throws AdapterException { - try { - this.applyConfiguration(extractor.getStaticPropertyExtractor()); - - var sampleMessage = getSampleMessage(); - var sampleData = transformToSampleData(sampleMessage); - - return sampleData; - } catch (RuntimeException e) { - throw new AdapterException(e.getMessage(), e); - } - } - - /** - * Updates the timestamp property in the given GuessSchema if it exists as it is not correctly guessed. - * If the timestamp property exists, it is replaced with a proper timestamp property. - * - * @param guessSchema The GuessSchema to update. - */ - private void updateTimestampPropertyIfExists(GuessSchema guessSchema) { - var eventProperties = guessSchema.getEventSchema() - .getEventProperties(); - - var timestampPropertyOpt = eventProperties.stream() - .filter(eventProperty -> - eventProperty.getRuntimeName() - .equals(OI4AdapterLabels.EVENT_KEY_TIMESTAMP) - ) - .findFirst(); - - var newTimestampProperty = timestampProperty(OI4AdapterLabels.EVENT_KEY_TIMESTAMP); - - // If the timestamp property exists, replace it with the new timestamp property - timestampPropertyOpt.ifPresent(prop -> { - eventProperties.removeIf(eventProperty -> eventProperty.getRuntimeName() - .equals(OI4AdapterLabels.EVENT_KEY_TIMESTAMP)); - eventProperties.add(newTimestampProperty); - }); - - guessSchema.setEventSchema(new EventSchema(eventProperties)); + this.applyConfiguration(extractor.getStaticPropertyExtractor()); + var sampleMessage = getSampleMessage(); + return transformToSampleData(sampleMessage); } - private SampleData transformToSampleData(byte[] sampleMessage) { + private SampleData transformToSampleData(byte[] sampleMessage) throws AdapterException { try { var networkMessage = mapper.readValue(sampleMessage, NetworkMessage.class); var payload = extractPayload(networkMessage); @@ -265,68 +228,14 @@ public class Oi4Adapter implements StreamPipesAdapter { return new JsonParsers(new JsonObjectParser()) .getSampleData(convertByte(plainPayload.getBytes(StandardCharsets.UTF_8))); } catch (IOException e) { - LOG.error("Error while reading sample message: {}", sampleMessage); - throw new RuntimeException(e); + throw new AdapterException("Error while reading sample message", e); } } - private byte[] getSampleMessage() throws AdapterException { - List<byte[]> sampleMessages = new ArrayList<>(); - long timeElapsed = 0; - AtomicReference<Throwable> exceptionRef = new AtomicReference<>(); - MqttConsumer guessConsumer = getGuessMqttConsumer(sampleMessages); - - var thread = new Thread(guessConsumer); - thread.setUncaughtExceptionHandler((t, e) -> exceptionRef.set(e.getCause())); - thread.start(); - - while (sampleMessages.isEmpty() && exceptionRef.get() == null && timeElapsed < ReceiveSchemaMaxTimeout) { - try { - TimeUnit.MILLISECONDS.sleep(ReceiveSchemaSleepTime); - timeElapsed += ReceiveSchemaSleepTime; - } catch (InterruptedException e) { - LOG.error("Schema guessing failed during waiting for an incoming event: {}", e.getMessage()); - break; - } - } - guessConsumer.close(); - - Throwable threadException = exceptionRef.get(); - if (threadException != null) { - throw new AdapterException(threadException.getMessage(), threadException); - } - - if (!sampleMessages.isEmpty()) { - return sampleMessages.get(0); - } else { - throw new AdapterException("No messages received"); - } - } - /** - * Obtain a specialized MQTT consumer designed to infer the event schema based on provided sampleMessages. - * - * @param sampleMessages A list of byte arrays representing MQTT message payloads. - * @return A customized MqttConsumer instance. - */ - private MqttConsumer getGuessMqttConsumer(List<byte[]> sampleMessages) { - // Define a specialized event processor that adds an event to the sampleMessages array - // only if it meets certain expectations, as verified by extractPayload. - InternalEventProcessor<byte[]> eventProcessor = event -> { - InputStream in = convertByte(event); - NetworkMessage networkMessage; - try { - networkMessage = mapper.readValue(in, NetworkMessage.class); - } catch (IOException e) { - LOG.error("Error during parsing of incoming MQTT event."); - throw new RuntimeException(e); - } - // Attempt to extract payload from the NetworkMessage - extractPayload(networkMessage); - // If successful, add the event to the sampleMessages array - sampleMessages.add(event); - }; - return new MqttConsumer(this.mqttConfig, eventProcessor); + private byte[] getSampleMessage() throws AdapterException { + var receiver = new MqttSingleMessageReceiver(this.mqttConfig, 10); + return receiver.receiveSingleMessage(); } private List<Map<String, Object>> extractPayload(NetworkMessage message) throws ParseException { @@ -343,7 +252,7 @@ public class Oi4Adapter implements StreamPipesAdapter { // Verify that the message corresponds to the designated sensor type. // This validation relies on the assumption that the source information includes the sensor type. if (dataMessage.source() - .contains(givenSensorType)) { + .contains(givenSensorType)) { // an empty list of selected sensors means that we want to collect data from all sensors available if (selectedSensors.isEmpty() || selectedSensors.contains(sensorId)) { @@ -361,10 +270,10 @@ public class Oi4Adapter implements StreamPipesAdapter { private List<DataSetMessage> findProcessDataInputMessage(NetworkMessage message) { return message.messages() - .stream() - .filter(msg -> msg.filter() - .equals(OI4AdapterLabels.MESSAGE_VALUE_FILTER)) - .toList(); + .stream() + .filter(msg -> msg.filter() + .equals(OI4AdapterLabels.MESSAGE_VALUE_FILTER)) + .toList(); } private Map<String, Object> extractAndEnrichMessagePayload(DataSetMessage dataSetMessage, String sensorId) { @@ -388,16 +297,19 @@ public class Oi4Adapter implements StreamPipesAdapter { private Map<String, Object> replaceSpecialChars(Map<String, Object> originalMap) { return originalMap.entrySet() - .stream() - .collect(Collectors.toMap( - entry -> entry.getKey().replace("-", ""), - Map.Entry::getValue, - (oldValue, newValue) -> oldValue) - ); + .stream() + .collect(Collectors.toMap( + entry -> entry.getKey() + .replace("-", ""), + Map.Entry::getValue, + (oldValue, newValue) -> oldValue + ) + ); } private long parseDate(String timestamp) throws DateTimeParseException { - return Instant.parse(timestamp).toEpochMilli(); + return Instant.parse(timestamp) + .toEpochMilli(); } /** diff --git a/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/adapter/MqttProtocol.java b/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/adapter/MqttProtocol.java index d3816c4f44..97959774c3 100644 --- a/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/adapter/MqttProtocol.java +++ b/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/adapter/MqttProtocol.java @@ -28,27 +28,18 @@ import org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor; import org.apache.streampipes.extensions.connectors.mqtt.shared.MqttConfig; import org.apache.streampipes.extensions.connectors.mqtt.shared.MqttConnectUtils; import org.apache.streampipes.extensions.connectors.mqtt.shared.MqttConsumer; +import org.apache.streampipes.extensions.connectors.mqtt.shared.MqttSingleMessageReceiver; import org.apache.streampipes.extensions.management.connect.adapter.BrokerEventProcessor; import org.apache.streampipes.extensions.management.connect.adapter.parser.Parsers; -import org.apache.streampipes.messaging.InternalEventProcessor; import org.apache.streampipes.model.connect.guess.SampleData; import org.apache.streampipes.model.extensions.ExtensionAssetType; import org.apache.streampipes.sdk.builder.adapter.AdapterConfigurationBuilder; import org.apache.streampipes.sdk.helpers.Locales; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.io.ByteArrayInputStream; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; public class MqttProtocol implements StreamPipesAdapter { - private static final Logger LOG = LoggerFactory.getLogger(MqttProtocol.class); - public static final String ID = "org.apache.streampipes.connect.iiot.protocol.stream.mqtt"; private MqttConsumer mqttConsumer; @@ -57,9 +48,6 @@ public class MqttProtocol implements StreamPipesAdapter { public MqttProtocol() { } - public void applyConfiguration(IStaticPropertyExtractor extractor) { - this.mqttConfig = MqttConnectUtils.getMqttConfig(extractor); - } @Override public IAdapterConfiguration declareConfig() { @@ -69,18 +57,22 @@ public class MqttProtocol implements StreamPipesAdapter { .withLocales(Locales.EN) .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON) .requiredTextParameter(MqttConnectUtils.getBrokerUrlLabel()) - .requiredAlternatives(MqttConnectUtils.getAccessModeLabel(), MqttConnectUtils.getAnonymousAccess(), - MqttConnectUtils.getUsernameAccess(), MqttConnectUtils.getClientCertAccess()) + .requiredAlternatives( + MqttConnectUtils.getAccessModeLabel(), MqttConnectUtils.getAnonymousAccess(), + MqttConnectUtils.getUsernameAccess(), MqttConnectUtils.getClientCertAccess() + ) .requiredTextParameter(MqttConnectUtils.getTopicLabel()) .buildConfiguration(); } @Override - public void onAdapterStarted(IAdapterParameterExtractor extractor, - IEventCollector collector, - IAdapterRuntimeContext adapterRuntimeContext) throws AdapterException { + public void onAdapterStarted( + IAdapterParameterExtractor extractor, + IEventCollector collector, + IAdapterRuntimeContext adapterRuntimeContext + ) throws AdapterException { - this.applyConfiguration(extractor.getStaticPropertyExtractor()); + this.initializeMqttConfig(extractor.getStaticPropertyExtractor()); this.mqttConsumer = new MqttConsumer( this.mqttConfig, new BrokerEventProcessor(extractor.selectedParser(), collector) @@ -91,44 +83,36 @@ public class MqttProtocol implements StreamPipesAdapter { } @Override - public void onAdapterStopped(IAdapterParameterExtractor extractor, - IAdapterRuntimeContext adapterRuntimeContext) throws AdapterException { + public void onAdapterStopped( + IAdapterParameterExtractor extractor, + IAdapterRuntimeContext adapterRuntimeContext + ) { this.mqttConsumer.close(); } @Override - public SampleData onSampleDataRequested(IAdapterParameterExtractor extractor, - IAdapterGuessSchemaContext adapterGuessSchemaContext) throws AdapterException { - try { - AtomicReference<Throwable> exceptionRef = new AtomicReference<>(); - this.applyConfiguration(extractor.getStaticPropertyExtractor()); - List<byte[]> elements = new ArrayList<>(); - InternalEventProcessor<byte[]> eventProcessor = elements::add; - - - MqttConsumer consumer = new MqttConsumer(this.mqttConfig, eventProcessor); - - Thread thread = new Thread(consumer); - thread.setUncaughtExceptionHandler((t, e) -> exceptionRef.set(e.getCause())); - thread.start(); - - while (consumer.getMessageCount() < 1 && exceptionRef.get() == null) { - try { - TimeUnit.MILLISECONDS.sleep(100); - } catch (InterruptedException e) { - break; - } - } - consumer.close(); - - Throwable threadException = exceptionRef.get(); - if (threadException != null) { - throw new AdapterException(threadException.getMessage(), threadException); - } - - return extractor.selectedParser().getSampleData(new ByteArrayInputStream(elements.get(0))); - } catch (Exception e) { - throw new AdapterException(e.getMessage(), e); - } + public SampleData onSampleDataRequested( + IAdapterParameterExtractor extractor, + IAdapterGuessSchemaContext adapterGuessSchemaContext + ) throws AdapterException { + + this.initializeMqttConfig(extractor.getStaticPropertyExtractor()); + + var payload = getSampleEventAsByte(); + + return extractor.selectedParser() + .getSampleData(new ByteArrayInputStream(payload)); + } + + private byte[] getSampleEventAsByte() throws AdapterException { + var receiver = new MqttSingleMessageReceiver(this.mqttConfig, 10); + return receiver.receiveSingleMessage(); + } + + public void initializeMqttConfig(IStaticPropertyExtractor extractor) { + this.mqttConfig = MqttConnectUtils.getMqttConfig(extractor); + } + } + diff --git a/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/shared/MqttConsumer.java b/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/shared/MqttConsumer.java index 24b0a6cb93..fd9f960f06 100644 --- a/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/shared/MqttConsumer.java +++ b/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/shared/MqttConsumer.java @@ -30,25 +30,18 @@ import java.util.concurrent.ExecutionException; public class MqttConsumer extends MqttBase implements Runnable { + private static final Logger LOG = LoggerFactory.getLogger(MqttConsumer.class); + private final InternalEventProcessor<byte[]> consumer; private boolean running; - private int maxElementsToReceive = -1; - private int messageCount = 0; private Mqtt3AsyncClient client; - private static final Logger LOG = LoggerFactory.getLogger(MqttConsumer.class); - public MqttConsumer(MqttConfig mqttConfig, InternalEventProcessor<byte[]> consumer) { super(mqttConfig); this.consumer = consumer; } - public MqttConsumer(MqttConfig mqttConfig, InternalEventProcessor<byte[]> consumer, int maxElementsToReceive) { - this(mqttConfig, consumer); - this.maxElementsToReceive = maxElementsToReceive; - } - @Override public void run() { this.running = true; @@ -103,13 +96,6 @@ public class MqttConsumer extends MqttBase implements Runnable { try { byte[] payload = publish.getPayloadAsBytes(); consumer.onEvent(payload); - messageCount++; - - if (maxElementsToReceive != -1 && messageCount >= maxElementsToReceive) { - LOG.info("Max elements ({}) received. Stopping consumer.", maxElementsToReceive); - this.running = false; - } - } catch (Exception e) { LOG.error("Error processing MQTT message", e); } @@ -128,8 +114,4 @@ public class MqttConsumer extends MqttBase implements Runnable { } } - - public Integer getMessageCount() { - return messageCount; - } -} \ No newline at end of file +} diff --git a/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/shared/MqttPublisher.java b/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/shared/MqttPublisher.java index 002b5fe54d..d11cad2e4a 100644 --- a/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/shared/MqttPublisher.java +++ b/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/shared/MqttPublisher.java @@ -28,12 +28,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.net.URI; - - public class MqttPublisher extends MqttBase { - private URI uri; private Mqtt3AsyncClient client; private static final Logger LOG = LoggerFactory.getLogger(MqttPublisher.class); @@ -89,7 +85,7 @@ public class MqttPublisher extends MqttBase { }); } catch (Exception e) { throw new SpRuntimeException("Could not publish to MQTT broker: " - + uri.toString() + ", " + e.getMessage(), e); + + super.mqttConfig.getUrl() + ", " + e.getMessage(), e); } } @@ -99,7 +95,7 @@ public class MqttPublisher extends MqttBase { .get(); // block until disconnected } catch (Exception e) { throw new SpRuntimeException("Could not disconnect from MQTT broker: " - + uri.toString() + ", " + e.getMessage(), e); + + super.mqttConfig.getUrl() + ", " + e.getMessage(), e); } } diff --git a/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/shared/MqttSingleMessageReceiver.java b/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/shared/MqttSingleMessageReceiver.java new file mode 100644 index 0000000000..f00810056f --- /dev/null +++ b/streampipes-extensions/streampipes-connectors-mqtt/src/main/java/org/apache/streampipes/extensions/connectors/mqtt/shared/MqttSingleMessageReceiver.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.streampipes.extensions.connectors.mqtt.shared; + +import org.apache.streampipes.commons.exceptions.connect.AdapterException; + +import com.hivemq.client.mqtt.datatypes.MqttQos; +import com.hivemq.client.mqtt.mqtt3.Mqtt3AsyncClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +public class MqttSingleMessageReceiver extends MqttBase { + + private static final Logger LOG = LoggerFactory.getLogger(MqttSingleMessageReceiver.class); + private final int timeoutInSeconds; + + public MqttSingleMessageReceiver(MqttConfig mqttConfig, int timeoutInSeconds) { + super(mqttConfig); + this.timeoutInSeconds = timeoutInSeconds; + } + + /** + * Receives a single MQTT message and blocks until a message arrives or the timeout elapses. + * + * @return the payload bytes of the first received message + * @throws AdapterException when receiving fails or the thread is interrupted + */ + public byte[] receiveSingleMessage() throws AdapterException { + try { + return receiveSingleMessageAsync().get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new AdapterException("Interrupted while waiting for an MQTT message", e); + } catch (ExecutionException e) { + if (e.getCause() instanceof TimeoutException) { + throw new AdapterException( + String.format("Timed out after %d seconds while waiting for an MQTT message", timeoutInSeconds), + e.getCause() + ); + } + throw new AdapterException("Failed to receive MQTT message", e.getCause()); + } catch (Exception e) { + throw new AdapterException("Failed to receive MQTT message", e); + } + } + + private CompletableFuture<byte[]> receiveSingleMessageAsync() { + var result = new CompletableFuture<byte[]>(); + var client = setupClientOrFail(result); + if (client == null) { + return result; + } + registerCleanup(result, client); + subscribeForSingleMessage(result, client); + applyTimeout(result); + return result; + } + + private Mqtt3AsyncClient setupClientOrFail(CompletableFuture<byte[]> result) { + try { + return super.setupMqttClient(); + } catch (Exception e) { + result.completeExceptionally(e); + return null; + } + } + + private void registerCleanup(CompletableFuture<byte[]> result, Mqtt3AsyncClient client) { + result.whenComplete((payload, throwable) -> disconnectQuietly(client)); + } + + private void subscribeForSingleMessage(CompletableFuture<byte[]> result, Mqtt3AsyncClient client) { + client.connect() + .thenCompose(v -> client.subscribeWith() + .topicFilter(mqttConfig.getTopic()) + .qos(MqttQos.AT_LEAST_ONCE) + .callback(publish -> completeIfEmpty(result, publish.getPayloadAsBytes())) + .send() + ) + .exceptionally(ex -> { + result.completeExceptionally(ex); + return null; + }); + } + + private void completeIfEmpty(CompletableFuture<byte[]> result, byte[] payload) { + if (!result.isDone()) { + result.complete(payload); + } + } + + private void applyTimeout(CompletableFuture<byte[]> result) { + if (this.timeoutInSeconds > 0) { + result.orTimeout(this.timeoutInSeconds, TimeUnit.SECONDS); + } + } + + private void disconnectQuietly(Mqtt3AsyncClient client) { + try { + client.disconnect().whenComplete((result, throwable) -> { + if (throwable != null) { + LOG.warn("Failed to disconnect MQTT client after receiving sample", throwable); + } + }); + } catch (Exception e) { + LOG.warn("Failed to disconnect MQTT client after receiving sample", e); + } + } +}
