Fixing stylecheck problems with storm-jms

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/95602b1b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/95602b1b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/95602b1b

Branch: refs/heads/master
Commit: 95602b1be7493c33b7dc8c3a8cb6e406b59e907d
Parents: 224633d
Author: Kishor Patil <[email protected]>
Authored: Sun Apr 22 23:23:30 2018 -0400
Committer: Kishor Patil <[email protected]>
Committed: Mon Apr 23 02:32:41 2018 -0400

----------------------------------------------------------------------
 external/storm-jms/pom.xml                      |   2 +-
 .../apache/storm/jms/JmsMessageProducer.java    |  21 +-
 .../java/org/apache/storm/jms/JmsProvider.java  |  20 +-
 .../org/apache/storm/jms/JmsTupleProducer.java  |  21 +-
 .../java/org/apache/storm/jms/bolt/JmsBolt.java |  38 ++-
 .../apache/storm/jms/spout/JmsMessageID.java    |  29 +--
 .../org/apache/storm/jms/spout/JmsSpout.java    | 216 +++++++++--------
 .../org/apache/storm/jms/trident/JmsBatch.java  |  19 +-
 .../org/apache/storm/jms/trident/JmsState.java  | 103 ++++----
 .../storm/jms/trident/JmsStateFactory.java      |  22 +-
 .../apache/storm/jms/trident/JmsUpdater.java    |  26 +-
 .../storm/jms/trident/TridentJmsSpout.java      | 237 +++++++++----------
 .../apache/storm/jms/spout/JmsSpoutTest.java    |  37 ++-
 .../apache/storm/jms/spout/MockJmsProvider.java |  21 +-
 .../jms/spout/MockSpoutOutputCollector.java     |   6 +-
 .../storm/jms/spout/MockTupleProducer.java      |   2 +-
 16 files changed, 382 insertions(+), 438 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/95602b1b/external/storm-jms/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-jms/pom.xml b/external/storm-jms/pom.xml
index 81e7f62..c7bcc51 100644
--- a/external/storm-jms/pom.xml
+++ b/external/storm-jms/pom.xml
@@ -94,7 +94,7 @@
                 <artifactId>maven-checkstyle-plugin</artifactId>
                 <!--Note - the version would be inherited-->
                 <configuration>
-                    <maxAllowedViolations>235</maxAllowedViolations>
+                    <maxAllowedViolations>63</maxAllowedViolations>
                 </configuration>
             </plugin>
         </plugins>

http://git-wip-us.apache.org/repos/asf/storm/blob/95602b1b/external/storm-jms/src/main/java/org/apache/storm/jms/JmsMessageProducer.java
----------------------------------------------------------------------
diff --git 
a/external/storm-jms/src/main/java/org/apache/storm/jms/JmsMessageProducer.java 
b/external/storm-jms/src/main/java/org/apache/storm/jms/JmsMessageProducer.java
index 4932929..671cdd9 100644
--- 
a/external/storm-jms/src/main/java/org/apache/storm/jms/JmsMessageProducer.java
+++ 
b/external/storm-jms/src/main/java/org/apache/storm/jms/JmsMessageProducer.java
@@ -1,28 +1,21 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.jms;
 
 import java.io.Serializable;
-
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.Session;
-
 import org.apache.storm.tuple.ITuple;
 
 /**

http://git-wip-us.apache.org/repos/asf/storm/blob/95602b1b/external/storm-jms/src/main/java/org/apache/storm/jms/JmsProvider.java
----------------------------------------------------------------------
diff --git 
a/external/storm-jms/src/main/java/org/apache/storm/jms/JmsProvider.java 
b/external/storm-jms/src/main/java/org/apache/storm/jms/JmsProvider.java
index d976326..b8dde44 100644
--- a/external/storm-jms/src/main/java/org/apache/storm/jms/JmsProvider.java
+++ b/external/storm-jms/src/main/java/org/apache/storm/jms/JmsProvider.java
@@ -1,24 +1,18 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.jms;
 
 import java.io.Serializable;
-
 import javax.jms.ConnectionFactory;
 import javax.jms.Destination;
 

http://git-wip-us.apache.org/repos/asf/storm/blob/95602b1b/external/storm-jms/src/main/java/org/apache/storm/jms/JmsTupleProducer.java
----------------------------------------------------------------------
diff --git 
a/external/storm-jms/src/main/java/org/apache/storm/jms/JmsTupleProducer.java 
b/external/storm-jms/src/main/java/org/apache/storm/jms/JmsTupleProducer.java
index 0bbb3a0..4457f5a 100644
--- 
a/external/storm-jms/src/main/java/org/apache/storm/jms/JmsTupleProducer.java
+++ 
b/external/storm-jms/src/main/java/org/apache/storm/jms/JmsTupleProducer.java
@@ -1,27 +1,20 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.jms;
 
 import java.io.Serializable;
-
 import javax.jms.JMSException;
 import javax.jms.Message;
-
 import org.apache.storm.topology.OutputFieldsDeclarer;
 import org.apache.storm.tuple.Values;
 

http://git-wip-us.apache.org/repos/asf/storm/blob/95602b1b/external/storm-jms/src/main/java/org/apache/storm/jms/bolt/JmsBolt.java
----------------------------------------------------------------------
diff --git 
a/external/storm-jms/src/main/java/org/apache/storm/jms/bolt/JmsBolt.java 
b/external/storm-jms/src/main/java/org/apache/storm/jms/bolt/JmsBolt.java
index 9b3b614..0b461a1 100644
--- a/external/storm-jms/src/main/java/org/apache/storm/jms/bolt/JmsBolt.java
+++ b/external/storm-jms/src/main/java/org/apache/storm/jms/bolt/JmsBolt.java
@@ -1,24 +1,18 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.jms.bolt;
 
 import java.util.Map;
-
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
 import javax.jms.Destination;
@@ -26,19 +20,15 @@ import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
-
-import org.apache.storm.topology.base.BaseRichBolt;
 import org.apache.storm.jms.JmsMessageProducer;
 import org.apache.storm.jms.JmsProvider;
-import org.apache.storm.topology.base.BaseTickTupleAwareRichBolt;
-import org.apache.storm.utils.TupleUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseTickTupleAwareRichBolt;
 import org.apache.storm.tuple.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * A JmsBolt receives <code>org.apache.storm.tuple.Tuple</code> objects from a 
Storm
@@ -124,9 +114,9 @@ public class JmsBolt extends BaseTickTupleAwareRichBolt {
      *
      * @param transactional
      */
