Author: dejanb
Date: Fri Sep 11 08:47:11 2009
New Revision: 813722
URL: http://svn.apache.org/viewvc?rev=813722&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQ-2386 - stomp+nio using selectors
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java?rev=813722&r1=813721&r2=813722&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java
Fri Sep 11 08:47:11 2009
@@ -22,14 +22,21 @@
import java.net.Socket;
import java.net.URI;
import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import javax.net.SocketFactory;
+import org.apache.activemq.command.Command;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.nio.NIOBufferedInputStream;
import org.apache.activemq.transport.nio.NIOOutputStream;
+import org.apache.activemq.transport.nio.SelectorManager;
+import org.apache.activemq.transport.nio.SelectorSelection;
import org.apache.activemq.transport.tcp.TcpTransport;
+import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.wireformat.WireFormat;
/**
@@ -40,6 +47,7 @@
public class StompNIOTransport extends TcpTransport {
private SocketChannel channel;
+ private SelectorSelection selection;
public StompNIOTransport(WireFormat wireFormat, SocketFactory
socketFactory, URI remoteLocation, URI localLocation) throws
UnknownHostException, IOException {
super(wireFormat, socketFactory, remoteLocation, localLocation);
@@ -53,8 +61,47 @@
channel = socket.getChannel();
channel.configureBlocking(false);
- this.dataOut = new DataOutputStream(new NIOOutputStream(channel, 16 *
1024));
- this.dataIn = new DataInputStream(new NIOBufferedInputStream(channel,
8 * 1024));
+ // listen for events telling us when the socket is readable.
+ selection = SelectorManager.getInstance().register(channel, new
SelectorManager.Listener() {
+ public void onSelect(SelectorSelection selection) {
+ serviceRead();
+ }
+
+ public void onError(SelectorSelection selection, Throwable error) {
+ if (error instanceof IOException) {
+ onException((IOException)error);
+ } else {
+ onException(IOExceptionSupport.create(error));
+ }
+ }
+ });
+
+ this.dataOut = new DataOutputStream(new NIOOutputStream(channel, 8 *
1024));
+ }
+
+ private void serviceRead() {
+ try {
+ DataInputStream in = new DataInputStream(new
NIOBufferedInputStream(channel, 8 * 1024));
+ while (true) {
+ Object command = wireFormat.unmarshal(in);
+ doConsume((Command)command);
+ }
+
+ } catch (IOException e) {
+ onException(e);
+ } catch (Throwable e) {
+ onException(IOExceptionSupport.create(e));
+ }
}
+ protected void doStart() throws Exception {
+ connect();
+ selection.setInterestOps(SelectionKey.OP_READ);
+ selection.enable();
+ }
+
+ protected void doStop(ServiceStopper stopper) throws Exception {
+ selection.disable();
+ super.doStop(stopper);
+ }
}