Author: charith
Date: Fri Feb 18 04:01:47 2011
New Revision: 1071875

URL: http://svn.apache.org/viewvc?rev=1071875&view=rev
Log:
improving code reuse of Message processor

Added:
    
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/AbstractMessageProcessor.java
    
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/MessageProcessorConsents.java
    
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/DLCConstents.java
    
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/RedeliveryJob.java
Modified:
    
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/DeadLetterChannelView.java
    
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/RedeliveryProcessor.java
    
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingJob.java
    
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingProcessor.java
    
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/AbstractMessageStore.java
    
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/InMemoryMessageStore.java

Added: 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/AbstractMessageProcessor.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/AbstractMessageProcessor.java?rev=1071875&view=auto
==============================================================================
--- 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/AbstractMessageProcessor.java
 (added)
+++ 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/AbstractMessageProcessor.java
 Fri Feb 18 04:01:47 2011
@@ -0,0 +1,189 @@
+/*
+*  Copyright (c) 2005-2010, WSO2 Inc. (http://www.wso2.org) All Rights 
Reserved.
+*
+*  WSO2 Inc. licenses this file to you under the Apache License,
+*  Version 2.0 (the "License"); you may not use this file except
+*  in compliance with the License.
+*  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.synapse.message.processors;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.synapse.SynapseException;
+import org.apache.synapse.core.SynapseEnvironment;
+import org.apache.synapse.message.store.MessageStore;
+import org.quartz.*;
+import org.quartz.impl.StdSchedulerFactory;
+
+import java.text.ParseException;
+import java.util.Map;
+
+/**
+ * Class <code>AbstractMessageProcessor</code> is handles Message processing 
of the messages
+ * in Message Store. Abstract Message Store is assumes that Message processors 
can be implemented
+ * using the quartz scheduler jobs. If in case we user wants a different 
implementation They can
+ * directly use <code>MessageProcessor</code> interface for that 
implementations
+ */
+public abstract class AbstractMessageProcessor implements MessageProcessor {
+
+    protected Log log = LogFactory.getLog(this.getClass());
+
+    /** The scheduler, run the the processor */
+    protected Scheduler scheduler = null;
+
+    /** Message Store associated with Message processor */
+    protected MessageStore messageStore;
+
+    /** The interval at which this processor runs */
+    protected long interval = 1;
+
+
+
+    protected enum State {
+        INITIALIZED,
+        START,
+        STOP,
+        DESTROY
+    }
+
+    /**message store parameters */
+    protected Map<String, Object> parameters = null;
+
+    /** The quartz configuration file if specified as a parameter */
+    protected String quartzConfig = null;
+
+    /** A cron expression to run the sampler */
+    protected String cronExpression = null;
+
+    /** Keep the state of the message processor */
+    private State state  = State.DESTROY;
+
+
+    public void start() {
+        Trigger trigger;
+        if (cronExpression == null || "".equals(cronExpression)) {
+            trigger = 
TriggerUtils.makeImmediateTrigger(SimpleTrigger.REPEAT_INDEFINITELY, interval);
+        } else {
+            CronTrigger cronTrigger = new CronTrigger();
+            try {
+                cronTrigger.setCronExpression(cronExpression);
+                trigger = cronTrigger;
+            } catch (ParseException e) {
+                throw new SynapseException("Error setting cron expression : " +
+                        e.getMessage() + cronExpression, e);
+            }
+        }
+        trigger.setName(messageStore.getName() + "-trigger");
+
+        JobDetail jobDetail = getJobDetail();
+        JobDataMap jobDataMap = new JobDataMap();
+        jobDataMap.put(MessageProcessorConsents.MESSAGE_STORE, messageStore);
+        jobDataMap.put(MessageProcessorConsents.PARAMETERS,parameters);
+        jobDetail.setJobDataMap(jobDataMap);
+
+        try {
+            scheduler.scheduleJob(jobDetail, trigger);
+        } catch (SchedulerException e) {
+            throw new SynapseException("Error scheduling job : " + jobDetail
+                    + " with trigger " + trigger);
+        }
+    }
+
+    public void stop() {
+        if (state == State.START) {
+            try {
+                if (scheduler != null && scheduler.isStarted()) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("ShuttingDown Message Processor Scheduler : 
" + scheduler.getMetaData());
+                    }
+                    scheduler.standby();
+                }
+
+                state = State.STOP;
+            } catch (SchedulerException e) {
+                throw new SynapseException("Error ShuttingDown Message 
processor scheduler ", e);
+            }
+        }
+    }
+
+    public void setMessageStore(MessageStore messageStore) {
+        this.messageStore = messageStore;
+    }
+
+    public void setParameters(Map<String, Object> parameters) {
+        this.parameters = parameters;
+        if(parameters != null && !parameters.isEmpty()) {
+            Object o = 
parameters.get(MessageProcessorConsents.CRON_EXPRESSION);
+        if (o != null) {
+            cronExpression = o.toString();
+        }
+
+        o = parameters.get(MessageProcessorConsents.INTERVAL);
+        if (o != null) {
+            interval = Integer.parseInt(o.toString());
+        }
+
+
+        o = parameters.get(MessageProcessorConsents.QUARTZ_CONF);
+        if (o != null) {
+            quartzConfig = o.toString();
+        }
+
+        }
+    }
+
+    public Map<String, Object> getParameters() {
+        return parameters;
+    }
+
+    public boolean isStarted() {
+        return state == State.START;
+    }
+
+    public void init(SynapseEnvironment se) {
+        StdSchedulerFactory sf = new StdSchedulerFactory();
+        try {
+            if (quartzConfig != null && !"".equals(quartzConfig)) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Initiating a Scheduler with configuration : " + 
quartzConfig);
+                }
+
+                sf.initialize(quartzConfig);
+            }
+        } catch (SchedulerException e) {
+            throw new SynapseException("Error initiating scheduler factory "
+                    + sf + "with configuration loaded from " + quartzConfig, 
e);
+        }
+
+        try {
+            scheduler = sf.getScheduler();
+        } catch (SchedulerException e) {
+            throw new SynapseException("Error getting a  scheduler instance 
form scheduler" +
+                    " factory " + sf, e);
+        }
+
+        try {
+            scheduler.start();
+
+            state = State.INITIALIZED;
+        } catch (SchedulerException e) {
+            throw new SynapseException("Error starting the scheduler", e);
+        }
+    }
+
+    protected abstract JobDetail getJobDetail();
+
+    public void destroy() {
+        state = State.DESTROY;
+    }
+}

