Author: charith
Date: Thu May  5 05:06:45 2011
New Revision: 1099682

URL: http://svn.apache.org/viewvc?rev=1099682&view=rev
Log:
Adding MultiXML serialization for message processors

Modified:
    
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MultiXMLConfigurationSerializer.java
    
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/store/MessageStoreMediator.java
    
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ForwardingJob.java

Modified: 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MultiXMLConfigurationSerializer.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MultiXMLConfigurationSerializer.java?rev=1099682&r1=1099681&r2=1099682&view=diff
==============================================================================
--- 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MultiXMLConfigurationSerializer.java
 (original)
+++ 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MultiXMLConfigurationSerializer.java
 Thu May  5 05:06:45 2011
@@ -28,6 +28,7 @@ import org.apache.synapse.config.xml.eve
 import org.apache.synapse.config.xml.endpoints.EndpointSerializer;
 import org.apache.synapse.endpoints.Template;
 import org.apache.synapse.mediators.template.TemplateMediator;
+import org.apache.synapse.message.processors.MessageProcessor;
 import org.apache.synapse.registry.Registry;
 import org.apache.synapse.core.axis2.ProxyService;
 import org.apache.synapse.eventing.SynapseEventSource;
@@ -105,6 +106,7 @@ public class MultiXMLConfigurationSerial
             serializeExecutors(synapseConfig.getPriorityExecutors().values(),
                     synapseConfig, definitions);
             serializeMessageStores(synapseConfig.getMessageStores().values(), 
definitions);
+            
serializeMessageProcessors(synapseConfig.getMessageProcessors().values(),definitions);
             serializeSynapseXML(definitions);
 
             serializationDone = true;
@@ -163,6 +165,8 @@ public class MultiXMLConfigurationSerial
         Collection localEntries = synapseConfig.getLocalRegistry().values();
         Collection<PriorityExecutor> executors = 
synapseConfig.getPriorityExecutors().values();
         Collection<MessageStore> messageStores = 
synapseConfig.getMessageStores().values();
+        Collection<MessageProcessor> messageProcessors =
+                synapseConfig.getMessageProcessors().values();
 
         for (ProxyService service : proxyServices) {
             if (service.getFileName() == null) {
@@ -231,6 +235,12 @@ public class MultiXMLConfigurationSerial
             }
         }
 
+        for(MessageProcessor messageProcessor : messageProcessors) {
+            if(messageProcessor.getFileName() == null) {
+                
MessageProcessorSerializer.serializeMessageProcessor(definitions,messageProcessor);
+            }
+        }
+
         serializeSynapseXML(definitions);
     }
 
@@ -469,6 +479,29 @@ public class MultiXMLConfigurationSerial
         return messageStoreElem;
     }
 
+
+     public OMElement serializeMessageProcessor(MessageProcessor 
messageProcessor,
+                                           OMElement parent) throws Exception {
+
+        File messageProcessorDir = createDirectory(currentDirectory,
+                MultiXMLConfigurationBuilder.MESSAGE_PROCESSOR_DIR);
+        OMElement messageProcessorElem = 
MessageProcessorSerializer.serializeMessageProcessor(null,
+                messageProcessor);
+
+        String fileName = messageProcessor.getFileName();
+        if (fileName != null) {
+            // TODO: Call handleDeployment
+            File messageProcessorFile = new File(messageProcessorDir , 
fileName);
+            writeToFile(messageProcessorElem , messageProcessorFile);
+
+        } else if (parent != null) {
+            parent.addChild(messageProcessorElem);
+        }
+
+        return messageProcessorElem;
+    }
+
+
     private void writeToFile(OMElement content, File file) throws Exception {
         File tempFile = File.createTempFile("syn_mx_", ".xml");
         OutputStream out = new FileOutputStream(tempFile);
@@ -537,6 +570,13 @@ public class MultiXMLConfigurationSerial
         }
     }
 
