[ 
https://issues.apache.org/jira/browse/NIFI-1808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15286890#comment-15286890
 ] 

ASF GitHub Bot commented on NIFI-1808:
--------------------------------------

Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/392#discussion_r63549263
  
    --- Diff: 
nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
 ---
    @@ -0,0 +1,341 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.processors.mqtt;
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.TriggerSerially;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.io.OutputStreamCallback;
    +
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.mqtt.common.AbstractMQTTProcessor;
    +import org.apache.nifi.processors.mqtt.common.MQTTQueueMessage;
    +import org.eclipse.paho.client.mqttv3.MqttCallback;
    +import org.eclipse.paho.client.mqttv3.MqttException;
    +import org.eclipse.paho.client.mqttv3.MqttMessage;
    +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
    +
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.LinkedBlockingQueue;
    +import java.io.OutputStream;
    +import java.io.IOException;
    +
    +import static 
org.apache.nifi.processors.mqtt.ConsumeMQTT.BROKER_ATTRIBUTE_KEY;
    +import static 
org.apache.nifi.processors.mqtt.ConsumeMQTT.IS_DUPLICATE_ATTRIBUTE_KEY;
    +import static 
org.apache.nifi.processors.mqtt.ConsumeMQTT.IS_RETAINED_ATTRIBUTE_KEY;
    +import static 
org.apache.nifi.processors.mqtt.ConsumeMQTT.QOS_ATTRIBUTE_KEY;
    +import static 
org.apache.nifi.processors.mqtt.ConsumeMQTT.TOPIC_ATTRIBUTE_KEY;
    +import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_0;
    +import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_1;
    +import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_2;
    +
    +
    +@Tags({"subscribe", "MQTT", "IOT", "consume", "listen"})
    +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
    +@TriggerSerially // we want to have a consistent mapping between clientID 
