Author: azeez Date: Wed Jan 25 08:59:32 2012 New Revision: 1235695 URL: http://svn.apache.org/viewvc?rev=1235695&view=rev Log: Resend MEMBER_LIST message to WKA members who do not respond to the MEMBER_LIST message
Modified: axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/WkaBasedMembershipScheme.java Modified: axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java URL: http://svn.apache.org/viewvc/axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java?rev=1235695&r1=1235694&r2=1235695&view=diff ============================================================================== --- axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java (original) +++ axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java Wed Jan 25 08:59:32 2012 @@ -37,6 +37,10 @@ import org.apache.commons.logging.LogFac import java.util.ArrayList; import java.util.List; import java.util.Random; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; /** * Responsible for managing the membership. Handles membership changes. @@ -68,10 +72,22 @@ public class MembershipManager { private final List<Member> wkaMembers = new ArrayList<Member>(); /** + * List of Well-Known members which have not responded to the MEMBER_LIST message. + * We need to retry sending the MEMBER_LIST message to these members until they respond, + * otherwise, we cannot be sure whether these WKA members added the members in the MEMBER_LIST + */ + private final List<Member> nonRespondingWkaMembers = new CopyOnWriteArrayList<Member>(); + + /** * The member representing this node */ private Member localMember; + /** + * + */ + private boolean isMemberListResponseReceived; + public MembershipManager(ConfigurationContext configContext) { this.configContext = configContext; } @@ -87,9 +103,10 @@ public class MembershipManager { return rpcMembershipChannel; } - public void setStaticMembershipInterceptor( - StaticMembershipInterceptor staticMembershipInterceptor) { + public void setupStaticMembershipManagement(StaticMembershipInterceptor staticMembershipInterceptor) { this.staticMembershipInterceptor = staticMembershipInterceptor; + ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); + scheduler.scheduleWithFixedDelay(new MemberListSenderTask(), 5, 5, TimeUnit.SECONDS); } public void setGroupManagementAgent(GroupManagementAgent groupManagementAgent) { @@ -157,54 +174,93 @@ public class MembershipManager { } if (shouldAddMember) { + boolean wkaMemberBelongsToLocalDomain = true; if (rpcMembershipChannel != null && isLocalMemberInitialized() && wkaMembers.contains(member)) { // if it is a well-known member log.info("A WKA member " + TribesUtil.getName(member) + " just joined the group. Sending MEMBER_LIST message."); - // send the member list to it - MemberListCommand memListCmd; - try { - memListCmd = new MemberListCommand(); - List<Member> members = new ArrayList<Member>(this.members); - members.add(localMember); // Need to set the local member too - memListCmd.setMembers(members.toArray(new Member[members.size()])); - - Response[] responses = - rpcMembershipChannel.send(new Member[]{member}, memListCmd, - RpcChannel.ALL_REPLY, - Channel.SEND_OPTIONS_ASYNCHRONOUS | - TribesConstants.MEMBERSHIP_MSG_OPTION, 10000); - - // Once a response is received from the WKA member to the MEMBER_LIST message, - // if it does not belong to this domain, simply remove it from the members - if (responses != null && responses.length > 0 && responses[0] != null) { - Member source = responses[0].getSource(); - if (!TribesUtil.areInSameDomain(source, member)) { - if (log.isDebugEnabled()) { - log.debug("WKA Member " + TribesUtil.getName(source) + - " does not belong to local domain " + new String(domain) + - ". Hence removing it from the list."); - } - members.remove(member); - return false; + wkaMemberBelongsToLocalDomain = sendMemberListToWellKnownMember(member); + } + if (wkaMemberBelongsToLocalDomain) { + members.add(member); + if (log.isDebugEnabled()) { + log.debug("Added group member " + TribesUtil.getName(member) + " to domain " + + new String(member.getDomain())); + } + return true; + } + } + return false; + } + + /** + * Task which send MEMBER_LIST messages to WKA members which have not yet responded to the + * MEMBER_LIST message + */ + private class MemberListSenderTask implements Runnable { + public void run() { + try { + if (nonRespondingWkaMembers != null && !nonRespondingWkaMembers.isEmpty()) { + for (Member wkaMember : nonRespondingWkaMembers) { + if (wkaMember != null) { + sendMemberListToWellKnownMember(wkaMember); } } - } catch (Exception e) { - String errMsg = "Could not send MEMBER_LIST to well-known member " + - TribesUtil.getName(member); - log.error(errMsg, e); - throw new RemoteProcessException(errMsg, e); } + } catch (Throwable e) { + log.error("Could not send MemberList to WKA Members", e); } - members.add(member); - if (log.isDebugEnabled()) { - log.debug("Added group member " + TribesUtil.getName(member) + " to domain " + - new String(member.getDomain())); + } + } + + /** + * Send MEMBER_LIST message to WKA member + * + * @param wkaMember The WKA member to whom the MEMBER_LIST has to be sent + * @return true - if the WKA member belongs to the domain of this local member + */ + private boolean sendMemberListToWellKnownMember(Member wkaMember) { + /*if (wkaMember.isFailing() || wkaMember.isSuspect()) { + return false; + }*/ + // send the member list to it + MemberListCommand memListCmd; + try { + memListCmd = new MemberListCommand(); + List<Member> members = new ArrayList<Member>(this.members); + members.add(localMember); // Need to set the local member too + memListCmd.setMembers(members.toArray(new Member[members.size()])); + + Response[] responses = + rpcMembershipChannel.send(new Member[]{wkaMember}, memListCmd, + RpcChannel.ALL_REPLY, + Channel.SEND_OPTIONS_ASYNCHRONOUS | + TribesConstants.MEMBERSHIP_MSG_OPTION, 10000); + + // Once a response is received from the WKA member to the MEMBER_LIST message, + // if it does not belong to this domain, simply remove it from the members + if (responses != null && responses.length > 0 && responses[0] != null) { + nonRespondingWkaMembers.remove(wkaMember); + Member source = responses[0].getSource(); + if (!TribesUtil.areInSameDomain(source, wkaMember)) { + if (log.isDebugEnabled()) { + log.debug("WKA Member " + TribesUtil.getName(source) + + " does not belong to local domain " + new String(domain) + + ". Hence removing it from the list."); + } + return false; + } + } else { // No response from WKA member + nonRespondingWkaMembers.add(wkaMember); } - return true; + } catch (Exception e) { + String errMsg = "Could not send MEMBER_LIST to well-known member " + + TribesUtil.getName(wkaMember); + log.error(errMsg, e); + throw new RemoteProcessException(errMsg, e); } - return false; + return true; } /** @@ -278,6 +334,7 @@ public class MembershipManager { */ public void memberDisappeared(Member member) { members.remove(member); + nonRespondingWkaMembers.remove(member); // Is this an application domain member? if (groupManagementAgent != null) { Modified: axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/WkaBasedMembershipScheme.java URL: http://svn.apache.org/viewvc/axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/WkaBasedMembershipScheme.java?rev=1235695&r1=1235694&r2=1235695&view=diff ============================================================================== --- axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/WkaBasedMembershipScheme.java (original) +++ axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/WkaBasedMembershipScheme.java Wed Jan 25 08:59:32 2012 @@ -1,17 +1,17 @@ -/* - * Copyright 2004,2005 The Apache Software Foundation. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. +/* + * Copyright 2004,2005 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.axis2.clustering.tribes; @@ -289,7 +289,7 @@ public class WkaBasedMembershipScheme im log.debug("Adding Interceptors..."); } TcpPingInterceptor tcpPingInterceptor = new TcpPingInterceptor(); - tcpPingInterceptor.setInterval(100); + tcpPingInterceptor.setInterval(10000); channel.addInterceptor(tcpPingInterceptor); if (log.isDebugEnabled()) { log.debug("Added TCP Ping Interceptor"); @@ -299,7 +299,7 @@ public class WkaBasedMembershipScheme im TcpFailureDetector tcpFailureDetector = new TcpFailureDetector(); // tcpFailureDetector.setPrevious(dfi); //TODO: check this tcpFailureDetector.setReadTestTimeout(120000); - tcpFailureDetector.setConnectTimeout(60000); + tcpFailureDetector.setConnectTimeout(180000); channel.addInterceptor(tcpFailureDetector); if (log.isDebugEnabled()) { log.debug("Added TCP Failure Detector"); @@ -310,7 +310,7 @@ public class WkaBasedMembershipScheme im staticMembershipInterceptor = new StaticMembershipInterceptor(); staticMembershipInterceptor.setLocalMember(primaryMembershipManager.getLocalMember()); - primaryMembershipManager.setStaticMembershipInterceptor(staticMembershipInterceptor); + primaryMembershipManager.setupStaticMembershipManagement(staticMembershipInterceptor); channel.addInterceptor(staticMembershipInterceptor); if (log.isDebugEnabled()) { log.debug("Added Static Membership Interceptor"); @@ -350,7 +350,7 @@ public class WkaBasedMembershipScheme im // Have multiple RPC channels with multiple RPC request handlers for each localDomain // This is needed only when this member is running as a load balancer for (MembershipManager appDomainMembershipManager : applicationDomainMembershipManagers) { - appDomainMembershipManager.setStaticMembershipInterceptor(staticMembershipInterceptor); + appDomainMembershipManager.setupStaticMembershipManagement(staticMembershipInterceptor); // Create an RpcChannel for each localDomain String domain = new String(appDomainMembershipManager.getDomain()); @@ -377,7 +377,6 @@ public class WkaBasedMembershipScheme im // Send JOIN message to a WKA member if (primaryMembershipManager.getMembers().length > 0) { - log.info("Sending JOIN message to WKA members..."); org.apache.catalina.tribes.Member[] wkaMembers = primaryMembershipManager.getMembers(); // The well-known members /*try { Thread.sleep(3000); // Wait for sometime so that the WKA members can receive the MEMBER_LIST message, if they have just joined the group @@ -386,6 +385,7 @@ public class WkaBasedMembershipScheme im Response[] responses = null; do { try { + log.info("Sending JOIN message to WKA members..."); responses = rpcMembershipChannel.send(wkaMembers, new JoinGroupCommand(), RpcChannel.ALL_REPLY, @@ -394,10 +394,8 @@ public class WkaBasedMembershipScheme im 10000); if (responses.length == 0) { try { - if (log.isDebugEnabled()) { - log.debug("No responses received"); - } - Thread.sleep(500); + log.info("No responses received from WKA members"); + Thread.sleep(5000); } catch (InterruptedException ignored) { } }