Author: chirino
Date: Wed Dec 28 18:10:15 2005
New Revision: 359675
URL: http://svn.apache.org/viewcvs?rev=359675&view=rev
Log:
added missing files and updated copyright header.
Added:
incubator/activemq/trunk/activeio/activeio-aio/src/main/java/org/apache/activeio/packet/async/aio/
incubator/activemq/trunk/activeio/activeio-aio/src/main/java/org/apache/activeio/packet/async/aio/AIOAsyncChannel.java
incubator/activemq/trunk/activeio/activeio-aio/src/main/java/org/apache/activeio/packet/async/aio/AIOAsyncChannelFactory.java
incubator/activemq/trunk/activeio/activeio-aio/src/main/java/org/apache/activeio/packet/async/aio/AIOSyncChannelServer.java
Modified:
incubator/activemq/trunk/activeio/activeio-aio/src/test/java/org/apache/activeio/packet/async/aio/AIOAsyncChannelTest.java
incubator/activemq/trunk/activeio/activeio-aio/src/test/java/org/apache/activeio/packet/async/aio/ChannelFactoryTest.java
Added:
incubator/activemq/trunk/activeio/activeio-aio/src/main/java/org/apache/activeio/packet/async/aio/AIOAsyncChannel.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activeio/activeio-aio/src/main/java/org/apache/activeio/packet/async/aio/AIOAsyncChannel.java?rev=359675&view=auto
==============================================================================
---
incubator/activemq/trunk/activeio/activeio-aio/src/main/java/org/apache/activeio/packet/async/aio/AIOAsyncChannel.java
(added)
+++
incubator/activemq/trunk/activeio/activeio-aio/src/main/java/org/apache/activeio/packet/async/aio/AIOAsyncChannel.java
Wed Dec 28 18:10:15 2005
@@ -0,0 +1,262 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeio.packet.async.aio;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.net.InetAddress;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.net.SocketException;
+import java.nio.ByteBuffer;
+
+import org.apache.activeio.packet.ByteBufferPacket;
+import org.apache.activeio.packet.EOSPacket;
+import org.apache.activeio.packet.Packet;
+import org.apache.activeio.packet.async.AsyncChannel;
+import org.apache.activeio.packet.async.AsyncChannelListener;
+import org.apache.activeio.stream.sync.socket.SocketMetadata;
+
+import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch;
+import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
+
+import com.ibm.io.async.AsyncSocketChannel;
+import com.ibm.io.async.IAbstractAsyncFuture;
+import com.ibm.io.async.IAsyncFuture;
+import com.ibm.io.async.ICompletionListener;
+
+/**
+ * @version $Revision$
+ */
+final public class AIOAsyncChannel implements AsyncChannel,
ICompletionListener, SocketMetadata {
+
+ protected static final int DEFAULT_BUFFER_SIZE =
ByteBufferPacket.DEFAULT_DIRECT_BUFFER_SIZE;
+
+ private final AsyncSocketChannel socketChannel;
+ private final Socket socket;
+
+ private AsyncChannelListener channelListener;
+ private ByteBuffer inputByteBuffer;
+
+ private final AtomicBoolean running = new AtomicBoolean(false);
+ private CountDownLatch doneCountDownLatch;
+
+ protected AIOAsyncChannel(AsyncSocketChannel socketChannel) throws
IOException {
+ this.socketChannel = socketChannel;
+ this.socket = socketChannel.socket();
+ this.socket.setSendBufferSize(DEFAULT_BUFFER_SIZE);
+ this.socket.setReceiveBufferSize(DEFAULT_BUFFER_SIZE);
+ this.socket.setSoTimeout(0);
+ }
+
+ private ByteBuffer allocateBuffer() {
+ return ByteBuffer.allocateDirect(DEFAULT_BUFFER_SIZE);
+ }
+
+ public void setAsyncChannelListener(AsyncChannelListener channelListener) {
+ this.channelListener = channelListener;
+ }
+
+ public AsyncChannelListener getAsyncChannelListener() {
+ return channelListener;
+ }
+
+ public Object getAdapter(Class target) {
+ if( target.isAssignableFrom(getClass()) ) {
+ return this;
+ }
+ return null;
+ }
+
+ public void dispose() {
+ if( running.get() && channelListener!=null ) {
+ channelListener.onPacketError(new SocketException("Socket
closed."));
+ }
+ try {
+ stop();
+ } catch (IOException e) {
+ }
+ try {
+ socketChannel.close();
+ } catch (IOException e) {
+ }
+ }
+
+ public void start() throws IOException {
+ if( running.compareAndSet(false, true) ) {
+ doneCountDownLatch = new CountDownLatch(1);
+ requestNextRead();
+ }
+ }
+
+ public void stop() throws IOException {
+ if( running.compareAndSet(true, false) ) {
+ try {
+ doneCountDownLatch.await(5, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ throw new InterruptedIOException();
+ }
+ }
+ }
+
+ public void write(Packet packet) throws IOException {
+ ByteBuffer data = ((ByteBufferPacket)packet).getByteBuffer();
+ while( data.hasRemaining() ) {
+ IAsyncFuture future = socketChannel.write(data);
+ try {
+ future.getByteCount();
+ } catch (InterruptedException e) {
+ throw new InterruptedIOException();
+ }
+ }
+ }
+
+ public void flush() throws IOException {
+ }
+
+ public void futureCompleted(IAbstractAsyncFuture abstractFuture, Object
attribute) {
+ IAsyncFuture future = (IAsyncFuture)abstractFuture;
+ try {
+
+ if( inputByteBuffer.position()>0 ) {
+ ByteBuffer remaining = inputByteBuffer.slice();
+ Packet data = new
ByteBufferPacket(((ByteBuffer)inputByteBuffer.flip()).slice());
+
+ channelListener.onPacket(data);
+ // Keep the remaining buffer around to fill with data.
+ inputByteBuffer = remaining;
+ requestNextRead();
+
+ } else {
+ channelListener.onPacket(EOSPacket.EOS_PACKET);
+ }
+
+ } catch (IOException e) {
+ channelListener.onPacketError(e);
+ }
+ }
+
+ private void requestNextRead() throws InterruptedIOException {
+
+ // Don't do next read if we have stopped running.
+ if( !running.get() ) {
+ doneCountDownLatch.countDown();
+ return;
+ }
+
+ try {
+
+ if( inputByteBuffer==null || !inputByteBuffer.hasRemaining() ) {
+ inputByteBuffer = allocateBuffer();
+ }
+
+ IAsyncFuture future = socketChannel.read(inputByteBuffer);
+ future.addCompletionListener(this, null, false);
+
+ } catch (InterruptedException e) {
+ throw new InterruptedIOException();
+ }
+
+ }
+
+ public InetAddress getInetAddress() {
+ return socket.getInetAddress();
+ }
+ public boolean getKeepAlive() throws SocketException {
+ return socket.getKeepAlive();
+ }
+ public InetAddress getLocalAddress() {
+ return socket.getLocalAddress();
+ }
+ public int getLocalPort() {
+ return socket.getLocalPort();
+ }
+ public SocketAddress getLocalSocketAddress() {
+ return socket.getLocalSocketAddress();
+ }
+ public boolean getOOBInline() throws SocketException {
+ return socket.getOOBInline();
+ }
+ public int getPort() {
+ return socket.getPort();
+ }
+ public int getReceiveBufferSize() throws SocketException {
+ return socket.getReceiveBufferSize();
+ }
+ public SocketAddress getRemoteSocketAddress() {
+ return socket.getRemoteSocketAddress();
+ }
+ public boolean getReuseAddress() throws SocketException {
+ return socket.getReuseAddress();
+ }
+ public int getSendBufferSize() throws SocketException {
+ return socket.getSendBufferSize();
+ }
+ public int getSoLinger() throws SocketException {
+ return socket.getSoLinger();
+ }
+ public int getSoTimeout() throws SocketException {
+ return socket.getSoTimeout();
+ }
+ public boolean getTcpNoDelay() throws SocketException {
+ return socket.getTcpNoDelay();
+ }
+ public int getTrafficClass() throws SocketException {
+ return socket.getTrafficClass();
+ }
+ public boolean isBound() {
+ return socket.isBound();
+ }
+ public boolean isClosed() {
+ return socket.isClosed();
+ }
+ public boolean isConnected() {
+ return socket.isConnected();
+ }
+ public void setKeepAlive(boolean on) throws SocketException {
+ socket.setKeepAlive(on);
+ }
+ public void setOOBInline(boolean on) throws SocketException {
+ socket.setOOBInline(on);
+ }
+ public void setReceiveBufferSize(int size) throws SocketException {
+ socket.setReceiveBufferSize(size);
+ }
+ public void setReuseAddress(boolean on) throws SocketException {
+ socket.setReuseAddress(on);
+ }
+ public void setSendBufferSize(int size) throws SocketException {
+ socket.setSendBufferSize(size);
+ }
+ public void setSoLinger(boolean on, int linger) throws SocketException {
+ socket.setSoLinger(on, linger);
+ }
+ public void setTcpNoDelay(boolean on) throws SocketException {
+ socket.setTcpNoDelay(on);
+ }
+ public void setTrafficClass(int tc) throws SocketException {
+ socket.setTrafficClass(tc);
+ }
+ public void setSoTimeout(int i) throws SocketException {
+ socket.setSoTimeout(i);
+ }
+ public String toString() {
+ return "AIO Connection: "+getLocalSocketAddress()+" ->
"+getRemoteSocketAddress();
+ }
+
+ }
\ No newline at end of file
Added:
incubator/activemq/trunk/activeio/activeio-aio/src/main/java/org/apache/activeio/packet/async/aio/AIOAsyncChannelFactory.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activeio/activeio-aio/src/main/java/org/apache/activeio/packet/async/aio/AIOAsyncChannelFactory.java?rev=359675&view=auto
==============================================================================
---
incubator/activemq/trunk/activeio/activeio-aio/src/main/java/org/apache/activeio/packet/async/aio/AIOAsyncChannelFactory.java
(added)
+++
incubator/activemq/trunk/activeio/activeio-aio/src/main/java/org/apache/activeio/packet/async/aio/AIOAsyncChannelFactory.java
Wed Dec 28 18:10:15 2005
@@ -0,0 +1,115 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeio.packet.async.aio;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import org.apache.activeio.adapter.SyncToAsyncChannelServer;
+import org.apache.activeio.packet.ByteBufferPacket;
+import org.apache.activeio.packet.async.AsyncChannel;
+import org.apache.activeio.packet.async.AsyncChannelFactory;
+import org.apache.activeio.packet.async.AsyncChannelServer;
+import org.apache.activeio.packet.async.filter.WriteBufferedAsyncChannel;
+import org.apache.activeio.util.URISupport;
+
+import com.ibm.io.async.AsyncServerSocketChannel;
+import com.ibm.io.async.AsyncSocketChannel;
+
+/**
+ * A TcpAsyncChannelFactory creates [EMAIL PROTECTED]
org.apache.activeio.net.TcpAsyncChannel}
+ * and [EMAIL PROTECTED] org.apache.activeio.net.TcpAsyncChannelServer}
objects.
+ *
+ * @version $Revision$
+ */
+public class AIOAsyncChannelFactory implements AsyncChannelFactory {
+
+ protected static final int DEFAULT_BACKLOG = 500;
+ private int backlog = DEFAULT_BACKLOG;
+
+ /**
+ * Uses the [EMAIL PROTECTED] location}'s host and port to create a tcp
connection to a remote host.
+ *
+ * @see
org.apache.activeio.AsyncChannelFactory#openAsyncChannel(java.net.URI)
+ */
+ public AsyncChannel openAsyncChannel(URI location) throws IOException {
+ AsyncSocketChannel channel = AsyncSocketChannel.open();
+ channel.connect(new InetSocketAddress(location.getHost(),
location.getPort()));
+ return createAsyncChannel(channel);
+ }
+
+ /**
+ * @param channel
+ * @return
+ * @throws IOException
+ */
+ protected AsyncChannel createAsyncChannel(AsyncSocketChannel
socketChannel) throws IOException {
+ AsyncChannel channel = new AIOAsyncChannel(socketChannel);
+ channel = new WriteBufferedAsyncChannel(channel,
ByteBufferPacket.createDefaultBuffer(true), false);
+ return channel;
+ }
+
+ /**
+ * Binds a server socket a the [EMAIL PROTECTED] location}'s port.
+ *
+ * @see
org.apache.activeio.AsyncChannelFactory#bindAsyncChannel(java.net.URI)
+ */
+ public AsyncChannelServer bindAsyncChannel(URI bindURI) throws IOException
{
+
+ String host = bindURI.getHost();
+ InetSocketAddress address;
+ if( host == null || host.length() == 0 || host.equals("localhost") ||
host.equals("0.0.0.0") || InetAddress.getLocalHost().getHostName().equals(host)
) {
+ address = new InetSocketAddress(bindURI.getPort());
+ } else {
+ address = new InetSocketAddress(bindURI.getHost(),
bindURI.getPort());
+ }
+
+ AsyncServerSocketChannel serverSocketChannel =
AsyncServerSocketChannel.open();
+ serverSocketChannel.socket().bind(address,backlog);
+
+ URI connectURI = bindURI;
+ try {
+// connectURI = URISupport.changeHost(connectURI,
InetAddress.getLocalHost().getHostName());
+ connectURI = URISupport.changePort(connectURI,
serverSocketChannel.socket().getLocalPort());
+ } catch (URISyntaxException e) {
+ throw (IOException)new IOException("Could not build connect URI:
"+e).initCause(e);
+ }
+
+ return SyncToAsyncChannelServer.adapt(
+ new AIOSyncChannelServer(serverSocketChannel, bindURI,
connectURI));
+ }
+
+ /**
+ * @return Returns the backlog.
+ */
+ public int getBacklog() {
+ return backlog;
+ }
+
+ /**
+ * @param backlog
+ * The backlog to set.
+ */
+ public void setBacklog(int backlog) {
+ this.backlog = backlog;
+ }
+
+
+}
Added:
incubator/activemq/trunk/activeio/activeio-aio/src/main/java/org/apache/activeio/packet/async/aio/AIOSyncChannelServer.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activeio/activeio-aio/src/main/java/org/apache/activeio/packet/async/aio/AIOSyncChannelServer.java?rev=359675&view=auto
==============================================================================
---
incubator/activemq/trunk/activeio/activeio-aio/src/main/java/org/apache/activeio/packet/async/aio/AIOSyncChannelServer.java
(added)
+++
incubator/activemq/trunk/activeio/activeio-aio/src/main/java/org/apache/activeio/packet/async/aio/AIOSyncChannelServer.java
Wed Dec 28 18:10:15 2005
@@ -0,0 +1,107 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeio.packet.async.aio;
+
+import java.io.IOException;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
+import java.net.URI;
+
+import org.apache.activeio.Channel;
+import org.apache.activeio.packet.ByteBufferPacket;
+import org.apache.activeio.packet.async.AsyncChannel;
+import org.apache.activeio.packet.async.filter.WriteBufferedAsyncChannel;
+import org.apache.activeio.packet.sync.SyncChannelServer;
+
+import com.ibm.io.async.AsyncServerSocketChannel;
+
+/**
+ * @version $Revision$
+ */
+public class AIOSyncChannelServer implements SyncChannelServer {
+
+ private final AsyncServerSocketChannel serverSocket;
+ private final URI bindURI;
+ private final URI connectURI;
+ private int curentSoTimeout;
+
+ public AIOSyncChannelServer(AsyncServerSocketChannel serverSocket, URI
bindURI, URI connectURI) throws IOException {
+ this.serverSocket=serverSocket;
+ this.bindURI=bindURI;
+ this.connectURI=connectURI;
+ this.curentSoTimeout = serverSocket.socket().getSoTimeout();
+ }
+
+ public URI getBindURI() {
+ return bindURI;
+ }
+
+ public URI getConnectURI() {
+ return this.connectURI;
+ }
+
+ public void dispose() {
+ try {
+ serverSocket.close();
+ } catch (IOException e) {
+ }
+ }
+
+ synchronized public void start() throws IOException {
+ }
+
+ synchronized public void stop() {
+ }
+
+ public Channel accept(long timeout) throws IOException {
+ try {
+
+ if (timeout == SyncChannelServer.WAIT_FOREVER_TIMEOUT)
+ setSoTimeout(0);
+ else if (timeout == SyncChannelServer.NO_WAIT_TIMEOUT)
+ setSoTimeout(1);
+ else
+ setSoTimeout((int) timeout);
+
+ AsyncChannel channel = new AIOAsyncChannel(serverSocket.accept());
+ channel = new WriteBufferedAsyncChannel(channel,
ByteBufferPacket.createDefaultBuffer(true), false);
+ return channel;
+
+ } catch (SocketTimeoutException ignore) {
+ }
+ return null;
+ }
+
+ private void setSoTimeout(int i) throws SocketException {
+ if (curentSoTimeout != i) {
+ serverSocket.socket().setSoTimeout(i);
+ curentSoTimeout = i;
+ }
+ }
+
+ public Object getAdapter(Class target) {
+ if( target.isAssignableFrom(getClass()) ) {
+ return this;
+ }
+ return null;
+ }
+
+ public String toString() {
+ return "AIO Server: "+getConnectURI();
+ }
+
+}
\ No newline at end of file
Modified:
incubator/activemq/trunk/activeio/activeio-aio/src/test/java/org/apache/activeio/packet/async/aio/AIOAsyncChannelTest.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activeio/activeio-aio/src/test/java/org/apache/activeio/packet/async/aio/AIOAsyncChannelTest.java?rev=359675&r1=359674&r2=359675&view=diff
==============================================================================
---
incubator/activemq/trunk/activeio/activeio-aio/src/test/java/org/apache/activeio/packet/async/aio/AIOAsyncChannelTest.java
(original)
+++
incubator/activemq/trunk/activeio/activeio-aio/src/test/java/org/apache/activeio/packet/async/aio/AIOAsyncChannelTest.java
Wed Dec 28 18:10:15 2005
@@ -1,18 +1,18 @@
/**
*
- * Copyright 2004 Hiram Chirino
+ * Copyright 2005-2006 The Apache Software Foundation
*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.activeio.packet.async.aio;
Modified:
incubator/activemq/trunk/activeio/activeio-aio/src/test/java/org/apache/activeio/packet/async/aio/ChannelFactoryTest.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activeio/activeio-aio/src/test/java/org/apache/activeio/packet/async/aio/ChannelFactoryTest.java?rev=359675&r1=359674&r2=359675&view=diff
==============================================================================
---
incubator/activemq/trunk/activeio/activeio-aio/src/test/java/org/apache/activeio/packet/async/aio/ChannelFactoryTest.java
(original)
+++
incubator/activemq/trunk/activeio/activeio-aio/src/test/java/org/apache/activeio/packet/async/aio/ChannelFactoryTest.java
Wed Dec 28 18:10:15 2005
@@ -1,20 +1,19 @@
-/**
- *
- * Copyright 2004 Hiram Chirino
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- **/
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.activeio.packet.async.aio;
import java.io.IOException;