[ 
https://issues.apache.org/jira/browse/STORM-1406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15085256#comment-15085256
 ] 

ASF GitHub Bot commented on STORM-1406:
---------------------------------------

Github user satishd commented on a diff in the pull request:

    https://github.com/apache/storm/pull/991#discussion_r48938394
  
    --- Diff: 
external/storm-mqtt/core/src/test/java/org/apache/storm/mqtt/StormMqttIntegrationTest.java
 ---
    @@ -0,0 +1,132 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.mqtt;
    +
    +import backtype.storm.Config;
    +import backtype.storm.LocalCluster;
    +import backtype.storm.generated.StormTopology;
    +import backtype.storm.testing.IntegrationTest;
    +import backtype.storm.topology.TopologyBuilder;
    +import backtype.storm.tuple.ITuple;
    +import org.apache.activemq.broker.BrokerService;
    +import org.apache.storm.mqtt.bolt.MqttBolt;
    +import org.apache.storm.mqtt.common.MqttOptions;
    +import org.apache.storm.mqtt.common.MqttPublisher;
    +import org.apache.storm.mqtt.mappers.StringMessageMapper;
    +import org.apache.storm.mqtt.spout.MqttSpout;
    +import org.fusesource.mqtt.client.BlockingConnection;
    +import org.fusesource.mqtt.client.MQTT;
    +import org.fusesource.mqtt.client.Message;
    +import org.fusesource.mqtt.client.QoS;
    +import org.fusesource.mqtt.client.Topic;
    +import org.junit.AfterClass;
    +import org.junit.Assert;
    +import org.junit.BeforeClass;
    +import org.junit.Test;
    +import org.junit.experimental.categories.Category;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.Serializable;
    +import java.net.URI;
    +import java.util.Arrays;
    +
    +@Category(IntegrationTest.class)
    +public class StormMqttIntegrationTest implements Serializable{
    +    private static final Logger LOG = 
LoggerFactory.getLogger(StormMqttIntegrationTest.class);
    +    private static BrokerService broker;
    +
    +
    +    @AfterClass
    +    public static void cleanup() throws Exception {
    +        broker.stop();
    +    }
    +
    +    @BeforeClass
    +    public static void start() throws Exception {
    +        LOG.warn("Starting broker...");
    +        broker = new BrokerService();
    +        broker.addConnector("mqtt://localhost:1883");
    +        broker.setDataDirectory("target");
    +        broker.start();
    +        LOG.debug("MQTT broker started");
    +    }
    +
    +
    +    @Test
    +    public void testMqttTopology() throws Exception {
    +        MQTT client = new MQTT();
    +        client.setTracer(new MqttLogger());
    +        URI uri = URI.create("tcp://localhost:1883");
    +        client.setHost(uri);
    +
    +        client.setClientId("MQTTSubscriber");
    +        client.setCleanSession(false);
    +        BlockingConnection connection = client.blockingConnection();
    +        connection.connect();
    +        Topic[] topics = {new Topic("/integration-result", 
QoS.AT_LEAST_ONCE)};
    +        byte[] qoses = connection.subscribe(topics);
    +
    +        LocalCluster cluster = new LocalCluster();
    +        cluster.submitTopology("test", new Config(), buildMqttTopology());
    +
    +        System.out.println("topology started");
    +        Thread.sleep(10000);
    +
    +        // publish a retained message to the broker
    +        MqttOptions options = new MqttOptions();
    +        options.setCleanConnection(false);
    +        MqttPublisher publisher = new MqttPublisher(options);
    +        publisher.connectMqtt("MqttPublisher");
    +        publisher.publish(new MqttMessage("/mqtt-topology", 
"test".getBytes()), true);
    +
    +        LOG.info("published message");
    +
    +        Message message = connection.receive();
    +        LOG.info("Message recieved on topic: " + message.getTopic());
    +        LOG.info("Payload: " + new String(message.getPayload()));
    +        message.ack();
    +
    +        Assert.assertArrayEquals(message.getPayload(), "hello 
mqtt".getBytes());
    +        Assert.assertEquals(message.getTopic(), "/integration-result");
    +        cluster.shutdown();
    +    }
    +
    +    public StormTopology buildMqttTopology(){
    +        TopologyBuilder builder = new TopologyBuilder();
    +
    +        MqttOptions options = new MqttOptions();
    +        options.setTopics(Arrays.asList("/mqtt-topology"));
    +        options.setCleanConnection(false);
    +        MqttSpout spout = new MqttSpout(new StringMessageMapper(), 
options);
    +
    +        MqttBolt bolt = new MqttBolt(options, new MqttTupleMapper() {
    +            @Override
    +            public MqttMessage toMessage(ITuple tuple) {
    +                LOG.info("Received: " + tuple);
    +                return new MqttMessage("/integration-result", "hello 
mqtt".getBytes());
    --- End diff --
    
    You may want to extract constants for topic names like 
"/integration-result", "/mqtt-topology"  as they are used at multiple places.



> MQTT Support
> ------------
>
>                 Key: STORM-1406
>                 URL: https://issues.apache.org/jira/browse/STORM-1406
>             Project: Apache Storm
>          Issue Type: Improvement
>            Reporter: P. Taylor Goetz
>            Assignee: P. Taylor Goetz
>
> MQTT is a lightweight publish/subscribe protocol frequently used in IoT 
> applications.
> Further information can be found at http://mqtt.org
> Initial features include:
> * Full MQTT support (e.g. last will, QoS 0-2, retain, etc.)
> * Spout implementation(s) for subscribing to MQTT topics
> * A bolt implementation for publishing MQTT messages
> * A trident function implementation for publishing MQTT messages
> * Authentication and TLS/SSL support
> * User-defined "mappers" for converting MQTT messages to tuples (subscribers)
> * User-defined "mappers" for converting tuples to MQTT messages (publishers)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to