Updated Branches: refs/heads/trunk 3df943ce0 -> 519d8f7db
https://issues.apache.org/jira/browse/AMQ-5038 - close active sessions on deactivate to ensure consumption stops for an endpoint Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/519d8f7d Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/519d8f7d Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/519d8f7d Branch: refs/heads/trunk Commit: 519d8f7db0d3243cb3b60b156369f3f069e20e52 Parents: 3df943c Author: gtully <[email protected]> Authored: Mon Feb 10 15:11:13 2014 +0000 Committer: gtully <[email protected]> Committed: Mon Feb 10 15:12:39 2014 +0000 ---------------------------------------------------------------------- .../activemq/JmsQueueTransactionTest.java | 3 + .../activemq/ra/ActiveMQEndpointWorker.java | 8 +- .../apache/activemq/ra/ServerSessionImpl.java | 2 +- .../activemq/ra/ServerSessionPoolImpl.java | 18 ++- .../activemq/ra/ServerSessionImplTest.java | 162 +++++++++++++++++-- 5 files changed, 171 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/519d8f7d/activemq-broker/src/test/java/org/apache/activemq/JmsQueueTransactionTest.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/test/java/org/apache/activemq/JmsQueueTransactionTest.java b/activemq-broker/src/test/java/org/apache/activemq/JmsQueueTransactionTest.java index 6504d7c..c2e9510 100755 --- a/activemq-broker/src/test/java/org/apache/activemq/JmsQueueTransactionTest.java +++ b/activemq-broker/src/test/java/org/apache/activemq/JmsQueueTransactionTest.java @@ -183,6 +183,7 @@ public class JmsQueueTransactionTest extends JmsTransactionTestSupport { assertTrue("Should have received the third message", enumeration.hasMoreElements()); assertEquals(outbound[2], (Message)enumeration.nextElement()); + LOG.info("Check for more..."); // There should be no more. boolean tooMany = false; while (enumeration.hasMoreElements()) { @@ -190,8 +191,10 @@ public class JmsQueueTransactionTest extends JmsTransactionTestSupport { tooMany = true; } assertFalse(tooMany); + LOG.info("close browser..."); browser.close(); + LOG.info("reopen and consume..."); // Re-open the consumer. consumer = resourceProvider.createConsumer(session, destination); // Receive the second. http://git-wip-us.apache.org/repos/asf/activemq/blob/519d8f7d/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQEndpointWorker.java ---------------------------------------------------------------------- diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQEndpointWorker.java b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQEndpointWorker.java index 4765520..b18ef29 100755 --- a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQEndpointWorker.java +++ b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQEndpointWorker.java @@ -21,7 +21,6 @@ import java.lang.reflect.Method; import java.util.concurrent.atomic.AtomicBoolean; import javax.jms.Connection; import javax.jms.ConnectionConsumer; -import javax.jms.Destination; import javax.jms.ExceptionListener; import javax.jms.JMSException; import javax.jms.Message; @@ -117,7 +116,7 @@ public class ActiveMQEndpointWorker { if (connecting.compareAndSet(false, true)) { synchronized (connectWork) { disconnect(); - serverSessionPool.closeIdleSessions(); + serverSessionPool.closeSessions(); connect(); } } else { @@ -328,6 +327,11 @@ public class ActiveMQEndpointWorker { THREAD_LOCAL.set(null); } + // for testing + public void setConnection(ActiveMQConnection activeMQConnection) { + this.connection = activeMQConnection; + } + protected ActiveMQConnection getConnection() { // make sure we only return a working connection // in particular make sure that we do not return null http://git-wip-us.apache.org/repos/asf/activemq/blob/519d8f7d/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionImpl.java ---------------------------------------------------------------------- diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionImpl.java b/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionImpl.java index f6f965f..27c75b1 100755 --- a/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionImpl.java +++ b/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionImpl.java @@ -166,7 +166,7 @@ public class ServerSessionImpl implements ServerSession, InboundContext, Work, D try { InboundContextSupport.register(this); if ( session.isRunning() ) { - session.run(); + session.run(); } else { log.debug("JMS Session is no longer running (maybe due to loss of connection?), marking ServerSesison as stale"); stale = true; http://git-wip-us.apache.org/repos/asf/activemq/blob/519d8f7d/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionPoolImpl.java ---------------------------------------------------------------------- diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionPoolImpl.java b/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionPoolImpl.java index ccae078..c0c3320 100755 --- a/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionPoolImpl.java +++ b/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionPoolImpl.java @@ -262,7 +262,7 @@ public class ServerSessionPoolImpl implements ServerSessionPool { public void close() { closing.set(true); - int activeCount = closeIdleSessions(); + int activeCount = closeSessions(); // we may have to wait erroneously 250ms if an // active session is removed during our wait and we // are not notified @@ -278,14 +278,26 @@ public class ServerSessionPoolImpl implements ServerSessionPool { Thread.currentThread().interrupt(); return; } - activeCount = closeIdleSessions(); + activeCount = closeSessions(); } } - protected int closeIdleSessions() { + protected int closeSessions() { sessionLock.lock(); try { + for (ServerSessionImpl ss : activeSessions) { + try { + ActiveMQSession session = (ActiveMQSession) ss.getSession(); + if (!session.isClosed()) { + session.close(); + } + } catch (JMSException ignored) { + if (LOG.isDebugEnabled()) { + LOG.debug("Failed to close active running server session {}, reason:{}", ss, ignored.toString(), ignored); + } + } + } for (ServerSessionImpl ss : idleSessions) { ss.close(); } http://git-wip-us.apache.org/repos/asf/activemq/blob/519d8f7d/activemq-ra/src/test/java/org/apache/activemq/ra/ServerSessionImplTest.java ---------------------------------------------------------------------- diff --git a/activemq-ra/src/test/java/org/apache/activemq/ra/ServerSessionImplTest.java b/activemq-ra/src/test/java/org/apache/activemq/ra/ServerSessionImplTest.java index a862110..fb99330 100644 --- a/activemq-ra/src/test/java/org/apache/activemq/ra/ServerSessionImplTest.java +++ b/activemq-ra/src/test/java/org/apache/activemq/ra/ServerSessionImplTest.java @@ -16,36 +16,55 @@ */ package org.apache.activemq.ra; +import java.lang.reflect.Method; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import javax.jms.Session; -import javax.resource.spi.endpoint.MessageEndpoint; +import javax.resource.spi.BootstrapContext; +import javax.resource.spi.endpoint.MessageEndpointFactory; +import javax.resource.spi.work.ExecutionContext; +import javax.resource.spi.work.Work; +import javax.resource.spi.work.WorkListener; import javax.resource.spi.work.WorkManager; +import javax.transaction.xa.XAResource; import junit.framework.TestCase; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQSession; +import org.apache.activemq.command.ActiveMQTextMessage; +import org.apache.activemq.command.MessageDispatch; +import org.hamcrest.Description; import org.jmock.Expectations; import org.jmock.Mockery; +import org.jmock.api.Action; +import org.jmock.api.Invocation; import org.jmock.integration.junit4.JMock; import org.jmock.lib.legacy.ClassImposteriser; +import org.junit.After; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * */ @RunWith(JMock.class) public class ServerSessionImplTest extends TestCase { - private static final String BROKER_URL = "vm://localhost"; + private static final Logger LOG = LoggerFactory.getLogger(ServerSessionImplTest.class); + private static final String BROKER_URL = "vm://localhost?broker.persistent=false"; private ServerSessionImpl serverSession; private ServerSessionPoolImpl pool; private WorkManager workManager; - private MessageEndpoint messageEndpoint; + private MessageEndpointProxy messageEndpoint; private ActiveMQConnection con; private ActiveMQSession session; + ActiveMQEndpointWorker endpointWorker; private Mockery context; - @Before public void setUp() throws Exception { @@ -57,25 +76,136 @@ public class ServerSessionImplTest extends TestCase { org.apache.activemq.ActiveMQConnectionFactory factory = new org.apache.activemq.ActiveMQConnectionFactory(BROKER_URL); con = (ActiveMQConnection) factory.createConnection(); + con.start(); session = (ActiveMQSession) con.createSession(false, Session.AUTO_ACKNOWLEDGE); - pool = context.mock(ServerSessionPoolImpl.class); - workManager = context.mock(WorkManager.class); - - serverSession = new ServerSessionImpl( - (ServerSessionPoolImpl) pool, - session, - (WorkManager) workManager, - messageEndpoint, - false, - 10); } - + + @After + public void tearDown() throws Exception { + if (con != null) { + con.close(); + } + } + @Test public void testRunDetectsStoppedSession() throws Exception { + + pool = context.mock(ServerSessionPoolImpl.class); + workManager = context.mock(WorkManager.class); + messageEndpoint = context.mock(MessageEndpointProxy.class); + + serverSession = new ServerSessionImpl( + (ServerSessionPoolImpl) pool, + session, + (WorkManager) workManager, + messageEndpoint, + false, + 10); + con.close(); context.checking(new Expectations() {{ oneOf (pool).removeFromPool(with(same(serverSession))); - }}); + }}); serverSession.run(); } + + @Test + public void testCloseCanStopActiveSession() throws Exception { + + final int maxMessages = 4000; + final CountDownLatch messageCount = new CountDownLatch(maxMessages); + + final MessageEndpointFactory messageEndpointFactory = context.mock(MessageEndpointFactory.class); + final MessageResourceAdapter resourceAdapter = context.mock(MessageResourceAdapter.class); + final ActiveMQEndpointActivationKey key = context.mock(ActiveMQEndpointActivationKey.class); + messageEndpoint = context.mock(MessageEndpointProxy.class); + workManager = context.mock(WorkManager.class); + final MessageActivationSpec messageActivationSpec = context.mock(MessageActivationSpec.class); + final BootstrapContext boostrapContext = context.mock(BootstrapContext.class); + context.checking(new Expectations() {{ + allowing(boostrapContext).getWorkManager(); will (returnValue(workManager)); + allowing(resourceAdapter).getBootstrapContext(); will (returnValue(boostrapContext)); + allowing(messageEndpointFactory).isDeliveryTransacted(with (any(Method.class))); will(returnValue(Boolean.FALSE)); + allowing(key).getMessageEndpointFactory(); will(returnValue(messageEndpointFactory)); + allowing(key).getActivationSpec(); will (returnValue(messageActivationSpec)); + allowing(messageActivationSpec).isUseJndi(); will (returnValue(Boolean.FALSE)); + allowing(messageActivationSpec).getDestinationType(); will (returnValue("javax.jms.Queue")); + allowing(messageActivationSpec).getDestination(); will (returnValue("Queue")); + allowing(messageActivationSpec).getAcknowledgeModeForSession(); will (returnValue(1)); + allowing(messageActivationSpec).getMaxSessionsIntValue(); will (returnValue(1)); + allowing(messageActivationSpec).getEnableBatchBooleanValue(); will (returnValue(Boolean.FALSE)); + allowing(messageActivationSpec).isUseRAManagedTransactionEnabled(); will (returnValue(Boolean.TRUE)); + allowing(messageEndpointFactory).createEndpoint(with (any(XAResource.class))); will (returnValue(messageEndpoint)); + + allowing(workManager).scheduleWork((Work) with(anything()), (long) with(any(long.class)), with(any(ExecutionContext.class)), with(any(WorkListener.class))); + will (new Action() { + @Override + public Object invoke(Invocation invocation) throws Throwable { + return null; + } + + @Override + public void describeTo(Description description) { + } + }); + + allowing(messageEndpoint).beforeDelivery((Method) with(anything())); + allowing (messageEndpoint).onMessage(with (any(javax.jms.Message.class))); will(new Action(){ + @Override + public Object invoke(Invocation invocation) throws Throwable { + messageCount.countDown(); + if (messageCount.getCount() < maxMessages - 11) { + TimeUnit.MILLISECONDS.sleep(200); + } + return null; + } + + @Override + public void describeTo(Description description) { + description.appendText("Keep message count"); + } + }); + allowing(messageEndpoint).afterDelivery(); + allowing(messageEndpoint).release(); + + }}); + + endpointWorker = new ActiveMQEndpointWorker(resourceAdapter, key); + endpointWorker.setConnection(con); + pool = new ServerSessionPoolImpl(endpointWorker, 2); + + endpointWorker.start(); + final ServerSessionImpl serverSession1 = (ServerSessionImpl) pool.getServerSession(); + + // preload the session dispatch queue to keep the session active + ActiveMQSession session1 = (ActiveMQSession) serverSession1.getSession(); + for (int i=0; i<maxMessages; i++) { + MessageDispatch messageDispatch = new MessageDispatch(); + messageDispatch.setMessage(new ActiveMQTextMessage()); + session1.dispatch(messageDispatch); + } + + ExecutorService executorService = Executors.newCachedThreadPool(); + final CountDownLatch runState = new CountDownLatch(1); + executorService.execute(new Runnable(){ + @Override + public void run() { + try { + serverSession1.run(); + runState.countDown(); + } catch (Exception e) { + e.printStackTrace(); + } + } + }); + + while (messageCount.getCount() > maxMessages - 10) { + TimeUnit.MILLISECONDS.sleep(100); + } + LOG.info("Closing pool on {}", messageCount.getCount()); + pool.close(); + + assertTrue("run has completed", runState.await(20, TimeUnit.SECONDS)); + assertTrue("not all messages consumed", messageCount.getCount() > 0); + } }
