http://git-wip-us.apache.org/repos/asf/storm/blob/1b514cbd/external/storm-mqtt/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-mqtt/pom.xml b/external/storm-mqtt/pom.xml
index 7535119..f9b3774 100644
--- a/external/storm-mqtt/pom.xml
+++ b/external/storm-mqtt/pom.xml
@@ -14,43 +14,112 @@
  See the License for the specific language governing permissions and
  limitations under the License.
 -->
-
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <modelVersion>4.0.0</modelVersion>
-    <parent>
-        <artifactId>storm</artifactId>
-        <groupId>org.apache.storm</groupId>
-        <version>1.1.0-SNAPSHOT</version>
-        <relativePath>../../pom.xml</relativePath>
-    </parent>
-
-    <artifactId>storm-mqtt-parent</artifactId>
-    <packaging>pom</packaging>
-
-    <name>storm-mqtt-parent</name>
-
-    <properties>
-        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-        <!-- see comment below... This fixes an annoyance with intellij -->
-        <provided.scope>provided</provided.scope>
-    </properties>
-
-    <profiles>
-        <!--
-            Hack to make intellij behave.
-            If you use intellij, enable this profile in your IDE.
-            It should make life easier.
-        -->
-        <profile>
-            <id>intellij</id>
-            <properties>
-                <provided.scope>compile</provided.scope>
-            </properties>
-        </profile>
-    </profiles>
-
-    <modules>
-        <module>core</module>
-    </modules>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>storm-mqtt</artifactId>
+    <packaging>jar</packaging> <!-- built directly as a jar; the previous pom-packaged parent with a "core" module is removed above -->
+
+    <name>storm-mqtt</name>
+
+    <parent>
+        <groupId>org.apache.storm</groupId>
+        <artifactId>storm</artifactId>
+        <version>1.1.0-SNAPSHOT</version>
+        <relativePath>../../pom.xml</relativePath>
+    </parent>
+
+
+
+
+    <repositories>
+        <repository>
+            <id>bintray</id>
+            <url>https://dl.bintray.com/andsel/maven/</url> <!-- HTTPS: Maven 3.8.1 and later block plain-http repositories by default -->
+            <releases>
+                <enabled>true</enabled>
+            </releases>
+            <snapshots>
+                <enabled>false</enabled>
+            </snapshots>
+        </repository>
+    </repositories>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.activemq</groupId>
+            <artifactId>activemq-broker</artifactId>
+            <version>5.9.0</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.activemq</groupId>
+            <artifactId>activemq-mqtt</artifactId>
+            <version>5.9.0</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.activemq</groupId>
+            <artifactId>activemq-kahadb-store</artifactId>
+            <version>5.9.0</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-core</artifactId>
+            <version>${project.version}</version>
+            <scope>${provided.scope}</scope> <!-- NOTE(review): provided.scope is no longer defined in this POM; presumably inherited from the parent POM, confirm -->
+        </dependency>
+        <dependency>
+            <groupId>org.fusesource.mqtt-client</groupId>
+            <artifactId>mqtt-client</artifactId>
+            <version>1.10</version>
+        </dependency>
+        <dependency>
+            <groupId>commons-lang</groupId>
+            <artifactId>commons-lang</artifactId>
+            <version>2.5</version>
+        </dependency>
+    </dependencies>
+    <build>
+        <plugins>
+            <plugin>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <configuration>
+                    <forkMode>perTest</forkMode>
+                    <enableAssertions>false</enableAssertions>
+                    <redirectTestOutputToFile>true</redirectTestOutputToFile>
+                    <excludedGroups>${java.unit.test.exclude}</excludedGroups>
+                    <includes>
+                        <include>${java.unit.test.include}</include>
+                    </includes>
+                    <argLine>-Djava.net.preferIPv4Stack=true -Xmx1536m</argLine>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-failsafe-plugin</artifactId>
+                <version>${maven-surefire.version}</version>
+                <configuration>
+                    <forkMode>perTest</forkMode>
+                    <enableAssertions>false</enableAssertions>
+                    <redirectTestOutputToFile>true</redirectTestOutputToFile>
+                    <includes>
+                        <include>${java.integration.test.include}</include>
+                    </includes>
+                    <groups>${java.integration.test.group}</groups> <!-- set in the integration-test profile -->
+                    <argLine>-Djava.net.preferIPv4Stack=true -Xmx1536m</argLine>
+                </configuration>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>integration-test</goal>
+                            <goal>verify</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
 </project>
