Batch up multiple socket write calls in the TCP transport.
----------------------------------------------------------
Key: AMQ-2139
URL: https://issues.apache.org/activemq/browse/AMQ-2139
Project: ActiveMQ
Issue Type: Improvement
Components: Transport
Affects Versions: 5.2.0
Reporter: Hiram Chirino
Assignee: Hiram Chirino
Fix For: 6.0.0
Investigate using an async write thread for the TCP transport. It would be
able to more efficiently batch up multiple writes into a single socket write.
{code}
$ svn diff
Index:
activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
===================================================================
---
activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
(revision 742546)
+++
activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
(working copy)
@@ -29,7 +29,9 @@
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
@@ -119,6 +121,9 @@
private Boolean tcpNoDelay;
private Thread runnerThread;
+ private final ArrayBlockingQueue<Object> outbound = new
ArrayBlockingQueue<Object>(100);
+ private Thread onewayThread;
+
/**
* Connect to a remote Node - e.g. a Broker
*
@@ -157,16 +162,39 @@
this.localLocation = null;
setDaemon(true);
}
-
+
/**
* A one way asynchronous send
*/
public void oneway(Object command) throws IOException {
checkStarted();
- wireFormat.marshal(command, dataOut);
- dataOut.flush();
+ try {
+ outbound.put(command);
+ } catch (InterruptedException e) {
+ throw new InterruptedIOException();
+ }
}
+ protected void sendOneways() {
+ try {
+ while(!isStopped()) {
+ Object command = outbound.poll(500, TimeUnit.MILLISECONDS);
+ if( command!=null ) {
+ try {
+ while( command!=null ) {
+ wireFormat.marshal(command, dataOut);
+ command = outbound.poll();
+ }
+ dataOut.flush();
+ } catch (IOException e) {
+ getTransportListener().onException(e);
+ }
+ }
+ }
+ } catch (InterruptedException e) {
+ }
+ }
+
/**
* @return pretty print of 'this'
*/
@@ -399,6 +427,11 @@
protected void doStart() throws Exception {
connect();
+ onewayThread = new Thread(null, new Runnable(){
+ public void run() {
+ sendOneways();
+ }}, "ActiveMQ Transport Sender: " + toString(), getStackSize());
+ onewayThread.start();
stoppedLatch.set(new CountDownLatch(1));
super.doStart();
}
@@ -487,8 +520,12 @@
LOG.debug("Caught exception closing socket",e);
}
}
-
}
+ if( onewayThread!=null ) {
+ onewayThread.join();
+ onewayThread = null;
+ outbound.clear();
+ }
}
/**
{code}
--
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.