[
https://issues.apache.org/jira/browse/NIFI-1808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15286796#comment-15286796
]
ASF GitHub Bot commented on NIFI-1808:
--------------------------------------
Github user JPercivall commented on a diff in the pull request:
https://github.com/apache/nifi/pull/392#discussion_r63539712
--- Diff:
nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java
---
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.mqtt.common;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.expression.AttributeExpression;
+import org.apache.nifi.logging.ProcessorLog;
+import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.ssl.SSLContextService;
+import org.eclipse.paho.client.mqttv3.IMqttClient;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+
+import static
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_FALSE;
+import static
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_TRUE;
+import static
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_MQTT_VERSION_310;
+import static
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_MQTT_VERSION_311;
+import static
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_MQTT_VERSION_AUTO;
+import static
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_0;
+import static
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_1;
+import static
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_2;
+
+public abstract class AbstractMQTTProcessor extends
AbstractSessionFactoryProcessor {
+
+ protected ProcessorLog logger;
+ protected IMqttClient mqttClient;
+ protected final Object mqttClientConnectLock = new Object();
+ protected volatile String broker;
+ protected volatile String clientID;
+ protected MqttConnectOptions connOpts;
+ protected MemoryPersistence persistence = new MemoryPersistence();
+ protected long maxTimeout;
+
+ public ProcessSessionFactory processSessionFactory;
+
+ public static final Validator QoSValidator = new Validator() {
+
+ @Override
+ public ValidationResult validate(String subject, String input,
ValidationContext context) {
+ Integer inputInt = Integer.parseInt(input);
+ if (inputInt < 0 || inputInt > 2) {
+ return new
ValidationResult.Builder().subject(subject).valid(false).explanation("QoS must
be an integer between 0 and 2").build();
+ }
+ return new
ValidationResult.Builder().subject(subject).valid(true).build();
+ }
+ };
+
+ public static final Validator BrokerValidator = new Validator() {
+
+ @Override
+ public ValidationResult validate(String subject, String input,
ValidationContext context) {
+ try{
+ URI brokerURI = new URI(input);
+ if (!"".equals(brokerURI.getPath())) {
+ return new
ValidationResult.Builder().subject(subject).valid(false).explanation("the
broker URI cannot have a path. It currently is:" + brokerURI.getPath()).build();
+ }
+ if (!("tcp".equals(brokerURI.getScheme()) ||
"ssl".equals(brokerURI.getScheme()))) {
+ return new
ValidationResult.Builder().subject(subject).valid(false).explanation("only the
'tcp' and 'ssl' schemes are supported.").build();
+ }
+ } catch (URISyntaxException e) {
+ return new
ValidationResult.Builder().subject(subject).valid(false).explanation("it is not
valid URI syntax.").build();
+ }
+ return new
ValidationResult.Builder().subject(subject).valid(true).build();
+ }
+ };
+
+ public static final Validator RetainValidator = new Validator() {
+
+ @Override
+ public ValidationResult validate(String subject, String input,
ValidationContext context) {
+ if("true".equalsIgnoreCase(input) ||
"false".equalsIgnoreCase(input)){
+ return new
ValidationResult.Builder().subject(subject).valid(true).build();
+ } else{
+ return
StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.BOOLEAN,
false)
+ .validate(subject, input, context);
+ }
+
+ }
+ };
+
+ public static final PropertyDescriptor PROP_BROKER_URI = new
PropertyDescriptor.Builder()
+ .name("Broker URI")
+ .description("The URI to use to connect to the MQTT broker
(e.g. tcp://localhost:1883)")
+ .required(true)
+ .addValidator(BrokerValidator)
+ .build();
+
+
+ public static final PropertyDescriptor PROP_CLIENTID = new
PropertyDescriptor.Builder()
+ .name("Client ID")
+ .description("MQTT client ID to use")
+ .required(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor PROP_USERNAME = new
PropertyDescriptor.Builder()
+ .name("Username")
+ .description("Username to use when connecting to the broker")
+ .required(false)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor PROP_PASSWORD = new
PropertyDescriptor.Builder()
+ .name("Password")
+ .description("Password to use when connecting to the broker")
+ .required(false)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor PROP_SSL_CONTEXT_SERVICE = new
PropertyDescriptor.Builder().name("SSL Context Service")
+ .description("The SSL Context Service used to provide client
certificate information for TLS/SSL connections.")
+ .required(false)
+ .identifiesControllerService(SSLContextService.class)
+ .build();
+
+
+ public static final PropertyDescriptor PROP_LAST_WILL_TOPIC = new
PropertyDescriptor.Builder()
+ .name("Last Will Topic")
+ .description("The topic to send the client's Last Will to. If
the Last Will topic and message are not set then a Last Will will not be sent.")
+ .required(false)
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor PROP_LAST_WILL_MESSAGE = new
PropertyDescriptor.Builder()
+ .name("Last Will Message")
+ .description("The message to send as the client's Last Will.
If the Last Will topic and message are not set then a Last Will will not be
sent.")
+ .required(false)
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor PROP_LAST_WILL_RETAIN = new
PropertyDescriptor.Builder()
+ .name("Last Will Retain")
+ .description("Whether to retain the client's Last Will. If the
Last Will topic and message are not set then a Last Will will not be sent.")
+ .required(false)
+ .allowableValues("true","false")
+ .build();
+
+ public static final PropertyDescriptor PROP_LAST_WILL_QOS = new
PropertyDescriptor.Builder()
+ .name("'Last Will' QoS Level")
+ .description("QoS level to be used when publishing the Will
Message")
+ .required(false)
+ .allowableValues(
+ ALLOWABLE_VALUE_QOS_0,
+ ALLOWABLE_VALUE_QOS_1,
+ ALLOWABLE_VALUE_QOS_2
+ )
--- End diff --
This was intentional, because some applications may not have a Last Will
and there is no point forcing a use to have it set when it wouldn't be used.
That said I should have better validation to make sure it is set if the
other Last Will properties are.
> Create General MQTT Processors
> ------------------------------
>
> Key: NIFI-1808
> URL: https://issues.apache.org/jira/browse/NIFI-1808
> Project: Apache NiFi
> Issue Type: Improvement
> Reporter: Joseph Percivall
> Assignee: Joseph Percivall
>
> MQTT[1] is a great "Internet of Things" (IoT) connectivity protocol that
> implementing processors for would allow NiFi to continue expanding into the
> IoT domain. A prime opportunity would be to use in conjunction with Apache
> NiFi - MiNiFi.
> [1] http://mqtt.org/
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)