Repository: storm Updated Branches: refs/heads/1.x-branch d6fa3d411 -> 27b269e82
http://git-wip-us.apache.org/repos/asf/storm/blob/d6c8298d/external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsSpout.java ---------------------------------------------------------------------- diff --git a/external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsSpout.java b/external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsSpout.java new file mode 100644 index 0000000..6aaa7c9 --- /dev/null +++ b/external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsSpout.java @@ -0,0 +1,382 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.jms.spout; + +import java.io.Serializable; +import java.util.*; +import java.util.concurrent.LinkedBlockingQueue; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.Session; + +import org.apache.storm.topology.base.BaseRichSpout; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.storm.jms.JmsProvider; +import org.apache.storm.jms.JmsTupleProducer; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.tuple.Values; +import org.apache.storm.utils.Utils; + +/** + * A Storm <code>Spout</code> implementation that listens to a JMS topic or queue + * and outputs tuples based on the messages it receives. + * <p> + * <code>JmsSpout</code> instances rely on <code>JmsProvider</code> implementations + * to obtain the JMS <code>ConnectionFactory</code> and <code>Destination</code> objects + * necessary to connect to a JMS topic/queue. + * <p> + * When a <code>JmsSpout</code> receives a JMS message, it delegates to an + * internal <code>JmsTupleProducer</code> instance to create a Storm tuple from the + * incoming message. + * <p> + * Typically, developers will supply a custom <code>JmsTupleProducer</code> implementation + * appropriate for the expected message content. 
+ */ +@SuppressWarnings("serial") +public class JmsSpout extends BaseRichSpout implements MessageListener { + private static final Logger LOG = LoggerFactory.getLogger(JmsSpout.class); + + // JMS options + private int jmsAcknowledgeMode = Session.AUTO_ACKNOWLEDGE; + + private boolean distributed = true; + + private JmsTupleProducer tupleProducer; + + private JmsProvider jmsProvider; + + private LinkedBlockingQueue<Message> queue; + private TreeSet<JmsMessageID> toCommit; + private HashMap<JmsMessageID, Message> pendingMessages; + private long messageSequence = 0; + + private SpoutOutputCollector collector; + + private transient Connection connection; + private transient Session session; + + private boolean hasFailures = false; + public final Serializable recoveryMutex = "RECOVERY_MUTEX"; + private Timer recoveryTimer = null; + private long recoveryPeriod = -1; // default to disabled + + /** + * Sets the JMS Session acknowledgement mode for the JMS session associated with this spout. + * <p> + * Possible values: + * <ul> + * <li>javax.jms.Session.AUTO_ACKNOWLEDGE</li> + * <li>javax.jms.Session.CLIENT_ACKNOWLEDGE</li> + * <li>javax.jms.Session.DUPS_OK_ACKNOWLEDGE</li> + * </ul> + * + * @param mode JMS Session Acknowledgement mode + * @throws IllegalArgumentException if the mode is not recognized. + */ + public void setJmsAcknowledgeMode(int mode) { + switch (mode) { + case Session.AUTO_ACKNOWLEDGE: + case Session.CLIENT_ACKNOWLEDGE: + case Session.DUPS_OK_ACKNOWLEDGE: + break; + default: + throw new IllegalArgumentException("Unknown Acknowledge mode: " + mode + " (See javax.jms.Session for valid values)"); + + } + this.jmsAcknowledgeMode = mode; + } + + /** + * Returns the JMS Session acknowledgement mode for the JMS session associated with this spout. 
+ * + * @return the acknowledgement mode currently configured for this spout's JMS session + */ + public int getJmsAcknowledgeMode() { + return this.jmsAcknowledgeMode; + } + + /** + * Set the <code>JmsProvider</code> + * implementation that this Spout will use to connect to + * a JMS <code>javax.jms.Destination</code> + * + * @param provider the provider supplying the JMS connection factory and destination + */ + public void setJmsProvider(JmsProvider provider) { + this.jmsProvider = provider; + } + + /** + * Set the <code>JmsTupleProducer</code> + * implementation that will convert <code>javax.jms.Message</code> + * object to <code>org.apache.storm.tuple.Values</code> objects + * to be emitted. + * + * @param producer the tuple producer used to turn each JMS message into tuple values + */ + public void setJmsTupleProducer(JmsTupleProducer producer) { + this.tupleProducer = producer; + } + + /** + * <code>javax.jms.MessageListener</code> implementation. + * <p> + * Stores the JMS message in an internal queue for processing + * by the <code>nextTuple()</code> method. + */ + public void onMessage(Message msg) { + try { + LOG.debug("Queuing msg [" + msg.getJMSMessageID() + "]"); + } catch (JMSException e) { /* message ID is only needed for the debug log; queue the message regardless */ + } + this.queue.offer(msg); + } + + /** + * <code>ISpout</code> implementation. + * <p> + * Connects the JMS spout to the configured JMS destination + * topic/queue. 
+ */ + @SuppressWarnings("rawtypes") + public void open(Map conf, TopologyContext context, + SpoutOutputCollector collector) { + if (this.jmsProvider == null) { + throw new IllegalStateException("JMS provider has not been set."); + } + if (this.tupleProducer == null) { + throw new IllegalStateException("JMS Tuple Producer has not been set."); + } + Integer topologyTimeout = (Integer) conf.get("topology.message.timeout.secs"); + // TODO find a way to get the default timeout from storm, so we're not hard-coding to 30 seconds (it could change) + topologyTimeout = topologyTimeout == null ? 30 : topologyTimeout; + if ((topologyTimeout.intValue() * 1000) > this.recoveryPeriod) { + LOG.warn("*** WARNING *** : " + + "Recovery period (" + this.recoveryPeriod + " ms.) 
is less then the configured " + + "'topology.message.timeout.secs' of " + topologyTimeout + + " secs. This could lead to a message replay flood!"); + } + this.queue = new LinkedBlockingQueue<Message>(); + this.toCommit = new TreeSet<JmsMessageID>(); + this.pendingMessages = new HashMap<JmsMessageID, Message>(); + this.collector = collector; + try { + ConnectionFactory cf = this.jmsProvider.connectionFactory(); + Destination dest = this.jmsProvider.destination(); + this.connection = cf.createConnection(); + this.session = connection.createSession(false, + this.jmsAcknowledgeMode); + MessageConsumer consumer = session.createConsumer(dest); + consumer.setMessageListener(this); + this.connection.start(); + if (this.isDurableSubscription() && this.recoveryPeriod > 0) { + this.recoveryTimer = new Timer(); + this.recoveryTimer.scheduleAtFixedRate(new RecoveryTask(), 10, this.recoveryPeriod); + } + + } catch (Exception e) { + LOG.warn("Error creating JMS connection.", e); + } + + } + + public void close() { + try { + LOG.debug("Closing JMS connection."); + this.session.close(); + this.connection.close(); + } catch (JMSException e) { + LOG.warn("Error closing JMS connection.", e); + } + + } + + public void nextTuple() { + Message msg = this.queue.poll(); + if (msg == null) { + Utils.sleep(50); + } else { + + LOG.debug("sending tuple: " + msg); + // get the tuple from the handler + try { + Values vals = this.tupleProducer.toTuple(msg); + // ack if we're not in AUTO_ACKNOWLEDGE mode, or the message requests ACKNOWLEDGE + LOG.debug("Requested deliveryMode: " + toDeliveryModeString(msg.getJMSDeliveryMode())); + LOG.debug("Our deliveryMode: " + toDeliveryModeString(this.jmsAcknowledgeMode)); + if (this.isDurableSubscription()) { + LOG.debug("Requesting acks."); + JmsMessageID messageId = new JmsMessageID(this.messageSequence++, msg.getJMSMessageID()); + this.collector.emit(vals, messageId); + + // at this point we successfully emitted. 
Store + // the message and message ID so we can do a + // JMS acknowledge later + this.pendingMessages.put(messageId, msg); + this.toCommit.add(messageId); + } else { + this.collector.emit(vals); + } + } catch (JMSException e) { + LOG.warn("Unable to convert JMS message: " + msg, e); + } + + } + + } + + /* + * Will only be called if we're transactional or not AUTO_ACKNOWLEDGE + */ + public void ack(Object msgId) { + + Message msg = this.pendingMessages.remove(msgId); + boolean isOldest = !this.toCommit.isEmpty() && msgId.equals(this.toCommit.first()); /* fail() may have emptied toCommit; TreeSet.first() throws on an empty set */ + if (isOldest) { + if (msg != null) { + try { + LOG.debug("Committing..."); + msg.acknowledge(); + LOG.debug("JMS Message acked: " + msgId); + this.toCommit.remove(msgId); + } catch (JMSException e) { + LOG.warn("Error acknowldging JMS message: " + msgId, e); + } + } else { + LOG.warn("Couldn't acknowledge unknown JMS message ID: " + msgId); + } + } else { + this.toCommit.remove(msgId); + } + + } + + /* + * Will only be called if we're transactional or not AUTO_ACKNOWLEDGE + */ + public void fail(Object msgId) { + LOG.warn("Message failed: " + msgId); + this.pendingMessages.clear(); + this.toCommit.clear(); + synchronized (this.recoveryMutex) { + this.hasFailures = true; + } + } + + public void declareOutputFields(OutputFieldsDeclarer declarer) { + this.tupleProducer.declareOutputFields(declarer); + + } + + /** + * Returns <code>true</code> if the spout has received failures + * from which it has not yet recovered. + */ + public boolean hasFailures() { + return this.hasFailures; + } + + protected void recovered() { + this.hasFailures = false; + } + + /** + * Sets the periodicity of the timer task that + * checks for failures and recovers the JMS session. 
+ * + * @param period the timer period in milliseconds; recovery is only scheduled when this is greater than zero + */ + public void setRecoveryPeriod(long period) { + this.recoveryPeriod = period; + } + + public boolean isDistributed() { + return this.distributed; + } + + /** + * Sets the "distributed" mode of this spout. 
+ * <p> + * If <code>true</code> multiple instances of this spout <i>may</i> be + * created across the cluster (depending on the "parallelism_hint" in the topology configuration). + * <p> + * Setting this value to <code>false</code> essentially means this spout will run as a singleton + * within the cluster ("parallelism_hint" will be ignored). + * <p> + * In general, this should be set to <code>false</code> if the underlying JMS destination is a + * topic, and <code>true</code> if it is a JMS queue. + * + * @param distributed <code>true</code> to allow multiple spout instances, <code>false</code> to run as a singleton + */ + public void setDistributed(boolean distributed) { + this.distributed = distributed; + } + + + private static final String toDeliveryModeString(int deliveryMode) { + switch (deliveryMode) { + case Session.AUTO_ACKNOWLEDGE: + return "AUTO_ACKNOWLEDGE"; + case Session.CLIENT_ACKNOWLEDGE: + return "CLIENT_ACKNOWLEDGE"; + case Session.DUPS_OK_ACKNOWLEDGE: + return "DUPS_OK_ACKNOWLEDGE"; + default: + return "UNKNOWN"; + + } + } + + protected Session getSession() { + return this.session; + } + + private boolean isDurableSubscription() { + return (this.jmsAcknowledgeMode != Session.AUTO_ACKNOWLEDGE); + } + + + private class RecoveryTask extends TimerTask { + private final Logger LOG = LoggerFactory.getLogger(RecoveryTask.class); + + public void run() { + synchronized (JmsSpout.this.recoveryMutex) { + if (JmsSpout.this.hasFailures()) { + try { + LOG.info("Recovering from a message failure."); + JmsSpout.this.getSession().recover(); + JmsSpout.this.recovered(); + } catch (JMSException e) { + LOG.warn("Could not recover jms session.", e); + } + } + } + } + + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/d6c8298d/external/storm-jms/src/main/java/org/apache/storm/jms/trident/JmsBatch.java ---------------------------------------------------------------------- diff --git a/external/storm-jms/src/main/java/org/apache/storm/jms/trident/JmsBatch.java b/external/storm-jms/src/main/java/org/apache/storm/jms/trident/JmsBatch.java new file mode 
100644 index 0000000..c990058 --- /dev/null +++ b/external/storm-jms/src/main/java/org/apache/storm/jms/trident/JmsBatch.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.jms.trident; + +/** + * Batch coordination metadata object for the TridentJmsSpout. + * This implementation does not use batch metadata, so the object is empty. + * + */ +public class JmsBatch { + // Intentionally empty: this spout uses no batch metadata +} http://git-wip-us.apache.org/repos/asf/storm/blob/d6c8298d/external/storm-jms/src/main/java/org/apache/storm/jms/trident/JmsState.java ---------------------------------------------------------------------- diff --git a/external/storm-jms/src/main/java/org/apache/storm/jms/trident/JmsState.java b/external/storm-jms/src/main/java/org/apache/storm/jms/trident/JmsState.java new file mode 100644 index 0000000..bfb78b5 --- /dev/null +++ b/external/storm-jms/src/main/java/org/apache/storm/jms/trident/JmsState.java @@ -0,0 +1,129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.jms.trident; + +import org.apache.storm.jms.JmsMessageProducer; +import org.apache.storm.jms.JmsProvider; +import org.apache.storm.topology.FailedException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.storm.trident.operation.TridentCollector; +import org.apache.storm.trident.state.State; +import org.apache.storm.trident.tuple.TridentTuple; + +import javax.jms.*; +import java.io.Serializable; +import java.lang.IllegalStateException; +import java.util.List; + +public class JmsState implements State { + + private static final Logger LOG = LoggerFactory.getLogger(JmsState.class); + + private Options options; + private Connection connection; + private Session session; + private MessageProducer messageProducer; + + protected JmsState(Options options) { + this.options = options; + } + + public static class Options implements Serializable { + private JmsProvider jmsProvider; + private JmsMessageProducer msgProducer; + private int jmsAcknowledgeMode = Session.AUTO_ACKNOWLEDGE; /* default acknowledge mode */ + private boolean jmsTransactional = true; /* sessions are transacted unless overridden */ + + public Options withJmsProvider(JmsProvider provider) { + this.jmsProvider = provider; + return this; + } + + public Options withMessageProducer(JmsMessageProducer msgProducer) { + this.msgProducer = msgProducer; + return this; + } + + public Options withJmsAcknowledgeMode(int jmsAcknowledgeMode) { + 
this.jmsAcknowledgeMode = jmsAcknowledgeMode; + return this; + } + + public Options withJmsTransactional(boolean jmsTransactional) { + this.jmsTransactional = jmsTransactional; + return this; + } + } + + protected void prepare() { /* opens the JMS connection, session and producer described by the options */ + if(this.options.jmsProvider == null || this.options.msgProducer == null){ + throw new IllegalStateException("JMS Provider and MessageProducer not set."); + } + LOG.debug("Connecting JMS.."); + try { + ConnectionFactory cf = this.options.jmsProvider.connectionFactory(); + Destination dest = this.options.jmsProvider.destination(); + this.connection = cf.createConnection(); + this.session = connection.createSession(this.options.jmsTransactional, + this.options.jmsAcknowledgeMode); + this.messageProducer = session.createProducer(dest); + + connection.start(); + } catch (Exception e) { /* connection failures are logged here, not rethrown */ + LOG.warn("Error creating JMS connection.", e); + } + } + + @Override + public void beginCommit(Long aLong) { /* no work is done when a commit begins */ + } + + @Override + public void commit(Long aLong) { + LOG.debug("Committing JMS transaction."); + if(this.options.jmsTransactional) { + try { + session.commit(); + } catch(JMSException e){ /* a failed commit is logged, not propagated */ + LOG.error("JMS Session commit failed.", e); + } + } + } + + public void updateState(List<TridentTuple> tuples, TridentCollector collector) throws JMSException { + try { + for(TridentTuple tuple : tuples) { + Message msg = this.options.msgProducer.toMessage(this.session, tuple); + if (msg != null) { /* tuples for which the producer returns null are skipped */ + if (msg.getJMSDestination() != null) { + this.messageProducer.send(msg.getJMSDestination(), msg); + } else { + this.messageProducer.send(msg); + } + } + } + } catch (JMSException e) { + LOG.warn("Failed to send jmd message for a trident batch ", e); + if(this.options.jmsTransactional) { + session.rollback(); /* a JMSException thrown by rollback itself propagates to the caller */ + } + throw new FailedException("Failed to write tuples", e); + } + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/d6c8298d/external/storm-jms/src/main/java/org/apache/storm/jms/trident/JmsStateFactory.java 
---------------------------------------------------------------------- diff --git a/external/storm-jms/src/main/java/org/apache/storm/jms/trident/JmsStateFactory.java b/external/storm-jms/src/main/java/org/apache/storm/jms/trident/JmsStateFactory.java new file mode 100644 index 0000000..9a02ba9 --- /dev/null +++ b/external/storm-jms/src/main/java/org/apache/storm/jms/trident/JmsStateFactory.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.jms.trident; + +import org.apache.storm.task.IMetricsContext; +import org.apache.storm.trident.state.State; +import org.apache.storm.trident.state.StateFactory; + +import java.util.Map; + +public class JmsStateFactory implements StateFactory { + + private JmsState.Options options; /* handed to every JmsState this factory creates */ + + public JmsStateFactory(JmsState.Options options) { + this.options = options; + } + + @Override + public State makeState(Map map, IMetricsContext iMetricsContext, int partitionIndex, int numPartitions) { /* the conf, metrics and partition arguments are not used */ + JmsState state = new JmsState(options); + state.prepare(); /* prepare() opens the JMS connection, session and producer */ + return state; + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/d6c8298d/external/storm-jms/src/main/java/org/apache/storm/jms/trident/JmsUpdater.java ---------------------------------------------------------------------- diff --git a/external/storm-jms/src/main/java/org/apache/storm/jms/trident/JmsUpdater.java b/external/storm-jms/src/main/java/org/apache/storm/jms/trident/JmsUpdater.java new file mode 100644 index 0000000..a2709a4 --- /dev/null +++ b/external/storm-jms/src/main/java/org/apache/storm/jms/trident/JmsUpdater.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.jms.trident; + +import org.apache.storm.topology.FailedException; +import org.apache.storm.trident.operation.TridentCollector; +import org.apache.storm.trident.state.BaseStateUpdater; +import org.apache.storm.trident.tuple.TridentTuple; + +import javax.jms.JMSException; +import java.util.List; + +public class JmsUpdater extends BaseStateUpdater<JmsState> { + + @Override + public void updateState(JmsState jmsState, List<TridentTuple> tuples, TridentCollector collector) { + try { + jmsState.updateState(tuples, collector); /* delegate the whole batch to the JMS-backed state */ + } catch (JMSException e) { /* rethrown as FailedException with the JMS error kept as the cause */ + throw new FailedException("failed JMS opetation", e); + } + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/d6c8298d/external/storm-jms/src/main/java/org/apache/storm/jms/trident/TridentJmsSpout.java ---------------------------------------------------------------------- diff --git a/external/storm-jms/src/main/java/org/apache/storm/jms/trident/TridentJmsSpout.java b/external/storm-jms/src/main/java/org/apache/storm/jms/trident/TridentJmsSpout.java new file mode 100644 index 0000000..55e29bc --- /dev/null +++ b/external/storm-jms/src/main/java/org/apache/storm/jms/trident/TridentJmsSpout.java @@ -0,0 +1,409 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.jms.trident; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.LinkedBlockingQueue; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.Session; + +import org.apache.storm.jms.JmsProvider; +import org.apache.storm.jms.JmsTupleProducer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.storm.trident.operation.TridentCollector; +import org.apache.storm.trident.spout.ITridentSpout; +import org.apache.storm.trident.topology.TransactionAttempt; +import org.apache.storm.Config; +import org.apache.storm.generated.StreamInfo; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsGetter; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Values; +import org.apache.storm.utils.RotatingMap; +import org.apache.storm.utils.Utils; + +/** + * Trident implementation of the JmsSpout + * <p> + * + */ +public class TridentJmsSpout implements ITridentSpout<JmsBatch> { + + public static final String MAX_BATCH_SIZE_CONF = "topology.spout.max.batch.size"; + + public static final int DEFAULT_BATCH_SIZE = 1000; + + private static final long serialVersionUID = -3469351154693356655L; + + private JmsTupleProducer tupleProducer; + + private JmsProvider jmsProvider; + + private int jmsAcknowledgeMode; + + private String name; + + private static int nameIndex = 1; + + /** + * Create a TridentJmsSpout with a default name and acknowledge mode of AUTO_ACKNOWLEDGE + */ + public TridentJmsSpout() { + this.name = "JmsSpout_"+(nameIndex++); + this.jmsAcknowledgeMode = Session.AUTO_ACKNOWLEDGE; + } + + /** + * Set the name for this 
spout, to improve log identification + * @param name The name to be used in log messages + * @return This spout + */ + public TridentJmsSpout named(String name) { + this.name = name; + return this; + } + + /** + * Set the <code>JmsProvider</code> + * implementation that this Spout will use to connect to + * a JMS <code>javax.jms.Destination</code> + * + * @param provider the provider supplying the JMS connection factory and destination + */ + public TridentJmsSpout withJmsProvider(JmsProvider provider){ + this.jmsProvider = provider; + return this; + } + + /** + * Set the <code>JmsTupleProducer</code> + * implementation that will convert <code>javax.jms.Message</code> + * object to <code>org.apache.storm.tuple.Values</code> objects + * to be emitted. + * + * @param tupleProducer the producer used to turn each JMS message into tuple values + * @return This spout + */ + public TridentJmsSpout withTupleProducer(JmsTupleProducer tupleProducer) { + this.tupleProducer = tupleProducer; + return this; + } + + /** + * Set the JMS acknowledge mode for messages being processed by this spout. + * <p/> + * Possible values: + * <ul> + * <li>javax.jms.Session.AUTO_ACKNOWLEDGE</li> + * <li>javax.jms.Session.CLIENT_ACKNOWLEDGE</li> + * <li>javax.jms.Session.DUPS_OK_ACKNOWLEDGE</li> + * </ul> + * @param jmsAcknowledgeMode The chosen acknowledge mode + * @return This spout + * @throws IllegalArgumentException if the mode is not recognized + */ + public TridentJmsSpout withJmsAcknowledgeMode(int jmsAcknowledgeMode) { + toDeliveryModeString(jmsAcknowledgeMode); /* called only for validation: throws IllegalArgumentException for an unknown mode */ + this.jmsAcknowledgeMode = jmsAcknowledgeMode; + return this; + } + + /** + * Return a friendly string for the given JMS acknowledge mode, or throw an IllegalArgumentException if + * the mode is not recognized. 
+ * <p/> + * Possible values: + * <ul> + * <li>javax.jms.Session.AUTO_ACKNOWLEDGE</li> + * <li>javax.jms.Session.CLIENT_ACKNOWLEDGE</li> + * <li>javax.jms.Session.DUPS_OK_ACKNOWLEDGE</li> + * </ul> + * @param acknowledgeMode A valid JMS acknowledge mode + * @return A friendly string describing the acknowledge mode + * @throws IllegalArgumentException if the mode is not recognized + */ + private static final String toDeliveryModeString(int acknowledgeMode) { + switch (acknowledgeMode) { + case Session.AUTO_ACKNOWLEDGE: + return "AUTO_ACKNOWLEDGE"; + case Session.CLIENT_ACKNOWLEDGE: + return "CLIENT_ACKNOWLEDGE"; + case Session.DUPS_OK_ACKNOWLEDGE: + return "DUPS_OK_ACKNOWLEDGE"; + default: + throw new IllegalArgumentException("Unknown JMS Acknowledge mode " + acknowledgeMode + " (See javax.jms.Session for valid values)"); + } + } + + @Override + public ITridentSpout.BatchCoordinator<JmsBatch> getCoordinator( + String txStateId, @SuppressWarnings("rawtypes") Map conf, TopologyContext context) { + return new JmsBatchCoordinator(name); + } + + @Override + public Emitter<JmsBatch> getEmitter(String txStateId, @SuppressWarnings("rawtypes") Map conf, TopologyContext context) { + return new JmsEmitter(name, jmsProvider, tupleProducer, jmsAcknowledgeMode, conf); + } + + @Override + public Map<String, Object> getComponentConfiguration() { + return null; + } + + @Override + public Fields getOutputFields() { + OutputFieldsGetter fieldGetter = new OutputFieldsGetter(); + tupleProducer.declareOutputFields(fieldGetter); + StreamInfo streamInfo = fieldGetter.getFieldsDeclaration().get(Utils.DEFAULT_STREAM_ID); + if (streamInfo == null) { + throw new IllegalArgumentException("Jms Tuple producer has not declared output fields for the default stream"); + } + + return new Fields(streamInfo.get_output_fields()); + } + + /** + * The JmsEmitter class listens for incoming messages and stores them in a blocking queue. 
On each invocation of emitBatch, + * the queued messages are emitted as a batch. + * + */ + private class JmsEmitter implements Emitter<JmsBatch>, MessageListener { + + private final LinkedBlockingQueue<Message> queue; + private final Connection connection; + private final Session session; + + private final RotatingMap<Long, List<Message>> batchMessageMap; // Maps transaction ids to the JMS messages emitted in that batch. + + private final long rotateTimeMillis; + private final int maxBatchSize; + private final String name; + + private long lastRotate; + + private final Logger LOG = LoggerFactory.getLogger(JmsEmitter.class); + + public JmsEmitter(String name, JmsProvider jmsProvider, JmsTupleProducer tupleProducer, int jmsAcknowledgeMode, @SuppressWarnings("rawtypes") Map conf) { + if (jmsProvider == null) { + throw new IllegalStateException("JMS provider has not been set."); + } + if (tupleProducer == null) { + throw new IllegalStateException("JMS Tuple Producer has not been set."); + } + + this.queue = new LinkedBlockingQueue<Message>(); + this.name = name; + + batchMessageMap = new RotatingMap<Long, List<Message>>(3); + rotateTimeMillis = 1000L * ((Number)conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)).intValue(); + lastRotate = System.currentTimeMillis(); + + Number batchSize = (Number) conf.get(MAX_BATCH_SIZE_CONF); + maxBatchSize = batchSize != null ? 
batchSize.intValue() : DEFAULT_BATCH_SIZE; + + try { + ConnectionFactory cf = jmsProvider.connectionFactory(); + Destination dest = jmsProvider.destination(); + this.connection = cf.createConnection(); + this.session = connection.createSession(false, jmsAcknowledgeMode); + MessageConsumer consumer = session.createConsumer(dest); + consumer.setMessageListener(this); + this.connection.start(); + + LOG.info("Created JmsEmitter with max batch size "+maxBatchSize+" rotate time "+rotateTimeMillis+"ms and destination "+dest+" for "+name); + + } catch (Exception e) { + LOG.warn("Error creating JMS connection.", e); + throw new IllegalStateException("Could not create JMS connection for spout ", e); + } + + } + + @Override + public void success(TransactionAttempt tx) { + + @SuppressWarnings("unchecked") + List<Message> messages = (List<Message>) batchMessageMap.remove(tx.getTransactionId()); + + if (messages != null) { + if (!messages.isEmpty()) { + LOG.debug("Success for batch with transaction id "+tx.getTransactionId()+"/"+tx.getAttemptId()+" for "+name); + } + + for (Message msg: messages) { + String messageId = "UnknownId"; + + try { + messageId = msg.getJMSMessageID(); + msg.acknowledge(); + LOG.trace("Acknowledged message "+messageId); + } catch (JMSException e) { + LOG.warn("Failed to acknowledge message "+messageId, e); + } + } + } + else { + LOG.warn("No messages found in batch with transaction id "+tx.getTransactionId()+"/"+tx.getAttemptId()); + } + } + + /** + * Fail a batch with the given transaction id. This is called when a batch is timed out, or a new batch with a + * matching transaction id is emitted. Note that the current implementation does nothing - i.e. it discards + * messages that have been failed. + * @param transactionId The transaction id of the failed batch + * @param messages The list of messages to fail. 
+ */ + private void fail(Long transactionId, List<Message> messages) { + LOG.debug("Failure for batch with transaction id "+transactionId+" for "+name); + if (messages != null) { + for (Message msg: messages) { + try { + LOG.trace("Failed message "+msg.getJMSMessageID()); + } catch (JMSException e) { + LOG.warn("Could not identify failed message ", e); + } + } + } + else { + LOG.warn("Failed batch has no messages with transaction id "+transactionId); + } + } + + @Override + public void close() { + try { + LOG.info("Closing JMS connection."); + this.session.close(); + this.connection.close(); + } catch (JMSException e) { + LOG.warn("Error closing JMS connection.", e); + } + } + + @Override + public void emitBatch(TransactionAttempt tx, JmsBatch coordinatorMeta, + TridentCollector collector) { + + long now = System.currentTimeMillis(); + if(now - lastRotate > rotateTimeMillis) { + Map<Long, List<Message>> failed = batchMessageMap.rotate(); + for(Long id: failed.keySet()) { + LOG.warn("TIMED OUT batch with transaction id "+id+" for "+name); + fail(id, failed.get(id)); + } + lastRotate = now; + } + + if(batchMessageMap.containsKey(tx.getTransactionId())) { + LOG.warn("FAILED duplicate batch with transaction id "+tx.getTransactionId()+"/"+tx.getAttemptId()+" for "+name); + fail(tx.getTransactionId(), batchMessageMap.get(tx.getTransactionId())); + } + + List<Message> batchMessages = new ArrayList<Message>(); + + for (int index=0; index<maxBatchSize; index++) { + Message msg = queue.poll(); + if (msg == null) { + Utils.sleep(50); // Back off + break; + } + + try { + if (TridentJmsSpout.this.jmsAcknowledgeMode != Session.AUTO_ACKNOWLEDGE) { + batchMessages.add(msg); + } + Values tuple = tupleProducer.toTuple(msg); + collector.emit(tuple); + } catch (JMSException e) { + LOG.warn("Failed to emit message, could not retrieve data for "+name+": "+e ); + } + } + + if (!batchMessages.isEmpty()) { + LOG.debug("Emitting batch with transaction id 
"+tx.getTransactionId()+"/"+tx.getAttemptId()+" and size "+batchMessages.size()+" for "+name); + } + else { + LOG.trace("No items to acknowledge for batch with transaction id "+tx.getTransactionId()+"/"+tx.getAttemptId()+" for "+name); + } + batchMessageMap.put(tx.getTransactionId(), batchMessages); + } + + @Override + public void onMessage(Message msg) { + try { + LOG.trace("Queuing msg [" + msg.getJMSMessageID() + "]"); + } catch (JMSException e) { + // Nothing here, could not get message id + } + this.queue.offer(msg); + } + + } + + /** + * Bare implementation of a BatchCoordinator, returning a null JmsBatch object + * + */ + private class JmsBatchCoordinator implements BatchCoordinator<JmsBatch> { + + private final String name; + + private final Logger LOG = LoggerFactory.getLogger(JmsBatchCoordinator.class); + + public JmsBatchCoordinator(String name) { + this.name = name; + LOG.info("Created batch coordinator for "+name); + } + + @Override + public JmsBatch initializeTransaction(long txid, JmsBatch prevMetadata, JmsBatch curMetadata) { + LOG.debug("Initialise transaction "+txid+" for "+name); + return null; + } + + @Override + public void success(long txid) { + } + + @Override + public boolean isReady(long txid) { + return true; + } + + @Override + public void close() { + } + + } + +} + + \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/d6c8298d/external/storm-jms/src/test/java/org/apache/storm/jms/spout/JmsSpoutTest.java ---------------------------------------------------------------------- diff --git a/external/storm-jms/src/test/java/org/apache/storm/jms/spout/JmsSpoutTest.java b/external/storm-jms/src/test/java/org/apache/storm/jms/spout/JmsSpoutTest.java new file mode 100644 index 0000000..e80f70a --- /dev/null +++ b/external/storm-jms/src/test/java/org/apache/storm/jms/spout/JmsSpoutTest.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.apache.storm.jms.spout;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.util.HashMap;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.junit.Assert;
import org.junit.Test;

import org.apache.storm.jms.JmsProvider;
import org.apache.storm.spout.SpoutOutputCollector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests for <code>JmsSpout</code>: re-emission of a failed message and Java
 * serializability of an unconfigured spout.
 */
public class JmsSpoutTest {
    private static final Logger LOG = LoggerFactory.getLogger(JmsSpoutTest.class);

    /**
     * Sends one message, lets the spout emit it, fails it, and checks that the
     * spout emits again afterwards.
     */
    @Test
    public void testFailure() throws Exception {
        JmsSpout spout = new JmsSpout();
        JmsProvider mockProvider = new MockJmsProvider();
        MockSpoutOutputCollector mockCollector = new MockSpoutOutputCollector();
        SpoutOutputCollector collector = new SpoutOutputCollector(mockCollector);
        // The spout and the sender share one provider so both use the same destination.
        spout.setJmsProvider(mockProvider);
        spout.setJmsTupleProducer(new MockTupleProducer());
        spout.setJmsAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
        spout.setRecoveryPeriod(10); // Rapid recovery for testing.
        spout.open(new HashMap<String,String>(), null, collector);
        try {
            Message msg = this.sendMessage(mockProvider.connectionFactory(), mockProvider.destination());
            Thread.sleep(100);
            spout.nextTuple(); // Pretend to be storm.
            Assert.assertTrue(mockCollector.emitted);

            mockCollector.reset();
            spout.fail(msg.getJMSMessageID()); // Mock failure
            Thread.sleep(5000);
            spout.nextTuple(); // Pretend to be storm.
            Thread.sleep(5000);
            Assert.assertTrue(mockCollector.emitted); // Should have been re-emitted
        } finally {
            // Release the spout's JMS resources even when an assertion fails.
            spout.close();
        }
    }

    /** Checks that a freshly constructed spout can be written with Java serialization. */
    @Test
    public void testSerializability() throws IOException{
        JmsSpout spout = new JmsSpout();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(out);
        oos.writeObject(spout);
        oos.close();
        Assert.assertTrue(out.toByteArray().length > 0);
    }

    /**
     * Sends a single "Hello World" text message to the destination and closes the
     * connection used for sending.
     *
     * @param connectionFactory factory used to create the sending connection
     * @param destination queue or topic the message is sent to
     * @return the message that was sent
     * @throws JMSException if the message cannot be created or sent
     */
    public Message sendMessage(ConnectionFactory connectionFactory, Destination destination) throws JMSException {
        Connection connection = connectionFactory.createConnection();
        try {
            Session mySess = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            MessageProducer producer = mySess.createProducer(destination);
            TextMessage msg = mySess.createTextMessage();
            msg.setText("Hello World");
            LOG.info("Sending Message: {}", msg.getText());
            producer.send(msg);
            return msg;
        } finally {
            // Closing the connection also closes the session and producer created from it.
            connection.close();
        }
    }

}
http://git-wip-us.apache.org/repos/asf/storm/blob/d6c8298d/external/storm-jms/src/test/java/org/apache/storm/jms/spout/MockJmsProvider.java ---------------------------------------------------------------------- diff --git a/external/storm-jms/src/test/java/org/apache/storm/jms/spout/MockJmsProvider.java b/external/storm-jms/src/test/java/org/apache/storm/jms/spout/MockJmsProvider.java new file mode 100644 index 0000000..3ba0853 --- /dev/null +++ b/external/storm-jms/src/test/java/org/apache/storm/jms/spout/MockJmsProvider.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.apache.storm.jms.spout;

import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

import org.apache.activemq.ActiveMQConnectionFactory;

import org.apache.storm.jms.JmsProvider;

/**
 * Test <code>JmsProvider</code> that uses an ActiveMQ connection factory for
 * <code>vm://localhost?broker.persistent=false</code> and the destination bound in
 * JNDI under <code>dynamicQueues/FOO.BAR</code>.
 */
public class MockJmsProvider implements JmsProvider {
    private static final long serialVersionUID = 1L;

    private final ConnectionFactory connectionFactory;
    private final Destination destination;

    /**
     * Creates the connection factory and looks up the destination through JNDI.
     *
     * @throws NamingException if the initial context cannot be created or closed, or the lookup fails
     */
    public MockJmsProvider() throws NamingException{
        this.connectionFactory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
        Context jndiContext = new InitialContext();
        try {
            this.destination = (Destination) jndiContext.lookup("dynamicQueues/FOO.BAR");
        } finally {
            // The context is only needed for the lookup.
            jndiContext.close();
        }
    }

    /**
     * Provides the JMS <code>ConnectionFactory</code>
     * @return the connection factory
     * @throws Exception
     */
    @Override
    public ConnectionFactory connectionFactory() throws Exception{
        return this.connectionFactory;
    }

    /**
     * Provides the <code>Destination</code> (topic or queue) from which the
     * <code>JmsSpout</code> will receive messages.
     * @return the destination looked up when this provider was constructed
     * @throws Exception
     */
    @Override
    public Destination destination() throws Exception{
        return this.destination;
    }

}
http://git-wip-us.apache.org/repos/asf/storm/blob/d6c8298d/external/storm-jms/src/test/java/org/apache/storm/jms/spout/MockSpoutOutputCollector.java ---------------------------------------------------------------------- diff --git a/external/storm-jms/src/test/java/org/apache/storm/jms/spout/MockSpoutOutputCollector.java b/external/storm-jms/src/test/java/org/apache/storm/jms/spout/MockSpoutOutputCollector.java new file mode 100644 index 0000000..a5a6c51 --- /dev/null +++ b/external/storm-jms/src/test/java/org/apache/storm/jms/spout/MockSpoutOutputCollector.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */
package org.apache.storm.jms.spout;

import java.util.ArrayList;
import java.util.List;

import org.apache.storm.spout.ISpoutOutputCollector;

/**
 * Collector stub for tests that only remembers whether anything has been emitted
 * since construction or the last {@link #reset()}.
 */
public class MockSpoutOutputCollector implements ISpoutOutputCollector {
    /** True once emit or emitDirect has been called; cleared by reset. */
    boolean emitted;

    /** Records the emit and returns an empty task id list. */
    @Override
    public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) {
        markEmitted();
        List<Integer> noTasks = new ArrayList<Integer>();
        return noTasks;
    }

    /** Records the emit; the tuple itself is discarded. */
    @Override
    public void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId) {
        markEmitted();
    }

    /** Errors are ignored by this stub. */
    @Override
    public void reportError(Throwable error) {
    }

    /** @return whether an emit has been recorded since the last reset */
    public boolean emitted(){
        return emitted;
    }

    /** Clears the emitted flag. */
    public void reset(){
        emitted = false;
    }

    /** This stub never holds pending tuples. */
    @Override
    public long getPendingCount() {
        return 0;
    }

    /** Sets the emitted flag. */
    private void markEmitted() {
        emitted = true;
    }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/d6c8298d/external/storm-jms/src/test/java/org/apache/storm/jms/spout/MockTupleProducer.java ---------------------------------------------------------------------- diff --git a/external/storm-jms/src/test/java/org/apache/storm/jms/spout/MockTupleProducer.java b/external/storm-jms/src/test/java/org/apache/storm/jms/spout/MockTupleProducer.java new file mode 100644 index 0000000..ea571fc --- /dev/null +++ b/external/storm-jms/src/test/java/org/apache/storm/jms/spout/MockTupleProducer.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.apache.storm.jms.spout;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.TextMessage;

import org.apache.storm.jms.JmsTupleProducer;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

/**
 * Test tuple producer that turns a JMS text message into a one-field tuple
 * holding the message text.
 */
public class MockTupleProducer implements JmsTupleProducer {
    private static final long serialVersionUID = 1L;

    /**
     * Converts a text message into a tuple containing its text.
     *
     * @param msg the JMS message to convert
     * @return a single-value tuple with the message text, or null when the message is not a TextMessage
     * @throws JMSException if the text cannot be read from the message
     */
    @Override
    public Values toTuple(Message msg) throws JMSException {
        if (!(msg instanceof TextMessage)) {
            return null;
        }
        TextMessage textMessage = (TextMessage) msg;
        String json = textMessage.getText();
        return new Values(json);
    }

    /** Declares the single output field "json". */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        Fields outputFields = new Fields("json");
        declarer.declare(outputFields);
    }

}
http://git-wip-us.apache.org/repos/asf/storm/blob/d6c8298d/external/storm-jms/src/test/resources/jndi.properties ---------------------------------------------------------------------- diff --git a/external/storm-jms/src/test/resources/jndi.properties b/external/storm-jms/src/test/resources/jndi.properties new file mode 100644 index 0000000..af19521 --- /dev/null +++ b/external/storm-jms/src/test/resources/jndi.properties @@ -0,0 +1,18 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership.
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +java.naming.factory.initial = org.apache.activemq.jndi.ActiveMQInitialContextFactory +java.naming.provider.url = vm://localhost?broker.persistent=false \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/d6c8298d/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 816c657..d6475a3 100644 --- a/pom.xml +++ b/pom.xml @@ -331,6 +331,7 @@ <module>examples/storm-elasticsearch-examples</module> <module>examples/storm-mqtt-examples</module> <module>examples/storm-pmml-examples</module> + <module>examples/storm-jms-examples</module> <module>storm-perf</module> </modules>
