Author: supun
Date: Tue Feb 15 07:41:29 2011
New Revision: 1070794

URL: http://svn.apache.org/viewvc?rev=1070794&view=rev
Log:
addins sampling message processor

Added:
    
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/
    
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingJob.java
    
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingProcessor.java
Modified:
    
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/SynapseConfiguration.java
    
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MessageStoreFactory.java
    
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/store/MessageStoreMediator.java
    
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/AbstractMessageStore.java
    
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/InMemoryMessageStore.java

Modified: 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/SynapseConfiguration.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/SynapseConfiguration.java?rev=1070794&r1=1070793&r2=1070794&view=diff
==============================================================================
--- 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/SynapseConfiguration.java
 (original)
+++ 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/SynapseConfiguration.java
 Tue Feb 15 07:41:29 2011
@@ -128,7 +128,7 @@ public class SynapseConfiguration implem
     /**
      * Messages stores for the synapse configuration.
      */
-    private Map<String, MessageStore> messageStores =new 
ConcurrentHashMap<String, MessageStore>();
+    private Map<String, MessageStore> messageStores = new 
ConcurrentHashMap<String, MessageStore>();
 
     /**
      * Description/documentation of the configuration

Modified: 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MessageStoreFactory.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MessageStoreFactory.java?rev=1070794&r1=1070793&r2=1070794&view=diff
==============================================================================
--- 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MessageStoreFactory.java
 (original)
+++ 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MessageStoreFactory.java
 Tue Feb 15 07:41:29 2011
@@ -108,6 +108,8 @@ public class MessageStoreFactory {
         MessageProcessor processor = null;
         if (processorElm != null) {
             processor = populateMessageProcessor(processorElm);
+
+            processor.setMessageStore(messageStore);
         } else {
             log.warn("Creating a Message Store without ");
         }

Modified: 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/store/MessageStoreMediator.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/store/MessageStoreMediator.java?rev=1070794&r1=1070793&r2=1070794&view=diff
==============================================================================
--- 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/store/MessageStoreMediator.java
 (original)
+++ 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/store/MessageStoreMediator.java
 Tue Feb 15 07:41:29 2011
@@ -19,6 +19,7 @@ package org.apache.synapse.mediators.sto
 
 import org.apache.synapse.Mediator;
 import org.apache.synapse.MessageContext;
+import org.apache.synapse.core.axis2.Axis2MessageContext;
 import org.apache.synapse.mediators.AbstractMediator;
 import org.apache.synapse.message.store.MessageStore;
 
@@ -54,6 +55,13 @@ public class MessageStoreMediator extend
                     }
                 }
                 messageStore.store(synCtx);
+
+                // with the nio transport, this causes the listener not to 
write a 202
+                // Accepted response, as this implies that Synapse does not 
yet know if
+                // a 202 or 200 response would be written back.
+                ((Axis2MessageContext) 
synCtx).getAxis2MessageContext().getOperationContext().setProperty(
+                        org.apache.axis2.Constants.RESPONSE_WRITTEN, "SKIP");
+
                 return true;
             }
         }

Added: 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingJob.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingJob.java?rev=1070794&view=auto
==============================================================================
--- 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingJob.java
 (added)
+++ 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingJob.java
 Tue Feb 15 07:41:29 2011
@@ -0,0 +1,74 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.synapse.message.processors.sampler;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.synapse.Mediator;
+import org.apache.synapse.MessageContext;
+import org.apache.synapse.message.store.MessageStore;
+import org.quartz.Job;
+import org.quartz.JobDataMap;
+import org.quartz.JobExecutionContext;
+import org.quartz.JobExecutionException;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+
+public class SamplingJob implements Job {
+    private static Log log = LogFactory.getLog(SamplingJob.class);
+
+    public void execute(JobExecutionContext jobExecutionContext) throws 
JobExecutionException {
+        JobDataMap jdm = jobExecutionContext.getMergedJobDataMap();
+
+        final Object concurrency = jdm.get(SamplingProcessor.CONCURRENCY);
+        final MessageStore messageStore = (MessageStore) 
jdm.get(SamplingProcessor.MESSAGE_STORE);
+        final ExecutorService executor = (ExecutorService) 
jdm.get(SamplingProcessor.EXECUTOR);
+        final String sequence = (String) jdm.get(SamplingProcessor.SEQUENCE);
+
+        int conc = 1;
+        if (concurrency instanceof Integer) {
+            conc = (Integer) concurrency;
+        }
+
+        for (int i = 0; i < conc; i++) {
+            List<MessageContext> list = messageStore.getMessages(0, 0);
+
+            if (list != null && list.size() == 1) {
+                final MessageContext messageContext = list.get(0);
+                if (messageContext != null) {
+                    messageStore.unstore(0, 0);
+                    executor.submit(new Runnable() {
+                        public void run() {
+                            try {
+                                Mediator processingSequence = 
messageContext.getSequence(sequence);
+                                if (processingSequence != null) {
+                                    processingSequence.mediate(messageContext);
+                                }
+                            } catch (Throwable t) {
+                                log.error("Error occurred while executing the 
message", t);
+                            }
+                        }
+                    });
+                }
+            }
+        }
+    }
+}

Added: 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingProcessor.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingProcessor.java?rev=1070794&view=auto
==============================================================================
--- 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingProcessor.java
 (added)
+++ 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingProcessor.java
 Tue Feb 15 07:41:29 2011
@@ -0,0 +1,221 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+package org.apache.synapse.message.processors.sampler;
+
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.synapse.ManagedLifecycle;
+import org.apache.synapse.Mediator;
+import org.apache.synapse.MessageContext;
+import org.apache.synapse.SynapseException;
+import org.apache.synapse.core.SynapseEnvironment;
+import org.apache.synapse.message.processors.MessageProcessor;
+import org.apache.synapse.message.store.MessageStore;
+import org.quartz.*;
+import org.quartz.impl.StdSchedulerFactory;
+
+import java.text.ParseException;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+
+public class SamplingProcessor implements MessageProcessor, ManagedLifecycle{
+    public static final String EXECUTOR = "Executor";
+    public static final String MESSAGE_STORE = "MESSAGE_STORE";
+    private Log log = LogFactory.getLog(SamplingProcessor.class);
+
+    private final String QUARTZ_CONF = "quartz.conf";
+
+    public static final String INTERVAL = "interval";
+
+    public static final String CRON_EXPRESSION = "cronExpression";
+
+    public static final String CONCURRENCY = "concurrency";
+
+    public static final String SEQUENCE = "sequence";
+
+    private String cronExpression = null;
+
+    private long interval = 1;
+
+    private String quartzConf = null;
+
+    private Scheduler scheduler = null;
+
+    private boolean initialized = false;
+
+    private MessageStore messageStore = null;
+
+    private Mediator onProcessSequence = null;
+
+    private Mediator onSubmitSequence = null;
+
+    private int concurrency = 1;
+
+    private ExecutorService executor = null;
+
+    private String sequence = null;
+    /**
+     * Creates a Quartz Scheduler and schedule the message processing logic.
+     */
+    public void start() {
+        Trigger trigger;
+        if (cronExpression == null || "".equals(cronExpression)) {
+            trigger = 
TriggerUtils.makeImmediateTrigger(SimpleTrigger.REPEAT_INDEFINITELY, interval);
+        } else {
+            CronTrigger cronTrigger = new CronTrigger();
+            try {
+                cronTrigger.setCronExpression(cronExpression);
+                trigger = cronTrigger;
+            } catch (ParseException e) {
+                throw new SynapseException("Error setting cron expression : " +
+                        e.getMessage() + cronExpression, e);
+            }
+        }
+        trigger.setName(messageStore.getName() + "-trigger");
+
+        JobDetail jobDetail = new JobDetail();
+        jobDetail.setName(messageStore.getName() + "-job");
+        jobDetail.setJobClass(SamplingJob.class);
+
+        JobDataMap jobDataMap = new JobDataMap();
+        jobDataMap.put(CONCURRENCY, concurrency);
+        jobDataMap.put(EXECUTOR, executor);
+        jobDataMap.put(MESSAGE_STORE, messageStore);
+        jobDataMap.put(SEQUENCE, sequence);
+
+        jobDetail.setJobDataMap(jobDataMap);
+
+        StdSchedulerFactory sf = new StdSchedulerFactory();
+        try {
+            if (quartzConf != null && !"".equals(quartzConf)) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Initiating a Scheduler with configuration : " + 
quartzConf);
+                }
+
+                sf.initialize(quartzConf);
+            }
+        } catch (SchedulerException e) {
+            throw new SynapseException("Error initiating scheduler factory "
+                    + sf + "with configuration loaded from " + quartzConf, e);
+        }
+
+
+        try {
+            scheduler = sf.getScheduler();
+        } catch (SchedulerException e) {
+            throw new SynapseException("Error getting a  scheduler instance 
form scheduler" +
+                    " factory " + sf, e);
+        }
+
+        try {
+            scheduler.start();
+
+            scheduler.scheduleJob(jobDetail, trigger);
+
+            initialized = true;
+        } catch (SchedulerException e) {
+            throw new SynapseException("Error scheduling job : " + jobDetail
+                    + " with trigger " + trigger);
+        }
+    }
+
+    public void stop() {
+        if (initialized) {
+            try {
+                if (scheduler != null && scheduler.isStarted()) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("ShuttingDown Sampling Scheduler : " + 
scheduler.getMetaData());
+                    }
+                    scheduler.shutdown();
+                }
+                initialized = false;
+            } catch (SchedulerException e) {
+                throw new SynapseException("Error ShuttingDown Sampling 
scheduler ", e);
+            }
+        }
+    }
+
+    public void setMessageStore(MessageStore messageStore) {
+        this.messageStore = messageStore;
+    }
+
+    public MessageStore getMessageStore() {
+        return messageStore;
+    }
+
+    public void setOnProcessSequence(Mediator mediator) {
+        this.onProcessSequence = mediator;
+    }
+
+    public Mediator getOnProcessSequence() {
+        return onProcessSequence;
+    }
+
+    public void setOnSubmitSequence(Mediator mediator) {
+        this.onSubmitSequence = mediator;
+    }
+
+    public Mediator getOnSubmitSequence() {
+        return onSubmitSequence;
+    }
+
+    public void setParameters(Map<String, Object> parameters) {
+        Object o = parameters.get(CRON_EXPRESSION);
+        if (o != null) {
+            cronExpression = o.toString();
+        }
+
+        o = parameters.get(INTERVAL);
+        if (o != null) {
+            interval = Integer.parseInt(o.toString());
+        }
+
+        o = parameters.get(CONCURRENCY);
+        if (o != null) {
+            concurrency = Integer.parseInt(o.toString());
+        }
+
+        o = parameters.get(QUARTZ_CONF);
+        if (o != null) {
+            quartzConf = o.toString();
+        }
+
+        o = parameters.get(SEQUENCE);
+        if (o != null) {
+            sequence = o.toString();
+        }
+    }
+
+    public Map<String, Object> getParameters() {
+        return null;
+    }
+
+    public boolean isStarted() {
+        return initialized;
+    }
+
+    public void init(SynapseEnvironment se) {
+        executor = se.getExecutorService();
+    }
+
+    public void destroy() {
+
+    }
+}

