Github user HeartSaVioR commented on a diff in the pull request:
https://github.com/apache/storm/pull/2639#discussion_r187486786
--- Diff:
external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsSpout.java ---
@@ -18,164 +18,124 @@
package org.apache.storm.jms.spout;
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.TreeSet;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.Session;
-import org.apache.storm.Config;
import org.apache.storm.jms.JmsProvider;
import org.apache.storm.jms.JmsTupleProducer;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import java.util.HashMap;
+import java.util.Map;
+
/**
- * A Storm <code>Spout</code> implementation that listens to a JMS topic
or queue and outputs tuples based on the messages it receives.
+ * A Storm <code>Spout</code> implementation that listens to a JMS topic or
+ * queue and outputs tuples based on the messages it receives.
*
* <p><code>JmsSpout</code> instances rely on <code>JmsProducer</code>
* implementations to obtain the JMS
* <code>ConnectionFactory</code> and <code>Destination</code> objects
necessary
* to connect to a JMS topic/queue.
*
- * <p>When a <code>JmsSpout</code> receives a JMS message, it delegates to
an
- * internal <code>JmsTupleProducer</code> instance to create a Storm tuple
from the incoming message.
+ * <p>When a {@code JmsSpout} receives a JMS message, it delegates to an
+ * internal {@code JmsTupleProducer} instance to create a Storm tuple from
+ * the incoming message.
*
* <p>Typically, developers will supply a custom
<code>JmsTupleProducer</code>
* implementation appropriate for the expected message content.
*/
@SuppressWarnings("serial")
-public class JmsSpout extends BaseRichSpout implements MessageListener {
+public class JmsSpout extends BaseRichSpout {
- /**
- * The logger object instance for this class.
- */
+ /** The logger object instance for this class. */
private static final Logger LOG =
LoggerFactory.getLogger(JmsSpout.class);
- /**
- * The logger of the recovery task.
- */
- private static final Logger RECOVERY_TASK_LOG =
LoggerFactory.getLogger(RecoveryTask.class);
-
- /**
- * Time to sleep between queue polling attempts.
- */
+ /** Time to sleep between queue polling attempts. */
private static final int POLL_INTERVAL_MS = 50;
- /**
- * The default value for {@link Config#TOPOLOGY_MESSAGE_TIMEOUT_SECS}.
- */
- private static final int DEFAULT_MESSAGE_TIMEOUT_SECS = 30;
-
- /**
- * Time to wait before queuing the first recovery task.
- */
- private static final int RECOVERY_DELAY_MS = 10;
- /**
- * Used to safely recover failed JMS sessions across instances.
- */
- private final Serializable recoveryMutex = "RECOVERY_MUTEX";
/**
* The acknowledgment mode used for this instance.
*
* @see Session
*/
private int jmsAcknowledgeMode = Session.AUTO_ACKNOWLEDGE;
- /**
- * Indicates whether or not this spout should run as a singleton.
- */
+
+ /** Sets up the way we want to handle the emit, ack and fails. */
+ private transient MessageHandler messageHandler = new MessageHandler();
+
+ /** Indicates whether or not this spout should run as a singleton. */
private boolean distributed = true;
- /**
- * Used to generate tuples from incoming messages.
- */
+
+ /** Used to generate tuples from incoming messages. */
private JmsTupleProducer tupleProducer;
- /**
- * Encapsulates jms related classes needed to communicate with the mq.
- */
+
+ /** Encapsulates jms related classes needed to communicate with the
mq. */
private JmsProvider jmsProvider;
- /**
- * Stores incoming messages for later sending.
- */
- private LinkedBlockingQueue<Message> queue;
- /**
- * Contains all message ids of messages that were not yet acked.
- */
- private TreeSet<JmsMessageID> toCommit;
- /**
- * Maps between message ids of not-yet acked messages, and the
messages.
- */
- private HashMap<JmsMessageID, Message> pendingMessages;
- /**
- * Counter of handled messages.
- */
+
+ /** Counter of handled messages. */
private long messageSequence = 0;
- /**
- * The collector used to emit tuples.
- */
+
+ /** The collector used to emit tuples. */
private SpoutOutputCollector collector;
- /**
- * Connection to the jms queue.
- */
+
+ /** Connection to the jms queue. */
private transient Connection connection;
- /**
- * The active jms session.
- */
+
+ /** The active jms session. */
private transient Session session;
- /**
- * Indicates whether or not a message failed to be processed.
- */
- private boolean hasFailures = false;
- /**
- * Schedules recovery tasks periodically.
- */
- private Timer recoveryTimer = null;
/**
- * Time to wait between recovery attempts.
+ * The message consumer.
*/
- private long recoveryPeriodMs = -1; // default to disabled
+ private MessageConsumer consumer;
/**
- * Translate the {@code int} value of an acknowledgment to a {@code
String}.
+ * Sets the JMS Session acknowledgement mode for the JMS session.
*
- * @param deliveryMode the mode to translate.
- * @return its {@code String} explanation (name).
+ * <p>Possible values:
+ * <ul>
+ * <li>javax.jms.Session.AUTO_ACKNOWLEDGE</li>
+ * <li>javax.jms.Session.CLIENT_ACKNOWLEDGE</li>
+ * <li>javax.jms.Session.DUPS_OK_ACKNOWLEDGE</li>
+ * </ul>
*
- * @see Session
+ * Any other vendor specific modes are not supported.
+ *
+ * @param mode JMS Session Acknowledgement mode
*/
- private static String toDeliveryModeString(int deliveryMode) {
- switch (deliveryMode) {
+ public void setJmsAcknowledgeMode(final int mode) {
+ switch (mode) {
case Session.AUTO_ACKNOWLEDGE:
- return "AUTO_ACKNOWLEDGE";
- case Session.CLIENT_ACKNOWLEDGE:
- return "CLIENT_ACKNOWLEDGE";
case Session.DUPS_OK_ACKNOWLEDGE:
- return "DUPS_OK_ACKNOWLEDGE";
+ messageHandler = new MessageHandler();
+ break;
+ case Session.CLIENT_ACKNOWLEDGE:
+ messageHandler = new ClientAckHandler();
+ break;
+ case Session.SESSION_TRANSACTED:
+ messageHandler = new TransactedSessionMessageHandler();
+ break;
default:
- return "UNKNOWN";
-
+ LOG.warn("Unsupported Acknowledge mode: "
--- End diff --
So we are allowing non-standard JMS modes to be passed as the parameter,
right? I agree this is needed to support specific JMS implementations which
**extend** the session acknowledge modes.
---