Author: dejanb
Date: Fri Sep 11 07:57:05 2009
New Revision: 813703
URL: http://svn.apache.org/viewvc?rev=813703&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQ-2239 - stomp+nio transport
implementation
Added:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOBufferedInputStream.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransportFactory.java
activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/stomp+nio
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompNIOTest.java
activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/transport/stomp/niostomp-auth-broker.xml
Added:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOBufferedInputStream.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOBufferedInputStream.java?rev=813703&view=auto
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOBufferedInputStream.java
(added)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOBufferedInputStream.java
Fri Sep 11 07:57:05 2009
@@ -0,0 +1,185 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.nio;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channel;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+
+/**
+ * Implementation of InputStream using Java NIO channel,direct buffer and
+ * Selector
+ */
+public class NIOBufferedInputStream extends InputStream {
+
+ private final static int BUFFER_SIZE = 8192;
+
+ private SocketChannel sc = null;
+
+ private ByteBuffer bb = null;
+
+ private Selector rs = null;
+
+ public NIOBufferedInputStream(ReadableByteChannel channel, int size)
+ throws ClosedChannelException, IOException {
+
+ if (size <= 0) {
+ throw new IllegalArgumentException("Buffer size <= 0");
+ }
+
+ this.bb = ByteBuffer.allocateDirect(size);
+ this.sc = (SocketChannel) channel;
+
+ this.sc.configureBlocking(false);
+
+ this.rs = Selector.open();
+
+ sc.register(rs, SelectionKey.OP_READ);
+
+ bb.position(0);
+ bb.limit(0);
+ }
+
+ public NIOBufferedInputStream(ReadableByteChannel channel)
+ throws ClosedChannelException, IOException {
+ this(channel, BUFFER_SIZE);
+ }
+
+ public int available() throws IOException {
+ if (!rs.isOpen())
+ throw new IOException("Input Stream Closed");
+
+ return bb.remaining();
+ }
+
+ public void close() throws IOException {
+ if (rs.isOpen()) {
+ rs.close();
+
+ if (sc.isOpen()) {
+ sc.socket().shutdownInput();
+ sc.socket().close();
+ }
+
+ bb = null;
+ sc = null;
+ }
+ }
+
+ public int read() throws IOException {
+ if (!rs.isOpen())
+ throw new IOException("Input Stream Closed");
+
+ if (!bb.hasRemaining()) {
+ try {
+ fill(1);
+ } catch (ClosedChannelException e) {
+ close();
+ return -1;
+ }
+ }
+
+ return (bb.get() & 0xFF);
+ }
+
+ public int read(byte[] b, int off, int len) throws IOException {
+ int bytesCopied = -1;
+
+ if (!rs.isOpen())
+ throw new IOException("Input Stream Closed");
+
+ while (bytesCopied == -1) {
+ if (bb.hasRemaining()) {
+ bytesCopied = (len < bb.remaining() ? len : bb.remaining());
+ bb.get(b, off, bytesCopied);
+ } else {
+ try {
+ fill(1);
+ } catch (ClosedChannelException e) {
+ close();
+ return -1;
+ }
+ }
+ }
+
+ return bytesCopied;
+ }
+
+ public long skip(long n) throws IOException {
+ long skiped = 0;
+
+ if (!rs.isOpen())
+ throw new IOException("Input Stream Closed");
+
+ while (n > 0) {
+ if (n <= bb.remaining()) {
+ skiped += n;
+ bb.position(bb.position() + (int) n);
+ n = 0;
+ } else {
+ skiped += bb.remaining();
+ n -= bb.remaining();
+
+ bb.position(bb.limit());
+
+ try {
+ fill((int) n);
+ } catch (ClosedChannelException e) {
+ close();
+ return skiped;
+ }
+ }
+ }
+
+ return skiped;
+ }
+
+ private void fill(int n) throws IOException, ClosedChannelException {
+ int bytesRead = -1;
+
+ if ((n <= 0) || (n <= bb.remaining()))
+ return;
+
+ bb.compact();
+
+ n = (bb.remaining() < n ? bb.remaining() : n);
+
+ for (;;) {
+ bytesRead = sc.read(bb);
+
+ if (bytesRead == -1)
+ throw new ClosedChannelException();
+
+ n -= bytesRead;
+
+ if (n <= 0)
+ break;
+
+ rs.select(0);
+ rs.selectedKeys().clear();
+ }
+
+ bb.flip();
+ }
+}
\ No newline at end of file
Added:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java?rev=813703&view=auto
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java
(added)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java
Fri Sep 11 07:57:05 2009
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.stomp;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.Socket;
+import java.net.URI;
+import java.net.UnknownHostException;
+import java.nio.channels.SocketChannel;
+
+import javax.net.SocketFactory;
+
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.nio.NIOBufferedInputStream;
+import org.apache.activemq.transport.nio.NIOOutputStream;
+import org.apache.activemq.transport.tcp.TcpTransport;
+import org.apache.activemq.wireformat.WireFormat;
+
+/**
+ * An implementation of the {...@link Transport} interface for using Stomp
over NIO
+ *
+ * @version $Revision$
+ */
+public class StompNIOTransport extends TcpTransport {
+
+ private SocketChannel channel;
+
+ public StompNIOTransport(WireFormat wireFormat, SocketFactory
socketFactory, URI remoteLocation, URI localLocation) throws
UnknownHostException, IOException {
+ super(wireFormat, socketFactory, remoteLocation, localLocation);
+ }
+
+ public StompNIOTransport(WireFormat wireFormat, Socket socket) throws
IOException {
+ super(wireFormat, socket);
+ }
+
+ protected void initializeStreams() throws IOException {
+ channel = socket.getChannel();
+ channel.configureBlocking(false);
+
+ this.dataOut = new DataOutputStream(new NIOOutputStream(channel, 16 *
1024));
+ this.dataIn = new DataInputStream(new NIOBufferedInputStream(channel,
8 * 1024));
+ }
+
+}
Added:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransportFactory.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransportFactory.java?rev=813703&view=auto
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransportFactory.java
(added)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransportFactory.java
Fri Sep 11 07:57:05 2009
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.stomp;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+import java.util.Map;
+
+import javax.net.ServerSocketFactory;
+import javax.net.SocketFactory;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.BrokerServiceAware;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.nio.NIOTransport;
+import org.apache.activemq.transport.nio.NIOTransportFactory;
+import org.apache.activemq.transport.tcp.TcpTransport;
+import org.apache.activemq.transport.tcp.TcpTransportServer;
+import org.apache.activemq.util.IntrospectionSupport;
+import org.apache.activemq.wireformat.WireFormat;
+import org.apache.activemq.xbean.XBeanBrokerService;
+import org.springframework.context.ApplicationContext;
+
+/**
+ * A <a href="http://stomp.codehaus.org/">STOMP</a> over NIO transport factory
+ *
+ * @version $Revision: 645574 $
+ */
+public class StompNIOTransportFactory extends NIOTransportFactory implements
BrokerServiceAware {
+
+ private ApplicationContext applicationContext = null;
+
+ protected String getDefaultWireFormatType() {
+ return "stomp";
+ }
+
+ protected TcpTransportServer createTcpTransportServer(URI location,
ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException
{
+ return new TcpTransportServer(this, location, serverSocketFactory) {
+ protected Transport createTransport(Socket socket, WireFormat
format) throws IOException {
+ return new StompNIOTransport(format, socket);
+ }
+ };
+ }
+
+ protected TcpTransport createTcpTransport(WireFormat wf, SocketFactory
socketFactory, URI location, URI localLocation) throws UnknownHostException,
IOException {
+ return new StompNIOTransport(wf, socketFactory, location,
localLocation);
+ }
+
+ public Transport compositeConfigure(Transport transport, WireFormat
format, Map options) {
+ transport = new StompTransportFilter(transport, new
LegacyFrameTranslator(), applicationContext);
+ IntrospectionSupport.setProperties(transport, options);
+ return super.compositeConfigure(transport, format, options);
+ }
+
+ protected boolean isUseInactivityMonitor(Transport transport) {
+ // lets disable the inactivity monitor as stomp does not use keep alive
+ // packets
+ return false;
+ }
+
+ public void setBrokerService(BrokerService brokerService) {
+ if (brokerService instanceof XBeanBrokerService) {
+ this.applicationContext =
((XBeanBrokerService)brokerService).getApplicationContext();
+ }
+ }
+
+}
+
Added:
activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/stomp+nio
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/stomp%2Bnio?rev=813703&view=auto
==============================================================================
---
activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/stomp+nio
(added)
+++
activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/stomp+nio
Fri Sep 11 07:57:05 2009
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements. See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+class=org.apache.activemq.transport.stomp.StompNIOTransportFactory
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompNIOTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompNIOTest.java?rev=813703&view=auto
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompNIOTest.java
(added)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompNIOTest.java
Fri Sep 11 07:57:05 2009
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.stomp;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.net.URI;
+
+import javax.net.SocketFactory;
+import javax.net.ssl.SSLSocketFactory;
+
+/**
+ * @version $Revision: 732672 $
+ */
+public class StompNIOTest extends StompTest {
+
+ protected void setUp() throws Exception {
+ bindAddress = "stomp+nio://localhost:61612";
+ confUri =
"xbean:org/apache/activemq/transport/stomp/niostomp-auth-broker.xml";
+ super.setUp();
+ }
+}
Added:
activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/transport/stomp/niostomp-auth-broker.xml
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/transport/stomp/niostomp-auth-broker.xml?rev=813703&view=auto
==============================================================================
---
activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/transport/stomp/niostomp-auth-broker.xml
(added)
+++
activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/transport/stomp/niostomp-auth-broker.xml
Fri Sep 11 07:57:05 2009
@@ -0,0 +1,67 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!-- this file can only be parsed using the xbean-spring library -->
+<!-- START SNIPPET: example -->
+<beans>
+ <bean
class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
+
+ <bean class="org.apache.activemq.util.XStreamFactoryBean" name="xstream">
+ <property
name="annotatedClass"><value>org.apache.activemq.transport.stomp.SamplePojo</value></property>
+ </bean>
+
+ <broker useJmx="true" persistent="false"
xmlns="http://activemq.org/config/1.0" populateJMSXUserID="true">
+
+ <transportConnectors>
+ <transportConnector name="stomp+nio"
uri="stomp+nio://localhost:61612"/>
+ </transportConnectors>
+
+ <plugins>
+ <simpleAuthenticationPlugin>
+ <users>
+ <authenticationUser username="system"
password="manager"
+ groups="users,admins"/>
+ <authenticationUser username="user"
password="password"
+ groups="users"/>
+ <authenticationUser username="guest"
password="password" groups="guests"/>
+ </users>
+ </simpleAuthenticationPlugin>
+
+
+ <!-- lets configure a destination based authorization mechanism -->
+ <authorizationPlugin>
+ <map>
+ <authorizationMap>
+ <authorizationEntries>
+ <authorizationEntry queue=">" read="admins" write="admins"
admin="admins" />
+ <authorizationEntry queue="USERS.>" read="users" write="users"
admin="users" />
+ <authorizationEntry queue="GUEST.>" read="guests"
write="guests,users" admin="guests,users" />
+
+ <authorizationEntry topic=">" read="admins" write="admins"
admin="admins" />
+ <authorizationEntry topic="USERS.>" read="users" write="users"
admin="users" />
+ <authorizationEntry topic="GUEST.>" read="guests"
write="guests,users" admin="guests,users" />
+
+ <authorizationEntry topic="ActiveMQ.Advisory.>"
read="guests,users" write="guests,users" admin="guests,users"/>
+ </authorizationEntries>
+ </authorizationMap>
+ </map>
+ </authorizationPlugin>
+ </plugins>
+ </broker>
+
+</beans>
\ No newline at end of file