Author: azeez Date: Wed Jun 13 09:36:02 2007 New Revision: 546951 URL: http://svn.apache.org/viewvc?view=rev&rev=546951 Log: Removing the ReplicationHandler and invoking the replication logic at the MessageReceiver (MR) level after the business logic is invoked.
Added: webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/context/Replicator.java Removed: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/handlers/ReplicationHandler.java Modified: webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/receivers/AbstractInMessageReceiver.java webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/receivers/AbstractInOutSyncMessageReceiver.java webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/receivers/AbstractMessageReceiver.java webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/receivers/AbstractRobustInMessageReceiver.java Added: webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/context/Replicator.java URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/context/Replicator.java?view=auto&rev=546951 ============================================================================== --- webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/context/Replicator.java (added) +++ webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/context/Replicator.java Wed Jun 13 09:36:02 2007 @@ -0,0 +1,122 @@ +/* + * Copyright 2004,2005 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.axis2.clustering.context; + +import org.apache.axis2.clustering.ClusterManager; +import org.apache.axis2.clustering.ClusteringFault; +import org.apache.axis2.context.*; +import org.apache.axis2.description.AxisOperation; +import org.apache.axis2.engine.AxisConfiguration; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import java.util.ArrayList; +import java.util.List; + +public final class Replicator { + + private static final Log log = LogFactory.getLog(Replicator.class); + + public static void replicate(MessageContext msgContext) throws ClusteringFault { + + // Do replication only if context replication is enabled. + // Also note that if there are no members, we need not do any replication + ClusterManager clusterManager = + msgContext.getConfigurationContext().getAxisConfiguration().getClusterManager(); + if (clusterManager == null || + clusterManager.getContextManager() == null || + clusterManager.getMemberCount() == 0) { + return; + } + + AxisOperation axisOperation = msgContext.getAxisOperation(); + if (axisOperation == null) { + return; + } + log.debug("Going to replicate state..."); + try { + replicateState(msgContext); + } catch (Exception e) { + String message = "Could not replicate the state"; + throw new ClusteringFault(message, e); + } + } + + private static void replicateState(MessageContext msgContext) throws ClusteringFault { + ConfigurationContext configurationContext = msgContext.getConfigurationContext(); + AxisConfiguration axisConfiguration = configurationContext.getAxisConfiguration(); + ClusterManager clusterManager = axisConfiguration.getClusterManager(); + + if (clusterManager != null) { + + ContextManager contextManager = clusterManager.getContextManager(); + if (contextManager == null) { + String msg = "Cannot replicate contexts since " + + "ContextManager is not specified in the axis2.xml file."; + throw new ClusteringFault(msg); + } + + List contexts = new 
ArrayList(); + + // Do we need to replicate state stored in ConfigurationContext? + if (!configurationContext.getPropertyDifferences().isEmpty()) { + contexts.add(configurationContext); + } + + // Do we need to replicate state stored in ServiceGroupContext? + ServiceGroupContext sgContext = msgContext.getServiceGroupContext(); + if (sgContext != null && !sgContext.getPropertyDifferences().isEmpty()) { + contexts.add(sgContext); + } + + // Do we need to replicate state stored in ServiceContext? + ServiceContext serviceContext = msgContext.getServiceContext(); + if (serviceContext != null && !serviceContext.getPropertyDifferences().isEmpty()) { + contexts.add(serviceContext); + } + + // Do the actual replication here + if (!contexts.isEmpty()) { + String msgUUID = + contextManager.updateContexts((AbstractContext[]) contexts. + toArray(new AbstractContext[contexts.size()])); + + long start = System.currentTimeMillis(); + + // Wait till all members have ACKed receipt & successful processing of + // the message with UUID 'msgUUID' + do { + + // Wait sometime before checking whether message is ACKed + try { + Thread.sleep(50); + } catch (InterruptedException ignored) { + } + if (System.currentTimeMillis() - start > 40000) { + throw new ClusteringFault("ACKs not received from all members within 40 sec. 
" + + "Aborting wait."); + } + } while (!contextManager.isMessageAcknowledged(msgUUID)); + } + + } else { + String msg = "Cannot replicate contexts since " + + "ClusterManager is not specified in the axis2.xml file."; + throw new ClusteringFault(msg); + } + } +} Modified: webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/receivers/AbstractInMessageReceiver.java URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/receivers/AbstractInMessageReceiver.java?view=diff&rev=546951&r1=546950&r2=546951 ============================================================================== --- webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/receivers/AbstractInMessageReceiver.java (original) +++ webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/receivers/AbstractInMessageReceiver.java Wed Jun 13 09:36:02 2007 @@ -31,6 +31,7 @@ ThreadContextDescriptor tc = setThreadContext(messageCtx); try { invokeBusinessLogic(messageCtx); + replicateState(messageCtx); } finally { restoreThreadContext(tc); } Modified: webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/receivers/AbstractInOutSyncMessageReceiver.java URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/receivers/AbstractInOutSyncMessageReceiver.java?view=diff&rev=546951&r1=546950&r2=546951 ============================================================================== --- webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/receivers/AbstractInOutSyncMessageReceiver.java (original) +++ webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/receivers/AbstractInOutSyncMessageReceiver.java Wed Jun 13 09:36:02 2007 @@ -37,14 +37,10 @@ ThreadContextDescriptor tc = setThreadContext(msgContext); try { invokeBusinessLogic(msgContext, outMsgContext); + replicateState(msgContext); } finally { restoreThreadContext(tc); } - - AxisEngine engine = - new AxisEngine( - 
msgContext.getConfigurationContext()); - - engine.send(outMsgContext); + AxisEngine.send(outMsgContext); } } Modified: webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/receivers/AbstractMessageReceiver.java URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/receivers/AbstractMessageReceiver.java?view=diff&rev=546951&r1=546950&r2=546951 ============================================================================== --- webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/receivers/AbstractMessageReceiver.java (original) +++ webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/receivers/AbstractMessageReceiver.java Wed Jun 13 09:36:02 2007 @@ -23,6 +23,8 @@ import org.apache.axiom.soap.SOAPFactory; import org.apache.axis2.AxisFault; import org.apache.axis2.Constants; +import org.apache.axis2.clustering.context.Replicator; +import org.apache.axis2.clustering.ClusteringFault; import org.apache.axis2.context.MessageContext; import org.apache.axis2.context.ServiceContext; import org.apache.axis2.description.AxisService; @@ -47,6 +49,10 @@ public class ThreadContextDescriptor { public ClassLoader oldClassLoader; public MessageContext oldMessageContext; + } + + public void replicateState(MessageContext messageContext) throws ClusteringFault { + Replicator.replicate(messageContext); } /** Modified: webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/receivers/AbstractRobustInMessageReceiver.java URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/receivers/AbstractRobustInMessageReceiver.java?view=diff&rev=546951&r1=546950&r2=546951 ============================================================================== --- webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/receivers/AbstractRobustInMessageReceiver.java (original) +++ 
webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/receivers/AbstractRobustInMessageReceiver.java Wed Jun 13 09:36:02 2007 @@ -31,6 +31,7 @@ ThreadContextDescriptor tc = setThreadContext(messageCtx); try { invokeBusinessLogic(messageCtx); + replicateState(messageCtx); } finally { restoreThreadContext(tc); } --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]