Hello developers,

This is my first post :)

So here goes the story... I have written a partial implementation of a
Stomp broker in C, and there was a need to interconnect it with Apache
ActiveMQ, where performance turned out to be the bottleneck.

When I took a quick look at the stable source, there was no sign of Java NIO
being used, but in SVN I found a stub or initial implementation. For a few
minutes I was happy: I replaced
'StompTransportFactory extends TcpTransportFactory'
with 'StompTransportFactory extends NIOTransportFactory' and was rewarded
with an exception :)

Further investigation showed that this NIO implementation makes assumptions
about its user; it looks like it assumes that it is being used by the
OpenWire protocol.
By the way, I found that there is no transport that uses
NIOTransportFactory at the moment.

My implementation should be a drop-in replacement for TcpTransportFactory. I
took a universal approach and implemented NIOBufferedInputStream and
NIOBufferedOutputStream. Despite the fact that we still have one thread per
client (with NIO we should use selectors and fewer threads, but that would
require a full ActiveMQ rewrite :) ), the performance improvement is about
80%. Basically, it fulfils my requirements. It would be nice to have this
patch included.

Any comments and questions are welcome...

The patch is attached.
Index: /home/mindeh/active-mq-svn-workspace/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java
===================================================================
--- /home/mindeh/active-mq-svn-workspace/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java	(revision 526549)
+++ /home/mindeh/active-mq-svn-workspace/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java	(working copy)
@@ -22,6 +22,9 @@
 import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
@@ -32,35 +35,40 @@
 import org.apache.activemq.wireformat.WireFormat;
 
 /**
- * Implements marshalling and unmarsalling the <a href="http://stomp.codehaus.org/";>Stomp</a> protocol.
+ * Implements marshalling and unmarsalling the <a
+ * href="http://stomp.codehaus.org/";>Stomp</a> protocol.
  */
 public class StompWireFormat implements WireFormat {
 
-    private static final byte[] NO_DATA = new byte[]{};
-	private static final byte[] END_OF_FRAME = new byte[]{0,'\n'};
-	
+	private static final byte[] NO_DATA = new byte[] {};
+
+	private static final byte[] END_OF_FRAME = new byte[] { 0, '\n' };
+
 	private static final int MAX_COMMAND_LENGTH = 1024;
-	private static final int MAX_HEADER_LENGTH = 1024*10;
+
+	private static final int MAX_HEADER_LENGTH = 1024 * 10;
+
 	private static final int MAX_HEADERS = 1000;
-	private static final int MAX_DATA_LENGTH = 1024*1024*100;
-    
-	private int version=1;
 
+	private static final int MAX_DATA_LENGTH = 1024 * 1024 * 100;
+
+	private int version = 1;
+
 	public ByteSequence marshal(Object command) throws IOException {
-        ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        DataOutputStream dos = new DataOutputStream(baos);
-        marshal(command, dos);
-        dos.close();
-        return baos.toByteSequence();
-    }
+		ByteArrayOutputStream baos = new ByteArrayOutputStream();
+		DataOutputStream dos = new DataOutputStream(baos);
+		marshal(command, dos);
+		dos.close();
+		return baos.toByteSequence();
+	}
 
-    public Object unmarshal(ByteSequence packet) throws IOException {
-        ByteArrayInputStream stream = new ByteArrayInputStream(packet);
-        DataInputStream dis = new DataInputStream(stream);
-        return unmarshal(dis);
-    }
+	public Object unmarshal(ByteSequence packet) throws IOException {
+		ByteArrayInputStream stream = new ByteArrayInputStream(packet);
+		DataInputStream dis = new DataInputStream(stream);
+		return unmarshal(dis);
+	}
 
