Author: charith
Date: Sun Feb 20 05:54:59 2011
New Revision: 1072506

URL: http://svn.apache.org/viewvc?rev=1072506&view=rev
Log:
making Message processor as a top level element

Added:
    
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/ScheduledMessageProcessor.java
    
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/ScheduledRedeliveryProcessor.java
      - copied, changed from r1071899, 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/RedeliveryProcessor.java
Removed:
    
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/RedeliveryProcessor.java
Modified:
    
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/AbstractMessageProcessor.java
    
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/MessageProcessor.java
    
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/MessageProcessorConsents.java
    
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/DeadLetterChannelView.java
    
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/RedeliveryJob.java
    
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingJob.java
    
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingProcessor.java

Modified: 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/AbstractMessageProcessor.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/AbstractMessageProcessor.java?rev=1072506&r1=1072505&r2=1072506&view=diff
==============================================================================
--- 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/AbstractMessageProcessor.java
 (original)
+++ 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/AbstractMessageProcessor.java
 Sun Feb 20 05:54:59 2011
@@ -22,12 +22,10 @@ package org.apache.synapse.message.proce
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.synapse.SynapseException;
+import org.apache.synapse.config.SynapseConfiguration;
 import org.apache.synapse.core.SynapseEnvironment;
 import org.apache.synapse.message.store.MessageStore;
-import org.quartz.*;
-import org.quartz.impl.StdSchedulerFactory;
 
-import java.text.ParseException;
 import java.util.Map;
 
 /**
@@ -40,16 +38,14 @@ public abstract class AbstractMessagePro
 
     protected Log log = LogFactory.getLog(this.getClass());
 
-    /** The scheduler, run the the processor */
-    protected Scheduler scheduler = null;
-
     /** Message Store associated with Message processor */
-    protected MessageStore messageStore;
+    protected String  messageStore;
 
-    /** The interval at which this processor runs , default value is 1000ms*/
-    protected long interval = 1000;
+    protected String description;
 
+    protected String name;
 
