Author: hiranya
Date: Wed Dec  7 07:08:33 2011
New Revision: 1211311

URL: http://svn.apache.org/viewvc?rev=1211311&view=rev
Log:
* Upgrading to caching 3.5
* Updated the cache mediator
* Fixed an issue in throttle mediator (Patch from Miyuru)
* Fixed an issue in script mediator


Modified:
    
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/SynapseConfiguration.java
    
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/builtin/CacheMediator.java
    
synapse/trunk/java/modules/extensions/src/main/java/org/apache/synapse/mediators/bsf/ScriptMediator.java
    
synapse/trunk/java/modules/extensions/src/main/java/org/apache/synapse/mediators/throttle/ThrottleMediator.java
    synapse/trunk/java/pom.xml

Modified: 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/SynapseConfiguration.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/SynapseConfiguration.java?rev=1211311&r1=1211310&r2=1211311&view=diff
==============================================================================
--- 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/SynapseConfiguration.java
 (original)
+++ 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/SynapseConfiguration.java
 Wed Dec  7 07:08:33 2011
@@ -591,7 +591,7 @@ public class SynapseConfiguration implem
      * Returns the map of defined entries in the configuration excluding the
      * fetched entries from remote registry.
      *
-     * @return Map of Entries defined in the local configuraion
+     * @return Map of Entries defined in the local configuration
      */
     public Map<String, Entry> getDefinedEntries() {
 

Modified: 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/builtin/CacheMediator.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/builtin/CacheMediator.java?rev=1211311&r1=1211310&r2=1211311&view=diff
==============================================================================
--- 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/builtin/CacheMediator.java
 (original)
+++ 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/builtin/CacheMediator.java
 Wed Dec  7 07:08:33 2011
@@ -19,10 +19,16 @@
 
 package org.apache.synapse.mediators.builtin;
 
+import org.apache.axiom.om.OMAbstractFactory;
+import org.apache.axiom.soap.SOAP11Constants;
+import org.apache.axiom.soap.SOAP12Constants;
+import org.apache.axiom.soap.SOAPFactory;
 import org.apache.axis2.AxisFault;
 import org.apache.axis2.clustering.ClusteringFault;
 import org.apache.axis2.clustering.state.Replicator;
 import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.OperationContext;
+import org.apache.axis2.i18n.Messages;
 import org.apache.synapse.ManagedLifecycle;
 import org.apache.synapse.MessageContext;
 import org.apache.synapse.SynapseException;
@@ -35,10 +41,7 @@ import org.apache.synapse.mediators.base
 import org.apache.synapse.util.FixedByteArrayOutputStream;
 import org.apache.synapse.util.MessageHelper;
 import org.apache.axiom.soap.SOAPEnvelope;
-import org.wso2.caching.CacheManager;
-import org.wso2.caching.CachedObject;
-import org.wso2.caching.CachingConstants;
-import org.wso2.caching.CachingException;
+import org.wso2.caching.*;
 import org.wso2.caching.util.SOAPMessageHelper;
 import org.wso2.caching.digest.DigestGenerator;
 
@@ -148,10 +151,10 @@ public class CacheMediator extends Abstr
         try {
 
             if (synCtx.isResponse()) {
-                processResponseMessage(synCtx, cfgCtx, synLog, cacheManager);
+                processResponseMessage(synCtx, synLog);
 
             } else {
-                result = processRequestMessage(synCtx, cfgCtx, synLog, 
cacheManager);
+                result = processRequestMessage(synCtx, synLog, cacheManager);
             }
 
         } catch (ClusteringFault clusteringFault) {
@@ -169,66 +172,70 @@ public class CacheMediator extends Abstr
      *
      * @param synLog         the Synapse log to use
      * @param synCtx         the current message (response)
-     * @param cfgCtx         the abstract context in which the cache will be 
kept
-     * @param cacheManager   the cache manager
      * @throws ClusteringFault is there is an error in replicating the cfgCtx
      */
-    private void processResponseMessage(MessageContext synCtx, 
ConfigurationContext cfgCtx,
-        SynapseLog synLog, CacheManager cacheManager) throws ClusteringFault {
+    private void processResponseMessage(MessageContext synCtx,
+                                        SynapseLog synLog) throws 
ClusteringFault {
 
         if (!collector) {
             handleException("Response messages cannot be handled in a non 
collector cache", synCtx);
         }
 
-        String requestHash = (String) 
synCtx.getProperty(CachingConstants.REQUEST_HASH);
-
-        if (requestHash != null) {
+        org.apache.axis2.context.MessageContext msgCtx =
+                ((Axis2MessageContext)synCtx).getAxis2MessageContext();
+        OperationContext operationContext = msgCtx.getOperationContext();
+
+        CachableResponse response =
+                (CachableResponse) 
operationContext.getPropertyNonReplicable(CachingConstants.CACHED_OBJECT);
+        if (response != null) {
             if (synLog.isTraceOrDebugEnabled()) {
                 synLog.traceOrDebug("Storing the response message into the 
cache at scope : " +
-                    scope + " with ID : " + cacheKey + " for request hash : " 
+ requestHash);
+                        scope + " with ID : " + cacheKey + " for request hash 
: " + response.getRequestHash());
             }
-
-            CachedObject cachedObj = cacheManager.getResponseForKey(cacheKey, 
requestHash, cfgCtx);
-            if (cachedObj != null) {
-
-                if (synLog.isTraceOrDebugEnabled()) {
-                    synLog.traceOrDebug("Storing the response for the message 
with ID : " +
+            if (synLog.isTraceOrDebugEnabled()) {
+                synLog.traceOrDebug("Storing the response for the message with 
ID : " +
                         synCtx.getMessageID() + " with request hash ID : " +
-                        cachedObj.getRequestHash() + " in the cache : " + 
cacheKey);
-                }
-
-                ByteArrayOutputStream outStream = new ByteArrayOutputStream();
-                try {
-                    
MessageHelper.cloneSOAPEnvelope(synCtx.getEnvelope()).serialize(outStream);
-                    cachedObj.setResponseEnvelope(outStream.toByteArray());
-                } catch (XMLStreamException e) {
-                    handleException("Unable to set the response to the Cache", 
e, synCtx);
-                }
+                        response.getRequestHash() + " in the cache : " + 
cacheKey);
+            }
 
-                /* this is not required yet, can commented this for perf 
improvements
-                   in the future there can be a situation where user sends the 
request
-                   with the response hash (if client side caching is on) in 
which case
-                   we can compare that response hash with the given response 
hash and
-                   respond with not-modified http header */
-                // cachedObj.setResponseHash(cache.getGenerator().getDigest(
-                //     ((Axis2MessageContext) 
synCtx).getAxis2MessageContext()));
+            ByteArrayOutputStream outStream = new ByteArrayOutputStream();
+            try {
+                synCtx.getEnvelope().serialize(outStream);
+                response.setResponseEnvelope(outStream.toByteArray());
+            } catch (XMLStreamException e) {
+                handleException("Unable to set the response to the Cache", e, 
synCtx);
+            }
 
-                if (cachedObj.getTimeout() > 0) {
-                    cachedObj.setExpireTimeMillis(System.currentTimeMillis() + 
cachedObj.getTimeout());
-                }
+            // this is not required yet, can commented this for perf 
improvements
+            // in the future there can be a situation where user sends the 
request
+            // with the response hash (if client side caching is on) in which 
case
+            // we can compare that response hash with the given response hash 
and
+            // respond with not-modified http header */
+            // cachedObj.setResponseHash(cache.getGenerator().getDigest(
+            //     ((Axis2MessageContext) synCtx).getAxis2MessageContext()));
 
-                cfgCtx.setProperty(CachingConstants.CACHE_MANAGER, 
cacheManager);
-//                Replicator.replicate(cfgCtx, new String[]{cacheManagerKey});
-                Replicator.replicate(cfgCtx);
-            } else {
-                synLog.auditWarn("A response message without a valid mapping 
to the " +
-                    "request hash found. Unable to store the response in 
cache");
+            if (response.getTimeout() > 0) {
+                response.setExpireTimeMillis(System.currentTimeMillis() + 
response.getTimeout());
             }
 
+            // Finally, we may need to replicate the changes in the cache
+            CacheReplicationCommand cacheReplicationCommand
+                    = (CacheReplicationCommand) 
msgCtx.getPropertyNonReplicable(
+                    CachingConstants.STATE_REPLICATION_OBJECT);
+            if (cacheReplicationCommand != null) {
+                try {
+                    Replicator.replicateState(cacheReplicationCommand,
+                            msgCtx.getRootContext().getAxisConfiguration());
+                } catch (ClusteringFault clusteringFault) {
+                    log.error("Cannot replicate cache changes");
+                }
+            }
         } else {
-            synLog.auditWarn("A response message without a mapping to the " +
-                "request hash found. Unable to store the response in cache");
+            synLog.auditWarn("A response message without a valid mapping to 
the " +
+                    "request hash found. Unable to store the response in 
cache");
         }
+
+
     }
 
     /**
@@ -237,18 +244,19 @@ public class CacheMediator extends Abstr
      * this message as a response and sends back directly to client.
      *
      * @param synCtx         incoming request message
-     * @param cfgCtx         the AbstractContext in which the cache will be 
kept
      * @param synLog         the Synapse log to use
      * @param cacheManager   the cache manager
      * @return should this mediator terminate further processing?
      * @throws ClusteringFault if there is an error in replicating the cfgCtx
      */
-    private boolean processRequestMessage(MessageContext synCtx, 
ConfigurationContext cfgCtx,
+    private boolean processRequestMessage(MessageContext synCtx,
         SynapseLog synLog, CacheManager cacheManager) throws ClusteringFault {
 
         if (collector) {
             handleException("Request messages cannot be handled in a collector 
cache", synCtx);
         }
+        OperationContext opCtx = 
((Axis2MessageContext)synCtx).getAxis2MessageContext().
+                getOperationContext();
 
         String requestHash = null;
         try {
@@ -263,37 +271,49 @@ public class CacheMediator extends Abstr
             synLog.traceOrDebug("Generated request hash : " + requestHash);
         }
 
-        if (cacheManager.containsKey(cacheKey, requestHash) &&
-            cacheManager.getResponseForKey(cacheKey, requestHash, cfgCtx) != 
null) {
+        ServiceName service;
+        if (id != null) {
+            service = new ServiceName(id);
+        } else {
+            service = new ServiceName(cacheKey);
+        }
+
+        RequestHash hash = new RequestHash(requestHash);
+        CachableResponse cachedResponse =
+                cacheManager.getCachedResponse(service, hash);
+
+        org.apache.axis2.context.MessageContext msgCtx = 
((Axis2MessageContext)synCtx).getAxis2MessageContext();
+        opCtx.setNonReplicableProperty(CachingConstants.REQUEST_HASH, 
requestHash);
+        CacheReplicationCommand cacheReplicationCommand = new 
CacheReplicationCommand();
 
+        if (cachedResponse != null) {
             // get the response from the cache and attach to the context and 
change the
             // direction of the message
-            CachedObject cachedObj = cacheManager.getResponseForKey(cacheKey, 
requestHash, cfgCtx);
-
-            if (!cachedObj.isExpired() && cachedObj.getResponseEnvelope() != 
null) {
-
+            if (!cachedResponse.isExpired()) {
                 if (synLog.isTraceOrDebugEnabled()) {
                     synLog.traceOrDebug("Cache-hit for message ID : " + 
synCtx.getMessageID());
                 }
-
+                cachedResponse.setInUse(true);
                 // mark as a response and replace envelope from cache
                 synCtx.setResponse(true);
-                try {
-                    SOAPEnvelope omSOAPEnv = 
SOAPMessageHelper.buildSOAPEnvelopeFromBytes(
-                            cachedObj.getResponseEnvelope());
-
-                    // todo: if there is a WSA messageID in the response, is 
that need to be unique on each and every resp
+                opCtx.setNonReplicableProperty(CachingConstants.CACHED_OBJECT, 
cachedResponse);
 
-                    synCtx.setEnvelope(omSOAPEnv);
+                SOAPEnvelope omSOAPEnv;
+                try {
+                    omSOAPEnv = SOAPMessageHelper.buildSOAPEnvelopeFromBytes(
+                            cachedResponse.getResponseEnvelope());
+                    if (omSOAPEnv != null) {
+                        synCtx.setEnvelope(omSOAPEnv);
+                    }
                 } catch (AxisFault axisFault) {
                     handleException("Error setting response envelope from 
cache : "
-                        + cacheKey, synCtx);
+                            + cacheKey, synCtx);
                 } catch (IOException ioe) {
                     handleException("Error setting response envelope from 
cache : "
-                        + cacheKey, ioe, synCtx);
+                            + cacheKey, ioe, synCtx);
                 } catch (SOAPException soape) {
                     handleException("Error setting response envelope from 
cache : "
-                        + cacheKey, soape, synCtx);
+                            + cacheKey, soape, synCtx);
                 }
 
                 // take specified action on cache hit
@@ -307,7 +327,7 @@ public class CacheMediator extends Abstr
 
                     if (synLog.isTraceOrDebugEnabled()) {
                         synLog.traceOrDebug("Delegating message to the 
onCachingHit " +
-                            "sequence : " + onCacheHitRef);
+                                "sequence : " + onCacheHitRef);
                     }
                     synCtx.getSequence(onCacheHitRef).mediate(synCtx);
 
@@ -315,7 +335,7 @@ public class CacheMediator extends Abstr
 
                     if (synLog.isTraceOrDebugEnabled()) {
                         synLog.traceOrDebug("Request message " + 
synCtx.getMessageID() +
-                            " was served from the cache : " + cacheKey);
+                                " was served from the cache : " + cacheKey);
                     }
                     // send the response back if there is not onCacheHit is 
specified
                     synCtx.setTo(null);
@@ -325,54 +345,52 @@ public class CacheMediator extends Abstr
                 return false;
 
             } else {
-                // cache exists, but has expired...
-                cachedObj.expire();
-                cachedObj.setTimeout(timeout);
-                synLog.traceOrDebug("Existing cached response has expired. 
Reset cache element");
-
-                cfgCtx.setProperty(CachingConstants.CACHE_MANAGER, 
cacheManager);
-//                Replicator.replicate(cfgCtx, new String[]{cacheManagerKey});
-                Replicator.replicate(cfgCtx);
-            }
+                cachedResponse.reincarnate(timeout);
+                if (synLog.isTraceOrDebugEnabled()) {
+                    synLog.traceOrDebug("Existing cached response has expired. 
Reset cache element");
+                }
+                cacheManager.cacheResponse(service, hash, cachedResponse, 
cacheReplicationCommand);
+                opCtx.setNonReplicableProperty(CachingConstants.CACHED_OBJECT, 
cachedResponse);
+                
opCtx.setNonReplicableProperty(CachingConstants.STATE_REPLICATION_OBJECT,
+                        cacheReplicationCommand);
 
+                Replicator.replicate(opCtx);
+            }
         } else {
-
             // if not found in cache, check if we can cache this request
-            if (cacheManager.getCacheKeys(cacheKey).size() == 
inMemoryCacheSize) {
-                cacheManager.removeExpiredResponses(cacheKey, cfgCtx);
-                if (cacheManager.getCacheKeys(cacheKey).size() == 
inMemoryCacheSize) {
-                    synLog.traceOrDebug("In-memory cache is full. Unable to 
cache");
-                } else {
-                    storeRequestToCache(cfgCtx, requestHash, cacheManager);
+            if (cacheManager.getCacheSize(service) >= inMemoryCacheSize) { // 
If cache is full
+                cacheManager.removeExpiredResponses(service, 
cacheReplicationCommand); // try to remove expired responses
+                if (cacheManager.getCacheSize(service) >= inMemoryCacheSize) { 
// recheck if there is space
+                    if (log.isDebugEnabled()) {
+                        log.debug("In-memory cache is full. Unable to cache");
+                    }
+                } else { // if we managed to free up some space in the cache. 
Need state replication
+                    cacheNewResponse(msgCtx, service, hash, cacheManager,
+                                     cacheReplicationCommand);
                 }
-            } else {
-                storeRequestToCache(cfgCtx, requestHash, cacheManager);
+            } else { // if there is more space in the cache. Need state 
replication
+                cacheNewResponse(msgCtx, service, hash, cacheManager,
+                                 cacheReplicationCommand);
             }
         }
+
         return true;
     }
 
-    /**
-     * Store request message to the cache
-     *
-     * @param cfgCtx        - the Abstract context in which the cache will be 
kept
-     * @param requestHash   - the request hash that has already been computed
-     * @param cacheManager  - the cache
-     * @throws ClusteringFault if there is an error in replicating the cfgCtx
-     */
-    private void storeRequestToCache(ConfigurationContext cfgCtx,
-        String requestHash, CacheManager cacheManager) throws ClusteringFault {
+    private void cacheNewResponse(org.apache.axis2.context.MessageContext 
msgContext,
+                                  ServiceName serviceName, RequestHash 
requestHash,
+                                  CacheManager cacheManager,
+                                  CacheReplicationCommand 
cacheReplicationCommand) throws ClusteringFault {
+        OperationContext opCtx = msgContext.getOperationContext();
+        CachableResponse response = new CachableResponse();
+        response.setRequestHash(requestHash.getRequestHash());
+        response.setTimeout(timeout);
+        cacheManager.cacheResponse(serviceName, requestHash, response, 
cacheReplicationCommand);
+        opCtx.setNonReplicableProperty(CachingConstants.CACHED_OBJECT, 
response);
+        
opCtx.setNonReplicableProperty(CachingConstants.STATE_REPLICATION_OBJECT,
+                                       cacheReplicationCommand);
 
-        CachedObject cachedObj = new CachedObject();
-        cachedObj.setRequestHash(requestHash);
-        // this does not set the expiretime but just sets the timeout and the 
espiretime will
-        // be set when the response is availabel
-        cachedObj.setTimeout(timeout);
-        cacheManager.addResponseWithKey(cacheKey, requestHash, cachedObj, 
cfgCtx);
-
-        cfgCtx.setProperty(CachingConstants.CACHE_MANAGER, cacheManager);
-//        Replicator.replicate(cfgCtx, new String[]{cacheManagerKey});
-        Replicator.replicate(cfgCtx);
+        Replicator.replicate(opCtx);
     }
 
     public String getId() {
@@ -458,4 +476,15 @@ public class CacheMediator extends Abstr
     public void setMaxMessageSize(int maxMessageSize) {
         this.maxMessageSize = maxMessageSize;
     }
+
+    public SOAPFactory getSOAPFactory(org.apache.axis2.context.MessageContext 
msgContext) throws AxisFault {
+        String nsURI = 
msgContext.getEnvelope().getNamespace().getNamespaceURI();
+        if (SOAP12Constants.SOAP_ENVELOPE_NAMESPACE_URI.equals(nsURI)) {
+            return OMAbstractFactory.getSOAP12Factory();
+        } else if (SOAP11Constants.SOAP_ENVELOPE_NAMESPACE_URI.equals(nsURI)) {
+            return OMAbstractFactory.getSOAP11Factory();
+        } else {
+            throw new AxisFault(Messages.getMessage("invalidSOAPversion"));
+        }
+    }
 }
\ No newline at end of file

Modified: 
synapse/trunk/java/modules/extensions/src/main/java/org/apache/synapse/mediators/bsf/ScriptMediator.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/extensions/src/main/java/org/apache/synapse/mediators/bsf/ScriptMediator.java?rev=1211311&r1=1211310&r2=1211311&view=diff
==============================================================================
--- 
synapse/trunk/java/modules/extensions/src/main/java/org/apache/synapse/mediators/bsf/ScriptMediator.java
 (original)
+++ 
synapse/trunk/java/modules/extensions/src/main/java/org/apache/synapse/mediators/bsf/ScriptMediator.java
 Wed Dec  7 07:08:33 2011
@@ -20,6 +20,8 @@
 package org.apache.synapse.mediators.bsf;
 
 import com.sun.phobos.script.javascript.RhinoScriptEngineFactory;
+import com.sun.script.groovy.GroovyScriptEngineFactory;
+import com.sun.script.jruby.JRubyScriptEngineFactory;
 import org.apache.axiom.om.OMElement;
 import org.apache.axiom.om.OMText;
 import org.apache.bsf.xml.XMLHelper;
@@ -400,6 +402,8 @@ public class ScriptMediator extends Abst
 
         ScriptEngineManager manager = new ScriptEngineManager();
         manager.registerEngineExtension("js", new RhinoScriptEngineFactory());
+        manager.registerEngineExtension("groovy", new 
GroovyScriptEngineFactory());
+        manager.registerEngineExtension("rb", new JRubyScriptEngineFactory());
 
         this.scriptEngine = manager.getEngineByExtension(language);
         if (scriptEngine == null) {

Modified: 
synapse/trunk/java/modules/extensions/src/main/java/org/apache/synapse/mediators/throttle/ThrottleMediator.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/extensions/src/main/java/org/apache/synapse/mediators/throttle/ThrottleMediator.java?rev=1211311&r1=1211310&r2=1211311&view=diff
==============================================================================
--- 
synapse/trunk/java/modules/extensions/src/main/java/org/apache/synapse/mediators/throttle/ThrottleMediator.java
 (original)
+++ 
synapse/trunk/java/modules/extensions/src/main/java/org/apache/synapse/mediators/throttle/ThrottleMediator.java
 Wed Dec  7 07:08:33 2011
@@ -72,6 +72,8 @@ public class ThrottleMediator extends Ab
     private Throttle throttle;
     /* Lock used to ensure thread-safe creation of the throttle */
     private final Object throttleLock = new Object();
+    /* Last version of dynamic policy resource*/
+    private long version;
 
     public ThrottleMediator() {
         this.accessControler = new AccessRateController();
@@ -181,8 +183,10 @@ public class ThrottleMediator extends Ab
                         boolean reCreate = false;
                         // if the key refers to a dynamic resource
                         if (entry.isDynamic()) {
-                            if (!entry.isCached() || entry.isExpired()) {
+                            if ((!entry.isCached() || entry.isExpired()) &&
+                                    version != entry.getVersion()) {
                                 reCreate = true;
+                                version = entry.getVersion();
                             }
                         }
                         if (reCreate || throttle == null) {

Modified: synapse/trunk/java/pom.xml
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/pom.xml?rev=1211311&r1=1211310&r2=1211311&view=diff
==============================================================================
--- synapse/trunk/java/pom.xml (original)
+++ synapse/trunk/java/pom.xml Wed Dec  7 07:08:33 2011
@@ -865,7 +865,7 @@
 
         <!-- dependencies of Synapse extensions module -->
         <wso2commons.version>1.2</wso2commons.version>
-        <wso2caching.version>3.4.0</wso2caching.version>
+        <wso2caching.version>3.5.0</wso2caching.version>
         <wso2throttle.version>SNAPSHOT</wso2throttle.version>
         <wso2eventing-api.version>SNAPSHOT</wso2eventing-api.version>
         <xbean.version>2.2.0</xbean.version>


Reply via email to