Author: supun
Date: Tue Feb 15 11:47:02 2011
New Revision: 1070852

URL: http://svn.apache.org/viewvc?rev=1070852&view=rev
Log:
improving the initialization logic and some improvements to the sampling 
processor

Modified:
    
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/MessageProcessor.java
    
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/RedeliveryProcessor.java
    
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingProcessor.java
    
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/AbstractMessageStore.java
    
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/InMemoryMessageStore.java
    
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/MessageStore.java

Modified: 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/MessageProcessor.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/MessageProcessor.java?rev=1070852&r1=1070851&r2=1070852&view=diff
==============================================================================
--- 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/MessageProcessor.java
 (original)
+++ 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/MessageProcessor.java
 Tue Feb 15 11:47:02 2011
@@ -17,6 +17,7 @@
 */
 package org.apache.synapse.message.processors;
 
+import org.apache.synapse.ManagedLifecycle;
 import org.apache.synapse.Mediator;
 import org.apache.synapse.MessageContext;
 import org.apache.synapse.message.store.MessageStore;
@@ -30,7 +31,7 @@ import java.util.Map;
  *Message processing logic and process will depend on the
  *concrete implementation of the MessageStore
  */
-public interface MessageProcessor {
+public interface MessageProcessor extends ManagedLifecycle {
 
     /**
      * Start Message Processor
@@ -49,42 +50,6 @@ public interface MessageProcessor {
     public void setMessageStore(MessageStore messageStore);
 
     /**
-     * Get the Message store that backs the Message processor
-     * @return   the underlying MessageStore instance
-     */
-    public MessageStore getMessageStore();
-
-    /**
-     * Set the Mediator/Sequence to be invoked just before processing a Message
-     * This Mediator or sequence will be invoked just before processing the 
Message
-     * @param mediator   Mediator/sequence instance that will invoked just 
before
-     * processing a Message
-     */
-    public void setOnProcessSequence(Mediator mediator);
-
-
-    /**
-     * Get the On process Mediator or sequence
-     * @return Mediator/sequence instance that will invoked just before 
processing a Message
-     */
-    public Mediator getOnProcessSequence();
-
-    /**
-     * This sequence/Mediator will be invoked when a Message is submitted to 
the MessageProcessor
-     * @param mediator Mediator/sequence instance that will invoked when a 
Message
-     * is submitted to the Processor
-     */
-    public void setOnSubmitSequence(Mediator mediator);
-
-    /**
-     * Get the OnSubmit Sequence which get invoked when a Message is submitted 
to
-     * the MessageProcessor
-     * @return mediator Mediator/sequence instance that will invoked when a 
Message
-     * is submitted to the Processor
-     */
-    public Mediator getOnSubmitSequence();
-
-    /**
      * Set the Message processor parameters that will be used by the specific 
implementation
      * @param parameters
      */

Modified: 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/RedeliveryProcessor.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/RedeliveryProcessor.java?rev=1070852&r1=1070851&r2=1070852&view=diff
==============================================================================
--- 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/RedeliveryProcessor.java
 (original)
+++ 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/RedeliveryProcessor.java
 Tue Feb 15 11:47:02 2011
@@ -21,6 +21,7 @@ package org.apache.synapse.message.proce
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.synapse.*;
+import org.apache.synapse.core.SynapseEnvironment;
 import org.apache.synapse.endpoints.Endpoint;
 import org.apache.synapse.message.processors.MessageProcessor;
 import org.apache.synapse.message.store.MessageStore;
@@ -41,12 +42,6 @@ public class RedeliveryProcessor impleme
      */
     private MessageStore messageStore;
 
-
-    private Mediator onProcessMediator;
-
-
-    private Mediator onSubmitMediator;
-
     private Map<String, Object> parameters;
 
     /**
@@ -110,26 +105,6 @@ public class RedeliveryProcessor impleme
         }
     }
 
-    public MessageStore getMessageStore() {
-        return this.messageStore;
-    }
-
-    public void setOnProcessSequence(Mediator mediator) {
-        this.onProcessMediator = mediator;
-    }
-
-    public Mediator getOnProcessSequence() {
-        return this.onProcessMediator;
-    }
-
-    public void setOnSubmitSequence(Mediator mediator) {
-        this.onSubmitMediator = mediator;
-    }
-
-    public Mediator getOnSubmitSequence() {
-        return this.onSubmitMediator;
-    }
-
     public void setParameters(Map<String, Object> parameters) {
         this.parameters = parameters;
         if (parameters.containsKey(REDELIVERY_DELAY)) {
@@ -155,17 +130,22 @@ public class RedeliveryProcessor impleme
         return parameters;
     }
 
+    public void init(SynapseEnvironment se) {
+
+    }
+
+    public void destroy() {
+
+    }
+
 
     private class Worker implements Runnable {
 
         public void run() {
             while (started) {
-
-
                 try {
                     synchronized (this) {
                         int delay = redeliveryDelay;
-
                         MessageContext messageContext;
                         messageContext = messageStore.getMessages(0, 0).get(0);
 
@@ -197,13 +177,10 @@ public class RedeliveryProcessor impleme
 
                         messageContext.setProperty(NO_OF_REDELIVERIES, "" + 
(number + 1));
 
-
                         if (exponentialBackoff && backOffMultiplier == -1) {
                             delay = (number + 1) * redeliveryDelay;
-
                         } else if (exponentialBackoff) {
                             delay = (int) Math.pow(backOffMultiplier, number) 
* redeliveryDelay;
-
                         }
 
 
@@ -228,13 +205,10 @@ public class RedeliveryProcessor impleme
                         if (log.isDebugEnabled()) {
                             log.debug("sent \n" + 
messageContext.getEnvelope());
                         }
-
-
                     }
                 } catch (Throwable e) {
-                    log.warn("Error while Running Redelivery process " 
+e.getMessage());
+                    log.warn("Error while Running Redelivery process " + 
e.getMessage());
                 }
-
             }
 
         }

Modified: 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingProcessor.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingProcessor.java?rev=1070852&r1=1070851&r2=1070852&view=diff
==============================================================================
--- 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingProcessor.java
 (original)
+++ 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingProcessor.java
 Tue Feb 15 11:47:02 2011
@@ -21,9 +21,6 @@ package org.apache.synapse.message.proce
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.synapse.ManagedLifecycle;
-import org.apache.synapse.Mediator;
-import org.apache.synapse.MessageContext;
 import org.apache.synapse.SynapseException;
 import org.apache.synapse.core.SynapseEnvironment;
 import org.apache.synapse.message.processors.MessageProcessor;
@@ -35,42 +32,54 @@ import java.text.ParseException;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
 
-public class SamplingProcessor implements MessageProcessor, ManagedLifecycle{
-    public static final String EXECUTOR = "Executor";
-    public static final String MESSAGE_STORE = "MESSAGE_STORE";
+public class SamplingProcessor implements MessageProcessor {
     private Log log = LogFactory.getLog(SamplingProcessor.class);
 
-    private final String QUARTZ_CONF = "quartz.conf";
-
+    public static final String EXECUTOR = "Executor";
+    public static final String MESSAGE_STORE = "MESSAGE_STORE";
+    public static final String QUARTZ_CONF = "quartz.conf";
     public static final String INTERVAL = "interval";
-
     public static final String CRON_EXPRESSION = "cronExpression";
-
     public static final String CONCURRENCY = "concurrency";
-
     public static final String SEQUENCE = "sequence";
 
+    private enum State {
+        INITIALIZED,
+        START,
+        STOP,
+        DESTROY
+    }
+
+    private Map<String, Object> parameters = null;
+
+    /** The quartz configuration file if specified as a parameter */
+    private String quartzConfig = null;
+
+    /** A cron expression to run the sampler */
     private String cronExpression = null;
 
+    /** The interval at which this sampler runs */
     private long interval = 1;
 
-    private String quartzConf = null;
-
+    /** The scheduler, run the the sampler */
     private Scheduler scheduler = null;
 
-    private boolean initialized = false;
+    /** Weather sampler is initialized or not */
+    private State state = State.DESTROY;
 
+    /** The message store */
     private MessageStore messageStore = null;
 
-    private Mediator onProcessSequence = null;
-
-    private Mediator onSubmitSequence = null;
-
+    /** Concurrency at the sampler runs, if the concurrency is 2, 2 threads 
will
+     * be used to dispatch 2 messages, when sampler runs */
     private int concurrency = 1;
 
+    /** An executor */
     private ExecutorService executor = null;
 
+    /** A sequence to run when the sampler is executed */
     private String sequence = null;
+
     /**
      * Creates a Quartz Scheduler and schedule the message processing logic.
      */
@@ -102,34 +111,8 @@ public class SamplingProcessor implement
 
         jobDetail.setJobDataMap(jobDataMap);
 
-        StdSchedulerFactory sf = new StdSchedulerFactory();
         try {
-            if (quartzConf != null && !"".equals(quartzConf)) {
-                if (log.isDebugEnabled()) {
-                    log.debug("Initiating a Scheduler with configuration : " + 
quartzConf);
-                }
-
-                sf.initialize(quartzConf);
-            }
-        } catch (SchedulerException e) {
-            throw new SynapseException("Error initiating scheduler factory "
-                    + sf + "with configuration loaded from " + quartzConf, e);
-        }
-
-
-        try {
-            scheduler = sf.getScheduler();
-        } catch (SchedulerException e) {
-            throw new SynapseException("Error getting a  scheduler instance 
form scheduler" +
-                    " factory " + sf, e);
-        }
-
-        try {
-            scheduler.start();
-
             scheduler.scheduleJob(jobDetail, trigger);
-
-            initialized = true;
         } catch (SchedulerException e) {
             throw new SynapseException("Error scheduling job : " + jobDetail
                     + " with trigger " + trigger);
@@ -137,15 +120,16 @@ public class SamplingProcessor implement
     }
 
     public void stop() {
-        if (initialized) {
+        if (state == State.START) {
             try {
                 if (scheduler != null && scheduler.isStarted()) {
                     if (log.isDebugEnabled()) {
                         log.debug("ShuttingDown Sampling Scheduler : " + 
scheduler.getMetaData());
                     }
-                    scheduler.shutdown();
+                    scheduler.standby();
                 }
-                initialized = false;
+
+                state = State.STOP;
             } catch (SchedulerException e) {
                 throw new SynapseException("Error ShuttingDown Sampling 
scheduler ", e);
             }
@@ -156,27 +140,9 @@ public class SamplingProcessor implement
         this.messageStore = messageStore;
     }
 
-    public MessageStore getMessageStore() {
-        return messageStore;
-    }
-
-    public void setOnProcessSequence(Mediator mediator) {
-        this.onProcessSequence = mediator;
-    }
-
-    public Mediator getOnProcessSequence() {
-        return onProcessSequence;
-    }
-
-    public void setOnSubmitSequence(Mediator mediator) {
-        this.onSubmitSequence = mediator;
-    }
-
-    public Mediator getOnSubmitSequence() {
-        return onSubmitSequence;
-    }
-
     public void setParameters(Map<String, Object> parameters) {
+        this.parameters = parameters;
+
         Object o = parameters.get(CRON_EXPRESSION);
         if (o != null) {
             cronExpression = o.toString();
@@ -194,7 +160,7 @@ public class SamplingProcessor implement
 
         o = parameters.get(QUARTZ_CONF);
         if (o != null) {
-            quartzConf = o.toString();
+            quartzConfig = o.toString();
         }
 
         o = parameters.get(SEQUENCE);
@@ -204,18 +170,47 @@ public class SamplingProcessor implement
     }
 
     public Map<String, Object> getParameters() {
-        return null;
+        return parameters;
     }
 
     public boolean isStarted() {
-        return initialized;
+        return state == State.START;
     }
 
     public void init(SynapseEnvironment se) {
         executor = se.getExecutorService();
+
+        StdSchedulerFactory sf = new StdSchedulerFactory();
+        try {
+            if (quartzConfig != null && !"".equals(quartzConfig)) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Initiating a Scheduler with configuration : " + 
quartzConfig);
+                }
+
+                sf.initialize(quartzConfig);
+            }
+        } catch (SchedulerException e) {
+            throw new SynapseException("Error initiating scheduler factory "
+                    + sf + "with configuration loaded from " + quartzConfig, 
e);
+        }
+
+        try {
+            scheduler = sf.getScheduler();
+        } catch (SchedulerException e) {
+            throw new SynapseException("Error getting a  scheduler instance 
form scheduler" +
+                    " factory " + sf, e);
+        }
+
+        try {
+            scheduler.start();
+
+            state = State.INITIALIZED;
+        } catch (SchedulerException e) {
+            throw new SynapseException("Error starting the scheduler", e);
+        }
     }
 
     public void destroy() {
-
+        state = State.DESTROY;
     }
 }

Modified: 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/AbstractMessageStore.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/AbstractMessageStore.java?rev=1070852&r1=1070851&r2=1070852&view=diff
==============================================================================
--- 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/AbstractMessageStore.java
 (original)
+++ 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/AbstractMessageStore.java
 Tue Feb 15 11:47:02 2011
@@ -19,7 +19,6 @@
 
 package org.apache.synapse.message.store;
 
-import org.apache.synapse.ManagedLifecycle;
 import org.apache.synapse.Mediator;
 import org.apache.synapse.MessageContext;
 import org.apache.synapse.commons.jmx.MBeanRegistrar;
@@ -29,7 +28,7 @@ import org.apache.synapse.message.proces
 
 import java.util.Map;
 
-public abstract class AbstractMessageStore implements MessageStore, 
ManagedLifecycle {
+public abstract class AbstractMessageStore implements MessageStore {
 
     /**
      * message store name
@@ -79,9 +78,10 @@ public abstract class AbstractMessageSto
     public void init(SynapseEnvironment se) {
         this.synapseEnvironment = se;
         this.synapseConfiguration = 
synapseEnvironment.getSynapseConfiguration();
-        if (processor instanceof ManagedLifecycle) {
-            ((ManagedLifecycle) processor).init(se);
-        }
+
+        processor.init(se);
+
+        processor.start();
     }
 
     public String getName() {
@@ -143,7 +143,9 @@ public abstract class AbstractMessageSto
     }
 
     public void destroy() {
+        processor.stop();
 
+        processor.destroy();
     }
 
     public void setDescription(String description) {

Modified: 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/InMemoryMessageStore.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/InMemoryMessageStore.java?rev=1070852&r1=1070851&r2=1070852&view=diff
==============================================================================
--- 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/InMemoryMessageStore.java
 (original)
+++ 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/InMemoryMessageStore.java
 Tue Feb 15 11:47:02 2011
@@ -40,12 +40,6 @@ public class InMemoryMessageStore extend
             mediateSequence(messageContext);
             messageList.put(messageContext.getMessageID(), messageContext);
 
-            /**
-             * If the associated processor is not started we start it. When 
storing the message
-             */
-            if(!processor.isStarted()) {
-                processor.start();
-            }
             if (log.isDebugEnabled()) {
                 log.debug("Message " + messageContext.getMessageID() +
                         " has been stored");

Modified: 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/MessageStore.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/MessageStore.java?rev=1070852&r1=1070851&r2=1070852&view=diff
==============================================================================
--- 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/MessageStore.java
 (original)
+++ 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/MessageStore.java
 Tue Feb 15 11:47:02 2011
@@ -19,6 +19,7 @@
 
 package org.apache.synapse.message.store;
 
+import org.apache.synapse.ManagedLifecycle;
 import org.apache.synapse.MessageContext;
 import org.apache.synapse.config.SynapseConfiguration;
 import org.apache.synapse.SynapseArtifact;
@@ -32,7 +33,7 @@ import java.util.Map;
  * This is the interface  for the Synapse Message Store
  * Message Store is used to store failed Messages.
  */
-public interface MessageStore extends SynapseArtifact, Nameable {
+public interface MessageStore extends SynapseArtifact, Nameable, 
ManagedLifecycle {
 
     /**
      * Store the Message in the Message Store


Reply via email to