[
https://issues.apache.org/jira/browse/STORM-1406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15085256#comment-15085256
]
ASF GitHub Bot commented on STORM-1406:
---------------------------------------
Github user satishd commented on a diff in the pull request:
https://github.com/apache/storm/pull/991#discussion_r48938394
--- Diff:
external/storm-mqtt/core/src/test/java/org/apache/storm/mqtt/StormMqttIntegrationTest.java
---
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.mqtt;
+
+import backtype.storm.Config;
+import backtype.storm.LocalCluster;
+import backtype.storm.generated.StormTopology;
+import backtype.storm.testing.IntegrationTest;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.tuple.ITuple;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.storm.mqtt.bolt.MqttBolt;
+import org.apache.storm.mqtt.common.MqttOptions;
+import org.apache.storm.mqtt.common.MqttPublisher;
+import org.apache.storm.mqtt.mappers.StringMessageMapper;
+import org.apache.storm.mqtt.spout.MqttSpout;
+import org.fusesource.mqtt.client.BlockingConnection;
+import org.fusesource.mqtt.client.MQTT;
+import org.fusesource.mqtt.client.Message;
+import org.fusesource.mqtt.client.QoS;
+import org.fusesource.mqtt.client.Topic;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.net.URI;
+import java.util.Arrays;
+
+@Category(IntegrationTest.class)
+public class StormMqttIntegrationTest implements Serializable{
+ private static final Logger LOG =
LoggerFactory.getLogger(StormMqttIntegrationTest.class);
+ private static BrokerService broker;
+
+
+ @AfterClass
+ public static void cleanup() throws Exception {
+ broker.stop();
+ }
+
+ @BeforeClass
+ public static void start() throws Exception {
+ LOG.warn("Starting broker...");
+ broker = new BrokerService();
+ broker.addConnector("mqtt://localhost:1883");
+ broker.setDataDirectory("target");
+ broker.start();
+ LOG.debug("MQTT broker started");
+ }
+
+
+ @Test
+ public void testMqttTopology() throws Exception {
+ MQTT client = new MQTT();
+ client.setTracer(new MqttLogger());
+ URI uri = URI.create("tcp://localhost:1883");
+ client.setHost(uri);
+
+ client.setClientId("MQTTSubscriber");
+ client.setCleanSession(false);
+ BlockingConnection connection = client.blockingConnection();
+ connection.connect();
+ Topic[] topics = {new Topic("/integration-result",
QoS.AT_LEAST_ONCE)};
+ byte[] qoses = connection.subscribe(topics);
+
+ LocalCluster cluster = new LocalCluster();
+ cluster.submitTopology("test", new Config(), buildMqttTopology());
+
+ System.out.println("topology started");
+ Thread.sleep(10000);
+
+ // publish a retained message to the broker
+ MqttOptions options = new MqttOptions();
+ options.setCleanConnection(false);
+ MqttPublisher publisher = new MqttPublisher(options);
+ publisher.connectMqtt("MqttPublisher");
+ publisher.publish(new MqttMessage("/mqtt-topology",
"test".getBytes()), true);
+
+ LOG.info("published message");
+
+ Message message = connection.receive();
+ LOG.info("Message recieved on topic: " + message.getTopic());
+ LOG.info("Payload: " + new String(message.getPayload()));
+ message.ack();
+
+ Assert.assertArrayEquals(message.getPayload(), "hello
mqtt".getBytes());
+ Assert.assertEquals(message.getTopic(), "/integration-result");
+ cluster.shutdown();
+ }
+
+ public StormTopology buildMqttTopology(){
+ TopologyBuilder builder = new TopologyBuilder();
+
+ MqttOptions options = new MqttOptions();
+ options.setTopics(Arrays.asList("/mqtt-topology"));
+ options.setCleanConnection(false);
+ MqttSpout spout = new MqttSpout(new StringMessageMapper(),
options);
+
+ MqttBolt bolt = new MqttBolt(options, new MqttTupleMapper() {
+ @Override
+ public MqttMessage toMessage(ITuple tuple) {
+ LOG.info("Received: " + tuple);
+ return new MqttMessage("/integration-result", "hello
mqtt".getBytes());
--- End diff --
You may want to extract constants for topic names like
"/integration-result" and "/mqtt-topology", as they are used in multiple places.
> MQTT Support
> ------------
>
> Key: STORM-1406
> URL: https://issues.apache.org/jira/browse/STORM-1406
> Project: Apache Storm
> Issue Type: Improvement
> Reporter: P. Taylor Goetz
> Assignee: P. Taylor Goetz
>
> MQTT is a lightweight publish/subscribe protocol frequently used in IoT
> applications.
> Further information can be found at http://mqtt.org
> Initial features include:
> * Full MQTT support (e.g. last will, QoS 0-2, retain, etc.)
> * Spout implementation(s) for subscribing to MQTT topics
> * A bolt implementation for publishing MQTT messages
> * A trident function implementation for publishing MQTT messages
> * Authentication and TLS/SSL support
> * User-defined "mappers" for converting MQTT messages to tuples (subscribers)
> * User-defined "mappers" for converting tuples to MQTT messages (publishers)
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)