-    public void marshal(Object command, DataOutput os) throws IOException {
+	public void marshal(Object command, DataOutput os) throws IOException {
 		StompFrame stomp = (org.apache.activemq.transport.stomp.StompFrame) command;
 
 		StringBuffer buffer = new StringBuffer();
@@ -68,7 +76,8 @@
 		buffer.append(Stomp.NEWLINE);
 
 		// Output the headers.
-		for (Iterator iter = stomp.getHeaders().entrySet().iterator(); iter.hasNext();) {
+		for (Iterator iter = stomp.getHeaders().entrySet().iterator(); iter
+				.hasNext();) {
 			Map.Entry entry = (Map.Entry) iter.next();
 			buffer.append(entry.getKey());
 			buffer.append(Stomp.Headers.SEPERATOR);
@@ -83,122 +92,156 @@
 		os.write(stomp.getContent());
 		os.write(END_OF_FRAME);
 	}
-    
 
-    public Object unmarshal(DataInput in) throws IOException {
-        	
-        try {
-			String action = null;
-			
-			// skip white space to next real action line
-			while (true) {
-				action = readLine(in, MAX_COMMAND_LENGTH, "The maximum command length was exceeded");
-				if (action == null) {
-					throw new IOException("connection was closed");
-				} else {
-					action = action.trim();
-					if (action.length() > 0) {
-						break;
+	public Object unmarshal(DataInput in) throws IOException {
+
+		ByteArrayOutputStream baos = new ByteArrayOutputStream(1024);
+		String action = null;
+		HashMap headers = new HashMap(25);
+		byte[] data = NO_DATA;
+		int contentLength = -1;
+		String key = null;
+		String value = null;
+		byte b = 0;
+		int i = 0;
+		int headersCount = 0;
+
+		try {
+			/*
+			 * Reading Command
+			 */
+
+			baos.reset();
+
+			for (i = 0; ((((b = in.readByte()) != '\n') && (i < MAX_COMMAND_LENGTH)) || (baos.size() == 0) ); ++i)
+			{  
+				if (b == '\n' && baos.size() == 0)
+				{
+					continue;
+				} else
+					baos.write(b);
+			}
+
+			if (i == MAX_COMMAND_LENGTH)
+				throw new ProtocolException(
+						"The maximum number of command length was exeeded",
+						true);
+
+			action = new String(baos.toByteArray(), "UTF-8");
+
+			/*
+			 * Reading Headers
+			 */
+
+			for (headersCount = 0; headersCount < MAX_HEADERS; ++headersCount) {
+				/*
+				 * Reading Key
+				 */
+
+				baos.reset();
+
+				for (i = 0; (((b = in.readByte()) != '\n') && (b != ':'))
+						&& (i < MAX_HEADER_LENGTH); ++i)
+					baos.write(b);
+
+				if (i == MAX_HEADER_LENGTH)
+					throw new ProtocolException(
+							"The maximum header length was exceeded", true);
+
+				if (b == ':') {
+					key = new String(baos.toByteArray(), "UTF-8");
+				} else if (b == '\n') {
+					break;
+				}
+
+				/*
+				 * Reading Value
+				 */
+
+				baos.reset();
+
+				for (i = 0; ((b = in.readByte()) != '\n')
+						&& (i < MAX_HEADER_LENGTH); ++i)
+					baos.write(b);
+
+				if (i == MAX_HEADER_LENGTH)
+					throw new ProtocolException(
+							"The maximum header length was exceeded", true);
+
+				value = new String(baos.toByteArray(), "UTF-8");
+
+				if (key.equals("content-length")) {
+
+					try {
+						contentLength = Integer.parseInt(value);
+					} catch (NumberFormatException e) {
+						throw new ProtocolException(
+								"Unable to parse content-length", true);
 					}
+
+					if (contentLength < 0) {
+						throw new ProtocolException("Negative content-length",
+								true);
+					}
+
+					if (contentLength > MAX_DATA_LENGTH) {
+						throw new ProtocolException(
+								"content-length is more than maximum data length",
+								true);
+					}
 				}
+
+				headers.put(key, value);
 			}
-			
-			// Parse the headers
-			HashMap headers = new HashMap(25);
-			while (true) {
-			    String line = readLine(in, MAX_HEADER_LENGTH, "The maximum header length was exceeded");
-			    if (line != null && line.trim().length() > 0) {
-			    	
-			    	if( headers.size() > MAX_HEADERS )
-			    		throw new ProtocolException("The maximum number of headers was exceeded", true);
-			    	
-			        try {
-			            int seperator_index = line.indexOf(Stomp.Headers.SEPERATOR);
-			            String name = line.substring(0, seperator_index).trim();
-			            String value = line.substring(seperator_index + 1, line.length()).trim();
-			            headers.put(name, value);
-			        }
-			        catch (Exception e) {
-			            throw new ProtocolException("Unable to parser header line [" + line + "]", true);
-			        }
-			    }
-			    else {
-			        break;
-			    }
-			}
-			
-			// Read in the data part.
-			byte[] data = NO_DATA;
-			String contentLength = (String)headers.get(Stomp.Headers.CONTENT_LENGTH);
-			if (contentLength!=null) {
-			    
-				// Bless the client, he's telling us how much data to read in.        	
-				int length;
-				try {
-					length = Integer.parseInt(contentLength.trim());
-				} catch (NumberFormatException e) {
-					throw new ProtocolException("Specified content-length is not a valid integer", true);
-				}
 
-				if( length > MAX_DATA_LENGTH )
-					throw new ProtocolException("The maximum data length was exceeded", true);
-				
-			    data = new byte[length];
-			    in.readFully(data);
-			    
-			    if (in.readByte() != 0) {
-			        throw new ProtocolException(Stomp.Headers.CONTENT_LENGTH+" bytes were read and " + "there was no trailing null byte", true);
-			    }
-			
+			if (headersCount >= MAX_HEADERS)
+				throw new ProtocolException(
+						"The maximum number of headers was exceeded", true);
+
+			/*
+			 * Reading Body
+			 */
+
+			if (contentLength > 0) {
+
+				baos.reset();
+
+				while ((contentLength--) > 0)
+					baos.write(in.readByte());
+					
+				if (in.readByte() == 0) {
+					data = baos.toByteArray();
+				} else
+					throw new ProtocolException(
+							"Body was not null terminated as expected", true);
+
 			} else {
 
-				// We don't know how much to read.. data ends when we hit a 0
-			    byte b;
-			    ByteArrayOutputStream baos=null;
-			    while ((b = in.readByte()) != 0) {
-					
-					if( baos == null ) {
-			    		baos = new ByteArrayOutputStream();
-			    	} else if( baos.size() > MAX_DATA_LENGTH ) {
-			    		throw new ProtocolException("The maximum data length was exceeded", true);
-			    	}
-			    
-			        baos.write(b);
-			    }
-			    
-			    if( baos!=null ) {
-			        baos.close();
-			        data = baos.toByteArray();
-			    }
-			    
+				baos.reset();
+
+				for (i = 0; ((b = in.readByte()) != 0) && (i < MAX_DATA_LENGTH); ++i)
+					baos.write(b);
+
+				if (i == MAX_DATA_LENGTH)
+					throw new ProtocolException(
+							"content exeeded maximum data length", true);
+
+				data = baos.toByteArray();
 			}
-			
+
 			return new StompFrame(action, headers, data);
-			
+
 		} catch (ProtocolException e) {
 			return new StompFrameError(e);
-		} 
+		}
 
-    }
-
-    private String readLine(DataInput in, int maxLength, String errorMessage) throws IOException {
-        byte b;
-        ByteArrayOutputStream baos=new ByteArrayOutputStream(maxLength);
-        while ((b = in.readByte()) != '\n') {
-        	if( baos.size() > maxLength )
-        		throw new ProtocolException(errorMessage, true);
-            baos.write(b);
-        }
-        ByteSequence sequence = baos.toByteSequence();
-		return new String(sequence.getData(),sequence.getOffset(),sequence.getLength(),"UTF-8");
 	}
 
 	public int getVersion() {
-        return version;
-    }
+		return version;
+	}
 
-    public void setVersion(int version) {
-        this.version = version;
-    }
-
+	public void setVersion(int version) {
+		this.version = version;
+	}
 }
Index: /home/mindeh/active-mq-svn-workspace/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFactory.java
===================================================================
--- /home/mindeh/active-mq-svn-workspace/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFactory.java	(revision 526549)
+++ /home/mindeh/active-mq-svn-workspace/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFactory.java	(working copy)
@@ -20,7 +20,8 @@
 import java.util.Map;
 
 import org.apache.activemq.transport.Transport;
-import org.apache.activemq.transport.tcp.TcpTransportFactory;
+//import org.apache.activemq.transport.tcp.TcpTransportFactory;
+import org.apache.activemq.transport.nio.NIOTransportFactory;
 import org.apache.activemq.util.IntrospectionSupport;
 import org.apache.activemq.wireformat.WireFormat;
 
@@ -29,7 +30,7 @@
  *
  * @version $Revision: 1.1.1.1 $
  */
-public class StompTransportFactory extends TcpTransportFactory {
+public class StompTransportFactory extends NIOTransportFactory  /*TcpTransportFactory*/ {
 
     protected String getDefaultWireFormatType() {
         return "stomp";
@@ -38,6 +39,7 @@
     public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
     	transport = new StompTransportFilter(transport, new LegacyFrameTranslator());
     	IntrospectionSupport.setProperties(transport, options);
+    	
     	return super.compositeConfigure(transport, format, options);
     }
 
Index: /home/mindeh/active-mq-svn-workspace/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java
===================================================================
--- /home/mindeh/active-mq-svn-workspace/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java	(revision 526549)
+++ /home/mindeh/active-mq-svn-workspace/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java	(working copy)
@@ -33,6 +33,8 @@
 import org.apache.activemq.wireformat.WireFormat;
 import org.apache.activemq.command.Command;
 import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.tcp.TcpBufferedInputStream;
+import org.apache.activemq.transport.tcp.TcpBufferedOutputStream;
 import org.apache.activemq.transport.tcp.TcpTransport;
 import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.activemq.util.ServiceStopper;
@@ -42,15 +44,10 @@
  * 
  * @version $Revision$
  */
-public class NIOTransport extends TcpTransport {
-
-	//private static final Log log = LogFactory.getLog(NIOTransport.class);
-	private SocketChannel channel;
-	private SelectorSelection selection;
-	private ByteBuffer inputBuffer;
-	private ByteBuffer currentBuffer;
-	private int nextFrameSize;
-
+public class NIOTransport extends TcpTransport 
+{
+	SocketChannel sc = null;
+	
 	public NIOTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException {
 		super(wireFormat, socketFactory, remoteLocation, localLocation);
 	}
@@ -60,97 +57,11 @@
 	}
 
 	protected void initializeStreams() throws IOException {
-		channel = socket.getChannel();		
-		channel.configureBlocking(false);
+		sc = socket.getChannel(); 
 		
-		// listen for events telling us when the socket is readable.
-		selection = SelectorManager.getInstance().register(channel,
-				new SelectorManager.Listener() {
-					public void onSelect(SelectorSelection selection) {
-						serviceRead();
-					}
-					public void onError(SelectorSelection selection, Throwable error) {
-						if( error instanceof IOException ) {
-							onException((IOException) error);							
-						} else {
-							onException(IOExceptionSupport.create(error));							
-						}
-					}
-				});
+		sc.configureBlocking(false);		
 		
-		// Send the data via the channel
-//        inputBuffer = ByteBuffer.allocateDirect(8*1024);
-        inputBuffer = ByteBuffer.allocate(8*1024);
-        currentBuffer = inputBuffer;
-        nextFrameSize=-1;
-        currentBuffer.limit(4);
-        this.dataOut = new DataOutputStream(new NIOOutputStream(channel, 16*1024));
-        
+		this.dataIn  = new DataInputStream(new NIOBufferedInputStream(sc));
+		this.dataOut = new DataOutputStream(new NIOBufferedOutputStream(sc));
 	}
-	
-    private void serviceRead() {
-        try {
-            while( true ) {
-            	
-	
-	            int readSize = channel.read(currentBuffer);
-	            if( readSize == -1 ) {
-					onException(new EOFException());
-	                selection.close();
-	                break;
-	            }
-	            if( readSize==0 ) {
-	                break;
-	            }
-	            
-            	if( currentBuffer.hasRemaining() )
-            		continue;
-
-	            // Are we trying to figure out the size of the next frame?
-	            if( nextFrameSize==-1 ) {
-	            	assert inputBuffer == currentBuffer;
-
-	            	// If the frame is too big to fit in our direct byte buffer,
-	            	// Then allocate a non direct byte buffer of the right size for it.
-	            	inputBuffer.flip();
-	            	nextFrameSize = inputBuffer.getInt()+4;
-	            	if( nextFrameSize > inputBuffer.capacity() ) {
-	            		currentBuffer = ByteBuffer.allocate(nextFrameSize);
-	            		currentBuffer.putInt(nextFrameSize);
-	            	} else {
-	            		inputBuffer.limit(nextFrameSize);	            		
-	            	}
-	            	
-            	} else {
-            		currentBuffer.flip();
-    				
-            		Object command = wireFormat.unmarshal(new DataInputStream(new NIOInputStream(currentBuffer)));
-            		doConsume((Command) command);
-            		
-            		nextFrameSize=-1;
-    				inputBuffer.clear();
-    				inputBuffer.limit(4);
-    				currentBuffer = inputBuffer;
-            	}
-	            
-            }
-            
-        } catch (IOException e) {
-            onException(e);
-        } catch (Throwable e) {
-        	onException(IOExceptionSupport.create(e));
-        }
-    }
-
-
-	protected void doStart() throws Exception {
-        connect();
-        selection.setInterestOps(SelectionKey.OP_READ);
-        selection.enable();
-    }
-
-	protected void doStop(ServiceStopper stopper) throws Exception {
-		selection.disable();
-		super.doStop(stopper);		
-	}
 }
Index: /home/mindeh/active-mq-svn-workspace/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOBufferedInputStream.java
===================================================================
--- /home/mindeh/active-mq-svn-workspace/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOBufferedInputStream.java	(revision 0)
+++ /home/mindeh/active-mq-svn-workspace/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOBufferedInputStream.java	(revision 0)
@@ -0,0 +1,198 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.nio;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channel;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+
+/**
+ * @author Mindaugas Janušaitis <[EMAIL PROTECTED]>
+ * 
+ * Implementation of InputStream using Java NIO channel,direct buffer and Selector 
+ */
+public class NIOBufferedInputStream extends InputStream {
+
+	private final static int BUFFER_SIZE = 8192;
+
+	private SocketChannel sc = null;
+
+	private ByteBuffer bb = null;
+
+	private Selector rs = null;
+
+	/**
+	 * 
+	 */
+	public NIOBufferedInputStream(ReadableByteChannel channel, int size)
+			throws ClosedChannelException, IOException {
+
+		if (size <= 0) {
+			throw new IllegalArgumentException("Buffer size <= 0");
+		}
+
+		this.bb = ByteBuffer.allocateDirect(size);
+		this.sc = (SocketChannel) channel;
+
+		this.sc.configureBlocking(false);
+
+		this.rs = Selector.open();
+
+		sc.register(rs, SelectionKey.OP_READ);
+
+		bb.position(0);
+		bb.limit(0);
+	}
+
+	/**
+	 * 
+	 */
+	public NIOBufferedInputStream(ReadableByteChannel channel)
+			throws ClosedChannelException, IOException {
+		this(channel, BUFFER_SIZE);
+	}
+
+	public int available() throws IOException {
+		if (!rs.isOpen())
+			throw new IOException("Input Stream Closed");
+
+		return bb.remaining();
+	}
+
+	public void close() throws IOException {
+		if (rs.isOpen()) {
+			rs.close();
+
+			if (sc.isOpen())
+			{
+				sc.socket().shutdownInput();
+				sc.socket().close();
+			}
+
+			bb = null;
+			sc = null;
+		}
+	}
+
+	public int read() throws IOException {
+		if (!rs.isOpen())
+			throw new IOException("Input Stream Closed");
+
+		if (!bb.hasRemaining()) {
+			try {
+				fill(1);
+			} catch (ClosedChannelException e) {
+				close();
+				return -1;
+			}
+		}
+
+		return (bb.get() & 0xFF);
+	}
+
+	public int read(byte[] b, int off, int len) throws IOException {
+		int bytesCopied = -1;
+		
+		if (!rs.isOpen())
+			throw new IOException("Input Stream Closed");
+
+		while (bytesCopied == -1)
+		{
+			if (bb.hasRemaining())
+			{
+				bytesCopied = (len < bb.remaining() ? len : bb.remaining());
+				bb.get(b, off, bytesCopied);
+			} else {
+				try {
+					fill(1);
+				} catch (ClosedChannelException e) {
+					close();
+					return -1;
+				}
+		    }
+		}
+		
+		return bytesCopied;
+	}
+
+	public long skip(long n) throws IOException {
+		long skiped = 0;
+		
+		
+		if (!rs.isOpen())
+			throw new IOException("Input Stream Closed");
+
+		while (n > 0)
+		{
+			if (n <= bb.remaining())
+			{
+				skiped += n;
+				bb.position(bb.position()+(int)n);
+				n = 0;
+			} else {
+				skiped += bb.remaining();
+				n      -= bb.remaining();
+					
+				bb.position(bb.limit());
+				
+				try {
+					fill((int)n);
+				} catch (ClosedChannelException e) {
+					close();
+					return skiped;
+				}
+			}
+		}
+			
+		return skiped;
+	}
+
+	private void fill(int n) throws IOException, ClosedChannelException {
+		int bytesRead = -1;
+
+		if ((n <= 0) || (n <= bb.remaining()))
+			return;
+
+		bb.compact();
+
+		n = (bb.remaining() < n ? bb.remaining() : n);
+
+		for (;;) {
+			bytesRead = sc.read(bb);
+
+			if (bytesRead == -1)
+				throw new ClosedChannelException();
+
+			n -= bytesRead;
+
+			if (n <= 0)
+				break;
+
+			rs.select(0);
+			rs.selectedKeys().clear();
+		}
+
+		bb.flip();
+	}
+}
Index: /home/mindeh/active-mq-svn-workspace/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOBufferedOutputStream.java
===================================================================
--- /home/mindeh/active-mq-svn-workspace/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOBufferedOutputStream.java	(revision 0)
+++ /home/mindeh/active-mq-svn-workspace/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOBufferedOutputStream.java	(revision 0)
@@ -0,0 +1,137 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.nio;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.nio.channels.WritableByteChannel;
+
+public class NIOBufferedOutputStream extends OutputStream {
+
+	private final static int BUFFER_SIZE = 8192;
+
+	private SocketChannel sc = null;
+
+	private ByteBuffer bb = null;
+
+	private Selector ws = null;
+
+	/**
+	 * @author Mindaugas Janušaitis <[EMAIL PROTECTED]>
+	 * 
+	 * Implementation of OuputStream using Java NIO channel,direct buffer and Selector
+	 */
+
+	public NIOBufferedOutputStream(WritableByteChannel channel, int size)
+			throws ClosedChannelException, IOException {
+
+		if (size <= 0) {
+			throw new IllegalArgumentException("Buffer size <= 0");
+		}
+
+		this.bb = ByteBuffer.allocateDirect(size);
+		this.sc = (SocketChannel) channel;
+
+		this.sc.configureBlocking(false);
+
+		this.ws = Selector.open();
+
+		sc.register(ws, SelectionKey.OP_WRITE);
+
+		bb.position(0);
+		bb.limit(bb.capacity());
+	}
+
+	/**
+	 * 
+	 */
+	public NIOBufferedOutputStream(WritableByteChannel channel)
+			throws ClosedChannelException, IOException {
+		this(channel, BUFFER_SIZE);
+	}
+
+	public void close() throws IOException {
+		if (ws.isOpen()) {
+			ws.close();
+
+			if (sc.isOpen()) {
+				sc.socket().shutdownOutput();
+				sc.socket().close();
+			}
+
+			bb = null;
+			sc = null;
+		}
+	}
+
+	public void flush() throws IOException {
+
+		bb.flip();
+
+		if (!bb.hasRemaining()) {
+			bb.position(0);
+			bb.limit(bb.capacity());
+
+			return;
+		}
+
+		for (;;) {
+			sc.write(bb);
+
+			if (!bb.hasRemaining()) {
+				bb.position(0);
+				bb.limit(bb.capacity());
+
+				return;
+			}
+
+			ws.select(0);
+			ws.selectedKeys().clear();
+		}
+	}
+
+	public void write(int b) throws IOException {
+		if (!bb.hasRemaining())
+			flush();
+
+		bb.put((byte) b);
+	}
+
+	public void write(byte[] b, int off, int len) throws IOException {
+
+		int bytesWritten = 0;
+
+		while (len > 0) {
+			if (!bb.hasRemaining())
+				flush();
+
+			bytesWritten = (len < bb.remaining() ? len : bb.remaining());
+
+			bb.put(b, off, bytesWritten);
+
+			off += bytesWritten;
+			len -= bytesWritten;
+		}
+	}
+}

Reply via email to