[
https://issues.apache.org/jira/browse/STORM-1406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15085238#comment-15085238
]
ASF GitHub Bot commented on STORM-1406:
---------------------------------------
Github user satishd commented on a diff in the pull request:
https://github.com/apache/storm/pull/991#discussion_r48937909
--- Diff: external/storm-mqtt/core/src/main/java/org/apache/storm/mqtt/common/MqttPublisher.java ---
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.mqtt.common;
+
+
+import org.apache.storm.mqtt.MqttLogger;
+import org.apache.storm.mqtt.MqttMessage;
+import org.apache.storm.mqtt.ssl.KeyStoreLoader;
+import org.fusesource.mqtt.client.BlockingConnection;
+import org.fusesource.mqtt.client.MQTT;
+import org.fusesource.mqtt.client.QoS;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+
+public class MqttPublisher {
+ private static final Logger LOG = LoggerFactory.getLogger(MqttPublisher.class);
+
+ private MqttOptions options;
+ private transient BlockingConnection connection;
+ private KeyStoreLoader keyStoreLoader;
+ private QoS qos;
+ private boolean retain = false;
+
+
+ public MqttPublisher(MqttOptions options){
+ this(options, null, false);
+ }
+
+ public MqttPublisher(MqttOptions options, boolean retain){
+ this(options, null, retain);
+ }
+
+ public MqttPublisher(MqttOptions options, KeyStoreLoader keyStoreLoader, boolean retain){
+ this.retain = retain;
+ this.options = options;
+ this.keyStoreLoader = keyStoreLoader;
+ SslUtils.checkSslConfig(this.options.getUrl(), keyStoreLoader);
+ this.qos = MqttUtils.qosFromInt(this.options.getQos());
+ }
+
+ public void publish(MqttMessage message, boolean retain) throws Exception {
+ this.connection.publish(message.getTopic(), message.getMessage(), this.qos, this.retain);
+ }
+
+
+
+ public void connectMqtt(String clientId) throws Exception {
+ MQTT client = new MQTT();
+ URI uri = URI.create(this.options.getUrl());
+
+ client.setHost(uri);
+ if(!uri.getScheme().toLowerCase().equals("tcp")){
+ client.setSslContext(SslUtils.sslContext(uri.getScheme(), this.keyStoreLoader));
+ }
+ client.setClientId(clientId);
+ LOG.info("MQTT ClientID: " + client.getClientId().toString());
--- End diff --
Can we use formatted (parameterized) log messages here instead of building the string by concatenation?
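
To illustrate the suggestion: with SLF4J, the logging line in the diff could presumably be written as `LOG.info("MQTT ClientID: {}", client.getClientId());`, which defers rendering the argument until the logger knows INFO is enabled. The sketch below shows that difference using only the JDK. `SketchLogger` and `CountingClientId` are hypothetical stand-ins written for this example; they are not part of SLF4J, Storm, or the MQTT client.

```java
import java.util.ArrayList;
import java.util.List;

public class ParameterizedLoggingSketch {

    /** Hypothetical stand-in for an SLF4J-style logger; NOT the real SLF4J API. */
    static final class SketchLogger {
        private final boolean infoEnabled;
        final List<String> lines = new ArrayList<>();

        SketchLogger(boolean infoEnabled) {
            this.infoEnabled = infoEnabled;
        }

        /** Concatenation style: the caller has already built the full message. */
        void info(String message) {
            if (infoEnabled) {
                lines.add(message);
            }
        }

        /** Parameterized style: the argument is rendered only if INFO is enabled. */
        void info(String template, Object arg) {
            if (infoEnabled) {
                lines.add(template.replace("{}", String.valueOf(arg)));
            }
        }
    }

    /** Hypothetical client id that counts how often it is converted to a String. */
    static final class CountingClientId {
        int toStringCalls = 0;

        @Override
        public String toString() {
            toStringCalls++;
            return "storm-mqtt-client";
        }
    }

    public static void main(String[] args) {
        SketchLogger log = new SketchLogger(false); // INFO disabled
        CountingClientId id = new CountingClientId();

        // Concatenation: toString() runs before the logger can check the level.
        log.info("MQTT ClientID: " + id);
        System.out.println("toString calls after concatenation: " + id.toStringCalls);

        // Placeholder: the logger checks the level first and never renders the id.
        log.info("MQTT ClientID: {}", id);
        System.out.println("toString calls after placeholder: " + id.toStringCalls);
    }
}
```

For a single startup message like this one the cost difference is negligible; the main benefit is a consistent logging style across the module and cheaper calls on hot paths.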
> MQTT Support
> ------------
>
> Key: STORM-1406
> URL: https://issues.apache.org/jira/browse/STORM-1406
> Project: Apache Storm
> Issue Type: Improvement
> Reporter: P. Taylor Goetz
> Assignee: P. Taylor Goetz
>
> MQTT is a lightweight publish/subscribe protocol frequently used in IoT
> applications.
> Further information can be found at http://mqtt.org
> Initial features include:
> * Full MQTT support (e.g. last will, QoS 0-2, retain, etc.)
> * Spout implementation(s) for subscribing to MQTT topics
> * A bolt implementation for publishing MQTT messages
> * A trident function implementation for publishing MQTT messages
> * Authentication and TLS/SSL support
> * User-defined "mappers" for converting MQTT messages to tuples (subscribers)
> * User-defined "mappers" for converting tuples to MQTT messages (publishers)