[ https://issues.apache.org/jira/browse/NIFI-1808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15269519#comment-15269519 ]
ASF GitHub Bot commented on NIFI-1808: -------------------------------------- Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi/pull/392#discussion_r61949840 --- Diff: nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/SubscribeMQTT.java --- @@ -0,0 +1,301 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.processors.mqtt; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnUnscheduled; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.logging.ProcessorLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.io.OutputStreamCallback; + +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.mqtt.common.AbstractMQTTProcessor; +import org.apache.nifi.processors.mqtt.common.MQTTQueueMessage; +import org.apache.nifi.ssl.SSLContextService; +import org.eclipse.paho.client.mqttv3.MqttCallback; +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; + +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.LinkedBlockingQueue; +import java.io.OutputStream; +import java.io.IOException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import 
java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + + +@Tags({"subscribe", "MQTT", "IOT"}) +@CapabilityDescription("Subscribes to a topic and receives messages from an MQTT broker") +@SeeAlso({PublishMQTT.class}) +@WritesAttributes({@WritesAttribute(attribute="broker", description="MQTT broker that was the message source"), + @WritesAttribute(attribute="topic", description="MQTT topic on which message was received"), + @WritesAttribute(attribute="qos", description="The quality of service for this message."), + @WritesAttribute(attribute="isDuplicate", description="Whether or not this message might be a duplicate of one which has already been received."), + @WritesAttribute(attribute="isRetained", description="Whether or not this message was from a current publisher, or was \"retained\" by the server as the last message published on the topic.")}) +public class SubscribeMQTT extends AbstractMQTTProcessor { + + public static final PropertyDescriptor PROP_TOPIC_FILTER = new PropertyDescriptor.Builder() + .name("Topic Filter") + .description("The MQTT topic filter to designate the topics to subscribe to.") + .required(true) + .expressionLanguageSupported(false) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .build(); + + public static final PropertyDescriptor PROP_QOS = new PropertyDescriptor.Builder() + .name("Quality of Service(QoS)") + .description("The Quality of Service(QoS) to receive the message with. Accepts three values '0', '1' and '2'; '0' for 'at most once', '1' for 'at least once', '2' for 'exactly once'.") + .required(true) + .allowableValues("0", "1", "2") + .build(); + + + public static final PropertyDescriptor PROP_MAX_QUEUE_SIZE = new PropertyDescriptor.Builder() + .name("Max Queue Size") + .description("The MQTT messages are always being sent to subscribers on a topic. 
If the 'Run Schedule' is significantly behind the rate at which the messages are arriving to this " + + "processor then a back up can occur. This property specifies the maximum number of messages this processor will hold in memory at one time.") + .required(true) + .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .build(); + + private String broker; + private ProcessorLog logger; + private MqttConnectOptions connOpts; + private MqttClient mqttClient; + private final Object mqttClientLock = new Object(); + private ScheduledExecutorService autoReconnectExecutor; + private final Object autoReconnectLock = new Object(); + private ScheduledFuture<?> autoReconnectScheduledFuture; + private static int AUTO_RECONNECT_PERIOD = 5; + private static int DISCONNECT_TIMEOUT = 5000; + private long maxQueueSize; + + private MemoryPersistence persistence = new MemoryPersistence(); + + LinkedBlockingQueue<MQTTQueueMessage> mqttQueue = new LinkedBlockingQueue<>(); + + public static final Relationship REL_MESSAGE = new Relationship.Builder() + .name("Message") + .description("The MQTT message output") + .build(); + + private List<PropertyDescriptor> descriptors; + + private Set<Relationship> relationships; + + @Override + protected void init(final ProcessorInitializationContext context) { + logger = getLogger(); + final List<PropertyDescriptor> descriptors = getAbstractPropertyDescriptors(); + descriptors.add(PROP_TOPIC_FILTER); + descriptors.add(PROP_QOS); + descriptors.add(PROP_MAX_QUEUE_SIZE); + + this.descriptors = Collections.unmodifiableList(descriptors); + + final Set<Relationship> relationships = new HashSet<Relationship>(); + relationships.add(REL_MESSAGE); + this.relationships = Collections.unmodifiableSet(relationships); + } + + @Override + public Set<Relationship> getRelationships() { + return this.relationships; + } + + @Override + public final List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return descriptors; + } + + @OnScheduled + 
public void onScheduled(final ProcessContext context) { + try { + maxQueueSize = context.getProperty(PROP_MAX_QUEUE_SIZE).asLong(); + + broker = context.getProperty(PROP_BROKER_URI).getValue(); + String clientID = context.getProperty(PROP_CLIENTID).getValue(); + + connOpts = new MqttConnectOptions(); + connOpts.setCleanSession(true); + + PropertyValue sslProp = context.getProperty(PROP_SSL_CONTEXT_SERVICE); + if (sslProp.isSet()) { + Properties sslProps = transformSSLContextService((SSLContextService) sslProp.asControllerService()); + connOpts.setSSLProperties(sslProps); + } + + PropertyValue lastWillTopicProp = context.getProperty(PROP_LAST_WILL_TOPIC); + if (lastWillTopicProp.isSet()){ + PropertyValue lastWillMessage = context.getProperty(PROP_LAST_WILL_TOPIC); + PropertyValue lastWillRetain = context.getProperty(PROP_LAST_WILL_TOPIC); + connOpts.setWill(lastWillTopicProp.getValue(), lastWillMessage.getValue().getBytes(), LAST_WILL_QOS, lastWillRetain.isSet() ? lastWillRetain.asBoolean() : false); + } + + PropertyValue usernameProp = context.getProperty(PROP_USERNAME); + if(usernameProp.isSet()) { + connOpts.setUserName(usernameProp.getValue()); + connOpts.setPassword(context.getProperty(PROP_PASSWORD).getValue().toCharArray()); + } + + String topicFilter = context.getProperty(PROP_TOPIC_FILTER).getValue(); + synchronized (mqttClientLock) { + mqttClient = new MqttClient(broker, clientID, persistence); + + mqttClient.setCallback(new SubscribeMQTTCallback()); + getLogger().info("Connecting to broker: " + broker); + mqttClient.connect(connOpts); + mqttClient.subscribe(topicFilter, context.getProperty(PROP_QOS).asInteger()); + } + autoReconnectExecutor = new ScheduledThreadPoolExecutor(1); + } catch(MqttException me) { + getLogger().error("msg "+me.getMessage()); + } + } --- End diff -- I agree, I will refactor it a bit > Create General MQTT Processors > ------------------------------ > > Key: NIFI-1808 > URL: https://issues.apache.org/jira/browse/NIFI-1808 > 
Project: Apache NiFi > Issue Type: Improvement > Reporter: Joseph Percivall > Assignee: Joseph Percivall > > MQTT[1] is a great "Internet of Things" (IoT) connectivity protocol; > implementing processors for it would allow NiFi to continue expanding into the > IoT domain. A prime opportunity would be to use them in conjunction with Apache > NiFi - MiNiFi. > [1] http://mqtt.org/ -- This message was sent by Atlassian JIRA (v6.3.4#6332)