Added: 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/MessageProcessorConsents.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/MessageProcessorConsents.java?rev=1071875&view=auto
==============================================================================
--- 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/MessageProcessorConsents.java
 (added)
+++ 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/MessageProcessorConsents.java
 Fri Feb 18 04:01:47 2011
@@ -0,0 +1,28 @@
+/*
+*  Copyright (c) 2005-2010, WSO2 Inc. (http://www.wso2.org) All Rights 
Reserved.
+*
+*  WSO2 Inc. licenses this file to you under the Apache License,
+*  Version 2.0 (the "License"); you may not use this file except
+*  in compliance with the License.
+*  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.synapse.message.processors;
+
+public final class MessageProcessorConsents {
+
+    public static final String MESSAGE_STORE = "MESSAGE_STORE";
+    public static final String QUARTZ_CONF = "quartz.conf";
+    public static final String INTERVAL = "interval";
+    public static final String CRON_EXPRESSION = "cronExpression";
+    public static final String PARAMETERS = "parameters";
+
+}

Added: 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/DLCConstents.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/DLCConstents.java?rev=1071875&view=auto
==============================================================================
--- 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/DLCConstents.java
 (added)
+++ 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/DLCConstents.java
 Fri Feb 18 04:01:47 2011
@@ -0,0 +1,52 @@
+/*
+*  Copyright (c) 2005-2010, WSO2 Inc. (http://www.wso2.org) All Rights 
Reserved.
+*
+*  WSO2 Inc. licenses this file to you under the Apache License,
+*  Version 2.0 (the "License"); you may not use this file except
+*  in compliance with the License.
+*  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.synapse.message.processors.dlc;
+
+/**
+ * class <code>DLCConstents</code> holds the constants that are used in the 
Dead Letter channel
+ */
+public final class DLCConstents {
+
+    /**
+     * Max number of redeliver attempts per message
+     */
+    public static final String MAX_REDELIVERY_COUNT = "redelivery.count";
+
+    /**
+     * Message context property that holds the name of the target endpoint to 
be replayed
+     */
+    public static final String REPLAY_ENDPOINT = "replay.endpoint";
+
+    /**
+     * Message context property that holds the name of the target sequence to 
be replayed
+     */
+    public static final String REPLAY_SEQUENCE = "replay.sequence";
+
+    /**
+     * Message context property that holds the name of the fault handler that 
must be set to
+     * the Message before replaying it
+     */
+    public static final String REPLAY_FAULT_HANDLER = "replay.fault.handler";
+
+    /**
+     *Message context property that holds number of redelivers for a given 
message
+     */
+    public static final String NO_OF_REDELIVERS = "number.of.redelivers";
+
+
+}

Modified: 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/DeadLetterChannelView.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/DeadLetterChannelView.java?rev=1071875&r1=1071874&r2=1071875&view=diff
==============================================================================
--- 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/DeadLetterChannelView.java
 (original)
+++ 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/DeadLetterChannelView.java
 Fri Feb 18 04:01:47 2011
@@ -21,6 +21,7 @@ package org.apache.synapse.message.proce
 import org.apache.synapse.Mediator;
 import org.apache.synapse.MessageContext;
 import org.apache.synapse.SynapseArtifact;
+import org.apache.synapse.SynapseException;
 import org.apache.synapse.endpoints.Endpoint;
 import org.apache.synapse.message.store.MessageStore;
 
@@ -31,9 +32,12 @@ public class DeadLetterChannelView imple
 
     private MessageStore messageStore;
 