-//     public void setJmsTransactional(boolean transactional){
-//             this.jmsTransactional = transactional;
-//     }
+    // public void setJmsTransactional(boolean transactional){
+    //         this.jmsTransactional = transactional;
+    // }
 
     /**
      * Sets whether or not tuples should be acknowledged by this
@@ -208,7 +198,7 @@ public class JmsBolt extends BaseTickTupleAwareRichBolt {
             Destination dest = this.jmsProvider.destination();
             this.connection = cf.createConnection();
             this.session = connection.createSession(this.jmsTransactional,
-                    this.jmsAcknowledgeMode);
+                                                    this.jmsAcknowledgeMode);
             this.messageProducer = session.createProducer(dest);
 
             connection.start();

http://git-wip-us.apache.org/repos/asf/storm/blob/95602b1b/external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsMessageID.java
----------------------------------------------------------------------
diff --git 
a/external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsMessageID.java 
b/external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsMessageID.java
index b78a41e..c069b7c 100644
--- 
a/external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsMessageID.java
+++ 
b/external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsMessageID.java
@@ -1,20 +1,15 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.jms.spout;
 
 import java.io.Serializable;
@@ -25,19 +20,19 @@ public class JmsMessageID implements 
Comparable<JmsMessageID>, Serializable {
 
     private Long sequence;
 
-    public JmsMessageID(long sequence, String jmsID){
+    public JmsMessageID(long sequence, String jmsID) {
         this.jmsID = jmsID;
         this.sequence = sequence;
     }
 
 
-    public String getJmsID(){
+    public String getJmsID() {
         return this.jmsID;
     }
 
     @Override
     public int compareTo(JmsMessageID jmsMessageID) {
-        return (int)(this.sequence - jmsMessageID.sequence);
+        return (int) (this.sequence - jmsMessageID.sequence);
     }
 
     @Override
@@ -47,8 +42,8 @@ public class JmsMessageID implements 
Comparable<JmsMessageID>, Serializable {
 
     @Override
     public boolean equals(Object o) {
-        if(o instanceof JmsMessageID){
-            JmsMessageID id = (JmsMessageID)o;
+        if (o instanceof JmsMessageID) {
+            JmsMessageID id = (JmsMessageID) o;
             return this.jmsID.equals(id.jmsID);
         } else {
             return false;

http://git-wip-us.apache.org/repos/asf/storm/blob/95602b1b/external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsSpout.java
----------------------------------------------------------------------
diff --git 
a/external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsSpout.java 
b/external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsSpout.java
index 8973dbf..41d5636 100644
--- a/external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsSpout.java
+++ b/external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsSpout.java
@@ -34,7 +34,6 @@ import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageListener;
 import javax.jms.Session;
-
 import org.apache.storm.Config;
 import org.apache.storm.jms.JmsProvider;
 import org.apache.storm.jms.JmsTupleProducer;
@@ -48,10 +47,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
-
 /**
- * A Storm <code>Spout</code> implementation that listens to a JMS topic or
- * queue and outputs tuples based on the messages it receives.
+ * A Storm <code>Spout</code> implementation that listens to a JMS topic or 
queue and outputs tuples based on the messages it receives.
  *
  * <p><code>JmsSpout</code> instances rely on <code>JmsProducer</code>
  * implementations to obtain the JMS
@@ -59,8 +56,7 @@ import org.slf4j.LoggerFactory;
  * to connect to a JMS topic/queue.
  *
  * <p>When a <code>JmsSpout</code> receives a JMS message, it delegates to an
- * internal <code>JmsTupleProducer</code> instance to create a Storm tuple from
- * the incoming message.
+ * internal <code>JmsTupleProducer</code> instance to create a Storm tuple 
from the incoming message.
  *
  * <p>Typically, developers will supply a custom <code>JmsTupleProducer</code>
  * implementation appropriate for the expected message content.
@@ -68,13 +64,19 @@ import org.slf4j.LoggerFactory;
 @SuppressWarnings("serial")
 public class JmsSpout extends BaseRichSpout implements MessageListener {
 
-    /** The logger object instance for this class. */
+    /**
+     * The logger object instance for this class.
+     */
     private static final Logger LOG = LoggerFactory.getLogger(JmsSpout.class);
 
-    /** The logger of the recovery task. */
+    /**
+     * The logger of the recovery task.
+     */
     private static final Logger RECOVERY_TASK_LOG = 
LoggerFactory.getLogger(RecoveryTask.class);
 
-    /** Time to sleep between queue polling attempts. */
+    /**
+     * Time to sleep between queue polling attempts.
+     */
     private static final int POLL_INTERVAL_MS = 50;
 
     /**
@@ -82,88 +84,98 @@ public class JmsSpout extends BaseRichSpout implements 
MessageListener {
      */
     private static final int DEFAULT_MESSAGE_TIMEOUT_SECS = 30;
 
-    /** Time to wait before queuing the first recovery task. */
+    /**
+     * Time to wait before queuing the first recovery task.
+     */
     private static final int RECOVERY_DELAY_MS = 10;
-
+    /**
+     * Used to safely recover failed JMS sessions across instances.
+     */
+    private final Serializable recoveryMutex = "RECOVERY_MUTEX";
     /**
      * The acknowledgment mode used for this instance.
      *
      * @see Session
      */
     private int jmsAcknowledgeMode = Session.AUTO_ACKNOWLEDGE;
-
-    /** Indicates whether or not this spout should run as a singleton. */
+    /**
+     * Indicates whether or not this spout should run as a singleton.
+     */
     private boolean distributed = true;
-
-    /** Used to generate tuples from incoming messages. */
+    /**
+     * Used to generate tuples from incoming messages.
+     */
     private JmsTupleProducer tupleProducer;
-
-    /** Encapsulates jms related classes needed to communicate with the mq. */
+    /**
+     * Encapsulates jms related classes needed to communicate with the mq.
+     */
     private JmsProvider jmsProvider;
-
-    /** Stores incoming messages for later sending. */
+    /**
+     * Stores incoming messages for later sending.
+     */
     private LinkedBlockingQueue<Message> queue;
-
-    /** Contains all message ids of messages that were not yet acked. */
+    /**
+     * Contains all message ids of messages that were not yet acked.
+     */
     private TreeSet<JmsMessageID> toCommit;
-
-    /** Maps between message ids of not-yet acked messages, and the messages. 
*/
+    /**
+     * Maps between message ids of not-yet acked messages, and the messages.
+     */
     private HashMap<JmsMessageID, Message> pendingMessages;
-
-    /** Counter of handled messages. */
+    /**
+     * Counter of handled messages.
+     */
     private long messageSequence = 0;
-
-    /** The collector used to emit tuples. */
+    /**
+     * The collector used to emit tuples.
+     */
     private SpoutOutputCollector collector;
-
-    /** Connection to the jms queue. */
+    /**
+     * Connection to the jms queue.
+     */
     private transient Connection connection;
-
-    /** The active jms session. */
+    /**
+     * The active jms session.
+     */
     private transient Session session;
-
-    /** Indicates whether or not a message failed to be processed. */
+    /**
+     * Indicates whether or not a message failed to be processed.
+     */
     private boolean hasFailures = false;
-
-    /** Used to safely recover failed JMS sessions across instances. */
-    private final Serializable recoveryMutex = "RECOVERY_MUTEX";
-
-    /** Schedules recovery tasks periodically. */
+    /**
+     * Schedules recovery tasks periodically.
+     */
     private Timer recoveryTimer = null;
 
