Author: rajdavies
Date: Fri Mar 14 02:53:39 2008
New Revision: 637028
URL: http://svn.apache.org/viewvc?rev=637028&view=rev
Log:
tidied up synchronization
Modified:
activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionPoolImpl.java
Modified:
activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionPoolImpl.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionPoolImpl.java?rev=637028&r1=637027&r2=637028&view=diff
==============================================================================
---
activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionPoolImpl.java
(original)
+++
activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionPoolImpl.java
Fri Mar 14 02:53:39 2008
@@ -16,9 +16,9 @@
*/
package org.apache.activemq.ra;
+import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
-import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.JMSException;
@@ -45,8 +45,8 @@
private final ActiveMQEndpointWorker activeMQAsfEndpointWorker;
private final int maxSessions;
- private List<ServerSessionImpl> idleSessions = new
CopyOnWriteArrayList<ServerSessionImpl>();
- private List<ServerSessionImpl> activeSessions = new
CopyOnWriteArrayList<ServerSessionImpl>();
+ private List<ServerSessionImpl> idleSessions = new
ArrayList<ServerSessionImpl>();
+ private List<ServerSessionImpl> activeSessions = new
ArrayList<ServerSessionImpl>();
private AtomicBoolean closing = new AtomicBoolean(false);
public ServerSessionPoolImpl(ActiveMQEndpointWorker
activeMQAsfEndpointWorker, int maxSessions) {
@@ -76,7 +76,9 @@
} catch (UnavailableException e) {
// The container could be limiting us on the number of endpoints
// that are being created.
- LOG.debug("Could not create an endpoint.", e);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Could not create an endpoint.", e);
+ }
session.close();
return null;
}
@@ -92,17 +94,30 @@
/**
*/
public ServerSession getServerSession() throws JMSException {
- LOG.debug("ServerSession requested.");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("ServerSession requested.");
+ }
if (closing.get()) {
throw new JMSException("Session Pool Shutting Down.");
}
- if (idleSessions.size() > 0) {
- ServerSessionImpl ss = idleSessions.remove(idleSessions.size() -
1);
- activeSessions.add(ss);
- LOG.debug("Using idle session: " + ss);
+ ServerSessionImpl ss = null;
+ synchronized (idleSessions) {
+ if (idleSessions.size() > 0) {
+ ss = idleSessions.remove(idleSessions.size() - 1);
+ }
+ }
+ if (ss != null) {
+ synchronized (activeSessions) {
+ activeSessions.add(ss);
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Using idle session: " + ss);
+ }
return ss;
- } else {
+ }
+
+ synchronized (activeSessions) {
// Are we at the upper limit?
if (activeSessions.size() >= maxSessions) {
// then reuse the already created sessions..
@@ -110,66 +125,97 @@
// processing.
return getExistingServerSession();
}
- ServerSessionImpl ss = createServerSessionImpl();
- // We may not be able to create a session due to the container
- // restricting us.
- if (ss == null) {
- if (activeSessions.size() == 0) {
- //no idle sessions, no active sessions, and we can't
create a new session....
- throw new JMSException("Endpoint factory did not allow
creation of any endpoints.");
- }
+ }
- return getExistingServerSession();
+ ss = createServerSessionImpl();
+ // We may not be able to create a session due to the container
+ // restricting us.
+ if (ss == null) {
+ synchronized (activeSessions) {
+ if (activeSessions.isEmpty()) {
+ throw new JMSException(
+ "Endpoint factory did not allow creation any
endpoints.");
+ }
}
+
+ return getExistingServerSession();
+ }
+ synchronized (activeSessions) {
activeSessions.add(ss);
+ }
+ if (LOG.isDebugEnabled()) {
LOG.debug("Created a new session: " + ss);
- return ss;
}
+ return ss;
+
}
/**
- * @param messageDispatch the message to dispatch
+ * @param messageDispatch
+ * the message to dispatch
* @throws JMSException
*/
- private void dispatchToSession(MessageDispatch messageDispatch) throws
JMSException {
+ private void dispatchToSession(MessageDispatch messageDispatch)
+ throws JMSException {
ServerSession serverSession = getServerSession();
Session s = serverSession.getSession();
ActiveMQSession session = null;
if (s instanceof ActiveMQSession) {
- session = (ActiveMQSession)s;
+ session = (ActiveMQSession) s;
} else if (s instanceof ActiveMQQueueSession) {
- session = (ActiveMQSession)s;
+ session = (ActiveMQSession) s;
} else if (s instanceof ActiveMQTopicSession) {
- session = (ActiveMQSession)s;
+ session = (ActiveMQSession) s;
} else {
- activeMQAsfEndpointWorker.connection.onAsyncException(new
JMSException("Session pool provided an invalid session type: " + s.getClass()));
+ activeMQAsfEndpointWorker.connection
+ .onAsyncException(new JMSException(
+ "Session pool provided an invalid session type: "
+ + s.getClass()));
}
session.dispatch(messageDispatch);
serverSession.start();
}
/**
- * @return
+ * @return session
*/
private ServerSession getExistingServerSession() {
- ServerSessionImpl ss = activeSessions.remove(0);
- activeSessions.add(ss);
- LOG.debug("Reusing an active session: " + ss);
+ ServerSessionImpl ss = null;
+ if (!activeSessions.isEmpty()) {
+ if (activeSessions.size() > 1) {
+ // round robin
+ ss = activeSessions.remove(0);
+ activeSessions.add(ss);
+ } else {
+ ss = activeSessions.get(0);
+ }
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Reusing an active session: " + ss);
+ }
return ss;
}
public void returnToPool(ServerSessionImpl ss) {
- LOG.debug("Session returned to pool: " + ss);
- activeSessions.remove(ss);
- idleSessions.add(ss);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Session returned to pool: " + ss);
+ }
+ synchronized(activeSessions) {
+ activeSessions.remove(ss);
+ }
+ synchronized(idleSessions) {
+ idleSessions.add(ss);
+ }
synchronized (closing) {
closing.notify();
}
}
public void removeFromPool(ServerSessionImpl ss) {
- activeSessions.remove(ss);
+ synchronized(activeSessions) {
+ activeSessions.remove(ss);
+ }
try {
ActiveMQSession session = (ActiveMQSession)ss.getSession();
List l = session.getUnconsumedMessages();
@@ -186,26 +232,35 @@
}
public void close() {
- synchronized (closing) {
- closing.set(true);
- closeIdleSessions();
- while (activeSessions.size() > 0) {
- LOG.debug("Active Sessions = " + activeSessions.size());
- try {
- closing.wait(1000);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- return;
+ closing.set(true);
+ closeIdleSessions();
+ // we may have to wait erroneously 250ms if an
+ // active session is removed during our wait and we
+ // are not notified
+ while (getActiveSessionSize() > 0) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Active Sessions = " + getActiveSessionSize());
+ }
+ try {
+ synchronized (closing) {
+ closing.wait(250);
}
- closeIdleSessions();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return;
}
+ closeIdleSessions();
}
}
+
private void closeIdleSessions() {
- for (Iterator<ServerSessionImpl> iter = idleSessions.iterator();
iter.hasNext();) {
- ServerSessionImpl ss = iter.next();
- ss.close();
+ synchronized(idleSessions) {
+ for (Iterator<ServerSessionImpl> iter = idleSessions.iterator();
iter.hasNext();) {
+ ServerSessionImpl ss = iter.next();
+ ss.close();
+ }
+ idleSessions.clear();
}
}
@@ -215,12 +270,18 @@
public boolean isClosing() {
return closing.get();
}
-
+
/**
* @param closing The closing to set.
*/
public void setClosing(boolean closing) {
this.closing.set(closing);
+ }
+
+ private int getActiveSessionSize() {
+ synchronized(activeSessions) {
+ return activeSessions.size();
+ }
}
}