Author: rajdavies
Date: Thu Aug 17 06:01:50 2006
New Revision: 432221
URL: http://svn.apache.org/viewvc?rev=432221&view=rev
Log:
Added optional async delivery to VMTransport
Added:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportDisposedIOException.java
(with props)
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
Added:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportDisposedIOException.java
URL:
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportDisposedIOException.java?rev=432221&view=auto
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportDisposedIOException.java
(added)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportDisposedIOException.java
Thu Aug 17 06:01:50 2006
@@ -0,0 +1,43 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport;
+
+import java.io.IOException;
+
+/**
+ * This exception is thrown when the transport is disposed
+ *
+ * @version $Revision$
+ */
+public class TransportDisposedIOException extends IOException {
+
+ private static final long serialVersionUID=-7107323414439622596L;
+
+ public TransportDisposedIOException() {
+ super();
+ }
+
+ /**
+ * @param message
+ */
+ public TransportDisposedIOException(String message) {
+ super(message);
+ }
+
+
+}
Propchange:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportDisposedIOException.java
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Propchange:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportDisposedIOException.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
URL:
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java?rev=432221&r1=432220&r2=432221&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
Thu Aug 17 06:01:50 2006
@@ -1,19 +1,15 @@
/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding
copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not
use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
*/
package org.apache.activemq.transport.vm;
@@ -23,37 +19,46 @@
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
-
import org.apache.activemq.command.Command;
import org.apache.activemq.command.Response;
+import org.apache.activemq.thread.Task;
+import org.apache.activemq.thread.TaskRunner;
+import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.transport.FutureResponse;
import org.apache.activemq.transport.ResponseCallback;
import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportDisposedIOException;
import org.apache.activemq.transport.TransportListener;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-
+import edu.emory.mathcs.backport.java.util.concurrent.LinkedBlockingQueue;
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicLong;
/**
* A Transport implementation that uses direct method invocations.
*
* @version $Revision$
*/
-public class VMTransport implements Transport{
+public class VMTransport implements Transport,Task{
private static final Log log=LogFactory.getLog(VMTransport.class);
- private static final AtomicLong nextId = new AtomicLong(0);
-
+ private static final AtomicLong nextId=new AtomicLong(0);
+ private static final TaskRunnerFactory taskRunnerFactory=new
TaskRunnerFactory("VMTransport",Thread.NORM_PRIORITY,
+ true,1000);
protected VMTransport peer;
protected TransportListener transportListener;
protected boolean disposed;
protected boolean marshal;
protected boolean network;
- protected List queue = Collections.synchronizedList(new LinkedList());
+ protected boolean async=false;
+ protected boolean started=false;
+ protected int asyncQueueDepth=2000;
+ protected List prePeerSetQueue=Collections.synchronizedList(new
LinkedList());
+ protected LinkedBlockingQueue messageQueue;
protected final URI location;
protected final long id;
-
- public VMTransport(URI location) {
- this.location = location;
+ private TaskRunner taskRunner;
+
+ public VMTransport(URI location){
+ this.location=location;
this.id=nextId.getAndIncrement();
}
@@ -66,57 +71,75 @@
}
public void oneway(Command command) throws IOException{
- if(disposed)
- throw new IOException("Transport disposed.");
+ if(disposed){
+ throw new TransportDisposedIOException("Transport disposed.");
+ }
if(peer==null)
throw new IOException("Peer not connected.");
- if (!peer.disposed){
- TransportListener tl = peer.transportListener;
- queue = peer.queue;
- if (tl != null){
+ if(!peer.disposed){
+ final TransportListener tl=peer.transportListener;
+ messageQueue=getMessageQueue();
+ prePeerSetQueue=peer.prePeerSetQueue;
+ if(tl==null){
+ prePeerSetQueue.add(command);
+ }else if(!async){
tl.onCommand(command);
- }else {
- queue.add(command);
+ }else{
+ try{
+ messageQueue.put(command);
+ wakeup();
+ }catch(final InterruptedException e){
+ log.error("messageQueue interuppted",e);
+ throw new IOException(e.getMessage());
+ }
}
- } else {
- throw new IOException("Peer disconnected.");
+ }else{
+ throw new TransportDisposedIOException("Peer ("+peer.toString()+") disposed.");
}
}
- public FutureResponse asyncRequest(Command command, ResponseCallback
responseCallback) throws IOException{
+ public FutureResponse asyncRequest(Command command,ResponseCallback
responseCallback) throws IOException{
throw new AssertionError("Unsupported Method");
}
public Response request(Command command) throws IOException{
throw new AssertionError("Unsupported Method");
}
-
- public Response request(Command command,int timeout) throws IOException {
+
+ public Response request(Command command,int timeout) throws IOException{
throw new AssertionError("Unsupported Method");
}
- public synchronized TransportListener getTransportListener() {
+ public synchronized TransportListener getTransportListener(){
return transportListener;
}
synchronized public void setTransportListener(TransportListener
commandListener){
this.transportListener=commandListener;
+ wakeup();
}
public synchronized void start() throws Exception{
+ started=true;
if(transportListener==null)
throw new IOException("TransportListener not set.");
- for (Iterator iter = queue.iterator(); iter.hasNext();) {
- Command command = (Command) iter.next();
+ for(Iterator iter=prePeerSetQueue.iterator();iter.hasNext();){
+ Command command=(Command) iter.next();
transportListener.onCommand(command);
iter.remove();
}
+ wakeup();
}
public void stop() throws Exception{
+ started=false;
if(!disposed){
disposed=true;
}
+ if(taskRunner!=null){
+ taskRunner.shutdown();
+ taskRunner=null;
+ }
}
public Object narrow(Class target){
@@ -141,16 +164,75 @@
public void setNetwork(boolean network){
this.network=network;
}
-
- public String toString() {
+
+ public String toString(){
return location+"#"+id;
}
- public String getRemoteAddress() {
- if(peer != null){
- return peer.toString();
- }
- return null;
- }
+ public String getRemoteAddress(){
+ if(peer!=null){
+ return peer.toString();
+ }
+ return null;
+ }
+ // task implementation
+ public boolean iterate(){
+ TransportListener tl=peer.transportListener;
+ if(!messageQueue.isEmpty()&&!peer.disposed&&tl!=null){
+ Command command=(Command) messageQueue.poll();
+ if(tl!=null){
+ tl.onCommand(command);
+ }
+ }
+ return !messageQueue.isEmpty()&&!peer.disposed&&!(peer.transportListener==null);
+ }
+
+ /**
+ * @return the async
+ */
+ public boolean isAsync(){
+ return async;
+ }
+
+ /**
+ * @param async the async to set
+ */
+ public void setAsync(boolean async){
+ this.async=async;
+ }
+
+ /**
+ * @return the asyncQueueDepth
+ */
+ public int getAsyncQueueDepth(){
+ return asyncQueueDepth;
+ }
+
+ /**
+ * @param asyncQueueDepth the asyncQueueDepth to set
+ */
+ public void setAsyncQueueDepth(int asyncQueueDepth){
+ this.asyncQueueDepth=asyncQueueDepth;
+ }
+
+ protected void wakeup(){
+ if(async&&messageQueue!=null&&!messageQueue.isEmpty()){
+ if(taskRunner==null){
+ taskRunner=taskRunnerFactory.createTaskRunner(this,"VMTransport: "+toString());
+ }
+ try{
+ taskRunner.wakeup();
+ }catch(InterruptedException e){
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ protected synchronized LinkedBlockingQueue getMessageQueue(){
+ if(messageQueue==null){
+ messageQueue=new LinkedBlockingQueue(this.asyncQueueDepth);
+ }
+ return messageQueue;
+ }
}