Author: azeez Date: Mon Jan 7 06:15:04 2008 New Revision: 609612 URL: http://svn.apache.org/viewvc?rev=609612&view=rev Log: Handle duplicate message receipt
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesUtil.java Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java?rev=609612&r1=609611&r2=609612&view=diff ============================================================================== --- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java (original) +++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java Mon Jan 7 06:15:04 2008 @@ -43,29 +43,48 @@ import java.io.Serializable; import java.util.ArrayList; +import java.util.HashMap; import java.util.Iterator; import java.util.List; +import java.util.Map; +import java.util.Timer; +import java.util.TimerTask; public class ChannelListener implements org.apache.catalina.tribes.ChannelListener { private static final Log log = LogFactory.getLog(ChannelListener.class); + /** + * The time a message lives in the receivedMessages Map + */ + private static final int TIME_TO_LIVE = 5 * 60 * 1000; // 5 mins + private DefaultContextManager contextManager; private DefaultConfigurationManager configurationManager; private TribesControlCommandProcessor controlCommandProcessor; private ChannelSender channelSender; private ConfigurationContext configurationContext; + private boolean synchronizeAllMembers; + + private Map receivedMessages = new HashMap(); public ChannelListener(ConfigurationContext configurationContext, DefaultConfigurationManager configurationManager, DefaultContextManager contextManager, TribesControlCommandProcessor controlCommandProcessor, - ChannelSender sender) { + ChannelSender sender, + boolean synchronizeAllMembers) { this.configurationManager = configurationManager; this.contextManager = contextManager; this.controlCommandProcessor = controlCommandProcessor; this.channelSender = sender; this.configurationContext = configurationContext; + this.synchronizeAllMembers = synchronizeAllMembers; + + Timer cleanupTimer = new Timer(); + cleanupTimer.scheduleAtFixedRate(new ReceivedMessageCleanupTask(), + TIME_TO_LIVE, + TIME_TO_LIVE); } public void setContextManager(DefaultContextManager contextManager) { @@ -104,9 +123,10 @@ msg = XByteBuffer.deserialize(message, 0, message.length, - (ClassLoader[])classLoaders.toArray(new ClassLoader[classLoaders.size()])); + (ClassLoader[]) classLoaders.toArray(new ClassLoader[classLoaders.size()])); } catch (Exception e) { - log.error(e); + log.error("Cannot deserialize received message", e); + return; } // If the system has not still been intialized, reject all incoming messages, except the @@ -128,27 +148,57 @@ } private void processMessage(Serializable msg, Member sender) throws ClusteringFault { - //TODO: Reject duplicates that can be received due to retransmissions - //TODO: ACK implosion? + //TODO: Handle ACK implosion? if (msg instanceof ContextClusteringCommand && contextManager != null) { ContextClusteringCommand ctxCmd = (ContextClusteringCommand) msg; + String msgId = ctxCmd.getUniqueId(); + + // Check for duplicate messages and ignore duplicates in order to support at-most-once semantics + if (receivedMessages.containsKey(msgId)) { + log.debug("Received duplicate message " + ctxCmd); + receivedMessages.put(msgId, new Long(System.currentTimeMillis()));// Let's keep track of the message as well as the time at which it was last received + return; + } + receivedMessages.put(msgId, new Long(System.currentTimeMillis()));// Let's keep track of the message as well as the time at which it was first received + + // Process the message contextManager.process(ctxCmd); // Sending ACKs for ContextClusteringCommandCollection or // UpdateContextCommand is sufficient - if (msg instanceof ContextClusteringCommandCollection || - msg instanceof UpdateContextCommand) { - AckCommand ackCmd = new AckCommand(ctxCmd.getUniqueId()); - - // Send the ACK - this.channelSender.sendToMember(ackCmd, sender); + if (synchronizeAllMembers) { // Send ACK only if the relevant cluster config parameter is set + if (msg instanceof ContextClusteringCommandCollection || + msg instanceof UpdateContextCommand) { + AckCommand ackCmd = new AckCommand(msgId); + + // Send the ACK + this.channelSender.sendToMember(ackCmd, sender); + } } } else if (msg instanceof ConfigurationClusteringCommand && configurationManager != null) { configurationManager.process((ConfigurationClusteringCommand) msg); } else if (msg instanceof ControlCommand && controlCommandProcessor != null) { controlCommandProcessor.process((ControlCommand) msg, sender); + } + } + + private class ReceivedMessageCleanupTask extends TimerTask { + + public void run() { + List toBeRemoved = new ArrayList(); + for (Iterator iterator = receivedMessages.keySet().iterator(); iterator.hasNext();) { + String msgId = (String) iterator.next(); + Long recdTime = (Long) receivedMessages.get(msgId); + if (System.currentTimeMillis() - recdTime.longValue() >= TIME_TO_LIVE) { + toBeRemoved.add(msgId); + } + } + for (Iterator iterator = toBeRemoved.iterator(); iterator.hasNext();) { + String msgId = (String) iterator.next(); + receivedMessages.remove(msgId); + } } } } Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java?rev=609612&r1=609611&r2=609612&view=diff ============================================================================== --- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java (original) +++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java Mon Jan 7 06:15:04 2008 @@ -128,7 +128,8 @@ configurationManager, contextManager, controlCmdProcessor, - sender); + sender, + synchronizeAllMembers()); controlCmdProcessor.setChannelSender(sender); channel = new GroupChannel(); @@ -202,13 +203,13 @@ log.info("Local Tribes Member " + TribesUtil.getLocalHost(channel)); TribesUtil.printMembers(members); - // If configuration management is enabled, get the latest config from a neighbour + // If configuration management is enabled, get the latest config from a neighbour TODO: from the longest living neighbour if (configurationManager != null) { configurationManager.setSender(sender); getInitializationMessage(members, sender, new GetConfigurationCommand()); } - // If context replication is enabled, get the latest state from a neighbour + // If context replication is enabled, get the latest state from a neighbour TODO: from the longest living neighbour if (contextManager != null) { contextManager.setSender(sender); channelListener.setContextManager(contextManager); @@ -233,7 +234,7 @@ ClusteringCommand command) { // If there is at least one member in the Tribe, get the current initialization info from a member Random random = new Random(); - int numberOfTries = 0; // Don't keep on trying infinitely + int numberOfTries = 0; // Don't keep on trying indefinitely // Keep track of members to whom we already sent an initialization command // Do not send another request to these members @@ -246,6 +247,10 @@ // While there are members and GetStateResponseCommand is not received do the following try { members = channel.getMembers(); + + //TODO: Can get longest alive member, willdo with membership awareness + members[0].getMemberAliveTime(); + int memberIndex = random.nextInt(members.length); Member member = members[memberIndex]; if (!sentMembersList.contains(TribesUtil.getHost(member))) { Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesUtil.java URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesUtil.java?rev=609612&r1=609611&r2=609612&view=diff ============================================================================== --- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesUtil.java (original) +++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesUtil.java Mon Jan 7 06:15:04 2008 @@ -57,4 +57,19 @@ public static String getLocalHost(Channel channel) { return getHost(channel.getLocalMember(true)); } + + public static Member getLongestAliveMember(Member[] members) { + Member longestAliveMember = null; + if (members.length > 0) { + long longestAliveTime = members[0].getMemberAliveTime(); + for (int i = 0; i < members.length; i++) { + Member member = members[i]; + if (longestAliveTime < member.getMemberAliveTime()) { + longestAliveTime = member.getMemberAliveTime(); + longestAliveMember = member; + } + } + } + return longestAliveMember; + } } --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]