Author: rajdavies Date: Wed Jan 10 11:40:33 2007 New Revision: 494950 URL: http://svn.apache.org/viewvc?view=rev&rev=494950 Log: Tidied up the async dispatch option
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java?view=diff&rev=494950&r1=494949&r2=494950 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java Wed Jan 10 11:40:33 2007 @@ -11,6 +11,7 @@ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. */ + package org.apache.activemq.transport.vm; import java.io.IOException; @@ -19,6 +20,7 @@ import java.util.Iterator; import java.util.LinkedList; import java.util.List; +import org.apache.activemq.broker.BrokerStoppedException; import org.apache.activemq.command.Command; import org.apache.activemq.thread.Task; import org.apache.activemq.thread.TaskRunner; @@ -32,16 +34,18 @@ import org.apache.commons.logging.LogFactory; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicLong; + /** * A Transport implementation that uses direct method invocations. * * @version $Revision$ */ public class VMTransport implements Transport,Task{ + private static final Log log=LogFactory.getLog(VMTransport.class); private static final AtomicLong nextId=new AtomicLong(0); private static final TaskRunnerFactory taskRunnerFactory=new TaskRunnerFactory("VMTransport",Thread.NORM_PRIORITY, - true,1000); + true,1000); protected VMTransport peer; protected TransportListener transportListener; protected boolean disposed; @@ -51,7 +55,7 @@ protected boolean started=false; protected int asyncQueueDepth=2000; protected List prePeerSetQueue=Collections.synchronizedList(new LinkedList()); - protected LinkedBlockingQueue messageQueue; + protected LinkedBlockingQueue messageQueue=null; protected final URI location; protected final long id; private TaskRunner taskRunner; @@ -76,9 +80,8 @@ if(peer==null) throw new IOException("Peer not connected."); if(!peer.disposed){ - if(async){ - asyncOneWay(command); + asyncOneWay(command); }else{ syncOneWay(command); } @@ -86,7 +89,7 @@ throw new TransportDisposedIOException("Peer ("+peer.toString()+") disposed."); } } - + protected void syncOneWay(Object command){ final TransportListener tl=peer.transportListener; prePeerSetQueue=peer.prePeerSetQueue; @@ -96,10 +99,12 @@ tl.onCommand(command); } } - - protected void asyncOneWay(Object command) throws IOException{ - messageQueue=getMessageQueue(); + + protected synchronized void asyncOneWay(Object command) throws IOException{ try{ + if(messageQueue==null){ + messageQueue=new LinkedBlockingQueue(this.asyncQueueDepth); + } messageQueue.put(command); wakeup(); }catch(final InterruptedException e){ @@ -136,17 +141,17 @@ throw new IOException("TransportListener not set."); if(!async){ for(Iterator iter=prePeerSetQueue.iterator();iter.hasNext();){ - Command command=(Command) iter.next(); + Command command=(Command)iter.next(); transportListener.onCommand(command); iter.remove(); } }else{ - wakeup(); peer.wakeup(); + wakeup(); } } - public void stop() throws Exception{ + public synchronized void stop() throws Exception{ started=false; if(!disposed){ disposed=true; @@ -196,11 +201,17 @@ */ public boolean iterate(){ final TransportListener tl=peer.transportListener; - if(!messageQueue.isEmpty()&&!peer.disposed&&tl!=null){ - final Command command=(Command) messageQueue.poll(); - tl.onCommand(command); + Command command=null; + // if(!disposed && !messageQueue.isEmpty()&&!peer.disposed&&tl!=null){ + synchronized(this){ + if(messageQueue!=null&&!disposed&&!peer.disposed&&tl!=null &&!messageQueue.isEmpty()){ + command=(Command)messageQueue.poll(); + if (command != null) { + tl.onCommand(command); + } + } } - return !messageQueue.isEmpty()&&!peer.disposed&&!(peer.transportListener==null); + return messageQueue!=null&&!messageQueue.isEmpty()&&!peer.disposed; } /** @@ -231,8 +242,8 @@ this.asyncQueueDepth=asyncQueueDepth; } - protected void wakeup(){ - if(async&&messageQueue!=null&&!messageQueue.isEmpty()){ + protected synchronized void wakeup(){ + if(async){ if(taskRunner==null){ taskRunner=taskRunnerFactory.createTaskRunner(this,"VMTransport: "+toString()); } @@ -242,12 +253,5 @@ Thread.currentThread().interrupt(); } } - } - - protected synchronized LinkedBlockingQueue getMessageQueue(){ - if(messageQueue==null){ - messageQueue=new LinkedBlockingQueue(this.asyncQueueDepth); - } - return messageQueue; } }