Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/storm/pull/2639#discussion_r187496121 --- Diff: external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsSpout.java --- @@ -18,164 +18,124 @@ package org.apache.storm.jms.spout; -import java.io.Serializable; -import java.util.HashMap; -import java.util.Map; -import java.util.Timer; -import java.util.TimerTask; -import java.util.TreeSet; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.Session; -import org.apache.storm.Config; import org.apache.storm.jms.JmsProvider; import org.apache.storm.jms.JmsTupleProducer; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichSpout; import org.apache.storm.tuple.Values; -import org.apache.storm.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.Session; +import java.util.HashMap; +import java.util.Map; + /** - * A Storm <code>Spout</code> implementation that listens to a JMS topic or queue and outputs tuples based on the messages it receives. + * A Storm <code>Spout</code> implementation that listens to a JMS topic or + * queue and outputs tuples based on the messages it receives. 
* * <p><code>JmsSpout</code> instances rely on <code>JmsProducer</code> * implementations to obtain the JMS * <code>ConnectionFactory</code> and <code>Destination</code> objects necessary * to connect to a JMS topic/queue. * - * <p>When a <code>JmsSpout</code> receives a JMS message, it delegates to an - * internal <code>JmsTupleProducer</code> instance to create a Storm tuple from the incoming message. + * <p>When a {@code JmsSpout} receives a JMS message, it delegates to an + * internal {@code JmsTupleProducer} instance to create a Storm tuple from + * the incoming message. * * <p>Typically, developers will supply a custom <code>JmsTupleProducer</code> * implementation appropriate for the expected message content. */ @SuppressWarnings("serial") -public class JmsSpout extends BaseRichSpout implements MessageListener { +public class JmsSpout extends BaseRichSpout { - /** - * The logger object instance for this class. - */ + /** The logger object instance for this class. */ private static final Logger LOG = LoggerFactory.getLogger(JmsSpout.class); - /** - * The logger of the recovery task. - */ - private static final Logger RECOVERY_TASK_LOG = LoggerFactory.getLogger(RecoveryTask.class); - - /** - * Time to sleep between queue polling attempts. - */ + /** Time to sleep between queue polling attempts. */ private static final int POLL_INTERVAL_MS = 50; - /** - * The default value for {@link Config#TOPOLOGY_MESSAGE_TIMEOUT_SECS}. - */ - private static final int DEFAULT_MESSAGE_TIMEOUT_SECS = 30; - - /** - * Time to wait before queuing the first recovery task. - */ - private static final int RECOVERY_DELAY_MS = 10; - /** - * Used to safely recover failed JMS sessions across instances. - */ - private final Serializable recoveryMutex = "RECOVERY_MUTEX"; /** * The acknowledgment mode used for this instance. * * @see Session */ private int jmsAcknowledgeMode = Session.AUTO_ACKNOWLEDGE; - /** - * Indicates whether or not this spout should run as a singleton. 
- */ + + /** Sets up the way we want to handle the emit, ack and fails. */ + private transient MessageHandler messageHandler = new MessageHandler(); + + /** Indicates whether or not this spout should run as a singleton. */ private boolean distributed = true; - /** - * Used to generate tuples from incoming messages. - */ + + /** Used to generate tuples from incoming messages. */ private JmsTupleProducer tupleProducer; - /** - * Encapsulates jms related classes needed to communicate with the mq. - */ + + /** Encapsulates jms related classes needed to communicate with the mq. */ private JmsProvider jmsProvider; - /** - * Stores incoming messages for later sending. - */ - private LinkedBlockingQueue<Message> queue; - /** - * Contains all message ids of messages that were not yet acked. - */ - private TreeSet<JmsMessageID> toCommit; - /** - * Maps between message ids of not-yet acked messages, and the messages. - */ - private HashMap<JmsMessageID, Message> pendingMessages; - /** - * Counter of handled messages. - */ + + /** Counter of handled messages. */ private long messageSequence = 0; - /** - * The collector used to emit tuples. - */ + + /** The collector used to emit tuples. */ private SpoutOutputCollector collector; - /** - * Connection to the jms queue. - */ + + /** Connection to the jms queue. */ private transient Connection connection; - /** - * The active jms session. - */ + + /** The active jms session. */ private transient Session session; - /** - * Indicates whether or not a message failed to be processed. - */ - private boolean hasFailures = false; - /** - * Schedules recovery tasks periodically. - */ - private Timer recoveryTimer = null; /** - * Time to wait between recovery attempts. + * The message consumer. */ - private long recoveryPeriodMs = -1; // default to disabled + private MessageConsumer consumer; /** - * Translate the {@code int} value of an acknowledgment to a {@code String}. + * Sets the JMS Session acknowledgement mode for the JMS session. 
* - * @param deliveryMode the mode to translate. - * @return its {@code String} explanation (name). + * <p>Possible values: + * <ul> + * <li>javax.jms.Session.AUTO_ACKNOWLEDGE</li> + * <li>javax.jms.Session.CLIENT_ACKNOWLEDGE</li> + * <li>javax.jms.Session.DUPS_OK_ACKNOWLEDGE</li> + * </ul> * - * @see Session + * Any other vendor specific modes are not supported. + * + * @param mode JMS Session Acknowledgement mode */ - private static String toDeliveryModeString(int deliveryMode) { - switch (deliveryMode) { + public void setJmsAcknowledgeMode(final int mode) { + switch (mode) { case Session.AUTO_ACKNOWLEDGE: - return "AUTO_ACKNOWLEDGE"; - case Session.CLIENT_ACKNOWLEDGE: - return "CLIENT_ACKNOWLEDGE"; case Session.DUPS_OK_ACKNOWLEDGE: - return "DUPS_OK_ACKNOWLEDGE"; + messageHandler = new MessageHandler(); + break; + case Session.CLIENT_ACKNOWLEDGE: + messageHandler = new ClientAckHandler(); + break; + case Session.SESSION_TRANSACTED: + messageHandler = new TransactedSessionMessageHandler(); + break; default: - return "UNKNOWN"; - + LOG.warn("Unsupported Acknowledge mode: " --- End diff -- Yes. It's the only way to support provider extensions.
---