Hello developers, this is my first post :)
So here goes the story... I have written a partial implementation of a Stomp broker in C, and there was a need to interconnect it with apache-mq; the performance was the bottleneck. When I took a quick look at the stable source, there was no sign of Java NIO being used, but in SVN I found a stub or initial implementation. For a few minutes I was happy, once I had replaced 'StompTransportFactory extends TcpTransportFactory' with 'StompTransportFactory extends NIOTransportFactory', and then I was rewarded with an Exception :) Further investigation showed that this NIO implementation makes assumptions about its user, and it looks like it assumes that the OpenWire protocol uses it. By the way, I found that there is no Transport that uses the NIOTransportFactory at the moment. My implementation should be a 'drop-in replacement' for TcpTransportFactory. I took a universal approach and implemented NIOBufferedInputStream and NIOBufferedOutputStream. Despite the fact that we still have a thread per client (with NIO we should use selectors and fewer threads, but that would require a full apache-mq rewrite :) ), the performance improvement is about 80%. Basically it fulfils my requirements. It would be nice to have this patch included. Any comments and questions are welcome... The patch is attached.
Index: /home/mindeh/active-mq-svn-workspace/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java =================================================================== --- /home/mindeh/active-mq-svn-workspace/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java (revision 526549) +++ /home/mindeh/active-mq-svn-workspace/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java (working copy) @@ -22,6 +22,9 @@ import java.io.DataOutput; import java.io.DataOutputStream; import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.nio.BufferOverflowException; +import java.nio.ByteBuffer; import java.util.HashMap; import java.util.Iterator; import java.util.Map; @@ -32,35 +35,40 @@ import org.apache.activemq.wireformat.WireFormat; /** - * Implements marshalling and unmarsalling the <a href="http://stomp.codehaus.org/">Stomp</a> protocol. + * Implements marshalling and unmarsalling the <a + * href="http://stomp.codehaus.org/">Stomp</a> protocol. 
*/ public class StompWireFormat implements WireFormat { - private static final byte[] NO_DATA = new byte[]{}; - private static final byte[] END_OF_FRAME = new byte[]{0,'\n'}; - + private static final byte[] NO_DATA = new byte[] {}; + + private static final byte[] END_OF_FRAME = new byte[] { 0, '\n' }; + private static final int MAX_COMMAND_LENGTH = 1024; - private static final int MAX_HEADER_LENGTH = 1024*10; + + private static final int MAX_HEADER_LENGTH = 1024 * 10; + private static final int MAX_HEADERS = 1000; - private static final int MAX_DATA_LENGTH = 1024*1024*100; - - private int version=1; + private static final int MAX_DATA_LENGTH = 1024 * 1024 * 100; + + private int version = 1; + public ByteSequence marshal(Object command) throws IOException { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - DataOutputStream dos = new DataOutputStream(baos); - marshal(command, dos); - dos.close(); - return baos.toByteSequence(); - } + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(baos); + marshal(command, dos); + dos.close(); + return baos.toByteSequence(); + } - public Object unmarshal(ByteSequence packet) throws IOException { - ByteArrayInputStream stream = new ByteArrayInputStream(packet); - DataInputStream dis = new DataInputStream(stream); - return unmarshal(dis); - } + public Object unmarshal(ByteSequence packet) throws IOException { + ByteArrayInputStream stream = new ByteArrayInputStream(packet); + DataInputStream dis = new DataInputStream(stream); + return unmarshal(dis); + } - public void marshal(Object command, DataOutput os) throws IOException { + public void marshal(Object command, DataOutput os) throws IOException { StompFrame stomp = (org.apache.activemq.transport.stomp.StompFrame) command; StringBuffer buffer = new StringBuffer(); @@ -68,7 +76,8 @@ buffer.append(Stomp.NEWLINE); // Output the headers. 
- for (Iterator iter = stomp.getHeaders().entrySet().iterator(); iter.hasNext();) { + for (Iterator iter = stomp.getHeaders().entrySet().iterator(); iter + .hasNext();) { Map.Entry entry = (Map.Entry) iter.next(); buffer.append(entry.getKey()); buffer.append(Stomp.Headers.SEPERATOR); @@ -83,122 +92,156 @@ os.write(stomp.getContent()); os.write(END_OF_FRAME); } - - public Object unmarshal(DataInput in) throws IOException { - - try { - String action = null; - - // skip white space to next real action line - while (true) { - action = readLine(in, MAX_COMMAND_LENGTH, "The maximum command length was exceeded"); - if (action == null) { - throw new IOException("connection was closed"); - } else { - action = action.trim(); - if (action.length() > 0) { - break; + public Object unmarshal(DataInput in) throws IOException { + + ByteArrayOutputStream baos = new ByteArrayOutputStream(1024); + String action = null; + HashMap headers = new HashMap(25); + byte[] data = NO_DATA; + int contentLength = -1; + String key = null; + String value = null; + byte b = 0; + int i = 0; + int headersCount = 0; + + try { + /* + * Reading Command + */ + + baos.reset(); + + for (i = 0; ((((b = in.readByte()) != '\n') && (i < MAX_COMMAND_LENGTH)) || (baos.size() == 0) ); ++i) + { + if (b == '\n' && baos.size() == 0) + { + continue; + } else + baos.write(b); + } + + if (i == MAX_COMMAND_LENGTH) + throw new ProtocolException( + "The maximum number of command length was exeeded", + true); + + action = new String(baos.toByteArray(), "UTF-8"); + + /* + * Reading Headers + */ + + for (headersCount = 0; headersCount < MAX_HEADERS; ++headersCount) { + /* + * Reading Key + */ + + baos.reset(); + + for (i = 0; (((b = in.readByte()) != '\n') && (b != ':')) + && (i < MAX_HEADER_LENGTH); ++i) + baos.write(b); + + if (i == MAX_HEADER_LENGTH) + throw new ProtocolException( + "The maximum header length was exceeded", true); + + if (b == ':') { + key = new String(baos.toByteArray(), "UTF-8"); + } else if (b == '\n') 
{ + break; + } + + /* + * Reading Value + */ + + baos.reset(); + + for (i = 0; ((b = in.readByte()) != '\n') + && (i < MAX_HEADER_LENGTH); ++i) + baos.write(b); + + if (i == MAX_HEADER_LENGTH) + throw new ProtocolException( + "The maximum header length was exceeded", true); + + value = new String(baos.toByteArray(), "UTF-8"); + + if (key.equals("content-length")) { + + try { + contentLength = Integer.parseInt(value); + } catch (NumberFormatException e) { + throw new ProtocolException( + "Unable to parse content-length", true); } + + if (contentLength < 0) { + throw new ProtocolException("Negative content-length", + true); + } + + if (contentLength > MAX_DATA_LENGTH) { + throw new ProtocolException( + "content-length is more than maximum data length", + true); + } } + + headers.put(key, value); } - - // Parse the headers - HashMap headers = new HashMap(25); - while (true) { - String line = readLine(in, MAX_HEADER_LENGTH, "The maximum header length was exceeded"); - if (line != null && line.trim().length() > 0) { - - if( headers.size() > MAX_HEADERS ) - throw new ProtocolException("The maximum number of headers was exceeded", true); - - try { - int seperator_index = line.indexOf(Stomp.Headers.SEPERATOR); - String name = line.substring(0, seperator_index).trim(); - String value = line.substring(seperator_index + 1, line.length()).trim(); - headers.put(name, value); - } - catch (Exception e) { - throw new ProtocolException("Unable to parser header line [" + line + "]", true); - } - } - else { - break; - } - } - - // Read in the data part. - byte[] data = NO_DATA; - String contentLength = (String)headers.get(Stomp.Headers.CONTENT_LENGTH); - if (contentLength!=null) { - - // Bless the client, he's telling us how much data to read in. 
- int length; - try { - length = Integer.parseInt(contentLength.trim()); - } catch (NumberFormatException e) { - throw new ProtocolException("Specified content-length is not a valid integer", true); - } - if( length > MAX_DATA_LENGTH ) - throw new ProtocolException("The maximum data length was exceeded", true); - - data = new byte[length]; - in.readFully(data); - - if (in.readByte() != 0) { - throw new ProtocolException(Stomp.Headers.CONTENT_LENGTH+" bytes were read and " + "there was no trailing null byte", true); - } - + if (headersCount >= MAX_HEADERS) + throw new ProtocolException( + "The maximum number of headers was exceeded", true); + + /* + * Reading Body + */ + + if (contentLength > 0) { + + baos.reset(); + + while ((contentLength--) > 0) + baos.write(in.readByte()); + + if (in.readByte() == 0) { + data = baos.toByteArray(); + } else + throw new ProtocolException( + "Body was not null terminated as expected", true); + } else { - // We don't know how much to read.. data ends when we hit a 0 - byte b; - ByteArrayOutputStream baos=null; - while ((b = in.readByte()) != 0) { - - if( baos == null ) { - baos = new ByteArrayOutputStream(); - } else if( baos.size() > MAX_DATA_LENGTH ) { - throw new ProtocolException("The maximum data length was exceeded", true); - } - - baos.write(b); - } - - if( baos!=null ) { - baos.close(); - data = baos.toByteArray(); - } - + baos.reset(); + + for (i = 0; ((b = in.readByte()) != 0) && (i < MAX_DATA_LENGTH); ++i) + baos.write(b); + + if (i == MAX_DATA_LENGTH) + throw new ProtocolException( + "content exeeded maximum data length", true); + + data = baos.toByteArray(); } - + return new StompFrame(action, headers, data); - + } catch (ProtocolException e) { return new StompFrameError(e); - } + } - } - - private String readLine(DataInput in, int maxLength, String errorMessage) throws IOException { - byte b; - ByteArrayOutputStream baos=new ByteArrayOutputStream(maxLength); - while ((b = in.readByte()) != '\n') { - if( baos.size() > 
maxLength ) - throw new ProtocolException(errorMessage, true); - baos.write(b); - } - ByteSequence sequence = baos.toByteSequence(); - return new String(sequence.getData(),sequence.getOffset(),sequence.getLength(),"UTF-8"); } public int getVersion() { - return version; - } + return version; + } - public void setVersion(int version) { - this.version = version; - } - + public void setVersion(int version) { + this.version = version; + } } Index: /home/mindeh/active-mq-svn-workspace/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFactory.java =================================================================== --- /home/mindeh/active-mq-svn-workspace/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFactory.java (revision 526549) +++ /home/mindeh/active-mq-svn-workspace/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFactory.java (working copy) @@ -20,7 +20,8 @@ import java.util.Map; import org.apache.activemq.transport.Transport; -import org.apache.activemq.transport.tcp.TcpTransportFactory; +//import org.apache.activemq.transport.tcp.TcpTransportFactory; +import org.apache.activemq.transport.nio.NIOTransportFactory; import org.apache.activemq.util.IntrospectionSupport; import org.apache.activemq.wireformat.WireFormat; @@ -29,7 +30,7 @@ * * @version $Revision: 1.1.1.1 $ */ -public class StompTransportFactory extends TcpTransportFactory { +public class StompTransportFactory extends NIOTransportFactory /*TcpTransportFactory*/ { protected String getDefaultWireFormatType() { return "stomp"; @@ -38,6 +39,7 @@ public Transport compositeConfigure(Transport transport, WireFormat format, Map options) { transport = new StompTransportFilter(transport, new LegacyFrameTranslator()); IntrospectionSupport.setProperties(transport, options); + return super.compositeConfigure(transport, format, options); } Index: 
/home/mindeh/active-mq-svn-workspace/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java =================================================================== --- /home/mindeh/active-mq-svn-workspace/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java (revision 526549) +++ /home/mindeh/active-mq-svn-workspace/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java (working copy) @@ -33,6 +33,8 @@ import org.apache.activemq.wireformat.WireFormat; import org.apache.activemq.command.Command; import org.apache.activemq.transport.Transport; +import org.apache.activemq.transport.tcp.TcpBufferedInputStream; +import org.apache.activemq.transport.tcp.TcpBufferedOutputStream; import org.apache.activemq.transport.tcp.TcpTransport; import org.apache.activemq.util.IOExceptionSupport; import org.apache.activemq.util.ServiceStopper; @@ -42,15 +44,10 @@ * * @version $Revision$ */ -public class NIOTransport extends TcpTransport { - - //private static final Log log = LogFactory.getLog(NIOTransport.class); - private SocketChannel channel; - private SelectorSelection selection; - private ByteBuffer inputBuffer; - private ByteBuffer currentBuffer; - private int nextFrameSize; - +public class NIOTransport extends TcpTransport +{ + SocketChannel sc = null; + public NIOTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException { super(wireFormat, socketFactory, remoteLocation, localLocation); } @@ -60,97 +57,11 @@ } protected void initializeStreams() throws IOException { - channel = socket.getChannel(); - channel.configureBlocking(false); + sc = socket.getChannel(); - // listen for events telling us when the socket is readable. 
- selection = SelectorManager.getInstance().register(channel, - new SelectorManager.Listener() { - public void onSelect(SelectorSelection selection) { - serviceRead(); - } - public void onError(SelectorSelection selection, Throwable error) { - if( error instanceof IOException ) { - onException((IOException) error); - } else { - onException(IOExceptionSupport.create(error)); - } - } - }); + sc.configureBlocking(false); - // Send the data via the channel -// inputBuffer = ByteBuffer.allocateDirect(8*1024); - inputBuffer = ByteBuffer.allocate(8*1024); - currentBuffer = inputBuffer; - nextFrameSize=-1; - currentBuffer.limit(4); - this.dataOut = new DataOutputStream(new NIOOutputStream(channel, 16*1024)); - + this.dataIn = new DataInputStream(new NIOBufferedInputStream(sc)); + this.dataOut = new DataOutputStream(new NIOBufferedOutputStream(sc)); } - - private void serviceRead() { - try { - while( true ) { - - - int readSize = channel.read(currentBuffer); - if( readSize == -1 ) { - onException(new EOFException()); - selection.close(); - break; - } - if( readSize==0 ) { - break; - } - - if( currentBuffer.hasRemaining() ) - continue; - - // Are we trying to figure out the size of the next frame? - if( nextFrameSize==-1 ) { - assert inputBuffer == currentBuffer; - - // If the frame is too big to fit in our direct byte buffer, - // Then allocate a non direct byte buffer of the right size for it. 
- inputBuffer.flip(); - nextFrameSize = inputBuffer.getInt()+4; - if( nextFrameSize > inputBuffer.capacity() ) { - currentBuffer = ByteBuffer.allocate(nextFrameSize); - currentBuffer.putInt(nextFrameSize); - } else { - inputBuffer.limit(nextFrameSize); - } - - } else { - currentBuffer.flip(); - - Object command = wireFormat.unmarshal(new DataInputStream(new NIOInputStream(currentBuffer))); - doConsume((Command) command); - - nextFrameSize=-1; - inputBuffer.clear(); - inputBuffer.limit(4); - currentBuffer = inputBuffer; - } - - } - - } catch (IOException e) { - onException(e); - } catch (Throwable e) { - onException(IOExceptionSupport.create(e)); - } - } - - - protected void doStart() throws Exception { - connect(); - selection.setInterestOps(SelectionKey.OP_READ); - selection.enable(); - } - - protected void doStop(ServiceStopper stopper) throws Exception { - selection.disable(); - super.doStop(stopper); - } } Index: /home/mindeh/active-mq-svn-workspace/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOBufferedInputStream.java =================================================================== --- /home/mindeh/active-mq-svn-workspace/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOBufferedInputStream.java (revision 0) +++ /home/mindeh/active-mq-svn-workspace/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOBufferedInputStream.java (revision 0) @@ -0,0 +1,198 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.nio; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.nio.channels.Channel; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.ReadableByteChannel; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.SocketChannel; + +/** + * @author Mindaugas Janušaitis <[EMAIL PROTECTED]> + * + * Implementation of InputStream using Java NIO channel,direct buffer and Selector + */ +public class NIOBufferedInputStream extends InputStream { + + private final static int BUFFER_SIZE = 8192; + + private SocketChannel sc = null; + + private ByteBuffer bb = null; + + private Selector rs = null; + + /** + * + */ + public NIOBufferedInputStream(ReadableByteChannel channel, int size) + throws ClosedChannelException, IOException { + + if (size <= 0) { + throw new IllegalArgumentException("Buffer size <= 0"); + } + + this.bb = ByteBuffer.allocateDirect(size); + this.sc = (SocketChannel) channel; + + this.sc.configureBlocking(false); + + this.rs = Selector.open(); + + sc.register(rs, SelectionKey.OP_READ); + + bb.position(0); + bb.limit(0); + } + + /** + * + */ + public NIOBufferedInputStream(ReadableByteChannel channel) + throws ClosedChannelException, IOException { + this(channel, BUFFER_SIZE); + } + + public int available() throws IOException { + if (!rs.isOpen()) + throw new IOException("Input Stream Closed"); + + return bb.remaining(); + } + + public void close() throws IOException { + if (rs.isOpen()) { + 
rs.close(); + + if (sc.isOpen()) + { + sc.socket().shutdownInput(); + sc.socket().close(); + } + + bb = null; + sc = null; + } + } + + public int read() throws IOException { + if (!rs.isOpen()) + throw new IOException("Input Stream Closed"); + + if (!bb.hasRemaining()) { + try { + fill(1); + } catch (ClosedChannelException e) { + close(); + return -1; + } + } + + return (bb.get() & 0xFF); + } + + public int read(byte[] b, int off, int len) throws IOException { + int bytesCopied = -1; + + if (!rs.isOpen()) + throw new IOException("Input Stream Closed"); + + while (bytesCopied == -1) + { + if (bb.hasRemaining()) + { + bytesCopied = (len < bb.remaining() ? len : bb.remaining()); + bb.get(b, off, bytesCopied); + } else { + try { + fill(1); + } catch (ClosedChannelException e) { + close(); + return -1; + } + } + } + + return bytesCopied; + } + + public long skip(long n) throws IOException { + long skiped = 0; + + + if (!rs.isOpen()) + throw new IOException("Input Stream Closed"); + + while (n > 0) + { + if (n <= bb.remaining()) + { + skiped += n; + bb.position(bb.position()+(int)n); + n = 0; + } else { + skiped += bb.remaining(); + n -= bb.remaining(); + + bb.position(bb.limit()); + + try { + fill((int)n); + } catch (ClosedChannelException e) { + close(); + return skiped; + } + } + } + + return skiped; + } + + private void fill(int n) throws IOException, ClosedChannelException { + int bytesRead = -1; + + if ((n <= 0) || (n <= bb.remaining())) + return; + + bb.compact(); + + n = (bb.remaining() < n ? 
bb.remaining() : n); + + for (;;) { + bytesRead = sc.read(bb); + + if (bytesRead == -1) + throw new ClosedChannelException(); + + n -= bytesRead; + + if (n <= 0) + break; + + rs.select(0); + rs.selectedKeys().clear(); + } + + bb.flip(); + } +} Index: /home/mindeh/active-mq-svn-workspace/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOBufferedOutputStream.java =================================================================== --- /home/mindeh/active-mq-svn-workspace/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOBufferedOutputStream.java (revision 0) +++ /home/mindeh/active-mq-svn-workspace/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOBufferedOutputStream.java (revision 0) @@ -0,0 +1,137 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.transport.nio; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.ReadableByteChannel; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.SocketChannel; +import java.nio.channels.WritableByteChannel; + +public class NIOBufferedOutputStream extends OutputStream { + + private final static int BUFFER_SIZE = 8192; + + private SocketChannel sc = null; + + private ByteBuffer bb = null; + + private Selector ws = null; + + /** + * @author Mindaugas Janušaitis <[EMAIL PROTECTED]> + * + * Implementation of OuputStream using Java NIO channel,direct buffer and Selector + */ + + public NIOBufferedOutputStream(WritableByteChannel channel, int size) + throws ClosedChannelException, IOException { + + if (size <= 0) { + throw new IllegalArgumentException("Buffer size <= 0"); + } + + this.bb = ByteBuffer.allocateDirect(size); + this.sc = (SocketChannel) channel; + + this.sc.configureBlocking(false); + + this.ws = Selector.open(); + + sc.register(ws, SelectionKey.OP_WRITE); + + bb.position(0); + bb.limit(bb.capacity()); + } + + /** + * + */ + public NIOBufferedOutputStream(WritableByteChannel channel) + throws ClosedChannelException, IOException { + this(channel, BUFFER_SIZE); + } + + public void close() throws IOException { + if (ws.isOpen()) { + ws.close(); + + if (sc.isOpen()) { + sc.socket().shutdownOutput(); + sc.socket().close(); + } + + bb = null; + sc = null; + } + } + + public void flush() throws IOException { + + bb.flip(); + + if (!bb.hasRemaining()) { + bb.position(0); + bb.limit(bb.capacity()); + + return; + } + + for (;;) { + sc.write(bb); + + if (!bb.hasRemaining()) { + bb.position(0); + bb.limit(bb.capacity()); + + return; + } + + ws.select(0); + ws.selectedKeys().clear(); + } + } + + public void write(int b) throws IOException { + if (!bb.hasRemaining()) + flush(); + + 
bb.put((byte) b); + } + + public void write(byte[] b, int off, int len) throws IOException { + + int bytesWritten = 0; + + while (len > 0) { + if (!bb.hasRemaining()) + flush(); + + bytesWritten = (len < bb.remaining() ? len : bb.remaining()); + + bb.put(b, off, bytesWritten); + + off += bytesWritten; + len -= bytesWritten; + } + } +}
