Author: charith
Date: Sun Feb 20 05:53:05 2011
New Revision: 1072505
URL: http://svn.apache.org/viewvc?rev=1072505&view=rev
Log:
standardizing message store api
Modified:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/AbstractMessageStore.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/InMemoryMessageStore.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/MessageStore.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/MessageStoreView.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/MessageStoreViewMBean.java
Modified:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/AbstractMessageStore.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/AbstractMessageStore.java?rev=1072505&r1=1072504&r2=1072505&view=diff
==============================================================================
---
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/AbstractMessageStore.java
(original)
+++
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/AbstractMessageStore.java
Sun Feb 20 05:53:05 2011
@@ -19,12 +19,9 @@
package org.apache.synapse.message.store;
-import org.apache.synapse.Mediator;
-import org.apache.synapse.MessageContext;
import org.apache.synapse.commons.jmx.MBeanRegistrar;
import org.apache.synapse.config.SynapseConfiguration;
import org.apache.synapse.core.SynapseEnvironment;
-import org.apache.synapse.message.processors.MessageProcessor;
import java.util.Map;
import java.util.concurrent.locks.Lock;
@@ -76,18 +73,10 @@ public abstract class AbstractMessageSto
protected Lock lock = new ReentrantLock();
- /**
- * Message processor instance associated with the MessageStore
- */
- protected MessageProcessor processor;
public void init(SynapseEnvironment se) {
this.synapseEnvironment = se;
this.synapseConfiguration =
synapseEnvironment.getSynapseConfiguration();
-
- processor.init(se);
-
- processor.start();
}
public String getName() {
@@ -102,25 +91,7 @@ public abstract class AbstractMessageSto
}
- protected void mediateSequence(MessageContext synCtx) {
-
- if (sequence != null && synCtx != null) {
- Mediator seq = synCtx.getSequence(sequence);
- if (seq != null) {
- seq.mediate(synCtx);
- }
- }
- }
-
- public void setMessageProcessor(MessageProcessor messageProcessor) {
- this.processor = messageProcessor;
- }
-
- public MessageProcessor getMessageProcessor() {
- return processor;
- }
-
- public int getSize() {
+ public int size() {
return -1;
}
@@ -132,9 +103,7 @@ public abstract class AbstractMessageSto
return sequence;
}
- public void setConfiguration(SynapseConfiguration configuration) {
- this.synapseConfiguration = configuration;
- }
+
public Map<String, Object> getParameters() {
return parameters;
@@ -144,14 +113,8 @@ public abstract class AbstractMessageSto
this.parameters = parameters;
}
- public String getProviderClass() {
- return this.getClass().getName();
- }
public void destroy() {
- processor.stop();
-
- processor.destroy();
}
public void setDescription(String description) {
Modified:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/InMemoryMessageStore.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/InMemoryMessageStore.java?rev=1072505&r1=1072504&r2=1072505&view=diff
==============================================================================
---
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/InMemoryMessageStore.java
(original)
+++
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/InMemoryMessageStore.java
Sun Feb 20 05:53:05 2011
@@ -40,112 +40,88 @@ public class InMemoryMessageStore extend
private Lock lock = new ReentrantLock();
- public void store(MessageContext messageContext) {
+ public boolean offer(MessageContext messageContext) {
lock.lock();
try {
if (messageContext != null) {
- mediateSequence(messageContext);
messageList.put(messageContext.getMessageID(), messageContext);
if (log.isDebugEnabled()) {
- log.debug("Message " + messageContext.getMessageID() +
- " has been stored");
- }
-
- if(processor != null && !processor.isStarted()) {
-
- if(log.isDebugEnabled()) {
- log.debug("Starting Message processor " +
processor.getClass().getName());
- }
- processor.start();
+ log.debug("Message with id " +
messageContext.getMessageID() +
+ " stored");
}
}
} finally {
lock.unlock();
}
+
+ return true;
}
- public MessageContext unstore(String messageID) {
+ public MessageContext poll() {
lock.lock();
+ MessageContext context;
try {
- if (messageID != null) {
- return messageList.remove(messageID);
+ context = peek();
+ if(context !=null) {
+ messageList.remove(context.getMessageID());
}
} finally {
lock.unlock();
}
- return null;
+ return context;
}
- public List<MessageContext> unstoreAll() {
- lock.lock();
- try {
- List<MessageContext> returnList = new ArrayList<MessageContext>();
- for (String k : messageList.keySet()) {
- returnList.add(messageList.remove(k));
- }
- return returnList;
- } finally {
- lock.unlock();
+ public MessageContext peek() {
+ if (messageList.size() > 0) {
+ return (MessageContext) messageList.values().toArray()[0];
}
+
+ return null;
}
- public List<MessageContext> unstore(int maxNumberOfMessages) {
- lock.lock();
- try {
- List<MessageContext> returnList = new ArrayList<MessageContext>();
- Iterator<String> it = messageList.keySet().iterator();
- while (it.hasNext() && maxNumberOfMessages > 0) {
- returnList.add(messageList.get(it.next()));
- maxNumberOfMessages--;
- }
+ public MessageContext remove() throws NoSuchElementException {
+ MessageContext context = poll();
+ if(context == null) {
+ throw new NoSuchElementException();
+ }
- return returnList;
- } finally {
- lock.unlock();
+ return context;
+
+ }
+
+ public MessageContext get(int index) {
+ if(index >=0 && index < messageList.size()) {
+ return (MessageContext) messageList.values().toArray()[index];
}
+ return null;
}
- public List<MessageContext> unstore(int from, int to) {
+ public MessageContext remove(String messageID) {
lock.lock();
try {
- List<MessageContext> returnlist = new ArrayList<MessageContext>();
- if (from <= to && (from <= messageList.size() && to <=
messageList.size()) &&
- messageList.size() > 0) {
-
- String[] keys = messageList.keySet().toArray(
- new String[messageList.keySet().size()]);
-
- for (int i = from; i <= to; i++) {
- returnlist.add(messageList.remove(keys[i]));
- }
+ if (messageID != null) {
+ return messageList.remove(messageID);
}
- return returnlist;
} finally {
lock.unlock();
}
+ return null;
}
- public List<MessageContext> getMessages(int from, int to) {
+ public void clear() {
lock.lock();
try {
- List<MessageContext> returnList = new ArrayList<MessageContext>();
- if (from <= to && (from <= messageList.size() && to <=
messageList.size()) &&
- messageList.size() > 0) {
- String[] keys = messageList.keySet().toArray(
- new String[messageList.keySet().size()]);
- for (int i = from; i <= to; i++) {
- returnList.add(messageList.get(keys[i]));
- }
+ for (String k : messageList.keySet()) {
+ messageList.remove(k);
}
- return returnList;
} finally {
lock.unlock();
}
}
- public List<MessageContext> getAllMessages() {
+ public List<MessageContext> getAll() {
lock.lock();
try {
List<MessageContext> returnList = new ArrayList<MessageContext>();
@@ -158,7 +134,7 @@ public class InMemoryMessageStore extend
}
}
- public MessageContext getMessage(String messageId) {
+ public MessageContext get(String messageId) {
lock.lock();
try {
if (messageId != null) {
@@ -171,12 +147,7 @@ public class InMemoryMessageStore extend
}
- public int getSize() {
- lock.lock();
- try {
- return messageList.size();
- } finally {
- lock.unlock();
- }
+ public int size() {
+ return messageList.size();
}
}
\ No newline at end of file
Modified:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/MessageStore.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/MessageStore.java?rev=1072505&r1=1072504&r2=1072505&view=diff
==============================================================================
---
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/MessageStore.java
(original)
+++
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/MessageStore.java
Sun Feb 20 05:53:05 2011
@@ -28,65 +28,92 @@ import org.apache.synapse.message.proces
import java.util.List;
import java.util.Map;
+import java.util.NoSuchElementException;
/**
* This is the interface for the Synapse Message Store
- * Message Store is used to store failed Messages.
+ * Message Store is used to store Messages.
*/
public interface MessageStore extends SynapseArtifact, Nameable,
ManagedLifecycle {
+
+
/**
- * Store the Message in the Message Store
+ * Inserts the Message into this store if it is possible to do so immediately
+ * without violating capacity restrictions.
* @param messageContext MessageContext to be saved
*/
- public void store(MessageContext messageContext);
+ public boolean offer(MessageContext messageContext);
+
+ /**
+ * Retrieves and removes the first Message in this store.
+ * Message ordering will depend on the underlying implementation
+ * @return first message context in the store
+ */
+ public MessageContext poll();
+
+ /**
+ * Retrieves, but does not remove, the first Message in this store.
+ * Message ordering will depend on the underlying implementation
+ *
+ * @return first message context in the store
+ */
+ public MessageContext peek();
+
+
+ /**
+ * Retrieves and removes the first Message in this store.
+ * Message ordering will depend on the underlying implementation
+ *
+ * @return first message context in the store
+ * @throws NoSuchElementException if store is empty
+ */
+ public MessageContext remove() throws NoSuchElementException;
+
+ /**
+ * Delete all the Messages in the Message Store
+ *
+ */
+ public void clear();
+
/**
* Delete and return the MessageContext with given Message id
* @param messageID message id of the Message
* @return MessageContext instance
*/
- public MessageContext unstore(String messageID);
+ public MessageContext remove(String messageID);
+
/**
- * Delete all the Messages in the Message Store
- * @return List of all messages in store
+ * Returns the number of Messages in this store.
+ * @return the number of Messages in this Store
*/
- public List<MessageContext> unstoreAll();
-
+ public int size();
/**
- * Unstore Messages from index 'from' to index 'to'
- * Message ordering will be depend on the implementation
- * @param from start index
- * @param to stop index
- * @return list of messages that are belong to given range
+ * Returns the Message at the given index position
+ * (this may depend on the implementation)
+ * @param index position of the message
+ * @return Message in given index position
*/
- public List<MessageContext> unstore(int from , int to);
+ public MessageContext get(int index);
/**
* Get all the messages in the Message store without removing them from the queue
* @return List of all Messages
*/
- public List<MessageContext> getAllMessages();
+ public List<MessageContext> getAll();
/**
* Get the Message with the given ID from the Message store without removing it
* @param messageId A message ID string
* @return Message with given ID
*/
- public MessageContext getMessage(String messageId);
+ public MessageContext get(String messageId);
/**
- * Get Messages from index 'from' to index 'to'
- * Message ordering will be depend on the implementation
- * @param from start index
- * @param to stop index
- * @return list of messages that are belong to given range
- */
- public List<MessageContext> getMessages(int from , int to);
- /**
* set the implementation specific parameters
* @param parameters A map of parameters or null
*/
@@ -98,11 +125,6 @@ public interface MessageStore extends Sy
*/
public Map<String,Object> getParameters();
- /**
- * return the number of Messages stored in the Message store
- * @return the number of messages in the store
- */
- public int getSize();
/**
* set a Mediator sequence name
@@ -112,24 +134,11 @@ public interface MessageStore extends Sy
public void setSequence(String sequence);
/**
- * get the implementation class name of the message
- *
- * @return Name of the implementation class
- */
- public String getProviderClass ();
-
- /**
* Get Mediator sequence name
* @return Name of the sequence
*/
public String getSequence();
- /**
- * Add the Synapse configuration reference for the Message Store
- *
- * @param configuration Current SynapseConfiguration
- */
- public void setConfiguration(SynapseConfiguration configuration);
/**
* Set the name of the file that the Message store is configured
@@ -147,14 +156,6 @@ public interface MessageStore extends Sy
/**
- * Set the Message Processor Associated with the Message Store
- * @param messageProcessor message processor instance associated with
message store
- */
- public void setMessageProcessor(MessageProcessor messageProcessor);
-
- /**
- * Get the Message Processor associated with the MessageStore
- * @return message processor instance associated with the message store
+ * TODO: Add observer API
*/
- public MessageProcessor getMessageProcessor();
}
\ No newline at end of file
Modified:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/MessageStoreView.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/MessageStoreView.java?rev=1072505&r1=1072504&r2=1072505&view=diff
==============================================================================
---
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/MessageStoreView.java
(original)
+++
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/MessageStoreView.java
Sun Feb 20 05:53:05 2011
@@ -41,14 +41,14 @@ public class MessageStoreView implements
public void deleteAll() {
- messageStore.unstoreAll();
+ messageStore.clear();
log.info("All messages in Message Store" + messageStoreName + " were
deleted");
}
public List<String> getMessageIds() {
List<String> returnList = new ArrayList<String>();
- List<MessageContext> list = messageStore.getAllMessages();
+ List<MessageContext> list = messageStore.getAll();
for(MessageContext m : list) {
returnList.add(m.getMessageID());
@@ -58,7 +58,7 @@ public class MessageStoreView implements
public void delete(String messageID) {
if(messageID != null) {
- MessageContext m =messageStore.unstore(messageID);
+ MessageContext m =messageStore.remove(messageID);
if (m != null){
log.info("Message with ID :" + messageID + " removed from the
MessageStore");
}
@@ -67,7 +67,7 @@ public class MessageStoreView implements
public String getEnvelope(String messageID) {
if (messageID != null) {
- MessageContext m = messageStore.getMessage(messageID);
+ MessageContext m = messageStore.get(messageID);
if (m != null) {
return m.getEnvelope().toString();
@@ -77,10 +77,7 @@ public class MessageStoreView implements
}
public int getSize() {
- return messageStore.getSize();
+ return messageStore.size();
}
- public void delete(int maxCount) {
- messageStore.unstore(0,maxCount-1);
- }
}
Modified:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/MessageStoreViewMBean.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/MessageStoreViewMBean.java?rev=1072505&r1=1072504&r2=1072505&view=diff
==============================================================================
---
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/MessageStoreViewMBean.java
(original)
+++
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/MessageStoreViewMBean.java
Sun Feb 20 05:53:05 2011
@@ -30,12 +30,6 @@ public interface MessageStoreViewMBean {
/**
- * Delete given number of Messages from the MessageStore
- * @param maxCount
- */
- public void delete(int maxCount);
-
- /**
* Get the Message IDs of all stored Messages in the Message store
*
* @return a list of message ID values