http://git-wip-us.apache.org/repos/asf/nifi/blob/92d648ab/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestPublishMQTT.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestPublishMQTT.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestPublishMQTT.java new file mode 100644 index 0000000..cdbc67f --- /dev/null +++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestPublishMQTT.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.mqtt; + +import org.apache.nifi.processors.mqtt.common.MQTTQueueMessage; +import org.apache.nifi.processors.mqtt.common.MqttTestClient; +import org.apache.nifi.processors.mqtt.common.TestPublishMqttCommon; +import org.apache.nifi.util.TestRunners; +import org.eclipse.paho.client.mqttv3.IMqttClient; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; +import org.junit.After; +import org.junit.Before; + +import java.io.File; +import java.io.FilenameFilter; +import java.io.IOException; +import java.util.Arrays; + +import static org.junit.Assert.assertEquals; + + +public class TestPublishMQTT extends TestPublishMqttCommon { + + @Override + public void verifyPublishedMessage(byte[] payload, int qos, boolean retain) { + MQTTQueueMessage mqttQueueMessage = mqttTestClient.publishedMessage; + assertEquals(Arrays.toString(payload), Arrays.toString(mqttQueueMessage.getPayload())); + assertEquals(qos, mqttQueueMessage.getQos()); + assertEquals(retain, mqttQueueMessage.isRetained()); + assertEquals(topic, mqttQueueMessage.getTopic()); + } + + + public MqttTestClient mqttTestClient; + + public class UnitTestablePublishMqtt extends PublishMQTT { + + public UnitTestablePublishMqtt(){ + super(); + } + + @Override + public IMqttClient getMqttClient(String broker, String clientID, MemoryPersistence persistence) throws MqttException { + mqttTestClient = new MqttTestClient(broker, clientID, MqttTestClient.ConnectType.Publisher); + return mqttTestClient; + } + } + + @Before + public void init() throws IOException { + UnitTestablePublishMqtt proc = new UnitTestablePublishMqtt(); + testRunner = TestRunners.newTestRunner(proc); + testRunner.setProperty(PublishMQTT.PROP_BROKER_URI, "tcp://localhost:1883"); + testRunner.setProperty(PublishMQTT.PROP_CLIENTID, "TestClient"); + testRunner.setProperty(PublishMQTT.PROP_RETAIN, "false"); + topic = "testTopic"; + testRunner.setProperty(PublishMQTT.PROP_TOPIC, topic); + } + + @After + public void tearDown() throws Exception { + final File folder = new File("./target"); + final File[] files = folder.listFiles( new FilenameFilter() { + @Override + public boolean accept( final File dir, + final String name ) { + return name.matches( "moquette_store.mapdb.*" ); + } + } ); + for ( final File file : files ) { + if ( !file.delete() ) { + System.err.println( "Can't remove " + file.getAbsolutePath() ); + } + } + } +}
http://git-wip-us.apache.org/repos/asf/nifi/blob/92d648ab/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestPutMQTT.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestPutMQTT.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestPutMQTT.java deleted file mode 100644 index 64a0548..0000000 --- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestPutMQTT.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.nifi.processors.mqtt; - -import org.apache.nifi.processors.mqtt.PutMQTT; -import org.apache.nifi.util.TestRunner; -import org.apache.nifi.util.TestRunners; -import org.junit.Before; -import org.junit.Test; - - -public class TestPutMQTT { - - private TestRunner testRunner; - - @Before - public void init() { - testRunner = TestRunners.newTestRunner(PutMQTT.class); - } - - @Test - public void testProcessor() { - - } - -} http://git-wip-us.apache.org/repos/asf/nifi/blob/92d648ab/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/MqttTestClient.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/MqttTestClient.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/MqttTestClient.java new file mode 100644 index 0000000..81e2b18 --- /dev/null +++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/MqttTestClient.java @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.mqtt.common; + +import org.eclipse.paho.client.mqttv3.IMqttClient; +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; +import org.eclipse.paho.client.mqttv3.IMqttToken; +import org.eclipse.paho.client.mqttv3.MqttCallback; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.eclipse.paho.client.mqttv3.MqttPersistenceException; +import org.eclipse.paho.client.mqttv3.MqttSecurityException; +import org.eclipse.paho.client.mqttv3.MqttTopic; + +import java.util.concurrent.atomic.AtomicBoolean; + +public class MqttTestClient implements IMqttClient { + + public String serverURI; + public String clientId; + + public AtomicBoolean connected = new AtomicBoolean(false); + + public MqttCallback mqttCallback; + public ConnectType type; + public enum ConnectType {Publisher, Subscriber} + + public MQTTQueueMessage publishedMessage; + + public String subscribedTopic; + public int subscribedQos; + + + public MqttTestClient(String serverURI, String clientId, ConnectType type) throws MqttException { + this.serverURI = serverURI; + this.clientId = clientId; + this.type = type; + } + + @Override + public void connect() throws MqttSecurityException, MqttException { + connected.set(true); + } + + @Override + public void connect(MqttConnectOptions options) throws MqttSecurityException, MqttException { + connected.set(true); + } + + @Override + public IMqttToken connectWithResult(MqttConnectOptions options) throws MqttSecurityException, MqttException { + return null; + } + + @Override + public void disconnect() throws MqttException { + connected.set(false); + } + + @Override + public void disconnect(long quiesceTimeout) throws MqttException { + connected.set(false); + } + + @Override + public void disconnectForcibly() throws MqttException { + connected.set(false); + } + + @Override + public void disconnectForcibly(long disconnectTimeout) throws MqttException { + connected.set(false); + } + + @Override + public void disconnectForcibly(long quiesceTimeout, long disconnectTimeout) throws MqttException { + connected.set(false); + } + + @Override + public void subscribe(String topicFilter) throws MqttException, MqttSecurityException { + subscribedTopic = topicFilter; + subscribedQos = -1; + } + + @Override + public void subscribe(String[] topicFilters) throws MqttException { + throw new UnsupportedOperationException("Multiple topic filters is not supported"); + } + + @Override + public void subscribe(String topicFilter, int qos) throws MqttException { + subscribedTopic = topicFilter; + subscribedQos = qos; + } + + @Override + public void subscribe(String[] topicFilters, int[] qos) throws MqttException { + throw new UnsupportedOperationException("Multiple topic filters is not supported"); + } + + @Override + public void unsubscribe(String topicFilter) throws MqttException { + subscribedTopic = ""; + subscribedQos = -2; + } + + @Override + public void unsubscribe(String[] topicFilters) throws MqttException { + throw new UnsupportedOperationException("Multiple topic filters is not supported"); + } + + @Override + public void publish(String topic, byte[] payload, int qos, boolean retained) throws MqttException, MqttPersistenceException { + MqttMessage message = new MqttMessage(payload); + message.setQos(qos); + message.setRetained(retained); + switch (type) { + case Publisher: + publishedMessage = new MQTTQueueMessage(topic, message); + break; + case Subscriber: + try { + mqttCallback.messageArrived(topic, message); + } catch (Exception e) { + throw new MqttException(e); + } + break; + } + } + + @Override + public void publish(String topic, MqttMessage message) throws MqttException, MqttPersistenceException { + switch (type) { + case Publisher: + publishedMessage = new MQTTQueueMessage(topic, message); + break; + case Subscriber: + try { + mqttCallback.messageArrived(topic, message); + } catch (Exception e) { + throw new MqttException(e); + } + break; + } + } + + @Override + public void setCallback(MqttCallback callback) { + this.mqttCallback = callback; + } + + @Override + public MqttTopic getTopic(String topic) { + return null; + } + + @Override + public boolean isConnected() { + return connected.get(); + } + + @Override + public String getClientId() { + return clientId; + } + + @Override + public String getServerURI() { + return serverURI; + } + + @Override + public IMqttDeliveryToken[] getPendingDeliveryTokens() { + return new IMqttDeliveryToken[0]; + } + + @Override + public void close() throws MqttException { + + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/92d648ab/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/MqttTestUtils.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/MqttTestUtils.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/MqttTestUtils.java new file mode 100644 index 0000000..5373a9f --- /dev/null +++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/MqttTestUtils.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.mqtt.common; + +import org.apache.nifi.ssl.StandardSSLContextService; + +import java.util.HashMap; +import java.util.Map; + +public class MqttTestUtils { + public static Map<String, String> createSslProperties() { + + final Map<String, String> map = new HashMap<>(); + map.put(StandardSSLContextService.KEYSTORE.getName(), "src/test/resources/localhost-ks.jks"); + map.put(StandardSSLContextService.KEYSTORE_PASSWORD.getName(), "localtest"); + map.put(StandardSSLContextService.KEYSTORE_TYPE.getName(), "JKS"); + map.put(StandardSSLContextService.TRUSTSTORE.getName(), "src/test/resources/localhost-ts.jks"); + map.put(StandardSSLContextService.TRUSTSTORE_PASSWORD.getName(), "localtest"); + map.put(StandardSSLContextService.TRUSTSTORE_TYPE.getName(), "JKS"); + return map; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/92d648ab/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/TestConsumeMqttCommon.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/TestConsumeMqttCommon.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/TestConsumeMqttCommon.java new file mode 100644 index 0000000..c3f1b3d --- /dev/null +++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/TestConsumeMqttCommon.java @@ -0,0 +1,391 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.mqtt.common; + +import io.moquette.proto.messages.AbstractMessage; +import io.moquette.proto.messages.PublishMessage; +import io.moquette.server.Server; +import org.apache.nifi.processors.mqtt.ConsumeMQTT; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.eclipse.paho.client.mqttv3.IMqttClient; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.junit.Test; + +import java.lang.reflect.Field; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.concurrent.LinkedBlockingQueue; + +import static org.apache.nifi.processors.mqtt.ConsumeMQTT.BROKER_ATTRIBUTE_KEY; +import static org.apache.nifi.processors.mqtt.ConsumeMQTT.IS_DUPLICATE_ATTRIBUTE_KEY; +import static org.apache.nifi.processors.mqtt.ConsumeMQTT.IS_RETAINED_ATTRIBUTE_KEY; +import static org.apache.nifi.processors.mqtt.ConsumeMQTT.QOS_ATTRIBUTE_KEY; +import static org.apache.nifi.processors.mqtt.ConsumeMQTT.TOPIC_ATTRIBUTE_KEY; +import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_FALSE; +import static org.junit.Assert.assertTrue; + +public abstract class TestConsumeMqttCommon { + + public int PUBLISH_WAIT_MS = 1000; + + public Server MQTT_server; + public TestRunner testRunner; + public String broker; + + public abstract void internalPublish(PublishMessage publishMessage); + + @Test + public void testLastWillConfig() throws Exception { + testRunner.setProperty(ConsumeMQTT.PROP_LAST_WILL_MESSAGE, "lastWill message"); + testRunner.assertNotValid(); + testRunner.setProperty(ConsumeMQTT.PROP_LAST_WILL_TOPIC, "lastWill topic"); + testRunner.assertNotValid(); + testRunner.setProperty(ConsumeMQTT.PROP_LAST_WILL_QOS, "1"); + testRunner.assertNotValid(); + testRunner.setProperty(ConsumeMQTT.PROP_LAST_WILL_RETAIN, "false"); + testRunner.assertValid(); + } + + + @Test + public void testQoS2() throws Exception { + testRunner.setProperty(ConsumeMQTT.PROP_QOS, "2"); + + testRunner.assertValid(); + + ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor(); + consumeMQTT.onScheduled(testRunner.getProcessContext()); + reconnect(consumeMQTT); + + Thread.sleep(PUBLISH_WAIT_MS); + + assertTrue(isConnected(consumeMQTT)); + + PublishMessage testMessage = new PublishMessage(); + testMessage.setPayload(ByteBuffer.wrap("testMessage".getBytes())); + testMessage.setTopicName("testTopic"); + testMessage.setDupFlag(false); + testMessage.setQos(AbstractMessage.QOSType.EXACTLY_ONCE); + testMessage.setRetainFlag(false); + + internalPublish(testMessage); + + Thread.sleep(PUBLISH_WAIT_MS); + + testRunner.run(1, false, false); + + testRunner.assertTransferCount(ConsumeMQTT.REL_MESSAGE, 1); + + List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE); + MockFlowFile flowFile = flowFiles.get(0); + + flowFile.assertContentEquals("testMessage"); + flowFile.assertAttributeEquals(BROKER_ATTRIBUTE_KEY, broker); + flowFile.assertAttributeEquals(TOPIC_ATTRIBUTE_KEY, "testTopic"); + flowFile.assertAttributeEquals(QOS_ATTRIBUTE_KEY, "2"); + flowFile.assertAttributeEquals(IS_DUPLICATE_ATTRIBUTE_KEY, "false"); + flowFile.assertAttributeEquals(IS_RETAINED_ATTRIBUTE_KEY, "false"); + } + + @Test + public void testQoS2NotCleanSession() throws Exception { + testRunner.setProperty(ConsumeMQTT.PROP_QOS, "2"); + testRunner.setProperty(ConsumeMQTT.PROP_CLEAN_SESSION, ALLOWABLE_VALUE_CLEAN_SESSION_FALSE); + + testRunner.assertValid(); + + ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor(); + consumeMQTT.onScheduled(testRunner.getProcessContext()); + reconnect(consumeMQTT); + + Thread.sleep(PUBLISH_WAIT_MS); + + assertTrue(isConnected(consumeMQTT)); + + consumeMQTT.onUnscheduled(testRunner.getProcessContext()); + + PublishMessage testMessage = new PublishMessage(); + testMessage.setPayload(ByteBuffer.wrap("testMessage".getBytes())); + testMessage.setTopicName("testTopic"); + testMessage.setDupFlag(false); + testMessage.setQos(AbstractMessage.QOSType.EXACTLY_ONCE); + testMessage.setRetainFlag(false); + + internalPublish(testMessage); + + consumeMQTT.onScheduled(testRunner.getProcessContext()); + reconnect(consumeMQTT); + + Thread.sleep(PUBLISH_WAIT_MS); + + assertTrue(isConnected(consumeMQTT)); + + testRunner.run(1, false, false); + + testRunner.assertTransferCount(ConsumeMQTT.REL_MESSAGE, 1); + + List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE); + MockFlowFile flowFile = flowFiles.get(0); + + flowFile.assertContentEquals("testMessage"); + flowFile.assertAttributeEquals(BROKER_ATTRIBUTE_KEY, broker); + flowFile.assertAttributeEquals(TOPIC_ATTRIBUTE_KEY, "testTopic"); + flowFile.assertAttributeEquals(QOS_ATTRIBUTE_KEY, "2"); + flowFile.assertAttributeEquals(IS_DUPLICATE_ATTRIBUTE_KEY, "false"); + flowFile.assertAttributeEquals(IS_RETAINED_ATTRIBUTE_KEY, "false"); + } + + + @Test + public void testQoS1() throws Exception { + testRunner.setProperty(ConsumeMQTT.PROP_QOS, "1"); + + testRunner.assertValid(); + + ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor(); + consumeMQTT.onScheduled(testRunner.getProcessContext()); + reconnect(consumeMQTT); + + Thread.sleep(PUBLISH_WAIT_MS); + + assertTrue(isConnected(consumeMQTT)); + + PublishMessage testMessage = new PublishMessage(); + testMessage.setPayload(ByteBuffer.wrap("testMessage".getBytes())); + testMessage.setTopicName("testTopic"); + testMessage.setDupFlag(false); + testMessage.setQos(AbstractMessage.QOSType.LEAST_ONE); + testMessage.setRetainFlag(false); + + internalPublish(testMessage); + + Thread.sleep(PUBLISH_WAIT_MS); + + testRunner.run(1, false, false); + + List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE); + assertTrue(flowFiles.size() > 0); + MockFlowFile flowFile = flowFiles.get(0); + + flowFile.assertContentEquals("testMessage"); + flowFile.assertAttributeEquals(BROKER_ATTRIBUTE_KEY, broker); + flowFile.assertAttributeEquals(TOPIC_ATTRIBUTE_KEY, "testTopic"); + flowFile.assertAttributeEquals(QOS_ATTRIBUTE_KEY, "1"); + flowFile.assertAttributeEquals(IS_DUPLICATE_ATTRIBUTE_KEY, "false"); + flowFile.assertAttributeEquals(IS_RETAINED_ATTRIBUTE_KEY, "false"); + } + + @Test + public void testQoS1NotCleanSession() throws Exception { + testRunner.setProperty(ConsumeMQTT.PROP_QOS, "1"); + testRunner.setProperty(ConsumeMQTT.PROP_CLEAN_SESSION, ALLOWABLE_VALUE_CLEAN_SESSION_FALSE); + + testRunner.assertValid(); + + ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor(); + consumeMQTT.onScheduled(testRunner.getProcessContext()); + reconnect(consumeMQTT); + + Thread.sleep(PUBLISH_WAIT_MS); + + assertTrue(isConnected(consumeMQTT)); + + consumeMQTT.onUnscheduled(testRunner.getProcessContext()); + + PublishMessage testMessage = new PublishMessage(); + testMessage.setPayload(ByteBuffer.wrap("testMessage".getBytes())); + testMessage.setTopicName("testTopic"); + testMessage.setDupFlag(false); + testMessage.setQos(AbstractMessage.QOSType.LEAST_ONE); + testMessage.setRetainFlag(false); + + internalPublish(testMessage); + + consumeMQTT.onScheduled(testRunner.getProcessContext()); + reconnect(consumeMQTT); + + Thread.sleep(PUBLISH_WAIT_MS); + + assertTrue(isConnected(consumeMQTT)); + + testRunner.run(1, false, false); + + testRunner.assertTransferCount(ConsumeMQTT.REL_MESSAGE, 1); + + List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE); + assertTrue(flowFiles.size() > 0); + MockFlowFile flowFile = flowFiles.get(0); + + flowFile.assertContentEquals("testMessage"); + flowFile.assertAttributeEquals(BROKER_ATTRIBUTE_KEY, broker); + flowFile.assertAttributeEquals(TOPIC_ATTRIBUTE_KEY, "testTopic"); + flowFile.assertAttributeEquals(QOS_ATTRIBUTE_KEY, "1"); + flowFile.assertAttributeEquals(IS_DUPLICATE_ATTRIBUTE_KEY, "false"); + flowFile.assertAttributeEquals(IS_RETAINED_ATTRIBUTE_KEY, "false"); + } + + @Test + public void testQoS0() throws Exception { + testRunner.setProperty(ConsumeMQTT.PROP_QOS, "0"); + + testRunner.assertValid(); + + ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor(); + consumeMQTT.onScheduled(testRunner.getProcessContext()); + reconnect(consumeMQTT); + + Thread.sleep(PUBLISH_WAIT_MS); + + assertTrue(isConnected(consumeMQTT)); + + PublishMessage testMessage = new PublishMessage(); + testMessage.setPayload(ByteBuffer.wrap("testMessage".getBytes())); + testMessage.setTopicName("testTopic"); + testMessage.setDupFlag(false); + testMessage.setQos(AbstractMessage.QOSType.MOST_ONE); + testMessage.setRetainFlag(false); + + internalPublish(testMessage); + + Thread.sleep(PUBLISH_WAIT_MS); + + testRunner.run(1, false, false); + + List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE); + assertTrue(flowFiles.size() < 2); + + if(flowFiles.size() == 1) { + MockFlowFile flowFile = flowFiles.get(0); + + flowFile.assertContentEquals("testMessage"); + flowFile.assertAttributeEquals(BROKER_ATTRIBUTE_KEY, broker); + flowFile.assertAttributeEquals(TOPIC_ATTRIBUTE_KEY, "testTopic"); + flowFile.assertAttributeEquals(QOS_ATTRIBUTE_KEY, "0"); + flowFile.assertAttributeEquals(IS_DUPLICATE_ATTRIBUTE_KEY, "false"); + flowFile.assertAttributeEquals(IS_RETAINED_ATTRIBUTE_KEY, "false"); + } + } + + @Test + public void testOnStoppedFinish() throws Exception { + testRunner.setProperty(ConsumeMQTT.PROP_QOS, "2"); + + testRunner.assertValid(); + + MqttMessage innerMessage = new MqttMessage(); + innerMessage.setPayload(ByteBuffer.wrap("testMessage".getBytes()).array()); + innerMessage.setQos(2); + MQTTQueueMessage testMessage = new MQTTQueueMessage("testTopic", innerMessage); + + ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor(); + consumeMQTT.onScheduled(testRunner.getProcessContext()); + reconnect(consumeMQTT); + + Thread.sleep(PUBLISH_WAIT_MS); + + assertTrue(isConnected(consumeMQTT)); + + consumeMQTT.processSessionFactory = testRunner.getProcessSessionFactory(); + + Field f = ConsumeMQTT.class.getDeclaredField("mqttQueue"); + f.setAccessible(true); + LinkedBlockingQueue<MQTTQueueMessage> queue = (LinkedBlockingQueue<MQTTQueueMessage>) f.get(consumeMQTT); + queue.add(testMessage); + + consumeMQTT.onUnscheduled(testRunner.getProcessContext()); + consumeMQTT.onStopped(testRunner.getProcessContext()); + + testRunner.assertTransferCount(ConsumeMQTT.REL_MESSAGE, 1); + + List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE); + MockFlowFile flowFile = flowFiles.get(0); + + flowFile.assertContentEquals("testMessage"); + flowFile.assertAttributeEquals(BROKER_ATTRIBUTE_KEY, broker); + flowFile.assertAttributeEquals(TOPIC_ATTRIBUTE_KEY, "testTopic"); + flowFile.assertAttributeEquals(QOS_ATTRIBUTE_KEY, "2"); + flowFile.assertAttributeEquals(IS_DUPLICATE_ATTRIBUTE_KEY, "false"); + flowFile.assertAttributeEquals(IS_RETAINED_ATTRIBUTE_KEY, "false"); + } + + @Test + public void testResizeBuffer() throws Exception { + testRunner.setProperty(ConsumeMQTT.PROP_QOS, "2"); + testRunner.setProperty(ConsumeMQTT.PROP_MAX_QUEUE_SIZE, "2"); + + testRunner.assertValid(); + + PublishMessage testMessage = new PublishMessage(); + testMessage.setPayload(ByteBuffer.wrap("testMessage".getBytes())); + testMessage.setTopicName("testTopic"); + testMessage.setDupFlag(false); + testMessage.setQos(AbstractMessage.QOSType.EXACTLY_ONCE); + testMessage.setRetainFlag(false); + + ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor(); + consumeMQTT.onScheduled(testRunner.getProcessContext()); + reconnect(consumeMQTT); + + Thread.sleep(PUBLISH_WAIT_MS); + + assertTrue(isConnected(consumeMQTT)); + + internalPublish(testMessage); + internalPublish(testMessage); + + Thread.sleep(PUBLISH_WAIT_MS); + consumeMQTT.onUnscheduled(testRunner.getProcessContext()); + + testRunner.setProperty(ConsumeMQTT.PROP_MAX_QUEUE_SIZE, "1"); + testRunner.assertNotValid(); + + testRunner.setProperty(ConsumeMQTT.PROP_MAX_QUEUE_SIZE, "3"); + testRunner.assertValid(); + + testRunner.run(1); + + testRunner.assertTransferCount(ConsumeMQTT.REL_MESSAGE, 2); + + List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE); + MockFlowFile flowFile = flowFiles.get(0); + + flowFile.assertContentEquals("testMessage"); + flowFile.assertAttributeEquals(BROKER_ATTRIBUTE_KEY, broker); + flowFile.assertAttributeEquals(TOPIC_ATTRIBUTE_KEY, "testTopic"); + flowFile.assertAttributeEquals(QOS_ATTRIBUTE_KEY, "2"); + flowFile.assertAttributeEquals(IS_DUPLICATE_ATTRIBUTE_KEY, "false"); + flowFile.assertAttributeEquals(IS_RETAINED_ATTRIBUTE_KEY, "false"); + } + + private static boolean isConnected(AbstractMQTTProcessor processor) throws NoSuchFieldException, IllegalAccessException { + Field f = AbstractMQTTProcessor.class.getDeclaredField("mqttClient"); + f.setAccessible(true); + IMqttClient mqttClient = (IMqttClient) f.get(processor); + return mqttClient.isConnected(); + } + + + public static void reconnect(ConsumeMQTT processor) throws NoSuchFieldException, IllegalAccessException, NoSuchMethodException, InvocationTargetException { + Method method = ConsumeMQTT.class.getDeclaredMethod("reconnect"); + method.setAccessible(true); + method.invoke(processor); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/92d648ab/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/TestPublishMqttCommon.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/TestPublishMqttCommon.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/TestPublishMqttCommon.java new file mode 100644 index 0000000..75df6f3 --- /dev/null +++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/TestPublishMqttCommon.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.mqtt.common; + +import io.moquette.server.Server; +import org.apache.nifi.processors.mqtt.PublishMQTT; +import org.apache.nifi.util.TestRunner; +import org.junit.Test; + +import static org.apache.nifi.processors.mqtt.PublishMQTT.REL_SUCCESS; +import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_FALSE; + +public abstract class TestPublishMqttCommon { + + public Server MQTT_server; + public TestRunner testRunner; + public String topic; + + public abstract void verifyPublishedMessage(byte[] payload, int qos, boolean retain); + + @Test + public void testQoS0() { + testRunner.setProperty(PublishMQTT.PROP_QOS, "0"); + + testRunner.assertValid(); + + String testMessage = "testMessage"; + testRunner.enqueue(testMessage.getBytes()); + + testRunner.run(); + + testRunner.assertAllFlowFilesTransferred(REL_SUCCESS); + + testRunner.assertTransferCount(REL_SUCCESS, 1); + + verifyPublishedMessage(testMessage.getBytes(), 0, false); + } + + @Test + public void testQoS1() { + testRunner.setProperty(PublishMQTT.PROP_QOS, "1"); + + testRunner.assertValid(); + + String testMessage = "testMessage"; + testRunner.enqueue(testMessage.getBytes()); + + testRunner.run(); + + testRunner.assertAllFlowFilesTransferred(REL_SUCCESS); + + testRunner.assertTransferCount(REL_SUCCESS, 1); + verifyPublishedMessage(testMessage.getBytes(), 1, false); + } + + @Test + public void testQoS2NotCleanSession() { + // Publisher executes synchronously so the only time whether its Clean or Not matters is when the processor stops in the middle of the publishing + testRunner.setProperty(PublishMQTT.PROP_QOS, "2"); + testRunner.setProperty(PublishMQTT.PROP_CLEAN_SESSION, ALLOWABLE_VALUE_CLEAN_SESSION_FALSE); + + testRunner.assertValid(); + + String testMessage = "testMessage"; + testRunner.enqueue(testMessage.getBytes()); + + testRunner.run(); + + testRunner.assertAllFlowFilesTransferred(REL_SUCCESS); + + testRunner.assertTransferCount(REL_SUCCESS, 1); + verifyPublishedMessage(testMessage.getBytes(), 2, false); + } + + @Test + public void testQoS2() { + testRunner.setProperty(PublishMQTT.PROP_QOS, "2"); + + testRunner.assertValid(); + + String testMessage = "testMessage"; + testRunner.enqueue(testMessage.getBytes()); + + testRunner.run(); + + testRunner.assertAllFlowFilesTransferred(REL_SUCCESS); + + testRunner.assertTransferCount(REL_SUCCESS, 1); + verifyPublishedMessage(testMessage.getBytes(), 2, false); + } + + @Test + public void testRetainQoS2() { + testRunner.setProperty(PublishMQTT.PROP_QOS, "2"); + testRunner.setProperty(PublishMQTT.PROP_RETAIN, "true"); + + testRunner.assertValid(); + + String testMessage = "testMessage"; + testRunner.enqueue(testMessage.getBytes()); + + testRunner.run(); + + testRunner.assertAllFlowFilesTransferred(REL_SUCCESS); + + testRunner.assertTransferCount(REL_SUCCESS, 1); + verifyPublishedMessage(testMessage.getBytes(), 2, true); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/92d648ab/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestConsumeMQTT.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestConsumeMQTT.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestConsumeMQTT.java new file mode 100644 index 0000000..759bf96 --- /dev/null +++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestConsumeMQTT.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.mqtt.integration; + +import io.moquette.BrokerConstants; +import io.moquette.proto.messages.AbstractMessage; +import io.moquette.proto.messages.PublishMessage; +import io.moquette.server.Server; +import io.moquette.server.config.IConfig; +import io.moquette.server.config.MemoryConfig; +import org.apache.nifi.processors.mqtt.ConsumeMQTT; +import org.apache.nifi.processors.mqtt.common.TestConsumeMqttCommon; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunners; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.FilenameFilter; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Properties; + +import static io.moquette.BrokerConstants.PERSISTENT_STORE_PROPERTY_NAME; +import static org.apache.nifi.processors.mqtt.ConsumeMQTT.BROKER_ATTRIBUTE_KEY; +import static org.apache.nifi.processors.mqtt.ConsumeMQTT.IS_DUPLICATE_ATTRIBUTE_KEY; +import static org.apache.nifi.processors.mqtt.ConsumeMQTT.IS_RETAINED_ATTRIBUTE_KEY; +import static org.apache.nifi.processors.mqtt.ConsumeMQTT.QOS_ATTRIBUTE_KEY; +import static org.apache.nifi.processors.mqtt.ConsumeMQTT.TOPIC_ATTRIBUTE_KEY; + + +public class TestConsumeMQTT extends TestConsumeMqttCommon { + + + private void startServer() throws IOException { + MQTT_server = new Server(); + final Properties configProps = new Properties(); + configProps.put(BrokerConstants.WEB_SOCKET_PORT_PROPERTY_NAME, "1884"); + configProps.setProperty(PERSISTENT_STORE_PROPERTY_NAME,"./target/moquette_store.mapdb"); + IConfig server_config = new MemoryConfig(configProps); + MQTT_server.startServer(server_config); + } + + @Before + public void init() throws IOException { + startServer(); + + broker = "tcp://localhost:1883"; + testRunner = TestRunners.newTestRunner(ConsumeMQTT.class); + testRunner.setProperty(ConsumeMQTT.PROP_BROKER_URI, broker); + testRunner.setProperty(ConsumeMQTT.PROP_CLIENTID, "TestClient"); + testRunner.setProperty(ConsumeMQTT.PROP_TOPIC_FILTER, "testTopic"); + testRunner.setProperty(ConsumeMQTT.PROP_MAX_QUEUE_SIZE, "100"); + } + + @After + public void tearDown() throws Exception { + if (MQTT_server != null) { + MQTT_server.stopServer(); + } + final File folder = new File("./target"); + final File[] files = folder.listFiles( new FilenameFilter() { + @Override + public boolean accept( final File dir, + final String name ) { + return name.matches( "moquette_store.mapdb.*" ); + } + } ); + for ( final File file : files ) { + if ( !file.delete() ) { + System.err.println( "Can't remove " + file.getAbsolutePath() ); + } + } + } + + @Test + public void testRetainedQoS2() throws Exception { + testRunner.setProperty(ConsumeMQTT.PROP_QOS, "2"); + + testRunner.assertValid(); + + PublishMessage testMessage = new PublishMessage(); + testMessage.setPayload(ByteBuffer.wrap("testMessage".getBytes())); + testMessage.setTopicName("testTopic"); + testMessage.setDupFlag(false); + testMessage.setQos(AbstractMessage.QOSType.EXACTLY_ONCE); + testMessage.setRetainFlag(true); + + internalPublish(testMessage); + + ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor(); + consumeMQTT.onScheduled(testRunner.getProcessContext()); + reconnect(consumeMQTT); + + Thread.sleep(PUBLISH_WAIT_MS); + + testRunner.run(1, false, false); + + testRunner.assertTransferCount(ConsumeMQTT.REL_MESSAGE, 1); + + List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE); + MockFlowFile flowFile = flowFiles.get(0); + + flowFile.assertContentEquals("testMessage"); + flowFile.assertAttributeEquals(BROKER_ATTRIBUTE_KEY, broker); + flowFile.assertAttributeEquals(TOPIC_ATTRIBUTE_KEY, "testTopic"); + flowFile.assertAttributeEquals(QOS_ATTRIBUTE_KEY, "2"); + flowFile.assertAttributeEquals(IS_DUPLICATE_ATTRIBUTE_KEY, "false"); + flowFile.assertAttributeEquals(IS_RETAINED_ATTRIBUTE_KEY, "true"); + } + + @Override + public void internalPublish(PublishMessage publishMessage) { + MQTT_server.internalPublish(publishMessage); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/92d648ab/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestConsumeMqttSSL.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestConsumeMqttSSL.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestConsumeMqttSSL.java new file mode 100644 index 0000000..693c2a9 --- /dev/null +++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestConsumeMqttSSL.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.mqtt.integration; + +import io.moquette.BrokerConstants; +import io.moquette.proto.messages.AbstractMessage; +import io.moquette.proto.messages.PublishMessage; +import io.moquette.server.Server; +import io.moquette.server.config.IConfig; +import io.moquette.server.config.MemoryConfig; +import org.apache.nifi.processors.mqtt.ConsumeMQTT; +import org.apache.nifi.processors.mqtt.common.TestConsumeMqttCommon; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.ssl.StandardSSLContextService; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunners; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.FilenameFilter; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import static io.moquette.BrokerConstants.PERSISTENT_STORE_PROPERTY_NAME; +import static org.apache.nifi.processors.mqtt.ConsumeMQTT.BROKER_ATTRIBUTE_KEY; +import static org.apache.nifi.processors.mqtt.ConsumeMQTT.IS_DUPLICATE_ATTRIBUTE_KEY; +import static org.apache.nifi.processors.mqtt.ConsumeMQTT.IS_RETAINED_ATTRIBUTE_KEY; +import static org.apache.nifi.processors.mqtt.ConsumeMQTT.QOS_ATTRIBUTE_KEY; +import static org.apache.nifi.processors.mqtt.ConsumeMQTT.TOPIC_ATTRIBUTE_KEY; +import static org.apache.nifi.processors.mqtt.common.MqttTestUtils.createSslProperties; + + +public class TestConsumeMqttSSL extends TestConsumeMqttCommon { + + + private void startServer() throws IOException { + MQTT_server = new Server(); + final Properties configProps = new Properties(); + + configProps.put(BrokerConstants.WEB_SOCKET_PORT_PROPERTY_NAME, "1884"); + configProps.put(BrokerConstants.SSL_PORT_PROPERTY_NAME, "8883"); + configProps.put(BrokerConstants.JKS_PATH_PROPERTY_NAME, "src/test/resources/localhost-ks.jks"); + configProps.put(BrokerConstants.KEY_STORE_PASSWORD_PROPERTY_NAME, "localtest"); + configProps.put(BrokerConstants.KEY_MANAGER_PASSWORD_PROPERTY_NAME, "localtest"); + configProps.setProperty(PERSISTENT_STORE_PROPERTY_NAME,"./target/moquette_store.mapdb"); + IConfig server_config = new MemoryConfig(configProps); + MQTT_server.startServer(server_config); + } + + @Before + public void init() throws IOException, InitializationException { + startServer(); + + broker = "ssl://localhost:8883"; + testRunner = TestRunners.newTestRunner(ConsumeMQTT.class); + testRunner.setProperty(ConsumeMQTT.PROP_BROKER_URI, broker); + testRunner.setProperty(ConsumeMQTT.PROP_CLIENTID, "TestClient"); + testRunner.setProperty(ConsumeMQTT.PROP_TOPIC_FILTER, "testTopic"); + testRunner.setProperty(ConsumeMQTT.PROP_MAX_QUEUE_SIZE, "100"); + + final StandardSSLContextService sslService = new StandardSSLContextService(); + Map<String, String> sslProperties = createSslProperties(); + testRunner.addControllerService("ssl-context", sslService, sslProperties); + testRunner.enableControllerService(sslService); + testRunner.setProperty(ConsumeMQTT.PROP_SSL_CONTEXT_SERVICE, "ssl-context"); + } + + @After + public void tearDown() throws Exception { + if (MQTT_server != null) { + MQTT_server.stopServer(); + } + final File folder = new File("./target"); + final File[] files = folder.listFiles( new FilenameFilter() { + @Override + public boolean accept( final File dir, + final String name ) { + return name.matches( "moquette_store.mapdb.*" ); + } + } ); + for ( final File file : files ) { + if ( !file.delete() ) { + System.err.println( "Can't remove " + file.getAbsolutePath() ); + } + } + } + + @Test + public void testRetainedQoS2() throws Exception { + testRunner.setProperty(ConsumeMQTT.PROP_QOS, "2"); + + testRunner.assertValid(); + + PublishMessage testMessage = new PublishMessage(); + testMessage.setPayload(ByteBuffer.wrap("testMessage".getBytes())); + testMessage.setTopicName("testTopic"); + testMessage.setDupFlag(false); + testMessage.setQos(AbstractMessage.QOSType.EXACTLY_ONCE); + testMessage.setRetainFlag(true); + + internalPublish(testMessage); + + ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor(); + consumeMQTT.onScheduled(testRunner.getProcessContext()); + reconnect(consumeMQTT); + + Thread.sleep(PUBLISH_WAIT_MS); + + testRunner.run(1, false, false); + + testRunner.assertTransferCount(ConsumeMQTT.REL_MESSAGE, 1); + + List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE); + MockFlowFile flowFile = flowFiles.get(0); + + flowFile.assertContentEquals("testMessage"); + flowFile.assertAttributeEquals(BROKER_ATTRIBUTE_KEY, broker); + flowFile.assertAttributeEquals(TOPIC_ATTRIBUTE_KEY, "testTopic"); + flowFile.assertAttributeEquals(QOS_ATTRIBUTE_KEY, "2"); + flowFile.assertAttributeEquals(IS_DUPLICATE_ATTRIBUTE_KEY, "false"); + flowFile.assertAttributeEquals(IS_RETAINED_ATTRIBUTE_KEY, "true"); + } + + @Override + public void internalPublish(PublishMessage publishMessage) { + MQTT_server.internalPublish(publishMessage); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/92d648ab/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestPublishAndSubscribeMqttIntegration.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestPublishAndSubscribeMqttIntegration.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestPublishAndSubscribeMqttIntegration.java new file mode 100644 index 0000000..a97ac98 --- /dev/null +++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestPublishAndSubscribeMqttIntegration.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.mqtt.integration; + +import io.moquette.BrokerConstants; +import io.moquette.server.Server; +import io.moquette.server.config.IConfig; +import io.moquette.server.config.MemoryConfig; +import org.apache.nifi.processors.mqtt.ConsumeMQTT; +import org.apache.nifi.processors.mqtt.PublishMQTT; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.FilenameFilter; +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.util.List; +import java.util.Properties; + +import static io.moquette.BrokerConstants.PERSISTENT_STORE_PROPERTY_NAME; +import static org.apache.nifi.processors.mqtt.ConsumeMQTT.BROKER_ATTRIBUTE_KEY; +import static org.apache.nifi.processors.mqtt.ConsumeMQTT.IS_DUPLICATE_ATTRIBUTE_KEY; +import static org.apache.nifi.processors.mqtt.ConsumeMQTT.IS_RETAINED_ATTRIBUTE_KEY; +import static org.apache.nifi.processors.mqtt.ConsumeMQTT.QOS_ATTRIBUTE_KEY; +import static org.apache.nifi.processors.mqtt.ConsumeMQTT.TOPIC_ATTRIBUTE_KEY; +import static org.apache.nifi.processors.mqtt.PublishMQTT.REL_SUCCESS; +import static org.apache.nifi.processors.mqtt.common.TestConsumeMqttCommon.reconnect; + +public class TestPublishAndSubscribeMqttIntegration { + private TestRunner testSubscribeRunner; + private TestRunner testPublishRunner; + private Server MQTT_server; + + private static int PUBLISH_WAIT_MS = 1000; + + private void startServer() throws IOException { + MQTT_server = new Server(); + final Properties configProps = new Properties(); + configProps.put(BrokerConstants.WEB_SOCKET_PORT_PROPERTY_NAME, "1884"); + configProps.setProperty(PERSISTENT_STORE_PROPERTY_NAME,"./target/moquette_store.mapdb"); + IConfig server_config = new MemoryConfig(configProps); + MQTT_server.startServer(server_config); + } + + @Before + public void init() throws IOException { + startServer(); + testSubscribeRunner = TestRunners.newTestRunner(ConsumeMQTT.class); + testPublishRunner = TestRunners.newTestRunner(PublishMQTT.class); + } + + @After + public void tearDown() throws Exception { + if (MQTT_server != null) { + MQTT_server.stopServer(); + } + final File folder = new File("./target"); + final File[] files = folder.listFiles( new FilenameFilter() { + @Override + public boolean accept( final File dir, + final String name ) { + return name.matches( "moquette_store.mapdb.*" ); + } + } ); + for ( final File file : files ) { + if ( !file.delete() ) { + System.err.println( "Can't remove " + file.getAbsolutePath() ); + } + } + } + + @Test + public void testBasic() throws Exception { + subscribe(); + publishAndVerify(); + Thread.sleep(PUBLISH_WAIT_MS); + testSubscribeRunner.run(); + subscribeVerify(); + } + + private void publishAndVerify(){ + testPublishRunner.setProperty(PublishMQTT.PROP_BROKER_URI, "tcp://localhost:1883"); + testPublishRunner.setProperty(PublishMQTT.PROP_CLIENTID, "TestPublishClient"); + testPublishRunner.setProperty(PublishMQTT.PROP_QOS, "2"); + testPublishRunner.setProperty(PublishMQTT.PROP_RETAIN, "false"); + testPublishRunner.setProperty(PublishMQTT.PROP_TOPIC, "testTopic"); + + testPublishRunner.assertValid(); + + String testMessage = "testMessage"; + testPublishRunner.enqueue(testMessage.getBytes()); + + testPublishRunner.run(); + + testPublishRunner.assertAllFlowFilesTransferred(REL_SUCCESS); + testPublishRunner.assertTransferCount(REL_SUCCESS, 1); + } + + private void subscribe() throws IOException, ClassNotFoundException, MqttException, InvocationTargetException, NoSuchMethodException, IllegalAccessException, NoSuchFieldException { + testSubscribeRunner.setProperty(ConsumeMQTT.PROP_BROKER_URI, "tcp://localhost:1883"); + testSubscribeRunner.setProperty(ConsumeMQTT.PROP_CLIENTID, "TestSubscribeClient"); + testSubscribeRunner.setProperty(ConsumeMQTT.PROP_TOPIC_FILTER, "testTopic"); + testSubscribeRunner.setProperty(ConsumeMQTT.PROP_QOS, "2"); + testSubscribeRunner.setProperty(ConsumeMQTT.PROP_MAX_QUEUE_SIZE, "100"); + + testSubscribeRunner.assertValid(); + + ConsumeMQTT consumeMQTT = (ConsumeMQTT) testSubscribeRunner.getProcessor(); + consumeMQTT.onScheduled(testSubscribeRunner.getProcessContext()); + reconnect(consumeMQTT); + } + + private void subscribeVerify(){ + testSubscribeRunner.assertTransferCount(ConsumeMQTT.REL_MESSAGE, 1); + + List<MockFlowFile> flowFiles = testSubscribeRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE); + MockFlowFile flowFile = flowFiles.get(0); + + flowFile.assertContentEquals("testMessage"); + flowFile.assertAttributeEquals(BROKER_ATTRIBUTE_KEY, "tcp://localhost:1883"); + flowFile.assertAttributeEquals(TOPIC_ATTRIBUTE_KEY, "testTopic"); + flowFile.assertAttributeEquals(QOS_ATTRIBUTE_KEY, "2"); + flowFile.assertAttributeEquals(IS_DUPLICATE_ATTRIBUTE_KEY, "false"); + flowFile.assertAttributeEquals(IS_RETAINED_ATTRIBUTE_KEY, "false"); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/92d648ab/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestPublishMQTT.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestPublishMQTT.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestPublishMQTT.java new file mode 100644 index 0000000..5777825 --- /dev/null +++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestPublishMQTT.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.mqtt.integration; + +import io.moquette.BrokerConstants; +import io.moquette.server.Server; +import io.moquette.server.config.IConfig; +import io.moquette.server.config.MemoryConfig; +import org.apache.nifi.processors.mqtt.PublishMQTT; +import org.apache.nifi.processors.mqtt.common.TestPublishMqttCommon; +import org.apache.nifi.util.TestRunners; +import org.junit.After; +import org.junit.Before; + +import java.io.File; +import java.io.FilenameFilter; +import java.io.IOException; +import java.util.Properties; + +import static io.moquette.BrokerConstants.PERSISTENT_STORE_PROPERTY_NAME; + + +public class TestPublishMQTT extends TestPublishMqttCommon { + + + private void startServer() throws IOException { + MQTT_server = new Server(); + final Properties configProps = new Properties(); + configProps.put(BrokerConstants.WEB_SOCKET_PORT_PROPERTY_NAME, "1884"); + configProps.setProperty(PERSISTENT_STORE_PROPERTY_NAME,"./target/moquette_store.mapdb"); + IConfig server_config = new MemoryConfig(configProps); + MQTT_server.startServer(server_config); + } + + @Before + public void init() throws IOException { + startServer(); + testRunner = TestRunners.newTestRunner(PublishMQTT.class); + testRunner.setProperty(PublishMQTT.PROP_BROKER_URI, "tcp://localhost:1883"); + testRunner.setProperty(PublishMQTT.PROP_CLIENTID, "TestClient"); + testRunner.setProperty(PublishMQTT.PROP_RETAIN, "false"); + testRunner.setProperty(PublishMQTT.PROP_TOPIC, "testTopic"); + } + + @After + public void tearDown() throws Exception { + if (MQTT_server != null) { + MQTT_server.stopServer(); + } + final File folder = new File("./target"); + final File[] files = folder.listFiles( new FilenameFilter() { + @Override + public boolean accept( final File dir, + final String name ) { + return name.matches( "moquette_store.mapdb.*" ); + } + } ); + for ( final File file : files ) { + if ( !file.delete() ) { + System.err.println( "Can't remove " + file.getAbsolutePath() ); + } + } + } + + @Override + public void verifyPublishedMessage(byte[] payload, int qos, boolean retain) { + //Cannot verify published message without subscribing and consuming it which is outside the scope of this test. + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/92d648ab/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestPublishMqttSSL.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestPublishMqttSSL.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestPublishMqttSSL.java new file mode 100644 index 0000000..6270d7a --- /dev/null +++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestPublishMqttSSL.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.mqtt.integration; + +import io.moquette.BrokerConstants; +import io.moquette.server.Server; +import io.moquette.server.config.IConfig; +import io.moquette.server.config.MemoryConfig; +import org.apache.nifi.processors.mqtt.PublishMQTT; +import org.apache.nifi.processors.mqtt.common.TestPublishMqttCommon; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.ssl.StandardSSLContextService; +import org.apache.nifi.util.TestRunners; +import org.junit.After; +import org.junit.Before; + +import java.io.File; +import java.io.FilenameFilter; +import java.io.IOException; +import java.util.Map; +import java.util.Properties; + +import static io.moquette.BrokerConstants.PERSISTENT_STORE_PROPERTY_NAME; +import static org.apache.nifi.processors.mqtt.common.MqttTestUtils.createSslProperties; + + +public class TestPublishMqttSSL extends TestPublishMqttCommon { + + private void startServer() throws IOException { + MQTT_server = new Server(); + final Properties configProps = new Properties(); + + configProps.put(BrokerConstants.WEB_SOCKET_PORT_PROPERTY_NAME, "1884"); + configProps.put(BrokerConstants.SSL_PORT_PROPERTY_NAME, "8883"); + configProps.put(BrokerConstants.JKS_PATH_PROPERTY_NAME, "src/test/resources/localhost-ks.jks"); + configProps.put(BrokerConstants.KEY_STORE_PASSWORD_PROPERTY_NAME, "localtest"); + configProps.put(BrokerConstants.KEY_MANAGER_PASSWORD_PROPERTY_NAME, "localtest"); + configProps.setProperty(PERSISTENT_STORE_PROPERTY_NAME,"./target/moquette_store.mapdb"); + IConfig server_config = new MemoryConfig(configProps); + MQTT_server.startServer(server_config); + } + + @Before + public void init() throws IOException, InitializationException { + startServer(); + testRunner = TestRunners.newTestRunner(PublishMQTT.class); + testRunner.setProperty(PublishMQTT.PROP_BROKER_URI, "ssl://localhost:8883"); + testRunner.setProperty(PublishMQTT.PROP_CLIENTID, "TestClient"); + testRunner.setProperty(PublishMQTT.PROP_RETAIN, "true"); + testRunner.setProperty(PublishMQTT.PROP_TOPIC, "testTopic"); + + final StandardSSLContextService sslService = new StandardSSLContextService(); + Map<String, String> sslProperties = createSslProperties(); + testRunner.addControllerService("ssl-context", sslService, sslProperties); + testRunner.enableControllerService(sslService); + testRunner.setProperty(PublishMQTT.PROP_SSL_CONTEXT_SERVICE, "ssl-context"); + } + + @After + public void tearDown() throws Exception { + if (MQTT_server != null) { + MQTT_server.stopServer(); + } + final File folder = new File("./target"); + final File[] files = folder.listFiles(new FilenameFilter() { + @Override + public boolean accept(final File dir, + final String name) { + return name.matches("moquette_store.mapdb.*"); + } + }); + for (final File file : files) { + if (!file.delete()) { + System.err.println("Can't remove " + file.getAbsolutePath()); + } + } + } + + @Override + public void verifyPublishedMessage(byte[] payload, int qos, boolean retain) { + //Cannot verify published message without subscribing and consuming it which is outside the scope of this test. + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/92d648ab/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/resources/localhost-ks.jks ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/resources/localhost-ks.jks b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/resources/localhost-ks.jks new file mode 100755 index 0000000..df36197 Binary files /dev/null and b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/resources/localhost-ks.jks differ http://git-wip-us.apache.org/repos/asf/nifi/blob/92d648ab/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/resources/localhost-ts.jks ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/resources/localhost-ts.jks b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/resources/localhost-ts.jks new file mode 100755 index 0000000..7824378 Binary files /dev/null and b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/resources/localhost-ts.jks differ http://git-wip-us.apache.org/repos/asf/nifi/blob/92d648ab/nifi-nar-bundles/nifi-mqtt-bundle/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/pom.xml b/nifi-nar-bundles/nifi-mqtt-bundle/pom.xml index 456fae3..716052a 100644 --- a/nifi-nar-bundles/nifi-mqtt-bundle/pom.xml +++ b/nifi-nar-bundles/nifi-mqtt-bundle/pom.xml @@ -17,7 +17,7 @@ <parent> <groupId>org.apache.nifi</groupId> <artifactId>nifi-nar-bundles</artifactId> - <version>1.0.0-SNAPSHOT</version> + <version>0.7.0-SNAPSHOT</version> </parent> <artifactId>nifi-mqtt-bundle</artifactId> <packaging>pom</packaging> @@ -30,8 +30,8 @@ <dependency> <groupId>org.apache.nifi</groupId> <artifactId>nifi-mqtt-processors</artifactId> - <version>1.0.0-SNAPSHOT</version> + <version>0.7.0-SNAPSHOT</version> </dependency> </dependencies> - </dependencyManagement> + </dependencyManagement> </project> http://git-wip-us.apache.org/repos/asf/nifi/blob/92d648ab/nifi-nar-bundles/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/pom.xml b/nifi-nar-bundles/pom.xml index 7f3ca2c..558dc7c 100644 --- a/nifi-nar-bundles/pom.xml +++ b/nifi-nar-bundles/pom.xml @@ -60,7 +60,6 @@ <module>nifi-cassandra-bundle</module> <module>nifi-spring-bundle</module> <module>nifi-hive-bundle</module> - <module>nifi-site-to-site-reporting-bundle</module> <module>nifi-site-to-site-reporting-bundle</module> <module>nifi-mqtt-bundle</module> </modules> http://git-wip-us.apache.org/repos/asf/nifi/blob/92d648ab/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index e24ccf6..3a0a73a 100644 --- a/pom.xml +++ b/pom.xml @@ -1062,7 +1062,7 @@ language governing permissions and limitations under the License. --> <dependency> <groupId>org.apache.nifi</groupId> <artifactId>nifi-mqtt-nar</artifactId> - <version>1.0.0-SNAPSHOT</version> + <version>0.7.0-SNAPSHOT</version> <type>nar</type> </dependency> <dependency>