+    protected SynapseConfiguration configuration;
 
     protected enum State {
         INITIALIZED,
@@ -61,64 +57,17 @@ public abstract class AbstractMessagePro
     /**message store parameters */
     protected Map<String, Object> parameters = null;
 
-    /** The quartz configuration file if specified as a parameter */
-    protected String quartzConfig = null;
-
-    /** A cron expression to run the sampler */
-    protected String cronExpression = null;
 
     /** Keep the state of the message processor */
     private State state  = State.DESTROY;
 
 
-    public void start() {
-        Trigger trigger;
-        if (cronExpression == null || "".equals(cronExpression)) {
-            trigger = 
TriggerUtils.makeImmediateTrigger(SimpleTrigger.REPEAT_INDEFINITELY, interval);
-        } else {
-            CronTrigger cronTrigger = new CronTrigger();
-            try {
-                cronTrigger.setCronExpression(cronExpression);
-                trigger = cronTrigger;
-            } catch (ParseException e) {
-                throw new SynapseException("Error setting cron expression : " +
-                        e.getMessage() + cronExpression, e);
-            }
-        }
-        trigger.setName(messageStore.getName() + "-trigger");
-
-        JobDetail jobDetail = getJobDetail();
-        JobDataMap jobDataMap = new JobDataMap();
-        jobDataMap.put(MessageProcessorConsents.MESSAGE_STORE, messageStore);
-        jobDataMap.put(MessageProcessorConsents.PARAMETERS,parameters);
-        jobDetail.setJobDataMap(jobDataMap);
-
-        try {
-            scheduler.scheduleJob(jobDetail, trigger);
-        } catch (SchedulerException e) {
-            throw new SynapseException("Error scheduling job : " + jobDetail
-                    + " with trigger " + trigger);
-        }
+    public void init(SynapseEnvironment se) {
+        configuration = se.getSynapseConfiguration();
     }
 
-    public void stop() {
-        if (state == State.START) {
-            try {
-                if (scheduler != null && scheduler.isStarted()) {
-                    if (log.isDebugEnabled()) {
-                        log.debug("ShuttingDown Message Processor Scheduler : 
" + scheduler.getMetaData());
-                    }
-                    scheduler.standby();
-                }
-
-                state = State.STOP;
-            } catch (SchedulerException e) {
-                throw new SynapseException("Error ShuttingDown Message 
processor scheduler ", e);
-            }
-        }
-    }
 
-    public void setMessageStore(MessageStore messageStore) {
+    public void setMessageStoreName(String  messageStore) {
         if (messageStore != null) {
             this.messageStore = messageStore;
         } else {
@@ -126,26 +75,12 @@ public abstract class AbstractMessagePro
         }
     }
 
+    public String getMessageStoreName() {
+        return messageStore;
+    }
+
     public void setParameters(Map<String, Object> parameters) {
         this.parameters = parameters;
-        if(parameters != null && !parameters.isEmpty()) {
-            Object o = 
parameters.get(MessageProcessorConsents.CRON_EXPRESSION);
-        if (o != null) {
-            cronExpression = o.toString();
-        }
-
-        o = parameters.get(MessageProcessorConsents.INTERVAL);
-        if (o != null) {
-            interval = Integer.parseInt(o.toString());
-        }
-
-
-        o = parameters.get(MessageProcessorConsents.QUARTZ_CONF);
-        if (o != null) {
-            quartzConfig = o.toString();
-        }
-
-        }
     }
 
     public Map<String, Object> getParameters() {
@@ -156,40 +91,20 @@ public abstract class AbstractMessagePro
         return state == State.START;
     }
 
-    public void init(SynapseEnvironment se) {
-        StdSchedulerFactory sf = new StdSchedulerFactory();
-        try {
-            if (quartzConfig != null && !"".equals(quartzConfig)) {
-                if (log.isDebugEnabled()) {
-                    log.debug("Initiating a Scheduler with configuration : " + 
quartzConfig);
-                }
-
-                sf.initialize(quartzConfig);
-            }
-        } catch (SchedulerException e) {
-            throw new SynapseException("Error initiating scheduler factory "
-                    + sf + "with configuration loaded from " + quartzConfig, 
e);
-        }
+    public String getName() {
+        return name;
+    }
 
-        try {
-            scheduler = sf.getScheduler();
-        } catch (SchedulerException e) {
-            throw new SynapseException("Error getting a  scheduler instance 
form scheduler" +
-                    " factory " + sf, e);
-        }
+    public void setName(String name) {
+        this.name = name;
+    }
 
-        try {
-            scheduler.start();
 
-            state = State.INITIALIZED;
-        } catch (SchedulerException e) {
-            throw new SynapseException("Error starting the scheduler", e);
-        }
+    public void setDescription(String description) {
+        this.description=description;
     }
 
-    protected abstract JobDetail getJobDetail();
-
-    public void destroy() {
-        state = State.DESTROY;
+    public String getDescription() {
+        return description;
     }
 }

Modified: 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/MessageProcessor.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/MessageProcessor.java?rev=1072506&r1=1072505&r2=1072506&view=diff
==============================================================================
--- 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/MessageProcessor.java
 (original)
+++ 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/MessageProcessor.java
 Sun Feb 20 05:54:59 2011
@@ -17,9 +17,7 @@
 */
 package org.apache.synapse.message.processors;
 
-import org.apache.synapse.ManagedLifecycle;
-import org.apache.synapse.Mediator;
-import org.apache.synapse.MessageContext;
+import org.apache.synapse.*;
 import org.apache.synapse.message.store.MessageStore;
 
 import java.util.HashMap;
@@ -31,7 +29,7 @@ import java.util.Map;
  *Message processing logic and process will depend on the
  *concrete implementation of the MessageStore
  */
-public interface MessageProcessor extends ManagedLifecycle {
+public interface MessageProcessor extends ManagedLifecycle , Nameable , 
SynapseArtifact{
 
     /**
      * Start Message Processor
@@ -44,10 +42,16 @@ public interface MessageProcessor extend
     public void stop();
 
     /**
-     * Set the Message Store that backs the Message processor
-     * @param messageStore the underlying MessageStore instance
+     * Set the Message Store name that backs the Message processor
+     * @param messageStore name the underlying MessageStore instance
      */
-    public void setMessageStore(MessageStore messageStore);
+    public void setMessageStoreName(String  messageStore);
+
+    /**
+     * Get message store name associated with the Message processor
+     * @return  message store name associated with message processor
+     */
+    public String getMessageStoreName();
 
     /**
      * Set the Message processor parameters that will be used by the specific 
implementation

Modified: 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/MessageProcessorConsents.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/MessageProcessorConsents.java?rev=1072506&r1=1072505&r2=1072506&view=diff
==============================================================================
--- 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/MessageProcessorConsents.java
 (original)
+++ 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/MessageProcessorConsents.java
 Sun Feb 20 05:54:59 2011
@@ -21,9 +21,10 @@ package org.apache.synapse.message.proce
 public final class MessageProcessorConsents {
 
     public static final String MESSAGE_STORE = "MESSAGE_STORE";
+    public static final String PARAMETERS = "parameters";
+
     public static final String QUARTZ_CONF = "quartz.conf";
     public static final String INTERVAL = "interval";
     public static final String CRON_EXPRESSION = "cronExpression";
-    public static final String PARAMETERS = "parameters";
 
 }

Added: 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/ScheduledMessageProcessor.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/ScheduledMessageProcessor.java?rev=1072506&view=auto
==============================================================================
--- 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/ScheduledMessageProcessor.java
 (added)
+++ 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/ScheduledMessageProcessor.java
 Sun Feb 20 05:54:59 2011
@@ -0,0 +1,183 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+package org.apache.synapse.message.processors;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.synapse.SynapseException;
+import org.apache.synapse.core.SynapseEnvironment;
+import org.quartz.*;
+import org.quartz.impl.StdSchedulerFactory;
+
+import java.text.ParseException;
+import java.util.Map;
+
+public abstract class ScheduledMessageProcessor extends 
AbstractMessageProcessor {
+
+
+    protected Log log = LogFactory.getLog(this.getClass());
+
+    /**
+     * The scheduler, run the the processor
+     */
+    protected Scheduler scheduler = null;
+
+    /**
+     * The interval at which this processor runs , default value is 1000ms
+     */
+    protected long interval = 1000;
+
+
+    protected enum State {
+        INITIALIZED,
+        START,
+        STOP,
+        DESTROY
+    }
+
+    /**
+     * message store parameters
+     */
+    protected Map<String, Object> parameters = null;
+
+    /**
+     * The quartz configuration file if specified as a parameter
+     */
+    protected String quartzConfig = null;
+
+    /**
+     * A cron expression to run the sampler
+     */
+    protected String cronExpression = null;
+
+    /**
+     * Keep the state of the message processor
+     */
+    private State state = State.DESTROY;
+
+
+    public void start() {
+        Trigger trigger;
+        if (cronExpression == null || "".equals(cronExpression)) {
+            trigger = 
TriggerUtils.makeImmediateTrigger(SimpleTrigger.REPEAT_INDEFINITELY, interval);
+        } else {
+            CronTrigger cronTrigger = new CronTrigger();
+            try {
+                cronTrigger.setCronExpression(cronExpression);
+                trigger = cronTrigger;
+            } catch (ParseException e) {
+                throw new SynapseException("Error setting cron expression : " +
+                        e.getMessage() + cronExpression, e);
+            }
+        }
+        trigger.setName(messageStore + "-trigger");
+
+        JobDetail jobDetail = getJobDetail();
+        JobDataMap jobDataMap = new JobDataMap();
+        jobDataMap.put(MessageProcessorConsents.MESSAGE_STORE, messageStore);
+        jobDataMap.put(MessageProcessorConsents.PARAMETERS, parameters);
+        jobDetail.setJobDataMap(jobDataMap);
+
+        try {
+            scheduler.scheduleJob(jobDetail, trigger);
+        } catch (SchedulerException e) {
+            throw new SynapseException("Error scheduling job : " + jobDetail
+                    + " with trigger " + trigger);
+        }
+    }
+
+    public void stop() {
+        if (state == State.START) {
+            try {
+                if (scheduler != null && scheduler.isStarted()) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("ShuttingDown Message Processor Scheduler : 
" + scheduler.getMetaData());
+                    }
+                    scheduler.standby();
+                }
+
+                state = State.STOP;
+            } catch (SchedulerException e) {
+                throw new SynapseException("Error ShuttingDown Message 
processor scheduler ", e);
+            }
+        }
+    }
+
+
+    public void setParameters(Map<String, Object> parameters) {
+        super.setParameters(parameters);
+        if (parameters != null && !parameters.isEmpty()) {
+            Object o = 
parameters.get(MessageProcessorConsents.CRON_EXPRESSION);
+            if (o != null) {
+                cronExpression = o.toString();
+            }
+
+            o = parameters.get(MessageProcessorConsents.INTERVAL);
+            if (o != null) {
+                interval = Integer.parseInt(o.toString());
+            }
+
+
+            o = parameters.get(MessageProcessorConsents.QUARTZ_CONF);
+            if (o != null) {
+                quartzConfig = o.toString();
+            }
+
+        }
+    }
+
+    public void init(SynapseEnvironment se) {
+        super.init(se);
+        StdSchedulerFactory sf = new StdSchedulerFactory();
+        try {
+            if (quartzConfig != null && !"".equals(quartzConfig)) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Initiating a Scheduler with configuration : " + 
quartzConfig);
+                }
+
+                sf.initialize(quartzConfig);
+            }
+        } catch (SchedulerException e) {
+            throw new SynapseException("Error initiating scheduler factory "
+                    + sf + "with configuration loaded from " + quartzConfig, 
e);
+        }
+
+        try {
+            scheduler = sf.getScheduler();
+        } catch (SchedulerException e) {
+            throw new SynapseException("Error getting a  scheduler instance 
form scheduler" +
+                    " factory " + sf, e);
+        }
+
+        try {
+            scheduler.start();
+
+            state = State.INITIALIZED;
+        } catch (SchedulerException e) {
+            throw new SynapseException("Error starting the scheduler", e);
+        }
+    }
+
+    protected abstract JobDetail getJobDetail();
+
+    public void destroy() {
+        state = State.DESTROY;
+    }
+
+}

Modified: 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/DeadLetterChannelView.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/DeadLetterChannelView.java?rev=1072506&r1=1072505&r2=1072506&view=diff
==============================================================================
--- 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/DeadLetterChannelView.java
 (original)
+++ 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/DeadLetterChannelView.java
 Sun Feb 20 05:54:59 2011
@@ -33,19 +33,18 @@ public class DeadLetterChannelView imple
 
     private MessageStore messageStore;
 
-    private RedeliveryProcessor redeliveryProcessor;
 
 
     public DeadLetterChannelView(MessageStore messageStore) {
         this.messageStore = messageStore;
-        this.redeliveryProcessor = (RedeliveryProcessor) 
messageStore.getMessageProcessor();
+
     }
 
     public void resendAll() {
         if (((AbstractMessageStore) messageStore).getLock().tryLock()) {
-            int size = messageStore.getSize();
+            int size = messageStore.size();
             for (int i = 0; i < size; i++) {
-                MessageContext messageContext = messageStore.unstore(0, 
0).get(0);
+                MessageContext messageContext = messageStore.poll();
                 if (messageContext != null) {
                     redeliver(messageContext);
                 }
@@ -58,9 +57,9 @@ public class DeadLetterChannelView imple
 
     public void deleteAll() {
         if (((AbstractMessageStore) messageStore).getLock().tryLock()) {
-            int size = messageStore.getSize();
+            int size = messageStore.size();
             for (int i = 0; i < size; i++) {
-                messageStore.unstore(0, 0);
+                messageStore.poll();
             }
         } else {
             throw new SynapseException("Error Message store being used re try 
later");
@@ -68,10 +67,10 @@ public class DeadLetterChannelView imple
     }
 
     public List<String> getMessageIds() {
-        int size = messageStore.getSize();
+        int size = messageStore.size();
         List<String> list = new ArrayList<String>();
         for (int i = 0; i < size; i++) {
-            MessageContext messageContext = messageStore.getMessages(0, 
0).get(0);
+            MessageContext messageContext = messageStore.peek();
             if (messageContext != null) {
                 list.add(messageContext.getMessageID());
             }
@@ -81,7 +80,7 @@ public class DeadLetterChannelView imple
 
     public void resend(String messageID) {
         if (((AbstractMessageStore) messageStore).getLock().tryLock()) {
-            MessageContext messageContext = messageStore.unstore(messageID);
+            MessageContext messageContext = messageStore.remove(messageID);
             if (messageContext != null) {
                 redeliver(messageContext);
             }
@@ -92,12 +91,12 @@ public class DeadLetterChannelView imple
 
     public void delete(String messageID) {
         if (((AbstractMessageStore) messageStore).getLock().tryLock()) {
-            messageStore.unstore(messageID);
+            messageStore.remove(messageID);
         }
     }
 
     public String getEnvelope(String messageID) {
-        MessageContext messageContext = messageStore.getMessage(messageID);
+        MessageContext messageContext = messageStore.get(messageID);
         if (messageContext != null) {
             return messageContext.getEnvelope().toString();
         }
@@ -106,7 +105,7 @@ public class DeadLetterChannelView imple
     }
 
     public int getSize() {
-        return messageStore.getSize();
+        return messageStore.size();
     }
 
     private void redeliver(MessageContext messageContext) {
@@ -114,15 +113,15 @@ public class DeadLetterChannelView imple
         if (artifact instanceof Endpoint) {
             if (!RedeliveryJob.handleEndpointReplay((Endpoint) artifact,
                     messageContext)) {
-                messageStore.store(messageContext);
+                messageStore.offer(messageContext);
             }
         } else if (artifact instanceof Mediator) {
             if (!RedeliveryJob.handleSequenceReplay((Mediator) artifact,
                     messageContext)) {
-                messageStore.store(messageContext);
+                messageStore.offer(messageContext);
             }
         } else {
-            messageStore.store(messageContext);
+            messageStore.offer(messageContext);
         }
     }
 }

Modified: 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/RedeliveryJob.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/RedeliveryJob.java?rev=1072506&r1=1072505&r2=1072506&view=diff
==============================================================================
--- 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/RedeliveryJob.java
 (original)
+++ 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/RedeliveryJob.java
 Sun Feb 20 05:54:59 2011
@@ -69,14 +69,14 @@ public class RedeliveryJob implements Jo
         }
 
         /**
-         * We will keep the message store lock till the redelivery over
+         * We will keep the message store lock till the redelivery is over
          */
         if (lock.tryLock()) {
 
             try {
-                int size = messageStore.getSize();
+                int size = messageStore.size();
                 for (int i = 0; i < size; i++) {
-                    MessageContext messageContext = messageStore.unstore(0, 
0).get(0);
+                    MessageContext messageContext = messageStore.poll();
                     if (messageContext != null) {
                         SynapseArtifact artifact = 
getReplayTarget(messageContext);
 
@@ -96,7 +96,7 @@ public class RedeliveryJob implements Jo
                                         "will be put back to the Message 
Store");
 
                             }
-                            messageStore.store(messageContext);
+                            messageStore.offer(messageContext);
                             continue;
                         }
 
@@ -105,14 +105,14 @@ public class RedeliveryJob implements Jo
 
                         if (artifact instanceof Endpoint) {
                             if (!handleEndpointReplay((Endpoint) artifact, 
messageContext)) {
-                                messageStore.store(messageContext);
+                                messageStore.offer(messageContext);
                             }
                         } else if (artifact instanceof Mediator) {
                             if (!handleSequenceReplay((Mediator) artifact, 
messageContext)) {
-                                messageStore.store(messageContext);
+                                messageStore.offer(messageContext);
                             }
                         } else {
-                            messageStore.store(messageContext);
+                            messageStore.offer(messageContext);
                         }
 
                         if (log.isDebugEnabled()) {

Copied: 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/ScheduledRedeliveryProcessor.java
 (from r1071899, 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/RedeliveryProcessor.java)
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/ScheduledRedeliveryProcessor.java?p2=synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/ScheduledRedeliveryProcessor.java&p1=synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/RedeliveryProcessor.java&r1=1071899&r2=1072506&rev=1072506&view=diff
==============================================================================
--- 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/RedeliveryProcessor.java
 (original)
+++ 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/ScheduledRedeliveryProcessor.java
 Sun Feb 20 05:54:59 2011
@@ -18,7 +18,9 @@
  */
 package org.apache.synapse.message.processors.dlc;
 
+import org.apache.synapse.core.SynapseEnvironment;
 import org.apache.synapse.message.processors.AbstractMessageProcessor;
+import org.apache.synapse.message.processors.ScheduledMessageProcessor;
 import org.apache.synapse.message.store.MessageStore;
 import org.quartz.JobDetail;
 
@@ -26,23 +28,23 @@ import org.quartz.JobDetail;
  * Redelivery processor is the Message processor which implements the Dead 
letter channel EIP
  * It will Time to time Redeliver the Messages to a given target.
  */
-public class RedeliveryProcessor extends AbstractMessageProcessor{
+public class ScheduledRedeliveryProcessor extends ScheduledMessageProcessor{
 
     /**Dead Letter channel JMX API*/
     private DeadLetterChannelView dlcView;
 
     @Override
-    public void setMessageStore(MessageStore messageStore) {
-        super.setMessageStore(messageStore);
-        dlcView = new DeadLetterChannelView(messageStore);
+    public void init(SynapseEnvironment se) {
+        super.init(se);
+        dlcView = new 
DeadLetterChannelView(configuration.getMessageStore(messageStore));
         
org.apache.synapse.commons.jmx.MBeanRegistrar.getInstance().registerMBean(dlcView,
-                "Dead Letter Channel", messageStore.getName());
+                "Dead Letter Channel", messageStore);
     }
 
     @Override
     protected JobDetail getJobDetail() {
         JobDetail jobDetail = new JobDetail();
-        jobDetail.setName(messageStore.getName() + "- redelivery job");
+        jobDetail.setName(messageStore + "- redelivery job");
         jobDetail.setJobClass(RedeliveryJob.class);
         return jobDetail;
     }

Modified: 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingJob.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingJob.java?rev=1072506&r1=1072505&r2=1072506&view=diff
==============================================================================
--- 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingJob.java
 (original)
+++ 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingJob.java
 Sun Feb 20 05:54:59 2011
@@ -42,9 +42,9 @@ public class SamplingJob implements Job 
         JobDataMap jdm = jobExecutionContext.getMergedJobDataMap();
 
         final MessageStore messageStore = (MessageStore) jdm.get(
-                                            
MessageProcessorConsents.MESSAGE_STORE);
+                MessageProcessorConsents.MESSAGE_STORE);
 
-        Map<String ,Object> parameters = (Map<String, Object>) jdm.get(
+        Map<String, Object> parameters = (Map<String, Object>) jdm.get(
                 MessageProcessorConsents.PARAMETERS);
         final Object concurrency = jdm.get(SamplingProcessor.CONCURRENCY);
         final String sequence = (String) 
parameters.get(SamplingProcessor.SEQUENCE);
@@ -56,30 +56,28 @@ public class SamplingJob implements Job 
 
         for (int i = 0; i < conc; i++) {
             //lock.lock();
-            synchronized (messageStore){
-                List<MessageContext> list = messageStore.getMessages(0, 0);
+            synchronized (messageStore) {
+                final MessageContext messageContext = messageStore.peek();
 
-                if (list != null && list.size() == 1) {
-                    final MessageContext messageContext = list.get(0);
-                    if (messageContext != null) {
-                        messageStore.unstore(0, 0);
-                        final ExecutorService executor = 
messageContext.getEnvironment().
-                                                            
getExecutorService();
-                        executor.submit(new Runnable() {
-                            public void run() {
-                                try {
-                                    Mediator processingSequence = 
messageContext.getSequence(sequence);
-                                    if (processingSequence != null) {
-                                        
processingSequence.mediate(messageContext);
-                                    }
-                                } catch (Throwable t) {
-                                    log.error("Error occurred while executing 
the message", t);
+                if (messageContext != null) {
+                    messageStore.poll();
+                    final ExecutorService executor = 
messageContext.getEnvironment().
+                            getExecutorService();
+                    executor.submit(new Runnable() {
+                        public void run() {
+                            try {
+                                Mediator processingSequence = 
messageContext.getSequence(sequence);
+                                if (processingSequence != null) {
+                                    processingSequence.mediate(messageContext);
                                 }
+                            } catch (Throwable t) {
+                                log.error("Error occurred while executing the 
message", t);
                             }
-                        });
-                    }
+                        }
+                    });
                 }
             }
         }
     }
+
 }

Modified: 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingProcessor.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingProcessor.java?rev=1072506&r1=1072505&r2=1072506&view=diff
==============================================================================
--- 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingProcessor.java
 (original)
+++ 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingProcessor.java
 Sun Feb 20 05:54:59 2011
@@ -22,9 +22,10 @@ package org.apache.synapse.message.proce
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.synapse.message.processors.AbstractMessageProcessor;
+import org.apache.synapse.message.processors.ScheduledMessageProcessor;
 import org.quartz.JobDetail;
 
-public class SamplingProcessor extends AbstractMessageProcessor{
+public class SamplingProcessor extends ScheduledMessageProcessor{
     private Log log = LogFactory.getLog(SamplingProcessor.class);
 
     public static final String CONCURRENCY = "concurrency";
@@ -33,7 +34,7 @@ public class SamplingProcessor extends A
     @Override
     protected JobDetail getJobDetail() {
         JobDetail jobDetail = new JobDetail();
-        jobDetail.setName(messageStore.getName() + "-job");
+        jobDetail.setName(messageStore + "-job");
         jobDetail.setJobClass(SamplingJob.class);
         return jobDetail;
     }


Reply via email to