Author: charith
Date: Thu May 5 05:06:45 2011
New Revision: 1099682
URL: http://svn.apache.org/viewvc?rev=1099682&view=rev
Log:
adding MultiXML serialisation for Message processors
Modified:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MultiXMLConfigurationSerializer.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/store/MessageStoreMediator.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ForwardingJob.java
Modified:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MultiXMLConfigurationSerializer.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MultiXMLConfigurationSerializer.java?rev=1099682&r1=1099681&r2=1099682&view=diff
==============================================================================
---
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MultiXMLConfigurationSerializer.java
(original)
+++
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MultiXMLConfigurationSerializer.java
Thu May 5 05:06:45 2011
@@ -28,6 +28,7 @@ import org.apache.synapse.config.xml.eve
import org.apache.synapse.config.xml.endpoints.EndpointSerializer;
import org.apache.synapse.endpoints.Template;
import org.apache.synapse.mediators.template.TemplateMediator;
+import org.apache.synapse.message.processors.MessageProcessor;
import org.apache.synapse.registry.Registry;
import org.apache.synapse.core.axis2.ProxyService;
import org.apache.synapse.eventing.SynapseEventSource;
@@ -105,6 +106,7 @@ public class MultiXMLConfigurationSerial
serializeExecutors(synapseConfig.getPriorityExecutors().values(),
synapseConfig, definitions);
serializeMessageStores(synapseConfig.getMessageStores().values(),
definitions);
+
serializeMessageProcessors(synapseConfig.getMessageProcessors().values(),definitions);
serializeSynapseXML(definitions);
serializationDone = true;
@@ -163,6 +165,8 @@ public class MultiXMLConfigurationSerial
Collection localEntries = synapseConfig.getLocalRegistry().values();
Collection<PriorityExecutor> executors =
synapseConfig.getPriorityExecutors().values();
Collection<MessageStore> messageStores =
synapseConfig.getMessageStores().values();
+ Collection<MessageProcessor> messageProcessors =
+ synapseConfig.getMessageProcessors().values();
for (ProxyService service : proxyServices) {
if (service.getFileName() == null) {
@@ -231,6 +235,12 @@ public class MultiXMLConfigurationSerial
}
}
+ for(MessageProcessor messageProcessor : messageProcessors) {
+ if(messageProcessor.getFileName() == null) {
+
MessageProcessorSerializer.serializeMessageProcessor(definitions,messageProcessor);
+ }
+ }
+
serializeSynapseXML(definitions);
}
@@ -469,6 +479,29 @@ public class MultiXMLConfigurationSerial
return messageStoreElem;
}
+
+ public OMElement serializeMessageProcessor(MessageProcessor
messageProcessor,
+ OMElement parent) throws Exception {
+
+ File messageProcessorDir = createDirectory(currentDirectory,
+ MultiXMLConfigurationBuilder.MESSAGE_PROCESSOR_DIR);
+ OMElement messageProcessorElem =
MessageProcessorSerializer.serializeMessageProcessor(null,
+ messageProcessor);
+
+ String fileName = messageProcessor.getFileName();
+ if (fileName != null) {
+ // TODO: Call handleDeployment
+ File messageProcessorFile = new File(messageProcessorDir ,
fileName);
+ writeToFile(messageProcessorElem , messageProcessorFile);
+
+ } else if (parent != null) {
+ parent.addChild(messageProcessorElem);
+ }
+
+ return messageProcessorElem;
+ }
+
+
private void writeToFile(OMElement content, File file) throws Exception {
File tempFile = File.createTempFile("syn_mx_", ".xml");
OutputStream out = new FileOutputStream(tempFile);
@@ -537,6 +570,13 @@ public class MultiXMLConfigurationSerial
}
}
+ private void serializeMessageProcessors(Collection<MessageProcessor>
messageProcessors,
+ OMElement parent) throws Exception{
+ for (MessageProcessor messageProcessor : messageProcessors) {
+ serializeMessageProcessor(messageProcessor, parent);
+ }
+ }
+
private void serializeSynapseXML(OMElement definitions) throws Exception {
File synapseXML = new File(currentDirectory,
SynapseConstants.SYNAPSE_XML);
if (!currentDirectory.exists()) {
Modified:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/store/MessageStoreMediator.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/store/MessageStoreMediator.java?rev=1099682&r1=1099681&r2=1099682&view=diff
==============================================================================
---
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/store/MessageStoreMediator.java
(original)
+++
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/store/MessageStoreMediator.java
Thu May 5 05:06:45 2011
@@ -17,8 +17,11 @@
*/
package org.apache.synapse.mediators.store;
+import org.apache.axis2.description.Parameter;
+import org.apache.axis2.engine.AxisConfiguration;
import org.apache.synapse.Mediator;
import org.apache.synapse.MessageContext;
+import org.apache.synapse.SynapseConstants;
import org.apache.synapse.core.axis2.Axis2MessageContext;
import org.apache.synapse.mediators.AbstractMediator;
import org.apache.synapse.message.store.MessageStore;
@@ -58,6 +61,24 @@ public class MessageStoreMediator extend
if(log.isDebugEnabled()) {
log.debug("Message Store mediator storing the message : \n
" + synCtx.getEnvelope());
}
+
+ // Here we set the server name in the message context before
storing the message.
+ //This can be used by the Processors in a clustering setup.
+ if(synCtx instanceof Axis2MessageContext) {
+
+ String serverName =
+
getAxis2ParameterValue(((Axis2MessageContext)synCtx).
+ getAxis2MessageContext().
+
getConfigurationContext().getAxisConfiguration(),
+
SynapseConstants.Axis2Param.SYNAPSE_SERVER_NAME);
+ if(serverName != null) {
+
synCtx.setProperty(SynapseConstants.Axis2Param.SYNAPSE_SERVER_NAME,
+ serverName);
+ }
+
+ }
+
+
messageStore.offer(synCtx);
// with the nio transport, this causes the listener not to
write a 202
@@ -95,4 +116,26 @@ public class MessageStoreMediator extend
public void setOnStoreSequence(String onStoreSequence) {
this.onStoreSequence = onStoreSequence;
}
+
+ /**
+ * Helper method to get the value of a parameter in the AxisConfiguration
+ *
+ * @param axisConfiguration AxisConfiguration instance
+ * @param paramKey The name / key of the parameter
+ * @return The value of the parameter
+ */
+ private static String getAxis2ParameterValue(AxisConfiguration
axisConfiguration,
+ String paramKey) {
+
+ Parameter parameter = axisConfiguration.getParameter(paramKey);
+ if (parameter == null) {
+ return null;
+ }
+ Object value = parameter.getValue();
+ if (value != null && value instanceof String) {
+ return (String) parameter.getValue();
+ } else {
+ return null;
+ }
+ }
}
Modified:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ForwardingJob.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ForwardingJob.java?rev=1099682&r1=1099681&r2=1099682&view=diff
==============================================================================
---
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ForwardingJob.java
(original)
+++
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ForwardingJob.java
Thu May 5 05:06:45 2011
@@ -18,12 +18,13 @@
*/
package org.apache.synapse.message.processors.forward;
+import org.apache.axis2.description.Parameter;
+import org.apache.axis2.engine.AxisConfiguration;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.synapse.FaultHandler;
-import org.apache.synapse.Mediator;
-import org.apache.synapse.MessageContext;
-import org.apache.synapse.SynapseArtifact;
+import org.apache.synapse.*;
+import org.apache.synapse.config.SynapseConfiguration;
+import org.apache.synapse.core.axis2.Axis2MessageContext;
import org.apache.synapse.endpoints.AddressEndpoint;
import org.apache.synapse.endpoints.Endpoint;
import org.apache.synapse.mediators.MediatorFaultHandler;
@@ -86,6 +87,27 @@ public class ForwardingJob implements St
MessageContext messageContext = messageStore.peek();
if (messageContext != null) {
+
+
+ // If the message does not belong to this server, we ignore it.
+ String serverName = (String)
+
messageContext.getProperty(SynapseConstants.Axis2Param.SYNAPSE_SERVER_NAME);
+
+ if(serverName != null && messageContext instanceof
Axis2MessageContext) {
+
+ AxisConfiguration configuration =
((Axis2MessageContext)messageContext).
+ getAxis2MessageContext().
+ getConfigurationContext().getAxisConfiguration();
+
+ String myServerName = getAxis2ParameterValue(configuration,
+ SynapseConstants.Axis2Param.SYNAPSE_SERVER_NAME);
+
+ if(!serverName.equals(myServerName)) {
+ return;
+ }
+
+ }
+
Set proSet = messageContext.getPropertyKeySet();
if (proSet != null) {
@@ -219,4 +241,27 @@ public class ForwardingJob implements St
return sender;
}
+
+ /**
+ * Helper method to get the value of a parameter in the AxisConfiguration
+ *
+ * @param axisConfiguration AxisConfiguration instance
+ * @param paramKey The name / key of the parameter
+ * @return The value of the parameter
+ */
+ private static String getAxis2ParameterValue(AxisConfiguration
axisConfiguration,
+ String paramKey) {
+
+ Parameter parameter = axisConfiguration.getParameter(paramKey);
+ if (parameter == null) {
+ return null;
+ }
+ Object value = parameter.getValue();
+ if (value != null && value instanceof String) {
+ return (String) parameter.getValue();
+ } else {
+ return null;
+ }
+ }
+
}