Fixing stylecheck problems with storm-jms
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/95602b1b Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/95602b1b Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/95602b1b Branch: refs/heads/master Commit: 95602b1be7493c33b7dc8c3a8cb6e406b59e907d Parents: 224633d Author: Kishor Patil <[email protected]> Authored: Sun Apr 22 23:23:30 2018 -0400 Committer: Kishor Patil <[email protected]> Committed: Mon Apr 23 02:32:41 2018 -0400 ---------------------------------------------------------------------- external/storm-jms/pom.xml | 2 +- .../apache/storm/jms/JmsMessageProducer.java | 21 +- .../java/org/apache/storm/jms/JmsProvider.java | 20 +- .../org/apache/storm/jms/JmsTupleProducer.java | 21 +- .../java/org/apache/storm/jms/bolt/JmsBolt.java | 38 ++- .../apache/storm/jms/spout/JmsMessageID.java | 29 +-- .../org/apache/storm/jms/spout/JmsSpout.java | 216 +++++++++-------- .../org/apache/storm/jms/trident/JmsBatch.java | 19 +- .../org/apache/storm/jms/trident/JmsState.java | 103 ++++---- .../storm/jms/trident/JmsStateFactory.java | 22 +- .../apache/storm/jms/trident/JmsUpdater.java | 26 +- .../storm/jms/trident/TridentJmsSpout.java | 237 +++++++++---------- .../apache/storm/jms/spout/JmsSpoutTest.java | 37 ++- .../apache/storm/jms/spout/MockJmsProvider.java | 21 +- .../jms/spout/MockSpoutOutputCollector.java | 6 +- .../storm/jms/spout/MockTupleProducer.java | 2 +- 16 files changed, 382 insertions(+), 438 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/95602b1b/external/storm-jms/pom.xml ---------------------------------------------------------------------- diff --git a/external/storm-jms/pom.xml b/external/storm-jms/pom.xml index 81e7f62..c7bcc51 100644 --- a/external/storm-jms/pom.xml +++ b/external/storm-jms/pom.xml @@ -94,7 +94,7 @@ <artifactId>maven-checkstyle-plugin</artifactId> <!--Note - the version would be inherited--> <configuration> - <maxAllowedViolations>235</maxAllowedViolations> + <maxAllowedViolations>63</maxAllowedViolations> </configuration> </plugin> </plugins> http://git-wip-us.apache.org/repos/asf/storm/blob/95602b1b/external/storm-jms/src/main/java/org/apache/storm/jms/JmsMessageProducer.java ---------------------------------------------------------------------- diff --git a/external/storm-jms/src/main/java/org/apache/storm/jms/JmsMessageProducer.java b/external/storm-jms/src/main/java/org/apache/storm/jms/JmsMessageProducer.java index 4932929..671cdd9 100644 --- a/external/storm-jms/src/main/java/org/apache/storm/jms/JmsMessageProducer.java +++ b/external/storm-jms/src/main/java/org/apache/storm/jms/JmsMessageProducer.java @@ -1,28 +1,21 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.jms; import java.io.Serializable; - import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; - import org.apache.storm.tuple.ITuple; /** http://git-wip-us.apache.org/repos/asf/storm/blob/95602b1b/external/storm-jms/src/main/java/org/apache/storm/jms/JmsProvider.java ---------------------------------------------------------------------- diff --git a/external/storm-jms/src/main/java/org/apache/storm/jms/JmsProvider.java b/external/storm-jms/src/main/java/org/apache/storm/jms/JmsProvider.java index d976326..b8dde44 100644 --- a/external/storm-jms/src/main/java/org/apache/storm/jms/JmsProvider.java +++ b/external/storm-jms/src/main/java/org/apache/storm/jms/JmsProvider.java @@ -1,24 +1,18 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.jms; import java.io.Serializable; - import javax.jms.ConnectionFactory; import javax.jms.Destination; http://git-wip-us.apache.org/repos/asf/storm/blob/95602b1b/external/storm-jms/src/main/java/org/apache/storm/jms/JmsTupleProducer.java ---------------------------------------------------------------------- diff --git a/external/storm-jms/src/main/java/org/apache/storm/jms/JmsTupleProducer.java b/external/storm-jms/src/main/java/org/apache/storm/jms/JmsTupleProducer.java index 0bbb3a0..4457f5a 100644 --- a/external/storm-jms/src/main/java/org/apache/storm/jms/JmsTupleProducer.java +++ b/external/storm-jms/src/main/java/org/apache/storm/jms/JmsTupleProducer.java @@ -1,27 +1,20 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.jms; import java.io.Serializable; - import javax.jms.JMSException; import javax.jms.Message; - import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.tuple.Values; http://git-wip-us.apache.org/repos/asf/storm/blob/95602b1b/external/storm-jms/src/main/java/org/apache/storm/jms/bolt/JmsBolt.java ---------------------------------------------------------------------- diff --git a/external/storm-jms/src/main/java/org/apache/storm/jms/bolt/JmsBolt.java b/external/storm-jms/src/main/java/org/apache/storm/jms/bolt/JmsBolt.java index 9b3b614..0b461a1 100644 --- a/external/storm-jms/src/main/java/org/apache/storm/jms/bolt/JmsBolt.java +++ b/external/storm-jms/src/main/java/org/apache/storm/jms/bolt/JmsBolt.java @@ -1,24 +1,18 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.jms.bolt; import java.util.Map; - import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; @@ -26,19 +20,15 @@ import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageProducer; import javax.jms.Session; - -import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.jms.JmsMessageProducer; import org.apache.storm.jms.JmsProvider; -import org.apache.storm.topology.base.BaseTickTupleAwareRichBolt; -import org.apache.storm.utils.TupleUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseTickTupleAwareRichBolt; import org.apache.storm.tuple.Tuple; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * A JmsBolt receives <code>org.apache.storm.tuple.Tuple</code> objects from a Storm @@ -124,9 +114,9 @@ public class JmsBolt extends BaseTickTupleAwareRichBolt { * * @param transactional */ -// public void setJmsTransactional(boolean transactional){ -// this.jmsTransactional = transactional; -// } + // public void setJmsTransactional(boolean transactional){ + // this.jmsTransactional = transactional; + // } /** * Sets whether or not tuples should be acknowledged by this @@ -208,7 +198,7 @@ public class JmsBolt extends BaseTickTupleAwareRichBolt { Destination dest = this.jmsProvider.destination(); this.connection = cf.createConnection(); this.session = connection.createSession(this.jmsTransactional, - this.jmsAcknowledgeMode); + this.jmsAcknowledgeMode); this.messageProducer = session.createProducer(dest); connection.start(); http://git-wip-us.apache.org/repos/asf/storm/blob/95602b1b/external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsMessageID.java ---------------------------------------------------------------------- diff --git a/external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsMessageID.java b/external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsMessageID.java index b78a41e..c069b7c 100644 --- a/external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsMessageID.java +++ b/external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsMessageID.java @@ -1,20 +1,15 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.jms.spout; import java.io.Serializable; @@ -25,19 +20,19 @@ public class JmsMessageID implements Comparable<JmsMessageID>, Serializable { private Long sequence; - public JmsMessageID(long sequence, String jmsID){ + public JmsMessageID(long sequence, String jmsID) { this.jmsID = jmsID; this.sequence = sequence; } - public String getJmsID(){ + public String getJmsID() { return this.jmsID; } @Override public int compareTo(JmsMessageID jmsMessageID) { - return (int)(this.sequence - jmsMessageID.sequence); + return (int) (this.sequence - jmsMessageID.sequence); } @Override @@ -47,8 +42,8 @@ public class JmsMessageID implements Comparable<JmsMessageID>, Serializable { @Override public boolean equals(Object o) { - if(o instanceof JmsMessageID){ - JmsMessageID id = (JmsMessageID)o; + if (o instanceof JmsMessageID) { + JmsMessageID id = (JmsMessageID) o; return this.jmsID.equals(id.jmsID); } else { return false; http://git-wip-us.apache.org/repos/asf/storm/blob/95602b1b/external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsSpout.java ---------------------------------------------------------------------- diff --git a/external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsSpout.java b/external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsSpout.java index 8973dbf..41d5636 100644 --- a/external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsSpout.java +++ b/external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsSpout.java @@ -34,7 +34,6 @@ import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; - import org.apache.storm.Config; import org.apache.storm.jms.JmsProvider; import org.apache.storm.jms.JmsTupleProducer; @@ -48,10 +47,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; - /** - * A Storm <code>Spout</code> implementation that listens to a JMS topic or - * queue and outputs tuples based on the messages it receives. + * A Storm <code>Spout</code> implementation that listens to a JMS topic or queue and outputs tuples based on the messages it receives. * * <p><code>JmsSpout</code> instances rely on <code>JmsProducer</code> * implementations to obtain the JMS @@ -59,8 +56,7 @@ import org.slf4j.LoggerFactory; * to connect to a JMS topic/queue. * * <p>When a <code>JmsSpout</code> receives a JMS message, it delegates to an - * internal <code>JmsTupleProducer</code> instance to create a Storm tuple from - * the incoming message. + * internal <code>JmsTupleProducer</code> instance to create a Storm tuple from the incoming message. * * <p>Typically, developers will supply a custom <code>JmsTupleProducer</code> * implementation appropriate for the expected message content. @@ -68,13 +64,19 @@ import org.slf4j.LoggerFactory; @SuppressWarnings("serial") public class JmsSpout extends BaseRichSpout implements MessageListener { - /** The logger object instance for this class. */ + /** + * The logger object instance for this class. + */ private static final Logger LOG = LoggerFactory.getLogger(JmsSpout.class); - /** The logger of the recovery task. */ + /** + * The logger of the recovery task. + */ private static final Logger RECOVERY_TASK_LOG = LoggerFactory.getLogger(RecoveryTask.class); - /** Time to sleep between queue polling attempts. */ + /** + * Time to sleep between queue polling attempts. + */ private static final int POLL_INTERVAL_MS = 50; /** @@ -82,88 +84,98 @@ public class JmsSpout extends BaseRichSpout implements MessageListener { */ private static final int DEFAULT_MESSAGE_TIMEOUT_SECS = 30; - /** Time to wait before queuing the first recovery task. */ + /** + * Time to wait before queuing the first recovery task. + */ private static final int RECOVERY_DELAY_MS = 10; - + /** + * Used to safely recover failed JMS sessions across instances. + */ + private final Serializable recoveryMutex = "RECOVERY_MUTEX"; /** * The acknowledgment mode used for this instance. * * @see Session */ private int jmsAcknowledgeMode = Session.AUTO_ACKNOWLEDGE; - - /** Indicates whether or not this spout should run as a singleton. */ + /** + * Indicates whether or not this spout should run as a singleton. + */ private boolean distributed = true; - - /** Used to generate tuples from incoming messages. */ + /** + * Used to generate tuples from incoming messages. + */ private JmsTupleProducer tupleProducer; - - /** Encapsulates jms related classes needed to communicate with the mq. */ + /** + * Encapsulates jms related classes needed to communicate with the mq. + */ private JmsProvider jmsProvider; - - /** Stores incoming messages for later sending. */ + /** + * Stores incoming messages for later sending. + */ private LinkedBlockingQueue<Message> queue; - - /** Contains all message ids of messages that were not yet acked. */ + /** + * Contains all message ids of messages that were not yet acked. + */ private TreeSet<JmsMessageID> toCommit; - - /** Maps between message ids of not-yet acked messages, and the messages. */ + /** + * Maps between message ids of not-yet acked messages, and the messages. + */ private HashMap<JmsMessageID, Message> pendingMessages; - - /** Counter of handled messages. */ + /** + * Counter of handled messages. + */ private long messageSequence = 0; - - /** The collector used to emit tuples. */ + /** + * The collector used to emit tuples. + */ private SpoutOutputCollector collector; - - /** Connection to the jms queue. */ + /** + * Connection to the jms queue. + */ private transient Connection connection; - - /** The active jms session. */ + /** + * The active jms session. + */ private transient Session session; - - /** Indicates whether or not a message failed to be processed. */ + /** + * Indicates whether or not a message failed to be processed. + */ private boolean hasFailures = false; - - /** Used to safely recover failed JMS sessions across instances. */ - private final Serializable recoveryMutex = "RECOVERY_MUTEX"; - - /** Schedules recovery tasks periodically. */ + /** + * Schedules recovery tasks periodically. + */ private Timer recoveryTimer = null; - /** Time to wait between recovery attempts. */ + /** + * Time to wait between recovery attempts. + */ private long recoveryPeriodMs = -1; // default to disabled /** - * Sets the JMS Session acknowledgement mode for the JMS session. + * Translate the {@code int} value of an acknowledgment to a {@code String}. * - * <p>Possible values: - * <ul> - * <li>javax.jms.Session.AUTO_ACKNOWLEDGE</li> - * <li>javax.jms.Session.CLIENT_ACKNOWLEDGE</li> - * <li>javax.jms.Session.DUPS_OK_ACKNOWLEDGE</li> - * </ul> + * @param deliveryMode the mode to translate. + * @return its {@code String} explanation (name). * - * @param mode JMS Session Acknowledgement mode - * @throws IllegalArgumentException if the mode is not recognized. + * @see Session */ - public void setJmsAcknowledgeMode(final int mode) { - switch (mode) { + private static String toDeliveryModeString(int deliveryMode) { + switch (deliveryMode) { case Session.AUTO_ACKNOWLEDGE: + return "AUTO_ACKNOWLEDGE"; case Session.CLIENT_ACKNOWLEDGE: + return "CLIENT_ACKNOWLEDGE"; case Session.DUPS_OK_ACKNOWLEDGE: - break; + return "DUPS_OK_ACKNOWLEDGE"; default: - throw new IllegalArgumentException( - "Unknown Acknowledge mode: " + mode + " (See javax.jms.Session for valid values)"); + return "UNKNOWN"; } - this.jmsAcknowledgeMode = mode; } /** - * Returns the JMS Session acknowledgement mode for the JMS session - * associated with this spout. Can be either of: + * Returns the JMS Session acknowledgement mode for the JMS session associated with this spout. Can be either of: * <ul> * <li>{@link Session#AUTO_ACKNOWLEDGE}</li> * <li>{@link Session#CLIENT_ACKNOWLEDGE}</li> @@ -178,11 +190,37 @@ public class JmsSpout extends BaseRichSpout implements MessageListener { } /** + * Sets the JMS Session acknowledgement mode for the JMS session. + * + * <p>Possible values: + * <ul> + * <li>javax.jms.Session.AUTO_ACKNOWLEDGE</li> + * <li>javax.jms.Session.CLIENT_ACKNOWLEDGE</li> + * <li>javax.jms.Session.DUPS_OK_ACKNOWLEDGE</li> + * </ul> + * + * @param mode JMS Session Acknowledgement mode + * @throws IllegalArgumentException if the mode is not recognized. + */ + public void setJmsAcknowledgeMode(final int mode) { + switch (mode) { + case Session.AUTO_ACKNOWLEDGE: + case Session.CLIENT_ACKNOWLEDGE: + case Session.DUPS_OK_ACKNOWLEDGE: + break; + default: + throw new IllegalArgumentException( + "Unknown Acknowledge mode: " + mode + " (See javax.jms.Session for valid values)"); + + } + this.jmsAcknowledgeMode = mode; + } + + /** * Set {@link #jmsProvider}. * * <p>Set the <code>JmsProvider</code> - * implementation that this Spout will use to connect to - * a JMS <code>javax.jms.Desination</code> + * implementation that this Spout will use to connect to a JMS <code>javax.jms.Desination</code> * * @param provider the provider to use */ @@ -191,10 +229,8 @@ public class JmsSpout extends BaseRichSpout implements MessageListener { } /** - * Set the <code>JmsTupleProducer</code> - * implementation that will convert <code>javax.jms.Message</code> - * object to <code>org.apache.storm.tuple.Values</code> objects - * to be emitted. + * Set the <code>JmsTupleProducer</code> implementation that will convert <code>javax.jms.Message</code> object to + * <code>org.apache.storm.tuple.Values</code> objects to be emitted. * * @param producer the producer instance to use */ @@ -238,12 +274,12 @@ public class JmsSpout extends BaseRichSpout implements MessageListener { } // TODO get the default value from storm instead of hard coding 30 secs Long topologyTimeout = - ((Number) conf.getOrDefault(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, DEFAULT_MESSAGE_TIMEOUT_SECS)).longValue(); + ((Number) conf.getOrDefault(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, DEFAULT_MESSAGE_TIMEOUT_SECS)).longValue(); if ((TimeUnit.SECONDS.toMillis(topologyTimeout)) > this.recoveryPeriodMs) { LOG.warn("*** WARNING *** : " - + "Recovery period (" + this.recoveryPeriodMs + " ms.) is less then the configured " - + "'topology.message.timeout.secs' of " + topologyTimeout - + " secs. This could lead to a message replay flood!"); + + "Recovery period (" + this.recoveryPeriodMs + " ms.) is less then the configured " + + "'topology.message.timeout.secs' of " + topologyTimeout + + " secs. This could lead to a message replay flood!"); } this.queue = new LinkedBlockingQueue<Message>(); this.toCommit = new TreeSet<JmsMessageID>(); @@ -288,10 +324,8 @@ public class JmsSpout extends BaseRichSpout implements MessageListener { * Generate the next tuple from a message. * * <p>This method polls the queue that's being filled asynchronously by the - * jms connection, every {@link #POLL_INTERVAL_MS} seconds. When a message - * arrives, a {@link Values} (tuple) is generated using - * {@link #tupleProducer}. It is emitted, and the message is saved to - * {@link #toCommit} and {@link #pendingMessages} for later handling. + * jms connection, every {@link #POLL_INTERVAL_MS} seconds. When a message arrives, a {@link Values} (tuple) is generated using {@link + * #tupleProducer}. It is emitted, and the message is saved to {@link #toCommit} and {@link #pendingMessages} for later handling. */ public void nextTuple() { Message msg = this.queue.poll(); @@ -332,8 +366,7 @@ public class JmsSpout extends BaseRichSpout implements MessageListener { * Ack a successfully handled message by the matching {@link JmsMessageID}. * * <p>Acking means removing the message from the pending messages - * collections, and if it was the oldest pending message - - * ack it to the mq as well, so that it's the only one acked. + * collections, and if it was the oldest pending message - ack it to the mq as well, so that it's the only one acked. * * <p>Will only be called if we're transactional or not AUTO_ACKNOWLEDGE. */ @@ -392,8 +425,7 @@ public class JmsSpout extends BaseRichSpout implements MessageListener { } /** - * Returns <code>true</code> if the spout has received failures - * from which it has not yet recovered. + * Returns <code>true</code> if the spout has received failures from which it has not yet recovered. * * @return {@code true} if there were failures, {@code false} otherwise. */ @@ -409,8 +441,7 @@ public class JmsSpout extends BaseRichSpout implements MessageListener { } /** - * Sets the periodicity of the timer task that - * checks for failures and recovers the JMS session. + * Sets the periodicity of the timer task that checks for failures and recovers the JMS session. * * @param period desired wait period */ @@ -429,45 +460,21 @@ public class JmsSpout extends BaseRichSpout implements MessageListener { * Sets the "distributed" mode of this spout. * * <p>If <code>true</code> multiple instances of this spout <i>may</i> be - * created across the cluster - * (depending on the "parallelism_hint" in the topology configuration). + * created across the cluster (depending on the "parallelism_hint" in the topology configuration). * * <p>Setting this value to <code>false</code> essentially means this spout - * will run as a singleton within the cluster - * ("parallelism_hint" will be ignored). + * will run as a singleton within the cluster ("parallelism_hint" will be ignored). * * <p>In general, this should be set to <code>false</code> if the underlying * JMS destination is a topic, and <code>true</code> if it is a JMS queue. * - * @param isDistributed {@code true} if should be distributed, {@code false} - * otherwise. + * @param isDistributed {@code true} if should be distributed, {@code false} otherwise. */ public void setDistributed(boolean isDistributed) { this.distributed = isDistributed; } /** - * Translate the {@code int} value of an acknowledgment to a {@code String}. - * - * @param deliveryMode the mode to translate. - * @return its {@code String} explanation (name). - * @see Session - */ - private static String toDeliveryModeString(int deliveryMode) { - switch (deliveryMode) { - case Session.AUTO_ACKNOWLEDGE: - return "AUTO_ACKNOWLEDGE"; - case Session.CLIENT_ACKNOWLEDGE: - return "CLIENT_ACKNOWLEDGE"; - case Session.DUPS_OK_ACKNOWLEDGE: - return "DUPS_OK_ACKNOWLEDGE"; - default: - return "UNKNOWN"; - - } - } - - /** * @return The currently active session. */ protected Session getSession() { @@ -477,8 +484,7 @@ public class JmsSpout extends BaseRichSpout implements MessageListener { /** * Check if the subscription requires messages to be acked. * - * @return {@code true} if there is a pending messages state, {@code false} - * otherwise. + * @return {@code true} if there is a pending messages state, {@code false} otherwise. */ private boolean isDurableSubscription() { return (this.jmsAcknowledgeMode != Session.AUTO_ACKNOWLEDGE); http://git-wip-us.apache.org/repos/asf/storm/blob/95602b1b/external/storm-jms/src/main/java/org/apache/storm/jms/trident/JmsBatch.java ---------------------------------------------------------------------- diff --git a/external/storm-jms/src/main/java/org/apache/storm/jms/trident/JmsBatch.java b/external/storm-jms/src/main/java/org/apache/storm/jms/trident/JmsBatch.java index c990058..5db4677 100644 --- a/external/storm-jms/src/main/java/org/apache/storm/jms/trident/JmsBatch.java +++ b/external/storm-jms/src/main/java/org/apache/storm/jms/trident/JmsBatch.java @@ -1,20 +1,15 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.jms.trident; /** http://git-wip-us.apache.org/repos/asf/storm/blob/95602b1b/external/storm-jms/src/main/java/org/apache/storm/jms/trident/JmsState.java ---------------------------------------------------------------------- diff --git a/external/storm-jms/src/main/java/org/apache/storm/jms/trident/JmsState.java b/external/storm-jms/src/main/java/org/apache/storm/jms/trident/JmsState.java index bfb78b5..3611e78 100644 --- a/external/storm-jms/src/main/java/org/apache/storm/jms/trident/JmsState.java +++ b/external/storm-jms/src/main/java/org/apache/storm/jms/trident/JmsState.java @@ -1,35 +1,34 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.jms.trident; +import java.io.Serializable; +import java.util.List; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageProducer; +import javax.jms.Session; import org.apache.storm.jms.JmsMessageProducer; import org.apache.storm.jms.JmsProvider; import org.apache.storm.topology.FailedException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.storm.trident.operation.TridentCollector; import org.apache.storm.trident.state.State; import org.apache.storm.trident.tuple.TridentTuple; - -import javax.jms.*; -import java.io.Serializable; -import java.lang.IllegalStateException; -import java.util.List; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class JmsState implements State { @@ -44,35 +43,8 @@ public class JmsState implements State { this.options = options; } - public static class Options implements Serializable { - private JmsProvider jmsProvider; - private JmsMessageProducer msgProducer; - private int jmsAcknowledgeMode = Session.AUTO_ACKNOWLEDGE; - private boolean jmsTransactional = true; - - public Options withJmsProvider(JmsProvider provider) { - this.jmsProvider = provider; - return this; - } - - public Options withMessageProducer(JmsMessageProducer msgProducer) { - this.msgProducer = msgProducer; - return this; - } - - public Options withJmsAcknowledgeMode(int jmsAcknowledgeMode) { - this.jmsAcknowledgeMode = jmsAcknowledgeMode; - return this; - } - - public Options withJmsTransactional(boolean jmsTransactional) { - this.jmsTransactional = jmsTransactional; - return this; - } - } - protected void prepare() { - if(this.options.jmsProvider == null || this.options.msgProducer == null){ + if (this.options.jmsProvider == null || this.options.msgProducer == null) { throw new IllegalStateException("JMS Provider and MessageProducer not set."); } LOG.debug("Connecting JMS.."); @@ -81,7 +53,7 @@ public class JmsState implements State { Destination dest = this.options.jmsProvider.destination(); this.connection = cf.createConnection(); this.session = connection.createSession(this.options.jmsTransactional, - this.options.jmsAcknowledgeMode); + this.options.jmsAcknowledgeMode); this.messageProducer = session.createProducer(dest); connection.start(); @@ -97,10 +69,10 @@ public class JmsState implements State { @Override public void commit(Long aLong) { LOG.debug("Committing JMS transaction."); - if(this.options.jmsTransactional) { + if (this.options.jmsTransactional) { try { session.commit(); - } catch(JMSException e){ + } catch (JMSException e) { LOG.error("JMS Session commit failed.", e); } } @@ -108,7 +80,7 @@ public class JmsState implements State { public void updateState(List<TridentTuple> tuples, TridentCollector collector) throws JMSException { try { - for(TridentTuple tuple : tuples) { + for (TridentTuple tuple : tuples) { Message msg = this.options.msgProducer.toMessage(this.session, tuple); if (msg != null) { if (msg.getJMSDestination() != null) { @@ -120,10 +92,37 @@ public class JmsState implements State { } } catch (JMSException e) { LOG.warn("Failed to send jmd message for a trident batch ", e); - if(this.options.jmsTransactional) { + if (this.options.jmsTransactional) { session.rollback(); } throw new FailedException("Failed to write tuples", e); } } + + public static class Options implements Serializable { + private JmsProvider jmsProvider; + private JmsMessageProducer msgProducer; + private int jmsAcknowledgeMode = Session.AUTO_ACKNOWLEDGE; + private boolean jmsTransactional = true; + + public Options withJmsProvider(JmsProvider provider) { + this.jmsProvider = provider; + return this; + } + + public Options withMessageProducer(JmsMessageProducer msgProducer) { + this.msgProducer = msgProducer; + return this; + } + + public Options withJmsAcknowledgeMode(int jmsAcknowledgeMode) { + this.jmsAcknowledgeMode = jmsAcknowledgeMode; + return this; + } + + public Options withJmsTransactional(boolean jmsTransactional) { + this.jmsTransactional = jmsTransactional; + return this; + } + } } http://git-wip-us.apache.org/repos/asf/storm/blob/95602b1b/external/storm-jms/src/main/java/org/apache/storm/jms/trident/JmsStateFactory.java ---------------------------------------------------------------------- diff --git a/external/storm-jms/src/main/java/org/apache/storm/jms/trident/JmsStateFactory.java b/external/storm-jms/src/main/java/org/apache/storm/jms/trident/JmsStateFactory.java index 4123752..b72f88c 100644 --- a/external/storm-jms/src/main/java/org/apache/storm/jms/trident/JmsStateFactory.java +++ b/external/storm-jms/src/main/java/org/apache/storm/jms/trident/JmsStateFactory.java @@ -1,28 +1,22 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.jms.trident; +import java.util.Map; import org.apache.storm.task.IMetricsContext; import org.apache.storm.trident.state.State; import org.apache.storm.trident.state.StateFactory; -import java.util.Map; - public class JmsStateFactory implements StateFactory { private JmsState.Options options; http://git-wip-us.apache.org/repos/asf/storm/blob/95602b1b/external/storm-jms/src/main/java/org/apache/storm/jms/trident/JmsUpdater.java ---------------------------------------------------------------------- diff --git a/external/storm-jms/src/main/java/org/apache/storm/jms/trident/JmsUpdater.java b/external/storm-jms/src/main/java/org/apache/storm/jms/trident/JmsUpdater.java index a2709a4..92b8a95 100644 --- a/external/storm-jms/src/main/java/org/apache/storm/jms/trident/JmsUpdater.java +++ b/external/storm-jms/src/main/java/org/apache/storm/jms/trident/JmsUpdater.java @@ -1,31 +1,25 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.jms.trident; +import java.util.List; +import javax.jms.JMSException; import org.apache.storm.topology.FailedException; import org.apache.storm.trident.operation.TridentCollector; import org.apache.storm.trident.state.BaseStateUpdater; import org.apache.storm.trident.tuple.TridentTuple; -import javax.jms.JMSException; -import java.util.List; - -public class JmsUpdater extends BaseStateUpdater<JmsState> { +public class JmsUpdater extends BaseStateUpdater<JmsState> { @Override public void updateState(JmsState jmsState, List<TridentTuple> tuples, TridentCollector collector) { http://git-wip-us.apache.org/repos/asf/storm/blob/95602b1b/external/storm-jms/src/main/java/org/apache/storm/jms/trident/TridentJmsSpout.java ---------------------------------------------------------------------- diff --git a/external/storm-jms/src/main/java/org/apache/storm/jms/trident/TridentJmsSpout.java b/external/storm-jms/src/main/java/org/apache/storm/jms/trident/TridentJmsSpout.java index afdc0b2..de7c182 100644 --- a/external/storm-jms/src/main/java/org/apache/storm/jms/trident/TridentJmsSpout.java +++ b/external/storm-jms/src/main/java/org/apache/storm/jms/trident/TridentJmsSpout.java @@ -1,27 +1,21 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.jms.trident; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.LinkedBlockingQueue; - import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; @@ -30,23 +24,21 @@ import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; - +import org.apache.storm.Config; +import org.apache.storm.generated.StreamInfo; import org.apache.storm.jms.JmsProvider; import org.apache.storm.jms.JmsTupleProducer; -import org.apache.storm.utils.Utils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsGetter; import org.apache.storm.trident.operation.TridentCollector; import org.apache.storm.trident.spout.ITridentSpout; import org.apache.storm.trident.topology.TransactionAttempt; -import org.apache.storm.Config; -import org.apache.storm.generated.StreamInfo; -import org.apache.storm.task.TopologyContext; -import org.apache.storm.topology.OutputFieldsGetter; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Values; import org.apache.storm.utils.RotatingMap; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Trident implementation of the JmsSpout @@ -56,29 +48,52 @@ import org.apache.storm.utils.RotatingMap; public class TridentJmsSpout implements ITridentSpout<JmsBatch> { public static final String MAX_BATCH_SIZE_CONF = "topology.spout.max.batch.size"; - + public static final int DEFAULT_BATCH_SIZE = 1000; private static final long serialVersionUID = -3469351154693356655L; - + private static int nameIndex = 1; private JmsTupleProducer tupleProducer; - private JmsProvider jmsProvider; - private int jmsAcknowledgeMode; - private String name; - private static int nameIndex = 1; - /** * Create a TridentJmsSpout with a default name and acknowledge mode of AUTO_ACKNOWLEDGE */ public TridentJmsSpout() { - this.name = "JmsSpout_"+(nameIndex++); + this.name = "JmsSpout_" + (nameIndex++); this.jmsAcknowledgeMode = Session.AUTO_ACKNOWLEDGE; } - + + /** + * Return a friendly string for the given JMS acknowledge mode, or throw an IllegalArgumentException if + * the mode is not recognized. + * <p/> + * Possible values: + * <ul> + * <li>javax.jms.Session.AUTO_ACKNOWLEDGE</li> + * <li>javax.jms.Session.CLIENT_ACKNOWLEDGE</li> + * <li>javax.jms.Session.DUPS_OK_ACKNOWLEDGE</li> + * </ul> + * @param acknowledgeMode A valid JMS acknowledge mode + * @return A friendly string describing the acknowledge mode + * @throws IllegalArgumentException if the mode is not recognized + */ + private static final String toDeliveryModeString(int acknowledgeMode) { + switch (acknowledgeMode) { + case Session.AUTO_ACKNOWLEDGE: + return "AUTO_ACKNOWLEDGE"; + case Session.CLIENT_ACKNOWLEDGE: + return "CLIENT_ACKNOWLEDGE"; + case Session.DUPS_OK_ACKNOWLEDGE: + return "DUPS_OK_ACKNOWLEDGE"; + default: + throw new IllegalArgumentException( + "Unknown JMS Acknowledge mode " + acknowledgeMode + " (See javax.jms.Session for valid values)"); + } + } + /** * Set the name for this spout, to improve log identification * @param name The name to be used in log messages @@ -88,25 +103,25 @@ public class TridentJmsSpout implements ITridentSpout<JmsBatch> { this.name = name; return this; } - + /** * Set the <code>JmsProvider</code> - * implementation that this Spout will use to connect to + * implementation that this Spout will use to connect to * a JMS <code>javax.jms.Desination</code> - * + * * @param provider */ - public TridentJmsSpout withJmsProvider(JmsProvider provider){ + public TridentJmsSpout withJmsProvider(JmsProvider provider) { this.jmsProvider = provider; return this; } - + /** * Set the <code>JmsTupleProducer</code> * implementation that will convert <code>javax.jms.Message</code> * object to <code>backtype.storm.tuple.Values</code> objects * to be emitted. - * + * * @param tupleProducer * @return This spout */ @@ -114,7 +129,7 @@ public class TridentJmsSpout implements ITridentSpout<JmsBatch> { this.tupleProducer = tupleProducer; return this; } - + /** * Set the JMS acknowledge mode for messages being processed by this spout. * <p/> @@ -133,37 +148,10 @@ public class TridentJmsSpout implements ITridentSpout<JmsBatch> { this.jmsAcknowledgeMode = jmsAcknowledgeMode; return this; } - - /** - * Return a friendly string for the given JMS acknowledge mode, or throw an IllegalArgumentException if - * the mode is not recognized. - * <p/> - * Possible values: - * <ul> - * <li>javax.jms.Session.AUTO_ACKNOWLEDGE</li> - * <li>javax.jms.Session.CLIENT_ACKNOWLEDGE</li> - * <li>javax.jms.Session.DUPS_OK_ACKNOWLEDGE</li> - * </ul> - * @param acknowledgeMode A valid JMS acknowledge mode - * @return A friendly string describing the acknowledge mode - * @throws IllegalArgumentException if the mode is not recognized - */ - private static final String toDeliveryModeString(int acknowledgeMode) { - switch (acknowledgeMode) { - case Session.AUTO_ACKNOWLEDGE: - return "AUTO_ACKNOWLEDGE"; - case Session.CLIENT_ACKNOWLEDGE: - return "CLIENT_ACKNOWLEDGE"; - case Session.DUPS_OK_ACKNOWLEDGE: - return "DUPS_OK_ACKNOWLEDGE"; - default: - throw new IllegalArgumentException("Unknown JMS Acknowledge mode " + acknowledgeMode + " (See javax.jms.Session for valid values)"); - } - } - + @Override public ITridentSpout.BatchCoordinator<JmsBatch> getCoordinator( - String txStateId, Map<String, Object> conf, TopologyContext context) { + String txStateId, Map<String, Object> conf, TopologyContext context) { return new JmsBatchCoordinator(name); } @@ -185,10 +173,10 @@ public class TridentJmsSpout implements ITridentSpout<JmsBatch> { if (streamInfo == null) { throw new IllegalArgumentException("Jms Tuple producer has not declared output fields for the default stream"); } - + return new Fields(streamInfo.get_output_fields()); } - + /** * The JmsEmitter class listens for incoming messages and stores them in a blocking queue. On each invocation of emit, * the queued messages are emitted as a batch. @@ -201,16 +189,15 @@ public class TridentJmsSpout implements ITridentSpout<JmsBatch> { private final Session session; private final RotatingMap<Long, List<Message>> batchMessageMap; // Maps transaction Ids to JMS message ids. - + private final long rotateTimeMillis; private final int maxBatchSize; private final String name; - - private long lastRotate; - private final Logger LOG = LoggerFactory.getLogger(JmsEmitter.class); - - public JmsEmitter(String name, JmsProvider jmsProvider, JmsTupleProducer tupleProducer, int jmsAcknowledgeMode, Map<String, Object> conf) { + private long lastRotate; + + public JmsEmitter(String name, JmsProvider jmsProvider, JmsTupleProducer tupleProducer, int jmsAcknowledgeMode, + Map<String, Object> conf) { if (jmsProvider == null) { throw new IllegalStateException("JMS provider has not been set."); } @@ -220,11 +207,11 @@ public class TridentJmsSpout implements ITridentSpout<JmsBatch> { this.queue = new LinkedBlockingQueue<Message>(); this.name = name; - + batchMessageMap = new RotatingMap<Long, List<Message>>(3); - rotateTimeMillis = 1000L * ((Number)conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)).intValue(); + rotateTimeMillis = 1000L * ((Number) conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)).intValue(); lastRotate = System.currentTimeMillis(); - + Number batchSize = (Number) conf.get(MAX_BATCH_SIZE_CONF); maxBatchSize = batchSize != null ? batchSize.intValue() : DEFAULT_BATCH_SIZE; @@ -237,40 +224,41 @@ public class TridentJmsSpout implements ITridentSpout<JmsBatch> { consumer.setMessageListener(this); this.connection.start(); - LOG.info("Created JmsEmitter with max batch size "+maxBatchSize+" rotate time "+rotateTimeMillis+"ms and destination "+dest+" for "+name); + LOG.info( + "Created JmsEmitter with max batch size " + maxBatchSize + " rotate time " + rotateTimeMillis + "ms and destination " + + dest + " for " + name); } catch (Exception e) { LOG.warn("Error creating JMS connection.", e); throw new IllegalStateException("Could not create JMS connection for spout ", e); } - + } - + @Override public void success(TransactionAttempt tx) { - + @SuppressWarnings("unchecked") List<Message> messages = (List<Message>) batchMessageMap.remove(tx.getTransactionId()); - + if (messages != null) { if (!messages.isEmpty()) { - LOG.debug("Success for batch with transaction id "+tx.getTransactionId()+"/"+tx.getAttemptId()+" for "+name); + LOG.debug("Success for batch with transaction id " + tx.getTransactionId() + "/" + tx.getAttemptId() + " for " + name); } - - for (Message msg: messages) { + + for (Message msg : messages) { String messageId = "UnknownId"; - + try { messageId = msg.getJMSMessageID(); msg.acknowledge(); - LOG.trace("Acknowledged message "+messageId); + LOG.trace("Acknowledged message " + messageId); } catch (JMSException e) { - LOG.warn("Failed to acknowledge message "+messageId, e); + LOG.warn("Failed to acknowledge message " + messageId, e); } } - } - else { - LOG.warn("No messages found in batch with transaction id "+tx.getTransactionId()+"/"+tx.getAttemptId()); + } else { + LOG.warn("No messages found in batch with transaction id " + tx.getTransactionId() + "/" + tx.getAttemptId()); } } @@ -282,19 +270,18 @@ public class TridentJmsSpout implements ITridentSpout<JmsBatch> { * @param messages The list of messages to fail. */ private void fail(Long transactionId, List<Message> messages) { - LOG.debug("Failure for batch with transaction id "+transactionId+" for "+name); + LOG.debug("Failure for batch with transaction id " + transactionId + " for " + name); if (messages != null) { - for (Message msg: messages) { + for (Message msg : messages) { try { - LOG.trace("Failed message "+msg.getJMSMessageID()); + LOG.trace("Failed message " + msg.getJMSMessageID()); } catch (JMSException e) { LOG.warn("Could not identify failed message ", e); } } + } else { + LOG.warn("Failed batch has no messages with transaction id " + transactionId); } - else { - LOG.warn("Failed batch has no messages with transaction id "+transactionId); - } } @Override @@ -305,37 +292,37 @@ public class TridentJmsSpout implements ITridentSpout<JmsBatch> { this.connection.close(); } catch (JMSException e) { LOG.warn("Error closing JMS connection.", e); - } + } } @Override public void emitBatch(TransactionAttempt tx, JmsBatch coordinatorMeta, - TridentCollector collector) { - + TridentCollector collector) { + long now = System.currentTimeMillis(); - if(now - lastRotate > rotateTimeMillis) { + if (now - lastRotate > rotateTimeMillis) { Map<Long, List<Message>> failed = batchMessageMap.rotate(); - for(Long id: failed.keySet()) { - LOG.warn("TIMED OUT batch with transaction id "+id+" for "+name); + for (Long id : failed.keySet()) { + LOG.warn("TIMED OUT batch with transaction id " + id + " for " + name); fail(id, failed.get(id)); } lastRotate = now; } - - if(batchMessageMap.containsKey(tx.getTransactionId())) { - LOG.warn("FAILED duplicate batch with transaction id "+tx.getTransactionId()+"/"+tx.getAttemptId()+" for "+name); + + if (batchMessageMap.containsKey(tx.getTransactionId())) { + LOG.warn("FAILED duplicate batch with transaction id " + tx.getTransactionId() + "/" + tx.getAttemptId() + " for " + name); fail(tx.getTransactionId(), batchMessageMap.get(tx.getTransactionId())); } - + List<Message> batchMessages = new ArrayList<Message>(); - - for (int index=0; index<maxBatchSize; index++) { + + for (int index = 0; index < maxBatchSize; index++) { Message msg = queue.poll(); if (msg == null) { Utils.sleep(50); // Back off break; } - + try { if (TridentJmsSpout.this.jmsAcknowledgeMode != Session.AUTO_ACKNOWLEDGE) { batchMessages.add(msg); @@ -343,15 +330,17 @@ public class TridentJmsSpout implements ITridentSpout<JmsBatch> { Values tuple = tupleProducer.toTuple(msg); collector.emit(tuple); } catch (JMSException e) { - LOG.warn("Failed to emit message, could not retrieve data for "+name+": "+e ); + LOG.warn("Failed to emit message, could not retrieve data for " + name + ": " + e); } } - + if (!batchMessages.isEmpty()) { - LOG.debug("Emitting batch with transaction id "+tx.getTransactionId()+"/"+tx.getAttemptId()+" and size "+batchMessages.size()+" for "+name); - } - else { - LOG.trace("No items to acknowledge for batch with transaction id "+tx.getTransactionId()+"/"+tx.getAttemptId()+" for "+name); + LOG.debug("Emitting batch with transaction id " + tx.getTransactionId() + "/" + tx.getAttemptId() + " and size " + + batchMessages.size() + " for " + name); + } else { + LOG.trace( + "No items to acknowledge for batch with transaction id " + tx.getTransactionId() + "/" + tx.getAttemptId() + " for " + + name); } batchMessageMap.put(tx.getTransactionId(), batchMessages); } @@ -365,9 +354,9 @@ public class TridentJmsSpout implements ITridentSpout<JmsBatch> { } this.queue.offer(msg); } - + } - + /** * Bare implementation of a BatchCoordinator, returning a null JmsBatch object * @@ -375,17 +364,17 @@ public class TridentJmsSpout implements ITridentSpout<JmsBatch> { private class JmsBatchCoordinator implements BatchCoordinator<JmsBatch> { private final String name; - + private final Logger LOG = LoggerFactory.getLogger(JmsBatchCoordinator.class); public JmsBatchCoordinator(String name) { this.name = name; - LOG.info("Created batch coordinator for "+name); + LOG.info("Created batch coordinator for " + name); } - + @Override public JmsBatch initializeTransaction(long txid, JmsBatch prevMetadata, JmsBatch curMetadata) { - LOG.debug("Initialise transaction "+txid+" for "+name); + LOG.debug("Initialise transaction " + txid + " for " + name); return null; } @@ -401,7 +390,7 @@ public class TridentJmsSpout implements ITridentSpout<JmsBatch> { @Override public void close() { } - + } } http://git-wip-us.apache.org/repos/asf/storm/blob/95602b1b/external/storm-jms/src/test/java/org/apache/storm/jms/spout/JmsSpoutTest.java ---------------------------------------------------------------------- diff --git a/external/storm-jms/src/test/java/org/apache/storm/jms/spout/JmsSpoutTest.java b/external/storm-jms/src/test/java/org/apache/storm/jms/spout/JmsSpoutTest.java index b6406c8..9f967f8 100644 --- a/external/storm-jms/src/test/java/org/apache/storm/jms/spout/JmsSpoutTest.java +++ b/external/storm-jms/src/test/java/org/apache/storm/jms/spout/JmsSpoutTest.java @@ -15,16 +15,14 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.storm.jms.spout; -import org.apache.storm.Config; -import org.apache.storm.jms.JmsProvider; -import org.apache.storm.spout.SpoutOutputCollector; -import org.junit.Assert; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +package org.apache.storm.jms.spout; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectOutputStream; +import java.util.HashMap; +import java.util.Map; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; @@ -32,15 +30,17 @@ import javax.jms.Message; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.ObjectOutputStream; -import java.util.HashMap; -import java.util.Map; +import org.apache.storm.Config; +import org.apache.storm.jms.JmsProvider; +import org.apache.storm.spout.SpoutOutputCollector; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class JmsSpoutTest { private static final Logger LOG = - LoggerFactory.getLogger(JmsSpoutTest.class); + LoggerFactory.getLogger(JmsSpoutTest.class); @Test public void testFailure() throws JMSException, Exception { @@ -85,9 +85,8 @@ public class JmsSpoutTest { } /** - * Make sure that {@link JmsSpout#open} returns correctly regardless of - * the type of {@link Number} that is the value of - * {@link Config#TOPOLOGY_MESSAGE_TIMEOUT_SECS}. + * Make sure that {@link JmsSpout#open} returns correctly regardless of the type of {@link Number} that is the value of {@link + * Config#TOPOLOGY_MESSAGE_TIMEOUT_SECS}. */ @Test public void testOpenWorksMultipleTypesOfNumberObjects() throws Exception { @@ -118,8 +117,8 @@ public class JmsSpoutTest { Destination destination) throws JMSException { Session mySess = connectionFactory.createConnection().createSession( - false, - Session.CLIENT_ACKNOWLEDGE); + false, + Session.CLIENT_ACKNOWLEDGE); MessageProducer producer = mySess.createProducer(destination); TextMessage msg = mySess.createTextMessage(); msg.setText("Hello World"); http://git-wip-us.apache.org/repos/asf/storm/blob/95602b1b/external/storm-jms/src/test/java/org/apache/storm/jms/spout/MockJmsProvider.java ---------------------------------------------------------------------- diff --git a/external/storm-jms/src/test/java/org/apache/storm/jms/spout/MockJmsProvider.java b/external/storm-jms/src/test/java/org/apache/storm/jms/spout/MockJmsProvider.java index 3ba0853..ca9733d 100644 --- a/external/storm-jms/src/test/java/org/apache/storm/jms/spout/MockJmsProvider.java +++ b/external/storm-jms/src/test/java/org/apache/storm/jms/spout/MockJmsProvider.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.storm.jms.spout; import javax.jms.ConnectionFactory; @@ -22,9 +23,7 @@ import javax.jms.Destination; import javax.naming.Context; import javax.naming.InitialContext; import javax.naming.NamingException; - import org.apache.activemq.ActiveMQConnectionFactory; - import org.apache.storm.jms.JmsProvider; public class MockJmsProvider implements JmsProvider { @@ -32,30 +31,34 @@ public class MockJmsProvider implements JmsProvider { private ConnectionFactory connectionFactory = null; private Destination destination = null; - - public MockJmsProvider() throws NamingException{ - this.connectionFactory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false"); + + public MockJmsProvider() throws NamingException { + this.connectionFactory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false"); Context jndiContext = new InitialContext(); - this.destination = (Destination) jndiContext.lookup("dynamicQueues/FOO.BAR"); + this.destination = (Destination) jndiContext.lookup("dynamicQueues/FOO.BAR"); } - + /** * Provides the JMS <code>ConnectionFactory</code> + * * @return the connection factory + * * @throws Exception */ - public ConnectionFactory connectionFactory() throws Exception{ + public ConnectionFactory connectionFactory() throws Exception { return this.connectionFactory; } /** * Provides the <code>Destination</code> (topic or queue) from which the * <code>JmsSpout</code> will receive messages. + * * @return + * * @throws Exception */ - public Destination destination() throws Exception{ + public Destination destination() throws Exception { return this.destination; } http://git-wip-us.apache.org/repos/asf/storm/blob/95602b1b/external/storm-jms/src/test/java/org/apache/storm/jms/spout/MockSpoutOutputCollector.java ---------------------------------------------------------------------- diff --git a/external/storm-jms/src/test/java/org/apache/storm/jms/spout/MockSpoutOutputCollector.java b/external/storm-jms/src/test/java/org/apache/storm/jms/spout/MockSpoutOutputCollector.java index 4e05646..60710bc 100644 --- a/external/storm-jms/src/test/java/org/apache/storm/jms/spout/MockSpoutOutputCollector.java +++ b/external/storm-jms/src/test/java/org/apache/storm/jms/spout/MockSpoutOutputCollector.java @@ -15,11 +15,11 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.storm.jms.spout; import java.util.ArrayList; import java.util.List; - import org.apache.storm.spout.ISpoutOutputCollector; public class MockSpoutOutputCollector implements ISpoutOutputCollector { @@ -45,11 +45,11 @@ public class MockSpoutOutputCollector implements ISpoutOutputCollector { public void reportError(Throwable error) { } - public boolean emitted(){ + public boolean emitted() { return this.emitted; } - public void reset(){ + public void reset() { this.emitted = false; } http://git-wip-us.apache.org/repos/asf/storm/blob/95602b1b/external/storm-jms/src/test/java/org/apache/storm/jms/spout/MockTupleProducer.java ---------------------------------------------------------------------- diff --git a/external/storm-jms/src/test/java/org/apache/storm/jms/spout/MockTupleProducer.java b/external/storm-jms/src/test/java/org/apache/storm/jms/spout/MockTupleProducer.java index ea571fc..70e04ff 100644 --- a/external/storm-jms/src/test/java/org/apache/storm/jms/spout/MockTupleProducer.java +++ b/external/storm-jms/src/test/java/org/apache/storm/jms/spout/MockTupleProducer.java @@ -15,12 +15,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.storm.jms.spout; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.TextMessage; - import org.apache.storm.jms.JmsTupleProducer; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.tuple.Fields;
