Author: fhanik
Date: Mon Mar 13 21:44:46 2006
New Revision: 385742
URL: http://svn.apache.org/viewcvs?rev=385742&view=rev
Log: Optimizing the interceptors
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelCoordinator.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ClusterData.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastMember.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastServiceImpl.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioReceiver.java Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelCoordinator.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelCoordinator.java?rev=385742&r1=385741&r2=385742&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelCoordinator.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/ChannelCoordinator.java Mon Mar 13 21:44:46 2006 @@ -37,6 +37,7 @@ private ChannelReceiver clusterReceiver; private ChannelSender clusterSender; private MembershipService membershipService; + private boolean started = false; public ChannelCoordinator() { @@ -75,15 +76,18 @@ * SND_RX_SEQ - starts the replication receiver<BR> * @throws ChannelException if a startup error occurs or the service is already started. 
*/ - public void start(int svc) throws ChannelException { + public synchronized void start() throws ChannelException { try { + if (started) return; + //must start the receiver first so that we can coordinate the port it + //listens to with the local membership settings + clusterReceiver.start(); + clusterSender.start(); //synchronize, big time FIXME membershipService.setLocalMemberProperties(getClusterReceiver().getHost(), getClusterReceiver().getPort()); - //end FIXME - if ( (svc & Channel.SND_RX_SEQ) == Channel.SND_RX_SEQ) clusterReceiver.start(); - if ( (svc & Channel.SND_TX_SEQ) == Channel.SND_TX_SEQ) clusterSender.start(); - if ( (svc & Channel.MBR_RX_SEQ) == Channel.MBR_RX_SEQ) membershipService.start(MembershipService.MBR_RX); - if ( (svc & Channel.MBR_TX_SEQ) == Channel.MBR_TX_SEQ) membershipService.start(MembershipService.MBR_TX); + membershipService.start(MembershipService.MBR_RX); + membershipService.start(MembershipService.MBR_TX); + this.started = true; }catch ( ChannelException cx ) { throw cx; }catch ( Exception x ) { @@ -102,14 +106,16 @@ * SND_RX_SEQ - starts the replication receiver<BR> * @throws ChannelException if a startup error occurs or the service is already started. 
*/ - public void stop(int svc) throws ChannelException { + public void stop() throws ChannelException { try { - if ( (svc & Channel.MBR_RX_SEQ) == Channel.MBR_RX_SEQ) membershipService.stop(); - if ( (svc & Channel.SND_RX_SEQ) == Channel.SND_RX_SEQ) clusterReceiver.stop(); - if ( (svc & Channel.SND_TX_SEQ) == Channel.SND_TX_SEQ) clusterSender.stop(); - if ( (svc & Channel.MBR_TX_SEQ) == Channel.MBR_RX_SEQ) membershipService.stop(); + membershipService.stop(); + clusterReceiver.stop(); + clusterSender.stop(); + membershipService.stop(); }catch ( Exception x ) { throw new ChannelException(x); + } finally { + started = false; } } Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java?rev=385742&r1=385741&r2=385742&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java Mon Mar 13 21:44:46 2006 @@ -173,7 +173,7 @@ * @throws ChannelException if a startup error occurs or the service is already started. */ public void start(int svc) throws ChannelException { - coordinator.start(svc); + coordinator.start(); } /** @@ -188,7 +188,7 @@ * @throws ChannelException if a startup error occurs or the service is already started. 
*/ public void stop(int svc) throws ChannelException { - coordinator.stop(svc); + coordinator.stop(); } public ChannelReceiver getChannelReceiver() { Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java?rev=385742&r1=385741&r2=385742&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java Mon Mar 13 21:44:46 2006 @@ -60,12 +60,14 @@ public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws ChannelException { for ( int i=0; i<destination.length; i++ ) { - ChannelMessage tmp = msg.clone(); int nr = incCounter(destination[i]); //reduce byte copy - //tmp.getMessage().append(XByteBuffer.toBytes(nr),0,4); - tmp.getMessage().append(nr); - getNext().sendMessage(new Member[] {destination[i]}, tmp, payload); + msg.getMessage().append(nr); + try { + getNext().sendMessage(new Member[] {destination[i]}, msg, payload); + }finally { + msg.getMessage().trim(4); + } } } Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ClusterData.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ClusterData.java?rev=385742&r1=385741&r2=385742&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ClusterData.java (original) +++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/io/ClusterData.java Mon Mar 13 21:44:46 2006 @@ -212,9 +212,20 @@ } else return false; } + /** + * Create a shallow clone, only the data gets recreated + * @return ClusterData + */ public ClusterData clone() { - byte[] d = this.getDataPackage(); - return ClusterData.getDataFromPackage(d); +// byte[] d = this.getDataPackage(); +// return ClusterData.getDataFromPackage(d); + ClusterData clone = new ClusterData(false); + clone.options = this.options; + clone.message = new XByteBuffer(this.message.getBytesDirect(),false); + clone.timestamp = this.timestamp; + clone.uniqueId = this.uniqueId; + clone.address = this.address; + return clone; } } Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastMember.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastMember.java?rev=385742&r1=385741&r2=385742&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastMember.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastMember.java Mon Mar 13 21:44:46 2006 @@ -207,7 +207,7 @@ int dlen = XByteBuffer.toInt(dlend, 0); byte[] domaind = new byte[dlen]; System.arraycopy(data, 20, domaind, 0, domaind.length); - member.setDomain(new String(domaind)); + member.domain = domaind; member.setHost(addr); member.setPort(XByteBuffer.toInt(portd, 0)); member.setMemberAliveTime(XByteBuffer.toLong(alived, 0)); Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastServiceImpl.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastServiceImpl.java?rev=385742&r1=385741&r2=385742&view=diff 
============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastServiceImpl.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/mcast/McastServiceImpl.java Mon Mar 13 21:44:46 2006 @@ -210,6 +210,7 @@ McastMember m = McastMember.getMember(data); if(log.isDebugEnabled()) log.debug("Mcast receive ping from member " + m); + if ( membership.memberAlive(m) ) { if(log.isDebugEnabled()) log.debug("Mcast add member " + m); Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioReceiver.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioReceiver.java?rev=385742&r1=385741&r2=385742&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioReceiver.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioReceiver.java Mon Mar 13 21:44:46 2006 @@ -64,6 +64,7 @@ private int tcpThreadCount; private long tcpSelectorTimeout; private Selector selector = null; + private ServerSocketChannel serverChannel = null; private java.net.InetAddress bind; private String tcpListenAddress; @@ -135,6 +136,7 @@ } try { getBind(); + this.bind(); Thread t = new Thread(this, "NioReceiver"); t.setDaemon(true); t.start(); @@ -142,31 +144,63 @@ log.fatal("Unable to start cluster receiver", x); } } - + /** - * get data from channel and store in byte array - * send it to cluster + * recursive bind to find the next available port + * @param socket ServerSocket + * @param portstart int + * @param retries int + * @return int * @throws IOException - * @throws java.nio.channels.ClosedChannelException */ - protected void listen() throws Exception { - if 
(doListen) { - log.warn("ServerSocketChannel allready started"); - return; + protected int bind(ServerSocket socket, int portstart, int retries) throws IOException { + while ( retries > 0 ) { + try { + InetSocketAddress addr = new InetSocketAddress(getBind(), portstart); + socket.bind(addr); + setTcpListenPort(portstart); + log.info("Nio Server Socket bound to:"+addr); + return 0; + }catch ( IOException x) { + retries--; + if ( retries <= 0 ) throw x; + portstart++; + retries = bind(socket,portstart,retries); + } } - doListen = true; + return retries; + } + + protected void bind() throws IOException { // allocate an unbound server socket channel - ServerSocketChannel serverChannel = ServerSocketChannel.open(); + serverChannel = ServerSocketChannel.open(); // Get the associated ServerSocket to bind it with ServerSocket serverSocket = serverChannel.socket(); // create a new Selector for use below selector = Selector.open(); // set the port the server channel will listen to - serverSocket.bind(new InetSocketAddress(getBind(), getTcpListenPort())); + //serverSocket.bind(new InetSocketAddress(getBind(), getTcpListenPort())); + bind(serverSocket,getTcpListenPort(),10); // set non-blocking mode for the listening socket serverChannel.configureBlocking(false); // register the ServerSocketChannel with the Selector serverChannel.register(selector, SelectionKey.OP_ACCEPT); + + } + /** + * get data from channel and store in byte array + * send it to cluster + * @throws IOException + * @throws java.nio.channels.ClosedChannelException + */ + protected void listen() throws Exception { + if (doListen) { + log.warn("ServerSocketChannel allready started"); + return; + } + + doListen = true; + while (doListen && selector != null) { // this may block for a long time, upon return the // selected set contains keys of the ready channels --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL 
PROTECTED]