Author: charith
Date: Sat Apr 30 14:38:49 2011
New Revision: 1098119

URL: http://svn.apache.org/viewvc?rev=1098119&view=rev
Log:
adding a MessageForwading processr

Added:
    
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/forward/
      - copied from r1098106, 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/
    
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/forward/BlockingMessageSender.java
    
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ForwardingJob.java
    
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ForwardingProcessorConstants.java
    
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/forward/MessageForwardingProcessorView.java
    
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/forward/MessageForwardingProcessorViewMBean.java
    
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ScheduledMessageForwardingProcessor.java
Removed:
    
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/
    
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/forward/DLCConstents.java
    
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/forward/DeadLetterChannelView.java
    
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/forward/DeadLetterChannelViewMBean.java
    
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/forward/RedeliveryJob.java
    
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ScheduledRedeliveryProcessor.java
Modified:
    
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/AbstractMessageProcessor.java
    
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/MessageProcessor.java
    
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/MessageProcessorConsents.java
    
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/ScheduledMessageProcessor.java

Modified: 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/AbstractMessageProcessor.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/AbstractMessageProcessor.java?rev=1098119&r1=1098118&r2=1098119&view=diff
==============================================================================
--- 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/AbstractMessageProcessor.java
 (original)
+++ 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/AbstractMessageProcessor.java
 Sat Apr 30 14:38:49 2011
@@ -107,4 +107,4 @@ public abstract class AbstractMessagePro
     public String getDescription() {
         return description;
     }
-}
+}
\ No newline at end of file

Modified: 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/MessageProcessor.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/MessageProcessor.java?rev=1098119&r1=1098118&r2=1098119&view=diff
==============================================================================
--- 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/MessageProcessor.java
 (original)
+++ 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/MessageProcessor.java
 Sat Apr 30 14:38:49 2011
@@ -1,20 +1,21 @@
 /*
-*  Copyright (c) 2005-2010, WSO2 Inc. (http://www.wso2.org) All Rights 
Reserved.
-*
-*  WSO2 Inc. licenses this file to you under the Apache License,
-*  Version 2.0 (the "License"); you may not use this file except
-*  in compliance with the License.
-*  You may obtain a copy of the License at
-*
-*    http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
 package org.apache.synapse.message.processors;
 
 import org.apache.synapse.*;

Modified: 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/MessageProcessorConsents.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/MessageProcessorConsents.java?rev=1098119&r1=1098118&r2=1098119&view=diff
==============================================================================
--- 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/MessageProcessorConsents.java
 (original)
+++ 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/MessageProcessorConsents.java
 Sat Apr 30 14:38:49 2011
@@ -20,11 +20,19 @@ package org.apache.synapse.message.proce
 
 public final class MessageProcessorConsents {
 
-    public static final String MESSAGE_STORE = "MESSAGE_STORE";
+    public static final String MESSAGE_STORE = "message.store";
     public static final String PARAMETERS = "parameters";
 
+    /**
+     * Scheduled Message Processor parameters
+     */
     public static final String QUARTZ_CONF = "quartz.conf";
     public static final String INTERVAL = "interval";
     public static final String CRON_EXPRESSION = "cronExpression";
 
+    /**
+     * Message processor parameters
+     */
+    public static final String MAX_DELIVER_ATTEMPTS = "max.deliver.attempts";
+
 }

Modified: 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/ScheduledMessageProcessor.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/ScheduledMessageProcessor.java?rev=1098119&r1=1098118&r2=1098119&view=diff
==============================================================================
--- 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/ScheduledMessageProcessor.java
 (original)
+++ 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/ScheduledMessageProcessor.java
 Sat Apr 30 14:38:49 2011
@@ -32,6 +32,9 @@ import java.util.Map;
 public abstract class ScheduledMessageProcessor extends 
AbstractMessageProcessor {
 
 
+    public static final String SCHEDULED_MESSAGE_PROCESSOR_GROUP =
+            "synapse.message.processor.quartz";
+
     protected Log log = LogFactory.getLog(this.getClass());
 
     /**
@@ -65,7 +68,7 @@ public abstract class ScheduledMessagePr
     /**
      * Keep the state of the message processor
      */
-    private State state = State.DESTROY;
+    protected State state = State.DESTROY;
 
 
     public void start() {
@@ -82,20 +85,21 @@ public abstract class ScheduledMessagePr
                         e.getMessage() + cronExpression, e);
             }
         }
-        trigger.setName(messageStore + "-trigger");
+        trigger.setName(name + "-trigger");
 
         JobDetail jobDetail = getJobDetail();
