[
https://issues.apache.org/jira/browse/NIFI-1808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15286890#comment-15286890
]
ASF GitHub Bot commented on NIFI-1808:
--------------------------------------
Github user olegz commented on a diff in the pull request:
https://github.com/apache/nifi/pull/392#discussion_r63549263
--- Diff:
nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
---
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.mqtt;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.mqtt.common.AbstractMQTTProcessor;
+import org.apache.nifi.processors.mqtt.common.MQTTQueueMessage;
+import org.eclipse.paho.client.mqttv3.MqttCallback;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.io.OutputStream;
+import java.io.IOException;
+
+import static
org.apache.nifi.processors.mqtt.ConsumeMQTT.BROKER_ATTRIBUTE_KEY;
+import static
org.apache.nifi.processors.mqtt.ConsumeMQTT.IS_DUPLICATE_ATTRIBUTE_KEY;
+import static
org.apache.nifi.processors.mqtt.ConsumeMQTT.IS_RETAINED_ATTRIBUTE_KEY;
+import static
org.apache.nifi.processors.mqtt.ConsumeMQTT.QOS_ATTRIBUTE_KEY;
+import static
org.apache.nifi.processors.mqtt.ConsumeMQTT.TOPIC_ATTRIBUTE_KEY;
+import static
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_0;
+import static
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_1;
+import static
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_2;
+
+
+@Tags({"subscribe", "MQTT", "IOT", "consume", "listen"})
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@TriggerSerially // we want to have a consistent mapping between clientID
and MQTT connection
+@CapabilityDescription("Subscribes to a topic and receives messages from
an MQTT broker")
+@SeeAlso({PublishMQTT.class})
+@WritesAttributes({
+ @WritesAttribute(attribute=BROKER_ATTRIBUTE_KEY, description="MQTT
broker that was the message source"),
+ @WritesAttribute(attribute=TOPIC_ATTRIBUTE_KEY, description="MQTT
topic on which message was received"),
+ @WritesAttribute(attribute=QOS_ATTRIBUTE_KEY, description="The quality
of service for this message."),
+ @WritesAttribute(attribute=IS_DUPLICATE_ATTRIBUTE_KEY,
description="Whether or not this message might be a duplicate of one which has
already been received."),
+ @WritesAttribute(attribute=IS_RETAINED_ATTRIBUTE_KEY,
description="Whether or not this message was from a current publisher, or was
\"retained\" by the server as the last message published " +
+ "on the topic.")})
+public class ConsumeMQTT extends AbstractMQTTProcessor {
+ public final static String BROKER_ATTRIBUTE_KEY = "mqtt.broker";
+ public final static String TOPIC_ATTRIBUTE_KEY = "mqtt.topic";
+ public final static String QOS_ATTRIBUTE_KEY = "mqtt.qos";
+ public final static String IS_DUPLICATE_ATTRIBUTE_KEY =
"mqtt.isDuplicate";
+ public final static String IS_RETAINED_ATTRIBUTE_KEY =
"mqtt.isRetained";
+
+ public static final PropertyDescriptor PROP_TOPIC_FILTER = new
PropertyDescriptor.Builder()
+ .name("Topic Filter")
+ .description("The MQTT topic filter to designate the topics to
subscribe to.")
+ .required(true)
+ .expressionLanguageSupported(false)
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor PROP_QOS = new
PropertyDescriptor.Builder()
+ .name("Quality of Service(QoS)")
+ .description("The Quality of Service(QoS) to receive the
message with. Accepts values '0', '1' or '2'; '0' for 'at most once', '1' for
'at least once', '2' for 'exactly once'.")
+ .required(true)
+ .defaultValue(ALLOWABLE_VALUE_QOS_0.getValue())
+ .allowableValues(
+ ALLOWABLE_VALUE_QOS_0,
+ ALLOWABLE_VALUE_QOS_1,
+ ALLOWABLE_VALUE_QOS_2)
+ .build();
+
+ public static final PropertyDescriptor PROP_MAX_QUEUE_SIZE = new
PropertyDescriptor.Builder()
+ .name("Max Queue Size")
+ .description("The MQTT messages are always being sent to
subscribers on a topic. If the 'Run Schedule' is significantly behind the rate
at which the messages are arriving to this " +
+ "processor then a back up can occur. This property
specifies the maximum number of messages this processor will hold in memory at
one time.")
+ .required(true)
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .build();
+
+
+ private static int DISCONNECT_TIMEOUT = 5000;
+ private long maxQueueSize;
+
+ private volatile boolean subscribed = false;
+ private volatile int qos;
+ private volatile String topicFilter;
+
+ private volatile LinkedBlockingQueue<MQTTQueueMessage> mqttQueue;
+
+    /**
+     * Exposes the internal receive buffer. Intended for tests only.
+     *
+     * @return the queue holding received MQTT messages; {@code null} until it has been created
+     */
+    public LinkedBlockingQueue<MQTTQueueMessage> getMqttQueue() {
+        return this.mqttQueue;
+    }
+
+ public static final Relationship REL_MESSAGE = new
Relationship.Builder()
+ .name("Message")
+ .description("The MQTT message output")
+ .build();
+
+ private static final List<PropertyDescriptor> descriptors;
+ private static final Set<Relationship> relationships;
+
+    static {
+        // This processor exposes a single relationship.
+        final Set<Relationship> consumerRelationships = new HashSet<>();
+        consumerRelationships.add(REL_MESSAGE);
+        relationships = Collections.unmodifiableSet(consumerRelationships);
+
+        // Start from the descriptors supplied by the abstract base class and
+        // append the consumer-specific ones in display order.
+        final List<PropertyDescriptor> consumerDescriptors = getAbstractPropertyDescriptors();
+        consumerDescriptors.add(PROP_TOPIC_FILTER);
+        consumerDescriptors.add(PROP_QOS);
+        consumerDescriptors.add(PROP_MAX_QUEUE_SIZE);
+        descriptors = Collections.unmodifiableList(consumerDescriptors);
+    }
+
+    /**
+     * Resizes the receive buffer when the 'Max Queue Size' property changes, keeping
+     * the messages that are already pending.
+     *
+     * The resize is skipped when the buffer does not exist yet, when the new value is
+     * absent or not a positive integer, or when more messages are pending than the new
+     * size could hold.
+     *
+     * @param descriptor the property that was modified
+     * @param oldValue the previous value of the property
+     * @param newValue the new value of the property; may be {@code null} or not yet validated
+     */
+    @Override
+    public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
+        // Compare by equality rather than identity so an equal descriptor instance is recognised too.
+        if (!PROP_MAX_QUEUE_SIZE.equals(descriptor)) {
+            return;
+        }
+
+        final LinkedBlockingQueue<MQTTQueueMessage> currentQueue = mqttQueue;
+        if (currentQueue == null) {
+            // Nothing buffered yet; customValidate creates the queue with the configured size.
+            return;
+        }
+
+        // The value may be null (property removed) or malformed; ignore it here
+        // instead of throwing out of the callback.
+        final int newSize;
+        try {
+            newSize = newValue == null ? -1 : Integer.parseInt(newValue);
+        } catch (final NumberFormatException nfe) {
+            logger.debug("New receive buffer size ({}) is not a valid integer, ignoring resize request",
+                    new Object[]{newValue});
+            return;
+        }
+        if (newSize <= 0) {
+            // LinkedBlockingQueue rejects a non-positive capacity.
+            logger.debug("New receive buffer size ({}) is not a positive integer, ignoring resize request",
+                    new Object[]{newValue});
+            return;
+        }
+
+        final int msgPending = currentQueue.size();
+        if (msgPending > newSize) {
+            logger.debug("New receive buffer size ({}) is smaller than the number of messages pending ({}), ignoring resize request",
+                    new Object[]{newSize, msgPending});
+            return;
+        }
+
+        // Resize the receive buffer, but preserve data.
+        final LinkedBlockingQueue<MQTTQueueMessage> newBuffer = new LinkedBlockingQueue<>(newSize);
+        currentQueue.drainTo(newBuffer);
+        mqttQueue = newBuffer;
+    }
+
+    /**
+     * Adds a validation failure when the configured 'Max Queue Size' is smaller than the
+     * number of messages currently buffered. Creates the receive buffer on first use.
+     *
+     * @param context the validation context used to read the configured queue size
+     * @return the results of the superclass validation plus any failure added here
+     */
+    @Override
+    public Collection<ValidationResult> customValidate(ValidationContext context) {
+        final Collection<ValidationResult> results = super.customValidate(context);
+        final int requestedSize = context.getProperty(PROP_MAX_QUEUE_SIZE).asInteger();
+
+        // Lazily create the buffer, sized by the current configuration.
+        if (mqttQueue == null) {
+            mqttQueue = new LinkedBlockingQueue<>(requestedSize);
+        }
+
+        final int pendingCount = mqttQueue.size();
+        if (pendingCount <= requestedSize) {
+            return results;
+        }
+
+        final String explanation = String.format("%s (%d) is smaller than the number of messages pending (%d).",
+                PROP_MAX_QUEUE_SIZE.getDisplayName(), requestedSize, pendingCount);
+        results.add(new ValidationResult.Builder()
+                .valid(false)
+                .subject("ConsumeMQTT Configuration")
+                .explanation(explanation)
+                .build());
+        return results;
+    }
+
+    /**
+     * Stores the component logger so the rest of the processor can log through it.
+     *
+     * @param context the initialization context (not used here)
+     */
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        this.logger = getLogger();
+    }
+
+    /**
+     * @return the unmodifiable set of relationships built in the static initializer
+     */
+    @Override
+    public Set<Relationship> getRelationships() {
+        return ConsumeMQTT.relationships;
+    }
+
+    /**
+     * @return the unmodifiable list of property descriptors built in the static initializer
+     */
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return ConsumeMQTT.descriptors;
+    }
+
+    /**
+     * Caches the subscription settings from the configuration and builds the MQTT client.
+     *
+     * @param context the process context the property values are read from
+     * @throws IOException declared for the annotated lifecycle method
+     * @throws ClassNotFoundException declared for the annotated lifecycle method
+     */
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) throws IOException, ClassNotFoundException {
+        // Snapshot the configuration first; the callback and reconnect() read these fields.
+        this.topicFilter = context.getProperty(PROP_TOPIC_FILTER).getValue();
+        this.qos = context.getProperty(PROP_QOS).asInteger();
+        this.maxQueueSize = context.getProperty(PROP_MAX_QUEUE_SIZE).asLong();
+
+        buildClient(context);
+    }
+
+    /**
+     * Disconnects the MQTT client, if one exists and is connected. A failure to
+     * disconnect is logged rather than propagated.
+     *
+     * @param context the process context (not used here)
+     */
+    @OnUnscheduled
+    public void onUnscheduled(final ProcessContext context) {
+        try {
+            synchronized (mqttClientConnectLock) {
+                final boolean connected = mqttClient != null && mqttClient.isConnected();
+                if (!connected) {
+                    return;
+                }
+                mqttClient.disconnect(DISCONNECT_TIMEOUT);
+                logger.info("Disconnected the MQTT client.");
+            }
+        } catch (final MqttException me) {
+            logger.error("Failed when disconnecting the MQTT client.", me);
+        }
+    }
+
+
+    /**
+     * Flushes any messages still buffered when the processor stops.
+     *
+     * @param context the process context (not used here)
+     * @throws IOException declared for the annotated lifecycle method
+     * @throws ProcessException if messages are pending but no session factory was stored
+     */
+    @OnStopped
+    public void onStopped(final ProcessContext context) throws IOException {
+        if (processSessionFactory == null) {
+            // Without a session factory the leftover messages cannot be turned into FlowFiles.
+            final boolean messagesPending = mqttQueue != null && mqttQueue.size() > 0;
+            if (messagesPending) {
+                throw new ProcessException("Attempting to stop processor but stored ProcessSessionFactory is not set, there are messages in the MQTT Queue and it is configured to " +
+                        "finish processing the messages.");
+            }
+            return;
+        }
+
+        logger.info("Finishing processing leftover messages");
+        final ProcessSession leftoverSession = processSessionFactory.createSession();
+        transferQueue(leftoverSession);
+    }
+
+ @Override
+ public void onTrigger(final ProcessContext context, final
ProcessSession session) throws ProcessException {
+ if (mqttQueue.isEmpty() && !isConnected()){
+ logger.info("Queue is empty and client is not connected.
Attempting to reconnect.");
+
+ try {
+ reconnect();
+ } catch (MqttException e) {
+ logger.error("Connection to " + broker + " lost (or was
never connected) and ontrigger connect failed. Yielding processor", e);
+ context.yield();
+ }
+ }
+
+ if (mqttQueue.isEmpty()) {
+ return;
+ }
+
+ transferQueue(session);
+ }
+
+    /**
+     * Drains the receive buffer, turning each buffered MQTT message into a FlowFile
+     * routed to {@code REL_MESSAGE}. The session is committed once per message.
+     *
+     * The head of the queue is peeked before a FlowFile is created, so an empty queue
+     * never leaves an orphaned FlowFile in the session. A message is removed from the
+     * queue only after its commit has succeeded; a failed commit therefore leaves the
+     * message buffered instead of losing it.
+     *
+     * @param session the session used to create, populate, transfer and commit FlowFiles
+     */
+    private void transferQueue(ProcessSession session) {
+        while (true) {
+            final MQTTQueueMessage mqttMessage = mqttQueue.peek();
+            if (mqttMessage == null) {
+                // Queue drained (or emptied concurrently); nothing left to transfer.
+                return;
+            }
+
+            FlowFile messageFlowfile = session.create();
+
+            final Map<String, String> attrs = new HashMap<>();
+            attrs.put(BROKER_ATTRIBUTE_KEY, broker);
+            attrs.put(TOPIC_ATTRIBUTE_KEY, mqttMessage.getTopic());
+            attrs.put(QOS_ATTRIBUTE_KEY, String.valueOf(mqttMessage.getQos()));
+            attrs.put(IS_DUPLICATE_ATTRIBUTE_KEY, String.valueOf(mqttMessage.isDuplicate()));
+            attrs.put(IS_RETAINED_ATTRIBUTE_KEY, String.valueOf(mqttMessage.isRetained()));
+
+            messageFlowfile = session.putAllAttributes(messageFlowfile, attrs);
+
+            messageFlowfile = session.write(messageFlowfile, new OutputStreamCallback() {
+                @Override
+                public void process(final OutputStream out) throws IOException {
+                    out.write(mqttMessage.getPayload());
+                }
+            });
+
+            final String transitUri = broker + mqttMessage.getTopic();
+            session.getProvenanceReporter().receive(messageFlowfile, transitUri);
+            session.transfer(messageFlowfile, REL_MESSAGE);
+
+            // Commit first, then drop the message from the buffer.
+            session.commit();
+            if (!mqttQueue.remove(mqttMessage)) {
+                logger.warn("MQTT message on topic " + mqttMessage.getTopic() + " was committed but could not be removed from the queue; it may be delivered again.");
+            }
+        }
+    }
+
+    /**
+     * Callback registered on the MQTT client in {@code reconnect()}. Buffers arriving
+     * messages in the processor's queue and attempts a reconnect when the connection is lost.
+     */
+    private class ConsumeMQTTCallback implements MqttCallback {
+
+        /**
+         * Logs the loss of the connection and tries to reconnect once.
+         *
+         * @param cause the reason the connection was lost
+         */
+        @Override
+        public void connectionLost(Throwable cause) {
+            logger.warn("Connection to " + broker + " lost", cause);
+            try {
+                reconnect();
+            } catch (MqttException e) {
+                // Pass the exception along so the reason for the failed re-connect is not lost.
+                logger.error("Connection to " + broker + " lost and callback re-connect failed.", e);
+            }
+        }
+
+        /**
+         * Buffers an arriving message.
+         *
+         * @param topic the topic the message arrived on
+         * @param message the received message
+         * @throws IllegalStateException if the buffer already holds the configured maximum
+         */
+        @Override
+        public void messageArrived(String topic, MqttMessage message) throws Exception {
+            logger.info("MQTT message arrived on topic:" + topic);
+            // offer() instead of add(): if the bounded queue fills up between the size check
+            // and the insert, the same explanatory exception is raised instead of a bare
+            // "Queue full".
+            final boolean accepted = mqttQueue.size() < maxQueueSize
+                    && mqttQueue.offer(new MQTTQueueMessage(topic, message));
+            if (!accepted) {
+                throw new IllegalStateException("The subscriber queue is full, cannot receive another message until the processor is scheduled to run.");
+            }
+        }
+
+        /**
+         * Not expected for a subscriber; logged as a warning.
+         *
+         * @param token the delivery token reported by the client
+         */
+        @Override
+        public void deliveryComplete(IMqttDeliveryToken token) {
+            logger.warn("Received MQTT 'delivery complete' message to subscriber:"+ token);
+        }
+    }
+
+    /**
+     * Obtains a new MQTT client, registers the callback and connects, unless the current
+     * client is already connected. Subscribes to the topic filter if that has not happened yet.
+     * Public for testing.
+     *
+     * @throws MqttException if connecting or subscribing fails
+     */
+    public void reconnect() throws MqttException {
+        synchronized (mqttClientConnectLock) {
+            if (mqttClient.isConnected()) {
+                return;
+            }
+
+            mqttClient = getMqttClient(broker, clientID, persistence);
+            mqttClient.setCallback(new ConsumeMQTTCallback());
+            mqttClient.connect(connOpts);
+
+            // NOTE(review): 'subscribed' is never reset in the visible code, so a later
+            // reconnect does not re-subscribe - confirm the broker session keeps the subscription.
+            if (subscribed) {
+                return;
+            }
+            mqttClient.subscribe(topicFilter, qos);
+            subscribed = true;
+        }
+    }
+
+ // Public for testing
+ public boolean isConnected(){
--- End diff --
"minimal in it's scope of impact" - may be for today, but you can not
possibly answer this question for tomorrow. In fact by making it public one can
argue you've created the 'scope' (however minimal it may be) when it didn't
even have to exist in the first place.
As for an example, just get the 'mqttClient' instance using the
_Class.getDeclaredField_ dance and perform the same evaluation.
```
ConsumeMQTT mqtt = ...
Field f = mqtt.getClass().getDeclaredField("mqttClient");
f.setAccessible(true);
MqttClient mqttClient = f.get(mqtt);
// do your check on mqttClient
```
Put it in some private method within your test cases (some abstract class
or some util class) and you're done.
> Create General MQTT Processors
> ------------------------------
>
> Key: NIFI-1808
> URL: https://issues.apache.org/jira/browse/NIFI-1808
> Project: Apache NiFi
> Issue Type: Improvement
> Reporter: Joseph Percivall
> Assignee: Joseph Percivall
>
> MQTT[1] is a great "Internet of Things" (IoT) connectivity protocol.
> Implementing processors for it would allow NiFi to continue expanding into the
> IoT domain. A prime opportunity would be to use them in conjunction with Apache
> NiFi - MiNiFi.
> [1] http://mqtt.org/
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)