NIFI-1808 Refactored MQTT processors, and added proper unit and integration 
tests
This closes #464


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/92d648ab
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/92d648ab
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/92d648ab

Branch: refs/heads/0.x
Commit: 92d648ab4cf168fadcad5f1042b314c24a7230c0
Parents: dfd86ef
Author: jpercivall <[email protected]>
Authored: Tue May 24 10:05:07 2016 -0400
Committer: Oleg Zhurakousky <[email protected]>
Committed: Wed May 25 09:12:06 2016 -0400

----------------------------------------------------------------------
 .../nifi-mqtt-bundle/nifi-mqtt-nar/pom.xml      |   7 +-
 .../nifi-mqtt-processors/pom.xml                |  40 +-
 .../nifi/processors/mqtt/ConsumeMQTT.java       | 342 ++++++++++++++++
 .../apache/nifi/processors/mqtt/GetMQTT.java    | 191 ---------
 .../nifi/processors/mqtt/MQTTQueueMessage.java  |  29 --
 .../nifi/processors/mqtt/PublishMQTT.java       | 244 ++++++++++++
 .../apache/nifi/processors/mqtt/PutMQTT.java    | 193 ---------
 .../mqtt/common/AbstractMQTTProcessor.java      | 372 ++++++++++++++++++
 .../mqtt/common/MQTTQueueMessage.java           |  57 +++
 .../processors/mqtt/common/MqttConstants.java   |  80 ++++
 .../org.apache.nifi.processor.Processor         |   4 +-
 .../nifi/processors/mqtt/TestConsumeMQTT.java   | 101 +++++
 .../nifi/processors/mqtt/TestGetMQTT.java       |  41 --
 .../nifi/processors/mqtt/TestPublishMQTT.java   |  92 +++++
 .../nifi/processors/mqtt/TestPutMQTT.java       |  41 --
 .../processors/mqtt/common/MqttTestClient.java  | 198 ++++++++++
 .../processors/mqtt/common/MqttTestUtils.java   |  37 ++
 .../mqtt/common/TestConsumeMqttCommon.java      | 391 +++++++++++++++++++
 .../mqtt/common/TestPublishMqttCommon.java      | 124 ++++++
 .../mqtt/integration/TestConsumeMQTT.java       | 133 +++++++
 .../mqtt/integration/TestConsumeMqttSSL.java    | 148 +++++++
 .../TestPublishAndSubscribeMqttIntegration.java | 147 +++++++
 .../mqtt/integration/TestPublishMQTT.java       |  84 ++++
 .../mqtt/integration/TestPublishMqttSSL.java    |  98 +++++
 .../src/test/resources/localhost-ks.jks         | Bin 0 -> 3512 bytes
 .../src/test/resources/localhost-ts.jks         | Bin 0 -> 1816 bytes
 nifi-nar-bundles/nifi-mqtt-bundle/pom.xml       |   6 +-
 nifi-nar-bundles/pom.xml                        |   1 -
 pom.xml                                         |   2 +-
 29 files changed, 2692 insertions(+), 511 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/92d648ab/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-nar/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-nar/pom.xml 
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-nar/pom.xml
index 158e80f..3e3a6b2 100644
--- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-nar/pom.xml
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-nar/pom.xml
@@ -17,7 +17,7 @@
     <parent>
         <groupId>org.apache.nifi</groupId>
         <artifactId>nifi-mqtt-bundle</artifactId>
-        <version>1.0.0-SNAPSHOT</version>
+        <version>0.7.0-SNAPSHOT</version>
     </parent>
     <artifactId>nifi-mqtt-nar</artifactId>
     <packaging>nar</packaging>
@@ -29,6 +29,11 @@
     <dependencies>
         <dependency>
             <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-standard-services-api-nar</artifactId>
+            <type>nar</type>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-mqtt-processors</artifactId>
         </dependency>
     </dependencies>

http://git-wip-us.apache.org/repos/asf/nifi/blob/92d648ab/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/pom.xml 
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/pom.xml
index 9f4e956..df411c4 100644
--- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/pom.xml
@@ -16,7 +16,7 @@
     <parent>
         <groupId>org.apache.nifi</groupId>
         <artifactId>nifi-mqtt-bundle</artifactId>
-        <version>1.0.0-SNAPSHOT</version>
+        <version>0.7.0-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <artifactId>nifi-mqtt-processors</artifactId>
@@ -71,16 +71,40 @@
             <groupId>javax.websocket</groupId>
             <artifactId>javax.websocket-api</artifactId>
             <version>1.1</version>
-        </dependency>       
+        </dependency>
         <dependency>
-            <groupId>org.glassfish.tyrus.bundles</groupId>
-            <artifactId>tyrus-standalone-client-jdk</artifactId>
-            <version>1.12</version>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-ssl-context-service-api</artifactId>
         </dependency>
         <dependency>
-            <groupId>org.glassfish.tyrus</groupId>
-            <artifactId>tyrus-container-grizzly-client</artifactId>
-            <version>1.12</version>
+            <groupId>io.moquette</groupId>
+            <artifactId>moquette-broker</artifactId>
+            <version>0.8.1</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-ssl-context-service</artifactId>
+            <scope>test</scope>
         </dependency>
     </dependencies>