-        JobDataMap jobDataMap = new JobDataMap();
+        JobDataMap jobDataMap = getJobDataMap();
         jobDataMap.put(MessageProcessorConsents.MESSAGE_STORE,
                 configuration.getMessageStore(messageStore));
         jobDataMap.put(MessageProcessorConsents.PARAMETERS, parameters);
         jobDetail.setJobDataMap(jobDataMap);
+        jobDetail.setGroup(SCHEDULED_MESSAGE_PROCESSOR_GROUP);
 
         try {
             scheduler.scheduleJob(jobDetail, trigger);
         } catch (SchedulerException e) {
             throw new SynapseException("Error scheduling job : " + jobDetail
-                    + " with trigger " + trigger);
+                    + " with trigger " + trigger ,e);
         }
     }
 
@@ -157,6 +161,7 @@ public abstract class ScheduledMessagePr
 
         try {
             scheduler = sf.getScheduler();
+
         } catch (SchedulerException e) {
             throw new SynapseException("Error getting a  scheduler instance 
form scheduler" +
                     " factory " + sf, e);
@@ -175,8 +180,16 @@ public abstract class ScheduledMessagePr
 
     protected abstract JobDetail getJobDetail();
 
+    protected JobDataMap getJobDataMap() {
+        return new JobDataMap();
+    }
+
     public void destroy() {
+        try {
+            scheduler.deleteJob(name + 
"-trigger",SCHEDULED_MESSAGE_PROCESSOR_GROUP);
+        } catch (SchedulerException e) {
+            log.error("Error while destroying the task " + e);
+        }
         state = State.DESTROY;
     }
-
 }

Added: 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/forward/BlockingMessageSender.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/forward/BlockingMessageSender.java?rev=1098119&view=auto
==============================================================================
--- 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/forward/BlockingMessageSender.java
 (added)
+++ 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/forward/BlockingMessageSender.java
 Sat Apr 30 14:38:49 2011
