STORM-2416: break out storm-jms-examples
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d6c8298d Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d6c8298d Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d6c8298d Branch: refs/heads/1.x-branch Commit: d6c8298d8a596589442cfa049039ff90baa33f5f Parents: e0b1333 Author: P. Taylor Goetz <[email protected]> Authored: Thu Mar 16 15:14:26 2017 -0400 Committer: P. Taylor Goetz <[email protected]> Committed: Thu Mar 16 15:14:26 2017 -0400 ---------------------------------------------------------------------- examples/storm-jms-examples/README.markdown | 12 + examples/storm-jms-examples/pom.xml | 151 +++++++ .../storm/jms/example/ExampleJmsTopology.java | 131 ++++++ .../apache/storm/jms/example/GenericBolt.java | 116 ++++++ .../storm/jms/example/JsonTupleProducer.java | 58 +++ .../storm/jms/example/SpringJmsProvider.java | 74 ++++ .../src/main/resources/jms-activemq.xml | 53 +++ .../src/main/resources/log4j.properties | 29 ++ external/storm-jms/core/pom.xml | 95 ----- .../apache/storm/jms/JmsMessageProducer.java | 46 --- .../java/org/apache/storm/jms/JmsProvider.java | 48 --- .../org/apache/storm/jms/JmsTupleProducer.java | 58 --- .../java/org/apache/storm/jms/bolt/JmsBolt.java | 219 ---------- .../apache/storm/jms/spout/JmsMessageID.java | 58 --- .../org/apache/storm/jms/spout/JmsSpout.java | 382 ----------------- .../org/apache/storm/jms/trident/JmsBatch.java | 27 -- .../org/apache/storm/jms/trident/JmsState.java | 129 ------ .../storm/jms/trident/JmsStateFactory.java | 40 -- .../apache/storm/jms/trident/JmsUpdater.java | 38 -- .../storm/jms/trident/TridentJmsSpout.java | 409 ------------------- .../apache/storm/jms/spout/JmsSpoutTest.java | 88 ---- .../apache/storm/jms/spout/MockJmsProvider.java | 62 --- .../jms/spout/MockSpoutOutputCollector.java | 55 --- .../storm/jms/spout/MockTupleProducer.java | 47 --- .../core/src/test/resources/jndi.properties | 18 - 
external/storm-jms/examples/README.markdown | 12 - external/storm-jms/examples/pom.xml | 151 ------- .../storm/jms/example/ExampleJmsTopology.java | 131 ------ .../apache/storm/jms/example/GenericBolt.java | 116 ------ .../storm/jms/example/JsonTupleProducer.java | 58 --- .../storm/jms/example/SpringJmsProvider.java | 74 ---- .../src/main/resources/jms-activemq.xml | 53 --- .../src/main/resources/log4j.properties | 29 -- external/storm-jms/pom.xml | 39 +- .../apache/storm/jms/JmsMessageProducer.java | 46 +++ .../java/org/apache/storm/jms/JmsProvider.java | 48 +++ .../org/apache/storm/jms/JmsTupleProducer.java | 58 +++ .../java/org/apache/storm/jms/bolt/JmsBolt.java | 219 ++++++++++ .../apache/storm/jms/spout/JmsMessageID.java | 58 +++ .../org/apache/storm/jms/spout/JmsSpout.java | 382 +++++++++++++++++ .../org/apache/storm/jms/trident/JmsBatch.java | 27 ++ .../org/apache/storm/jms/trident/JmsState.java | 129 ++++++ .../storm/jms/trident/JmsStateFactory.java | 40 ++ .../apache/storm/jms/trident/JmsUpdater.java | 38 ++ .../storm/jms/trident/TridentJmsSpout.java | 409 +++++++++++++++++++ .../apache/storm/jms/spout/JmsSpoutTest.java | 88 ++++ .../apache/storm/jms/spout/MockJmsProvider.java | 62 +++ .../jms/spout/MockSpoutOutputCollector.java | 55 +++ .../storm/jms/spout/MockTupleProducer.java | 47 +++ .../src/test/resources/jndi.properties | 18 + pom.xml | 1 + 51 files changed, 2381 insertions(+), 2450 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/d6c8298d/examples/storm-jms-examples/README.markdown ---------------------------------------------------------------------- diff --git a/examples/storm-jms-examples/README.markdown b/examples/storm-jms-examples/README.markdown new file mode 100644 index 0000000..7a4d8f0 --- /dev/null +++ b/examples/storm-jms-examples/README.markdown @@ -0,0 +1,12 @@ +## About Storm JMS Examples +This project contains a simple storm topology that 
illustrates the usage of "storm-jms". + +To build: + +`mvn clean install` + +The default build will create a jar file that can be deployed to to a Storm cluster in the "target" directory: + +`storm-jms-examples-0.1-SNAPSHOT-jar-with-dependencies.jar` + + http://git-wip-us.apache.org/repos/asf/storm/blob/d6c8298d/examples/storm-jms-examples/pom.xml ---------------------------------------------------------------------- diff --git a/examples/storm-jms-examples/pom.xml b/examples/storm-jms-examples/pom.xml new file mode 100644 index 0000000..6451283 --- /dev/null +++ b/examples/storm-jms-examples/pom.xml @@ -0,0 +1,151 @@ +<?xml version="1.0"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <artifactId>storm</artifactId> + <groupId>org.apache.storm</groupId> + <version>1.1.0-SNAPSHOT</version> + <relativePath>../../pom.xml</relativePath> + </parent> + + + <artifactId>storm-jms-examples</artifactId> + + <properties> + <spring.version>2.5.6</spring.version> + </properties> + <dependencies> + <dependency> + <groupId>org.springframework</groupId> + <artifactId>spring-beans</artifactId> + <version>${spring.version}</version> + </dependency> + <dependency> + <groupId>org.springframework</groupId> + <artifactId>spring-core</artifactId> + <version>${spring.version}</version> + </dependency> + <dependency> + <groupId>org.springframework</groupId> + <artifactId>spring-context</artifactId> + <version>${spring.version}</version> + </dependency> + <dependency> + <groupId>org.springframework</groupId> + <artifactId>spring-jms</artifactId> + <version>${spring.version}</version> + </dependency> + <dependency> + <groupId>org.apache.xbean</groupId> + <artifactId>xbean-spring</artifactId> + <version>3.7</version> + </dependency> + <dependency> + <groupId>org.apache.storm</groupId> + <artifactId>storm-core</artifactId> + <version>${project.version}</version> + <!-- keep storm out of the jar-with-dependencies --> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.storm</groupId> + <artifactId>storm-jms</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.activemq</groupId> + <artifactId>activemq-core</artifactId> + <version>5.4.0</version> + <exclusions> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </exclusion> + <exclusion> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + 
</exclusion> + </exclusions> + </dependency> + </dependencies> + <build> + <plugins> + <!-- bind the maven-assembly-plugin to the package phase this will create + a jar file without the storm dependencies suitable for deployment to a cluster. --> + <plugin> + <artifactId>maven-assembly-plugin</artifactId> + <configuration> + <descriptorRefs> + <descriptorRef>jar-with-dependencies</descriptorRef> + </descriptorRefs> + <archive> + <manifest> + <mainClass></mainClass> + </manifest> + </archive> + </configuration> + <executions> + <execution> + <id>make-assembly</id> + <phase>package</phase> + <goals> + <goal>single</goal> + </goals> + </execution> + </executions> + + </plugin> + + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>exec-maven-plugin</artifactId> + <version>1.2.1</version> + <executions> + <execution> + <goals> + <goal>exec</goal> + </goals> + </execution> + </executions> + <configuration> + <executable>java</executable> + <includeProjectDependencies>true</includeProjectDependencies> + <includePluginDependencies>true</includePluginDependencies> + <mainClass>org.apache.storm.jms.example.ExampleJmsTopology</mainClass> + <systemProperties> + <systemProperty> + <key>log4j.configuration</key> + <value>file:./src/main/resources/log4j.properties</value> + </systemProperty> + </systemProperties> + </configuration> + <dependencies> + <dependency> + <groupId>org.apache.storm</groupId> + <artifactId>storm-core</artifactId> + <version>${project.version}</version> + <type>jar</type> + </dependency> + </dependencies> + </plugin> + </plugins> + </build> +</project> http://git-wip-us.apache.org/repos/asf/storm/blob/d6c8298d/examples/storm-jms-examples/src/main/java/org/apache/storm/jms/example/ExampleJmsTopology.java ---------------------------------------------------------------------- diff --git a/examples/storm-jms-examples/src/main/java/org/apache/storm/jms/example/ExampleJmsTopology.java 
b/examples/storm-jms-examples/src/main/java/org/apache/storm/jms/example/ExampleJmsTopology.java new file mode 100644 index 0000000..3324aac --- /dev/null +++ b/examples/storm-jms-examples/src/main/java/org/apache/storm/jms/example/ExampleJmsTopology.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.jms.example; + +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.storm.Config; +import org.apache.storm.LocalCluster; +import org.apache.storm.StormSubmitter; +import org.apache.storm.jms.JmsMessageProducer; +import org.apache.storm.jms.JmsProvider; +import org.apache.storm.jms.JmsTupleProducer; +import org.apache.storm.jms.bolt.JmsBolt; +import org.apache.storm.jms.spout.JmsSpout; +import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.ITuple; +import org.apache.storm.utils.Utils; + +public class ExampleJmsTopology { + public static final String JMS_QUEUE_SPOUT = "JMS_QUEUE_SPOUT"; + public static final String INTERMEDIATE_BOLT = "INTERMEDIATE_BOLT"; + public static final String FINAL_BOLT = "FINAL_BOLT"; + public static final String JMS_TOPIC_BOLT = "JMS_TOPIC_BOLT"; + public static final String JMS_TOPIC_SPOUT = "JMS_TOPIC_SPOUT"; + public static final String ANOTHER_BOLT = "ANOTHER_BOLT"; + + @SuppressWarnings("serial") + public static void main(String[] args) throws Exception { + + // JMS Queue Provider + JmsProvider jmsQueueProvider = new SpringJmsProvider( + "jms-activemq.xml", "jmsConnectionFactory", + "notificationQueue"); + + // JMS Topic provider + JmsProvider jmsTopicProvider = new SpringJmsProvider( + "jms-activemq.xml", "jmsConnectionFactory", + "notificationTopic"); + + // JMS Producer + JmsTupleProducer producer = new JsonTupleProducer(); + + // JMS Queue Spout + JmsSpout queueSpout = new JmsSpout(); + queueSpout.setJmsProvider(jmsQueueProvider); + queueSpout.setJmsTupleProducer(producer); + queueSpout.setJmsAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE); + queueSpout.setDistributed(true); // allow multiple instances + + TopologyBuilder builder = new TopologyBuilder(); + + // spout with 5 parallel instances + builder.setSpout(JMS_QUEUE_SPOUT, queueSpout, 5); + + // 
intermediate bolt, subscribes to jms spout, anchors on tuples, and auto-acks + builder.setBolt(INTERMEDIATE_BOLT, + new GenericBolt("INTERMEDIATE_BOLT", true, true, new Fields("json")), 3).shuffleGrouping( + JMS_QUEUE_SPOUT); + + // bolt that subscribes to the intermediate bolt, and auto-acks + // messages. + builder.setBolt(FINAL_BOLT, new GenericBolt("FINAL_BOLT", true, true), 3).shuffleGrouping( + INTERMEDIATE_BOLT); + + // bolt that subscribes to the intermediate bolt, and publishes to a JMS Topic + JmsBolt jmsBolt = new JmsBolt(); + jmsBolt.setJmsProvider(jmsTopicProvider); + + // anonymous message producer just calls toString() on the tuple to create a jms message + jmsBolt.setJmsMessageProducer(new JmsMessageProducer() { + @Override + public Message toMessage(Session session, ITuple input) throws JMSException { + System.out.println("Sending JMS Message:" + input.toString()); + TextMessage tm = session.createTextMessage(input.toString()); + return tm; + } + }); + + builder.setBolt(JMS_TOPIC_BOLT, jmsBolt).shuffleGrouping(INTERMEDIATE_BOLT); + + // JMS Topic spout + JmsSpout topicSpout = new JmsSpout(); + topicSpout.setJmsProvider(jmsTopicProvider); + topicSpout.setJmsTupleProducer(producer); + topicSpout.setJmsAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE); + topicSpout.setDistributed(false); + + builder.setSpout(JMS_TOPIC_SPOUT, topicSpout); + + builder.setBolt(ANOTHER_BOLT, new GenericBolt("ANOTHER_BOLT", true, true), 1).shuffleGrouping( + JMS_TOPIC_SPOUT); + + Config conf = new Config(); + + if (args.length > 0) { + conf.setNumWorkers(3); + + StormSubmitter.submitTopology(args[0], conf, + builder.createTopology()); + } else { + + conf.setDebug(true); + + LocalCluster cluster = new LocalCluster(); + cluster.submitTopology("storm-jms-example", conf, builder.createTopology()); + Utils.sleep(60000); + cluster.killTopology("storm-jms-example"); + cluster.shutdown(); + } + } + +} 
http://git-wip-us.apache.org/repos/asf/storm/blob/d6c8298d/examples/storm-jms-examples/src/main/java/org/apache/storm/jms/example/GenericBolt.java ---------------------------------------------------------------------- diff --git a/examples/storm-jms-examples/src/main/java/org/apache/storm/jms/example/GenericBolt.java b/examples/storm-jms-examples/src/main/java/org/apache/storm/jms/example/GenericBolt.java new file mode 100644 index 0000000..57de1ba --- /dev/null +++ b/examples/storm-jms-examples/src/main/java/org/apache/storm/jms/example/GenericBolt.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.jms.example; + +import java.util.Map; + +import org.apache.storm.topology.base.BaseRichBolt; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; + +/** + * A generic <code>org.apache.storm.topology.IRichBolt</code> implementation + * for testing/debugging the Storm JMS Spout and example topologies. 
+ * <p> + * For debugging purposes, set the log level of the + * <code>org.apache.storm.contrib.jms</code> package to DEBUG for debugging + * output. + * + * @author tgoetz + */ +@SuppressWarnings("serial") +public class GenericBolt extends BaseRichBolt { + private static final Logger LOG = LoggerFactory.getLogger(GenericBolt.class); + private OutputCollector collector; + private boolean autoAck = false; + private boolean autoAnchor = false; + private Fields declaredFields; + private String name; + + /** + * Constructs a new <code>GenericBolt</code> instance. + * + * @param name The name of the bolt (used in DEBUG logging) + * @param autoAck Whether or not this bolt should automatically acknowledge received tuples. + * @param autoAnchor Whether or not this bolt should automatically anchor to received tuples. + * @param declaredFields The fields this bolt declares as output. + */ + public GenericBolt(String name, boolean autoAck, boolean autoAnchor, Fields declaredFields) { + this.name = name; + this.autoAck = autoAck; + this.autoAnchor = autoAnchor; + this.declaredFields = declaredFields; + } + + public GenericBolt(String name, boolean autoAck, boolean autoAnchor) { + this(name, autoAck, autoAnchor, null); + } + + @SuppressWarnings("rawtypes") + public void prepare(Map stormConf, TopologyContext context, + OutputCollector collector) { + this.collector = collector; + + } + + public void execute(Tuple input) { + LOG.debug("[" + this.name + "] Received message: " + input); + + + // only emit if we have declared fields. 
+ if (this.declaredFields != null) { + LOG.debug("[" + this.name + "] emitting: " + input); + if (this.autoAnchor) { + this.collector.emit(input, input.getValues()); + } else { + this.collector.emit(input.getValues()); + } + } + + if (this.autoAck) { + LOG.debug("[" + this.name + "] ACKing tuple: " + input); + this.collector.ack(input); + } + + } + + public void cleanup() { + + } + + public void declareOutputFields(OutputFieldsDeclarer declarer) { + if (this.declaredFields != null) { + declarer.declare(this.declaredFields); + } + } + + public boolean isAutoAck() { + return this.autoAck; + } + + public void setAutoAck(boolean autoAck) { + this.autoAck = autoAck; + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/d6c8298d/examples/storm-jms-examples/src/main/java/org/apache/storm/jms/example/JsonTupleProducer.java ---------------------------------------------------------------------- diff --git a/examples/storm-jms-examples/src/main/java/org/apache/storm/jms/example/JsonTupleProducer.java b/examples/storm-jms-examples/src/main/java/org/apache/storm/jms/example/JsonTupleProducer.java new file mode 100644 index 0000000..9ee175e --- /dev/null +++ b/examples/storm-jms-examples/src/main/java/org/apache/storm/jms/example/JsonTupleProducer.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.jms.example; + +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.TextMessage; + +import org.apache.storm.jms.JmsTupleProducer; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Values; + +/** + * A simple <code>JmsTupleProducer</code> that expects to receive + * JMS <code>TextMessage</code> objects with a body in JSON format. + * <p/> + * Ouputs a tuple with field name "json" and a string value + * containing the raw json. + * <p/> + * <b>NOTE: </b> Currently this implementation assumes the text is valid + * JSON and does not attempt to parse or validate it. + * + * @author tgoetz + * + */ +@SuppressWarnings("serial") +public class JsonTupleProducer implements JmsTupleProducer { + + public Values toTuple(Message msg) throws JMSException { + if(msg instanceof TextMessage){ + String json = ((TextMessage) msg).getText(); + return new Values(json); + } else { + return null; + } + } + + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("json")); + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/d6c8298d/examples/storm-jms-examples/src/main/java/org/apache/storm/jms/example/SpringJmsProvider.java ---------------------------------------------------------------------- diff --git a/examples/storm-jms-examples/src/main/java/org/apache/storm/jms/example/SpringJmsProvider.java b/examples/storm-jms-examples/src/main/java/org/apache/storm/jms/example/SpringJmsProvider.java new file mode 100644 index 0000000..306fc25 --- /dev/null +++ b/examples/storm-jms-examples/src/main/java/org/apache/storm/jms/example/SpringJmsProvider.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.jms.example; + +import javax.jms.ConnectionFactory; +import javax.jms.Destination; + +import org.springframework.context.ApplicationContext; +import org.springframework.context.support.ClassPathXmlApplicationContext; + +import org.apache.storm.jms.JmsProvider; + + +/** + * A <code>JmsProvider</code> that uses the spring framework + * to obtain a JMS <code>ConnectionFactory</code> and + * <code>Desitnation</code> objects. + * <p/> + * The constructor takes three arguments: + * <ol> + * <li>A string pointing to the the spring application context file contining the JMS configuration + * (must be on the classpath) + * </li> + * <li>The name of the connection factory bean</li> + * <li>The name of the destination bean</li> + * </ol> + * + * + * + */ +@SuppressWarnings("serial") +public class SpringJmsProvider implements JmsProvider { + private ConnectionFactory connectionFactory; + private Destination destination; + + /** + * Constructs a <code>SpringJmsProvider</code> object given the name of a + * classpath resource (the spring application context file), and the bean + * names of a JMS connection factory and destination. 
+ * + * @param appContextClasspathResource - the spring configuration file (classpath resource) + * @param connectionFactoryBean - the JMS connection factory bean name + * @param destinationBean - the JMS destination bean name + */ + public SpringJmsProvider(String appContextClasspathResource, String connectionFactoryBean, String destinationBean){ + ApplicationContext context = new ClassPathXmlApplicationContext(appContextClasspathResource); + this.connectionFactory = (ConnectionFactory)context.getBean(connectionFactoryBean); + this.destination = (Destination)context.getBean(destinationBean); + } + + public ConnectionFactory connectionFactory() throws Exception { + return this.connectionFactory; + } + + public Destination destination() throws Exception { + return this.destination; + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/d6c8298d/examples/storm-jms-examples/src/main/resources/jms-activemq.xml ---------------------------------------------------------------------- diff --git a/examples/storm-jms-examples/src/main/resources/jms-activemq.xml b/examples/storm-jms-examples/src/main/resources/jms-activemq.xml new file mode 100644 index 0000000..1a845b8 --- /dev/null +++ b/examples/storm-jms-examples/src/main/resources/jms-activemq.xml @@ -0,0 +1,53 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<beans + xmlns="http://www.springframework.org/schema/beans" + xmlns:amq="http://activemq.apache.org/schema/core" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd + http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd"> + + <!-- ActiveMQ --> + + <!-- embedded ActiveMQ Broker --> + <!-- <amq:broker useJmx="false" persistent="false"> + <amq:transportConnectors> + <amq:transportConnector uri="tcp://localhost:61616" /> + </amq:transportConnectors> + </amq:broker> --> + + <amq:queue id="notificationQueue" physicalName="backtype.storm.contrib.example.queue" /> + + <amq:topic id="notificationTopic" physicalName="backtype.storm.contrib.example.topic" /> + + <amq:connectionFactory id="jmsConnectionFactory" + brokerURL="tcp://localhost:61616" /> + + <!-- <bean id="queueTemplate" class="org.springframework.jms.core.JmsTemplate"> + <property name="connectionFactory"> + <ref bean="jmsConnectionFactory" /> + </property> + <property name="pubSubDomain" value="false" /> + </bean> --> + +</beans> + + + + + http://git-wip-us.apache.org/repos/asf/storm/blob/d6c8298d/examples/storm-jms-examples/src/main/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/examples/storm-jms-examples/src/main/resources/log4j.properties b/examples/storm-jms-examples/src/main/resources/log4j.properties new file mode 100644 index 0000000..079b195 --- /dev/null +++ 
b/examples/storm-jms-examples/src/main/resources/log4j.properties @@ -0,0 +1,29 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +log4j.rootLogger=INFO, stdout + +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout + +log4j.appender.stdout.layout.ConversionPattern=%5p (%C:%L) - %m%n + + +log4j.logger.backtype.storm.contrib=DEBUG +log4j.logger.clojure.contrib=WARN +log4j.logger.org.springframework=WARN +log4j.logger.org.apache.zookeeper=WARN + http://git-wip-us.apache.org/repos/asf/storm/blob/d6c8298d/external/storm-jms/core/pom.xml ---------------------------------------------------------------------- diff --git a/external/storm-jms/core/pom.xml b/external/storm-jms/core/pom.xml deleted file mode 100644 index c2c04ea..0000000 --- a/external/storm-jms/core/pom.xml +++ /dev/null @@ -1,95 +0,0 @@ -<?xml version="1.0"?> -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. 
- The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - <parent> - <groupId>org.apache.storm</groupId> - <artifactId>storm-jms-parent</artifactId> - <version>1.1.0-SNAPSHOT</version> - <relativePath>../pom.xml</relativePath> - </parent> - - - - <artifactId>storm-jms</artifactId> - - - - <developers> - <developer> - <id>ptgoetz</id> - <name>P. 
Taylor Goetz</name> - <email>[email protected]</email> - </developer> - </developers> - - <dependencies> - <dependency> - <groupId>org.apache.storm</groupId> - <artifactId>storm-core</artifactId> - <version>${project.version}</version> - <!-- keep storm out of the jar-with-dependencies --> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.geronimo.specs</groupId> - <artifactId>geronimo-jms_1.1_spec</artifactId> - <version>1.1.1</version> - </dependency> - <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - <version>4.10</version> - <scope>test</scope> - </dependency> - - <!-- Active MQ --> - <dependency> - <groupId>org.apache.activemq</groupId> - <artifactId>activemq-core</artifactId> - <version>5.5.1</version> - <scope>test</scope> - <exclusions> - <exclusion> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-api</artifactId> - </exclusion> - <exclusion> - <groupId>log4j</groupId> - <artifactId>log4j</artifactId> - </exclusion> - </exclusions> - </dependency> - - </dependencies> - - - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-javadoc-plugin</artifactId> - <version>2.9</version> - <configuration> - <additionalparam>-Xdoclint:none</additionalparam> - </configuration> - </plugin> - </plugins> - </build> -</project> http://git-wip-us.apache.org/repos/asf/storm/blob/d6c8298d/external/storm-jms/core/src/main/java/org/apache/storm/jms/JmsMessageProducer.java ---------------------------------------------------------------------- diff --git a/external/storm-jms/core/src/main/java/org/apache/storm/jms/JmsMessageProducer.java b/external/storm-jms/core/src/main/java/org/apache/storm/jms/JmsMessageProducer.java deleted file mode 100644 index 4932929..0000000 --- a/external/storm-jms/core/src/main/java/org/apache/storm/jms/JmsMessageProducer.java +++ /dev/null @@ -1,46 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor 
license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.storm.jms; - -import java.io.Serializable; - -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.Session; - -import org.apache.storm.tuple.ITuple; - -/** - * JmsMessageProducer implementations are responsible for translating - * a <code>org.apache.storm.tuple.Values</code> instance into a - * <code>javax.jms.Message</code> object. - * <p> - */ -public interface JmsMessageProducer extends Serializable { - - /** - * Translate a <code>org.apache.storm.tuple.Tuple</code> object - * to a <code>javax.jms.Message</code object. 
- * - * @param session - * @param input - * @return - * @throws JMSException - */ - public Message toMessage(Session session, ITuple input) throws JMSException; -} http://git-wip-us.apache.org/repos/asf/storm/blob/d6c8298d/external/storm-jms/core/src/main/java/org/apache/storm/jms/JmsProvider.java ---------------------------------------------------------------------- diff --git a/external/storm-jms/core/src/main/java/org/apache/storm/jms/JmsProvider.java b/external/storm-jms/core/src/main/java/org/apache/storm/jms/JmsProvider.java deleted file mode 100644 index d976326..0000000 --- a/external/storm-jms/core/src/main/java/org/apache/storm/jms/JmsProvider.java +++ /dev/null @@ -1,48 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.storm.jms; - -import java.io.Serializable; - -import javax.jms.ConnectionFactory; -import javax.jms.Destination; - -/** - * A <code>JmsProvider</code> object encapsulates the <code>ConnectionFactory</code> - * and <code>Destination</code> JMS objects the <code>JmsSpout</code> needs to manage - * a topic/queue connection over the course of it's lifecycle. 
- * - */ -public interface JmsProvider extends Serializable { - /** - * Provides the JMS <code>ConnectionFactory</code> - * - * @return the connection factory - * @throws Exception - */ - public ConnectionFactory connectionFactory() throws Exception; - - /** - * Provides the <code>Destination</code> (topic or queue) from which the - * <code>JmsSpout</code> will receive messages. - * - * @return - * @throws Exception - */ - public Destination destination() throws Exception; -} http://git-wip-us.apache.org/repos/asf/storm/blob/d6c8298d/external/storm-jms/core/src/main/java/org/apache/storm/jms/JmsTupleProducer.java ---------------------------------------------------------------------- diff --git a/external/storm-jms/core/src/main/java/org/apache/storm/jms/JmsTupleProducer.java b/external/storm-jms/core/src/main/java/org/apache/storm/jms/JmsTupleProducer.java deleted file mode 100644 index 0bbb3a0..0000000 --- a/external/storm-jms/core/src/main/java/org/apache/storm/jms/JmsTupleProducer.java +++ /dev/null @@ -1,58 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.storm.jms; - -import java.io.Serializable; - -import javax.jms.JMSException; -import javax.jms.Message; - -import org.apache.storm.topology.OutputFieldsDeclarer; -import org.apache.storm.tuple.Values; - -/** - * Interface to define classes that can produce a Storm <code>Values</code> objects - * from a <code>javax.jms.Message</code> object>. - * <p> - * Implementations are also responsible for declaring the output - * fields they produce. - * <p> - * If for some reason the implementation can't process a message - * (for example if it received a <code>javax.jms.ObjectMessage</code> - * when it was expecting a <code>javax.jms.TextMessage</code> it should - * return <code>null</code> to indicate to the <code>JmsSpout</code> that - * the message could not be processed. - * - */ -public interface JmsTupleProducer extends Serializable { - /** - * Process a JMS message object to create a Values object. - * - * @param msg - the JMS message - * @return the Values tuple, or null if the message couldn't be processed. - * @throws JMSException - */ - Values toTuple(Message msg) throws JMSException; - - /** - * Declare the output fields produced by this JmsTupleProducer. - * - * @param declarer The OuputFieldsDeclarer for the spout. - */ - void declareOutputFields(OutputFieldsDeclarer declarer); -} http://git-wip-us.apache.org/repos/asf/storm/blob/d6c8298d/external/storm-jms/core/src/main/java/org/apache/storm/jms/bolt/JmsBolt.java ---------------------------------------------------------------------- diff --git a/external/storm-jms/core/src/main/java/org/apache/storm/jms/bolt/JmsBolt.java b/external/storm-jms/core/src/main/java/org/apache/storm/jms/bolt/JmsBolt.java deleted file mode 100644 index d691e75..0000000 --- a/external/storm-jms/core/src/main/java/org/apache/storm/jms/bolt/JmsBolt.java +++ /dev/null @@ -1,219 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.storm.jms.bolt; - -import java.util.Map; - -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageProducer; -import javax.jms.Session; - -import org.apache.storm.topology.base.BaseRichBolt; -import org.apache.storm.jms.JmsMessageProducer; -import org.apache.storm.jms.JmsProvider; -import org.apache.storm.topology.base.BaseTickTupleAwareRichBolt; -import org.apache.storm.utils.TupleUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.storm.task.OutputCollector; -import org.apache.storm.task.TopologyContext; -import org.apache.storm.topology.OutputFieldsDeclarer; -import org.apache.storm.tuple.Tuple; - -/** - * A JmsBolt receives <code>org.apache.storm.tuple.Tuple</code> objects from a Storm - * topology and publishes JMS Messages to a destination (topic or queue). 
- * <p> - * To use a JmsBolt in a topology, the following must be supplied: - * <ol> - * <li>A <code>JmsProvider</code> implementation.</li> - * <li>A <code>JmsMessageProducer</code> implementation.</li> - * </ol> - * The <code>JmsProvider</code> provides the JMS <code>javax.jms.ConnectionFactory</code> - * and <code>javax.jms.Destination</code> objects requied to publish JMS messages. - * <p> - * The JmsBolt uses a <code>JmsMessageProducer</code> to translate - * <code>org.apache.storm.tuple.Tuple</code> objects into - * <code>javax.jms.Message</code> objects for publishing. - * <p> - * Both JmsProvider and JmsMessageProducer must be set, or the bolt will - * fail upon deployment to a cluster. - * <p> - * The JmsBolt is typically an endpoint in a topology -- in other words - * it does not emit any tuples. - */ -public class JmsBolt extends BaseTickTupleAwareRichBolt { - private static Logger LOG = LoggerFactory.getLogger(JmsBolt.class); - - private boolean autoAck = true; - - // javax.jms objects - private Connection connection; - private Session session; - private MessageProducer messageProducer; - - // JMS options - private boolean jmsTransactional = false; - private int jmsAcknowledgeMode = Session.AUTO_ACKNOWLEDGE; - - - private JmsProvider jmsProvider; - private JmsMessageProducer producer; - - - private OutputCollector collector; - - /** - * Set the JmsProvider used to connect to the JMS destination topic/queue - * - * @param provider - */ - public void setJmsProvider(JmsProvider provider) { - this.jmsProvider = provider; - } - - /** - * Set the JmsMessageProducer used to convert tuples - * into JMS messages. - * - * @param producer - */ - public void setJmsMessageProducer(JmsMessageProducer producer) { - this.producer = producer; - } - - /** - * Sets the JMS acknowledgement mode for JMS messages sent - * by this bolt. 
- * <p> - * Possible values: - * <ul> - * <li>javax.jms.Session.AUTO_ACKNOWLEDGE</li> - * <li>javax.jms.Session.CLIENT_ACKNOWLEDGE</li> - * <li>javax.jms.Session.DUPS_OK_ACKNOWLEDGE</li> - * </ul> - * - * @param acknowledgeMode (constant defined in javax.jms.Session) - */ - public void setJmsAcknowledgeMode(int acknowledgeMode) { - this.jmsAcknowledgeMode = acknowledgeMode; - } - - /** - * Set the JMS transactional setting for the JMS session. - * - * @param transactional - */ -// public void setJmsTransactional(boolean transactional){ -// this.jmsTransactional = transactional; -// } - - /** - * Sets whether or not tuples should be acknowledged by this - * bolt. - * <p> - * - * @param autoAck - */ - public void setAutoAck(boolean autoAck) { - this.autoAck = autoAck; - } - - - /** - * Consumes a tuple and sends a JMS message. - * <p> - * If autoAck is true, the tuple will be acknowledged - * after the message is sent. - * <p> - * If JMS sending fails, the tuple will be failed. - */ - @Override - protected void process(Tuple input) { - // write the tuple to a JMS destination... - LOG.debug("Tuple received. Sending JMS message."); - - try { - Message msg = this.producer.toMessage(this.session, input); - if (msg != null) { - if (msg.getJMSDestination() != null) { - this.messageProducer.send(msg.getJMSDestination(), msg); - } else { - this.messageProducer.send(msg); - } - } - if (this.autoAck) { - LOG.debug("ACKing tuple: " + input); - this.collector.ack(input); - } - } catch (JMSException e) { - // failed to send the JMS message, fail the tuple fast - LOG.warn("Failing tuple: " + input); - LOG.warn("Exception: ", e); - this.collector.fail(input); - } - } - - /** - * Releases JMS resources. 
- */ - @Override - public void cleanup() { - try { - LOG.debug("Closing JMS connection."); - this.session.close(); - this.connection.close(); - } catch (JMSException e) { - LOG.warn("Error closing JMS connection.", e); - } - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - } - - /** - * Initializes JMS resources. - */ - @Override - public void prepare(Map stormConf, TopologyContext context, - OutputCollector collector) { - if (this.jmsProvider == null || this.producer == null) { - throw new IllegalStateException("JMS Provider and MessageProducer not set."); - } - this.collector = collector; - LOG.debug("Connecting JMS.."); - try { - ConnectionFactory cf = this.jmsProvider.connectionFactory(); - Destination dest = this.jmsProvider.destination(); - this.connection = cf.createConnection(); - this.session = connection.createSession(this.jmsTransactional, - this.jmsAcknowledgeMode); - this.messageProducer = session.createProducer(dest); - - connection.start(); - } catch (Exception e) { - LOG.warn("Error creating JMS connection.", e); - } - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/d6c8298d/external/storm-jms/core/src/main/java/org/apache/storm/jms/spout/JmsMessageID.java ---------------------------------------------------------------------- diff --git a/external/storm-jms/core/src/main/java/org/apache/storm/jms/spout/JmsMessageID.java b/external/storm-jms/core/src/main/java/org/apache/storm/jms/spout/JmsMessageID.java deleted file mode 100644 index b78a41e..0000000 --- a/external/storm-jms/core/src/main/java/org/apache/storm/jms/spout/JmsMessageID.java +++ /dev/null @@ -1,58 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
/**
 * Identifier attached to tuples emitted for a JMS message: the JMS message ID
 * plus a sequence number assigned by the emitter.
 * <p>
 * Ordering ({@link #compareTo}) follows the sequence number, while equality
 * follows the JMS message ID, so two instances created for the same JMS
 * message are equal even when their sequence numbers differ. Instances whose
 * JMS message ID is <code>null</code> fall back to the sequence number for
 * equality and hashing.
 */
public class JmsMessageID implements Comparable<JmsMessageID>, Serializable {

    private String jmsID;

    private Long sequence;

    /**
     * @param sequence position of the message in emission order
     * @param jmsID    the JMS message ID; may be <code>null</code>
     */
    public JmsMessageID(long sequence, String jmsID) {
        this.jmsID = jmsID;
        this.sequence = sequence;
    }

    /**
     * @return the JMS message ID this instance was created with
     */
    public String getJmsID() {
        return this.jmsID;
    }

    /**
     * Orders IDs by sequence number.
     * <p>
     * Uses <code>Long.compareTo</code> instead of subtracting and casting to
     * <code>int</code>: the cast truncated large differences (a gap of
     * 2^32 compared as equal) and the subtraction itself could overflow.
     */
    @Override
    public int compareTo(JmsMessageID jmsMessageID) {
        return this.sequence.compareTo(jmsMessageID.sequence);
    }

    /**
     * Hashes the same property {@link #equals} compares, so equal IDs always
     * land in the same hash bucket.
     */
    @Override
    public int hashCode() {
        return this.jmsID != null ? this.jmsID.hashCode() : this.sequence.hashCode();
    }

    /**
     * Two IDs are equal when they carry the same JMS message ID. When both
     * JMS message IDs are <code>null</code> the sequence numbers decide.
     */
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof JmsMessageID)) {
            return false;
        }
        JmsMessageID other = (JmsMessageID) o;
        if (this.jmsID == null || other.jmsID == null) {
            // A missing JMS ID never matches a present one.
            return this.jmsID == null && other.jmsID == null
                    && this.sequence.equals(other.sequence);
        }
        return this.jmsID.equals(other.jmsID);
    }

}
contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.storm.jms.spout; - -import java.io.Serializable; -import java.util.*; -import java.util.concurrent.LinkedBlockingQueue; - -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.Session; - -import org.apache.storm.topology.base.BaseRichSpout; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.storm.jms.JmsProvider; -import org.apache.storm.jms.JmsTupleProducer; -import org.apache.storm.spout.SpoutOutputCollector; -import org.apache.storm.task.TopologyContext; -import org.apache.storm.topology.OutputFieldsDeclarer; -import org.apache.storm.tuple.Values; -import org.apache.storm.utils.Utils; - -/** - * A Storm <code>Spout</code> implementation that listens to a JMS topic or queue - * and outputs tuples based on the messages it receives. - * <p> - * <code>JmsSpout</code> instances rely on <code>JmsProducer</code> implementations - * to obtain the JMS <code>ConnectionFactory</code> and <code>Destination</code> objects - * necessary to connect to a JMS topic/queue. 
- * <p> - * When a <code>JmsSpout</code> receives a JMS message, it delegates to an - * internal <code>JmsTupleProducer</code> instance to create a Storm tuple from the - * incoming message. - * <p> - * Typically, developers will supply a custom <code>JmsTupleProducer</code> implementation - * appropriate for the expected message content. - */ -@SuppressWarnings("serial") -public class JmsSpout extends BaseRichSpout implements MessageListener { - private static final Logger LOG = LoggerFactory.getLogger(JmsSpout.class); - - // JMS options - private int jmsAcknowledgeMode = Session.AUTO_ACKNOWLEDGE; - - private boolean distributed = true; - - private JmsTupleProducer tupleProducer; - - private JmsProvider jmsProvider; - - private LinkedBlockingQueue<Message> queue; - private TreeSet<JmsMessageID> toCommit; - private HashMap<JmsMessageID, Message> pendingMessages; - private long messageSequence = 0; - - private SpoutOutputCollector collector; - - private transient Connection connection; - private transient Session session; - - private boolean hasFailures = false; - public final Serializable recoveryMutex = "RECOVERY_MUTEX"; - private Timer recoveryTimer = null; - private long recoveryPeriod = -1; // default to disabled - - /** - * Sets the JMS Session acknowledgement mode for the JMS seesion associated with this spout. - * <p> - * Possible values: - * <ul> - * <li>javax.jms.Session.AUTO_ACKNOWLEDGE</li> - * <li>javax.jms.Session.CLIENT_ACKNOWLEDGE</li> - * <li>javax.jms.Session.DUPS_OK_ACKNOWLEDGE</li> - * </ul> - * - * @param mode JMS Session Acknowledgement mode - * @throws IllegalArgumentException if the mode is not recognized. 
- */ - public void setJmsAcknowledgeMode(int mode) { - switch (mode) { - case Session.AUTO_ACKNOWLEDGE: - case Session.CLIENT_ACKNOWLEDGE: - case Session.DUPS_OK_ACKNOWLEDGE: - break; - default: - throw new IllegalArgumentException("Unknown Acknowledge mode: " + mode + " (See javax.jms.Session for valid values)"); - - } - this.jmsAcknowledgeMode = mode; - } - - /** - * Returns the JMS Session acknowledgement mode for the JMS seesion associated with this spout. - * - * @return - */ - public int getJmsAcknowledgeMode() { - return this.jmsAcknowledgeMode; - } - - /** - * Set the <code>JmsProvider</code> - * implementation that this Spout will use to connect to - * a JMS <code>javax.jms.Desination</code> - * - * @param provider - */ - public void setJmsProvider(JmsProvider provider) { - this.jmsProvider = provider; - } - - /** - * Set the <code>JmsTupleProducer</code> - * implementation that will convert <code>javax.jms.Message</code> - * object to <code>org.apache.storm.tuple.Values</code> objects - * to be emitted. - * - * @param producer - */ - public void setJmsTupleProducer(JmsTupleProducer producer) { - this.tupleProducer = producer; - } - - /** - * <code>javax.jms.MessageListener</code> implementation. - * <p> - * Stored the JMS message in an internal queue for processing - * by the <code>nextTuple()</code> method. - */ - public void onMessage(Message msg) { - try { - LOG.debug("Queuing msg [" + msg.getJMSMessageID() + "]"); - } catch (JMSException e) { - } - this.queue.offer(msg); - } - - /** - * <code>ISpout</code> implementation. - * <p> - * Connects the JMS spout to the configured JMS destination - * topic/queue. 
- */ - @SuppressWarnings("rawtypes") - public void open(Map conf, TopologyContext context, - SpoutOutputCollector collector) { - if (this.jmsProvider == null) { - throw new IllegalStateException("JMS provider has not been set."); - } - if (this.tupleProducer == null) { - throw new IllegalStateException("JMS Tuple Producer has not been set."); - } - Integer topologyTimeout = (Integer) conf.get("topology.message.timeout.secs"); - // TODO fine a way to get the default timeout from storm, so we're not hard-coding to 30 seconds (it could change) - topologyTimeout = topologyTimeout == null ? 30 : topologyTimeout; - if ((topologyTimeout.intValue() * 1000) > this.recoveryPeriod) { - LOG.warn("*** WARNING *** : " + - "Recovery period (" + this.recoveryPeriod + " ms.) is less then the configured " + - "'topology.message.timeout.secs' of " + topologyTimeout + - " secs. This could lead to a message replay flood!"); - } - this.queue = new LinkedBlockingQueue<Message>(); - this.toCommit = new TreeSet<JmsMessageID>(); - this.pendingMessages = new HashMap<JmsMessageID, Message>(); - this.collector = collector; - try { - ConnectionFactory cf = this.jmsProvider.connectionFactory(); - Destination dest = this.jmsProvider.destination(); - this.connection = cf.createConnection(); - this.session = connection.createSession(false, - this.jmsAcknowledgeMode); - MessageConsumer consumer = session.createConsumer(dest); - consumer.setMessageListener(this); - this.connection.start(); - if (this.isDurableSubscription() && this.recoveryPeriod > 0) { - this.recoveryTimer = new Timer(); - this.recoveryTimer.scheduleAtFixedRate(new RecoveryTask(), 10, this.recoveryPeriod); - } - - } catch (Exception e) { - LOG.warn("Error creating JMS connection.", e); - } - - } - - public void close() { - try { - LOG.debug("Closing JMS connection."); - this.session.close(); - this.connection.close(); - } catch (JMSException e) { - LOG.warn("Error closing JMS connection.", e); - } - - } - - public void nextTuple() { 
- Message msg = this.queue.poll(); - if (msg == null) { - Utils.sleep(50); - } else { - - LOG.debug("sending tuple: " + msg); - // get the tuple from the handler - try { - Values vals = this.tupleProducer.toTuple(msg); - // ack if we're not in AUTO_ACKNOWLEDGE mode, or the message requests ACKNOWLEDGE - LOG.debug("Requested deliveryMode: " + toDeliveryModeString(msg.getJMSDeliveryMode())); - LOG.debug("Our deliveryMode: " + toDeliveryModeString(this.jmsAcknowledgeMode)); - if (this.isDurableSubscription()) { - LOG.debug("Requesting acks."); - JmsMessageID messageId = new JmsMessageID(this.messageSequence++, msg.getJMSMessageID()); - this.collector.emit(vals, messageId); - - // at this point we successfully emitted. Store - // the message and message ID so we can do a - // JMS acknowledge later - this.pendingMessages.put(messageId, msg); - this.toCommit.add(messageId); - } else { - this.collector.emit(vals); - } - } catch (JMSException e) { - LOG.warn("Unable to convert JMS message: " + msg); - } - - } - - } - - /* - * Will only be called if we're transactional or not AUTO_ACKNOWLEDGE - */ - public void ack(Object msgId) { - - Message msg = this.pendingMessages.remove(msgId); - JmsMessageID oldest = this.toCommit.first(); - if (msgId.equals(oldest)) { - if (msg != null) { - try { - LOG.debug("Committing..."); - msg.acknowledge(); - LOG.debug("JMS Message acked: " + msgId); - this.toCommit.remove(msgId); - } catch (JMSException e) { - LOG.warn("Error acknowldging JMS message: " + msgId, e); - } - } else { - LOG.warn("Couldn't acknowledge unknown JMS message ID: " + msgId); - } - } else { - this.toCommit.remove(msgId); - } - - } - - /* - * Will only be called if we're transactional or not AUTO_ACKNOWLEDGE - */ - public void fail(Object msgId) { - LOG.warn("Message failed: " + msgId); - this.pendingMessages.clear(); - this.toCommit.clear(); - synchronized (this.recoveryMutex) { - this.hasFailures = true; - } - } - - public void declareOutputFields(OutputFieldsDeclarer 
declarer) { - this.tupleProducer.declareOutputFields(declarer); - - } - - /** - * Returns <code>true</code> if the spout has received failures - * from which it has not yet recovered. - */ - public boolean hasFailures() { - return this.hasFailures; - } - - protected void recovered() { - this.hasFailures = false; - } - - /** - * Sets the periodicity of the timer task that - * checks for failures and recovers the JMS session. - * - * @param period - */ - public void setRecoveryPeriod(long period) { - this.recoveryPeriod = period; - } - - public boolean isDistributed() { - return this.distributed; - } - - /** - * Sets the "distributed" mode of this spout. - * <p> - * If <code>true</code> multiple instances of this spout <i>may</i> be - * created across the cluster (depending on the "parallelism_hint" in the topology configuration). - * <p> - * Setting this value to <code>false</code> essentially means this spout will run as a singleton - * within the cluster ("parallelism_hint" will be ignored). - * <p> - * In general, this should be set to <code>false</code> if the underlying JMS destination is a - * topic, and <code>true</code> if it is a JMS queue. 
- * - * @param distributed - */ - public void setDistributed(boolean distributed) { - this.distributed = distributed; - } - - - private static final String toDeliveryModeString(int deliveryMode) { - switch (deliveryMode) { - case Session.AUTO_ACKNOWLEDGE: - return "AUTO_ACKNOWLEDGE"; - case Session.CLIENT_ACKNOWLEDGE: - return "CLIENT_ACKNOWLEDGE"; - case Session.DUPS_OK_ACKNOWLEDGE: - return "DUPS_OK_ACKNOWLEDGE"; - default: - return "UNKNOWN"; - - } - } - - protected Session getSession() { - return this.session; - } - - private boolean isDurableSubscription() { - return (this.jmsAcknowledgeMode != Session.AUTO_ACKNOWLEDGE); - } - - - private class RecoveryTask extends TimerTask { - private final Logger LOG = LoggerFactory.getLogger(RecoveryTask.class); - - public void run() { - synchronized (JmsSpout.this.recoveryMutex) { - if (JmsSpout.this.hasFailures()) { - try { - LOG.info("Recovering from a message failure."); - JmsSpout.this.getSession().recover(); - JmsSpout.this.recovered(); - } catch (JMSException e) { - LOG.warn("Could not recover jms session.", e); - } - } - } - } - - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/d6c8298d/external/storm-jms/core/src/main/java/org/apache/storm/jms/trident/JmsBatch.java ---------------------------------------------------------------------- diff --git a/external/storm-jms/core/src/main/java/org/apache/storm/jms/trident/JmsBatch.java b/external/storm-jms/core/src/main/java/org/apache/storm/jms/trident/JmsBatch.java deleted file mode 100644 index c990058..0000000 --- a/external/storm-jms/core/src/main/java/org/apache/storm/jms/trident/JmsBatch.java +++ /dev/null @@ -1,27 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
/**
 * Placeholder for the batch coordination metadata of the TridentJmsSpout.
 * No metadata is carried, so the class intentionally has no members.
 */
public class JmsBatch {
    // intentionally empty
}
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.storm.jms.trident; - -import org.apache.storm.jms.JmsMessageProducer; -import org.apache.storm.jms.JmsProvider; -import org.apache.storm.topology.FailedException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.storm.trident.operation.TridentCollector; -import org.apache.storm.trident.state.State; -import org.apache.storm.trident.tuple.TridentTuple; - -import javax.jms.*; -import java.io.Serializable; -import java.lang.IllegalStateException; -import java.util.List; - -public class JmsState implements State { - - private static final Logger LOG = LoggerFactory.getLogger(JmsState.class); - - private Options options; - private Connection connection; - private Session session; - private MessageProducer messageProducer; - - protected JmsState(Options options) { - this.options = options; - } - - public static class Options implements Serializable { - private JmsProvider jmsProvider; - private JmsMessageProducer msgProducer; - private int jmsAcknowledgeMode = Session.AUTO_ACKNOWLEDGE; - private boolean jmsTransactional = true; - - public Options withJmsProvider(JmsProvider provider) { - this.jmsProvider = provider; - return this; - } - - public Options withMessageProducer(JmsMessageProducer msgProducer) { - this.msgProducer = msgProducer; - return this; - } - - public Options withJmsAcknowledgeMode(int jmsAcknowledgeMode) { - this.jmsAcknowledgeMode = jmsAcknowledgeMode; - return this; - } - - public Options withJmsTransactional(boolean jmsTransactional) { - this.jmsTransactional = jmsTransactional; - 
return this; - } - } - - protected void prepare() { - if(this.options.jmsProvider == null || this.options.msgProducer == null){ - throw new IllegalStateException("JMS Provider and MessageProducer not set."); - } - LOG.debug("Connecting JMS.."); - try { - ConnectionFactory cf = this.options.jmsProvider.connectionFactory(); - Destination dest = this.options.jmsProvider.destination(); - this.connection = cf.createConnection(); - this.session = connection.createSession(this.options.jmsTransactional, - this.options.jmsAcknowledgeMode); - this.messageProducer = session.createProducer(dest); - - connection.start(); - } catch (Exception e) { - LOG.warn("Error creating JMS connection.", e); - } - } - - @Override - public void beginCommit(Long aLong) { - } - - @Override - public void commit(Long aLong) { - LOG.debug("Committing JMS transaction."); - if(this.options.jmsTransactional) { - try { - session.commit(); - } catch(JMSException e){ - LOG.error("JMS Session commit failed.", e); - } - } - } - - public void updateState(List<TridentTuple> tuples, TridentCollector collector) throws JMSException { - try { - for(TridentTuple tuple : tuples) { - Message msg = this.options.msgProducer.toMessage(this.session, tuple); - if (msg != null) { - if (msg.getJMSDestination() != null) { - this.messageProducer.send(msg.getJMSDestination(), msg); - } else { - this.messageProducer.send(msg); - } - } - } - } catch (JMSException e) { - LOG.warn("Failed to send jmd message for a trident batch ", e); - if(this.options.jmsTransactional) { - session.rollback(); - } - throw new FailedException("Failed to write tuples", e); - } - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/d6c8298d/external/storm-jms/core/src/main/java/org/apache/storm/jms/trident/JmsStateFactory.java ---------------------------------------------------------------------- diff --git a/external/storm-jms/core/src/main/java/org/apache/storm/jms/trident/JmsStateFactory.java 
b/external/storm-jms/core/src/main/java/org/apache/storm/jms/trident/JmsStateFactory.java deleted file mode 100644 index 9a02ba9..0000000 --- a/external/storm-jms/core/src/main/java/org/apache/storm/jms/trident/JmsStateFactory.java +++ /dev/null @@ -1,40 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.storm.jms.trident; - -import org.apache.storm.task.IMetricsContext; -import org.apache.storm.trident.state.State; -import org.apache.storm.trident.state.StateFactory; - -import java.util.Map; - -public class JmsStateFactory implements StateFactory { - - private JmsState.Options options; - - public JmsStateFactory(JmsState.Options options) { - this.options = options; - } - - @Override - public State makeState(Map map, IMetricsContext iMetricsContext, int partitionIndex, int numPartitions) { - JmsState state = new JmsState(options); - state.prepare(); - return state; - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/d6c8298d/external/storm-jms/core/src/main/java/org/apache/storm/jms/trident/JmsUpdater.java ---------------------------------------------------------------------- diff --git a/external/storm-jms/core/src/main/java/org/apache/storm/jms/trident/JmsUpdater.java b/external/storm-jms/core/src/main/java/org/apache/storm/jms/trident/JmsUpdater.java deleted file mode 100644 index a2709a4..0000000 --- a/external/storm-jms/core/src/main/java/org/apache/storm/jms/trident/JmsUpdater.java +++ /dev/null @@ -1,38 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.storm.jms.trident; - -import org.apache.storm.topology.FailedException; -import org.apache.storm.trident.operation.TridentCollector; -import org.apache.storm.trident.state.BaseStateUpdater; -import org.apache.storm.trident.tuple.TridentTuple; - -import javax.jms.JMSException; -import java.util.List; - -public class JmsUpdater extends BaseStateUpdater<JmsState> { - - @Override - public void updateState(JmsState jmsState, List<TridentTuple> tuples, TridentCollector collector) { - try { - jmsState.updateState(tuples, collector); - } catch (JMSException e) { - throw new FailedException("failed JMS opetation", e); - } - } -}
