Author: rajdavies
Date: Fri Aug 18 11:02:49 2006
New Revision: 432664
URL: http://svn.apache.org/viewvc?rev=432664&view=rev
Log:
tidy code for easier maintenance
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java?rev=432664&r1=432663&r2=432664&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java Fri Aug 18 11:02:49 2006
@@ -77,26 +77,37 @@
if(peer==null)
throw new IOException("Peer not connected.");
if(!peer.disposed){
- final TransportListener tl=peer.transportListener;
- messageQueue=getMessageQueue();
- prePeerSetQueue=peer.prePeerSetQueue;
- if(tl==null){
- prePeerSetQueue.add(command);
- }else if(!async){
- tl.onCommand(command);
+
+ if(async){
+ asyncOneWay(command);
}else{
- try{
- messageQueue.put(command);
- wakeup();
- }catch(final InterruptedException e){
- log.error("messageQueue interuppted",e);
- throw new IOException(e.getMessage());
- }
+ syncOneWay(command);
}
}else{
throw new TransportDisposedIOException("Peer ("+peer.toString()+") disposed.");
}
}
+
+ protected void syncOneWay(Command command){
+ final TransportListener tl=peer.transportListener;
+ prePeerSetQueue=peer.prePeerSetQueue;
+ if(tl==null){
+ prePeerSetQueue.add(command);
+ }else{
+ tl.onCommand(command);
+ }
+ }
+
+ protected void asyncOneWay(Command command) throws IOException{
+ messageQueue=getMessageQueue();
+ try{
+ messageQueue.put(command);
+ wakeup();
+ }catch(final InterruptedException e){
+ log.error("messageQueue interupted",e);
+ throw new IOException(e.getMessage());
+ }
+ }
public FutureResponse asyncRequest(Command command,ResponseCallback responseCallback) throws IOException{
throw new AssertionError("Unsupported Method");
@@ -117,18 +128,23 @@
synchronized public void setTransportListener(TransportListener commandListener){
this.transportListener=commandListener;
wakeup();
+ peer.wakeup();
}
public synchronized void start() throws Exception{
started=true;
if(transportListener==null)
throw new IOException("TransportListener not set.");
- for(Iterator iter=prePeerSetQueue.iterator();iter.hasNext();){
- Command command=(Command) iter.next();
- transportListener.onCommand(command);
- iter.remove();
+ if(!async){
+ for(Iterator iter=prePeerSetQueue.iterator();iter.hasNext();){
+ Command command=(Command) iter.next();
+ transportListener.onCommand(command);
+ iter.remove();
+ }
+ }else{
+ wakeup();
+ peer.wakeup();
}
- wakeup();
}
public void stop() throws Exception{
@@ -176,14 +192,14 @@
return null;
}
- // task implementation
+ /**
+ * @see org.apache.activemq.thread.Task#iterate()
+ */
public boolean iterate(){
- TransportListener tl=peer.transportListener;
+ final TransportListener tl=peer.transportListener;
if(!messageQueue.isEmpty()&&!peer.disposed&&tl!=null){
- Command command=(Command) messageQueue.poll();
- if(tl!=null){
- tl.onCommand(command);
- }
+ final Command command=(Command) messageQueue.poll();
+ tl.onCommand(command);
}
return !messageQueue.isEmpty()&&!peer.disposed&&!(peer.transportListener==null);
}