Author: jvermillard
Date: Wed Jun 27 07:53:29 2007
New Revision: 551184
URL: http://svn.apache.org/viewvc?view=rev&rev=551184
Log:
APRSession and APRConnector created
Added:
mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRFilterChain.java
mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRTransportType.java
Modified:
mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRConnector.java
mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRSession.java
mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRSessionImpl.java
Modified:
mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRConnector.java
URL:
http://svn.apache.org/viewvc/mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRConnector.java?view=diff&rev=551184&r1=551183&r2=551184
==============================================================================
---
mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRConnector.java
(original)
+++
mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRConnector.java
Wed Jun 27 07:53:29 2007
@@ -1,14 +1,20 @@
package org.apache.mina.transport.apr;
+import java.io.IOException;
+import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.concurrent.Executor;
import org.apache.mina.common.ConnectFuture;
import org.apache.mina.common.TransportType;
+import org.apache.mina.common.support.AbstractIoFilterChain;
import org.apache.mina.common.support.BaseIoConnector;
+import org.apache.mina.common.support.DefaultConnectFuture;
import org.apache.mina.common.support.IoServiceListenerSupport;
import org.apache.mina.util.NewThreadExecutor;
+import org.apache.tomcat.jni.Address;
import org.apache.tomcat.jni.Pool;
+import org.apache.tomcat.jni.Socket;
public class APRConnector extends BaseIoConnector {
@@ -19,11 +25,12 @@
private final Object lock = new Object();
private final int id = nextId++;
- private final String threadName = "APRConnector-" + id;
+ //private final String threadName = "APRConnector-" + id;
private final int processorCount;
private final Executor executor;
private final APRIoProcessor[] ioProcessors;
-
+ private int processorDistributor = 0;
+
// APR memory pool (package wide mother pool)
static long pool = -1;
@@ -66,13 +73,58 @@
@Override
protected ConnectFuture doConnect(SocketAddress remoteAddress,
SocketAddress localAddress) {
- // TODO Auto-generated method stub
- return null;
+ boolean success = false;
+ try
+ {
+ InetSocketAddress sockAddr=(InetSocketAddress)remoteAddress;
+ pool = Pool.create(pool);
+ long inetAddr=0;
+ inetAddr = Address.info( sockAddr.getHostName(), Socket.APR_INET,
+ sockAddr.getPort(), 0, pool);
+
+ // FIXME : type of socket need to be configurable
+
+ long clientSock=Socket.create(Socket.APR_INET, Socket.SOCK_STREAM,
Socket.APR_PROTO_TCP, pool);
+
+
+ // TODO: error checking ???
+ int ret=Socket.connect(clientSock, inetAddr);
+ System.err.println("Socket.connect : "+ret);
+ if( localAddress != null ) {
+ // TODO, check if it's possible to bind to a local address
+ }
+
+ ConnectFuture future = new DefaultConnectFuture();
+ APRSessionImpl session=new
APRSessionImpl(this,nextProcessor(),clientSock);
+
+ try
+ {
+ getFilterChainBuilder().buildFilterChain(
session.getFilterChain() );
+ }
+ catch( Throwable e )
+ {
+ throw ( IOException ) new IOException( "Failed to create a
session." ).initCause( e );
+ }
+
+ // Set the ConnectFuture of the specified session, which will be
+ // removed and notified by AbstractIoFilterChain eventually.
+ session.setAttribute( AbstractIoFilterChain.CONNECT_FUTURE, future
);
+
+ // Forward the remaining process to the SocketIoProcessor.
+ session.getIoProcessor().addNew( session );
+ future.setSession(session);
+
+ success = true;
+ return future;
+ }
+ catch( Exception e )
+ {
+ return DefaultConnectFuture.newFailedFuture( e );
+ }
}
+    /**
+     * Returns the transport type of this connector, which is always
+     * {@link APRTransportType#APR_SOCKET}.
+     */
public TransportType getTransportType() {
- // TODO Auto-generated method stub
- return null;
+ return APRTransportType.APR_SOCKET;
}
@Override
@@ -80,4 +132,15 @@
{
return super.getListeners();
}
+
+    /**
+     * Selects the next I/O processor in round-robin order.
+     */
+    private APRIoProcessor nextProcessor()
+    {
+        // Wrap the counter on every call so it can never overflow to a negative index.
+        processorDistributor = ( processorDistributor + 1 ) % processorCount;
+        return ioProcessors[processorDistributor];
+    }
+
}
Added:
mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRFilterChain.java
URL:
http://svn.apache.org/viewvc/mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRFilterChain.java?view=auto&rev=551184
==============================================================================
---
mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRFilterChain.java
(added)
+++
mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRFilterChain.java
Wed Jun 27 07:53:29 2007
@@ -0,0 +1,44 @@
+package org.apache.mina.transport.apr;
+
+import java.io.IOException;
+import java.util.Queue;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.common.WriteRequest;
+import org.apache.mina.common.support.AbstractIoFilterChain;
+
+/**
+ * Filter chain for APR sessions: queues outgoing write requests on the
+ * session and delegates flushing and removal to the session's I/O processor.
+ */
+public class APRFilterChain extends AbstractIoFilterChain {
+
+    APRFilterChain( IoSession parent )
+    {
+        super( parent );
+    }
+
+    /**
+     * Appends {@code writeRequest} to the session's write queue and asks the
+     * I/O processor to flush when the queue was empty before this request and
+     * the session's traffic mask allows writing.
+     */
+    @Override
+    protected void doWrite( IoSession session, WriteRequest writeRequest )
+    {
+        final APRSessionImpl aprSession = ( APRSessionImpl ) session;
+        final Queue<WriteRequest> pending = aprSession.getWriteRequestQueue();
+
+        // Remember the buffer position: per the original note, the processor
+        // resets the buffer once the write is finished because the same
+        // buffer is passed along with the messageSent event.
+        final ByteBuffer payload = ( ByteBuffer ) writeRequest.getMessage();
+        payload.mark();
+
+        synchronized( pending )
+        {
+            pending.offer( writeRequest );
+
+            // Only the request that made the queue non-empty triggers a flush.
+            final boolean queueWasEmpty = pending.size() == 1;
+            if( queueWasEmpty && session.getTrafficMask().isWritable() )
+            {
+                aprSession.getIoProcessor().flush( aprSession );
+            }
+        }
+    }
+
+    /**
+     * Closes the session by asking its I/O processor to remove it.
+     */
+    @Override
+    protected void doClose( IoSession session ) throws IOException
+    {
+        final APRSessionImpl aprSession = ( APRSessionImpl ) session;
+        aprSession.getIoProcessor().remove( aprSession );
+    }
+}
Modified:
mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRSession.java
URL:
http://svn.apache.org/viewvc/mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRSession.java?view=diff&rev=551184&r1=551183&r2=551184
==============================================================================
---
mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRSession.java
(original)
+++
mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRSession.java
Wed Jun 27 07:53:29 2007
@@ -1,22 +1,12 @@
package org.apache.mina.transport.apr;
import java.net.InetSocketAddress;
-import java.util.Queue;
import org.apache.mina.common.IoSession;
-import org.apache.mina.common.WriteRequest;
+/**
+ * An {@link IoSession} for the APR transport. It narrows the configuration
+ * type to {@link APRSessionConfig} and the address types to
+ * {@link InetSocketAddress}.
+ */
public interface APRSession extends IoSession {
-
-
+    /** Returns the APR-specific configuration of this session. */
APRSessionConfig getConfig();
+    /** Returns the remote address of this session as an {@link InetSocketAddress}. */
InetSocketAddress getRemoteAddress();
+    /** Returns the local address of this session as an {@link InetSocketAddress}. */
InetSocketAddress getLocalAddress();
+    /** Returns the service address of this session as an {@link InetSocketAddress}. */
InetSocketAddress getServiceAddress();
-
- // TODO : move to package only visible implementation
- long getAPRSocket();
- byte[] getReadBuffer();
- void increaseReadBytes( int increment );
- Queue<WriteRequest> getWriteRequestQueue();
-}
+}
\ No newline at end of file
Modified:
mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRSessionImpl.java
URL:
http://svn.apache.org/viewvc/mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRSessionImpl.java?view=diff&rev=551184&r1=551183&r2=551184
==============================================================================
---
mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRSessionImpl.java
(original)
+++
mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRSessionImpl.java
Wed Jun 27 07:53:29 2007
@@ -1,86 +1,132 @@
package org.apache.mina.transport.apr;
import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.LinkedList;
import java.util.Queue;
+import org.apache.mina.common.ByteBuffer;
import org.apache.mina.common.IoFilterChain;
import org.apache.mina.common.IoHandler;
import org.apache.mina.common.IoService;
import org.apache.mina.common.TransportType;
import org.apache.mina.common.WriteRequest;
import org.apache.mina.common.support.BaseIoSession;
+import org.apache.mina.common.support.BaseIoSessionConfig;
public class APRSessionImpl extends BaseIoSession implements APRSession {
+ private long socket;
- @Override
- protected void updateTrafficMask() {
- // TODO Auto-generated method stub
-
+ private final IoService service;
+ private final APRSessionConfig config = new APRSessionConfigImpl();
+ private final APRIoProcessor ioProcessor;
+ private final APRFilterChain filterChain;
+ private final Queue<WriteRequest> writeRequestQueue;
+ private final IoHandler handler;
+ private byte[] readBuffer;
+ private int readBufferSize;
+ private InetSocketAddress remoteAddress;
+ private InetSocketAddress localAddress;
+ /**
+ * Creates a new instance.
+ */
+ APRSessionImpl(IoService service, APRIoProcessor ioProcessor, long
socket) {
+ this.service = service;
+ this.ioProcessor = ioProcessor;
+ this.filterChain = new APRFilterChain( this );
+ this.writeRequestQueue = new LinkedList<WriteRequest>();
+ this.handler = service.getHandler();
+
+ }
+
+ void doConnect(SocketAddress remoteAddress, SocketAddress localAddress)
{
+ this.remoteAddress=(InetSocketAddress)remoteAddress;
+ this.localAddress=(InetSocketAddress)localAddress;
+ // TODO : connect
+ }
+
+ long getAPRSocket() {
+ return socket;
}
- public long getAPRSocket() {
- // TODO Auto-generated method stub
- return 0;
+ @Override
+ protected void updateTrafficMask() {
+ // TODO : this.ioProcessor.updateTrafficMask( this );
}
public APRSessionConfig getConfig() {
- // TODO Auto-generated method stub
- return null;
+ return config;
}
public InetSocketAddress getLocalAddress() {
- // TODO Auto-generated method stub
- return null;
+ return localAddress;
}
- public byte[] getReadBuffer() {
- // TODO Auto-generated method stub
- return null;
+ byte[] getReadBuffer() {
+ return readBuffer;
}
public InetSocketAddress getRemoteAddress() {
- // TODO Auto-generated method stub
- return null;
+ return remoteAddress;
}
- public Queue<WriteRequest> getWriteRequestQueue() {
- // TODO Auto-generated method stub
- return null;
+ Queue<WriteRequest> getWriteRequestQueue() {
+ return writeRequestQueue;
}
public IoFilterChain getFilterChain() {
- // TODO Auto-generated method stub
- return null;
+
+ return filterChain;
}
public IoHandler getHandler() {
- // TODO Auto-generated method stub
- return null;
+ return service.getHandler();
}
- public int getScheduledWriteBytes() {
- // TODO Auto-generated method stub
- return 0;
- }
+ public int getScheduledWriteMessages()
+ {
+ synchronized( writeRequestQueue )
+ {
+ return writeRequestQueue.size();
+ }
+ }
- public int getScheduledWriteMessages() {
- // TODO Auto-generated method stub
- return 0;
- }
+ public int getScheduledWriteBytes()
+ {
+ int size = 0;
+ synchronized( writeRequestQueue )
+ {
+ for( Object o: writeRequestQueue )
+ {
+ if( o instanceof ByteBuffer )
+ {
+ size += ( ( ByteBuffer ) o ).remaining();
+ }
+ }
+ }
+
+ return size;
+ }
public IoService getService() {
- // TODO Auto-generated method stub
- return null;
+ return service;
}
public TransportType getTransportType() {
- // TODO Auto-generated method stub
- return null;
+ return APRTransportType.APR_SOCKET;
}
+ APRIoProcessor getIoProcessor()
+ {
+ return ioProcessor;
+ }
+
@Override
public InetSocketAddress getServiceAddress() {
return (InetSocketAddress) super.getServiceAddress();
}
-
+
+ private class APRSessionConfigImpl extends BaseIoSessionConfig implements
APRSessionConfig {
+
+ }
}
Added:
mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRTransportType.java
URL:
http://svn.apache.org/viewvc/mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRTransportType.java?view=auto&rev=551184
==============================================================================
---
mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRTransportType.java
(added)
+++
mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRTransportType.java
Wed Jun 27 07:53:29 2007
@@ -0,0 +1,16 @@
+package org.apache.mina.transport.apr;
+
+import java.net.InetSocketAddress;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.support.DefaultTransportType;
+
+/**
+ * Transport type descriptor for APR sockets. The private constructor leaves
+ * {@link #APR_SOCKET} as the only instance created by this class.
+ */
+public class APRTransportType extends DefaultTransportType {
+
+    /** The single APR socket transport type instance. */
+ public static final APRTransportType APR_SOCKET=new APRTransportType();
+
+    // Registers the transport under the name "Apache Portable Runtime socket"
+    // with InetSocketAddress, ByteBuffer and APRSessionConfig as its types.
+    // NOTE(review): the boolean 'false' is presumably the "connectionless"
+    // flag of DefaultTransportType - confirm against its constructor.
+ private APRTransportType() {
+ super("Apache Portable Runtime socket",
false,InetSocketAddress.class, ByteBuffer.class,APRSessionConfig.class);
+ }
+
+}