This is an automated email from the ASF dual-hosted git repository. reta pushed a commit to branch 3.3.x-fixes in repository https://gitbox.apache.org/repos/asf/cxf.git
commit c3fb43cd1b6f86f9bbf62f7262349f2905b985a8 Author: steingebein <[email protected]> AuthorDate: Tue Dec 17 03:59:09 2019 +0100 [CXF-8161] test case with parallel error handling in JMSDestination and PollingMessageListenerContainer (#614) --- .../cxf/transport/jms/JMSDestinationTest.java | 59 ++++++++++++---------- 1 file changed, 33 insertions(+), 26 deletions(-) diff --git a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java index f559180..a409ffb 100644 --- a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java +++ b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java @@ -61,7 +61,7 @@ import static org.junit.Assert.assertTrue; public class JMSDestinationTest extends AbstractJMSTester { private static class FaultyConnection implements Connection { private final Connection delegate; - + FaultyConnection(final Connection delegate) { this.delegate = delegate; } @@ -118,23 +118,23 @@ public class JMSDestinationTest extends AbstractJMSTester { } @Override - public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, + public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException { - return delegate.createDurableConnectionConsumer(topic, subscriptionName, messageSelector, - sessionPool, maxMessages); + return delegate.createDurableConnectionConsumer(topic, subscriptionName, messageSelector, + sessionPool, maxMessages); } } - + private static final class FaultyConnectionFactory implements ConnectionFactory { + final AtomicInteger latch; private final ConnectionFactory delegate; private final Function<Connection, Connection> wrapper; - private final AtomicInteger latch; private FaultyConnectionFactory(ConnectionFactory 
delegate, int faults) { this(delegate, FaultyConnection::new, faults); } - - private FaultyConnectionFactory(ConnectionFactory delegate, + + private FaultyConnectionFactory(ConnectionFactory delegate, Function<Connection, Connection> wrapper, int faults) { this.delegate = delegate; this.wrapper = wrapper; @@ -159,7 +159,7 @@ public class JMSDestinationTest extends AbstractJMSTester { } } } - + @Test public void testGetConfigurationFromWSDL() throws Exception { EndpointInfo ei = setupServiceInfo("HelloWorldQueueBinMsgService", "HelloWorldQueueBinMsgPort"); @@ -570,35 +570,39 @@ public class JMSDestinationTest extends AbstractJMSTester { conduit.close(); destination.shutdown(); } - + @Test public void testSessionsExceptionHandling() throws Exception { EndpointInfo ei = setupServiceInfo("HelloWorldPubSubService", "HelloWorldPubSubPort"); - final AtomicInteger latch = new AtomicInteger(1); - + final AtomicInteger latch = new AtomicInteger(5); + final Function<Connection, Connection> connection = c -> new FaultyConnection(c) { - @Override - public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException { - // Fail only once - if (latch.getAndDecrement() == 0) { - throw new JMSException("createSession() failed (simulated)"); - } else { - return super.createSession(transacted, acknowledgeMode); - } + @Override + public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException { + // Let the first call succeed, then fail the next five calls + final int latchValue = latch.getAndDecrement(); + if (latchValue >= 0 && latchValue < 5) { + throw new JMSException("createSession() failed (simulated)"); + } else { + return super.createSession(transacted, acknowledgeMode); } - }; - + } + }; + + final FaultyConnectionFactory faultyConnectionFactory = new FaultyConnectionFactory(cf, connection, 0); final Function<ConnectionFactory, ConnectionFactory> wrapper = new Function<ConnectionFactory, ConnectionFactory>() { @Override public 
ConnectionFactory apply(ConnectionFactory cf) { - return new FaultyConnectionFactory(cf, connection, 0); + return faultyConnectionFactory; } }; - JMSConduit conduit = setupJMSConduitWithObserver(ei); - JMSDestination destination = setupJMSDestination(ei, wrapper); - destination.getJmsConfig().setRetryInterval(1000); + JMSConfiguration jmsConfig = JMSConfigFactory.createFromEndpointInfo(bus, ei, null); + jmsConfig.setConnectionFactory(wrapper.apply(cf)); + jmsConfig.setRetryInterval(1000); + jmsConfig.setConcurrentConsumers(10); + JMSDestination destination = new JMSDestination(bus, ei, jmsConfig); destination.setMessageObserver(createMessageObserver()); final Message outMessage = createMessage(); @@ -613,6 +617,9 @@ public class JMSDestinationTest extends AbstractJMSTester { conduit.close(); destination.shutdown(); + assertEquals("Only two createConnection() calls allowed because " + + "restartConnection() should be called only once.", + -2, faultyConnectionFactory.latch.get()); }
