Fixing checkstyle problems in storm-mqtt

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/f7f3524d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/f7f3524d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/f7f3524d

Branch: refs/heads/master
Commit: f7f3524d1401d76e1779c823391dce46e58acf48
Parents: 6d20c4a
Author: Kishor Patil <[email protected]>
Authored: Sun Apr 22 23:32:01 2018 -0400
Committer: Kishor Patil <[email protected]>
Committed: Mon Apr 23 02:32:42 2018 -0400

----------------------------------------------------------------------
 external/storm-mqtt/pom.xml                     |  2 +-
 .../java/org/apache/storm/mqtt/MqttLogger.java  | 20 ++---
 .../java/org/apache/storm/mqtt/MqttMessage.java | 26 +++---
 .../apache/storm/mqtt/MqttMessageMapper.java    | 22 ++---
 .../org/apache/storm/mqtt/MqttTupleMapper.java  | 24 ++----
 .../org/apache/storm/mqtt/bolt/MqttBolt.java    | 36 ++++----
 .../apache/storm/mqtt/common/MqttOptions.java   | 43 +++++-----
 .../apache/storm/mqtt/common/MqttPublisher.java | 28 +++----
 .../org/apache/storm/mqtt/common/MqttUtils.java | 34 ++++----
 .../org/apache/storm/mqtt/common/SslUtils.java  | 36 ++++----
 .../mqtt/mappers/ByteArrayMessageMapper.java    | 23 ++----
 .../storm/mqtt/mappers/StringMessageMapper.java | 23 ++----
 .../apache/storm/mqtt/spout/AckableMessage.java | 49 ++++++-----
 .../org/apache/storm/mqtt/spout/MqttSpout.java  | 86 +++++++++-----------
 .../storm/mqtt/ssl/DefaultKeyStoreLoader.java   | 25 +++---
 .../apache/storm/mqtt/ssl/KeyStoreLoader.java   | 23 +++---
 .../storm/mqtt/trident/MqttPublishFunction.java | 33 +++-----
 .../storm/mqtt/StormMqttIntegrationTest.java    | 75 ++++++++---------
 18 files changed, 254 insertions(+), 354 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/f7f3524d/external/storm-mqtt/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-mqtt/pom.xml b/external/storm-mqtt/pom.xml
index 6c5c48c..703b5da 100644
--- a/external/storm-mqtt/pom.xml
+++ b/external/storm-mqtt/pom.xml
@@ -127,7 +127,7 @@
                 <artifactId>maven-checkstyle-plugin</artifactId>
                 <!--Note - the version would be inherited-->
                 <configuration>
-                    <maxAllowedViolations>158</maxAllowedViolations>
+                    <maxAllowedViolations>39</maxAllowedViolations>
                 </configuration>
             </plugin>
         </plugins>

http://git-wip-us.apache.org/repos/asf/storm/blob/f7f3524d/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/MqttLogger.java
----------------------------------------------------------------------
diff --git a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/MqttLogger.java b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/MqttLogger.java
index 3af73fd..3644d16 100644
--- a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/MqttLogger.java
+++ b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/MqttLogger.java
@@ -1,24 +1,18 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.mqtt;
 
 import org.fusesource.mqtt.client.Tracer;
-import org.fusesource.mqtt.codec.MQTTFrame;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/storm/blob/f7f3524d/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/MqttMessage.java
----------------------------------------------------------------------
diff --git a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/MqttMessage.java b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/MqttMessage.java
index 5436dda..90401ef 100644
--- a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/MqttMessage.java
+++ b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/MqttMessage.java
@@ -1,20 +1,15 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.mqtt;
 
 /**
@@ -27,15 +22,16 @@ public class MqttMessage {
     private byte[] message;
 
 
-    public MqttMessage(String topic, byte[] payload){
+    public MqttMessage(String topic, byte[] payload) {
         this.topic = topic;
         this.message = payload;
     }
-    public byte[] getMessage(){
+
+    public byte[] getMessage() {
         return this.message;
     }
 
-    public String getTopic(){
+    public String getTopic() {
         return this.topic;
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/f7f3524d/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/MqttMessageMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/MqttMessageMapper.java b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/MqttMessageMapper.java
index c6173f4..caeb605 100644
--- a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/MqttMessageMapper.java
+++ b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/MqttMessageMapper.java
@@ -1,27 +1,21 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.mqtt;
 
+import java.io.Serializable;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Values;
 
-import java.io.Serializable;
-
 /**
  * Represents an object that can be converted to a Storm Tuple from an AckableMessage,
  * given a MQTT Topic Name and a byte array payload.

http://git-wip-us.apache.org/repos/asf/storm/blob/f7f3524d/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/MqttTupleMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/MqttTupleMapper.java b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/MqttTupleMapper.java
index c46c069..d3762b0 100644
--- a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/MqttTupleMapper.java
+++ b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/MqttTupleMapper.java
@@ -1,31 +1,25 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
  */
