[
https://issues.apache.org/jira/browse/NIFI-1808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15269023#comment-15269023
]
ASF GitHub Bot commented on NIFI-1808:
--------------------------------------
Github user olegz commented on a diff in the pull request:
https://github.com/apache/nifi/pull/392#discussion_r61913545
--- Diff:
nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/PublishMQTT.java
---
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.mqtt;
+
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.expression.AttributeExpression;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.logging.ProcessorLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.mqtt.common.AbstractMQTTProcessor;
+import org.apache.nifi.ssl.SSLContextService;
+import org.apache.nifi.stream.io.StreamUtils;
+import org.eclipse.paho.client.mqttv3.MqttCallback;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.io.InputStream;
+import java.io.IOException;
+
+@SupportsBatching
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"publish", "MQTT", "IOT"})
+@CapabilityDescription("Publishes a message to an MQTT topic")
+@SeeAlso({SubscribeMQTT.class})
+public class PublishMQTT extends AbstractMQTTProcessor {
+
+ private String broker;
+ private MemoryPersistence persistence = new MemoryPersistence();
+ private MqttClient mqttClient;
+ private final Object mqttClientLock = new Object();
+
+ public static final PropertyDescriptor PROP_TOPIC = new
PropertyDescriptor.Builder()
+ .name("Topic")
+ .description("The topic to publish the message to.")
+ .expressionLanguageSupported(true)
+ .required(true)
+
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING,
true))
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor PROP_QOS = new
PropertyDescriptor.Builder()
+ .name("Quality of Service(QoS)")
+ .description("The Quality of Service(QoS) to send the message
with. Accepts three values '0', '1' and '2'; '0' for 'at most once', '1' for
'at least once', '2' for 'exactly once'.")
+ .required(true)
+ .expressionLanguageSupported(true)
+ .addValidator(QoSValidator)
+ .build();
+
+ public static final PropertyDescriptor PROP_RETAIN = new
PropertyDescriptor.Builder()
+ .name("Retain Message")
+ .description("Whether or not the retain flag should be set on
the MQTT message.")
+ .required(true)
+ .expressionLanguageSupported(true)
+ .addValidator(RetainValidator)
+ .build();
+
+ public static final PropertyDescriptor PROP_TIMEOUT = new
PropertyDescriptor.Builder()
+ .name("QoS Timeout")
+ .description("How many milliseconds to wait for the broker to
acknowledge delivery of a message before routing the message back to the source
queue for retry. Only applicable " +
+ "when QoS is 1 or 2")
+ .required(true)
+
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+ .build();
+
+ private List<PropertyDescriptor> descriptors;
+
+ public static final Relationship REL_SUCCESS = new
Relationship.Builder()
+ .name("success")
+ .description("FlowFiles that are sent successfully to the
destination are sent out this relationship.")
+ .build();
+ public static final Relationship REL_FAILURE = new
Relationship.Builder()
+ .name("failure")
+ .description("FlowFiles that failed to send to the destination
are sent out this relationship.")
+ .build();
+
+ private Set<Relationship> relationships;
+ private ProcessorLog logger;
+ private MqttConnectOptions connOpts;
+
+ @Override
+ protected void init(final ProcessorInitializationContext context) {
+ logger = getLogger();
+ final List<PropertyDescriptor> descriptors =
getAbstractPropertyDescriptors();
+ descriptors.add(PROP_TOPIC);
+ descriptors.add(PROP_QOS);
+ descriptors.add(PROP_RETAIN);
+ descriptors.add(PROP_TIMEOUT);
+ this.descriptors = Collections.unmodifiableList(descriptors);
+
+ final Set<Relationship> relationships = new
HashSet<Relationship>();
+ relationships.add(REL_SUCCESS);
+ relationships.add(REL_FAILURE);
+ this.relationships = Collections.unmodifiableSet(relationships);
+ }
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return this.relationships;
+ }
+
+ @Override
+ public final List<PropertyDescriptor>
getSupportedPropertyDescriptors() {
+ return descriptors;
+ }
+
+ @OnScheduled
+ public void onScheduled(final ProcessContext context) {
+ try {
+ broker = context.getProperty(PROP_BROKER_URI).getValue();
+ String clientID =
context.getProperty(PROP_CLIENTID).getValue();
+
+ connOpts = new MqttConnectOptions();
+ connOpts.setCleanSession(true);
+
+ PropertyValue sslProp =
context.getProperty(PROP_SSL_CONTEXT_SERVICE);
+ if (sslProp.isSet()) {
+ Properties sslProps =
transformSSLContextService((SSLContextService) sslProp.asControllerService());
+ connOpts.setSSLProperties(sslProps);
+ }
+
+ PropertyValue lastWillTopicProp =
context.getProperty(PROP_LAST_WILL_TOPIC);
+ if (lastWillTopicProp.isSet()){
+ PropertyValue lastWillMessage =
context.getProperty(PROP_LAST_WILL_TOPIC);
+ PropertyValue lastWillRetain =
context.getProperty(PROP_LAST_WILL_TOPIC);
+ connOpts.setWill(lastWillTopicProp.getValue(),
lastWillMessage.getValue().getBytes(), LAST_WILL_QOS, lastWillRetain.isSet() ?
lastWillRetain.asBoolean() : false);
+ }
+
+ PropertyValue usernameProp =
context.getProperty(PROP_USERNAME);
+ if(usernameProp.isSet()) {
+ connOpts.setUserName(usernameProp.getValue());
+
connOpts.setPassword(context.getProperty(PROP_PASSWORD).getValue().toCharArray());
+ }
+
+ synchronized (mqttClientLock) {
+ mqttClient = new MqttClient(broker, clientID, persistence);
+
+ mqttClient.setCallback(new PublishMQTTCallback());
+ getLogger().info("Connecting to broker: " + broker);
+ mqttClient.connect(connOpts);
+ }
+ } catch(MqttException me) {
+ logger.error("Failed to initialize the connection to the
"+me.getMessage());
+ }
+ }
+
+ @OnStopped
+ public void onStop(final ProcessContext context) {
+ try {
+ synchronized (mqttClientLock) {
+ if (mqttClient != null && mqttClient.isConnected()) {
+ mqttClient.disconnect();
+ }
+ }
+ logger.info("Disconnected the MQTT client.");
--- End diff --
Should the log statement above be moved inside the `if` block, so that it is only logged when the client was actually disconnected?
> Create General MQTT Processors
> ------------------------------
>
> Key: NIFI-1808
> URL: https://issues.apache.org/jira/browse/NIFI-1808
> Project: Apache NiFi
> Issue Type: Improvement
> Reporter: Joseph Percivall
> Assignee: Joseph Percivall
>
> MQTT[1] is a great "Internet of Things" (IoT) connectivity protocol;
> implementing processors for it would allow NiFi to continue expanding into the
> IoT domain. A prime opportunity would be to use them in conjunction with Apache
> NiFi - MiNiFi.
> [1] http://mqtt.org/
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)