[CXF-6576] Handle exceptions in MessageListener container without using setExceptionListener
Project: http://git-wip-us.apache.org/repos/asf/cxf/repo Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/822429bc Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/822429bc Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/822429bc Branch: refs/heads/jms-exception-handling Commit: 822429bce5143e1e107edaf08d00cbd87837fa5d Parents: ba9fa0e Author: Christian Schneider <[email protected]> Authored: Fri Apr 21 10:47:16 2017 +0200 Committer: Christian Schneider <[email protected]> Committed: Fri Apr 21 11:20:01 2017 +0200 ---------------------------------------------------------------------- rt/transports/jms/pom.xml | 8 +- .../cxf/transport/jms/JMSDestination.java | 7 +- .../util/PollingMessageListenerContainer.java | 93 +++++--------------- .../transport/jms/util/MessageListenerTest.java | 72 ++++++++++++++- 4 files changed, 103 insertions(+), 77 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cxf/blob/822429bc/rt/transports/jms/pom.xml ---------------------------------------------------------------------- diff --git a/rt/transports/jms/pom.xml b/rt/transports/jms/pom.xml index e13c5a6..e6fd9a4 100644 --- a/rt/transports/jms/pom.xml +++ b/rt/transports/jms/pom.xml @@ -45,7 +45,6 @@ <dependency> <groupId>org.apache.geronimo.specs</groupId> <artifactId>geronimo-jta_1.1_spec</artifactId> - <version>1.1.1</version> </dependency> <dependency> <groupId>org.apache.geronimo.specs</groupId> @@ -64,6 +63,13 @@ <artifactId>easymock</artifactId> <scope>test</scope> </dependency> + + <dependency> + <groupId>org.awaitility</groupId> + <artifactId>awaitility</artifactId> + <version>2.0.0</version> + <scope>test</scope> + </dependency> <dependency> <groupId>org.apache.cxf</groupId> <artifactId>cxf-rt-management</artifactId> http://git-wip-us.apache.org/repos/asf/cxf/blob/822429bc/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java ---------------------------------------------------------------------- diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java index 8ec23cd..22a94de 100644 --- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java +++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java @@ -118,19 +118,20 @@ public class JMSDestination extends AbstractMultiplexDestination implements Mess Session session = null; try { connection = JMSFactory.createConnection(jmsConfig); - connection.setExceptionListener(new ExceptionListener() { + ExceptionListener exListener = new ExceptionListener() { public void onException(JMSException exception) { if (!shutdown) { LOG.log(Level.WARNING, "Exception on JMS connection. Trying to reconnect", exception); restartConnection(); } } - }); + }; session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination destination = jmsConfig.getTargetDestination(session); PollingMessageListenerContainer container = new PollingMessageListenerContainer(connection, - destination, this); + destination, + this, exListener); container.setConcurrentConsumers(jmsConfig.getConcurrentConsumers()); container.setTransactionManager(jmsConfig.getTransactionManager()); container.setMessageSelector(jmsConfig.getMessageSelector()); http://git-wip-us.apache.org/repos/asf/cxf/blob/822429bc/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java ---------------------------------------------------------------------- diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java index c4276eb..461a2b1 100644 --- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java +++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java @@ -23,6 +23,7 @@ import java.util.logging.Logger; import javax.jms.Connection; import javax.jms.Destination; +import javax.jms.ExceptionListener; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; @@ -36,24 +37,25 @@ import org.apache.cxf.common.logging.LogUtils; public class PollingMessageListenerContainer extends AbstractMessageListenerContainer { private static final Logger LOG = LogUtils.getL7dLogger(PollingMessageListenerContainer.class); + private ExceptionListener exceptionListener; public PollingMessageListenerContainer(Connection connection, Destination destination, - MessageListener listenerHandler) { + MessageListener listenerHandler, ExceptionListener exceptionListener) { this.connection = connection; this.destination = destination; this.listenerHandler = listenerHandler; + this.exceptionListener = exceptionListener; } - private class Poller extends AbstractPoller implements Runnable { + private class Poller implements Runnable { @Override public void run() { Session session = null; - init(); while (running) { try (ResourceCloser closer = new ResourceCloser()) { closer.register(createInitialContext()); - // Create session early to optimize performance + // Create session early to optimize performance // In session = closer.register(connection.createSession(transacted, acknowledgeMode)); MessageConsumer consumer = closer.register(createConsumer(session)); while (running) { @@ -70,14 +72,12 @@ public class PollingMessageListenerContainer extends AbstractMessageListenerCont safeRollBack(session); } } - } catch (Throwable e) { - catchUnexpectedExceptionDuringPolling(null, e); + } catch (Exception e) { + handleException(e); } } - } - @Override protected void safeRollBack(Session session) { try { if (session != null && session.getTransacted()) { @@ -90,11 +90,10 @@ public class PollingMessageListenerContainer extends AbstractMessageListenerCont } - private class XAPoller extends AbstractPoller implements Runnable { + private class XAPoller implements Runnable { @Override public void run() { - init(); while (running) { try (ResourceCloser closer = new ResourceCloser()) { closer.register(createInitialContext()); @@ -121,14 +120,12 @@ public class PollingMessageListenerContainer extends AbstractMessageListenerCont safeRollBack(session); } } catch (Exception e) { - catchUnexpectedExceptionDuringPolling(null, e); + handleException(e); } - } } - @Override protected void safeRollBack(Session session) { try { transactionManager.rollback(); @@ -139,64 +136,6 @@ public class PollingMessageListenerContainer extends AbstractMessageListenerCont } - private abstract class AbstractPoller { - private static final String RETRY_COUNTER_ON_EXCEPTION = "jms.polling.retrycounteronexception"; - private static final String SLEEPING_TIME_BEFORE_RETRY = "jms.polling.sleepingtimebeforeretry"; - protected int retryCounter = -1; - protected int counter; - protected int sleepingTime = 5000; - - protected void init() { - if (jndiEnvironment != null) { - if (jndiEnvironment.containsKey(RETRY_COUNTER_ON_EXCEPTION)) { - retryCounter = Integer.valueOf(jndiEnvironment.getProperty(RETRY_COUNTER_ON_EXCEPTION)); - } - if (jndiEnvironment.containsKey(SLEEPING_TIME_BEFORE_RETRY)) { - sleepingTime = Integer.valueOf(jndiEnvironment.getProperty(SLEEPING_TIME_BEFORE_RETRY)); - } - } - } - - protected boolean hasToCount() { - return retryCounter > -1; - } - - protected boolean hasToStop() { - return counter > retryCounter; - } - - protected void catchUnexpectedExceptionDuringPolling(Session session, Throwable e) { - LOG.log(Level.WARNING, "Unexpected exception.", e); - if (hasToCount()) { - counter++; - if (hasToStop()) { - stop(session, e); - } - } - if (running) { - try { - String log = "Now sleeping for " + sleepingTime / 1000 + " seconds"; - log += hasToCount() - ? ". Then restarting session and consumer: attempt " + counter + "/" + retryCounter - : ""; - LOG.log(Level.WARNING, log); - Thread.sleep(sleepingTime); - } catch (InterruptedException e1) { - LOG.log(Level.WARNING, e1.getMessage()); - } - } - } - - protected void stop(Session session, Throwable e) { - LOG.log(Level.WARNING, "Stopping the jms message polling thread in cxf", e); - safeRollBack(session); - running = false; - } - - protected abstract void safeRollBack(Session session); - - } - private MessageConsumer createConsumer(Session session) throws JMSException { if (durableSubscriptionName != null && destination instanceof Topic) { return session.createDurableSubscriber((Topic)destination, durableSubscriptionName, @@ -205,6 +144,18 @@ public class PollingMessageListenerContainer extends AbstractMessageListenerCont return session.createConsumer(destination, messageSelector); } } + + protected void handleException(Exception e) { + running = false; + JMSException wrapped; + if (e instanceof JMSException) { + wrapped = (JMSException) e; + } else { + wrapped = new JMSException("Wrapped exception. " + e.getMessage()); + wrapped.addSuppressed(e); + } + this.exceptionListener.onException(wrapped); + } @Override public void start() { http://git-wip-us.apache.org/repos/asf/cxf/blob/822429bc/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java ---------------------------------------------------------------------- diff --git a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java index 82cc37a..228ffa7 100644 --- a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java +++ b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java @@ -20,6 +20,7 @@ package org.apache.cxf.transport.jms.util; import javax.jms.Connection; import javax.jms.Destination; +import javax.jms.ExceptionListener; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; @@ -36,14 +37,76 @@ import org.apache.activemq.ActiveMQXAConnectionFactory; import org.apache.activemq.RedeliveryPolicy; import org.apache.activemq.pool.XaPooledConnectionFactory; import org.apache.geronimo.transaction.manager.GeronimoTransactionManager; +import org.awaitility.Awaitility; +import org.easymock.Capture; import org.junit.Assert; import org.junit.Test; + +import static org.easymock.EasyMock.capture; +import static org.easymock.EasyMock.createMock; +import static org.easymock.EasyMock.expectLastCall; +import static org.easymock.EasyMock.newCapture; +import static org.easymock.EasyMock.replay; +import static org.easymock.EasyMock.verify; + public class MessageListenerTest { private static final String FAIL = "fail"; private static final String FAILFIRST = "failfirst"; private static final String OK = "ok"; + + @Test + public void testConnectionProblem() throws JMSException { + Connection connection = createConnection("broker"); + Queue dest = JMSUtil.createQueue(connection, "test"); + + MessageListener listenerHandler = new TestMessageListener(); + ExceptionListener exListener = createMock(ExceptionListener.class); + + Capture<JMSException> captured = newCapture(); + exListener.onException(capture(captured)); + expectLastCall(); + replay(exListener); + + PollingMessageListenerContainer container = // + new PollingMessageListenerContainer(connection, dest, listenerHandler, exListener); + connection.close(); // Simulate connection problem + container.start(); + Awaitility.await().until(() -> !container.isRunning()); + verify(exListener); + JMSException ex = captured.getValue(); + Assert.assertEquals("The connection is already closed", ex.getMessage()); + } + + @Test + public void testConnectionProblemXA() throws JMSException, XAException, InterruptedException { + TransactionManager transactionManager = new GeronimoTransactionManager(); + Connection connection = createXAConnection("brokerJTA", transactionManager); + Queue dest = JMSUtil.createQueue(connection, "test"); + + MessageListener listenerHandler = new TestMessageListener(); + ExceptionListener exListener = createMock(ExceptionListener.class); + + Capture<JMSException> captured = newCapture(); + exListener.onException(capture(captured)); + expectLastCall(); + replay(exListener); + + PollingMessageListenerContainer container = // + new PollingMessageListenerContainer(connection, dest, listenerHandler, exListener); + container.setTransacted(false); + container.setAcknowledgeMode(Session.SESSION_TRANSACTED); + container.setTransactionManager(transactionManager); + + connection.close(); // Simulate connection problem + container.start(); + Awaitility.await().until(() -> !container.isRunning()); + verify(exListener); + JMSException ex = captured.getValue(); + // Closing the pooled connection will result in a NPE when using it + Assert.assertEquals("Wrapped exception. null", ex.getMessage()); + } @Test public void testWithJTA() throws JMSException, XAException, InterruptedException { @@ -52,11 +115,16 @@ public class MessageListenerTest { Queue dest = JMSUtil.createQueue(connection, "test"); MessageListener listenerHandler = new TestMessageListener(); + ExceptionListener exListener = new ExceptionListener() { + + @Override + public void onException(JMSException exception) { + } + }; PollingMessageListenerContainer container = new PollingMessageListenerContainer(connection, dest, - listenerHandler); + listenerHandler, exListener); container.setTransacted(false); container.setAcknowledgeMode(Session.SESSION_TRANSACTED); - container.setTransactionManager(transactionManager); container.start();