-package org.apache.storm.mqtt;
 
+package org.apache.storm.mqtt;
 
-import org.apache.storm.tuple.ITuple;
 
 import java.io.Serializable;
+import org.apache.storm.tuple.ITuple;
 
 /**
  * Given a Tuple, converts it to an MQTT message.
  */
-public interface MqttTupleMapper extends Serializable{
+public interface MqttTupleMapper extends Serializable {
 
     /**
      * Converts a Tuple to a MqttMessage

http://git-wip-us.apache.org/repos/asf/storm/blob/f7f3524d/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/bolt/MqttBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/bolt/MqttBolt.java b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/bolt/MqttBolt.java
index b8f5068..745607b 100644
--- a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/bolt/MqttBolt.java
+++ b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/bolt/MqttBolt.java
@@ -1,41 +1,33 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.mqtt.bolt;
 
+import java.util.Map;
 import org.apache.storm.Config;
 import org.apache.storm.mqtt.MqttMessage;
-import org.apache.storm.mqtt.common.MqttOptions;
 import org.apache.storm.mqtt.MqttTupleMapper;
+import org.apache.storm.mqtt.common.MqttOptions;
 import org.apache.storm.mqtt.common.MqttPublisher;
 import org.apache.storm.mqtt.common.SslUtils;
 import org.apache.storm.mqtt.ssl.KeyStoreLoader;
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseRichBolt;
 import org.apache.storm.topology.base.BaseTickTupleAwareRichBolt;
 import org.apache.storm.tuple.Tuple;
-import org.apache.storm.utils.TupleUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Map;
-
 
 public class MqttBolt extends BaseTickTupleAwareRichBolt {
     private static final Logger LOG = LoggerFactory.getLogger(MqttBolt.class);
@@ -48,19 +40,19 @@ public class MqttBolt extends BaseTickTupleAwareRichBolt {
     private transient String topologyName;
 
 
-    public MqttBolt(MqttOptions options, MqttTupleMapper mapper){
+    public MqttBolt(MqttOptions options, MqttTupleMapper mapper) {
         this(options, mapper, null, false);
     }
 
-    public MqttBolt(MqttOptions options, MqttTupleMapper mapper, boolean retain){
+    public MqttBolt(MqttOptions options, MqttTupleMapper mapper, boolean retain) {
         this(options, mapper, null, retain);
     }
 
-    public MqttBolt(MqttOptions options, MqttTupleMapper mapper, KeyStoreLoader keyStoreLoader){
+    public MqttBolt(MqttOptions options, MqttTupleMapper mapper, KeyStoreLoader keyStoreLoader) {
         this(options, mapper, keyStoreLoader, false);
     }
 
-    public MqttBolt(MqttOptions options, MqttTupleMapper mapper, KeyStoreLoader keyStoreLoader, boolean retain){
+    public MqttBolt(MqttOptions options, MqttTupleMapper mapper, KeyStoreLoader keyStoreLoader, boolean retain) {
         this.options = options;
         this.mapper = mapper;
         this.retain = retain;
@@ -74,7 +66,7 @@ public class MqttBolt extends BaseTickTupleAwareRichBolt {
     @Override
     public void prepare(Map<String, Object> conf, TopologyContext context, OutputCollector collector) {
         this.collector = collector;
-        this.topologyName = (String)conf.get(Config.TOPOLOGY_NAME);
+        this.topologyName = (String) conf.get(Config.TOPOLOGY_NAME);
         this.publisher = new MqttPublisher(this.options, this.keyStoreLoader, this.retain);
         try {
             this.publisher.connectMqtt(this.topologyName + "-" + context.getThisComponentId() + "-" + context.getThisTaskId());

http://git-wip-us.apache.org/repos/asf/storm/blob/f7f3524d/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/common/MqttOptions.java
----------------------------------------------------------------------
diff --git a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/common/MqttOptions.java b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/common/MqttOptions.java
index 2b09d6e..1e9c925 100644
--- a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/common/MqttOptions.java
+++ b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/common/MqttOptions.java
@@ -1,20 +1,15 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.mqtt.common;
 
 import java.io.Serializable;
@@ -34,7 +29,7 @@ public class MqttOptions implements Serializable {
     private boolean willRetain = false;
 
     private long reconnectDelay = 10;
-    private long reconnectDelayMax = 30*1000;
+    private long reconnectDelayMax = 30 * 1000;
     private double reconnectBackOffMultiplier = 2.0f;
     private long reconnectAttemptsMax = -1;
     private long connectAttemptsMax = -1;
@@ -205,7 +200,7 @@ public class MqttOptions implements Serializable {
         this.password = password;
     }
 
-    public int getQos(){
+    public int getQos() {
         return this.qos;
     }
 
@@ -213,14 +208,14 @@ public class MqttOptions implements Serializable {
      * Sets the quality of service to use for MQTT messages. Defaults to 1 (at least once).
      * @param qos
      */
-    public void setQos(int qos){
-        if(qos < 0 || qos > 2){
+    public void setQos(int qos) {
+        if (qos < 0 || qos > 2) {
             throw new IllegalArgumentException("MQTT QoS must be >= 0 and <= 2");
         }
         this.qos = qos;
     }
 
-    public int getWillQos(){
+    public int getWillQos() {
         return this.willQos;
     }
 
@@ -229,14 +224,14 @@ public class MqttOptions implements Serializable {
      *
      * @param qos
      */
-    public void setWillQos(int qos){
-        if(qos < 0 || qos > 2){
+    public void setWillQos(int qos) {
+        if (qos < 0 || qos > 2) {
             throw new IllegalArgumentException("MQTT Will QoS must be >= 0 and <= 2");
         }
         this.willQos = qos;
     }
 
-    public boolean getWillRetain(){
+    public boolean getWillRetain() {
         return this.willRetain;
     }
 
@@ -244,7 +239,7 @@ public class MqttOptions implements Serializable {
      * Set to true if you want the Will message to be published with the retain option.
      * @param retain
      */
-    public void setWillRetain(boolean retain){
+    public void setWillRetain(boolean retain) {
         this.willRetain = retain;
     }
 
@@ -277,12 +272,12 @@ public class MqttOptions implements Serializable {
             return this;
         }
 
-        public Builder willRetain(boolean retain){
+        public Builder willRetain(boolean retain) {
             this.options.willRetain = retain;
             return this;
         }
 
-        public Builder willQos(int qos){
+        public Builder willQos(int qos) {
             this.options.setWillQos(qos);
             return this;
         }
@@ -322,7 +317,7 @@ public class MqttOptions implements Serializable {
             return this;
         }
 
-        public Builder qos(int qos){
+        public Builder qos(int qos) {
             this.options.setQos(qos);
             return this;
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/f7f3524d/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/common/MqttPublisher.java
----------------------------------------------------------------------
diff --git a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/common/MqttPublisher.java b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/common/MqttPublisher.java
index 9b36b78..5ea1da8 100644
--- a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/common/MqttPublisher.java
+++ b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/common/MqttPublisher.java
@@ -1,24 +1,18 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.mqtt.common;
 
 
-import org.apache.storm.mqtt.MqttLogger;
 import org.apache.storm.mqtt.MqttMessage;
 import org.apache.storm.mqtt.ssl.KeyStoreLoader;
 import org.fusesource.mqtt.client.BlockingConnection;
@@ -27,8 +21,6 @@ import org.fusesource.mqtt.client.QoS;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.net.URI;
-
 public class MqttPublisher {
     private static final Logger LOG = LoggerFactory.getLogger(MqttPublisher.class);
 
@@ -39,15 +31,15 @@ public class MqttPublisher {
     private boolean retain = false;
 
 
-    public MqttPublisher(MqttOptions options){
+    public MqttPublisher(MqttOptions options) {
         this(options, null, false);
     }
 
-    public MqttPublisher(MqttOptions options, boolean retain){
+    public MqttPublisher(MqttOptions options, boolean retain) {
         this(options, null, retain);
     }
 
-    public MqttPublisher(MqttOptions options, KeyStoreLoader keyStoreLoader, boolean retain){
+    public MqttPublisher(MqttOptions options, KeyStoreLoader keyStoreLoader, boolean retain) {
         this.retain = retain;
         this.options = options;
         this.keyStoreLoader = keyStoreLoader;

http://git-wip-us.apache.org/repos/asf/storm/blob/f7f3524d/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/common/MqttUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/common/MqttUtils.java b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/common/MqttUtils.java
index 4ca0145..4995fd0 100644
--- a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/common/MqttUtils.java
+++ b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/common/MqttUtils.java
@@ -1,23 +1,19 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.mqtt.common;
 
 
+import java.net.URI;
 import org.apache.storm.mqtt.MqttLogger;
 import org.apache.storm.mqtt.ssl.KeyStoreLoader;
 import org.fusesource.mqtt.client.MQTT;
@@ -25,16 +21,14 @@ import org.fusesource.mqtt.client.QoS;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.net.URI;
-
 public class MqttUtils {
     private static final Logger LOG = LoggerFactory.getLogger(MqttUtils.class);
 
-    private MqttUtils(){}
+    private MqttUtils() {}
 
-    public static QoS qosFromInt(int i){
+    public static QoS qosFromInt(int i) {
         QoS qos = null;
-        switch(i) {
+        switch (i) {
             case 0:
                 qos = QoS.AT_MOST_ONCE;
                 break;
@@ -52,13 +46,13 @@ public class MqttUtils {
 
 
     public static MQTT configureClient(MqttOptions options, String clientId, KeyStoreLoader keyStoreLoader)
-            throws Exception{
+        throws Exception {
 
         MQTT client = new MQTT();
         URI uri = URI.create(options.getUrl());
 
         client.setHost(uri);
-        if(!uri.getScheme().toLowerCase().equals("tcp")){
+        if (!uri.getScheme().toLowerCase().equals("tcp")) {
             client.setSslContext(SslUtils.sslContext(uri.getScheme(), keyStoreLoader));
         }
         client.setClientId(clientId);
@@ -76,7 +70,7 @@ public class MqttUtils {
         client.setPassword(options.getPassword());
         client.setTracer(new MqttLogger());
 
-        if(options.getWillTopic() != null && options.getWillPayload() != null){
+        if (options.getWillTopic() != null && options.getWillPayload() != null) {
             QoS qos = MqttUtils.qosFromInt(options.getWillQos());
             client.setWillQos(qos);
             client.setWillTopic(options.getWillTopic());

http://git-wip-us.apache.org/repos/asf/storm/blob/f7f3524d/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/common/SslUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/common/SslUtils.java b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/common/SslUtils.java
index 681fc1d..6ea3d0c 100644
--- a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/common/SslUtils.java
+++ b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/common/SslUtils.java
@@ -1,44 +1,38 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
-package org.apache.storm.mqtt.common;
 
+package org.apache.storm.mqtt.common;
 
-import org.apache.storm.mqtt.ssl.KeyStoreLoader;
 
+import java.net.URI;
+import java.security.KeyStore;
 import javax.net.ssl.KeyManagerFactory;
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.TrustManager;
 import javax.net.ssl.TrustManagerFactory;
-import java.net.URI;
-import java.security.KeyStore;
+import org.apache.storm.mqtt.ssl.KeyStoreLoader;
 
 public class SslUtils {
-    private SslUtils(){}
+    private SslUtils() {}
 
-    public static void checkSslConfig(String url, KeyStoreLoader loader){
+    public static void checkSslConfig(String url, KeyStoreLoader loader) {
         URI uri = URI.create(url);
         String scheme = uri.getScheme().toLowerCase();
-        if(!(scheme.equals("tcp") || scheme.startsWith("tls") || 
scheme.startsWith("ssl"))){
+        if (!(scheme.equals("tcp") || scheme.startsWith("tls") || 
scheme.startsWith("ssl"))) {
             throw new IllegalArgumentException("Unrecognized URI scheme: " + 
scheme);
         }
-        if(!scheme.equalsIgnoreCase("tcp") && loader == null){
+        if (!scheme.equalsIgnoreCase("tcp") && loader == null) {
             throw new IllegalStateException("A TLS/SSL MQTT URL was specified, 
but no KeyStoreLoader configured. " +
-                    "A KeyStoreLoader implementation is required when using 
TLS/SSL.");
+                                            "A KeyStoreLoader implementation 
is required when using TLS/SSL.");
         }
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/f7f3524d/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/mappers/ByteArrayMessageMapper.java
----------------------------------------------------------------------
diff --git 
a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/mappers/ByteArrayMessageMapper.java
 
b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/mappers/ByteArrayMessageMapper.java
index a19fce4..1dd943d 100644
--- 
a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/mappers/ByteArrayMessageMapper.java
+++ 
b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/mappers/ByteArrayMessageMapper.java
@@ -1,26 +1,21 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.mqtt.mappers;
 
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Values;
 import org.apache.storm.mqtt.MqttMessage;
 import org.apache.storm.mqtt.MqttMessageMapper;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
 
 
 public class ByteArrayMessageMapper implements MqttMessageMapper {

http://git-wip-us.apache.org/repos/asf/storm/blob/f7f3524d/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/mappers/StringMessageMapper.java
----------------------------------------------------------------------
diff --git 
a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/mappers/StringMessageMapper.java
 
b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/mappers/StringMessageMapper.java
index e5f309b..341e52d 100644
--- 
a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/mappers/StringMessageMapper.java
+++ 
b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/mappers/StringMessageMapper.java
@@ -1,26 +1,21 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.mqtt.mappers;
 
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Values;
 import org.apache.storm.mqtt.MqttMessage;
 import org.apache.storm.mqtt.MqttMessageMapper;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
 
 /**
  * Given a String topic and byte[] message, emits a tuple with fields

http://git-wip-us.apache.org/repos/asf/storm/blob/f7f3524d/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/spout/AckableMessage.java
----------------------------------------------------------------------
diff --git 
a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/spout/AckableMessage.java
 
b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/spout/AckableMessage.java
index 5c3867e..0b91442 100644
--- 
a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/spout/AckableMessage.java
+++ 
b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/spout/AckableMessage.java
@@ -1,20 +1,15 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.mqtt.spout;
 
 import org.apache.commons.lang.builder.EqualsBuilder;
@@ -31,41 +26,45 @@ class AckableMessage {
     private byte[] message;
     private Runnable ack;
 
-    AckableMessage(String topic, byte[] message, Runnable ack){
+    AckableMessage(String topic, byte[] message, Runnable ack) {
         this.topic = topic;
         this.message = message;
         this.ack = ack;
     }
 
-    public MqttMessage getMessage(){
+    public MqttMessage getMessage() {
         return new MqttMessage(this.topic, this.message);
     }
 
     @Override
     public int hashCode() {
         return new HashCodeBuilder(71, 123)
-                .append(this.topic)
-                .append(this.message)
-                .toHashCode();
+            .append(this.topic)
+            .append(this.message)
+            .toHashCode();
     }
 
 
     @Override
     public boolean equals(Object obj) {
-        if (obj == null) { return false; }
-        if (obj == this) { return true; }
+        if (obj == null) {
+            return false;
+        }
+        if (obj == this) {
+            return true;
+        }
         if (obj.getClass() != getClass()) {
             return false;
         }
-        AckableMessage tm = (AckableMessage)obj;
+        AckableMessage tm = (AckableMessage) obj;
         return new EqualsBuilder()
-                .appendSuper(super.equals(obj))
-                .append(this.topic, tm.topic)
-                .append(this.message, tm.message)
-                .isEquals();
+            .appendSuper(super.equals(obj))
+            .append(this.topic, tm.topic)
+            .append(this.message, tm.message)
+            .isEquals();
     }
 
-    Runnable ack(){
+    Runnable ack() {
         return this.ack;
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/f7f3524d/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/spout/MqttSpout.java
----------------------------------------------------------------------
diff --git 
a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/spout/MqttSpout.java 
b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/spout/MqttSpout.java
index df6c6fd..268a695 100644
--- 
a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/spout/MqttSpout.java
+++ 
b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/spout/MqttSpout.java
@@ -1,32 +1,31 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.mqtt.spout;
 
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
 import org.apache.storm.Config;
-import org.apache.storm.spout.SpoutOutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.IRichSpout;
-import org.apache.storm.topology.OutputFieldsDeclarer;
 import org.apache.storm.mqtt.MqttMessageMapper;
 import org.apache.storm.mqtt.common.MqttOptions;
 import org.apache.storm.mqtt.common.MqttUtils;
 import org.apache.storm.mqtt.common.SslUtils;
 import org.apache.storm.mqtt.ssl.KeyStoreLoader;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.IRichSpout;
+import org.apache.storm.topology.OutputFieldsDeclarer;
 import org.fusesource.hawtbuf.Buffer;
 import org.fusesource.hawtbuf.UTF8Buffer;
 import org.fusesource.mqtt.client.Callback;
@@ -38,55 +37,45 @@ import org.fusesource.mqtt.client.Topic;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.LinkedBlockingQueue;
-
 public class MqttSpout implements IRichSpout, Listener {
     private static final Logger LOG = LoggerFactory.getLogger(MqttSpout.class);
-
-    private String topologyName;
-
-
-    private CallbackConnection connection;
-
     protected transient SpoutOutputCollector collector;
     protected transient TopologyContext context;
     protected transient LinkedBlockingQueue<AckableMessage> incoming;
     protected transient HashMap<Long, AckableMessage> pending;
-    private transient Map<String, Object> conf;
     protected MqttMessageMapper type;
     protected MqttOptions options;
     protected KeyStoreLoader keyStoreLoader;
-
+    private String topologyName;
+    private CallbackConnection connection;
+    private transient Map<String, Object> conf;
     private boolean mqttConnected = false;
     private boolean mqttConnectFailed = false;
 
 
     private Long sequence = Long.MIN_VALUE;
 
-    private Long nextId(){
-        this.sequence++;
-        if(this.sequence == Long.MAX_VALUE){
-            this.sequence = Long.MIN_VALUE;
-        }
-        return this.sequence;
-    }
-
-    protected MqttSpout(){}
+    protected MqttSpout() {}
 
-    public MqttSpout(MqttMessageMapper type, MqttOptions options){
+    public MqttSpout(MqttMessageMapper type, MqttOptions options) {
         this(type, options, null);
     }
 
-    public MqttSpout(MqttMessageMapper type, MqttOptions options, 
KeyStoreLoader keyStoreLoader){
+    public MqttSpout(MqttMessageMapper type, MqttOptions options, 
KeyStoreLoader keyStoreLoader) {
         this.type = type;
         this.options = options;
         this.keyStoreLoader = keyStoreLoader;
         SslUtils.checkSslConfig(this.options.getUrl(), this.keyStoreLoader);
     }
 
+    private Long nextId() {
+        this.sequence++;
+        if (this.sequence == Long.MAX_VALUE) {
+            this.sequence = Long.MIN_VALUE;
+        }
+        return this.sequence;
+    }
+
     public void declareOutputFields(OutputFieldsDeclarer declarer) {
         declarer.declare(this.type.outputFields());
     }
@@ -96,7 +85,7 @@ public class MqttSpout implements IRichSpout, Listener {
     }
 
     public void open(Map<String, Object> conf, TopologyContext context, 
SpoutOutputCollector collector) {
-        this.topologyName = (String)conf.get(Config.TOPOLOGY_NAME);
+        this.topologyName = (String) conf.get(Config.TOPOLOGY_NAME);
 
         this.collector = collector;
         this.context = context;
@@ -116,23 +105,23 @@ public class MqttSpout implements IRichSpout, Listener {
 
     private void connectMqtt() throws Exception {
         String clientId = this.topologyName + "-" + 
this.context.getThisComponentId() + "-" +
-                this.context.getThisTaskId();
+                          this.context.getThisTaskId();
 
         MQTT client = MqttUtils.configureClient(this.options, clientId, 
this.keyStoreLoader);
         this.connection = client.callbackConnection();
         this.connection.listener(this);
         this.connection.connect(new ConnectCallback());
 
-        while(!this.mqttConnected && !this.mqttConnectFailed){
+        while (!this.mqttConnected && !this.mqttConnectFailed) {
             LOG.info("Waiting for connection...");
             Thread.sleep(500);
         }
 
-        if(this.mqttConnected){
+        if (this.mqttConnected) {
             List<String> topicList = this.options.getTopics();
             Topic[] topics = new Topic[topicList.size()];
             QoS qos = MqttUtils.qosFromInt(this.options.getQos());
-            for(int i = 0;i < topicList.size();i++){
+            for (int i = 0; i < topicList.size(); i++) {
                 topics[i] = new Topic(topicList.get(i), qos);
             }
             connection.subscribe(topics, new SubscribeCallback());
@@ -140,7 +129,6 @@ public class MqttSpout implements IRichSpout, Listener {
     }
 
 
-
     public void close() {
         this.connection.disconnect(new DisconnectCallback());
     }
@@ -161,7 +149,7 @@ public class MqttSpout implements IRichSpout, Listener {
      */
     public void nextTuple() {
         AckableMessage tm = this.incoming.poll();
-        if(tm != null){
+        if (tm != null) {
             Long id = nextId();
             this.collector.emit(this.type.toValues(tm.getMessage()), id);
             this.pending.put(id, tm);
@@ -237,7 +225,7 @@ public class MqttSpout implements IRichSpout, Listener {
     }
 
     // ################# Subscribe Callback Implementation 
######################
-    private class SubscribeCallback implements Callback<byte[]>{
+    private class SubscribeCallback implements Callback<byte[]> {
         public void onSuccess(byte[] qos) {
             LOG.info("Subscripton sucessful.");
         }
@@ -249,7 +237,7 @@ public class MqttSpout implements IRichSpout, Listener {
     }
 
     // ################# Subscribe Callback Implementation 
######################
-    private class DisconnectCallback implements Callback<Void>{
+    private class DisconnectCallback implements Callback<Void> {
         public void onSuccess(Void aVoid) {
             LOG.info("MQTT Disconnect successful.");
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/f7f3524d/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/ssl/DefaultKeyStoreLoader.java
----------------------------------------------------------------------
diff --git 
a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/ssl/DefaultKeyStoreLoader.java
 
b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/ssl/DefaultKeyStoreLoader.java
index 8bca407..14c3c6c 100644
--- 
a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/ssl/DefaultKeyStoreLoader.java
+++ 
b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/ssl/DefaultKeyStoreLoader.java
@@ -1,20 +1,15 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.mqtt.ssl;
 
 import java.io.FileInputStream;
@@ -37,7 +32,7 @@ public class DefaultKeyStoreLoader implements KeyStoreLoader {
      *
      * @param keystore path to keystore file
      */
-    public DefaultKeyStoreLoader(String keystore){
+    public DefaultKeyStoreLoader(String keystore) {
         this.ksFile = keystore;
     }
 
@@ -48,7 +43,7 @@ public class DefaultKeyStoreLoader implements KeyStoreLoader {
      * @param keystore path to keystore file
      * @param truststore path to truststore file
      */
-    public DefaultKeyStoreLoader(String keystore, String truststore){
+    public DefaultKeyStoreLoader(String keystore, String truststore) {
         this.ksFile = keystore;
         this.tsFile = truststore;
     }
@@ -73,7 +68,7 @@ public class DefaultKeyStoreLoader implements KeyStoreLoader {
     @Override
     public InputStream trustStoreInputStream() throws FileNotFoundException {
         // if no truststore file, assume the truststore is the keystore.
-        if(this.tsFile == null){
+        if (this.tsFile == null) {
             return new FileInputStream(this.ksFile);
         } else {
             return new FileInputStream(this.tsFile);

http://git-wip-us.apache.org/repos/asf/storm/blob/f7f3524d/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/ssl/KeyStoreLoader.java
----------------------------------------------------------------------
diff --git 
a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/ssl/KeyStoreLoader.java
 
b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/ssl/KeyStoreLoader.java
index 297efcc..5630ff8 100644
--- 
a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/ssl/KeyStoreLoader.java
+++ 
b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/ssl/KeyStoreLoader.java
@@ -1,20 +1,15 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.mqtt.ssl;
 
 import java.io.IOException;
@@ -28,8 +23,12 @@ import java.io.Serializable;
 public interface KeyStoreLoader extends Serializable {
 
     String keyStorePassword();
+
     String trustStorePassword();
+
     String keyPassword();
+
     InputStream keyStoreInputStream() throws IOException;
+
     InputStream trustStoreInputStream() throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/f7f3524d/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/trident/MqttPublishFunction.java
----------------------------------------------------------------------
diff --git 
a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/trident/MqttPublishFunction.java
 
b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/trident/MqttPublishFunction.java
index 5a821c6..329cff5 100644
--- 
a/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/trident/MqttPublishFunction.java
+++ 
b/external/storm-mqtt/src/main/java/org/apache/storm/mqtt/trident/MqttPublishFunction.java
@@ -1,31 +1,27 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.mqtt.trident;
 
+import java.util.Map;
 import org.apache.storm.Config;
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.topology.FailedException;
 import org.apache.storm.mqtt.MqttMessage;
-import org.apache.storm.mqtt.common.MqttOptions;
 import org.apache.storm.mqtt.MqttTupleMapper;
+import org.apache.storm.mqtt.common.MqttOptions;
 import org.apache.storm.mqtt.common.MqttPublisher;
 import org.apache.storm.mqtt.common.SslUtils;
 import org.apache.storm.mqtt.ssl.KeyStoreLoader;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.topology.FailedException;
 import org.apache.storm.trident.operation.BaseFunction;
 import org.apache.storm.trident.operation.TridentCollector;
 import org.apache.storm.trident.operation.TridentOperationContext;
@@ -33,9 +29,6 @@ import org.apache.storm.trident.tuple.TridentTuple;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
-import java.util.Map;
-
 public class MqttPublishFunction extends BaseFunction {
     private static final Logger LOG = 
LoggerFactory.getLogger(MqttPublishFunction.class);
     private MqttTupleMapper mapper;
@@ -47,7 +40,7 @@ public class MqttPublishFunction extends BaseFunction {
     private transient String topologyName;
 
 
-    public MqttPublishFunction(MqttOptions options, MqttTupleMapper mapper, 
KeyStoreLoader keyStoreLoader, boolean retain){
+    public MqttPublishFunction(MqttOptions options, MqttTupleMapper mapper, 
KeyStoreLoader keyStoreLoader, boolean retain) {
         this.options = options;
         this.mapper = mapper;
         this.retain = retain;
@@ -61,7 +54,7 @@ public class MqttPublishFunction extends BaseFunction {
 
     @Override
     public void prepare(Map<String, Object> conf, TridentOperationContext 
context) {
-        this.topologyName = (String)conf.get(Config.TOPOLOGY_NAME);
+        this.topologyName = (String) conf.get(Config.TOPOLOGY_NAME);
         this.publisher = new MqttPublisher(this.options, this.keyStoreLoader, 
this.retain);
         try {
             this.publisher.connectMqtt(this.topologyName + "-" + 
context.getPartitionIndex());

http://git-wip-us.apache.org/repos/asf/storm/blob/f7f3524d/external/storm-mqtt/src/test/java/org/apache/storm/mqtt/StormMqttIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/external/storm-mqtt/src/test/java/org/apache/storm/mqtt/StormMqttIntegrationTest.java
 
b/external/storm-mqtt/src/test/java/org/apache/storm/mqtt/StormMqttIntegrationTest.java
index 0dd4d73..ba214eb 100644
--- 
a/external/storm-mqtt/src/test/java/org/apache/storm/mqtt/StormMqttIntegrationTest.java
+++ 
b/external/storm-mqtt/src/test/java/org/apache/storm/mqtt/StormMqttIntegrationTest.java
@@ -1,35 +1,33 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.mqtt;
 
+import java.io.Serializable;
+import java.net.URI;
+import java.util.Arrays;
+import org.apache.activemq.broker.BrokerService;
 import org.apache.storm.Config;
 import org.apache.storm.LocalCluster;
 import org.apache.storm.LocalCluster.LocalTopology;
 import org.apache.storm.generated.StormTopology;
-import org.apache.storm.testing.IntegrationTest;
-import org.apache.storm.topology.TopologyBuilder;
-import org.apache.storm.tuple.ITuple;
-import org.apache.activemq.broker.BrokerService;
 import org.apache.storm.mqtt.bolt.MqttBolt;
 import org.apache.storm.mqtt.common.MqttOptions;
 import org.apache.storm.mqtt.common.MqttPublisher;
 import org.apache.storm.mqtt.mappers.StringMessageMapper;
 import org.apache.storm.mqtt.spout.MqttSpout;
+import org.apache.storm.testing.IntegrationTest;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.tuple.ITuple;
 import org.fusesource.mqtt.client.BlockingConnection;
 import org.fusesource.mqtt.client.MQTT;
 import org.fusesource.mqtt.client.Message;
@@ -43,33 +41,14 @@ import org.junit.experimental.categories.Category;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.Serializable;
-import java.net.URI;
-import java.util.Arrays;
-
 @Category(IntegrationTest.class)
-public class StormMqttIntegrationTest implements Serializable{
+public class StormMqttIntegrationTest implements Serializable {
     private static final Logger LOG = LoggerFactory.getLogger(StormMqttIntegrationTest.class);
-    private static BrokerService broker;
-    static boolean spoutActivated = false;
-
     private static final String TEST_TOPIC = "/mqtt-topology";
     private static final String RESULT_TOPIC = "/integration-result";
     private static final String RESULT_PAYLOAD = "Storm MQTT Spout";
-
-    public static class TestSpout extends MqttSpout{
-        public TestSpout(MqttMessageMapper type, MqttOptions options){
-            super(type, options);
-        }
-
-        @Override
-        public void activate() {
-            super.activate();
-            LOG.info("Spout activated.");
-            spoutActivated = true;
-        }
-    }
-
+    static boolean spoutActivated = false;
+    private static BrokerService broker;
 
     @AfterClass
     public static void cleanup() throws Exception {
@@ -86,7 +65,6 @@ public class StormMqttIntegrationTest implements Serializable{
         LOG.debug("MQTT broker started");
     }
 
-
     @Test
     public void testMqttTopology() throws Exception {
         MQTT client = new MQTT();
@@ -98,14 +76,14 @@ public class StormMqttIntegrationTest implements Serializable{
         client.setCleanSession(false);
         BlockingConnection connection = client.blockingConnection();
         connection.connect();
-        Topic[] topics = {new Topic("/integration-result", QoS.AT_LEAST_ONCE)};
+        Topic[] topics = { new Topic("/integration-result", QoS.AT_LEAST_ONCE) };
         byte[] qoses = connection.subscribe(topics);
 
         try (LocalCluster cluster = new LocalCluster();
              LocalTopology topo = cluster.submitTopology("test", new Config(), buildMqttTopology());) {
 
             LOG.info("topology started");
-            while(!spoutActivated) {
+            while (!spoutActivated) {
                 Thread.sleep(500);
             }
 
@@ -128,7 +106,7 @@ public class StormMqttIntegrationTest implements Serializable{
         }
     }
 
-    public StormTopology buildMqttTopology(){
+    public StormTopology buildMqttTopology() {
         TopologyBuilder builder = new TopologyBuilder();
 
         MqttOptions options = new MqttOptions();
@@ -150,4 +128,17 @@ public class StormMqttIntegrationTest implements Serializable{
         return builder.createTopology();
     }
 
+    public static class TestSpout extends MqttSpout {
+        public TestSpout(MqttMessageMapper type, MqttOptions options) {
+            super(type, options);
+        }
+
+        @Override
+        public void activate() {
+            super.activate();
+            LOG.info("Spout activated.");
+            spoutActivated = true;
+        }
+    }
+
 }

Reply via email to