Author: charith
Date: Sat Apr 30 14:38:49 2011
New Revision: 1098119
URL: http://svn.apache.org/viewvc?rev=1098119&view=rev
Log:
Adding a MessageForwarding processor
Added:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/forward/
- copied from r1098106,
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/forward/BlockingMessageSender.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ForwardingJob.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ForwardingProcessorConstants.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/forward/MessageForwardingProcessorView.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/forward/MessageForwardingProcessorViewMBean.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ScheduledMessageForwardingProcessor.java
Removed:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/forward/DLCConstents.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/forward/DeadLetterChannelView.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/forward/DeadLetterChannelViewMBean.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/forward/RedeliveryJob.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ScheduledRedeliveryProcessor.java
Modified:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/AbstractMessageProcessor.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/MessageProcessor.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/MessageProcessorConsents.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/ScheduledMessageProcessor.java
Modified:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/AbstractMessageProcessor.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/AbstractMessageProcessor.java?rev=1098119&r1=1098118&r2=1098119&view=diff
==============================================================================
---
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/AbstractMessageProcessor.java
(original)
+++
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/AbstractMessageProcessor.java
Sat Apr 30 14:38:49 2011
@@ -107,4 +107,4 @@ public abstract class AbstractMessagePro
public String getDescription() {
return description;
}
-}
+}
\ No newline at end of file
Modified:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/MessageProcessor.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/MessageProcessor.java?rev=1098119&r1=1098118&r2=1098119&view=diff
==============================================================================
---
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/MessageProcessor.java
(original)
+++
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/MessageProcessor.java
Sat Apr 30 14:38:49 2011
@@ -1,20 +1,21 @@
/*
-* Copyright (c) 2005-2010, WSO2 Inc. (http://www.wso2.org) All Rights
Reserved.
-*
-* WSO2 Inc. licenses this file to you under the Apache License,
-* Version 2.0 (the "License"); you may not use this file except
-* in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package org.apache.synapse.message.processors;
import org.apache.synapse.*;
Modified:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/MessageProcessorConsents.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/MessageProcessorConsents.java?rev=1098119&r1=1098118&r2=1098119&view=diff
==============================================================================
---
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/MessageProcessorConsents.java
(original)
+++
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/MessageProcessorConsents.java
Sat Apr 30 14:38:49 2011
@@ -20,11 +20,19 @@ package org.apache.synapse.message.proce
public final class MessageProcessorConsents {
- public static final String MESSAGE_STORE = "MESSAGE_STORE";
+ public static final String MESSAGE_STORE = "message.store";
public static final String PARAMETERS = "parameters";
+ /**
+ * Scheduled Message Processor parameters
+ */
public static final String QUARTZ_CONF = "quartz.conf";
public static final String INTERVAL = "interval";
public static final String CRON_EXPRESSION = "cronExpression";
+ /**
+ * Message processor parameters
+ */
+ public static final String MAX_DELIVER_ATTEMPTS = "max.deliver.attempts";
+
}
Modified:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/ScheduledMessageProcessor.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/ScheduledMessageProcessor.java?rev=1098119&r1=1098118&r2=1098119&view=diff
==============================================================================
---
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/ScheduledMessageProcessor.java
(original)
+++
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/ScheduledMessageProcessor.java
Sat Apr 30 14:38:49 2011
@@ -32,6 +32,9 @@ import java.util.Map;
public abstract class ScheduledMessageProcessor extends
AbstractMessageProcessor {
+ public static final String SCHEDULED_MESSAGE_PROCESSOR_GROUP =
+ "synapse.message.processor.quartz";
+
protected Log log = LogFactory.getLog(this.getClass());
/**
@@ -65,7 +68,7 @@ public abstract class ScheduledMessagePr
/**
* Keep the state of the message processor
*/
- private State state = State.DESTROY;
+ protected State state = State.DESTROY;
public void start() {
@@ -82,20 +85,21 @@ public abstract class ScheduledMessagePr
e.getMessage() + cronExpression, e);
}
}
- trigger.setName(messageStore + "-trigger");
+ trigger.setName(name + "-trigger");
JobDetail jobDetail = getJobDetail();
- JobDataMap jobDataMap = new JobDataMap();
+ JobDataMap jobDataMap = getJobDataMap();
jobDataMap.put(MessageProcessorConsents.MESSAGE_STORE,
configuration.getMessageStore(messageStore));
jobDataMap.put(MessageProcessorConsents.PARAMETERS, parameters);
jobDetail.setJobDataMap(jobDataMap);
+ jobDetail.setGroup(SCHEDULED_MESSAGE_PROCESSOR_GROUP);
try {
scheduler.scheduleJob(jobDetail, trigger);
} catch (SchedulerException e) {
throw new SynapseException("Error scheduling job : " + jobDetail
- + " with trigger " + trigger);
+ + " with trigger " + trigger ,e);
}
}
@@ -157,6 +161,7 @@ public abstract class ScheduledMessagePr
try {
scheduler = sf.getScheduler();
+
} catch (SchedulerException e) {
throw new SynapseException("Error getting a scheduler instance
form scheduler" +
" factory " + sf, e);
@@ -175,8 +180,16 @@ public abstract class ScheduledMessagePr
protected abstract JobDetail getJobDetail();
+ protected JobDataMap getJobDataMap() {
+ return new JobDataMap();
+ }
+
public void destroy() {
+ try {
+ scheduler.deleteJob(name +
"-trigger",SCHEDULED_MESSAGE_PROCESSOR_GROUP);
+ } catch (SchedulerException e) {
+ log.error("Error while destroying the task " + e);
+ }
state = State.DESTROY;
}
-
}
Added:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/forward/BlockingMessageSender.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/forward/BlockingMessageSender.java?rev=1098119&view=auto
==============================================================================
---
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/forward/BlockingMessageSender.java
(added)
+++
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/forward/BlockingMessageSender.java
Sat Apr 30 14:38:49 2011
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.synapse.message.processors.forward;
+
+import org.apache.axiom.om.OMAbstractFactory;
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.soap.SOAP12Constants;
+import org.apache.axiom.soap.SOAPEnvelope;
+import org.apache.axiom.soap.SOAPFactory;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.Constants;
+import org.apache.axis2.addressing.AddressingConstants;
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.client.OperationClient;
+import org.apache.axis2.client.Options;
+import org.apache.axis2.client.ServiceClient;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.ConfigurationContextFactory;
+import org.apache.axis2.description.Parameter;
+import org.apache.axis2.description.WSDL2Constants;
+import org.apache.axis2.transport.http.HTTPConstants;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.synapse.MessageContext;
+import org.apache.synapse.SynapseConstants;
+import org.apache.synapse.SynapseException;
+import org.apache.synapse.core.axis2.Axis2MessageContext;
+import org.apache.synapse.util.MessageHelper;
+
+
+public class BlockingMessageSender {
+
+ private ServiceClient sc = null;
+
+
+ public final static String DEFAULT_CLIENT_REPO =
"./samples/axis2Client/client_repo";
+ public final static String DEFAULT_AXIS2_XML =
"./samples/axis2Client/client_repo/conf/axis2.xml";
+
+
+ private String clientRepository = null;
+ private String axis2xml = null;
+
+ private static Log log = LogFactory.getLog(BlockingMessageSender.class);
+
+ private ConfigurationContext configurationContext = null;
+
+ public void init() {
+ try {
+ configurationContext
+ =
ConfigurationContextFactory.createConfigurationContextFromFileSystem(
+ clientRepository != null ? clientRepository :
DEFAULT_CLIENT_REPO,
+ axis2xml != null ? axis2xml : DEFAULT_AXIS2_XML);
+ sc = new ServiceClient(configurationContext, null);
+ } catch (AxisFault e) {
+ String msg = "Error initializing BlockingMessageSender : " +
e.getMessage();
+ log.error(msg, e);
+ throw new SynapseException(msg, e);
+ }
+ }
+
+ public MessageContext send(MessageContext messageIn , String serviceUrl)
throws Exception {
+
+ if(log.isDebugEnabled()) {
+ log.debug("Start Sending the Message ");
+ }
+
+ try {
+
+ MessageContext messageOut =
MessageHelper.cloneMessageContext(messageIn);
+ Options options = new Options();
+ options.setTo(new EndpointReference(serviceUrl));
+ if(messageIn.getSoapAction() != null) {
+
+ options.setAction(messageIn.getSoapAction());
+
+ } else {
+
+ if (messageIn.isSOAP11()) {
+
options.setProperty(Constants.Configuration.DISABLE_SOAP_ACTION, true);
+ } else {
+ Axis2MessageContext axis2smc = (Axis2MessageContext)
messageOut;
+ org.apache.axis2.context.MessageContext axis2MessageCtx =
+ axis2smc.getAxis2MessageContext();
+ axis2MessageCtx.getTransportOut().addParameter(
+ new Parameter(HTTPConstants.OMIT_SOAP_12_ACTION,
true));
+ }
+
+ }
+
+ //After setting all the options we need to find the MEP of the
Message
+ org.apache.axis2.context.MessageContext axis2Ctx =
+ ((Axis2MessageContext)messageOut).getAxis2MessageContext();
+
+ boolean outOnlyMessage = "true".equals(messageIn.getProperty(
+ SynapseConstants.OUT_ONLY)) ||
WSDL2Constants.MEP_URI_IN_ONLY.equals(
+ axis2Ctx.getOperationContext()
+ .getAxisOperation().getMessageExchangePattern());
+
+ // Here we consider all other messages, i.e. those for which
+ // outOnlyMessage == false, to follow the out-in MEP.
+ if(log.isDebugEnabled()) {
+ log.debug("Invoking service Url " + serviceUrl + " with
Message" +
+ messageIn.getMessageID());
+ }
+
+
+
+
+
+ options.setProperty(
+ AddressingConstants.DISABLE_ADDRESSING_FOR_OUT_MESSAGES,
Boolean.TRUE);
+
+ sc.setOptions(options);
+ OMElement result = null;
+ try {
+ OMElement payload =
axis2Ctx.getEnvelope().getBody().getFirstElement();
+ result = sc.sendReceive(payload);
+ } catch (Exception axisFault) {
+
+ // If the message is not an out-only message, we flag the error by
+ // setting a message context property and return the message context.
+ // For an out-only message we log the error and throw an Exception.
+ if (!outOnlyMessage) {
+
messageOut.setProperty(ForwardingProcessorConstants.BLOCKING_SENDER_ERROR,
+ "true");
+
+ return messageOut;
+ }
+
+ log.error("Error sending Message to url : " + serviceUrl
,axisFault);
+ throw new Exception("Error while Sending Message" , axisFault);
+
+ }
+
+ if(!outOnlyMessage) {
+ if(result != null) {
+ String soapNamespaceURI =
+
axis2Ctx.getEnvelope().getNamespace().getNamespaceURI();
+ SOAPEnvelope envelope = createSOAPEnvelope(result ,
soapNamespaceURI);
+ axis2Ctx.setEnvelope(envelope);
+ return messageOut;
+ }
+ }
+ } catch (AxisFault axisFault) {
+ log.error("Error sending Message to url : " + serviceUrl
,axisFault);
+ throw new Exception("Error while Sending message " , axisFault);
+ }
+ return null;
+ }
+
+ public String getClientRepository() {
+ return clientRepository;
+ }
+
+ public void setClientRepository(String clientRepository) {
+ this.clientRepository = clientRepository;
+ }
+
+ public String getAxis2xml() {
+ return axis2xml;
+ }
+
+ public void setAxis2xml(String axis2xml) {
+ this.axis2xml = axis2xml;
+ }
+
+ private SOAPEnvelope createSOAPEnvelope(OMElement payload , String
soapNamespaceUri) {
+ SOAPFactory soapFactory = null;
+ if
(soapNamespaceUri.equals(SOAP12Constants.SOAP_ENVELOPE_NAMESPACE_URI)) {
+ soapFactory = OMAbstractFactory.getSOAP12Factory();
+ } else {
+ soapFactory = OMAbstractFactory.getSOAP11Factory();
+ }
+ SOAPEnvelope envelope = soapFactory.getDefaultEnvelope();
+ envelope.getBody().addChild(payload);
+ return envelope;
+ }
+}
Added:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ForwardingJob.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ForwardingJob.java?rev=1098119&view=auto
==============================================================================
---
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ForwardingJob.java
(added)
+++
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ForwardingJob.java
Sat Apr 30 14:38:49 2011
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.synapse.message.processors.forward;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.synapse.FaultHandler;
+import org.apache.synapse.Mediator;
+import org.apache.synapse.MessageContext;
+import org.apache.synapse.SynapseArtifact;
+import org.apache.synapse.endpoints.AddressEndpoint;
+import org.apache.synapse.endpoints.Endpoint;
+import org.apache.synapse.mediators.MediatorFaultHandler;
+import org.apache.synapse.message.processors.MessageProcessorConsents;
+import org.apache.synapse.message.store.AbstractMessageStore;
+import org.apache.synapse.message.store.MessageStore;
+import org.quartz.*;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.locks.Lock;
+
+/**
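+ * Forwarding Job forwards the messages in the Message Store to their target
+ * address endpoints when executed. It stops at the first delivery error or when
+ * the store returns no more messages, and deactivates the processor once the
+ * send attempt count exceeds the configured max.deliver.attempts.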
+ */
+public class ForwardingJob implements StatefulJob {
+
+ private static final Log log = LogFactory.getLog(ForwardingJob.class);
+
+
+ public void execute(JobExecutionContext jobExecutionContext) throws
JobExecutionException {
+ JobDataMap jdm = jobExecutionContext.getMergedJobDataMap();
+
+ /**
+ * Get the global objects from the JobDataMap
+ */
+ MessageStore messageStore = (MessageStore) jdm.get(
+ MessageProcessorConsents.MESSAGE_STORE);
+ Map<String, Object> parameters = (Map<String, Object>) jdm.get(
+ MessageProcessorConsents.PARAMETERS);
+ BlockingMessageSender sender =
+ (BlockingMessageSender)
jdm.get(ScheduledMessageForwardingProcessor.BLOCKING_SENDER);
+
+ ScheduledMessageForwardingProcessor processor =
+ (ScheduledMessageForwardingProcessor)
jdm.get(ScheduledMessageForwardingProcessor.PROCESSOR_INSTANCE);
+
+
+ int maxDeliverAttempts = -1;
+ String mdaParam = (String)
parameters.get(MessageProcessorConsents.MAX_DELIVER_ATTEMPTS);
+ if (mdaParam != null) {
+ maxDeliverAttempts = Integer.parseInt(mdaParam);
+
+ // Edge case: a max deliver attempts value of 0 deactivates the processor
+ if(maxDeliverAttempts == 0) {
+ processor.deactivate();
+ }
+ }
+
+ if(!processor.isActive()) {
+ return;
+ }
+
+ if (maxDeliverAttempts > 0) {
+ processor.incrementSendAttemptCount();
+ }
+
+ boolean errorStop = false;
+ while (!errorStop) {
+
+ MessageContext messageContext = messageStore.peek();
+ if (messageContext != null) {
+ Set proSet = messageContext.getPropertyKeySet();
+
+ if (proSet != null) {
+ if
(proSet.contains(ForwardingProcessorConstants.BLOCKING_SENDER_ERROR)) {
+
proSet.remove(ForwardingProcessorConstants.BLOCKING_SENDER_ERROR);
+ }
+ }
+
+ String targetEp =
+ (String)
messageContext.getProperty(ForwardingProcessorConstants.TARGET_ENDPOINT);
+
+ if (targetEp != null) {
+ Endpoint ep = messageContext.getEndpoint(targetEp);
+
+ if (ep instanceof AddressEndpoint) {
+ AddressEndpoint addEp = (AddressEndpoint) ep;
+ String addressUrl = addEp.getDefinition().getAddress();
+
+ try {
+ MessageContext outCtx =
sender.send(messageContext, addressUrl);
+
+ if (outCtx != null && "true".equals(outCtx.
+
getProperty(ForwardingProcessorConstants.BLOCKING_SENDER_ERROR))) {
+ // This Means an Error has occurred
+ if (parameters != null &&
+ parameters.get(
+
ForwardingProcessorConstants.FAULT_SEQUENCE) != null) {
+
+ String seq = (String) parameters.get(
+
ForwardingProcessorConstants.FAULT_SEQUENCE);
+ Mediator mediator =
outCtx.getSequence(seq);
+ if (mediator != null) {
+ mediator.mediate(outCtx);
+ } else {
+ log.warn("Can't Send the fault Message
, Sequence " + seq +
+ " Does not Exist");
+ }
+
+ }
+
+ if (maxDeliverAttempts > 0) {
+ if(processor.getSendAttemptCount() >
maxDeliverAttempts) {
+ processor.deactivate();
+ }
+ }
+ errorStop = true;
+ continue;
+
+ }
+
+ // If there is a sequence defined to send success
replies,
+ // we must send the message to it
+ if (parameters != null &&
+ parameters.get(
+
ForwardingProcessorConstants.REPLY_SEQUENCE) != null) {
+ if (outCtx != null) {
+ String seq = (String) parameters.get(
+
ForwardingProcessorConstants.REPLY_SEQUENCE);
+ Mediator mediator =
outCtx.getSequence(seq);
+ if (mediator != null) {
+ mediator.mediate(outCtx);
+ } else {
+ log.warn("Can't Send the Out Message ,
Sequence " + seq +
+ " Does not Exist");
+ }
+ }
+ }
+
+ // If no Exception Occurred We remove the Message
+ messageStore.poll();
+ } catch (Exception e) {
+ if (maxDeliverAttempts > 0) {
+ if (processor.getSendAttemptCount() >
maxDeliverAttempts) {
+ processor.deactivate();
+ }
+ }
+ errorStop = true;
+ log.error("Error Forwarding Message ", e);
+ continue;
+ }
+ } else {
+ // Currently only Address Endpoint delivery is
supported
+ log.warn("Address Endpoint Named " + targetEp + " not
found.Hence removing " +
+ "the message form store");
+ messageStore.poll();
+ }
+
+
+ } else {
+ //No Target Endpoint defined for the Message
+ //So we do not have a place to deliver.
+ //Here we log a warning and remove the message
+ //todo: we can improve this by implementing a target
inferring mechanism
+
+ log.warn("Property " +
ForwardingProcessorConstants.TARGET_ENDPOINT +
+ " not found in the message context , Hence
removing the message ");
+ messageStore.poll();
+
+ }
+
+ } else {
+ if (maxDeliverAttempts > 0) {
+ if (processor.getSendAttemptCount() > maxDeliverAttempts) {
+ processor.deactivate();
+ }
+ }
+ errorStop = true;
+ }
+ }
+ }
+
+
+ private BlockingMessageSender initMessageSender(Map<String, Object>
params) {
+
+ BlockingMessageSender sender = null;
+ String axis2repo = (String)
params.get(ForwardingProcessorConstants.AXIS2_REPO);
+ String axis2Config = (String)
params.get(ForwardingProcessorConstants.AXIS2_CONFIG);
+
+ sender = new BlockingMessageSender();
+
+ if (axis2repo != null) {
+ sender.setClientRepository(axis2repo);
+ }
+
+
+ if (axis2Config != null) {
+ sender.setAxis2xml(axis2Config);
+ }
+ sender.init();
+
+ return sender;
+ }
+
+}
Added:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ForwardingProcessorConstants.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ForwardingProcessorConstants.java?rev=1098119&view=auto
==============================================================================
---
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ForwardingProcessorConstants.java
(added)
+++
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ForwardingProcessorConstants.java
Sat Apr 30 14:38:49 2011
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.synapse.message.processors.forward;
+
+/**
+ * class <code>ForwardingProcessorConstants</code> holds the constants that are
+ * used in Forwarding processors
+ */
+public final class ForwardingProcessorConstants {
+
+ /**
+ * Message context property that holds the name of the target endpoint
+ * to which the message is forwarded
+ */
+ public static final String TARGET_ENDPOINT = "target.endpoint";
+
+
+
+ /**
+ * The axis2 repository location to be used By the Message Sender
+ */
+ public static final String AXIS2_REPO ="axis2.repo";
+
+ /**
+ * The axis2 configuration file path to be used by the Message Sender
+ */
+ public static final String AXIS2_CONFIG = "axis2.config";
+
+
+
+ public static final String BLOCKING_SENDER_ERROR="blocking.sender.error";
+
+ /**
+ * Used for in_out messages. Processor will forward the message to this
sequence
+ */
+ public static final String REPLY_SEQUENCE =
"message.processor.reply.sequence";
+
+ /**
+ * Used in case of an error. Processor will forward the message to this sequence
+ */
+ public static final String FAULT_SEQUENCE =
"message.processor.fault.sequence";
+
+
+}
Added:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/forward/MessageForwardingProcessorView.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/forward/MessageForwardingProcessorView.java?rev=1098119&view=auto
==============================================================================
---
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/forward/MessageForwardingProcessorView.java
(added)
+++
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/forward/MessageForwardingProcessorView.java
Sat Apr 30 14:38:49 2011
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.synapse.message.processors.forward;
+
+import org.apache.axiom.soap.SOAPEnvelope;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.synapse.Mediator;
+import org.apache.synapse.MessageContext;
+import org.apache.synapse.SynapseArtifact;
+import org.apache.synapse.SynapseException;
+import org.apache.synapse.core.axis2.Axis2MessageContext;
+import org.apache.synapse.endpoints.AddressEndpoint;
+import org.apache.synapse.endpoints.Endpoint;
+import org.apache.synapse.message.processors.MessageProcessor;
+import org.apache.synapse.message.processors.ScheduledMessageProcessor;
+import org.apache.synapse.message.store.AbstractMessageStore;
+import org.apache.synapse.message.store.MessageStore;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+public class MessageForwardingProcessorView implements
MessageForwardingProcessorViewMBean {
+
+ private MessageStore messageStore;
+
+ private BlockingMessageSender sender;
+
+ private ScheduledMessageForwardingProcessor processor;
+ private static Log log =
LogFactory.getLog(MessageForwardingProcessorView.class);
+
+
+ public MessageForwardingProcessorView(MessageStore messageStore,
BlockingMessageSender sender,
+ ScheduledMessageForwardingProcessor
processor)
+ throws Exception {
+ if (messageStore != null) {
+ this.messageStore = messageStore;
+ } else {
+ throw new Exception("Error , Can not create Message Forwarding
Processor " +
+ "view with null " + "message store");
+ }
+
+ if (sender != null) {
+ this.sender = sender;
+ } else {
+ throw new Exception("Error , Can not create Message Forwarding
Processor " +
+ "view with null " + "Message Sender");
+ }
+
+
+ if (processor != null) {
+ this.processor = processor;
+ } else {
+ throw new SynapseException("Error , Can not create Message
Forwarding Processor " +
+ "view with null " + "Message Processor");
+ }
+
+ }
+
+ public void resendAll() throws Exception {
+ if (!processor.isActive()) {
+
+ while (messageStore.peek() != null) {
+ sendMessage(messageStore.peek() , true);
+ }
+ } else {
+ throw new Exception("Message Processor is Active, Manual
operations are " +
+ "not supported!");
+ }
+ }
+
+ public void deleteAll() throws Exception {
+ if (!processor.isActive()) {
+ messageStore.clear();
+ } else {
+ throw new Exception("Message Processor is Active, Manual
operations are " +
+ "not supported!");
+ }
+ }
+
+ public List<String> messageIdList() throws Exception {
+ if (!processor.isActive()) {
+ int size = messageStore.size();
+ List<String> idList = new ArrayList<String>();
+ for (int i = 0; i < size; i++) {
+ MessageContext context = messageStore.get(i);
+ if (context != null) {
+ idList.add(context.getMessageID());
+ } else {
+ break;
+ }
+ }
+ return idList;
+ } else {
+ throw new Exception("Message Processor is Active, Manual
operations are " +
+ "not supported!");
+ }
+
+ }
+
+ public void resend(String messageID) throws Exception {
+ if (!processor.isActive()) {
+ if (messageID != null && !"".equals(messageID.trim())) {
+ MessageContext msgCtx = messageStore.get(messageID);
+ if (msgCtx != null) {
+ sendMessage(msgCtx ,false);
+ messageStore.remove(messageID);
+ }
+ }
+ } else {
+ throw new Exception("Message Processor is Active, Manual
operations are " +
+ "not supported!");
+ }
+ }
+
+ public void delete(String messageID) throws Exception {
+ if (!processor.isActive()) {
+ if (messageID != null && !"".equals(messageID.trim())) {
+ messageStore.remove(messageID);
+ }
+ } else {
+ throw new Exception("Message Processor is Active, Manual
operations are " +
+ "not supported!");
+ }
+ }
+
+ public String getEnvelope(String messageID) throws Exception {
+ if (!processor.isActive()) {
+ if (messageID != null && !"".equals(messageID.trim())) {
+ MessageContext msgCtx = messageStore.get(messageID);
+ if (msgCtx != null) {
+ SOAPEnvelope env =
+ ((Axis2MessageContext)
msgCtx).getAxis2MessageContext().getEnvelope();
+ if(env != null) {
+ return env.toString();
+ }
+ }
+ }
+ } else {
+ throw new Exception("Message Processor is Active, Manual
operations are " +
+ "not supported!");
+ }
+
+ return null;
+ }
+
+ public int getSize() {
+ return messageStore.size();
+ }
+
+
+ public boolean isActive() {
+ assert processor != null;
+ return processor.isActive();
+ }
+
+ public void activate() {
+ assert processor != null;
+ processor.resetSentAttemptCount();
+ processor.activate();
+ }
+
+ public void deactivate() {
+ assert processor != null;
+ processor.deactivate();
+ }
+
+ private void sendMessage(MessageContext messageContext, boolean delete)
throws Exception {
+ if (messageContext != null) {
+ Set proSet = messageContext.getPropertyKeySet();
+
+ if (proSet != null) {
+ if
(proSet.contains(ForwardingProcessorConstants.BLOCKING_SENDER_ERROR)) {
+
proSet.remove(ForwardingProcessorConstants.BLOCKING_SENDER_ERROR);
+ }
+ }
+
+ String targetEp =
+ (String)
messageContext.getProperty(ForwardingProcessorConstants.TARGET_ENDPOINT);
+
+ if (targetEp != null) {
+ Endpoint ep = messageContext.getEndpoint(targetEp);
+
+ if (ep instanceof AddressEndpoint) {
+ AddressEndpoint addEp = (AddressEndpoint) ep;
+ String addressUrl = addEp.getDefinition().getAddress();
+
+ try {
+ MessageContext outCtx = sender.send(messageContext,
addressUrl);
+ // If no Exception Occurred We remove the Message
+ if (delete) {
+ messageStore.poll();
+ }
+ } catch (Exception e) {
+ log.error("Error Forwarding Message ", e);
+ throw new Exception(e);
+ }
+ } else {
+ // Currently only Address Endpoint delivery is supported
+ String logMsg = "Address Endpoint Named " + targetEp +
+ " not found.Hence removing " +
+ "the message form store";
+ log.warn(logMsg);
+ if (delete) {
+ messageStore.poll();
+ }
+ throw new Exception(logMsg);
+ }
+
+
+ } else {
+ //No Target Endpoint defined for the Message
+ //So we do not have a place to deliver.
+ //Here we log a warning and remove the message
+ //todo: we can improve this by implementing a target inferring
mechanism
+
+ String logMsg = "Property " +
ForwardingProcessorConstants.TARGET_ENDPOINT +
+ " not found in the message context , Hence removing
the message ";
+ log.warn(logMsg);
+ if (delete) {
+ messageStore.poll();
+ }
+ throw new Exception(logMsg);
+
+ }
+
+ } else {
+ throw new Exception("Error! Cant send Message Context : " +
messageContext);
+ }
+ }
+
+}
Added:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/forward/MessageForwardingProcessorViewMBean.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/forward/MessageForwardingProcessorViewMBean.java?rev=1098119&view=auto
==============================================================================
---
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/forward/MessageForwardingProcessorViewMBean.java
(added)
+++
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/forward/MessageForwardingProcessorViewMBean.java
Sat Apr 30 14:38:49 2011
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.synapse.message.processors.forward;
+
+import java.util.List;
+
+public interface MessageForwardingProcessorViewMBean {
+
+ /**
+ * Try resending all messages stored in the message store via associated endpoints.
+ *
+ * @throws Exception declared by the signature; the failure conditions are
+ * defined by the implementing view
+ */
+ public void resendAll() throws Exception;
+
+ /**
+ * Delete all the messages in the message store.
+ *
+ * @throws Exception declared by the signature; the failure conditions are
+ * defined by the implementing view
+ */
+ public void deleteAll() throws Exception;
+
+
+ /**
+ * Get the message IDs of all stored messages in the message store.
+ *
+ * @return a list of message ID values
+ * @throws Exception declared by the signature; the failure conditions are
+ * defined by the implementing view
+ */
+ public List<String> messageIdList() throws Exception;
+
+ /**
+ * Resend the message with the given id.
+ * This operation has no return value; a failed re-delivery is presumably
+ * reported through the declared exception (confirm against the implementing view).
+ *
+ * @param messageID ID of the message to be resent
+ * @throws Exception declared by the signature; presumably raised when the
+ * message cannot be re-delivered
+ */
+ public void resend(String messageID) throws Exception;
+
+ /**
+ * Delete the message with the given id.
+ *
+ * @param messageID ID of the message to be deleted
+ * @throws Exception declared by the signature; the failure conditions are
+ * defined by the implementing view
+ */
+ public void delete(String messageID) throws Exception;
+
+ /**
+ * Get the SOAP envelope of the message with the given ID.
+ *
+ * @param messageID ID of the message to be returned
+ * @return the SOAP envelope content as a string
+ * @throws Exception declared by the signature; the failure conditions are
+ * defined by the implementing view
+ */
+ public String getEnvelope(String messageID) throws Exception;
+
+ /**
+ * Get the size of the message store.
+ *
+ * @return the number of messages stored in the store.
+ */
+ public int getSize();
+
+ /**
+ * Get the status of the message processor.
+ *
+ * @return status of the processor; {@code true} presumably means activated
+ */
+ public boolean isActive();
+
+ /**
+ * Activate the message processor.
+ * This will resume processing the messages if it is in the deactivated state and
+ * reset the send attempt count.
+ */
+ public void activate();
+
+ /**
+ * Deactivate the message processor.
+ * This will stop the processing of messages.
+ */
+ public void deactivate();
+
+}
Added:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ScheduledMessageForwardingProcessor.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ScheduledMessageForwardingProcessor.java?rev=1098119&view=auto
==============================================================================
---
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ScheduledMessageForwardingProcessor.java
(added)
+++
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ScheduledMessageForwardingProcessor.java
Sat Apr 30 14:38:49 2011
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.synapse.message.processors.forward;
+
+import org.apache.synapse.SynapseException;
+import org.apache.synapse.core.SynapseEnvironment;
+import org.apache.synapse.message.processors.ScheduledMessageProcessor;
+import org.quartz.JobDataMap;
+import org.quartz.JobDetail;
+import org.quartz.SchedulerException;
+
+import java.util.Map;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Redelivery processor is the Message processor which implements the Dead
letter channel EIP
+ * It will Time to time Redeliver the Messages to a given target.
+ */
+public class ScheduledMessageForwardingProcessor extends
ScheduledMessageProcessor{
+
+ public static final String BLOCKING_SENDER = "blocking.sender";
+ public static final String PROCESSOR_INSTANCE = "processor.instance";
+
+ private BlockingMessageSender sender = null;
+
+ private volatile AtomicBoolean active = new AtomicBoolean(true);
+
+ private volatile AtomicInteger sendAttempts = new AtomicInteger(0);
+
+ private MessageForwardingProcessorView view;
+
+ @Override
+ public void init(SynapseEnvironment se) {
+ super.init(se);
+ try {
+ view = new MessageForwardingProcessorView(
+
se.getSynapseConfiguration().getMessageStore(messageStore),sender,this);
+ } catch (Exception e) {
+ throw new SynapseException(e);
+ }
+
+
org.apache.synapse.commons.jmx.MBeanRegistrar.getInstance().registerMBean(view,
+ "Message Forwarding Processor view", getName());
+ }
+
+ @Override
+ protected JobDetail getJobDetail() {
+ JobDetail jobDetail = new JobDetail();
+ jobDetail.setName(name + "-forward job");
+ jobDetail.setJobClass(ForwardingJob.class);
+ return jobDetail;
+ }
+
+ @Override
+ protected JobDataMap getJobDataMap() {
+ JobDataMap jdm = new JobDataMap();
+ sender = initMessageSender(parameters);
+ jdm.put(BLOCKING_SENDER,sender);
+ jdm.put(PROCESSOR_INSTANCE,this);
+ return jdm;
+ }
+
+ private BlockingMessageSender initMessageSender(Map<String ,Object>
params) {
+
+ String axis2repo = (String)
params.get(ForwardingProcessorConstants.AXIS2_REPO);
+ String axis2Config = (String)
params.get(ForwardingProcessorConstants.AXIS2_CONFIG);
+
+ sender = new BlockingMessageSender();
+
+ if(axis2repo != null) {
+ sender.setClientRepository(axis2repo);
+ }
+
+
+ if(axis2Config != null) {
+ sender.setAxis2xml(axis2Config);
+ }
+ sender.init();
+
+ return sender;
+ }
+
+ public BlockingMessageSender getSender() {
+ return sender;
+ }
+
+ public void setSender(BlockingMessageSender sender) {
+ this.sender = sender;
+ }
+
+ public boolean isActive() {
+ return active.get();
+ }
+
+ public void activate() {
+ active.set(true);
+ }
+
+ public void deactivate() {
+ active.set(false);
+ }
+
+ public int getSendAttemptCount() {
+ return sendAttempts.get();
+ }
+
+ public void incrementSendAttemptCount() {
+ sendAttempts.incrementAndGet();
+ }
+ public void resetSentAttemptCount(){
+ sendAttempts.set(0);
+ }
+
+ @Override
+ public void destroy() {
+ try {
+ scheduler.deleteJob(name + "-forward job",
+
ScheduledMessageProcessor.SCHEDULED_MESSAGE_PROCESSOR_GROUP);
+ } catch (SchedulerException e) {
+ log.error("Error while destroying the task " + e);
+ }
+ state = State.DESTROY;
+ }
+}
+