Modified: 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/AbstractMessageStore.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/AbstractMessageStore.java?rev=1070794&r1=1070793&r2=1070794&view=diff
==============================================================================
--- 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/AbstractMessageStore.java
 (original)
+++ 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/AbstractMessageStore.java
 Tue Feb 15 07:41:29 2011
@@ -79,6 +79,9 @@ public abstract class AbstractMessageSto
     public void init(SynapseEnvironment se) {
         this.synapseEnvironment = se;
         this.synapseConfiguration = 
synapseEnvironment.getSynapseConfiguration();
+        if (processor instanceof ManagedLifecycle) {
+            ((ManagedLifecycle) processor).init(se);
+        }
     }
 
     public String getName() {

Modified: 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/InMemoryMessageStore.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/InMemoryMessageStore.java?rev=1070794&r1=1070793&r2=1070794&view=diff
==============================================================================
--- 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/InMemoryMessageStore.java
 (original)
+++ 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/InMemoryMessageStore.java
 Tue Feb 15 07:41:29 2011
@@ -81,9 +81,9 @@ public class InMemoryMessageStore extend
 
     public List<MessageContext> unstore(int from, int to) {
         List<MessageContext> returnlist = new ArrayList<MessageContext>();
-        if (from <= to && (from <= messageList.size() && to <= 
messageList.size())) {
+        if (from <= to && (from <= messageList.size() && to <= 
messageList.size()) && messageList.size() > 0) {
 
-            String[] keys = messageList.keySet().toArray(new String[0]);
+            String[] keys = messageList.keySet().toArray(new 
String[messageList.keySet().size()]);
 
             for (int i = from; i <= to; i++) {
                 returnlist.add(messageList.remove(keys[i]));
@@ -93,15 +93,15 @@ public class InMemoryMessageStore extend
     }
 
     public List<MessageContext> getMessages(int from, int to) {
-         List<MessageContext> returnlist = new ArrayList<MessageContext>();
-        if (from <= to && (from <= messageList.size() && to <= 
messageList.size())) {
-            String[] keys = messageList.keySet().toArray(new String[0]);
+        List<MessageContext> returnList = new ArrayList<MessageContext>();
+        if (from <= to && (from <= messageList.size() && to <= 
messageList.size()) && messageList.size() > 0) {
+            String[] keys = messageList.keySet().toArray(new 
String[messageList.keySet().size()]);
 
             for (int i = from; i <= to; i++) {
-                returnlist.add(messageList.get(keys[i]));
+                returnList.add(messageList.get(keys[i]));
             }
         }
-        return returnlist;
+        return returnList;
     }
 
     public List<MessageContext> getAllMessages() {


Reply via email to