http://git-wip-us.apache.org/repos/asf/storm/blob/d6c8298d/external/storm-jms/core/src/main/java/org/apache/storm/jms/trident/TridentJmsSpout.java ---------------------------------------------------------------------- diff --git a/external/storm-jms/core/src/main/java/org/apache/storm/jms/trident/TridentJmsSpout.java b/external/storm-jms/core/src/main/java/org/apache/storm/jms/trident/TridentJmsSpout.java deleted file mode 100644 index 55e29bc..0000000 --- a/external/storm-jms/core/src/main/java/org/apache/storm/jms/trident/TridentJmsSpout.java +++ /dev/null @@ -1,409 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.storm.jms.trident; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.LinkedBlockingQueue; - -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.Session; - -import org.apache.storm.jms.JmsProvider; -import org.apache.storm.jms.JmsTupleProducer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.storm.trident.operation.TridentCollector; -import org.apache.storm.trident.spout.ITridentSpout; -import org.apache.storm.trident.topology.TransactionAttempt; -import org.apache.storm.Config; -import org.apache.storm.generated.StreamInfo; -import org.apache.storm.task.TopologyContext; -import org.apache.storm.topology.OutputFieldsGetter; -import org.apache.storm.tuple.Fields; -import org.apache.storm.tuple.Values; -import org.apache.storm.utils.RotatingMap; -import org.apache.storm.utils.Utils; - -/** - * Trident implementation of the JmsSpout - * <p> - * - */ -public class TridentJmsSpout implements ITridentSpout<JmsBatch> { - - public static final String MAX_BATCH_SIZE_CONF = "topology.spout.max.batch.size"; - - public static final int DEFAULT_BATCH_SIZE = 1000; - - private static final long serialVersionUID = -3469351154693356655L; - - private JmsTupleProducer tupleProducer; - - private JmsProvider jmsProvider; - - private int jmsAcknowledgeMode; - - private String name; - - private static int nameIndex = 1; - - /** - * Create a TridentJmsSpout with a default name and acknowledge mode of AUTO_ACKNOWLEDGE - */ - public TridentJmsSpout() { - this.name = "JmsSpout_"+(nameIndex++); - this.jmsAcknowledgeMode = Session.AUTO_ACKNOWLEDGE; - } - - /** - * Set the name for this spout, to improve log identification - * @param name The name to be used in log messages - * @return This spout - */ - public TridentJmsSpout named(String name) { - this.name = name; - return this; - } - - /** - * Set the <code>JmsProvider</code> - * implementation that this Spout will use to connect to - * a JMS <code>javax.jms.Desination</code> - * - * @param provider - */ - public TridentJmsSpout withJmsProvider(JmsProvider provider){ - this.jmsProvider = provider; - return this; - } - - /** - * Set the <code>JmsTupleProducer</code> - * implementation that will convert <code>javax.jms.Message</code> - * object to <code>backtype.storm.tuple.Values</code> objects - * to be emitted. - * - * @param tupleProducer - * @return This spout - */ - public TridentJmsSpout withTupleProducer(JmsTupleProducer tupleProducer) { - this.tupleProducer = tupleProducer; - return this; - } - - /** - * Set the JMS acknowledge mode for messages being processed by this spout. - * <p/> - * Possible values: - * <ul> - * <li>javax.jms.Session.AUTO_ACKNOWLEDGE</li> - * <li>javax.jms.Session.CLIENT_ACKNOWLEDGE</li> - * <li>javax.jms.Session.DUPS_OK_ACKNOWLEDGE</li> - * </ul> - * @param jmsAcknowledgeMode The chosen acknowledge mode - * @return This spout - * @throws IllegalArgumentException if the mode is not recognized - */ - public TridentJmsSpout withJmsAcknowledgeMode(int jmsAcknowledgeMode) { - toDeliveryModeString(jmsAcknowledgeMode); - this.jmsAcknowledgeMode = jmsAcknowledgeMode; - return this; - } - - /** - * Return a friendly string for the given JMS acknowledge mode, or throw an IllegalArgumentException if - * the mode is not recognized. - * <p/> - * Possible values: - * <ul> - * <li>javax.jms.Session.AUTO_ACKNOWLEDGE</li> - * <li>javax.jms.Session.CLIENT_ACKNOWLEDGE</li> - * <li>javax.jms.Session.DUPS_OK_ACKNOWLEDGE</li> - * </ul> - * @param acknowledgeMode A valid JMS acknowledge mode - * @return A friendly string describing the acknowledge mode - * @throws IllegalArgumentException if the mode is not recognized - */ - private static final String toDeliveryModeString(int acknowledgeMode) { - switch (acknowledgeMode) { - case Session.AUTO_ACKNOWLEDGE: - return "AUTO_ACKNOWLEDGE"; - case Session.CLIENT_ACKNOWLEDGE: - return "CLIENT_ACKNOWLEDGE"; - case Session.DUPS_OK_ACKNOWLEDGE: - return "DUPS_OK_ACKNOWLEDGE"; - default: - throw new IllegalArgumentException("Unknown JMS Acknowledge mode " + acknowledgeMode + " (See javax.jms.Session for valid values)"); - } - } - - @Override - public ITridentSpout.BatchCoordinator<JmsBatch> getCoordinator( - String txStateId, @SuppressWarnings("rawtypes") Map conf, TopologyContext context) { - return new JmsBatchCoordinator(name); - } - - @Override - public Emitter<JmsBatch> getEmitter(String txStateId, @SuppressWarnings("rawtypes") Map conf, TopologyContext context) { - return new JmsEmitter(name, jmsProvider, tupleProducer, jmsAcknowledgeMode, conf); - } - - @Override - public Map<String, Object> getComponentConfiguration() { - return null; - } - - @Override - public Fields getOutputFields() { - OutputFieldsGetter fieldGetter = new OutputFieldsGetter(); - tupleProducer.declareOutputFields(fieldGetter); - StreamInfo streamInfo = fieldGetter.getFieldsDeclaration().get(Utils.DEFAULT_STREAM_ID); - if (streamInfo == null) { - throw new IllegalArgumentException("Jms Tuple producer has not declared output fields for the default stream"); - } - - return new Fields(streamInfo.get_output_fields()); - } - - /** - * The JmsEmitter class listens for incoming messages and stores them in a blocking queue. On each invocation of emit, - * the queued messages are emitted as a batch. - * - */ - private class JmsEmitter implements Emitter<JmsBatch>, MessageListener { - - private final LinkedBlockingQueue<Message> queue; - private final Connection connection; - private final Session session; - - private final RotatingMap<Long, List<Message>> batchMessageMap; // Maps transaction Ids to JMS message ids. - - private final long rotateTimeMillis; - private final int maxBatchSize; - private final String name; - - private long lastRotate; - - private final Logger LOG = LoggerFactory.getLogger(JmsEmitter.class); - - public JmsEmitter(String name, JmsProvider jmsProvider, JmsTupleProducer tupleProducer, int jmsAcknowledgeMode, @SuppressWarnings("rawtypes") Map conf) { - if (jmsProvider == null) { - throw new IllegalStateException("JMS provider has not been set."); - } - if (tupleProducer == null) { - throw new IllegalStateException("JMS Tuple Producer has not been set."); - } - - this.queue = new LinkedBlockingQueue<Message>(); - this.name = name; - - batchMessageMap = new RotatingMap<Long, List<Message>>(3); - rotateTimeMillis = 1000L * ((Number)conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)).intValue(); - lastRotate = System.currentTimeMillis(); - - Number batchSize = (Number) conf.get(MAX_BATCH_SIZE_CONF); - maxBatchSize = batchSize != null ? batchSize.intValue() : DEFAULT_BATCH_SIZE; - - try { - ConnectionFactory cf = jmsProvider.connectionFactory(); - Destination dest = jmsProvider.destination(); - this.connection = cf.createConnection(); - this.session = connection.createSession(false, jmsAcknowledgeMode); - MessageConsumer consumer = session.createConsumer(dest); - consumer.setMessageListener(this); - this.connection.start(); - - LOG.info("Created JmsEmitter with max batch size "+maxBatchSize+" rotate time "+rotateTimeMillis+"ms and destination "+dest+" for "+name); - - } catch (Exception e) { - LOG.warn("Error creating JMS connection.", e); - throw new IllegalStateException("Could not create JMS connection for spout ", e); - } - - } - - @Override - public void success(TransactionAttempt tx) { - - @SuppressWarnings("unchecked") - List<Message> messages = (List<Message>) batchMessageMap.remove(tx.getTransactionId()); - - if (messages != null) { - if (!messages.isEmpty()) { - LOG.debug("Success for batch with transaction id "+tx.getTransactionId()+"/"+tx.getAttemptId()+" for "+name); - } - - for (Message msg: messages) { - String messageId = "UnknownId"; - - try { - messageId = msg.getJMSMessageID(); - msg.acknowledge(); - LOG.trace("Acknowledged message "+messageId); - } catch (JMSException e) { - LOG.warn("Failed to acknowledge message "+messageId, e); - } - } - } - else { - LOG.warn("No messages found in batch with transaction id "+tx.getTransactionId()+"/"+tx.getAttemptId()); - } - } - - /** - * Fail a batch with the given transaction id. This is called when a batch is timed out, or a new batch with a - * matching transaction id is emitted. Note that the current implementation does nothing - i.e. it discards - * messages that have been failed. - * @param transactionId The transaction id of the failed batch - * @param messages The list of messages to fail. - */ - private void fail(Long transactionId, List<Message> messages) { - LOG.debug("Failure for batch with transaction id "+transactionId+" for "+name); - if (messages != null) { - for (Message msg: messages) { - try { - LOG.trace("Failed message "+msg.getJMSMessageID()); - } catch (JMSException e) { - LOG.warn("Could not identify failed message ", e); - } - } - } - else { - LOG.warn("Failed batch has no messages with transaction id "+transactionId); - } - } - - @Override - public void close() { - try { - LOG.info("Closing JMS connection."); - this.session.close(); - this.connection.close(); - } catch (JMSException e) { - LOG.warn("Error closing JMS connection.", e); - } - } - - @Override - public void emitBatch(TransactionAttempt tx, JmsBatch coordinatorMeta, - TridentCollector collector) { - - long now = System.currentTimeMillis(); - if(now - lastRotate > rotateTimeMillis) { - Map<Long, List<Message>> failed = batchMessageMap.rotate(); - for(Long id: failed.keySet()) { - LOG.warn("TIMED OUT batch with transaction id "+id+" for "+name); - fail(id, failed.get(id)); - } - lastRotate = now; - } - - if(batchMessageMap.containsKey(tx.getTransactionId())) { - LOG.warn("FAILED duplicate batch with transaction id "+tx.getTransactionId()+"/"+tx.getAttemptId()+" for "+name); - fail(tx.getTransactionId(), batchMessageMap.get(tx.getTransactionId())); - } - - List<Message> batchMessages = new ArrayList<Message>(); - - for (int index=0; index<maxBatchSize; index++) { - Message msg = queue.poll(); - if (msg == null) { - Utils.sleep(50); // Back off - break; - } - - try { - if (TridentJmsSpout.this.jmsAcknowledgeMode != Session.AUTO_ACKNOWLEDGE) { - batchMessages.add(msg); - } - Values tuple = tupleProducer.toTuple(msg); - collector.emit(tuple); - } catch (JMSException e) { - LOG.warn("Failed to emit message, could not retrieve data for "+name+": "+e ); - } - } - - if (!batchMessages.isEmpty()) { - LOG.debug("Emitting batch with transaction id "+tx.getTransactionId()+"/"+tx.getAttemptId()+" and size "+batchMessages.size()+" for "+name); - } - else { - LOG.trace("No items to acknowledge for batch with transaction id "+tx.getTransactionId()+"/"+tx.getAttemptId()+" for "+name); - } - batchMessageMap.put(tx.getTransactionId(), batchMessages); - } - - @Override - public void onMessage(Message msg) { - try { - LOG.trace("Queuing msg [" + msg.getJMSMessageID() + "]"); - } catch (JMSException e) { - // Nothing here, could not get message id - } - this.queue.offer(msg); - } - - } - - /** - * Bare implementation of a BatchCoordinator, returning a null JmsBatch object - * - */ - private class JmsBatchCoordinator implements BatchCoordinator<JmsBatch> { - - private final String name; - - private final Logger LOG = LoggerFactory.getLogger(JmsBatchCoordinator.class); - - public JmsBatchCoordinator(String name) { - this.name = name; - LOG.info("Created batch coordinator for "+name); - } - - @Override - public JmsBatch initializeTransaction(long txid, JmsBatch prevMetadata, JmsBatch curMetadata) { - LOG.debug("Initialise transaction "+txid+" for "+name); - return null; - } - - @Override - public void success(long txid) { - } - - @Override - public boolean isReady(long txid) { - return true; - } - - @Override - public void close() { - } - - } - -} - - \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/d6c8298d/external/storm-jms/core/src/test/java/org/apache/storm/jms/spout/JmsSpoutTest.java ---------------------------------------------------------------------- diff --git a/external/storm-jms/core/src/test/java/org/apache/storm/jms/spout/JmsSpoutTest.java b/external/storm-jms/core/src/test/java/org/apache/storm/jms/spout/JmsSpoutTest.java deleted file mode 100644 index e80f70a..0000000 --- a/external/storm-jms/core/src/test/java/org/apache/storm/jms/spout/JmsSpoutTest.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.storm.jms.spout; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.ObjectOutputStream; -import java.util.HashMap; - -import javax.jms.ConnectionFactory; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TextMessage; - -import org.junit.Assert; -import org.junit.Test; - -import org.apache.storm.jms.JmsProvider; -import org.apache.storm.spout.SpoutOutputCollector; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class JmsSpoutTest { - private static final Logger LOG = LoggerFactory.getLogger(JmsSpoutTest.class); - - @Test - public void testFailure() throws JMSException, Exception{ - JmsSpout spout = new JmsSpout(); - JmsProvider mockProvider = new MockJmsProvider(); - MockSpoutOutputCollector mockCollector = new MockSpoutOutputCollector(); - SpoutOutputCollector collector = new SpoutOutputCollector(mockCollector); - spout.setJmsProvider(new MockJmsProvider()); - spout.setJmsTupleProducer(new MockTupleProducer()); - spout.setJmsAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE); - spout.setRecoveryPeriod(10); // Rapid recovery for testing. - spout.open(new HashMap<String,String>(), null, collector); - Message msg = this.sendMessage(mockProvider.connectionFactory(), mockProvider.destination()); - Thread.sleep(100); - spout.nextTuple(); // Pretend to be storm. - Assert.assertTrue(mockCollector.emitted); - - mockCollector.reset(); - spout.fail(msg.getJMSMessageID()); // Mock failure - Thread.sleep(5000); - spout.nextTuple(); // Pretend to be storm. - Thread.sleep(5000); - Assert.assertTrue(mockCollector.emitted); // Should have been re-emitted - } - - @Test - public void testSerializability() throws IOException{ - JmsSpout spout = new JmsSpout(); - ByteArrayOutputStream out = new ByteArrayOutputStream(); - ObjectOutputStream oos = new ObjectOutputStream(out); - oos.writeObject(spout); - oos.close(); - Assert.assertTrue(out.toByteArray().length > 0); - } - - public Message sendMessage(ConnectionFactory connectionFactory, Destination destination) throws JMSException { - Session mySess = connectionFactory.createConnection().createSession(false, Session.CLIENT_ACKNOWLEDGE); - MessageProducer producer = mySess.createProducer(destination); - TextMessage msg = mySess.createTextMessage(); - msg.setText("Hello World"); - LOG.info("Sending Message: {}", msg.getText()); - producer.send(msg); - return msg; - } - -} http://git-wip-us.apache.org/repos/asf/storm/blob/d6c8298d/external/storm-jms/core/src/test/java/org/apache/storm/jms/spout/MockJmsProvider.java ---------------------------------------------------------------------- diff --git a/external/storm-jms/core/src/test/java/org/apache/storm/jms/spout/MockJmsProvider.java b/external/storm-jms/core/src/test/java/org/apache/storm/jms/spout/MockJmsProvider.java deleted file mode 100644 index 3ba0853..0000000 --- a/external/storm-jms/core/src/test/java/org/apache/storm/jms/spout/MockJmsProvider.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.storm.jms.spout; - -import javax.jms.ConnectionFactory; -import javax.jms.Destination; -import javax.naming.Context; -import javax.naming.InitialContext; -import javax.naming.NamingException; - -import org.apache.activemq.ActiveMQConnectionFactory; - -import org.apache.storm.jms.JmsProvider; - -public class MockJmsProvider implements JmsProvider { - private static final long serialVersionUID = 1L; - - private ConnectionFactory connectionFactory = null; - private Destination destination = null; - - public MockJmsProvider() throws NamingException{ - this.connectionFactory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false"); - Context jndiContext = new InitialContext(); - this.destination = (Destination) jndiContext.lookup("dynamicQueues/FOO.BAR"); - - } - - /** - * Provides the JMS <code>ConnectionFactory</code> - * @return the connection factory - * @throws Exception - */ - public ConnectionFactory connectionFactory() throws Exception{ - return this.connectionFactory; - } - - /** - * Provides the <code>Destination</code> (topic or queue) from which the - * <code>JmsSpout</code> will receive messages. - * @return - * @throws Exception - */ - public Destination destination() throws Exception{ - return this.destination; - } - -} http://git-wip-us.apache.org/repos/asf/storm/blob/d6c8298d/external/storm-jms/core/src/test/java/org/apache/storm/jms/spout/MockSpoutOutputCollector.java ---------------------------------------------------------------------- diff --git a/external/storm-jms/core/src/test/java/org/apache/storm/jms/spout/MockSpoutOutputCollector.java b/external/storm-jms/core/src/test/java/org/apache/storm/jms/spout/MockSpoutOutputCollector.java deleted file mode 100644 index a5a6c51..0000000 --- a/external/storm-jms/core/src/test/java/org/apache/storm/jms/spout/MockSpoutOutputCollector.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.storm.jms.spout; - -import java.util.ArrayList; -import java.util.List; - -import org.apache.storm.spout.ISpoutOutputCollector; - -public class MockSpoutOutputCollector implements ISpoutOutputCollector { - boolean emitted = false; - - @Override - public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) { - emitted = true; - return new ArrayList<Integer>(); - } - - @Override - public void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId) { - emitted = true; - } - - @Override - public void reportError(Throwable error) { - } - - public boolean emitted(){ - return this.emitted; - } - - public void reset(){ - this.emitted = false; - } - - @Override - public long getPendingCount() { - return 0; - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/d6c8298d/external/storm-jms/core/src/test/java/org/apache/storm/jms/spout/MockTupleProducer.java ---------------------------------------------------------------------- diff --git a/external/storm-jms/core/src/test/java/org/apache/storm/jms/spout/MockTupleProducer.java b/external/storm-jms/core/src/test/java/org/apache/storm/jms/spout/MockTupleProducer.java deleted file mode 100644 index ea571fc..0000000 --- a/external/storm-jms/core/src/test/java/org/apache/storm/jms/spout/MockTupleProducer.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.storm.jms.spout; - -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.TextMessage; - -import org.apache.storm.jms.JmsTupleProducer; -import org.apache.storm.topology.OutputFieldsDeclarer; -import org.apache.storm.tuple.Fields; -import org.apache.storm.tuple.Values; - -public class MockTupleProducer implements JmsTupleProducer { - private static final long serialVersionUID = 1L; - - @Override - public Values toTuple(Message msg) throws JMSException { - if (msg instanceof TextMessage) { - String json = ((TextMessage) msg).getText(); - return new Values(json); - } else { - return null; - } - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields("json")); - } - -} http://git-wip-us.apache.org/repos/asf/storm/blob/d6c8298d/external/storm-jms/core/src/test/resources/jndi.properties ---------------------------------------------------------------------- diff --git a/external/storm-jms/core/src/test/resources/jndi.properties b/external/storm-jms/core/src/test/resources/jndi.properties deleted file mode 100644 index af19521..0000000 --- a/external/storm-jms/core/src/test/resources/jndi.properties +++ /dev/null @@ -1,18 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -java.naming.factory.initial = org.apache.activemq.jndi.ActiveMQInitialContextFactory -java.naming.provider.url = vm://localhost?broker.persistent=false \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/d6c8298d/external/storm-jms/examples/README.markdown ---------------------------------------------------------------------- diff --git a/external/storm-jms/examples/README.markdown b/external/storm-jms/examples/README.markdown deleted file mode 100644 index 7a4d8f0..0000000 --- a/external/storm-jms/examples/README.markdown +++ /dev/null @@ -1,12 +0,0 @@ -## About Storm JMS Examples -This project contains a simple storm topology that illustrates the usage of "storm-jms". - -To build: - -`mvn clean install` - -The default build will create a jar file that can be deployed to to a Storm cluster in the "target" directory: - -`storm-jms-examples-0.1-SNAPSHOT-jar-with-dependencies.jar` - - http://git-wip-us.apache.org/repos/asf/storm/blob/d6c8298d/external/storm-jms/examples/pom.xml ---------------------------------------------------------------------- diff --git a/external/storm-jms/examples/pom.xml b/external/storm-jms/examples/pom.xml deleted file mode 100644 index 1809e34..0000000 --- a/external/storm-jms/examples/pom.xml +++ /dev/null @@ -1,151 +0,0 @@ -<?xml version="1.0"?> -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.apache.storm</groupId> - <artifactId>storm-jms-parent</artifactId> - <version>1.1.0-SNAPSHOT</version> - <relativePath>../pom.xml</relativePath> - </parent> - - - <artifactId>storm-jms-examples</artifactId> - - <properties> - <spring.version>2.5.6</spring.version> - </properties> - <dependencies> - <dependency> - <groupId>org.springframework</groupId> - <artifactId>spring-beans</artifactId> - <version>${spring.version}</version> - </dependency> - <dependency> - <groupId>org.springframework</groupId> - <artifactId>spring-core</artifactId> - <version>${spring.version}</version> - </dependency> - <dependency> - <groupId>org.springframework</groupId> - <artifactId>spring-context</artifactId> - <version>${spring.version}</version> - </dependency> - <dependency> - <groupId>org.springframework</groupId> - <artifactId>spring-jms</artifactId> - <version>${spring.version}</version> - </dependency> - <dependency> - <groupId>org.apache.xbean</groupId> - <artifactId>xbean-spring</artifactId> - <version>3.7</version> - </dependency> - <dependency> - <groupId>org.apache.storm</groupId> - <artifactId>storm-core</artifactId> - <version>${project.version}</version> - <!-- keep storm out of the jar-with-dependencies --> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.storm</groupId> - <artifactId>storm-jms</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.apache.activemq</groupId> - <artifactId>activemq-core</artifactId> - <version>5.4.0</version> - <exclusions> - <exclusion> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-api</artifactId> - </exclusion> - <exclusion> - <groupId>log4j</groupId> - <artifactId>log4j</artifactId> - </exclusion> - </exclusions> - </dependency> - </dependencies> - <build> - <plugins> - <!-- bind the maven-assembly-plugin to the package phase this will create - a jar file without the storm dependencies suitable for deployment to a cluster. --> - <plugin> - <artifactId>maven-assembly-plugin</artifactId> - <configuration> - <descriptorRefs> - <descriptorRef>jar-with-dependencies</descriptorRef> - </descriptorRefs> - <archive> - <manifest> - <mainClass></mainClass> - </manifest> - </archive> - </configuration> - <executions> - <execution> - <id>make-assembly</id> - <phase>package</phase> - <goals> - <goal>single</goal> - </goals> - </execution> - </executions> - - </plugin> - - <plugin> - <groupId>org.codehaus.mojo</groupId> - <artifactId>exec-maven-plugin</artifactId> - <version>1.2.1</version> - <executions> - <execution> - <goals> - <goal>exec</goal> - </goals> - </execution> - </executions> - <configuration> - <executable>java</executable> - <includeProjectDependencies>true</includeProjectDependencies> - <includePluginDependencies>true</includePluginDependencies> - <mainClass>org.apache.storm.jms.example.ExampleJmsTopology</mainClass> - <systemProperties> - <systemProperty> - <key>log4j.configuration</key> - <value>file:./src/main/resources/log4j.properties</value> - </systemProperty> - </systemProperties> - </configuration> - <dependencies> - <dependency> - <groupId>org.apache.storm</groupId> - <artifactId>storm-core</artifactId> - <version>${project.version}</version> - <type>jar</type> - </dependency> - </dependencies> - </plugin> - </plugins> - </build> -</project> http://git-wip-us.apache.org/repos/asf/storm/blob/d6c8298d/external/storm-jms/examples/src/main/java/org/apache/storm/jms/example/ExampleJmsTopology.java ---------------------------------------------------------------------- diff --git a/external/storm-jms/examples/src/main/java/org/apache/storm/jms/example/ExampleJmsTopology.java b/external/storm-jms/examples/src/main/java/org/apache/storm/jms/example/ExampleJmsTopology.java deleted file mode 100644 index 3324aac..0000000 --- a/external/storm-jms/examples/src/main/java/org/apache/storm/jms/example/ExampleJmsTopology.java +++ /dev/null @@ -1,131 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.storm.jms.example; - -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.Session; -import javax.jms.TextMessage; - -import org.apache.storm.Config; -import org.apache.storm.LocalCluster; -import org.apache.storm.StormSubmitter; -import org.apache.storm.jms.JmsMessageProducer; -import org.apache.storm.jms.JmsProvider; -import org.apache.storm.jms.JmsTupleProducer; -import org.apache.storm.jms.bolt.JmsBolt; -import org.apache.storm.jms.spout.JmsSpout; -import org.apache.storm.topology.TopologyBuilder; -import org.apache.storm.tuple.Fields; -import org.apache.storm.tuple.ITuple; -import org.apache.storm.utils.Utils; - -public class ExampleJmsTopology { - public static final String JMS_QUEUE_SPOUT = "JMS_QUEUE_SPOUT"; - public static final String INTERMEDIATE_BOLT = "INTERMEDIATE_BOLT"; - public static final String FINAL_BOLT = "FINAL_BOLT"; - public static final String JMS_TOPIC_BOLT = "JMS_TOPIC_BOLT"; - public static final String JMS_TOPIC_SPOUT = "JMS_TOPIC_SPOUT"; - public static final String ANOTHER_BOLT = "ANOTHER_BOLT"; - - @SuppressWarnings("serial") - public static void main(String[] args) throws Exception { - - // JMS Queue Provider - JmsProvider jmsQueueProvider = new SpringJmsProvider( - "jms-activemq.xml", "jmsConnectionFactory", - "notificationQueue"); - - // JMS Topic provider - JmsProvider jmsTopicProvider = new SpringJmsProvider( - "jms-activemq.xml", "jmsConnectionFactory", - "notificationTopic"); - - // JMS Producer - JmsTupleProducer producer = new JsonTupleProducer(); - - // JMS Queue Spout - JmsSpout queueSpout = new JmsSpout(); - queueSpout.setJmsProvider(jmsQueueProvider); - queueSpout.setJmsTupleProducer(producer); - queueSpout.setJmsAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE); - queueSpout.setDistributed(true); // allow multiple instances - - TopologyBuilder builder = new TopologyBuilder(); - - // spout with 5 parallel instances - builder.setSpout(JMS_QUEUE_SPOUT, queueSpout, 5); - - // intermediate bolt, subscribes to jms spout, anchors on tuples, and auto-acks - builder.setBolt(INTERMEDIATE_BOLT, - new GenericBolt("INTERMEDIATE_BOLT", true, true, new Fields("json")), 3).shuffleGrouping( - JMS_QUEUE_SPOUT); - - // bolt that subscribes to the intermediate bolt, and auto-acks - // messages. - builder.setBolt(FINAL_BOLT, new GenericBolt("FINAL_BOLT", true, true), 3).shuffleGrouping( - INTERMEDIATE_BOLT); - - // bolt that subscribes to the intermediate bolt, and publishes to a JMS Topic - JmsBolt jmsBolt = new JmsBolt(); - jmsBolt.setJmsProvider(jmsTopicProvider); - - // anonymous message producer just calls toString() on the tuple to create a jms message - jmsBolt.setJmsMessageProducer(new JmsMessageProducer() { - @Override - public Message toMessage(Session session, ITuple input) throws JMSException { - System.out.println("Sending JMS Message:" + input.toString()); - TextMessage tm = session.createTextMessage(input.toString()); - return tm; - } - }); - - builder.setBolt(JMS_TOPIC_BOLT, jmsBolt).shuffleGrouping(INTERMEDIATE_BOLT); - - // JMS Topic spout - JmsSpout topicSpout = new JmsSpout(); - topicSpout.setJmsProvider(jmsTopicProvider); - topicSpout.setJmsTupleProducer(producer); - topicSpout.setJmsAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE); - topicSpout.setDistributed(false); - - builder.setSpout(JMS_TOPIC_SPOUT, topicSpout); - - builder.setBolt(ANOTHER_BOLT, new GenericBolt("ANOTHER_BOLT", true, true), 1).shuffleGrouping( - JMS_TOPIC_SPOUT); - - Config conf = new Config(); - - if (args.length > 0) { - conf.setNumWorkers(3); - - StormSubmitter.submitTopology(args[0], conf, - builder.createTopology()); - } else { - - conf.setDebug(true); - - LocalCluster cluster = new LocalCluster(); - cluster.submitTopology("storm-jms-example", conf, builder.createTopology()); - Utils.sleep(60000); - cluster.killTopology("storm-jms-example"); - cluster.shutdown(); - } - } - -} http://git-wip-us.apache.org/repos/asf/storm/blob/d6c8298d/external/storm-jms/examples/src/main/java/org/apache/storm/jms/example/GenericBolt.java ---------------------------------------------------------------------- diff --git a/external/storm-jms/examples/src/main/java/org/apache/storm/jms/example/GenericBolt.java b/external/storm-jms/examples/src/main/java/org/apache/storm/jms/example/GenericBolt.java deleted file mode 100644 index 57de1ba..0000000 --- a/external/storm-jms/examples/src/main/java/org/apache/storm/jms/example/GenericBolt.java +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.storm.jms.example; - -import java.util.Map; - -import org.apache.storm.topology.base.BaseRichBolt; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.storm.task.OutputCollector; -import org.apache.storm.task.TopologyContext; -import org.apache.storm.topology.OutputFieldsDeclarer; -import org.apache.storm.tuple.Fields; -import org.apache.storm.tuple.Tuple; - -/** - * A generic <code>org.apache.storm.topology.IRichBolt</code> implementation - * for testing/debugging the Storm JMS Spout and example topologies. - * <p> - * For debugging purposes, set the log level of the - * <code>org.apache.storm.contrib.jms</code> package to DEBUG for debugging - * output. - * - * @author tgoetz - */ -@SuppressWarnings("serial") -public class GenericBolt extends BaseRichBolt { - private static final Logger LOG = LoggerFactory.getLogger(GenericBolt.class); - private OutputCollector collector; - private boolean autoAck = false; - private boolean autoAnchor = false; - private Fields declaredFields; - private String name; - - /** - * Constructs a new <code>GenericBolt</code> instance. - * - * @param name The name of the bolt (used in DEBUG logging) - * @param autoAck Whether or not this bolt should automatically acknowledge received tuples. - * @param autoAnchor Whether or not this bolt should automatically anchor to received tuples. - * @param declaredFields The fields this bolt declares as output. - */ - public GenericBolt(String name, boolean autoAck, boolean autoAnchor, Fields declaredFields) { - this.name = name; - this.autoAck = autoAck; - this.autoAnchor = autoAnchor; - this.declaredFields = declaredFields; - } - - public GenericBolt(String name, boolean autoAck, boolean autoAnchor) { - this(name, autoAck, autoAnchor, null); - } - - @SuppressWarnings("rawtypes") - public void prepare(Map stormConf, TopologyContext context, - OutputCollector collector) { - this.collector = collector; - - } - - public void execute(Tuple input) { - LOG.debug("[" + this.name + "] Received message: " + input); - - - // only emit if we have declared fields. - if (this.declaredFields != null) { - LOG.debug("[" + this.name + "] emitting: " + input); - if (this.autoAnchor) { - this.collector.emit(input, input.getValues()); - } else { - this.collector.emit(input.getValues()); - } - } - - if (this.autoAck) { - LOG.debug("[" + this.name + "] ACKing tuple: " + input); - this.collector.ack(input); - } - - } - - public void cleanup() { - - } - - public void declareOutputFields(OutputFieldsDeclarer declarer) { - if (this.declaredFields != null) { - declarer.declare(this.declaredFields); - } - } - - public boolean isAutoAck() { - return this.autoAck; - } - - public void setAutoAck(boolean autoAck) { - this.autoAck = autoAck; - } - -} http://git-wip-us.apache.org/repos/asf/storm/blob/d6c8298d/external/storm-jms/examples/src/main/java/org/apache/storm/jms/example/JsonTupleProducer.java ---------------------------------------------------------------------- diff --git a/external/storm-jms/examples/src/main/java/org/apache/storm/jms/example/JsonTupleProducer.java b/external/storm-jms/examples/src/main/java/org/apache/storm/jms/example/JsonTupleProducer.java deleted file mode 100644 index 9ee175e..0000000 --- a/external/storm-jms/examples/src/main/java/org/apache/storm/jms/example/JsonTupleProducer.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.storm.jms.example; - -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.TextMessage; - -import org.apache.storm.jms.JmsTupleProducer; -import org.apache.storm.topology.OutputFieldsDeclarer; -import org.apache.storm.tuple.Fields; -import org.apache.storm.tuple.Values; - -/** - * A simple <code>JmsTupleProducer</code> that expects to receive - * JMS <code>TextMessage</code> objects with a body in JSON format. - * <p/> - * Ouputs a tuple with field name "json" and a string value - * containing the raw json. - * <p/> - * <b>NOTE: </b> Currently this implementation assumes the text is valid - * JSON and does not attempt to parse or validate it. - * - * @author tgoetz - * - */ -@SuppressWarnings("serial") -public class JsonTupleProducer implements JmsTupleProducer { - - public Values toTuple(Message msg) throws JMSException { - if(msg instanceof TextMessage){ - String json = ((TextMessage) msg).getText(); - return new Values(json); - } else { - return null; - } - } - - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields("json")); - } - -} http://git-wip-us.apache.org/repos/asf/storm/blob/d6c8298d/external/storm-jms/examples/src/main/java/org/apache/storm/jms/example/SpringJmsProvider.java ---------------------------------------------------------------------- diff --git a/external/storm-jms/examples/src/main/java/org/apache/storm/jms/example/SpringJmsProvider.java b/external/storm-jms/examples/src/main/java/org/apache/storm/jms/example/SpringJmsProvider.java deleted file mode 100644 index 306fc25..0000000 --- a/external/storm-jms/examples/src/main/java/org/apache/storm/jms/example/SpringJmsProvider.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.storm.jms.example; - -import javax.jms.ConnectionFactory; -import javax.jms.Destination; - -import org.springframework.context.ApplicationContext; -import org.springframework.context.support.ClassPathXmlApplicationContext; - -import org.apache.storm.jms.JmsProvider; - - -/** - * A <code>JmsProvider</code> that uses the spring framework - * to obtain a JMS <code>ConnectionFactory</code> and - * <code>Desitnation</code> objects. - * <p/> - * The constructor takes three arguments: - * <ol> - * <li>A string pointing to the the spring application context file contining the JMS configuration - * (must be on the classpath) - * </li> - * <li>The name of the connection factory bean</li> - * <li>The name of the destination bean</li> - * </ol> - * - * - * - */ -@SuppressWarnings("serial") -public class SpringJmsProvider implements JmsProvider { - private ConnectionFactory connectionFactory; - private Destination destination; - - /** - * Constructs a <code>SpringJmsProvider</code> object given the name of a - * classpath resource (the spring application context file), and the bean - * names of a JMS connection factory and destination. - * - * @param appContextClasspathResource - the spring configuration file (classpath resource) - * @param connectionFactoryBean - the JMS connection factory bean name - * @param destinationBean - the JMS destination bean name - */ - public SpringJmsProvider(String appContextClasspathResource, String connectionFactoryBean, String destinationBean){ - ApplicationContext context = new ClassPathXmlApplicationContext(appContextClasspathResource); - this.connectionFactory = (ConnectionFactory)context.getBean(connectionFactoryBean); - this.destination = (Destination)context.getBean(destinationBean); - } - - public ConnectionFactory connectionFactory() throws Exception { - return this.connectionFactory; - } - - public Destination destination() throws Exception { - return this.destination; - } - -} http://git-wip-us.apache.org/repos/asf/storm/blob/d6c8298d/external/storm-jms/examples/src/main/resources/jms-activemq.xml ---------------------------------------------------------------------- diff --git a/external/storm-jms/examples/src/main/resources/jms-activemq.xml b/external/storm-jms/examples/src/main/resources/jms-activemq.xml deleted file mode 100644 index 1a845b8..0000000 --- a/external/storm-jms/examples/src/main/resources/jms-activemq.xml +++ /dev/null @@ -1,53 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. ---> -<beans - xmlns="http://www.springframework.org/schema/beans" - xmlns:amq="http://activemq.apache.org/schema/core" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd - http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd"> - - <!-- ActiveMQ --> - - <!-- embedded ActiveMQ Broker --> - <!-- <amq:broker useJmx="false" persistent="false"> - <amq:transportConnectors> - <amq:transportConnector uri="tcp://localhost:61616" /> - </amq:transportConnectors> - </amq:broker> --> - - <amq:queue id="notificationQueue" physicalName="backtype.storm.contrib.example.queue" /> - - <amq:topic id="notificationTopic" physicalName="backtype.storm.contrib.example.topic" /> - - <amq:connectionFactory id="jmsConnectionFactory" - brokerURL="tcp://localhost:61616" /> - - <!-- <bean id="queueTemplate" class="org.springframework.jms.core.JmsTemplate"> - <property name="connectionFactory"> - <ref bean="jmsConnectionFactory" /> - </property> - <property name="pubSubDomain" value="false" /> - </bean> --> - -</beans> - - - - - http://git-wip-us.apache.org/repos/asf/storm/blob/d6c8298d/external/storm-jms/examples/src/main/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/external/storm-jms/examples/src/main/resources/log4j.properties b/external/storm-jms/examples/src/main/resources/log4j.properties deleted file mode 100644 index 079b195..0000000 --- a/external/storm-jms/examples/src/main/resources/log4j.properties +++ /dev/null @@ -1,29 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -log4j.rootLogger=INFO, stdout - -log4j.appender.stdout=org.apache.log4j.ConsoleAppender -log4j.appender.stdout.layout=org.apache.log4j.PatternLayout - -log4j.appender.stdout.layout.ConversionPattern=%5p (%C:%L) - %m%n - - -log4j.logger.backtype.storm.contrib=DEBUG -log4j.logger.clojure.contrib=WARN -log4j.logger.org.springframework=WARN -log4j.logger.org.apache.zookeeper=WARN - http://git-wip-us.apache.org/repos/asf/storm/blob/d6c8298d/external/storm-jms/pom.xml ---------------------------------------------------------------------- diff --git a/external/storm-jms/pom.xml b/external/storm-jms/pom.xml index 7364394..b6fff85 100644 --- a/external/storm-jms/pom.xml +++ b/external/storm-jms/pom.xml @@ -27,8 +27,9 @@ - <artifactId>storm-jms-parent</artifactId> - <packaging>pom</packaging> + <artifactId>storm-jms</artifactId> + + <developers> <developer> @@ -38,11 +39,6 @@ </developer> </developers> - <modules> - <module>core</module> - <module>examples</module> - </modules> - <dependencies> <dependency> <groupId>org.apache.storm</groupId> @@ -51,6 +47,35 @@ <!-- keep storm out of the jar-with-dependencies --> <scope>provided</scope> </dependency> + <dependency> + <groupId>org.apache.geronimo.specs</groupId> + <artifactId>geronimo-jms_1.1_spec</artifactId> + <version>1.1.1</version> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>4.10</version> + <scope>test</scope> + </dependency> + + <!-- Active MQ --> + <dependency> + <groupId>org.apache.activemq</groupId> + <artifactId>activemq-core</artifactId> + <version>5.5.1</version> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </exclusion> + <exclusion> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + </exclusion> + </exclusions> + </dependency> </dependencies> http://git-wip-us.apache.org/repos/asf/storm/blob/d6c8298d/external/storm-jms/src/main/java/org/apache/storm/jms/JmsMessageProducer.java ---------------------------------------------------------------------- diff --git a/external/storm-jms/src/main/java/org/apache/storm/jms/JmsMessageProducer.java b/external/storm-jms/src/main/java/org/apache/storm/jms/JmsMessageProducer.java new file mode 100644 index 0000000..4932929 --- /dev/null +++ b/external/storm-jms/src/main/java/org/apache/storm/jms/JmsMessageProducer.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.jms; + +import java.io.Serializable; + +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.Session; + +import org.apache.storm.tuple.ITuple; + +/** + * JmsMessageProducer implementations are responsible for translating + * a <code>org.apache.storm.tuple.Values</code> instance into a + * <code>javax.jms.Message</code> object. + * <p> + */ +public interface JmsMessageProducer extends Serializable { + + /** + * Translate a <code>org.apache.storm.tuple.Tuple</code> object + * to a <code>javax.jms.Message</code object. + * + * @param session + * @param input + * @return + * @throws JMSException + */ + public Message toMessage(Session session, ITuple input) throws JMSException; +} http://git-wip-us.apache.org/repos/asf/storm/blob/d6c8298d/external/storm-jms/src/main/java/org/apache/storm/jms/JmsProvider.java ---------------------------------------------------------------------- diff --git a/external/storm-jms/src/main/java/org/apache/storm/jms/JmsProvider.java b/external/storm-jms/src/main/java/org/apache/storm/jms/JmsProvider.java new file mode 100644 index 0000000..d976326 --- /dev/null +++ b/external/storm-jms/src/main/java/org/apache/storm/jms/JmsProvider.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.jms; + +import java.io.Serializable; + +import javax.jms.ConnectionFactory; +import javax.jms.Destination; + +/** + * A <code>JmsProvider</code> object encapsulates the <code>ConnectionFactory</code> + * and <code>Destination</code> JMS objects the <code>JmsSpout</code> needs to manage + * a topic/queue connection over the course of it's lifecycle. + * + */ +public interface JmsProvider extends Serializable { + /** + * Provides the JMS <code>ConnectionFactory</code> + * + * @return the connection factory + * @throws Exception + */ + public ConnectionFactory connectionFactory() throws Exception; + + /** + * Provides the <code>Destination</code> (topic or queue) from which the + * <code>JmsSpout</code> will receive messages. + * + * @return + * @throws Exception + */ + public Destination destination() throws Exception; +} http://git-wip-us.apache.org/repos/asf/storm/blob/d6c8298d/external/storm-jms/src/main/java/org/apache/storm/jms/JmsTupleProducer.java ---------------------------------------------------------------------- diff --git a/external/storm-jms/src/main/java/org/apache/storm/jms/JmsTupleProducer.java b/external/storm-jms/src/main/java/org/apache/storm/jms/JmsTupleProducer.java new file mode 100644 index 0000000..0bbb3a0 --- /dev/null +++ b/external/storm-jms/src/main/java/org/apache/storm/jms/JmsTupleProducer.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.jms; + +import java.io.Serializable; + +import javax.jms.JMSException; +import javax.jms.Message; + +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.tuple.Values; + +/** + * Interface to define classes that can produce a Storm <code>Values</code> objects + * from a <code>javax.jms.Message</code> object>. + * <p> + * Implementations are also responsible for declaring the output + * fields they produce. + * <p> + * If for some reason the implementation can't process a message + * (for example if it received a <code>javax.jms.ObjectMessage</code> + * when it was expecting a <code>javax.jms.TextMessage</code> it should + * return <code>null</code> to indicate to the <code>JmsSpout</code> that + * the message could not be processed. + * + */ +public interface JmsTupleProducer extends Serializable { + /** + * Process a JMS message object to create a Values object. + * + * @param msg - the JMS message + * @return the Values tuple, or null if the message couldn't be processed. + * @throws JMSException + */ + Values toTuple(Message msg) throws JMSException; + + /** + * Declare the output fields produced by this JmsTupleProducer. + * + * @param declarer The OuputFieldsDeclarer for the spout. + */ + void declareOutputFields(OutputFieldsDeclarer declarer); +} http://git-wip-us.apache.org/repos/asf/storm/blob/d6c8298d/external/storm-jms/src/main/java/org/apache/storm/jms/bolt/JmsBolt.java ---------------------------------------------------------------------- diff --git a/external/storm-jms/src/main/java/org/apache/storm/jms/bolt/JmsBolt.java b/external/storm-jms/src/main/java/org/apache/storm/jms/bolt/JmsBolt.java new file mode 100644 index 0000000..d691e75 --- /dev/null +++ b/external/storm-jms/src/main/java/org/apache/storm/jms/bolt/JmsBolt.java @@ -0,0 +1,219 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.jms.bolt; + +import java.util.Map; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageProducer; +import javax.jms.Session; + +import org.apache.storm.topology.base.BaseRichBolt; +import org.apache.storm.jms.JmsMessageProducer; +import org.apache.storm.jms.JmsProvider; +import org.apache.storm.topology.base.BaseTickTupleAwareRichBolt; +import org.apache.storm.utils.TupleUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.tuple.Tuple; + +/** + * A JmsBolt receives <code>org.apache.storm.tuple.Tuple</code> objects from a Storm + * topology and publishes JMS Messages to a destination (topic or queue). + * <p> + * To use a JmsBolt in a topology, the following must be supplied: + * <ol> + * <li>A <code>JmsProvider</code> implementation.</li> + * <li>A <code>JmsMessageProducer</code> implementation.</li> + * </ol> + * The <code>JmsProvider</code> provides the JMS <code>javax.jms.ConnectionFactory</code> + * and <code>javax.jms.Destination</code> objects requied to publish JMS messages. + * <p> + * The JmsBolt uses a <code>JmsMessageProducer</code> to translate + * <code>org.apache.storm.tuple.Tuple</code> objects into + * <code>javax.jms.Message</code> objects for publishing. + * <p> + * Both JmsProvider and JmsMessageProducer must be set, or the bolt will + * fail upon deployment to a cluster. + * <p> + * The JmsBolt is typically an endpoint in a topology -- in other words + * it does not emit any tuples. + */ +public class JmsBolt extends BaseTickTupleAwareRichBolt { + private static Logger LOG = LoggerFactory.getLogger(JmsBolt.class); + + private boolean autoAck = true; + + // javax.jms objects + private Connection connection; + private Session session; + private MessageProducer messageProducer; + + // JMS options + private boolean jmsTransactional = false; + private int jmsAcknowledgeMode = Session.AUTO_ACKNOWLEDGE; + + + private JmsProvider jmsProvider; + private JmsMessageProducer producer; + + + private OutputCollector collector; + + /** + * Set the JmsProvider used to connect to the JMS destination topic/queue + * + * @param provider + */ + public void setJmsProvider(JmsProvider provider) { + this.jmsProvider = provider; + } + + /** + * Set the JmsMessageProducer used to convert tuples + * into JMS messages. + * + * @param producer + */ + public void setJmsMessageProducer(JmsMessageProducer producer) { + this.producer = producer; + } + + /** + * Sets the JMS acknowledgement mode for JMS messages sent + * by this bolt. + * <p> + * Possible values: + * <ul> + * <li>javax.jms.Session.AUTO_ACKNOWLEDGE</li> + * <li>javax.jms.Session.CLIENT_ACKNOWLEDGE</li> + * <li>javax.jms.Session.DUPS_OK_ACKNOWLEDGE</li> + * </ul> + * + * @param acknowledgeMode (constant defined in javax.jms.Session) + */ + public void setJmsAcknowledgeMode(int acknowledgeMode) { + this.jmsAcknowledgeMode = acknowledgeMode; + } + + /** + * Set the JMS transactional setting for the JMS session. + * + * @param transactional + */ +// public void setJmsTransactional(boolean transactional){ +// this.jmsTransactional = transactional; +// } + + /** + * Sets whether or not tuples should be acknowledged by this + * bolt. + * <p> + * + * @param autoAck + */ + public void setAutoAck(boolean autoAck) { + this.autoAck = autoAck; + } + + + /** + * Consumes a tuple and sends a JMS message. + * <p> + * If autoAck is true, the tuple will be acknowledged + * after the message is sent. + * <p> + * If JMS sending fails, the tuple will be failed. + */ + @Override + protected void process(Tuple input) { + // write the tuple to a JMS destination... + LOG.debug("Tuple received. Sending JMS message."); + + try { + Message msg = this.producer.toMessage(this.session, input); + if (msg != null) { + if (msg.getJMSDestination() != null) { + this.messageProducer.send(msg.getJMSDestination(), msg); + } else { + this.messageProducer.send(msg); + } + } + if (this.autoAck) { + LOG.debug("ACKing tuple: " + input); + this.collector.ack(input); + } + } catch (JMSException e) { + // failed to send the JMS message, fail the tuple fast + LOG.warn("Failing tuple: " + input); + LOG.warn("Exception: ", e); + this.collector.fail(input); + } + } + + /** + * Releases JMS resources. + */ + @Override + public void cleanup() { + try { + LOG.debug("Closing JMS connection."); + this.session.close(); + this.connection.close(); + } catch (JMSException e) { + LOG.warn("Error closing JMS connection.", e); + } + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + } + + /** + * Initializes JMS resources. + */ + @Override + public void prepare(Map stormConf, TopologyContext context, + OutputCollector collector) { + if (this.jmsProvider == null || this.producer == null) { + throw new IllegalStateException("JMS Provider and MessageProducer not set."); + } + this.collector = collector; + LOG.debug("Connecting JMS.."); + try { + ConnectionFactory cf = this.jmsProvider.connectionFactory(); + Destination dest = this.jmsProvider.destination(); + this.connection = cf.createConnection(); + this.session = connection.createSession(this.jmsTransactional, + this.jmsAcknowledgeMode); + this.messageProducer = session.createProducer(dest); + + connection.start(); + } catch (Exception e) { + LOG.warn("Error creating JMS connection.", e); + } + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/d6c8298d/external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsMessageID.java ---------------------------------------------------------------------- diff --git a/external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsMessageID.java b/external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsMessageID.java new file mode 100644 index 0000000..b78a41e --- /dev/null +++ b/external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsMessageID.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.jms.spout; + +import java.io.Serializable; + +public class JmsMessageID implements Comparable<JmsMessageID>, Serializable { + + private String jmsID; + + private Long sequence; + + public JmsMessageID(long sequence, String jmsID){ + this.jmsID = jmsID; + this.sequence = sequence; + } + + + public String getJmsID(){ + return this.jmsID; + } + + @Override + public int compareTo(JmsMessageID jmsMessageID) { + return (int)(this.sequence - jmsMessageID.sequence); + } + + @Override + public int hashCode() { + return this.sequence.hashCode(); + } + + @Override + public boolean equals(Object o) { + if(o instanceof JmsMessageID){ + JmsMessageID id = (JmsMessageID)o; + return this.jmsID.equals(id.jmsID); + } else { + return false; + } + } + +}
