Author: supun
Date: Tue Feb 15 07:41:29 2011
New Revision: 1070794
URL: http://svn.apache.org/viewvc?rev=1070794&view=rev
Log:
addins sampling message processor
Added:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingJob.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingProcessor.java
Modified:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/SynapseConfiguration.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MessageStoreFactory.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/store/MessageStoreMediator.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/AbstractMessageStore.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/InMemoryMessageStore.java
Modified:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/SynapseConfiguration.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/SynapseConfiguration.java?rev=1070794&r1=1070793&r2=1070794&view=diff
==============================================================================
---
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/SynapseConfiguration.java
(original)
+++
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/SynapseConfiguration.java
Tue Feb 15 07:41:29 2011
@@ -128,7 +128,7 @@ public class SynapseConfiguration implem
/**
* Messages stores for the synapse configuration.
*/
- private Map<String, MessageStore> messageStores =new
ConcurrentHashMap<String, MessageStore>();
+ private Map<String, MessageStore> messageStores = new
ConcurrentHashMap<String, MessageStore>();
/**
* Description/documentation of the configuration
Modified:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MessageStoreFactory.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MessageStoreFactory.java?rev=1070794&r1=1070793&r2=1070794&view=diff
==============================================================================
---
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MessageStoreFactory.java
(original)
+++
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MessageStoreFactory.java
Tue Feb 15 07:41:29 2011
@@ -108,6 +108,8 @@ public class MessageStoreFactory {
MessageProcessor processor = null;
if (processorElm != null) {
processor = populateMessageProcessor(processorElm);
+
+ processor.setMessageStore(messageStore);
} else {
log.warn("Creating a Message Store without ");
}
Modified:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/store/MessageStoreMediator.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/store/MessageStoreMediator.java?rev=1070794&r1=1070793&r2=1070794&view=diff
==============================================================================
---
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/store/MessageStoreMediator.java
(original)
+++
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/store/MessageStoreMediator.java
Tue Feb 15 07:41:29 2011
@@ -19,6 +19,7 @@ package org.apache.synapse.mediators.sto
import org.apache.synapse.Mediator;
import org.apache.synapse.MessageContext;
+import org.apache.synapse.core.axis2.Axis2MessageContext;
import org.apache.synapse.mediators.AbstractMediator;
import org.apache.synapse.message.store.MessageStore;
@@ -54,6 +55,13 @@ public class MessageStoreMediator extend
}
}
messageStore.store(synCtx);
+
+ // with the nio transport, this causes the listener not to
write a 202
+ // Accepted response, as this implies that Synapse does not
yet know if
+ // a 202 or 200 response would be written back.
+ ((Axis2MessageContext)
synCtx).getAxis2MessageContext().getOperationContext().setProperty(
+ org.apache.axis2.Constants.RESPONSE_WRITTEN, "SKIP");
+
return true;
}
}
Added:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingJob.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingJob.java?rev=1070794&view=auto
==============================================================================
---
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingJob.java
(added)
+++
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingJob.java
Tue Feb 15 07:41:29 2011
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.synapse.message.processors.sampler;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.synapse.Mediator;
+import org.apache.synapse.MessageContext;
+import org.apache.synapse.message.store.MessageStore;
+import org.quartz.Job;
+import org.quartz.JobDataMap;
+import org.quartz.JobExecutionContext;
+import org.quartz.JobExecutionException;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+
+public class SamplingJob implements Job {
+ private static Log log = LogFactory.getLog(SamplingJob.class);
+
+ public void execute(JobExecutionContext jobExecutionContext) throws
JobExecutionException {
+ JobDataMap jdm = jobExecutionContext.getMergedJobDataMap();
+
+ final Object concurrency = jdm.get(SamplingProcessor.CONCURRENCY);
+ final MessageStore messageStore = (MessageStore)
jdm.get(SamplingProcessor.MESSAGE_STORE);
+ final ExecutorService executor = (ExecutorService)
jdm.get(SamplingProcessor.EXECUTOR);
+ final String sequence = (String) jdm.get(SamplingProcessor.SEQUENCE);
+
+ int conc = 1;
+ if (concurrency instanceof Integer) {
+ conc = (Integer) concurrency;
+ }
+
+ for (int i = 0; i < conc; i++) {
+ List<MessageContext> list = messageStore.getMessages(0, 0);
+
+ if (list != null && list.size() == 1) {
+ final MessageContext messageContext = list.get(0);
+ if (messageContext != null) {
+ messageStore.unstore(0, 0);
+ executor.submit(new Runnable() {
+ public void run() {
+ try {
+ Mediator processingSequence =
messageContext.getSequence(sequence);
+ if (processingSequence != null) {
+ processingSequence.mediate(messageContext);
+ }
+ } catch (Throwable t) {
+ log.error("Error occurred while executing the
message", t);
+ }
+ }
+ });
+ }
+ }
+ }
+ }
+}
Added:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingProcessor.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingProcessor.java?rev=1070794&view=auto
==============================================================================
---
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingProcessor.java
(added)
+++
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingProcessor.java
Tue Feb 15 07:41:29 2011
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.synapse.message.processors.sampler;
+
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.synapse.ManagedLifecycle;
+import org.apache.synapse.Mediator;
+import org.apache.synapse.MessageContext;
+import org.apache.synapse.SynapseException;
+import org.apache.synapse.core.SynapseEnvironment;
+import org.apache.synapse.message.processors.MessageProcessor;
+import org.apache.synapse.message.store.MessageStore;
+import org.quartz.*;
+import org.quartz.impl.StdSchedulerFactory;
+
+import java.text.ParseException;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+
+public class SamplingProcessor implements MessageProcessor, ManagedLifecycle{
+ public static final String EXECUTOR = "Executor";
+ public static final String MESSAGE_STORE = "MESSAGE_STORE";
+ private Log log = LogFactory.getLog(SamplingProcessor.class);
+
+ private final String QUARTZ_CONF = "quartz.conf";
+
+ public static final String INTERVAL = "interval";
+
+ public static final String CRON_EXPRESSION = "cronExpression";
+
+ public static final String CONCURRENCY = "concurrency";
+
+ public static final String SEQUENCE = "sequence";
+
+ private String cronExpression = null;
+
+ private long interval = 1;
+
+ private String quartzConf = null;
+
+ private Scheduler scheduler = null;
+
+ private boolean initialized = false;
+
+ private MessageStore messageStore = null;
+
+ private Mediator onProcessSequence = null;
+
+ private Mediator onSubmitSequence = null;
+
+ private int concurrency = 1;
+
+ private ExecutorService executor = null;
+
+ private String sequence = null;
+ /**
+ * Creates a Quartz Scheduler and schedule the message processing logic.
+ */
+ public void start() {
+ Trigger trigger;
+ if (cronExpression == null || "".equals(cronExpression)) {
+ trigger =
TriggerUtils.makeImmediateTrigger(SimpleTrigger.REPEAT_INDEFINITELY, interval);
+ } else {
+ CronTrigger cronTrigger = new CronTrigger();
+ try {
+ cronTrigger.setCronExpression(cronExpression);
+ trigger = cronTrigger;
+ } catch (ParseException e) {
+ throw new SynapseException("Error setting cron expression : " +
+ e.getMessage() + cronExpression, e);
+ }
+ }
+ trigger.setName(messageStore.getName() + "-trigger");
+
+ JobDetail jobDetail = new JobDetail();
+ jobDetail.setName(messageStore.getName() + "-job");
+ jobDetail.setJobClass(SamplingJob.class);
+
+ JobDataMap jobDataMap = new JobDataMap();
+ jobDataMap.put(CONCURRENCY, concurrency);
+ jobDataMap.put(EXECUTOR, executor);
+ jobDataMap.put(MESSAGE_STORE, messageStore);
+ jobDataMap.put(SEQUENCE, sequence);
+
+ jobDetail.setJobDataMap(jobDataMap);
+
+ StdSchedulerFactory sf = new StdSchedulerFactory();
+ try {
+ if (quartzConf != null && !"".equals(quartzConf)) {
+ if (log.isDebugEnabled()) {
+ log.debug("Initiating a Scheduler with configuration : " +
quartzConf);
+ }
+
+ sf.initialize(quartzConf);
+ }
+ } catch (SchedulerException e) {
+ throw new SynapseException("Error initiating scheduler factory "
+ + sf + "with configuration loaded from " + quartzConf, e);
+ }
+
+
+ try {
+ scheduler = sf.getScheduler();
+ } catch (SchedulerException e) {
+ throw new SynapseException("Error getting a scheduler instance
form scheduler" +
+ " factory " + sf, e);
+ }
+
+ try {
+ scheduler.start();
+
+ scheduler.scheduleJob(jobDetail, trigger);
+
+ initialized = true;
+ } catch (SchedulerException e) {
+ throw new SynapseException("Error scheduling job : " + jobDetail
+ + " with trigger " + trigger);
+ }
+ }
+
+ public void stop() {
+ if (initialized) {
+ try {
+ if (scheduler != null && scheduler.isStarted()) {
+ if (log.isDebugEnabled()) {
+ log.debug("ShuttingDown Sampling Scheduler : " +
scheduler.getMetaData());
+ }
+ scheduler.shutdown();
+ }
+ initialized = false;
+ } catch (SchedulerException e) {
+ throw new SynapseException("Error ShuttingDown Sampling
scheduler ", e);
+ }
+ }
+ }
+
+ public void setMessageStore(MessageStore messageStore) {
+ this.messageStore = messageStore;
+ }
+
+ public MessageStore getMessageStore() {
+ return messageStore;
+ }
+
+ public void setOnProcessSequence(Mediator mediator) {
+ this.onProcessSequence = mediator;
+ }
+
+ public Mediator getOnProcessSequence() {
+ return onProcessSequence;
+ }
+
+ public void setOnSubmitSequence(Mediator mediator) {
+ this.onSubmitSequence = mediator;
+ }
+
+ public Mediator getOnSubmitSequence() {
+ return onSubmitSequence;
+ }
+
+ public void setParameters(Map<String, Object> parameters) {
+ Object o = parameters.get(CRON_EXPRESSION);
+ if (o != null) {
+ cronExpression = o.toString();
+ }
+
+ o = parameters.get(INTERVAL);
+ if (o != null) {
+ interval = Integer.parseInt(o.toString());
+ }
+
+ o = parameters.get(CONCURRENCY);
+ if (o != null) {
+ concurrency = Integer.parseInt(o.toString());
+ }
+
+ o = parameters.get(QUARTZ_CONF);
+ if (o != null) {
+ quartzConf = o.toString();
+ }
+
+ o = parameters.get(SEQUENCE);
+ if (o != null) {
+ sequence = o.toString();
+ }
+ }
+
+ public Map<String, Object> getParameters() {
+ return null;
+ }
+
+ public boolean isStarted() {
+ return initialized;
+ }
+
+ public void init(SynapseEnvironment se) {
+ executor = se.getExecutorService();
+ }
+
+ public void destroy() {
+
+ }
+}
Modified:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/AbstractMessageStore.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/AbstractMessageStore.java?rev=1070794&r1=1070793&r2=1070794&view=diff
==============================================================================
---
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/AbstractMessageStore.java
(original)
+++
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/AbstractMessageStore.java
Tue Feb 15 07:41:29 2011
@@ -79,6 +79,9 @@ public abstract class AbstractMessageSto
public void init(SynapseEnvironment se) {
this.synapseEnvironment = se;
this.synapseConfiguration =
synapseEnvironment.getSynapseConfiguration();
+ if (processor instanceof ManagedLifecycle) {
+ ((ManagedLifecycle) processor).init(se);
+ }
}
public String getName() {
Modified:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/InMemoryMessageStore.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/InMemoryMessageStore.java?rev=1070794&r1=1070793&r2=1070794&view=diff
==============================================================================
---
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/InMemoryMessageStore.java
(original)
+++
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/InMemoryMessageStore.java
Tue Feb 15 07:41:29 2011
@@ -81,9 +81,9 @@ public class InMemoryMessageStore extend
public List<MessageContext> unstore(int from, int to) {
List<MessageContext> returnlist = new ArrayList<MessageContext>();
- if (from <= to && (from <= messageList.size() && to <=
messageList.size())) {
+ if (from <= to && (from <= messageList.size() && to <=
messageList.size()) && messageList.size() > 0) {
- String[] keys = messageList.keySet().toArray(new String[0]);
+ String[] keys = messageList.keySet().toArray(new
String[messageList.keySet().size()]);
for (int i = from; i <= to; i++) {
returnlist.add(messageList.remove(keys[i]));
@@ -93,15 +93,15 @@ public class InMemoryMessageStore extend
}
public List<MessageContext> getMessages(int from, int to) {
- List<MessageContext> returnlist = new ArrayList<MessageContext>();
- if (from <= to && (from <= messageList.size() && to <=
messageList.size())) {
- String[] keys = messageList.keySet().toArray(new String[0]);
+ List<MessageContext> returnList = new ArrayList<MessageContext>();
+ if (from <= to && (from <= messageList.size() && to <=
messageList.size()) && messageList.size() > 0) {
+ String[] keys = messageList.keySet().toArray(new
String[messageList.keySet().size()]);
for (int i = from; i <= to; i++) {
- returnlist.add(messageList.get(keys[i]));
+ returnList.add(messageList.get(keys[i]));
}
}
- return returnlist;
+ return returnList;
}
public List<MessageContext> getAllMessages() {