exceptionfactory commented on code in PR #6225:
URL: https://github.com/apache/nifi/pull/6225#discussion_r957500825


##########
nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java:
##########
@@ -31,89 +31,89 @@
 import org.apache.nifi.processor.ProcessSessionFactory;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.security.util.TlsException;
 import org.apache.nifi.ssl.SSLContextService;
-import org.eclipse.paho.client.mqttv3.IMqttClient;
-import org.eclipse.paho.client.mqttv3.MqttClient;
-import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
-import org.eclipse.paho.client.mqttv3.MqttException;
-import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
 
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
-import java.util.Properties;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 
+import static org.apache.commons.lang3.EnumUtils.isValidEnumIgnoreCase;
+import static org.apache.commons.lang3.StringUtils.EMPTY;
 import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_FALSE;
 import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_TRUE;
 import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_MQTT_VERSION_310;
 import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_MQTT_VERSION_311;
+import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_MQTT_VERSION_500;
 import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_MQTT_VERSION_AUTO;
 import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_0;
 import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_1;
 import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_2;
 
 public abstract class AbstractMQTTProcessor extends 
AbstractSessionFactoryProcessor {
 
-    public static int DISCONNECT_TIMEOUT = 5000;
+    private static final String DEFAULT_SESSION_EXPIRY_INTERVAL = "24 hrs";
 
     protected ComponentLog logger;
-    protected IMqttClient mqttClient;
-    protected volatile String broker;
-    protected volatile String brokerUri;
-    protected volatile String clientID;
-    protected MqttConnectOptions connOpts;
-    protected MemoryPersistence persistence = new MemoryPersistence();
 
-    public ProcessSessionFactory processSessionFactory;
+    protected MqttClientProperties clientProperties;
 
-    public static final Validator QOS_VALIDATOR = new Validator() {
+    protected MqttClientFactory mqttClientFactory = new MqttClientFactory();
+    protected MqttClient mqttClient;
 
-        @Override
-        public ValidationResult validate(String subject, String input, 
ValidationContext context) {
-            Integer inputInt = Integer.parseInt(input);
-            if (inputInt < 0 || inputInt > 2) {
-                return new 
ValidationResult.Builder().subject(subject).valid(false).explanation("QoS must 
be an integer between 0 and 2").build();
-            }
-            return new 
ValidationResult.Builder().subject(subject).valid(true).build();
+    public ProcessSessionFactory processSessionFactory;
+
+    public static final Validator QOS_VALIDATOR = (subject, input, context) -> 
{
+        Integer inputInt = Integer.parseInt(input);
+        if (inputInt < 0 || inputInt > 2) {
+            return new 
ValidationResult.Builder().subject(subject).valid(false).explanation("QoS must 
be an integer between 0 and 2").build();
         }
+        return new 
ValidationResult.Builder().subject(subject).valid(true).build();
     };
 
-    public static final Validator BROKER_VALIDATOR = new Validator() {
-
-        @Override
-        public ValidationResult validate(String subject, String input, 
ValidationContext context) {
-            try{
-                URI brokerURI = new URI(input);
-                if (!"".equals(brokerURI.getPath())) {
-                    return new 
ValidationResult.Builder().subject(subject).valid(false).explanation("the 
broker URI cannot have a path. It currently is:" + brokerURI.getPath()).build();
-                }
-                if (!("tcp".equals(brokerURI.getScheme()) || 
"ssl".equals(brokerURI.getScheme()) || "ws".equals(brokerURI.getScheme()) || 
"wss".equals(brokerURI.getScheme()))) {
-                    return new 
ValidationResult.Builder().subject(subject).valid(false).explanation("only the 
'tcp', 'ssl', 'ws' and 'wss' schemes are supported.").build();
-                }
-            } catch (URISyntaxException e) {
-                return new 
ValidationResult.Builder().subject(subject).valid(false).explanation("it is not 
valid URI syntax.").build();
+    public static final Validator BROKER_VALIDATOR = (subject, input, context) 
-> {
+        try {
+            URI brokerURI = new URI(input);
+            if (!EMPTY.equals(brokerURI.getPath())) {
+                return new 
ValidationResult.Builder().subject(subject).valid(false).explanation("the 
broker URI cannot have a path. It currently is:" + brokerURI.getPath()).build();
             }
-            return new 
ValidationResult.Builder().subject(subject).valid(true).build();
+            if (!isValidEnumIgnoreCase(MqttProtocolScheme.class, 
brokerURI.getScheme())) {
+                return new 
ValidationResult.Builder().subject(subject).valid(false)
+                        .explanation("invalid scheme! supported schemes are: " 
+ MqttProtocolScheme.getValuesAsString(", ")).build();

Review Comment:
   Error messages should not use the exclamation mark character.
   ```suggestion
                           .explanation("Invalid Scheme: supported schemes are: 
" + MqttProtocolScheme.getValuesAsString(", ")).build();
   ```



##########
nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java:
##########
@@ -649,38 +625,44 @@ private void closeWriter(final RecordSetWriter writer) {
     }
 
     private String getTransitUri(String... appends) {
-        StringBuilder stringBuilder = new StringBuilder(brokerUri);
-        for(String append : appends) {
+        String broker = clientProperties.getBrokerUri().toString();
+        StringBuilder stringBuilder = new StringBuilder(broker.endsWith("/") ? 
broker : broker + "/");
+        for (String append : appends) {
             stringBuilder.append(append);
         }
         return stringBuilder.toString();
     }
 
     @Override
     public void connectionLost(Throwable cause) {
-        logger.error("Connection to {} lost due to: {}", new Object[]{broker, 
cause.getMessage()}, cause);
+        logger.error("Connection to {} lost due to: {}", new 
Object[]{clientProperties.getBroker(), cause.getMessage()}, cause);

Review Comment:
   ```suggestion
           logger.error("Connection to {} lost due to: {}", 
clientProperties.getBroker(), cause.getMessage(), cause);
   ```



##########
nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/PublishMQTT.java:
##########
@@ -201,35 +191,35 @@ private void initializeClient(ProcessContext context) {
         // non-null but not connected, so we need to handle each case and only 
create a new client when it is null
         try {
             if (mqttClient == null) {
-                logger.debug("Creating client");
-                mqttClient = createMqttClient(broker, clientID, persistence);
+                mqttClient = createMqttClient();
                 mqttClient.setCallback(this);
             }
 
             if (!mqttClient.isConnected()) {
-                logger.debug("Connecting client");
-                mqttClient.connect(connOpts);
+                mqttClient.connect();
             }
-        } catch (MqttException e) {
-            logger.error("Connection to {} lost (or was never connected) and 
connection failed. Yielding processor", new Object[]{broker}, e);
+        } catch (Exception e) {
+            logger.error("Connection to {} lost (or was never connected) and 
connection failed. Yielding processor", clientProperties.getBroker(), e);
             context.yield();
         }
     }
 
     @Override
     public void connectionLost(Throwable cause) {
-        logger.error("Connection to {} lost due to: {}", new Object[]{broker, 
cause.getMessage()}, cause);
+        logger.error("Connection to {} lost due to: {}", new 
Object[]{clientProperties.getBroker(), cause.getMessage()}, cause);

Review Comment:
   The object array wrapper can be removed.
   ```suggestion
           logger.error("Connection to {} lost due to: {}", 
clientProperties.getBroker(), cause.getMessage(), cause);
   ```



##########
nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttProtocolScheme.java:
##########
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.mqtt.common;
+
+import java.util.Arrays;
+
+public enum MqttProtocolScheme {
+    TCP,
+    SSL,
+    WS,
+    WSS;
+
+    public static String getValuesAsString(String delimiter) {
+        return String.join(delimiter, Arrays.stream(values()).map(value -> 
value.name().toLowerCase()).toArray(String[]::new));
+    }

Review Comment:
   It looks like this method is used only in reference to an error message. 
Recommend removing it from the `enum` and creating a local private method 
instead.



##########
nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java:
##########
@@ -649,38 +625,44 @@ private void closeWriter(final RecordSetWriter writer) {
     }
 
     private String getTransitUri(String... appends) {
-        StringBuilder stringBuilder = new StringBuilder(brokerUri);
-        for(String append : appends) {
+        String broker = clientProperties.getBrokerUri().toString();
+        StringBuilder stringBuilder = new StringBuilder(broker.endsWith("/") ? 
broker : broker + "/");
+        for (String append : appends) {
             stringBuilder.append(append);
         }
         return stringBuilder.toString();
     }
 
     @Override
     public void connectionLost(Throwable cause) {
-        logger.error("Connection to {} lost due to: {}", new Object[]{broker, 
cause.getMessage()}, cause);
+        logger.error("Connection to {} lost due to: {}", new 
Object[]{clientProperties.getBroker(), cause.getMessage()}, cause);
     }
 
     @Override
-    public void messageArrived(String topic, MqttMessage message) throws 
Exception {
+    public void messageArrived(ReceivedMqttMessage message) {
         if (logger.isDebugEnabled()) {
             byte[] payload = message.getPayload();
-            String text = new String(payload, "UTF-8");
+            String text = new String(payload, StandardCharsets.UTF_8);
             if (StringUtils.isAsciiPrintable(text)) {
-                logger.debug("Message arrived from topic {}. Payload: {}", new 
Object[] {topic, text});
+                logger.debug("Message arrived from topic {}. Payload: {}", 
message.getTopic(), text);
             } else {
-                logger.debug("Message arrived from topic {}. Binary value of 
size {}", new Object[] {topic, payload.length});
+                logger.debug("Message arrived from topic {}. Binary value of 
size {}", message.getTopic(), payload.length);
             }
         }
 
-        if(!mqttQueue.offer(new MQTTQueueMessage(topic, message), 1, 
TimeUnit.SECONDS)) {
-            throw new IllegalStateException("The subscriber queue is full, 
cannot receive another message until the processor is scheduled to run.");
+        try {
+            if (!mqttQueue.offer(message, 1, TimeUnit.SECONDS)) {
+                throw new IllegalStateException("The subscriber queue is full, 
cannot receive another message until the processor is scheduled to run.");
+            }
+        } catch (InterruptedException e) {
+            throw new MqttException("Failed to process message arrived from 
topic " + message.getTopic());
         }
     }
 
     @Override
-    public void deliveryComplete(IMqttDeliveryToken token) {
-        logger.warn("Received MQTT 'delivery complete' message to subscriber: 
" + token);
+    public void deliveryComplete(String token) {
+        // Unlikely situation. Api uses the same callback for publisher and 
consumer as well.
+        // That's why we have this log message here to indicate something 
really messy thing happened.
+        logger.error("Received MQTT 'delivery complete' message to subscriber: 
" + token);

Review Comment:
   ```suggestion
           logger.error("Received MQTT 'delivery complete' message to 
subscriber token [{}]", token);
   ```



##########
nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java:
##########
@@ -384,16 +326,59 @@ public final void onTrigger(final ProcessContext context, 
final ProcessSessionFa
             onTrigger(context, session);
             session.commitAsync();
         } catch (final Throwable t) {
-            getLogger().error("{} failed to process due to {}; rolling back 
session", new Object[]{this, t});
+            getLogger().error("{} failed to process due to {}; rolling back 
session", this, t);
             session.rollback(true);
             throw t;
         }
     }
 
     public abstract void onTrigger(final ProcessContext context, final 
ProcessSession session) throws ProcessException;
 
-    protected boolean isConnected(){
+    protected boolean isConnected() {
         return (mqttClient != null && mqttClient.isConnected());
     }
 
+    protected MqttClientProperties getMqttClientProperties(final 
ProcessContext context) {
+        final MqttClientProperties clientProperties = new 
MqttClientProperties();
+
+        try {
+            clientProperties.setBrokerUri(new 
URI(context.getProperty(PROP_BROKER_URI).evaluateAttributeExpressions().getValue()));
+        } catch (URISyntaxException e) {
+            throw new RuntimeException(e);

Review Comment:
   The error should be changed to an `IllegalArgumentException` and should 
include a message.
   ```suggestion
               throw new IllegalArgumentException("Invalid Broker URI", e);
   ```



##########
nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java:
##########
@@ -289,88 +285,34 @@ public Collection<ValidationResult> customValidate(final 
ValidationContext valid
         return results;
     }
 
-    public static Properties transformSSLContextService(SSLContextService 
sslContextService){
-        Properties properties = new Properties();
-        if (sslContextService.getSslAlgorithm() != null) {
-            properties.setProperty("com.ibm.ssl.protocol", 
sslContextService.getSslAlgorithm());
-        }
-        if (sslContextService.getKeyStoreFile() != null) {
-            properties.setProperty("com.ibm.ssl.keyStore", 
sslContextService.getKeyStoreFile());
-        }
-        if (sslContextService.getKeyStorePassword() != null) {
-            properties.setProperty("com.ibm.ssl.keyStorePassword", 
sslContextService.getKeyStorePassword());
-        }
-        if (sslContextService.getKeyStoreType() != null) {
-            properties.setProperty("com.ibm.ssl.keyStoreType", 
sslContextService.getKeyStoreType());
-        }
-        if (sslContextService.getTrustStoreFile() != null) {
-            properties.setProperty("com.ibm.ssl.trustStore", 
sslContextService.getTrustStoreFile());
-        }
-        if (sslContextService.getTrustStorePassword() != null) {
-            properties.setProperty("com.ibm.ssl.trustStorePassword", 
sslContextService.getTrustStorePassword());
-        }
-        if (sslContextService.getTrustStoreType() != null) {
-            properties.setProperty("com.ibm.ssl.trustStoreType", 
sslContextService.getTrustStoreType());
-        }
-        return  properties;
+    protected void onScheduled(final ProcessContext context) {
+        clientProperties = getMqttClientProperties(context);
     }
 
-    protected void onScheduled(final ProcessContext context){
-        broker = 
context.getProperty(PROP_BROKER_URI).evaluateAttributeExpressions().getValue();
-        brokerUri = broker.endsWith("/") ? broker : broker + "/";
-        clientID = 
context.getProperty(PROP_CLIENTID).evaluateAttributeExpressions().getValue();
-
-        if (clientID == null) {
-            clientID = UUID.randomUUID().toString();
-        }
-
-        connOpts = new MqttConnectOptions();
-        
connOpts.setCleanSession(context.getProperty(PROP_CLEAN_SESSION).asBoolean());
-        
connOpts.setKeepAliveInterval(context.getProperty(PROP_KEEP_ALIVE_INTERVAL).asInteger());
-        
connOpts.setMqttVersion(context.getProperty(PROP_MQTT_VERSION).asInteger());
-        
connOpts.setConnectionTimeout(context.getProperty(PROP_CONN_TIMEOUT).asInteger());
-
-        PropertyValue sslProp = context.getProperty(PROP_SSL_CONTEXT_SERVICE);
-        if (sslProp.isSet()) {
-            Properties sslProps = 
transformSSLContextService((SSLContextService) sslProp.asControllerService());
-            connOpts.setSSLProperties(sslProps);
-        }
-
-        PropertyValue lastWillTopicProp = 
context.getProperty(PROP_LAST_WILL_TOPIC);
-        if (lastWillTopicProp.isSet()){
-            String lastWillMessage = 
context.getProperty(PROP_LAST_WILL_MESSAGE).getValue();
-            PropertyValue lastWillRetain = 
context.getProperty(PROP_LAST_WILL_RETAIN);
-            Integer lastWillQOS = 
context.getProperty(PROP_LAST_WILL_QOS).asInteger();
-            connOpts.setWill(lastWillTopicProp.getValue(), 
lastWillMessage.getBytes(), lastWillQOS, lastWillRetain.isSet() ? 
lastWillRetain.asBoolean() : false);
-        }
-
-
-        PropertyValue usernameProp = context.getProperty(PROP_USERNAME);
-        if(usernameProp.isSet()) {
-            
connOpts.setUserName(usernameProp.evaluateAttributeExpressions().getValue());
-            
connOpts.setPassword(context.getProperty(PROP_PASSWORD).getValue().toCharArray());
-        }
-    }
+    protected void stopClient() {
+        // Since client is created in the onTrigger method it can happen that 
it never will be created because of an initialization error.
+        // We are preventing additional nullPtrException here, but the clean 
solution would be to create the client in the onScheduled method.
+        if (mqttClient != null) {
+            try {
+                logger.info("Disconnecting client");
+                mqttClient.disconnect();
+            } catch (Exception e) {
+                logger.error("Error disconnecting MQTT client due to {}", new 
Object[]{e.getMessage()}, e);
+            }
 
-    protected void onStopped() {
-        try {
-            logger.info("Disconnecting client");
-            mqttClient.disconnect(DISCONNECT_TIMEOUT);
-        } catch(MqttException me) {
-            logger.error("Error disconnecting MQTT client due to {}", new 
Object[]{me.getMessage()}, me);
-        }
+            try {
+                logger.info("Closing client");
+                mqttClient.close();
+            } catch (Exception e) {
+                logger.error("Error closing MQTT client due to {}", new 
Object[]{e.getMessage()}, e);

Review Comment:
   The message could be removed along with the Object array wrapper, since the 
stack trace includes the message.
   ```suggestion
                   logger.error("Error closing MQTT client", e);
   ```



##########
nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/adapters/HiveMqV5ClientAdapter.java:
##########
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.mqtt.adapters;
+
+import com.hivemq.client.mqtt.datatypes.MqttQos;
+import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient;
+import com.hivemq.client.mqtt.mqtt5.Mqtt5Client;
+import com.hivemq.client.mqtt.mqtt5.Mqtt5ClientBuilder;
+import com.hivemq.client.mqtt.mqtt5.message.connect.Mqtt5Connect;
+import com.hivemq.client.mqtt.mqtt5.message.connect.Mqtt5ConnectBuilder;
+import com.hivemq.client.mqtt.mqtt5.message.subscribe.suback.Mqtt5SubAck;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processors.mqtt.common.MqttCallback;
+import org.apache.nifi.processors.mqtt.common.MqttClient;
+import org.apache.nifi.processors.mqtt.common.MqttClientProperties;
+import org.apache.nifi.processors.mqtt.common.MqttException;
+import org.apache.nifi.processors.mqtt.common.ReceivedMqttMessage;
+import org.apache.nifi.processors.mqtt.common.StandardMqttMessage;
+import org.apache.nifi.security.util.KeyStoreUtils;
+import org.apache.nifi.security.util.TlsException;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.nifi.processors.mqtt.common.MqttProtocolScheme.SSL;
+import static org.apache.nifi.processors.mqtt.common.MqttProtocolScheme.WS;
+import static org.apache.nifi.processors.mqtt.common.MqttProtocolScheme.WSS;
+
+public class HiveMqV5ClientAdapter implements MqttClient {
+
+    private final Mqtt5BlockingClient mqtt5BlockingClient;
+    private final MqttClientProperties clientProperties;
+    private final ComponentLog logger;
+
+    private MqttCallback callback;
+
+    public HiveMqV5ClientAdapter(MqttClientProperties clientProperties, 
ComponentLog logger) throws TlsException {
+        this.mqtt5BlockingClient = createClient(clientProperties, logger);
+        this.clientProperties = clientProperties;
+        this.logger = logger;
+    }
+
+    @Override
+    public boolean isConnected() {
+        return mqtt5BlockingClient.getState().isConnected();
+    }
+
+    @Override
+    public void connect() {
+        logger.debug("Connecting to broker");
+
+        final Mqtt5ConnectBuilder connectBuilder = Mqtt5Connect.builder()
+                .keepAlive(clientProperties.getKeepAliveInterval());
+
+        final boolean cleanSession = clientProperties.isCleanSession();
+        connectBuilder.cleanStart(cleanSession);
+        if (!cleanSession) {
+            
connectBuilder.sessionExpiryInterval(clientProperties.getSessionExpiryInterval());
+        }
+
+        final String lastWillTopic = clientProperties.getLastWillTopic();
+        if (lastWillTopic != null) {
+            connectBuilder.willPublish()
+                    .topic(lastWillTopic)
+                    .payload(clientProperties.getLastWillMessage().getBytes())
+                    .retain(clientProperties.getLastWillRetain())
+                    .qos(MqttQos.fromCode(clientProperties.getLastWillQos()))
+                    .applyWillPublish();
+        }
+
+        final String username = clientProperties.getUsername();
+        final String password = clientProperties.getPassword();
+        if (username != null && password != null) {
+            connectBuilder.simpleAuth()
+                    .username(clientProperties.getUsername())
+                    .password(password.getBytes(StandardCharsets.UTF_8))
+                    .applySimpleAuth();
+        }
+
+        final Mqtt5Connect mqtt5Connect = connectBuilder.build();
+        mqtt5BlockingClient.connect(mqtt5Connect);
+    }
+
+    @Override
+    public void disconnect() {
+        logger.debug("Disconnecting client");
+        // Currently it is not possible to set timeout for disconnect with 
HiveMQ Client.
+        mqtt5BlockingClient.disconnect();
+    }
+
+    @Override
+    public void close() {
+        // there is no paho's close equivalent in hivemq client
+    }
+
+    @Override
+    public void publish(String topic, StandardMqttMessage message) {
+        logger.debug("Publishing message to {} with QoS: {}", topic, 
message.getQos());
+
+        mqtt5BlockingClient.publishWith()
+                .topic(topic)
+                .payload(message.getPayload())
+                .retain(message.isRetained())
+                
.qos(Objects.requireNonNull(MqttQos.fromCode(message.getQos())))
+                .send();
+    }
+
+    @Override
+    public void subscribe(String topicFilter, int qos) {
+        Objects.requireNonNull(callback, "callback should be set");
+
+        logger.debug("Subscribing to {} with QoS: {}", topicFilter, qos);
+
+        CompletableFuture<Mqtt5SubAck> futureAck = 
mqtt5BlockingClient.toAsync().subscribeWith()
+                .topicFilter(topicFilter)
+                .qos(Objects.requireNonNull(MqttQos.fromCode(qos)))
+                .callback(mqtt5Publish -> {
+                    final ReceivedMqttMessage receivedMessage = new 
ReceivedMqttMessage(
+                            mqtt5Publish.getPayloadAsBytes(),
+                            mqtt5Publish.getQos().getCode(),
+                            mqtt5Publish.isRetain(),
+                            mqtt5Publish.getTopic().toString());
+                    callback.messageArrived(receivedMessage);
+                })
+                .send();
+
+        // Setting "listener" callback is only possible with async client, 
though sending subscribe message
+        // should happen in a blocking way to make sure the processor is 
blocked until ack is not arrived.
+        try {
+            Mqtt5SubAck ack = 
futureAck.get(clientProperties.getConnectionTimeout(), TimeUnit.SECONDS);
+            logger.debug("Received mqtt5 subscribe ack: {}", ack.toString());

Review Comment:
   The `toString()` call is not necessary.
   ```suggestion
               logger.debug("Received mqtt5 subscribe ack: {}", ack);
   ```



##########
nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttClientProperties.java:
##########
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.mqtt.common;
+
+import org.apache.nifi.ssl.SSLContextService;
+
+import java.net.URI;
+
+public class MqttClientProperties {
+    private URI brokerUri;
+    private String clientId;
+
+    private MqttVersion mqttVersion;
+
+    private int keepAliveInterval;
+    private int connectionTimeout;
+
+    private boolean cleanSession;
+    private Long sessionExpiryInterval;
+
+    private SSLContextService sslContextService;

Review Comment:
   Instead of passing the `SSLContextService` reference, the `TlsConfiguration` 
object should be used as it contains all of the necessary properties.



##########
nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java:
##########
@@ -31,89 +31,89 @@
 import org.apache.nifi.processor.ProcessSessionFactory;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.security.util.TlsException;
 import org.apache.nifi.ssl.SSLContextService;
-import org.eclipse.paho.client.mqttv3.IMqttClient;
-import org.eclipse.paho.client.mqttv3.MqttClient;
-import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
-import org.eclipse.paho.client.mqttv3.MqttException;
-import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
 
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
-import java.util.Properties;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 
+import static org.apache.commons.lang3.EnumUtils.isValidEnumIgnoreCase;
+import static org.apache.commons.lang3.StringUtils.EMPTY;
 import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_FALSE;
 import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_TRUE;
 import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_MQTT_VERSION_310;
 import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_MQTT_VERSION_311;
+import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_MQTT_VERSION_500;
 import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_MQTT_VERSION_AUTO;
 import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_0;
 import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_1;
 import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_2;
 
 public abstract class AbstractMQTTProcessor extends 
AbstractSessionFactoryProcessor {
 
-    public static int DISCONNECT_TIMEOUT = 5000;
+    private static final String DEFAULT_SESSION_EXPIRY_INTERVAL = "24 hrs";
 
     protected ComponentLog logger;
-    protected IMqttClient mqttClient;
-    protected volatile String broker;
-    protected volatile String brokerUri;
-    protected volatile String clientID;
-    protected MqttConnectOptions connOpts;
-    protected MemoryPersistence persistence = new MemoryPersistence();
 
-    public ProcessSessionFactory processSessionFactory;
+    protected MqttClientProperties clientProperties;
 
-    public static final Validator QOS_VALIDATOR = new Validator() {
+    protected MqttClientFactory mqttClientFactory = new MqttClientFactory();
+    protected MqttClient mqttClient;
 
-        @Override
-        public ValidationResult validate(String subject, String input, 
ValidationContext context) {
-            Integer inputInt = Integer.parseInt(input);
-            if (inputInt < 0 || inputInt > 2) {
-                return new 
ValidationResult.Builder().subject(subject).valid(false).explanation("QoS must 
be an integer between 0 and 2").build();
-            }
-            return new 
ValidationResult.Builder().subject(subject).valid(true).build();
+    public ProcessSessionFactory processSessionFactory;
+
+    public static final Validator QOS_VALIDATOR = (subject, input, context) -> 
{
+        Integer inputInt = Integer.parseInt(input);
+        if (inputInt < 0 || inputInt > 2) {
+            return new 
ValidationResult.Builder().subject(subject).valid(false).explanation("QoS must 
be an integer between 0 and 2").build();
         }
+        return new 
ValidationResult.Builder().subject(subject).valid(true).build();
     };
 
-    public static final Validator BROKER_VALIDATOR = new Validator() {
-
-        @Override
-        public ValidationResult validate(String subject, String input, 
ValidationContext context) {
-            try{
-                URI brokerURI = new URI(input);
-                if (!"".equals(brokerURI.getPath())) {
-                    return new 
ValidationResult.Builder().subject(subject).valid(false).explanation("the 
broker URI cannot have a path. It currently is:" + brokerURI.getPath()).build();
-                }
-                if (!("tcp".equals(brokerURI.getScheme()) || 
"ssl".equals(brokerURI.getScheme()) || "ws".equals(brokerURI.getScheme()) || 
"wss".equals(brokerURI.getScheme()))) {
-                    return new 
ValidationResult.Builder().subject(subject).valid(false).explanation("only the 
'tcp', 'ssl', 'ws' and 'wss' schemes are supported.").build();
-                }
-            } catch (URISyntaxException e) {
-                return new 
ValidationResult.Builder().subject(subject).valid(false).explanation("it is not 
valid URI syntax.").build();
+    public static final Validator BROKER_VALIDATOR = (subject, input, context) 
-> {
+        try {
+            URI brokerURI = new URI(input);
+            if (!EMPTY.equals(brokerURI.getPath())) {
+                return new 
ValidationResult.Builder().subject(subject).valid(false).explanation("the 
broker URI cannot have a path. It currently is:" + brokerURI.getPath()).build();

Review Comment:
   There should be a space before the path.
   ```suggestion
                   return new 
ValidationResult.Builder().subject(subject).valid(false).explanation("the 
broker URI cannot have a path. It currently is: " + 
brokerURI.getPath()).build();
   ```



##########
nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java:
##########
@@ -384,16 +326,59 @@ public final void onTrigger(final ProcessContext context, 
final ProcessSessionFa
             onTrigger(context, session);
             session.commitAsync();
         } catch (final Throwable t) {
-            getLogger().error("{} failed to process due to {}; rolling back 
session", new Object[]{this, t});
+            getLogger().error("{} failed to process due to {}; rolling back 
session", this, t);
             session.rollback(true);
             throw t;
         }
     }
 
     public abstract void onTrigger(final ProcessContext context, final 
ProcessSession session) throws ProcessException;
 
-    protected boolean isConnected(){
+    protected boolean isConnected() {
         return (mqttClient != null && mqttClient.isConnected());
     }
 
+    protected MqttClientProperties getMqttClientProperties(final 
ProcessContext context) {
+        final MqttClientProperties clientProperties = new 
MqttClientProperties();
+
+        try {
+            clientProperties.setBrokerUri(new 
URI(context.getProperty(PROP_BROKER_URI).evaluateAttributeExpressions().getValue()));
+        } catch (URISyntaxException e) {
+            throw new RuntimeException(e);
+        }
+
+        String clientId = 
context.getProperty(PROP_CLIENTID).evaluateAttributeExpressions().getValue();
+        if (clientId == null) {
+            clientId = UUID.randomUUID().toString();
+        }
+        clientProperties.setClientId(clientId);
+
+        
clientProperties.setMqttVersion(MqttVersion.fromVersionCode(context.getProperty(PROP_MQTT_VERSION).asInteger()));
+
+        
clientProperties.setCleanSession(context.getProperty(PROP_CLEAN_SESSION).asBoolean());
+        
clientProperties.setSessionExpiryInterval(context.getProperty(PROP_SESSION_EXPIRY_INTERVAL).asTimePeriod(TimeUnit.SECONDS));
+
+        
clientProperties.setKeepAliveInterval(context.getProperty(PROP_KEEP_ALIVE_INTERVAL).asInteger());
+        
clientProperties.setConnectionTimeout(context.getProperty(PROP_CONN_TIMEOUT).asInteger());
+
+        final PropertyValue sslProp = 
context.getProperty(PROP_SSL_CONTEXT_SERVICE);
+        if (sslProp.isSet()) {
+            clientProperties.setSslContextService((SSLContextService) 
sslProp.asControllerService());

Review Comment:
   As mentioned in the Client Properties, this can be adjusted to call 
`createTlsConfiguration()` to pass the properties.



##########
nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/PublishMQTT.java:
##########
@@ -201,35 +191,35 @@ private void initializeClient(ProcessContext context) {
         // non-null but not connected, so we need to handle each case and only 
create a new client when it is null
         try {
             if (mqttClient == null) {
-                logger.debug("Creating client");
-                mqttClient = createMqttClient(broker, clientID, persistence);
+                mqttClient = createMqttClient();
                 mqttClient.setCallback(this);
             }
 
             if (!mqttClient.isConnected()) {
-                logger.debug("Connecting client");
-                mqttClient.connect(connOpts);
+                mqttClient.connect();
             }
-        } catch (MqttException e) {
-            logger.error("Connection to {} lost (or was never connected) and 
connection failed. Yielding processor", new Object[]{broker}, e);
+        } catch (Exception e) {
+            logger.error("Connection to {} lost (or was never connected) and 
connection failed. Yielding processor", clientProperties.getBroker(), e);
             context.yield();
         }
     }
 
     @Override
     public void connectionLost(Throwable cause) {
-        logger.error("Connection to {} lost due to: {}", new Object[]{broker, 
cause.getMessage()}, cause);
+        logger.error("Connection to {} lost due to: {}", new 
Object[]{clientProperties.getBroker(), cause.getMessage()}, cause);
     }
 
     @Override
-    public void messageArrived(String topic, MqttMessage message) throws 
Exception {
-        logger.error("Message arrived to a PublishMQTT processor { topic:'" + 
topic +"; payload:"+ Arrays.toString(message.getPayload())+"}");
+    public void messageArrived(ReceivedMqttMessage message) {
+        // Unlikely situation. Api uses the same callback for publisher and 
consumer as well.
+        // That's why we have this log message here to indicate something 
really messy thing happened.
+        logger.error("Message arrived to a PublishMQTT processor { topic:'" + 
message.getTopic() + "; payload:" + Arrays.toString(message.getPayload()) + 
"}");
     }
 
     @Override
-    public void deliveryComplete(IMqttDeliveryToken token) {
+    public void deliveryComplete(String token) {
         // Client.publish waits for message to be delivered so this token will 
always have a null message and is useless in this application.
-        logger.trace("Received 'delivery complete' message from broker for:" + 
token.toString());
+        logger.trace("Received 'delivery complete' message from broker for:" + 
token);

Review Comment:
   This should be changed to use a placeholder variable instead of 
concatenation.
   ```suggestion
           logger.trace("Received 'delivery complete' message from broker token 
[{}]", token);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to