[ 
https://issues.apache.org/jira/browse/NIFI-1808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15269518#comment-15269518
 ] 

ASF GitHub Bot commented on NIFI-1808:
--------------------------------------

Github user JPercivall commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/392#discussion_r61949787
  
    --- Diff: 
nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/SubscribeMQTT.java
 ---
    @@ -0,0 +1,301 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.processors.mqtt;
    +
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.PropertyValue;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.logging.ProcessorLog;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.io.OutputStreamCallback;
    +
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.mqtt.common.AbstractMQTTProcessor;
    +import org.apache.nifi.processors.mqtt.common.MQTTQueueMessage;
    +import org.apache.nifi.ssl.SSLContextService;
    +import org.eclipse.paho.client.mqttv3.MqttCallback;
    +import org.eclipse.paho.client.mqttv3.MqttClient;
    +import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
    +import org.eclipse.paho.client.mqttv3.MqttException;
    +import org.eclipse.paho.client.mqttv3.MqttMessage;
    +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
    +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
    +
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Properties;
    +import java.util.Set;
    +import java.util.concurrent.LinkedBlockingQueue;
    +import java.io.OutputStream;
    +import java.io.IOException;
    +import java.util.concurrent.ScheduledExecutorService;
    +import java.util.concurrent.ScheduledFuture;
    +import java.util.concurrent.ScheduledThreadPoolExecutor;
    +import java.util.concurrent.TimeUnit;
    +
    +
    +@Tags({"subscribe", "MQTT", "IOT"})
    +@CapabilityDescription("Subscribes to a topic and receives messages from 
an MQTT broker")
    +@SeeAlso({PublishMQTT.class})
    +@WritesAttributes({@WritesAttribute(attribute="broker", description="MQTT 
broker that was the message source"),
    +    @WritesAttribute(attribute="topic", description="MQTT topic on which 
message was received"),
    +    @WritesAttribute(attribute="qos", description="The quality of service 
for this message."),
    +    @WritesAttribute(attribute="isDuplicate", description="Whether or not 
this message might be a duplicate of one which has already been received."),
    +    @WritesAttribute(attribute="isRetained", description="Whether or not 
this message was from a current publisher, or was \"retained\" by the server as 
the last message published on the topic.")})
    +public class SubscribeMQTT extends AbstractMQTTProcessor {
    +
    +    public static final PropertyDescriptor PROP_TOPIC_FILTER = new 
PropertyDescriptor.Builder()
    +            .name("Topic Filter")
    +            .description("The MQTT topic filter to designate the topics to 
subscribe to.")
    +            .required(true)
    +            .expressionLanguageSupported(false)
    +            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor PROP_QOS = new 
PropertyDescriptor.Builder()
    +            .name("Quality of Service(QoS)")
    +            .description("The Quality of Service(QoS) to receive the 
message with. Accepts three values '0', '1' and '2'; '0' for 'at most once', 
'1' for 'at least once', '2' for 'exactly once'.")
    +            .required(true)
    +            .allowableValues("0", "1", "2")
    +            .build();
    +
    +
    +    public static final PropertyDescriptor PROP_MAX_QUEUE_SIZE = new 
PropertyDescriptor.Builder()
    +            .name("Max Queue Size")
    +            .description("The MQTT messages are always being sent to 
subscribers on a topic. If the 'Run Schedule' is significantly behind the rate 
at which the messages are arriving to this " +
    +                    "processor then a back up can occur. This property 
specifies the maximum number of messages this processor will hold in memory at 
one time.")
    +            .required(true)
    +            
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +
    +    private String broker;
    +    private ProcessorLog logger;
    +    private MqttConnectOptions connOpts;
    +    private MqttClient mqttClient;
    +    private final Object mqttClientLock = new Object();
    +    private ScheduledExecutorService autoReconnectExecutor;
    +    private final Object autoReconnectLock = new Object();
    +    private ScheduledFuture<?> autoReconnectScheduledFuture;
    +    private static int AUTO_RECONNECT_PERIOD = 5;
    +    private static int DISCONNECT_TIMEOUT = 5000;
    +    private long maxQueueSize;
    +
    +    private MemoryPersistence persistence = new MemoryPersistence();
    +
    +    LinkedBlockingQueue<MQTTQueueMessage> mqttQueue = new 
LinkedBlockingQueue<>();
    +
    +    public static final Relationship REL_MESSAGE = new 
Relationship.Builder()
    +            .name("Message")
    +            .description("The MQTT message output")
    +            .build();
    +
    +     private List<PropertyDescriptor> descriptors;
    --- End diff --
    
    Good catch, will fix


> Create General MQTT Processors
> ------------------------------
>
>                 Key: NIFI-1808
>                 URL: https://issues.apache.org/jira/browse/NIFI-1808
>             Project: Apache NiFi
>          Issue Type: Improvement
>            Reporter: Joseph Percivall
>            Assignee: Joseph Percivall
>
> MQTT[1] is a great "Internet of Things" (IoT) connectivity protocol that 
> implementing processors for would allow NiFi to continue expanding into the 
> IoT domain. A prime opportunity would be to use in conjunction with Apache 
> NiFi - MiNiFi.
> [1] http://mqtt.org/



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to