http://git-wip-us.apache.org/repos/asf/storm/blob/d152d72f/src/main/java/org/apache/storm/jms/JmsTupleProducer.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/storm/jms/JmsTupleProducer.java b/src/main/java/org/apache/storm/jms/JmsTupleProducer.java new file mode 100644 index 0000000..96b027d --- /dev/null +++ b/src/main/java/org/apache/storm/jms/JmsTupleProducer.java @@ -0,0 +1,41 @@ +package org.apache.storm.jms; + +import java.io.Serializable; + +import javax.jms.JMSException; +import javax.jms.Message; + +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.tuple.Values; + +/** + * Interface to define classes that can produce a Storm <code>Values</code> objects + * from a <code>javax.jms.Message</code> object>. + * <p/> + * Implementations are also responsible for declaring the output + * fields they produce. + * <p/> + * If for some reason the implementation can't process a message + * (for example if it received a <code>javax.jms.ObjectMessage</code> + * when it was expecting a <code>javax.jms.TextMessage</code> it should + * return <code>null</code> to indicate to the <code>JmsSpout</code> that + * the message could not be processed. + * + * @author P. Taylor Goetz + * + */ +public interface JmsTupleProducer extends Serializable{ + /** + * Process a JMS message object to create a Values object. + * @param msg - the JMS message + * @return the Values tuple, or null if the message couldn't be processed. + * @throws JMSException + */ + Values toTuple(Message msg) throws JMSException; + + /** + * Declare the output fields produced by this JmsTupleProducer. + * @param declarer The OuputFieldsDeclarer for the spout. + */ + void declareOutputFields(OutputFieldsDeclarer declarer); +}
http://git-wip-us.apache.org/repos/asf/storm/blob/d152d72f/src/main/java/org/apache/storm/jms/bolt/JmsBolt.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/storm/jms/bolt/JmsBolt.java b/src/main/java/org/apache/storm/jms/bolt/JmsBolt.java new file mode 100644 index 0000000..896f932 --- /dev/null +++ b/src/main/java/org/apache/storm/jms/bolt/JmsBolt.java @@ -0,0 +1,201 @@ +package org.apache.storm.jms.bolt; + +import java.util.Map; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageProducer; +import javax.jms.Session; + +import backtype.storm.topology.base.BaseRichBolt; +import org.apache.storm.jms.JmsMessageProducer; +import org.apache.storm.jms.JmsProvider; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import backtype.storm.task.OutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.tuple.Tuple; + +/** + * A JmsBolt receives <code>backtype.storm.tuple.Tuple</code> objects from a Storm + * topology and publishes JMS Messages to a destination (topic or queue). + * <p/> + * To use a JmsBolt in a topology, the following must be supplied: + * <ol> + * <li>A <code>JmsProvider</code> implementation.</li> + * <li>A <code>JmsMessageProducer</code> implementation.</li> + * </ol> + * The <code>JmsProvider</code> provides the JMS <code>javax.jms.ConnectionFactory</code> + * and <code>javax.jms.Destination</code> objects requied to publish JMS messages. + * <p/> + * The JmsBolt uses a <code>JmsMessageProducer</code> to translate + * <code>backtype.storm.tuple.Tuple</code> objects into + * <code>javax.jms.Message</code> objects for publishing. + * <p/> + * Both JmsProvider and JmsMessageProducer must be set, or the bolt will + * fail upon deployment to a cluster. + * <p/> + * The JmsBolt is typically an endpoint in a topology -- in other words + * it does not emit any tuples. + * + * + * @author P. Taylor Goetz + * + */ +public class JmsBolt extends BaseRichBolt { + private static Logger LOG = LoggerFactory.getLogger(JmsBolt.class); + + private boolean autoAck = true; + + // javax.jms objects + private Connection connection; + private Session session; + private MessageProducer messageProducer; + + // JMS options + private boolean jmsTransactional = false; + private int jmsAcknowledgeMode = Session.AUTO_ACKNOWLEDGE; + + + private JmsProvider jmsProvider; + private JmsMessageProducer producer; + + + private OutputCollector collector; + + /** + * Set the JmsProvider used to connect to the JMS destination topic/queue + * @param provider + */ + public void setJmsProvider(JmsProvider provider){ + this.jmsProvider = provider; + } + + /** + * Set the JmsMessageProducer used to convert tuples + * into JMS messages. + * + * @param producer + */ + public void setJmsMessageProducer(JmsMessageProducer producer){ + this.producer = producer; + } + + /** + * Sets the JMS acknowledgement mode for JMS messages sent + * by this bolt. + * <p/> + * Possible values: + * <ul> + * <li>javax.jms.Session.AUTO_ACKNOWLEDGE</li> + * <li>javax.jms.Session.CLIENT_ACKNOWLEDGE</li> + * <li>javax.jms.Session.DUPS_OK_ACKNOWLEDGE</li> + * </ul> + * @param acknowledgeMode (constant defined in javax.jms.Session) + */ + public void setJmsAcknowledgeMode(int acknowledgeMode){ + this.jmsAcknowledgeMode = acknowledgeMode; + } + + /** + * Set the JMS transactional setting for the JMS session. + * + * @param transactional + */ +// public void setJmsTransactional(boolean transactional){ +// this.jmsTransactional = transactional; +// } + + /** + * Sets whether or not tuples should be acknowledged by this + * bolt. + * <p/> + * @param autoAck + */ + public void setAutoAck(boolean autoAck){ + this.autoAck = autoAck; + } + + + /** + * Consumes a tuple and sends a JMS message. + * <p/> + * If autoAck is true, the tuple will be acknowledged + * after the message is sent. + * <p/> + * If JMS sending fails, the tuple will be failed. + */ + @Override + public void execute(Tuple input) { + // write the tuple to a JMS destination... + LOG.debug("Tuple received. Sending JMS message."); + + try { + Message msg = this.producer.toMessage(this.session, input); + if(msg != null){ + if (msg.getJMSDestination() != null) { + this.messageProducer.send(msg.getJMSDestination(), msg); + } else { + this.messageProducer.send(msg); + } + } + if(this.autoAck){ + LOG.debug("ACKing tuple: " + input); + this.collector.ack(input); + } + } catch (JMSException e) { + // failed to send the JMS message, fail the tuple fast + LOG.warn("Failing tuple: " + input); + LOG.warn("Exception: ", e); + this.collector.fail(input); + } + } + + /** + * Releases JMS resources. + */ + @Override + public void cleanup() { + try { + LOG.debug("Closing JMS connection."); + this.session.close(); + this.connection.close(); + } catch (JMSException e) { + LOG.warn("Error closing JMS connection.", e); + } + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + } + + /** + * Initializes JMS resources. + */ + @Override + public void prepare(Map stormConf, TopologyContext context, + OutputCollector collector) { + if(this.jmsProvider == null || this.producer == null){ + throw new IllegalStateException("JMS Provider and MessageProducer not set."); + } + this.collector = collector; + LOG.debug("Connecting JMS.."); + try { + ConnectionFactory cf = this.jmsProvider.connectionFactory(); + Destination dest = this.jmsProvider.destination(); + this.connection = cf.createConnection(); + this.session = connection.createSession(this.jmsTransactional, + this.jmsAcknowledgeMode); + this.messageProducer = session.createProducer(dest); + + connection.start(); + } catch (Exception e) { + LOG.warn("Error creating JMS connection.", e); + } + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/d152d72f/src/main/java/org/apache/storm/jms/spout/JmsMessageID.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/storm/jms/spout/JmsMessageID.java b/src/main/java/org/apache/storm/jms/spout/JmsMessageID.java new file mode 100644 index 0000000..225e525 --- /dev/null +++ b/src/main/java/org/apache/storm/jms/spout/JmsMessageID.java @@ -0,0 +1,53 @@ +package org.apache.storm.jms.spout; + +import java.io.Serializable; + +/** + * Created by tgoetz on 7/14/14. + */ +public class JmsMessageID implements Comparable<JmsMessageID>, Serializable { + + private String jmsID; + + private Long sequence; + +// private Message message; + + public JmsMessageID(long sequence, String jmsID){ + this.jmsID = jmsID; + this.sequence = sequence; + } + +// public void setMessage(Message message){ +// this.message = message; +// } +// +// public Message getMessage(){ +// return this.message; +// } + + public String getJmsID(){ + return this.jmsID; + } + + @Override + public int compareTo(JmsMessageID jmsMessageID) { + return (int)(this.sequence - jmsMessageID.sequence); + } + + @Override + public int hashCode() { + return this.sequence.hashCode(); + } + + @Override + public boolean equals(Object o) { + if(o instanceof JmsMessageID){ + JmsMessageID id = (JmsMessageID)o; + return this.jmsID.equals(id.jmsID); + } else { + return false; + } + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/d152d72f/src/main/java/org/apache/storm/jms/spout/JmsSpout.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/storm/jms/spout/JmsSpout.java b/src/main/java/org/apache/storm/jms/spout/JmsSpout.java new file mode 100644 index 0000000..03ab77b --- /dev/null +++ b/src/main/java/org/apache/storm/jms/spout/JmsSpout.java @@ -0,0 +1,366 @@ +package org.apache.storm.jms.spout; + +import java.io.Serializable; +import java.util.*; +import java.util.concurrent.LinkedBlockingQueue; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.Session; + +import backtype.storm.topology.base.BaseRichSpout; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.storm.jms.JmsProvider; +import org.apache.storm.jms.JmsTupleProducer; +import backtype.storm.spout.SpoutOutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.tuple.Values; +import backtype.storm.utils.Utils; + +/** + * A Storm <code>Spout</code> implementation that listens to a JMS topic or queue + * and outputs tuples based on the messages it receives. + * <p/> + * <code>JmsSpout</code> instances rely on <code>JmsProducer</code> implementations + * to obtain the JMS <code>ConnectionFactory</code> and <code>Destination</code> objects + * necessary to connect to a JMS topic/queue. + * <p/> + * When a <code>JmsSpout</code> receives a JMS message, it delegates to an + * internal <code>JmsTupleProducer</code> instance to create a Storm tuple from the + * incoming message. + * <p/> + * Typically, developers will supply a custom <code>JmsTupleProducer</code> implementation + * appropriate for the expected message content. + * + * @author P. Taylor Goetz + * + */ +@SuppressWarnings("serial") +public class JmsSpout extends BaseRichSpout implements MessageListener { + private static final Logger LOG = LoggerFactory.getLogger(JmsSpout.class); + + // JMS options + private int jmsAcknowledgeMode = Session.AUTO_ACKNOWLEDGE; + + private boolean distributed = true; + + private JmsTupleProducer tupleProducer; + + private JmsProvider jmsProvider; + + private LinkedBlockingQueue<Message> queue; + private TreeSet<JmsMessageID> toCommit; + private HashMap<JmsMessageID, Message> pendingMessages; + private long messageSequence = 0; + + private SpoutOutputCollector collector; + + private transient Connection connection; + private transient Session session; + + private boolean hasFailures = false; + public final Serializable recoveryMutex = "RECOVERY_MUTEX"; + private Timer recoveryTimer = null; + private long recoveryPeriod = -1; // default to disabled + + /** + * Sets the JMS Session acknowledgement mode for the JMS seesion associated with this spout. + * <p/> + * Possible values: + * <ul> + * <li>javax.jms.Session.AUTO_ACKNOWLEDGE</li> + * <li>javax.jms.Session.CLIENT_ACKNOWLEDGE</li> + * <li>javax.jms.Session.DUPS_OK_ACKNOWLEDGE</li> + * </ul> + * @param mode JMS Session Acknowledgement mode + * @throws IllegalArgumentException if the mode is not recognized. + */ + public void setJmsAcknowledgeMode(int mode){ + switch (mode) { + case Session.AUTO_ACKNOWLEDGE: + case Session.CLIENT_ACKNOWLEDGE: + case Session.DUPS_OK_ACKNOWLEDGE: + break; + default: + throw new IllegalArgumentException("Unknown Acknowledge mode: " + mode + " (See javax.jms.Session for valid values)"); + + } + this.jmsAcknowledgeMode = mode; + } + + /** + * Returns the JMS Session acknowledgement mode for the JMS seesion associated with this spout. + * @return + */ + public int getJmsAcknowledgeMode(){ + return this.jmsAcknowledgeMode; + } + + /** + * Set the <code>JmsProvider</code> + * implementation that this Spout will use to connect to + * a JMS <code>javax.jms.Desination</code> + * + * @param provider + */ + public void setJmsProvider(JmsProvider provider){ + this.jmsProvider = provider; + } + /** + * Set the <code>JmsTupleProducer</code> + * implementation that will convert <code>javax.jms.Message</code> + * object to <code>backtype.storm.tuple.Values</code> objects + * to be emitted. + * + * @param producer + */ + public void setJmsTupleProducer(JmsTupleProducer producer){ + this.tupleProducer = producer; + } + + /** + * <code>javax.jms.MessageListener</code> implementation. + * <p/> + * Stored the JMS message in an internal queue for processing + * by the <code>nextTuple()</code> method. + */ + public void onMessage(Message msg) { + try { + LOG.debug("Queuing msg [" + msg.getJMSMessageID() + "]"); + } catch (JMSException e) { + } + this.queue.offer(msg); + } + + /** + * <code>ISpout</code> implementation. + * <p/> + * Connects the JMS spout to the configured JMS destination + * topic/queue. + * + */ + @SuppressWarnings("rawtypes") + public void open(Map conf, TopologyContext context, + SpoutOutputCollector collector) { + if(this.jmsProvider == null){ + throw new IllegalStateException("JMS provider has not been set."); + } + if(this.tupleProducer == null){ + throw new IllegalStateException("JMS Tuple Producer has not been set."); + } + Integer topologyTimeout = (Integer)conf.get("topology.message.timeout.secs"); + // TODO fine a way to get the default timeout from storm, so we're not hard-coding to 30 seconds (it could change) + topologyTimeout = topologyTimeout == null ? 30 : topologyTimeout; + if( (topologyTimeout.intValue() * 1000 )> this.recoveryPeriod){ + LOG.warn("*** WARNING *** : " + + "Recovery period ("+ this.recoveryPeriod + " ms.) is less then the configured " + + "'topology.message.timeout.secs' of " + topologyTimeout + + " secs. This could lead to a message replay flood!"); + } + this.queue = new LinkedBlockingQueue<Message>(); + this.toCommit = new TreeSet<JmsMessageID>(); + this.pendingMessages = new HashMap<JmsMessageID, Message>(); + this.collector = collector; + try { + ConnectionFactory cf = this.jmsProvider.connectionFactory(); + Destination dest = this.jmsProvider.destination(); + this.connection = cf.createConnection(); + this.session = connection.createSession(false, + this.jmsAcknowledgeMode); + MessageConsumer consumer = session.createConsumer(dest); + consumer.setMessageListener(this); + this.connection.start(); + if (this.isDurableSubscription() && this.recoveryPeriod > 0){ + this.recoveryTimer = new Timer(); + this.recoveryTimer.scheduleAtFixedRate(new RecoveryTask(), 10, this.recoveryPeriod); + } + + } catch (Exception e) { + LOG.warn("Error creating JMS connection.", e); + } + + } + + public void close() { + try { + LOG.debug("Closing JMS connection."); + this.session.close(); + this.connection.close(); + } catch (JMSException e) { + LOG.warn("Error closing JMS connection.", e); + } + + } + + public void nextTuple() { + Message msg = this.queue.poll(); + if (msg == null) { + Utils.sleep(50); + } else { + + LOG.debug("sending tuple: " + msg); + // get the tuple from the handler + try { + Values vals = this.tupleProducer.toTuple(msg); + // ack if we're not in AUTO_ACKNOWLEDGE mode, or the message requests ACKNOWLEDGE + LOG.debug("Requested deliveryMode: " + toDeliveryModeString(msg.getJMSDeliveryMode())); + LOG.debug("Our deliveryMode: " + toDeliveryModeString(this.jmsAcknowledgeMode)); + if (this.isDurableSubscription()) { + LOG.debug("Requesting acks."); + JmsMessageID messageId = new JmsMessageID(this.messageSequence++, msg.getJMSMessageID()); + this.collector.emit(vals, messageId); + + // at this point we successfully emitted. Store + // the message and message ID so we can do a + // JMS acknowledge later + this.pendingMessages.put(messageId, msg); + this.toCommit.add(messageId); + } else { + this.collector.emit(vals); + } + } catch (JMSException e) { + LOG.warn("Unable to convert JMS message: " + msg); + } + + } + + } + + /* + * Will only be called if we're transactional or not AUTO_ACKNOWLEDGE + */ + public void ack(Object msgId) { + + Message msg = this.pendingMessages.remove(msgId); + JmsMessageID oldest = this.toCommit.first(); + if(msgId.equals(oldest)) { + if (msg != null) { + try { + LOG.debug("Committing..."); + msg.acknowledge(); + LOG.debug("JMS Message acked: " + msgId); + this.toCommit.remove(msgId); + } catch (JMSException e) { + LOG.warn("Error acknowldging JMS message: " + msgId, e); + } + } else { + LOG.warn("Couldn't acknowledge unknown JMS message ID: " + msgId); + } + } else { + this.toCommit.remove(msgId); + } + + } + + /* + * Will only be called if we're transactional or not AUTO_ACKNOWLEDGE + */ + public void fail(Object msgId) { + LOG.warn("Message failed: " + msgId); + this.pendingMessages.clear(); + this.toCommit.clear(); + synchronized(this.recoveryMutex){ + this.hasFailures = true; + } + } + + public void declareOutputFields(OutputFieldsDeclarer declarer) { + this.tupleProducer.declareOutputFields(declarer); + + } + + /** + * Returns <code>true</code> if the spout has received failures + * from which it has not yet recovered. + */ + public boolean hasFailures(){ + return this.hasFailures; + } + + protected void recovered(){ + this.hasFailures = false; + } + + /** + * Sets the periodicity of the timer task that + * checks for failures and recovers the JMS session. + * + * @param period + */ + public void setRecoveryPeriod(long period){ + this.recoveryPeriod = period; + } + + public boolean isDistributed() { + return this.distributed; + } + + /** + * Sets the "distributed" mode of this spout. + * <p/> + * If <code>true</code> multiple instances of this spout <i>may</i> be + * created across the cluster (depending on the "parallelism_hint" in the topology configuration). + * <p/> + * Setting this value to <code>false</code> essentially means this spout will run as a singleton + * within the cluster ("parallelism_hint" will be ignored). + * <p/> + * In general, this should be set to <code>false</code> if the underlying JMS destination is a + * topic, and <code>true</code> if it is a JMS queue. + * + * @param distributed + */ + public void setDistributed(boolean distributed){ + this.distributed = distributed; + } + + + private static final String toDeliveryModeString(int deliveryMode) { + switch (deliveryMode) { + case Session.AUTO_ACKNOWLEDGE: + return "AUTO_ACKNOWLEDGE"; + case Session.CLIENT_ACKNOWLEDGE: + return "CLIENT_ACKNOWLEDGE"; + case Session.DUPS_OK_ACKNOWLEDGE: + return "DUPS_OK_ACKNOWLEDGE"; + default: + return "UNKNOWN"; + + } + } + + protected Session getSession(){ + return this.session; + } + + private boolean isDurableSubscription(){ + return (this.jmsAcknowledgeMode != Session.AUTO_ACKNOWLEDGE); + } + + + private class RecoveryTask extends TimerTask { + private final Logger LOG = LoggerFactory.getLogger(RecoveryTask.class); + + public void run() { + synchronized (JmsSpout.this.recoveryMutex) { + if (JmsSpout.this.hasFailures()) { + try { + LOG.info("Recovering from a message failure."); + JmsSpout.this.getSession().recover(); + JmsSpout.this.recovered(); + } catch (JMSException e) { + LOG.warn("Could not recover jms session.", e); + } + } + } + } + + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/d152d72f/src/main/java/org/apache/storm/jms/trident/JmsBatch.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/storm/jms/trident/JmsBatch.java b/src/main/java/org/apache/storm/jms/trident/JmsBatch.java new file mode 100644 index 0000000..3a7981f --- /dev/null +++ b/src/main/java/org/apache/storm/jms/trident/JmsBatch.java @@ -0,0 +1,10 @@ +package org.apache.storm.jms.trident; + +/** + * Batch coordination metadata object for the TridentJmsSpout. + * This implementation does not use batch metadata, so the object is empty. + * + */ +public class JmsBatch { + // Empty class +} http://git-wip-us.apache.org/repos/asf/storm/blob/d152d72f/src/main/java/org/apache/storm/jms/trident/JmsState.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/storm/jms/trident/JmsState.java b/src/main/java/org/apache/storm/jms/trident/JmsState.java new file mode 100644 index 0000000..0694098 --- /dev/null +++ b/src/main/java/org/apache/storm/jms/trident/JmsState.java @@ -0,0 +1,128 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.jms.trident; + +import org.apache.storm.jms.JmsProvider; +import backtype.storm.topology.FailedException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import storm.trident.operation.TridentCollector; +import storm.trident.state.State; +import storm.trident.tuple.TridentTuple; + +import javax.jms.*; +import java.io.Serializable; +import java.lang.IllegalStateException; +import java.util.List; + +public class JmsState implements State { + + private static final Logger LOG = LoggerFactory.getLogger(JmsState.class); + + private Options options; + private Connection connection; + private Session session; + private MessageProducer messageProducer; + + protected JmsState(Options options) { + this.options = options; + } + + public static class Options implements Serializable { + private JmsProvider jmsProvider; + private TridentJmsMessageProducer msgProducer; + private int jmsAcknowledgeMode = Session.AUTO_ACKNOWLEDGE; + private boolean jmsTransactional = true; + + public Options withJmsProvider(JmsProvider provider) { + this.jmsProvider = provider; + return this; + } + + public Options withMessageProducer(TridentJmsMessageProducer msgProducer) { + this.msgProducer = msgProducer; + return this; + } + + public Options withJmsAcknowledgeMode(int jmsAcknowledgeMode) { + this.jmsAcknowledgeMode = jmsAcknowledgeMode; + return this; + } + + public Options withJmsTransactional(boolean jmsTransactional) { + this.jmsTransactional = jmsTransactional; + return this; + } + } + + protected void prepare() { + if(this.options.jmsProvider == null || this.options.msgProducer == null){ + throw new IllegalStateException("JMS Provider and MessageProducer not set."); + } + LOG.debug("Connecting JMS.."); + try { + ConnectionFactory cf = this.options.jmsProvider.connectionFactory(); + Destination dest = this.options.jmsProvider.destination(); + this.connection = cf.createConnection(); + this.session = connection.createSession(this.options.jmsTransactional, + this.options.jmsAcknowledgeMode); + this.messageProducer = session.createProducer(dest); + + connection.start(); + } catch (Exception e) { + LOG.warn("Error creating JMS connection.", e); + } + } + + @Override + public void beginCommit(Long aLong) { + } + + @Override + public void commit(Long aLong) { + LOG.debug("Committing JMS transaction."); + if(this.options.jmsTransactional) { + try { + session.commit(); + } catch(JMSException e){ + LOG.error("JMS Session commit failed.", e); + } + } + } + + public void updateState(List<TridentTuple> tuples, TridentCollector collector) throws JMSException { + try { + for(TridentTuple tuple : tuples) { + Message msg = this.options.msgProducer.toMessage(this.session, tuple); + if (msg != null) { + if (msg.getJMSDestination() != null) { + this.messageProducer.send(msg.getJMSDestination(), msg); + } else { + this.messageProducer.send(msg); + } + } + } + } catch (JMSException e) { + LOG.warn("Failed to send jmd message for a trident batch ", e); + if(this.options.jmsTransactional) { + session.rollback(); + } + throw new FailedException("Failed to write tuples", e); + } + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/d152d72f/src/main/java/org/apache/storm/jms/trident/JmsStateFactory.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/storm/jms/trident/JmsStateFactory.java b/src/main/java/org/apache/storm/jms/trident/JmsStateFactory.java new file mode 100644 index 0000000..7fe4d85 --- /dev/null +++ b/src/main/java/org/apache/storm/jms/trident/JmsStateFactory.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.jms.trident; + +import backtype.storm.task.IMetricsContext; +import storm.trident.state.State; +import storm.trident.state.StateFactory; + +import java.util.Map; + +public class JmsStateFactory implements StateFactory { + + private JmsState.Options options; + + public JmsStateFactory(JmsState.Options options) { + this.options = options; + } + + @Override + public State makeState(Map map, IMetricsContext iMetricsContext, int partitionIndex, int numPartitions) { + JmsState state = new JmsState(options); + state.prepare(); + return state; + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/d152d72f/src/main/java/org/apache/storm/jms/trident/JmsUpdater.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/storm/jms/trident/JmsUpdater.java b/src/main/java/org/apache/storm/jms/trident/JmsUpdater.java new file mode 100644 index 0000000..3269521 --- /dev/null +++ b/src/main/java/org/apache/storm/jms/trident/JmsUpdater.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.jms.trident; + +import backtype.storm.topology.FailedException; +import storm.trident.operation.TridentCollector; +import storm.trident.state.BaseStateUpdater; +import storm.trident.tuple.TridentTuple; + +import javax.jms.JMSException; +import java.util.List; + +public class JmsUpdater extends BaseStateUpdater<JmsState> { + + @Override + public void updateState(JmsState jmsState, List<TridentTuple> tuples, TridentCollector collector) { + try { + jmsState.updateState(tuples, collector); + } catch (JMSException e) { + throw new FailedException("failed JMS opetation", e); + } + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/d152d72f/src/main/java/org/apache/storm/jms/trident/TridentJmsMessageProducer.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/storm/jms/trident/TridentJmsMessageProducer.java b/src/main/java/org/apache/storm/jms/trident/TridentJmsMessageProducer.java new file mode 100644 index 0000000..8312947 --- /dev/null +++ b/src/main/java/org/apache/storm/jms/trident/TridentJmsMessageProducer.java @@ -0,0 +1,23 @@ +package org.apache.storm.jms.trident; + +import backtype.storm.tuple.Tuple; +import storm.trident.tuple.TridentTuple; + +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.Session; +import java.io.Serializable; + +public interface TridentJmsMessageProducer extends Serializable{ + + /** + * Translate a <code>backtype.storm.tuple.TridentTuple</code> object + * to a <code>javax.jms.Message</code object. + * + * @param session + * @param input + * @return + * @throws javax.jms.JMSException + */ + public Message toMessage(Session session, TridentTuple input) throws JMSException; +} http://git-wip-us.apache.org/repos/asf/storm/blob/d152d72f/src/main/java/org/apache/storm/jms/trident/TridentJmsSpout.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/storm/jms/trident/TridentJmsSpout.java b/src/main/java/org/apache/storm/jms/trident/TridentJmsSpout.java new file mode 100644 index 0000000..6b80919 --- /dev/null +++ b/src/main/java/org/apache/storm/jms/trident/TridentJmsSpout.java @@ -0,0 +1,393 @@ +package org.apache.storm.jms.trident; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.LinkedBlockingQueue; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.Session; + +import org.apache.storm.jms.JmsProvider; +import org.apache.storm.jms.JmsTupleProducer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import storm.trident.operation.TridentCollector; +import storm.trident.spout.ITridentSpout; +import storm.trident.topology.TransactionAttempt; +import backtype.storm.Config; +import backtype.storm.generated.StreamInfo; +import backtype.storm.task.TopologyContext; +import backtype.storm.topology.OutputFieldsGetter; +import backtype.storm.tuple.Fields; +import backtype.storm.tuple.Values; +import backtype.storm.utils.RotatingMap; +import backtype.storm.utils.Utils; + +/** + * Trident implementation of the JmsSpout, based on code provided by P. Taylor Goetz - https://github.com/ptgoetz + * <p> + * @author Andy Toone for Metabroadcast + * + */ +public class TridentJmsSpout implements ITridentSpout<JmsBatch> { + + public static final String MAX_BATCH_SIZE_CONF = "topology.spout.max.batch.size"; + + public static final int DEFAULT_BATCH_SIZE = 1000; + + private static final long serialVersionUID = -3469351154693356655L; + + private JmsTupleProducer tupleProducer; + + private JmsProvider jmsProvider; + + private int jmsAcknowledgeMode; + + private String name; + + private static int nameIndex = 1; + + /** + * Create a TridentJmsSpout with a default name and acknowledge mode of AUTO_ACKNOWLEDGE + */ + public TridentJmsSpout() { + this.name = "JmsSpout_"+(nameIndex++); + this.jmsAcknowledgeMode = Session.AUTO_ACKNOWLEDGE; + } + + /** + * Set the name for this spout, to improve log identification + * @param name The name to be used in log messages + * @return This spout + */ + public TridentJmsSpout named(String name) { + this.name = name; + return this; + } + + /** + * Set the <code>JmsProvider</code> + * implementation that this Spout will use to connect to + * a JMS <code>javax.jms.Desination</code> + * + * @param provider + */ + public TridentJmsSpout withJmsProvider(JmsProvider provider){ + this.jmsProvider = provider; + return this; + } + + /** + * Set the <code>JmsTupleProducer</code> + * implementation that will convert <code>javax.jms.Message</code> + * object to <code>backtype.storm.tuple.Values</code> objects + * to be emitted. + * + * @param tupleProducer + * @return This spout + */ + public TridentJmsSpout withTupleProducer(JmsTupleProducer tupleProducer) { + this.tupleProducer = tupleProducer; + return this; + } + + /** + * Set the JMS acknowledge mode for messages being processed by this spout. + * <p/> + * Possible values: + * <ul> + * <li>javax.jms.Session.AUTO_ACKNOWLEDGE</li> + * <li>javax.jms.Session.CLIENT_ACKNOWLEDGE</li> + * <li>javax.jms.Session.DUPS_OK_ACKNOWLEDGE</li> + * </ul> + * @param jmsAcknowledgeMode The chosen acknowledge mode + * @return This spout + * @throws IllegalArgumentException if the mode is not recognized + */ + public TridentJmsSpout withJmsAcknowledgeMode(int jmsAcknowledgeMode) { + toDeliveryModeString(jmsAcknowledgeMode); + this.jmsAcknowledgeMode = jmsAcknowledgeMode; + return this; + } + + /** + * Return a friendly string for the given JMS acknowledge mode, or throw an IllegalArgumentException if + * the mode is not recognized. + * <p/> + * Possible values: + * <ul> + * <li>javax.jms.Session.AUTO_ACKNOWLEDGE</li> + * <li>javax.jms.Session.CLIENT_ACKNOWLEDGE</li> + * <li>javax.jms.Session.DUPS_OK_ACKNOWLEDGE</li> + * </ul> + * @param acknowledgeMode A valid JMS acknowledge mode + * @return A friendly string describing the acknowledge mode + * @throws IllegalArgumentException if the mode is not recognized + */ + private static final String toDeliveryModeString(int acknowledgeMode) { + switch (acknowledgeMode) { + case Session.AUTO_ACKNOWLEDGE: + return "AUTO_ACKNOWLEDGE"; + case Session.CLIENT_ACKNOWLEDGE: + return "CLIENT_ACKNOWLEDGE"; + case Session.DUPS_OK_ACKNOWLEDGE: + return "DUPS_OK_ACKNOWLEDGE"; + default: + throw new IllegalArgumentException("Unknown JMS Acknowledge mode " + acknowledgeMode + " (See javax.jms.Session for valid values)"); + } + } + + @Override + public storm.trident.spout.ITridentSpout.BatchCoordinator<JmsBatch> getCoordinator( + String txStateId, @SuppressWarnings("rawtypes") Map conf, TopologyContext context) { + return new JmsBatchCoordinator(name); + } + + @Override + public Emitter<JmsBatch> getEmitter(String txStateId, @SuppressWarnings("rawtypes") Map conf, TopologyContext context) { + return new JmsEmitter(name, jmsProvider, tupleProducer, jmsAcknowledgeMode, conf); + } + + @Override + public Map<String, Object> getComponentConfiguration() { + return null; + } + + @Override + public Fields getOutputFields() { + OutputFieldsGetter fieldGetter = new OutputFieldsGetter(); + tupleProducer.declareOutputFields(fieldGetter); + StreamInfo streamInfo = fieldGetter.getFieldsDeclaration().get(Utils.DEFAULT_STREAM_ID); + if (streamInfo == null) { + throw new IllegalArgumentException("Jms Tuple producer has not declared output fields for the default stream"); + } + + return new Fields(streamInfo.get_output_fields()); + } + + /** + * The JmsEmitter class listens for incoming messages and stores them in a blocking queue. On each invocation of emit, + * the queued messages are emitted as a batch. + * + */ + private class JmsEmitter implements Emitter<JmsBatch>, MessageListener { + + private final LinkedBlockingQueue<Message> queue; + private final Connection connection; + private final Session session; + + private final RotatingMap<Long, List<Message>> batchMessageMap; // Maps transaction Ids to JMS message ids. + + private final long rotateTimeMillis; + private final int maxBatchSize; + private final String name; + + private long lastRotate; + + private final Logger LOG = LoggerFactory.getLogger(JmsEmitter.class); + + public JmsEmitter(String name, JmsProvider jmsProvider, JmsTupleProducer tupleProducer, int jmsAcknowledgeMode, @SuppressWarnings("rawtypes") Map conf) { + if (jmsProvider == null) { + throw new IllegalStateException("JMS provider has not been set."); + } + if (tupleProducer == null) { + throw new IllegalStateException("JMS Tuple Producer has not been set."); + } + + this.queue = new LinkedBlockingQueue<Message>(); + this.name = name; + + batchMessageMap = new RotatingMap<Long, List<Message>>(3); + rotateTimeMillis = 1000L * ((Number)conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)).intValue(); + lastRotate = System.currentTimeMillis(); + + Number batchSize = (Number) conf.get(MAX_BATCH_SIZE_CONF); + maxBatchSize = batchSize != null ? batchSize.intValue() : DEFAULT_BATCH_SIZE; + + try { + ConnectionFactory cf = jmsProvider.connectionFactory(); + Destination dest = jmsProvider.destination(); + this.connection = cf.createConnection(); + this.session = connection.createSession(false, jmsAcknowledgeMode); + MessageConsumer consumer = session.createConsumer(dest); + consumer.setMessageListener(this); + this.connection.start(); + + LOG.info("Created JmsEmitter with max batch size "+maxBatchSize+" rotate time "+rotateTimeMillis+"ms and destination "+dest+" for "+name); + + } catch (Exception e) { + LOG.warn("Error creating JMS connection.", e); + throw new IllegalStateException("Could not create JMS connection for spout ", e); + } + + } + + @Override + public void success(TransactionAttempt tx) { + + @SuppressWarnings("unchecked") + List<Message> messages = (List<Message>) batchMessageMap.remove(tx.getTransactionId()); + + if (messages != null) { + if (!messages.isEmpty()) { + LOG.debug("Success for batch with transaction id "+tx.getTransactionId()+"/"+tx.getAttemptId()+" for "+name); + } + + for (Message msg: messages) { + String messageId = "UnknownId"; + + try { + messageId = msg.getJMSMessageID(); + msg.acknowledge(); + LOG.trace("Acknowledged message "+messageId); + } catch (JMSException e) { + LOG.warn("Failed to acknowledge message "+messageId, e); + } + } + } + else { + LOG.warn("No messages found in batch with transaction id "+tx.getTransactionId()+"/"+tx.getAttemptId()); + } + } + + /** + * Fail a batch with the given transaction id. This is called when a batch is timed out, or a new batch with a + * matching transaction id is emitted. Note that the current implementation does nothing - i.e. it discards + * messages that have been failed. + * @param transactionId The transaction id of the failed batch + * @param messages The list of messages to fail. + */ + private void fail(Long transactionId, List<Message> messages) { + LOG.debug("Failure for batch with transaction id "+transactionId+" for "+name); + if (messages != null) { + for (Message msg: messages) { + try { + LOG.trace("Failed message "+msg.getJMSMessageID()); + } catch (JMSException e) { + LOG.warn("Could not identify failed message ", e); + } + } + } + else { + LOG.warn("Failed batch has no messages with transaction id "+transactionId); + } + } + + @Override + public void close() { + try { + LOG.info("Closing JMS connection."); + this.session.close(); + this.connection.close(); + } catch (JMSException e) { + LOG.warn("Error closing JMS connection.", e); + } + } + + @Override + public void emitBatch(TransactionAttempt tx, JmsBatch coordinatorMeta, + TridentCollector collector) { + + long now = System.currentTimeMillis(); + if(now - lastRotate > rotateTimeMillis) { + Map<Long, List<Message>> failed = batchMessageMap.rotate(); + for(Long id: failed.keySet()) { + LOG.warn("TIMED OUT batch with transaction id "+id+" for "+name); + fail(id, failed.get(id)); + } + lastRotate = now; + } + + if(batchMessageMap.containsKey(tx.getTransactionId())) { + LOG.warn("FAILED duplicate batch with transaction id "+tx.getTransactionId()+"/"+tx.getAttemptId()+" for "+name); + fail(tx.getTransactionId(), batchMessageMap.get(tx.getTransactionId())); + } + + List<Message> batchMessages = new ArrayList<Message>(); + + for (int index=0; index<maxBatchSize; index++) { + Message msg = queue.poll(); + if (msg == null) { + Utils.sleep(50); // Back off + break; + } + + try { + if (TridentJmsSpout.this.jmsAcknowledgeMode != Session.AUTO_ACKNOWLEDGE) { + batchMessages.add(msg); + } + Values tuple = tupleProducer.toTuple(msg); + collector.emit(tuple); + } catch (JMSException e) { + LOG.warn("Failed to emit message, could not retrieve data for "+name+": "+e ); + } + } + + if (!batchMessages.isEmpty()) { + LOG.debug("Emitting batch with transaction id "+tx.getTransactionId()+"/"+tx.getAttemptId()+" and size "+batchMessages.size()+" for "+name); + } + else { + LOG.trace("No items to acknowledge for batch with transaction id "+tx.getTransactionId()+"/"+tx.getAttemptId()+" for "+name); + } + batchMessageMap.put(tx.getTransactionId(), batchMessages); + } + + @Override + public void onMessage(Message msg) { + try { + LOG.trace("Queuing msg [" + msg.getJMSMessageID() + "]"); + } catch (JMSException e) { + // Nothing here, could not get message id + } + this.queue.offer(msg); + } + + } + + /** + * Bare implementation of a BatchCoordinator, returning a null JmsBatch object + * + */ + private class JmsBatchCoordinator implements BatchCoordinator<JmsBatch> { + + private final String name; + + private final Logger LOG = LoggerFactory.getLogger(JmsBatchCoordinator.class); + + public JmsBatchCoordinator(String name) { + this.name = name; + LOG.info("Created batch coordinator for "+name); + } + + @Override + public JmsBatch initializeTransaction(long txid, JmsBatch prevMetadata, JmsBatch curMetadata) { + LOG.debug("Initialise transaction "+txid+" for "+name); + return null; + } + + @Override + public void success(long txid) { + } + + @Override + public boolean isReady(long txid) { + return true; + } + + @Override + public void close() { + } + + } + +} + + \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/d152d72f/src/test/java/backtype/storm/contrib/jms/spout/JmsSpoutTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/backtype/storm/contrib/jms/spout/JmsSpoutTest.java b/src/test/java/backtype/storm/contrib/jms/spout/JmsSpoutTest.java deleted file mode 100644 index 1d05687..0000000 --- a/src/test/java/backtype/storm/contrib/jms/spout/JmsSpoutTest.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package backtype.storm.contrib.jms.spout; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.ObjectOutputStream; -import java.util.HashMap; - -import javax.jms.ConnectionFactory; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TextMessage; - -import org.junit.Assert; -import org.junit.Test; -import org.mortbay.log.Log; - -import backtype.storm.contrib.jms.JmsProvider; -import backtype.storm.spout.SpoutOutputCollector; - -public class JmsSpoutTest { - @Test - public void testFailure() throws JMSException, Exception{ - JmsSpout spout = new JmsSpout(); - JmsProvider mockProvider = new MockJmsProvider(); - MockSpoutOutputCollector mockCollector = new MockSpoutOutputCollector(); - SpoutOutputCollector collector = new SpoutOutputCollector(mockCollector); - spout.setJmsProvider(new MockJmsProvider()); - spout.setJmsTupleProducer(new MockTupleProducer()); - spout.setJmsAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE); - spout.setRecoveryPeriod(10); // Rapid recovery for testing. - spout.open(new HashMap<String,String>(), null, collector); - Message msg = this.sendMessage(mockProvider.connectionFactory(), mockProvider.destination()); - Thread.sleep(100); - spout.nextTuple(); // Pretend to be storm. - Assert.assertTrue(mockCollector.emitted); - - mockCollector.reset(); - spout.fail(msg.getJMSMessageID()); // Mock failure - Thread.sleep(5000); - spout.nextTuple(); // Pretend to be storm. - Thread.sleep(5000); - Assert.assertTrue(mockCollector.emitted); // Should have been re-emitted - } - - @Test - public void testSerializability() throws IOException{ - JmsSpout spout = new JmsSpout(); - ByteArrayOutputStream out = new ByteArrayOutputStream(); - ObjectOutputStream oos = new ObjectOutputStream(out); - oos.writeObject(spout); - oos.close(); - Assert.assertTrue(out.toByteArray().length > 0); - } - - public Message sendMessage(ConnectionFactory connectionFactory, Destination destination) throws JMSException { - Session mySess = connectionFactory.createConnection().createSession(false, Session.CLIENT_ACKNOWLEDGE); - MessageProducer producer = mySess.createProducer(destination); - TextMessage msg = mySess.createTextMessage(); - msg.setText("Hello World"); - Log.debug("Sending Message: " + msg.getText()); - producer.send(msg); - return msg; - } - -} http://git-wip-us.apache.org/repos/asf/storm/blob/d152d72f/src/test/java/backtype/storm/contrib/jms/spout/MockJmsProvider.java ---------------------------------------------------------------------- diff --git a/src/test/java/backtype/storm/contrib/jms/spout/MockJmsProvider.java b/src/test/java/backtype/storm/contrib/jms/spout/MockJmsProvider.java deleted file mode 100644 index 2dacca5..0000000 --- a/src/test/java/backtype/storm/contrib/jms/spout/MockJmsProvider.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package backtype.storm.contrib.jms.spout; - -import javax.jms.ConnectionFactory; -import javax.jms.Destination; -import javax.naming.Context; -import javax.naming.InitialContext; -import javax.naming.NamingException; - -import org.apache.activemq.ActiveMQConnectionFactory; - -import backtype.storm.contrib.jms.JmsProvider; - -public class MockJmsProvider implements JmsProvider { - private static final long serialVersionUID = 1L; - - private ConnectionFactory connectionFactory = null; - private Destination destination = null; - - public MockJmsProvider() throws NamingException{ - this.connectionFactory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false"); - Context jndiContext = new InitialContext(); - this.destination = (Destination) jndiContext.lookup("dynamicQueues/FOO.BAR"); - - } - - /** - * Provides the JMS <code>ConnectionFactory</code> - * @return the connection factory - * @throws Exception - */ - public ConnectionFactory connectionFactory() throws Exception{ - return this.connectionFactory; - } - - /** - * Provides the <code>Destination</code> (topic or queue) from which the - * <code>JmsSpout</code> will receive messages. - * @return - * @throws Exception - */ - public Destination destination() throws Exception{ - return this.destination; - } - -} http://git-wip-us.apache.org/repos/asf/storm/blob/d152d72f/src/test/java/backtype/storm/contrib/jms/spout/MockSpoutOutputCollector.java ---------------------------------------------------------------------- diff --git a/src/test/java/backtype/storm/contrib/jms/spout/MockSpoutOutputCollector.java b/src/test/java/backtype/storm/contrib/jms/spout/MockSpoutOutputCollector.java deleted file mode 100644 index d9f7fac..0000000 --- a/src/test/java/backtype/storm/contrib/jms/spout/MockSpoutOutputCollector.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package backtype.storm.contrib.jms.spout; - -import java.util.ArrayList; -import java.util.List; - -import backtype.storm.spout.ISpoutOutputCollector; - -public class MockSpoutOutputCollector implements ISpoutOutputCollector { - boolean emitted = false; - - @Override - public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) { - emitted = true; - return new ArrayList<Integer>(); - } - - @Override - public void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId) { - emitted = true; - } - - @Override - public void reportError(Throwable error) { - } - - public boolean emitted(){ - return this.emitted; - } - - public void reset(){ - this.emitted = false; - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/d152d72f/src/test/java/backtype/storm/contrib/jms/spout/MockTupleProducer.java ---------------------------------------------------------------------- diff --git a/src/test/java/backtype/storm/contrib/jms/spout/MockTupleProducer.java b/src/test/java/backtype/storm/contrib/jms/spout/MockTupleProducer.java deleted file mode 100644 index 92b6788..0000000 --- a/src/test/java/backtype/storm/contrib/jms/spout/MockTupleProducer.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package backtype.storm.contrib.jms.spout; - -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.TextMessage; - -import backtype.storm.contrib.jms.JmsTupleProducer; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.Values; - -public class MockTupleProducer implements JmsTupleProducer { - private static final long serialVersionUID = 1L; - - @Override - public Values toTuple(Message msg) throws JMSException { - if (msg instanceof TextMessage) { - String json = ((TextMessage) msg).getText(); - return new Values(json); - } else { - return null; - } - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields("json")); - } - -} http://git-wip-us.apache.org/repos/asf/storm/blob/d152d72f/src/test/java/org/apache/storm/jms/spout/JmsSpoutTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/storm/jms/spout/JmsSpoutTest.java b/src/test/java/org/apache/storm/jms/spout/JmsSpoutTest.java new file mode 100644 index 0000000..42ce573 --- /dev/null +++ b/src/test/java/org/apache/storm/jms/spout/JmsSpoutTest.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.jms.spout; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectOutputStream; +import java.util.HashMap; + +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.junit.Assert; +import org.junit.Test; +import org.mortbay.log.Log; + +import org.apache.storm.jms.JmsProvider; +import backtype.storm.spout.SpoutOutputCollector; + +public class JmsSpoutTest { + @Test + public void testFailure() throws JMSException, Exception{ + JmsSpout spout = new JmsSpout(); + JmsProvider mockProvider = new MockJmsProvider(); + MockSpoutOutputCollector mockCollector = new MockSpoutOutputCollector(); + SpoutOutputCollector collector = new SpoutOutputCollector(mockCollector); + spout.setJmsProvider(new MockJmsProvider()); + spout.setJmsTupleProducer(new MockTupleProducer()); + spout.setJmsAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE); + spout.setRecoveryPeriod(10); // Rapid recovery for testing. + spout.open(new HashMap<String,String>(), null, collector); + Message msg = this.sendMessage(mockProvider.connectionFactory(), mockProvider.destination()); + Thread.sleep(100); + spout.nextTuple(); // Pretend to be storm. + Assert.assertTrue(mockCollector.emitted); + + mockCollector.reset(); + spout.fail(msg.getJMSMessageID()); // Mock failure + Thread.sleep(5000); + spout.nextTuple(); // Pretend to be storm. + Thread.sleep(5000); + Assert.assertTrue(mockCollector.emitted); // Should have been re-emitted + } + + @Test + public void testSerializability() throws IOException{ + JmsSpout spout = new JmsSpout(); + ByteArrayOutputStream out = new ByteArrayOutputStream(); + ObjectOutputStream oos = new ObjectOutputStream(out); + oos.writeObject(spout); + oos.close(); + Assert.assertTrue(out.toByteArray().length > 0); + } + + public Message sendMessage(ConnectionFactory connectionFactory, Destination destination) throws JMSException { + Session mySess = connectionFactory.createConnection().createSession(false, Session.CLIENT_ACKNOWLEDGE); + MessageProducer producer = mySess.createProducer(destination); + TextMessage msg = mySess.createTextMessage(); + msg.setText("Hello World"); + Log.debug("Sending Message: " + msg.getText()); + producer.send(msg); + return msg; + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/d152d72f/src/test/java/org/apache/storm/jms/spout/MockJmsProvider.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/storm/jms/spout/MockJmsProvider.java b/src/test/java/org/apache/storm/jms/spout/MockJmsProvider.java new file mode 100644 index 0000000..3ba0853 --- /dev/null +++ b/src/test/java/org/apache/storm/jms/spout/MockJmsProvider.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.jms.spout; + +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.naming.Context; +import javax.naming.InitialContext; +import javax.naming.NamingException; + +import org.apache.activemq.ActiveMQConnectionFactory; + +import org.apache.storm.jms.JmsProvider; + +public class MockJmsProvider implements JmsProvider { + private static final long serialVersionUID = 1L; + + private ConnectionFactory connectionFactory = null; + private Destination destination = null; + + public MockJmsProvider() throws NamingException{ + this.connectionFactory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false"); + Context jndiContext = new InitialContext(); + this.destination = (Destination) jndiContext.lookup("dynamicQueues/FOO.BAR"); + + } + + /** + * Provides the JMS <code>ConnectionFactory</code> + * @return the connection factory + * @throws Exception + */ + public ConnectionFactory connectionFactory() throws Exception{ + return this.connectionFactory; + } + + /** + * Provides the <code>Destination</code> (topic or queue) from which the + * <code>JmsSpout</code> will receive messages. + * @return + * @throws Exception + */ + public Destination destination() throws Exception{ + return this.destination; + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/d152d72f/src/test/java/org/apache/storm/jms/spout/MockSpoutOutputCollector.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/storm/jms/spout/MockSpoutOutputCollector.java b/src/test/java/org/apache/storm/jms/spout/MockSpoutOutputCollector.java new file mode 100644 index 0000000..8a535ce --- /dev/null +++ b/src/test/java/org/apache/storm/jms/spout/MockSpoutOutputCollector.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.jms.spout; + +import java.util.ArrayList; +import java.util.List; + +import backtype.storm.spout.ISpoutOutputCollector; + +public class MockSpoutOutputCollector implements ISpoutOutputCollector { + boolean emitted = false; + + @Override + public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) { + emitted = true; + return new ArrayList<Integer>(); + } + + @Override + public void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId) { + emitted = true; + } + + @Override + public void reportError(Throwable error) { + } + + public boolean emitted(){ + return this.emitted; + } + + public void reset(){ + this.emitted = false; + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/d152d72f/src/test/java/org/apache/storm/jms/spout/MockTupleProducer.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/storm/jms/spout/MockTupleProducer.java b/src/test/java/org/apache/storm/jms/spout/MockTupleProducer.java new file mode 100644 index 0000000..f5eda10 --- /dev/null +++ b/src/test/java/org/apache/storm/jms/spout/MockTupleProducer.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.jms.spout; + +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.TextMessage; + +import org.apache.storm.jms.JmsTupleProducer; +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.tuple.Fields; +import backtype.storm.tuple.Values; + +public class MockTupleProducer implements JmsTupleProducer { + private static final long serialVersionUID = 1L; + + @Override + public Values toTuple(Message msg) throws JMSException { + if (msg instanceof TextMessage) { + String json = ((TextMessage) msg).getText(); + return new Values(json); + } else { + return null; + } + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("json")); + } + +}