Author: hiranya
Date: Wed Dec 7 07:08:33 2011
New Revision: 1211311
URL: http://svn.apache.org/viewvc?rev=1211311&view=rev
Log:
* Upgraded the wso2caching dependency from 3.4.0 to 3.5.0
* Updated the cache mediator to use the new caching API (CachableResponse, ServiceName, RequestHash, CacheReplicationCommand)
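* Fixed an issue in throttle mediator: for dynamic policy entries the throttle is now re-created only when the entry's version has changed (Patch from Miyuru)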
* Fixed an issue in script mediator: the Groovy and JRuby script engine factories are now registered alongside the Rhino one
Modified:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/SynapseConfiguration.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/builtin/CacheMediator.java
synapse/trunk/java/modules/extensions/src/main/java/org/apache/synapse/mediators/bsf/ScriptMediator.java
synapse/trunk/java/modules/extensions/src/main/java/org/apache/synapse/mediators/throttle/ThrottleMediator.java
synapse/trunk/java/pom.xml
Modified:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/SynapseConfiguration.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/SynapseConfiguration.java?rev=1211311&r1=1211310&r2=1211311&view=diff
==============================================================================
---
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/SynapseConfiguration.java
(original)
+++
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/SynapseConfiguration.java
Wed Dec 7 07:08:33 2011
@@ -591,7 +591,7 @@ public class SynapseConfiguration implem
* Returns the map of defined entries in the configuration excluding the
* fetched entries from remote registry.
*
- * @return Map of Entries defined in the local configuraion
+ * @return Map of Entries defined in the local configuration
*/
public Map<String, Entry> getDefinedEntries() {
Modified:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/builtin/CacheMediator.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/builtin/CacheMediator.java?rev=1211311&r1=1211310&r2=1211311&view=diff
==============================================================================
---
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/builtin/CacheMediator.java
(original)
+++
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/builtin/CacheMediator.java
Wed Dec 7 07:08:33 2011
@@ -19,10 +19,16 @@
package org.apache.synapse.mediators.builtin;
+import org.apache.axiom.om.OMAbstractFactory;
+import org.apache.axiom.soap.SOAP11Constants;
+import org.apache.axiom.soap.SOAP12Constants;
+import org.apache.axiom.soap.SOAPFactory;
import org.apache.axis2.AxisFault;
import org.apache.axis2.clustering.ClusteringFault;
import org.apache.axis2.clustering.state.Replicator;
import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.OperationContext;
+import org.apache.axis2.i18n.Messages;
import org.apache.synapse.ManagedLifecycle;
import org.apache.synapse.MessageContext;
import org.apache.synapse.SynapseException;
@@ -35,10 +41,7 @@ import org.apache.synapse.mediators.base
import org.apache.synapse.util.FixedByteArrayOutputStream;
import org.apache.synapse.util.MessageHelper;
import org.apache.axiom.soap.SOAPEnvelope;
-import org.wso2.caching.CacheManager;
-import org.wso2.caching.CachedObject;
-import org.wso2.caching.CachingConstants;
-import org.wso2.caching.CachingException;
+import org.wso2.caching.*;
import org.wso2.caching.util.SOAPMessageHelper;
import org.wso2.caching.digest.DigestGenerator;
@@ -148,10 +151,10 @@ public class CacheMediator extends Abstr
try {
if (synCtx.isResponse()) {
- processResponseMessage(synCtx, cfgCtx, synLog, cacheManager);
+ processResponseMessage(synCtx, synLog);
} else {
- result = processRequestMessage(synCtx, cfgCtx, synLog,
cacheManager);
+ result = processRequestMessage(synCtx, synLog, cacheManager);
}
} catch (ClusteringFault clusteringFault) {
@@ -169,66 +172,70 @@ public class CacheMediator extends Abstr
*
* @param synLog the Synapse log to use
* @param synCtx the current message (response)
- * @param cfgCtx the abstract context in which the cache will be
kept
- * @param cacheManager the cache manager
* @throws ClusteringFault is there is an error in replicating the cfgCtx
*/
- private void processResponseMessage(MessageContext synCtx,
ConfigurationContext cfgCtx,
- SynapseLog synLog, CacheManager cacheManager) throws ClusteringFault {
+ private void processResponseMessage(MessageContext synCtx,
+ SynapseLog synLog) throws
ClusteringFault {
if (!collector) {
handleException("Response messages cannot be handled in a non
collector cache", synCtx);
}
- String requestHash = (String)
synCtx.getProperty(CachingConstants.REQUEST_HASH);
-
- if (requestHash != null) {
+ org.apache.axis2.context.MessageContext msgCtx =
+ ((Axis2MessageContext)synCtx).getAxis2MessageContext();
+ OperationContext operationContext = msgCtx.getOperationContext();
+
+ CachableResponse response =
+ (CachableResponse)
operationContext.getPropertyNonReplicable(CachingConstants.CACHED_OBJECT);
+ if (response != null) {
if (synLog.isTraceOrDebugEnabled()) {
synLog.traceOrDebug("Storing the response message into the
cache at scope : " +
- scope + " with ID : " + cacheKey + " for request hash : "
+ requestHash);
+ scope + " with ID : " + cacheKey + " for request hash
: " + response.getRequestHash());
}
-
- CachedObject cachedObj = cacheManager.getResponseForKey(cacheKey,
requestHash, cfgCtx);
- if (cachedObj != null) {
-
- if (synLog.isTraceOrDebugEnabled()) {
- synLog.traceOrDebug("Storing the response for the message
with ID : " +
+ if (synLog.isTraceOrDebugEnabled()) {
+ synLog.traceOrDebug("Storing the response for the message with
ID : " +
synCtx.getMessageID() + " with request hash ID : " +
- cachedObj.getRequestHash() + " in the cache : " +
cacheKey);
- }
-
- ByteArrayOutputStream outStream = new ByteArrayOutputStream();
- try {
-
MessageHelper.cloneSOAPEnvelope(synCtx.getEnvelope()).serialize(outStream);
- cachedObj.setResponseEnvelope(outStream.toByteArray());
- } catch (XMLStreamException e) {
- handleException("Unable to set the response to the Cache",
e, synCtx);
- }
+ response.getRequestHash() + " in the cache : " +
cacheKey);
+ }
- /* this is not required yet, can commented this for perf
improvements
- in the future there can be a situation where user sends the
request
- with the response hash (if client side caching is on) in
which case
- we can compare that response hash with the given response
hash and
- respond with not-modified http header */
- // cachedObj.setResponseHash(cache.getGenerator().getDigest(
- // ((Axis2MessageContext)
synCtx).getAxis2MessageContext()));
+ ByteArrayOutputStream outStream = new ByteArrayOutputStream();
+ try {
+ synCtx.getEnvelope().serialize(outStream);
+ response.setResponseEnvelope(outStream.toByteArray());
+ } catch (XMLStreamException e) {
+ handleException("Unable to set the response to the Cache", e,
synCtx);
+ }
- if (cachedObj.getTimeout() > 0) {
- cachedObj.setExpireTimeMillis(System.currentTimeMillis() +
cachedObj.getTimeout());
- }
+ // this is not required yet, can commented this for perf
improvements
+ // in the future there can be a situation where user sends the
request
+ // with the response hash (if client side caching is on) in which
case
+ // we can compare that response hash with the given response hash
and
+ // respond with not-modified http header */
+ // cachedObj.setResponseHash(cache.getGenerator().getDigest(
+ // ((Axis2MessageContext) synCtx).getAxis2MessageContext()));
- cfgCtx.setProperty(CachingConstants.CACHE_MANAGER,
cacheManager);
-// Replicator.replicate(cfgCtx, new String[]{cacheManagerKey});
- Replicator.replicate(cfgCtx);
- } else {
- synLog.auditWarn("A response message without a valid mapping
to the " +
- "request hash found. Unable to store the response in
cache");
+ if (response.getTimeout() > 0) {
+ response.setExpireTimeMillis(System.currentTimeMillis() +
response.getTimeout());
}
+ // Finally, we may need to replicate the changes in the cache
+ CacheReplicationCommand cacheReplicationCommand
+ = (CacheReplicationCommand)
msgCtx.getPropertyNonReplicable(
+ CachingConstants.STATE_REPLICATION_OBJECT);
+ if (cacheReplicationCommand != null) {
+ try {
+ Replicator.replicateState(cacheReplicationCommand,
+ msgCtx.getRootContext().getAxisConfiguration());
+ } catch (ClusteringFault clusteringFault) {
+ log.error("Cannot replicate cache changes");
+ }
+ }
} else {
- synLog.auditWarn("A response message without a mapping to the " +
- "request hash found. Unable to store the response in cache");
+ synLog.auditWarn("A response message without a valid mapping to
the " +
+ "request hash found. Unable to store the response in
cache");
}
+
+
}
/**
@@ -237,18 +244,19 @@ public class CacheMediator extends Abstr
* this message as a response and sends back directly to client.
*
* @param synCtx incoming request message
- * @param cfgCtx the AbstractContext in which the cache will be
kept
* @param synLog the Synapse log to use
* @param cacheManager the cache manager
* @return should this mediator terminate further processing?
* @throws ClusteringFault if there is an error in replicating the cfgCtx
*/
- private boolean processRequestMessage(MessageContext synCtx,
ConfigurationContext cfgCtx,
+ private boolean processRequestMessage(MessageContext synCtx,
SynapseLog synLog, CacheManager cacheManager) throws ClusteringFault {
if (collector) {
handleException("Request messages cannot be handled in a collector
cache", synCtx);
}
+ OperationContext opCtx =
((Axis2MessageContext)synCtx).getAxis2MessageContext().
+ getOperationContext();
String requestHash = null;
try {
@@ -263,37 +271,49 @@ public class CacheMediator extends Abstr
synLog.traceOrDebug("Generated request hash : " + requestHash);
}
- if (cacheManager.containsKey(cacheKey, requestHash) &&
- cacheManager.getResponseForKey(cacheKey, requestHash, cfgCtx) !=
null) {
+ ServiceName service;
+ if (id != null) {
+ service = new ServiceName(id);
+ } else {
+ service = new ServiceName(cacheKey);
+ }
+
+ RequestHash hash = new RequestHash(requestHash);
+ CachableResponse cachedResponse =
+ cacheManager.getCachedResponse(service, hash);
+
+ org.apache.axis2.context.MessageContext msgCtx =
((Axis2MessageContext)synCtx).getAxis2MessageContext();
+ opCtx.setNonReplicableProperty(CachingConstants.REQUEST_HASH,
requestHash);
+ CacheReplicationCommand cacheReplicationCommand = new
CacheReplicationCommand();
+ if (cachedResponse != null) {
// get the response from the cache and attach to the context and
change the
// direction of the message
- CachedObject cachedObj = cacheManager.getResponseForKey(cacheKey,
requestHash, cfgCtx);
-
- if (!cachedObj.isExpired() && cachedObj.getResponseEnvelope() !=
null) {
-
+ if (!cachedResponse.isExpired()) {
if (synLog.isTraceOrDebugEnabled()) {
synLog.traceOrDebug("Cache-hit for message ID : " +
synCtx.getMessageID());
}
-
+ cachedResponse.setInUse(true);
// mark as a response and replace envelope from cache
synCtx.setResponse(true);
- try {
- SOAPEnvelope omSOAPEnv =
SOAPMessageHelper.buildSOAPEnvelopeFromBytes(
- cachedObj.getResponseEnvelope());
-
- // todo: if there is a WSA messageID in the response, is
that need to be unique on each and every resp
+ opCtx.setNonReplicableProperty(CachingConstants.CACHED_OBJECT,
cachedResponse);
- synCtx.setEnvelope(omSOAPEnv);
+ SOAPEnvelope omSOAPEnv;
+ try {
+ omSOAPEnv = SOAPMessageHelper.buildSOAPEnvelopeFromBytes(
+ cachedResponse.getResponseEnvelope());
+ if (omSOAPEnv != null) {
+ synCtx.setEnvelope(omSOAPEnv);
+ }
} catch (AxisFault axisFault) {
handleException("Error setting response envelope from
cache : "
- + cacheKey, synCtx);
+ + cacheKey, synCtx);
} catch (IOException ioe) {
handleException("Error setting response envelope from
cache : "
- + cacheKey, ioe, synCtx);
+ + cacheKey, ioe, synCtx);
} catch (SOAPException soape) {
handleException("Error setting response envelope from
cache : "
- + cacheKey, soape, synCtx);
+ + cacheKey, soape, synCtx);
}
// take specified action on cache hit
@@ -307,7 +327,7 @@ public class CacheMediator extends Abstr
if (synLog.isTraceOrDebugEnabled()) {
synLog.traceOrDebug("Delegating message to the
onCachingHit " +
- "sequence : " + onCacheHitRef);
+ "sequence : " + onCacheHitRef);
}
synCtx.getSequence(onCacheHitRef).mediate(synCtx);
@@ -315,7 +335,7 @@ public class CacheMediator extends Abstr
if (synLog.isTraceOrDebugEnabled()) {
synLog.traceOrDebug("Request message " +
synCtx.getMessageID() +
- " was served from the cache : " + cacheKey);
+ " was served from the cache : " + cacheKey);
}
// send the response back if there is not onCacheHit is
specified
synCtx.setTo(null);
@@ -325,54 +345,52 @@ public class CacheMediator extends Abstr
return false;
} else {
- // cache exists, but has expired...
- cachedObj.expire();
- cachedObj.setTimeout(timeout);
- synLog.traceOrDebug("Existing cached response has expired.
Reset cache element");
-
- cfgCtx.setProperty(CachingConstants.CACHE_MANAGER,
cacheManager);
-// Replicator.replicate(cfgCtx, new String[]{cacheManagerKey});
- Replicator.replicate(cfgCtx);
- }
+ cachedResponse.reincarnate(timeout);
+ if (synLog.isTraceOrDebugEnabled()) {
+ synLog.traceOrDebug("Existing cached response has expired.
Reset cache element");
+ }
+ cacheManager.cacheResponse(service, hash, cachedResponse,
cacheReplicationCommand);
+ opCtx.setNonReplicableProperty(CachingConstants.CACHED_OBJECT,
cachedResponse);
+
opCtx.setNonReplicableProperty(CachingConstants.STATE_REPLICATION_OBJECT,
+ cacheReplicationCommand);
+ Replicator.replicate(opCtx);
+ }
} else {
-
// if not found in cache, check if we can cache this request
- if (cacheManager.getCacheKeys(cacheKey).size() ==
inMemoryCacheSize) {
- cacheManager.removeExpiredResponses(cacheKey, cfgCtx);
- if (cacheManager.getCacheKeys(cacheKey).size() ==
inMemoryCacheSize) {
- synLog.traceOrDebug("In-memory cache is full. Unable to
cache");
- } else {
- storeRequestToCache(cfgCtx, requestHash, cacheManager);
+ if (cacheManager.getCacheSize(service) >= inMemoryCacheSize) { //
If cache is full
+ cacheManager.removeExpiredResponses(service,
cacheReplicationCommand); // try to remove expired responses
+ if (cacheManager.getCacheSize(service) >= inMemoryCacheSize) {
// recheck if there is space
+ if (log.isDebugEnabled()) {
+ log.debug("In-memory cache is full. Unable to cache");
+ }
+ } else { // if we managed to free up some space in the cache.
Need state replication
+ cacheNewResponse(msgCtx, service, hash, cacheManager,
+ cacheReplicationCommand);
}
- } else {
- storeRequestToCache(cfgCtx, requestHash, cacheManager);
+ } else { // if there is more space in the cache. Need state
replication
+ cacheNewResponse(msgCtx, service, hash, cacheManager,
+ cacheReplicationCommand);
}
}
+
return true;
}
- /**
- * Store request message to the cache
- *
- * @param cfgCtx - the Abstract context in which the cache will be
kept
- * @param requestHash - the request hash that has already been computed
- * @param cacheManager - the cache
- * @throws ClusteringFault if there is an error in replicating the cfgCtx
- */
- private void storeRequestToCache(ConfigurationContext cfgCtx,
- String requestHash, CacheManager cacheManager) throws ClusteringFault {
+ private void cacheNewResponse(org.apache.axis2.context.MessageContext
msgContext,
+ ServiceName serviceName, RequestHash
requestHash,
+ CacheManager cacheManager,
+ CacheReplicationCommand
cacheReplicationCommand) throws ClusteringFault {
+ OperationContext opCtx = msgContext.getOperationContext();
+ CachableResponse response = new CachableResponse();
+ response.setRequestHash(requestHash.getRequestHash());
+ response.setTimeout(timeout);
+ cacheManager.cacheResponse(serviceName, requestHash, response,
cacheReplicationCommand);
+ opCtx.setNonReplicableProperty(CachingConstants.CACHED_OBJECT,
response);
+
opCtx.setNonReplicableProperty(CachingConstants.STATE_REPLICATION_OBJECT,
+ cacheReplicationCommand);
- CachedObject cachedObj = new CachedObject();
- cachedObj.setRequestHash(requestHash);
- // this does not set the expiretime but just sets the timeout and the
espiretime will
- // be set when the response is availabel
- cachedObj.setTimeout(timeout);
- cacheManager.addResponseWithKey(cacheKey, requestHash, cachedObj,
cfgCtx);
-
- cfgCtx.setProperty(CachingConstants.CACHE_MANAGER, cacheManager);
-// Replicator.replicate(cfgCtx, new String[]{cacheManagerKey});
- Replicator.replicate(cfgCtx);
+ Replicator.replicate(opCtx);
}
public String getId() {
@@ -458,4 +476,15 @@ public class CacheMediator extends Abstr
public void setMaxMessageSize(int maxMessageSize) {
this.maxMessageSize = maxMessageSize;
}
+
+ public SOAPFactory getSOAPFactory(org.apache.axis2.context.MessageContext
msgContext) throws AxisFault {
+ String nsURI =
msgContext.getEnvelope().getNamespace().getNamespaceURI();
+ if (SOAP12Constants.SOAP_ENVELOPE_NAMESPACE_URI.equals(nsURI)) {
+ return OMAbstractFactory.getSOAP12Factory();
+ } else if (SOAP11Constants.SOAP_ENVELOPE_NAMESPACE_URI.equals(nsURI)) {
+ return OMAbstractFactory.getSOAP11Factory();
+ } else {
+ throw new AxisFault(Messages.getMessage("invalidSOAPversion"));
+ }
+ }
}
\ No newline at end of file
Modified:
synapse/trunk/java/modules/extensions/src/main/java/org/apache/synapse/mediators/bsf/ScriptMediator.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/extensions/src/main/java/org/apache/synapse/mediators/bsf/ScriptMediator.java?rev=1211311&r1=1211310&r2=1211311&view=diff
==============================================================================
---
synapse/trunk/java/modules/extensions/src/main/java/org/apache/synapse/mediators/bsf/ScriptMediator.java
(original)
+++
synapse/trunk/java/modules/extensions/src/main/java/org/apache/synapse/mediators/bsf/ScriptMediator.java
Wed Dec 7 07:08:33 2011
@@ -20,6 +20,8 @@
package org.apache.synapse.mediators.bsf;
import com.sun.phobos.script.javascript.RhinoScriptEngineFactory;
+import com.sun.script.groovy.GroovyScriptEngineFactory;
+import com.sun.script.jruby.JRubyScriptEngineFactory;
import org.apache.axiom.om.OMElement;
import org.apache.axiom.om.OMText;
import org.apache.bsf.xml.XMLHelper;
@@ -400,6 +402,8 @@ public class ScriptMediator extends Abst
ScriptEngineManager manager = new ScriptEngineManager();
manager.registerEngineExtension("js", new RhinoScriptEngineFactory());
+ manager.registerEngineExtension("groovy", new
GroovyScriptEngineFactory());
+ manager.registerEngineExtension("rb", new JRubyScriptEngineFactory());
this.scriptEngine = manager.getEngineByExtension(language);
if (scriptEngine == null) {
Modified:
synapse/trunk/java/modules/extensions/src/main/java/org/apache/synapse/mediators/throttle/ThrottleMediator.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/extensions/src/main/java/org/apache/synapse/mediators/throttle/ThrottleMediator.java?rev=1211311&r1=1211310&r2=1211311&view=diff
==============================================================================
---
synapse/trunk/java/modules/extensions/src/main/java/org/apache/synapse/mediators/throttle/ThrottleMediator.java
(original)
+++
synapse/trunk/java/modules/extensions/src/main/java/org/apache/synapse/mediators/throttle/ThrottleMediator.java
Wed Dec 7 07:08:33 2011
@@ -72,6 +72,8 @@ public class ThrottleMediator extends Ab
private Throttle throttle;
/* Lock used to ensure thread-safe creation of the throttle */
private final Object throttleLock = new Object();
+ /* Last version of dynamic policy resource*/
+ private long version;
public ThrottleMediator() {
this.accessControler = new AccessRateController();
@@ -181,8 +183,10 @@ public class ThrottleMediator extends Ab
boolean reCreate = false;
// if the key refers to a dynamic resource
if (entry.isDynamic()) {
- if (!entry.isCached() || entry.isExpired()) {
+ if ((!entry.isCached() || entry.isExpired()) &&
+ version != entry.getVersion()) {
reCreate = true;
+ version = entry.getVersion();
}
}
if (reCreate || throttle == null) {
Modified: synapse/trunk/java/pom.xml
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/pom.xml?rev=1211311&r1=1211310&r2=1211311&view=diff
==============================================================================
--- synapse/trunk/java/pom.xml (original)
+++ synapse/trunk/java/pom.xml Wed Dec 7 07:08:33 2011
@@ -865,7 +865,7 @@
<!-- dependencies of Synapse extensions module -->
<wso2commons.version>1.2</wso2commons.version>
- <wso2caching.version>3.4.0</wso2caching.version>
+ <wso2caching.version>3.5.0</wso2caching.version>
<wso2throttle.version>SNAPSHOT</wso2throttle.version>
<wso2eventing-api.version>SNAPSHOT</wso2eventing-api.version>
<xbean.version>2.2.0</xbean.version>