Repository: incubator-falcon Updated Branches: refs/heads/master 59441beb7 -> c4dd440d9
FALCON-717 Shutdown not clean for JMSMessageConsumer. Contributed by Shaik Idris Ali Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/40c3e949 Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/40c3e949 Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/40c3e949 Branch: refs/heads/master Commit: 40c3e94955e1703da9ef07a7a5517b7d9aee91b0 Parents: 59441be Author: Venkatesh Seetharam <venkat...@apache.org> Authored: Thu Nov 13 17:49:19 2014 -0800 Committer: Venkatesh Seetharam <venkat...@apache.org> Committed: Thu Nov 13 17:49:19 2014 -0800 ---------------------------------------------------------------------- CHANGES.txt | 3 + .../falcon/messaging/JMSMessageConsumer.java | 38 +------- .../falcon/messaging/JMSMessageProducer.java | 13 +-- .../falcon/messaging/util/MessagingUtil.java | 94 ++++++++++++++++++++ .../apache/falcon/rerun/queue/ActiveMQueue.java | 36 +++----- .../apache/falcon/rerun/queue/DelayedQueue.java | 2 + .../falcon/rerun/queue/InMemoryQueue.java | 10 ++- .../falcon/rerun/service/LateRunService.java | 11 ++- 8 files changed, 134 insertions(+), 73 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/40c3e949/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 7eb4b70..896116b 100755 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -138,6 +138,9 @@ Trunk (Unreleased) OPTIMIZATIONS BUG FIXES + FALCON-717 Shutdown not clean for JMSMessageConsumer + (Shaik Idris Ali via Venkatesh Seetharam + FALCON-875 Enitiy Summary endpoint filterBy does not filter entities without pipelines (Balu Vellanki via Venkatesh Seetharam) http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/40c3e949/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java ---------------------------------------------------------------------- diff --git a/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java index 300fecf..4df1490 100644 --- a/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java +++ b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java @@ -21,6 +21,7 @@ package org.apache.falcon.messaging; import org.apache.commons.lang.StringUtils; import org.apache.falcon.FalconException; import org.apache.falcon.aspect.GenericAlert; +import org.apache.falcon.messaging.util.MessagingUtil; import org.apache.falcon.security.CurrentUser; import org.apache.falcon.workflow.WorkflowExecutionArgs; import org.apache.falcon.workflow.WorkflowExecutionContext; @@ -147,40 +148,9 @@ public class JMSMessageConsumer implements MessageListener, ExceptionListener { public void closeSubscriber() { LOG.info("Closing topicSubscriber on topic : " + this.topicName); // closing each quietly so client id can be unsubscribed - closeTopicSubscriberQuietly(); - closeTopicSessionQuietly(); - closeConnectionQuietly(); - } - - private void closeTopicSubscriberQuietly() { - if (topicSubscriber != null) { - try { - topicSubscriber.close(); - } catch (JMSException ignore) { - LOG.error("Error closing JMS topic subscriber: " + topicSubscriber, ignore); - } - } - } - - private void closeTopicSessionQuietly() { - if (topicSession != null) { // unsubscribe the durable topic topicSubscriber - try { - topicSession.unsubscribe(FALCON_CLIENT_ID); - topicSession.close(); - } catch (JMSException ignore) { - LOG.error("Error closing JMS topic session: " + topicSession, ignore); - } - } - } - - private void closeConnectionQuietly() { - if (connection != null) { - try { - connection.close(); - } catch (JMSException ignore) { - LOG.error("Error closing JMS connection: " + connection, ignore); - } - } + MessagingUtil.closeQuietly(topicSubscriber); + MessagingUtil.closeQuietly(topicSession, FALCON_CLIENT_ID); + MessagingUtil.closeQuietly(connection); } private static Connection createAndGetConnection(String implementation, http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/40c3e949/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageProducer.java ---------------------------------------------------------------------- diff --git a/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageProducer.java b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageProducer.java index dece932..515562a 100644 --- a/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageProducer.java +++ b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageProducer.java @@ -19,6 +19,7 @@ package org.apache.falcon.messaging; import org.apache.falcon.hadoop.HadoopClientFactory; +import org.apache.falcon.messaging.util.MessagingUtil; import org.apache.falcon.retention.EvictedInstanceSerDe; import org.apache.falcon.workflow.WorkflowExecutionArgs; import org.apache.falcon.workflow.WorkflowExecutionContext; @@ -168,7 +169,7 @@ public class JMSMessageProducer { sendMessage(connection, message); } } finally { - closeQuietly(connection); + MessagingUtil.closeQuietly(connection); } return 0; @@ -296,14 +297,4 @@ public class JMSMessageProducer { return connection; } - - private void closeQuietly(Connection connection) { - try { - if (connection != null) { - connection.close(); - } - } catch (JMSException e) { - LOG.error("Error in closing connection:", e); - } - } } http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/40c3e949/messaging/src/main/java/org/apache/falcon/messaging/util/MessagingUtil.java ---------------------------------------------------------------------- diff --git a/messaging/src/main/java/org/apache/falcon/messaging/util/MessagingUtil.java b/messaging/src/main/java/org/apache/falcon/messaging/util/MessagingUtil.java new file mode 100644 index 0000000..8d59937 --- /dev/null +++ b/messaging/src/main/java/org/apache/falcon/messaging/util/MessagingUtil.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.messaging.util; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.TopicSession; +import javax.jms.TopicSubscriber; + +/** + * Utility class. + */ +public final class MessagingUtil { + + private static final Logger LOG = LoggerFactory.getLogger(MessagingUtil.class); + + private MessagingUtil() { + } + + public static void closeQuietly(TopicSubscriber topicSubscriber) { + if (topicSubscriber != null) { + try { + topicSubscriber.close(); + } catch (JMSException ignore) { + LOG.error("Error closing JMS topic subscriber: " + topicSubscriber, ignore); + } + } + } + + public static void closeQuietly(TopicSession topicSession, String clientId) { + if (topicSession != null) { // unsubscribe the durable topic topicSubscriber + try { + topicSession.unsubscribe(clientId); + topicSession.close(); + } catch (JMSException ignore) { + LOG.error("Error closing JMS topic session: " + topicSession, ignore); + } + } + } + + public static void closeQuietly(Connection connection) { + if (connection != null) { + try { + LOG.info("Attempting to close connection"); + connection.close(); + } catch (JMSException ignore) { + LOG.error("Error closing JMS connection: " + connection, ignore); + } + } + } + + public static void closeQuietly(MessageProducer messageProducer) { + if (messageProducer != null) { + try { + LOG.info("Attempting to close producer"); + messageProducer.close(); + } catch (JMSException ignore) { + LOG.error("Error closing JMS messageProducer: " + messageProducer, ignore); + } + } + } + + public static void closeQuietly(MessageConsumer messageConsumer) { + if (messageConsumer != null) { + try { + LOG.info("Attempting to close consumer"); + messageConsumer.close(); + } catch (JMSException ignore) { + LOG.error("Error closing JMS messageConsumer: " + messageConsumer, ignore); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/40c3e949/rerun/src/main/java/org/apache/falcon/rerun/queue/ActiveMQueue.java ---------------------------------------------------------------------- diff --git a/rerun/src/main/java/org/apache/falcon/rerun/queue/ActiveMQueue.java b/rerun/src/main/java/org/apache/falcon/rerun/queue/ActiveMQueue.java index d9567b6..ec57a53 100644 --- a/rerun/src/main/java/org/apache/falcon/rerun/queue/ActiveMQueue.java +++ b/rerun/src/main/java/org/apache/falcon/rerun/queue/ActiveMQueue.java @@ -21,6 +21,7 @@ import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ScheduledMessage; import org.apache.falcon.FalconException; +import org.apache.falcon.messaging.util.MessagingUtil; import org.apache.falcon.rerun.event.RerunEvent; import org.apache.falcon.rerun.event.RerunEventFactory; @@ -71,8 +72,7 @@ public class ActiveMQueue<T extends RerunEvent> extends DelayedQueue<T> { init(); } - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - return session; + return connection.createSession(false, Session.AUTO_ACKNOWLEDGE); } @Override @@ -121,30 +121,16 @@ public class ActiveMQueue<T extends RerunEvent> extends DelayedQueue<T> { @Override public void reconnect() throws FalconException { - try { - LOG.info("Attempting to close producer"); - producer.close(); - LOG.info("Producer closed successfully"); - } catch (Exception ignore) { - LOG.info("Producer could not be closed"); - } - - try { - LOG.info("Attempting to close consumer"); - consumer.close(); - LOG.info("Consumer closed successfully"); - } catch (Exception ignore) { - LOG.info("Consumer could not be closed"); - } + close(); + init(); + } - try { - LOG.info("Attempting to close connection"); - connection.close(); - LOG.info("Connection closed successfully"); - } catch (Exception ignore) { - LOG.info("Connection could not be closed"); - } + public void close() { + LOG.info("Closing queue for broker={}, destination{}", brokerUrl, destinationName); + destination = null; - init(); + MessagingUtil.closeQuietly(producer); + MessagingUtil.closeQuietly(consumer); + MessagingUtil.closeQuietly(connection); } } http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/40c3e949/rerun/src/main/java/org/apache/falcon/rerun/queue/DelayedQueue.java ---------------------------------------------------------------------- diff --git a/rerun/src/main/java/org/apache/falcon/rerun/queue/DelayedQueue.java b/rerun/src/main/java/org/apache/falcon/rerun/queue/DelayedQueue.java index 01231d4..f5fbbe8 100644 --- a/rerun/src/main/java/org/apache/falcon/rerun/queue/DelayedQueue.java +++ b/rerun/src/main/java/org/apache/falcon/rerun/queue/DelayedQueue.java @@ -40,4 +40,6 @@ public abstract class DelayedQueue<T extends RerunEvent> { public abstract void init(); public abstract void reconnect() throws FalconException; + + public abstract void close(); } http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/40c3e949/rerun/src/main/java/org/apache/falcon/rerun/queue/InMemoryQueue.java ---------------------------------------------------------------------- diff --git a/rerun/src/main/java/org/apache/falcon/rerun/queue/InMemoryQueue.java b/rerun/src/main/java/org/apache/falcon/rerun/queue/InMemoryQueue.java index ecd6b0a..6035cdb 100644 --- a/rerun/src/main/java/org/apache/falcon/rerun/queue/InMemoryQueue.java +++ b/rerun/src/main/java/org/apache/falcon/rerun/queue/InMemoryQueue.java @@ -36,6 +36,7 @@ import java.util.concurrent.DelayQueue; * @param <T> */ public class InMemoryQueue<T extends RerunEvent> extends DelayedQueue<T> { + public static final Logger LOG = LoggerFactory.getLogger(DelayedQueue.class); protected DelayQueue<T> delayQueue = new DelayQueue<T>(); @@ -47,8 +48,8 @@ public class InMemoryQueue<T extends RerunEvent> extends DelayedQueue<T> { @Override public boolean offer(T event) { - boolean flag = delayQueue.offer(event); beforeRetry(event); + boolean flag = delayQueue.offer(event); LOG.debug("Enqueued Message: {}", event.toString()); return flag; } @@ -58,8 +59,8 @@ public class InMemoryQueue<T extends RerunEvent> extends DelayedQueue<T> { T event; try { event = delayQueue.take(); - LOG.debug("Dequeued Message: {}", event.toString()); afterRetry(event); + LOG.debug("Dequeued Message: {}", event.toString()); } catch (InterruptedException e) { throw new FalconException(e); } @@ -145,4 +146,9 @@ public class InMemoryQueue<T extends RerunEvent> extends DelayedQueue<T> { } return rerunEvents; } + + @Override + public void close() { + //Do nothing + } } http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/40c3e949/rerun/src/main/java/org/apache/falcon/rerun/service/LateRunService.java ---------------------------------------------------------------------- diff --git a/rerun/src/main/java/org/apache/falcon/rerun/service/LateRunService.java b/rerun/src/main/java/org/apache/falcon/rerun/service/LateRunService.java index bbfdaff..2bb198b 100644 --- a/rerun/src/main/java/org/apache/falcon/rerun/service/LateRunService.java +++ b/rerun/src/main/java/org/apache/falcon/rerun/service/LateRunService.java @@ -37,6 +37,8 @@ public class LateRunService implements FalconService { private static final Logger LOG = LoggerFactory.getLogger(LateRunService.class); + private ActiveMQueue<LaterunEvent> queue; + @Override public String getName() { return LateRunService.class.getName(); @@ -50,7 +52,7 @@ public class LateRunService implements FalconService { AbstractRerunHandler<LaterunEvent, ActiveMQueue<LaterunEvent>> rerunHandler = RerunHandlerFactory.getRerunHandler(RerunType.LATE); - ActiveMQueue<LaterunEvent> queue = new ActiveMQueue<LaterunEvent>( + queue = new ActiveMQueue<LaterunEvent>( StartupProperties.get() .getProperty("broker.url", "failover:(tcp://localhost:61616)?initialReconnectDelay=5000"), "falcon.late.queue"); @@ -62,6 +64,13 @@ public class LateRunService implements FalconService { @Override public void destroy() throws FalconException { + closeQuietly(); LOG.info("LateRun thread destroyed"); } + + private void closeQuietly() { + if (queue != null) { + queue.close(); + } + } }