http://git-wip-us.apache.org/repos/asf/nifi/blob/92d648ab/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestPublishMQTT.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestPublishMQTT.java
 
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestPublishMQTT.java
new file mode 100644
index 0000000..cdbc67f
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestPublishMQTT.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.mqtt;
+
+import org.apache.nifi.processors.mqtt.common.MQTTQueueMessage;
+import org.apache.nifi.processors.mqtt.common.MqttTestClient;
+import org.apache.nifi.processors.mqtt.common.TestPublishMqttCommon;
+import org.apache.nifi.util.TestRunners;
+import org.eclipse.paho.client.mqttv3.IMqttClient;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+import org.junit.After;
+import org.junit.Before;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.util.Arrays;
+
+import static org.junit.Assert.assertEquals;
+
+
+public class TestPublishMQTT extends TestPublishMqttCommon {
+
+    @Override
+    public void verifyPublishedMessage(byte[] payload, int qos, boolean 
retain) {
+        MQTTQueueMessage mqttQueueMessage = mqttTestClient.publishedMessage;
+        assertEquals(Arrays.toString(payload), 
Arrays.toString(mqttQueueMessage.getPayload()));
+        assertEquals(qos, mqttQueueMessage.getQos());
+        assertEquals(retain, mqttQueueMessage.isRetained());
+        assertEquals(topic, mqttQueueMessage.getTopic());
+    }
+
+
+    public MqttTestClient mqttTestClient;
+
+    public class UnitTestablePublishMqtt extends PublishMQTT {
+
+        public UnitTestablePublishMqtt(){
+            super();
+        }
+
+        @Override
+        public IMqttClient getMqttClient(String broker, String clientID, 
MemoryPersistence persistence) throws MqttException {
+            mqttTestClient =  new MqttTestClient(broker, clientID, 
MqttTestClient.ConnectType.Publisher);
+            return mqttTestClient;
+        }
+    }
+
+    @Before
+    public void init() throws IOException {
+        UnitTestablePublishMqtt proc = new UnitTestablePublishMqtt();
+        testRunner = TestRunners.newTestRunner(proc);
+        testRunner.setProperty(PublishMQTT.PROP_BROKER_URI, 
"tcp://localhost:1883");
+        testRunner.setProperty(PublishMQTT.PROP_CLIENTID, "TestClient");
+        testRunner.setProperty(PublishMQTT.PROP_RETAIN, "false");
+        topic = "testTopic";
+        testRunner.setProperty(PublishMQTT.PROP_TOPIC, topic);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        final File folder =  new File("./target");
+        final File[] files = folder.listFiles( new FilenameFilter() {
+            @Override
+            public boolean accept( final File dir,
+                                   final String name ) {
+                return name.matches( "moquette_store.mapdb.*" );
+            }
+        } );
+        for ( final File file : files ) {
+            if ( !file.delete() ) {
+                System.err.println( "Can't remove " + file.getAbsolutePath() );
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/92d648ab/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestPutMQTT.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestPutMQTT.java
 
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestPutMQTT.java
deleted file mode 100644
index 64a0548..0000000
--- 
a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestPutMQTT.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- 
-package org.apache.nifi.processors.mqtt;
-
-import org.apache.nifi.processors.mqtt.PutMQTT;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-import org.junit.Before;
-import org.junit.Test;
-
-
-public class TestPutMQTT {
-
-    private TestRunner testRunner;
-
-    @Before
-    public void init() {
-        testRunner = TestRunners.newTestRunner(PutMQTT.class);
-    }
-
-    @Test
-    public void testProcessor() {
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/92d648ab/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/MqttTestClient.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/MqttTestClient.java
 
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/MqttTestClient.java
new file mode 100644
index 0000000..81e2b18
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/MqttTestClient.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.mqtt.common;
+
+import org.eclipse.paho.client.mqttv3.IMqttClient;
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
+import org.eclipse.paho.client.mqttv3.IMqttToken;
+import org.eclipse.paho.client.mqttv3.MqttCallback;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
+import org.eclipse.paho.client.mqttv3.MqttSecurityException;
+import org.eclipse.paho.client.mqttv3.MqttTopic;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class MqttTestClient implements IMqttClient {
+
+    public String serverURI;
+    public String clientId;
+
+    public AtomicBoolean connected = new AtomicBoolean(false);
+
+    public MqttCallback mqttCallback;
+    public ConnectType type;
+    public enum ConnectType {Publisher, Subscriber}
+
+    public MQTTQueueMessage publishedMessage;
+
+    public String subscribedTopic;
+    public int subscribedQos;
+
+
+    public MqttTestClient(String serverURI, String clientId, ConnectType type) 
throws MqttException {
+        this.serverURI = serverURI;
+        this.clientId = clientId;
+        this.type = type;
+    }
+
+    @Override
+    public void connect() throws MqttSecurityException, MqttException {
+        connected.set(true);
+    }
+
+    @Override
+    public void connect(MqttConnectOptions options) throws 
MqttSecurityException, MqttException {
+        connected.set(true);
+    }
+
+    @Override
+    public IMqttToken connectWithResult(MqttConnectOptions options) throws 
MqttSecurityException, MqttException {
+        return null;
+    }
+
+    @Override
+    public void disconnect() throws MqttException {
+        connected.set(false);
+    }
+
+    @Override
+    public void disconnect(long quiesceTimeout) throws MqttException {
+        connected.set(false);
+    }
+
+    @Override
+    public void disconnectForcibly() throws MqttException {
+        connected.set(false);
+    }
+
+    @Override
+    public void disconnectForcibly(long disconnectTimeout) throws 
MqttException {
+        connected.set(false);
+    }
+
+    @Override
+    public void disconnectForcibly(long quiesceTimeout, long 
disconnectTimeout) throws MqttException {
+        connected.set(false);
+    }
+
+    @Override
+    public void subscribe(String topicFilter) throws MqttException, 
MqttSecurityException {
+        subscribedTopic = topicFilter;
+        subscribedQos = -1;
+    }
+
+    @Override
+    public void subscribe(String[] topicFilters) throws MqttException {
+        throw new UnsupportedOperationException("Multiple topic filters is not 
supported");
+    }
+
+    @Override
+    public void subscribe(String topicFilter, int qos) throws MqttException {
+        subscribedTopic = topicFilter;
+        subscribedQos = qos;
+    }
+
+    @Override
+    public void subscribe(String[] topicFilters, int[] qos) throws 
MqttException {
+        throw new UnsupportedOperationException("Multiple topic filters is not 
supported");
+    }
+
+    @Override
+    public void unsubscribe(String topicFilter) throws MqttException {
+        subscribedTopic = "";
+        subscribedQos = -2;
+    }
+
+    @Override
+    public void unsubscribe(String[] topicFilters) throws MqttException {
+        throw new UnsupportedOperationException("Multiple topic filters is not 
supported");
+    }
+
+    @Override
+    public void publish(String topic, byte[] payload, int qos, boolean 
retained) throws MqttException, MqttPersistenceException {
+        MqttMessage message = new MqttMessage(payload);
+        message.setQos(qos);
+        message.setRetained(retained);
+        switch (type) {
+            case Publisher:
+                publishedMessage = new MQTTQueueMessage(topic, message);
+                break;
+            case Subscriber:
+                try {
+                    mqttCallback.messageArrived(topic, message);
+                } catch (Exception e) {
+                    throw new MqttException(e);
+                }
+                break;
+        }
+    }
+
+    @Override
+    public void publish(String topic, MqttMessage message) throws 
MqttException, MqttPersistenceException {
+        switch (type) {
+            case Publisher:
+                publishedMessage = new MQTTQueueMessage(topic, message);
+                break;
+            case Subscriber:
+                try {
+                    mqttCallback.messageArrived(topic, message);
+                } catch (Exception e) {
+                    throw new MqttException(e);
+                }
+                break;
+        }
+    }
+
+    @Override
+    public void setCallback(MqttCallback callback) {
+        this.mqttCallback = callback;
+    }
+
+    @Override
+    public MqttTopic getTopic(String topic) {
+        return null;
+    }
+
+    @Override
+    public boolean isConnected() {
+        return connected.get();
+    }
+
+    @Override
+    public String getClientId() {
+        return clientId;
+    }
+
+    @Override
+    public String getServerURI() {
+        return serverURI;
+    }
+
+    @Override
+    public IMqttDeliveryToken[] getPendingDeliveryTokens() {
+        return new IMqttDeliveryToken[0];
+    }
+
+    @Override
+    public void close() throws MqttException {
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/92d648ab/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/MqttTestUtils.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/MqttTestUtils.java
 
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/MqttTestUtils.java
new file mode 100644
index 0000000..5373a9f
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/MqttTestUtils.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.mqtt.common;
+
+import org.apache.nifi.ssl.StandardSSLContextService;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class MqttTestUtils {
+    public static Map<String, String> createSslProperties() {
+
+        final Map<String, String> map = new HashMap<>();
+        map.put(StandardSSLContextService.KEYSTORE.getName(), 
"src/test/resources/localhost-ks.jks");
+        map.put(StandardSSLContextService.KEYSTORE_PASSWORD.getName(), 
"localtest");
+        map.put(StandardSSLContextService.KEYSTORE_TYPE.getName(), "JKS");
+        map.put(StandardSSLContextService.TRUSTSTORE.getName(), 
"src/test/resources/localhost-ts.jks");
+        map.put(StandardSSLContextService.TRUSTSTORE_PASSWORD.getName(), 
"localtest");
+        map.put(StandardSSLContextService.TRUSTSTORE_TYPE.getName(), "JKS");
+        return map;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/92d648ab/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/TestConsumeMqttCommon.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/TestConsumeMqttCommon.java
 
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/TestConsumeMqttCommon.java
new file mode 100644
index 0000000..c3f1b3d
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/TestConsumeMqttCommon.java
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.mqtt.common;
+
+import io.moquette.proto.messages.AbstractMessage;
+import io.moquette.proto.messages.PublishMessage;
+import io.moquette.server.Server;
+import org.apache.nifi.processors.mqtt.ConsumeMQTT;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.eclipse.paho.client.mqttv3.IMqttClient;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import static org.apache.nifi.processors.mqtt.ConsumeMQTT.BROKER_ATTRIBUTE_KEY;
+import static 
org.apache.nifi.processors.mqtt.ConsumeMQTT.IS_DUPLICATE_ATTRIBUTE_KEY;
+import static 
org.apache.nifi.processors.mqtt.ConsumeMQTT.IS_RETAINED_ATTRIBUTE_KEY;
+import static org.apache.nifi.processors.mqtt.ConsumeMQTT.QOS_ATTRIBUTE_KEY;
+import static org.apache.nifi.processors.mqtt.ConsumeMQTT.TOPIC_ATTRIBUTE_KEY;
+import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_FALSE;
+import static org.junit.Assert.assertTrue;
+
+public abstract class TestConsumeMqttCommon {
+
+    public int PUBLISH_WAIT_MS = 1000;
+
+    public Server MQTT_server;
+    public TestRunner testRunner;
+    public String broker;
+
+    public abstract void internalPublish(PublishMessage publishMessage);
+
+    @Test
+    public void testLastWillConfig() throws Exception {
+        testRunner.setProperty(ConsumeMQTT.PROP_LAST_WILL_MESSAGE, "lastWill 
message");
+        testRunner.assertNotValid();
+        testRunner.setProperty(ConsumeMQTT.PROP_LAST_WILL_TOPIC, "lastWill 
topic");
+        testRunner.assertNotValid();
+        testRunner.setProperty(ConsumeMQTT.PROP_LAST_WILL_QOS, "1");
+        testRunner.assertNotValid();
+        testRunner.setProperty(ConsumeMQTT.PROP_LAST_WILL_RETAIN, "false");
+        testRunner.assertValid();
+    }
+
+
+    @Test
+    public void testQoS2() throws Exception {
+        testRunner.setProperty(ConsumeMQTT.PROP_QOS, "2");
+
+        testRunner.assertValid();
+
+        ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
+        consumeMQTT.onScheduled(testRunner.getProcessContext());
+        reconnect(consumeMQTT);
+
+        Thread.sleep(PUBLISH_WAIT_MS);
+
+        assertTrue(isConnected(consumeMQTT));
+
+        PublishMessage testMessage = new PublishMessage();
+        testMessage.setPayload(ByteBuffer.wrap("testMessage".getBytes()));
+        testMessage.setTopicName("testTopic");
+        testMessage.setDupFlag(false);
+        testMessage.setQos(AbstractMessage.QOSType.EXACTLY_ONCE);
+        testMessage.setRetainFlag(false);
+
+        internalPublish(testMessage);
+
+        Thread.sleep(PUBLISH_WAIT_MS);
+
+        testRunner.run(1, false, false);
+
+        testRunner.assertTransferCount(ConsumeMQTT.REL_MESSAGE, 1);
+
+        List<MockFlowFile> flowFiles = 
testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
+        MockFlowFile flowFile = flowFiles.get(0);
+
+        flowFile.assertContentEquals("testMessage");
+        flowFile.assertAttributeEquals(BROKER_ATTRIBUTE_KEY, broker);
+        flowFile.assertAttributeEquals(TOPIC_ATTRIBUTE_KEY, "testTopic");
+        flowFile.assertAttributeEquals(QOS_ATTRIBUTE_KEY, "2");
+        flowFile.assertAttributeEquals(IS_DUPLICATE_ATTRIBUTE_KEY, "false");
+        flowFile.assertAttributeEquals(IS_RETAINED_ATTRIBUTE_KEY, "false");
+    }
+
+    @Test
+    public void testQoS2NotCleanSession() throws Exception {
+        testRunner.setProperty(ConsumeMQTT.PROP_QOS, "2");
+        testRunner.setProperty(ConsumeMQTT.PROP_CLEAN_SESSION, 
ALLOWABLE_VALUE_CLEAN_SESSION_FALSE);
+
+        testRunner.assertValid();
+
+        ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
+        consumeMQTT.onScheduled(testRunner.getProcessContext());
+        reconnect(consumeMQTT);
+
+        Thread.sleep(PUBLISH_WAIT_MS);
+
+        assertTrue(isConnected(consumeMQTT));
+
+        consumeMQTT.onUnscheduled(testRunner.getProcessContext());
+
+        PublishMessage testMessage = new PublishMessage();
+        testMessage.setPayload(ByteBuffer.wrap("testMessage".getBytes()));
+        testMessage.setTopicName("testTopic");
+        testMessage.setDupFlag(false);
+        testMessage.setQos(AbstractMessage.QOSType.EXACTLY_ONCE);
+        testMessage.setRetainFlag(false);
+
+        internalPublish(testMessage);
+
+        consumeMQTT.onScheduled(testRunner.getProcessContext());
+        reconnect(consumeMQTT);
+
+        Thread.sleep(PUBLISH_WAIT_MS);
+
+        assertTrue(isConnected(consumeMQTT));
+
+        testRunner.run(1, false, false);
+
+        testRunner.assertTransferCount(ConsumeMQTT.REL_MESSAGE, 1);
+
+        List<MockFlowFile> flowFiles = 
testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
+        MockFlowFile flowFile = flowFiles.get(0);
+
+        flowFile.assertContentEquals("testMessage");
+        flowFile.assertAttributeEquals(BROKER_ATTRIBUTE_KEY, broker);
+        flowFile.assertAttributeEquals(TOPIC_ATTRIBUTE_KEY, "testTopic");
+        flowFile.assertAttributeEquals(QOS_ATTRIBUTE_KEY, "2");
+        flowFile.assertAttributeEquals(IS_DUPLICATE_ATTRIBUTE_KEY, "false");
+        flowFile.assertAttributeEquals(IS_RETAINED_ATTRIBUTE_KEY, "false");
+    }
+
+
+    @Test
+    public void testQoS1() throws Exception {
+        testRunner.setProperty(ConsumeMQTT.PROP_QOS, "1");
+
+        testRunner.assertValid();
+
+        ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
+        consumeMQTT.onScheduled(testRunner.getProcessContext());
+        reconnect(consumeMQTT);
+
+        Thread.sleep(PUBLISH_WAIT_MS);
+
+        assertTrue(isConnected(consumeMQTT));
+
+        PublishMessage testMessage = new PublishMessage();
+        testMessage.setPayload(ByteBuffer.wrap("testMessage".getBytes()));
+        testMessage.setTopicName("testTopic");
+        testMessage.setDupFlag(false);
+        testMessage.setQos(AbstractMessage.QOSType.LEAST_ONE);
+        testMessage.setRetainFlag(false);
+
+        internalPublish(testMessage);
+
+        Thread.sleep(PUBLISH_WAIT_MS);
+
+        testRunner.run(1, false, false);
+
+        List<MockFlowFile> flowFiles = 
testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
+        assertTrue(flowFiles.size() > 0);
+        MockFlowFile flowFile = flowFiles.get(0);
+
+        flowFile.assertContentEquals("testMessage");
+        flowFile.assertAttributeEquals(BROKER_ATTRIBUTE_KEY, broker);
+        flowFile.assertAttributeEquals(TOPIC_ATTRIBUTE_KEY, "testTopic");
+        flowFile.assertAttributeEquals(QOS_ATTRIBUTE_KEY, "1");
+        flowFile.assertAttributeEquals(IS_DUPLICATE_ATTRIBUTE_KEY, "false");
+        flowFile.assertAttributeEquals(IS_RETAINED_ATTRIBUTE_KEY, "false");
+    }
+
+    @Test
+    public void testQoS1NotCleanSession() throws Exception {
+        testRunner.setProperty(ConsumeMQTT.PROP_QOS, "1");
+        testRunner.setProperty(ConsumeMQTT.PROP_CLEAN_SESSION, 
ALLOWABLE_VALUE_CLEAN_SESSION_FALSE);
+
+        testRunner.assertValid();
+
+        ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
+        consumeMQTT.onScheduled(testRunner.getProcessContext());
+        reconnect(consumeMQTT);
+
+        Thread.sleep(PUBLISH_WAIT_MS);
+
+        assertTrue(isConnected(consumeMQTT));
+
+        consumeMQTT.onUnscheduled(testRunner.getProcessContext());
+
+        PublishMessage testMessage = new PublishMessage();
+        testMessage.setPayload(ByteBuffer.wrap("testMessage".getBytes()));
+        testMessage.setTopicName("testTopic");
+        testMessage.setDupFlag(false);
+        testMessage.setQos(AbstractMessage.QOSType.LEAST_ONE);
+        testMessage.setRetainFlag(false);
+
+        internalPublish(testMessage);
+
+        consumeMQTT.onScheduled(testRunner.getProcessContext());
+        reconnect(consumeMQTT);
+
+        Thread.sleep(PUBLISH_WAIT_MS);
+
+        assertTrue(isConnected(consumeMQTT));
+
+        testRunner.run(1, false, false);
+
+        testRunner.assertTransferCount(ConsumeMQTT.REL_MESSAGE, 1);
+
+        List<MockFlowFile> flowFiles = 
testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
+        assertTrue(flowFiles.size() > 0);
+        MockFlowFile flowFile = flowFiles.get(0);
+
+        flowFile.assertContentEquals("testMessage");
+        flowFile.assertAttributeEquals(BROKER_ATTRIBUTE_KEY, broker);
+        flowFile.assertAttributeEquals(TOPIC_ATTRIBUTE_KEY, "testTopic");
+        flowFile.assertAttributeEquals(QOS_ATTRIBUTE_KEY, "1");
+        flowFile.assertAttributeEquals(IS_DUPLICATE_ATTRIBUTE_KEY, "false");
+        flowFile.assertAttributeEquals(IS_RETAINED_ATTRIBUTE_KEY, "false");
+    }
+
+    @Test
+    public void testQoS0() throws Exception {
+        testRunner.setProperty(ConsumeMQTT.PROP_QOS, "0");
+
+        testRunner.assertValid();
+
+        ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
+        consumeMQTT.onScheduled(testRunner.getProcessContext());
+        reconnect(consumeMQTT);
+
+        Thread.sleep(PUBLISH_WAIT_MS);
+
+        assertTrue(isConnected(consumeMQTT));
+
+        PublishMessage testMessage = new PublishMessage();
+        testMessage.setPayload(ByteBuffer.wrap("testMessage".getBytes()));
+        testMessage.setTopicName("testTopic");
+        testMessage.setDupFlag(false);
+        testMessage.setQos(AbstractMessage.QOSType.MOST_ONE);
+        testMessage.setRetainFlag(false);
+
+        internalPublish(testMessage);
+
+        Thread.sleep(PUBLISH_WAIT_MS);
+
+        testRunner.run(1, false, false);
+
+        List<MockFlowFile> flowFiles = 
testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
+        assertTrue(flowFiles.size() < 2);
+
+        if(flowFiles.size() == 1) {
+            MockFlowFile flowFile = flowFiles.get(0);
+
+            flowFile.assertContentEquals("testMessage");
+            flowFile.assertAttributeEquals(BROKER_ATTRIBUTE_KEY, broker);
+            flowFile.assertAttributeEquals(TOPIC_ATTRIBUTE_KEY, "testTopic");
+            flowFile.assertAttributeEquals(QOS_ATTRIBUTE_KEY, "0");
+            flowFile.assertAttributeEquals(IS_DUPLICATE_ATTRIBUTE_KEY, 
"false");
+            flowFile.assertAttributeEquals(IS_RETAINED_ATTRIBUTE_KEY, "false");
+        }
+    }
+
+    @Test
+    public void testOnStoppedFinish() throws Exception {
+        testRunner.setProperty(ConsumeMQTT.PROP_QOS, "2");
+
+        testRunner.assertValid();
+
+        MqttMessage innerMessage = new MqttMessage();
+        
innerMessage.setPayload(ByteBuffer.wrap("testMessage".getBytes()).array());
+        innerMessage.setQos(2);
+        MQTTQueueMessage testMessage = new MQTTQueueMessage("testTopic", 
innerMessage);
+
+        ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
+        consumeMQTT.onScheduled(testRunner.getProcessContext());
+        reconnect(consumeMQTT);
+
+        Thread.sleep(PUBLISH_WAIT_MS);
+
+        assertTrue(isConnected(consumeMQTT));
+
+        consumeMQTT.processSessionFactory = 
testRunner.getProcessSessionFactory();
+
+        Field f = ConsumeMQTT.class.getDeclaredField("mqttQueue");
+        f.setAccessible(true);
+        LinkedBlockingQueue<MQTTQueueMessage> queue = 
(LinkedBlockingQueue<MQTTQueueMessage>) f.get(consumeMQTT);
+        queue.add(testMessage);
+
+        consumeMQTT.onUnscheduled(testRunner.getProcessContext());
+        consumeMQTT.onStopped(testRunner.getProcessContext());
+
+        testRunner.assertTransferCount(ConsumeMQTT.REL_MESSAGE, 1);
+
+        List<MockFlowFile> flowFiles = 
testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
+        MockFlowFile flowFile = flowFiles.get(0);
+
+        flowFile.assertContentEquals("testMessage");
+        flowFile.assertAttributeEquals(BROKER_ATTRIBUTE_KEY, broker);
+        flowFile.assertAttributeEquals(TOPIC_ATTRIBUTE_KEY, "testTopic");
+        flowFile.assertAttributeEquals(QOS_ATTRIBUTE_KEY, "2");
+        flowFile.assertAttributeEquals(IS_DUPLICATE_ATTRIBUTE_KEY, "false");
+        flowFile.assertAttributeEquals(IS_RETAINED_ATTRIBUTE_KEY, "false");
+    }
+
+    @Test
+    public void testResizeBuffer() throws Exception {
+        testRunner.setProperty(ConsumeMQTT.PROP_QOS, "2");
+        testRunner.setProperty(ConsumeMQTT.PROP_MAX_QUEUE_SIZE, "2");
+
+        testRunner.assertValid();
+
+        PublishMessage testMessage = new PublishMessage();
+        testMessage.setPayload(ByteBuffer.wrap("testMessage".getBytes()));
+        testMessage.setTopicName("testTopic");
+        testMessage.setDupFlag(false);
+        testMessage.setQos(AbstractMessage.QOSType.EXACTLY_ONCE);
+        testMessage.setRetainFlag(false);
+
+        ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
+        consumeMQTT.onScheduled(testRunner.getProcessContext());
+        reconnect(consumeMQTT);
+
+        Thread.sleep(PUBLISH_WAIT_MS);
+
+        assertTrue(isConnected(consumeMQTT));
+
+        internalPublish(testMessage);
+        internalPublish(testMessage);
+
+        Thread.sleep(PUBLISH_WAIT_MS);
+        consumeMQTT.onUnscheduled(testRunner.getProcessContext());
+
+        testRunner.setProperty(ConsumeMQTT.PROP_MAX_QUEUE_SIZE, "1");
+        testRunner.assertNotValid();
+
+        testRunner.setProperty(ConsumeMQTT.PROP_MAX_QUEUE_SIZE, "3");
+        testRunner.assertValid();
+
+        testRunner.run(1);
+
+        testRunner.assertTransferCount(ConsumeMQTT.REL_MESSAGE, 2);
+
+        List<MockFlowFile> flowFiles = 
testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
+        MockFlowFile flowFile = flowFiles.get(0);
+
+        flowFile.assertContentEquals("testMessage");
+        flowFile.assertAttributeEquals(BROKER_ATTRIBUTE_KEY, broker);
+        flowFile.assertAttributeEquals(TOPIC_ATTRIBUTE_KEY, "testTopic");
+        flowFile.assertAttributeEquals(QOS_ATTRIBUTE_KEY, "2");
+        flowFile.assertAttributeEquals(IS_DUPLICATE_ATTRIBUTE_KEY, "false");
+        flowFile.assertAttributeEquals(IS_RETAINED_ATTRIBUTE_KEY, "false");
+    }
+
+    private static boolean isConnected(AbstractMQTTProcessor processor) throws 
NoSuchFieldException, IllegalAccessException {
+        Field f = AbstractMQTTProcessor.class.getDeclaredField("mqttClient");
+        f.setAccessible(true);
+        IMqttClient mqttClient = (IMqttClient) f.get(processor);
+        return mqttClient.isConnected();
+    }
+
+
+    public static void reconnect(ConsumeMQTT processor) throws 
NoSuchFieldException, IllegalAccessException, NoSuchMethodException, 
InvocationTargetException {
+        Method method = ConsumeMQTT.class.getDeclaredMethod("reconnect");
+        method.setAccessible(true);
+        method.invoke(processor);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/92d648ab/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/TestPublishMqttCommon.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/TestPublishMqttCommon.java
 
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/TestPublishMqttCommon.java
new file mode 100644
index 0000000..75df6f3
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/TestPublishMqttCommon.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.mqtt.common;
+
+import io.moquette.server.Server;
+import org.apache.nifi.processors.mqtt.PublishMQTT;
+import org.apache.nifi.util.TestRunner;
+import org.junit.Test;
+
+import static org.apache.nifi.processors.mqtt.PublishMQTT.REL_SUCCESS;
+import static 
org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_FALSE;
+
+public abstract class TestPublishMqttCommon {
+
+    public Server MQTT_server;
+    public TestRunner testRunner;
+    public String topic;
+
+    public abstract void verifyPublishedMessage(byte[] payload, int qos, 
boolean retain);
+
+    @Test
+    public void testQoS0() {
+        testRunner.setProperty(PublishMQTT.PROP_QOS, "0");
+
+        testRunner.assertValid();
+
+        String testMessage = "testMessage";
+        testRunner.enqueue(testMessage.getBytes());
+
+        testRunner.run();
+
+        testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
+
+        testRunner.assertTransferCount(REL_SUCCESS, 1);
+
+        verifyPublishedMessage(testMessage.getBytes(), 0, false);
+    }
+
+    @Test
+    public void testQoS1() {
+        testRunner.setProperty(PublishMQTT.PROP_QOS, "1");
+
+        testRunner.assertValid();
+
+        String testMessage = "testMessage";
+        testRunner.enqueue(testMessage.getBytes());
+
+        testRunner.run();
+
+        testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
+
+        testRunner.assertTransferCount(REL_SUCCESS, 1);
+        verifyPublishedMessage(testMessage.getBytes(), 1, false);
+    }
+
+    @Test
+    public void testQoS2NotCleanSession() {
+        // Publisher executes synchronously so the only time whether its Clean 
or Not matters is when the processor stops in the middle of the publishing
+        testRunner.setProperty(PublishMQTT.PROP_QOS, "2");
+        testRunner.setProperty(PublishMQTT.PROP_CLEAN_SESSION, 
ALLOWABLE_VALUE_CLEAN_SESSION_FALSE);
+
+        testRunner.assertValid();
+
+        String testMessage = "testMessage";
+        testRunner.enqueue(testMessage.getBytes());
+
+        testRunner.run();
+
+        testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
+
+        testRunner.assertTransferCount(REL_SUCCESS, 1);
+        verifyPublishedMessage(testMessage.getBytes(), 2, false);
+    }
+
+    @Test
+    public void testQoS2() {
+        testRunner.setProperty(PublishMQTT.PROP_QOS, "2");
+
+        testRunner.assertValid();
+
+        String testMessage = "testMessage";
+        testRunner.enqueue(testMessage.getBytes());
+
+        testRunner.run();
+
+        testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
+
+        testRunner.assertTransferCount(REL_SUCCESS, 1);
+        verifyPublishedMessage(testMessage.getBytes(), 2, false);
+    }
+
+    @Test
+    public void testRetainQoS2() {
+        testRunner.setProperty(PublishMQTT.PROP_QOS, "2");
+        testRunner.setProperty(PublishMQTT.PROP_RETAIN, "true");
+
+        testRunner.assertValid();
+
+        String testMessage = "testMessage";
+        testRunner.enqueue(testMessage.getBytes());
+
+        testRunner.run();
+
+        testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
+
+        testRunner.assertTransferCount(REL_SUCCESS, 1);
+        verifyPublishedMessage(testMessage.getBytes(), 2, true);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/92d648ab/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestConsumeMQTT.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestConsumeMQTT.java
 
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestConsumeMQTT.java
new file mode 100644
index 0000000..759bf96
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestConsumeMQTT.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.mqtt.integration;
+
+import io.moquette.BrokerConstants;
+import io.moquette.proto.messages.AbstractMessage;
+import io.moquette.proto.messages.PublishMessage;
+import io.moquette.server.Server;
+import io.moquette.server.config.IConfig;
+import io.moquette.server.config.MemoryConfig;
+import org.apache.nifi.processors.mqtt.ConsumeMQTT;
+import org.apache.nifi.processors.mqtt.common.TestConsumeMqttCommon;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunners;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Properties;
+
+import static io.moquette.BrokerConstants.PERSISTENT_STORE_PROPERTY_NAME;
+import static org.apache.nifi.processors.mqtt.ConsumeMQTT.BROKER_ATTRIBUTE_KEY;
+import static 
org.apache.nifi.processors.mqtt.ConsumeMQTT.IS_DUPLICATE_ATTRIBUTE_KEY;
+import static 
org.apache.nifi.processors.mqtt.ConsumeMQTT.IS_RETAINED_ATTRIBUTE_KEY;
+import static org.apache.nifi.processors.mqtt.ConsumeMQTT.QOS_ATTRIBUTE_KEY;
+import static org.apache.nifi.processors.mqtt.ConsumeMQTT.TOPIC_ATTRIBUTE_KEY;
+
+
+public class TestConsumeMQTT extends TestConsumeMqttCommon {
+
+
+    private void startServer() throws IOException {
+        MQTT_server = new Server();
+        final Properties configProps = new Properties();
+        configProps.put(BrokerConstants.WEB_SOCKET_PORT_PROPERTY_NAME, "1884");
+        
configProps.setProperty(PERSISTENT_STORE_PROPERTY_NAME,"./target/moquette_store.mapdb");
+        IConfig server_config = new MemoryConfig(configProps);
+        MQTT_server.startServer(server_config);
+    }
+
+    @Before
+    public void init() throws IOException {
+        startServer();
+
+        broker = "tcp://localhost:1883";
+        testRunner = TestRunners.newTestRunner(ConsumeMQTT.class);
+        testRunner.setProperty(ConsumeMQTT.PROP_BROKER_URI, broker);
+        testRunner.setProperty(ConsumeMQTT.PROP_CLIENTID, "TestClient");
+        testRunner.setProperty(ConsumeMQTT.PROP_TOPIC_FILTER, "testTopic");
+        testRunner.setProperty(ConsumeMQTT.PROP_MAX_QUEUE_SIZE, "100");
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        if (MQTT_server != null) {
+            MQTT_server.stopServer();
+        }
+        final File folder =  new File("./target");
+        final File[] files = folder.listFiles( new FilenameFilter() {
+            @Override
+            public boolean accept( final File dir,
+                                   final String name ) {
+                return name.matches( "moquette_store.mapdb.*" );
+            }
+        } );
+        for ( final File file : files ) {
+            if ( !file.delete() ) {
+                System.err.println( "Can't remove " + file.getAbsolutePath() );
+            }
+        }
+    }
+
+    @Test
+    public void testRetainedQoS2() throws Exception {
+        testRunner.setProperty(ConsumeMQTT.PROP_QOS, "2");
+
+        testRunner.assertValid();
+
+        PublishMessage testMessage = new PublishMessage();
+        testMessage.setPayload(ByteBuffer.wrap("testMessage".getBytes()));
+        testMessage.setTopicName("testTopic");
+        testMessage.setDupFlag(false);
+        testMessage.setQos(AbstractMessage.QOSType.EXACTLY_ONCE);
+        testMessage.setRetainFlag(true);
+
+        internalPublish(testMessage);
+
+        ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
+        consumeMQTT.onScheduled(testRunner.getProcessContext());
+        reconnect(consumeMQTT);
+
+        Thread.sleep(PUBLISH_WAIT_MS);
+
+        testRunner.run(1, false, false);
+
+        testRunner.assertTransferCount(ConsumeMQTT.REL_MESSAGE, 1);
+
+        List<MockFlowFile> flowFiles = 
testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
+        MockFlowFile flowFile = flowFiles.get(0);
+
+        flowFile.assertContentEquals("testMessage");
+        flowFile.assertAttributeEquals(BROKER_ATTRIBUTE_KEY, broker);
+        flowFile.assertAttributeEquals(TOPIC_ATTRIBUTE_KEY, "testTopic");
+        flowFile.assertAttributeEquals(QOS_ATTRIBUTE_KEY, "2");
+        flowFile.assertAttributeEquals(IS_DUPLICATE_ATTRIBUTE_KEY, "false");
+        flowFile.assertAttributeEquals(IS_RETAINED_ATTRIBUTE_KEY, "true");
+    }
+
+    @Override
+    public void internalPublish(PublishMessage publishMessage) {
+        MQTT_server.internalPublish(publishMessage);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/92d648ab/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestConsumeMqttSSL.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestConsumeMqttSSL.java
 
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestConsumeMqttSSL.java
new file mode 100644
index 0000000..693c2a9
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestConsumeMqttSSL.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.mqtt.integration;
+
+import io.moquette.BrokerConstants;
+import io.moquette.proto.messages.AbstractMessage;
+import io.moquette.proto.messages.PublishMessage;
+import io.moquette.server.Server;
+import io.moquette.server.config.IConfig;
+import io.moquette.server.config.MemoryConfig;
+import org.apache.nifi.processors.mqtt.ConsumeMQTT;
+import org.apache.nifi.processors.mqtt.common.TestConsumeMqttCommon;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.ssl.StandardSSLContextService;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunners;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static io.moquette.BrokerConstants.PERSISTENT_STORE_PROPERTY_NAME;
+import static org.apache.nifi.processors.mqtt.ConsumeMQTT.BROKER_ATTRIBUTE_KEY;
+import static 
org.apache.nifi.processors.mqtt.ConsumeMQTT.IS_DUPLICATE_ATTRIBUTE_KEY;
+import static 
org.apache.nifi.processors.mqtt.ConsumeMQTT.IS_RETAINED_ATTRIBUTE_KEY;
+import static org.apache.nifi.processors.mqtt.ConsumeMQTT.QOS_ATTRIBUTE_KEY;
+import static org.apache.nifi.processors.mqtt.ConsumeMQTT.TOPIC_ATTRIBUTE_KEY;
+import static 
org.apache.nifi.processors.mqtt.common.MqttTestUtils.createSslProperties;
+
+
+public class TestConsumeMqttSSL extends TestConsumeMqttCommon {
+
+
+    private void startServer() throws IOException {
+        MQTT_server = new Server();
+        final Properties configProps = new Properties();
+
+        configProps.put(BrokerConstants.WEB_SOCKET_PORT_PROPERTY_NAME, "1884");
+        configProps.put(BrokerConstants.SSL_PORT_PROPERTY_NAME, "8883");
+        configProps.put(BrokerConstants.JKS_PATH_PROPERTY_NAME, 
"src/test/resources/localhost-ks.jks");
+        configProps.put(BrokerConstants.KEY_STORE_PASSWORD_PROPERTY_NAME, 
"localtest");
+        configProps.put(BrokerConstants.KEY_MANAGER_PASSWORD_PROPERTY_NAME, 
"localtest");
+        
configProps.setProperty(PERSISTENT_STORE_PROPERTY_NAME,"./target/moquette_store.mapdb");
+        IConfig server_config = new MemoryConfig(configProps);
+        MQTT_server.startServer(server_config);
+    }
+
+    @Before
+    public void init() throws IOException, InitializationException {
+        startServer();
+
+        broker = "ssl://localhost:8883";
+        testRunner = TestRunners.newTestRunner(ConsumeMQTT.class);
+        testRunner.setProperty(ConsumeMQTT.PROP_BROKER_URI, broker);
+        testRunner.setProperty(ConsumeMQTT.PROP_CLIENTID, "TestClient");
+        testRunner.setProperty(ConsumeMQTT.PROP_TOPIC_FILTER, "testTopic");
+        testRunner.setProperty(ConsumeMQTT.PROP_MAX_QUEUE_SIZE, "100");
+
+        final StandardSSLContextService sslService = new 
StandardSSLContextService();
+        Map<String, String> sslProperties = createSslProperties();
+        testRunner.addControllerService("ssl-context", sslService, 
sslProperties);
+        testRunner.enableControllerService(sslService);
+        testRunner.setProperty(ConsumeMQTT.PROP_SSL_CONTEXT_SERVICE, 
"ssl-context");
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        if (MQTT_server != null) {
+            MQTT_server.stopServer();
+        }
+        final File folder =  new File("./target");
+        final File[] files = folder.listFiles( new FilenameFilter() {
+            @Override
+            public boolean accept( final File dir,
+                                   final String name ) {
+                return name.matches( "moquette_store.mapdb.*" );
+            }
+        } );
+        for ( final File file : files ) {
+            if ( !file.delete() ) {
+                System.err.println( "Can't remove " + file.getAbsolutePath() );
+            }
+        }
+    }
+
+    @Test
+    public void testRetainedQoS2() throws Exception {
+        testRunner.setProperty(ConsumeMQTT.PROP_QOS, "2");
+
+        testRunner.assertValid();
+
+        PublishMessage testMessage = new PublishMessage();
+        testMessage.setPayload(ByteBuffer.wrap("testMessage".getBytes()));
+        testMessage.setTopicName("testTopic");
+        testMessage.setDupFlag(false);
+        testMessage.setQos(AbstractMessage.QOSType.EXACTLY_ONCE);
+        testMessage.setRetainFlag(true);
+
+        internalPublish(testMessage);
+
+        ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
+        consumeMQTT.onScheduled(testRunner.getProcessContext());
+        reconnect(consumeMQTT);
+
+        Thread.sleep(PUBLISH_WAIT_MS);
+
+        testRunner.run(1, false, false);
+
+        testRunner.assertTransferCount(ConsumeMQTT.REL_MESSAGE, 1);
+
+        List<MockFlowFile> flowFiles = 
testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
+        MockFlowFile flowFile = flowFiles.get(0);
+
+        flowFile.assertContentEquals("testMessage");
+        flowFile.assertAttributeEquals(BROKER_ATTRIBUTE_KEY, broker);
+        flowFile.assertAttributeEquals(TOPIC_ATTRIBUTE_KEY, "testTopic");
+        flowFile.assertAttributeEquals(QOS_ATTRIBUTE_KEY, "2");
+        flowFile.assertAttributeEquals(IS_DUPLICATE_ATTRIBUTE_KEY, "false");
+        flowFile.assertAttributeEquals(IS_RETAINED_ATTRIBUTE_KEY, "true");
+    }
+
+    @Override
+    public void internalPublish(PublishMessage publishMessage) {
+        MQTT_server.internalPublish(publishMessage);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/92d648ab/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestPublishAndSubscribeMqttIntegration.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestPublishAndSubscribeMqttIntegration.java
 
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestPublishAndSubscribeMqttIntegration.java
new file mode 100644
index 0000000..a97ac98
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestPublishAndSubscribeMqttIntegration.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.mqtt.integration;
+
+import io.moquette.BrokerConstants;
+import io.moquette.server.Server;
+import io.moquette.server.config.IConfig;
+import io.moquette.server.config.MemoryConfig;
+import org.apache.nifi.processors.mqtt.ConsumeMQTT;
+import org.apache.nifi.processors.mqtt.PublishMQTT;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.util.List;
+import java.util.Properties;
+
+import static io.moquette.BrokerConstants.PERSISTENT_STORE_PROPERTY_NAME;
+import static org.apache.nifi.processors.mqtt.ConsumeMQTT.BROKER_ATTRIBUTE_KEY;
+import static 
org.apache.nifi.processors.mqtt.ConsumeMQTT.IS_DUPLICATE_ATTRIBUTE_KEY;
+import static 
org.apache.nifi.processors.mqtt.ConsumeMQTT.IS_RETAINED_ATTRIBUTE_KEY;
+import static org.apache.nifi.processors.mqtt.ConsumeMQTT.QOS_ATTRIBUTE_KEY;
+import static org.apache.nifi.processors.mqtt.ConsumeMQTT.TOPIC_ATTRIBUTE_KEY;
+import static org.apache.nifi.processors.mqtt.PublishMQTT.REL_SUCCESS;
+import static 
org.apache.nifi.processors.mqtt.common.TestConsumeMqttCommon.reconnect;
+
+public class TestPublishAndSubscribeMqttIntegration {
+    private TestRunner testSubscribeRunner;
+    private TestRunner testPublishRunner;
+    private Server MQTT_server;
+
+    private static int PUBLISH_WAIT_MS = 1000;
+
+    private void startServer() throws IOException {
+        MQTT_server = new Server();
+        final Properties configProps = new Properties();
+        configProps.put(BrokerConstants.WEB_SOCKET_PORT_PROPERTY_NAME, "1884");
+        
configProps.setProperty(PERSISTENT_STORE_PROPERTY_NAME,"./target/moquette_store.mapdb");
+        IConfig server_config = new MemoryConfig(configProps);
+        MQTT_server.startServer(server_config);
+    }
+
+    @Before
+    public void init() throws IOException {
+        startServer();
+        testSubscribeRunner = TestRunners.newTestRunner(ConsumeMQTT.class);
+        testPublishRunner = TestRunners.newTestRunner(PublishMQTT.class);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        if (MQTT_server != null) {
+            MQTT_server.stopServer();
+        }
+        final File folder =  new File("./target");
+        final File[] files = folder.listFiles( new FilenameFilter() {
+            @Override
+            public boolean accept( final File dir,
+                                   final String name ) {
+                return name.matches( "moquette_store.mapdb.*" );
+            }
+        } );
+        for ( final File file : files ) {
+            if ( !file.delete() ) {
+                System.err.println( "Can't remove " + file.getAbsolutePath() );
+            }
+        }
+    }
+
+    @Test
+    public void testBasic() throws Exception {
+        subscribe();
+        publishAndVerify();
+        Thread.sleep(PUBLISH_WAIT_MS);
+        testSubscribeRunner.run();
+        subscribeVerify();
+    }
+
+    private void publishAndVerify(){
+        testPublishRunner.setProperty(PublishMQTT.PROP_BROKER_URI, 
"tcp://localhost:1883");
+        testPublishRunner.setProperty(PublishMQTT.PROP_CLIENTID, 
"TestPublishClient");
+        testPublishRunner.setProperty(PublishMQTT.PROP_QOS, "2");
+        testPublishRunner.setProperty(PublishMQTT.PROP_RETAIN, "false");
+        testPublishRunner.setProperty(PublishMQTT.PROP_TOPIC, "testTopic");
+
+        testPublishRunner.assertValid();
+
+        String testMessage = "testMessage";
+        testPublishRunner.enqueue(testMessage.getBytes());
+
+        testPublishRunner.run();
+
+        testPublishRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
+        testPublishRunner.assertTransferCount(REL_SUCCESS, 1);
+    }
+
+    private void subscribe() throws IOException, ClassNotFoundException, 
MqttException, InvocationTargetException, NoSuchMethodException, 
IllegalAccessException, NoSuchFieldException {
+        testSubscribeRunner.setProperty(ConsumeMQTT.PROP_BROKER_URI, 
"tcp://localhost:1883");
+        testSubscribeRunner.setProperty(ConsumeMQTT.PROP_CLIENTID, 
"TestSubscribeClient");
+        testSubscribeRunner.setProperty(ConsumeMQTT.PROP_TOPIC_FILTER, 
"testTopic");
+        testSubscribeRunner.setProperty(ConsumeMQTT.PROP_QOS, "2");
+        testSubscribeRunner.setProperty(ConsumeMQTT.PROP_MAX_QUEUE_SIZE, 
"100");
+
+        testSubscribeRunner.assertValid();
+
+        ConsumeMQTT consumeMQTT = (ConsumeMQTT) 
testSubscribeRunner.getProcessor();
+        consumeMQTT.onScheduled(testSubscribeRunner.getProcessContext());
+        reconnect(consumeMQTT);
+    }
+
+    private void subscribeVerify(){
+        testSubscribeRunner.assertTransferCount(ConsumeMQTT.REL_MESSAGE, 1);
+
+        List<MockFlowFile> flowFiles = 
testSubscribeRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
+        MockFlowFile flowFile = flowFiles.get(0);
+
+        flowFile.assertContentEquals("testMessage");
+        flowFile.assertAttributeEquals(BROKER_ATTRIBUTE_KEY, 
"tcp://localhost:1883");
+        flowFile.assertAttributeEquals(TOPIC_ATTRIBUTE_KEY, "testTopic");
+        flowFile.assertAttributeEquals(QOS_ATTRIBUTE_KEY, "2");
+        flowFile.assertAttributeEquals(IS_DUPLICATE_ATTRIBUTE_KEY, "false");
+        flowFile.assertAttributeEquals(IS_RETAINED_ATTRIBUTE_KEY, "false");
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/92d648ab/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestPublishMQTT.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestPublishMQTT.java
 
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestPublishMQTT.java
new file mode 100644
index 0000000..5777825
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestPublishMQTT.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.mqtt.integration;
+
+import io.moquette.BrokerConstants;
+import io.moquette.server.Server;
+import io.moquette.server.config.IConfig;
+import io.moquette.server.config.MemoryConfig;
+import org.apache.nifi.processors.mqtt.PublishMQTT;
+import org.apache.nifi.processors.mqtt.common.TestPublishMqttCommon;
+import org.apache.nifi.util.TestRunners;
+import org.junit.After;
+import org.junit.Before;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.util.Properties;
+
+import static io.moquette.BrokerConstants.PERSISTENT_STORE_PROPERTY_NAME;
+
+
+public class TestPublishMQTT extends TestPublishMqttCommon {
+
+
+    private void startServer() throws IOException {
+        MQTT_server = new Server();
+        final Properties configProps = new Properties();
+        configProps.put(BrokerConstants.WEB_SOCKET_PORT_PROPERTY_NAME, "1884");
+        
configProps.setProperty(PERSISTENT_STORE_PROPERTY_NAME,"./target/moquette_store.mapdb");
+        IConfig server_config = new MemoryConfig(configProps);
+        MQTT_server.startServer(server_config);
+    }
+
+    @Before
+    public void init() throws IOException {
+        startServer();
+        testRunner = TestRunners.newTestRunner(PublishMQTT.class);
+        testRunner.setProperty(PublishMQTT.PROP_BROKER_URI, 
"tcp://localhost:1883");
+        testRunner.setProperty(PublishMQTT.PROP_CLIENTID, "TestClient");
+        testRunner.setProperty(PublishMQTT.PROP_RETAIN, "false");
+        testRunner.setProperty(PublishMQTT.PROP_TOPIC, "testTopic");
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        if (MQTT_server != null) {
+            MQTT_server.stopServer();
+        }
+        final File folder =  new File("./target");
+        final File[] files = folder.listFiles( new FilenameFilter() {
+            @Override
+            public boolean accept( final File dir,
+                                   final String name ) {
+                return name.matches( "moquette_store.mapdb.*" );
+            }
+        } );
+        for ( final File file : files ) {
+            if ( !file.delete() ) {
+                System.err.println( "Can't remove " + file.getAbsolutePath() );
+            }
+        }
+    }
+
+    @Override
+    public void verifyPublishedMessage(byte[] payload, int qos, boolean 
retain) {
+        //Cannot verify published message without subscribing and consuming it 
which is outside the scope of this test.
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/92d648ab/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestPublishMqttSSL.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestPublishMqttSSL.java
 
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestPublishMqttSSL.java
new file mode 100644
index 0000000..6270d7a
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestPublishMqttSSL.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.mqtt.integration;
+
+import io.moquette.BrokerConstants;
+import io.moquette.server.Server;
+import io.moquette.server.config.IConfig;
+import io.moquette.server.config.MemoryConfig;
+import org.apache.nifi.processors.mqtt.PublishMQTT;
+import org.apache.nifi.processors.mqtt.common.TestPublishMqttCommon;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.ssl.StandardSSLContextService;
+import org.apache.nifi.util.TestRunners;
+import org.junit.After;
+import org.junit.Before;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Properties;
+
+import static io.moquette.BrokerConstants.PERSISTENT_STORE_PROPERTY_NAME;
+import static 
org.apache.nifi.processors.mqtt.common.MqttTestUtils.createSslProperties;
+
+
+public class TestPublishMqttSSL extends TestPublishMqttCommon {
+
+    private void startServer() throws IOException {
+        MQTT_server = new Server();
+        final Properties configProps = new Properties();
+
+        configProps.put(BrokerConstants.WEB_SOCKET_PORT_PROPERTY_NAME, "1884");
+        configProps.put(BrokerConstants.SSL_PORT_PROPERTY_NAME, "8883");
+        configProps.put(BrokerConstants.JKS_PATH_PROPERTY_NAME, 
"src/test/resources/localhost-ks.jks");
+        configProps.put(BrokerConstants.KEY_STORE_PASSWORD_PROPERTY_NAME, 
"localtest");
+        configProps.put(BrokerConstants.KEY_MANAGER_PASSWORD_PROPERTY_NAME, 
"localtest");
+        
configProps.setProperty(PERSISTENT_STORE_PROPERTY_NAME,"./target/moquette_store.mapdb");
+        IConfig server_config = new MemoryConfig(configProps);
+        MQTT_server.startServer(server_config);
+    }
+
+    @Before
+    public void init() throws IOException, InitializationException {
+        startServer();
+        testRunner = TestRunners.newTestRunner(PublishMQTT.class);
+        testRunner.setProperty(PublishMQTT.PROP_BROKER_URI, 
"ssl://localhost:8883");
+        testRunner.setProperty(PublishMQTT.PROP_CLIENTID, "TestClient");
+        testRunner.setProperty(PublishMQTT.PROP_RETAIN, "true");
+        testRunner.setProperty(PublishMQTT.PROP_TOPIC, "testTopic");
+
+        final StandardSSLContextService sslService = new 
StandardSSLContextService();
+        Map<String, String> sslProperties = createSslProperties();
+        testRunner.addControllerService("ssl-context", sslService, 
sslProperties);
+        testRunner.enableControllerService(sslService);
+        testRunner.setProperty(PublishMQTT.PROP_SSL_CONTEXT_SERVICE, 
"ssl-context");
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        if (MQTT_server != null) {
+            MQTT_server.stopServer();
+        }
+        final File folder = new File("./target");
+        final File[] files = folder.listFiles(new FilenameFilter() {
+            @Override
+            public boolean accept(final File dir,
+                                  final String name) {
+                return name.matches("moquette_store.mapdb.*");
+            }
+        });
+        for (final File file : files) {
+            if (!file.delete()) {
+                System.err.println("Can't remove " + file.getAbsolutePath());
+            }
+        }
+    }
+
+    @Override
+    public void verifyPublishedMessage(byte[] payload, int qos, boolean 
retain) {
+        //Cannot verify published message without subscribing and consuming it 
which is outside the scope of this test.
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/92d648ab/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/resources/localhost-ks.jks
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/resources/localhost-ks.jks
 
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/resources/localhost-ks.jks
new file mode 100755
index 0000000..df36197
Binary files /dev/null and 
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/resources/localhost-ks.jks
 differ

http://git-wip-us.apache.org/repos/asf/nifi/blob/92d648ab/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/resources/localhost-ts.jks
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/resources/localhost-ts.jks
 
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/resources/localhost-ts.jks
new file mode 100755
index 0000000..7824378
Binary files /dev/null and 
b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/resources/localhost-ts.jks
 differ

http://git-wip-us.apache.org/repos/asf/nifi/blob/92d648ab/nifi-nar-bundles/nifi-mqtt-bundle/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/pom.xml 
b/nifi-nar-bundles/nifi-mqtt-bundle/pom.xml
index 456fae3..716052a 100644
--- a/nifi-nar-bundles/nifi-mqtt-bundle/pom.xml
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/pom.xml
@@ -17,7 +17,7 @@
     <parent>
         <groupId>org.apache.nifi</groupId>
         <artifactId>nifi-nar-bundles</artifactId>
-        <version>1.0.0-SNAPSHOT</version>
+        <version>0.7.0-SNAPSHOT</version>
     </parent>
     <artifactId>nifi-mqtt-bundle</artifactId>
     <packaging>pom</packaging>
@@ -30,8 +30,8 @@
             <dependency>
                 <groupId>org.apache.nifi</groupId>
                 <artifactId>nifi-mqtt-processors</artifactId>
-                <version>1.0.0-SNAPSHOT</version>
+                <version>0.7.0-SNAPSHOT</version>
             </dependency>
         </dependencies>
-    </dependencyManagement> 
+    </dependencyManagement>
 </project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/92d648ab/nifi-nar-bundles/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/pom.xml b/nifi-nar-bundles/pom.xml
index 7f3ca2c..558dc7c 100644
--- a/nifi-nar-bundles/pom.xml
+++ b/nifi-nar-bundles/pom.xml
@@ -60,7 +60,6 @@
         <module>nifi-cassandra-bundle</module>
         <module>nifi-spring-bundle</module>
         <module>nifi-hive-bundle</module>
-        <module>nifi-site-to-site-reporting-bundle</module>
            <module>nifi-site-to-site-reporting-bundle</module>
         <module>nifi-mqtt-bundle</module>
     </modules>

http://git-wip-us.apache.org/repos/asf/nifi/blob/92d648ab/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index e24ccf6..3a0a73a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1062,7 +1062,7 @@ language governing permissions and limitations under the 
License. -->
             <dependency>
                 <groupId>org.apache.nifi</groupId>
                 <artifactId>nifi-mqtt-nar</artifactId>
-                <version>1.0.0-SNAPSHOT</version>
+                <version>0.7.0-SNAPSHOT</version>
                 <type>nar</type>
             </dependency>
             <dependency>

Reply via email to