+    private RedeliveryProcessor redeliveryProcessor;
+
 
     public DeadLetterChannelView(MessageStore messageStore) {
         this.messageStore = messageStore;
+        this.redeliveryProcessor = (RedeliveryProcessor) 
messageStore.getMessageProcessor();
     }
 
     public void resendAll() {
@@ -58,7 +62,7 @@ public class DeadLetterChannelView imple
        int size = messageStore.getSize();
        List<String> list = new ArrayList<String>();
         for(int i = 0; i < size ; i++) {
-            MessageContext messageContext = messageStore.unstore(0,0).get(0);
+            MessageContext messageContext = 
messageStore.getMessages(0,0).get(0);
             if(messageContext != null) {
                 list.add(messageContext.getMessageID());
             }
@@ -91,14 +95,14 @@ public class DeadLetterChannelView imple
     }
 
     private void redeliver(MessageContext messageContext) {
-        SynapseArtifact artifact = 
RedeliveryProcessor.getReplayTarget(messageContext);
+        SynapseArtifact artifact = 
RedeliveryJob.getReplayTarget(messageContext);
         if (artifact instanceof Endpoint) {
-            if (!RedeliveryProcessor.handleEndpointReplay((Endpoint) artifact,
+            if (!RedeliveryJob.handleEndpointReplay((Endpoint) artifact,
                     messageContext)) {
                 messageStore.store(messageContext);
             }
         } else if (artifact instanceof Mediator) {
-            if (!RedeliveryProcessor.handleSequenceReplay((Mediator) artifact,
+            if (!RedeliveryJob.handleSequenceReplay((Mediator) artifact,
                     messageContext)) {
                 messageStore.store(messageContext);
             }

Added: 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/RedeliveryJob.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/RedeliveryJob.java?rev=1071875&view=auto
==============================================================================
--- 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/RedeliveryJob.java
 (added)
+++ 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/RedeliveryJob.java
 Fri Feb 18 04:01:47 2011
@@ -0,0 +1,192 @@
+/*
+*  Copyright (c) 2005-2010, WSO2 Inc. (http://www.wso2.org) All Rights 
Reserved.
+*
+*  WSO2 Inc. licenses this file to you under the Apache License,
+*  Version 2.0 (the "License"); you may not use this file except
+*  in compliance with the License.
+*  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.synapse.message.processors.dlc;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.synapse.FaultHandler;
+import org.apache.synapse.Mediator;
+import org.apache.synapse.MessageContext;
+import org.apache.synapse.SynapseArtifact;
+import org.apache.synapse.endpoints.Endpoint;
+import org.apache.synapse.mediators.MediatorFaultHandler;
+import org.apache.synapse.message.processors.MessageProcessorConsents;
+import org.apache.synapse.message.store.AbstractMessageStore;
+import org.apache.synapse.message.store.MessageStore;
+import org.quartz.Job;
+import org.quartz.JobDataMap;
+import org.quartz.JobExecutionContext;
+import org.quartz.JobExecutionException;
+
+import java.util.Map;
+import java.util.concurrent.locks.Lock;
+
+/**
+ * Redelivery Job will replay all the Messages in the Message Store when 
executed
+ * Excluding ones that are already tried redelivering more than max number of 
tries
+ */
+public class RedeliveryJob implements Job {
+
+    private static final Log log = LogFactory.getLog(RedeliveryJob.class);
+
+    private MessageStore messageStore;
+    private Lock lock;
+    private int maxNumberOfRedelivers;
+
+    public void execute(JobExecutionContext jobExecutionContext) throws 
JobExecutionException {
+        JobDataMap jdm = jobExecutionContext.getMergedJobDataMap();
+
+        messageStore = (MessageStore) 
jdm.get(MessageProcessorConsents.MESSAGE_STORE);
+        lock = ((AbstractMessageStore) messageStore).getLock();
+        Map<String, Object> parameters = (Map<String, Object>) jdm.get(
+                MessageProcessorConsents.PARAMETERS);
+        maxNumberOfRedelivers = Integer.parseInt((String) 
parameters.get(DLCConstents.
+                MAX_REDELIVERY_COUNT));
+
+        /**
+         * We will keep the message store lock till the redelivery over
+         */
+        if (lock.tryLock()) {
+
+            try {
+                int size = messageStore.getSize();
+                for (int i = 0; i < size; i++) {
+                    MessageContext messageContext = messageStore.unstore(0, 
0).get(0);
+                    if (messageContext != null) {
+                        SynapseArtifact artifact = 
getReplayTarget(messageContext);
+
+                        if 
(messageContext.getProperty(DLCConstents.NO_OF_REDELIVERS) == null) {
+                            
messageContext.setProperty(DLCConstents.NO_OF_REDELIVERS, "0");
+                        }
+
+                        String numberS = (String) messageContext.getProperty(
+                                                                    
DLCConstents.NO_OF_REDELIVERS);
+                        int number = Integer.parseInt(numberS);
+
+                        if (number >= maxNumberOfRedelivers) {
+
+                            if (log.isDebugEnabled()) {
+                                log.debug("Maximum number of attempts tried 
for Message with ID " +
+                                        messageContext.getMessageID() +
+                                        "will be put back to the Message 
Store");
+
+                            }
+                            messageStore.store(messageContext);
+                            continue;
+                        }
+
+                        
messageContext.setProperty(DLCConstents.NO_OF_REDELIVERS, "" + (number + 1));
+
+
+                        if (artifact instanceof Endpoint) {
+                            if (!handleEndpointReplay((Endpoint) artifact, 
messageContext)) {
+                                messageStore.store(messageContext);
+                            }
+                        } else if (artifact instanceof Mediator) {
+                            if (!handleSequenceReplay((Mediator) artifact, 
messageContext)) {
+                                messageStore.store(messageContext);
+                            }
+                        } else {
+                            messageStore.store(messageContext);
+                        }
+
+                        if (log.isDebugEnabled()) {
+                            log.debug("Processed \n" + 
messageContext.getEnvelope());
+                        }
+
+                    }
+                }
+            } finally {
+                lock.unlock();
+            }
+
+        }
+
+
+    }
+
+    /**
+     * This will handle the Message replay to the endpoints
+     * @param endpoint   target endpoint to be redelivered
+     * @param messageContext message context of the message to be redelivered
+     * @return  true if success
+     */
+   static boolean handleEndpointReplay(Endpoint endpoint, MessageContext 
messageContext) {
+        setFaultHandler(messageContext);
+        if (endpoint != null && messageContext != null) {
+            return false;
+        } else if (endpoint.readyToSend()) {
+            endpoint.send(messageContext);
+            return true;
+        }
+
+        return false;
+    }
+
+    /**
+     * This will handle the Message replay to sequences
+     * @param mediator  target mediator to replay
+     * @param messageContext message context of the message to be replayed
+     * @return  true of success
+     */
+    static boolean handleSequenceReplay(Mediator mediator, MessageContext 
messageContext) {
+        setFaultHandler(messageContext);
+        mediator.mediate(messageContext);
+        return true;
+    }
+
+    /**
+     * Get the replay target from the message context
+     * @param context Message Context
+     * @return Endpoint or Mediator to be replayed
+     */
+   static SynapseArtifact getReplayTarget(MessageContext context) {
+        //Endpoint replay get priority
+        if (context.getProperty(DLCConstents.REPLAY_ENDPOINT) != null) {
+            String endpointName = (String) 
context.getProperty(DLCConstents.REPLAY_ENDPOINT);
+            return 
context.getConfiguration().getDefinedEndpoints().get(endpointName);
+        } else if (context.getProperty(DLCConstents.REPLAY_SEQUENCE) != null) {
+            String sequenceName = (String) 
context.getProperty(DLCConstents.REPLAY_SEQUENCE);
+
+            return context.getConfiguration().getSequence(sequenceName);
+        }
+
+        return null;
+    }
+
+    private static void setFaultHandler(MessageContext messageContext) {
+        String replayFaultHandler = (String)messageContext.getProperty(
+                DLCConstents.REPLAY_FAULT_HANDLER);
+        if(replayFaultHandler != null) {
+            if(messageContext.getEndpoint(replayFaultHandler) != null ) {
+                Endpoint ep = messageContext.getEndpoint(replayFaultHandler);
+                messageContext.pushFaultHandler((FaultHandler)ep);
+            } else if (messageContext.getSequence(replayFaultHandler) != null) 
{
+                Mediator mediator = 
messageContext.getSequence(replayFaultHandler);
+                MediatorFaultHandler faultHandler = new 
MediatorFaultHandler(mediator);
+                messageContext.pushFaultHandler(faultHandler);
+            } else {
+                log.warn("Error handler " + replayFaultHandler + " Not defined 
in the synapse " +
+                        "configuration");
+            }
+        }else {
+            log.warn("No fault handler defined for the replaying Message with 
id " +
+                    messageContext.getMessageID());
+        }
+    }
+}

Modified: 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/RedeliveryProcessor.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/RedeliveryProcessor.java?rev=1071875&r1=1071874&r2=1071875&view=diff
==============================================================================
--- 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/RedeliveryProcessor.java
 (original)
+++ 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/RedeliveryProcessor.java
 Fri Feb 18 04:01:47 2011
@@ -23,9 +23,11 @@ import org.apache.commons.logging.LogFac
 import org.apache.synapse.*;
 import org.apache.synapse.core.SynapseEnvironment;
 import org.apache.synapse.endpoints.Endpoint;
+import org.apache.synapse.message.processors.AbstractMessageProcessor;
 import org.apache.synapse.message.processors.MessageProcessor;
 import org.apache.synapse.message.store.MessageStore;
 import org.apache.synapse.securevault.commons.MBeanRegistrar;
+import org.quartz.JobDetail;
 
 import java.util.Map;
 
@@ -33,230 +35,12 @@ import java.util.Map;
  * Redelivery processor is the Message processor which implements the Dead 
letter channel EIP
  * It will Time to time Redeliver the Messages to a given target.
  */
-public class RedeliveryProcessor implements MessageProcessor {
-
-    private static final Log log = 
LogFactory.getLog(RedeliveryProcessor.class);
-
-    /**
-     * Associated MessageStore
-     */
-    private MessageStore messageStore;
-
-    private Map<String, Object> parameters;
-
-    /**
-     * Maximum number of redelivery's per message
-     */
-    private int maxRedeleveries = 0;
-
-    /**
-     * Delay between two consecutive redelivery attempts
-     */
-    private int redeliveryDelay = 2000;
-
-    /**
-     * enable/disable exponential backoff
-     */
-    private boolean exponentialBackoff = false;
-
-    /**
-     * the multiplier that will be used in the exponential backoff algorithm
-     */
-    private int backOffMultiplier = -1;
-
-
-    private DeadLetterChannelViewMBean dlcView;
-
-    private boolean started;
-
-    public static final String REDELIVERY_DELAY = "redelivery.delay";
-
-    public static final String MAX_REDELIVERY_COUNT = "redelivery.count";
-
-    public static final String EXPONENTIAL_BACKOFF = 
"redelivery.exponentialBackoff";
-
-    public static final String BACKOFF_MUTIPLIER = 
"redelivery.backoffMultiplier";
-
-
-    public static final String REPLAY_ENDPOINT = "replay.endpoint";
-
-    public static final String REPLAY_SEQUENCE = "replay.sequence";
-
-    public static final String NO_OF_REDELIVERIES = "number.of.redeliveries";
-
-    public void start() {
-        if (!started) {
-            Thread t = new Thread(new Worker());
-            t.start();
-        }
-    }
-
-    public void stop() {
-        started = false;
-    }
-
-    public void setMessageStore(MessageStore messageStore) {
-        this.messageStore = messageStore;
-        if(messageStore !=null) {
-            DeadLetterChannelView view = new 
DeadLetterChannelView(messageStore);
-            this.dlcView = view;
-            MBeanRegistrar.getInstance().registerMBean(view,"Dead Letter 
Channel",
-                    messageStore.getName());
-        }
-    }
-
-    public void setParameters(Map<String, Object> parameters) {
-        this.parameters = parameters;
-        if (parameters.containsKey(REDELIVERY_DELAY)) {
-            redeliveryDelay = Integer.parseInt((String) 
parameters.get(REDELIVERY_DELAY));
-        }
-
-        if (parameters.containsKey(MAX_REDELIVERY_COUNT)) {
-            maxRedeleveries = Integer.parseInt((String) 
parameters.get(MAX_REDELIVERY_COUNT));
-        }
-
-        if (parameters.containsKey(EXPONENTIAL_BACKOFF)) {
-            if ("true".equals(parameters.get(EXPONENTIAL_BACKOFF))) {
-                exponentialBackoff = true;
-            }
-        }
-
-        if (parameters.containsKey(BACKOFF_MUTIPLIER)) {
-            backOffMultiplier = Integer.parseInt((String) 
parameters.get(BACKOFF_MUTIPLIER));
-        }
-    }
-
-    public Map<String, Object> getParameters() {
-        return parameters;
-    }
-
-    public void init(SynapseEnvironment se) {
-
-    }
-
-    public void destroy() {
-
-    }
-
-
-    private class Worker implements Runnable {
-
-        public void run() {
-            while (started) {
-                try {
-                    synchronized (this) {
-                        int delay = redeliveryDelay;
-                        MessageContext messageContext;
-                        messageContext = messageStore.getMessages(0, 0).get(0);
-
-                        if (messageContext == null) {
-                            continue;
-                        }
-
-                        SynapseArtifact artifact = 
getReplayTarget(messageContext);
-                        messageStore.unstore(0, 0);
-                        if (messageContext.getProperty(NO_OF_REDELIVERIES) == 
null) {
-                            messageContext.setProperty(NO_OF_REDELIVERIES, 
"0");
-                            delay = redeliveryDelay;
-                        }
-
-                        String numberS = (String) 
messageContext.getProperty(NO_OF_REDELIVERIES);
-                        int number = Integer.parseInt(numberS);
-
-                        if (number >= maxRedeleveries) {
-
-                            if (log.isDebugEnabled()) {
-                                log.debug("Maximum number of attempts tried 
for Message with ID " +
-                                        messageContext.getMessageID() +
-                                        "will be put back to the Message 
Store");
-
-                            }
-                            messageStore.store(messageContext);
-                            continue;
-                        }
-
-                        messageContext.setProperty(NO_OF_REDELIVERIES, "" + 
(number + 1));
-
-                        if (exponentialBackoff && backOffMultiplier == -1) {
-                            delay = (number + 1) * redeliveryDelay;
-                        } else if (exponentialBackoff) {
-                            delay = (int) Math.pow(backOffMultiplier, number) 
* redeliveryDelay;
-                        }
-
-
-                        try {
-                            Thread.sleep(delay);
-                        } catch (InterruptedException ignored) {
-
-                        }
-
-                        if (artifact instanceof Endpoint) {
-                            if (!handleEndpointReplay((Endpoint) artifact, 
messageContext)) {
-                                messageStore.store(messageContext);
-                            }
-                        } else if (artifact instanceof Mediator) {
-                            if (!handleSequenceReplay((Mediator) artifact, 
messageContext)) {
-                                messageStore.store(messageContext);
-                            }
-                        } else {
-                            messageStore.store(messageContext);
-                        }
-
-                        if (log.isDebugEnabled()) {
-                            log.debug("sent \n" + 
messageContext.getEnvelope());
-                        }
-                    }
-                } catch (Throwable e) {
-                    log.warn("Error while Running Redelivery process " + 
e.getMessage());
-                }
-            }
-
-        }
-    }
-
-    public static SynapseArtifact getReplayTarget(MessageContext context) {
-        //Endpoint replay get priority
-        if (context.getProperty(REPLAY_ENDPOINT) != null) {
-            String endpointName = (String) 
context.getProperty(REPLAY_ENDPOINT);
-            return 
context.getConfiguration().getDefinedEndpoints().get(endpointName);
-        } else if (context.getProperty(REPLAY_SEQUENCE) != null) {
-            String sequenceName = (String) 
context.getProperty(REPLAY_SEQUENCE);
-
-            return context.getConfiguration().getSequence(sequenceName);
-        }
-
-        return null;
-    }
-
-
-    public static boolean handleEndpointReplay(Endpoint endpoint, 
MessageContext messageContext) {
-        if (endpoint.readyToSend()) {
-            endpoint.send(messageContext);
-            return true;
-        }
-
-        return false;
-    }
-
-
-    public static boolean handleSequenceReplay(Mediator mediator, 
MessageContext messageContext) {
-        mediator.mediate(messageContext);
-        return true;
-    }
-
-    /**
-     * Get the DLC related JMX API
-     * @return   instance of Dead letter channel jms api
-     */
-    public DeadLetterChannelViewMBean getDlcView() {
-        return dlcView;
-    }
-
-    /**
-     * Get the started status of the Message processor
-     * @return started status of message processor (true/false)
-     */
-    public boolean isStarted() {
-        return started;
+public class RedeliveryProcessor extends AbstractMessageProcessor{
+      @Override
+    protected JobDetail getJobDetail() {
+        JobDetail jobDetail = new JobDetail();
+        jobDetail.setName(messageStore.getName() + "- redelivery job");
+        jobDetail.setJobClass(RedeliveryJob.class);
+        return jobDetail;
     }
 }

Modified: 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingJob.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingJob.java?rev=1071875&r1=1071874&r2=1071875&view=diff
==============================================================================
--- 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingJob.java
 (original)
+++ 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingJob.java
 Fri Feb 18 04:01:47 2011
@@ -23,6 +23,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.synapse.Mediator;
 import org.apache.synapse.MessageContext;
+import org.apache.synapse.message.processors.MessageProcessorConsents;
 import org.apache.synapse.message.store.MessageStore;
 import org.quartz.Job;
 import org.quartz.JobDataMap;
@@ -30,6 +31,7 @@ import org.quartz.JobExecutionContext;
 import org.quartz.JobExecutionException;
 
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.locks.Lock;
 
@@ -39,11 +41,13 @@ public class SamplingJob implements Job 
     public void execute(JobExecutionContext jobExecutionContext) throws 
JobExecutionException {
         JobDataMap jdm = jobExecutionContext.getMergedJobDataMap();
 
+        final MessageStore messageStore = (MessageStore) jdm.get(
+                                            
MessageProcessorConsents.MESSAGE_STORE);
+
+        Map<String ,Object> parameters = (Map<String, Object>) jdm.get(
+                MessageProcessorConsents.PARAMETERS);
         final Object concurrency = jdm.get(SamplingProcessor.CONCURRENCY);
-        final MessageStore messageStore = (MessageStore) 
jdm.get(SamplingProcessor.MESSAGE_STORE);
-        final ExecutorService executor = (ExecutorService) 
jdm.get(SamplingProcessor.EXECUTOR);
-        final String sequence = (String) jdm.get(SamplingProcessor.SEQUENCE);
-        final Lock lock = (Lock) jdm.get(SamplingProcessor.LOCK);
+        final String sequence = (String) 
parameters.get(SamplingProcessor.SEQUENCE);
 
         int conc = 1;
         if (concurrency instanceof Integer) {
@@ -59,6 +63,8 @@ public class SamplingJob implements Job 
                     final MessageContext messageContext = list.get(0);
                     if (messageContext != null) {
                         messageStore.unstore(0, 0);
+                        final ExecutorService executor = 
messageContext.getEnvironment().
+                                                            
getExecutorService();
                         executor.submit(new Runnable() {
                             public void run() {
                                 try {

Modified: 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingProcessor.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingProcessor.java?rev=1071875&r1=1071874&r2=1071875&view=diff
==============================================================================
--- 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingProcessor.java
 (original)
+++ 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingProcessor.java
 Fri Feb 18 04:01:47 2011
@@ -21,202 +21,20 @@ package org.apache.synapse.message.proce
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.synapse.SynapseException;
-import org.apache.synapse.core.SynapseEnvironment;
-import org.apache.synapse.message.processors.MessageProcessor;
-import org.apache.synapse.message.store.MessageStore;
-import org.quartz.*;
-import org.quartz.impl.StdSchedulerFactory;
+import org.apache.synapse.message.processors.AbstractMessageProcessor;
+import org.quartz.JobDetail;
 
-import java.text.ParseException;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-public class SamplingProcessor implements MessageProcessor {
+public class SamplingProcessor extends AbstractMessageProcessor{
     private Log log = LogFactory.getLog(SamplingProcessor.class);
 
-    public static final String LOCK = "lock";
-    public static final String EXECUTOR = "Executor";
-    public static final String MESSAGE_STORE = "MESSAGE_STORE";
-    public static final String QUARTZ_CONF = "quartz.conf";
-    public static final String INTERVAL = "interval";
-    public static final String CRON_EXPRESSION = "cronExpression";
     public static final String CONCURRENCY = "concurrency";
     public static final String SEQUENCE = "sequence";
 
-    private enum State {
-        INITIALIZED,
-        START,
-        STOP,
-        DESTROY
-    }
-
-    private Map<String, Object> parameters = null;
-
-    /** The quartz configuration file if specified as a parameter */
-    private String quartzConfig = null;
-
-    /** A cron expression to run the sampler */
-    private String cronExpression = null;
-
-    /** The interval at which this sampler runs */
-    private long interval = 1;
-
-    /** The scheduler, run the the sampler */
-    private Scheduler scheduler = null;
-
-    /** Weather sampler is initialized or not */
-    private State state = State.DESTROY;
-
-    /** The message store */
-    private MessageStore messageStore = null;
-
-    /** Concurrency at the sampler runs, if the concurrency is 2, 2 threads 
will
-     * be used to dispatch 2 messages, when sampler runs */
-    private int concurrency = 1;
-
-    /** An executor */
-    private ExecutorService executor = null;
-
-    /** A sequence to run when the sampler is executed */
-    private String sequence = null;
-
-    private Lock lock = new ReentrantLock();
-
-    /**
-     * Creates a Quartz Scheduler and schedule the message processing logic.
-     */
-    public void start() {
-        Trigger trigger;
-        if (cronExpression == null || "".equals(cronExpression)) {
-            trigger = 
TriggerUtils.makeImmediateTrigger(SimpleTrigger.REPEAT_INDEFINITELY, interval);
-        } else {
-            CronTrigger cronTrigger = new CronTrigger();
-            try {
-                cronTrigger.setCronExpression(cronExpression);
-                trigger = cronTrigger;
-            } catch (ParseException e) {
-                throw new SynapseException("Error setting cron expression : " +
-                        e.getMessage() + cronExpression, e);
-            }
-        }
-        trigger.setName(messageStore.getName() + "-trigger");
-
+    @Override
+    protected JobDetail getJobDetail() {
         JobDetail jobDetail = new JobDetail();
         jobDetail.setName(messageStore.getName() + "-job");
         jobDetail.setJobClass(SamplingJob.class);
-
-        JobDataMap jobDataMap = new JobDataMap();
-        jobDataMap.put(CONCURRENCY, concurrency);
-        jobDataMap.put(EXECUTOR, executor);
-        jobDataMap.put(MESSAGE_STORE, messageStore);
-        jobDataMap.put(SEQUENCE, sequence);
-        jobDataMap.put(LOCK, lock);
-
-        jobDetail.setJobDataMap(jobDataMap);
-
-        try {
-            scheduler.scheduleJob(jobDetail, trigger);
-        } catch (SchedulerException e) {
-            throw new SynapseException("Error scheduling job : " + jobDetail
-                    + " with trigger " + trigger);
-        }
-    }
-
-    public void stop() {
-        if (state == State.START) {
-            try {
-                if (scheduler != null && scheduler.isStarted()) {
-                    if (log.isDebugEnabled()) {
-                        log.debug("ShuttingDown Sampling Scheduler : " + 
scheduler.getMetaData());
-                    }
-                    scheduler.standby();
-                }
-
-                state = State.STOP;
-            } catch (SchedulerException e) {
-                throw new SynapseException("Error ShuttingDown Sampling 
scheduler ", e);
-            }
-        }
-    }
-
-    public void setMessageStore(MessageStore messageStore) {
-        this.messageStore = messageStore;
-    }
-
-    public void setParameters(Map<String, Object> parameters) {
-        this.parameters = parameters;
-
-        Object o = parameters.get(CRON_EXPRESSION);
-        if (o != null) {
-            cronExpression = o.toString();
-        }
-
-        o = parameters.get(INTERVAL);
-        if (o != null) {
-            interval = Integer.parseInt(o.toString());
-        }
-
-        o = parameters.get(CONCURRENCY);
-        if (o != null) {
-            concurrency = Integer.parseInt(o.toString());
-        }
-
-        o = parameters.get(QUARTZ_CONF);
-        if (o != null) {
-            quartzConfig = o.toString();
-        }
-
-        o = parameters.get(SEQUENCE);
-        if (o != null) {
-            sequence = o.toString();
-        }
-    }
-
-    public Map<String, Object> getParameters() {
-        return parameters;
-    }
-
-    public boolean isStarted() {
-        return state == State.START;
-    }
-
-    public void init(SynapseEnvironment se) {
-        executor = se.getExecutorService();
-
-        StdSchedulerFactory sf = new StdSchedulerFactory();
-        try {
-            if (quartzConfig != null && !"".equals(quartzConfig)) {
-                if (log.isDebugEnabled()) {
-                    log.debug("Initiating a Scheduler with configuration : " + 
quartzConfig);
-                }
-
-                sf.initialize(quartzConfig);
-            }
-        } catch (SchedulerException e) {
-            throw new SynapseException("Error initiating scheduler factory "
-                    + sf + "with configuration loaded from " + quartzConfig, 
e);
-        }
-
-        try {
-            scheduler = sf.getScheduler();
-        } catch (SchedulerException e) {
-            throw new SynapseException("Error getting a  scheduler instance 
form scheduler" +
-                    " factory " + sf, e);
-        }
-
-        try {
-            scheduler.start();
-
-            state = State.INITIALIZED;
-        } catch (SchedulerException e) {
-            throw new SynapseException("Error starting the scheduler", e);
-        }
-    }
-
-    public void destroy() {
-        state = State.DESTROY;
+        return jobDetail;
     }
 }

Modified: 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/AbstractMessageStore.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/AbstractMessageStore.java?rev=1071875&r1=1071874&r2=1071875&view=diff
==============================================================================
--- 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/AbstractMessageStore.java
 (original)
+++ 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/AbstractMessageStore.java
 Fri Feb 18 04:01:47 2011
@@ -27,6 +27,8 @@ import org.apache.synapse.core.SynapseEn
 import org.apache.synapse.message.processors.MessageProcessor;
 
 import java.util.Map;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 public abstract class AbstractMessageStore implements MessageStore {
 
@@ -70,6 +72,10 @@ public abstract class AbstractMessageSto
      */
     protected String fileName;
 
+
+    protected Lock lock = new ReentrantLock();
+
+
     /**
      * Message processor instance associated with the MessageStore
      */
@@ -163,4 +169,8 @@ public abstract class AbstractMessageSto
     public String getFileName() {
         return this.fileName;
     }
+
+    public Lock getLock() {
+        return lock;
+    }
 }

Modified: 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/InMemoryMessageStore.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/InMemoryMessageStore.java?rev=1071875&r1=1071874&r2=1071875&view=diff
==============================================================================
--- 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/InMemoryMessageStore.java
 (original)
+++ 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/InMemoryMessageStore.java
 Fri Feb 18 04:01:47 2011
@@ -24,6 +24,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.synapse.MessageContext;
 
 import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -35,7 +36,7 @@ public class InMemoryMessageStore extend
     private static final Log log = 
LogFactory.getLog(InMemoryMessageStore.class);
 
     /** The map that keeps the stored messages */
-    private Map<String, MessageContext> messageList = new HashMap<String, 
MessageContext>();
+    private Map<String, MessageContext> messageList = new 
ConcurrentHashMap<String, MessageContext>();
 
     private Lock lock = new ReentrantLock();
 
@@ -50,6 +51,14 @@ public class InMemoryMessageStore extend
                     log.debug("Message " + messageContext.getMessageID() +
                             " has been stored");
                 }
+
+                if(processor != null && !processor.isStarted()) {
+
+                    if(log.isDebugEnabled()) {
+                        log.debug("Starting Message processor " + 
processor.getClass().getName());
+                    }
+                    processor.start();
+                }
             }
         } finally {
             lock.unlock();
@@ -101,9 +110,11 @@ public class InMemoryMessageStore extend
         lock.lock();
         try {
             List<MessageContext> returnlist = new ArrayList<MessageContext>();
-            if (from <= to && (from <= messageList.size() && to <= 
messageList.size()) && messageList.size() > 0) {
+            if (from <= to && (from <= messageList.size() && to <= 
messageList.size()) &&
+                    messageList.size() > 0) {
 
-                String[] keys = messageList.keySet().toArray(new 
String[messageList.keySet().size()]);
+                String[] keys = messageList.keySet().toArray(
+                        new String[messageList.keySet().size()]);
 
                 for (int i = from; i <= to; i++) {
                     returnlist.add(messageList.remove(keys[i]));
@@ -119,8 +130,10 @@ public class InMemoryMessageStore extend
         lock.lock();
         try {
             List<MessageContext> returnList = new ArrayList<MessageContext>();
-            if (from <= to && (from <= messageList.size() && to <= 
messageList.size()) && messageList.size() > 0) {
-                String[] keys = messageList.keySet().toArray(new 
String[messageList.keySet().size()]);
+            if (from <= to && (from <= messageList.size() && to <= 
messageList.size()) &&
+                    messageList.size() > 0) {
+                String[] keys = messageList.keySet().toArray(
+                        new String[messageList.keySet().size()]);
 
                 for (int i = from; i <= to; i++) {
                     returnList.add(messageList.get(keys[i]));
@@ -157,22 +170,6 @@ public class InMemoryMessageStore extend
         return null;
     }
 
-    public List<MessageContext> getMessages(int maxNumberOfMessages) {
-        lock.lock();
-        try {
-            List<MessageContext> returnList = new ArrayList<MessageContext>();
-
-            Iterator<String> it = messageList.keySet().iterator();
-            while (it.hasNext() && maxNumberOfMessages > 0) {
-                returnList.add(messageList.get(it.next()));
-                maxNumberOfMessages--;
-            }
-
-            return returnList;
-        } finally {
-            lock.unlock();
-        }
-    }
 
     public int getSize() {
         lock.lock();


Reply via email to