Author: gtully
Date: Thu Aug 19 10:09:36 2010
New Revision: 987113
URL: http://svn.apache.org/viewvc?rev=987113&view=rev
Log:
resolve https://issues.apache.org/activemq/browse/AMQ-2872
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsConnectionStartStopTest.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java?rev=987113&r1=987112&r2=987113&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
Thu Aug 19 10:09:36 2010
@@ -246,7 +246,6 @@ public class ActiveMQSession implements
this.sessionAsyncDispatch = sessionAsyncDispatch;
this.info = new SessionInfo(connection.getConnectionInfo(),
sessionId.getValue());
setTransactionContext(new TransactionContext(connection));
- connection.addSession(this);
stats = new JMSSessionStatsImpl(producers, consumers);
this.connection.asyncSendPacket(info);
setTransformer(connection.getTransformer());
@@ -254,6 +253,7 @@ public class ActiveMQSession implements
this.scheduler=connection.getScheduler();
this.connectionExecutor=connection.getExecutor();
this.executor = new ActiveMQSessionExecutor(this);
+ connection.addSession(this);
if (connection.isStarted()) {
start();
}
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsConnectionStartStopTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsConnectionStartStopTest.java?rev=987113&r1=987112&r2=987113&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsConnectionStartStopTest.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsConnectionStartStopTest.java
Thu Aug 19 10:09:36 2010
@@ -16,6 +16,15 @@
*/
package org.apache.activemq;
+import java.util.Random;
+import java.util.Vector;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
@@ -105,4 +114,44 @@ public class JmsConnectionStartStopTest
stoppedConnection.stop();
testStoppedConsumerHoldsMessagesTillStarted();
}
+
+
+ public void testConcurrentSessionCreateWithStart() throws Exception {
+ ThreadPoolExecutor executor = new ThreadPoolExecutor(50,
Integer.MAX_VALUE,
+ 60L, TimeUnit.SECONDS,
+ new SynchronousQueue<Runnable>());
+ final Vector<Throwable> exceptions = new Vector<Throwable>();
+ final Random rand = new Random();
+ Runnable createSessionTask = new Runnable() {
+ public void run() {
+ try {
+ TimeUnit.MILLISECONDS.sleep(rand.nextInt(10));
+ stoppedConnection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ } catch (Exception e) {
+ exceptions.add(e);
+ }
+ }
+ };
+
+ Runnable startStopTask = new Runnable() {
+ public void run() {
+ try {
+ TimeUnit.MILLISECONDS.sleep(rand.nextInt(10));
+ stoppedConnection.start();
+ stoppedConnection.stop();
+ } catch (Exception e) {
+ exceptions.add(e);
+ }
+ }
+ };
+
+ for (int i=0; i<1000; i++) {
+ executor.execute(createSessionTask);
+ executor.execute(startStopTask);
+ }
+
+ executor.shutdown();
+ assertTrue("executor terminated", executor.awaitTermination(30,
TimeUnit.SECONDS));
+ assertTrue("no exceptions: " + exceptions, exceptions.isEmpty());
+ }
}