This is an automated email from the ASF dual-hosted git repository. rzo1 pushed a commit to branch STORM-3988 in repository https://gitbox.apache.org/repos/asf/storm.git
commit 8a2070dfb15ec70aff4c9c21137ec80858eff69b Author: Richard Zowalla <[email protected]> AuthorDate: Thu Oct 19 08:58:53 2023 +0200 STORM-3988 - Remove "storm-mqtt" --- examples/storm-mqtt-examples/pom.xml | 145 -------- .../storm-mqtt-examples/src/main/flux/sample.yaml | 62 ---- .../src/main/flux/ssl-sample.yaml | 78 ----- .../storm/mqtt/examples/CustomMessageMapper.java | 79 ----- .../storm/mqtt/examples/MqttBrokerPublisher.java | 137 -------- .../apache/storm/mqtt/examples/package-info.java | 20 -- .../src/main/resources/log4j2.xml | 32 -- external/storm-mqtt/README.md | 375 --------------------- external/storm-mqtt/pom.xml | 148 -------- .../java/org/apache/storm/mqtt/MqttLogger.java | 30 -- .../java/org/apache/storm/mqtt/MqttMessage.java | 37 -- .../org/apache/storm/mqtt/MqttMessageMapper.java | 38 --- .../org/apache/storm/mqtt/MqttTupleMapper.java | 31 -- .../java/org/apache/storm/mqtt/bolt/MqttBolt.java | 97 ------ .../org/apache/storm/mqtt/common/MqttOptions.java | 301 ----------------- .../apache/storm/mqtt/common/MqttPublisher.java | 59 ---- .../org/apache/storm/mqtt/common/MqttUtils.java | 82 ----- .../org/apache/storm/mqtt/common/SslUtils.java | 57 ---- .../storm/mqtt/mappers/ByteArrayMessageMapper.java | 31 -- .../storm/mqtt/mappers/StringMessageMapper.java | 34 -- .../apache/storm/mqtt/spout/AckableMessage.java | 70 ---- .../org/apache/storm/mqtt/spout/MqttSpout.java | 270 --------------- .../storm/mqtt/ssl/DefaultKeyStoreLoader.java | 92 ----- .../org/apache/storm/mqtt/ssl/KeyStoreLoader.java | 34 -- .../storm/mqtt/trident/MqttPublishFunction.java | 78 ----- .../storm/mqtt/StormMqttIntegrationTest.java | 145 -------- pom.xml | 2 - 27 files changed, 2564 deletions(-) diff --git a/examples/storm-mqtt-examples/pom.xml b/examples/storm-mqtt-examples/pom.xml deleted file mode 100644 index 64a5790da..000000000 --- a/examples/storm-mqtt-examples/pom.xml +++ /dev/null @@ -1,145 +0,0 @@ -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. ---> - -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - - <artifactId>storm-mqtt-examples</artifactId> - <packaging>jar</packaging> - - <name>storm-mqtt-examples</name> - - <parent> - <artifactId>storm</artifactId> - <groupId>org.apache.storm</groupId> - <version>2.6.0-SNAPSHOT</version> - <relativePath>../../pom.xml</relativePath> - </parent> - - <dependencies> - <dependency> - <groupId>org.apache.storm</groupId> - <artifactId>storm-client</artifactId> - <version>${project.version}</version> - <scope>${provided.scope}</scope> - </dependency> - <dependency> - <groupId>org.apache.storm</groupId> - <artifactId>storm-mqtt</artifactId> - <version>${project.version}</version> - <exclusions> - <exclusion> - <!-- required to bypass failure in mvn deploy --> - <groupId>org.apache.activemq.protobuf</groupId> - <artifactId>activemq-protobuf</artifactId> - </exclusion> - </exclusions> - - </dependency> - <dependency> - <groupId>org.apache.storm</groupId> - <artifactId>flux-core</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.fusesource.mqtt-client</groupId> - <artifactId>mqtt-client</artifactId> - <version>1.16</version> - </dependency> - <dependency> - <groupId>org.apache.activemq</groupId> - <artifactId>activemq-broker</artifactId> - </dependency> - <dependency> - <groupId>org.apache.activemq</groupId> - <artifactId>activemq-mqtt</artifactId> - <exclusions> - <exclusion> - <!-- required to bypass failure in mvn deploy --> - <groupId>org.apache.activemq.protobuf</groupId> - <artifactId>activemq-protobuf</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> - <groupId>org.apache.activemq</groupId> - <artifactId>activemq-kahadb-store</artifactId> - <exclusions> - <exclusion> - <!-- required to bypass failure in mvn deploy --> - <groupId>org.apache.activemq.protobuf</groupId> - <artifactId>activemq-protobuf</artifactId> - </exclusion> - </exclusions> - </dependency> - </dependencies> - <build> - <plugins> - - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-shade-plugin</artifactId> - <configuration> - <createDependencyReducedPom>true</createDependencyReducedPom> - <filters> - <filter> - <artifact>*:*</artifact> - <excludes> - <exclude>META-INF/*.SF</exclude> - <exclude>META-INF/*.sf</exclude> - <exclude>META-INF/*.DSA</exclude> - <exclude>META-INF/*.dsa</exclude> - <exclude>META-INF/*.RSA</exclude> - <exclude>META-INF/*.rsa</exclude> - <exclude>META-INF/*.EC</exclude> - <exclude>META-INF/*.ec</exclude> - <exclude>META-INF/MSFTSIG.SF</exclude> - <exclude>META-INF/MSFTSIG.RSA</exclude> - </excludes> - </filter> - </filters> - </configuration> - <executions> - <execution> - <phase>package</phase> - <goals> - <goal>shade</goal> - </goals> - <configuration> - <transformers> - <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" /> - <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> - <mainClass>org.apache.storm.flux.Flux</mainClass> - </transformer> - </transformers> - </configuration> - </execution> - </executions> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-checkstyle-plugin</artifactId> - <!--Note - the version would be inherited--> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-pmd-plugin</artifactId> - </plugin> - </plugins> - </build> - -</project> diff --git a/examples/storm-mqtt-examples/src/main/flux/sample.yaml b/examples/storm-mqtt-examples/src/main/flux/sample.yaml deleted file mode 100644 index c2902dcff..000000000 --- a/examples/storm-mqtt-examples/src/main/flux/sample.yaml +++ /dev/null @@ -1,62 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - ---- - -# topology definition -# name to be used when submitting -name: "mqtt-topology" - -components: - ########## MQTT Spout Config ############ - - id: "mqtt-type" - className: "org.apache.storm.mqtt.examples.CustomMessageMapper" - - - id: "mqtt-options" - className: "org.apache.storm.mqtt.common.MqttOptions" - properties: - - name: "url" - value: "tcp://localhost:1883" - - name: "topics" - value: - - "/users/tgoetz/#" - -# topology configuration -config: - topology.workers: 1 - topology.max.spout.pending: 1000 - -# spout definitions -spouts: - - id: "mqtt-spout" - className: "org.apache.storm.mqtt.spout.MqttSpout" - constructorArgs: - - ref: "mqtt-type" - - ref: "mqtt-options" - parallelism: 1 - -# bolt definitions -bolts: - - id: "log" - className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt" - parallelism: 1 - - -streams: - - from: "mqtt-spout" - to: "log" - grouping: - type: SHUFFLE diff --git a/examples/storm-mqtt-examples/src/main/flux/ssl-sample.yaml b/examples/storm-mqtt-examples/src/main/flux/ssl-sample.yaml deleted file mode 100644 index bfb668d32..000000000 --- a/examples/storm-mqtt-examples/src/main/flux/ssl-sample.yaml +++ /dev/null @@ -1,78 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - ---- - -# topology definition -# name to be used when submitting -name: "mqtt-topology" - -components: - ########## MQTT Spout Config ############ - - id: "mqtt-type" - className: "org.apache.storm.mqtt.examples.CustomMessageMapper" - - - id: "keystore-loader" - className: "org.apache.storm.mqtt.ssl.DefaultKeyStoreLoader" - constructorArgs: - - "keystore.jks" - - "truststore.jks" - properties: - - name: "keyPassword" - value: "password" - - name: "keyStorePassword" - value: "password" - - name: "trustStorePassword" - value: "password" - - - id: "mqtt-options" - className: "org.apache.storm.mqtt.common.MqttOptions" - properties: - - name: "url" - value: "ssl://raspberrypi.local:8883" - - name: "topics" - value: - - "/users/tgoetz/#" - -# topology configuration -config: - topology.workers: 1 - topology.max.spout.pending: 1000 - -# spout definitions -spouts: - - id: "mqtt-spout" - className: "org.apache.storm.mqtt.spout.MqttSpout" - constructorArgs: - - ref: "mqtt-type" - - ref: "mqtt-options" - - ref: "keystore-loader" - parallelism: 1 - -# bolt definitions -bolts: - - - id: "log" - className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt" - parallelism: 1 - - -streams: - - - from: "mqtt-spout" - to: "log" - grouping: - type: SHUFFLE diff --git a/examples/storm-mqtt-examples/src/main/java/org/apache/storm/mqtt/examples/CustomMessageMapper.java b/examples/storm-mqtt-examples/src/main/java/org/apache/storm/mqtt/examples/CustomMessageMapper.java deleted file mode 100644 index b9a7c1fb0..000000000 --- a/examples/storm-mqtt-examples/src/main/java/org/apache/storm/mqtt/examples/CustomMessageMapper.java +++ /dev/null @@ -1,79 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.mqtt.examples; - -import org.apache.storm.mqtt.MqttMessage; -import org.apache.storm.mqtt.MqttMessageMapper; -import org.apache.storm.tuple.Fields; -import org.apache.storm.tuple.Values; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Given a topic name: "users/{user}/{location}/{deviceId}" - * and a payload of "{temperature}/{humidity}" - * emits a tuple containing - * {@code user(String), deviceId(String), location(String), temperature(float), - * humidity(float)}. - */ -public final class CustomMessageMapper implements MqttMessageMapper { - private static final long serialVersionUID = 1L; - private static final Logger LOG = LoggerFactory.getLogger( - CustomMessageMapper.class); - private static final int TOPIC_INDEX_1 = 2; - private static final int TOPIC_INDEX_2 = 4; - private static final int TOPIC_INDEX_3 = 3; - - /** - * Converts MQTT message to an instance of {@code Values}. - * @param message the message to convert - * @return the converted values - */ - @Override - public Values toValues(final MqttMessage message) { - String topic = message.getTopic(); - String[] topicElements = topic.split("/"); - String[] payloadElements = new String(message.getMessage()).split("/"); - - return new Values(topicElements[TOPIC_INDEX_1], - topicElements[TOPIC_INDEX_2], - topicElements[TOPIC_INDEX_3], - Float.parseFloat(payloadElements[0]), - Float.parseFloat(payloadElements[1])); - } - - /** - * Gets the output fields. - * @return the output fields - */ - @Override - public Fields outputFields() { - return new Fields("user", - "deviceId", - "location", - "temperature", - "humidity"); - } - - /** - * Constructor. - */ - public CustomMessageMapper() { - } -} diff --git a/examples/storm-mqtt-examples/src/main/java/org/apache/storm/mqtt/examples/MqttBrokerPublisher.java b/examples/storm-mqtt-examples/src/main/java/org/apache/storm/mqtt/examples/MqttBrokerPublisher.java deleted file mode 100644 index 4232eb007..000000000 --- a/examples/storm-mqtt-examples/src/main/java/org/apache/storm/mqtt/examples/MqttBrokerPublisher.java +++ /dev/null @@ -1,137 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.mqtt.examples; - -import java.util.Random; - -import org.apache.activemq.broker.BrokerService; -import org.apache.storm.mqtt.MqttLogger; -import org.fusesource.mqtt.client.BlockingConnection; -import org.fusesource.mqtt.client.MQTT; -import org.fusesource.mqtt.client.QoS; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * A MQTT example using a Storm topology. - */ -public final class MqttBrokerPublisher { - private static final Logger LOG = LoggerFactory.getLogger( - MqttBrokerPublisher.class); - private static BrokerService broker; - private static BlockingConnection connection; - private static final int TEMPERATURE_MAX = 100; - private static final int HUMIDITY_MAX = 100; - /** - * The default wait in milliseconds. - */ - private static final int WAIT_MILLIS_DEFAULT = 500; - - /** - * Initializes {@code broker} and starts it. - * @throws Exception if an exception during adding a connector occurs - */ - public static void startBroker() throws Exception { - LOG.info("Starting broker..."); - broker = new BrokerService(); - broker.addConnector("mqtt://localhost:1883"); - broker.setDataDirectory("target"); - broker.start(); - LOG.info("MQTT broker started"); - Runtime.getRuntime().addShutdownHook(new Thread() { - @Override - public void run() { - try { - LOG.info("Shutting down MQTT broker..."); - broker.stop(); - } catch (Exception e) { - e.printStackTrace(); - } - } - }); - } - - /** - * Initializes {@code connection}. - * @throws Exception if an exception during connecting to connector occurs - */ - public static void startPublisher() throws Exception { - MQTT client = new MQTT(); - client.setTracer(new MqttLogger()); - client.setHost("tcp://localhost:1883"); - client.setClientId("MqttBrokerPublisher"); - connection = client.blockingConnection(); - - Runtime.getRuntime().addShutdownHook(new Thread() { - @Override - public void run() { - try { - LOG.info("Shutting down MQTT client..."); - connection.disconnect(); - } catch (Exception e) { - e.printStackTrace(); - } - } - }); - - connection.connect(); - } - - /** - * Publishes topics on connection. - * @throws Exception if an exception during publishing occurs - */ - public static void publish() throws Exception { - String topic = "/users/tgoetz/office/1234"; - Random rand = new Random(); - LOG.info("Publishing to topic {}", topic); - LOG.info("Cntrl+C to exit."); - - while (true) { - int temp = rand.nextInt(TEMPERATURE_MAX); - int hum = rand.nextInt(HUMIDITY_MAX); - String payload = temp + "/" + hum; - - connection.publish(topic, - payload.getBytes(), - QoS.AT_LEAST_ONCE, - false); - Thread.sleep(WAIT_MILLIS_DEFAULT); - } - } - - /** - * The main method. - * @param args the command line arguments - * @throws Exception if an exception during connections or transmission - * occurs - */ - public static void main(final String[] args) throws Exception { - startBroker(); - startPublisher(); - publish(); - } - - /** - * Utility constructor to prevent initialization. - */ - private MqttBrokerPublisher() { - } -} diff --git a/examples/storm-mqtt-examples/src/main/java/org/apache/storm/mqtt/examples/package-info.java b/examples/storm-mqtt-examples/src/main/java/org/apache/storm/mqtt/examples/package-info.java deleted file mode 100644 index 52c9270b0..000000000 --- a/examples/storm-mqtt-examples/src/main/java/org/apache/storm/mqtt/examples/package-info.java +++ /dev/null @@ -1,20 +0,0 @@ -/* - * Copyright 2018 The Apache Software Foundation. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * MQTT examples. - */ -package org.apache.storm.mqtt.examples; diff --git a/examples/storm-mqtt-examples/src/main/resources/log4j2.xml b/examples/storm-mqtt-examples/src/main/resources/log4j2.xml deleted file mode 100644 index bfe57a197..000000000 --- a/examples/storm-mqtt-examples/src/main/resources/log4j2.xml +++ /dev/null @@ -1,32 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. ---> -<Configuration status="WARN"> - <Appenders> - <Console name="Console" target="SYSTEM_OUT"> - <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/> - </Console> - </Appenders> - - <Loggers> - <Logger name="org.apache.storm.flux.wrappers" level="INFO"/> - <Logger name="org.apache.storm.mqtt" level="DEBUG"/> - <Root level="error"> - <AppenderRef ref="Console"/> - </Root> - </Loggers> -</Configuration> \ No newline at end of file diff --git a/external/storm-mqtt/README.md b/external/storm-mqtt/README.md deleted file mode 100644 index fb5c71ef1..000000000 --- a/external/storm-mqtt/README.md +++ /dev/null @@ -1,375 +0,0 @@ -# Storm MQTT Integration - -## About - -MQTT is a lightweight publish/subscribe protocol frequently used in IoT applications. - -Further information can be found at http://mqtt.org. The HiveMQ website has a great series on -[MQTT Essentials](http://www.hivemq.com/mqtt-essentials/). - -Features include: - -* Full MQTT support (e.g. last will, QoS 0-2, retain, etc.) -* Spout implementation(s) for subscribing to MQTT topics -* A bolt implementation for publishing MQTT messages -* A trident function implementation for publishing MQTT messages -* Authentication and TLS/SSL support -* User-defined "mappers" for converting MQTT messages to tuples (subscribers) -* User-defined "mappers" for converting tuples to MQTT messages (publishers) - - -## Quick Start -To quickly see MQTT integration in action, follow the instructions below. - -**Start a MQTT broker and publisher** - -The command below will create an MQTT broker on port 1883, and start a publsher that will publish random -temperature/humidity values to an MQTT topic. - -Open a terminal and execute the following command (change the path as necessary): - -```bash -java -cp examples/target/storm-mqtt-examples-*-SNAPSHOT.jar org.apache.storm.mqtt.examples.MqttBrokerPublisher -``` - -**Run the example toplogy** - -Run the sample topology using Flux. This will start a local mode cluster and topology that consists of the MQTT Spout -publishing to a bolt that simply logs the information it receives. - -In a separate terminal, run the following command (Note that the `storm` executable must be on your PATH): - -```bash -storm jar ./examples/target/storm-mqtt-examples-*-SNAPSHOT.jar org.apache.storm.flux.Flux ./examples/src/main/flux/sample.yaml --local -``` - -You should see data from MQTT being logged by the bolt: - -``` -27020 [Thread-17-log-executor[3 3]] INFO o.a.s.f.w.b.LogInfoBolt - {user=tgoetz, deviceId=1234, location=office, temperature=67.0, humidity=65.0} -27030 [Thread-17-log-executor[3 3]] INFO o.a.s.f.w.b.LogInfoBolt - {user=tgoetz, deviceId=1234, location=office, temperature=47.0, humidity=85.0} -27040 [Thread-17-log-executor[3 3]] INFO o.a.s.f.w.b.LogInfoBolt - {user=tgoetz, deviceId=1234, location=office, temperature=69.0, humidity=94.0} -27049 [Thread-17-log-executor[3 3]] INFO o.a.s.f.w.b.LogInfoBolt - {user=tgoetz, deviceId=1234, location=office, temperature=4.0, humidity=98.0} -27059 [Thread-17-log-executor[3 3]] INFO o.a.s.f.w.b.LogInfoBolt - {user=tgoetz, deviceId=1234, location=office, temperature=51.0, humidity=12.0} -27069 [Thread-17-log-executor[3 3]] INFO o.a.s.f.w.b.LogInfoBolt - {user=tgoetz, deviceId=1234, location=office, temperature=27.0, humidity=65.0} -``` - -Either allow the local cluster to exit, or stop it by typing Cntrl-C. - -**MQTT Fault Tolerance In Action** - -After the toplogy has been shutdown, the MQTT subscription created by the MQTT spout will persist with the broker, -and it will continue to receive and queue messages (as long as the broker is running). - -If you run the toplogy again (while the broker is still running), when the spout initially connects to the MQTT broker, -it will receive all the messages it missed while it was down. You should see this as burst of messages, followed by a -rate of about two messages per second. - -This happens because, by default, the MQTT Spout creates a *session* when it subscribes -- that means it requests that -the broker hold onto and redeliver any messages it missed while offline. Another important factor is the the -`MqttBrokerPublisher` publishes messages with a MQTT QoS of `1`, meaning *at least once delivery*. - -For more information about MQTT fault tolerance, see the **Delivery Guarantees** section below. - - - -## Delivery Guarantees -In Storm terms, ***the MQTT Spout provides at least once delivery***, depending on the configuration of the publisher as -well as the MQTT spout. - -The MQTT protocol defines the following QoS levels: - -* `0` - At Most Once (AKA "Fire and Forget") -* `1` - At Least Once -* `2` - Exactly Once - -This can be a little confusing as the MQTT protocol specification does not really address the concept of a node being -completely incinerated by a catasrophic event. This is in stark contrast with Storm's reliability model, which expects -and embraces the concept of node failure. - -So resiliancy is ultimately dependent on the underlying MQTT implementation and infrastructure. - -###Recommendations - -*You will never get at exactly once processing with this spout. It can be used with Trident, but it won't provide -transational semantics. You will only get at least once guarantees.* - -If you need reliability guarantees (i.e. *at least once processing*): - -1. For MQTT publishers (outside of Storm), publish messages with a QoS of `1` so the broker saves messages if/when the -spout is offline. -2. Use the spout defaults (`cleanSession = false` and `qos = 1`) -3. If you can, make sure any result of receiving and MQTT message is idempotent. -4. Make sure your MQTT brokers don't die or get isolated due to a network partition. Be prepared for natural and -man-made disasters and network partitions. Incineration and destruction happens. - - - - - -## Configuration -For the full range of configuration options, see the JavaDoc for `org.apache.storm.mqtt.common.MqttOptions`. - -### Message Mappers -To define how MQTT messages are mapped to Storm tuples, you configure the MQTT spout with an implementation of the -`org.apache.storm.mqtt.MqttMessageMapper` interface, which looks like this: - -```java -public interface MqttMessageMapper extends Serializable { - - Values toValues(MqttMessage message); - - Fields outputFields(); -} -``` - -The `MqttMessage` class contains the topic to which the message was published (`String`) and the message payload -(`byte[]`). For example, here is a `MqttMessageMapper` implementation that produces tuples based on the content of both -the message topic and payload: - -```java -/** - * Given a topic name: "users/{user}/{location}/{deviceId}" - * and a payload of "{temperature}/{humidity}" - * emits a tuple containing user(String), deviceId(String), location(String), temperature(float), humidity(float) - * - */ -public class CustomMessageMapper implements MqttMessageMapper { - private static final Logger LOG = LoggerFactory.getLogger(CustomMessageMapper.class); - - - public Values toValues(MqttMessage message) { - String topic = message.getTopic(); - String[] topicElements = topic.split("/"); - String[] payloadElements = new String(message.getMessage()).split("/"); - - return new Values(topicElements[2], topicElements[4], topicElements[3], Float.parseFloat(payloadElements[0]), - Float.parseFloat(payloadElements[1])); - } - - public Fields outputFields() { - return new Fields("user", "deviceId", "location", "temperature", "humidity"); - } -} -``` - -### Tuple Mappers -When publishing MQTT messages with the MQTT bolt or Trident function, you need to map Storm tuple data to MQTT messages -(topic/payload). This is done by implementing the `org.apache.storm.mqtt.MqttTupleMapper` interface: - -```java -public interface MqttTupleMapper extends Serializable{ - - MqttMessage toMessage(ITuple tuple); - -} -``` - -For example, a simple `MqttTupleMapper` implementation might look like this: - -```java -public class MyTupleMapper implements MqttTupleMapper { - public MqttMessage toMessage(ITuple tuple) { - String topic = "users/" + tuple.getStringByField("userId") + "/" + tuple.getStringByField("device"); - byte[] payload = tuple.getStringByField("message").getBytes(); - return new MqttMessage(topic, payload); - } -} -``` - -### MQTT Spout Parallelism -It's recommended that you use a parallelism of 1 for the MQTT spout, otherwise you will end up with multiple instances -of the spout subscribed to the same topic(s), resulting in duplicate messages. - -If you want to parallelize the spout, it's recommended that you use multiple instances of the spout in your topolgoy -and use MQTT topic selectors to parition the data. How you implement the partitioning strategy is ultimately determined -by your MQTT topic structure. As an example, if you had topics partitioned by region (e.g. east/west) you could do -something like the following: - -```java -String spout1Topic = "users/east/#"; -String spout2Topic = "users/west/#"; -``` - -and then join the resulting streams together by subscribing a bolt to each stream. - - -### Using Flux - -The following Flux YAML configuration creates the toplolgy used in the example: - -```yaml -name: "mqtt-topology" - -components: - ########## MQTT Spout Config ############ - - id: "mqtt-type" - className: "org.apache.storm.mqtt.examples.CustomMessageMapper" - - - id: "mqtt-options" - className: "org.apache.storm.mqtt.common.MqttOptions" - properties: - - name: "url" - value: "tcp://localhost:1883" - - name: "topics" - value: - - "/users/tgoetz/#" - -# topology configuration -config: - topology.workers: 1 - topology.max.spout.pending: 1000 - -# spout definitions -spouts: - - id: "mqtt-spout" - className: "org.apache.storm.mqtt.spout.MqttSpout" - constructorArgs: - - ref: "mqtt-type" - - ref: "mqtt-options" - parallelism: 1 - -# bolt definitions -bolts: - - id: "log" - className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt" - parallelism: 1 - - -streams: - - from: "mqtt-spout" - to: "log" - grouping: - type: SHUFFLE - -``` - - -### Using Java - -Similarly, you can create the same topology using the Storm Core Java API: - -```java -TopologyBuilder builder = new TopologyBuilder(); -MqttOptions options = new MqttOptions(); -options.setTopics(Arrays.asList("/users/tgoetz/#")); -options.setCleanConnection(false); -MqttSpout spout = new MqttSpout(new StringMessageMapper(), options); - -MqttBolt bolt = new LogInfoBolt(); - -builder.setSpout("mqtt-spout", spout); -builder.setBolt("log-bolt", bolt).shuffleGrouping("mqtt-spout"); - -return builder.createTopology(); -``` - -## SSL/TLS -If the MQTT broker you are connecting to requires SSL or SSL client authentication, you need to configure the spout -with an appropriate URI, and the location of keystore/truststore files containing the necessary certificates. - -### SSL/TLS URIs -To connect over SSL/TLS use a URI with a prefix of `ssl://` or `tls://` instead of `tcp://`. For further control over -the algorithm, you can specify a specific protocol: - - * `ssl://` Use the JVM default version of the SSL protocol. - * `sslv*://` Use a specific version of the SSL protocol, where `*` is replaced by the version (e.g. `sslv3://`) - * `tls://` Use the JVM default version of the TLS protocol. - * `tlsv*://` Use a specific version of the TLS protocol, where `*` is replaced by the version (e.g. `tlsv1.1://`) - - -### Specifying Keystore/Truststore Locations - - The `MqttSpout`, `MqttBolt` and `MqttPublishFunction` all have constructors that take a `KeyStoreLoader` instance that - is used to load the certificates required for TLS/SSL connections. For example: - -```java - public MqttSpout(MqttMessageMapper type, MqttOptions options, KeyStoreLoader keyStoreLoader) -``` - -The `DefaultKeyStoreLoader` class can be used to load certificates from the local filesystem. Note that the -keystore/truststore need to be available on all worker nodes where the spout/bolt might be executed. To use -`DefaultKeyStoreLoader` you specify the location of the keystore/truststore file(s), and set the necessary passwords: - -```java -DefaultKeyStoreLoader ksl = new DefaultKeyStoreLoader("/path/to/keystore.jks", "/path/to/truststore.jks"); -ksl.setKeyStorePassword("password"); -ksl.setTrustStorePassword("password"); -//... -``` - -If your keystore/truststore certificates are stored in a single file, you can use the one-argument constructor: - -```java -DefaultKeyStoreLoader ksl = new DefaultKeyStoreLoader("/path/to/keystore.jks"); -ksl.setKeyStorePassword("password"); -//... -``` - -SSL/TLS can also be configured using Flux: - -```yaml -name: "mqtt-topology" - -components: - ########## MQTT Spout Config ############ - - id: "mqtt-type" - className: "org.apache.storm.mqtt.examples.CustomMessageMapper" - - - id: "keystore-loader" - className: "org.apache.storm.mqtt.ssl.DefaultKeyStoreLoader" - constructorArgs: - - "keystore.jks" - - "truststore.jks" - properties: - - name: "keyPassword" - value: "password" - - name: "keyStorePassword" - value: "password" - - name: "trustStorePassword" - value: "password" - - - id: "mqtt-options" - className: "org.apache.storm.mqtt.common.MqttOptions" - properties: - - name: "url" - value: "ssl://raspberrypi.local:8883" - - name: "topics" - value: - - "/users/tgoetz/#" - -# topology configuration -config: - topology.workers: 1 - topology.max.spout.pending: 1000 - -# spout definitions -spouts: - - id: "mqtt-spout" - className: "org.apache.storm.mqtt.spout.MqttSpout" - constructorArgs: - - ref: "mqtt-type" - - ref: "mqtt-options" - - ref: "keystore-loader" - parallelism: 1 - -# bolt definitions -bolts: - - - id: "log" - className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt" - parallelism: 1 - - -streams: - - - from: "mqtt-spout" - to: "log" - grouping: - type: SHUFFLE - -``` - -## Committer Sponsors - - * P. Taylor Goetz ([[email protected]](mailto:[email protected])) \ No newline at end of file diff --git a/external/storm-mqtt/pom.xml b/external/storm-mqtt/pom.xml deleted file mode 100644 index 11377fd52..000000000 --- a/external/storm-mqtt/pom.xml +++ /dev/null @@ -1,148 +0,0 @@ -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - - <artifactId>storm-mqtt</artifactId> - <packaging>jar</packaging> - - <name>storm-mqtt</name> - - <parent> - <groupId>org.apache.storm</groupId> - <artifactId>storm</artifactId> - <version>2.6.0-SNAPSHOT</version> - <relativePath>../../pom.xml</relativePath> - </parent> - - <repositories> - <repository> - <id>bintray</id> - <url>https://dl.bintray.com/andsel/maven/</url> - <releases> - <enabled>true</enabled> - </releases> - <snapshots> - <enabled>false</enabled> - </snapshots> - </repository> - </repositories> - - <dependencies> - <dependency> - <groupId>org.apache.activemq</groupId> - <artifactId>activemq-broker</artifactId> - <version>${activemq.version}</version> - <scope>test</scope> - <exclusions> - <exclusion> - <!-- required to bypass failure in mvn deploy --> - <groupId>org.apache.activemq</groupId> - <artifactId>activemq-protobuf</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> - <groupId>org.apache.activemq</groupId> - <artifactId>activemq-mqtt</artifactId> - <version>${activemq.version}</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.activemq</groupId> - <artifactId>activemq-kahadb-store</artifactId> - <version>${activemq.version}</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.storm</groupId> - <artifactId>storm-client</artifactId> - <version>${project.version}</version> - <scope>${provided.scope}</scope> - </dependency> - <dependency> - <groupId>org.apache.storm</groupId> - <artifactId>storm-server</artifactId> - <version>${project.version}</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.fusesource.mqtt-client</groupId> - <artifactId>mqtt-client</artifactId> - <version>1.16</version> - </dependency> - <dependency> - <groupId>commons-lang</groupId> - <artifactId>commons-lang</artifactId> - <version>2.5</version> - </dependency> - <dependency> - <groupId>org.apache.storm</groupId> - <artifactId>storm-client</artifactId> - <version>${project.version}</version> - <type>test-jar</type> - <scope>test</scope> - </dependency> - </dependencies> - <build> - <plugins> - <plugin> - <artifactId>maven-surefire-plugin</artifactId> - <configuration> - <enableAssertions>false</enableAssertions> - <redirectTestOutputToFile>true</redirectTestOutputToFile> - <excludedGroups>${java.unit.test.exclude}</excludedGroups> - <includes> - <include>${java.unit.test.include}</include> - </includes> - <argLine>-Djava.net.preferIPv4Stack=true -Xmx1536m</argLine> - </configuration> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-failsafe-plugin</artifactId> - <configuration> - <enableAssertions>false</enableAssertions> - <redirectTestOutputToFile>true</redirectTestOutputToFile> - <includes> - <include>${java.integration.test.include}</include> - </includes> - <groups>${java.integration.test.group}</groups> <!--set in integration-test the profile--> - <argLine>-Djava.net.preferIPv4Stack=true -Xmx1536m</argLine> - </configuration> - <executions> - <execution> - <goals> - <goal>integration-test</goal> - <goal>verify</goal> - </goals> - </execution> - </executions> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-checkstyle-plugin</artifactId> - <!--Note - the version would be inherited--> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-pmd-plugin</artifactId> - </plugin> - </plugins> - </build> - -</project> diff --git a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/MqttLogger.java b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/MqttLogger.java deleted file mode 100644 index 3644d16c7..000000000 --- a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/MqttLogger.java +++ /dev/null @@ -1,30 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version - * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - */ - -package org.apache.storm.mqtt; - -import org.fusesource.mqtt.client.Tracer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Wrapper around SLF4J logger that allows MQTT messages to be logged. - */ -public class MqttLogger extends Tracer { - private static final Logger LOG = LoggerFactory.getLogger(MqttLogger.class); - - @Override - public void debug(String message, Object... args) { - LOG.debug(String.format(message, args)); - } - -} diff --git a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/MqttMessage.java b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/MqttMessage.java deleted file mode 100644 index 90401efba..000000000 --- a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/MqttMessage.java +++ /dev/null @@ -1,37 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version - * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - */ - -package org.apache.storm.mqtt; - -/** - * Represents an MQTT Message consisting of a topic string (e.g. "/users/ptgoetz/office/thermostat") - * and a byte array message/payload. - * - */ -public class MqttMessage { - private String topic; - private byte[] message; - - - public MqttMessage(String topic, byte[] payload) { - this.topic = topic; - this.message = payload; - } - - public byte[] getMessage() { - return this.message; - } - - public String getTopic() { - return this.topic; - } -} diff --git a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/MqttMessageMapper.java b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/MqttMessageMapper.java deleted file mode 100644 index caeb6057e..000000000 --- a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/MqttMessageMapper.java +++ /dev/null @@ -1,38 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version - * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - */ - -package org.apache.storm.mqtt; - -import java.io.Serializable; -import org.apache.storm.tuple.Fields; -import org.apache.storm.tuple.Values; - -/** - * Represents an object that can be converted to a Storm Tuple from an AckableMessage, - * given a MQTT Topic Name and a byte array payload. - */ -public interface MqttMessageMapper extends Serializable { - /** - * Convert a `MqttMessage` to a set of Values that can be emitted as a Storm Tuple. - * - * @param message An MQTT Message. - * @return Values representing a Storm Tuple. - */ - Values toValues(MqttMessage message); - - /** - * Returns the list of output fields this Mapper produces. - * - * @return the list of output fields this mapper produces. - */ - Fields outputFields(); -} diff --git a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/MqttTupleMapper.java b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/MqttTupleMapper.java deleted file mode 100644 index 58473b7a4..000000000 --- a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/MqttTupleMapper.java +++ /dev/null @@ -1,31 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version - * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - */ - -package org.apache.storm.mqtt; - -import java.io.Serializable; - -import org.apache.storm.tuple.ITuple; - -/** - * Given a Tuple, converts it to an MQTT message. - */ -public interface MqttTupleMapper extends Serializable { - - /** - * Converts a Tuple to a MqttMessage. - * @param tuple the incoming tuple - * @return the message to publish - */ - MqttMessage toMessage(ITuple tuple); - -} diff --git a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/bolt/MqttBolt.java b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/bolt/MqttBolt.java deleted file mode 100644 index 745607b9e..000000000 --- a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/bolt/MqttBolt.java +++ /dev/null @@ -1,97 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version - * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - */ - -package org.apache.storm.mqtt.bolt; - -import java.util.Map; -import org.apache.storm.Config; -import org.apache.storm.mqtt.MqttMessage; -import org.apache.storm.mqtt.MqttTupleMapper; -import org.apache.storm.mqtt.common.MqttOptions; -import org.apache.storm.mqtt.common.MqttPublisher; -import org.apache.storm.mqtt.common.SslUtils; -import org.apache.storm.mqtt.ssl.KeyStoreLoader; -import org.apache.storm.task.OutputCollector; -import org.apache.storm.task.TopologyContext; -import org.apache.storm.topology.OutputFieldsDeclarer; -import org.apache.storm.topology.base.BaseTickTupleAwareRichBolt; -import org.apache.storm.tuple.Tuple; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -public class MqttBolt extends BaseTickTupleAwareRichBolt { - private static final Logger LOG = LoggerFactory.getLogger(MqttBolt.class); - private MqttTupleMapper mapper; - private transient MqttPublisher publisher; - private boolean retain = false; - private transient OutputCollector collector; - private MqttOptions options; - private KeyStoreLoader keyStoreLoader; - private transient String topologyName; - - - public MqttBolt(MqttOptions options, MqttTupleMapper mapper) { - this(options, mapper, null, false); - } - - public MqttBolt(MqttOptions options, MqttTupleMapper mapper, boolean retain) { - this(options, mapper, null, retain); - } - - public MqttBolt(MqttOptions options, MqttTupleMapper mapper, KeyStoreLoader keyStoreLoader) { - this(options, mapper, keyStoreLoader, false); - } - - public MqttBolt(MqttOptions options, MqttTupleMapper mapper, KeyStoreLoader keyStoreLoader, boolean retain) { - this.options = options; - this.mapper = mapper; - this.retain = retain; - this.keyStoreLoader = keyStoreLoader; - // the following code is duplicated in the constructor of MqttPublisher - // we reproduce it here so we fail on the client side if SSL is misconfigured, rather than when the topology - // is deployed to the cluster - SslUtils.checkSslConfig(this.options.getUrl(), keyStoreLoader); - } - - @Override - public void prepare(Map<String, Object> conf, TopologyContext context, OutputCollector collector) { - this.collector = collector; - this.topologyName = (String) conf.get(Config.TOPOLOGY_NAME); - this.publisher = new MqttPublisher(this.options, this.keyStoreLoader, this.retain); - try { - this.publisher.connectMqtt(this.topologyName + "-" + context.getThisComponentId() + "-" + context.getThisTaskId()); - } catch (Exception e) { - LOG.error("Unable to connect to MQTT Broker.", e); - throw new RuntimeException("Unable to connect to MQTT Broker.", e); - } - } - - @Override - protected void process(Tuple input) { - MqttMessage message = this.mapper.toMessage(input); - try { - this.publisher.publish(message); - this.collector.ack(input); - } catch (Exception e) { - LOG.warn("Error publishing MQTT message. Failing tuple.", e); - // should we fail the tuple or kill the worker? - collector.reportError(e); - collector.fail(input); - } - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - // this bolt does not emit tuples - } -} diff --git a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/common/MqttOptions.java b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/common/MqttOptions.java deleted file mode 100644 index 90c922247..000000000 --- a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/common/MqttOptions.java +++ /dev/null @@ -1,301 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version - * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - */ - -package org.apache.storm.mqtt.common; - -import java.io.Serializable; -import java.util.List; - -/** - * MQTT Configuration Options. - */ -public class MqttOptions implements Serializable { - private String url = "tcp://localhost:1883"; - private List<String> topics = null; - private boolean cleanConnection = false; - - private String willTopic; - private String willPayload; - private int willQos = 1; - private boolean willRetain = false; - - private long reconnectDelay = 10; - private long reconnectDelayMax = 30 * 1000; - private double reconnectBackOffMultiplier = 2.0f; - private long reconnectAttemptsMax = -1; - private long connectAttemptsMax = -1; - - private String userName = ""; - private String password = ""; - - private int qos = 1; - - public String getUrl() { - return url; - } - - /** - * Sets the url for connecting to the MQTT broker, e.g. {@code tcp://localhost:1883}. - */ - public void setUrl(String url) { - this.url = url; - } - - public List<String> getTopics() { - return topics; - } - - /** - * A list of MQTT topics to subscribe to. - */ - public void setTopics(List<String> topics) { - this.topics = topics; - } - - public boolean isCleanConnection() { - return cleanConnection; - } - - /** - * Set to false if you want the MQTT server to persist topic subscriptions and ack positions across client sessions. - * Defaults to false. - */ - public void setCleanConnection(boolean cleanConnection) { - this.cleanConnection = cleanConnection; - } - - public String getWillTopic() { - return willTopic; - } - - /** - * If set the server will publish the client's Will message to the specified topics if the client has an unexpected - * disconnection. - */ - public void setWillTopic(String willTopic) { - this.willTopic = willTopic; - } - - public String getWillPayload() { - return willPayload; - } - - /** - * The Will message to send. Defaults to a zero length message. - */ - public void setWillPayload(String willPayload) { - this.willPayload = willPayload; - } - - public long getReconnectDelay() { - return reconnectDelay; - } - - /** - * How long to wait in ms before the first reconnect attempt. Defaults to 10. - */ - public void setReconnectDelay(long reconnectDelay) { - this.reconnectDelay = reconnectDelay; - } - - public long getReconnectDelayMax() { - return reconnectDelayMax; - } - - /** - * The maximum amount of time in ms to wait between reconnect attempts. Defaults to 30,000. - */ - public void setReconnectDelayMax(long reconnectDelayMax) { - this.reconnectDelayMax = reconnectDelayMax; - } - - public double getReconnectBackOffMultiplier() { - return reconnectBackOffMultiplier; - } - - /** - * The Exponential backoff be used between reconnect attempts. Set to 1 to disable exponential backoff. Defaults to - * 2. - */ - public void setReconnectBackOffMultiplier(double reconnectBackOffMultiplier) { - this.reconnectBackOffMultiplier = reconnectBackOffMultiplier; - } - - public long getReconnectAttemptsMax() { - return reconnectAttemptsMax; - } - - /** - * The maximum number of reconnect attempts before an error is reported back to the client after a server - * connection had previously been established. Set to -1 to use unlimited attempts. Defaults to -1. - */ - public void setReconnectAttemptsMax(long reconnectAttemptsMax) { - this.reconnectAttemptsMax = reconnectAttemptsMax; - } - - public long getConnectAttemptsMax() { - return connectAttemptsMax; - } - - /** - * The maximum number of reconnect attempts before an error is reported back to the client on the first attempt by - * the client to connect to a server. Set to -1 to use unlimited attempts. Defaults to -1. - */ - public void setConnectAttemptsMax(long connectAttemptsMax) { - this.connectAttemptsMax = connectAttemptsMax; - } - - public String getUserName() { - return userName; - } - - /** - * The username for authenticated sessions. - */ - public void setUserName(String userName) { - this.userName = userName; - } - - public String getPassword() { - return password; - } - - /** - * The password for authenticated sessions. - */ - public void setPassword(String password) { - this.password = password; - } - - public int getQos() { - return this.qos; - } - - /** - * Sets the quality of service to use for MQTT messages. Defaults to 1 (at least once). - */ - public void setQos(int qos) { - if (qos < 0 || qos > 2) { - throw new IllegalArgumentException("MQTT QoS must be >= 0 and <= 2"); - } - this.qos = qos; - } - - public int getWillQos() { - return this.willQos; - } - - /** - * Sets the quality of service to use for the MQTT Will message. Defaults to 1 (at least once). - */ - public void setWillQos(int qos) { - if (qos < 0 || qos > 2) { - throw new IllegalArgumentException("MQTT Will QoS must be >= 0 and <= 2"); - } - this.willQos = qos; - } - - public boolean getWillRetain() { - return this.willRetain; - } - - /** - * Set to true if you want the Will message to be published with the retain option. - */ - public void setWillRetain(boolean retain) { - this.willRetain = retain; - } - - public static class Builder { - private MqttOptions options = new MqttOptions(); - - public Builder url(String url) { - this.options.url = url; - return this; - } - - - public Builder topics(List<String> topics) { - this.options.topics = topics; - return this; - } - - public Builder cleanConnection(boolean cleanConnection) { - this.options.cleanConnection = cleanConnection; - return this; - } - - public Builder willTopic(String willTopic) { - this.options.willTopic = willTopic; - return this; - } - - public Builder willPayload(String willPayload) { - this.options.willPayload = willPayload; - return this; - } - - public Builder willRetain(boolean retain) { - this.options.willRetain = retain; - return this; - } - - public Builder willQos(int qos) { - this.options.setWillQos(qos); - return this; - } - - public Builder reconnectDelay(long reconnectDelay) { - this.options.reconnectDelay = reconnectDelay; - return this; - } - - public Builder reconnectDelayMax(long reconnectDelayMax) { - this.options.reconnectDelayMax = reconnectDelayMax; - return this; - } - - public Builder reconnectBackOffMultiplier(double reconnectBackOffMultiplier) { - this.options.reconnectBackOffMultiplier = reconnectBackOffMultiplier; - return this; - } - - public Builder reconnectAttemptsMax(long reconnectAttemptsMax) { - this.options.reconnectAttemptsMax = reconnectAttemptsMax; - return this; - } - - public Builder connectAttemptsMax(long connectAttemptsMax) { - this.options.connectAttemptsMax = connectAttemptsMax; - return this; - } - - public Builder userName(String userName) { - this.options.userName = userName; - return this; - } - - public Builder password(String password) { - this.options.password = password; - return this; - } - - public Builder qos(int qos) { - this.options.setQos(qos); - return this; - } - - public MqttOptions build() { - return this.options; - } - } -} diff --git a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/common/MqttPublisher.java b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/common/MqttPublisher.java deleted file mode 100644 index 1f70b9315..000000000 --- a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/common/MqttPublisher.java +++ /dev/null @@ -1,59 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version - * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - */ - -package org.apache.storm.mqtt.common; - -import org.apache.storm.mqtt.MqttMessage; -import org.apache.storm.mqtt.ssl.KeyStoreLoader; - -import org.fusesource.mqtt.client.BlockingConnection; -import org.fusesource.mqtt.client.MQTT; -import org.fusesource.mqtt.client.QoS; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class MqttPublisher { - private static final Logger LOG = LoggerFactory.getLogger(MqttPublisher.class); - - private MqttOptions options; - private transient BlockingConnection connection; - private KeyStoreLoader keyStoreLoader; - private QoS qos; - private boolean retain = false; - - - public MqttPublisher(MqttOptions options) { - this(options, null, false); - } - - public MqttPublisher(MqttOptions options, boolean retain) { - this(options, null, retain); - } - - public MqttPublisher(MqttOptions options, KeyStoreLoader keyStoreLoader, boolean retain) { - this.retain = retain; - this.options = options; - this.keyStoreLoader = keyStoreLoader; - SslUtils.checkSslConfig(this.options.getUrl(), keyStoreLoader); - this.qos = MqttUtils.qosFromInt(this.options.getQos()); - } - - public void publish(MqttMessage message) throws Exception { - this.connection.publish(message.getTopic(), message.getMessage(), this.qos, this.retain); - } - - public void connectMqtt(String clientId) throws Exception { - MQTT client = MqttUtils.configureClient(this.options, clientId, this.keyStoreLoader); - this.connection = client.blockingConnection(); - this.connection.connect(); - } -} diff --git a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/common/MqttUtils.java b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/common/MqttUtils.java deleted file mode 100644 index 029b4c6a5..000000000 --- a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/common/MqttUtils.java +++ /dev/null @@ -1,82 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version - * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - */ - -package org.apache.storm.mqtt.common; - -import java.net.URI; - -import org.apache.storm.mqtt.MqttLogger; -import org.apache.storm.mqtt.ssl.KeyStoreLoader; -import org.fusesource.mqtt.client.MQTT; -import org.fusesource.mqtt.client.QoS; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class MqttUtils { - private static final Logger LOG = LoggerFactory.getLogger(MqttUtils.class); - - private MqttUtils() {} - - public static QoS qosFromInt(int i) { - QoS qos = null; - switch (i) { - case 0: - qos = QoS.AT_MOST_ONCE; - break; - case 1: - qos = QoS.AT_LEAST_ONCE; - break; - case 2: - qos = QoS.EXACTLY_ONCE; - break; - default: - throw new IllegalArgumentException(i + "is not a valid MQTT QoS."); - } - return qos; - } - - - public static MQTT configureClient(MqttOptions options, String clientId, KeyStoreLoader keyStoreLoader) - throws Exception { - - MQTT client = new MQTT(); - URI uri = URI.create(options.getUrl()); - - client.setHost(uri); - if (!uri.getScheme().toLowerCase().equals("tcp")) { - client.setSslContext(SslUtils.sslContext(uri.getScheme(), keyStoreLoader)); - } - client.setClientId(clientId); - LOG.info("MQTT ClientID: {}", client.getClientId().toString()); - client.setCleanSession(options.isCleanConnection()); - - client.setReconnectDelay(options.getReconnectDelay()); - client.setReconnectDelayMax(options.getReconnectDelayMax()); - client.setReconnectBackOffMultiplier(options.getReconnectBackOffMultiplier()); - client.setConnectAttemptsMax(options.getConnectAttemptsMax()); - client.setReconnectAttemptsMax(options.getReconnectAttemptsMax()); - - - client.setUserName(options.getUserName()); - client.setPassword(options.getPassword()); - client.setTracer(new MqttLogger()); - - if (options.getWillTopic() != null && options.getWillPayload() != null) { - QoS qos = MqttUtils.qosFromInt(options.getWillQos()); - client.setWillQos(qos); - client.setWillTopic(options.getWillTopic()); - client.setWillMessage(options.getWillPayload()); - client.setWillRetain(options.getWillRetain()); - } - return client; - } -} diff --git a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/common/SslUtils.java b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/common/SslUtils.java deleted file mode 100644 index 83f311fbe..000000000 --- a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/common/SslUtils.java +++ /dev/null @@ -1,57 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version - * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - */ - -package org.apache.storm.mqtt.common; - -import java.net.URI; -import java.security.KeyStore; -import javax.net.ssl.KeyManagerFactory; -import javax.net.ssl.SSLContext; -import javax.net.ssl.TrustManager; -import javax.net.ssl.TrustManagerFactory; -import org.apache.storm.mqtt.ssl.KeyStoreLoader; - -public class SslUtils { - private SslUtils() {} - - public static void checkSslConfig(String url, KeyStoreLoader loader) { - URI uri = URI.create(url); - String scheme = uri.getScheme().toLowerCase(); - if (!(scheme.equals("tcp") || scheme.startsWith("tls") || scheme.startsWith("ssl"))) { - throw new IllegalArgumentException("Unrecognized URI scheme: " + scheme); - } - if (!scheme.equalsIgnoreCase("tcp") && loader == null) { - throw new IllegalStateException("A TLS/SSL MQTT URL was specified, but no KeyStoreLoader configured. " - + "A KeyStoreLoader implementation is required when using TLS/SSL."); - } - } - - public static SSLContext sslContext(String scheme, KeyStoreLoader keyStoreLoader) throws Exception { - KeyStore ks = KeyStore.getInstance("JKS"); - ks.load(keyStoreLoader.keyStoreInputStream(), keyStoreLoader.keyStorePassword().toCharArray()); - - KeyStore ts = KeyStore.getInstance("JKS"); - ts.load(keyStoreLoader.trustStoreInputStream(), keyStoreLoader.trustStorePassword().toCharArray()); - - KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm()); - kmf.init(ks, keyStoreLoader.keyPassword().toCharArray()); - - TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()); - tmf.init(ts); - - SSLContext sc = SSLContext.getInstance(scheme.toUpperCase()); - TrustManager[] trustManagers = tmf.getTrustManagers(); - sc.init(kmf.getKeyManagers(), trustManagers, null); - - return sc; - } -} diff --git a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/mappers/ByteArrayMessageMapper.java b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/mappers/ByteArrayMessageMapper.java deleted file mode 100644 index a943f8653..000000000 --- a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/mappers/ByteArrayMessageMapper.java +++ /dev/null @@ -1,31 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version - * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - */ - -package org.apache.storm.mqtt.mappers; - -import org.apache.storm.mqtt.MqttMessage; -import org.apache.storm.mqtt.MqttMessageMapper; -import org.apache.storm.tuple.Fields; -import org.apache.storm.tuple.Values; - - -public class ByteArrayMessageMapper implements MqttMessageMapper { - @Override - public Values toValues(MqttMessage message) { - return new Values(message.getTopic(), message.getMessage()); - } - - @Override - public Fields outputFields() { - return new Fields("topic", "message"); - } -} diff --git a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/mappers/StringMessageMapper.java b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/mappers/StringMessageMapper.java deleted file mode 100644 index 3f76b6177..000000000 --- a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/mappers/StringMessageMapper.java +++ /dev/null @@ -1,34 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version - * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - */ - -package org.apache.storm.mqtt.mappers; - -import org.apache.storm.mqtt.MqttMessage; -import org.apache.storm.mqtt.MqttMessageMapper; -import org.apache.storm.tuple.Fields; -import org.apache.storm.tuple.Values; - -/** - * Given a String topic and byte[] message, emits a tuple with fields - * "topic" and "message", both of which are Strings. - */ -public class StringMessageMapper implements MqttMessageMapper { - @Override - public Values toValues(MqttMessage message) { - return new Values(message.getTopic(), new String(message.getMessage())); - } - - @Override - public Fields outputFields() { - return new Fields("topic", "message"); - } -} diff --git a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/spout/AckableMessage.java b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/spout/AckableMessage.java deleted file mode 100644 index 0b9144215..000000000 --- a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/spout/AckableMessage.java +++ /dev/null @@ -1,70 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version - * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - */ - -package org.apache.storm.mqtt.spout; - -import org.apache.commons.lang.builder.EqualsBuilder; -import org.apache.commons.lang.builder.HashCodeBuilder; -import org.apache.storm.mqtt.MqttMessage; - -/** - * Represents an MQTT Message consisting of a topic string (e.g. "/users/ptgoetz/office/thermostat") - * and a byte array message/payload. - * - */ -class AckableMessage { - private String topic; - private byte[] message; - private Runnable ack; - - AckableMessage(String topic, byte[] message, Runnable ack) { - this.topic = topic; - this.message = message; - this.ack = ack; - } - - public MqttMessage getMessage() { - return new MqttMessage(this.topic, this.message); - } - - @Override - public int hashCode() { - return new HashCodeBuilder(71, 123) - .append(this.topic) - .append(this.message) - .toHashCode(); - } - - - @Override - public boolean equals(Object obj) { - if (obj == null) { - return false; - } - if (obj == this) { - return true; - } - if (obj.getClass() != getClass()) { - return false; - } - AckableMessage tm = (AckableMessage) obj; - return new EqualsBuilder() - .appendSuper(super.equals(obj)) - .append(this.topic, tm.topic) - .append(this.message, tm.message) - .isEquals(); - } - - Runnable ack() { - return this.ack; - } -} diff --git a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/spout/MqttSpout.java b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/spout/MqttSpout.java deleted file mode 100644 index 265a6f97c..000000000 --- a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/spout/MqttSpout.java +++ /dev/null @@ -1,270 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version - * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - */ - -package org.apache.storm.mqtt.spout; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.LinkedBlockingQueue; -import org.apache.storm.Config; -import org.apache.storm.mqtt.MqttMessageMapper; -import org.apache.storm.mqtt.common.MqttOptions; -import org.apache.storm.mqtt.common.MqttUtils; -import org.apache.storm.mqtt.common.SslUtils; -import org.apache.storm.mqtt.ssl.KeyStoreLoader; -import org.apache.storm.spout.SpoutOutputCollector; -import org.apache.storm.task.TopologyContext; -import org.apache.storm.topology.IRichSpout; -import org.apache.storm.topology.OutputFieldsDeclarer; -import org.fusesource.hawtbuf.Buffer; -import org.fusesource.hawtbuf.UTF8Buffer; -import org.fusesource.mqtt.client.Callback; -import org.fusesource.mqtt.client.CallbackConnection; -import org.fusesource.mqtt.client.Listener; -import org.fusesource.mqtt.client.MQTT; -import org.fusesource.mqtt.client.QoS; -import org.fusesource.mqtt.client.Topic; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class MqttSpout implements IRichSpout, Listener { - private static final Logger LOG = LoggerFactory.getLogger(MqttSpout.class); - protected transient SpoutOutputCollector collector; - protected transient TopologyContext context; - protected transient LinkedBlockingQueue<AckableMessage> incoming; - protected transient HashMap<Long, AckableMessage> pending; - protected MqttMessageMapper type; - protected MqttOptions options; - protected KeyStoreLoader keyStoreLoader; - private String topologyName; - private CallbackConnection connection; - private transient Map<String, Object> conf; - private boolean mqttConnected = false; - private boolean mqttConnectFailed = false; - - - private Long sequence = Long.MIN_VALUE; - - protected MqttSpout() {} - - public MqttSpout(MqttMessageMapper type, MqttOptions options) { - this(type, options, null); - } - - public MqttSpout(MqttMessageMapper type, MqttOptions options, KeyStoreLoader keyStoreLoader) { - this.type = type; - this.options = options; - this.keyStoreLoader = keyStoreLoader; - SslUtils.checkSslConfig(this.options.getUrl(), this.keyStoreLoader); - } - - private Long nextId() { - this.sequence++; - if (this.sequence == Long.MAX_VALUE) { - this.sequence = Long.MIN_VALUE; - } - return this.sequence; - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(this.type.outputFields()); - } - - @Override - public Map<String, Object> getComponentConfiguration() { - return null; - } - - @Override - public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) { - this.topologyName = (String) conf.get(Config.TOPOLOGY_NAME); - - this.collector = collector; - this.context = context; - this.conf = conf; - - this.incoming = new LinkedBlockingQueue<>(); - this.pending = new HashMap<>(); - - try { - connectMqtt(); - } catch (Exception e) { - this.collector.reportError(e); - throw new RuntimeException("MQTT Connection failed.", e); - } - - } - - private void connectMqtt() throws Exception { - String clientId = this.topologyName + "-" - + this.context.getThisComponentId() + "-" - + this.context.getThisTaskId(); - - MQTT client = MqttUtils.configureClient(this.options, clientId, this.keyStoreLoader); - this.connection = client.callbackConnection(); - this.connection.listener(this); - this.connection.connect(new ConnectCallback()); - - while (!this.mqttConnected && !this.mqttConnectFailed) { - LOG.info("Waiting for connection..."); - Thread.sleep(500); - } - - if (this.mqttConnected) { - List<String> topicList = this.options.getTopics(); - Topic[] topics = new Topic[topicList.size()]; - QoS qos = MqttUtils.qosFromInt(this.options.getQos()); - for (int i = 0; i < topicList.size(); i++) { - topics[i] = new Topic(topicList.get(i), qos); - } - connection.subscribe(topics, new SubscribeCallback()); - } - } - - - @Override - public void close() { - this.connection.disconnect(new DisconnectCallback()); - } - - @Override - public void activate() { - } - - @Override - public void deactivate() { - } - - /** - * When this method is called, Storm is requesting that the Spout emit tuples to the - * output collector. This method should be non-blocking, so if the Spout has no tuples - * to emit, this method should return. nextTuple, ack, and fail are all called in a tight - * loop in a single thread in the spout task. When there are no tuples to emit, it is courteous - * to have nextTuple sleep for a short amount of time (like a single millisecond) - * so as not to waste too much CPU. - */ - @Override - public void nextTuple() { - AckableMessage tm = this.incoming.poll(); - if (tm != null) { - Long id = nextId(); - this.collector.emit(this.type.toValues(tm.getMessage()), id); - this.pending.put(id, tm); - } else { - Thread.yield(); - } - - } - - /** - * Storm has determined that the tuple emitted by this spout with the msgId identifier - * has been fully processed. Typically, an implementation of this method will take that - * message off the queue and prevent it from being replayed. - * - * @param msgId the id of the message to acknowledge - */ - @Override - public void ack(Object msgId) { - AckableMessage msg = this.pending.remove(msgId); - this.connection.getDispatchQueue().execute(msg.ack()); - } - - /** - * The tuple emitted by this spout with the msgId identifier has failed to be - * fully processed. Typically, an implementation of this method will put that - * message back on the queue to be replayed at a later time. - * - * @param msgId the id of the failed message - */ - @Override - public void fail(Object msgId) { - try { - this.incoming.put(this.pending.remove(msgId)); - } catch (InterruptedException e) { - LOG.warn("Interrupted while re-queueing message.", e); - } - } - - - // ################# Listener Implementation ###################### - @Override - public void onConnected() { - // this gets called repeatedly for no apparent reason, don't do anything - } - - @Override - public void onDisconnected() { - // this gets called repeatedly for no apparent reason, don't do anything - } - - @Override - public void onPublish(UTF8Buffer topic, Buffer payload, Runnable ack) { - LOG.debug("Received message: topic={}, payload={}", topic.toString(), new String(payload.toByteArray())); - try { - this.incoming.put(new AckableMessage(topic.toString(), payload.toByteArray(), ack)); - } catch (InterruptedException e) { - LOG.warn("Interrupted while queueing an MQTT message."); - } - } - - @Override - public void onFailure(Throwable throwable) { - LOG.error("MQTT Connection Failure.", throwable); - MqttSpout.this.connection.disconnect(new DisconnectCallback()); - throw new RuntimeException("MQTT Connection failure.", throwable); - } - - // ################# Connect Callback Implementation ###################### - private class ConnectCallback implements Callback<Void> { - @Override - public void onSuccess(Void v) { - LOG.info("MQTT Connected. Subscribing to topic..."); - MqttSpout.this.mqttConnected = true; - } - - @Override - public void onFailure(Throwable throwable) { - LOG.info("MQTT Connection failed."); - MqttSpout.this.mqttConnectFailed = true; - } - } - - // ################# Subscribe Callback Implementation ###################### - private class SubscribeCallback implements Callback<byte[]> { - @Override - public void onSuccess(byte[] qos) { - LOG.info("Subscripton sucessful."); - } - - @Override - public void onFailure(Throwable throwable) { - LOG.error("MQTT Subscripton failed.", throwable); - throw new RuntimeException("MQTT Subscribe failed.", throwable); - } - } - - // ################# Subscribe Callback Implementation ###################### - private class DisconnectCallback implements Callback<Void> { - @Override - public void onSuccess(Void theVoid) { - LOG.info("MQTT Disconnect successful."); - } - - @Override - public void onFailure(Throwable throwable) { - // Disconnects don't fail. - } - } - -} diff --git a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/ssl/DefaultKeyStoreLoader.java b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/ssl/DefaultKeyStoreLoader.java deleted file mode 100644 index 14c3c6cd0..000000000 --- a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/ssl/DefaultKeyStoreLoader.java +++ /dev/null @@ -1,92 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version - * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - */ - -package org.apache.storm.mqtt.ssl; - -import java.io.FileInputStream; -import java.io.FileNotFoundException; -import java.io.InputStream; - -/** - * KeyStoreLoader implementation that uses local files. - */ -public class DefaultKeyStoreLoader implements KeyStoreLoader { - private String ksFile = null; - private String tsFile = null; - private String keyStorePassword = ""; - private String trustStorePassword = ""; - private String keyPassword = ""; - - /** - * Creates a DefaultKeystoreLoader that uses the same file - * for both the keystore and truststore. - * - * @param keystore path to keystore file - */ - public DefaultKeyStoreLoader(String keystore) { - this.ksFile = keystore; - } - - /** - * Creates a DefaultKeystoreLoader that uses separate files - * for the keystore and truststore. - * - * @param keystore path to keystore file - * @param truststore path to truststore file - */ - public DefaultKeyStoreLoader(String keystore, String truststore) { - this.ksFile = keystore; - this.tsFile = truststore; - } - - public void setKeyStorePassword(String keyStorePassword) { - this.keyStorePassword = keyStorePassword; - } - - public void setTrustStorePassword(String trustStorePassword) { - this.trustStorePassword = trustStorePassword; - } - - public void setKeyPassword(String keyPassword) { - this.keyPassword = keyPassword; - } - - @Override - public InputStream keyStoreInputStream() throws FileNotFoundException { - return new FileInputStream(this.ksFile); - } - - @Override - public InputStream trustStoreInputStream() throws FileNotFoundException { - // if no truststore file, assume the truststore is the keystore. - if (this.tsFile == null) { - return new FileInputStream(this.ksFile); - } else { - return new FileInputStream(this.tsFile); - } - } - - @Override - public String keyStorePassword() { - return this.keyStorePassword; - } - - @Override - public String trustStorePassword() { - return this.trustStorePassword; - } - - @Override - public String keyPassword() { - return this.keyPassword; - } -} diff --git a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/ssl/KeyStoreLoader.java b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/ssl/KeyStoreLoader.java deleted file mode 100644 index 5630ff80e..000000000 --- a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/ssl/KeyStoreLoader.java +++ /dev/null @@ -1,34 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version - * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - */ - -package org.apache.storm.mqtt.ssl; - -import java.io.IOException; -import java.io.InputStream; -import java.io.Serializable; - -/** - * Abstraction for loading keystore/truststore data. This allows keystores - * to be loaded from different sources (File system, HDFS, etc.). - */ -public interface KeyStoreLoader extends Serializable { - - String keyStorePassword(); - - String trustStorePassword(); - - String keyPassword(); - - InputStream keyStoreInputStream() throws IOException; - - InputStream trustStoreInputStream() throws IOException; -} diff --git a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/trident/MqttPublishFunction.java b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/trident/MqttPublishFunction.java deleted file mode 100644 index 329cff599..000000000 --- a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/trident/MqttPublishFunction.java +++ /dev/null @@ -1,78 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version - * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - */ - -package org.apache.storm.mqtt.trident; - -import java.util.Map; -import org.apache.storm.Config; -import org.apache.storm.mqtt.MqttMessage; -import org.apache.storm.mqtt.MqttTupleMapper; -import org.apache.storm.mqtt.common.MqttOptions; -import org.apache.storm.mqtt.common.MqttPublisher; -import org.apache.storm.mqtt.common.SslUtils; -import org.apache.storm.mqtt.ssl.KeyStoreLoader; -import org.apache.storm.task.OutputCollector; -import org.apache.storm.topology.FailedException; -import org.apache.storm.trident.operation.BaseFunction; -import org.apache.storm.trident.operation.TridentCollector; -import org.apache.storm.trident.operation.TridentOperationContext; -import org.apache.storm.trident.tuple.TridentTuple; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class MqttPublishFunction extends BaseFunction { - private static final Logger LOG = LoggerFactory.getLogger(MqttPublishFunction.class); - private MqttTupleMapper mapper; - private transient MqttPublisher publisher; - private boolean retain = false; - private transient OutputCollector collector; - private MqttOptions options; - private KeyStoreLoader keyStoreLoader; - private transient String topologyName; - - - public MqttPublishFunction(MqttOptions options, MqttTupleMapper mapper, KeyStoreLoader keyStoreLoader, boolean retain) { - this.options = options; - this.mapper = mapper; - this.retain = retain; - this.keyStoreLoader = keyStoreLoader; - // the following code is duplicated in the constructor of MqttPublisher - // we reproduce it here so we fail on the client side if SSL is misconfigured, rather than when the topology - // is deployed to the cluster - SslUtils.checkSslConfig(this.options.getUrl(), keyStoreLoader); - } - - - @Override - public void prepare(Map<String, Object> conf, TridentOperationContext context) { - this.topologyName = (String) conf.get(Config.TOPOLOGY_NAME); - this.publisher = new MqttPublisher(this.options, this.keyStoreLoader, this.retain); - try { - this.publisher.connectMqtt(this.topologyName + "-" + context.getPartitionIndex()); - } catch (Exception e) { - LOG.error("Unable to connect to MQTT Broker.", e); - throw new RuntimeException("Unable to connect to MQTT Broker.", e); - } - } - - @Override - public void execute(TridentTuple tuple, TridentCollector collector) { - MqttMessage message = this.mapper.toMessage(tuple); - try { - this.publisher.publish(message); - } catch (Exception e) { - LOG.warn("Error publishing MQTT message. Failing tuple.", e); - // should we fail the batch or kill the worker? - throw new FailedException(); - } - } -} diff --git a/external/storm-mqtt/src/test/java/org/apache/storm/mqtt/StormMqttIntegrationTest.java b/external/storm-mqtt/src/test/java/org/apache/storm/mqtt/StormMqttIntegrationTest.java deleted file mode 100644 index b823456af..000000000 --- a/external/storm-mqtt/src/test/java/org/apache/storm/mqtt/StormMqttIntegrationTest.java +++ /dev/null @@ -1,145 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version - * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - */ - -package org.apache.storm.mqtt; - -import java.io.Serializable; -import java.net.URI; -import java.util.Arrays; -import org.apache.activemq.broker.BrokerService; -import org.apache.storm.Config; -import org.apache.storm.LocalCluster; -import org.apache.storm.LocalCluster.LocalTopology; -import org.apache.storm.generated.StormTopology; -import org.apache.storm.mqtt.bolt.MqttBolt; -import org.apache.storm.mqtt.common.MqttOptions; -import org.apache.storm.mqtt.common.MqttPublisher; -import org.apache.storm.mqtt.mappers.StringMessageMapper; -import org.apache.storm.mqtt.spout.MqttSpout; -import org.apache.storm.testing.IntegrationTest; -import org.apache.storm.topology.TopologyBuilder; -import org.apache.storm.tuple.ITuple; -import org.fusesource.mqtt.client.BlockingConnection; -import org.fusesource.mqtt.client.MQTT; -import org.fusesource.mqtt.client.Message; -import org.fusesource.mqtt.client.QoS; -import org.fusesource.mqtt.client.Topic; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static org.junit.jupiter.api.Assertions.assertArrayEquals; -import static org.junit.jupiter.api.Assertions.assertEquals; - -@IntegrationTest -public class StormMqttIntegrationTest implements Serializable { - private static final Logger LOG = LoggerFactory.getLogger(StormMqttIntegrationTest.class); - private static final String TEST_TOPIC = "/mqtt-topology"; - private static final String RESULT_TOPIC = "/integration-result"; - private static final String RESULT_PAYLOAD = "Storm MQTT Spout"; - static boolean spoutActivated = false; - private static BrokerService broker; - - @AfterAll - public static void cleanup() throws Exception { - broker.stop(); - } - - @BeforeAll - public static void start() throws Exception { - LOG.warn("Starting broker..."); - broker = new BrokerService(); - broker.addConnector("mqtt://localhost:1883"); - broker.setDataDirectory("target"); - broker.start(); - LOG.debug("MQTT broker started"); - } - - @Test - public void testMqttTopology() throws Exception { - MQTT client = new MQTT(); - client.setTracer(new MqttLogger()); - URI uri = URI.create("tcp://localhost:1883"); - client.setHost(uri); - - client.setClientId("MQTTSubscriber"); - client.setCleanSession(false); - BlockingConnection connection = client.blockingConnection(); - connection.connect(); - Topic[] topics = { new Topic("/integration-result", QoS.AT_LEAST_ONCE) }; - byte[] qoses = connection.subscribe(topics); - - try (LocalCluster cluster = new LocalCluster(); - LocalTopology topo = cluster.submitTopology("test", new Config(), buildMqttTopology());) { - - LOG.info("topology started"); - while (!spoutActivated) { - Thread.sleep(500); - } - - // publish a retained message to the broker - MqttOptions options = new MqttOptions(); - options.setCleanConnection(false); - MqttPublisher publisher = new MqttPublisher(options, true); - publisher.connectMqtt("MqttPublisher"); - publisher.publish(new MqttMessage(TEST_TOPIC, "test".getBytes())); - - LOG.info("published message"); - - Message message = connection.receive(); - LOG.info("Message recieved on topic: {}", message.getTopic()); - LOG.info("Payload: {}", new String(message.getPayload())); - message.ack(); - - assertArrayEquals(message.getPayload(), RESULT_PAYLOAD.getBytes()); - assertEquals(message.getTopic(), RESULT_TOPIC); - } - } - - public StormTopology buildMqttTopology() { - TopologyBuilder builder = new TopologyBuilder(); - - MqttOptions options = new MqttOptions(); - options.setTopics(Arrays.asList(TEST_TOPIC)); - options.setCleanConnection(false); - TestSpout spout = new TestSpout(new StringMessageMapper(), options); - - MqttBolt bolt = new MqttBolt(options, new MqttTupleMapper() { - @Override - public MqttMessage toMessage(ITuple tuple) { - LOG.info("Received: {}", tuple); - return new MqttMessage(RESULT_TOPIC, RESULT_PAYLOAD.getBytes()); - } - }); - - builder.setSpout("mqtt-spout", spout); - builder.setBolt("mqtt-bolt", bolt).shuffleGrouping("mqtt-spout"); - - return builder.createTopology(); - } - - public static class TestSpout extends MqttSpout { - public TestSpout(MqttMessageMapper type, MqttOptions options) { - super(type, options); - } - - @Override - public void activate() { - super.activate(); - LOG.info("Spout activated."); - spoutActivated = true; - } - } - -} diff --git a/pom.xml b/pom.xml index 8d04950e1..32b49bed8 100644 --- a/pom.xml +++ b/pom.xml @@ -495,7 +495,6 @@ <module>external/storm-elasticsearch</module> <module>external/storm-solr</module> <module>external/storm-metrics</module> - <module>external/storm-mqtt</module> <module>external/storm-kafka-client</module> <module>external/storm-kafka-migration</module> <module>external/storm-opentsdb</module> @@ -527,7 +526,6 @@ <module>examples/storm-hdfs-examples</module> <module>examples/storm-hive-examples</module> <module>examples/storm-elasticsearch-examples</module> - <module>examples/storm-mqtt-examples</module> <module>examples/storm-pmml-examples</module> <module>examples/storm-jms-examples</module> <module>examples/storm-rocketmq-examples</module>