+    private void serializeMessageProcessors(Collection<MessageProcessor> 
messageProcessors,
+                                         OMElement parent) throws Exception{
+        for (MessageProcessor messageProcessor : messageProcessors) {
+            serializeMessageProcessor(messageProcessor, parent);
+        }
+    }
+
     private void serializeSynapseXML(OMElement definitions) throws Exception {
         File synapseXML = new File(currentDirectory, 
SynapseConstants.SYNAPSE_XML);
         if (!currentDirectory.exists()) {

Modified: 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/store/MessageStoreMediator.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/store/MessageStoreMediator.java?rev=1099682&r1=1099681&r2=1099682&view=diff
==============================================================================
--- 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/store/MessageStoreMediator.java
 (original)
+++ 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/store/MessageStoreMediator.java
 Thu May  5 05:06:45 2011
@@ -17,8 +17,11 @@
 */
 package org.apache.synapse.mediators.store;
 
+import org.apache.axis2.description.Parameter;
+import org.apache.axis2.engine.AxisConfiguration;
 import org.apache.synapse.Mediator;
 import org.apache.synapse.MessageContext;
+import org.apache.synapse.SynapseConstants;
 import org.apache.synapse.core.axis2.Axis2MessageContext;
 import org.apache.synapse.mediators.AbstractMediator;
 import org.apache.synapse.message.store.MessageStore;
@@ -58,6 +61,24 @@ public class MessageStoreMediator extend
                 if(log.isDebugEnabled()) {
                     log.debug("Message Store mediator storing the message : \n 
" + synCtx.getEnvelope());
                 }
+
+                // Here we set the server name in the message context before 
storing the message.
+                //This can be used by the Processors in a clustering setup.
+                if(synCtx instanceof Axis2MessageContext) {
+
+                    String serverName =
+                                        
getAxis2ParameterValue(((Axis2MessageContext)synCtx).
+                                        getAxis2MessageContext().
+                                        
getConfigurationContext().getAxisConfiguration(),
+                                        
SynapseConstants.Axis2Param.SYNAPSE_SERVER_NAME);
+                    if(serverName != null) {
+                        
synCtx.setProperty(SynapseConstants.Axis2Param.SYNAPSE_SERVER_NAME,
+                                serverName);
+                    }
+
+                }
+
+
                 messageStore.offer(synCtx);
 
                 // with the nio transport, this causes the listener not to 
write a 202
@@ -95,4 +116,26 @@ public class MessageStoreMediator extend
     public void setOnStoreSequence(String  onStoreSequence) {
         this.onStoreSequence = onStoreSequence;
     }
+
+     /**
+     * Helper method to get the value of a parameter in the AxisConfiguration
+     *
+     * @param axisConfiguration AxisConfiguration instance
+     * @param paramKey The name / key of the parameter
+     * @return The value of the parameter
+     */
+    private static String getAxis2ParameterValue(AxisConfiguration 
axisConfiguration,
+                                                 String paramKey) {
+
+        Parameter parameter = axisConfiguration.getParameter(paramKey);
+        if (parameter == null) {
+            return null;
+        }
+        Object value = parameter.getValue();
+        if (value != null && value instanceof String) {
+            return (String) parameter.getValue();
+        } else {
+            return null;
+        }
+    }
 }

Modified: 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ForwardingJob.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ForwardingJob.java?rev=1099682&r1=1099681&r2=1099682&view=diff
==============================================================================
--- 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ForwardingJob.java
 (original)
+++ 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ForwardingJob.java
 Thu May  5 05:06:45 2011
@@ -18,12 +18,13 @@
  */
 package org.apache.synapse.message.processors.forward;
 
+import org.apache.axis2.description.Parameter;
+import org.apache.axis2.engine.AxisConfiguration;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.synapse.FaultHandler;
-import org.apache.synapse.Mediator;
-import org.apache.synapse.MessageContext;
-import org.apache.synapse.SynapseArtifact;
+import org.apache.synapse.*;
+import org.apache.synapse.config.SynapseConfiguration;
+import org.apache.synapse.core.axis2.Axis2MessageContext;
 import org.apache.synapse.endpoints.AddressEndpoint;
 import org.apache.synapse.endpoints.Endpoint;
 import org.apache.synapse.mediators.MediatorFaultHandler;
@@ -86,6 +87,27 @@ public class ForwardingJob implements St
 
             MessageContext messageContext = messageStore.peek();
             if (messageContext != null) {
+
+
+                // If the message does not belong to this server, we ignore it.
+                String serverName = (String)
+                        
messageContext.getProperty(SynapseConstants.Axis2Param.SYNAPSE_SERVER_NAME);
+
+                if(serverName != null && messageContext instanceof 
Axis2MessageContext) {
+
+                    AxisConfiguration configuration = 
((Axis2MessageContext)messageContext).
+                            getAxis2MessageContext().
+                            getConfigurationContext().getAxisConfiguration();
+
+                    String myServerName = getAxis2ParameterValue(configuration,
+                            SynapseConstants.Axis2Param.SYNAPSE_SERVER_NAME);
+
+                    if(!serverName.equals(myServerName)) {
+                        return;
+                    }
+
+                }
+
                 Set proSet = messageContext.getPropertyKeySet();
 
                 if (proSet != null) {
@@ -219,4 +241,27 @@ public class ForwardingJob implements St
         return sender;
     }
 
+
+    /**
+     * Helper method to get the value of a parameter in the AxisConfiguration
+     *
+     * @param axisConfiguration AxisConfiguration instance
+     * @param paramKey The name / key of the parameter
+     * @return The value of the parameter
+     */
+    private static String getAxis2ParameterValue(AxisConfiguration 
axisConfiguration,
+                                                 String paramKey) {
+
+        Parameter parameter = axisConfiguration.getParameter(paramKey);
+        if (parameter == null) {
+            return null;
+        }
+        Object value = parameter.getValue();
+        if (value != null && value instanceof String) {
+            return (String) parameter.getValue();
+        } else {
+            return null;
+        }
+    }
+
 }


Reply via email to