Repository: servicemix Updated Branches: refs/heads/master bf0c6d737 -> 84abd7a83
[SM-2850] Starting the Connection and some refactorings Project: http://git-wip-us.apache.org/repos/asf/servicemix/repo Commit: http://git-wip-us.apache.org/repos/asf/servicemix/commit/84abd7a8 Tree: http://git-wip-us.apache.org/repos/asf/servicemix/tree/84abd7a8 Diff: http://git-wip-us.apache.org/repos/asf/servicemix/diff/84abd7a8 Branch: refs/heads/master Commit: 84abd7a83516b9e3788f4f3e8fd60aa54d059a54 Parents: bf0c6d7 Author: Christian Schneider <[email protected]> Authored: Fri Feb 5 12:03:23 2016 +0100 Committer: Christian Schneider <[email protected]> Committed: Fri Feb 5 12:03:23 2016 +0100 ---------------------------------------------------------------------- .../servicemix/logging/jms/JMSAppender.java | 111 +++++++++++-------- .../servicemix/logging/jms/JMSAppenderTest.java | 2 - 2 files changed, 63 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/servicemix/blob/84abd7a8/logging/jms-appender/src/main/java/org/apache/servicemix/logging/jms/JMSAppender.java ---------------------------------------------------------------------- diff --git a/logging/jms-appender/src/main/java/org/apache/servicemix/logging/jms/JMSAppender.java b/logging/jms-appender/src/main/java/org/apache/servicemix/logging/jms/JMSAppender.java index b765e0f..824f50e 100644 --- a/logging/jms-appender/src/main/java/org/apache/servicemix/logging/jms/JMSAppender.java +++ b/logging/jms-appender/src/main/java/org/apache/servicemix/logging/jms/JMSAppender.java @@ -16,15 +16,23 @@ */ package org.apache.servicemix.logging.jms; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; + import org.ops4j.pax.logging.spi.PaxAppender; import org.ops4j.pax.logging.spi.PaxLoggingEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.jms.*; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; - public class JMSAppender implements PaxAppender { private static final String PACKAGE = JMSAppender.class.getPackage().getName(); private static final transient Logger LOG = LoggerFactory.getLogger(JMSAppender.class); @@ -32,8 +40,6 @@ public class JMSAppender implements PaxAppender { private static final String DEFAULT_EVENT_FORMAT = "default"; private static final String LOGSTASH_EVENT_FORMAT = "logstash"; - private boolean serviceAvailable; - private ConnectionFactory jmsConnectionFactory; private Connection connection; private Session session; @@ -47,43 +53,48 @@ public class JMSAppender implements PaxAppender { public void close() { closeJMSResources(); } + public void onBind(ConnectionFactory service){ + closeJMSResources(); jmsConnectionFactory = service; - try { - connection = getOrCreateConnection(); - session = getOrCreateSession(); - producer = getOrCreatePublisher(); - serviceAvailable = true; - } catch (JMSException e) { - serviceAvailable = false; - } + // Connect early to fail fast in case of config errors + executor.execute(new Runnable() { + + @Override + public void run() { + try { + getOrCreateConnection(); + } catch (JMSException e) { + LOG.warn("Exception connecting to broker - reinitializing JMS resources to recover",e); + closeJMSResources(); + } + } + }); } + public void onUnbind(ConnectionFactory service){ - serviceAvailable = false; closeJMSResources(); } public void doAppend(final PaxLoggingEvent paxLoggingEvent) { - if (exclude(paxLoggingEvent) || !serviceAvailable) { + if (exclude(paxLoggingEvent) || jmsConnectionFactory == null) { return; } - Runnable worker = new Runnable() { - public void run() { - if(serviceAvailable){ - try { - // Send message to the destination - TextMessage message = getOrCreateSession().createTextMessage(); - message.setText(format.toString(paxLoggingEvent)); - MessageProducer producer = getOrCreatePublisher(); - producer.send(message); - } catch (JMSException e) { - LOG.warn("Exception caught while sending log event - reinitializing JMS resources to recover", e); - close(); - } - } + Runnable worker = new Runnable() { + public void run() { + try { + // Send message to the destination + TextMessage message = getOrCreateSession().createTextMessage(); + message.setText(format.toString(paxLoggingEvent)); + MessageProducer producer = getOrCreatePublisher(); + producer.send(message); + } catch (JMSException e) { + LOG.warn("Exception caught while sending log event - reinitializing JMS resources to recover",e); + closeJMSResources(); } - }; - executor.execute(worker); + } + }; + executor.execute(worker); } private static boolean exclude(PaxLoggingEvent event) { @@ -109,10 +120,11 @@ public class JMSAppender implements PaxAppender { format = new DefaultLoggingEventFormat(); } } - + protected Connection getOrCreateConnection() throws JMSException { if (connection == null) { connection = jmsConnectionFactory.createConnection(); + connection.start(); } return connection; } @@ -135,25 +147,28 @@ public class JMSAppender implements PaxAppender { } private void closeJMSResources() { + close(producer); + close(session); + close(connection); + producer = null; + session = null; + connection = null; + } + + private static void close(Object obj) { + if (obj == null) { + return; + } try { - if (producer != null) { - producer.close(); - producer = null; - } - if (session != null) { - session.close(); - session = null; - } - if (connection != null) { - connection.close(); - connection = null; + if (obj instanceof MessageProducer) { + ((MessageProducer)obj).close(); + } else if (obj instanceof Session) { + ((Session)obj).close(); + } else if (obj instanceof Connection) { + ((Connection)obj).close(); } } catch (JMSException e) { LOG.debug("Exception caught while closing JMS resources", e); - // let's just set all the fields to null so stuff will be re-created - producer = null; - session = null; - connection = null; } } } http://git-wip-us.apache.org/repos/asf/servicemix/blob/84abd7a8/logging/jms-appender/src/test/java/org/apache/servicemix/logging/jms/JMSAppenderTest.java ---------------------------------------------------------------------- diff --git a/logging/jms-appender/src/test/java/org/apache/servicemix/logging/jms/JMSAppenderTest.java b/logging/jms-appender/src/test/java/org/apache/servicemix/logging/jms/JMSAppenderTest.java index f1d0198..50079fa 100644 --- a/logging/jms-appender/src/test/java/org/apache/servicemix/logging/jms/JMSAppenderTest.java +++ b/logging/jms-appender/src/test/java/org/apache/servicemix/logging/jms/JMSAppenderTest.java @@ -88,8 +88,6 @@ public class JMSAppenderTest extends CamelTestSupport { appender.doAppend(MockEvents.createInfoEvent()); assertMockEndpointsSatisfied(); - - } @Override
