Author: azeez Date: Mon Jan 7 08:06:51 2008 New Revision: 609656 URL: http://svn.apache.org/viewvc?rev=609656&view=rev Log: 1. More improvements to membership management 2. First try to get the config & state information from the longest living member of the group, when a node comes up
Added: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AckManager.java webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesMembershipListener.java webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesUtil.java Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AckManager.java URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AckManager.java?rev=609656&r1=609655&r2=609656&view=diff ============================================================================== --- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AckManager.java (original) +++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AckManager.java Mon Jan 7 08:06:51 2008 @@ -45,7 +45,7 @@ * When a particular member send an ACK for a particular message, the ACK is stored here * * @param messageUniqueId ID of the message being ACKed - * @param memberId The ID of the member who ACKed the above message + * @param memberId The ID of the member who ACKed the above message */ public static void addAcknowledgement(String messageUniqueId, String memberId) { @@ -67,9 +67,9 @@ * and then return false. * * @param messageUniqueId ID of the message being ACKed - * @param sender + * @param sender The utility for sending the message * @return true - if all members have ACKed the message, false - otherwise - * @throws ClusteringFault + * @throws ClusteringFault If an error occurs while retrannsmitting a message */ public static boolean isMessageAcknowledged(String messageUniqueId, ChannelSender sender) throws ClusteringFault { @@ -86,7 +86,7 @@ // Check that all members in the memberList are same as the total member list, // which will indicate that all members have ACKed the message - Member[] members = sender.getChannel().getMembers(); + Member[] members = MembershipManager.getMembers(); if (members.length == 0) { isAcknowledged = true; } else { Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java?rev=609656&r1=609655&r2=609656&view=diff ============================================================================== --- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java (original) +++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java Mon Jan 7 08:06:51 2008 @@ -45,67 +45,46 @@ } long timeToSend = 0; - // Keep retrying, since at the point of trying to send the msg, a member may leave the group - // causing a view change. All nodes in a view should get the msg - //TODO: Sometimes Tribes incorrectly detects that a member has left a group - while (true) { - if (channel.getMembers().length > 0) { - try { - long start = System.currentTimeMillis(); - channel.send(channel.getMembers(), toByteMessage(msg), Channel.SEND_OPTIONS_USE_ACK); - timeToSend = System.currentTimeMillis() - start; - log.debug("Sent " + msg + " to group"); - break; - } catch (NotSerializableException e) { - String message = "Could not send command message " + msg + - " to group since it is not serializable."; - log.error(message, e); - throw new ClusteringFault(message, e); - } catch (Exception e) { - String message = "Error sending command message : " + msg + - ". Reason " + e.getMessage(); - log.warn(message, e); - } - } else { - break; - } - } - return timeToSend; - } - - public long sendToGroup(ClusteringCommand msg, Member[] members) throws ClusteringFault { - if (channel == null) { - return 0; - } - long timeToSend = 0; + Member[] members = MembershipManager.getMembers(); // Keep retrying, since at the point of trying to send the msg, a member may leave the group // causing a view change. All nodes in a view should get the msg //TODO: Sometimes Tribes incorrectly detects that a member has left a group - while (true) { - if (channel.getMembers().length > 0) { - try { - long start = System.currentTimeMillis(); - channel.send(channel.getMembers(), toByteMessage(msg), Channel.SEND_OPTIONS_USE_ACK); - timeToSend = System.currentTimeMillis() - start; - log.debug("Sent " + msg + " to group"); - break; - } catch (NotSerializableException e) { - String message = "Could not send command message " + msg + - " to group since it is not serializable."; - log.error(message, e); - throw new ClusteringFault(message, e); - } catch (ChannelException e) { - - } catch (Exception e) { - String message = "Error sending command message : " + msg + - ". Reason " + e.getMessage(); - log.warn(message, e); +// while (true) { + if (members.length > 0) { + try { + long start = System.currentTimeMillis(); + channel.send(members, toByteMessage(msg), Channel.SEND_OPTIONS_USE_ACK); + timeToSend = System.currentTimeMillis() - start; + log.debug("Sent " + msg + " to group"); +// break; + } catch (NotSerializableException e) { + String message = "Could not send command message " + msg + + " to group since it is not serializable."; + log.error(message, e); + throw new ClusteringFault(message, e); + } catch (ChannelException e) { + //TODO: What to do for faulty members + ChannelException.FaultyMember[] faultyMembers = e.getFaultyMembers(); + for (int i = 0; i < faultyMembers.length; i++) { + ChannelException.FaultyMember faultyMember = faultyMembers[i]; + Member member = faultyMember.getMember(); + log.error("Member " + TribesUtil.getHost(member) + " is faulty. Cause " + + faultyMember.getCause(), faultyMember.getCause()); + + //TODO: Shall we try to resend to these members? } - } else { - break; + + } catch (Exception e) { + String message = "Error sending command message : " + msg + + ". Reason " + e.getMessage(); + log.warn(message, e); } } +// else { +// break; +// } +// } return timeToSend; } @@ -146,6 +125,10 @@ " since it is not serializable."; log.error(message, e); throw new ClusteringFault(message, e); + } catch (ChannelException e) { + ChannelException.FaultyMember[] faultyMembers = e.getFaultyMembers(); + log.error("Member " + TribesUtil.getHost(member) + " is faulty. Cause " + + faultyMembers[0].getCause(), faultyMembers[0].getCause()); } catch (Exception e) { String message = "Could not send message to " + TribesUtil.getHost(member) + ". Reason " + e.getMessage(); Added: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java?rev=609656&view=auto ============================================================================== --- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java (added) +++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java Mon Jan 7 08:06:51 2008 @@ -0,0 +1,65 @@ +/* + * Copyright 2004,2005 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.axis2.clustering.tribes; + +import org.apache.catalina.tribes.Member; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; + +/** + * Responsible for managing the membership + */ +public class MembershipManager { + private static final List members = new ArrayList(); + + public synchronized static void memberAdded(Member member) { + members.add(member); + } + + public synchronized static void memberDisappeared(Member member) { + members.remove(member); + } + + public synchronized static Member[] getMembers() { + return (Member[]) members.toArray(new Member[members.size()]); + } + + public synchronized static Member getLongestAliveMember() { + Member longestAliveMember = null; + if (members.size() > 0) { + long longestAliveTime = ((Member) members.get(0)).getMemberAliveTime(); + for (int i = 0; i < members.size(); i++) { + Member member = (Member) members.get(i); + if (longestAliveTime < member.getMemberAliveTime()) { + longestAliveTime = member.getMemberAliveTime(); + longestAliveMember = member; + } + } + } + return longestAliveMember; + } + + public synchronized static Member getRandomMember() { + if(members.size() == 0){ + return null; + } + int memberIndex = new Random().nextInt(members.size()); + return (Member) members.get(memberIndex); + } + +} Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java?rev=609656&r1=609655&r2=609656&view=diff ============================================================================== --- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java (original) +++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java Mon Jan 7 08:06:51 2008 @@ -22,7 +22,6 @@ import org.apache.axiom.om.OMElement; import org.apache.axis2.AxisFault; import org.apache.axis2.clustering.ClusterManager; -import org.apache.axis2.clustering.ClusteringCommand; import org.apache.axis2.clustering.ClusteringConstants; import org.apache.axis2.clustering.ClusteringFault; import org.apache.axis2.clustering.RequestBlockingHandler; @@ -31,6 +30,7 @@ import org.apache.axis2.clustering.context.ClusteringContextListener; import org.apache.axis2.clustering.context.ContextManager; import org.apache.axis2.clustering.context.DefaultContextManager; +import org.apache.axis2.clustering.control.ControlCommand; import org.apache.axis2.clustering.control.GetConfigurationCommand; import org.apache.axis2.clustering.control.GetStateCommand; import org.apache.axis2.context.ConfigurationContext; @@ -55,7 +55,6 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; -import java.util.Random; public class TribesClusterManager implements ClusterManager { private static final Log log = LogFactory.getLog(TribesClusterManager.class); @@ -199,21 +198,21 @@ } sender.setChannel(channel); - Member[] members = channel.getMembers(); +// Member[] members = channel.getMembers(); log.info("Local Tribes Member " + TribesUtil.getLocalHost(channel)); - TribesUtil.printMembers(members); + TribesUtil.printMembers(); // If configuration management is enabled, get the latest config from a neighbour TODO: from the longest living neighbour if (configurationManager != null) { configurationManager.setSender(sender); - getInitializationMessage(members, sender, new GetConfigurationCommand()); + getInitializationMessage(sender, new GetConfigurationCommand()); } // If context replication is enabled, get the latest state from a neighbour TODO: from the longest living neighbour if (contextManager != null) { contextManager.setSender(sender); channelListener.setContextManager(contextManager); - getInitializationMessage(members, sender, new GetStateCommand()); + getInitializationMessage(sender, new GetStateCommand()); ClusteringContextListener contextListener = new ClusteringContextListener(sender); configurationContext.addContextListener(contextListener); } @@ -225,20 +224,18 @@ * Get some information from a neighbour. This information will be used by this node to * initialize itself * - * @param members - * @param sender - * @param command + * @param sender The utility for sending messages to the channel + * @param command The control command to send */ - private void getInitializationMessage(Member[] members, - ChannelSender sender, - ClusteringCommand command) { - // If there is at least one member in the Tribe, get the current initialization info from a member - Random random = new Random(); + private void getInitializationMessage(ChannelSender sender, ControlCommand command) { + // If there is at least one member in the cluster, + // get the current initialization info from a member int numberOfTries = 0; // Don't keep on trying indefinitely // Keep track of members to whom we already sent an initialization command // Do not send another request to these members List sentMembersList = new ArrayList(); + Member[] members = MembershipManager.getMembers(); while (members.length > 0 && configurationContext. getPropertyNonReplicable(ClusteringConstants.CLUSTER_INITIALIZED) == null @@ -246,13 +243,9 @@ // While there are members and GetStateResponseCommand is not received do the following try { - members = channel.getMembers(); - - //TODO: Can get longest alive member, willdo with membership awareness - members[0].getMemberAliveTime(); - - int memberIndex = random.nextInt(members.length); - Member member = members[memberIndex]; + Member member = (numberOfTries == 0) ? + MembershipManager.getLongestAliveMember() : // First try to get from the longest alive member + MembershipManager.getRandomMember(); // Else get from a random member if (!sentMembersList.contains(TribesUtil.getHost(member))) { long tts = sender.sendToMember(command, member); configurationContext. @@ -260,12 +253,13 @@ new Long(tts)); sentMembersList.add(TribesUtil.getHost(member)); log.debug("WAITING FOR STATE INITIALIZATION MESSAGE..."); - Thread.sleep(tts + 5); + Thread.sleep(tts + 5 * (numberOfTries + 1)); } } catch (Exception e) { log.error(e); break; } + members = MembershipManager.getMembers(); numberOfTries++; } } @@ -334,9 +328,6 @@ public boolean synchronizeAllMembers() { Parameter syncAllParam = getParameter(ClusteringConstants.SYNCHRONIZE_ALL_MEMBERS); - if (syncAllParam == null) { - return true; - } - return Boolean.parseBoolean((String) syncAllParam.getValue()); + return syncAllParam == null || Boolean.parseBoolean((String) syncAllParam.getValue()); } } Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesMembershipListener.java URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesMembershipListener.java?rev=609656&r1=609655&r2=609656&view=diff ============================================================================== --- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesMembershipListener.java (original) +++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesMembershipListener.java Mon Jan 7 08:06:51 2008 @@ -32,11 +32,14 @@ public void memberAdded(Member member) { log.info("New member " + TribesUtil.getHost(member) + " joined cluster."); + MembershipManager.memberAdded(member); // System.err.println("++++++ IS COORD="+TribesClusterManager.nbc.isCoordinator()); } public void memberDisappeared(Member member) { log.info("Member " + TribesUtil.getHost(member) + " left cluster"); + MembershipManager.memberDisappeared(member); + // System.err.println("++++++ IS COORD="+TribesClusterManager.nbc.isCoordinator()); } } Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesUtil.java URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesUtil.java?rev=609656&r1=609655&r2=609656&view=diff ============================================================================== --- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesUtil.java (original) +++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesUtil.java Mon Jan 7 08:06:51 2008 @@ -28,7 +28,8 @@ private static Log log = LogFactory.getLog(TribesUtil.class); - public static void printMembers(Member[] members) { + public static void printMembers() { + Member[] members = MembershipManager.getMembers(); if (members != null) { int length = members.length; if (length > 0) { @@ -56,20 +57,5 @@ public static String getLocalHost(Channel channel) { return getHost(channel.getLocalMember(true)); - } - - public static Member getLongestAliveMember(Member[] members) { - Member longestAliveMember = null; - if (members.length > 0) { - long longestAliveTime = members[0].getMemberAliveTime(); - for (int i = 0; i < members.length; i++) { - Member member = members[i]; - if (longestAliveTime < member.getMemberAliveTime()) { - longestAliveTime = member.getMemberAliveTime(); - longestAliveMember = member; - } - } - } - return longestAliveMember; } } --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]