[ https://issues.apache.org/jira/browse/NIFI-1808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15291548#comment-15291548 ]

ASF GitHub Bot commented on NIFI-1808:
--------------------------------------

Github user JPercivall commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/392#discussion_r63918531
  
    --- Diff: nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java ---
    @@ -0,0 +1,354 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.processors.mqtt.common;
    +
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.PropertyValue;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.expression.AttributeExpression;
    +import org.apache.nifi.logging.ProcessorLog;
    +import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessSessionFactory;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.ssl.SSLContextService;
    +import org.eclipse.paho.client.mqttv3.IMqttClient;
    +import org.eclipse.paho.client.mqttv3.MqttCallback;
    +import org.eclipse.paho.client.mqttv3.MqttClient;
    +import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
    +import org.eclipse.paho.client.mqttv3.MqttException;
    +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
    +
    +import java.net.URI;
    +import java.net.URISyntaxException;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.List;
    +import java.util.Properties;
    +
    +import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_FALSE;
    +import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_TRUE;
    +import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_MQTT_VERSION_310;
    +import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_MQTT_VERSION_311;
    +import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_MQTT_VERSION_AUTO;
    +import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_0;
    +import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_1;
    +import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_2;
    +
    +public abstract class AbstractMQTTProcessor extends 
AbstractSessionFactoryProcessor {
    +
    +    protected ProcessorLog logger;
    +    protected IMqttClient mqttClient;
    +    protected final Object mqttClientConnectLock = new Object();
    +    protected volatile String broker;
    +    protected volatile String clientID;
    +    protected MqttConnectOptions connOpts;
    +    protected MemoryPersistence persistence = new MemoryPersistence();
    +
    +    public ProcessSessionFactory processSessionFactory;
    +
    +    public static final Validator QOS_VALIDATOR = new Validator() {
    +
    +        @Override
    +        public ValidationResult validate(String subject, String input, 
ValidationContext context) {
    +            Integer inputInt = Integer.parseInt(input);
    +            if (inputInt < 0 || inputInt > 2) {
    +                return new 
ValidationResult.Builder().subject(subject).valid(false).explanation("QoS must 
be an integer between 0 and 2").build();
    +            }
    +            return new 
ValidationResult.Builder().subject(subject).valid(true).build();
    +        }
    +    };
    +
    +    public static final Validator BROKER_VALIDATOR = new Validator() {
    +
    +        @Override
    +        public ValidationResult validate(String subject, String input, 
ValidationContext context) {
    +            try{
    +                URI brokerURI = new URI(input);
    +                if (!"".equals(brokerURI.getPath())) {
    +                    return new 
ValidationResult.Builder().subject(subject).valid(false).explanation("the 
broker URI cannot have a path. It currently is:" + brokerURI.getPath()).build();
    +                }
    +                if (!("tcp".equals(brokerURI.getScheme()) || 
"ssl".equals(brokerURI.getScheme()))) {
    +                    return new 
ValidationResult.Builder().subject(subject).valid(false).explanation("only the 
'tcp' and 'ssl' schemes are supported.").build();
    +                }
    +            } catch (URISyntaxException e) {
    +                return new 
ValidationResult.Builder().subject(subject).valid(false).explanation("it is not 
valid URI syntax.").build();
    +            }
    +            return new 
ValidationResult.Builder().subject(subject).valid(true).build();
    +        }
    +    };
    +
    +    public static final Validator RETAIN_VALIDATOR = new Validator() {
    +
    +        @Override
    +        public ValidationResult validate(String subject, String input, 
ValidationContext context) {
    +            if("true".equalsIgnoreCase(input) || 
"false".equalsIgnoreCase(input)){
    +                return new 
ValidationResult.Builder().subject(subject).valid(true).build();
    +            } else{
    +                return 
StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.BOOLEAN,
 false)
    +                        .validate(subject, input, context);
    +            }
    +
    +        }
    +    };
    +
    +    public static final PropertyDescriptor PROP_BROKER_URI = new 
PropertyDescriptor.Builder()
    +            .name("Broker URI")
    +            .description("The URI to use to connect to the MQTT broker 
(e.g. tcp://localhost:1883)")
    +            .required(true)
    +            .addValidator(BROKER_VALIDATOR)
    +            .build();
    +
    +
    +    public static final PropertyDescriptor PROP_CLIENTID = new 
PropertyDescriptor.Builder()
    +            .name("Client ID")
    +            .description("MQTT client ID to use")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor PROP_USERNAME = new 
PropertyDescriptor.Builder()
    +            .name("Username")
    +            .description("Username to use when connecting to the broker")
    +            .required(false)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor PROP_PASSWORD = new 
PropertyDescriptor.Builder()
    +            .name("Password")
    +            .description("Password to use when connecting to the broker")
    +            .required(false)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor PROP_SSL_CONTEXT_SERVICE = new 
PropertyDescriptor.Builder().name("SSL Context Service")
    +            .description("The SSL Context Service used to provide client 
certificate information for TLS/SSL connections.")
    +            .required(false)
    +            .identifiesControllerService(SSLContextService.class)
    +            .build();
    +
    +
    +    public static final PropertyDescriptor PROP_LAST_WILL_TOPIC = new 
PropertyDescriptor.Builder()
    +            .name("Last Will Topic")
    +            .description("The topic to send the client's Last Will to. If 
the Last Will topic and message are not set then a Last Will will not be sent.")
    +            .required(false)
    +            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor PROP_LAST_WILL_MESSAGE = new 
PropertyDescriptor.Builder()
    +            .name("Last Will Message")
    +            .description("The message to send as the client's Last Will. 
If the Last Will topic and message are not set then a Last Will will not be 
sent.")
    +            .required(false)
    +            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor PROP_LAST_WILL_RETAIN = new 
PropertyDescriptor.Builder()
    +            .name("Last Will Retain")
    +            .description("Whether to retain the client's Last Will. If the 
Last Will topic and message are not set then a Last Will will not be sent.")
    +            .required(false)
    +            .allowableValues("true","false")
    +            .build();
    +
    +    public static final PropertyDescriptor PROP_LAST_WILL_QOS = new 
PropertyDescriptor.Builder()
    +            .name("Last Will QoS Level")
    +            .description("QoS level to be used when publishing the Last 
Will Message")
    +            .required(false)
    +            .allowableValues(
    +                    ALLOWABLE_VALUE_QOS_0,
    +                    ALLOWABLE_VALUE_QOS_1,
    +                    ALLOWABLE_VALUE_QOS_2
    +            )
    +            .build();
    +
    +    public static final PropertyDescriptor PROP_CLEAN_SESSION = new 
PropertyDescriptor.Builder()
    +            .name("Session state")
    +            .description("Whether to start afresh or resume previous 
flows. See the allowable value descriptions for more details.")
    +            .required(true)
    +            .allowableValues(
    +                    ALLOWABLE_VALUE_CLEAN_SESSION_TRUE,
    +                    ALLOWABLE_VALUE_CLEAN_SESSION_FALSE
    +            )
    +            .defaultValue(ALLOWABLE_VALUE_CLEAN_SESSION_TRUE.getValue())
    +            .build();
    +
    +    public static final PropertyDescriptor PROP_MQTT_VERSION = new 
PropertyDescriptor.Builder()
    +            .name("MQTT Specification Version")
    +            .description("The MQTT specification version when connecting 
with the broker. See the allowable value descriptions for more details.")
    +            .allowableValues(
    +                    ALLOWABLE_VALUE_MQTT_VERSION_AUTO,
    +                    ALLOWABLE_VALUE_MQTT_VERSION_311,
    +                    ALLOWABLE_VALUE_MQTT_VERSION_310
    +            )
    +            .defaultValue(ALLOWABLE_VALUE_MQTT_VERSION_AUTO.getValue())
    +            .required(true)
    +            .build();
    +
    +    public static final PropertyDescriptor PROP_CONN_TIMEOUT = new 
PropertyDescriptor.Builder()
    +            .name("Connection Timeout (seconds)")
    +            .description("Maximum time interval the client will wait for 
the network connection to the MQTT server " +
    +                    "to be established. The default timeout is 30 seconds. 
" +
    +                    "A value of 0 disables timeout processing meaning the 
client will wait until the network connection is made successfully or fails.")
    +            .required(false)
    +            .defaultValue("30")
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor PROP_KEEP_ALIVE_INTERVAL = new 
PropertyDescriptor.Builder()
    +            .name("Keep Alive Interval (seconds)")
    +            .description("Defines the maximum time interval between 
messages sent or received. It enables the " +
    +                    "client to detect if the server is no longer 
available, without having to wait for the TCP/IP timeout. " +
    +                    "The client will ensure that at least one message 
travels across the network within each keep alive period. In the absence of a 
data-related message during the time period, " +
    +                    "the client sends a very small \"ping\" message, which 
the server will acknowledge. A value of 0 disables keepalive processing in the 
client.")
    +            .required(false)
    +            .defaultValue("60")
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .build();
    +
    +    public static List<PropertyDescriptor> 
getAbstractPropertyDescriptors(){
    +        final List<PropertyDescriptor> descriptors = new 
ArrayList<PropertyDescriptor>();
    +        descriptors.add(PROP_BROKER_URI);
    +        descriptors.add(PROP_CLIENTID);
    +        descriptors.add(PROP_USERNAME);
    +        descriptors.add(PROP_PASSWORD);
    +        descriptors.add(PROP_SSL_CONTEXT_SERVICE);
    +        descriptors.add(PROP_LAST_WILL_TOPIC);
    +        descriptors.add(PROP_LAST_WILL_MESSAGE);
    +        descriptors.add(PROP_LAST_WILL_RETAIN);
    +        descriptors.add(PROP_LAST_WILL_QOS);
    +        descriptors.add(PROP_CLEAN_SESSION);
    +        descriptors.add(PROP_MQTT_VERSION);
    +        descriptors.add(PROP_CONN_TIMEOUT);
    +        descriptors.add(PROP_KEEP_ALIVE_INTERVAL);
    +        return descriptors;
    +    }
    +
    +    @Override
    +    public Collection<ValidationResult> customValidate(final 
ValidationContext validationContext) {
    +        final List<ValidationResult> results = new ArrayList<>(1);
    +        final boolean usernameSet = 
validationContext.getProperty(PROP_USERNAME).isSet();
    +        final boolean passwordSet = 
validationContext.getProperty(PROP_PASSWORD).isSet();
    +
    +        if ((usernameSet && !passwordSet) || (!usernameSet && 
passwordSet)) {
    +            results.add(new ValidationResult.Builder().subject("Username 
and Password").valid(false).explanation("if username or password is set, both 
must be set").build());
    +        }
    +
    +        final boolean lastWillTopicSet = 
validationContext.getProperty(PROP_LAST_WILL_TOPIC).isSet();
    +        final boolean lastWillMessageSet = 
validationContext.getProperty(PROP_LAST_WILL_MESSAGE).isSet();
    +
    +        final boolean lastWillRetainSet = 
validationContext.getProperty(PROP_LAST_WILL_RETAIN).isSet();
    +        final boolean lastWillQosSet = 
validationContext.getProperty(PROP_LAST_WILL_QOS).isSet();
    +
    +        // If any of the Last Will Properties are set
    +        if(lastWillTopicSet || lastWillMessageSet || lastWillRetainSet || 
lastWillQosSet){
    +            // And any are not set
    +            if(!(lastWillTopicSet && lastWillMessageSet && 
lastWillRetainSet && lastWillQosSet)){
    +                // Then mark as invalid
    +                results.add(new ValidationResult.Builder().subject("Last 
Will Properties").valid(false).explanation("if any of the Last Will Properties 
(message, topic, retain and QoS) are " +
    +                        "set, all must be set.").build());
    +            }
    +        }
    +
    +        return results;
    +    }
    +
    +    public static Properties transformSSLContextService(SSLContextService 
sslContextService){
    +        Properties properties = new Properties();
    +        properties.setProperty("com.ibm.ssl.protocol", 
sslContextService.getSslAlgorithm());
    --- End diff --
    
    Great idea, will add that information


> Create General MQTT Processors
> ------------------------------
>
>                 Key: NIFI-1808
>                 URL: https://issues.apache.org/jira/browse/NIFI-1808
>             Project: Apache NiFi
>          Issue Type: Improvement
>            Reporter: Joseph Percivall
>            Assignee: Joseph Percivall
>
> MQTT[1] is a great "Internet of Things" (IoT) connectivity protocol. 
> Implementing processors for it would allow NiFi to continue expanding into 
> the IoT domain. A prime opportunity would be to use these processors in 
> conjunction with Apache NiFi - MiNiFi.
> [1] http://mqtt.org/



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to