nandorsoma commented on code in PR #6225:
URL: https://github.com/apache/nifi/pull/6225#discussion_r957859728


##########
nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java:
##########
@@ -384,16 +326,59 @@ public final void onTrigger(final ProcessContext context, 
final ProcessSessionFa
             onTrigger(context, session);
             session.commitAsync();
         } catch (final Throwable t) {
-            getLogger().error("{} failed to process due to {}; rolling back 
session", new Object[]{this, t});
+            getLogger().error("{} failed to process due to {}; rolling back 
session", this, t);
             session.rollback(true);
             throw t;
         }
     }
 
     public abstract void onTrigger(final ProcessContext context, final 
ProcessSession session) throws ProcessException;
 
-    protected boolean isConnected(){
+    protected boolean isConnected() {
         return (mqttClient != null && mqttClient.isConnected());
     }
 
+    protected MqttClientProperties getMqttClientProperties(final 
ProcessContext context) {
+        final MqttClientProperties clientProperties = new 
MqttClientProperties();
+
+        try {
+            clientProperties.setBrokerUri(new 
URI(context.getProperty(PROP_BROKER_URI).evaluateAttributeExpressions().getValue()));
+        } catch (URISyntaxException e) {
+            throw new RuntimeException(e);
+        }
+
+        String clientId = 
context.getProperty(PROP_CLIENTID).evaluateAttributeExpressions().getValue();
+        if (clientId == null) {
+            clientId = UUID.randomUUID().toString();
+        }
+        clientProperties.setClientId(clientId);
+
+        
clientProperties.setMqttVersion(MqttVersion.fromVersionCode(context.getProperty(PROP_MQTT_VERSION).asInteger()));
+
+        
clientProperties.setCleanSession(context.getProperty(PROP_CLEAN_SESSION).asBoolean());
+        
clientProperties.setSessionExpiryInterval(context.getProperty(PROP_SESSION_EXPIRY_INTERVAL).asTimePeriod(TimeUnit.SECONDS));
+
+        
clientProperties.setKeepAliveInterval(context.getProperty(PROP_KEEP_ALIVE_INTERVAL).asInteger());
+        
clientProperties.setConnectionTimeout(context.getProperty(PROP_CONN_TIMEOUT).asInteger());
+
+        final PropertyValue sslProp = 
context.getProperty(PROP_SSL_CONTEXT_SERVICE);
+        if (sslProp.isSet()) {
+            clientProperties.setSslContextService((SSLContextService) 
sslProp.asControllerService());

Review Comment:
   Changed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to