[ 
https://issues.apache.org/jira/browse/STORM-1406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15085770#comment-15085770
 ] 

ASF GitHub Bot commented on STORM-1406:
---------------------------------------

Github user ptgoetz commented on a diff in the pull request:

    https://github.com/apache/storm/pull/991#discussion_r48976420
  
    --- Diff: external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/bolt/MqttBolt.java ---
    @@ -0,0 +1,108 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.mqtt.bolt;
    +
    +import backtype.storm.Config;
    +import backtype.storm.task.OutputCollector;
    +import backtype.storm.task.TopologyContext;
    +import backtype.storm.topology.OutputFieldsDeclarer;
    +import backtype.storm.topology.base.BaseRichBolt;
    +import backtype.storm.tuple.Tuple;
    +import backtype.storm.utils.TupleUtils;
    +import org.apache.storm.mqtt.MqttMessage;
    +import org.apache.storm.mqtt.common.MqttOptions;
    +import org.apache.storm.mqtt.MqttTupleMapper;
    +import org.apache.storm.mqtt.common.MqttPublisher;
    +import org.apache.storm.mqtt.common.SslUtils;
    +import org.apache.storm.mqtt.ssl.KeyStoreLoader;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.Map;
    +
    +
    +public class MqttBolt extends BaseRichBolt {
    +    private static final Logger LOG = 
LoggerFactory.getLogger(MqttBolt.class);
    +    private MqttTupleMapper mapper;
    +    private transient MqttPublisher publisher;
    +    private boolean retain = false;
    +    private transient OutputCollector collector;
    +    private MqttOptions options;
    +    private KeyStoreLoader keyStoreLoader;
    +    private transient String topologyName;
    +
    +
    +    public MqttBolt(MqttOptions options, MqttTupleMapper mapper){
    +        this(options, mapper, null, false);
    +    }
    +
    +    public MqttBolt(MqttOptions options, MqttTupleMapper mapper, boolean 
retain){
    +        this(options, mapper, null, retain);
    +    }
    +
    +    public MqttBolt(MqttOptions options, MqttTupleMapper mapper, 
KeyStoreLoader keyStoreLoader){
    +        this(options, mapper, keyStoreLoader, false);
    +    }
    +
    +    public MqttBolt(MqttOptions options, MqttTupleMapper mapper, 
KeyStoreLoader keyStoreLoader, boolean retain){
    +        this.options = options;
    +        this.mapper = mapper;
    +        this.retain = retain;
    +        this.keyStoreLoader = keyStoreLoader;
    +        // the following code is duplicated in the constructor of 
MqttPublisher
    +        // we reproduce it here so we fail on the client side if SSL is 
misconfigured, rather than when the topology
    +        // is deployed to the cluster
    +        SslUtils.checkSslConfig(this.options.getUrl(), keyStoreLoader);
    +    }
    +
    +    @Override
    +    public void prepare(Map conf, TopologyContext context, OutputCollector 
collector) {
    +        this.collector = collector;
    +        this.topologyName = (String)conf.get(Config.TOPOLOGY_NAME);
    +        this.publisher = new MqttPublisher(this.options, 
this.keyStoreLoader, this.retain);
    +        try {
    +            this.publisher.connectMqtt(this.topologyName + "-" + 
context.getThisComponentId() + "-" + context.getThisTaskId());
    +        } catch (Exception e) {
    +            LOG.error("Unable to connect to MQTT Broker.", e);
    +            throw new RuntimeException("Unable to connect to MQTT 
Broker.", e);
    +        }
    +    }
    +
    +    @Override
    +    public void execute(Tuple input) {
    +        //ignore tick tuples
    +        if(!TupleUtils.isTick(input)){
    +            MqttMessage message = this.mapper.toMessage(input);
    +            try {
    +                this.publisher.publish(message, this.retain);
    +                this.collector.ack(input);
    +            } catch (Exception e) {
    +                LOG.warn("Error publishing MQTT message. Failing tuple.", 
e);
    +                // should we fail the tuple or kill the worker?
    --- End diff --
    
    Good call.


> MQTT Support
> ------------
>
>                 Key: STORM-1406
>                 URL: https://issues.apache.org/jira/browse/STORM-1406
>             Project: Apache Storm
>          Issue Type: Improvement
>            Reporter: P. Taylor Goetz
>            Assignee: P. Taylor Goetz
>
> MQTT is a lightweight publish/subscribe protocol frequently used in IoT applications.
> Further information can be found at http://mqtt.org
> Initial features include:
> * Full MQTT support (e.g. last will, QoS 0-2, retain)
> * Spout implementation(s) for subscribing to MQTT topics
> * A bolt implementation for publishing MQTT messages
> * A trident function implementation for publishing MQTT messages
> * Authentication and TLS/SSL support
> * User-defined "mappers" for converting MQTT messages to tuples (subscribers)
> * User-defined "mappers" for converting tuples to MQTT messages (publishers)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to