Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2639#discussion_r187496121
  
    --- Diff: external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsSpout.java ---
    @@ -18,164 +18,124 @@
     
     package org.apache.storm.jms.spout;
     
    -import java.io.Serializable;
    -import java.util.HashMap;
    -import java.util.Map;
    -import java.util.Timer;
    -import java.util.TimerTask;
    -import java.util.TreeSet;
    -import java.util.concurrent.LinkedBlockingQueue;
    -import java.util.concurrent.TimeUnit;
    -import javax.jms.Connection;
    -import javax.jms.ConnectionFactory;
    -import javax.jms.Destination;
    -import javax.jms.JMSException;
    -import javax.jms.Message;
    -import javax.jms.MessageConsumer;
    -import javax.jms.MessageListener;
    -import javax.jms.Session;
    -import org.apache.storm.Config;
     import org.apache.storm.jms.JmsProvider;
     import org.apache.storm.jms.JmsTupleProducer;
     import org.apache.storm.spout.SpoutOutputCollector;
     import org.apache.storm.task.TopologyContext;
     import org.apache.storm.topology.OutputFieldsDeclarer;
     import org.apache.storm.topology.base.BaseRichSpout;
     import org.apache.storm.tuple.Values;
    -import org.apache.storm.utils.Utils;
     import org.slf4j.Logger;
     import org.slf4j.LoggerFactory;
     
    +import javax.jms.Connection;
    +import javax.jms.ConnectionFactory;
    +import javax.jms.Destination;
    +import javax.jms.JMSException;
    +import javax.jms.Message;
    +import javax.jms.MessageConsumer;
    +import javax.jms.Session;
    +import java.util.HashMap;
    +import java.util.Map;
    +
     
     /**
    - * A Storm <code>Spout</code> implementation that listens to a JMS topic 
or queue and outputs tuples based on the messages it receives.
    + * A Storm <code>Spout</code> implementation that listens to a JMS topic or
    + * queue and outputs tuples based on the messages it receives.
      *
      * <p><code>JmsSpout</code> instances rely on <code>JmsProducer</code>
      * implementations to obtain the JMS
      * <code>ConnectionFactory</code> and <code>Destination</code> objects 
necessary
      * to connect to a JMS topic/queue.
      *
    - * <p>When a <code>JmsSpout</code> receives a JMS message, it delegates to 
an
    - * internal <code>JmsTupleProducer</code> instance to create a Storm tuple 
from the incoming message.
    + * <p>When a {@code JmsSpout} receives a JMS message, it delegates to an
    + * internal {@code JmsTupleProducer} instance to create a Storm tuple from
    + * the incoming message.
      *
      * <p>Typically, developers will supply a custom 
<code>JmsTupleProducer</code>
      * implementation appropriate for the expected message content.
      */
     @SuppressWarnings("serial")
    -public class JmsSpout extends BaseRichSpout implements MessageListener {
    +public class JmsSpout extends BaseRichSpout {
     
    -    /**
    -     * The logger object instance for this class.
    -     */
    +    /** The logger object instance for this class. */
         private static final Logger LOG = 
LoggerFactory.getLogger(JmsSpout.class);
     
    -    /**
    -     * The logger of the recovery task.
    -     */
    -    private static final Logger RECOVERY_TASK_LOG = 
LoggerFactory.getLogger(RecoveryTask.class);
    -
    -    /**
    -     * Time to sleep between queue polling attempts.
    -     */
    +    /** Time to sleep between queue polling attempts. */
         private static final int POLL_INTERVAL_MS = 50;
     
    -    /**
    -     * The default value for {@link Config#TOPOLOGY_MESSAGE_TIMEOUT_SECS}.
    -     */
    -    private static final int DEFAULT_MESSAGE_TIMEOUT_SECS = 30;
    -
    -    /**
    -     * Time to wait before queuing the first recovery task.
    -     */
    -    private static final int RECOVERY_DELAY_MS = 10;
    -    /**
    -     * Used to safely recover failed JMS sessions across instances.
    -     */
    -    private final Serializable recoveryMutex = "RECOVERY_MUTEX";
         /**
          * The acknowledgment mode used for this instance.
          *
          * @see Session
          */
         private int jmsAcknowledgeMode = Session.AUTO_ACKNOWLEDGE;
    -    /**
    -     * Indicates whether or not this spout should run as a singleton.
    -     */
    +
    +    /** Sets up the way we want to handle the emit, ack and fails. */
    +    private transient MessageHandler messageHandler = new MessageHandler();
    +
    +    /** Indicates whether or not this spout should run as a singleton. */
         private boolean distributed = true;
    -    /**
    -     * Used to generate tuples from incoming messages.
    -     */
    +
    +    /** Used to generate tuples from incoming messages. */
         private JmsTupleProducer tupleProducer;
    -    /**
    -     * Encapsulates jms related classes needed to communicate with the mq.
    -     */
    +
    +    /** Encapsulates jms related classes needed to communicate with the 
mq. */
         private JmsProvider jmsProvider;
    -    /**
    -     * Stores incoming messages for later sending.
    -     */
    -    private LinkedBlockingQueue<Message> queue;
    -    /**
    -     * Contains all message ids of messages that were not yet acked.
    -     */
    -    private TreeSet<JmsMessageID> toCommit;
    -    /**
    -     * Maps between message ids of not-yet acked messages, and the 
messages.
    -     */
    -    private HashMap<JmsMessageID, Message> pendingMessages;
    -    /**
    -     * Counter of handled messages.
    -     */
    +
    +    /** Counter of handled messages. */
         private long messageSequence = 0;
    -    /**
    -     * The collector used to emit tuples.
    -     */
    +
    +    /** The collector used to emit tuples. */
         private SpoutOutputCollector collector;
    -    /**
    -     * Connection to the jms queue.
    -     */
    +
    +    /** Connection to the jms queue. */
         private transient Connection connection;
    -    /**
    -     * The active jms session.
    -     */
    +
    +    /** The active jms session. */
         private transient Session session;
    -    /**
    -     * Indicates whether or not a message failed to be processed.
    -     */
    -    private boolean hasFailures = false;
    -    /**
    -     * Schedules recovery tasks periodically.
    -     */
    -    private Timer recoveryTimer = null;
     
         /**
    -     * Time to wait between recovery attempts.
    +     * The message consumer.
          */
    -    private long recoveryPeriodMs = -1; // default to disabled
    +    private MessageConsumer consumer;
     
         /**
    -     * Translate the {@code int} value of an acknowledgment to a {@code 
String}.
    +     * Sets the JMS Session acknowledgement mode for the JMS session.
          *
    -     * @param deliveryMode the mode to translate.
    -     * @return its {@code String} explanation (name).
    +     * <p>Possible values:
    +     * <ul>
    +     * <li>javax.jms.Session.AUTO_ACKNOWLEDGE</li>
    +     * <li>javax.jms.Session.CLIENT_ACKNOWLEDGE</li>
    +     * <li>javax.jms.Session.DUPS_OK_ACKNOWLEDGE</li>
    +     * </ul>
          *
    -     * @see Session
    +     * Any other vendor specific modes are not supported.
    +     *
    +     * @param mode JMS Session Acknowledgement mode
          */
    -    private static String toDeliveryModeString(int deliveryMode) {
    -        switch (deliveryMode) {
    +    public void setJmsAcknowledgeMode(final int mode) {
    +        switch (mode) {
                 case Session.AUTO_ACKNOWLEDGE:
    -                return "AUTO_ACKNOWLEDGE";
    -            case Session.CLIENT_ACKNOWLEDGE:
    -                return "CLIENT_ACKNOWLEDGE";
                 case Session.DUPS_OK_ACKNOWLEDGE:
    -                return "DUPS_OK_ACKNOWLEDGE";
    +                messageHandler = new MessageHandler();
    +                break;
    +            case Session.CLIENT_ACKNOWLEDGE:
    +                messageHandler = new ClientAckHandler();
    +                break;
    +            case Session.SESSION_TRANSACTED:
    +                messageHandler = new TransactedSessionMessageHandler();
    +                break;
                 default:
    -                return "UNKNOWN";
    -
    +                LOG.warn("Unsupported Acknowledge mode: "
    --- End diff --
    
    Yes. It's the only way to support provider extensions.


---

Reply via email to