http://git-wip-us.apache.org/repos/asf/storm/blob/d152d72f/src/main/java/org/apache/storm/jms/JmsTupleProducer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/storm/jms/JmsTupleProducer.java 
b/src/main/java/org/apache/storm/jms/JmsTupleProducer.java
new file mode 100644
index 0000000..96b027d
--- /dev/null
+++ b/src/main/java/org/apache/storm/jms/JmsTupleProducer.java
@@ -0,0 +1,41 @@
+package org.apache.storm.jms;
+
+import java.io.Serializable;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Values;
+
+/**
+ * Interface to define classes that can produce a Storm <code>Values</code> 
objects
+ * from a <code>javax.jms.Message</code> object>.
+ * <p/>
+ * Implementations are also responsible for declaring the output
+ * fields they produce.
+ * <p/>
+ * If for some reason the implementation can't process a message
+ * (for example if it received a <code>javax.jms.ObjectMessage</code>
+ * when it was expecting a <code>javax.jms.TextMessage</code> it should
+ * return <code>null</code> to indicate to the <code>JmsSpout</code> that
+ * the message could not be processed.
+ * 
+ * @author P. Taylor Goetz
+ *
+ */
+public interface JmsTupleProducer extends Serializable{
+       /**
+        * Process a JMS message object to create a Values object.
+        * @param msg - the JMS message
+        * @return the Values tuple, or null if the message couldn't be 
processed.
+        * @throws JMSException
+        */
+       Values toTuple(Message msg) throws JMSException;
+       
+       /**
+        * Declare the output fields produced by this JmsTupleProducer.
+        * @param declarer The OuputFieldsDeclarer for the spout.
+        */
+       void declareOutputFields(OutputFieldsDeclarer declarer);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d152d72f/src/main/java/org/apache/storm/jms/bolt/JmsBolt.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/storm/jms/bolt/JmsBolt.java 
b/src/main/java/org/apache/storm/jms/bolt/JmsBolt.java
new file mode 100644
index 0000000..896f932
--- /dev/null
+++ b/src/main/java/org/apache/storm/jms/bolt/JmsBolt.java
@@ -0,0 +1,201 @@
+package org.apache.storm.jms.bolt;
+
+import java.util.Map;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import backtype.storm.topology.base.BaseRichBolt;
+import org.apache.storm.jms.JmsMessageProducer;
+import org.apache.storm.jms.JmsProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Tuple;
+
+/**
+ * A JmsBolt receives <code>backtype.storm.tuple.Tuple</code> objects from a 
Storm
+ * topology and publishes JMS Messages to a destination (topic or queue).
+ * <p/>
+ * To use a JmsBolt in a topology, the following must be supplied:
+ * <ol>
+ * <li>A <code>JmsProvider</code> implementation.</li>
+ * <li>A <code>JmsMessageProducer</code> implementation.</li>
+ * </ol>
+ * The <code>JmsProvider</code> provides the JMS 
<code>javax.jms.ConnectionFactory</code>
+ * and <code>javax.jms.Destination</code> objects requied to publish JMS 
messages.
+ * <p/>
+ * The JmsBolt uses a <code>JmsMessageProducer</code> to translate 
+ * <code>backtype.storm.tuple.Tuple</code> objects into
+ * <code>javax.jms.Message</code> objects for publishing.
+ * <p/>
+ * Both JmsProvider and JmsMessageProducer must be set, or the bolt will
+ * fail upon deployment to a cluster.
+ * <p/>
+ * The JmsBolt is typically an endpoint in a topology -- in other words
+ * it does not emit any tuples.
+ * 
+ * 
+ * @author P. Taylor Goetz
+ *
+ */
+public class JmsBolt extends BaseRichBolt {
+       private static Logger LOG = LoggerFactory.getLogger(JmsBolt.class);
+       
+       private boolean autoAck = true;
+       
+       // javax.jms objects
+       private Connection connection;
+       private Session session;
+       private MessageProducer messageProducer;
+       
+       // JMS options
+       private boolean jmsTransactional = false;
+       private int jmsAcknowledgeMode = Session.AUTO_ACKNOWLEDGE;
+       
+       
+       private JmsProvider jmsProvider;
+       private JmsMessageProducer producer;
+       
+       
+       private OutputCollector collector;
+       
+       /**
+        * Set the JmsProvider used to connect to the JMS destination 
topic/queue
+        * @param provider
+        */
+       public void setJmsProvider(JmsProvider provider){
+               this.jmsProvider = provider;
+       }
+       
+       /**
+        * Set the JmsMessageProducer used to convert tuples
+        * into JMS messages.
+        * 
+        * @param producer
+        */
+       public void setJmsMessageProducer(JmsMessageProducer producer){
+               this.producer = producer;
+       }
+       
+       /**
+        * Sets the JMS acknowledgement mode for JMS messages sent
+        * by this bolt.
+        * <p/>
+        * Possible values:
+        * <ul>
+        * <li>javax.jms.Session.AUTO_ACKNOWLEDGE</li>
+        * <li>javax.jms.Session.CLIENT_ACKNOWLEDGE</li>
+        * <li>javax.jms.Session.DUPS_OK_ACKNOWLEDGE</li>
+        * </ul>
+        * @param acknowledgeMode (constant defined in javax.jms.Session)
+        */
+       public void setJmsAcknowledgeMode(int acknowledgeMode){
+               this.jmsAcknowledgeMode = acknowledgeMode;
+       }
+       
+       /**
+        * Set the JMS transactional setting for the JMS session.
+        * 
+        * @param transactional
+        */
+//     public void setJmsTransactional(boolean transactional){
+//             this.jmsTransactional = transactional;
+//     }
+       
+       /**
+        * Sets whether or not tuples should be acknowledged by this
+        * bolt.
+        * <p/>
+        * @param autoAck
+        */
+       public void setAutoAck(boolean autoAck){
+               this.autoAck = autoAck;
+       }
+
+
+       /**
+        * Consumes a tuple and sends a JMS message.
+        * <p/>
+        * If autoAck is true, the tuple will be acknowledged
+        * after the message is sent.
+        * <p/>
+        * If JMS sending fails, the tuple will be failed.
+        */
+       @Override
+       public void execute(Tuple input) {
+               // write the tuple to a JMS destination...
+               LOG.debug("Tuple received. Sending JMS message.");
+               
+               try {
+                       Message msg = this.producer.toMessage(this.session, 
input);
+                       if(msg != null){
+                               if (msg.getJMSDestination() != null) {
+                                       
this.messageProducer.send(msg.getJMSDestination(), msg);
+                               } else {
+                                       this.messageProducer.send(msg);
+                               }
+                       }
+                       if(this.autoAck){
+                               LOG.debug("ACKing tuple: " + input);
+                               this.collector.ack(input);
+                       }               
+               } catch (JMSException e) {
+                       // failed to send the JMS message, fail the tuple fast
+                       LOG.warn("Failing tuple: " + input);
+                       LOG.warn("Exception: ", e);
+                       this.collector.fail(input);
+               }
+       }
+
+       /**
+        * Releases JMS resources.
+        */
+       @Override
+       public void cleanup() {
+               try {
+                       LOG.debug("Closing JMS connection.");
+                       this.session.close();
+                       this.connection.close();
+               } catch (JMSException e) {
+                       LOG.warn("Error closing JMS connection.", e);
+               }
+       }
+
+       @Override
+       public void declareOutputFields(OutputFieldsDeclarer declarer) {
+       }
+
+    /**
+        * Initializes JMS resources.
+        */
+       @Override
+       public void prepare(Map stormConf, TopologyContext context,
+                       OutputCollector collector) {
+               if(this.jmsProvider == null || this.producer == null){
+                       throw new IllegalStateException("JMS Provider and 
MessageProducer not set.");
+               }
+               this.collector = collector;
+               LOG.debug("Connecting JMS..");
+               try {
+                       ConnectionFactory cf = 
this.jmsProvider.connectionFactory();
+                       Destination dest = this.jmsProvider.destination();
+                       this.connection = cf.createConnection();
+                       this.session = 
connection.createSession(this.jmsTransactional,
+                                       this.jmsAcknowledgeMode);
+                       this.messageProducer = session.createProducer(dest);
+                       
+                       connection.start();
+               } catch (Exception e) {
+                       LOG.warn("Error creating JMS connection.", e);
+               }       
+       }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d152d72f/src/main/java/org/apache/storm/jms/spout/JmsMessageID.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/storm/jms/spout/JmsMessageID.java 
b/src/main/java/org/apache/storm/jms/spout/JmsMessageID.java
new file mode 100644
index 0000000..225e525
--- /dev/null
+++ b/src/main/java/org/apache/storm/jms/spout/JmsMessageID.java
@@ -0,0 +1,53 @@
+package org.apache.storm.jms.spout;
+
+import java.io.Serializable;
+
+/**
+ * Created by tgoetz on 7/14/14.
+ */
+public class JmsMessageID implements Comparable<JmsMessageID>, Serializable {
+
+    private String jmsID;
+
+    private Long sequence;
+
+//    private Message message;
+
+    public JmsMessageID(long sequence, String jmsID){
+        this.jmsID = jmsID;
+        this.sequence = sequence;
+    }
+
+//    public void setMessage(Message message){
+//        this.message = message;
+//    }
+//
+//    public Message getMessage(){
+//        return this.message;
+//    }
+
+    public String getJmsID(){
+        return this.jmsID;
+    }
+
+    @Override
+    public int compareTo(JmsMessageID jmsMessageID) {
+        return (int)(this.sequence - jmsMessageID.sequence);
+    }
+
+    @Override
+    public int hashCode() {
+        return this.sequence.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if(o instanceof JmsMessageID){
+            JmsMessageID id = (JmsMessageID)o;
+            return this.jmsID.equals(id.jmsID);
+        } else {
+            return false;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d152d72f/src/main/java/org/apache/storm/jms/spout/JmsSpout.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/storm/jms/spout/JmsSpout.java 
b/src/main/java/org/apache/storm/jms/spout/JmsSpout.java
new file mode 100644
index 0000000..03ab77b
--- /dev/null
+++ b/src/main/java/org/apache/storm/jms/spout/JmsSpout.java
@@ -0,0 +1,366 @@
+package org.apache.storm.jms.spout;
+
+import java.io.Serializable;
+import java.util.*;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+
+import backtype.storm.topology.base.BaseRichSpout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.storm.jms.JmsProvider;
+import org.apache.storm.jms.JmsTupleProducer;
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Values;
+import backtype.storm.utils.Utils;
+
+/**
+ * A Storm <code>Spout</code> implementation that listens to a JMS topic or 
queue
+ * and outputs tuples based on the messages it receives.
+ * <p/>
+ * <code>JmsSpout</code> instances rely on <code>JmsProducer</code> 
implementations 
+ * to obtain the JMS <code>ConnectionFactory</code> and 
<code>Destination</code> objects
+ * necessary to connect to a JMS topic/queue.
+ * <p/>
+ * When a <code>JmsSpout</code> receives a JMS message, it delegates to an 
+ * internal <code>JmsTupleProducer</code> instance to create a Storm tuple 
from the 
+ * incoming message.
+ * <p/>
+ * Typically, developers will supply a custom <code>JmsTupleProducer</code> 
implementation
+ * appropriate for the expected message content.
+ * 
+ * @author P. Taylor Goetz
+ *
+ */
+@SuppressWarnings("serial")
+public class JmsSpout extends BaseRichSpout implements MessageListener {
+       private static final Logger LOG = 
LoggerFactory.getLogger(JmsSpout.class);
+
+       // JMS options
+       private int jmsAcknowledgeMode = Session.AUTO_ACKNOWLEDGE;
+       
+       private boolean distributed = true;
+
+       private JmsTupleProducer tupleProducer;
+
+       private JmsProvider jmsProvider;
+
+       private LinkedBlockingQueue<Message> queue;
+       private TreeSet<JmsMessageID> toCommit;
+    private HashMap<JmsMessageID, Message> pendingMessages;
+    private long messageSequence = 0;
+
+       private SpoutOutputCollector collector;
+
+       private transient Connection connection;
+       private transient Session session;
+       
+       private boolean hasFailures = false;
+       public final Serializable recoveryMutex = "RECOVERY_MUTEX";
+       private Timer recoveryTimer = null;
+       private long recoveryPeriod = -1; // default to disabled
+       
+       /**
+        * Sets the JMS Session acknowledgement mode for the JMS seesion 
associated with this spout.
+        * <p/>
+        * Possible values:
+        * <ul>
+        * <li>javax.jms.Session.AUTO_ACKNOWLEDGE</li>
+        * <li>javax.jms.Session.CLIENT_ACKNOWLEDGE</li>
+        * <li>javax.jms.Session.DUPS_OK_ACKNOWLEDGE</li>
+        * </ul>
+        * @param mode JMS Session Acknowledgement mode
+        * @throws IllegalArgumentException if the mode is not recognized.
+        */
+       public void setJmsAcknowledgeMode(int mode){
+               switch (mode) {
+               case Session.AUTO_ACKNOWLEDGE:
+               case Session.CLIENT_ACKNOWLEDGE:
+               case Session.DUPS_OK_ACKNOWLEDGE:
+                       break;
+               default:
+                       throw new IllegalArgumentException("Unknown Acknowledge 
mode: " + mode + " (See javax.jms.Session for valid values)");
+
+               }
+               this.jmsAcknowledgeMode = mode;
+       }
+       
+       /**
+        * Returns the JMS Session acknowledgement mode for the JMS seesion 
associated with this spout.
+        * @return
+        */
+       public int getJmsAcknowledgeMode(){
+               return this.jmsAcknowledgeMode;
+       }
+       
+       /**
+        * Set the <code>JmsProvider</code>
+        * implementation that this Spout will use to connect to 
+        * a JMS <code>javax.jms.Desination</code>
+        * 
+        * @param provider
+        */
+       public void setJmsProvider(JmsProvider provider){
+               this.jmsProvider = provider;
+       }
+       /**
+        * Set the <code>JmsTupleProducer</code>
+        * implementation that will convert <code>javax.jms.Message</code>
+        * object to <code>backtype.storm.tuple.Values</code> objects
+        * to be emitted.
+        * 
+        * @param producer
+        */
+       public void setJmsTupleProducer(JmsTupleProducer producer){
+               this.tupleProducer = producer;
+       }
+
+       /**
+        * <code>javax.jms.MessageListener</code> implementation.
+        * <p/>
+        * Stored the JMS message in an internal queue for processing
+        * by the <code>nextTuple()</code> method.
+        */
+       public void onMessage(Message msg) {
+               try {
+                    LOG.debug("Queuing msg [" + msg.getJMSMessageID() + "]");
+                } catch (JMSException e) {
+                }
+               this.queue.offer(msg);
+       }
+
+       /**
+        * <code>ISpout</code> implementation.
+        * <p/>
+        * Connects the JMS spout to the configured JMS destination
+        * topic/queue.
+        * 
+        */
+       @SuppressWarnings("rawtypes")
+       public void open(Map conf, TopologyContext context,
+                       SpoutOutputCollector collector) {
+               if(this.jmsProvider == null){
+                       throw new IllegalStateException("JMS provider has not 
been set.");
+               }
+               if(this.tupleProducer == null){
+                       throw new IllegalStateException("JMS Tuple Producer has 
not been set.");
+               }
+               Integer topologyTimeout = 
(Integer)conf.get("topology.message.timeout.secs");
+               // TODO fine a way to get the default timeout from storm, so 
we're not hard-coding to 30 seconds (it could change)
+               topologyTimeout = topologyTimeout == null ? 30 : 
topologyTimeout;
+               if( (topologyTimeout.intValue() * 1000 )> this.recoveryPeriod){
+                   LOG.warn("*** WARNING *** : " +
+                               "Recovery period ("+ this.recoveryPeriod + " 
ms.) is less then the configured " +
+                               "'topology.message.timeout.secs' of " + 
topologyTimeout + 
+                               " secs. This could lead to a message replay 
flood!");
+               }
+               this.queue = new LinkedBlockingQueue<Message>();
+               this.toCommit = new TreeSet<JmsMessageID>();
+        this.pendingMessages = new HashMap<JmsMessageID, Message>();
+               this.collector = collector;
+               try {
+                       ConnectionFactory cf = 
this.jmsProvider.connectionFactory();
+                       Destination dest = this.jmsProvider.destination();
+                       this.connection = cf.createConnection();
+                       this.session = connection.createSession(false,
+                                       this.jmsAcknowledgeMode);
+                       MessageConsumer consumer = session.createConsumer(dest);
+                       consumer.setMessageListener(this);
+                       this.connection.start();
+                       if (this.isDurableSubscription() && this.recoveryPeriod 
> 0){
+                           this.recoveryTimer = new Timer();
+                           this.recoveryTimer.scheduleAtFixedRate(new 
RecoveryTask(), 10, this.recoveryPeriod);
+                       }
+                       
+               } catch (Exception e) {
+                       LOG.warn("Error creating JMS connection.", e);
+               }
+
+       }
+
+       public void close() {
+               try {
+                       LOG.debug("Closing JMS connection.");
+                       this.session.close();
+                       this.connection.close();
+               } catch (JMSException e) {
+                       LOG.warn("Error closing JMS connection.", e);
+               }
+
+       }
+
+       public void nextTuple() {
+               Message msg = this.queue.poll();
+               if (msg == null) {
+                       Utils.sleep(50);
+               } else {
+
+                       LOG.debug("sending tuple: " + msg);
+                       // get the tuple from the handler
+                       try {
+                               Values vals = this.tupleProducer.toTuple(msg);
+                               // ack if we're not in AUTO_ACKNOWLEDGE mode, 
or the message requests ACKNOWLEDGE
+                               LOG.debug("Requested deliveryMode: " + 
toDeliveryModeString(msg.getJMSDeliveryMode()));
+                               LOG.debug("Our deliveryMode: " + 
toDeliveryModeString(this.jmsAcknowledgeMode));
+                               if (this.isDurableSubscription()) {
+                                       LOG.debug("Requesting acks.");
+                    JmsMessageID messageId = new 
JmsMessageID(this.messageSequence++, msg.getJMSMessageID());
+                                       this.collector.emit(vals, messageId);
+
+                                       // at this point we successfully 
emitted. Store
+                                       // the message and message ID so we can 
do a
+                                       // JMS acknowledge later
+                                       this.pendingMessages.put(messageId, 
msg);
+                    this.toCommit.add(messageId);
+                               } else {
+                                       this.collector.emit(vals);
+                               }
+                       } catch (JMSException e) {
+                               LOG.warn("Unable to convert JMS message: " + 
msg);
+                       }
+
+               }
+
+       }
+
+       /*
+        * Will only be called if we're transactional or not AUTO_ACKNOWLEDGE
+        */
+       public void ack(Object msgId) {
+
+               Message msg = this.pendingMessages.remove(msgId);
+        JmsMessageID oldest = this.toCommit.first();
+        if(msgId.equals(oldest)) {
+            if (msg != null) {
+                try {
+                    LOG.debug("Committing...");
+                    msg.acknowledge();
+                    LOG.debug("JMS Message acked: " + msgId);
+                    this.toCommit.remove(msgId);
+                } catch (JMSException e) {
+                    LOG.warn("Error acknowldging JMS message: " + msgId, e);
+                }
+            } else {
+                LOG.warn("Couldn't acknowledge unknown JMS message ID: " + 
msgId);
+            }
+        } else {
+            this.toCommit.remove(msgId);
+        }
+
+       }
+
+       /*
+        * Will only be called if we're transactional or not AUTO_ACKNOWLEDGE
+        */
+       public void fail(Object msgId) {
+               LOG.warn("Message failed: " + msgId);
+        this.pendingMessages.clear();
+        this.toCommit.clear();
+               synchronized(this.recoveryMutex){
+                   this.hasFailures = true;
+               }
+       }
+
+       public void declareOutputFields(OutputFieldsDeclarer declarer) {
+               this.tupleProducer.declareOutputFields(declarer);
+
+       }
+
+       /**
+         * Returns <code>true</code> if the spout has received failures 
+         * from which it has not yet recovered.
+         */
+       public boolean hasFailures(){
+               return this.hasFailures;
+       }
+       
+       protected void recovered(){
+               this.hasFailures = false;
+       }
+       
+       /**
+        * Sets the periodicity of the timer task that 
+        * checks for failures and recovers the JMS session.
+        * 
+        * @param period
+        */
+       public void setRecoveryPeriod(long period){
+           this.recoveryPeriod = period;
+       }
+       
+       public boolean isDistributed() {
+               return this.distributed;
+       }
+       
+       /**
+        * Sets the "distributed" mode of this spout.
+        * <p/>
+        * If <code>true</code> multiple instances of this spout <i>may</i> be
+        * created across the cluster (depending on the "parallelism_hint" in 
the topology configuration).
+        * <p/>
+        * Setting this value to <code>false</code> essentially means this 
spout will run as a singleton 
+        * within the cluster ("parallelism_hint" will be ignored).
+        * <p/>
+        * In general, this should be set to <code>false</code> if the 
underlying JMS destination is a 
+        * topic, and <code>true</code> if it is a JMS queue.
+        * 
+        * @param distributed
+        */
+       public void setDistributed(boolean distributed){
+               this.distributed = distributed;
+       }
+
+
+       private static final String toDeliveryModeString(int deliveryMode) {
+               switch (deliveryMode) {
+               case Session.AUTO_ACKNOWLEDGE:
+                       return "AUTO_ACKNOWLEDGE";
+               case Session.CLIENT_ACKNOWLEDGE:
+                       return "CLIENT_ACKNOWLEDGE";
+               case Session.DUPS_OK_ACKNOWLEDGE:
+                       return "DUPS_OK_ACKNOWLEDGE";
+               default:
+                       return "UNKNOWN";
+
+               }
+       }
+       
+       protected Session getSession(){
+               return this.session;
+       }
+       
+       private boolean isDurableSubscription(){
+           return (this.jmsAcknowledgeMode != Session.AUTO_ACKNOWLEDGE);
+       }
+       
+       
+       private class RecoveryTask extends TimerTask {
+           private final Logger LOG = 
LoggerFactory.getLogger(RecoveryTask.class);
+
+           public void run() {
+               synchronized (JmsSpout.this.recoveryMutex) {
+                   if (JmsSpout.this.hasFailures()) {
+                       try {
+                           LOG.info("Recovering from a message failure.");
+                           JmsSpout.this.getSession().recover();
+                           JmsSpout.this.recovered();
+                       } catch (JMSException e) {
+                           LOG.warn("Could not recover jms session.", e);
+                       }
+                   }
+               }
+           }
+
+       }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d152d72f/src/main/java/org/apache/storm/jms/trident/JmsBatch.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/storm/jms/trident/JmsBatch.java 
b/src/main/java/org/apache/storm/jms/trident/JmsBatch.java
new file mode 100644
index 0000000..3a7981f
--- /dev/null
+++ b/src/main/java/org/apache/storm/jms/trident/JmsBatch.java
@@ -0,0 +1,10 @@
+package org.apache.storm.jms.trident;
+
+/**
+ * Batch coordination metadata object for the TridentJmsSpout.
+ * This implementation does not use batch metadata, so the object is empty.
+ *
+ */
+public class JmsBatch {
+    // Empty class
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d152d72f/src/main/java/org/apache/storm/jms/trident/JmsState.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/storm/jms/trident/JmsState.java 
b/src/main/java/org/apache/storm/jms/trident/JmsState.java
new file mode 100644
index 0000000..0694098
--- /dev/null
+++ b/src/main/java/org/apache/storm/jms/trident/JmsState.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jms.trident;
+
+import org.apache.storm.jms.JmsProvider;
+import backtype.storm.topology.FailedException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import storm.trident.operation.TridentCollector;
+import storm.trident.state.State;
+import storm.trident.tuple.TridentTuple;
+
+import javax.jms.*;
+import java.io.Serializable;
+import java.lang.IllegalStateException;
+import java.util.List;
+
+public class JmsState implements State {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JmsState.class);
+
+    private Options options;
+    private Connection connection;
+    private Session session;
+    private MessageProducer messageProducer;
+
+    protected JmsState(Options options) {
+        this.options = options;
+    }
+
+    public static class Options implements Serializable {
+        private JmsProvider jmsProvider;
+        private TridentJmsMessageProducer msgProducer;
+        private int jmsAcknowledgeMode = Session.AUTO_ACKNOWLEDGE;
+        private boolean jmsTransactional = true;
+
+        public Options withJmsProvider(JmsProvider provider) {
+            this.jmsProvider = provider;
+            return this;
+        }
+
+        public Options withMessageProducer(TridentJmsMessageProducer 
msgProducer) {
+            this.msgProducer = msgProducer;
+            return this;
+        }
+
+        public Options withJmsAcknowledgeMode(int jmsAcknowledgeMode) {
+            this.jmsAcknowledgeMode = jmsAcknowledgeMode;
+            return this;
+        }
+
+        public Options withJmsTransactional(boolean jmsTransactional) {
+            this.jmsTransactional = jmsTransactional;
+            return this;
+        }
+    }
+
+    protected void prepare() {
+        if(this.options.jmsProvider == null || this.options.msgProducer == 
null){
+            throw new IllegalStateException("JMS Provider and MessageProducer 
not set.");
+        }
+        LOG.debug("Connecting JMS..");
+        try {
+            ConnectionFactory cf = 
this.options.jmsProvider.connectionFactory();
+            Destination dest = this.options.jmsProvider.destination();
+            this.connection = cf.createConnection();
+            this.session = 
connection.createSession(this.options.jmsTransactional,
+                    this.options.jmsAcknowledgeMode);
+            this.messageProducer = session.createProducer(dest);
+
+            connection.start();
+        } catch (Exception e) {
+            LOG.warn("Error creating JMS connection.", e);
+        }
+    }
+
+    @Override
+    public void beginCommit(Long aLong) {
+    }
+
+    @Override
+    public void commit(Long aLong) {
+        LOG.debug("Committing JMS transaction.");
+        if(this.options.jmsTransactional) {
+            try {
+                session.commit();
+            } catch(JMSException e){
+                LOG.error("JMS Session commit failed.", e);
+            }
+        }
+    }
+
+    public void updateState(List<TridentTuple> tuples, TridentCollector 
collector) throws JMSException {
+        try {
+        for(TridentTuple tuple : tuples) {
+                Message msg = this.options.msgProducer.toMessage(this.session, 
tuple);
+                if (msg != null) {
+                    if (msg.getJMSDestination() != null) {
+                        this.messageProducer.send(msg.getJMSDestination(), 
msg);
+                    } else {
+                        this.messageProducer.send(msg);
+                    }
+                }
+            }
+        } catch (JMSException e) {
+            LOG.warn("Failed to send jmd message for a trident batch ", e);
+            if(this.options.jmsTransactional) {
+                session.rollback();
+            }
+            throw new FailedException("Failed to write tuples", e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d152d72f/src/main/java/org/apache/storm/jms/trident/JmsStateFactory.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/storm/jms/trident/JmsStateFactory.java 
b/src/main/java/org/apache/storm/jms/trident/JmsStateFactory.java
new file mode 100644
index 0000000..7fe4d85
--- /dev/null
+++ b/src/main/java/org/apache/storm/jms/trident/JmsStateFactory.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jms.trident;
+
+import backtype.storm.task.IMetricsContext;
+import storm.trident.state.State;
+import storm.trident.state.StateFactory;
+
+import java.util.Map;
+
+public class JmsStateFactory implements StateFactory {
+
+    private JmsState.Options options;
+
+    public JmsStateFactory(JmsState.Options options) {
+        this.options = options;
+    }
+
+    @Override
+    public State makeState(Map map, IMetricsContext iMetricsContext, int 
partitionIndex, int numPartitions) {
+        JmsState state = new JmsState(options);
+        state.prepare();
+        return state;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d152d72f/src/main/java/org/apache/storm/jms/trident/JmsUpdater.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/storm/jms/trident/JmsUpdater.java 
b/src/main/java/org/apache/storm/jms/trident/JmsUpdater.java
new file mode 100644
index 0000000..3269521
--- /dev/null
+++ b/src/main/java/org/apache/storm/jms/trident/JmsUpdater.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jms.trident;
+
+import backtype.storm.topology.FailedException;
+import storm.trident.operation.TridentCollector;
+import storm.trident.state.BaseStateUpdater;
+import storm.trident.tuple.TridentTuple;
+
+import javax.jms.JMSException;
+import java.util.List;
+
+public class JmsUpdater extends BaseStateUpdater<JmsState>  {
+
+    @Override
+    public void updateState(JmsState jmsState, List<TridentTuple> tuples, 
TridentCollector collector) {
+        try {
+            jmsState.updateState(tuples, collector);
+        } catch (JMSException e) {
+            throw new FailedException("failed JMS opetation", e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d152d72f/src/main/java/org/apache/storm/jms/trident/TridentJmsMessageProducer.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/storm/jms/trident/TridentJmsMessageProducer.java 
b/src/main/java/org/apache/storm/jms/trident/TridentJmsMessageProducer.java
new file mode 100644
index 0000000..8312947
--- /dev/null
+++ b/src/main/java/org/apache/storm/jms/trident/TridentJmsMessageProducer.java
@@ -0,0 +1,23 @@
+package org.apache.storm.jms.trident;
+
+import backtype.storm.tuple.Tuple;
+import storm.trident.tuple.TridentTuple;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.Session;
+import java.io.Serializable;
+
+public interface TridentJmsMessageProducer extends Serializable{
+
+       /**
+        * Translate a <code>backtype.storm.tuple.TridentTuple</code> object
+        * to a <code>javax.jms.Message</code object.
+        *
+        * @param session
+        * @param input
+        * @return
+        * @throws javax.jms.JMSException
+        */
+       public Message toMessage(Session session, TridentTuple input) throws 
JMSException;
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d152d72f/src/main/java/org/apache/storm/jms/trident/TridentJmsSpout.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/storm/jms/trident/TridentJmsSpout.java 
b/src/main/java/org/apache/storm/jms/trident/TridentJmsSpout.java
new file mode 100644
index 0000000..6b80919
--- /dev/null
+++ b/src/main/java/org/apache/storm/jms/trident/TridentJmsSpout.java
@@ -0,0 +1,393 @@
+package org.apache.storm.jms.trident;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+
+import org.apache.storm.jms.JmsProvider;
+import org.apache.storm.jms.JmsTupleProducer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import storm.trident.operation.TridentCollector;
+import storm.trident.spout.ITridentSpout;
+import storm.trident.topology.TransactionAttempt;
+import backtype.storm.Config;
+import backtype.storm.generated.StreamInfo;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsGetter;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+import backtype.storm.utils.RotatingMap;
+import backtype.storm.utils.Utils;
+
+/**
+ * Trident implementation of the JmsSpout, based on code provided by P. Taylor 
Goetz - https://github.com/ptgoetz
+ * <p>
+ * @author Andy Toone for Metabroadcast
+ *
+ */
+public class TridentJmsSpout implements ITridentSpout<JmsBatch> {
+
+    public static final String MAX_BATCH_SIZE_CONF = 
"topology.spout.max.batch.size";
+    
+    public static final int DEFAULT_BATCH_SIZE = 1000;
+
+    private static final long serialVersionUID = -3469351154693356655L;
+    
+    private JmsTupleProducer tupleProducer;
+
+    private JmsProvider jmsProvider;
+
+    private int jmsAcknowledgeMode;
+
+    private String name;
+
+    private static int nameIndex = 1;
+    
+    /**
+     * Create a TridentJmsSpout with a default name and acknowledge mode of 
AUTO_ACKNOWLEDGE
+     */
+    public TridentJmsSpout() {
+        this.name = "JmsSpout_"+(nameIndex++);
+        this.jmsAcknowledgeMode = Session.AUTO_ACKNOWLEDGE;
+    }
+    
+    /**
+     * Set the name for this spout, to improve log identification
+     * @param name The name to be used in log messages
+     * @return This spout
+     */
+    public TridentJmsSpout named(String name) {
+        this.name = name;
+        return this;
+    }
+    
+    /**
+     * Set the <code>JmsProvider</code>
+     * implementation that this Spout will use to connect to 
+     * a JMS <code>javax.jms.Desination</code>
+     * 
+     * @param provider
+     */
+    public TridentJmsSpout withJmsProvider(JmsProvider provider){
+        this.jmsProvider = provider;
+        return this;
+    }
+    
+    /**
+     * Set the <code>JmsTupleProducer</code>
+     * implementation that will convert <code>javax.jms.Message</code>
+     * object to <code>backtype.storm.tuple.Values</code> objects
+     * to be emitted.
+     * 
+     * @param tupleProducer
+     * @return This spout
+     */
+    public TridentJmsSpout withTupleProducer(JmsTupleProducer tupleProducer) {
+        this.tupleProducer = tupleProducer;
+        return this;
+    }
+    
+    /**
+     * Set the JMS acknowledge mode for messages being processed by this spout.
+     * <p/>
+     * Possible values:
+     * <ul>
+     * <li>javax.jms.Session.AUTO_ACKNOWLEDGE</li>
+     * <li>javax.jms.Session.CLIENT_ACKNOWLEDGE</li>
+     * <li>javax.jms.Session.DUPS_OK_ACKNOWLEDGE</li>
+     * </ul>
+     * @param jmsAcknowledgeMode The chosen acknowledge mode
+     * @return This spout
+     * @throws IllegalArgumentException if the mode is not recognized
+     */
+    public TridentJmsSpout withJmsAcknowledgeMode(int jmsAcknowledgeMode) {
+        toDeliveryModeString(jmsAcknowledgeMode);
+        this.jmsAcknowledgeMode = jmsAcknowledgeMode;
+        return this;
+    }
+    
+    /**
+     * Return a friendly string for the given JMS acknowledge mode, or throw 
an IllegalArgumentException if
+     * the mode is not recognized.
+     * <p/>
+     * Possible values:
+     * <ul>
+     * <li>javax.jms.Session.AUTO_ACKNOWLEDGE</li>
+     * <li>javax.jms.Session.CLIENT_ACKNOWLEDGE</li>
+     * <li>javax.jms.Session.DUPS_OK_ACKNOWLEDGE</li>
+     * </ul>
+     * @param acknowledgeMode A valid JMS acknowledge mode
+     * @return A friendly string describing the acknowledge mode
+     * @throws IllegalArgumentException if the mode is not recognized
+     */
+    private static final String toDeliveryModeString(int acknowledgeMode) {
+        switch (acknowledgeMode) {
+        case Session.AUTO_ACKNOWLEDGE:
+            return "AUTO_ACKNOWLEDGE";
+        case Session.CLIENT_ACKNOWLEDGE:
+            return "CLIENT_ACKNOWLEDGE";
+        case Session.DUPS_OK_ACKNOWLEDGE:
+            return "DUPS_OK_ACKNOWLEDGE";
+        default:
+            throw new IllegalArgumentException("Unknown JMS Acknowledge mode " 
+ acknowledgeMode + " (See javax.jms.Session for valid values)");
+        }
+    }
+    
+    @Override
+    public storm.trident.spout.ITridentSpout.BatchCoordinator<JmsBatch> 
getCoordinator(
+            String txStateId, @SuppressWarnings("rawtypes") Map conf, 
TopologyContext context) {
+        return new JmsBatchCoordinator(name);
+    }
+
+    @Override
+    public Emitter<JmsBatch> getEmitter(String txStateId, 
@SuppressWarnings("rawtypes") Map conf, TopologyContext context) {
+        return new JmsEmitter(name, jmsProvider, tupleProducer, 
jmsAcknowledgeMode, conf);
+    }
+
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        return null;
+    }
+
+    @Override
+    public Fields getOutputFields() {
+        OutputFieldsGetter fieldGetter = new OutputFieldsGetter();
+        tupleProducer.declareOutputFields(fieldGetter);
+        StreamInfo streamInfo = 
fieldGetter.getFieldsDeclaration().get(Utils.DEFAULT_STREAM_ID);
+        if (streamInfo == null) {
+            throw new IllegalArgumentException("Jms Tuple producer has not 
declared output fields for the default stream");
+        }
+        
+        return new Fields(streamInfo.get_output_fields());
+    }
+    
+    /**
+     * The JmsEmitter class listens for incoming messages and stores them in a 
blocking queue. On each invocation of emit,
+     * the queued messages are emitted as a batch.
+     *
+     */
+    private class JmsEmitter implements Emitter<JmsBatch>, MessageListener {
+
+        private final LinkedBlockingQueue<Message> queue;
+        private final Connection connection;
+        private final Session session;
+
+        private final RotatingMap<Long, List<Message>> batchMessageMap; // 
Maps transaction Ids to JMS message ids.
+        
+        private final long rotateTimeMillis;
+        private final int maxBatchSize;
+        private final String name;
+        
+        private long lastRotate;
+       
+        private final Logger LOG = LoggerFactory.getLogger(JmsEmitter.class);
+ 
+        public JmsEmitter(String name, JmsProvider jmsProvider, 
JmsTupleProducer tupleProducer, int jmsAcknowledgeMode, 
@SuppressWarnings("rawtypes") Map conf) {
+            if (jmsProvider == null) {
+                throw new IllegalStateException("JMS provider has not been 
set.");
+            }
+            if (tupleProducer == null) {
+                throw new IllegalStateException("JMS Tuple Producer has not 
been set.");
+            }
+
+            this.queue = new LinkedBlockingQueue<Message>();
+            this.name = name;
+            
+            batchMessageMap = new RotatingMap<Long, List<Message>>(3);
+            rotateTimeMillis = 1000L * 
((Number)conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)).intValue();
+            lastRotate = System.currentTimeMillis();
+            
+            Number batchSize = (Number) conf.get(MAX_BATCH_SIZE_CONF);
+            maxBatchSize = batchSize != null ? batchSize.intValue() : 
DEFAULT_BATCH_SIZE;
+
+            try {
+                ConnectionFactory cf = jmsProvider.connectionFactory();
+                Destination dest = jmsProvider.destination();
+                this.connection = cf.createConnection();
+                this.session = connection.createSession(false, 
jmsAcknowledgeMode);
+                MessageConsumer consumer = session.createConsumer(dest);
+                consumer.setMessageListener(this);
+                this.connection.start();
+
+                LOG.info("Created JmsEmitter with max batch size 
"+maxBatchSize+" rotate time "+rotateTimeMillis+"ms and destination "+dest+" 
for "+name);
+
+            } catch (Exception e) {
+                LOG.warn("Error creating JMS connection.", e);
+                throw new IllegalStateException("Could not create JMS 
connection for spout ", e);
+            }
+            
+        }
+        
+        @Override
+        public void success(TransactionAttempt tx) {
+            
+            @SuppressWarnings("unchecked")
+            List<Message> messages = (List<Message>) 
batchMessageMap.remove(tx.getTransactionId());
+            
+            if (messages != null) {
+                if (!messages.isEmpty()) {
+                    LOG.debug("Success for batch with transaction id 
"+tx.getTransactionId()+"/"+tx.getAttemptId()+" for "+name);
+                }
+                
+                for (Message msg: messages) {
+                    String messageId = "UnknownId";
+                    
+                    try {
+                        messageId = msg.getJMSMessageID();
+                        msg.acknowledge();
+                        LOG.trace("Acknowledged message "+messageId);
+                    } catch (JMSException e) {
+                        LOG.warn("Failed to acknowledge message "+messageId, 
e);
+                    }
+                }
+            }
+            else {
+                LOG.warn("No messages found in batch with transaction id 
"+tx.getTransactionId()+"/"+tx.getAttemptId());
+            }
+        }
+
+        /**
+         * Fail a batch with the given transaction id. This is called when a 
batch is timed out, or a new batch with a 
+         * matching transaction id is emitted. Note that the current 
implementation does nothing - i.e. it discards 
+         * messages that have been failed.
+         * @param transactionId The transaction id of the failed batch
+         * @param messages The list of messages to fail.
+         */
+        private void fail(Long transactionId, List<Message> messages) {
+            LOG.debug("Failure for batch with transaction id "+transactionId+" 
for "+name);
+            if (messages != null) {
+                for (Message msg: messages) {
+                    try {
+                        LOG.trace("Failed message "+msg.getJMSMessageID());
+                    } catch (JMSException e) {
+                        LOG.warn("Could not identify failed message ", e);
+                    }
+                }
+            }
+            else {
+                LOG.warn("Failed batch has no messages with transaction id 
"+transactionId);
+            }            
+        }
+
+        @Override
+        public void close() {
+            try {
+                LOG.info("Closing JMS connection.");
+                this.session.close();
+                this.connection.close();
+            } catch (JMSException e) {
+                LOG.warn("Error closing JMS connection.", e);
+            }   
+        }
+
+        @Override
+        public void emitBatch(TransactionAttempt tx, JmsBatch coordinatorMeta,
+                TridentCollector collector) {
+            
+            long now = System.currentTimeMillis();
+            if(now - lastRotate > rotateTimeMillis) {
+                Map<Long, List<Message>> failed = batchMessageMap.rotate();
+                for(Long id: failed.keySet()) {
+                    LOG.warn("TIMED OUT batch with transaction id "+id+" for 
"+name);
+                    fail(id, failed.get(id));
+                }
+                lastRotate = now;
+            }
+            
+            if(batchMessageMap.containsKey(tx.getTransactionId())) {
+                LOG.warn("FAILED duplicate batch with transaction id 
"+tx.getTransactionId()+"/"+tx.getAttemptId()+" for "+name);
+                fail(tx.getTransactionId(), 
batchMessageMap.get(tx.getTransactionId()));
+            }
+            
+            List<Message> batchMessages = new ArrayList<Message>();
+            
+            for (int index=0; index<maxBatchSize; index++) {
+                Message msg = queue.poll();
+                if (msg == null) {
+                    Utils.sleep(50); // Back off
+                    break;
+                }
+                
+                try {
+                    if (TridentJmsSpout.this.jmsAcknowledgeMode != 
Session.AUTO_ACKNOWLEDGE) {
+                        batchMessages.add(msg);
+                    }
+                    Values tuple = tupleProducer.toTuple(msg);
+                    collector.emit(tuple);
+                } catch (JMSException e) {
+                    LOG.warn("Failed to emit message, could not retrieve data 
for "+name+": "+e );
+                }
+            }
+            
+            if (!batchMessages.isEmpty()) {
+                LOG.debug("Emitting batch with transaction id 
"+tx.getTransactionId()+"/"+tx.getAttemptId()+" and size 
"+batchMessages.size()+" for "+name);
+            }
+            else {
+                LOG.trace("No items to acknowledge for batch with transaction 
id "+tx.getTransactionId()+"/"+tx.getAttemptId()+" for "+name);
+            }
+            batchMessageMap.put(tx.getTransactionId(), batchMessages);
+        }
+
+        @Override
+        public void onMessage(Message msg) {
+            try {
+                LOG.trace("Queuing msg [" + msg.getJMSMessageID() + "]");
+            } catch (JMSException e) {
+                // Nothing here, could not get message id
+            }
+            this.queue.offer(msg);
+        }
+        
+    }
+    
+    /**
+     * Bare implementation of a BatchCoordinator, returning a null JmsBatch 
object
+     *
+     */
+    private class JmsBatchCoordinator implements BatchCoordinator<JmsBatch> {
+
+        private final String name;
+        
+        private final Logger LOG = 
LoggerFactory.getLogger(JmsBatchCoordinator.class);
+
+        public JmsBatchCoordinator(String name) {
+            this.name = name;
+            LOG.info("Created batch coordinator for "+name);
+        }
+        
+        @Override
+        public JmsBatch initializeTransaction(long txid, JmsBatch 
prevMetadata, JmsBatch curMetadata) {
+            LOG.debug("Initialise transaction "+txid+" for "+name);
+            return null;
+        }
+
+        @Override
+        public void success(long txid) {
+        }
+
+        @Override
+        public boolean isReady(long txid) {
+            return true;
+        }
+
+        @Override
+        public void close() {
+        }
+        
+    }
+
+}
+
+    
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/d152d72f/src/test/java/backtype/storm/contrib/jms/spout/JmsSpoutTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/backtype/storm/contrib/jms/spout/JmsSpoutTest.java 
b/src/test/java/backtype/storm/contrib/jms/spout/JmsSpoutTest.java
deleted file mode 100644
index 1d05687..0000000
--- a/src/test/java/backtype/storm/contrib/jms/spout/JmsSpoutTest.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package backtype.storm.contrib.jms.spout;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectOutputStream;
-import java.util.HashMap;
-
-import javax.jms.ConnectionFactory;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.junit.Assert;
-import org.junit.Test;
-import org.mortbay.log.Log;
-
-import backtype.storm.contrib.jms.JmsProvider;
-import backtype.storm.spout.SpoutOutputCollector;
-
-public class JmsSpoutTest {
-    @Test
-    public void testFailure() throws JMSException, Exception{
-        JmsSpout spout = new JmsSpout();
-        JmsProvider mockProvider = new MockJmsProvider();
-        MockSpoutOutputCollector mockCollector = new 
MockSpoutOutputCollector();
-        SpoutOutputCollector collector = new 
SpoutOutputCollector(mockCollector);
-        spout.setJmsProvider(new MockJmsProvider());
-        spout.setJmsTupleProducer(new MockTupleProducer());
-        spout.setJmsAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
-        spout.setRecoveryPeriod(10); // Rapid recovery for testing.
-        spout.open(new HashMap<String,String>(), null, collector);
-        Message msg = this.sendMessage(mockProvider.connectionFactory(), 
mockProvider.destination());
-        Thread.sleep(100);
-        spout.nextTuple(); // Pretend to be storm.
-        Assert.assertTrue(mockCollector.emitted);
-        
-        mockCollector.reset();        
-        spout.fail(msg.getJMSMessageID()); // Mock failure
-        Thread.sleep(5000);
-        spout.nextTuple(); // Pretend to be storm.
-        Thread.sleep(5000);
-        Assert.assertTrue(mockCollector.emitted); // Should have been 
re-emitted
-    }
-
-    @Test
-    public void testSerializability() throws IOException{
-        JmsSpout spout = new JmsSpout();
-        ByteArrayOutputStream out = new ByteArrayOutputStream();
-        ObjectOutputStream oos = new ObjectOutputStream(out);
-        oos.writeObject(spout);
-        oos.close();
-        Assert.assertTrue(out.toByteArray().length > 0);
-    }
-    
-    public Message sendMessage(ConnectionFactory connectionFactory, 
Destination destination) throws JMSException {        
-        Session mySess = 
connectionFactory.createConnection().createSession(false, 
Session.CLIENT_ACKNOWLEDGE);
-        MessageProducer producer = mySess.createProducer(destination);
-        TextMessage msg = mySess.createTextMessage();
-        msg.setText("Hello World");
-        Log.debug("Sending Message: " + msg.getText());
-        producer.send(msg);
-        return msg;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/d152d72f/src/test/java/backtype/storm/contrib/jms/spout/MockJmsProvider.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/backtype/storm/contrib/jms/spout/MockJmsProvider.java 
b/src/test/java/backtype/storm/contrib/jms/spout/MockJmsProvider.java
deleted file mode 100644
index 2dacca5..0000000
--- a/src/test/java/backtype/storm/contrib/jms/spout/MockJmsProvider.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package backtype.storm.contrib.jms.spout;
-
-import javax.jms.ConnectionFactory;
-import javax.jms.Destination;
-import javax.naming.Context;
-import javax.naming.InitialContext;
-import javax.naming.NamingException;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-
-import backtype.storm.contrib.jms.JmsProvider;
-
-public class MockJmsProvider implements JmsProvider {
-    private static final long serialVersionUID = 1L;
-
-    private ConnectionFactory connectionFactory = null;
-    private Destination destination = null;
-    
-    public MockJmsProvider() throws NamingException{
-        this.connectionFactory = new 
ActiveMQConnectionFactory("vm://localhost?broker.persistent=false"); 
-        Context jndiContext = new InitialContext();
-        this.destination = (Destination) 
jndiContext.lookup("dynamicQueues/FOO.BAR");        
-
-    }
-    
-    /**
-     * Provides the JMS <code>ConnectionFactory</code>
-     * @return the connection factory
-     * @throws Exception
-     */
-    public ConnectionFactory connectionFactory() throws Exception{
-        return this.connectionFactory;
-    }
-
-    /**
-     * Provides the <code>Destination</code> (topic or queue) from which the
-     * <code>JmsSpout</code> will receive messages.
-     * @return
-     * @throws Exception
-     */
-    public Destination destination() throws Exception{
-        return this.destination;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/d152d72f/src/test/java/backtype/storm/contrib/jms/spout/MockSpoutOutputCollector.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/backtype/storm/contrib/jms/spout/MockSpoutOutputCollector.java 
b/src/test/java/backtype/storm/contrib/jms/spout/MockSpoutOutputCollector.java
deleted file mode 100644
index d9f7fac..0000000
--- 
a/src/test/java/backtype/storm/contrib/jms/spout/MockSpoutOutputCollector.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package backtype.storm.contrib.jms.spout;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import backtype.storm.spout.ISpoutOutputCollector;
-
-public class MockSpoutOutputCollector implements ISpoutOutputCollector {
-    boolean emitted = false;
-
-    @Override
-    public List<Integer> emit(String streamId, List<Object> tuple, Object 
messageId) {
-        emitted = true;
-        return new ArrayList<Integer>();
-    }
-
-    @Override
-    public void emitDirect(int taskId, String streamId, List<Object> tuple, 
Object messageId) {
-        emitted = true;
-    }
-
-    @Override
-    public void reportError(Throwable error) {
-    }
-
-    public boolean emitted(){
-        return this.emitted;
-    }
-
-    public void reset(){
-        this.emitted = false;
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/d152d72f/src/test/java/backtype/storm/contrib/jms/spout/MockTupleProducer.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/backtype/storm/contrib/jms/spout/MockTupleProducer.java 
b/src/test/java/backtype/storm/contrib/jms/spout/MockTupleProducer.java
deleted file mode 100644
index 92b6788..0000000
--- a/src/test/java/backtype/storm/contrib/jms/spout/MockTupleProducer.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package backtype.storm.contrib.jms.spout;
-
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.TextMessage;
-
-import backtype.storm.contrib.jms.JmsTupleProducer;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Values;
-
-public class MockTupleProducer implements JmsTupleProducer {
-    private static final long serialVersionUID = 1L;
-
-    @Override
-    public Values toTuple(Message msg) throws JMSException {
-        if (msg instanceof TextMessage) {
-            String json = ((TextMessage) msg).getText();
-            return new Values(json);
-        } else {
-            return null;
-        }
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        declarer.declare(new Fields("json"));
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/d152d72f/src/test/java/org/apache/storm/jms/spout/JmsSpoutTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/storm/jms/spout/JmsSpoutTest.java 
b/src/test/java/org/apache/storm/jms/spout/JmsSpoutTest.java
new file mode 100644
index 0000000..42ce573
--- /dev/null
+++ b/src/test/java/org/apache/storm/jms/spout/JmsSpoutTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jms.spout;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.util.HashMap;
+
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.mortbay.log.Log;
+
+import org.apache.storm.jms.JmsProvider;
+import backtype.storm.spout.SpoutOutputCollector;
+
+public class JmsSpoutTest {
+    @Test
+    public void testFailure() throws JMSException, Exception{
+        JmsSpout spout = new JmsSpout();
+        JmsProvider mockProvider = new MockJmsProvider();
+        MockSpoutOutputCollector mockCollector = new 
MockSpoutOutputCollector();
+        SpoutOutputCollector collector = new 
SpoutOutputCollector(mockCollector);
+        spout.setJmsProvider(new MockJmsProvider());
+        spout.setJmsTupleProducer(new MockTupleProducer());
+        spout.setJmsAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
+        spout.setRecoveryPeriod(10); // Rapid recovery for testing.
+        spout.open(new HashMap<String,String>(), null, collector);
+        Message msg = this.sendMessage(mockProvider.connectionFactory(), 
mockProvider.destination());
+        Thread.sleep(100);
+        spout.nextTuple(); // Pretend to be storm.
+        Assert.assertTrue(mockCollector.emitted);
+        
+        mockCollector.reset();        
+        spout.fail(msg.getJMSMessageID()); // Mock failure
+        Thread.sleep(5000);
+        spout.nextTuple(); // Pretend to be storm.
+        Thread.sleep(5000);
+        Assert.assertTrue(mockCollector.emitted); // Should have been 
re-emitted
+    }
+
+    @Test
+    public void testSerializability() throws IOException{
+        JmsSpout spout = new JmsSpout();
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        ObjectOutputStream oos = new ObjectOutputStream(out);
+        oos.writeObject(spout);
+        oos.close();
+        Assert.assertTrue(out.toByteArray().length > 0);
+    }
+    
+    public Message sendMessage(ConnectionFactory connectionFactory, 
Destination destination) throws JMSException {        
+        Session mySess = 
connectionFactory.createConnection().createSession(false, 
Session.CLIENT_ACKNOWLEDGE);
+        MessageProducer producer = mySess.createProducer(destination);
+        TextMessage msg = mySess.createTextMessage();
+        msg.setText("Hello World");
+        Log.debug("Sending Message: " + msg.getText());
+        producer.send(msg);
+        return msg;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d152d72f/src/test/java/org/apache/storm/jms/spout/MockJmsProvider.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/storm/jms/spout/MockJmsProvider.java 
b/src/test/java/org/apache/storm/jms/spout/MockJmsProvider.java
new file mode 100644
index 0000000..3ba0853
--- /dev/null
+++ b/src/test/java/org/apache/storm/jms/spout/MockJmsProvider.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jms.spout;
+
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+
+import org.apache.storm.jms.JmsProvider;
+
+public class MockJmsProvider implements JmsProvider {
+    private static final long serialVersionUID = 1L;
+
+    private ConnectionFactory connectionFactory = null;
+    private Destination destination = null;
+    
+    public MockJmsProvider() throws NamingException{
+        this.connectionFactory = new 
ActiveMQConnectionFactory("vm://localhost?broker.persistent=false"); 
+        Context jndiContext = new InitialContext();
+        this.destination = (Destination) 
jndiContext.lookup("dynamicQueues/FOO.BAR");        
+
+    }
+    
+    /**
+     * Provides the JMS <code>ConnectionFactory</code>
+     * @return the connection factory
+     * @throws Exception
+     */
+    public ConnectionFactory connectionFactory() throws Exception{
+        return this.connectionFactory;
+    }
+
+    /**
+     * Provides the <code>Destination</code> (topic or queue) from which the
+     * <code>JmsSpout</code> will receive messages.
+     * @return
+     * @throws Exception
+     */
+    public Destination destination() throws Exception{
+        return this.destination;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d152d72f/src/test/java/org/apache/storm/jms/spout/MockSpoutOutputCollector.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/storm/jms/spout/MockSpoutOutputCollector.java 
b/src/test/java/org/apache/storm/jms/spout/MockSpoutOutputCollector.java
new file mode 100644
index 0000000..8a535ce
--- /dev/null
+++ b/src/test/java/org/apache/storm/jms/spout/MockSpoutOutputCollector.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jms.spout;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import backtype.storm.spout.ISpoutOutputCollector;
+
+public class MockSpoutOutputCollector implements ISpoutOutputCollector {
+    boolean emitted = false;
+
+    @Override
+    public List<Integer> emit(String streamId, List<Object> tuple, Object 
messageId) {
+        emitted = true;
+        return new ArrayList<Integer>();
+    }
+
+    @Override
+    public void emitDirect(int taskId, String streamId, List<Object> tuple, 
Object messageId) {
+        emitted = true;
+    }
+
+    @Override
+    public void reportError(Throwable error) {
+    }
+
+    public boolean emitted(){
+        return this.emitted;
+    }
+
+    public void reset(){
+        this.emitted = false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d152d72f/src/test/java/org/apache/storm/jms/spout/MockTupleProducer.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/storm/jms/spout/MockTupleProducer.java 
b/src/test/java/org/apache/storm/jms/spout/MockTupleProducer.java
new file mode 100644
index 0000000..f5eda10
--- /dev/null
+++ b/src/test/java/org/apache/storm/jms/spout/MockTupleProducer.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jms.spout;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.TextMessage;
+
+import org.apache.storm.jms.JmsTupleProducer;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+
+public class MockTupleProducer implements JmsTupleProducer {
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public Values toTuple(Message msg) throws JMSException {
+        if (msg instanceof TextMessage) {
+            String json = ((TextMessage) msg).getText();
+            return new Values(json);
+        } else {
+            return null;
+        }
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("json"));
+    }
+
+}

Reply via email to