@@ -0,0 +1,195 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+package org.apache.synapse.message.processors.forward;
+
+import org.apache.axiom.om.OMAbstractFactory;
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.soap.SOAP12Constants;
+import org.apache.axiom.soap.SOAPEnvelope;
+import org.apache.axiom.soap.SOAPFactory;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.Constants;
+import org.apache.axis2.addressing.AddressingConstants;
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.client.OperationClient;
+import org.apache.axis2.client.Options;
+import org.apache.axis2.client.ServiceClient;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.ConfigurationContextFactory;
+import org.apache.axis2.description.Parameter;
+import org.apache.axis2.description.WSDL2Constants;
+import org.apache.axis2.transport.http.HTTPConstants;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.synapse.MessageContext;
+import org.apache.synapse.SynapseConstants;
+import org.apache.synapse.SynapseException;
+import org.apache.synapse.core.axis2.Axis2MessageContext;
+import org.apache.synapse.util.MessageHelper;
+
+
+public class BlockingMessageSender {
+
+    private ServiceClient sc = null;
+
+
+    public final static String DEFAULT_CLIENT_REPO = 
"./samples/axis2Client/client_repo";
+    public final static String DEFAULT_AXIS2_XML = 
"./samples/axis2Client/client_repo/conf/axis2.xml";
+
+
+    private String clientRepository = null;
+    private String axis2xml = null;
+
+    private static Log log = LogFactory.getLog(BlockingMessageSender.class);
+
+    private ConfigurationContext configurationContext = null;
+
+    public void init() {
+         try {
+            configurationContext
+                    = 
ConfigurationContextFactory.createConfigurationContextFromFileSystem(
+                    clientRepository != null ? clientRepository : 
DEFAULT_CLIENT_REPO,
+                    axis2xml != null ? axis2xml : DEFAULT_AXIS2_XML);
+            sc = new ServiceClient(configurationContext, null);
+        } catch (AxisFault e) {
+            String msg = "Error initializing BlockingMessageSender : " + 
e.getMessage();
+            log.error(msg, e);
+            throw new SynapseException(msg, e);
+        }
+    }
+
+    public MessageContext send(MessageContext messageIn , String serviceUrl) 
throws Exception {
+
+        if(log.isDebugEnabled()) {
+            log.debug("Start Sending the Message ");
+        }
+
+        try {
+
+            MessageContext messageOut = 
MessageHelper.cloneMessageContext(messageIn);
+            Options options = new Options();
+            options.setTo(new EndpointReference(serviceUrl));
+            if(messageIn.getSoapAction() != null) {
+
+                options.setAction(messageIn.getSoapAction());
+
+            } else {
+
+                if (messageIn.isSOAP11()) {
+                    
options.setProperty(Constants.Configuration.DISABLE_SOAP_ACTION, true);
+                } else {
+                    Axis2MessageContext axis2smc = (Axis2MessageContext) 
messageOut;
+                    org.apache.axis2.context.MessageContext axis2MessageCtx =
+                            axis2smc.getAxis2MessageContext();
+                    axis2MessageCtx.getTransportOut().addParameter(
+                            new Parameter(HTTPConstants.OMIT_SOAP_12_ACTION, 
true));
+                }
+
+            }
+
+           //After setting all the options we need to find the MEP of the 
Message
+           org.apache.axis2.context.MessageContext axis2Ctx =
+                   ((Axis2MessageContext)messageOut).getAxis2MessageContext();
+
+           boolean outOnlyMessage = "true".equals(messageIn.getProperty(
+                SynapseConstants.OUT_ONLY)) || 
WSDL2Constants.MEP_URI_IN_ONLY.equals(
+                axis2Ctx.getOperationContext()
+                        .getAxisOperation().getMessageExchangePattern());
+
+            // Here We consider all other Messages that evaluates to 
outOnlyMessage == false
+            // follows out in mep.
+            if(log.isDebugEnabled()) {
+                log.debug("Invoking service Url " + serviceUrl + " with 
Message" +
+                        messageIn.getMessageID());
+            }
+
+
+
+
+
+            options.setProperty(
+                    AddressingConstants.DISABLE_ADDRESSING_FOR_OUT_MESSAGES, 
Boolean.TRUE);
+
+            sc.setOptions(options);
+            OMElement result = null;
+            try {
+                OMElement payload = 
axis2Ctx.getEnvelope().getBody().getFirstElement();
+                result = sc.sendReceive(payload);
+            } catch (Exception axisFault) {
+
+                // Here if Message is not a Out only Message
+                // To indicate that it is a Error we set a new Message Context 
property
+                // and return the message context
+                // If its not we throw an Exception
+                if (!outOnlyMessage) {
+                    
messageOut.setProperty(ForwardingProcessorConstants.BLOCKING_SENDER_ERROR,
+                            "true");
+
+                    return messageOut;
+                }
+
+                log.error("Error sending Message to url : " + serviceUrl 
,axisFault);
+                throw new Exception("Error while Sending Message" , axisFault);
+
+            }
+
+            if(!outOnlyMessage) {
+                    if(result != null) {
+                        String soapNamespaceURI =
+                                
axis2Ctx.getEnvelope().getNamespace().getNamespaceURI();
+                        SOAPEnvelope envelope = createSOAPEnvelope(result , 
soapNamespaceURI);
+                        axis2Ctx.setEnvelope(envelope);
+                        return messageOut;
+                    }
+            }
+        } catch (AxisFault axisFault) {
+            log.error("Error sending Message to url : " + serviceUrl 
,axisFault);
+            throw new Exception("Error while Sending message " , axisFault);
+        }
+        return null;
+    }
+
+    public String getClientRepository() {
+        return clientRepository;
+    }
+
+    public void setClientRepository(String clientRepository) {
+        this.clientRepository = clientRepository;
+    }
+
+    public String getAxis2xml() {
+        return axis2xml;
+    }
+
+    public void setAxis2xml(String axis2xml) {
+        this.axis2xml = axis2xml;
+    }
+
+    private SOAPEnvelope createSOAPEnvelope(OMElement payload , String 
soapNamespaceUri) {
+         SOAPFactory soapFactory = null;
+                if 
(soapNamespaceUri.equals(SOAP12Constants.SOAP_ENVELOPE_NAMESPACE_URI)) {
+                    soapFactory = OMAbstractFactory.getSOAP12Factory();
+                } else {
+                    soapFactory = OMAbstractFactory.getSOAP11Factory();
+                }
+        SOAPEnvelope envelope = soapFactory.getDefaultEnvelope();
+        envelope.getBody().addChild(payload);
+        return envelope;
+    }
+}

Added: 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ForwardingJob.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ForwardingJob.java?rev=1098119&view=auto
==============================================================================
--- 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ForwardingJob.java
 (added)
+++ 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ForwardingJob.java
 Sat Apr 30 14:38:49 2011
