exceptionfactory commented on code in PR #6225:
URL: https://github.com/apache/nifi/pull/6225#discussion_r957500825
##########
nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java:
##########
@@ -31,89 +31,89 @@
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.security.util.TlsException;
import org.apache.nifi.ssl.SSLContextService;
-import org.eclipse.paho.client.mqttv3.IMqttClient;
-import org.eclipse.paho.client.mqttv3.MqttClient;
-import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
-import org.eclipse.paho.client.mqttv3.MqttException;
-import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
-import java.util.Properties;
import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import static org.apache.commons.lang3.EnumUtils.isValidEnumIgnoreCase;
+import static org.apache.commons.lang3.StringUtils.EMPTY;
import static
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_FALSE;
import static
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_TRUE;
import static
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_MQTT_VERSION_310;
import static
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_MQTT_VERSION_311;
+import static
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_MQTT_VERSION_500;
import static
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_MQTT_VERSION_AUTO;
import static
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_0;
import static
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_1;
import static
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_2;
public abstract class AbstractMQTTProcessor extends
AbstractSessionFactoryProcessor {
- public static int DISCONNECT_TIMEOUT = 5000;
+ private static final String DEFAULT_SESSION_EXPIRY_INTERVAL = "24 hrs";
protected ComponentLog logger;
- protected IMqttClient mqttClient;
- protected volatile String broker;
- protected volatile String brokerUri;
- protected volatile String clientID;
- protected MqttConnectOptions connOpts;
- protected MemoryPersistence persistence = new MemoryPersistence();
- public ProcessSessionFactory processSessionFactory;
+ protected MqttClientProperties clientProperties;
- public static final Validator QOS_VALIDATOR = new Validator() {
+ protected MqttClientFactory mqttClientFactory = new MqttClientFactory();
+ protected MqttClient mqttClient;
- @Override
- public ValidationResult validate(String subject, String input,
ValidationContext context) {
- Integer inputInt = Integer.parseInt(input);
- if (inputInt < 0 || inputInt > 2) {
- return new
ValidationResult.Builder().subject(subject).valid(false).explanation("QoS must
be an integer between 0 and 2").build();
- }
- return new
ValidationResult.Builder().subject(subject).valid(true).build();
+ public ProcessSessionFactory processSessionFactory;
+
+ public static final Validator QOS_VALIDATOR = (subject, input, context) ->
{
+ Integer inputInt = Integer.parseInt(input);
+ if (inputInt < 0 || inputInt > 2) {
+ return new
ValidationResult.Builder().subject(subject).valid(false).explanation("QoS must
be an integer between 0 and 2").build();
}
+ return new
ValidationResult.Builder().subject(subject).valid(true).build();
};
- public static final Validator BROKER_VALIDATOR = new Validator() {
-
- @Override
- public ValidationResult validate(String subject, String input,
ValidationContext context) {
- try{
- URI brokerURI = new URI(input);
- if (!"".equals(brokerURI.getPath())) {
- return new
ValidationResult.Builder().subject(subject).valid(false).explanation("the
broker URI cannot have a path. It currently is:" + brokerURI.getPath()).build();
- }
- if (!("tcp".equals(brokerURI.getScheme()) ||
"ssl".equals(brokerURI.getScheme()) || "ws".equals(brokerURI.getScheme()) ||
"wss".equals(brokerURI.getScheme()))) {
- return new
ValidationResult.Builder().subject(subject).valid(false).explanation("only the
'tcp', 'ssl', 'ws' and 'wss' schemes are supported.").build();
- }
- } catch (URISyntaxException e) {
- return new
ValidationResult.Builder().subject(subject).valid(false).explanation("it is not
valid URI syntax.").build();
+ public static final Validator BROKER_VALIDATOR = (subject, input, context)
-> {
+ try {
+ URI brokerURI = new URI(input);
+ if (!EMPTY.equals(brokerURI.getPath())) {
+ return new
ValidationResult.Builder().subject(subject).valid(false).explanation("the
broker URI cannot have a path. It currently is:" + brokerURI.getPath()).build();
}
- return new
ValidationResult.Builder().subject(subject).valid(true).build();
+ if (!isValidEnumIgnoreCase(MqttProtocolScheme.class,
brokerURI.getScheme())) {
+ return new
ValidationResult.Builder().subject(subject).valid(false)
+ .explanation("invalid scheme! supported schemes are: "
+ MqttProtocolScheme.getValuesAsString(", ")).build();
Review Comment:
Error messages should not use the exclamation mark character.
```suggestion
.explanation("Invalid Scheme: supported schemes are:
" + MqttProtocolScheme.getValuesAsString(", ")).build();
```
##########
nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java:
##########
@@ -649,38 +625,44 @@ private void closeWriter(final RecordSetWriter writer) {
}
private String getTransitUri(String... appends) {
- StringBuilder stringBuilder = new StringBuilder(brokerUri);
- for(String append : appends) {
+ String broker = clientProperties.getBrokerUri().toString();
+ StringBuilder stringBuilder = new StringBuilder(broker.endsWith("/") ?
broker : broker + "/");
+ for (String append : appends) {
stringBuilder.append(append);
}
return stringBuilder.toString();
}
@Override
public void connectionLost(Throwable cause) {
- logger.error("Connection to {} lost due to: {}", new Object[]{broker,
cause.getMessage()}, cause);
+ logger.error("Connection to {} lost due to: {}", new
Object[]{clientProperties.getBroker(), cause.getMessage()}, cause);
Review Comment:
```suggestion
logger.error("Connection to {} lost due to: {}",
clientProperties.getBroker(), cause.getMessage(), cause);
```
##########
nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/PublishMQTT.java:
##########
@@ -201,35 +191,35 @@ private void initializeClient(ProcessContext context) {
// non-null but not connected, so we need to handle each case and only
create a new client when it is null
try {
if (mqttClient == null) {
- logger.debug("Creating client");
- mqttClient = createMqttClient(broker, clientID, persistence);
+ mqttClient = createMqttClient();
mqttClient.setCallback(this);
}
if (!mqttClient.isConnected()) {
- logger.debug("Connecting client");
- mqttClient.connect(connOpts);
+ mqttClient.connect();
}
- } catch (MqttException e) {
- logger.error("Connection to {} lost (or was never connected) and
connection failed. Yielding processor", new Object[]{broker}, e);
+ } catch (Exception e) {
+ logger.error("Connection to {} lost (or was never connected) and
connection failed. Yielding processor", clientProperties.getBroker(), e);
context.yield();
}
}
@Override
public void connectionLost(Throwable cause) {
- logger.error("Connection to {} lost due to: {}", new Object[]{broker,
cause.getMessage()}, cause);
+ logger.error("Connection to {} lost due to: {}", new
Object[]{clientProperties.getBroker(), cause.getMessage()}, cause);
Review Comment:
The object array wrapper can be removed.
```suggestion
logger.error("Connection to {} lost due to: {}",
clientProperties.getBroker(), cause.getMessage(), cause);
```
##########
nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttProtocolScheme.java:
##########
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.mqtt.common;
+
+import java.util.Arrays;
+
+public enum MqttProtocolScheme {
+ TCP,
+ SSL,
+ WS,
+ WSS;
+
+ public static String getValuesAsString(String delimiter) {
+ return String.join(delimiter, Arrays.stream(values()).map(value ->
value.name().toLowerCase()).toArray(String[]::new));
+ }
Review Comment:
It looks like this method is used only in reference to an error message.
Recommend removing it from the `enum` and creating a local private method
instead.
##########
nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java:
##########
@@ -649,38 +625,44 @@ private void closeWriter(final RecordSetWriter writer) {
}
private String getTransitUri(String... appends) {
- StringBuilder stringBuilder = new StringBuilder(brokerUri);
- for(String append : appends) {
+ String broker = clientProperties.getBrokerUri().toString();
+ StringBuilder stringBuilder = new StringBuilder(broker.endsWith("/") ?
broker : broker + "/");
+ for (String append : appends) {
stringBuilder.append(append);
}
return stringBuilder.toString();
}
@Override
public void connectionLost(Throwable cause) {
- logger.error("Connection to {} lost due to: {}", new Object[]{broker,
cause.getMessage()}, cause);
+ logger.error("Connection to {} lost due to: {}", new
Object[]{clientProperties.getBroker(), cause.getMessage()}, cause);
}
@Override
- public void messageArrived(String topic, MqttMessage message) throws
Exception {
+ public void messageArrived(ReceivedMqttMessage message) {
if (logger.isDebugEnabled()) {
byte[] payload = message.getPayload();
- String text = new String(payload, "UTF-8");
+ String text = new String(payload, StandardCharsets.UTF_8);
if (StringUtils.isAsciiPrintable(text)) {
- logger.debug("Message arrived from topic {}. Payload: {}", new
Object[] {topic, text});
+ logger.debug("Message arrived from topic {}. Payload: {}",
message.getTopic(), text);
} else {
- logger.debug("Message arrived from topic {}. Binary value of
size {}", new Object[] {topic, payload.length});
+ logger.debug("Message arrived from topic {}. Binary value of
size {}", message.getTopic(), payload.length);
}
}
- if(!mqttQueue.offer(new MQTTQueueMessage(topic, message), 1,
TimeUnit.SECONDS)) {
- throw new IllegalStateException("The subscriber queue is full,
cannot receive another message until the processor is scheduled to run.");
+ try {
+ if (!mqttQueue.offer(message, 1, TimeUnit.SECONDS)) {
+ throw new IllegalStateException("The subscriber queue is full,
cannot receive another message until the processor is scheduled to run.");
+ }
+ } catch (InterruptedException e) {
+ throw new MqttException("Failed to process message arrived from
topic " + message.getTopic());
}
}
@Override
- public void deliveryComplete(IMqttDeliveryToken token) {
- logger.warn("Received MQTT 'delivery complete' message to subscriber:
" + token);
+ public void deliveryComplete(String token) {
+ // Unlikely situation. Api uses the same callback for publisher and
consumer as well.
+ // That's why we have this log message here to indicate something
really messy thing happened.
+ logger.error("Received MQTT 'delivery complete' message to subscriber:
" + token);
Review Comment:
```suggestion
logger.error("Received MQTT 'delivery complete' message to
subscriber token [{}]", token);
```
##########
nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java:
##########
@@ -384,16 +326,59 @@ public final void onTrigger(final ProcessContext context,
final ProcessSessionFa
onTrigger(context, session);
session.commitAsync();
} catch (final Throwable t) {
- getLogger().error("{} failed to process due to {}; rolling back
session", new Object[]{this, t});
+ getLogger().error("{} failed to process due to {}; rolling back
session", this, t);
session.rollback(true);
throw t;
}
}
public abstract void onTrigger(final ProcessContext context, final
ProcessSession session) throws ProcessException;
- protected boolean isConnected(){
+ protected boolean isConnected() {
return (mqttClient != null && mqttClient.isConnected());
}
+ protected MqttClientProperties getMqttClientProperties(final
ProcessContext context) {
+ final MqttClientProperties clientProperties = new
MqttClientProperties();
+
+ try {
+ clientProperties.setBrokerUri(new
URI(context.getProperty(PROP_BROKER_URI).evaluateAttributeExpressions().getValue()));
+ } catch (URISyntaxException e) {
+ throw new RuntimeException(e);
Review Comment:
The error should be changed to an `IllegalArgumentException` and should
include a message.
```suggestion
throw new IllegalArgumentException("Invalid Broker URI", e);
```
##########
nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java:
##########
@@ -289,88 +285,34 @@ public Collection<ValidationResult> customValidate(final
ValidationContext valid
return results;
}
- public static Properties transformSSLContextService(SSLContextService
sslContextService){
- Properties properties = new Properties();
- if (sslContextService.getSslAlgorithm() != null) {
- properties.setProperty("com.ibm.ssl.protocol",
sslContextService.getSslAlgorithm());
- }
- if (sslContextService.getKeyStoreFile() != null) {
- properties.setProperty("com.ibm.ssl.keyStore",
sslContextService.getKeyStoreFile());
- }
- if (sslContextService.getKeyStorePassword() != null) {
- properties.setProperty("com.ibm.ssl.keyStorePassword",
sslContextService.getKeyStorePassword());
- }
- if (sslContextService.getKeyStoreType() != null) {
- properties.setProperty("com.ibm.ssl.keyStoreType",
sslContextService.getKeyStoreType());
- }
- if (sslContextService.getTrustStoreFile() != null) {
- properties.setProperty("com.ibm.ssl.trustStore",
sslContextService.getTrustStoreFile());
- }
- if (sslContextService.getTrustStorePassword() != null) {
- properties.setProperty("com.ibm.ssl.trustStorePassword",
sslContextService.getTrustStorePassword());
- }
- if (sslContextService.getTrustStoreType() != null) {
- properties.setProperty("com.ibm.ssl.trustStoreType",
sslContextService.getTrustStoreType());
- }
- return properties;
+ protected void onScheduled(final ProcessContext context) {
+ clientProperties = getMqttClientProperties(context);
}
- protected void onScheduled(final ProcessContext context){
- broker =
context.getProperty(PROP_BROKER_URI).evaluateAttributeExpressions().getValue();
- brokerUri = broker.endsWith("/") ? broker : broker + "/";
- clientID =
context.getProperty(PROP_CLIENTID).evaluateAttributeExpressions().getValue();
-
- if (clientID == null) {
- clientID = UUID.randomUUID().toString();
- }
-
- connOpts = new MqttConnectOptions();
-
connOpts.setCleanSession(context.getProperty(PROP_CLEAN_SESSION).asBoolean());
-
connOpts.setKeepAliveInterval(context.getProperty(PROP_KEEP_ALIVE_INTERVAL).asInteger());
-
connOpts.setMqttVersion(context.getProperty(PROP_MQTT_VERSION).asInteger());
-
connOpts.setConnectionTimeout(context.getProperty(PROP_CONN_TIMEOUT).asInteger());
-
- PropertyValue sslProp = context.getProperty(PROP_SSL_CONTEXT_SERVICE);
- if (sslProp.isSet()) {
- Properties sslProps =
transformSSLContextService((SSLContextService) sslProp.asControllerService());
- connOpts.setSSLProperties(sslProps);
- }
-
- PropertyValue lastWillTopicProp =
context.getProperty(PROP_LAST_WILL_TOPIC);
- if (lastWillTopicProp.isSet()){
- String lastWillMessage =
context.getProperty(PROP_LAST_WILL_MESSAGE).getValue();
- PropertyValue lastWillRetain =
context.getProperty(PROP_LAST_WILL_RETAIN);
- Integer lastWillQOS =
context.getProperty(PROP_LAST_WILL_QOS).asInteger();
- connOpts.setWill(lastWillTopicProp.getValue(),
lastWillMessage.getBytes(), lastWillQOS, lastWillRetain.isSet() ?
lastWillRetain.asBoolean() : false);
- }
-
-
- PropertyValue usernameProp = context.getProperty(PROP_USERNAME);
- if(usernameProp.isSet()) {
-
connOpts.setUserName(usernameProp.evaluateAttributeExpressions().getValue());
-
connOpts.setPassword(context.getProperty(PROP_PASSWORD).getValue().toCharArray());
- }
- }
+ protected void stopClient() {
+ // Since client is created in the onTrigger method it can happen that
it never will be created because of an initialization error.
+ // We are preventing additional nullPtrException here, but the clean
solution would be to create the client in the onScheduled method.
+ if (mqttClient != null) {
+ try {
+ logger.info("Disconnecting client");
+ mqttClient.disconnect();
+ } catch (Exception e) {
+ logger.error("Error disconnecting MQTT client due to {}", new
Object[]{e.getMessage()}, e);
+ }
- protected void onStopped() {
- try {
- logger.info("Disconnecting client");
- mqttClient.disconnect(DISCONNECT_TIMEOUT);
- } catch(MqttException me) {
- logger.error("Error disconnecting MQTT client due to {}", new
Object[]{me.getMessage()}, me);
- }
+ try {
+ logger.info("Closing client");
+ mqttClient.close();
+ } catch (Exception e) {
+ logger.error("Error closing MQTT client due to {}", new
Object[]{e.getMessage()}, e);
Review Comment:
The message could be removed along with the Object array wrapper, since the
stack trace includes the message.
```suggestion
logger.error("Error closing MQTT client", e);
```
##########
nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/adapters/HiveMqV5ClientAdapter.java:
##########
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.mqtt.adapters;
+
+import com.hivemq.client.mqtt.datatypes.MqttQos;
+import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient;
+import com.hivemq.client.mqtt.mqtt5.Mqtt5Client;
+import com.hivemq.client.mqtt.mqtt5.Mqtt5ClientBuilder;
+import com.hivemq.client.mqtt.mqtt5.message.connect.Mqtt5Connect;
+import com.hivemq.client.mqtt.mqtt5.message.connect.Mqtt5ConnectBuilder;
+import com.hivemq.client.mqtt.mqtt5.message.subscribe.suback.Mqtt5SubAck;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processors.mqtt.common.MqttCallback;
+import org.apache.nifi.processors.mqtt.common.MqttClient;
+import org.apache.nifi.processors.mqtt.common.MqttClientProperties;
+import org.apache.nifi.processors.mqtt.common.MqttException;
+import org.apache.nifi.processors.mqtt.common.ReceivedMqttMessage;
+import org.apache.nifi.processors.mqtt.common.StandardMqttMessage;
+import org.apache.nifi.security.util.KeyStoreUtils;
+import org.apache.nifi.security.util.TlsException;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.nifi.processors.mqtt.common.MqttProtocolScheme.SSL;
+import static org.apache.nifi.processors.mqtt.common.MqttProtocolScheme.WS;
+import static org.apache.nifi.processors.mqtt.common.MqttProtocolScheme.WSS;
+
+public class HiveMqV5ClientAdapter implements MqttClient {
+
+ private final Mqtt5BlockingClient mqtt5BlockingClient;
+ private final MqttClientProperties clientProperties;
+ private final ComponentLog logger;
+
+ private MqttCallback callback;
+
+ public HiveMqV5ClientAdapter(MqttClientProperties clientProperties,
ComponentLog logger) throws TlsException {
+ this.mqtt5BlockingClient = createClient(clientProperties, logger);
+ this.clientProperties = clientProperties;
+ this.logger = logger;
+ }
+
+ @Override
+ public boolean isConnected() {
+ return mqtt5BlockingClient.getState().isConnected();
+ }
+
+ @Override
+ public void connect() {
+ logger.debug("Connecting to broker");
+
+ final Mqtt5ConnectBuilder connectBuilder = Mqtt5Connect.builder()
+ .keepAlive(clientProperties.getKeepAliveInterval());
+
+ final boolean cleanSession = clientProperties.isCleanSession();
+ connectBuilder.cleanStart(cleanSession);
+ if (!cleanSession) {
+
connectBuilder.sessionExpiryInterval(clientProperties.getSessionExpiryInterval());
+ }
+
+ final String lastWillTopic = clientProperties.getLastWillTopic();
+ if (lastWillTopic != null) {
+ connectBuilder.willPublish()
+ .topic(lastWillTopic)
+ .payload(clientProperties.getLastWillMessage().getBytes())
+ .retain(clientProperties.getLastWillRetain())
+ .qos(MqttQos.fromCode(clientProperties.getLastWillQos()))
+ .applyWillPublish();
+ }
+
+ final String username = clientProperties.getUsername();
+ final String password = clientProperties.getPassword();
+ if (username != null && password != null) {
+ connectBuilder.simpleAuth()
+ .username(clientProperties.getUsername())
+ .password(password.getBytes(StandardCharsets.UTF_8))
+ .applySimpleAuth();
+ }
+
+ final Mqtt5Connect mqtt5Connect = connectBuilder.build();
+ mqtt5BlockingClient.connect(mqtt5Connect);
+ }
+
+ @Override
+ public void disconnect() {
+ logger.debug("Disconnecting client");
+ // Currently it is not possible to set timeout for disconnect with
HiveMQ Client.
+ mqtt5BlockingClient.disconnect();
+ }
+
+ @Override
+ public void close() {
+ // there is no paho's close equivalent in hivemq client
+ }
+
+ @Override
+ public void publish(String topic, StandardMqttMessage message) {
+ logger.debug("Publishing message to {} with QoS: {}", topic,
message.getQos());
+
+ mqtt5BlockingClient.publishWith()
+ .topic(topic)
+ .payload(message.getPayload())
+ .retain(message.isRetained())
+
.qos(Objects.requireNonNull(MqttQos.fromCode(message.getQos())))
+ .send();
+ }
+
+ @Override
+ public void subscribe(String topicFilter, int qos) {
+ Objects.requireNonNull(callback, "callback should be set");
+
+ logger.debug("Subscribing to {} with QoS: {}", topicFilter, qos);
+
+ CompletableFuture<Mqtt5SubAck> futureAck =
mqtt5BlockingClient.toAsync().subscribeWith()
+ .topicFilter(topicFilter)
+ .qos(Objects.requireNonNull(MqttQos.fromCode(qos)))
+ .callback(mqtt5Publish -> {
+ final ReceivedMqttMessage receivedMessage = new
ReceivedMqttMessage(
+ mqtt5Publish.getPayloadAsBytes(),
+ mqtt5Publish.getQos().getCode(),
+ mqtt5Publish.isRetain(),
+ mqtt5Publish.getTopic().toString());
+ callback.messageArrived(receivedMessage);
+ })
+ .send();
+
+ // Setting "listener" callback is only possible with async client,
though sending subscribe message
+ // should happen in a blocking way to make sure the processor is
blocked until ack is not arrived.
+ try {
+ Mqtt5SubAck ack =
futureAck.get(clientProperties.getConnectionTimeout(), TimeUnit.SECONDS);
+ logger.debug("Received mqtt5 subscribe ack: {}", ack.toString());
Review Comment:
The `toString()` call is not necessary.
```suggestion
logger.debug("Received mqtt5 subscribe ack: {}", ack);
```
##########
nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttClientProperties.java:
##########
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.mqtt.common;
+
+import org.apache.nifi.ssl.SSLContextService;
+
+import java.net.URI;
+
+public class MqttClientProperties {
+ private URI brokerUri;
+ private String clientId;
+
+ private MqttVersion mqttVersion;
+
+ private int keepAliveInterval;
+ private int connectionTimeout;
+
+ private boolean cleanSession;
+ private Long sessionExpiryInterval;
+
+ private SSLContextService sslContextService;
Review Comment:
Instead of passing the `SSLContextService` reference, the `TlsConfiguration`
object should be used as it contains all of the necessary properties.
##########
nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java:
##########
@@ -31,89 +31,89 @@
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.security.util.TlsException;
import org.apache.nifi.ssl.SSLContextService;
-import org.eclipse.paho.client.mqttv3.IMqttClient;
-import org.eclipse.paho.client.mqttv3.MqttClient;
-import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
-import org.eclipse.paho.client.mqttv3.MqttException;
-import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
-import java.util.Properties;
import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import static org.apache.commons.lang3.EnumUtils.isValidEnumIgnoreCase;
+import static org.apache.commons.lang3.StringUtils.EMPTY;
import static
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_FALSE;
import static
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_TRUE;
import static
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_MQTT_VERSION_310;
import static
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_MQTT_VERSION_311;
+import static
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_MQTT_VERSION_500;
import static
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_MQTT_VERSION_AUTO;
import static
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_0;
import static
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_1;
import static
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_2;
public abstract class AbstractMQTTProcessor extends
AbstractSessionFactoryProcessor {
- public static int DISCONNECT_TIMEOUT = 5000;
+ private static final String DEFAULT_SESSION_EXPIRY_INTERVAL = "24 hrs";
protected ComponentLog logger;
- protected IMqttClient mqttClient;
- protected volatile String broker;
- protected volatile String brokerUri;
- protected volatile String clientID;
- protected MqttConnectOptions connOpts;
- protected MemoryPersistence persistence = new MemoryPersistence();
- public ProcessSessionFactory processSessionFactory;
+ protected MqttClientProperties clientProperties;
- public static final Validator QOS_VALIDATOR = new Validator() {
+ protected MqttClientFactory mqttClientFactory = new MqttClientFactory();
+ protected MqttClient mqttClient;
- @Override
- public ValidationResult validate(String subject, String input,
ValidationContext context) {
- Integer inputInt = Integer.parseInt(input);
- if (inputInt < 0 || inputInt > 2) {
- return new
ValidationResult.Builder().subject(subject).valid(false).explanation("QoS must
be an integer between 0 and 2").build();
- }
- return new
ValidationResult.Builder().subject(subject).valid(true).build();
+ public ProcessSessionFactory processSessionFactory;
+
+ public static final Validator QOS_VALIDATOR = (subject, input, context) ->
{
+ Integer inputInt = Integer.parseInt(input);
+ if (inputInt < 0 || inputInt > 2) {
+ return new
ValidationResult.Builder().subject(subject).valid(false).explanation("QoS must
be an integer between 0 and 2").build();
}
+ return new
ValidationResult.Builder().subject(subject).valid(true).build();
};
- public static final Validator BROKER_VALIDATOR = new Validator() {
-
- @Override
- public ValidationResult validate(String subject, String input,
ValidationContext context) {
- try{
- URI brokerURI = new URI(input);
- if (!"".equals(brokerURI.getPath())) {
- return new
ValidationResult.Builder().subject(subject).valid(false).explanation("the
broker URI cannot have a path. It currently is:" + brokerURI.getPath()).build();
- }
- if (!("tcp".equals(brokerURI.getScheme()) ||
"ssl".equals(brokerURI.getScheme()) || "ws".equals(brokerURI.getScheme()) ||
"wss".equals(brokerURI.getScheme()))) {
- return new
ValidationResult.Builder().subject(subject).valid(false).explanation("only the
'tcp', 'ssl', 'ws' and 'wss' schemes are supported.").build();
- }
- } catch (URISyntaxException e) {
- return new
ValidationResult.Builder().subject(subject).valid(false).explanation("it is not
valid URI syntax.").build();
+ public static final Validator BROKER_VALIDATOR = (subject, input, context)
-> {
+ try {
+ URI brokerURI = new URI(input);
+ if (!EMPTY.equals(brokerURI.getPath())) {
+ return new
ValidationResult.Builder().subject(subject).valid(false).explanation("the
broker URI cannot have a path. It currently is:" + brokerURI.getPath()).build();
Review Comment:
There should be a space before the path.
```suggestion
return new
ValidationResult.Builder().subject(subject).valid(false).explanation("the
broker URI cannot have a path. It currently is: " +
brokerURI.getPath()).build();
```
##########
nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java:
##########
@@ -384,16 +326,59 @@ public final void onTrigger(final ProcessContext context,
final ProcessSessionFa
onTrigger(context, session);
session.commitAsync();
} catch (final Throwable t) {
- getLogger().error("{} failed to process due to {}; rolling back
session", new Object[]{this, t});
+ getLogger().error("{} failed to process due to {}; rolling back
session", this, t);
session.rollback(true);
throw t;
}
}
public abstract void onTrigger(final ProcessContext context, final
ProcessSession session) throws ProcessException;
- protected boolean isConnected(){
+ protected boolean isConnected() {
return (mqttClient != null && mqttClient.isConnected());
}
+ protected MqttClientProperties getMqttClientProperties(final
ProcessContext context) {
+ final MqttClientProperties clientProperties = new
MqttClientProperties();
+
+ try {
+ clientProperties.setBrokerUri(new
URI(context.getProperty(PROP_BROKER_URI).evaluateAttributeExpressions().getValue()));
+ } catch (URISyntaxException e) {
+ throw new RuntimeException(e);
+ }
+
+ String clientId =
context.getProperty(PROP_CLIENTID).evaluateAttributeExpressions().getValue();
+ if (clientId == null) {
+ clientId = UUID.randomUUID().toString();
+ }
+ clientProperties.setClientId(clientId);
+
+
clientProperties.setMqttVersion(MqttVersion.fromVersionCode(context.getProperty(PROP_MQTT_VERSION).asInteger()));
+
+
clientProperties.setCleanSession(context.getProperty(PROP_CLEAN_SESSION).asBoolean());
+
clientProperties.setSessionExpiryInterval(context.getProperty(PROP_SESSION_EXPIRY_INTERVAL).asTimePeriod(TimeUnit.SECONDS));
+
+
clientProperties.setKeepAliveInterval(context.getProperty(PROP_KEEP_ALIVE_INTERVAL).asInteger());
+
clientProperties.setConnectionTimeout(context.getProperty(PROP_CONN_TIMEOUT).asInteger());
+
+ final PropertyValue sslProp =
context.getProperty(PROP_SSL_CONTEXT_SERVICE);
+ if (sslProp.isSet()) {
+ clientProperties.setSslContextService((SSLContextService)
sslProp.asControllerService());
Review Comment:
As mentioned in the Client Properties, this can be adjusted to call
`createTlsConfiguration()` to pass the properties.
##########
nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/PublishMQTT.java:
##########
@@ -201,35 +191,35 @@ private void initializeClient(ProcessContext context) {
// non-null but not connected, so we need to handle each case and only
create a new client when it is null
try {
if (mqttClient == null) {
- logger.debug("Creating client");
- mqttClient = createMqttClient(broker, clientID, persistence);
+ mqttClient = createMqttClient();
mqttClient.setCallback(this);
}
if (!mqttClient.isConnected()) {
- logger.debug("Connecting client");
- mqttClient.connect(connOpts);
+ mqttClient.connect();
}
- } catch (MqttException e) {
- logger.error("Connection to {} lost (or was never connected) and
connection failed. Yielding processor", new Object[]{broker}, e);
+ } catch (Exception e) {
+ logger.error("Connection to {} lost (or was never connected) and
connection failed. Yielding processor", clientProperties.getBroker(), e);
context.yield();
}
}
@Override
public void connectionLost(Throwable cause) {
- logger.error("Connection to {} lost due to: {}", new Object[]{broker,
cause.getMessage()}, cause);
+ logger.error("Connection to {} lost due to: {}", new
Object[]{clientProperties.getBroker(), cause.getMessage()}, cause);
}
@Override
- public void messageArrived(String topic, MqttMessage message) throws
Exception {
- logger.error("Message arrived to a PublishMQTT processor { topic:'" +
topic +"; payload:"+ Arrays.toString(message.getPayload())+"}");
+ public void messageArrived(ReceivedMqttMessage message) {
+ // Unlikely situation. Api uses the same callback for publisher and
consumer as well.
+ // That's why we have this log message here to indicate something
really messy thing happened.
+ logger.error("Message arrived to a PublishMQTT processor { topic:'" +
message.getTopic() + "; payload:" + Arrays.toString(message.getPayload()) +
"}");
}
@Override
- public void deliveryComplete(IMqttDeliveryToken token) {
+ public void deliveryComplete(String token) {
// Client.publish waits for message to be delivered so this token will
always have a null message and is useless in this application.
- logger.trace("Received 'delivery complete' message from broker for:" +
token.toString());
+ logger.trace("Received 'delivery complete' message from broker for:" +
token);
Review Comment:
This should be changed to use a placeholder variable instead of
concatenation.
```suggestion
logger.trace("Received 'delivery complete' message from broker token
[{}]", token);
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]