Fixing stylecheck problems with storm-mqtt
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/f7f3524d Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/f7f3524d Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/f7f3524d Branch: refs/heads/master Commit: f7f3524d1401d76e1779c823391dce46e58acf48 Parents: 6d20c4a Author: Kishor Patil <[email protected]> Authored: Sun Apr 22 23:32:01 2018 -0400 Committer: Kishor Patil <[email protected]> Committed: Mon Apr 23 02:32:42 2018 -0400 ---------------------------------------------------------------------- external/storm-mqtt/pom.xml | 2 +- .../java/org/apache/storm/mqtt/MqttLogger.java | 20 ++--- .../java/org/apache/storm/mqtt/MqttMessage.java | 26 +++--- .../apache/storm/mqtt/MqttMessageMapper.java | 22 ++--- .../org/apache/storm/mqtt/MqttTupleMapper.java | 24 ++---- .../org/apache/storm/mqtt/bolt/MqttBolt.java | 36 ++++---- .../apache/storm/mqtt/common/MqttOptions.java | 43 +++++----- .../apache/storm/mqtt/common/MqttPublisher.java | 28 +++---- .../org/apache/storm/mqtt/common/MqttUtils.java | 34 ++++---- .../org/apache/storm/mqtt/common/SslUtils.java | 36 ++++---- .../mqtt/mappers/ByteArrayMessageMapper.java | 23 ++---- .../storm/mqtt/mappers/StringMessageMapper.java | 23 ++---- .../apache/storm/mqtt/spout/AckableMessage.java | 49 ++++++----- .../org/apache/storm/mqtt/spout/MqttSpout.java | 86 +++++++++----------- .../storm/mqtt/ssl/DefaultKeyStoreLoader.java | 25 +++--- .../apache/storm/mqtt/ssl/KeyStoreLoader.java | 23 +++--- .../storm/mqtt/trident/MqttPublishFunction.java | 33 +++----- .../storm/mqtt/StormMqttIntegrationTest.java | 75 ++++++++--------- 18 files changed, 254 insertions(+), 354 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/f7f3524d/external/storm-mqtt/pom.xml ---------------------------------------------------------------------- diff --git a/external/storm-mqtt/pom.xml b/external/storm-mqtt/pom.xml index 6c5c48c..703b5da 100644 --- a/external/storm-mqtt/pom.xml +++ b/external/storm-mqtt/pom.xml @@ -127,7 +127,7 @@ <artifactId>maven-checkstyle-plugin</artifactId> <!--Note - the version would be inherited--> <configuration> - <maxAllowedViolations>158</maxAllowedViolations> + <maxAllowedViolations>39</maxAllowedViolations> </configuration> </plugin> </plugins> http://git-wip-us.apache.org/repos/asf/storm/blob/f7f3524d/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/MqttLogger.java ---------------------------------------------------------------------- diff --git a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/MqttLogger.java b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/MqttLogger.java index 3af73fd..3644d16 100644 --- a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/MqttLogger.java +++ b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/MqttLogger.java @@ -1,24 +1,18 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.mqtt; import org.fusesource.mqtt.client.Tracer; -import org.fusesource.mqtt.codec.MQTTFrame; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/storm/blob/f7f3524d/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/MqttMessage.java ---------------------------------------------------------------------- diff --git a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/MqttMessage.java b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/MqttMessage.java index 5436dda..90401ef 100644 --- a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/MqttMessage.java +++ b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/MqttMessage.java @@ -1,20 +1,15 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.mqtt; /** @@ -27,15 +22,16 @@ public class MqttMessage { private byte[] message; - public MqttMessage(String topic, byte[] payload){ + public MqttMessage(String topic, byte[] payload) { this.topic = topic; this.message = payload; } - public byte[] getMessage(){ + + public byte[] getMessage() { return this.message; } - public String getTopic(){ + public String getTopic() { return this.topic; } } http://git-wip-us.apache.org/repos/asf/storm/blob/f7f3524d/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/MqttMessageMapper.java ---------------------------------------------------------------------- diff --git a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/MqttMessageMapper.java b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/MqttMessageMapper.java index c6173f4..caeb605 100644 --- a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/MqttMessageMapper.java +++ b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/MqttMessageMapper.java @@ -1,27 +1,21 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.mqtt; +import java.io.Serializable; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Values; -import java.io.Serializable; - /** * Represents an object that can be converted to a Storm Tuple from an AckableMessage, * given a MQTT Topic Name and a byte array payload. http://git-wip-us.apache.org/repos/asf/storm/blob/f7f3524d/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/MqttTupleMapper.java ---------------------------------------------------------------------- diff --git a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/MqttTupleMapper.java b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/MqttTupleMapper.java index c46c069..d3762b0 100644 --- a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/MqttTupleMapper.java +++ b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/MqttTupleMapper.java @@ -1,31 +1,25 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ -package org.apache.storm.mqtt; +package org.apache.storm.mqtt; -import org.apache.storm.tuple.ITuple; import java.io.Serializable; +import org.apache.storm.tuple.ITuple; /** * Given a Tuple, converts it to an MQTT message. */ -public interface MqttTupleMapper extends Serializable{ +public interface MqttTupleMapper extends Serializable { /** * Converts a Tuple to a MqttMessage http://git-wip-us.apache.org/repos/asf/storm/blob/f7f3524d/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/bolt/MqttBolt.java ---------------------------------------------------------------------- diff --git a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/bolt/MqttBolt.java b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/bolt/MqttBolt.java index b8f5068..745607b 100644 --- a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/bolt/MqttBolt.java +++ b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/bolt/MqttBolt.java @@ -1,41 +1,33 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.mqtt.bolt; +import java.util.Map; import org.apache.storm.Config; import org.apache.storm.mqtt.MqttMessage; -import org.apache.storm.mqtt.common.MqttOptions; import org.apache.storm.mqtt.MqttTupleMapper; +import org.apache.storm.mqtt.common.MqttOptions; import org.apache.storm.mqtt.common.MqttPublisher; import org.apache.storm.mqtt.common.SslUtils; import org.apache.storm.mqtt.ssl.KeyStoreLoader; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; -import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.topology.base.BaseTickTupleAwareRichBolt; import org.apache.storm.tuple.Tuple; -import org.apache.storm.utils.TupleUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Map; - public class MqttBolt extends BaseTickTupleAwareRichBolt { private static final Logger LOG = LoggerFactory.getLogger(MqttBolt.class); @@ -48,19 +40,19 @@ public class MqttBolt extends BaseTickTupleAwareRichBolt { private transient String topologyName; - public MqttBolt(MqttOptions options, MqttTupleMapper mapper){ + public MqttBolt(MqttOptions options, MqttTupleMapper mapper) { this(options, mapper, null, false); } - public MqttBolt(MqttOptions options, MqttTupleMapper mapper, boolean retain){ + public MqttBolt(MqttOptions options, MqttTupleMapper mapper, boolean retain) { this(options, mapper, null, retain); } - public MqttBolt(MqttOptions options, MqttTupleMapper mapper, KeyStoreLoader keyStoreLoader){ + public MqttBolt(MqttOptions options, MqttTupleMapper mapper, KeyStoreLoader keyStoreLoader) { this(options, mapper, keyStoreLoader, false); } - public MqttBolt(MqttOptions options, MqttTupleMapper mapper, KeyStoreLoader keyStoreLoader, boolean retain){ + public MqttBolt(MqttOptions options, MqttTupleMapper mapper, KeyStoreLoader keyStoreLoader, boolean retain) { this.options = options; this.mapper = mapper; this.retain = retain; @@ -74,7 +66,7 @@ public class MqttBolt extends BaseTickTupleAwareRichBolt { @Override public void prepare(Map<String, Object> conf, TopologyContext context, OutputCollector collector) { this.collector = collector; - this.topologyName = (String)conf.get(Config.TOPOLOGY_NAME); + this.topologyName = (String) conf.get(Config.TOPOLOGY_NAME); this.publisher = new MqttPublisher(this.options, this.keyStoreLoader, this.retain); try { this.publisher.connectMqtt(this.topologyName + "-" + context.getThisComponentId() + "-" + context.getThisTaskId()); http://git-wip-us.apache.org/repos/asf/storm/blob/f7f3524d/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/common/MqttOptions.java ---------------------------------------------------------------------- diff --git a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/common/MqttOptions.java b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/common/MqttOptions.java index 2b09d6e..1e9c925 100644 --- a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/common/MqttOptions.java +++ b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/common/MqttOptions.java @@ -1,20 +1,15 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.mqtt.common; import java.io.Serializable; @@ -34,7 +29,7 @@ public class MqttOptions implements Serializable { private boolean willRetain = false; private long reconnectDelay = 10; - private long reconnectDelayMax = 30*1000; + private long reconnectDelayMax = 30 * 1000; private double reconnectBackOffMultiplier = 2.0f; private long reconnectAttemptsMax = -1; private long connectAttemptsMax = -1; @@ -205,7 +200,7 @@ public class MqttOptions implements Serializable { this.password = password; } - public int getQos(){ + public int getQos() { return this.qos; } @@ -213,14 +208,14 @@ public class MqttOptions implements Serializable { * Sets the quality of service to use for MQTT messages. Defaults to 1 (at least once). * @param qos */ - public void setQos(int qos){ - if(qos < 0 || qos > 2){ + public void setQos(int qos) { + if (qos < 0 || qos > 2) { throw new IllegalArgumentException("MQTT QoS must be >= 0 and <= 2"); } this.qos = qos; } - public int getWillQos(){ + public int getWillQos() { return this.willQos; } @@ -229,14 +224,14 @@ public class MqttOptions implements Serializable { * * @param qos */ - public void setWillQos(int qos){ - if(qos < 0 || qos > 2){ + public void setWillQos(int qos) { + if (qos < 0 || qos > 2) { throw new IllegalArgumentException("MQTT Will QoS must be >= 0 and <= 2"); } this.willQos = qos; } - public boolean getWillRetain(){ + public boolean getWillRetain() { return this.willRetain; } @@ -244,7 +239,7 @@ public class MqttOptions implements Serializable { * Set to true if you want the Will message to be published with the retain option. * @param retain */ - public void setWillRetain(boolean retain){ + public void setWillRetain(boolean retain) { this.willRetain = retain; } @@ -277,12 +272,12 @@ public class MqttOptions implements Serializable { return this; } - public Builder willRetain(boolean retain){ + public Builder willRetain(boolean retain) { this.options.willRetain = retain; return this; } - public Builder willQos(int qos){ + public Builder willQos(int qos) { this.options.setWillQos(qos); return this; } @@ -322,7 +317,7 @@ public class MqttOptions implements Serializable { return this; } - public Builder qos(int qos){ + public Builder qos(int qos) { this.options.setQos(qos); return this; } http://git-wip-us.apache.org/repos/asf/storm/blob/f7f3524d/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/common/MqttPublisher.java ---------------------------------------------------------------------- diff --git a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/common/MqttPublisher.java b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/common/MqttPublisher.java index 9b36b78..5ea1da8 100644 --- a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/common/MqttPublisher.java +++ b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/common/MqttPublisher.java @@ -1,24 +1,18 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.mqtt.common; -import org.apache.storm.mqtt.MqttLogger; import org.apache.storm.mqtt.MqttMessage; import org.apache.storm.mqtt.ssl.KeyStoreLoader; import org.fusesource.mqtt.client.BlockingConnection; @@ -27,8 +21,6 @@ import org.fusesource.mqtt.client.QoS; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.net.URI; - public class MqttPublisher { private static final Logger LOG = LoggerFactory.getLogger(MqttPublisher.class); @@ -39,15 +31,15 @@ public class MqttPublisher { private boolean retain = false; - public MqttPublisher(MqttOptions options){ + public MqttPublisher(MqttOptions options) { this(options, null, false); } - public MqttPublisher(MqttOptions options, boolean retain){ + public MqttPublisher(MqttOptions options, boolean retain) { this(options, null, retain); } - public MqttPublisher(MqttOptions options, KeyStoreLoader keyStoreLoader, boolean retain){ + public MqttPublisher(MqttOptions options, KeyStoreLoader keyStoreLoader, boolean retain) { this.retain = retain; this.options = options; this.keyStoreLoader = keyStoreLoader; http://git-wip-us.apache.org/repos/asf/storm/blob/f7f3524d/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/common/MqttUtils.java ---------------------------------------------------------------------- diff --git a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/common/MqttUtils.java b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/common/MqttUtils.java index 4ca0145..4995fd0 100644 --- a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/common/MqttUtils.java +++ b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/common/MqttUtils.java @@ -1,23 +1,19 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.mqtt.common; +import java.net.URI; import org.apache.storm.mqtt.MqttLogger; import org.apache.storm.mqtt.ssl.KeyStoreLoader; import org.fusesource.mqtt.client.MQTT; @@ -25,16 +21,14 @@ import org.fusesource.mqtt.client.QoS; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.net.URI; - public class MqttUtils { private static final Logger LOG = LoggerFactory.getLogger(MqttUtils.class); - private MqttUtils(){} + private MqttUtils() {} - public static QoS qosFromInt(int i){ + public static QoS qosFromInt(int i) { QoS qos = null; - switch(i) { + switch (i) { case 0: qos = QoS.AT_MOST_ONCE; break; @@ -52,13 +46,13 @@ public class MqttUtils { public static MQTT configureClient(MqttOptions options, String clientId, KeyStoreLoader keyStoreLoader) - throws Exception{ + throws Exception { MQTT client = new MQTT(); URI uri = URI.create(options.getUrl()); client.setHost(uri); - if(!uri.getScheme().toLowerCase().equals("tcp")){ + if (!uri.getScheme().toLowerCase().equals("tcp")) { client.setSslContext(SslUtils.sslContext(uri.getScheme(), keyStoreLoader)); } client.setClientId(clientId); @@ -76,7 +70,7 @@ public class MqttUtils { client.setPassword(options.getPassword()); client.setTracer(new MqttLogger()); - if(options.getWillTopic() != null && options.getWillPayload() != null){ + if (options.getWillTopic() != null && options.getWillPayload() != null) { QoS qos = MqttUtils.qosFromInt(options.getWillQos()); client.setWillQos(qos); client.setWillTopic(options.getWillTopic()); http://git-wip-us.apache.org/repos/asf/storm/blob/f7f3524d/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/common/SslUtils.java ---------------------------------------------------------------------- diff --git a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/common/SslUtils.java b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/common/SslUtils.java index 681fc1d..6ea3d0c 100644 --- a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/common/SslUtils.java +++ b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/common/SslUtils.java @@ -1,44 +1,38 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ -package org.apache.storm.mqtt.common; +package org.apache.storm.mqtt.common; -import org.apache.storm.mqtt.ssl.KeyStoreLoader; +import java.net.URI; +import java.security.KeyStore; import javax.net.ssl.KeyManagerFactory; import javax.net.ssl.SSLContext; import javax.net.ssl.TrustManager; import javax.net.ssl.TrustManagerFactory; -import java.net.URI; -import java.security.KeyStore; +import org.apache.storm.mqtt.ssl.KeyStoreLoader; public class SslUtils { - private SslUtils(){} + private SslUtils() {} - public static void checkSslConfig(String url, KeyStoreLoader loader){ + public static void checkSslConfig(String url, KeyStoreLoader loader) { URI uri = URI.create(url); String scheme = uri.getScheme().toLowerCase(); - if(!(scheme.equals("tcp") || scheme.startsWith("tls") || scheme.startsWith("ssl"))){ + if (!(scheme.equals("tcp") || scheme.startsWith("tls") || scheme.startsWith("ssl"))) { throw new IllegalArgumentException("Unrecognized URI scheme: " + scheme); } - if(!scheme.equalsIgnoreCase("tcp") && loader == null){ + if (!scheme.equalsIgnoreCase("tcp") && loader == null) { throw new IllegalStateException("A TLS/SSL MQTT URL was specified, but no KeyStoreLoader configured. " + - "A KeyStoreLoader implementation is required when using TLS/SSL."); + "A KeyStoreLoader implementation is required when using TLS/SSL."); } } http://git-wip-us.apache.org/repos/asf/storm/blob/f7f3524d/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/mappers/ByteArrayMessageMapper.java ---------------------------------------------------------------------- diff --git a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/mappers/ByteArrayMessageMapper.java b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/mappers/ByteArrayMessageMapper.java index a19fce4..1dd943d 100644 --- a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/mappers/ByteArrayMessageMapper.java +++ b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/mappers/ByteArrayMessageMapper.java @@ -1,26 +1,21 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.mqtt.mappers; -import org.apache.storm.tuple.Fields; -import org.apache.storm.tuple.Values; import org.apache.storm.mqtt.MqttMessage; import org.apache.storm.mqtt.MqttMessageMapper; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Values; public class ByteArrayMessageMapper implements MqttMessageMapper { http://git-wip-us.apache.org/repos/asf/storm/blob/f7f3524d/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/mappers/StringMessageMapper.java ---------------------------------------------------------------------- diff --git a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/mappers/StringMessageMapper.java b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/mappers/StringMessageMapper.java index e5f309b..341e52d 100644 --- a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/mappers/StringMessageMapper.java +++ b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/mappers/StringMessageMapper.java @@ -1,26 +1,21 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.mqtt.mappers; -import org.apache.storm.tuple.Fields; -import org.apache.storm.tuple.Values; import org.apache.storm.mqtt.MqttMessage; import org.apache.storm.mqtt.MqttMessageMapper; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Values; /** * Given a String topic and byte[] message, emits a tuple with fields http://git-wip-us.apache.org/repos/asf/storm/blob/f7f3524d/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/spout/AckableMessage.java ---------------------------------------------------------------------- diff --git a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/spout/AckableMessage.java b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/spout/AckableMessage.java index 5c3867e..0b91442 100644 --- a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/spout/AckableMessage.java +++ b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/spout/AckableMessage.java @@ -1,20 +1,15 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.mqtt.spout; import org.apache.commons.lang.builder.EqualsBuilder; @@ -31,41 +26,45 @@ class AckableMessage { private byte[] message; private Runnable ack; - AckableMessage(String topic, byte[] message, Runnable ack){ + AckableMessage(String topic, byte[] message, Runnable ack) { this.topic = topic; this.message = message; this.ack = ack; } - public MqttMessage getMessage(){ + public MqttMessage getMessage() { return new MqttMessage(this.topic, this.message); } @Override public int hashCode() { return new HashCodeBuilder(71, 123) - .append(this.topic) - .append(this.message) - .toHashCode(); + .append(this.topic) + .append(this.message) + .toHashCode(); } @Override public boolean equals(Object obj) { - if (obj == null) { return false; } - if (obj == this) { return true; } + if (obj == null) { + return false; + } + if (obj == this) { + return true; + } if (obj.getClass() != getClass()) { return false; } - AckableMessage tm = (AckableMessage)obj; + AckableMessage tm = (AckableMessage) obj; return new EqualsBuilder() - .appendSuper(super.equals(obj)) - .append(this.topic, tm.topic) - .append(this.message, tm.message) - .isEquals(); + .appendSuper(super.equals(obj)) + .append(this.topic, tm.topic) + .append(this.message, tm.message) + .isEquals(); } - Runnable ack(){ + Runnable ack() { return this.ack; } } http://git-wip-us.apache.org/repos/asf/storm/blob/f7f3524d/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/spout/MqttSpout.java ---------------------------------------------------------------------- diff --git a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/spout/MqttSpout.java b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/spout/MqttSpout.java index df6c6fd..268a695 100644 --- a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/spout/MqttSpout.java +++ b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/spout/MqttSpout.java @@ -1,32 +1,31 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.mqtt.spout; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.LinkedBlockingQueue; import org.apache.storm.Config; -import org.apache.storm.spout.SpoutOutputCollector; -import org.apache.storm.task.TopologyContext; -import org.apache.storm.topology.IRichSpout; -import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.mqtt.MqttMessageMapper; import org.apache.storm.mqtt.common.MqttOptions; import org.apache.storm.mqtt.common.MqttUtils; import org.apache.storm.mqtt.common.SslUtils; import org.apache.storm.mqtt.ssl.KeyStoreLoader; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.IRichSpout; +import org.apache.storm.topology.OutputFieldsDeclarer; import org.fusesource.hawtbuf.Buffer; import org.fusesource.hawtbuf.UTF8Buffer; import org.fusesource.mqtt.client.Callback; @@ -38,55 +37,45 @@ import org.fusesource.mqtt.client.Topic; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.LinkedBlockingQueue; - public class MqttSpout implements IRichSpout, Listener { private static final Logger LOG = LoggerFactory.getLogger(MqttSpout.class); - - private String topologyName; - - - private CallbackConnection connection; - protected transient SpoutOutputCollector collector; protected transient TopologyContext context; protected transient LinkedBlockingQueue<AckableMessage> incoming; protected transient HashMap<Long, AckableMessage> pending; - private transient Map<String, Object> conf; protected MqttMessageMapper type; protected MqttOptions options; protected KeyStoreLoader keyStoreLoader; - + private String topologyName; + private CallbackConnection connection; + private transient Map<String, Object> conf; private boolean mqttConnected = false; private boolean mqttConnectFailed = false; private Long sequence = Long.MIN_VALUE; - private Long nextId(){ - this.sequence++; - if(this.sequence == Long.MAX_VALUE){ - this.sequence = Long.MIN_VALUE; - } - return this.sequence; - } - - protected MqttSpout(){} + protected MqttSpout() {} - public MqttSpout(MqttMessageMapper type, MqttOptions options){ + public MqttSpout(MqttMessageMapper type, MqttOptions options) { this(type, options, null); } - public MqttSpout(MqttMessageMapper type, MqttOptions options, KeyStoreLoader keyStoreLoader){ + public MqttSpout(MqttMessageMapper type, MqttOptions options, KeyStoreLoader keyStoreLoader) { this.type = type; this.options = options; this.keyStoreLoader = keyStoreLoader; SslUtils.checkSslConfig(this.options.getUrl(), this.keyStoreLoader); } + private Long nextId() { + this.sequence++; + if (this.sequence == Long.MAX_VALUE) { + this.sequence = Long.MIN_VALUE; + } + return this.sequence; + } + public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(this.type.outputFields()); } @@ -96,7 +85,7 @@ public class MqttSpout implements IRichSpout, Listener { } public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) { - this.topologyName = (String)conf.get(Config.TOPOLOGY_NAME); + this.topologyName = (String) conf.get(Config.TOPOLOGY_NAME); this.collector = collector; this.context = context; @@ -116,23 +105,23 @@ public class MqttSpout implements IRichSpout, Listener { private void connectMqtt() throws Exception { String clientId = this.topologyName + "-" + this.context.getThisComponentId() + "-" + - this.context.getThisTaskId(); + this.context.getThisTaskId(); MQTT client = MqttUtils.configureClient(this.options, clientId, this.keyStoreLoader); this.connection = client.callbackConnection(); this.connection.listener(this); this.connection.connect(new ConnectCallback()); - while(!this.mqttConnected && !this.mqttConnectFailed){ + while (!this.mqttConnected && !this.mqttConnectFailed) { LOG.info("Waiting for connection..."); Thread.sleep(500); } - if(this.mqttConnected){ + if (this.mqttConnected) { List<String> topicList = this.options.getTopics(); Topic[] topics = new Topic[topicList.size()]; QoS qos = MqttUtils.qosFromInt(this.options.getQos()); - for(int i = 0;i < topicList.size();i++){ + for (int i = 0; i < topicList.size(); i++) { topics[i] = new Topic(topicList.get(i), qos); } connection.subscribe(topics, new SubscribeCallback()); @@ -140,7 +129,6 @@ public class MqttSpout implements IRichSpout, Listener { } - public void close() { this.connection.disconnect(new DisconnectCallback()); } @@ -161,7 +149,7 @@ public class MqttSpout implements IRichSpout, Listener { */ public void nextTuple() { AckableMessage tm = this.incoming.poll(); - if(tm != null){ + if (tm != null) { Long id = nextId(); this.collector.emit(this.type.toValues(tm.getMessage()), id); this.pending.put(id, tm); @@ -237,7 +225,7 @@ public class MqttSpout implements IRichSpout, Listener { } // ################# Subscribe Callback Implementation ###################### - private class SubscribeCallback implements Callback<byte[]>{ + private class SubscribeCallback implements Callback<byte[]> { public void onSuccess(byte[] qos) { LOG.info("Subscripton sucessful."); } @@ -249,7 +237,7 @@ public class MqttSpout implements IRichSpout, Listener { } // ################# Subscribe Callback Implementation ###################### - private class DisconnectCallback implements Callback<Void>{ + private class DisconnectCallback implements Callback<Void> { public void onSuccess(Void aVoid) { LOG.info("MQTT Disconnect successful."); } http://git-wip-us.apache.org/repos/asf/storm/blob/f7f3524d/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/ssl/DefaultKeyStoreLoader.java ---------------------------------------------------------------------- diff --git a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/ssl/DefaultKeyStoreLoader.java b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/ssl/DefaultKeyStoreLoader.java index 8bca407..14c3c6c 100644 --- a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/ssl/DefaultKeyStoreLoader.java +++ b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/ssl/DefaultKeyStoreLoader.java @@ -1,20 +1,15 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.mqtt.ssl; import java.io.FileInputStream; @@ -37,7 +32,7 @@ public class DefaultKeyStoreLoader implements KeyStoreLoader { * * @param keystore path to keystore file */ - public DefaultKeyStoreLoader(String keystore){ + public DefaultKeyStoreLoader(String keystore) { this.ksFile = keystore; } @@ -48,7 +43,7 @@ public class DefaultKeyStoreLoader implements KeyStoreLoader { * @param keystore path to keystore file * @param truststore path to truststore file */ - public DefaultKeyStoreLoader(String keystore, String truststore){ + public DefaultKeyStoreLoader(String keystore, String truststore) { this.ksFile = keystore; this.tsFile = truststore; } @@ -73,7 +68,7 @@ public class DefaultKeyStoreLoader implements KeyStoreLoader { @Override public InputStream trustStoreInputStream() throws FileNotFoundException { // if no truststore file, assume the truststore is the keystore. - if(this.tsFile == null){ + if (this.tsFile == null) { return new FileInputStream(this.ksFile); } else { return new FileInputStream(this.tsFile); http://git-wip-us.apache.org/repos/asf/storm/blob/f7f3524d/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/ssl/KeyStoreLoader.java ---------------------------------------------------------------------- diff --git a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/ssl/KeyStoreLoader.java b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/ssl/KeyStoreLoader.java index 297efcc..5630ff8 100644 --- a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/ssl/KeyStoreLoader.java +++ b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/ssl/KeyStoreLoader.java @@ -1,20 +1,15 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.mqtt.ssl; import java.io.IOException; @@ -28,8 +23,12 @@ import java.io.Serializable; public interface KeyStoreLoader extends Serializable { String keyStorePassword(); + String trustStorePassword(); + String keyPassword(); + InputStream keyStoreInputStream() throws IOException; + InputStream trustStoreInputStream() throws IOException; } http://git-wip-us.apache.org/repos/asf/storm/blob/f7f3524d/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/trident/MqttPublishFunction.java ---------------------------------------------------------------------- diff --git a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/trident/MqttPublishFunction.java b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/trident/MqttPublishFunction.java index 5a821c6..329cff5 100644 --- a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/trident/MqttPublishFunction.java +++ b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/trident/MqttPublishFunction.java @@ -1,31 +1,27 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.mqtt.trident; +import java.util.Map; import org.apache.storm.Config; -import org.apache.storm.task.OutputCollector; -import org.apache.storm.topology.FailedException; import org.apache.storm.mqtt.MqttMessage; -import org.apache.storm.mqtt.common.MqttOptions; import org.apache.storm.mqtt.MqttTupleMapper; +import org.apache.storm.mqtt.common.MqttOptions; import org.apache.storm.mqtt.common.MqttPublisher; import org.apache.storm.mqtt.common.SslUtils; import org.apache.storm.mqtt.ssl.KeyStoreLoader; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.topology.FailedException; import org.apache.storm.trident.operation.BaseFunction; import org.apache.storm.trident.operation.TridentCollector; import org.apache.storm.trident.operation.TridentOperationContext; @@ -33,9 +29,6 @@ import org.apache.storm.trident.tuple.TridentTuple; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - -import java.util.Map; - public class MqttPublishFunction extends BaseFunction { private static final Logger LOG = LoggerFactory.getLogger(MqttPublishFunction.class); private MqttTupleMapper mapper; @@ -47,7 +40,7 @@ public class MqttPublishFunction extends BaseFunction { private transient String topologyName; - public MqttPublishFunction(MqttOptions options, MqttTupleMapper mapper, KeyStoreLoader keyStoreLoader, boolean retain){ + public MqttPublishFunction(MqttOptions options, MqttTupleMapper mapper, KeyStoreLoader keyStoreLoader, boolean retain) { this.options = options; this.mapper = mapper; this.retain = retain; @@ -61,7 +54,7 @@ public class MqttPublishFunction extends BaseFunction { @Override public void prepare(Map<String, Object> conf, TridentOperationContext context) { - this.topologyName = (String)conf.get(Config.TOPOLOGY_NAME); + this.topologyName = (String) conf.get(Config.TOPOLOGY_NAME); this.publisher = new MqttPublisher(this.options, this.keyStoreLoader, this.retain); try { this.publisher.connectMqtt(this.topologyName + "-" + context.getPartitionIndex()); http://git-wip-us.apache.org/repos/asf/storm/blob/f7f3524d/external/storm-mqtt/src/test/java/org/apache/storm/mqtt/StormMqttIntegrationTest.java ---------------------------------------------------------------------- diff --git a/external/storm-mqtt/src/test/java/org/apache/storm/mqtt/StormMqttIntegrationTest.java b/external/storm-mqtt/src/test/java/org/apache/storm/mqtt/StormMqttIntegrationTest.java index 0dd4d73..ba214eb 100644 --- a/external/storm-mqtt/src/test/java/org/apache/storm/mqtt/StormMqttIntegrationTest.java +++ b/external/storm-mqtt/src/test/java/org/apache/storm/mqtt/StormMqttIntegrationTest.java @@ -1,35 +1,33 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.mqtt; +import java.io.Serializable; +import java.net.URI; +import java.util.Arrays; +import org.apache.activemq.broker.BrokerService; import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.LocalCluster.LocalTopology; import org.apache.storm.generated.StormTopology; -import org.apache.storm.testing.IntegrationTest; -import org.apache.storm.topology.TopologyBuilder; -import org.apache.storm.tuple.ITuple; -import org.apache.activemq.broker.BrokerService; import org.apache.storm.mqtt.bolt.MqttBolt; import org.apache.storm.mqtt.common.MqttOptions; import org.apache.storm.mqtt.common.MqttPublisher; import org.apache.storm.mqtt.mappers.StringMessageMapper; import org.apache.storm.mqtt.spout.MqttSpout; +import org.apache.storm.testing.IntegrationTest; +import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.tuple.ITuple; import org.fusesource.mqtt.client.BlockingConnection; import org.fusesource.mqtt.client.MQTT; import org.fusesource.mqtt.client.Message; @@ -43,33 +41,14 @@ import org.junit.experimental.categories.Category; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.Serializable; -import java.net.URI; -import java.util.Arrays; - @Category(IntegrationTest.class) -public class StormMqttIntegrationTest implements Serializable{ +public class StormMqttIntegrationTest implements Serializable { private static final Logger LOG = LoggerFactory.getLogger(StormMqttIntegrationTest.class); - private static BrokerService broker; - static boolean spoutActivated = false; - private static final String TEST_TOPIC = "/mqtt-topology"; private static final String RESULT_TOPIC = "/integration-result"; private static final String RESULT_PAYLOAD = "Storm MQTT Spout"; - - public static class TestSpout extends MqttSpout{ - public TestSpout(MqttMessageMapper type, MqttOptions options){ - super(type, options); - } - - @Override - public void activate() { - super.activate(); - LOG.info("Spout activated."); - spoutActivated = true; - } - } - + static boolean spoutActivated = false; + private static BrokerService broker; @AfterClass public static void cleanup() throws Exception { @@ -86,7 +65,6 @@ public class StormMqttIntegrationTest implements Serializable{ LOG.debug("MQTT broker started"); } - @Test public void testMqttTopology() throws Exception { MQTT client = new MQTT(); @@ -98,14 +76,14 @@ public class StormMqttIntegrationTest implements Serializable{ client.setCleanSession(false); BlockingConnection connection = client.blockingConnection(); connection.connect(); - Topic[] topics = {new Topic("/integration-result", QoS.AT_LEAST_ONCE)}; + Topic[] topics = { new Topic("/integration-result", QoS.AT_LEAST_ONCE) }; byte[] qoses = connection.subscribe(topics); try (LocalCluster cluster = new LocalCluster(); LocalTopology topo = cluster.submitTopology("test", new Config(), buildMqttTopology());) { LOG.info("topology started"); - while(!spoutActivated) { + while (!spoutActivated) { Thread.sleep(500); } @@ -128,7 +106,7 @@ public class StormMqttIntegrationTest implements Serializable{ } } - public StormTopology buildMqttTopology(){ + public StormTopology buildMqttTopology() { TopologyBuilder builder = new TopologyBuilder(); MqttOptions options = new MqttOptions(); @@ -150,4 +128,17 @@ public class StormMqttIntegrationTest implements Serializable{ return builder.createTopology(); } + public static class TestSpout extends MqttSpout { + public TestSpout(MqttMessageMapper type, MqttOptions options) { + super(type, options); + } + + @Override + public void activate() { + super.activate(); + LOG.info("Spout activated."); + spoutActivated = true; + } + } + }
