Author: azeez Date: Mon Jan 14 12:44:05 2008 New Revision: 611922 URL: http://svn.apache.org/viewvc?rev=611922&view=rev Log: 1. Fixing a few potential issues in the test cases 2. Making the methods in MembershipManager non-static 3. Adding an OrderInterceptor to preserve sender ordering 4. Fixing a bug in the AtMostOnceInterceptor
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AtMostOnceInterceptor.java webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesMembershipListener.java webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesUtil.java webservices/axis2/trunk/java/modules/clustering/test/org/apache/axis2/clustering/ContextReplicationTest.java Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AtMostOnceInterceptor.java URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AtMostOnceInterceptor.java?rev=611922&r1=611921&r2=611922&view=diff ============================================================================== --- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AtMostOnceInterceptor.java (original) +++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AtMostOnceInterceptor.java Mon Jan 14 12:44:05 2008 @@ -15,45 +15,21 @@ */ package org.apache.axis2.clustering.tribes; -import org.apache.catalina.tribes.ChannelException; -import org.apache.catalina.tribes.ChannelInterceptor; import org.apache.catalina.tribes.ChannelMessage; -import org.apache.catalina.tribes.Member; -import org.apache.catalina.tribes.RemoteProcessException; -import org.apache.catalina.tribes.Channel; -import org.apache.catalina.tribes.ByteMessage; -import org.apache.catalina.tribes.util.UUIDGenerator; -import org.apache.catalina.tribes.io.ChannelData; -import 
org.apache.catalina.tribes.io.XByteBuffer; -import org.apache.catalina.tribes.membership.Membership; -import org.apache.catalina.tribes.membership.MemberImpl; import org.apache.catalina.tribes.group.ChannelInterceptorBase; -import org.apache.catalina.tribes.group.InterceptorPayload; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.axis2.engine.AxisConfiguration; -import org.apache.axis2.description.AxisServiceGroup; -import org.apache.axis2.description.AxisModule; +import java.util.ArrayList; import java.util.HashMap; -import java.util.Arrays; -import java.util.Map; -import java.util.TimerTask; -import java.util.Timer; import java.util.Iterator; import java.util.List; -import java.util.ArrayList; -import java.net.Socket; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.SocketTimeoutException; -import java.net.ConnectException; -import java.io.Serializable; +import java.util.Map; /** * Message intereceptor for handling at-most-once message processing semantics */ -public class AtMostOnceInterceptor extends ChannelInterceptorBase { +public class AtMostOnceInterceptor extends ChannelInterceptorBase { private static Log log = LogFactory.getLog(AtMostOnceInterceptor.class); private static final Map receivedMessages = new HashMap(); @@ -64,36 +40,58 @@ private static final int TIMEOUT = 60 * 1000; public AtMostOnceInterceptor() { + Thread cleanupThread = new Thread(new MessageCleanupTask()); + cleanupThread.setPriority(Thread.MIN_PRIORITY); + cleanupThread.start(); + } - TimerTask cleanupTask = new TimerTask() { - public void run() { - List toBeRemoved = new ArrayList(); - for (Iterator iterator = receivedMessages.keySet().iterator(); - iterator.hasNext();) { - ChannelMessage msg = (ChannelMessage) iterator.next(); - long arrivalTime = ((Long) receivedMessages.get(msg)).longValue(); - if (System.currentTimeMillis() - arrivalTime >= TIMEOUT) { - toBeRemoved.add(msg); - } + public void 
messageReceived(ChannelMessage msg) { + synchronized (receivedMessages) { + if (receivedMessages.get(msg) == null) { // If it is a new message, keep track of it + receivedMessages.put(msg, new Long(System.currentTimeMillis())); + super.messageReceived(msg); + } else { // If it is a duplicate message, discard it. i.e. dont call super.messageReceived + log.info("Duplicate message received from " + TribesUtil.getHost(msg.getAddress())); + } + } + } + + private class MessageCleanupTask implements Runnable { + + public void run() { + while (true) { + try { + Thread.sleep(TIMEOUT); + } catch (InterruptedException e) { + e.printStackTrace(); } - for (Iterator iterator = toBeRemoved.iterator(); iterator.hasNext();) { - ChannelMessage msg = (ChannelMessage) iterator.next(); - receivedMessages.remove(msg); - if (log.isDebugEnabled()) { - log.debug("Cleaned up message "); + try { + List toBeRemoved = new ArrayList(); + synchronized (receivedMessages) { + for (Iterator iterator = receivedMessages.keySet().iterator(); + iterator.hasNext();) { + ChannelMessage msg = (ChannelMessage) iterator.next(); + long arrivalTime = ((Long) receivedMessages.get(msg)).longValue(); + if (System.currentTimeMillis() - arrivalTime >= TIMEOUT) { + toBeRemoved.add(msg); + } + } + long start = System.currentTimeMillis(); + for (Iterator iterator = toBeRemoved.iterator(); iterator.hasNext();) { + ChannelMessage msg = (ChannelMessage) iterator.next(); + receivedMessages.remove(msg); + if (log.isDebugEnabled()) { + log.debug("Cleaned up message "); + } + if(System.currentTimeMillis() - start > 30000){ + break; + } + } } + } catch (Exception e) { + log.error("Exception occurred while trying to cleanup messages", e); } } - }; - new Timer().scheduleAtFixedRate(cleanupTask, TIMEOUT, TIMEOUT); - } - - public void messageReceived(ChannelMessage msg) { - super.messageReceived(msg); - if (receivedMessages.get(msg) == null) { // If it is a new message, keep track of it - receivedMessages.put(msg, new 
Long(System.currentTimeMillis())); - } else { // If it is a duplicate message, discard it. i.e. dont call super.messageReceived - log.info("Duplicate message received from " + TribesUtil.getHost(msg.getAddress())); } } } Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java?rev=611922&r1=611921&r2=611922&view=diff ============================================================================== --- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java (original) +++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java Mon Jan 14 12:44:05 2008 @@ -39,9 +39,13 @@ private Log log = LogFactory.getLog(ChannelSender.class); private Channel channel; private boolean synchronizeAllMembers; + private MembershipManager membershipManager; - public ChannelSender(Channel channel, boolean synchronizeAllMembers) { + public ChannelSender(Channel channel, + MembershipManager membershipManager, + boolean synchronizeAllMembers) { this.channel = channel; + this.membershipManager = membershipManager; this.synchronizeAllMembers = synchronizeAllMembers; } @@ -49,7 +53,7 @@ if (channel == null) { return; } - Member[] members = MembershipManager.getMembers(); + Member[] members = membershipManager.getMembers(); // Keep retrying, since at the point of trying to send the msg, a member may leave the group // causing a view change. 
All nodes in a view should get the msg @@ -57,7 +61,9 @@ try { if (synchronizeAllMembers) { channel.send(members, toByteMessage(msg), - Channel.SEND_OPTIONS_USE_ACK | Channel.SEND_OPTIONS_SYNCHRONIZED_ACK); + Channel.SEND_OPTIONS_USE_ACK | + Channel.SEND_OPTIONS_SYNCHRONIZED_ACK | + TribesClusterManager.MSG_ORDER_OPTION); } else { channel.send(members, toByteMessage(msg), Channel.SEND_OPTIONS_ASYNCHRONOUS); } Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java?rev=611922&r1=611921&r2=611922&view=diff ============================================================================== --- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java (original) +++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java Mon Jan 14 12:44:05 2008 @@ -25,21 +25,21 @@ * Responsible for managing the membership */ public class MembershipManager { - private static final List members = new ArrayList(); + private final List members = new ArrayList(); - public synchronized static void memberAdded(Member member) { + public synchronized void memberAdded(Member member) { members.add(member); } - public synchronized static void memberDisappeared(Member member) { + public synchronized void memberDisappeared(Member member) { members.remove(member); } - public synchronized static Member[] getMembers() { + public synchronized Member[] getMembers() { return (Member[]) members.toArray(new Member[members.size()]); } - public synchronized static Member getLongestLivingMember() { + public synchronized Member getLongestLivingMember() { Member longestLivingMember = null; if (members.size() > 0) { Member member0 = (Member) members.get(0); @@ -56,11 +56,11 @@ return longestLivingMember; } - 
public static void removeAllMembers() { + public synchronized void removeAllMembers() { members.clear(); } - public synchronized static Member getRandomMember() { + public synchronized Member getRandomMember() { if (members.size() == 0) { return null; } Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java?rev=611922&r1=611921&r2=611922&view=diff ============================================================================== --- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java (original) +++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java Mon Jan 14 12:44:05 2008 @@ -49,6 +49,7 @@ import org.apache.catalina.tribes.group.RpcChannel; import org.apache.catalina.tribes.group.interceptors.DomainFilterInterceptor; import org.apache.catalina.tribes.group.interceptors.TcpFailureDetector; +import org.apache.catalina.tribes.group.interceptors.OrderInterceptor; import org.apache.catalina.tribes.transport.ReceiverBase; import org.apache.catalina.tribes.transport.ReplicationTransmitter; import org.apache.commons.logging.Log; @@ -60,6 +61,7 @@ import java.util.List; public class TribesClusterManager implements ClusterManager { + public static final int MSG_ORDER_OPTION = 512; private static final Log log = LogFactory.getLog(TribesClusterManager.class); private DefaultConfigurationManager configurationManager; @@ -72,6 +74,7 @@ private ControlCommandProcessor controlCmdProcessor; private ChannelListener channelListener; private ChannelSender channelSender; + private MembershipManager membershipManager; public TribesClusterManager() { parameters = new HashMap(); @@ -125,9 +128,9 @@ } } } - + membershipManager = new MembershipManager(); channel 
= new GroupChannel(); - channelSender = new ChannelSender(channel, synchronizeAllMembers()); + channelSender = new ChannelSender(channel, membershipManager, synchronizeAllMembers()); channelListener = new ChannelListener(configurationContext, configurationManager, contextManager, controlCmdProcessor); @@ -183,20 +186,22 @@ mcastProps.setProperty("tcpListenPort", "4000"); mcastProps.setProperty("tcpListenHost", "127.0.0.1");*/ -// OrderInterceptor orderInterceptor = new OrderInterceptor(); + // Add the OrderInterceptor to preserve sender ordering + OrderInterceptor orderInterceptor = new OrderInterceptor(); + orderInterceptor.setOptionFlag(MSG_ORDER_OPTION); + channel.addInterceptor(orderInterceptor); // Add a AtMostOnceInterceptor to support at-most-once message processing semantics AtMostOnceInterceptor atMostOnceInterceptor = new AtMostOnceInterceptor(); channel.addInterceptor(atMostOnceInterceptor); - atMostOnceInterceptor.setPrevious(dfi); // Add a reliable failure detector TcpFailureDetector tcpFailureDetector = new TcpFailureDetector(); - tcpFailureDetector.setPrevious(atMostOnceInterceptor); channel.addInterceptor(tcpFailureDetector); channel.addChannelListener(channelListener); - TribesMembershipListener membershipListener = new TribesMembershipListener(); + + TribesMembershipListener membershipListener = new TribesMembershipListener(membershipManager); channel.addMembershipListener(membershipListener); try { channel.start(Channel.DEFAULT); @@ -219,8 +224,8 @@ new RpcChannel(domain, channel, new InitializationRequestHandler(controlCmdProcessor)); - log.info("Local Tribes Member " + TribesUtil.getLocalHost(channel)); - TribesUtil.printMembers(); + log.info("Local Member " + TribesUtil.getLocalHost(channel)); + TribesUtil.printMembers(membershipManager); // If configuration management is enabled, get the latest config from a neighbour if (configurationManager != null) { @@ -259,13 +264,13 @@ // Do not send another request to these members List sentMembersList 
= new ArrayList(); sentMembersList.add(TribesUtil.getLocalHost(channel)); - Member[] members = MembershipManager.getMembers(); + Member[] members = membershipManager.getMembers(); if(members.length == 0) return; - while (members.length > 0 && numberOfTries < 50) { + while (members.length > 0 && numberOfTries < 5) { Member member = (numberOfTries == 0) ? - MembershipManager.getLongestLivingMember() : // First try to get from the longest member alive - MembershipManager.getRandomMember(); // Else get from a random member + membershipManager.getLongestLivingMember() : // First try to get from the longest member alive + membershipManager.getRandomMember(); // Else get from a random member String memberHost = TribesUtil.getHost(member); try { if (!sentMembersList.contains(memberHost)) { @@ -274,8 +279,10 @@ RpcChannel.FIRST_REPLY, Channel.SEND_OPTIONS_ASYNCHRONOUS, 10000); - ((ControlCommand) responses[0].getMessage()).execute(configurationContext); // Do the initialization - break; + if (responses.length > 0) { + ((ControlCommand) responses[0].getMessage()).execute(configurationContext); // Do the initialization + break; + } } } catch (ChannelException e) { log.error("Cannot get initialization information from " + @@ -284,10 +291,14 @@ try { Thread.sleep(2000); } catch (InterruptedException ignored) { + log.debug("Interrupted", ignored); } } numberOfTries++; - members = MembershipManager.getMembers(); + members = membershipManager.getMembers(); + if(numberOfTries >= members.length){ + break; + } } } Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesMembershipListener.java URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesMembershipListener.java?rev=611922&r1=611921&r2=611922&view=diff ============================================================================== --- 
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesMembershipListener.java (original) +++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesMembershipListener.java Mon Jan 14 12:44:05 2008 @@ -23,22 +23,27 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -/** +/** In * */ public class TribesMembershipListener implements MembershipListener { private static Log log = LogFactory.getLog(TribesMembershipListener.class); + private MembershipManager membershipManager; + + public TribesMembershipListener(MembershipManager membershipManager) { + this.membershipManager = membershipManager; + } public void memberAdded(Member member) { log.info("New member " + TribesUtil.getHost(member) + " joined cluster."); - MembershipManager.memberAdded(member); + membershipManager.memberAdded(member); // System.err.println("++++++ IS COORD="+TribesClusterManager.nbc.isCoordinator()); } public void memberDisappeared(Member member) { log.info("Member " + TribesUtil.getHost(member) + " left cluster"); - MembershipManager.memberDisappeared(member); + membershipManager.memberDisappeared(member); // System.err.println("++++++ IS COORD="+TribesClusterManager.nbc.isCoordinator()); } Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesUtil.java URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesUtil.java?rev=611922&r1=611921&r2=611922&view=diff ============================================================================== --- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesUtil.java (original) +++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesUtil.java Mon Jan 14 12:44:05 2008 @@ -28,8 +28,8 @@ private static Log log = LogFactory.getLog(TribesUtil.class); - 
public static void printMembers() { - Member[] members = MembershipManager.getMembers(); + public static void printMembers(MembershipManager membershipManager) { + Member[] members = membershipManager.getMembers(); if (members != null) { int length = members.length; if (length > 0) { Modified: webservices/axis2/trunk/java/modules/clustering/test/org/apache/axis2/clustering/ContextReplicationTest.java URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/test/org/apache/axis2/clustering/ContextReplicationTest.java?rev=611922&r1=611921&r2=611922&view=diff ============================================================================== --- webservices/axis2/trunk/java/modules/clustering/test/org/apache/axis2/clustering/ContextReplicationTest.java (original) +++ webservices/axis2/trunk/java/modules/clustering/test/org/apache/axis2/clustering/ContextReplicationTest.java Mon Jan 14 12:44:05 2008 @@ -25,7 +25,6 @@ import org.apache.axis2.clustering.context.DefaultContextManager; import org.apache.axis2.clustering.context.DefaultContextManagerListener; import org.apache.axis2.clustering.tribes.TribesClusterManager; -import org.apache.axis2.clustering.tribes.MembershipManager; import org.apache.axis2.context.ConfigurationContext; import org.apache.axis2.context.ConfigurationContextFactory; import org.apache.axis2.context.ServiceContext; @@ -36,14 +35,14 @@ import org.apache.axis2.engine.AxisConfiguration; import org.apache.axis2.transport.http.server.HttpUtils; -import java.util.ArrayList; -import java.util.List; -import java.net.MulticastSocket; -import java.net.InetAddress; +import java.io.IOException; import java.net.DatagramPacket; -import java.net.ServerSocket; +import java.net.InetAddress; import java.net.InetSocketAddress; -import java.io.IOException; +import java.net.MulticastSocket; +import java.net.ServerSocket; +import java.util.ArrayList; +import java.util.List; /** * Tests the replication of properties placed the ConfigurationContext, 
ServiceGroupContext & @@ -127,7 +126,6 @@ byte buf[] = new byte[1024]; DatagramPacket pack = new DatagramPacket(buf, buf.length); s.receive(pack); - System.out.println("Received data from: " + pack.getAddress().toString() + ":" + pack.getPort() + " with length: " + pack.getLength()); @@ -163,6 +161,13 @@ } }; sender.start(); + + // Join the receiver until we can verify whether multicasting can be done + try { + receiver.join(10000); + } catch (InterruptedException e) { + e.printStackTrace(); + } } public static void main(String[] args) { @@ -267,6 +272,7 @@ String val2 = "configCtxVal1"; configurationContext2.setProperty(key2, val2); ctxMan2.updateContext(configurationContext2); + Thread.sleep(1000); String value = (String) configurationContext1.getProperty(key2); assertEquals(val2, value); } @@ -347,7 +353,7 @@ // Remove the property serviceGroupContext2.removeProperty(key1); assertNull(serviceGroupContext2.getProperty(key1)); - ctxMan1.updateContext(serviceGroupContext2); + ctxMan2.updateContext(serviceGroupContext2); assertNull(serviceGroupContext1.getProperty(key1)); } @@ -408,7 +414,7 @@ // Remove the property serviceGroupContext2.removeProperty(key1); assertNull(serviceGroupContext2.getProperty(key1)); - ctxMan1.updateContext(serviceGroupContext2); + ctxMan2.updateContext(serviceGroupContext2); assertNull(serviceGroupContext1.getProperty(key1)); } @@ -502,7 +508,7 @@ // Remove the property serviceContext2.removeProperty(key1); assertNull(serviceContext2.getProperty(key1)); - ctxMan1.updateContext(serviceContext2); + ctxMan2.updateContext(serviceContext2); assertNull(serviceContext1.getProperty(key1)); } @@ -538,7 +544,7 @@ // Remove the property serviceContext2.removeProperty(key1); assertNull(serviceContext2.getProperty(key1)); - ctxMan1.updateContext(serviceContext2); + ctxMan2.updateContext(serviceContext2); assertNull(serviceContext1.getProperty(key1)); } @@ -623,7 +629,7 @@ clusterManager2.shutdown(); System.out.println("------ CLuster-2 shutdown 
complete ------"); } - MembershipManager.removeAllMembers(); +// MembershipManager.removeAllMembers(); Thread.sleep(500); } } --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]