Author: charith
Date: Sun Feb 20 05:54:59 2011
New Revision: 1072506
URL: http://svn.apache.org/viewvc?rev=1072506&view=rev
Log:
making Message processor as a top level element
Added:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/ScheduledMessageProcessor.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/ScheduledRedeliveryProcessor.java
- copied, changed from r1071899,
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/RedeliveryProcessor.java
Removed:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/RedeliveryProcessor.java
Modified:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/AbstractMessageProcessor.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/MessageProcessor.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/MessageProcessorConsents.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/DeadLetterChannelView.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/RedeliveryJob.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingJob.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingProcessor.java
Modified:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/AbstractMessageProcessor.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/AbstractMessageProcessor.java?rev=1072506&r1=1072505&r2=1072506&view=diff
==============================================================================
---
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/AbstractMessageProcessor.java
(original)
+++
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/AbstractMessageProcessor.java
Sun Feb 20 05:54:59 2011
@@ -22,12 +22,10 @@ package org.apache.synapse.message.proce
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.synapse.SynapseException;
+import org.apache.synapse.config.SynapseConfiguration;
import org.apache.synapse.core.SynapseEnvironment;
import org.apache.synapse.message.store.MessageStore;
-import org.quartz.*;
-import org.quartz.impl.StdSchedulerFactory;
-import java.text.ParseException;
import java.util.Map;
/**
@@ -40,16 +38,14 @@ public abstract class AbstractMessagePro
protected Log log = LogFactory.getLog(this.getClass());
- /** The scheduler, run the the processor */
- protected Scheduler scheduler = null;
-
/** Message Store associated with Message processor */
- protected MessageStore messageStore;
+ protected String messageStore;
- /** The interval at which this processor runs , default value is 1000ms*/
- protected long interval = 1000;
+ protected String description;
+ protected String name;
+ protected SynapseConfiguration configuration;
protected enum State {
INITIALIZED,
@@ -61,64 +57,17 @@ public abstract class AbstractMessagePro
/**message store parameters */
protected Map<String, Object> parameters = null;
- /** The quartz configuration file if specified as a parameter */
- protected String quartzConfig = null;
-
- /** A cron expression to run the sampler */
- protected String cronExpression = null;
/** Keep the state of the message processor */
private State state = State.DESTROY;
- public void start() {
- Trigger trigger;
- if (cronExpression == null || "".equals(cronExpression)) {
- trigger =
TriggerUtils.makeImmediateTrigger(SimpleTrigger.REPEAT_INDEFINITELY, interval);
- } else {
- CronTrigger cronTrigger = new CronTrigger();
- try {
- cronTrigger.setCronExpression(cronExpression);
- trigger = cronTrigger;
- } catch (ParseException e) {
- throw new SynapseException("Error setting cron expression : " +
- e.getMessage() + cronExpression, e);
- }
- }
- trigger.setName(messageStore.getName() + "-trigger");
-
- JobDetail jobDetail = getJobDetail();
- JobDataMap jobDataMap = new JobDataMap();
- jobDataMap.put(MessageProcessorConsents.MESSAGE_STORE, messageStore);
- jobDataMap.put(MessageProcessorConsents.PARAMETERS,parameters);
- jobDetail.setJobDataMap(jobDataMap);
-
- try {
- scheduler.scheduleJob(jobDetail, trigger);
- } catch (SchedulerException e) {
- throw new SynapseException("Error scheduling job : " + jobDetail
- + " with trigger " + trigger);
- }
+ public void init(SynapseEnvironment se) {
+ configuration = se.getSynapseConfiguration();
}
- public void stop() {
- if (state == State.START) {
- try {
- if (scheduler != null && scheduler.isStarted()) {
- if (log.isDebugEnabled()) {
- log.debug("ShuttingDown Message Processor Scheduler :
" + scheduler.getMetaData());
- }
- scheduler.standby();
- }
-
- state = State.STOP;
- } catch (SchedulerException e) {
- throw new SynapseException("Error ShuttingDown Message
processor scheduler ", e);
- }
- }
- }
- public void setMessageStore(MessageStore messageStore) {
+ public void setMessageStoreName(String messageStore) {
if (messageStore != null) {
this.messageStore = messageStore;
} else {
@@ -126,26 +75,12 @@ public abstract class AbstractMessagePro
}
}
+ public String getMessageStoreName() {
+ return messageStore;
+ }
+
public void setParameters(Map<String, Object> parameters) {
this.parameters = parameters;
- if(parameters != null && !parameters.isEmpty()) {
- Object o =
parameters.get(MessageProcessorConsents.CRON_EXPRESSION);
- if (o != null) {
- cronExpression = o.toString();
- }
-
- o = parameters.get(MessageProcessorConsents.INTERVAL);
- if (o != null) {
- interval = Integer.parseInt(o.toString());
- }
-
-
- o = parameters.get(MessageProcessorConsents.QUARTZ_CONF);
- if (o != null) {
- quartzConfig = o.toString();
- }
-
- }
}
public Map<String, Object> getParameters() {
@@ -156,40 +91,20 @@ public abstract class AbstractMessagePro
return state == State.START;
}
- public void init(SynapseEnvironment se) {
- StdSchedulerFactory sf = new StdSchedulerFactory();
- try {
- if (quartzConfig != null && !"".equals(quartzConfig)) {
- if (log.isDebugEnabled()) {
- log.debug("Initiating a Scheduler with configuration : " +
quartzConfig);
- }
-
- sf.initialize(quartzConfig);
- }
- } catch (SchedulerException e) {
- throw new SynapseException("Error initiating scheduler factory "
- + sf + "with configuration loaded from " + quartzConfig,
e);
- }
+ public String getName() {
+ return name;
+ }
- try {
- scheduler = sf.getScheduler();
- } catch (SchedulerException e) {
- throw new SynapseException("Error getting a scheduler instance
form scheduler" +
- " factory " + sf, e);
- }
+ public void setName(String name) {
+ this.name = name;
+ }
- try {
- scheduler.start();
- state = State.INITIALIZED;
- } catch (SchedulerException e) {
- throw new SynapseException("Error starting the scheduler", e);
- }
+ public void setDescription(String description) {
+ this.description=description;
}
- protected abstract JobDetail getJobDetail();
-
- public void destroy() {
- state = State.DESTROY;
+ public String getDescription() {
+ return description;
}
}
Modified:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/MessageProcessor.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/MessageProcessor.java?rev=1072506&r1=1072505&r2=1072506&view=diff
==============================================================================
---
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/MessageProcessor.java
(original)
+++
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/MessageProcessor.java
Sun Feb 20 05:54:59 2011
@@ -17,9 +17,7 @@
*/
package org.apache.synapse.message.processors;
-import org.apache.synapse.ManagedLifecycle;
-import org.apache.synapse.Mediator;
-import org.apache.synapse.MessageContext;
+import org.apache.synapse.*;
import org.apache.synapse.message.store.MessageStore;
import java.util.HashMap;
@@ -31,7 +29,7 @@ import java.util.Map;
*Message processing logic and process will depend on the
*concrete implementation of the MessageStore
*/
-public interface MessageProcessor extends ManagedLifecycle {
+public interface MessageProcessor extends ManagedLifecycle , Nameable ,
SynapseArtifact{
/**
* Start Message Processor
@@ -44,10 +42,16 @@ public interface MessageProcessor extend
public void stop();
/**
- * Set the Message Store that backs the Message processor
- * @param messageStore the underlying MessageStore instance
+ * Set the Message Store name that backs the Message processor
+ * @param messageStore name the underlying MessageStore instance
*/
- public void setMessageStore(MessageStore messageStore);
+ public void setMessageStoreName(String messageStore);
+
+ /**
+ * Get message store name associated with the Message processor
+ * @return message store name associated with message processor
+ */
+ public String getMessageStoreName();
/**
* Set the Message processor parameters that will be used by the specific
implementation
Modified:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/MessageProcessorConsents.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/MessageProcessorConsents.java?rev=1072506&r1=1072505&r2=1072506&view=diff
==============================================================================
---
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/MessageProcessorConsents.java
(original)
+++
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/MessageProcessorConsents.java
Sun Feb 20 05:54:59 2011
@@ -21,9 +21,10 @@ package org.apache.synapse.message.proce
public final class MessageProcessorConsents {
public static final String MESSAGE_STORE = "MESSAGE_STORE";
+ public static final String PARAMETERS = "parameters";
+
public static final String QUARTZ_CONF = "quartz.conf";
public static final String INTERVAL = "interval";
public static final String CRON_EXPRESSION = "cronExpression";
- public static final String PARAMETERS = "parameters";
}
Added:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/ScheduledMessageProcessor.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/ScheduledMessageProcessor.java?rev=1072506&view=auto
==============================================================================
---
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/ScheduledMessageProcessor.java
(added)
+++
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/ScheduledMessageProcessor.java
Sun Feb 20 05:54:59 2011
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.synapse.message.processors;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.synapse.SynapseException;
+import org.apache.synapse.core.SynapseEnvironment;
+import org.quartz.*;
+import org.quartz.impl.StdSchedulerFactory;
+
+import java.text.ParseException;
+import java.util.Map;
+
+public abstract class ScheduledMessageProcessor extends
AbstractMessageProcessor {
+
+
+ protected Log log = LogFactory.getLog(this.getClass());
+
+ /**
+ * The scheduler, run the the processor
+ */
+ protected Scheduler scheduler = null;
+
+ /**
+ * The interval at which this processor runs , default value is 1000ms
+ */
+ protected long interval = 1000;
+
+
+ protected enum State {
+ INITIALIZED,
+ START,
+ STOP,
+ DESTROY
+ }
+
+ /**
+ * message store parameters
+ */
+ protected Map<String, Object> parameters = null;
+
+ /**
+ * The quartz configuration file if specified as a parameter
+ */
+ protected String quartzConfig = null;
+
+ /**
+ * A cron expression to run the sampler
+ */
+ protected String cronExpression = null;
+
+ /**
+ * Keep the state of the message processor
+ */
+ private State state = State.DESTROY;
+
+
+ public void start() {
+ Trigger trigger;
+ if (cronExpression == null || "".equals(cronExpression)) {
+ trigger =
TriggerUtils.makeImmediateTrigger(SimpleTrigger.REPEAT_INDEFINITELY, interval);
+ } else {
+ CronTrigger cronTrigger = new CronTrigger();
+ try {
+ cronTrigger.setCronExpression(cronExpression);
+ trigger = cronTrigger;
+ } catch (ParseException e) {
+ throw new SynapseException("Error setting cron expression : " +
+ e.getMessage() + cronExpression, e);
+ }
+ }
+ trigger.setName(messageStore + "-trigger");
+
+ JobDetail jobDetail = getJobDetail();
+ JobDataMap jobDataMap = new JobDataMap();
+ jobDataMap.put(MessageProcessorConsents.MESSAGE_STORE, messageStore);
+ jobDataMap.put(MessageProcessorConsents.PARAMETERS, parameters);
+ jobDetail.setJobDataMap(jobDataMap);
+
+ try {
+ scheduler.scheduleJob(jobDetail, trigger);
+ } catch (SchedulerException e) {
+ throw new SynapseException("Error scheduling job : " + jobDetail
+ + " with trigger " + trigger);
+ }
+ }
+
+ public void stop() {
+ if (state == State.START) {
+ try {
+ if (scheduler != null && scheduler.isStarted()) {
+ if (log.isDebugEnabled()) {
+ log.debug("ShuttingDown Message Processor Scheduler :
" + scheduler.getMetaData());
+ }
+ scheduler.standby();
+ }
+
+ state = State.STOP;
+ } catch (SchedulerException e) {
+ throw new SynapseException("Error ShuttingDown Message
processor scheduler ", e);
+ }
+ }
+ }
+
+
+ public void setParameters(Map<String, Object> parameters) {
+ super.setParameters(parameters);
+ if (parameters != null && !parameters.isEmpty()) {
+ Object o =
parameters.get(MessageProcessorConsents.CRON_EXPRESSION);
+ if (o != null) {
+ cronExpression = o.toString();
+ }
+
+ o = parameters.get(MessageProcessorConsents.INTERVAL);
+ if (o != null) {
+ interval = Integer.parseInt(o.toString());
+ }
+
+
+ o = parameters.get(MessageProcessorConsents.QUARTZ_CONF);
+ if (o != null) {
+ quartzConfig = o.toString();
+ }
+
+ }
+ }
+
+ public void init(SynapseEnvironment se) {
+ super.init(se);
+ StdSchedulerFactory sf = new StdSchedulerFactory();
+ try {
+ if (quartzConfig != null && !"".equals(quartzConfig)) {
+ if (log.isDebugEnabled()) {
+ log.debug("Initiating a Scheduler with configuration : " +
quartzConfig);
+ }
+
+ sf.initialize(quartzConfig);
+ }
+ } catch (SchedulerException e) {
+ throw new SynapseException("Error initiating scheduler factory "
+ + sf + "with configuration loaded from " + quartzConfig,
e);
+ }
+
+ try {
+ scheduler = sf.getScheduler();
+ } catch (SchedulerException e) {
+ throw new SynapseException("Error getting a scheduler instance
form scheduler" +
+ " factory " + sf, e);
+ }
+
+ try {
+ scheduler.start();
+
+ state = State.INITIALIZED;
+ } catch (SchedulerException e) {
+ throw new SynapseException("Error starting the scheduler", e);
+ }
+ }
+
+ protected abstract JobDetail getJobDetail();
+
+ public void destroy() {
+ state = State.DESTROY;
+ }
+
+}
Modified:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/DeadLetterChannelView.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/DeadLetterChannelView.java?rev=1072506&r1=1072505&r2=1072506&view=diff
==============================================================================
---
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/DeadLetterChannelView.java
(original)
+++
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/DeadLetterChannelView.java
Sun Feb 20 05:54:59 2011
@@ -33,19 +33,18 @@ public class DeadLetterChannelView imple
private MessageStore messageStore;
- private RedeliveryProcessor redeliveryProcessor;
public DeadLetterChannelView(MessageStore messageStore) {
this.messageStore = messageStore;
- this.redeliveryProcessor = (RedeliveryProcessor)
messageStore.getMessageProcessor();
+
}
public void resendAll() {
if (((AbstractMessageStore) messageStore).getLock().tryLock()) {
- int size = messageStore.getSize();
+ int size = messageStore.size();
for (int i = 0; i < size; i++) {
- MessageContext messageContext = messageStore.unstore(0,
0).get(0);
+ MessageContext messageContext = messageStore.poll();
if (messageContext != null) {
redeliver(messageContext);
}
@@ -58,9 +57,9 @@ public class DeadLetterChannelView imple
public void deleteAll() {
if (((AbstractMessageStore) messageStore).getLock().tryLock()) {
- int size = messageStore.getSize();
+ int size = messageStore.size();
for (int i = 0; i < size; i++) {
- messageStore.unstore(0, 0);
+ messageStore.poll();
}
} else {
throw new SynapseException("Error Message store being used re try
later");
@@ -68,10 +67,10 @@ public class DeadLetterChannelView imple
}
public List<String> getMessageIds() {
- int size = messageStore.getSize();
+ int size = messageStore.size();
List<String> list = new ArrayList<String>();
for (int i = 0; i < size; i++) {
- MessageContext messageContext = messageStore.getMessages(0,
0).get(0);
+ MessageContext messageContext = messageStore.peek();
if (messageContext != null) {
list.add(messageContext.getMessageID());
}
@@ -81,7 +80,7 @@ public class DeadLetterChannelView imple
public void resend(String messageID) {
if (((AbstractMessageStore) messageStore).getLock().tryLock()) {
- MessageContext messageContext = messageStore.unstore(messageID);
+ MessageContext messageContext = messageStore.remove(messageID);
if (messageContext != null) {
redeliver(messageContext);
}
@@ -92,12 +91,12 @@ public class DeadLetterChannelView imple
public void delete(String messageID) {
if (((AbstractMessageStore) messageStore).getLock().tryLock()) {
- messageStore.unstore(messageID);
+ messageStore.remove(messageID);
}
}
public String getEnvelope(String messageID) {
- MessageContext messageContext = messageStore.getMessage(messageID);
+ MessageContext messageContext = messageStore.get(messageID);
if (messageContext != null) {
return messageContext.getEnvelope().toString();
}
@@ -106,7 +105,7 @@ public class DeadLetterChannelView imple
}
public int getSize() {
- return messageStore.getSize();
+ return messageStore.size();
}
private void redeliver(MessageContext messageContext) {
@@ -114,15 +113,15 @@ public class DeadLetterChannelView imple
if (artifact instanceof Endpoint) {
if (!RedeliveryJob.handleEndpointReplay((Endpoint) artifact,
messageContext)) {
- messageStore.store(messageContext);
+ messageStore.offer(messageContext);
}
} else if (artifact instanceof Mediator) {
if (!RedeliveryJob.handleSequenceReplay((Mediator) artifact,
messageContext)) {
- messageStore.store(messageContext);
+ messageStore.offer(messageContext);
}
} else {
- messageStore.store(messageContext);
+ messageStore.offer(messageContext);
}
}
}
Modified:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/RedeliveryJob.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/RedeliveryJob.java?rev=1072506&r1=1072505&r2=1072506&view=diff
==============================================================================
---
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/RedeliveryJob.java
(original)
+++
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/RedeliveryJob.java
Sun Feb 20 05:54:59 2011
@@ -69,14 +69,14 @@ public class RedeliveryJob implements Jo
}
/**
- * We will keep the message store lock till the redelivery over
+ * We will keep the message store lock till the redelivery is over
*/
if (lock.tryLock()) {
try {
- int size = messageStore.getSize();
+ int size = messageStore.size();
for (int i = 0; i < size; i++) {
- MessageContext messageContext = messageStore.unstore(0,
0).get(0);
+ MessageContext messageContext = messageStore.poll();
if (messageContext != null) {
SynapseArtifact artifact =
getReplayTarget(messageContext);
@@ -96,7 +96,7 @@ public class RedeliveryJob implements Jo
"will be put back to the Message
Store");
}
- messageStore.store(messageContext);
+ messageStore.offer(messageContext);
continue;
}
@@ -105,14 +105,14 @@ public class RedeliveryJob implements Jo
if (artifact instanceof Endpoint) {
if (!handleEndpointReplay((Endpoint) artifact,
messageContext)) {
- messageStore.store(messageContext);
+ messageStore.offer(messageContext);
}
} else if (artifact instanceof Mediator) {
if (!handleSequenceReplay((Mediator) artifact,
messageContext)) {
- messageStore.store(messageContext);
+ messageStore.offer(messageContext);
}
} else {
- messageStore.store(messageContext);
+ messageStore.offer(messageContext);
}
if (log.isDebugEnabled()) {
Copied:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/ScheduledRedeliveryProcessor.java
(from r1071899,
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/RedeliveryProcessor.java)
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/ScheduledRedeliveryProcessor.java?p2=synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/ScheduledRedeliveryProcessor.java&p1=synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/RedeliveryProcessor.java&r1=1071899&r2=1072506&rev=1072506&view=diff
==============================================================================
---
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/RedeliveryProcessor.java
(original)
+++
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/ScheduledRedeliveryProcessor.java
Sun Feb 20 05:54:59 2011
@@ -18,7 +18,9 @@
*/
package org.apache.synapse.message.processors.dlc;
+import org.apache.synapse.core.SynapseEnvironment;
import org.apache.synapse.message.processors.AbstractMessageProcessor;
+import org.apache.synapse.message.processors.ScheduledMessageProcessor;
import org.apache.synapse.message.store.MessageStore;
import org.quartz.JobDetail;
@@ -26,23 +28,23 @@ import org.quartz.JobDetail;
* Redelivery processor is the Message processor which implements the Dead
letter channel EIP
* It will Time to time Redeliver the Messages to a given target.
*/
-public class RedeliveryProcessor extends AbstractMessageProcessor{
+public class ScheduledRedeliveryProcessor extends ScheduledMessageProcessor{
/**Dead Letter channel JMX API*/
private DeadLetterChannelView dlcView;
@Override
- public void setMessageStore(MessageStore messageStore) {
- super.setMessageStore(messageStore);
- dlcView = new DeadLetterChannelView(messageStore);
+ public void init(SynapseEnvironment se) {
+ super.init(se);
+ dlcView = new
DeadLetterChannelView(configuration.getMessageStore(messageStore));
org.apache.synapse.commons.jmx.MBeanRegistrar.getInstance().registerMBean(dlcView,
- "Dead Letter Channel", messageStore.getName());
+ "Dead Letter Channel", messageStore);
}
@Override
protected JobDetail getJobDetail() {
JobDetail jobDetail = new JobDetail();
- jobDetail.setName(messageStore.getName() + "- redelivery job");
+ jobDetail.setName(messageStore + "- redelivery job");
jobDetail.setJobClass(RedeliveryJob.class);
return jobDetail;
}
Modified:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingJob.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingJob.java?rev=1072506&r1=1072505&r2=1072506&view=diff
==============================================================================
---
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingJob.java
(original)
+++
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingJob.java
Sun Feb 20 05:54:59 2011
@@ -42,9 +42,9 @@ public class SamplingJob implements Job
JobDataMap jdm = jobExecutionContext.getMergedJobDataMap();
final MessageStore messageStore = (MessageStore) jdm.get(
-
MessageProcessorConsents.MESSAGE_STORE);
+ MessageProcessorConsents.MESSAGE_STORE);
- Map<String ,Object> parameters = (Map<String, Object>) jdm.get(
+ Map<String, Object> parameters = (Map<String, Object>) jdm.get(
MessageProcessorConsents.PARAMETERS);
final Object concurrency = jdm.get(SamplingProcessor.CONCURRENCY);
final String sequence = (String)
parameters.get(SamplingProcessor.SEQUENCE);
@@ -56,30 +56,28 @@ public class SamplingJob implements Job
for (int i = 0; i < conc; i++) {
//lock.lock();
- synchronized (messageStore){
- List<MessageContext> list = messageStore.getMessages(0, 0);
+ synchronized (messageStore) {
+ final MessageContext messageContext = messageStore.peek();
- if (list != null && list.size() == 1) {
- final MessageContext messageContext = list.get(0);
- if (messageContext != null) {
- messageStore.unstore(0, 0);
- final ExecutorService executor =
messageContext.getEnvironment().
-
getExecutorService();
- executor.submit(new Runnable() {
- public void run() {
- try {
- Mediator processingSequence =
messageContext.getSequence(sequence);
- if (processingSequence != null) {
-
processingSequence.mediate(messageContext);
- }
- } catch (Throwable t) {
- log.error("Error occurred while executing
the message", t);
+ if (messageContext != null) {
+ messageStore.poll();
+ final ExecutorService executor =
messageContext.getEnvironment().
+ getExecutorService();
+ executor.submit(new Runnable() {
+ public void run() {
+ try {
+ Mediator processingSequence =
messageContext.getSequence(sequence);
+ if (processingSequence != null) {
+ processingSequence.mediate(messageContext);
}
+ } catch (Throwable t) {
+ log.error("Error occurred while executing the
message", t);
}
- });
- }
+ }
+ });
}
}
}
}
+
}
Modified:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingProcessor.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingProcessor.java?rev=1072506&r1=1072505&r2=1072506&view=diff
==============================================================================
---
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingProcessor.java
(original)
+++
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingProcessor.java
Sun Feb 20 05:54:59 2011
@@ -22,9 +22,10 @@ package org.apache.synapse.message.proce
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.synapse.message.processors.AbstractMessageProcessor;
+import org.apache.synapse.message.processors.ScheduledMessageProcessor;
import org.quartz.JobDetail;
-public class SamplingProcessor extends AbstractMessageProcessor{
+public class SamplingProcessor extends ScheduledMessageProcessor{
private Log log = LogFactory.getLog(SamplingProcessor.class);
public static final String CONCURRENCY = "concurrency";
@@ -33,7 +34,7 @@ public class SamplingProcessor extends A
@Override
protected JobDetail getJobDetail() {
JobDetail jobDetail = new JobDetail();
- jobDetail.setName(messageStore.getName() + "-job");
+ jobDetail.setName(messageStore + "-job");
jobDetail.setJobClass(SamplingJob.class);
return jobDetail;
}