@@ -0,0 +1,222 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+package org.apache.synapse.message.processors.forward;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.synapse.FaultHandler;
+import org.apache.synapse.Mediator;
+import org.apache.synapse.MessageContext;
+import org.apache.synapse.SynapseArtifact;
+import org.apache.synapse.endpoints.AddressEndpoint;
+import org.apache.synapse.endpoints.Endpoint;
+import org.apache.synapse.mediators.MediatorFaultHandler;
+import org.apache.synapse.message.processors.MessageProcessorConsents;
+import org.apache.synapse.message.store.AbstractMessageStore;
+import org.apache.synapse.message.store.MessageStore;
+import org.quartz.*;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.locks.Lock;
+
+/**
+ * Redelivery Job will replay all the Messages in the Message Store when 
executed
+ * Excluding ones that are already tried redelivering more than max number of 
tries
+ */
+public class ForwardingJob implements StatefulJob {
+
+    private static final Log log = LogFactory.getLog(ForwardingJob.class);
+
+
+    public void execute(JobExecutionContext jobExecutionContext) throws 
JobExecutionException {
+        JobDataMap jdm = jobExecutionContext.getMergedJobDataMap();
+
+        /**
+         * Get the Globle Objects from DataMap
+         */
+        MessageStore messageStore = (MessageStore) jdm.get(
+                MessageProcessorConsents.MESSAGE_STORE);
+        Map<String, Object> parameters = (Map<String, Object>) jdm.get(
+                MessageProcessorConsents.PARAMETERS);
+        BlockingMessageSender sender =
+                (BlockingMessageSender) 
jdm.get(ScheduledMessageForwardingProcessor.BLOCKING_SENDER);
+
+        ScheduledMessageForwardingProcessor processor =
+                (ScheduledMessageForwardingProcessor) 
jdm.get(ScheduledMessageForwardingProcessor.PROCESSOR_INSTANCE);
+
+
+        int maxDeliverAttempts = -1;
+        String mdaParam = (String) 
parameters.get(MessageProcessorConsents.MAX_DELIVER_ATTEMPTS);
+        if (mdaParam != null) {
+            maxDeliverAttempts = Integer.parseInt(mdaParam);
+
+            // Here we look for the edge case
+            if(maxDeliverAttempts == 0) {
+                processor.deactivate();
+            }
+        }
+
+        if(!processor.isActive()) {
+            return;
+        }
+
+        if (maxDeliverAttempts > 0) {
+            processor.incrementSendAttemptCount();
+        }
+
+        boolean errorStop = false;
+        while (!errorStop) {
+
+            MessageContext messageContext = messageStore.peek();
+            if (messageContext != null) {
+                Set proSet = messageContext.getPropertyKeySet();
+
+                if (proSet != null) {
+                    if 
(proSet.contains(ForwardingProcessorConstants.BLOCKING_SENDER_ERROR)) {
+                        
proSet.remove(ForwardingProcessorConstants.BLOCKING_SENDER_ERROR);
+                    }
+                }
+
+                String targetEp =
+                        (String) 
messageContext.getProperty(ForwardingProcessorConstants.TARGET_ENDPOINT);
+
+                if (targetEp != null) {
+                    Endpoint ep = messageContext.getEndpoint(targetEp);
+
+                    if (ep instanceof AddressEndpoint) {
+                        AddressEndpoint addEp = (AddressEndpoint) ep;
+                        String addressUrl = addEp.getDefinition().getAddress();
+
+                        try {
+                            MessageContext outCtx = 
sender.send(messageContext, addressUrl);
+
+                            if (outCtx != null && "true".equals(outCtx.
+                                    
getProperty(ForwardingProcessorConstants.BLOCKING_SENDER_ERROR))) {
+                                // This Means an Error has occurred
+                                if (parameters != null &&
+                                        parameters.get(
+                                                
ForwardingProcessorConstants.FAULT_SEQUENCE) != null) {
+
+                                    String seq = (String) parameters.get(
+                                            
ForwardingProcessorConstants.FAULT_SEQUENCE);
+                                    Mediator mediator = 
outCtx.getSequence(seq);
+                                    if (mediator != null) {
+                                        mediator.mediate(outCtx);
+                                    } else {
+                                        log.warn("Can't Send the fault Message 
, Sequence " + seq +
+                                                " Does not Exist");
+                                    }
+
+                                }
+
+                                if (maxDeliverAttempts > 0) {
+                                    if(processor.getSendAttemptCount() > 
maxDeliverAttempts) {
+                                        processor.deactivate();
+                                    }
+                                }
+                                errorStop = true;
+                                continue;
+
+                            }
+
+                            // If there is a sequence defined to send success 
replies,
+                            // we must send the message to it
+                            if (parameters != null &&
+                                    parameters.get(
+                                            
ForwardingProcessorConstants.REPLY_SEQUENCE) != null) {
+                                if (outCtx != null) {
+                                    String seq = (String) parameters.get(
+                                            
ForwardingProcessorConstants.REPLY_SEQUENCE);
+                                    Mediator mediator = 
outCtx.getSequence(seq);
+                                    if (mediator != null) {
+                                        mediator.mediate(outCtx);
+                                    } else {
+                                        log.warn("Can't Send the Out Message , 
Sequence " + seq +
+                                                " Does not Exist");
+                                    }
+                                }
+                            }
+
+                            // If no Exception Occurred We remove the Message
+                            messageStore.poll();
+                        } catch (Exception e) {
+                            if (maxDeliverAttempts > 0) {
+                                if (processor.getSendAttemptCount() > 
maxDeliverAttempts) {
+                                    processor.deactivate();
+                                }
+                            }
+                            errorStop = true;
+                            log.error("Error Forwarding Message ", e);
+                            continue;
+                        }
+                    } else {
+                        // Currently only Address Endpoint delivery is 
supported
+                        log.warn("Address Endpoint Named " + targetEp + " not 
found.Hence removing " +
+                                "the message form store");
+                        messageStore.poll();
+                    }
+
+
+                } else {
+                    //No Target Endpoint defined for the Message
+                    //So we do not have a place to deliver.
+                    //Here we log a warning and remove the message
+                    //todo: we can improve this by implementing a target 
inferring mechanism
+
+                    log.warn("Property " + 
ForwardingProcessorConstants.TARGET_ENDPOINT +
+                            " not found in the message context , Hence 
removing the message ");
+                    messageStore.poll();
+
+                }
+
+            } else {
+                if (maxDeliverAttempts > 0) {
+                    if (processor.getSendAttemptCount() > maxDeliverAttempts) {
+                        processor.deactivate();
+                    }
+                }
+                errorStop = true;
+            }
+        }
+    }
+
+
+    private BlockingMessageSender initMessageSender(Map<String, Object> 
params) {
+
+        BlockingMessageSender sender = null;
+        String axis2repo = (String) 
params.get(ForwardingProcessorConstants.AXIS2_REPO);
+        String axis2Config = (String) 
params.get(ForwardingProcessorConstants.AXIS2_CONFIG);
+
+        sender = new BlockingMessageSender();
+
+        if (axis2repo != null) {
+            sender.setClientRepository(axis2repo);
+        }
+
+
+        if (axis2Config != null) {
+            sender.setAxis2xml(axis2Config);
+        }
+        sender.init();
+
+        return sender;
+    }
+
+}

Added: 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ForwardingProcessorConstants.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ForwardingProcessorConstants.java?rev=1098119&view=auto
==============================================================================
--- 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ForwardingProcessorConstants.java
 (added)
+++ 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ForwardingProcessorConstants.java
 Sat Apr 30 14:38:49 2011
@@ -0,0 +1,59 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+package org.apache.synapse.message.processors.forward;
+
+/**
+ * class <code>ForwardingProcessorConstants</code> holds the constants that are
+ * used in Forwarding processors
+ */
+public final class ForwardingProcessorConstants {
+
+    /**
+     * Message context property that holds the name of the target endpoint to 
be replayed
+     */
+    public static final String TARGET_ENDPOINT = "target.endpoint";
+
+
+
+    /**
+     * The axis2 repository location to be used By the Message Sender
+     */
+    public static final String AXIS2_REPO ="axis2.repo";
+
+    /**
+     * The axis2 configuration file path to be used by the Message Sender
+     */
+    public static final String AXIS2_CONFIG = "axis2.config";
+
+
+
+    public static final String BLOCKING_SENDER_ERROR="blocking.sender.error";
+
+    /**
+     * Used for in_out messages. Processor will forward the message to this 
sequence
+     */
+    public static final String REPLY_SEQUENCE = 
"message.processor.reply.sequence";
+
+    /**
+     * used for forward in case of Error
+     */
+    public static final String FAULT_SEQUENCE = 
"message.processor.fault.sequence";
+
+
+}

Added: 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/forward/MessageForwardingProcessorView.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/forward/MessageForwardingProcessorView.java?rev=1098119&view=auto
==============================================================================
--- 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/forward/MessageForwardingProcessorView.java
 (added)
+++ 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/forward/MessageForwardingProcessorView.java
 Sat Apr 30 14:38:49 2011
@@ -0,0 +1,249 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+package org.apache.synapse.message.processors.forward;
+
+import org.apache.axiom.soap.SOAPEnvelope;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.synapse.Mediator;
+import org.apache.synapse.MessageContext;
+import org.apache.synapse.SynapseArtifact;
+import org.apache.synapse.SynapseException;
+import org.apache.synapse.core.axis2.Axis2MessageContext;
+import org.apache.synapse.endpoints.AddressEndpoint;
+import org.apache.synapse.endpoints.Endpoint;
+import org.apache.synapse.message.processors.MessageProcessor;
+import org.apache.synapse.message.processors.ScheduledMessageProcessor;
+import org.apache.synapse.message.store.AbstractMessageStore;
+import org.apache.synapse.message.store.MessageStore;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+public class MessageForwardingProcessorView implements 
MessageForwardingProcessorViewMBean {
+
+    private MessageStore messageStore;
+
+    private BlockingMessageSender sender;
+
+    private ScheduledMessageForwardingProcessor processor;
+    private static Log log = 
LogFactory.getLog(MessageForwardingProcessorView.class);
+
+
+    public MessageForwardingProcessorView(MessageStore messageStore, 
BlockingMessageSender sender,
+                                          ScheduledMessageForwardingProcessor 
processor)
+            throws Exception {
+        if (messageStore != null) {
+            this.messageStore = messageStore;
+        } else {
+            throw new Exception("Error , Can not create Message Forwarding 
Processor " +
+                    "view with null " + "message store");
+        }
+
+        if (sender != null) {
+            this.sender = sender;
+        } else {
+            throw new Exception("Error , Can not create Message Forwarding 
Processor " +
+                    "view with null " + "Message Sender");
+        }
+
+
+        if (processor != null) {
+            this.processor = processor;
+        } else {
+            throw new SynapseException("Error , Can not create Message 
Forwarding Processor " +
+                    "view with null " + "Message Processor");
+        }
+
+    }
+
+    public void resendAll() throws Exception {
+        if (!processor.isActive()) {
+
+            while (messageStore.peek() != null) {
+                sendMessage(messageStore.peek() , true);
+            }
+        } else {
+            throw new Exception("Message Processor is Active, Manual 
operations are " +
+                    "not supported!");
+        }
+    }
+
+    public void deleteAll() throws Exception {
+        if (!processor.isActive()) {
+            messageStore.clear();
+        } else {
+            throw new Exception("Message Processor is Active, Manual 
operations are " +
+                    "not supported!");
+        }
+    }
+
+    public List<String> messageIdList() throws Exception {
+        if (!processor.isActive()) {
+            int size = messageStore.size();
+            List<String> idList = new ArrayList<String>();
+            for (int i = 0; i < size; i++) {
+                MessageContext context = messageStore.get(i);
+                if (context != null) {
+                    idList.add(context.getMessageID());
+                } else {
+                    break;
+                }
+            }
+            return idList;
+        } else {
+            throw new Exception("Message Processor is Active, Manual 
operations are " +
+                    "not supported!");
+        }
+
+    }
+
+    public void resend(String messageID) throws Exception {
+        if (!processor.isActive()) {
+            if (messageID != null && !"".equals(messageID.trim())) {
+                MessageContext msgCtx = messageStore.get(messageID);
+                if (msgCtx != null) {
+                    sendMessage(msgCtx ,false);
+                    messageStore.remove(messageID);
+                }
+            }
+        } else {
+            throw new Exception("Message Processor is Active, Manual 
operations are " +
+                    "not supported!");
+        }
+    }
+
+    public void delete(String messageID) throws Exception {
+        if (!processor.isActive()) {
+             if (messageID != null && !"".equals(messageID.trim())) {
+               messageStore.remove(messageID);
+            }
+        } else {
+            throw new Exception("Message Processor is Active, Manual 
operations are " +
+                    "not supported!");
+        }
+    }
+
+    public String getEnvelope(String messageID) throws Exception {
+        if (!processor.isActive()) {
+             if (messageID != null && !"".equals(messageID.trim())) {
+                MessageContext msgCtx = messageStore.get(messageID);
+                if (msgCtx != null) {
+                   SOAPEnvelope env =
+                           ((Axis2MessageContext) 
msgCtx).getAxis2MessageContext().getEnvelope();
+                   if(env != null) {
+                       return env.toString();
+                   }
+                }
+            }
+        } else {
+            throw new Exception("Message Processor is Active, Manual 
operations are " +
+                    "not supported!");
+        }
+
+        return null;
+    }
+
+    public int getSize() {
+        return messageStore.size();
+    }
+
+
+    public boolean isActive() {
+        assert processor != null;
+        return processor.isActive();
+    }
+
+    public void activate() {
+        assert processor != null;
+        processor.resetSentAttemptCount();
+        processor.activate();
+    }
+
+    public void deactivate() {
+        assert processor != null;
+        processor.deactivate();
+    }
+
+    private void sendMessage(MessageContext messageContext, boolean delete) 
throws Exception {
+        if (messageContext != null) {
+            Set proSet = messageContext.getPropertyKeySet();
+
+            if (proSet != null) {
+                if 
(proSet.contains(ForwardingProcessorConstants.BLOCKING_SENDER_ERROR)) {
+                    
proSet.remove(ForwardingProcessorConstants.BLOCKING_SENDER_ERROR);
+                }
+            }
+
+            String targetEp =
+                    (String) 
messageContext.getProperty(ForwardingProcessorConstants.TARGET_ENDPOINT);
+
+            if (targetEp != null) {
+                Endpoint ep = messageContext.getEndpoint(targetEp);
+
+                if (ep instanceof AddressEndpoint) {
+                    AddressEndpoint addEp = (AddressEndpoint) ep;
+                    String addressUrl = addEp.getDefinition().getAddress();
+
+                    try {
+                        MessageContext outCtx = sender.send(messageContext, 
addressUrl);
+                        // If no Exception Occurred We remove the Message
+                        if (delete) {
+                            messageStore.poll();
+                        }
+                    } catch (Exception e) {
+                        log.error("Error Forwarding Message ", e);
+                        throw new Exception(e);
+                    }
+                } else {
+                    // Currently only Address Endpoint delivery is supported
+                    String logMsg = "Address Endpoint Named " + targetEp +
+                            " not found.Hence removing " +
+                            "the message form store";
+                    log.warn(logMsg);
+                    if (delete) {
+                        messageStore.poll();
+                    }
+                    throw new Exception(logMsg);
+                }
+
+
+            } else {
+                //No Target Endpoint defined for the Message
+                //So we do not have a place to deliver.
+                //Here we log a warning and remove the message
+                //todo: we can improve this by implementing a target inferring 
mechanism
+
+                String logMsg = "Property " + 
ForwardingProcessorConstants.TARGET_ENDPOINT +
+                        " not found in the message context , Hence removing 
the message ";
+                log.warn(logMsg);
+                if (delete) {
+                    messageStore.poll();
+                }
+                throw new Exception(logMsg);
+
+            }
+
+        } else {
+            throw new Exception("Error! Cant send Message Context : " + 
messageContext);
+        }
+    }
+
+}

Added: 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/forward/MessageForwardingProcessorViewMBean.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/forward/MessageForwardingProcessorViewMBean.java?rev=1098119&view=auto
==============================================================================
--- 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/forward/MessageForwardingProcessorViewMBean.java
 (added)
+++ 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/forward/MessageForwardingProcessorViewMBean.java
 Sat Apr 30 14:38:49 2011
@@ -0,0 +1,92 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+package org.apache.synapse.message.processors.forward;
+
+import java.util.List;
+
+public interface MessageForwardingProcessorViewMBean {
+
+    /**
+     * try resending all messages stored in the message store via associated 
endpoints.
+     */
+    public void resendAll() throws Exception;
+
+    /**
+     * Delete all the Messages in Message store
+     */
+    public void deleteAll() throws Exception;
+
+
+    /**
+     * Get the Message IDs of all stored Messages in the Message store
+     *
+     * @return a list of message ID values
+     */
+    public List<String> messageIdList() throws Exception;
+
+    /**
+     * Resend the Message with the given id
+     * return false if fail to re try deliver the message
+     *
+     * @param messageID ID of the message to be resent
+     * @return true if the resend operation was successful and false otherwise
+     */
+    public void  resend(String messageID) throws Exception;
+
+    /**
+     * Delete the Message with Given id
+     *
+     * @param messageID ID of the message to be deleted
+     */
+    public void delete(String messageID) throws Exception;
+
+    /**
+     * Get the SOAP envelope of the given Message with given ID
+     *
+     * @param messageID ID of the message to be returned
+     * @return the SOAP envelope content as a string
+     */
+    public String getEnvelope(String messageID) throws Exception;
+
+    /**
+     *
+     * @return the number of Messages stored in the store.
+     */
+    public int getSize();
+
+    /**
+     * Get the Status of the Message Processor
+     * @return  status of the Processor
+     */
+    public boolean isActive();
+
+    /**
+     * Activate the Message Processor.
+     * This will resume processing the Messages if its in deactivated state and
+     * reset the Send attempt count.
+     */
+    public void activate();
+
+    /**
+     * Deactivate the Message Processor
+     * This will stop the processing of Messages.
+     */
+    public void deactivate();
+
+}

Added: 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ScheduledMessageForwardingProcessor.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ScheduledMessageForwardingProcessor.java?rev=1098119&view=auto
==============================================================================
--- 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ScheduledMessageForwardingProcessor.java
 (added)
+++ 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ScheduledMessageForwardingProcessor.java
 Sat Apr 30 14:38:49 2011
@@ -0,0 +1,143 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+package org.apache.synapse.message.processors.forward;
+
+import org.apache.synapse.SynapseException;
+import org.apache.synapse.core.SynapseEnvironment;
+import org.apache.synapse.message.processors.ScheduledMessageProcessor;
+import org.quartz.JobDataMap;
+import org.quartz.JobDetail;
+import org.quartz.SchedulerException;
+
+import java.util.Map;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Redelivery processor is the Message processor which implements the Dead 
letter channel EIP
+ * It will Time to time Redeliver the Messages to a given target.
+ */
+public class ScheduledMessageForwardingProcessor extends 
ScheduledMessageProcessor{
+
+    public static final String BLOCKING_SENDER = "blocking.sender";
+    public static final String PROCESSOR_INSTANCE = "processor.instance";
+
+    private BlockingMessageSender sender = null;
+
+    private volatile AtomicBoolean active = new AtomicBoolean(true);
+
+    private volatile AtomicInteger sendAttempts = new AtomicInteger(0);
+
+    private MessageForwardingProcessorView view;
+
+    @Override
+    public void init(SynapseEnvironment se) {
+        super.init(se);
+        try {
+            view = new MessageForwardingProcessorView(
+                    
se.getSynapseConfiguration().getMessageStore(messageStore),sender,this);
+        } catch (Exception e) {
+            throw new SynapseException(e);
+        }
+
+        
org.apache.synapse.commons.jmx.MBeanRegistrar.getInstance().registerMBean(view,
+                "Message Forwarding Processor view", getName());
+    }
+
+    @Override
+    protected JobDetail getJobDetail() {
+        JobDetail jobDetail = new JobDetail();
+        jobDetail.setName(name + "-forward job");
+        jobDetail.setJobClass(ForwardingJob.class);
+        return jobDetail;
+    }
+
+    @Override
+    protected JobDataMap getJobDataMap() {
+        JobDataMap jdm = new JobDataMap();
+        sender = initMessageSender(parameters);
+        jdm.put(BLOCKING_SENDER,sender);
+        jdm.put(PROCESSOR_INSTANCE,this);
+        return jdm;
+    }
+
+     private BlockingMessageSender initMessageSender(Map<String ,Object> 
params) {
+
+        String axis2repo = (String) 
params.get(ForwardingProcessorConstants.AXIS2_REPO);
+        String axis2Config = (String) 
params.get(ForwardingProcessorConstants.AXIS2_CONFIG);
+
+        sender = new BlockingMessageSender();
+
+        if(axis2repo != null) {
+            sender.setClientRepository(axis2repo);
+        }
+
+
+        if(axis2Config != null) {
+            sender.setAxis2xml(axis2Config);
+        }
+        sender.init();
+
+        return sender;
+    }
+
+    public BlockingMessageSender getSender() {
+        return sender;
+    }
+
+    public void setSender(BlockingMessageSender sender) {
+        this.sender = sender;
+    }
+
+    public boolean isActive() {
+        return active.get();
+    }
+
+    public void activate() {
+        active.set(true);
+    }
+
+    public void deactivate() {
+        active.set(false);
+    }
+
+    public int getSendAttemptCount() {
+        return sendAttempts.get();
+    }
+
+    public void incrementSendAttemptCount() {
+        sendAttempts.incrementAndGet();
+    }
+    public void resetSentAttemptCount(){
+        sendAttempts.set(0);
+    }
+
+    @Override
+    public void destroy() {
+         try {
+            scheduler.deleteJob(name + "-forward job",
+                    
ScheduledMessageProcessor.SCHEDULED_MESSAGE_PROCESSOR_GROUP);
+        } catch (SchedulerException e) {
+            log.error("Error while destroying the task " + e);
+        }
+        state = State.DESTROY;
+    }
+}
+


Reply via email to