Author: azeez Date: Wed May 30 04:10:28 2007 New Revision: 542797 URL: http://svn.apache.org/viewvc?view=rev&rev=542797 Log: Introducing an ACKing mechanism. The response is sent to the client only if the state change is successfully replicated across the cluster. Each member has to send an ACK for a particular message received.
Added: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AckManager.java Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/ContextClusteringCommandFactory.java webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/DefaultContextManager.java webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/AckCommand.java webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/GetStateCommand.java webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/handlers/ReplicationHandler.java webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesControlCommandProcessor.java webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesMembershipListener.java webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusteringConstants.java webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/context/ContextManager.java Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/ContextClusteringCommandFactory.java URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/ContextClusteringCommandFactory.java?view=diff&rev=542797&r1=542796&r2=542797 ============================================================================== --- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/ContextClusteringCommandFactory.java 
(original) +++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/ContextClusteringCommandFactory.java Wed May 30 04:10:28 2007 @@ -15,14 +15,16 @@ */ package org.apache.axis2.clustering.context; +import org.apache.axiom.om.util.UUIDGenerator; import org.apache.axis2.clustering.context.commands.*; +import org.apache.axis2.clustering.tribes.AckManager; import org.apache.axis2.context.*; import org.apache.axis2.deployment.DeploymentConstants; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.axiom.om.util.UUIDGenerator; import java.io.Serializable; +import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -34,6 +36,26 @@ private static final Log log = LogFactory.getLog(ContextClusteringCommandFactory.class); + public static ContextClusteringCommandCollection + getCommandCollection(AbstractContext[] contexts, + Map excludedReplicationPatterns) { + + ArrayList commands = new ArrayList(contexts.length); + ContextClusteringCommandCollection collection = + new ContextClusteringCommandCollection(commands); + for (int i = 0; i < contexts.length; i++) { + ContextClusteringCommand cmd = getUpdateCommand(contexts[i], + excludedReplicationPatterns, + false); + if (cmd != null) { + commands.add(cmd); + } + } + collection.setUniqueId(UUIDGenerator.getUUID()); + AckManager.addInitialAcknowledgement(collection); + return collection; + } + /** * @param context * @param excludedPropertyPatterns @@ -82,6 +104,8 @@ } if (cmd != null && ((UpdateContextCommand) cmd).isPropertiesEmpty()) { cmd = null; + } else { + AckManager.addInitialAcknowledgement(cmd); } context.clearPropertyDifferences(); // Once we send the diffs, we should clear the diffs return cmd; @@ -175,7 +199,7 @@ if (abstractContext instanceof ServiceGroupContext) { ServiceGroupContext sgCtx = (ServiceGroupContext) abstractContext; ServiceGroupContextCommand cmd = new 
CreateServiceGroupContextCommand(); - //TODO impl + cmd.setUniqueId(UUIDGenerator.getUUID()); cmd.setServiceGroupName(sgCtx.getDescription().getServiceGroupName()); cmd.setServiceGroupContextId(sgCtx.getId()); return cmd; @@ -183,6 +207,7 @@ ServiceContext serviceCtx = (ServiceContext) abstractContext; ServiceContextCommand cmd = new CreateServiceContextCommand(); ServiceGroupContext sgCtx = (ServiceGroupContext) serviceCtx.getParent(); + cmd.setUniqueId(UUIDGenerator.getUUID()); cmd.setServiceGroupContextId(sgCtx.getId()); cmd.setServiceGroupName(sgCtx.getDescription().getServiceGroupName()); cmd.setServiceName(serviceCtx.getAxisService().getName()); @@ -196,6 +221,7 @@ ServiceGroupContext sgCtx = (ServiceGroupContext) abstractContext; ServiceGroupContextCommand cmd = new DeleteServiceGroupContextCommand(); // TODO: impl + cmd.setUniqueId(UUIDGenerator.getUUID()); cmd.setServiceGroupName(sgCtx.getDescription().getServiceGroupName()); cmd.setServiceGroupContextId(sgCtx.getId()); return cmd; @@ -203,6 +229,7 @@ ServiceContext serviceCtx = (ServiceContext) abstractContext; ServiceContextCommand cmd = new DeleteServiceContextCommand(); // TODO: impl + cmd.setUniqueId(UUIDGenerator.getUUID()); cmd.setServiceGroupName(serviceCtx.getGroupName()); cmd.setServiceName(serviceCtx.getAxisService().getName()); return cmd; Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/DefaultContextManager.java URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/DefaultContextManager.java?view=diff&rev=542797&r1=542796&r2=542797 ============================================================================== --- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/DefaultContextManager.java (original) +++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/DefaultContextManager.java Wed May 30 04:10:28 2007 
@@ -20,6 +20,8 @@ import org.apache.axis2.AxisFault; import org.apache.axis2.clustering.ClusteringFault; import org.apache.axis2.clustering.MessageSender; +import org.apache.axis2.clustering.tribes.AckManager; +import org.apache.axis2.clustering.tribes.ChannelSender; import org.apache.axis2.clustering.context.commands.ContextClusteringCommandCollection; import org.apache.axis2.context.AbstractContext; import org.apache.axis2.context.ConfigurationContext; @@ -35,56 +37,59 @@ private Map parameters = new HashMap(); - private MessageSender sender; + private ChannelSender sender; private ContextReplicationProcessor processor = new ContextReplicationProcessor(); private Map excludedReplicationPatterns = new HashMap(); - public void setSender(MessageSender sender) { + //TODO: Try how to use an interface + public void setSender(ChannelSender sender) { this.sender = sender; } public DefaultContextManager() { } - public void addContext(final AbstractContext context) throws ClusteringFault { - processor.process(ContextClusteringCommandFactory.getCreateCommand(context)); + public String addContext(final AbstractContext context) throws ClusteringFault { + ContextClusteringCommand cmd = ContextClusteringCommandFactory.getCreateCommand(context); + processor.process(cmd); + return cmd.getUniqueId(); } - public void removeContext(AbstractContext context) throws ClusteringFault { - processor.process(ContextClusteringCommandFactory.getRemoveCommand(context)); + public String removeContext(AbstractContext context) throws ClusteringFault { + ContextClusteringCommand cmd = ContextClusteringCommandFactory.getRemoveCommand(context); + processor.process(cmd); + return cmd.getUniqueId(); } - public void updateContext(AbstractContext context) throws ClusteringFault { - ContextClusteringCommand message = + public String updateContext(AbstractContext context) throws ClusteringFault { + ContextClusteringCommand cmd = ContextClusteringCommandFactory.getUpdateCommand(context, 
excludedReplicationPatterns, false); - if (message != null) { - processor.process(message); + if (cmd != null) { + processor.process(cmd); + return cmd.getUniqueId(); } + return null; } - public void updateContexts(AbstractContext[] contexts) throws ClusteringFault { - ArrayList commands = new ArrayList(contexts.length); - ContextClusteringCommandCollection collection = - new ContextClusteringCommandCollection(commands); - for (int i = 0; i < contexts.length; i++) { - ContextClusteringCommand cmd = - ContextClusteringCommandFactory.getUpdateCommand(contexts[i], - excludedReplicationPatterns, - false); - if (cmd != null) { - commands.add(cmd); - } - } - processor.process(collection); + public String updateContexts(AbstractContext[] contexts) throws ClusteringFault { + ContextClusteringCommandCollection cmd = + ContextClusteringCommandFactory.getCommandCollection(contexts, + excludedReplicationPatterns); + processor.process(cmd); + return cmd.getUniqueId(); } public boolean isContextClusterable(AbstractContext context) { return (context instanceof ConfigurationContext) || (context instanceof ServiceContext) || (context instanceof ServiceGroupContext); + } + + public boolean isMessageAcknowledged(String messageUniqueId) throws ClusteringFault { + return AckManager.isMessageAcknowledged(messageUniqueId, sender); } public void process(ContextClusteringCommand command) throws ClusteringFault { Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/AckCommand.java URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/AckCommand.java?view=diff&rev=542797&r1=542796&r2=542797 ============================================================================== --- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/AckCommand.java (original) +++ 
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/AckCommand.java Wed May 30 04:10:28 2007 @@ -15,13 +15,16 @@ */ package org.apache.axis2.clustering.control; -import org.apache.axis2.clustering.ClusteringCommand; +import org.apache.axis2.clustering.ClusteringFault; +import org.apache.axis2.clustering.tribes.AckManager; +import org.apache.axis2.context.ConfigurationContext; /** - * ACK for the message with id <code>uniqueId</code> + * ACK for the message with id <code>uniqueId</code> */ -public class AckCommand extends ClusteringCommand { +public class AckCommand extends ControlCommand { private String uniqueId; + private String memberId; public AckCommand(String messageUniqueId) { this.uniqueId = messageUniqueId; @@ -31,7 +34,19 @@ return uniqueId; } + public void setMemberId(String memberId) { + this.memberId = memberId; + } + public int getCommandType() { return Integer.MAX_VALUE; + } + + public void execute(ConfigurationContext configurationContext) throws ClusteringFault { + AckManager.addAcknowledgement(uniqueId, memberId); + } + + public String toString() { + return "ACK for message with UUID " + uniqueId; } } Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/GetStateCommand.java URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/GetStateCommand.java?view=diff&rev=542797&r1=542796&r2=542797 ============================================================================== --- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/GetStateCommand.java (original) +++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/GetStateCommand.java Wed May 30 04:10:28 2007 @@ -59,15 +59,17 @@ if (updateCmd != null) { cmdList.add(updateCmd); } - for (Iterator iter2 = sgCtx.getServiceContexts(); iter2.hasNext();) { - ServiceContext serviceCtx = 
(ServiceContext) iter2.next(); - cmdList.add(ContextClusteringCommandFactory.getCreateCommand(serviceCtx)); - ContextClusteringCommand updateServiceCtxCmd = - ContextClusteringCommandFactory.getUpdateCommand(serviceCtx, - excludedPropPatterns, - true); - if (updateServiceCtxCmd != null) { - cmdList.add(updateServiceCtxCmd); + if (sgCtx.getServiceContexts() != null) { + for (Iterator iter2 = sgCtx.getServiceContexts(); iter2.hasNext();) { + ServiceContext serviceCtx = (ServiceContext) iter2.next(); + cmdList.add(ContextClusteringCommandFactory.getCreateCommand(serviceCtx)); + ContextClusteringCommand updateServiceCtxCmd = + ContextClusteringCommandFactory.getUpdateCommand(serviceCtx, + excludedPropPatterns, + true); + if (updateServiceCtxCmd != null) { + cmdList.add(updateServiceCtxCmd); + } } } } Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/handlers/ReplicationHandler.java URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/handlers/ReplicationHandler.java?view=diff&rev=542797&r1=542796&r2=542797 ============================================================================== --- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/handlers/ReplicationHandler.java (original) +++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/handlers/ReplicationHandler.java Wed May 30 04:10:28 2007 @@ -35,7 +35,13 @@ public InvocationResponse invoke(MessageContext msgContext) throws AxisFault { log.debug("Going to replicate state on invoke"); - replicateState(msgContext); + try { + replicateState(msgContext); + } catch (Exception e) { + System.err.println("###########################"); + e.printStackTrace(); + System.err.println("###########################"); + } return InvocationResponse.CONTINUE; } @@ -85,10 +91,28 @@ // Do the actual replication here if (!contexts.isEmpty()) { - contextManager. 
+ String msgUUID = contextManager. updateContexts((AbstractContext[]) contexts. toArray(new AbstractContext[contexts.size()])); + + long start = System.currentTimeMillis(); + + // Wait till all members have ACKed receipt & successful processing of + // the message with UUID 'msgUUID' + while (!contextManager.isMessageAcknowledged(msgUUID)) { + if(System.currentTimeMillis() - start > 20000){ + throw new ClusteringFault("ACKs not received from all members within 1 min. " + + "Aborting wait."); + } + try { + Thread.sleep(2); + } catch (InterruptedException e) { + log.error(e); + break; + } + } } + } else { String msg = "Cannot replicate contexts since " + "ClusterManager is not specified in the axis2.xml file."; Added: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AckManager.java URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AckManager.java?view=auto&rev=542797 ============================================================================== --- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AckManager.java (added) +++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AckManager.java Wed May 30 04:10:28 2007 @@ -0,0 +1,93 @@ +/* + * Copyright 2004,2005 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.axis2.clustering.tribes; + +import org.apache.axis2.clustering.ClusteringFault; +import org.apache.axis2.clustering.context.ContextClusteringCommand; +import org.apache.catalina.tribes.Member; + +import java.util.Hashtable; +import java.util.List; +import java.util.Map; +import java.util.Vector; + +/** + * + */ +public final class AckManager { + + private static Map messageAckTable = new Hashtable(); + + public static void addInitialAcknowledgement(ContextClusteringCommand command) { + messageAckTable.put(command.getUniqueId(), new MessageACK(command)); + } + + public static void addAcknowledgement(String messageUniqueId, + String memberId) { + MessageACK ack = (MessageACK) messageAckTable.get(messageUniqueId); + if (ack != null) { + List memberList = ack.getMemberList(); + memberList.add(memberId); + } + } + + public static boolean isMessageAcknowledged(String messageUniqueId, + ChannelSender sender) throws ClusteringFault { + boolean isAcknowledged = false; + MessageACK ack = (MessageACK) messageAckTable.get(messageUniqueId); + List memberList = ack.getMemberList(); + + // Check that all members in the memberList are same as the total member list, + // which will indicate that all members have ACKed the message + Member[] members = sender.getChannel().getMembers(); + for (int i = 0; i < members.length; i++) { + Member member = members[i]; + if (!memberList.contains(member.getName())) { + + // At this point, resend the original message back to the node which has not + // sent an ACK + sender.sendToMember(ack.getCommand(), member); + isAcknowledged = false; + break; + } else { + isAcknowledged = true; + } + } + + // If a message is ACKed, we don't have to keep track of it in our ackTbl anymore + if (isAcknowledged) { + messageAckTable.remove(messageUniqueId); + } + return isAcknowledged; + } + + private static class MessageACK { + private ContextClusteringCommand command; + private List memberList = new Vector(); + + public 
MessageACK(ContextClusteringCommand command) { + this.command = command; + } + + public ContextClusteringCommand getCommand() { + return command; + } + + public List getMemberList() { + return memberList; + } + } +} Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java?view=diff&rev=542797&r1=542796&r2=542797 ============================================================================== --- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java (original) +++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java Wed May 30 04:10:28 2007 @@ -16,12 +16,17 @@ package org.apache.axis2.clustering.tribes; +import org.apache.axis2.clustering.ClusteringConstants; import org.apache.axis2.clustering.configuration.ConfigurationClusteringCommand; import org.apache.axis2.clustering.configuration.DefaultConfigurationManager; import org.apache.axis2.clustering.context.ContextClusteringCommand; import org.apache.axis2.clustering.context.DefaultContextManager; +import org.apache.axis2.clustering.context.commands.ContextClusteringCommandCollection; +import org.apache.axis2.clustering.context.commands.UpdateContextCommand; import org.apache.axis2.clustering.control.AckCommand; import org.apache.axis2.clustering.control.ControlCommand; +import org.apache.axis2.clustering.control.GetStateResponseCommand; +import org.apache.axis2.context.ConfigurationContext; import org.apache.catalina.tribes.Member; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -49,7 +54,10 @@ */ private Thread messageProcessor; - public ChannelListener(DefaultConfigurationManager configurationManager, + private ConfigurationContext configurationContext; + + public 
ChannelListener(ConfigurationContext configurationContext, + DefaultConfigurationManager configurationManager, DefaultContextManager contextManager, TribesControlCommandProcessor controlCommandProcessor, ChannelSender sender) { @@ -57,6 +65,7 @@ this.contextManager = contextManager; this.controlCommandProcessor = controlCommandProcessor; this.sender = sender; + this.configurationContext = configurationContext; startMessageProcessor(); } @@ -68,12 +77,24 @@ this.configurationManager = configurationManager; } + public void setConfigurationContext(ConfigurationContext configurationContext) { + this.configurationContext = configurationContext; + } + public boolean accept(Serializable msg, Member sender) { return true; } public void messageReceived(Serializable msg, Member sender) { - log.debug("Message received : " + msg); + + // If the system has not still been intialized, reject all incoming messages, except the + // GetStateResponseCommand message + if (configurationContext. + getPropertyNonReplicable(ClusteringConstants.CLUSTER_INITIALIZED) == null + && !(msg instanceof GetStateResponseCommand)) { + return; + } + log.debug("RECEIVED MESSAGE " + msg); synchronized (cmdQueue) { cmdQueue.enqueue(new MemberMessage(msg, sender)); } @@ -129,10 +150,16 @@ if (msg instanceof ContextClusteringCommand && contextManager != null) { ContextClusteringCommand ctxCmd = (ContextClusteringCommand) msg; contextManager.process(ctxCmd); - AckCommand ackCmd = new AckCommand(ctxCmd.getUniqueId()); - // Send the ACK - sender.sendToMember(ackCmd, memberMessage.getSender()); + // Sending ACKs for ContextClusteringCommandCollection or + // UpdateContextCommand is sufficient + if (msg instanceof ContextClusteringCommandCollection || + msg instanceof UpdateContextCommand) { + AckCommand ackCmd = new AckCommand(ctxCmd.getUniqueId()); + + // Send the ACK + sender.sendToMember(ackCmd, memberMessage.getSender()); + } } else if (msg instanceof ConfigurationClusteringCommand && configurationManager 
!= null) { configurationManager.process((ConfigurationClusteringCommand) msg); Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java?view=diff&rev=542797&r1=542796&r2=542797 ============================================================================== --- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java (original) +++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java Wed May 30 04:10:28 2007 @@ -22,15 +22,11 @@ import org.apache.catalina.tribes.Channel; import org.apache.catalina.tribes.ChannelException; import org.apache.catalina.tribes.Member; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; public class ChannelSender implements MessageSender { private Channel channel; - private static final Log log = LogFactory.getLog(ChannelSender.class); - public void sendToGroup(ClusteringCommand msg) throws ClusteringFault { if(channel == null) return; Member[] members = channel.getMembers(); @@ -82,14 +78,5 @@ public void setChannel(Channel channel) { this.channel = channel; - } - - private void printMember(Member member) { - member.getUniqueId(); - log.debug("\n==============================="); - log.debug("Member Name " + member.getName()); - log.debug("Member Host" + member.getHost()); - log.debug("Member Payload" + member.getPayload()); - log.debug("===============================\n"); } } Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java?view=diff&rev=542797&r1=542796&r2=542797 
============================================================================== --- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java (original) +++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java Wed May 30 04:10:28 2007 @@ -52,6 +52,7 @@ private ManagedChannel channel; private ConfigurationContext configurationContext; private TribesControlCommandProcessor controlCmdProcessor; + private ChannelListener channelListener; public TribesClusterManager() { parameters = new HashMap(); @@ -69,10 +70,11 @@ public void init() throws ClusteringFault { ChannelSender sender = new ChannelSender(); - ChannelListener listener = new ChannelListener(configurationManager, - contextManager, - controlCmdProcessor, - sender); + channelListener = new ChannelListener(configurationContext, + configurationManager, + contextManager, + controlCmdProcessor, + sender); if (configurationManager != null) { configurationManager.setSender(sender); @@ -114,7 +116,7 @@ // tcpFailureDetector.setPrevious(nbc); // channel.addInterceptor(tcpFailureDetector); - channel.addChannelListener(listener); + channel.addChannelListener(channelListener); TribesMembershipListener membershipListener = new TribesMembershipListener(); channel.addMembershipListener(membershipListener); channel.start(Channel.DEFAULT); @@ -122,7 +124,7 @@ if (contextManager != null) { contextManager.setSender(sender); - listener.setContextManager(contextManager); + channelListener.setContextManager(contextManager); Member[] members = channel.getMembers(); TribesUtil.printMembers(members); @@ -220,5 +222,8 @@ public void setConfigurationContext(ConfigurationContext configurationContext) { this.configurationContext = configurationContext; controlCmdProcessor.setConfigurationContext(configurationContext); + if (channelListener != null) { + channelListener.setConfigurationContext(configurationContext); + } } } 
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesControlCommandProcessor.java URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesControlCommandProcessor.java?view=diff&rev=542797&r1=542796&r2=542797 ============================================================================== --- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesControlCommandProcessor.java (original) +++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesControlCommandProcessor.java Wed May 30 04:10:28 2007 @@ -17,6 +17,7 @@ import org.apache.axis2.clustering.ClusteringConstants; import org.apache.axis2.clustering.ClusteringFault; +import org.apache.axis2.clustering.control.AckCommand; import org.apache.axis2.clustering.control.ControlCommand; import org.apache.axis2.clustering.control.GetStateCommand; import org.apache.axis2.clustering.control.GetStateResponseCommand; @@ -58,6 +59,10 @@ GetStateResponseCommand getStateRespCmd = new GetStateResponseCommand(); getStateRespCmd.setCommands(((GetStateCommand) command).getCommands()); channelSender.sendToMember(getStateRespCmd, sender); + } else if (command instanceof AckCommand) { + AckCommand cmd = (AckCommand) command; + cmd.setMemberId(sender.getName()); + cmd.execute(configurationContext); } else { command.execute(configurationContext); } Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesMembershipListener.java URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesMembershipListener.java?view=diff&rev=542797&r1=542796&r2=542797 ============================================================================== --- 
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesMembershipListener.java (original) +++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesMembershipListener.java Wed May 30 04:10:28 2007 @@ -17,10 +17,8 @@ import org.apache.catalina.tribes.Member; import org.apache.catalina.tribes.MembershipListener; -import org.apache.catalina.tribes.group.interceptors.NonBlockingCoordinator; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.axis2.context.ConfigurationContext; /** * @@ -28,32 +26,14 @@ public class TribesMembershipListener implements MembershipListener { private static Log log = LogFactory.getLog(TribesMembershipListener.class); -// private ConfigurationContext configContext; public void memberAdded(Member member) { - log.info("New member " + getHostSocket(member) + " added to Tribes group."); - /* TODO: Send state information to this member. - But if all of the members start sending these messages, there is - it is going to be messy. Need to ensure that only one node send this message*/ - -// System.err.println("++++++ IS COORD="+TribesClusterManager.nbc.isCoordinator()); + log.info("New member " + member.getName() + " joined cluster."); + // System.err.println("++++++ IS COORD="+TribesClusterManager.nbc.isCoordinator()); } public void memberDisappeared(Member member) { - log.info("Member " + getHostSocket(member) + " left Tribes group"); + log.info("Member " + member.getName() + " left cluster"); // System.err.println("++++++ IS COORD="+TribesClusterManager.nbc.isCoordinator()); } - - private String getHostSocket(Member member) { - String host = null; - byte[] hostBytes = member.getHost(); - for (int i = 0; i < hostBytes.length; i++) { - host = (host == null) ? ("" + hostBytes[i]) : (host + "." 
+ hostBytes[i]); - } - return host + ":" + member.getPort(); - }/* - - public void setConfigContext(ConfigurationContext configContext) { - this.configContext = configContext; - }*/ } Modified: webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusteringConstants.java URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusteringConstants.java?view=diff&rev=542797&r1=542796&r2=542797 ============================================================================== --- webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusteringConstants.java (original) +++ webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusteringConstants.java Wed May 30 04:10:28 2007 @@ -18,6 +18,7 @@ package org.apache.axis2.clustering; public final class ClusteringConstants { + private ClusteringConstants() { } Modified: webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/context/ContextManager.java URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/context/ContextManager.java?view=diff&rev=542797&r1=542796&r2=542797 ============================================================================== --- webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/context/ContextManager.java (original) +++ webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/context/ContextManager.java Wed May 30 04:10:28 2007 @@ -30,17 +30,19 @@ * This method is called when a new {@link AbstractContext} is added to the system * * @param context + * @return The UUID of the message that was sent to the group communications framework * @throws ClusteringFault */ - void addContext(AbstractContext context) throws ClusteringFault; + String addContext(AbstractContext context) throws ClusteringFault; /** * This method is called when a new {@link 
AbstractContext} is removed from the system * * @param context + * @return The UUID of the message that was sent to the group communications framework * @throws ClusteringFault */ - void removeContext(AbstractContext context) throws ClusteringFault; + String removeContext(AbstractContext context) throws ClusteringFault; /** * This method is called when properties in an {@link AbstractContext} are updated. @@ -48,9 +50,10 @@ * removal of properties. * * @param context + * @return The UUID of the message that was sent to the group communications framework * @throws ClusteringFault */ - void updateContext(AbstractContext context) throws ClusteringFault; + String updateContext(AbstractContext context) throws ClusteringFault; /** * This method is called when properties in a collection of {@link AbstractContext}s are updated. @@ -58,17 +61,27 @@ * removal of properties. * * @param contexts + * @return The UUID of the message that was sent to the group communications framework * @throws ClusteringFault */ - void updateContexts(AbstractContext[] contexts) throws ClusteringFault; + String updateContexts(AbstractContext[] contexts) throws ClusteringFault; /** - * * @param context * @return True - if the provided {@link AbstractContext} is clusterable * @throws ClusteringFault */ boolean isContextClusterable(AbstractContext context) throws ClusteringFault; + + /** + * Indicates whether a particular message has been ACKed by all members of a cluster + * + * @param messageUniqueId + * @return true - if all memebers have ACKed the message with ID <code>messageUniqueId</code> + * false - otherwise + * @throws ClusteringFault + */ + boolean isMessageAcknowledged(String messageUniqueId) throws ClusteringFault; /** * @param listener --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]