User: sparre
Date: 01/10/05 12:34:14
Modified: src/main/org/jboss/mq/server ClientConsumer.java
Log:
Created a thread pool with an embedded work queue for JBossMQ,
and made the message pushers use it.
Revision Changes Path
1.7 +68 -49 jbossmq/src/main/org/jboss/mq/server/ClientConsumer.java
Index: ClientConsumer.java
===================================================================
RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/server/ClientConsumer.java,v
retrieving revision 1.6
retrieving revision 1.7
diff -u -r1.6 -r1.7
--- ClientConsumer.java 2001/09/20 03:52:57 1.6
+++ ClientConsumer.java 2001/10/05 19:34:14 1.7
@@ -5,6 +5,7 @@
* See terms of license at gnu.org.
*/
package org.jboss.mq.server;
+
import java.util.HashMap;
import java.util.Hashtable;
@@ -19,15 +20,20 @@
import org.jboss.mq.xml.XElement;
+import org.jboss.mq.threadpool.ThreadPool;
+import org.jboss.mq.threadpool.Work;
+
/**
* This represent the clients queue which consumes messages from the
* destinations on the provider.
*
* @author Hiram Chirino ([EMAIL PROTECTED])
* @created August 16, 2001
- * @version $Revision: 1.6 $
+ * @version $Revision: 1.7 $
*/
-public class ClientConsumer implements Runnable {
+public class ClientConsumer
+ implements Work
+{
//The JMSServer object
JMSServer server;
//The connection this queue will send messages over
@@ -49,18 +55,32 @@
private LinkedList messages = new LinkedList();
//LinkedList of the the temporary destinations that this client created
// public LinkedList temporaryDestinations = new LinkedList();
- //The message push thread for this consumer connection
- private Thread messagePushThread;
+
+ /**
+ * Flags that I am enqueued as work on my thread pool.
+ */
+ private boolean enqueued = false;
+
+ // Static ---------------------------------------------------
+
+ /**
+ * The {@link org.jboss.mq.threadpool.ThreadPool ThreadPool} that
+ * does the actual message pushing for us.
+ */
+ private static ThreadPool threadPool = null;
// Constructor ---------------------------------------------------
+
public ClientConsumer( JMSServer server, ConnectionToken dc )
throws JMSException {
this.server = server;
this.dc = dc;
cat = org.apache.log4j.Category.getInstance( ClientConsumer.class.getName() +
":" + dc.getClientID() );
- messagePushThread = new Thread( server.threadGroup, this, "Message Pusher " +
dc.getClientID() );
- messagePushThread.setDaemon( true );
- messagePushThread.start();
+ // Create thread pool
+ synchronized (ClientConsumer.class) {
+ if (threadPool == null)
+ threadPool = new ThreadPool("Message Pushers", server.threadGroup, 10,
true);
+ }
}
public void setEnabled( boolean enabled )
@@ -82,10 +102,17 @@
}
}
- public void queueMessageForSending( ReceiveRequest r ) {
- synchronized ( messages ) {
- messages.add( r );
- messages.notify();
+ public void queueMessageForSending(ReceiveRequest r)
+ {
+ synchronized (messages) {
+ if (closed)
+ return; // Wouldn't be delivered anyway
+
+ messages.add(r);
+ if (!enqueued) {
+ threadPool.enqueueWork(this);
+ enqueued = true;
+ }
}
}
@@ -110,12 +137,15 @@
}
public void close() {
-
cat.debug( "" + this + "->close()" );
- synchronized ( messages ) {
+ synchronized (messages) {
closed = true;
- messages.notifyAll();
+ if (enqueued) {
+ cat.debug("" + this + "->close(): Cancelling work in progress.");
+ threadPool.cancelWork(this);
+ enqueued = false;
+ }
}
synchronized ( subscriptions ) {
@@ -181,45 +211,34 @@
queue.removeSubscriber( req );
}
-
- // Iterate over the consumers asking them to take messages until they stop
- // consuming.
- public void run() {
-
- cat.debug( "" + this + "->run()" );
-
- while ( true ) {
-
- ReceiveRequest[] job;
- synchronized ( messages ) {
- while ( messages.size() == 0 ) {
- try {
- messages.wait();
- } catch ( InterruptedException e ) {
- }
- if ( closed ) {
- return;
- }
- }
-
- job = new ReceiveRequest[messages.size()];
- job = ( ReceiveRequest[] )messages.toArray( job );
- messages.clear();
- }
-
+ /**
+ * Push some messages.
+ */
+ public void doWork()
+ {
+ ReceiveRequest[] job;
+
+ synchronized (messages) {
+ if (closed)
+ return;
+
+ job = new ReceiveRequest[messages.size()];
+ job = (ReceiveRequest[])messages.toArray(job);
+ messages.clear();
+ enqueued = false;
+ }
+
+ try {
+ dc.clientIL.receive(job);
+ } catch (Exception e) {
+ cat.warn("Could not send messages to a receiver.", e);
try {
- dc.clientIL.receive( job );
- } catch ( Exception e ) {
- cat.warn( "Could not send messages to a receiver.", e );
- try {
- server.connectionFailure( dc );
- } catch ( Throwable ignore ) {
- cat.warn( "Could not close the client connection..", ignore );
- }
+ server.connectionFailure(dc);
+ } catch (Throwable ignore) {
+ cat.warn( "Could not close the client connection..", ignore);
}
}
-
}
public String toString() {
_______________________________________________
Jboss-development mailing list
[EMAIL PROTECTED]
https://lists.sourceforge.net/lists/listinfo/jboss-development