Author: supun
Date: Tue Feb 15 11:47:02 2011
New Revision: 1070852
URL: http://svn.apache.org/viewvc?rev=1070852&view=rev
Log:
improving the initialization logic and some improvements to the sampling
processor
Modified:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/MessageProcessor.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/RedeliveryProcessor.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingProcessor.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/AbstractMessageStore.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/InMemoryMessageStore.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/MessageStore.java
Modified:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/MessageProcessor.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/MessageProcessor.java?rev=1070852&r1=1070851&r2=1070852&view=diff
==============================================================================
---
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/MessageProcessor.java
(original)
+++
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/MessageProcessor.java
Tue Feb 15 11:47:02 2011
@@ -17,6 +17,7 @@
*/
package org.apache.synapse.message.processors;
+import org.apache.synapse.ManagedLifecycle;
import org.apache.synapse.Mediator;
import org.apache.synapse.MessageContext;
import org.apache.synapse.message.store.MessageStore;
@@ -30,7 +31,7 @@ import java.util.Map;
*Message processing logic and process will depend on the
*concrete implementation of the MessageStore
*/
-public interface MessageProcessor {
+public interface MessageProcessor extends ManagedLifecycle {
/**
* Start Message Processor
@@ -49,42 +50,6 @@ public interface MessageProcessor {
public void setMessageStore(MessageStore messageStore);
/**
- * Get the Message store that backs the Message processor
- * @return the underlying MessageStore instance
- */
- public MessageStore getMessageStore();
-
- /**
- * Set the Mediator/Sequence to be invoked just before processing a Message
- * This Mediator or sequence will be invoked just before processing the
Message
- * @param mediator Mediator/sequence instance that will invoked just
before
- * processing a Message
- */
- public void setOnProcessSequence(Mediator mediator);
-
-
- /**
- * Get the On process Mediator or sequence
- * @return Mediator/sequence instance that will invoked just before
processing a Message
- */
- public Mediator getOnProcessSequence();
-
- /**
- * This sequence/Mediator will be invoked when a Message is submitted to
the MessageProcessor
- * @param mediator Mediator/sequence instance that will invoked when a
Message
- * is submitted to the Processor
- */
- public void setOnSubmitSequence(Mediator mediator);
-
- /**
- * Get the OnSubmit Sequence which get invoked when a Message is submitted
to
- * the MessageProcessor
- * @return mediator Mediator/sequence instance that will invoked when a
Message
- * is submitted to the Processor
- */
- public Mediator getOnSubmitSequence();
-
- /**
* Set the Message processor parameters that will be used by the specific
implementation
* @param parameters
*/
Modified:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/RedeliveryProcessor.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/RedeliveryProcessor.java?rev=1070852&r1=1070851&r2=1070852&view=diff
==============================================================================
---
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/RedeliveryProcessor.java
(original)
+++
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/RedeliveryProcessor.java
Tue Feb 15 11:47:02 2011
@@ -21,6 +21,7 @@ package org.apache.synapse.message.proce
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.synapse.*;
+import org.apache.synapse.core.SynapseEnvironment;
import org.apache.synapse.endpoints.Endpoint;
import org.apache.synapse.message.processors.MessageProcessor;
import org.apache.synapse.message.store.MessageStore;
@@ -41,12 +42,6 @@ public class RedeliveryProcessor impleme
*/
private MessageStore messageStore;
-
- private Mediator onProcessMediator;
-
-
- private Mediator onSubmitMediator;
-
private Map<String, Object> parameters;
/**
@@ -110,26 +105,6 @@ public class RedeliveryProcessor impleme
}
}
- public MessageStore getMessageStore() {
- return this.messageStore;
- }
-
- public void setOnProcessSequence(Mediator mediator) {
- this.onProcessMediator = mediator;
- }
-
- public Mediator getOnProcessSequence() {
- return this.onProcessMediator;
- }
-
- public void setOnSubmitSequence(Mediator mediator) {
- this.onSubmitMediator = mediator;
- }
-
- public Mediator getOnSubmitSequence() {
- return this.onSubmitMediator;
- }
-
public void setParameters(Map<String, Object> parameters) {
this.parameters = parameters;
if (parameters.containsKey(REDELIVERY_DELAY)) {
@@ -155,17 +130,22 @@ public class RedeliveryProcessor impleme
return parameters;
}
+ public void init(SynapseEnvironment se) {
+
+ }
+
+ public void destroy() {
+
+ }
+
private class Worker implements Runnable {
public void run() {
while (started) {
-
-
try {
synchronized (this) {
int delay = redeliveryDelay;
-
MessageContext messageContext;
messageContext = messageStore.getMessages(0, 0).get(0);
@@ -197,13 +177,10 @@ public class RedeliveryProcessor impleme
messageContext.setProperty(NO_OF_REDELIVERIES, "" +
(number + 1));
-
if (exponentialBackoff && backOffMultiplier == -1) {
delay = (number + 1) * redeliveryDelay;
-
} else if (exponentialBackoff) {
delay = (int) Math.pow(backOffMultiplier, number)
* redeliveryDelay;
-
}
@@ -228,13 +205,10 @@ public class RedeliveryProcessor impleme
if (log.isDebugEnabled()) {
log.debug("sent \n" +
messageContext.getEnvelope());
}
-
-
}
} catch (Throwable e) {
- log.warn("Error while Running Redelivery process "
+e.getMessage());
+ log.warn("Error while Running Redelivery process " +
e.getMessage());
}
-
}
}
Modified:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingProcessor.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingProcessor.java?rev=1070852&r1=1070851&r2=1070852&view=diff
==============================================================================
---
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingProcessor.java
(original)
+++
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingProcessor.java
Tue Feb 15 11:47:02 2011
@@ -21,9 +21,6 @@ package org.apache.synapse.message.proce
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.synapse.ManagedLifecycle;
-import org.apache.synapse.Mediator;
-import org.apache.synapse.MessageContext;
import org.apache.synapse.SynapseException;
import org.apache.synapse.core.SynapseEnvironment;
import org.apache.synapse.message.processors.MessageProcessor;
@@ -35,42 +32,54 @@ import java.text.ParseException;
import java.util.Map;
import java.util.concurrent.ExecutorService;
-public class SamplingProcessor implements MessageProcessor, ManagedLifecycle{
- public static final String EXECUTOR = "Executor";
- public static final String MESSAGE_STORE = "MESSAGE_STORE";
+public class SamplingProcessor implements MessageProcessor {
private Log log = LogFactory.getLog(SamplingProcessor.class);
- private final String QUARTZ_CONF = "quartz.conf";
-
+ public static final String EXECUTOR = "Executor";
+ public static final String MESSAGE_STORE = "MESSAGE_STORE";
+ public static final String QUARTZ_CONF = "quartz.conf";
public static final String INTERVAL = "interval";
-
public static final String CRON_EXPRESSION = "cronExpression";
-
public static final String CONCURRENCY = "concurrency";
-
public static final String SEQUENCE = "sequence";
+ private enum State {
+ INITIALIZED,
+ START,
+ STOP,
+ DESTROY
+ }
+
+ private Map<String, Object> parameters = null;
+
+ /** The quartz configuration file if specified as a parameter */
+ private String quartzConfig = null;
+
+ /** A cron expression to run the sampler */
private String cronExpression = null;
+ /** The interval at which this sampler runs */
private long interval = 1;
- private String quartzConf = null;
-
+ /** The scheduler, run the the sampler */
private Scheduler scheduler = null;
- private boolean initialized = false;
+ /** Weather sampler is initialized or not */
+ private State state = State.DESTROY;
+ /** The message store */
private MessageStore messageStore = null;
- private Mediator onProcessSequence = null;
-
- private Mediator onSubmitSequence = null;
-
+ /** Concurrency at the sampler runs, if the concurrency is 2, 2 threads
will
+ * be used to dispatch 2 messages, when sampler runs */
private int concurrency = 1;
+ /** An executor */
private ExecutorService executor = null;
+ /** A sequence to run when the sampler is executed */
private String sequence = null;
+
/**
* Creates a Quartz Scheduler and schedule the message processing logic.
*/
@@ -102,34 +111,8 @@ public class SamplingProcessor implement
jobDetail.setJobDataMap(jobDataMap);
- StdSchedulerFactory sf = new StdSchedulerFactory();
try {
- if (quartzConf != null && !"".equals(quartzConf)) {
- if (log.isDebugEnabled()) {
- log.debug("Initiating a Scheduler with configuration : " +
quartzConf);
- }
-
- sf.initialize(quartzConf);
- }
- } catch (SchedulerException e) {
- throw new SynapseException("Error initiating scheduler factory "
- + sf + "with configuration loaded from " + quartzConf, e);
- }
-
-
- try {
- scheduler = sf.getScheduler();
- } catch (SchedulerException e) {
- throw new SynapseException("Error getting a scheduler instance
form scheduler" +
- " factory " + sf, e);
- }
-
- try {
- scheduler.start();
-
scheduler.scheduleJob(jobDetail, trigger);
-
- initialized = true;
} catch (SchedulerException e) {
throw new SynapseException("Error scheduling job : " + jobDetail
+ " with trigger " + trigger);
@@ -137,15 +120,16 @@ public class SamplingProcessor implement
}
public void stop() {
- if (initialized) {
+ if (state == State.START) {
try {
if (scheduler != null && scheduler.isStarted()) {
if (log.isDebugEnabled()) {
log.debug("ShuttingDown Sampling Scheduler : " +
scheduler.getMetaData());
}
- scheduler.shutdown();
+ scheduler.standby();
}
- initialized = false;
+
+ state = State.STOP;
} catch (SchedulerException e) {
throw new SynapseException("Error ShuttingDown Sampling
scheduler ", e);
}
@@ -156,27 +140,9 @@ public class SamplingProcessor implement
this.messageStore = messageStore;
}
- public MessageStore getMessageStore() {
- return messageStore;
- }
-
- public void setOnProcessSequence(Mediator mediator) {
- this.onProcessSequence = mediator;
- }
-
- public Mediator getOnProcessSequence() {
- return onProcessSequence;
- }
-
- public void setOnSubmitSequence(Mediator mediator) {
- this.onSubmitSequence = mediator;
- }
-
- public Mediator getOnSubmitSequence() {
- return onSubmitSequence;
- }
-
public void setParameters(Map<String, Object> parameters) {
+ this.parameters = parameters;
+
Object o = parameters.get(CRON_EXPRESSION);
if (o != null) {
cronExpression = o.toString();
@@ -194,7 +160,7 @@ public class SamplingProcessor implement
o = parameters.get(QUARTZ_CONF);
if (o != null) {
- quartzConf = o.toString();
+ quartzConfig = o.toString();
}
o = parameters.get(SEQUENCE);
@@ -204,18 +170,47 @@ public class SamplingProcessor implement
}
public Map<String, Object> getParameters() {
- return null;
+ return parameters;
}
public boolean isStarted() {
- return initialized;
+ return state == State.START;
}
public void init(SynapseEnvironment se) {
executor = se.getExecutorService();
+
+ StdSchedulerFactory sf = new StdSchedulerFactory();
+ try {
+ if (quartzConfig != null && !"".equals(quartzConfig)) {
+ if (log.isDebugEnabled()) {
+ log.debug("Initiating a Scheduler with configuration : " +
quartzConfig);
+ }
+
+ sf.initialize(quartzConfig);
+ }
+ } catch (SchedulerException e) {
+ throw new SynapseException("Error initiating scheduler factory "
+ + sf + "with configuration loaded from " + quartzConfig,
e);
+ }
+
+ try {
+ scheduler = sf.getScheduler();
+ } catch (SchedulerException e) {
+ throw new SynapseException("Error getting a scheduler instance
form scheduler" +
+ " factory " + sf, e);
+ }
+
+ try {
+ scheduler.start();
+
+ state = State.INITIALIZED;
+ } catch (SchedulerException e) {
+ throw new SynapseException("Error starting the scheduler", e);
+ }
}
public void destroy() {
-
+ state = State.DESTROY;
}
}
Modified:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/AbstractMessageStore.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/AbstractMessageStore.java?rev=1070852&r1=1070851&r2=1070852&view=diff
==============================================================================
---
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/AbstractMessageStore.java
(original)
+++
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/AbstractMessageStore.java
Tue Feb 15 11:47:02 2011
@@ -19,7 +19,6 @@
package org.apache.synapse.message.store;
-import org.apache.synapse.ManagedLifecycle;
import org.apache.synapse.Mediator;
import org.apache.synapse.MessageContext;
import org.apache.synapse.commons.jmx.MBeanRegistrar;
@@ -29,7 +28,7 @@ import org.apache.synapse.message.proces
import java.util.Map;
-public abstract class AbstractMessageStore implements MessageStore,
ManagedLifecycle {
+public abstract class AbstractMessageStore implements MessageStore {
/**
* message store name
@@ -79,9 +78,10 @@ public abstract class AbstractMessageSto
public void init(SynapseEnvironment se) {
this.synapseEnvironment = se;
this.synapseConfiguration =
synapseEnvironment.getSynapseConfiguration();
- if (processor instanceof ManagedLifecycle) {
- ((ManagedLifecycle) processor).init(se);
- }
+
+ processor.init(se);
+
+ processor.start();
}
public String getName() {
@@ -143,7 +143,9 @@ public abstract class AbstractMessageSto
}
public void destroy() {
+ processor.stop();
+ processor.destroy();
}
public void setDescription(String description) {
Modified:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/InMemoryMessageStore.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/InMemoryMessageStore.java?rev=1070852&r1=1070851&r2=1070852&view=diff
==============================================================================
---
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/InMemoryMessageStore.java
(original)
+++
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/InMemoryMessageStore.java
Tue Feb 15 11:47:02 2011
@@ -40,12 +40,6 @@ public class InMemoryMessageStore extend
mediateSequence(messageContext);
messageList.put(messageContext.getMessageID(), messageContext);
- /**
- * If the associated processor is not started we start it. When
storing the message
- */
- if(!processor.isStarted()) {
- processor.start();
- }
if (log.isDebugEnabled()) {
log.debug("Message " + messageContext.getMessageID() +
" has been stored");
Modified:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/MessageStore.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/MessageStore.java?rev=1070852&r1=1070851&r2=1070852&view=diff
==============================================================================
---
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/MessageStore.java
(original)
+++
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/MessageStore.java
Tue Feb 15 11:47:02 2011
@@ -19,6 +19,7 @@
package org.apache.synapse.message.store;
+import org.apache.synapse.ManagedLifecycle;
import org.apache.synapse.MessageContext;
import org.apache.synapse.config.SynapseConfiguration;
import org.apache.synapse.SynapseArtifact;
@@ -32,7 +33,7 @@ import java.util.Map;
* This is the interface for the Synapse Message Store
* Message Store is used to store failed Messages.
*/
-public interface MessageStore extends SynapseArtifact, Nameable {
+public interface MessageStore extends SynapseArtifact, Nameable,
ManagedLifecycle {
/**
* Store the Message in the Message Store