Author: charith
Date: Fri Feb 18 04:01:47 2011
New Revision: 1071875
URL: http://svn.apache.org/viewvc?rev=1071875&view=rev
Log:
improving code reuse of Message processor
Added:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/AbstractMessageProcessor.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/MessageProcessorConsents.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/DLCConstents.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/RedeliveryJob.java
Modified:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/DeadLetterChannelView.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/RedeliveryProcessor.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingJob.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingProcessor.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/AbstractMessageStore.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/InMemoryMessageStore.java
Added:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/AbstractMessageProcessor.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/AbstractMessageProcessor.java?rev=1071875&view=auto
==============================================================================
---
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/AbstractMessageProcessor.java
(added)
+++
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/AbstractMessageProcessor.java
Fri Feb 18 04:01:47 2011
@@ -0,0 +1,189 @@
+/*
+* Copyright (c) 2005-2010, WSO2 Inc. (http://www.wso2.org) All Rights
Reserved.
+*
+* WSO2 Inc. licenses this file to you under the Apache License,
+* Version 2.0 (the "License"); you may not use this file except
+* in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.synapse.message.processors;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.synapse.SynapseException;
+import org.apache.synapse.core.SynapseEnvironment;
+import org.apache.synapse.message.store.MessageStore;
+import org.quartz.*;
+import org.quartz.impl.StdSchedulerFactory;
+
+import java.text.ParseException;
+import java.util.Map;
+
+/**
+ * Class <code>AbstractMessageProcessor</code> handles the processing of the messages held in a
+ * Message Store. It assumes that message processors are implemented as Quartz scheduler jobs:
+ * sub classes supply the job through {@link #getJobDetail()} and this class builds the trigger
+ * and schedules the job. If a different implementation is needed, the
+ * <code>MessageProcessor</code> interface can be implemented directly instead.
+ */
+public abstract class AbstractMessageProcessor implements MessageProcessor {
+
+    protected Log log = LogFactory.getLog(this.getClass());
+
+    /** The scheduler that runs the processor job */
+    protected Scheduler scheduler = null;
+
+    /** Message Store associated with this Message processor */
+    protected MessageStore messageStore;
+
+    /** The repeat interval handed to the trigger when no cron expression is configured */
+    protected long interval = 1;
+
+    /** Life cycle states of a message processor */
+    protected enum State {
+        INITIALIZED,
+        START,
+        STOP,
+        DESTROY
+    }
+
+    /** Message processor parameters, also handed to the job through the job data map */
+    protected Map<String, Object> parameters = null;
+
+    /** The quartz configuration file if specified as a parameter */
+    protected String quartzConfig = null;
+
+    /** A cron expression to run the processor job; takes precedence over the interval */
+    protected String cronExpression = null;
+
+    /** Keep the state of the message processor */
+    private State state = State.DESTROY;
+
+    /**
+     * Schedules the job returned by {@link #getJobDetail()} and marks the processor as started.
+     * A cron trigger is used when a cron expression is configured, otherwise an immediate
+     * trigger that repeats indefinitely with the configured interval. If the processor was
+     * previously stopped, the scheduler is resumed instead of scheduling the job a second time.
+     *
+     * @throws SynapseException if the cron expression is invalid or the job cannot be scheduled
+     */
+    public void start() {
+        if (state == State.STOP) {
+            // stop() only put the scheduler into standby after a successful start(), so the
+            // job is already scheduled; resuming the scheduler is all that is needed.
+            try {
+                scheduler.start();
+            } catch (SchedulerException e) {
+                throw new SynapseException("Error starting the scheduler", e);
+            }
+            state = State.START;
+            return;
+        }
+
+        Trigger trigger;
+        if (cronExpression == null || "".equals(cronExpression)) {
+            trigger = TriggerUtils.makeImmediateTrigger(
+                    SimpleTrigger.REPEAT_INDEFINITELY, interval);
+        } else {
+            CronTrigger cronTrigger = new CronTrigger();
+            try {
+                cronTrigger.setCronExpression(cronExpression);
+                trigger = cronTrigger;
+            } catch (ParseException e) {
+                throw new SynapseException("Error setting cron expression '" + cronExpression
+                        + "' : " + e.getMessage(), e);
+            }
+        }
+        trigger.setName(messageStore.getName() + "-trigger");
+
+        JobDetail jobDetail = getJobDetail();
+        JobDataMap jobDataMap = new JobDataMap();
+        jobDataMap.put(MessageProcessorConsents.MESSAGE_STORE, messageStore);
+        jobDataMap.put(MessageProcessorConsents.PARAMETERS, parameters);
+        jobDetail.setJobDataMap(jobDataMap);
+
+        try {
+            scheduler.scheduleJob(jobDetail, trigger);
+        } catch (SchedulerException e) {
+            // keep the cause so the real scheduling failure is not lost
+            throw new SynapseException("Error scheduling job : " + jobDetail
+                    + " with trigger " + trigger, e);
+        }
+
+        // Without this isStarted() would always report false and stop() would never act
+        state = State.START;
+    }
+
+    /**
+     * Puts the scheduler into standby if the processor is currently started.
+     * Does nothing in any other state.
+     *
+     * @throws SynapseException if the scheduler cannot be put into standby
+     */
+    public void stop() {
+        if (state == State.START) {
+            try {
+                if (scheduler != null && scheduler.isStarted()) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("ShuttingDown Message Processor Scheduler : "
+                                + scheduler.getMetaData());
+                    }
+                    scheduler.standby();
+                }
+
+                state = State.STOP;
+            } catch (SchedulerException e) {
+                throw new SynapseException("Error ShuttingDown Message processor scheduler ", e);
+            }
+        }
+    }
+
+    /**
+     * Sets the message store this processor works on.
+     *
+     * @param messageStore message store to be processed
+     */
+    public void setMessageStore(MessageStore messageStore) {
+        this.messageStore = messageStore;
+    }
+
+    /**
+     * Stores the parameters and picks up the scheduling related ones (cron expression,
+     * interval and quartz configuration) when they are present.
+     *
+     * @param parameters processor parameters, may be null or empty
+     * @throws NumberFormatException if the interval parameter is not a valid number
+     */
+    public void setParameters(Map<String, Object> parameters) {
+        this.parameters = parameters;
+        if (parameters != null && !parameters.isEmpty()) {
+            Object o = parameters.get(MessageProcessorConsents.CRON_EXPRESSION);
+            if (o != null) {
+                cronExpression = o.toString();
+            }
+
+            o = parameters.get(MessageProcessorConsents.INTERVAL);
+            if (o != null) {
+                // interval is a long, so do not restrict the value to the int range
+                interval = Long.parseLong(o.toString());
+            }
+
+            o = parameters.get(MessageProcessorConsents.QUARTZ_CONF);
+            if (o != null) {
+                quartzConfig = o.toString();
+            }
+        }
+    }
+
+    public Map<String, Object> getParameters() {
+        return parameters;
+    }
+
+    /**
+     * @return true only while the processor is in the started state
+     */
+    public boolean isStarted() {
+        return state == State.START;
+    }
+
+    /**
+     * Creates the scheduler (using the quartz configuration when one is given), starts it and
+     * moves the processor to the initialized state. The job itself is scheduled by
+     * {@link #start()}.
+     *
+     * @param se synapse environment (not used by this implementation)
+     * @throws SynapseException if the scheduler cannot be created or started
+     */
+    public void init(SynapseEnvironment se) {
+        StdSchedulerFactory sf = new StdSchedulerFactory();
+        try {
+            if (quartzConfig != null && !"".equals(quartzConfig)) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Initiating a Scheduler with configuration : " + quartzConfig);
+                }
+
+                sf.initialize(quartzConfig);
+            }
+        } catch (SchedulerException e) {
+            throw new SynapseException("Error initiating scheduler factory "
+                    + sf + "with configuration loaded from " + quartzConfig, e);
+        }
+
+        try {
+            scheduler = sf.getScheduler();
+        } catch (SchedulerException e) {
+            throw new SynapseException("Error getting a scheduler instance form scheduler" +
+                    " factory " + sf, e);
+        }
+
+        try {
+            scheduler.start();
+
+            state = State.INITIALIZED;
+        } catch (SchedulerException e) {
+            throw new SynapseException("Error starting the scheduler", e);
+        }
+    }
+
+    /**
+     * Supplies the quartz job that does the actual message processing.
+     *
+     * @return job detail to be scheduled by {@link #start()}
+     */
+    protected abstract JobDetail getJobDetail();
+
+    /**
+     * Marks the processor as destroyed. The scheduler itself is left untouched.
+     */
+    public void destroy() {
+        state = State.DESTROY;
+    }
+}
Added:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/MessageProcessorConsents.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/MessageProcessorConsents.java?rev=1071875&view=auto
==============================================================================
---
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/MessageProcessorConsents.java
(added)
+++
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/MessageProcessorConsents.java
Fri Feb 18 04:01:47 2011
@@ -0,0 +1,28 @@
+/*
+* Copyright (c) 2005-2010, WSO2 Inc. (http://www.wso2.org) All Rights
Reserved.
+*
+* WSO2 Inc. licenses this file to you under the Apache License,
+* Version 2.0 (the "License"); you may not use this file except
+* in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.synapse.message.processors;
+
+/**
+ * Keys shared between the message processors and the quartz jobs they schedule.
+ */
+public final class MessageProcessorConsents {
+
+    // ---- keys of the job data map handed to the scheduled job ----
+
+    /** Job data map key under which the message store is passed to the job */
+    public static final String MESSAGE_STORE = "MESSAGE_STORE";
+
+    /** Job data map key under which the processor parameters are passed to the job */
+    public static final String PARAMETERS = "parameters";
+
+    // ---- names of the processor parameters ----
+
+    /** Parameter naming a cron expression used to trigger the processor job */
+    public static final String CRON_EXPRESSION = "cronExpression";
+
+    /** Parameter holding the repeat interval used when no cron expression is given */
+    public static final String INTERVAL = "interval";
+
+    /** Parameter naming the quartz configuration used to initialize the scheduler factory */
+    public static final String QUARTZ_CONF = "quartz.conf";
+
+}
Added:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/DLCConstents.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/DLCConstents.java?rev=1071875&view=auto
==============================================================================
---
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/DLCConstents.java
(added)
+++
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/DLCConstents.java
Fri Feb 18 04:01:47 2011
@@ -0,0 +1,52 @@
+/*
+* Copyright (c) 2005-2010, WSO2 Inc. (http://www.wso2.org) All Rights
Reserved.
+*
+* WSO2 Inc. licenses this file to you under the Apache License,
+* Version 2.0 (the "License"); you may not use this file except
+* in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.synapse.message.processors.dlc;
+
+/**
+ * Class <code>DLCConstents</code> holds the constants that are used by the Dead Letter Channel.
+ */
+public final class DLCConstents {
+
+    // ---- processor parameter ----
+
+    /** Parameter name: maximum number of redelivery attempts per message */
+    public static final String MAX_REDELIVERY_COUNT = "redelivery.count";
+
+    // ---- message context properties ----
+
+    /** Message context property: number of redeliveries already made for the message */
+    public static final String NO_OF_REDELIVERS = "number.of.redelivers";
+
+    /** Message context property: name of the target endpoint the message is replayed to */
+    public static final String REPLAY_ENDPOINT = "replay.endpoint";
+
+    /** Message context property: name of the target sequence the message is replayed to */
+    public static final String REPLAY_SEQUENCE = "replay.sequence";
+
+    /**
+     * Message context property: name of the fault handler that must be set on the
+     * message before it is replayed
+     */
+    public static final String REPLAY_FAULT_HANDLER = "replay.fault.handler";
+
+}
Modified:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/DeadLetterChannelView.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/DeadLetterChannelView.java?rev=1071875&r1=1071874&r2=1071875&view=diff
==============================================================================
---
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/DeadLetterChannelView.java
(original)
+++
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/DeadLetterChannelView.java
Fri Feb 18 04:01:47 2011
@@ -21,6 +21,7 @@ package org.apache.synapse.message.proce
import org.apache.synapse.Mediator;
import org.apache.synapse.MessageContext;
import org.apache.synapse.SynapseArtifact;
+import org.apache.synapse.SynapseException;
import org.apache.synapse.endpoints.Endpoint;
import org.apache.synapse.message.store.MessageStore;
@@ -31,9 +32,12 @@ public class DeadLetterChannelView imple
private MessageStore messageStore;
+ private RedeliveryProcessor redeliveryProcessor;
+
public DeadLetterChannelView(MessageStore messageStore) {
this.messageStore = messageStore;
+ this.redeliveryProcessor = (RedeliveryProcessor)
messageStore.getMessageProcessor();
}
public void resendAll() {
@@ -58,7 +62,7 @@ public class DeadLetterChannelView imple
int size = messageStore.getSize();
List<String> list = new ArrayList<String>();
for(int i = 0; i < size ; i++) {
- MessageContext messageContext = messageStore.unstore(0,0).get(0);
+ MessageContext messageContext =
messageStore.getMessages(0,0).get(0);
if(messageContext != null) {
list.add(messageContext.getMessageID());
}
@@ -91,14 +95,14 @@ public class DeadLetterChannelView imple
}
private void redeliver(MessageContext messageContext) {
- SynapseArtifact artifact =
RedeliveryProcessor.getReplayTarget(messageContext);
+ SynapseArtifact artifact =
RedeliveryJob.getReplayTarget(messageContext);
if (artifact instanceof Endpoint) {
- if (!RedeliveryProcessor.handleEndpointReplay((Endpoint) artifact,
+ if (!RedeliveryJob.handleEndpointReplay((Endpoint) artifact,
messageContext)) {
messageStore.store(messageContext);
}
} else if (artifact instanceof Mediator) {
- if (!RedeliveryProcessor.handleSequenceReplay((Mediator) artifact,
+ if (!RedeliveryJob.handleSequenceReplay((Mediator) artifact,
messageContext)) {
messageStore.store(messageContext);
}
Added:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/RedeliveryJob.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/RedeliveryJob.java?rev=1071875&view=auto
==============================================================================
---
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/RedeliveryJob.java
(added)
+++
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/RedeliveryJob.java
Fri Feb 18 04:01:47 2011
@@ -0,0 +1,192 @@
+/*
+* Copyright (c) 2005-2010, WSO2 Inc. (http://www.wso2.org) All Rights
Reserved.
+*
+* WSO2 Inc. licenses this file to you under the Apache License,
+* Version 2.0 (the "License"); you may not use this file except
+* in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.synapse.message.processors.dlc;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.synapse.FaultHandler;
+import org.apache.synapse.Mediator;
+import org.apache.synapse.MessageContext;
+import org.apache.synapse.SynapseArtifact;
+import org.apache.synapse.endpoints.Endpoint;
+import org.apache.synapse.mediators.MediatorFaultHandler;
+import org.apache.synapse.message.processors.MessageProcessorConsents;
+import org.apache.synapse.message.store.AbstractMessageStore;
+import org.apache.synapse.message.store.MessageStore;
+import org.quartz.Job;
+import org.quartz.JobDataMap;
+import org.quartz.JobExecutionContext;
+import org.quartz.JobExecutionException;
+
+import java.util.Map;
+import java.util.concurrent.locks.Lock;
+
+/**
+ * Redelivery Job replays the messages in the Message Store each time it is executed.
+ * Messages that have already been redelivered the maximum number of times are not replayed
+ * again; they are put back into the store untouched.
+ */
+public class RedeliveryJob implements Job {
+
+    private static final Log log = LogFactory.getLog(RedeliveryJob.class);
+
+    private MessageStore messageStore;
+    private Lock lock;
+    private int maxNumberOfRedelivers;
+
+    /**
+     * Takes every message currently in the store once and tries to replay it to its replay
+     * target. Messages that cannot be replayed are stored again. Nothing is done when the
+     * message store lock cannot be acquired.
+     *
+     * @param jobExecutionContext quartz context carrying the message store and the processor
+     *                            parameters in its job data map
+     * @throws JobExecutionException declared by the quartz <code>Job</code> contract
+     */
+    public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
+        JobDataMap jdm = jobExecutionContext.getMergedJobDataMap();
+
+        messageStore = (MessageStore) jdm.get(MessageProcessorConsents.MESSAGE_STORE);
+        lock = ((AbstractMessageStore) messageStore).getLock();
+        Map<String, Object> parameters = (Map<String, Object>) jdm.get(
+                MessageProcessorConsents.PARAMETERS);
+        maxNumberOfRedelivers = resolveMaxRedeliveries(parameters);
+
+        // We will keep the message store lock till the redelivery is over
+        if (lock.tryLock()) {
+
+            try {
+                // Only walk over the messages present at the start; failed ones are appended
+                // again by store() and must not be retried within the same run
+                int size = messageStore.getSize();
+                for (int i = 0; i < size; i++) {
+                    MessageContext messageContext = messageStore.unstore(0, 0).get(0);
+                    if (messageContext != null) {
+                        SynapseArtifact artifact = getReplayTarget(messageContext);
+
+                        if (messageContext.getProperty(DLCConstents.NO_OF_REDELIVERS) == null) {
+                            messageContext.setProperty(DLCConstents.NO_OF_REDELIVERS, "0");
+                        }
+
+                        String numberS = (String) messageContext.getProperty(
+                                DLCConstents.NO_OF_REDELIVERS);
+                        int number = Integer.parseInt(numberS);
+
+                        if (number >= maxNumberOfRedelivers) {
+
+                            if (log.isDebugEnabled()) {
+                                log.debug("Maximum number of attempts tried for Message with ID " +
+                                        messageContext.getMessageID() +
+                                        "will be put back to the Message Store");
+
+                            }
+                            messageStore.store(messageContext);
+                            continue;
+                        }
+
+                        messageContext.setProperty(DLCConstents.NO_OF_REDELIVERS,
+                                "" + (number + 1));
+
+                        if (artifact instanceof Endpoint) {
+                            if (!handleEndpointReplay((Endpoint) artifact, messageContext)) {
+                                messageStore.store(messageContext);
+                            }
+                        } else if (artifact instanceof Mediator) {
+                            if (!handleSequenceReplay((Mediator) artifact, messageContext)) {
+                                messageStore.store(messageContext);
+                            }
+                        } else {
+                            // no usable replay target, keep the message
+                            messageStore.store(messageContext);
+                        }
+
+                        if (log.isDebugEnabled()) {
+                            log.debug("Processed \n" + messageContext.getEnvelope());
+                        }
+
+                    }
+                }
+            } finally {
+                lock.unlock();
+            }
+
+        }
+
+    }
+
+    /**
+     * Reads the maximum redelivery count from the processor parameters.
+     *
+     * @param parameters processor parameters, may be null
+     * @return the configured count, or 0 when the parameter is not set
+     * @throws NumberFormatException if the configured value is not a valid integer
+     */
+    private static int resolveMaxRedeliveries(Map<String, Object> parameters) {
+        Object configured = null;
+        if (parameters != null) {
+            configured = parameters.get(DLCConstents.MAX_REDELIVERY_COUNT);
+        }
+        if (configured == null) {
+            return 0;
+        }
+        return Integer.parseInt(configured.toString());
+    }
+
+    /**
+     * This will handle the Message replay to the endpoints
+     * @param endpoint target endpoint to be redelivered
+     * @param messageContext message context of the message to be redelivered
+     * @return true if the message was handed to the endpoint, false if either argument is
+     *         null or the endpoint is not ready to send
+     */
+    static boolean handleEndpointReplay(Endpoint endpoint, MessageContext messageContext) {
+        // Reject missing arguments before touching them
+        if (endpoint == null || messageContext == null) {
+            return false;
+        }
+
+        setFaultHandler(messageContext);
+        if (endpoint.readyToSend()) {
+            endpoint.send(messageContext);
+            return true;
+        }
+
+        return false;
+    }
+
+    /**
+     * This will handle the Message replay to sequences
+     * @param mediator target mediator to replay
+     * @param messageContext message context of the message to be replayed
+     * @return always true once the mediator has been invoked
+     */
+    static boolean handleSequenceReplay(Mediator mediator, MessageContext messageContext) {
+        setFaultHandler(messageContext);
+        mediator.mediate(messageContext);
+        return true;
+    }
+
+    /**
+     * Get the replay target from the message context
+     * @param context Message Context
+     * @return Endpoint or Mediator to be replayed, or null if the message names neither
+     */
+    static SynapseArtifact getReplayTarget(MessageContext context) {
+        //Endpoint replay get priority
+        if (context.getProperty(DLCConstents.REPLAY_ENDPOINT) != null) {
+            String endpointName = (String) context.getProperty(DLCConstents.REPLAY_ENDPOINT);
+            return context.getConfiguration().getDefinedEndpoints().get(endpointName);
+        } else if (context.getProperty(DLCConstents.REPLAY_SEQUENCE) != null) {
+            String sequenceName = (String) context.getProperty(DLCConstents.REPLAY_SEQUENCE);
+
+            return context.getConfiguration().getSequence(sequenceName);
+        }
+
+        return null;
+    }
+
+    /**
+     * Pushes the fault handler named by the replay fault handler property onto the message
+     * context. An endpoint with that name is preferred over a sequence. Only a warning is
+     * logged when the property is missing or names nothing that is defined.
+     *
+     * @param messageContext message context about to be replayed
+     */
+    private static void setFaultHandler(MessageContext messageContext) {
+        String replayFaultHandler = (String) messageContext.getProperty(
+                DLCConstents.REPLAY_FAULT_HANDLER);
+        if (replayFaultHandler != null) {
+            if (messageContext.getEndpoint(replayFaultHandler) != null) {
+                Endpoint ep = messageContext.getEndpoint(replayFaultHandler);
+                messageContext.pushFaultHandler((FaultHandler) ep);
+            } else if (messageContext.getSequence(replayFaultHandler) != null) {
+                Mediator mediator = messageContext.getSequence(replayFaultHandler);
+                MediatorFaultHandler faultHandler = new MediatorFaultHandler(mediator);
+                messageContext.pushFaultHandler(faultHandler);
+            } else {
+                log.warn("Error handler " + replayFaultHandler + " Not defined in the synapse " +
+                        "configuration");
+            }
+        } else {
+            log.warn("No fault handler defined for the replaying Message with id " +
+                    messageContext.getMessageID());
+        }
+    }
+}
Modified:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/RedeliveryProcessor.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/RedeliveryProcessor.java?rev=1071875&r1=1071874&r2=1071875&view=diff
==============================================================================
---
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/RedeliveryProcessor.java
(original)
+++
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/RedeliveryProcessor.java
Fri Feb 18 04:01:47 2011
@@ -23,9 +23,11 @@ import org.apache.commons.logging.LogFac
import org.apache.synapse.*;
import org.apache.synapse.core.SynapseEnvironment;
import org.apache.synapse.endpoints.Endpoint;
+import org.apache.synapse.message.processors.AbstractMessageProcessor;
import org.apache.synapse.message.processors.MessageProcessor;
import org.apache.synapse.message.store.MessageStore;
import org.apache.synapse.securevault.commons.MBeanRegistrar;
+import org.quartz.JobDetail;
import java.util.Map;
@@ -33,230 +35,12 @@ import java.util.Map;
* Redelivery processor is the Message processor which implements the Dead
letter channel EIP
* It will Time to time Redeliver the Messages to a given target.
*/
-public class RedeliveryProcessor implements MessageProcessor {
-
- private static final Log log =
LogFactory.getLog(RedeliveryProcessor.class);
-
- /**
- * Associated MessageStore
- */
- private MessageStore messageStore;
-
- private Map<String, Object> parameters;
-
- /**
- * Maximum number of redelivery's per message
- */
- private int maxRedeleveries = 0;
-
- /**
- * Delay between two consecutive redelivery attempts
- */
- private int redeliveryDelay = 2000;
-
- /**
- * enable/disable exponential backoff
- */
- private boolean exponentialBackoff = false;
-
- /**
- * the multiplier that will be used in the exponential backoff algorithm
- */
- private int backOffMultiplier = -1;
-
-
- private DeadLetterChannelViewMBean dlcView;
-
- private boolean started;
-
- public static final String REDELIVERY_DELAY = "redelivery.delay";
-
- public static final String MAX_REDELIVERY_COUNT = "redelivery.count";
-
- public static final String EXPONENTIAL_BACKOFF =
"redelivery.exponentialBackoff";
-
- public static final String BACKOFF_MUTIPLIER =
"redelivery.backoffMultiplier";
-
-
- public static final String REPLAY_ENDPOINT = "replay.endpoint";
-
- public static final String REPLAY_SEQUENCE = "replay.sequence";
-
- public static final String NO_OF_REDELIVERIES = "number.of.redeliveries";
-
- public void start() {
- if (!started) {
- Thread t = new Thread(new Worker());
- t.start();
- }
- }
-
- public void stop() {
- started = false;
- }
-
- public void setMessageStore(MessageStore messageStore) {
- this.messageStore = messageStore;
- if(messageStore !=null) {
- DeadLetterChannelView view = new
DeadLetterChannelView(messageStore);
- this.dlcView = view;
- MBeanRegistrar.getInstance().registerMBean(view,"Dead Letter
Channel",
- messageStore.getName());
- }
- }
-
- public void setParameters(Map<String, Object> parameters) {
- this.parameters = parameters;
- if (parameters.containsKey(REDELIVERY_DELAY)) {
- redeliveryDelay = Integer.parseInt((String)
parameters.get(REDELIVERY_DELAY));
- }
-
- if (parameters.containsKey(MAX_REDELIVERY_COUNT)) {
- maxRedeleveries = Integer.parseInt((String)
parameters.get(MAX_REDELIVERY_COUNT));
- }
-
- if (parameters.containsKey(EXPONENTIAL_BACKOFF)) {
- if ("true".equals(parameters.get(EXPONENTIAL_BACKOFF))) {
- exponentialBackoff = true;
- }
- }
-
- if (parameters.containsKey(BACKOFF_MUTIPLIER)) {
- backOffMultiplier = Integer.parseInt((String)
parameters.get(BACKOFF_MUTIPLIER));
- }
- }
-
- public Map<String, Object> getParameters() {
- return parameters;
- }
-
- public void init(SynapseEnvironment se) {
-
- }
-
- public void destroy() {
-
- }
-
-
- private class Worker implements Runnable {
-
- public void run() {
- while (started) {
- try {
- synchronized (this) {
- int delay = redeliveryDelay;
- MessageContext messageContext;
- messageContext = messageStore.getMessages(0, 0).get(0);
-
- if (messageContext == null) {
- continue;
- }
-
- SynapseArtifact artifact =
getReplayTarget(messageContext);
- messageStore.unstore(0, 0);
- if (messageContext.getProperty(NO_OF_REDELIVERIES) ==
null) {
- messageContext.setProperty(NO_OF_REDELIVERIES,
"0");
- delay = redeliveryDelay;
- }
-
- String numberS = (String)
messageContext.getProperty(NO_OF_REDELIVERIES);
- int number = Integer.parseInt(numberS);
-
- if (number >= maxRedeleveries) {
-
- if (log.isDebugEnabled()) {
- log.debug("Maximum number of attempts tried
for Message with ID " +
- messageContext.getMessageID() +
- "will be put back to the Message
Store");
-
- }
- messageStore.store(messageContext);
- continue;
- }
-
- messageContext.setProperty(NO_OF_REDELIVERIES, "" +
(number + 1));
-
- if (exponentialBackoff && backOffMultiplier == -1) {
- delay = (number + 1) * redeliveryDelay;
- } else if (exponentialBackoff) {
- delay = (int) Math.pow(backOffMultiplier, number)
* redeliveryDelay;
- }
-
-
- try {
- Thread.sleep(delay);
- } catch (InterruptedException ignored) {
-
- }
-
- if (artifact instanceof Endpoint) {
- if (!handleEndpointReplay((Endpoint) artifact,
messageContext)) {
- messageStore.store(messageContext);
- }
- } else if (artifact instanceof Mediator) {
- if (!handleSequenceReplay((Mediator) artifact,
messageContext)) {
- messageStore.store(messageContext);
- }
- } else {
- messageStore.store(messageContext);
- }
-
- if (log.isDebugEnabled()) {
- log.debug("sent \n" +
messageContext.getEnvelope());
- }
- }
- } catch (Throwable e) {
- log.warn("Error while Running Redelivery process " +
e.getMessage());
- }
- }
-
- }
- }
-
- public static SynapseArtifact getReplayTarget(MessageContext context) {
- //Endpoint replay get priority
- if (context.getProperty(REPLAY_ENDPOINT) != null) {
- String endpointName = (String)
context.getProperty(REPLAY_ENDPOINT);
- return
context.getConfiguration().getDefinedEndpoints().get(endpointName);
- } else if (context.getProperty(REPLAY_SEQUENCE) != null) {
- String sequenceName = (String)
context.getProperty(REPLAY_SEQUENCE);
-
- return context.getConfiguration().getSequence(sequenceName);
- }
-
- return null;
- }
-
-
- public static boolean handleEndpointReplay(Endpoint endpoint,
MessageContext messageContext) {
- if (endpoint.readyToSend()) {
- endpoint.send(messageContext);
- return true;
- }
-
- return false;
- }
-
-
- public static boolean handleSequenceReplay(Mediator mediator,
MessageContext messageContext) {
- mediator.mediate(messageContext);
- return true;
- }
-
- /**
- * Get the DLC related JMX API
- * @return instance of Dead letter channel jms api
- */
- public DeadLetterChannelViewMBean getDlcView() {
- return dlcView;
- }
-
- /**
- * Get the started status of the Message processor
- * @return started status of message processor (true/false)
- */
- public boolean isStarted() {
- return started;
+public class RedeliveryProcessor extends AbstractMessageProcessor{
+ @Override
+ protected JobDetail getJobDetail() {
+ JobDetail jobDetail = new JobDetail();
+ jobDetail.setName(messageStore.getName() + "- redelivery job");
+ jobDetail.setJobClass(RedeliveryJob.class);
+ return jobDetail;
}
}
Modified:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingJob.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingJob.java?rev=1071875&r1=1071874&r2=1071875&view=diff
==============================================================================
---
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingJob.java
(original)
+++
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingJob.java
Fri Feb 18 04:01:47 2011
@@ -23,6 +23,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.synapse.Mediator;
import org.apache.synapse.MessageContext;
+import org.apache.synapse.message.processors.MessageProcessorConsents;
import org.apache.synapse.message.store.MessageStore;
import org.quartz.Job;
import org.quartz.JobDataMap;
@@ -30,6 +31,7 @@ import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.locks.Lock;
@@ -39,11 +41,13 @@ public class SamplingJob implements Job
public void execute(JobExecutionContext jobExecutionContext) throws
JobExecutionException {
JobDataMap jdm = jobExecutionContext.getMergedJobDataMap();
+ final MessageStore messageStore = (MessageStore) jdm.get(
+
MessageProcessorConsents.MESSAGE_STORE);
+
+ Map<String ,Object> parameters = (Map<String, Object>) jdm.get(
+ MessageProcessorConsents.PARAMETERS);
final Object concurrency = jdm.get(SamplingProcessor.CONCURRENCY);
- final MessageStore messageStore = (MessageStore)
jdm.get(SamplingProcessor.MESSAGE_STORE);
- final ExecutorService executor = (ExecutorService)
jdm.get(SamplingProcessor.EXECUTOR);
- final String sequence = (String) jdm.get(SamplingProcessor.SEQUENCE);
- final Lock lock = (Lock) jdm.get(SamplingProcessor.LOCK);
+ final String sequence = (String)
parameters.get(SamplingProcessor.SEQUENCE);
int conc = 1;
if (concurrency instanceof Integer) {
@@ -59,6 +63,8 @@ public class SamplingJob implements Job
final MessageContext messageContext = list.get(0);
if (messageContext != null) {
messageStore.unstore(0, 0);
+ final ExecutorService executor =
messageContext.getEnvironment().
+
getExecutorService();
executor.submit(new Runnable() {
public void run() {
try {
Modified:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingProcessor.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingProcessor.java?rev=1071875&r1=1071874&r2=1071875&view=diff
==============================================================================
---
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingProcessor.java
(original)
+++
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingProcessor.java
Fri Feb 18 04:01:47 2011
@@ -21,202 +21,20 @@ package org.apache.synapse.message.proce
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.synapse.SynapseException;
-import org.apache.synapse.core.SynapseEnvironment;
-import org.apache.synapse.message.processors.MessageProcessor;
-import org.apache.synapse.message.store.MessageStore;
-import org.quartz.*;
-import org.quartz.impl.StdSchedulerFactory;
+import org.apache.synapse.message.processors.AbstractMessageProcessor;
+import org.quartz.JobDetail;
-import java.text.ParseException;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-public class SamplingProcessor implements MessageProcessor {
+public class SamplingProcessor extends AbstractMessageProcessor{
private Log log = LogFactory.getLog(SamplingProcessor.class);
- public static final String LOCK = "lock";
- public static final String EXECUTOR = "Executor";
- public static final String MESSAGE_STORE = "MESSAGE_STORE";
- public static final String QUARTZ_CONF = "quartz.conf";
- public static final String INTERVAL = "interval";
- public static final String CRON_EXPRESSION = "cronExpression";
public static final String CONCURRENCY = "concurrency";
public static final String SEQUENCE = "sequence";
- private enum State {
- INITIALIZED,
- START,
- STOP,
- DESTROY
- }
-
- private Map<String, Object> parameters = null;
-
- /** The quartz configuration file if specified as a parameter */
- private String quartzConfig = null;
-
- /** A cron expression to run the sampler */
- private String cronExpression = null;
-
- /** The interval at which this sampler runs */
- private long interval = 1;
-
- /** The scheduler, run the the sampler */
- private Scheduler scheduler = null;
-
- /** Weather sampler is initialized or not */
- private State state = State.DESTROY;
-
- /** The message store */
- private MessageStore messageStore = null;
-
- /** Concurrency at the sampler runs, if the concurrency is 2, 2 threads
will
- * be used to dispatch 2 messages, when sampler runs */
- private int concurrency = 1;
-
- /** An executor */
- private ExecutorService executor = null;
-
- /** A sequence to run when the sampler is executed */
- private String sequence = null;
-
- private Lock lock = new ReentrantLock();
-
- /**
- * Creates a Quartz Scheduler and schedule the message processing logic.
- */
- public void start() {
- Trigger trigger;
- if (cronExpression == null || "".equals(cronExpression)) {
- trigger =
TriggerUtils.makeImmediateTrigger(SimpleTrigger.REPEAT_INDEFINITELY, interval);
- } else {
- CronTrigger cronTrigger = new CronTrigger();
- try {
- cronTrigger.setCronExpression(cronExpression);
- trigger = cronTrigger;
- } catch (ParseException e) {
- throw new SynapseException("Error setting cron expression : " +
- e.getMessage() + cronExpression, e);
- }
- }
- trigger.setName(messageStore.getName() + "-trigger");
-
+ @Override
+ protected JobDetail getJobDetail() {
JobDetail jobDetail = new JobDetail();
jobDetail.setName(messageStore.getName() + "-job");
jobDetail.setJobClass(SamplingJob.class);
-
- JobDataMap jobDataMap = new JobDataMap();
- jobDataMap.put(CONCURRENCY, concurrency);
- jobDataMap.put(EXECUTOR, executor);
- jobDataMap.put(MESSAGE_STORE, messageStore);
- jobDataMap.put(SEQUENCE, sequence);
- jobDataMap.put(LOCK, lock);
-
- jobDetail.setJobDataMap(jobDataMap);
-
- try {
- scheduler.scheduleJob(jobDetail, trigger);
- } catch (SchedulerException e) {
- throw new SynapseException("Error scheduling job : " + jobDetail
- + " with trigger " + trigger);
- }
- }
-
- public void stop() {
- if (state == State.START) {
- try {
- if (scheduler != null && scheduler.isStarted()) {
- if (log.isDebugEnabled()) {
- log.debug("ShuttingDown Sampling Scheduler : " +
scheduler.getMetaData());
- }
- scheduler.standby();
- }
-
- state = State.STOP;
- } catch (SchedulerException e) {
- throw new SynapseException("Error ShuttingDown Sampling
scheduler ", e);
- }
- }
- }
-
- public void setMessageStore(MessageStore messageStore) {
- this.messageStore = messageStore;
- }
-
- public void setParameters(Map<String, Object> parameters) {
- this.parameters = parameters;
-
- Object o = parameters.get(CRON_EXPRESSION);
- if (o != null) {
- cronExpression = o.toString();
- }
-
- o = parameters.get(INTERVAL);
- if (o != null) {
- interval = Integer.parseInt(o.toString());
- }
-
- o = parameters.get(CONCURRENCY);
- if (o != null) {
- concurrency = Integer.parseInt(o.toString());
- }
-
- o = parameters.get(QUARTZ_CONF);
- if (o != null) {
- quartzConfig = o.toString();
- }
-
- o = parameters.get(SEQUENCE);
- if (o != null) {
- sequence = o.toString();
- }
- }
-
- public Map<String, Object> getParameters() {
- return parameters;
- }
-
- public boolean isStarted() {
- return state == State.START;
- }
-
- public void init(SynapseEnvironment se) {
- executor = se.getExecutorService();
-
- StdSchedulerFactory sf = new StdSchedulerFactory();
- try {
- if (quartzConfig != null && !"".equals(quartzConfig)) {
- if (log.isDebugEnabled()) {
- log.debug("Initiating a Scheduler with configuration : " +
quartzConfig);
- }
-
- sf.initialize(quartzConfig);
- }
- } catch (SchedulerException e) {
- throw new SynapseException("Error initiating scheduler factory "
- + sf + "with configuration loaded from " + quartzConfig,
e);
- }
-
- try {
- scheduler = sf.getScheduler();
- } catch (SchedulerException e) {
- throw new SynapseException("Error getting a scheduler instance
form scheduler" +
- " factory " + sf, e);
- }
-
- try {
- scheduler.start();
-
- state = State.INITIALIZED;
- } catch (SchedulerException e) {
- throw new SynapseException("Error starting the scheduler", e);
- }
- }
-
- public void destroy() {
- state = State.DESTROY;
+ return jobDetail;
}
}
Modified:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/AbstractMessageStore.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/AbstractMessageStore.java?rev=1071875&r1=1071874&r2=1071875&view=diff
==============================================================================
---
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/AbstractMessageStore.java
(original)
+++
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/AbstractMessageStore.java
Fri Feb 18 04:01:47 2011
@@ -27,6 +27,8 @@ import org.apache.synapse.core.SynapseEn
import org.apache.synapse.message.processors.MessageProcessor;
import java.util.Map;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
public abstract class AbstractMessageStore implements MessageStore {
@@ -70,6 +72,10 @@ public abstract class AbstractMessageSto
*/
protected String fileName;
+
+ protected Lock lock = new ReentrantLock();
+
+
/**
* Message processor instance associated with the MessageStore
*/
@@ -163,4 +169,8 @@ public abstract class AbstractMessageSto
public String getFileName() {
return this.fileName;
}
+
+ public Lock getLock() {
+ return lock;
+ }
}
Modified:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/InMemoryMessageStore.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/InMemoryMessageStore.java?rev=1071875&r1=1071874&r2=1071875&view=diff
==============================================================================
---
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/InMemoryMessageStore.java
(original)
+++
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/InMemoryMessageStore.java
Fri Feb 18 04:01:47 2011
@@ -24,6 +24,7 @@ import org.apache.commons.logging.LogFac
import org.apache.synapse.MessageContext;
import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -35,7 +36,7 @@ public class InMemoryMessageStore extend
private static final Log log =
LogFactory.getLog(InMemoryMessageStore.class);
/** The map that keeps the stored messages */
- private Map<String, MessageContext> messageList = new HashMap<String,
MessageContext>();
+ private Map<String, MessageContext> messageList = new
ConcurrentHashMap<String, MessageContext>();
private Lock lock = new ReentrantLock();
@@ -50,6 +51,14 @@ public class InMemoryMessageStore extend
log.debug("Message " + messageContext.getMessageID() +
" has been stored");
}
+
+ if(processor != null && !processor.isStarted()) {
+
+ if(log.isDebugEnabled()) {
+ log.debug("Starting Message processor " +
processor.getClass().getName());
+ }
+ processor.start();
+ }
}
} finally {
lock.unlock();
@@ -101,9 +110,11 @@ public class InMemoryMessageStore extend
lock.lock();
try {
List<MessageContext> returnlist = new ArrayList<MessageContext>();
- if (from <= to && (from <= messageList.size() && to <=
messageList.size()) && messageList.size() > 0) {
+ if (from <= to && (from <= messageList.size() && to <=
messageList.size()) &&
+ messageList.size() > 0) {
- String[] keys = messageList.keySet().toArray(new
String[messageList.keySet().size()]);
+ String[] keys = messageList.keySet().toArray(
+ new String[messageList.keySet().size()]);
for (int i = from; i <= to; i++) {
returnlist.add(messageList.remove(keys[i]));
@@ -119,8 +130,10 @@ public class InMemoryMessageStore extend
lock.lock();
try {
List<MessageContext> returnList = new ArrayList<MessageContext>();
- if (from <= to && (from <= messageList.size() && to <=
messageList.size()) && messageList.size() > 0) {
- String[] keys = messageList.keySet().toArray(new
String[messageList.keySet().size()]);
+ if (from <= to && (from <= messageList.size() && to <=
messageList.size()) &&
+ messageList.size() > 0) {
+ String[] keys = messageList.keySet().toArray(
+ new String[messageList.keySet().size()]);
for (int i = from; i <= to; i++) {
returnList.add(messageList.get(keys[i]));
@@ -157,22 +170,6 @@ public class InMemoryMessageStore extend
return null;
}
- public List<MessageContext> getMessages(int maxNumberOfMessages) {
- lock.lock();
- try {
- List<MessageContext> returnList = new ArrayList<MessageContext>();
-
- Iterator<String> it = messageList.keySet().iterator();
- while (it.hasNext() && maxNumberOfMessages > 0) {
- returnList.add(messageList.get(it.next()));
- maxNumberOfMessages--;
- }
-
- return returnList;
- } finally {
- lock.unlock();
- }
- }
public int getSize() {
lock.lock();