-    /** Time to wait between recovery attempts. */
+    /**
+     * Time to wait between recovery attempts.
+     */
     private long recoveryPeriodMs = -1; // default to disabled
 
     /**
-     * Sets the JMS Session acknowledgement mode for the JMS session.
+     * Translate the {@code int} value of an acknowledgment to a {@code 
String}.
      *
-     * <p>Possible values:
-     * <ul>
-     * <li>javax.jms.Session.AUTO_ACKNOWLEDGE</li>
-     * <li>javax.jms.Session.CLIENT_ACKNOWLEDGE</li>
-     * <li>javax.jms.Session.DUPS_OK_ACKNOWLEDGE</li>
-     * </ul>
+     * @param deliveryMode the mode to translate.
+     * @return its {@code String} explanation (name).
      *
-     * @param mode JMS Session Acknowledgement mode
-     * @throws IllegalArgumentException if the mode is not recognized.
+     * @see Session
      */
-    public void setJmsAcknowledgeMode(final int mode) {
-        switch (mode) {
+    private static String toDeliveryModeString(int deliveryMode) {
+        switch (deliveryMode) {
             case Session.AUTO_ACKNOWLEDGE:
+                return "AUTO_ACKNOWLEDGE";
             case Session.CLIENT_ACKNOWLEDGE:
+                return "CLIENT_ACKNOWLEDGE";
             case Session.DUPS_OK_ACKNOWLEDGE:
-                break;
+                return "DUPS_OK_ACKNOWLEDGE";
             default:
-                throw new IllegalArgumentException(
-                        "Unknown Acknowledge mode: " + mode + " (See 
javax.jms.Session for valid values)");
+                return "UNKNOWN";
 
         }
-        this.jmsAcknowledgeMode = mode;
     }
 
     /**
-     * Returns the JMS Session acknowledgement mode for the JMS session
-     * associated with this spout. Can be either of:
+     * Returns the JMS Session acknowledgement mode for the JMS session 
associated with this spout. Can be either of:
      * <ul>
      * <li>{@link Session#AUTO_ACKNOWLEDGE}</li>
      * <li>{@link Session#CLIENT_ACKNOWLEDGE}</li>
@@ -178,11 +190,37 @@ public class JmsSpout extends BaseRichSpout implements 
MessageListener {
     }
 
     /**
+     * Sets the JMS Session acknowledgement mode for the JMS session.
+     *
+     * <p>Possible values:
+     * <ul>
+     * <li>javax.jms.Session.AUTO_ACKNOWLEDGE</li>
+     * <li>javax.jms.Session.CLIENT_ACKNOWLEDGE</li>
+     * <li>javax.jms.Session.DUPS_OK_ACKNOWLEDGE</li>
+     * </ul>
+     *
+     * @param mode JMS Session Acknowledgement mode
+     * @throws IllegalArgumentException if the mode is not recognized.
+     */
+    public void setJmsAcknowledgeMode(final int mode) {
+        switch (mode) {
+            case Session.AUTO_ACKNOWLEDGE:
+            case Session.CLIENT_ACKNOWLEDGE:
+            case Session.DUPS_OK_ACKNOWLEDGE:
+                break;
+            default:
+                throw new IllegalArgumentException(
+                    "Unknown Acknowledge mode: " + mode + " (See 
javax.jms.Session for valid values)");
+
+        }
+        this.jmsAcknowledgeMode = mode;
+    }
+
+    /**
      * Set {@link #jmsProvider}.
      *
      * <p>Set the <code>JmsProvider</code>
-     * implementation that this Spout will use to connect to
-     * a JMS <code>javax.jms.Desination</code>
+     * implementation that this Spout will use to connect to a JMS 
<code>javax.jms.Desination</code>
      *
      * @param provider the provider to use
      */
@@ -191,10 +229,8 @@ public class JmsSpout extends BaseRichSpout implements 
MessageListener {
     }
 
     /**
-     * Set the <code>JmsTupleProducer</code>
-     * implementation that will convert <code>javax.jms.Message</code>
-     * object to <code>org.apache.storm.tuple.Values</code> objects
-     * to be emitted.
+     * Set the <code>JmsTupleProducer</code> implementation that will convert 
<code>javax.jms.Message</code> object to
+     * <code>org.apache.storm.tuple.Values</code> objects to be emitted.
      *
      * @param producer the producer instance to use
      */
@@ -238,12 +274,12 @@ public class JmsSpout extends BaseRichSpout implements 
MessageListener {
         }
         // TODO get the default value from storm instead of hard coding 30 secs
         Long topologyTimeout =
-                ((Number) 
conf.getOrDefault(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 
DEFAULT_MESSAGE_TIMEOUT_SECS)).longValue();
+            ((Number) conf.getOrDefault(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 
DEFAULT_MESSAGE_TIMEOUT_SECS)).longValue();
         if ((TimeUnit.SECONDS.toMillis(topologyTimeout)) > 
this.recoveryPeriodMs) {
             LOG.warn("*** WARNING *** : "
-                    + "Recovery period (" + this.recoveryPeriodMs + " ms.) is 
less then the configured "
-                    + "'topology.message.timeout.secs' of " + topologyTimeout
-                    + " secs. This could lead to a message replay flood!");
+                     + "Recovery period (" + this.recoveryPeriodMs + " ms.) is 
less then the configured "
+                     + "'topology.message.timeout.secs' of " + topologyTimeout
+                     + " secs. This could lead to a message replay flood!");
         }
         this.queue = new LinkedBlockingQueue<Message>();
         this.toCommit = new TreeSet<JmsMessageID>();
@@ -288,10 +324,8 @@ public class JmsSpout extends BaseRichSpout implements 
MessageListener {
      * Generate the next tuple from a message.
      *
      * <p>This method polls the queue that's being filled asynchronously by the
-     * jms connection, every {@link #POLL_INTERVAL_MS} seconds. When a message
-     * arrives, a {@link Values} (tuple) is generated using
-     * {@link #tupleProducer}. It is emitted, and the message is saved to
-     * {@link #toCommit} and {@link #pendingMessages} for later handling.
+     * jms connection, every {@link #POLL_INTERVAL_MS} seconds. When a message 
arrives, a {@link Values} (tuple) is generated using {@link
+     * #tupleProducer}. It is emitted, and the message is saved to {@link 
#toCommit} and {@link #pendingMessages} for later handling.
      */
     public void nextTuple() {
         Message msg = this.queue.poll();
@@ -332,8 +366,7 @@ public class JmsSpout extends BaseRichSpout implements 
MessageListener {
      * Ack a successfully handled message by the matching {@link JmsMessageID}.
      *
      * <p>Acking means removing the message from the pending messages
-     * collections, and if it was the oldest pending message -
-     * ack it to the mq as well, so that it's the only one acked.
+     * collections, and if it was the oldest pending message - ack it to the 
mq as well, so that it's the only one acked.
      *
      * <p>Will only be called if we're transactional or not AUTO_ACKNOWLEDGE.
      */
@@ -392,8 +425,7 @@ public class JmsSpout extends BaseRichSpout implements 
MessageListener {
     }
 
     /**
-     * Returns <code>true</code> if the spout has received failures
-     * from which it has not yet recovered.
+     * Returns <code>true</code> if the spout has received failures from which 
it has not yet recovered.
      *
      * @return {@code true} if there were failures, {@code false} otherwise.
      */
@@ -409,8 +441,7 @@ public class JmsSpout extends BaseRichSpout implements 
MessageListener {
     }
 
     /**
-     * Sets the periodicity of the timer task that
-     * checks for failures and recovers the JMS session.
+     * Sets the periodicity of the timer task that checks for failures and 
recovers the JMS session.
      *
      * @param period desired wait period
      */
@@ -429,45 +460,21 @@ public class JmsSpout extends BaseRichSpout implements 
MessageListener {
      * Sets the "distributed" mode of this spout.
      *
      * <p>If <code>true</code> multiple instances of this spout <i>may</i> be
-     * created across the cluster
-     * (depending on the "parallelism_hint" in the topology configuration).
+     * created across the cluster (depending on the "parallelism_hint" in the 
topology configuration).
      *
      * <p>Setting this value to <code>false</code> essentially means this spout
-     * will run as a singleton within the cluster
-     * ("parallelism_hint" will be ignored).
+     * will run as a singleton within the cluster ("parallelism_hint" will be 
ignored).
      *
      * <p>In general, this should be set to <code>false</code> if the 
underlying
      * JMS destination is a topic, and <code>true</code> if it is a JMS queue.
      *
-     * @param isDistributed {@code true} if should be distributed, {@code 
false}
-     *                      otherwise.
+     * @param isDistributed {@code true} if should be distributed, {@code 
false} otherwise.
      */
     public void setDistributed(boolean isDistributed) {
         this.distributed = isDistributed;
     }
 
     /**
-     * Translate the {@code int} value of an acknowledgment to a {@code 
String}.
-     *
-     * @param deliveryMode the mode to translate.
-     * @return its {@code String} explanation (name).
-     * @see Session
-     */
-    private static String toDeliveryModeString(int deliveryMode) {
-        switch (deliveryMode) {
-            case Session.AUTO_ACKNOWLEDGE:
-                return "AUTO_ACKNOWLEDGE";
-            case Session.CLIENT_ACKNOWLEDGE:
-                return "CLIENT_ACKNOWLEDGE";
-            case Session.DUPS_OK_ACKNOWLEDGE:
-                return "DUPS_OK_ACKNOWLEDGE";
-            default:
-                return "UNKNOWN";
-
-        }
-    }
-
-    /**
      * @return The currently active session.
      */
     protected Session getSession() {
@@ -477,8 +484,7 @@ public class JmsSpout extends BaseRichSpout implements 
MessageListener {
     /**
      * Check if the subscription requires messages to be acked.
      *
-     * @return {@code true} if there is a pending messages state, {@code false}
-     *         otherwise.
+     * @return {@code true} if there is a pending messages state, {@code 
false} otherwise.
      */
     private boolean isDurableSubscription() {
         return (this.jmsAcknowledgeMode != Session.AUTO_ACKNOWLEDGE);

http://git-wip-us.apache.org/repos/asf/storm/blob/95602b1b/external/storm-jms/src/main/java/org/apache/storm/jms/trident/JmsBatch.java
----------------------------------------------------------------------
diff --git 
a/external/storm-jms/src/main/java/org/apache/storm/jms/trident/JmsBatch.java 
b/external/storm-jms/src/main/java/org/apache/storm/jms/trident/JmsBatch.java
index c990058..5db4677 100644
--- 
a/external/storm-jms/src/main/java/org/apache/storm/jms/trident/JmsBatch.java
+++ 
b/external/storm-jms/src/main/java/org/apache/storm/jms/trident/JmsBatch.java
@@ -1,20 +1,15 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.jms.trident;
 
 /**

http://git-wip-us.apache.org/repos/asf/storm/blob/95602b1b/external/storm-jms/src/main/java/org/apache/storm/jms/trident/JmsState.java
----------------------------------------------------------------------
diff --git 
a/external/storm-jms/src/main/java/org/apache/storm/jms/trident/JmsState.java 
b/external/storm-jms/src/main/java/org/apache/storm/jms/trident/JmsState.java
index bfb78b5..3611e78 100644
--- 
a/external/storm-jms/src/main/java/org/apache/storm/jms/trident/JmsState.java
+++ 
b/external/storm-jms/src/main/java/org/apache/storm/jms/trident/JmsState.java
@@ -1,35 +1,34 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.jms.trident;
 
+import java.io.Serializable;
+import java.util.List;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
 import org.apache.storm.jms.JmsMessageProducer;
 import org.apache.storm.jms.JmsProvider;
 import org.apache.storm.topology.FailedException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.storm.trident.operation.TridentCollector;
 import org.apache.storm.trident.state.State;
 import org.apache.storm.trident.tuple.TridentTuple;
-
-import javax.jms.*;
-import java.io.Serializable;
-import java.lang.IllegalStateException;
-import java.util.List;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class JmsState implements State {
 
@@ -44,35 +43,8 @@ public class JmsState implements State {
         this.options = options;
     }
 
-    public static class Options implements Serializable {
-        private JmsProvider jmsProvider;
-        private JmsMessageProducer msgProducer;
-        private int jmsAcknowledgeMode = Session.AUTO_ACKNOWLEDGE;
-        private boolean jmsTransactional = true;
-
-        public Options withJmsProvider(JmsProvider provider) {
-            this.jmsProvider = provider;
-            return this;
-        }
-
-        public Options withMessageProducer(JmsMessageProducer msgProducer) {
-            this.msgProducer = msgProducer;
-            return this;
-        }
-
-        public Options withJmsAcknowledgeMode(int jmsAcknowledgeMode) {
-            this.jmsAcknowledgeMode = jmsAcknowledgeMode;
-            return this;
-        }
-
-        public Options withJmsTransactional(boolean jmsTransactional) {
-            this.jmsTransactional = jmsTransactional;
-            return this;
-        }
-    }
-
     protected void prepare() {
-        if(this.options.jmsProvider == null || this.options.msgProducer == 
null){
+        if (this.options.jmsProvider == null || this.options.msgProducer == 
null) {
             throw new IllegalStateException("JMS Provider and MessageProducer 
not set.");
         }
         LOG.debug("Connecting JMS..");
@@ -81,7 +53,7 @@ public class JmsState implements State {
             Destination dest = this.options.jmsProvider.destination();
             this.connection = cf.createConnection();
             this.session = 
connection.createSession(this.options.jmsTransactional,
-                    this.options.jmsAcknowledgeMode);
+                                                    
this.options.jmsAcknowledgeMode);
             this.messageProducer = session.createProducer(dest);
 
             connection.start();
@@ -97,10 +69,10 @@ public class JmsState implements State {
     @Override
     public void commit(Long aLong) {
         LOG.debug("Committing JMS transaction.");
-        if(this.options.jmsTransactional) {
+        if (this.options.jmsTransactional) {
             try {
                 session.commit();
-            } catch(JMSException e){
+            } catch (JMSException e) {
                 LOG.error("JMS Session commit failed.", e);
             }
         }
@@ -108,7 +80,7 @@ public class JmsState implements State {
 
     public void updateState(List<TridentTuple> tuples, TridentCollector 
collector) throws JMSException {
         try {
-        for(TridentTuple tuple : tuples) {
+            for (TridentTuple tuple : tuples) {
                 Message msg = this.options.msgProducer.toMessage(this.session, 
tuple);
                 if (msg != null) {
                     if (msg.getJMSDestination() != null) {
@@ -120,10 +92,37 @@ public class JmsState implements State {
             }
         } catch (JMSException e) {
             LOG.warn("Failed to send jmd message for a trident batch ", e);
-            if(this.options.jmsTransactional) {
+            if (this.options.jmsTransactional) {
                 session.rollback();
             }
             throw new FailedException("Failed to write tuples", e);
         }
     }
+
+    public static class Options implements Serializable {
+        private JmsProvider jmsProvider;
+        private JmsMessageProducer msgProducer;
+        private int jmsAcknowledgeMode = Session.AUTO_ACKNOWLEDGE;
+        private boolean jmsTransactional = true;
+
+        public Options withJmsProvider(JmsProvider provider) {
+            this.jmsProvider = provider;
+            return this;
+        }
+
+        public Options withMessageProducer(JmsMessageProducer msgProducer) {
+            this.msgProducer = msgProducer;
+            return this;
+        }
+
+        public Options withJmsAcknowledgeMode(int jmsAcknowledgeMode) {
+            this.jmsAcknowledgeMode = jmsAcknowledgeMode;
+            return this;
+        }
+
+        public Options withJmsTransactional(boolean jmsTransactional) {
+            this.jmsTransactional = jmsTransactional;
+            return this;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/95602b1b/external/storm-jms/src/main/java/org/apache/storm/jms/trident/JmsStateFactory.java
----------------------------------------------------------------------
diff --git 
a/external/storm-jms/src/main/java/org/apache/storm/jms/trident/JmsStateFactory.java
 
b/external/storm-jms/src/main/java/org/apache/storm/jms/trident/JmsStateFactory.java
index 4123752..b72f88c 100644
--- 
a/external/storm-jms/src/main/java/org/apache/storm/jms/trident/JmsStateFactory.java
+++ 
b/external/storm-jms/src/main/java/org/apache/storm/jms/trident/JmsStateFactory.java
@@ -1,28 +1,22 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.jms.trident;
 
+import java.util.Map;
 import org.apache.storm.task.IMetricsContext;
 import org.apache.storm.trident.state.State;
 import org.apache.storm.trident.state.StateFactory;
 
-import java.util.Map;
-
 public class JmsStateFactory implements StateFactory {
 
     private JmsState.Options options;

http://git-wip-us.apache.org/repos/asf/storm/blob/95602b1b/external/storm-jms/src/main/java/org/apache/storm/jms/trident/JmsUpdater.java
----------------------------------------------------------------------
diff --git 
a/external/storm-jms/src/main/java/org/apache/storm/jms/trident/JmsUpdater.java 
b/external/storm-jms/src/main/java/org/apache/storm/jms/trident/JmsUpdater.java
index a2709a4..92b8a95 100644
--- 
a/external/storm-jms/src/main/java/org/apache/storm/jms/trident/JmsUpdater.java
+++ 
b/external/storm-jms/src/main/java/org/apache/storm/jms/trident/JmsUpdater.java
@@ -1,31 +1,25 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.jms.trident;
 
+import java.util.List;
+import javax.jms.JMSException;
 import org.apache.storm.topology.FailedException;
 import org.apache.storm.trident.operation.TridentCollector;
 import org.apache.storm.trident.state.BaseStateUpdater;
 import org.apache.storm.trident.tuple.TridentTuple;
 
-import javax.jms.JMSException;
-import java.util.List;
-
-public class JmsUpdater extends BaseStateUpdater<JmsState>  {
+public class JmsUpdater extends BaseStateUpdater<JmsState> {
 
     @Override
     public void updateState(JmsState jmsState, List<TridentTuple> tuples, 
TridentCollector collector) {

http://git-wip-us.apache.org/repos/asf/storm/blob/95602b1b/external/storm-jms/src/main/java/org/apache/storm/jms/trident/TridentJmsSpout.java
----------------------------------------------------------------------
diff --git 
a/external/storm-jms/src/main/java/org/apache/storm/jms/trident/TridentJmsSpout.java
 
b/external/storm-jms/src/main/java/org/apache/storm/jms/trident/TridentJmsSpout.java
index afdc0b2..de7c182 100644
--- 
a/external/storm-jms/src/main/java/org/apache/storm/jms/trident/TridentJmsSpout.java
+++ 
b/external/storm-jms/src/main/java/org/apache/storm/jms/trident/TridentJmsSpout.java
@@ -1,27 +1,21 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
+
 package org.apache.storm.jms.trident;
 
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.LinkedBlockingQueue;
-
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
 import javax.jms.Destination;
@@ -30,23 +24,21 @@ import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageListener;
 import javax.jms.Session;
-
+import org.apache.storm.Config;
+import org.apache.storm.generated.StreamInfo;
 import org.apache.storm.jms.JmsProvider;
 import org.apache.storm.jms.JmsTupleProducer;
-import org.apache.storm.utils.Utils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsGetter;
 import org.apache.storm.trident.operation.TridentCollector;
 import org.apache.storm.trident.spout.ITridentSpout;
 import org.apache.storm.trident.topology.TransactionAttempt;
-import org.apache.storm.Config;
-import org.apache.storm.generated.StreamInfo;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsGetter;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Values;
 import org.apache.storm.utils.RotatingMap;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Trident implementation of the JmsSpout
@@ -56,29 +48,52 @@ import org.apache.storm.utils.RotatingMap;
 public class TridentJmsSpout implements ITridentSpout<JmsBatch> {
 
     public static final String MAX_BATCH_SIZE_CONF = 
"topology.spout.max.batch.size";
-    
+
     public static final int DEFAULT_BATCH_SIZE = 1000;
 
     private static final long serialVersionUID = -3469351154693356655L;
-    
+    private static int nameIndex = 1;
     private JmsTupleProducer tupleProducer;
-
     private JmsProvider jmsProvider;
-
     private int jmsAcknowledgeMode;
-
     private String name;
 
-    private static int nameIndex = 1;
-    
     /**
      * Create a TridentJmsSpout with a default name and acknowledge mode of 
AUTO_ACKNOWLEDGE
      */
     public TridentJmsSpout() {
-        this.name = "JmsSpout_"+(nameIndex++);
+        this.name = "JmsSpout_" + (nameIndex++);
         this.jmsAcknowledgeMode = Session.AUTO_ACKNOWLEDGE;
     }
-    
+
+    /**
+     * Return a friendly string for the given JMS acknowledge mode, or throw 
an IllegalArgumentException if
+     * the mode is not recognized.
+     * <p/>
+     * Possible values:
+     * <ul>
+     * <li>javax.jms.Session.AUTO_ACKNOWLEDGE</li>
+     * <li>javax.jms.Session.CLIENT_ACKNOWLEDGE</li>
+     * <li>javax.jms.Session.DUPS_OK_ACKNOWLEDGE</li>
+     * </ul>
+     * @param acknowledgeMode A valid JMS acknowledge mode
+     * @return A friendly string describing the acknowledge mode
+     * @throws IllegalArgumentException if the mode is not recognized
+     */
+    private static final String toDeliveryModeString(int acknowledgeMode) {
+        switch (acknowledgeMode) {
+            case Session.AUTO_ACKNOWLEDGE:
+                return "AUTO_ACKNOWLEDGE";
+            case Session.CLIENT_ACKNOWLEDGE:
+                return "CLIENT_ACKNOWLEDGE";
+            case Session.DUPS_OK_ACKNOWLEDGE:
+                return "DUPS_OK_ACKNOWLEDGE";
+            default:
+                throw new IllegalArgumentException(
+                    "Unknown JMS Acknowledge mode " + acknowledgeMode + " (See 
javax.jms.Session for valid values)");
+        }
+    }
+
     /**
      * Set the name for this spout, to improve log identification
      * @param name The name to be used in log messages
@@ -88,25 +103,25 @@ public class TridentJmsSpout implements 
ITridentSpout<JmsBatch> {
         this.name = name;
         return this;
     }
-    
+
     /**
      * Set the <code>JmsProvider</code>
-     * implementation that this Spout will use to connect to 
+     * implementation that this Spout will use to connect to
      * a JMS <code>javax.jms.Desination</code>
-     * 
+     *
      * @param provider
      */
-    public TridentJmsSpout withJmsProvider(JmsProvider provider){
+    public TridentJmsSpout withJmsProvider(JmsProvider provider) {
         this.jmsProvider = provider;
         return this;
     }
-    
+
     /**
      * Set the <code>JmsTupleProducer</code>
      * implementation that will convert <code>javax.jms.Message</code>
      * object to <code>backtype.storm.tuple.Values</code> objects
      * to be emitted.
-     * 
+     *
      * @param tupleProducer
      * @return This spout
      */
@@ -114,7 +129,7 @@ public class TridentJmsSpout implements 
ITridentSpout<JmsBatch> {
         this.tupleProducer = tupleProducer;
         return this;
     }
-    
+
     /**
      * Set the JMS acknowledge mode for messages being processed by this spout.
      * <p/>
@@ -133,37 +148,10 @@ public class TridentJmsSpout implements 
ITridentSpout<JmsBatch> {
         this.jmsAcknowledgeMode = jmsAcknowledgeMode;
         return this;
     }
-    
-    /**
-     * Return a friendly string for the given JMS acknowledge mode, or throw 
an IllegalArgumentException if
-     * the mode is not recognized.
-     * <p/>
-     * Possible values:
-     * <ul>
-     * <li>javax.jms.Session.AUTO_ACKNOWLEDGE</li>
-     * <li>javax.jms.Session.CLIENT_ACKNOWLEDGE</li>
-     * <li>javax.jms.Session.DUPS_OK_ACKNOWLEDGE</li>
-     * </ul>
-     * @param acknowledgeMode A valid JMS acknowledge mode
-     * @return A friendly string describing the acknowledge mode
-     * @throws IllegalArgumentException if the mode is not recognized
-     */
-    private static final String toDeliveryModeString(int acknowledgeMode) {
-        switch (acknowledgeMode) {
-        case Session.AUTO_ACKNOWLEDGE:
-            return "AUTO_ACKNOWLEDGE";
-        case Session.CLIENT_ACKNOWLEDGE:
-            return "CLIENT_ACKNOWLEDGE";
-        case Session.DUPS_OK_ACKNOWLEDGE:
-            return "DUPS_OK_ACKNOWLEDGE";
-        default:
-            throw new IllegalArgumentException("Unknown JMS Acknowledge mode " 
+ acknowledgeMode + " (See javax.jms.Session for valid values)");
-        }
-    }
-    
+
     @Override
     public ITridentSpout.BatchCoordinator<JmsBatch> getCoordinator(
-            String txStateId, Map<String, Object> conf, TopologyContext 
context) {
+        String txStateId, Map<String, Object> conf, TopologyContext context) {
         return new JmsBatchCoordinator(name);
     }
 
@@ -185,10 +173,10 @@ public class TridentJmsSpout implements 
ITridentSpout<JmsBatch> {
         if (streamInfo == null) {
             throw new IllegalArgumentException("Jms Tuple producer has not 
declared output fields for the default stream");
         }
-        
+
         return new Fields(streamInfo.get_output_fields());
     }
-    
+
     /**
      * The JmsEmitter class listens for incoming messages and stores them in a 
blocking queue. On each invocation of emit,
      * the queued messages are emitted as a batch.
@@ -201,16 +189,15 @@ public class TridentJmsSpout implements 
ITridentSpout<JmsBatch> {
         private final Session session;
 
         private final RotatingMap<Long, List<Message>> batchMessageMap; // 
Maps transaction Ids to JMS message ids.
-        
+
         private final long rotateTimeMillis;
         private final int maxBatchSize;
         private final String name;
-        
-        private long lastRotate;
-       
         private final Logger LOG = LoggerFactory.getLogger(JmsEmitter.class);
- 
-        public JmsEmitter(String name, JmsProvider jmsProvider, 
JmsTupleProducer tupleProducer, int jmsAcknowledgeMode, Map<String, Object> 
conf) {
+        private long lastRotate;
+
+        public JmsEmitter(String name, JmsProvider jmsProvider, 
JmsTupleProducer tupleProducer, int jmsAcknowledgeMode,
+                          Map<String, Object> conf) {
             if (jmsProvider == null) {
                 throw new IllegalStateException("JMS provider has not been 
set.");
             }
@@ -220,11 +207,11 @@ public class TridentJmsSpout implements 
ITridentSpout<JmsBatch> {
 
             this.queue = new LinkedBlockingQueue<Message>();
             this.name = name;
-            
+
             batchMessageMap = new RotatingMap<Long, List<Message>>(3);
-            rotateTimeMillis = 1000L * 
((Number)conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)).intValue();
+            rotateTimeMillis = 1000L * ((Number) 
conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)).intValue();
             lastRotate = System.currentTimeMillis();
-            
+
             Number batchSize = (Number) conf.get(MAX_BATCH_SIZE_CONF);
             maxBatchSize = batchSize != null ? batchSize.intValue() : 
DEFAULT_BATCH_SIZE;
 
@@ -237,40 +224,41 @@ public class TridentJmsSpout implements 
ITridentSpout<JmsBatch> {
                 consumer.setMessageListener(this);
                 this.connection.start();
 
-                LOG.info("Created JmsEmitter with max batch size 
"+maxBatchSize+" rotate time "+rotateTimeMillis+"ms and destination "+dest+" 
for "+name);
+                LOG.info(
+                    "Created JmsEmitter with max batch size " + maxBatchSize + 
" rotate time " + rotateTimeMillis + "ms and destination " +
+                    dest + " for " + name);
 
             } catch (Exception e) {
                 LOG.warn("Error creating JMS connection.", e);
                 throw new IllegalStateException("Could not create JMS 
connection for spout ", e);
             }
-            
+
         }
-        
+
         @Override
         public void success(TransactionAttempt tx) {
-            
+
             @SuppressWarnings("unchecked")
             List<Message> messages = (List<Message>) 
batchMessageMap.remove(tx.getTransactionId());
-            
+
             if (messages != null) {
                 if (!messages.isEmpty()) {
-                    LOG.debug("Success for batch with transaction id 
"+tx.getTransactionId()+"/"+tx.getAttemptId()+" for "+name);
+                    LOG.debug("Success for batch with transaction id " + 
tx.getTransactionId() + "/" + tx.getAttemptId() + " for " + name);
                 }
-                
-                for (Message msg: messages) {
+
+                for (Message msg : messages) {
                     String messageId = "UnknownId";
-                    
+
                     try {
                         messageId = msg.getJMSMessageID();
                         msg.acknowledge();
-                        LOG.trace("Acknowledged message "+messageId);
+                        LOG.trace("Acknowledged message " + messageId);
                     } catch (JMSException e) {
-                        LOG.warn("Failed to acknowledge message "+messageId, 
e);
+                        LOG.warn("Failed to acknowledge message " + messageId, 
e);
                     }
                 }
-            }
-            else {
-                LOG.warn("No messages found in batch with transaction id 
"+tx.getTransactionId()+"/"+tx.getAttemptId());
+            } else {
+                LOG.warn("No messages found in batch with transaction id " + 
tx.getTransactionId() + "/" + tx.getAttemptId());
             }
         }
 
@@ -282,19 +270,18 @@ public class TridentJmsSpout implements 
ITridentSpout<JmsBatch> {
          * @param messages The list of messages to fail.
          */
         private void fail(Long transactionId, List<Message> messages) {
-            LOG.debug("Failure for batch with transaction id "+transactionId+" 
for "+name);
+            LOG.debug("Failure for batch with transaction id " + transactionId 
+ " for " + name);
             if (messages != null) {
-                for (Message msg: messages) {
+                for (Message msg : messages) {
                     try {
-                        LOG.trace("Failed message "+msg.getJMSMessageID());
+                        LOG.trace("Failed message " + msg.getJMSMessageID());
                     } catch (JMSException e) {
                         LOG.warn("Could not identify failed message ", e);
                     }
                 }
+            } else {
+                LOG.warn("Failed batch has no messages with transaction id " + 
transactionId);
             }
-            else {
-                LOG.warn("Failed batch has no messages with transaction id 
"+transactionId);
-            }            
         }
 
         @Override
@@ -305,37 +292,37 @@ public class TridentJmsSpout implements 
ITridentSpout<JmsBatch> {
                 this.connection.close();
             } catch (JMSException e) {
                 LOG.warn("Error closing JMS connection.", e);
-            }   
+            }
         }
 
         @Override
         public void emitBatch(TransactionAttempt tx, JmsBatch coordinatorMeta,
-                TridentCollector collector) {
-            
+                              TridentCollector collector) {
+
             long now = System.currentTimeMillis();
-            if(now - lastRotate > rotateTimeMillis) {
+            if (now - lastRotate > rotateTimeMillis) {
                 Map<Long, List<Message>> failed = batchMessageMap.rotate();
-                for(Long id: failed.keySet()) {
-                    LOG.warn("TIMED OUT batch with transaction id "+id+" for 
"+name);
+                for (Long id : failed.keySet()) {
+                    LOG.warn("TIMED OUT batch with transaction id " + id + " 
for " + name);
                     fail(id, failed.get(id));
                 }
                 lastRotate = now;
             }
-            
-            if(batchMessageMap.containsKey(tx.getTransactionId())) {
-                LOG.warn("FAILED duplicate batch with transaction id 
"+tx.getTransactionId()+"/"+tx.getAttemptId()+" for "+name);
+
+            if (batchMessageMap.containsKey(tx.getTransactionId())) {
+                LOG.warn("FAILED duplicate batch with transaction id " + 
tx.getTransactionId() + "/" + tx.getAttemptId() + " for " + name);
                 fail(tx.getTransactionId(), 
batchMessageMap.get(tx.getTransactionId()));
             }
-            
+
             List<Message> batchMessages = new ArrayList<Message>();
-            
-            for (int index=0; index<maxBatchSize; index++) {
+
+            for (int index = 0; index < maxBatchSize; index++) {
                 Message msg = queue.poll();
                 if (msg == null) {
                     Utils.sleep(50); // Back off
                     break;
                 }
-                
+
                 try {
                     if (TridentJmsSpout.this.jmsAcknowledgeMode != 
Session.AUTO_ACKNOWLEDGE) {
                         batchMessages.add(msg);
@@ -343,15 +330,17 @@ public class TridentJmsSpout implements 
ITridentSpout<JmsBatch> {
                     Values tuple = tupleProducer.toTuple(msg);
                     collector.emit(tuple);
                 } catch (JMSException e) {
-                    LOG.warn("Failed to emit message, could not retrieve data 
for "+name+": "+e );
+                    LOG.warn("Failed to emit message, could not retrieve data 
for " + name + ": " + e);
                 }
             }
-            
+
             if (!batchMessages.isEmpty()) {
-                LOG.debug("Emitting batch with transaction id 
"+tx.getTransactionId()+"/"+tx.getAttemptId()+" and size 
"+batchMessages.size()+" for "+name);
-            }
-            else {
-                LOG.trace("No items to acknowledge for batch with transaction 
id "+tx.getTransactionId()+"/"+tx.getAttemptId()+" for "+name);
+                LOG.debug("Emitting batch with transaction id " + 
tx.getTransactionId() + "/" + tx.getAttemptId() + " and size " +
+                          batchMessages.size() + " for " + name);
+            } else {
+                LOG.trace(
+                    "No items to acknowledge for batch with transaction id " + 
tx.getTransactionId() + "/" + tx.getAttemptId() + " for " +
+                    name);
             }
             batchMessageMap.put(tx.getTransactionId(), batchMessages);
         }
@@ -365,9 +354,9 @@ public class TridentJmsSpout implements 
ITridentSpout<JmsBatch> {
             }
             this.queue.offer(msg);
         }
-        
+
     }
-    
+
     /**
      * Bare implementation of a BatchCoordinator, returning a null JmsBatch 
object
      *
@@ -375,17 +364,17 @@ public class TridentJmsSpout implements 
ITridentSpout<JmsBatch> {
     private class JmsBatchCoordinator implements BatchCoordinator<JmsBatch> {
 
         private final String name;
-        
+
         private final Logger LOG = 
LoggerFactory.getLogger(JmsBatchCoordinator.class);
 
         public JmsBatchCoordinator(String name) {
             this.name = name;
-            LOG.info("Created batch coordinator for "+name);
+            LOG.info("Created batch coordinator for " + name);
         }
-        
+
         @Override
         public JmsBatch initializeTransaction(long txid, JmsBatch 
prevMetadata, JmsBatch curMetadata) {
-            LOG.debug("Initialise transaction "+txid+" for "+name);
+            LOG.debug("Initialise transaction " + txid + " for " + name);
             return null;
         }
 
@@ -401,7 +390,7 @@ public class TridentJmsSpout implements 
ITridentSpout<JmsBatch> {
         @Override
         public void close() {
         }
-        
+
     }
 
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/95602b1b/external/storm-jms/src/test/java/org/apache/storm/jms/spout/JmsSpoutTest.java
----------------------------------------------------------------------
diff --git 
a/external/storm-jms/src/test/java/org/apache/storm/jms/spout/JmsSpoutTest.java 
b/external/storm-jms/src/test/java/org/apache/storm/jms/spout/JmsSpoutTest.java
index b6406c8..9f967f8 100644
--- 
a/external/storm-jms/src/test/java/org/apache/storm/jms/spout/JmsSpoutTest.java
+++ 
b/external/storm-jms/src/test/java/org/apache/storm/jms/spout/JmsSpoutTest.java
@@ -15,16 +15,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.storm.jms.spout;
 
-import org.apache.storm.Config;
-import org.apache.storm.jms.JmsProvider;
-import org.apache.storm.spout.SpoutOutputCollector;
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+package org.apache.storm.jms.spout;
 
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.util.HashMap;
+import java.util.Map;
 import javax.jms.ConnectionFactory;
 import javax.jms.Destination;
 import javax.jms.JMSException;
@@ -32,15 +30,17 @@ import javax.jms.Message;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
 import javax.jms.TextMessage;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectOutputStream;
-import java.util.HashMap;
-import java.util.Map;
+import org.apache.storm.Config;
+import org.apache.storm.jms.JmsProvider;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class JmsSpoutTest {
     private static final Logger LOG =
-            LoggerFactory.getLogger(JmsSpoutTest.class);
+        LoggerFactory.getLogger(JmsSpoutTest.class);
 
     @Test
     public void testFailure() throws JMSException, Exception {
@@ -85,9 +85,8 @@ public class JmsSpoutTest {
     }
 
     /**
-     * Make sure that {@link JmsSpout#open} returns correctly regardless of
-     * the type of {@link Number} that is the value of
-     * {@link Config#TOPOLOGY_MESSAGE_TIMEOUT_SECS}.
+     * Make sure that {@link JmsSpout#open} returns correctly regardless of 
the type of {@link Number} that is the value of {@link
+     * Config#TOPOLOGY_MESSAGE_TIMEOUT_SECS}.
      */
     @Test
     public void testOpenWorksMultipleTypesOfNumberObjects() throws Exception {
@@ -118,8 +117,8 @@ public class JmsSpoutTest {
                                Destination destination) throws JMSException {
 
         Session mySess = connectionFactory.createConnection().createSession(
-                false,
-                Session.CLIENT_ACKNOWLEDGE);
+            false,
+            Session.CLIENT_ACKNOWLEDGE);
         MessageProducer producer = mySess.createProducer(destination);
         TextMessage msg = mySess.createTextMessage();
         msg.setText("Hello World");

http://git-wip-us.apache.org/repos/asf/storm/blob/95602b1b/external/storm-jms/src/test/java/org/apache/storm/jms/spout/MockJmsProvider.java
----------------------------------------------------------------------
diff --git 
a/external/storm-jms/src/test/java/org/apache/storm/jms/spout/MockJmsProvider.java
 
b/external/storm-jms/src/test/java/org/apache/storm/jms/spout/MockJmsProvider.java
index 3ba0853..ca9733d 100644
--- 
a/external/storm-jms/src/test/java/org/apache/storm/jms/spout/MockJmsProvider.java
+++ 
b/external/storm-jms/src/test/java/org/apache/storm/jms/spout/MockJmsProvider.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.jms.spout;
 
 import javax.jms.ConnectionFactory;
@@ -22,9 +23,7 @@ import javax.jms.Destination;
 import javax.naming.Context;
 import javax.naming.InitialContext;
 import javax.naming.NamingException;
-
 import org.apache.activemq.ActiveMQConnectionFactory;
-
 import org.apache.storm.jms.JmsProvider;
 
 public class MockJmsProvider implements JmsProvider {
@@ -32,30 +31,34 @@ public class MockJmsProvider implements JmsProvider {
 
     private ConnectionFactory connectionFactory = null;
     private Destination destination = null;
-    
-    public MockJmsProvider() throws NamingException{
-        this.connectionFactory = new 
ActiveMQConnectionFactory("vm://localhost?broker.persistent=false"); 
+
+    public MockJmsProvider() throws NamingException {
+        this.connectionFactory = new 
ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
         Context jndiContext = new InitialContext();
-        this.destination = (Destination) 
jndiContext.lookup("dynamicQueues/FOO.BAR");        
+        this.destination = (Destination) 
jndiContext.lookup("dynamicQueues/FOO.BAR");
 
     }
-    
+
     /**
      * Provides the JMS <code>ConnectionFactory</code>
+     *
      * @return the connection factory
+     *
      * @throws Exception
      */
-    public ConnectionFactory connectionFactory() throws Exception{
+    public ConnectionFactory connectionFactory() throws Exception {
         return this.connectionFactory;
     }
 
     /**
      * Provides the <code>Destination</code> (topic or queue) from which the
      * <code>JmsSpout</code> will receive messages.
+     *
      * @return
+     *
      * @throws Exception
      */
-    public Destination destination() throws Exception{
+    public Destination destination() throws Exception {
         return this.destination;
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/95602b1b/external/storm-jms/src/test/java/org/apache/storm/jms/spout/MockSpoutOutputCollector.java
----------------------------------------------------------------------
diff --git 
a/external/storm-jms/src/test/java/org/apache/storm/jms/spout/MockSpoutOutputCollector.java
 
b/external/storm-jms/src/test/java/org/apache/storm/jms/spout/MockSpoutOutputCollector.java
index 4e05646..60710bc 100644
--- 
a/external/storm-jms/src/test/java/org/apache/storm/jms/spout/MockSpoutOutputCollector.java
+++ 
b/external/storm-jms/src/test/java/org/apache/storm/jms/spout/MockSpoutOutputCollector.java
@@ -15,11 +15,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.jms.spout;
 
 import java.util.ArrayList;
 import java.util.List;
-
 import org.apache.storm.spout.ISpoutOutputCollector;
 
 public class MockSpoutOutputCollector implements ISpoutOutputCollector {
@@ -45,11 +45,11 @@ public class MockSpoutOutputCollector implements 
ISpoutOutputCollector {
     public void reportError(Throwable error) {
     }
 
-    public boolean emitted(){
+    public boolean emitted() {
         return this.emitted;
     }
 
-    public void reset(){
+    public void reset() {
         this.emitted = false;
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/95602b1b/external/storm-jms/src/test/java/org/apache/storm/jms/spout/MockTupleProducer.java
----------------------------------------------------------------------
diff --git 
a/external/storm-jms/src/test/java/org/apache/storm/jms/spout/MockTupleProducer.java
 
b/external/storm-jms/src/test/java/org/apache/storm/jms/spout/MockTupleProducer.java
index ea571fc..70e04ff 100644
--- 
a/external/storm-jms/src/test/java/org/apache/storm/jms/spout/MockTupleProducer.java
+++ 
b/external/storm-jms/src/test/java/org/apache/storm/jms/spout/MockTupleProducer.java
@@ -15,12 +15,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.jms.spout;
 
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.TextMessage;
-
 import org.apache.storm.jms.JmsTupleProducer;
 import org.apache.storm.topology.OutputFieldsDeclarer;
 import org.apache.storm.tuple.Fields;

Reply via email to