and MQTT connection
    +@CapabilityDescription("Subscribes to a topic and receives messages from 
an MQTT broker")
    +@SeeAlso({PublishMQTT.class})
    +@WritesAttributes({
    +    @WritesAttribute(attribute=BROKER_ATTRIBUTE_KEY, description="MQTT 
broker that was the message source"),
    +    @WritesAttribute(attribute=TOPIC_ATTRIBUTE_KEY, description="MQTT 
topic on which message was received"),
    +    @WritesAttribute(attribute=QOS_ATTRIBUTE_KEY, description="The quality 
of service for this message."),
    +    @WritesAttribute(attribute=IS_DUPLICATE_ATTRIBUTE_KEY, 
description="Whether or not this message might be a duplicate of one which has 
already been received."),
    +    @WritesAttribute(attribute=IS_RETAINED_ATTRIBUTE_KEY, 
description="Whether or not this message was from a current publisher, or was 
\"retained\" by the server as the last message published " +
    +            "on the topic.")})
    +public class ConsumeMQTT extends AbstractMQTTProcessor {
    +    public final static String BROKER_ATTRIBUTE_KEY =  "mqtt.broker";
    +    public final static String TOPIC_ATTRIBUTE_KEY =  "mqtt.topic";
    +    public final static String QOS_ATTRIBUTE_KEY =  "mqtt.qos";
    +    public final static String IS_DUPLICATE_ATTRIBUTE_KEY =  
"mqtt.isDuplicate";
    +    public final static String IS_RETAINED_ATTRIBUTE_KEY =  
"mqtt.isRetained";
    +
    +    public static final PropertyDescriptor PROP_TOPIC_FILTER = new 
PropertyDescriptor.Builder()
    +            .name("Topic Filter")
    +            .description("The MQTT topic filter to designate the topics to 
subscribe to.")
    +            .required(true)
    +            .expressionLanguageSupported(false)
    +            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor PROP_QOS = new 
PropertyDescriptor.Builder()
    +            .name("Quality of Service(QoS)")
    +            .description("The Quality of Service(QoS) to receive the 
message with. Accepts values '0', '1' or '2'; '0' for 'at most once', '1' for 
'at least once', '2' for 'exactly once'.")
    +            .required(true)
    +            .defaultValue(ALLOWABLE_VALUE_QOS_0.getValue())
    +            .allowableValues(
    +                    ALLOWABLE_VALUE_QOS_0,
    +                    ALLOWABLE_VALUE_QOS_1,
    +                    ALLOWABLE_VALUE_QOS_2)
    +            .build();
    +
    +    public static final PropertyDescriptor PROP_MAX_QUEUE_SIZE = new 
PropertyDescriptor.Builder()
    +            .name("Max Queue Size")
    +            .description("The MQTT messages are always being sent to 
subscribers on a topic. If the 'Run Schedule' is significantly behind the rate 
at which the messages are arriving to this " +
    +                    "processor then a back up can occur. This property 
specifies the maximum number of messages this processor will hold in memory at 
one time.")
    +            .required(true)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .build();
    +
    +
    +    private static int DISCONNECT_TIMEOUT = 5000;
    +    private long maxQueueSize;
    +
    +    private volatile boolean subscribed = false;
    +    private volatile int qos;
    +    private volatile String topicFilter;
    +
    +    private volatile LinkedBlockingQueue<MQTTQueueMessage> mqttQueue;
    +
    +    // For Testing
    +    public LinkedBlockingQueue<MQTTQueueMessage> getMqttQueue(){
    +        return mqttQueue;
    +    }
    +
    +    public static final Relationship REL_MESSAGE = new 
Relationship.Builder()
    +            .name("Message")
    +            .description("The MQTT message output")
    +            .build();
    +
    +    private static final List<PropertyDescriptor> descriptors;
    +    private static final Set<Relationship> relationships;
    +
    +    static{
    +        final List<PropertyDescriptor> innerDescriptorsList = 
getAbstractPropertyDescriptors();
    +        innerDescriptorsList.add(PROP_TOPIC_FILTER);
    +        innerDescriptorsList.add(PROP_QOS);
    +        innerDescriptorsList.add(PROP_MAX_QUEUE_SIZE);
    +        descriptors = Collections.unmodifiableList(innerDescriptorsList);
    +
    +        final Set<Relationship> innerRelationshipsSet = new 
HashSet<Relationship>();
    +        innerRelationshipsSet.add(REL_MESSAGE);
    +        relationships = Collections.unmodifiableSet(innerRelationshipsSet);
    +    }
    +
    +    @Override
    +    public void onPropertyModified(PropertyDescriptor descriptor, String 
oldValue, String newValue) {
    +        // resize the receive buffer, but preserve data
    +        if (descriptor == PROP_MAX_QUEUE_SIZE) {
    +            // it's a mandatory integer, never null
    +            int newSize = Integer.valueOf(newValue);
    +            if (mqttQueue != null) {
    +                int msgPending = mqttQueue.size();
    +                if (msgPending > newSize) {
    +                    logger.debug("New receive buffer size ({}) is smaller 
than the number of messages pending ({}), ignoring resize request",
    +                            new Object[]{newSize, msgPending});
    +                    return;
    +                }
    +                LinkedBlockingQueue<MQTTQueueMessage> newBuffer = new 
LinkedBlockingQueue<>(newSize);
    +                mqttQueue.drainTo(newBuffer);
    +                mqttQueue = newBuffer;
    +            }
    +
    +        }
    +    }
    +
    +    @Override
    +    public Collection<ValidationResult> customValidate(ValidationContext 
context) {
    +        final Collection<ValidationResult> results = 
super.customValidate(context);
    +        int newSize = context.getProperty(PROP_MAX_QUEUE_SIZE).asInteger();
    +        if (mqttQueue == null) {
    +            mqttQueue = new 
LinkedBlockingQueue<>(context.getProperty(PROP_MAX_QUEUE_SIZE).asInteger());
    +        }
    +        int msgPending = mqttQueue.size();
    +        if (msgPending > newSize) {
    +            results.add(new ValidationResult.Builder()
    +                    .valid(false)
    +                    .subject("ConsumeMQTT Configuration")
    +                    .explanation(String.format("%s (%d) is smaller than 
the number of messages pending (%d).",
    +                            PROP_MAX_QUEUE_SIZE.getDisplayName(), newSize, 
msgPending))
    +                    .build());
    +        }
    +
    +        return results;
    +    }
    +
    +    @Override
    +    protected void init(final ProcessorInitializationContext context) {
    +        logger = getLogger();
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    @Override
    +    public final List<PropertyDescriptor> 
getSupportedPropertyDescriptors() {
    +        return descriptors;
    +    }
    +
    +    @OnScheduled
    +    public void onScheduled(final ProcessContext context) throws 
IOException, ClassNotFoundException {
    +        qos = context.getProperty(PROP_QOS).asInteger();
    +        maxQueueSize = context.getProperty(PROP_MAX_QUEUE_SIZE).asLong();
    +        topicFilter = context.getProperty(PROP_TOPIC_FILTER).getValue();
    +
    +        buildClient(context);
    +    }
    +
    +    @OnUnscheduled
    +    public void onUnscheduled(final ProcessContext context) {
    +        try {
    +            synchronized (mqttClientConnectLock) {
    +                if(mqttClient != null && mqttClient.isConnected()) {
    +                    mqttClient.disconnect(DISCONNECT_TIMEOUT);
    +                    logger.info("Disconnected the MQTT client.");
    +                }
    +            }
    +        } catch(MqttException me) {
    +            logger.error("Failed when disconnecting the MQTT client.", me);
    +        }
    +    }
    +
    +
    +    @OnStopped
    +    public void onStopped(final ProcessContext context) throws IOException 
{
    +        if(processSessionFactory != null) {
    +            logger.info("Finishing processing leftover messages");
    +            ProcessSession session = processSessionFactory.createSession();
    +            transferQueue(session);
    +        } else {
    +            if (mqttQueue!= null && mqttQueue.size() > 0){
    +                throw new ProcessException("Attempting to stop processor 
but stored ProcessSessionFactory is not set, there are messages in the MQTT 
Queue and it is configured to " +
    +                        "finish processing the messages.");
    +            }
    +        }
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final 
ProcessSession session) throws ProcessException {
    +        if (mqttQueue.isEmpty() && !isConnected()){
    +            logger.info("Queue is empty and client is not connected. 
Attempting to reconnect.");
    +
    +            try {
    +                reconnect();
    +            } catch (MqttException e) {
    +                logger.error("Connection to " + broker + " lost (or was 
never connected) and ontrigger connect failed. Yielding processor", e);
    +                context.yield();
    +            }
    +        }
    +
    +        if (mqttQueue.isEmpty()) {
    +            return;
    +        }
    +
    +        transferQueue(session);
    +    }
    +
    +    private void transferQueue(ProcessSession session){
    +        while (!mqttQueue.isEmpty()) {
    +            FlowFile messageFlowfile = session.create();
    +            final MQTTQueueMessage mqttMessage = mqttQueue.peek();
    +
    +            Map<String, String> attrs = new HashMap<>();
    +            attrs.put(BROKER_ATTRIBUTE_KEY, broker);
    +            attrs.put(TOPIC_ATTRIBUTE_KEY, mqttMessage.getTopic());
    +            attrs.put(QOS_ATTRIBUTE_KEY, 
String.valueOf(mqttMessage.getQos()));
    +            attrs.put(IS_DUPLICATE_ATTRIBUTE_KEY, 
String.valueOf(mqttMessage.isDuplicate()));
    +            attrs.put(IS_RETAINED_ATTRIBUTE_KEY, 
String.valueOf(mqttMessage.isRetained()));
    +
    +            messageFlowfile = session.putAllAttributes(messageFlowfile, 
attrs);
    +
    +            messageFlowfile = session.write(messageFlowfile, new 
OutputStreamCallback() {
    +                @Override
    +                public void process(final OutputStream out) throws 
IOException {
    +                    out.write(mqttMessage.getPayload());
    +                }
    +            });
    +
    +            String transitUri = new 
StringBuilder(broker).append(mqttMessage.getTopic()).toString();
    +            session.getProvenanceReporter().receive(messageFlowfile, 
transitUri);
    +            session.transfer(messageFlowfile, REL_MESSAGE);
    +            mqttQueue.remove(mqttMessage);
    +            session.commit();
    +        }
    +    }
    +
    +    private class ConsumeMQTTCallback implements MqttCallback {
    +
    +        @Override
    +        public void connectionLost(Throwable cause) {
    +            logger.warn("Connection to " + broker + " lost", cause);
    +            try {
    +                reconnect();
    +            } catch (MqttException e) {
    +                logger.error("Connection to " + broker + " lost and 
callback re-connect failed.");
    +            }
    +        }
    +
    +        @Override
    +        public void messageArrived(String topic, MqttMessage message) 
throws Exception {
    +            logger.info("MQTT message arrived on topic:" + topic);
    +            if (mqttQueue.size() >= maxQueueSize){
    +                throw new IllegalStateException("The subscriber queue is 
full, cannot receive another message until the processor is scheduled to run.");
    +            } else {
    +                mqttQueue.add(new MQTTQueueMessage(topic, message));
    +            }
    +        }
    +
    +        @Override
    +        public void deliveryComplete(IMqttDeliveryToken token) {
    +            logger.warn("Received MQTT 'delivery complete' message to 
subscriber:"+ token);
    +        }
    +    }
    +
    +    // Public for testing
    +    public void reconnect() throws MqttException {
    +        synchronized (mqttClientConnectLock) {
    +            if (!mqttClient.isConnected()) {
    +                mqttClient = getMqttClient(broker, clientID, persistence);
    +                mqttClient.setCallback(new ConsumeMQTTCallback());
    +                mqttClient.connect(connOpts);
    +                if(!subscribed){
    +                    mqttClient.subscribe(topicFilter, qos);
    +                    subscribed = true;
    +                }
    +            }
    +        }
    +    }
    +
    +    // Public for testing
    +    public boolean isConnected(){
    --- End diff --
    
    "minimal in its scope of impact" - maybe for today, but you cannot 
possibly answer this question for tomorrow. In fact, by making it public, one can 
argue you've created the 'scope' (however minimal it may be) when it didn't 
even have to exist in the first place.
    As for an example: just get the 'mqttClient' instance using the 
_Class.getDeclaredField_ dance and perform the same evaluation.
    ```
    ConsumeMQTT mqtt = ...
    Field f = mqtt.getClass().getDeclaredField("mqttClient");
    f.setAccessible(true);
    MqttClient mqttClient = (MqttClient) f.get(mqtt);
    // do your check on mqttClient
    ```
    Put it in some private method within your test cases (some abstract class 
or some util class) and you're done.


> Create General MQTT Processors
> ------------------------------
>
>                 Key: NIFI-1808
>                 URL: https://issues.apache.org/jira/browse/NIFI-1808
>             Project: Apache NiFi
>          Issue Type: Improvement
>            Reporter: Joseph Percivall
>            Assignee: Joseph Percivall
>
> MQTT[1] is a great "Internet of Things" (IoT) connectivity protocol; 
> implementing processors for it would allow NiFi to continue expanding into the 
> IoT domain. A prime opportunity would be to use them in conjunction with Apache 
> NiFi - MiNiFi.
> [1] http://mqtt.org/



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to