http://git-wip-us.apache.org/repos/asf/storm/blob/1b514cbd/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/MqttLogger.java
----------------------------------------------------------------------
diff --git a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/MqttLogger.java b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/MqttLogger.java
new file mode 100644
index 0000000..3af73fd
--- /dev/null
+++ b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/MqttLogger.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.mqtt;
+
+import org.fusesource.mqtt.client.Tracer;
+import org.fusesource.mqtt.codec.MQTTFrame;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Wrapper around SLF4J logger that allows MQTT messages to be logged.
+ */
+public class MqttLogger extends Tracer {
+    private static final Logger LOG = LoggerFactory.getLogger(MqttLogger.class);
+
+    /**
+     * Formats the tracer message and writes it to the SLF4J logger at DEBUG level.
+     *
+     * @param message a {@link String#format(String, Object...)} pattern
+     * @param args values substituted into {@code message}
+     */
+    @Override
+    public void debug(String message, Object... args) {
+        // Skip the String.format call entirely unless DEBUG output is enabled.
+        if (LOG.isDebugEnabled()) {
+            LOG.debug(String.format(message, args));
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/1b514cbd/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/MqttMessage.java
----------------------------------------------------------------------
diff --git a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/MqttMessage.java b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/MqttMessage.java
new file mode 100644
index 0000000..5436dda
--- /dev/null
+++ b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/MqttMessage.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.mqtt;
+
+/**
+ * Represents an MQTT Message consisting of a topic string (e.g. "/users/ptgoetz/office/thermostat")
+ * and a byte array message/payload.
+ * Instances keep references to the topic and payload exactly as they were supplied.
+ */
+public class MqttMessage {
+    private String topic; // MQTT topic name
+    private byte[] message; // payload bytes; stored by reference, not copied
+
+    /** Creates a message for {@code topic} carrying {@code payload}. */
+    public MqttMessage(String topic, byte[] payload){
+        this.topic = topic;
+        this.message = payload;
+    }
+    public byte[] getMessage(){ // returns the payload array passed to the constructor
+        return this.message;
+    }
+
+    public String getTopic(){ // returns the topic passed to the constructor
+        return this.topic;
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/1b514cbd/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/MqttMessageMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/MqttMessageMapper.java b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/MqttMessageMapper.java
new file mode 100644
index 0000000..c6173f4
--- /dev/null
+++ b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/MqttMessageMapper.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.mqtt;
+
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+
+import java.io.Serializable;
+
+/**
+ * Represents an object that can be converted to a Storm Tuple from an AckableMessage,
+ * given a MQTT Topic Name and a byte array payload.
+ */
+public interface MqttMessageMapper extends Serializable {
+    /**
+     * Converts an {@link MqttMessage} into the {@link Values} to be emitted as a Storm tuple.
+     *
+     * @param message the MQTT message (topic and payload) to convert
+     * @return the values representing a Storm tuple
+     */
+    Values toValues(MqttMessage message);
+
+    /**
+     * Declares the output fields this mapper produces.
+     *
+     * @return the list of output fields this mapper produces
+     */
+    Fields outputFields();
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/1b514cbd/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/MqttTupleMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/MqttTupleMapper.java b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/MqttTupleMapper.java
new file mode 100644
index 0000000..c46c069
--- /dev/null
+++ b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/MqttTupleMapper.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.mqtt;
+
+
+import org.apache.storm.tuple.ITuple;
+
+import java.io.Serializable;
+
+/**
+ * Given a Tuple, converts it to an MQTT message.
+ */
+public interface MqttTupleMapper extends Serializable{
+
+    /**
+     * Converts a tuple into the {@link MqttMessage} (topic and payload) to publish.
+     * @param tuple the incoming tuple
+     * @return the message to publish
+     */
+    MqttMessage toMessage(ITuple tuple);
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/1b514cbd/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/bolt/MqttBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/bolt/MqttBolt.java b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/bolt/MqttBolt.java
new file mode 100644
index 0000000..f6ca1bf
--- /dev/null
+++ b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/bolt/MqttBolt.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.mqtt.bolt;
+
+import org.apache.storm.Config;
+import org.apache.storm.mqtt.MqttMessage;
+import org.apache.storm.mqtt.common.MqttOptions;
+import org.apache.storm.mqtt.MqttTupleMapper;
+import org.apache.storm.mqtt.common.MqttPublisher;
+import org.apache.storm.mqtt.common.SslUtils;
+import org.apache.storm.mqtt.ssl.KeyStoreLoader;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.topology.base.BaseTickTupleAwareRichBolt;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.utils.TupleUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/** Bolt that maps each processed tuple to an {@link MqttMessage} and publishes it through an {@link MqttPublisher}. */
+public class MqttBolt extends BaseTickTupleAwareRichBolt {
+    private static final Logger LOG = LoggerFactory.getLogger(MqttBolt.class);
+    private MqttTupleMapper mapper; // turns an incoming tuple into the message to publish
+    private transient MqttPublisher publisher; // created and connected in prepare()
+    private boolean retain = false; // handed to the MqttPublisher constructor
+    private transient OutputCollector collector; // assigned in prepare()
+    private MqttOptions options;
+    private KeyStoreLoader keyStoreLoader; // may be null; the delegating constructors pass null
+    private transient String topologyName; // read from the topology configuration in prepare()
+
+
+    public MqttBolt(MqttOptions options, MqttTupleMapper mapper){
+        this(options, mapper, null, false);
+    }
+
+    public MqttBolt(MqttOptions options, MqttTupleMapper mapper, boolean retain){
+        this(options, mapper, null, retain);
+    }
+
+    public MqttBolt(MqttOptions options, MqttTupleMapper mapper, KeyStoreLoader keyStoreLoader){
+        this(options, mapper, keyStoreLoader, false);
+    }
+
+    public MqttBolt(MqttOptions options, MqttTupleMapper mapper, KeyStoreLoader keyStoreLoader, boolean retain){
+        this.options = options;
+        this.mapper = mapper;
+        this.retain = retain;
+        this.keyStoreLoader = keyStoreLoader;
+        // the following code is duplicated in the constructor of MqttPublisher
+        // we reproduce it here so we fail on the client side if SSL is misconfigured, rather than when the topology
+        // is deployed to the cluster
+        SslUtils.checkSslConfig(this.options.getUrl(), keyStoreLoader);
+    }
+
+    @Override
+    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
+        this.collector = collector;
+        this.topologyName = (String)conf.get(Config.TOPOLOGY_NAME);
+        this.publisher = new MqttPublisher(this.options, this.keyStoreLoader, this.retain);
+        try { // client id is "<topology name>-<component id>-<task id>"
+            this.publisher.connectMqtt(this.topologyName + "-" + context.getThisComponentId() + "-" + context.getThisTaskId());
+        } catch (Exception e) {
+            LOG.error("Unable to connect to MQTT Broker.", e);
+            throw new RuntimeException("Unable to connect to MQTT Broker.", e);
+        }
+    }
+
+    @Override
+    protected void process(Tuple input) {
+        MqttMessage message = this.mapper.toMessage(input); // runs outside the try block below
+        try {
+            this.publisher.publish(message);
+            this.collector.ack(input); // acked only after publish() returned without throwing
+        } catch (Exception e) {
+            LOG.warn("Error publishing MQTT message. Failing tuple.", e);
+            // should we fail the tuple or kill the worker?
+            collector.reportError(e);
+            collector.fail(input);
+        }
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        // this bolt does not emit tuples
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/1b514cbd/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/common/MqttOptions.java
----------------------------------------------------------------------
diff --git a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/common/MqttOptions.java b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/common/MqttOptions.java
new file mode 100644
index 0000000..2b09d6e
--- /dev/null
+++ b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/common/MqttOptions.java
@@ -0,0 +1,334 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.mqtt.common;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Serializable holder for MQTT connection options; see {@link Builder} for fluent construction.
+ */
+public class MqttOptions implements Serializable {
+    private String url = "tcp://localhost:1883";
+    private List<String> topics = null;
+    private boolean cleanConnection = false;
+
+    private String willTopic;
+    private String willPayload;
+    private int willQos = 1;
+    private boolean willRetain = false;
+
+    private long reconnectDelay = 10;
+    private long reconnectDelayMax = 30*1000;
+    private double reconnectBackOffMultiplier = 2.0f;
+    private long reconnectAttemptsMax = -1;
+    private long connectAttemptsMax = -1;
+
+    private String userName = "";
+    private String password = "";
+
+    private int qos = 1;
+
+    public String getUrl() {
+        return url;
+    }
+
+    /**
+     * Sets the url for connecting to the MQTT broker.
+     *
+     * Default: {@code tcp://localhost:1883}
+     * @param url the broker URL
+     */
+    public void setUrl(String url) {
+        this.url = url;
+    }
+
+    public List<String> getTopics() {
+        return topics;
+    }
+
+    /**
+     * A list of MQTT topics to subscribe to.
+     *
+     * @param topics the topics to subscribe to
+     */
+    public void setTopics(List<String> topics) {
+        this.topics = topics;
+    }
+
+    public boolean isCleanConnection() {
+        return cleanConnection;
+    }
+
+    /**
+     * Set to false if you want the MQTT server to persist topic subscriptions and ack positions across client sessions.
+     * Defaults to false.
+     *
+     * @param cleanConnection the clean-session flag handed to the MQTT client
+     */
+    public void setCleanConnection(boolean cleanConnection) {
+        this.cleanConnection = cleanConnection;
+    }
+
+    public String getWillTopic() {
+        return willTopic;
+    }
+
+    /**
+     * If set, the server will publish the client's Will message to the specified topic if the client has an unexpected
+     * disconnection.
+     *
+     * @param willTopic the topic the Will message is published to
+     */
+    public void setWillTopic(String willTopic) {
+        this.willTopic = willTopic;
+    }
+
+    public String getWillPayload() {
+        return willPayload;
+    }
+
+    /**
+     * The Will message to send. Defaults to a zero length message.
+     *
+     * @param willPayload the Will message body
+     */
+    public void setWillPayload(String willPayload) {
+        this.willPayload = willPayload;
+    }
+
+    public long getReconnectDelay() {
+        return reconnectDelay;
+    }
+
+    /**
+     * How long to wait in ms before the first reconnect attempt. Defaults to 10.
+     *
+     * @param reconnectDelay the delay in milliseconds
+     */
+    public void setReconnectDelay(long reconnectDelay) {
+        this.reconnectDelay = reconnectDelay;
+    }
+
+    public long getReconnectDelayMax() {
+        return reconnectDelayMax;
+    }
+
+    /**
+     * The maximum amount of time in ms to wait between reconnect attempts. Defaults to 30,000.
+     *
+     * @param reconnectDelayMax the maximum delay in milliseconds
+     */
+    public void setReconnectDelayMax(long reconnectDelayMax) {
+        this.reconnectDelayMax = reconnectDelayMax;
+    }
+
+    public double getReconnectBackOffMultiplier() {
+        return reconnectBackOffMultiplier;
+    }
+
+    /**
+     * The exponential backoff multiplier to use between reconnect attempts. Set to 1 to disable exponential backoff.
+     * Defaults to 2.
+     *
+     * @param reconnectBackOffMultiplier the backoff multiplier
+     */
+    public void setReconnectBackOffMultiplier(double reconnectBackOffMultiplier) {
+        this.reconnectBackOffMultiplier = reconnectBackOffMultiplier;
+    }
+
+    public long getReconnectAttemptsMax() {
+        return reconnectAttemptsMax;
+    }
+
+    /**
+     * The maximum number of reconnect attempts before an error is reported back to the client after a server
+     * connection had previously been established. Set to -1 to use unlimited attempts. Defaults to -1.
+     *
+     * @param reconnectAttemptsMax the maximum number of attempts, or -1 for unlimited
+     */
+    public void setReconnectAttemptsMax(long reconnectAttemptsMax) {
+        this.reconnectAttemptsMax = reconnectAttemptsMax;
+    }
+
+    public long getConnectAttemptsMax() {
+        return connectAttemptsMax;
+    }
+
+    /**
+     * The maximum number of connect attempts before an error is reported back to the client on the first attempt by
+     * the client to connect to a server. Set to -1 to use unlimited attempts. Defaults to -1.
+     *
+     * @param connectAttemptsMax the maximum number of attempts, or -1 for unlimited
+     */
+    public void setConnectAttemptsMax(long connectAttemptsMax) {
+        this.connectAttemptsMax = connectAttemptsMax;
+    }
+
+    public String getUserName() {
+        return userName;
+    }
+
+    /**
+     * The username for authenticated sessions.
+     *
+     * @param userName the user name; the field defaults to an empty string
+     */
+    public void setUserName(String userName) {
+        this.userName = userName;
+    }
+
+    public String getPassword() {
+        return password;
+    }
+
+    /**
+     * The password for authenticated sessions.
+     * @param password the password; the field defaults to an empty string
+     */
+    public void setPassword(String password) {
+        this.password = password;
+    }
+
+    public int getQos(){
+        return this.qos;
+    }
+
+    /**
+     * Sets the quality of service to use for MQTT messages. Defaults to 1 (at least once).
+     * @param qos the QoS level; values outside 0..2 are rejected with an IllegalArgumentException
+     */
+    public void setQos(int qos){
+        if(qos < 0 || qos > 2){
+            throw new IllegalArgumentException("MQTT QoS must be >= 0 and <= 2");
+        }
+        this.qos = qos;
+    }
+
+    public int getWillQos(){
+        return this.willQos;
+    }
+
+    /**
+     * Sets the quality of service to use for the MQTT Will message. Defaults to 1 (at least once).
+     *
+     * @param qos the QoS level; values outside 0..2 are rejected with an IllegalArgumentException
+     */
+    public void setWillQos(int qos){
+        if(qos < 0 || qos > 2){
+            throw new IllegalArgumentException("MQTT Will QoS must be >= 0 and <= 2");
+        }
+        this.willQos = qos;
+    }
+
+    public boolean getWillRetain(){
+        return this.willRetain;
+    }
+
+    /**
+     * Set to true if you want the Will message to be published with the retain option.
+     * @param retain the retain flag for the Will message; defaults to false
+     */
+    public void setWillRetain(boolean retain){
+        this.willRetain = retain;
+    }
+
+    public static class Builder { // fluent builder; each call mutates the single MqttOptions instance held below
+        private MqttOptions options = new MqttOptions();
+
+        public Builder url(String url) {
+            this.options.url = url;
+            return this;
+        }
+
+
+        public Builder topics(List<String> topics) {
+            this.options.topics = topics;
+            return this;
+        }
+
+        public Builder cleanConnection(boolean cleanConnection) {
+            this.options.cleanConnection = cleanConnection;
+            return this;
+        }
+
+        public Builder willTopic(String willTopic) {
+            this.options.willTopic = willTopic;
+            return this;
+        }
+
+        public Builder willPayload(String willPayload) {
+            this.options.willPayload = willPayload;
+            return this;
+        }
+
+        public Builder willRetain(boolean retain){
+            this.options.willRetain = retain;
+            return this;
+        }
+
+        public Builder willQos(int qos){
+            this.options.setWillQos(qos); // goes through the setter so the 0..2 range check applies
+            return this;
+        }
+
+        public Builder reconnectDelay(long reconnectDelay) {
+            this.options.reconnectDelay = reconnectDelay;
+            return this;
+        }
+
+        public Builder reconnectDelayMax(long reconnectDelayMax) {
+            this.options.reconnectDelayMax = reconnectDelayMax;
+            return this;
+        }
+
+        public Builder reconnectBackOffMultiplier(double reconnectBackOffMultiplier) {
+            this.options.reconnectBackOffMultiplier = reconnectBackOffMultiplier;
+            return this;
+        }
+
+        public Builder reconnectAttemptsMax(long reconnectAttemptsMax) {
+            this.options.reconnectAttemptsMax = reconnectAttemptsMax;
+            return this;
+        }
+
+        public Builder connectAttemptsMax(long connectAttemptsMax) {
+            this.options.connectAttemptsMax = connectAttemptsMax;
+            return this;
+        }
+
+        public Builder userName(String userName) {
+            this.options.userName = userName;
+            return this;
+        }
+
+        public Builder password(String password) {
+            this.options.password = password;
+            return this;
+        }
+
+        public Builder qos(int qos){
+            this.options.setQos(qos); // goes through the setter so the 0..2 range check applies
+            return this;
+        }
+
+        public MqttOptions build() {
+            return this.options; // the same instance on every call, not a copy
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/1b514cbd/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/common/MqttPublisher.java
----------------------------------------------------------------------
diff --git a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/common/MqttPublisher.java b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/common/MqttPublisher.java
new file mode 100644
index 0000000..9b36b78
--- /dev/null
+++ b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/common/MqttPublisher.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.mqtt.common;
+
+
+import org.apache.storm.mqtt.MqttLogger;
+import org.apache.storm.mqtt.MqttMessage;
+import org.apache.storm.mqtt.ssl.KeyStoreLoader;
+import org.fusesource.mqtt.client.BlockingConnection;
+import org.fusesource.mqtt.client.MQTT;
+import org.fusesource.mqtt.client.QoS;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+
+/**
+ * Publishes {@link MqttMessage}s to an MQTT broker over a blocking connection.
+ * {@link #connectMqtt(String)} must succeed before {@link #publish(MqttMessage)} is used.
+ */
+public class MqttPublisher {
+    private static final Logger LOG = LoggerFactory.getLogger(MqttPublisher.class);
+
+    private MqttOptions options;
+    private transient BlockingConnection connection;
+    private KeyStoreLoader keyStoreLoader;
+    private QoS qos;
+    private boolean retain = false;
+
+
+    public MqttPublisher(MqttOptions options){
+        this(options, null, false);
+    }
+
+    public MqttPublisher(MqttOptions options, boolean retain){
+        this(options, null, retain);
+    }
+
+    /**
+     * Creates a publisher and validates the URL scheme, SSL configuration and QoS up front.
+     *
+     * @param options connection options (URL, QoS, ...)
+     * @param keyStoreLoader key/trust store source; may be null only for tcp URLs
+     * @param retain whether messages are published with the retain flag set
+     * @throws IllegalArgumentException if the URL scheme or the QoS value is not recognized
+     * @throws IllegalStateException if a TLS/SSL URL is given without a KeyStoreLoader
+     */
+    public MqttPublisher(MqttOptions options, KeyStoreLoader keyStoreLoader, boolean retain){
+        this.retain = retain;
+        this.options = options;
+        this.keyStoreLoader = keyStoreLoader;
+        SslUtils.checkSslConfig(this.options.getUrl(), keyStoreLoader);
+        this.qos = MqttUtils.qosFromInt(this.options.getQos());
+    }
+
+    /**
+     * Publishes a message with the configured QoS and retain flag.
+     *
+     * @param message the topic and payload to publish
+     * @throws IllegalStateException if {@link #connectMqtt(String)} has not completed successfully
+     * @throws Exception if the underlying connection fails to publish
+     */
+    public void publish(MqttMessage message) throws Exception {
+        if (this.connection == null) {
+            throw new IllegalStateException("Not connected to an MQTT broker; call connectMqtt() before publish().");
+        }
+        this.connection.publish(message.getTopic(), message.getMessage(), this.qos, this.retain);
+    }
+
+    /**
+     * Configures an MQTT client from the options and opens a blocking connection.
+     *
+     * @param clientId the MQTT client id to connect with
+     * @throws Exception if the client cannot be configured or the connection attempt fails
+     */
+    public void connectMqtt(String clientId) throws Exception {
+        MQTT client = MqttUtils.configureClient(this.options, clientId, this.keyStoreLoader);
+        // Assign the field only after connect() succeeds so publish() never sees a half-open connection.
+        BlockingConnection conn = client.blockingConnection();
+        conn.connect();
+        this.connection = conn;
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/1b514cbd/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/common/MqttUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/common/MqttUtils.java b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/common/MqttUtils.java
new file mode 100644
index
0000000..4ca0145
--- /dev/null
+++ b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/common/MqttUtils.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.mqtt.common;
+
+
+import org.apache.storm.mqtt.MqttLogger;
+import org.apache.storm.mqtt.ssl.KeyStoreLoader;
+import org.fusesource.mqtt.client.MQTT;
+import org.fusesource.mqtt.client.QoS;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+
+/**
+ * Static helpers that translate {@link MqttOptions} into settings on the MQTT client.
+ */
+public class MqttUtils {
+    private static final Logger LOG = LoggerFactory.getLogger(MqttUtils.class);
+
+    private MqttUtils(){}
+
+    /**
+     * Maps an integer QoS level to the client library's {@link QoS} constant.
+     *
+     * @param i the QoS level: 0 (at most once), 1 (at least once) or 2 (exactly once)
+     * @return the matching {@link QoS} value
+     * @throws IllegalArgumentException if {@code i} is not 0, 1 or 2
+     */
+    public static QoS qosFromInt(int i){
+        QoS qos = null;
+        switch(i) {
+            case 0:
+                qos = QoS.AT_MOST_ONCE;
+                break;
+            case 1:
+                qos = QoS.AT_LEAST_ONCE;
+                break;
+            case 2:
+                qos = QoS.EXACTLY_ONCE;
+                break;
+            default:
+                throw new IllegalArgumentException(i + " is not a valid MQTT QoS.");
+        }
+        return qos;
+    }
+
+
+    /**
+     * Builds an {@link MQTT} client configured from the given options.
+     *
+     * @param options connection, reconnect, credential and Will settings
+     * @param clientId the MQTT client id to set on the client
+     * @param keyStoreLoader key/trust store source used to build the SSL context for non-tcp URLs
+     * @return the configured client; this method does not open a connection
+     * @throws Exception if the SSL context cannot be created
+     */
+    public static MQTT configureClient(MqttOptions options, String clientId, KeyStoreLoader keyStoreLoader)
+            throws Exception{
+
+        MQTT client = new MQTT();
+        URI uri = URI.create(options.getUrl());
+
+        client.setHost(uri);
+        if(!uri.getScheme().toLowerCase().equals("tcp")){
+            client.setSslContext(SslUtils.sslContext(uri.getScheme(), keyStoreLoader));
+        }
+        client.setClientId(clientId);
+        LOG.info("MQTT ClientID: {}", client.getClientId().toString());
+        client.setCleanSession(options.isCleanConnection());
+
+        client.setReconnectDelay(options.getReconnectDelay());
+        client.setReconnectDelayMax(options.getReconnectDelayMax());
+        client.setReconnectBackOffMultiplier(options.getReconnectBackOffMultiplier());
+        client.setConnectAttemptsMax(options.getConnectAttemptsMax());
+        client.setReconnectAttemptsMax(options.getReconnectAttemptsMax());
+
+
+        client.setUserName(options.getUserName());
+        client.setPassword(options.getPassword());
+        client.setTracer(new MqttLogger());
+
+        if(options.getWillTopic() != null){
+            // MqttOptions documents the Will payload as defaulting to a zero length message.
+            String willPayload = options.getWillPayload() == null ? "" : options.getWillPayload();
+            QoS qos = MqttUtils.qosFromInt(options.getWillQos());
+            client.setWillQos(qos);
+            client.setWillTopic(options.getWillTopic());
+            client.setWillMessage(willPayload);
+            client.setWillRetain(options.getWillRetain());
+        }
+        return client;
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/1b514cbd/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/common/SslUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/common/SslUtils.java b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/common/SslUtils.java
new file mode 100644
index 0000000..681fc1d
--- /dev/null
+++ b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/common/SslUtils.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.mqtt.common; + + +import org.apache.storm.mqtt.ssl.KeyStoreLoader; + +import javax.net.ssl.KeyManagerFactory; +import javax.net.ssl.SSLContext; +import javax.net.ssl.TrustManager; +import javax.net.ssl.TrustManagerFactory; +import java.net.URI; +import java.security.KeyStore; + +public class SslUtils { + private SslUtils(){} + + public static void checkSslConfig(String url, KeyStoreLoader loader){ + URI uri = URI.create(url); + String scheme = uri.getScheme().toLowerCase(); + if(!(scheme.equals("tcp") || scheme.startsWith("tls") || scheme.startsWith("ssl"))){ + throw new IllegalArgumentException("Unrecognized URI scheme: " + scheme); + } + if(!scheme.equalsIgnoreCase("tcp") && loader == null){ + throw new IllegalStateException("A TLS/SSL MQTT URL was specified, but no KeyStoreLoader configured. 
" + + "A KeyStoreLoader implementation is required when using TLS/SSL."); + } + } + + public static SSLContext sslContext(String scheme, KeyStoreLoader keyStoreLoader) throws Exception { + KeyStore ks = KeyStore.getInstance("JKS"); + ks.load(keyStoreLoader.keyStoreInputStream(), keyStoreLoader.keyStorePassword().toCharArray()); + + KeyStore ts = KeyStore.getInstance("JKS"); + ts.load(keyStoreLoader.trustStoreInputStream(), keyStoreLoader.trustStorePassword().toCharArray()); + + KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm()); + kmf.init(ks, keyStoreLoader.keyPassword().toCharArray()); + + TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()); + tmf.init(ts); + + SSLContext sc = SSLContext.getInstance(scheme.toUpperCase()); + TrustManager[] trustManagers = tmf.getTrustManagers(); + sc.init(kmf.getKeyManagers(), trustManagers, null); + + return sc; + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/1b514cbd/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/mappers/ByteArrayMessageMapper.java ---------------------------------------------------------------------- diff --git a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/mappers/ByteArrayMessageMapper.java b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/mappers/ByteArrayMessageMapper.java new file mode 100644 index 0000000..a19fce4 --- /dev/null +++ b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/mappers/ByteArrayMessageMapper.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.mqtt.mappers; + +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Values; +import org.apache.storm.mqtt.MqttMessage; +import org.apache.storm.mqtt.MqttMessageMapper; + + +public class ByteArrayMessageMapper implements MqttMessageMapper { + public Values toValues(MqttMessage message) { + return new Values(message.getTopic(), message.getMessage()); + } + + public Fields outputFields() { + return new Fields("topic", "message"); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/1b514cbd/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/mappers/StringMessageMapper.java ---------------------------------------------------------------------- diff --git a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/mappers/StringMessageMapper.java b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/mappers/StringMessageMapper.java new file mode 100644 index 0000000..e5f309b --- /dev/null +++ b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/mappers/StringMessageMapper.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.mqtt.mappers; + +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Values; +import org.apache.storm.mqtt.MqttMessage; +import org.apache.storm.mqtt.MqttMessageMapper; + +/** + * Given a String topic and byte[] message, emits a tuple with fields + * "topic" and "message", both of which are Strings. + */ +public class StringMessageMapper implements MqttMessageMapper { + public Values toValues(MqttMessage message) { + return new Values(message.getTopic(), new String(message.getMessage())); + } + + public Fields outputFields() { + return new Fields("topic", "message"); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/1b514cbd/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/spout/AckableMessage.java ---------------------------------------------------------------------- diff --git a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/spout/AckableMessage.java b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/spout/AckableMessage.java new file mode 100644 index 0000000..08348c9 --- /dev/null +++ b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/spout/AckableMessage.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.mqtt.spout; + +import org.apache.commons.lang.builder.EqualsBuilder; +import org.apache.commons.lang.builder.HashCodeBuilder; +import org.apache.storm.mqtt.MqttMessage; + +/** + * Represents an MQTT Message consisting of a topic string (e.g. "/users/ptgoetz/office/thermostat") + * and a byte array message/payload. + * + */ +class AckableMessage { + private String topic; + private byte[] message; + private Runnable ack; + + AckableMessage(String topic, byte[] message, Runnable ack){ + this.topic = topic; + this.message = message; + this.ack = ack; + } + + public MqttMessage getMessage(){ + return new MqttMessage(this.topic, this.message); + } + + @Override + public int hashCode() { + return new HashCodeBuilder(71, 123) + .append(this.topic) + .append(this.message) + .toHashCode(); + } + + + @Override + public boolean equals(Object obj) { + if (obj == null) { return false; } + if (obj == this) { return true; } + if (obj.getClass() != getClass()) { + return false; + } + AckableMessage tm = (AckableMessage)obj; + return new EqualsBuilder() + .appendSuper(super.equals(obj)) + .append(this.topic, tm.topic) + .append(this.message, tm.message) + .isEquals(); + } + + Runnable ack(){ + return this.ack; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/1b514cbd/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/spout/MqttSpout.java ---------------------------------------------------------------------- diff --git a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/spout/MqttSpout.java 
b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/spout/MqttSpout.java new file mode 100644 index 0000000..7f10cc5 --- /dev/null +++ b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/spout/MqttSpout.java @@ -0,0 +1,262 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.mqtt.spout; + +import org.apache.storm.Config; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.IRichSpout; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.mqtt.MqttMessageMapper; +import org.apache.storm.mqtt.common.MqttOptions; +import org.apache.storm.mqtt.common.MqttUtils; +import org.apache.storm.mqtt.common.SslUtils; +import org.apache.storm.mqtt.ssl.KeyStoreLoader; +import org.fusesource.hawtbuf.Buffer; +import org.fusesource.hawtbuf.UTF8Buffer; +import org.fusesource.mqtt.client.Callback; +import org.fusesource.mqtt.client.CallbackConnection; +import org.fusesource.mqtt.client.Listener; +import org.fusesource.mqtt.client.MQTT; +import org.fusesource.mqtt.client.QoS; +import org.fusesource.mqtt.client.Topic; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.LinkedBlockingQueue; + +public class MqttSpout implements IRichSpout, Listener { + private static final Logger LOG = LoggerFactory.getLogger(MqttSpout.class); + + private String topologyName; + + + private CallbackConnection connection; + + protected transient SpoutOutputCollector collector; + protected transient TopologyContext context; + protected transient LinkedBlockingQueue<AckableMessage> incoming; + protected transient HashMap<Long, AckableMessage> pending; + private transient Map conf; + protected MqttMessageMapper type; + protected MqttOptions options; + protected KeyStoreLoader keyStoreLoader; + + private boolean mqttConnected = false; + private boolean mqttConnectFailed = false; + + + private Long sequence = Long.MIN_VALUE; + + private Long nextId(){ + this.sequence++; + if(this.sequence == Long.MAX_VALUE){ + this.sequence = Long.MIN_VALUE; + } + return this.sequence; + } + + protected MqttSpout(){} + + public 
MqttSpout(MqttMessageMapper type, MqttOptions options){ + this(type, options, null); + } + + public MqttSpout(MqttMessageMapper type, MqttOptions options, KeyStoreLoader keyStoreLoader){ + this.type = type; + this.options = options; + this.keyStoreLoader = keyStoreLoader; + SslUtils.checkSslConfig(this.options.getUrl(), this.keyStoreLoader); + } + + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(this.type.outputFields()); + } + + public Map<String, Object> getComponentConfiguration() { + return null; + } + + public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { + this.topologyName = (String)conf.get(Config.TOPOLOGY_NAME); + + this.collector = collector; + this.context = context; + this.conf = conf; + + this.incoming = new LinkedBlockingQueue<>(); + this.pending = new HashMap<>(); + + try { + connectMqtt(); + } catch (Exception e) { + this.collector.reportError(e); + throw new RuntimeException("MQTT Connection failed.", e); + } + + } + + private void connectMqtt() throws Exception { + String clientId = this.topologyName + "-" + this.context.getThisComponentId() + "-" + + this.context.getThisTaskId(); + + MQTT client = MqttUtils.configureClient(this.options, clientId, this.keyStoreLoader); + this.connection = client.callbackConnection(); + this.connection.listener(this); + this.connection.connect(new ConnectCallback()); + + while(!this.mqttConnected && !this.mqttConnectFailed){ + LOG.info("Waiting for connection..."); + Thread.sleep(500); + } + + if(this.mqttConnected){ + List<String> topicList = this.options.getTopics(); + Topic[] topics = new Topic[topicList.size()]; + QoS qos = MqttUtils.qosFromInt(this.options.getQos()); + for(int i = 0;i < topicList.size();i++){ + topics[i] = new Topic(topicList.get(i), qos); + } + connection.subscribe(topics, new SubscribeCallback()); + } + } + + + + public void close() { + this.connection.disconnect(new DisconnectCallback()); + } + + public void activate() { 
+ } + + public void deactivate() { + } + + /** + * When this method is called, Storm is requesting that the Spout emit tuples to the + * output collector. This method should be non-blocking, so if the Spout has no tuples + * to emit, this method should return. nextTuple, ack, and fail are all called in a tight + * loop in a single thread in the spout task. When there are no tuples to emit, it is courteous + * to have nextTuple sleep for a short amount of time (like a single millisecond) + * so as not to waste too much CPU. + */ + public void nextTuple() { + AckableMessage tm = this.incoming.poll(); + if(tm != null){ + Long id = nextId(); + this.collector.emit(this.type.toValues(tm.getMessage()), id); + this.pending.put(id, tm); + } else { + Thread.yield(); + } + + } + + /** + * Storm has determined that the tuple emitted by this spout with the msgId identifier + * has been fully processed. Typically, an implementation of this method will take that + * message off the queue and prevent it from being replayed. + * + * @param msgId + */ + public void ack(Object msgId) { + AckableMessage msg = this.pending.remove(msgId); + this.connection.getDispatchQueue().execute(msg.ack()); + } + + /** + * The tuple emitted by this spout with the msgId identifier has failed to be + * fully processed. Typically, an implementation of this method will put that + * message back on the queue to be replayed at a later time. 
+ * + * @param msgId + */ + public void fail(Object msgId) { + try { + this.incoming.put(this.pending.remove(msgId)); + } catch (InterruptedException e) { + LOG.warn("Interrupted while re-queueing message.", e); + } + } + + + // ################# Listener Implementation ###################### + public void onConnected() { + // this gets called repeatedly for no apparent reason, don't do anything + } + + public void onDisconnected() { + // this gets called repeatedly for no apparent reason, don't do anything + } + + public void onPublish(UTF8Buffer topic, Buffer payload, Runnable ack) { + LOG.debug("Received message: topic={}, payload={}", topic.toString(), new String(payload.toByteArray())); + try { + this.incoming.put(new AckableMessage(topic.toString(), payload.toByteArray(), ack)); + } catch (InterruptedException e) { + LOG.warn("Interrupted while queueing an MQTT message."); + } + } + + public void onFailure(Throwable throwable) { + LOG.error("MQTT Connection Failure.", throwable); + MqttSpout.this.connection.disconnect(new DisconnectCallback()); + throw new RuntimeException("MQTT Connection failure.", throwable); + } + + // ################# Connect Callback Implementation ###################### + private class ConnectCallback implements Callback<Void> { + public void onSuccess(Void v) { + LOG.info("MQTT Connected. 
Subscribing to topic..."); + MqttSpout.this.mqttConnected = true; + } + + public void onFailure(Throwable throwable) { + LOG.info("MQTT Connection failed."); + MqttSpout.this.mqttConnectFailed = true; + } + } + + // ################# Subscribe Callback Implementation ###################### + private class SubscribeCallback implements Callback<byte[]>{ + public void onSuccess(byte[] qos) { + LOG.info("Subscripton sucessful."); + } + + public void onFailure(Throwable throwable) { + LOG.error("MQTT Subscripton failed.", throwable); + throw new RuntimeException("MQTT Subscribe failed.", throwable); + } + } + + // ################# Subscribe Callback Implementation ###################### + private class DisconnectCallback implements Callback<Void>{ + public void onSuccess(Void aVoid) { + LOG.info("MQTT Disconnect successful."); + } + + public void onFailure(Throwable throwable) { + // Disconnects don't fail. + } + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/1b514cbd/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/ssl/DefaultKeyStoreLoader.java ---------------------------------------------------------------------- diff --git a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/ssl/DefaultKeyStoreLoader.java b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/ssl/DefaultKeyStoreLoader.java new file mode 100644 index 0000000..8bca407 --- /dev/null +++ b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/ssl/DefaultKeyStoreLoader.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.mqtt.ssl; + +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.InputStream; + +/** + * KeyStoreLoader implementation that uses local files. + */ +public class DefaultKeyStoreLoader implements KeyStoreLoader { + private String ksFile = null; + private String tsFile = null; + private String keyStorePassword = ""; + private String trustStorePassword = ""; + private String keyPassword = ""; + + /** + * Creates a DefaultKeystoreLoader that uses the same file + * for both the keystore and truststore. + * + * @param keystore path to keystore file + */ + public DefaultKeyStoreLoader(String keystore){ + this.ksFile = keystore; + } + + /** + * Creates a DefaultKeystoreLoader that uses separate files + * for the keystore and truststore. 
+ * + * @param keystore path to keystore file + * @param truststore path to truststore file + */ + public DefaultKeyStoreLoader(String keystore, String truststore){ + this.ksFile = keystore; + this.tsFile = truststore; + } + + public void setKeyStorePassword(String keyStorePassword) { + this.keyStorePassword = keyStorePassword; + } + + public void setTrustStorePassword(String trustStorePassword) { + this.trustStorePassword = trustStorePassword; + } + + public void setKeyPassword(String keyPassword) { + this.keyPassword = keyPassword; + } + + @Override + public InputStream keyStoreInputStream() throws FileNotFoundException { + return new FileInputStream(this.ksFile); + } + + @Override + public InputStream trustStoreInputStream() throws FileNotFoundException { + // if no truststore file, assume the truststore is the keystore. + if(this.tsFile == null){ + return new FileInputStream(this.ksFile); + } else { + return new FileInputStream(this.tsFile); + } + } + + @Override + public String keyStorePassword() { + return this.keyStorePassword; + } + + @Override + public String trustStorePassword() { + return this.trustStorePassword; + } + + @Override + public String keyPassword() { + return this.keyPassword; + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/1b514cbd/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/ssl/KeyStoreLoader.java ---------------------------------------------------------------------- diff --git a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/ssl/KeyStoreLoader.java b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/ssl/KeyStoreLoader.java new file mode 100644 index 0000000..297efcc --- /dev/null +++ b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/ssl/KeyStoreLoader.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.mqtt.ssl; + +import java.io.IOException; +import java.io.InputStream; +import java.io.Serializable; + +/** + * Abstraction for loading keystore/truststore data. This allows keystores + * to be loaded from different sources (File system, HDFS, etc.). + */ +public interface KeyStoreLoader extends Serializable { + + String keyStorePassword(); + String trustStorePassword(); + String keyPassword(); + InputStream keyStoreInputStream() throws IOException; + InputStream trustStoreInputStream() throws IOException; +} http://git-wip-us.apache.org/repos/asf/storm/blob/1b514cbd/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/trident/MqttPublishFunction.java ---------------------------------------------------------------------- diff --git a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/trident/MqttPublishFunction.java b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/trident/MqttPublishFunction.java new file mode 100644 index 0000000..e53c983 --- /dev/null +++ b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/trident/MqttPublishFunction.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.mqtt.trident; + +import org.apache.storm.Config; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.topology.FailedException; +import org.apache.storm.mqtt.MqttMessage; +import org.apache.storm.mqtt.common.MqttOptions; +import org.apache.storm.mqtt.MqttTupleMapper; +import org.apache.storm.mqtt.common.MqttPublisher; +import org.apache.storm.mqtt.common.SslUtils; +import org.apache.storm.mqtt.ssl.KeyStoreLoader; +import org.apache.storm.trident.operation.BaseFunction; +import org.apache.storm.trident.operation.TridentCollector; +import org.apache.storm.trident.operation.TridentOperationContext; +import org.apache.storm.trident.tuple.TridentTuple; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +import java.util.Map; + +public class MqttPublishFunction extends BaseFunction { + private static final Logger LOG = LoggerFactory.getLogger(MqttPublishFunction.class); + private MqttTupleMapper mapper; + private transient MqttPublisher publisher; + private boolean retain = false; + private transient OutputCollector collector; + private MqttOptions options; + private KeyStoreLoader keyStoreLoader; + private transient String topologyName; + + + public MqttPublishFunction(MqttOptions options, MqttTupleMapper mapper, KeyStoreLoader keyStoreLoader, boolean retain){ + this.options = options; + this.mapper = mapper; + this.retain = retain; 
+ this.keyStoreLoader = keyStoreLoader; + // the following code is duplicated in the constructor of MqttPublisher + // we reproduce it here so we fail on the client side if SSL is misconfigured, rather than when the topology + // is deployed to the cluster + SslUtils.checkSslConfig(this.options.getUrl(), keyStoreLoader); + } + + + @Override + public void prepare(Map conf, TridentOperationContext context) { + this.topologyName = (String)conf.get(Config.TOPOLOGY_NAME); + this.publisher = new MqttPublisher(this.options, this.keyStoreLoader, this.retain); + try { + this.publisher.connectMqtt(this.topologyName + "-" + context.getPartitionIndex()); + } catch (Exception e) { + LOG.error("Unable to connect to MQTT Broker.", e); + throw new RuntimeException("Unable to connect to MQTT Broker.", e); + } + } + + @Override + public void execute(TridentTuple tuple, TridentCollector collector) { + MqttMessage message = this.mapper.toMessage(tuple); + try { + this.publisher.publish(message); + } catch (Exception e) { + LOG.warn("Error publishing MQTT message. Failing tuple.", e); + // should we fail the batch or kill the worker? + throw new FailedException(); + } + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/1b514cbd/external/storm-mqtt/src/test/java/org/apache/storm/mqtt/StormMqttIntegrationTest.java ---------------------------------------------------------------------- diff --git a/external/storm-mqtt/src/test/java/org/apache/storm/mqtt/StormMqttIntegrationTest.java b/external/storm-mqtt/src/test/java/org/apache/storm/mqtt/StormMqttIntegrationTest.java new file mode 100644 index 0000000..57725b4 --- /dev/null +++ b/external/storm-mqtt/src/test/java/org/apache/storm/mqtt/StormMqttIntegrationTest.java @@ -0,0 +1,152 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.mqtt; + +import org.apache.storm.Config; +import org.apache.storm.LocalCluster; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.testing.IntegrationTest; +import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.tuple.ITuple; +import org.apache.activemq.broker.BrokerService; +import org.apache.storm.mqtt.bolt.MqttBolt; +import org.apache.storm.mqtt.common.MqttOptions; +import org.apache.storm.mqtt.common.MqttPublisher; +import org.apache.storm.mqtt.mappers.StringMessageMapper; +import org.apache.storm.mqtt.spout.MqttSpout; +import org.fusesource.mqtt.client.BlockingConnection; +import org.fusesource.mqtt.client.MQTT; +import org.fusesource.mqtt.client.Message; +import org.fusesource.mqtt.client.QoS; +import org.fusesource.mqtt.client.Topic; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.net.URI; +import java.util.Arrays; + +@Category(IntegrationTest.class) +public class StormMqttIntegrationTest implements Serializable{ + private static final Logger LOG = LoggerFactory.getLogger(StormMqttIntegrationTest.class); + private static BrokerService broker; + static boolean spoutActivated = false; + + private 
static final String TEST_TOPIC = "/mqtt-topology"; + private static final String RESULT_TOPIC = "/integration-result"; + private static final String RESULT_PAYLOAD = "Storm MQTT Spout"; + + public static class TestSpout extends MqttSpout{ + public TestSpout(MqttMessageMapper type, MqttOptions options){ + super(type, options); + } + + @Override + public void activate() { + super.activate(); + LOG.info("Spout activated."); + spoutActivated = true; + } + } + + + @AfterClass + public static void cleanup() throws Exception { + broker.stop(); + } + + @BeforeClass + public static void start() throws Exception { + LOG.warn("Starting broker..."); + broker = new BrokerService(); + broker.addConnector("mqtt://localhost:1883"); + broker.setDataDirectory("target"); + broker.start(); + LOG.debug("MQTT broker started"); + } + + + @Test + public void testMqttTopology() throws Exception { + MQTT client = new MQTT(); + client.setTracer(new MqttLogger()); + URI uri = URI.create("tcp://localhost:1883"); + client.setHost(uri); + + client.setClientId("MQTTSubscriber"); + client.setCleanSession(false); + BlockingConnection connection = client.blockingConnection(); + connection.connect(); + Topic[] topics = {new Topic("/integration-result", QoS.AT_LEAST_ONCE)}; + byte[] qoses = connection.subscribe(topics); + + LocalCluster cluster = new LocalCluster(); + cluster.submitTopology("test", new Config(), buildMqttTopology()); + + LOG.info("topology started"); + while(!spoutActivated) { + Thread.sleep(500); + } + + // publish a retained message to the broker + MqttOptions options = new MqttOptions(); + options.setCleanConnection(false); + MqttPublisher publisher = new MqttPublisher(options, true); + publisher.connectMqtt("MqttPublisher"); + publisher.publish(new MqttMessage(TEST_TOPIC, "test".getBytes())); + + LOG.info("published message"); + + Message message = connection.receive(); + LOG.info("Message recieved on topic: {}", message.getTopic()); + LOG.info("Payload: {}", new 
String(message.getPayload())); + message.ack(); + + Assert.assertArrayEquals(message.getPayload(), RESULT_PAYLOAD.getBytes()); + Assert.assertEquals(message.getTopic(), RESULT_TOPIC); + cluster.shutdown(); + } + + public StormTopology buildMqttTopology(){ + TopologyBuilder builder = new TopologyBuilder(); + + MqttOptions options = new MqttOptions(); + options.setTopics(Arrays.asList(TEST_TOPIC)); + options.setCleanConnection(false); + TestSpout spout = new TestSpout(new StringMessageMapper(), options); + + MqttBolt bolt = new MqttBolt(options, new MqttTupleMapper() { + @Override + public MqttMessage toMessage(ITuple tuple) { + LOG.info("Received: {}", tuple); + return new MqttMessage(RESULT_TOPIC, RESULT_PAYLOAD.getBytes()); + } + }); + + builder.setSpout("mqtt-spout", spout); + builder.setBolt("mqtt-bolt", bolt).shuffleGrouping("mqtt-spout"); + + return builder.createTopology(); + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/1b514cbd/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index d6475a3..ff4dba2 100644 --- a/pom.xml +++ b/pom.xml @@ -280,6 +280,9 @@ <wagonVersion>1.0</wagonVersion> <qpid.version>0.32</qpid.version> <eventhubs.client.version>1.0.1</eventhubs.client.version> + + <!-- see intellij profile below... This fixes an annoyance with intellij --> + <provided.scope>provided</provided.scope> </properties> <modules> @@ -345,6 +348,18 @@ </dependencies> <profiles> + <!-- + Hack to make intellij behave. + If you use intellij, enable this profile in your IDE. + It should make life easier. 
+ --> + <profile> + <id>intellij</id> + <properties> + <provided.scope>compile</provided.scope> + </properties> + </profile> + <profile> <id>rat</id> <build> http://git-wip-us.apache.org/repos/asf/storm/blob/1b514cbd/storm-perf/pom.xml ---------------------------------------------------------------------- diff --git a/storm-perf/pom.xml b/storm-perf/pom.xml index b9b2738..05d0ea6 100644 --- a/storm-perf/pom.xml +++ b/storm-perf/pom.xml @@ -22,8 +22,8 @@ <parent> <artifactId>storm</artifactId> <groupId>org.apache.storm</groupId> - <version>1.1.1-SNAPSHOT</version> - <relativePath>..</relativePath> + <version>1.1.0-SNAPSHOT</version> + <relativePath>../pom.xml</relativePath> </parent> <groupId>org.apache.storm</groupId> @@ -32,21 +32,6 @@ <name>Storm Perf</name> <description>Topologies and tools to measure performance.</description> - <properties> - <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> - <!-- see comment below... This fixes an annoyance with intellij --> - <provided.scope>provided</provided.scope> - </properties> - - <profiles> - <profile> - <id>intellij</id> - <properties> - <provided.scope>compile</provided.scope> - </properties> - </profile> - </profiles> - <build> <plugins> <plugin>
