[ 
https://issues.apache.org/jira/browse/NIFI-1808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15286785#comment-15286785
 ] 

ASF GitHub Bot commented on NIFI-1808:
--------------------------------------

Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/392#discussion_r63539083
  
    --- Diff: 
nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java
 ---
    @@ -0,0 +1,342 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.processors.mqtt.common;
    +
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.PropertyValue;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.expression.AttributeExpression;
    +import org.apache.nifi.logging.ProcessorLog;
    +import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessSessionFactory;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.ssl.SSLContextService;
    +import org.eclipse.paho.client.mqttv3.IMqttClient;
    +import org.eclipse.paho.client.mqttv3.MqttClient;
    +import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
    +import org.eclipse.paho.client.mqttv3.MqttException;
    +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
    +
    +import java.net.URI;
    +import java.net.URISyntaxException;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.List;
    +import java.util.Properties;
    +
    +import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_FALSE;
    +import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_TRUE;
    +import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_MQTT_VERSION_310;
    +import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_MQTT_VERSION_311;
    +import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_MQTT_VERSION_AUTO;
    +import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_0;
    +import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_1;
    +import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_2;
    +
    +public abstract class AbstractMQTTProcessor extends 
AbstractSessionFactoryProcessor {
    +
    +    protected ProcessorLog logger;
    +    protected IMqttClient mqttClient;
    +    protected final Object mqttClientConnectLock = new Object();
    +    protected volatile String broker;
    +    protected volatile String clientID;
    +    protected MqttConnectOptions connOpts;
    +    protected MemoryPersistence persistence = new MemoryPersistence();
    +    protected long maxTimeout;
    +
    +    public ProcessSessionFactory processSessionFactory;
    +
    +    public static final Validator QoSValidator = new Validator() {
    +
    +        @Override
    +        public ValidationResult validate(String subject, String input, 
ValidationContext context) {
    +            Integer inputInt = Integer.parseInt(input);
    +            if (inputInt < 0 || inputInt > 2) {
    +                return new 
ValidationResult.Builder().subject(subject).valid(false).explanation("QoS must 
be an integer between 0 and 2").build();
    +            }
    +            return new 
ValidationResult.Builder().subject(subject).valid(true).build();
    +        }
    +    };
    +
    +    public static final Validator BrokerValidator = new Validator() {
    +
    +        @Override
    +        public ValidationResult validate(String subject, String input, 
ValidationContext context) {
    +            try{
    +                URI brokerURI = new URI(input);
    +                if (!"".equals(brokerURI.getPath())) {
    +                    return new 
ValidationResult.Builder().subject(subject).valid(false).explanation("the 
broker URI cannot have a path. It currently is:" + brokerURI.getPath()).build();
    +                }
    +                if (!("tcp".equals(brokerURI.getScheme()) || 
"ssl".equals(brokerURI.getScheme()))) {
    +                    return new 
ValidationResult.Builder().subject(subject).valid(false).explanation("only the 
'tcp' and 'ssl' schemes are supported.").build();
    +                }
    +            } catch (URISyntaxException e) {
    +                return new 
ValidationResult.Builder().subject(subject).valid(false).explanation("it is not 
valid URI syntax.").build();
    +            }
    +            return new 
ValidationResult.Builder().subject(subject).valid(true).build();
    +        }
    +    };
    +
    +    public static final Validator RetainValidator = new Validator() {
    +
    +        @Override
    +        public ValidationResult validate(String subject, String input, 
ValidationContext context) {
    +            if("true".equalsIgnoreCase(input) || 
"false".equalsIgnoreCase(input)){
    +                return new 
ValidationResult.Builder().subject(subject).valid(true).build();
    +            } else{
    +                return 
StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.BOOLEAN,
 false)
    +                        .validate(subject, input, context);
    +            }
    +
    +        }
    +    };
    +
    +    public static final PropertyDescriptor PROP_BROKER_URI = new 
PropertyDescriptor.Builder()
    +            .name("Broker URI")
    +            .description("The URI to use to connect to the MQTT broker 
(e.g. tcp://localhost:1883)")
    +            .required(true)
    +            .addValidator(BrokerValidator)
    +            .build();
    +
    +
    +    public static final PropertyDescriptor PROP_CLIENTID = new 
PropertyDescriptor.Builder()
    +            .name("Client ID")
    +            .description("MQTT client ID to use")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor PROP_USERNAME = new 
PropertyDescriptor.Builder()
    +            .name("Username")
    +            .description("Username to use when connecting to the broker")
    +            .required(false)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor PROP_PASSWORD = new 
PropertyDescriptor.Builder()
    +            .name("Password")
    +            .description("Password to use when connecting to the broker")
    +            .required(false)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor PROP_SSL_CONTEXT_SERVICE = new 
PropertyDescriptor.Builder().name("SSL Context Service")
    +            .description("The SSL Context Service used to provide client 
certificate information for TLS/SSL connections.")
    +            .required(false)
    +            .identifiesControllerService(SSLContextService.class)
    +            .build();
    +
    +
    +    public static final PropertyDescriptor PROP_LAST_WILL_TOPIC = new 
PropertyDescriptor.Builder()
    +            .name("Last Will Topic")
    +            .description("The topic to send the client's Last Will to. If 
the Last Will topic and message are not set then a Last Will will not be sent.")
    +            .required(false)
    +            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor PROP_LAST_WILL_MESSAGE = new 
PropertyDescriptor.Builder()
    +            .name("Last Will Message")
    +            .description("The message to send as the client's Last Will. 
If the Last Will topic and message are not set then a Last Will will not be 
sent.")
    +            .required(false)
    +            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor PROP_LAST_WILL_RETAIN = new 
PropertyDescriptor.Builder()
    +            .name("Last Will Retain")
    +            .description("Whether to retain the client's Last Will. If the 
Last Will topic and message are not set then a Last Will will not be sent.")
    +            .required(false)
    +            .allowableValues("true","false")
    +            .build();
    +
    +    public static final PropertyDescriptor PROP_LAST_WILL_QOS = new 
PropertyDescriptor.Builder()
    +            .name("'Last Will' QoS Level")
    +            .description("QoS level to be used when publishing the Will 
Message")
    +            .required(false)
    +            .allowableValues(
    +                    ALLOWABLE_VALUE_QOS_0,
    +                    ALLOWABLE_VALUE_QOS_1,
    +                    ALLOWABLE_VALUE_QOS_2
    +            )
    --- End diff --
    
    Should there be a default value here, or is this intentional?


> Create General MQTT Processors
> ------------------------------
>
>                 Key: NIFI-1808
>                 URL: https://issues.apache.org/jira/browse/NIFI-1808
>             Project: Apache NiFi
>          Issue Type: Improvement
>            Reporter: Joseph Percivall
>            Assignee: Joseph Percivall
>
> MQTT[1] is a great "Internet of Things" (IoT) connectivity protocol; 
> implementing processors for it would allow NiFi to continue expanding into 
> the IoT domain. A prime opportunity would be to use them in conjunction with 
> Apache NiFi - MiNiFi.
> [1] http://mqtt.org/



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to