+
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <configuration>
+                    <excludes>
+                        <exclude>**/integration/TestConsumeMQTT.java</exclude>
+                        
<exclude>**/integration/TestConsumeMqttSSL.java</exclude>
+                        
<exclude>**/integration/TestPublishAndSubscribeMqttIntegration.java</exclude>
+                        <exclude>**/integration/TestPublishMQTT.java</exclude>
+                        
<exclude>**/integration/TestPublishMqttSSL.java</exclude>
+                    </excludes>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
 </project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/92d648ab/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
 
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
new file mode 100644
index 0000000..aa87381
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.mqtt;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.mqtt.common.AbstractMQTTProcessor;
+import org.apache.nifi.processors.mqtt.common.MQTTQueueMessage;
+import org.eclipse.paho.client.mqttv3.MqttCallback;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.io.OutputStream;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.nifi.processors.mqtt.ConsumeMQTT.BROKER_ATTRIBUTE_KEY;
+import static 
org.apache.nifi.processors.mqtt.ConsumeMQTT.IS_DUPLICATE_ATTRIBUTE_KEY;
+import static 
org.apache.nifi.processors.mqtt.ConsumeMQTT.IS_RETAINED_ATTRIBUTE_KEY;
+import static org.apache.nifi.processors.mqtt.ConsumeMQTT.QOS_ATTRIBUTE_KEY;
+import static org.apache.nifi.processors.mqtt.ConsumeMQTT.TOPIC_ATTRIBUTE_KEY;
+import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_0;
+import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_1;
+import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_2;
+
+
+@Tags({"subscribe", "MQTT", "IOT", "consume", "listen"})
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@TriggerSerially // we want to have a consistent mapping between clientID and 
MQTT connection
+@CapabilityDescription("Subscribes to a topic and receives messages from an 
MQTT broker")
+@SeeAlso({PublishMQTT.class})
+@WritesAttributes({
+    @WritesAttribute(attribute=BROKER_ATTRIBUTE_KEY, description="MQTT broker 
that was the message source"),
+    @WritesAttribute(attribute=TOPIC_ATTRIBUTE_KEY, description="MQTT topic on 
which message was received"),
+    @WritesAttribute(attribute=QOS_ATTRIBUTE_KEY, description="The quality of 
service for this message."),
+    @WritesAttribute(attribute=IS_DUPLICATE_ATTRIBUTE_KEY, 
description="Whether or not this message might be a duplicate of one which has 
already been received."),
+    @WritesAttribute(attribute=IS_RETAINED_ATTRIBUTE_KEY, description="Whether 
or not this message was from a current publisher, or was \"retained\" by the 
server as the last message published " +
+            "on the topic.")})
+public class ConsumeMQTT extends AbstractMQTTProcessor {
+
+    public final static String BROKER_ATTRIBUTE_KEY =  "mqtt.broker";
+    public final static String TOPIC_ATTRIBUTE_KEY =  "mqtt.topic";
+    public final static String QOS_ATTRIBUTE_KEY =  "mqtt.qos";
+    public final static String IS_DUPLICATE_ATTRIBUTE_KEY =  
"mqtt.isDuplicate";
+    public final static String IS_RETAINED_ATTRIBUTE_KEY =  "mqtt.isRetained";
+
+    public static final PropertyDescriptor PROP_TOPIC_FILTER = new 
PropertyDescriptor.Builder()
+            .name("Topic Filter")
+            .description("The MQTT topic filter to designate the topics to 
subscribe to.")
+            .required(true)
+            .expressionLanguageSupported(false)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor PROP_QOS = new 
PropertyDescriptor.Builder()
+            .name("Quality of Service(QoS)")
+            .description("The Quality of Service(QoS) to receive the message 
with. Accepts values '0', '1' or '2'; '0' for 'at most once', '1' for 'at least 
once', '2' for 'exactly once'.")
+            .required(true)
+            .defaultValue(ALLOWABLE_VALUE_QOS_0.getValue())
+            .allowableValues(
+                    ALLOWABLE_VALUE_QOS_0,
+                    ALLOWABLE_VALUE_QOS_1,
+                    ALLOWABLE_VALUE_QOS_2)
+            .build();
+
+    public static final PropertyDescriptor PROP_MAX_QUEUE_SIZE = new 
PropertyDescriptor.Builder()
+            .name("Max Queue Size")
+            .description("The MQTT messages are always being sent to 
subscribers on a topic. If the 'Run Schedule' is significantly behind the rate 
at which the messages are arriving to this " +
+                    "processor then a back up can occur. This property 
specifies the maximum number of messages this processor will hold in memory at 
one time.")
+            .required(true)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .build();
+
+
+    private static int DISCONNECT_TIMEOUT = 5000;
+    private volatile long maxQueueSize;
+
+    private volatile int qos;
+    private volatile String topicFilter;
+    private final AtomicBoolean scheduled = new AtomicBoolean(false);
+
+    private volatile LinkedBlockingQueue<MQTTQueueMessage> mqttQueue;
+
+    public static final Relationship REL_MESSAGE = new Relationship.Builder()
+            .name("Message")
+            .description("The MQTT message output")
+            .build();
+
+    private static final List<PropertyDescriptor> descriptors;
+    private static final Set<Relationship> relationships;
+
+    static{
+        final List<PropertyDescriptor> innerDescriptorsList = 
getAbstractPropertyDescriptors();
+        innerDescriptorsList.add(PROP_TOPIC_FILTER);
+        innerDescriptorsList.add(PROP_QOS);
+        innerDescriptorsList.add(PROP_MAX_QUEUE_SIZE);
+        descriptors = Collections.unmodifiableList(innerDescriptorsList);
+
+        final Set<Relationship> innerRelationshipsSet = new 
HashSet<Relationship>();
+        innerRelationshipsSet.add(REL_MESSAGE);
+        relationships = Collections.unmodifiableSet(innerRelationshipsSet);
+    }
+
+    @Override
+    public void onPropertyModified(PropertyDescriptor descriptor, String 
oldValue, String newValue) {
+        // resize the receive buffer, but preserve data
+        if (descriptor == PROP_MAX_QUEUE_SIZE) {
+            // it's a mandatory integer, never null
+            int newSize = Integer.valueOf(newValue);
+            if (mqttQueue != null) {
+                int msgPending = mqttQueue.size();
+                if (msgPending > newSize) {
+                    logger.warn("New receive buffer size ({}) is smaller than 
the number of messages pending ({}), ignoring resize request. Processor will be 
invalid.",
+                            new Object[]{newSize, msgPending});
+                    return;
+                }
+                LinkedBlockingQueue<MQTTQueueMessage> newBuffer = new 
LinkedBlockingQueue<>(newSize);
+                mqttQueue.drainTo(newBuffer);
+                mqttQueue = newBuffer;
+            }
+
+        }
+    }
+
+    @Override
+    public Collection<ValidationResult> customValidate(ValidationContext 
context) {
+        final Collection<ValidationResult> results = 
super.customValidate(context);
+        int newSize = context.getProperty(PROP_MAX_QUEUE_SIZE).asInteger();
+        if (mqttQueue == null) {
+            mqttQueue = new 
LinkedBlockingQueue<>(context.getProperty(PROP_MAX_QUEUE_SIZE).asInteger());
+        }
+        int msgPending = mqttQueue.size();
+        if (msgPending > newSize) {
+            results.add(new ValidationResult.Builder()
+                    .valid(false)
+                    .subject("ConsumeMQTT Configuration")
+                    .explanation(String.format("%s (%d) is smaller than the 
number of messages pending (%d).",
+                            PROP_MAX_QUEUE_SIZE.getDisplayName(), newSize, 
msgPending))
+                    .build());
+        }
+
+        return results;
+    }
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        logger = getLogger();
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return descriptors;
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) throws IOException, 
ClassNotFoundException {
+        qos = context.getProperty(PROP_QOS).asInteger();
+        maxQueueSize = context.getProperty(PROP_MAX_QUEUE_SIZE).asLong();
+        topicFilter = context.getProperty(PROP_TOPIC_FILTER).getValue();
+
+        buildClient(context);
+        scheduled.set(true);
+    }
+
+    @OnUnscheduled
+    public void onUnscheduled(final ProcessContext context) {
+        scheduled.set(false);
+
+        mqttClientConnectLock.writeLock().lock();
+        try {
+            if(isConnected()) {
+                mqttClient.disconnect(DISCONNECT_TIMEOUT);
+                logger.info("Disconnected the MQTT client.");
+            }
+        } catch(MqttException me) {
+            logger.error("Failed when disconnecting the MQTT client.", me);
+        } finally {
+            mqttClientConnectLock.writeLock().unlock();
+        }
+    }
+
+
+    @OnStopped
+    public void onStopped(final ProcessContext context) throws IOException {
+        if(mqttQueue != null && !mqttQueue.isEmpty() && processSessionFactory 
!= null) {
+            logger.info("Finishing processing leftover messages");
+            ProcessSession session = processSessionFactory.createSession();
+            transferQueue(session);
+        } else {
+            if (mqttQueue!= null && !mqttQueue.isEmpty()){
+                throw new ProcessException("Stopping the processor but there 
is no ProcessSessionFactory stored and there are messages in the MQTT internal 
queue. Removing the processor now will " +
+                        "clear the queue but will result in DATA LOSS. This is 
normally due to starting the processor, receiving messages and stopping before 
the onTrigger happens. The messages " +
+                        "in the MQTT internal queue cannot finish processing 
until until the processor is triggered to run.");
+            }
+        }
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        if (mqttQueue.isEmpty() && !isConnected() && scheduled.get()){
+            logger.info("Queue is empty and client is not connected. 
Attempting to reconnect.");
+
+            try {
+                reconnect();
+            } catch (MqttException e) {
+                logger.error("Connection to " + broker + " lost (or was never 
connected) and ontrigger connect failed. Yielding processor", e);
+                context.yield();
+            }
+        }
+
+        if (mqttQueue.isEmpty()) {
+            return;
+        }
+
+        transferQueue(session);
+    }
+
+    private void transferQueue(ProcessSession session){
+        while (!mqttQueue.isEmpty()) {
+            FlowFile messageFlowfile = session.create();
+            final MQTTQueueMessage mqttMessage = mqttQueue.peek();
+
+            Map<String, String> attrs = new HashMap<>();
+            attrs.put(BROKER_ATTRIBUTE_KEY, broker);
+            attrs.put(TOPIC_ATTRIBUTE_KEY, mqttMessage.getTopic());
+            attrs.put(QOS_ATTRIBUTE_KEY, String.valueOf(mqttMessage.getQos()));
+            attrs.put(IS_DUPLICATE_ATTRIBUTE_KEY, 
String.valueOf(mqttMessage.isDuplicate()));
+            attrs.put(IS_RETAINED_ATTRIBUTE_KEY, 
String.valueOf(mqttMessage.isRetained()));
+
+            messageFlowfile = session.putAllAttributes(messageFlowfile, attrs);
+
+            messageFlowfile = session.write(messageFlowfile, new 
OutputStreamCallback() {
+                @Override
+                public void process(final OutputStream out) throws IOException 
{
+                    out.write(mqttMessage.getPayload());
+                }
+            });
+
+            String transitUri = new 
StringBuilder(broker).append(mqttMessage.getTopic()).toString();
+            session.getProvenanceReporter().receive(messageFlowfile, 
transitUri);
+            session.transfer(messageFlowfile, REL_MESSAGE);
+            mqttQueue.remove(mqttMessage);
+            session.commit();
+        }
+    }
+
+    private class ConsumeMQTTCallback implements MqttCallback {
+
+        @Override
+        public void connectionLost(Throwable cause) {
+            logger.warn("Connection to " + broker + " lost", cause);
+            try {
+                reconnect();
+            } catch (MqttException e) {
+                logger.error("Connection to " + broker + " lost and callback 
re-connect failed.");
+            }
+        }
+
+        @Override
+        public void messageArrived(String topic, MqttMessage message) throws 
Exception {
+            if (logger.isInfoEnabled()) {
+                logger.info("MQTT message arrived on topic:" + topic);
+            }
+
+            if (mqttQueue.size() >= maxQueueSize){
+                throw new IllegalStateException("The subscriber queue is full, 
cannot receive another message until the processor is scheduled to run.");
+            } else {
+                mqttQueue.add(new MQTTQueueMessage(topic, message));
+            }
+        }
+
+        @Override
+        public void deliveryComplete(IMqttDeliveryToken token) {
+            logger.warn("Received MQTT 'delivery complete' message to 
subscriber:"+ token);
+        }
+    }
+
+    private void reconnect() throws MqttException {
+        mqttClientConnectLock.writeLock().lock();
+        try {
+            if (!mqttClient.isConnected()) {
+                setAndConnectClient(new ConsumeMQTTCallback());
+                mqttClient.subscribe(topicFilter, qos);
+            }
+        } finally {
+            mqttClientConnectLock.writeLock().unlock();
+        }
+    }
+
+    private boolean isConnected(){
+        return (mqttClient != null && mqttClient.isConnected());
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/92d648ab/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/GetMQTT.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/GetMQTT.java
 
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/GetMQTT.java
deleted file mode 100644
index 1391317..0000000
--- 
a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/GetMQTT.java
+++ /dev/null
@@ -1,191 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.processors.mqtt;
-
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.*;
-import org.apache.nifi.annotation.behavior.ReadsAttribute;
-import org.apache.nifi.annotation.behavior.ReadsAttributes;
-import org.apache.nifi.annotation.behavior.WritesAttribute;
-import org.apache.nifi.annotation.behavior.WritesAttributes;
-import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.SeeAlso;
-import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.io.OutputStreamCallback;
-
-import org.eclipse.paho.client.mqttv3.MqttClient;
-import org.eclipse.paho.client.mqttv3.MqttCallback;
-import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
-import org.eclipse.paho.client.mqttv3.MqttException;
-import org.eclipse.paho.client.mqttv3.MqttMessage;
-import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
-import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
-
-import java.util.*;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.io.OutputStream;
-import java.io.IOException;
-
-@Tags({"GetMQTT"})
-@CapabilityDescription("Gets messages from an MQTT broker")
-@SeeAlso({})
-@ReadsAttributes({@ReadsAttribute(attribute="", description="")})
-@WritesAttributes({@WritesAttribute(attribute="broker", description="MQTT 
broker that was the message source"),
-    @WritesAttribute(attribute="topic", description="MQTT topic on which 
message was received")})
-public class GetMQTT extends AbstractProcessor implements MqttCallback {
-
-    String topic;
-    String broker;
-    String clientID;
-    double lastTime;
-    boolean firstTime = true;
-    
-    MemoryPersistence persistence = new MemoryPersistence();
-    MqttClient mqttClient;
-    
-    LinkedBlockingQueue<MQTTQueueMessage> mqttQueue = new 
LinkedBlockingQueue<>();
-
-    public static final PropertyDescriptor PROPERTY_BROKER_ADDRESS = new 
PropertyDescriptor
-            .Builder().name("Broker address")
-            .description("MQTT broker address (e.g. tcp://localhost:1883)")
-            .required(true)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .build();
-
-    public static final PropertyDescriptor PROPERTY_MQTT_TOPIC = new 
PropertyDescriptor
-            .Builder().name("MQTT topic")
-            .description("MQTT topic to subscribe to")
-            .required(true)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .build();
-
-    public static final PropertyDescriptor PROPERTY_MQTT_CLIENTID = new 
PropertyDescriptor
-            .Builder().name("MQTT client ID")
-            .description("MQTT client ID to use")
-            .required(true)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .build();
-
-    public static final Relationship RELATIONSHIP_MQTTMESSAGE = new 
Relationship.Builder()
-            .name("MQTTMessage")
-            .description("MQTT message output")
-            .build();
-
-     private List<PropertyDescriptor> descriptors;
-
-    private Set<Relationship> relationships;
-    
-    @Override
-    public void connectionLost(Throwable t) {
-       getLogger().info("Connection to " + broker + " lost");
-    }
-
-    @Override
-    public void deliveryComplete(IMqttDeliveryToken token) {
-    }
-
-    @Override
-    public void messageArrived(String topic, MqttMessage message) throws 
Exception {
-         mqttQueue.add(new MQTTQueueMessage(topic, message.getPayload()));
-    }
-
-    @Override
-    protected void init(final ProcessorInitializationContext context) {
-        final List<PropertyDescriptor> descriptors = new 
ArrayList<PropertyDescriptor>();
-        descriptors.add(PROPERTY_BROKER_ADDRESS);
-        descriptors.add(PROPERTY_MQTT_TOPIC);
-        descriptors.add(PROPERTY_MQTT_CLIENTID);
-      
-        this.descriptors = Collections.unmodifiableList(descriptors);
-
-        final Set<Relationship> relationships = new HashSet<Relationship>();
-        relationships.add(RELATIONSHIP_MQTTMESSAGE);
-        this.relationships = Collections.unmodifiableSet(relationships);
-    }
-
-    @Override
-    public Set<Relationship> getRelationships() {
-        return this.relationships;
-    }
-
-    @Override
-    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        return descriptors;
-    }
-
-    @OnScheduled
-    public void onScheduled(final ProcessContext context) {
-        try {
-            broker = context.getProperty(PROPERTY_BROKER_ADDRESS).getValue();
-            topic = context.getProperty(PROPERTY_MQTT_TOPIC).getValue();
-            clientID = context.getProperty(PROPERTY_MQTT_CLIENTID).getValue();
-            mqttClient = new MqttClient(broker, clientID, persistence);
-            MqttConnectOptions connOpts = new MqttConnectOptions();
-            mqttClient.setCallback(this);
-            connOpts.setCleanSession(true);
-            getLogger().info("Connecting to broker: " + broker);
-            mqttClient.connect(connOpts);
-            mqttClient.subscribe(topic, 0);
-        } catch(MqttException me) {
-            getLogger().error("msg "+me.getMessage());
-        }
-    }
- 
-    @OnUnscheduled
-    public void onUnscheduled(final ProcessContext context) {
-        try {
-            mqttClient.disconnect();
-        } catch(MqttException me) {
-            
-        }
-        getLogger().error("Disconnected");
-    }
-
-    @Override
-    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
-        final List messageList = new LinkedList();
-        
-        mqttQueue.drainTo(messageList);
-        if (messageList.isEmpty())
-            return;
-        
-        Iterator iterator = messageList.iterator();
-        while (iterator.hasNext()) {
-            FlowFile messageFlowfile = session.create();
-            final MQTTQueueMessage m = (MQTTQueueMessage)iterator.next();
-    
-            messageFlowfile = session.putAttribute(messageFlowfile, "broker", 
broker);
-            messageFlowfile = session.putAttribute(messageFlowfile, "topic", 
topic);
-            messageFlowfile = session.write(messageFlowfile, new 
OutputStreamCallback() {
-
-                @Override
-                public void process(final OutputStream out) throws IOException 
{
-                     out.write(m.message);
-                }
-            });
-            session.transfer(messageFlowfile, RELATIONSHIP_MQTTMESSAGE);
-            session.commit();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/92d648ab/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/MQTTQueueMessage.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/MQTTQueueMessage.java
 
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/MQTTQueueMessage.java
deleted file mode 100644
index 0874b10..0000000
--- 
a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/MQTTQueueMessage.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.processors.mqtt;
-
-public class MQTTQueueMessage
-{
-    public String topic;
-    public byte[] message;
-    
-    public MQTTQueueMessage(String topic, byte[] message) {
-        this.topic = topic;
-        this.message = message;
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/92d648ab/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/PublishMQTT.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/PublishMQTT.java
 
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/PublishMQTT.java
new file mode 100644
index 0000000..95bbde4
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/PublishMQTT.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.mqtt;
+
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.AttributeExpression;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.mqtt.common.AbstractMQTTProcessor;
+import org.apache.nifi.stream.io.StreamUtils;
+import org.eclipse.paho.client.mqttv3.MqttCallback;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.io.InputStream;
+import java.io.IOException;
+
+@SupportsBatching
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"publish", "MQTT", "IOT"})
+@CapabilityDescription("Publishes a message to an MQTT topic")
+@SeeAlso({ConsumeMQTT.class})
+public class PublishMQTT extends AbstractMQTTProcessor {
+
+    public static final PropertyDescriptor PROP_TOPIC = new 
PropertyDescriptor.Builder()
+            .name("Topic")
+            .description("The topic to publish the message to.")
+            .expressionLanguageSupported(true)
+            .required(true)
+            
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING,
 true))
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor PROP_QOS = new 
PropertyDescriptor.Builder()
+            .name("Quality of Service(QoS)")
+            .description("The Quality of Service(QoS) to send the message 
with. Accepts three values '0', '1' and '2'; '0' for 'at most once', '1' for 
'at least once', '2' for 'exactly once'. " +
+                    "Expression language is allowed in order to support 
publishing messages with different QoS but the end value of the property must 
be either '0', '1' or '2'. ")
+            .required(true)
+            .expressionLanguageSupported(true)
+            .addValidator(QOS_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor PROP_RETAIN = new 
PropertyDescriptor.Builder()
+            .name("Retain Message")
+            .description("Whether or not the retain flag should be set on the 
MQTT message.")
+            .required(true)
+            .expressionLanguageSupported(true)
+            .addValidator(RETAIN_VALIDATOR)
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("FlowFiles that are sent successfully to the 
destination are transferred to this relationship.")
+            .build();
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("FlowFiles that failed to send to the destination are 
transferred to this relationship.")
+            .build();
+
+    private static final List<PropertyDescriptor> descriptors;
+    private static final Set<Relationship> relationships;
+
+    static {
+        final List<PropertyDescriptor> innerDescriptorsList = 
getAbstractPropertyDescriptors();
+        innerDescriptorsList.add(PROP_TOPIC);
+        innerDescriptorsList.add(PROP_QOS);
+        innerDescriptorsList.add(PROP_RETAIN);
+        descriptors = Collections.unmodifiableList(innerDescriptorsList);
+
+        final Set<Relationship> innerRelationshipsSet = new HashSet<>();
+        innerRelationshipsSet.add(REL_SUCCESS);
+        innerRelationshipsSet.add(REL_FAILURE);
+        relationships = Collections.unmodifiableSet(innerRelationshipsSet);
+    }
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        logger = getLogger();
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return descriptors;
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        buildClient(context);
+    }
+
+    @OnStopped
+    public void onStop(final ProcessContext context) {
+        mqttClientConnectLock.writeLock().lock();
+        try {
+            if (mqttClient != null && mqttClient.isConnected()) {
+                mqttClient.disconnect();
+                logger.info("Disconnected the MQTT client.");
+            }
+        } catch(MqttException me) {
+            logger.error("Failed when disconnecting the MQTT client.", me);
+        } finally {
+            mqttClientConnectLock.writeLock().unlock();
+        }
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        FlowFile flowfile = session.get();
+        if (flowfile == null) {
+            return;
+        }
+
+        if(mqttClient == null || !mqttClient.isConnected()){
+            logger.info("Was disconnected from client or was never connected, 
attempting to connect.");
+            try {
+                reconnect();
+            } catch (MqttException e) {
+                context.yield();
+                session.transfer(flowfile, REL_FAILURE);
+                logger.error("MQTT client is disconnected and re-connecting 
failed. Transferring FlowFile to fail and yielding", e);
+                return;
+            }
+        }
+
+        // get the MQTT topic
+        String topic = 
context.getProperty(PROP_TOPIC).evaluateAttributeExpressions(flowfile).getValue();
+
+        if (topic == null || topic.isEmpty()) {
+            logger.warn("Evaluation of the topic property returned null or 
evaluated to be empty, routing to failure");
+            session.transfer(flowfile, REL_FAILURE);
+            return;
+        }
+
+        // do the read
+        final byte[] messageContent = new byte[(int) flowfile.getSize()];
+        session.read(flowfile, new InputStreamCallback() {
+            @Override
+            public void process(final InputStream in) throws IOException {
+                StreamUtils.fillBuffer(in, messageContent, true);
+            }
+        });
+
+        int qos = 
context.getProperty(PROP_QOS).evaluateAttributeExpressions(flowfile).asInteger();
+        final MqttMessage mqttMessage = new MqttMessage(messageContent);
+        mqttMessage.setQos(qos);
+        mqttMessage.setPayload(messageContent);
+        
mqttMessage.setRetained(context.getProperty(PROP_RETAIN).evaluateAttributeExpressions(flowfile).asBoolean());
+
+        try {
+            mqttClientConnectLock.readLock().lock();
+            try {
+                /*
+                 * Underlying method waits for the message to publish 
(according to set QoS), so it executes synchronously:
+                 *     MqttClient.java:361 aClient.publish(topic, message, 
null, null).waitForCompletion(getTimeToWait());
+                 */
+                mqttClient.publish(topic, mqttMessage);
+            } finally {
+                mqttClientConnectLock.readLock().unlock();
+            }
+            session.transfer(flowfile, REL_SUCCESS);
+        } catch(MqttException me) {
+            logger.error("Failed to publish message.", me);
+            session.transfer(flowfile, REL_FAILURE);
+        }
+    }
+
+    private class PublishMQTTCallback  implements MqttCallback {
+
+        @Override
+        public void connectionLost(Throwable cause) {
+            logger.warn("Connection to " + broker + " lost", cause);
+            try {
+                reconnect();
+            } catch (MqttException e) {
+                logger.error("Connection to " + broker + " lost and re-connect 
failed");
+            }
+        }
+
+        @Override
+        public void messageArrived(String topic, MqttMessage message) throws 
Exception {
+            logger.error("Message arrived to a PublishMQTT processor { 
topic:'" + topic +"; payload:"+ Arrays.toString(message.getPayload())+"}");
+        }
+
+        @Override
+        public void deliveryComplete(IMqttDeliveryToken token) {
+            // Client.publish waits for message to be delivered so this token 
will always have a null message and is useless in this application.
+            logger.trace("Received 'delivery complete' message from broker 
for:" + token.toString());
+        }
+    }
+
+
+    private void reconnect() throws MqttException {
+        mqttClientConnectLock.writeLock().lock();
+        try {
+            if (!mqttClient.isConnected()) {
+                setAndConnectClient(new PublishMQTTCallback());
+                getLogger().info("Connecting to broker: " + broker);
+            }
+        } finally {
+            mqttClientConnectLock.writeLock().unlock();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/92d648ab/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/PutMQTT.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/PutMQTT.java
 
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/PutMQTT.java
deleted file mode 100644
index 29aeb10..0000000
--- 
a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/PutMQTT.java
+++ /dev/null
@@ -1,193 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.processors.mqtt;
-
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.*;
-import org.apache.nifi.annotation.behavior.ReadsAttribute;
-import org.apache.nifi.annotation.behavior.ReadsAttributes;
-import org.apache.nifi.annotation.behavior.WritesAttribute;
-import org.apache.nifi.annotation.behavior.WritesAttributes;
-import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.SeeAlso;
-import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.io.InputStreamCallback;
-import org.apache.nifi.annotation.behavior.InputRequirement;
-import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
-import org.apache.commons.io.IOUtils;
-
-import org.eclipse.paho.client.mqttv3.MqttClient;
-import org.eclipse.paho.client.mqttv3.MqttCallback;
-import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
-import org.eclipse.paho.client.mqttv3.MqttException;
-import org.eclipse.paho.client.mqttv3.MqttMessage;
-import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
-import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
-
-import java.util.*;
-import java.util.concurrent.atomic.AtomicReference;
-import java.io.InputStream;
-import java.io.IOException;
-
-@InputRequirement(Requirement.INPUT_REQUIRED)
-@Tags({"PutMQTT"})
-@CapabilityDescription("Publishes message to an MQTT topic")
-@SeeAlso({})
-@ReadsAttributes({@ReadsAttribute(attribute="topic", description="Topic to 
publish message to")})
-@WritesAttributes({@WritesAttribute(attribute="", description="")})
-public class PutMQTT extends AbstractProcessor implements MqttCallback {
-
-    String broker;
-    String clientID;
-    
-    MemoryPersistence persistence = new MemoryPersistence();
-    MqttClient mqttClient;
-    
-    public static final PropertyDescriptor PROPERTY_BROKER_ADDRESS = new 
PropertyDescriptor
-            .Builder().name("Broker address")
-            .description("MQTT broker address (e.g. tcp://localhost:1883)")
-            .required(true)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .build();
-    
-    public static final PropertyDescriptor PROPERTY_MQTT_CLIENTID = new 
PropertyDescriptor
-            .Builder().name("MQTT client ID")
-            .description("MQTT client ID to use")
-            .required(true)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .build();
-
-    private List<PropertyDescriptor> descriptors;
-
-    private Set<Relationship> relationships;
-    
-    @Override
-    public void connectionLost(Throwable t) {
-       getLogger().info("Connection to " + broker + " lost");
-    }
-
-    @Override
-    public void deliveryComplete(IMqttDeliveryToken token) {
-    }
-
-    @Override
-    public void messageArrived(String topic, MqttMessage message) throws 
Exception {
-    }
-
-    @Override
-    protected void init(final ProcessorInitializationContext context) {
-        final List<PropertyDescriptor> descriptors = new 
ArrayList<PropertyDescriptor>();
-        descriptors.add(PROPERTY_BROKER_ADDRESS);
-        descriptors.add(PROPERTY_MQTT_CLIENTID);
-        this.descriptors = Collections.unmodifiableList(descriptors);
-
-        final Set<Relationship> relationships = new HashSet<Relationship>();
-        this.relationships = Collections.unmodifiableSet(relationships);
-    }
-
-    @Override
-    public Set<Relationship> getRelationships() {
-        return this.relationships;
-    }
-
-    @Override
-    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        return descriptors;
-    }
-
-    @OnScheduled
-    public void onScheduled(final ProcessContext context) {
-        try {
-            broker = context.getProperty(PROPERTY_BROKER_ADDRESS).getValue();
-            clientID = context.getProperty(PROPERTY_MQTT_CLIENTID).getValue();
-            mqttClient = new MqttClient(broker, clientID, persistence);
-            MqttConnectOptions connOpts = new MqttConnectOptions();
-            mqttClient.setCallback(this);
-            connOpts.setCleanSession(true);
-            getLogger().info("Connecting to broker: " + broker);
-            mqttClient.connect(connOpts);
-        } catch(MqttException me) {
-            getLogger().error("msg "+me.getMessage());
-        }
-    }
- 
-    @OnUnscheduled
-    public void onUnscheduled(final ProcessContext context) {
-        try {
-            mqttClient.disconnect();
-        } catch(MqttException me) {
-            
-        }
-        getLogger().error("Disconnected");
-    }
-
-    @Override
-    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
-        final AtomicReference<String> message = new AtomicReference<>();
-
-        FlowFile flowfile = session.get();
-        message.set("");
-
-        // get the MQTT topic
-        
-        String topic = flowfile.getAttribute("topic");
-
-        if (topic == null) {
-            getLogger().error("No topic attribute on flowfile");
-            session.remove(flowfile);
-            return;
-        }
-        
-        // do the read
-        
-        session.read(flowfile, new InputStreamCallback() {
-            @Override
-            public void process(InputStream in) throws IOException {
-                try{
-                    message.set(IOUtils.toString(in));
-                }catch(Exception e){
-                    getLogger().error("Failed to read flowfile " + 
e.getMessage());
-                }
-            }
-        });
-        try {
-            session.remove(flowfile);
-        } catch (Exception e) {
-             getLogger().error("Failed to remove flowfile " + e.getMessage());
-             return;
-        }       
-       
-        String output = message.get();
-        
-        if ((output == null) || output.isEmpty()) {
-            return;
-        }
-        
-        try {
-            mqttClient.publish(topic, output.getBytes(), 0, false);
-        } catch(MqttException me) {
-            getLogger().error("msg "+me.getMessage());          
-        }       
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/92d648ab/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java
 
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java
new file mode 100644
index 0000000..733c240
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java
@@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.mqtt.common;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.expression.AttributeExpression;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.ssl.SSLContextService;
+import org.eclipse.paho.client.mqttv3.IMqttClient;
+import org.eclipse.paho.client.mqttv3.MqttCallback;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_FALSE;
+import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_TRUE;
+import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_MQTT_VERSION_310;
+import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_MQTT_VERSION_311;
+import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_MQTT_VERSION_AUTO;
+import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_0;
+import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_1;
+import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_2;
+
+public abstract class AbstractMQTTProcessor extends 
AbstractSessionFactoryProcessor {
+
+    protected ComponentLog logger;
+    protected IMqttClient mqttClient;
+    protected final ReadWriteLock mqttClientConnectLock = new 
ReentrantReadWriteLock(true);
+    protected volatile String broker;
+    protected volatile String clientID;
+    protected MqttConnectOptions connOpts;
+    protected MemoryPersistence persistence = new MemoryPersistence();
+
+    public ProcessSessionFactory processSessionFactory;
+
+    public static final Validator QOS_VALIDATOR = new Validator() {
+
+        @Override
+        public ValidationResult validate(String subject, String input, 
ValidationContext context) {
+            Integer inputInt = Integer.parseInt(input);
+            if (inputInt < 0 || inputInt > 2) {
+                return new 
ValidationResult.Builder().subject(subject).valid(false).explanation("QoS must 
be an integer between 0 and 2").build();
+            }
+            return new 
ValidationResult.Builder().subject(subject).valid(true).build();
+        }
+    };
+
+    public static final Validator BROKER_VALIDATOR = new Validator() {
+
+        @Override
+        public ValidationResult validate(String subject, String input, 
ValidationContext context) {
+            try{
+                URI brokerURI = new URI(input);
+                if (!"".equals(brokerURI.getPath())) {
+                    return new 
ValidationResult.Builder().subject(subject).valid(false).explanation("the 
broker URI cannot have a path. It currently is:" + brokerURI.getPath()).build();
+                }
+                if (!("tcp".equals(brokerURI.getScheme()) || 
"ssl".equals(brokerURI.getScheme()))) {
+                    return new 
ValidationResult.Builder().subject(subject).valid(false).explanation("only the 
'tcp' and 'ssl' schemes are supported.").build();
+                }
+            } catch (URISyntaxException e) {
+                return new 
ValidationResult.Builder().subject(subject).valid(false).explanation("it is not 
valid URI syntax.").build();
+            }
+            return new 
ValidationResult.Builder().subject(subject).valid(true).build();
+        }
+    };
+
+    public static final Validator RETAIN_VALIDATOR = new Validator() {
+
+        @Override
+        public ValidationResult validate(String subject, String input, 
ValidationContext context) {
+            if("true".equalsIgnoreCase(input) || 
"false".equalsIgnoreCase(input)){
+                return new 
ValidationResult.Builder().subject(subject).valid(true).build();
+            } else{
+                return 
StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.BOOLEAN,
 false)
+                        .validate(subject, input, context);
+            }
+
+        }
+    };
+
+    public static final PropertyDescriptor PROP_BROKER_URI = new 
PropertyDescriptor.Builder()
+            .name("Broker URI")
+            .description("The URI to use to connect to the MQTT broker (e.g. 
tcp://localhost:1883). The 'tcp' and 'ssl' schemes are supported. In order to 
use 'ssl', the SSL Context " +
+                    "Service property must be set.")
+            .required(true)
+            .addValidator(BROKER_VALIDATOR)
+            .build();
+
+
+    public static final PropertyDescriptor PROP_CLIENTID = new 
PropertyDescriptor.Builder()
+            .name("Client ID")
+            .description("MQTT client ID to use")
+            .required(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor PROP_USERNAME = new 
PropertyDescriptor.Builder()
+            .name("Username")
+            .description("Username to use when connecting to the broker")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor PROP_PASSWORD = new 
PropertyDescriptor.Builder()
+            .name("Password")
+            .description("Password to use when connecting to the broker")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor PROP_SSL_CONTEXT_SERVICE = new 
PropertyDescriptor.Builder()
+            .name("SSL Context Service")
+            .description("The SSL Context Service used to provide client 
certificate information for TLS/SSL connections.")
+            .required(false)
+            .identifiesControllerService(SSLContextService.class)
+            .build();
+
+
+    public static final PropertyDescriptor PROP_LAST_WILL_TOPIC = new 
PropertyDescriptor.Builder()
+            .name("Last Will Topic")
+            .description("The topic to send the client's Last Will to. If the 
Last Will topic and message are not set then a Last Will will not be sent.")
+            .required(false)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor PROP_LAST_WILL_MESSAGE = new 
PropertyDescriptor.Builder()
+            .name("Last Will Message")
+            .description("The message to send as the client's Last Will. If 
the Last Will topic and message are not set then a Last Will will not be sent.")
+            .required(false)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor PROP_LAST_WILL_RETAIN = new 
PropertyDescriptor.Builder()
+            .name("Last Will Retain")
+            .description("Whether to retain the client's Last Will. If the 
Last Will topic and message are not set then a Last Will will not be sent.")
+            .required(false)
+            .allowableValues("true","false")
+            .build();
+
+    public static final PropertyDescriptor PROP_LAST_WILL_QOS = new 
PropertyDescriptor.Builder()
+            .name("Last Will QoS Level")
+            .description("QoS level to be used when publishing the Last Will 
Message")
+            .required(false)
+            .allowableValues(
+                    ALLOWABLE_VALUE_QOS_0,
+                    ALLOWABLE_VALUE_QOS_1,
+                    ALLOWABLE_VALUE_QOS_2
+            )
+            .build();
+
+    public static final PropertyDescriptor PROP_CLEAN_SESSION = new 
PropertyDescriptor.Builder()
+            .name("Session state")
+            .description("Whether to start afresh or resume previous flows. 
See the allowable value descriptions for more details.")
+            .required(true)
+            .allowableValues(
+                    ALLOWABLE_VALUE_CLEAN_SESSION_TRUE,
+                    ALLOWABLE_VALUE_CLEAN_SESSION_FALSE
+            )
+            .defaultValue(ALLOWABLE_VALUE_CLEAN_SESSION_TRUE.getValue())
+            .build();
+
+    public static final PropertyDescriptor PROP_MQTT_VERSION = new 
PropertyDescriptor.Builder()
+            .name("MQTT Specification Version")
+            .description("The MQTT specification version when connecting with 
the broker. See the allowable value descriptions for more details.")
+            .allowableValues(
+                    ALLOWABLE_VALUE_MQTT_VERSION_AUTO,
+                    ALLOWABLE_VALUE_MQTT_VERSION_311,
+                    ALLOWABLE_VALUE_MQTT_VERSION_310
+            )
+            .defaultValue(ALLOWABLE_VALUE_MQTT_VERSION_AUTO.getValue())
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor PROP_CONN_TIMEOUT = new 
PropertyDescriptor.Builder()
+            .name("Connection Timeout (seconds)")
+            .description("Maximum time interval the client will wait for the 
network connection to the MQTT server " +
+                    "to be established. The default timeout is 30 seconds. " +
+                    "A value of 0 disables timeout processing meaning the 
client will wait until the network connection is made successfully or fails.")
+            .required(false)
+            .defaultValue("30")
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor PROP_KEEP_ALIVE_INTERVAL = new 
PropertyDescriptor.Builder()
+            .name("Keep Alive Interval (seconds)")
+            .description("Defines the maximum time interval between messages 
sent or received. It enables the " +
+                    "client to detect if the server is no longer available, 
without having to wait for the TCP/IP timeout. " +
+                    "The client will ensure that at least one message travels 
across the network within each keep alive period. In the absence of a 
data-related message during the time period, " +
+                    "the client sends a very small \"ping\" message, which the 
server will acknowledge. A value of 0 disables keepalive processing in the 
client.")
+            .required(false)
+            .defaultValue("60")
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .build();
+
+    public static List<PropertyDescriptor> getAbstractPropertyDescriptors(){
+        final List<PropertyDescriptor> descriptors = new 
ArrayList<PropertyDescriptor>();
+        descriptors.add(PROP_BROKER_URI);
+        descriptors.add(PROP_CLIENTID);
+        descriptors.add(PROP_USERNAME);
+        descriptors.add(PROP_PASSWORD);
+        descriptors.add(PROP_SSL_CONTEXT_SERVICE);
+        descriptors.add(PROP_LAST_WILL_TOPIC);
+        descriptors.add(PROP_LAST_WILL_MESSAGE);
+        descriptors.add(PROP_LAST_WILL_RETAIN);
+        descriptors.add(PROP_LAST_WILL_QOS);
+        descriptors.add(PROP_CLEAN_SESSION);
+        descriptors.add(PROP_MQTT_VERSION);
+        descriptors.add(PROP_CONN_TIMEOUT);
+        descriptors.add(PROP_KEEP_ALIVE_INTERVAL);
+        return descriptors;
+    }
+
+    @Override
+    public Collection<ValidationResult> customValidate(final ValidationContext 
validationContext) {
+        final List<ValidationResult> results = new ArrayList<>(1);
+        final boolean usernameSet = 
validationContext.getProperty(PROP_USERNAME).isSet();
+        final boolean passwordSet = 
validationContext.getProperty(PROP_PASSWORD).isSet();
+
+        if ((usernameSet && !passwordSet) || (!usernameSet && passwordSet)) {
+            results.add(new ValidationResult.Builder().subject("Username and 
Password").valid(false).explanation("if username or password is set, both must 
be set").build());
+        }
+
+        final boolean lastWillTopicSet = 
validationContext.getProperty(PROP_LAST_WILL_TOPIC).isSet();
+        final boolean lastWillMessageSet = 
validationContext.getProperty(PROP_LAST_WILL_MESSAGE).isSet();
+
+        final boolean lastWillRetainSet = 
validationContext.getProperty(PROP_LAST_WILL_RETAIN).isSet();
+        final boolean lastWillQosSet = 
validationContext.getProperty(PROP_LAST_WILL_QOS).isSet();
+
+        // If any of the Last Will Properties are set
+        if (lastWillTopicSet || lastWillMessageSet || lastWillRetainSet || 
lastWillQosSet) {
+            // And any are not set
+            if(!(lastWillTopicSet && lastWillMessageSet && lastWillRetainSet 
&& lastWillQosSet)){
+                // Then mark as invalid
+                results.add(new ValidationResult.Builder().subject("Last Will 
Properties").valid(false).explanation("if any of the Last Will Properties 
(message, topic, retain and QoS) are " +
+                        "set, all must be set.").build());
+            }
+        }
+
+        try {
+            URI brokerURI = new 
URI(validationContext.getProperty(PROP_BROKER_URI).getValue());
+            if (brokerURI.getScheme().equalsIgnoreCase("ssl") && 
!validationContext.getProperty(PROP_SSL_CONTEXT_SERVICE).isSet()) {
+                results.add(new 
ValidationResult.Builder().subject(PROP_SSL_CONTEXT_SERVICE.getName() + " or " 
+ PROP_BROKER_URI.getName()).valid(false).explanation("if the 'ssl' scheme is 
used in " +
+                        "the broker URI, the SSL Context Service must be 
set.").build());
+            }
+        } catch (URISyntaxException e) {
+            results.add(new 
ValidationResult.Builder().subject(PROP_BROKER_URI.getName()).valid(false).explanation("it
 is not valid URI syntax.").build());
+        }
+
+        return results;
+    }
+
+    public static Properties transformSSLContextService(SSLContextService 
sslContextService){
+        Properties properties = new Properties();
+        properties.setProperty("com.ibm.ssl.protocol", 
sslContextService.getSslAlgorithm());
+        properties.setProperty("com.ibm.ssl.keyStore", 
sslContextService.getKeyStoreFile());
+        properties.setProperty("com.ibm.ssl.keyStorePassword", 
sslContextService.getKeyStorePassword());
+        properties.setProperty("com.ibm.ssl.keyStoreType", 
sslContextService.getKeyStoreType());
+        properties.setProperty("com.ibm.ssl.trustStore", 
sslContextService.getTrustStoreFile());
+        properties.setProperty("com.ibm.ssl.trustStorePassword", 
sslContextService.getTrustStorePassword());
+        properties.setProperty("com.ibm.ssl.trustStoreType", 
sslContextService.getTrustStoreType());
+        return  properties;
+    }
+
+    protected void buildClient(ProcessContext context){
+        try {
+            broker = context.getProperty(PROP_BROKER_URI).getValue();
+            clientID = context.getProperty(PROP_CLIENTID).getValue();
+
+            connOpts = new MqttConnectOptions();
+            
connOpts.setCleanSession(context.getProperty(PROP_CLEAN_SESSION).asBoolean());
+            
connOpts.setKeepAliveInterval(context.getProperty(PROP_KEEP_ALIVE_INTERVAL).asInteger());
+            
connOpts.setMqttVersion(context.getProperty(PROP_MQTT_VERSION).asInteger());
+            
connOpts.setConnectionTimeout(context.getProperty(PROP_CONN_TIMEOUT).asInteger());
+
+            PropertyValue sslProp = 
context.getProperty(PROP_SSL_CONTEXT_SERVICE);
+            if (sslProp.isSet()) {
+                Properties sslProps = 
transformSSLContextService((SSLContextService) sslProp.asControllerService());
+                connOpts.setSSLProperties(sslProps);
+            }
+
+            PropertyValue lastWillTopicProp = 
context.getProperty(PROP_LAST_WILL_TOPIC);
+            if (lastWillTopicProp.isSet()){
+                String lastWillMessage = 
context.getProperty(PROP_LAST_WILL_MESSAGE).getValue();
+                PropertyValue lastWillRetain = 
context.getProperty(PROP_LAST_WILL_RETAIN);
+                Integer lastWillQOS = 
context.getProperty(PROP_LAST_WILL_QOS).asInteger();
+                connOpts.setWill(lastWillTopicProp.getValue(), 
lastWillMessage.getBytes(), lastWillQOS, lastWillRetain.isSet() ? 
lastWillRetain.asBoolean() : false);
+            }
+
+
+            PropertyValue usernameProp = context.getProperty(PROP_USERNAME);
+            if(usernameProp.isSet()) {
+                connOpts.setUserName(usernameProp.getValue());
+                
connOpts.setPassword(context.getProperty(PROP_PASSWORD).getValue().toCharArray());
+            }
+
+            mqttClientConnectLock.writeLock().lock();
+            try{
+                mqttClient = getMqttClient(broker, clientID, persistence);
+
+            } finally {
+                mqttClientConnectLock.writeLock().unlock();
+            }
+        } catch(MqttException me) {
+            logger.error("Failed to initialize the connection to the  " + 
me.getMessage());
+        }
+    }
+
+    protected IMqttClient getMqttClient(String broker, String clientID, 
MemoryPersistence persistence) throws MqttException {
+        return new MqttClient(broker, clientID, persistence);
+    }
+
+
+    @Override
+    public final void onTrigger(final ProcessContext context, final 
ProcessSessionFactory sessionFactory) throws ProcessException {
+        if (processSessionFactory == null) {
+            processSessionFactory = sessionFactory;
+        }
+        ProcessSession session = sessionFactory.createSession();
+        try {
+            onTrigger(context, session);
+            session.commit();
+        } catch (final Throwable t) {
+            getLogger().error("{} failed to process due to {}; rolling back 
session", new Object[]{this, t});
+            session.rollback(true);
+            throw t;
+        }
+    }
+
+    public abstract void onTrigger(final ProcessContext context, final 
ProcessSession session) throws ProcessException;
+
+    // Caller should obtain the necessary lock
+    protected void setAndConnectClient(MqttCallback mqttCallback) throws 
MqttException {
+        mqttClient = getMqttClient(broker, clientID, persistence);
+        mqttClient.setCallback(mqttCallback);
+        mqttClient.connect(connOpts);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/92d648ab/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MQTTQueueMessage.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MQTTQueueMessage.java
 
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MQTTQueueMessage.java
new file mode 100644
index 0000000..d5e63c7
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MQTTQueueMessage.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.mqtt.common;
+
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+
+public class MQTTQueueMessage {
+    private String topic;
+
+    private byte[] payload;
+    private int qos = 1;
+    private boolean retained = false;
+    private boolean duplicate = false;
+
+    public MQTTQueueMessage(String topic, MqttMessage message) {
+        this.topic = topic;
+        payload = message.getPayload();
+        qos = message.getQos();
+        retained = message.isRetained();
+        duplicate = message.isDuplicate();
+    }
+
+    public String getTopic() {
+        return topic;
+    }
+
+    public byte[] getPayload() {
+        return payload;
+    }
+
+    public int getQos() {
+        return qos;
+    }
+
+    public boolean isRetained() {
+        return retained;
+    }
+
+    public boolean isDuplicate() {
+        return duplicate;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/92d648ab/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttConstants.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttConstants.java
 
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttConstants.java
new file mode 100644
index 0000000..a29e6ff
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttConstants.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.mqtt.common;
+
+import org.apache.nifi.components.AllowableValue;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+
+public class MqttConstants {
+
+    /*
+      ------------------------------------------
+        Clean Session Values
+      ------------------------------------------
+     */
+
+    public static final AllowableValue ALLOWABLE_VALUE_CLEAN_SESSION_TRUE =
+            new AllowableValue("true", "Clean Session", "Client and Server 
discard any previous session and start a new " +
+                    "one. This session lasts as long as the network 
connection. " +
+                    "State data associated with this session is not reused in 
any subsequent session");
+
+    public static final AllowableValue ALLOWABLE_VALUE_CLEAN_SESSION_FALSE =
+            new AllowableValue("false", "Resume Session", "Server resumes 
communications with the client based on state from " +
+                    "the current session (as identified by the ClientID). The 
client and server store the session after " +
+                    "the client and server are disconnected. After the 
disconnection of a session that was not a clean session, " +
+                    "the server stores further QoS 1 and QoS 2 messages that 
match any subscriptions that the client had at " +
+                    "the time of disconnection as part of the session state");
+
+    /*
+      ------------------------------------------
+        QoS Values
+      ------------------------------------------
+     */
+
+
+    public static final AllowableValue ALLOWABLE_VALUE_QOS_0 =
+            new AllowableValue("0", "0 - At most once", "Best effort delivery. 
A message won’t be acknowledged by the receiver or stored and redelivered by 
the sender. " +
+                    "This is often called “fire and forget” and provides 
the same guarantee as the underlying TCP protocol.");
+
+    public static final AllowableValue ALLOWABLE_VALUE_QOS_1 =
+            new AllowableValue("1", "1 - At least once", "Guarantees that a 
message will be delivered at least once to the receiver. " +
+                    "The message can also be delivered more than once");
+
+    public static final AllowableValue ALLOWABLE_VALUE_QOS_2 =
+            new AllowableValue("2", "2 - Exactly once", "Guarantees that each 
message is received only once by the counterpart. It is the safest and also " +
+                    "the slowest quality of service level. The guarantee is 
provided by two round-trip flows between sender and receiver.");
+
+
+    /*
+      ------------------------------------------
+        MQTT Version Values
+      ------------------------------------------
+     */
+    public static final AllowableValue ALLOWABLE_VALUE_MQTT_VERSION_AUTO =
+            new 
AllowableValue(String.valueOf(MqttConnectOptions.MQTT_VERSION_DEFAULT),
+                    "AUTO",
+                    "Start with v3.1.1 and fallback to v3.1.0 if not supported 
by a broker");
+
+    public static final AllowableValue ALLOWABLE_VALUE_MQTT_VERSION_311 =
+            new 
AllowableValue(String.valueOf(MqttConnectOptions.MQTT_VERSION_3_1_1),
+                    "v3.1.1");
+
+    public static final AllowableValue ALLOWABLE_VALUE_MQTT_VERSION_310 =
+            new 
AllowableValue(String.valueOf(MqttConnectOptions.MQTT_VERSION_3_1),
+                    "v3.1.0");
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/92d648ab/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
 
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index b5a30e9..3dc2efa 100644
--- 
a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ 
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -12,5 +12,5 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-org.apache.nifi.processors.mqtt.GetMQTT
-org.apache.nifi.processors.mqtt.PutMQTT
+org.apache.nifi.processors.mqtt.ConsumeMQTT
+org.apache.nifi.processors.mqtt.PublishMQTT

http://git-wip-us.apache.org/repos/asf/nifi/blob/92d648ab/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java
 
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java
new file mode 100644
index 0000000..58c37e5
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.mqtt;
+
+import io.moquette.proto.messages.PublishMessage;
+import org.apache.nifi.processors.mqtt.common.MqttTestClient;
+import org.apache.nifi.processors.mqtt.common.TestConsumeMqttCommon;
+import org.apache.nifi.util.TestRunners;
+import org.eclipse.paho.client.mqttv3.IMqttClient;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+
+
+public class TestConsumeMQTT extends TestConsumeMqttCommon {
+
+
+    public MqttTestClient mqttTestClient;
+
+    public class UnitTestableConsumeMqtt extends ConsumeMQTT {
+
+        public UnitTestableConsumeMqtt(){
+            super();
+        }
+
+        @Override
+        public IMqttClient getMqttClient(String broker, String clientID, 
MemoryPersistence persistence) throws MqttException {
+            mqttTestClient =  new MqttTestClient(broker, clientID, 
MqttTestClient.ConnectType.Subscriber);
+            return mqttTestClient;
+        }
+    }
+
+    @Before
+    public void init() throws IOException {
+        PUBLISH_WAIT_MS = 0;
+
+        broker = "tcp://localhost:1883";
+        UnitTestableConsumeMqtt proc = new UnitTestableConsumeMqtt();
+        testRunner = TestRunners.newTestRunner(proc);
+        testRunner.setProperty(ConsumeMQTT.PROP_BROKER_URI, broker);
+        testRunner.setProperty(ConsumeMQTT.PROP_CLIENTID, "TestClient");
+        testRunner.setProperty(ConsumeMQTT.PROP_TOPIC_FILTER, "testTopic");
+        testRunner.setProperty(ConsumeMQTT.PROP_MAX_QUEUE_SIZE, "100");
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        if (MQTT_server != null) {
+            MQTT_server.stopServer();
+        }
+        final File folder =  new File("./target");
+        final File[] files = folder.listFiles( new FilenameFilter() {
+            @Override
+            public boolean accept( final File dir,
+                                   final String name ) {
+                return name.matches( "moquette_store.mapdb.*" );
+            }
+        } );
+        for ( final File file : files ) {
+            if ( !file.delete() ) {
+                System.err.println( "Can't remove " + file.getAbsolutePath() );
+            }
+        }
+    }
+
+    @Override
+    public void internalPublish(PublishMessage publishMessage) {
+        MqttMessage mqttMessage = new MqttMessage();
+        mqttMessage.setPayload(publishMessage.getPayload().array());
+        mqttMessage.setRetained(publishMessage.isRetainFlag());
+        mqttMessage.setQos(publishMessage.getQos().ordinal());
+
+        try {
+            mqttTestClient.publish(publishMessage.getTopicName(), mqttMessage);
+        } catch (MqttException e) {
+            Assert.fail("Should never get an MqttException when publishing 
using test client");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/92d648ab/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestGetMQTT.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestGetMQTT.java
 
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestGetMQTT.java
deleted file mode 100644
index cd22735..0000000
--- 
a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestGetMQTT.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- 
-package org.apache.nifi.processors.mqtt;
-
-import org.apache.nifi.processors.mqtt.GetMQTT;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-import org.junit.Before;
-import org.junit.Test;
-
-
-public class TestGetMQTT {
-
-    private TestRunner testRunner;
-
-    @Before
-    public void init() {
-        testRunner = TestRunners.newTestRunner(GetMQTT.class);
-    }
-
-    @Test
-    public void testProcessor() {
-
-    }
-
-}

Reply via email to