Author: supun
Date: Fri Sep 17 05:41:55 2010
New Revision: 997983

URL: http://svn.apache.org/viewvc?rev=997983&view=rev
Log:
SYNAPSE-683

Modified:
    
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/DynamicLoadbalanceEndpoint.java
    
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/dispatch/SALSessions.java
    
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/dispatch/SessionInformation.java

Modified: 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/DynamicLoadbalanceEndpoint.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/DynamicLoadbalanceEndpoint.java?rev=997983&r1=997982&r2=997983&view=diff
==============================================================================
--- 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/DynamicLoadbalanceEndpoint.java
 (original)
+++ 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/DynamicLoadbalanceEndpoint.java
 Fri Sep 17 05:41:55 2010
@@ -32,19 +32,38 @@ import org.apache.synapse.core.SynapseEn
 import org.apache.synapse.core.axis2.Axis2MessageContext;
 import org.apache.synapse.core.axis2.Axis2SynapseEnvironment;
 import org.apache.synapse.endpoints.algorithms.AlgorithmContext;
+import org.apache.synapse.endpoints.dispatch.Dispatcher;
+import org.apache.synapse.endpoints.dispatch.SALSessions;
+import org.apache.synapse.endpoints.dispatch.SessionInformation;
 
 import java.net.MalformedURLException;
 import java.net.URL;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Set;
 
 /**
- * Represents a dynamic load balance endpoint. The application membership is 
not static, but discovered
- * through some mechanism such as using a GCF
+ * Represents a dynamic load balance endpoint. The application membership is 
not static,
+ * but discovered through some mechanism such as using a GCF
  */
 public class DynamicLoadbalanceEndpoint extends LoadbalanceEndpoint {
 
     private static final Log log = 
LogFactory.getLog(DynamicLoadbalanceEndpoint.class);
 
     /**
+     *  Flag to enable session affinity based load balancing.
+     */
+    private boolean sessionAffinity = false;
+
+    /**
+     * Dispatcher used for session affinity.
+     */
+    private Dispatcher dispatcher = null;
+
+    /* Session timeout interval */
+    private long sessionTimeout = -1;
+
+    /**
      * The algorithm context , place holder for keep any runtime states 
related to the load balance
      * algorithm
      */
@@ -59,6 +78,12 @@ public class DynamicLoadbalanceEndpoint 
             if (algorithmContext == null) {
                 algorithmContext = new AlgorithmContext(isClusteringEnabled, 
cc, getName());
             }
+
+            // Initialize the SAL sessions if they have not already been initialized.
+            SALSessions salSessions = SALSessions.getInstance();
+            if (!salSessions.isInitialized()) {
+                salSessions.initialize(isClusteringEnabled, cc);
+            }
         }
         log.info("Dynamic load balance endpoint initialized");
     }
@@ -77,18 +102,58 @@ public class DynamicLoadbalanceEndpoint 
     }
 
     public void send(MessageContext synCtx) {
-        EndpointReference to = synCtx.getTo();
-        DynamicLoadbalanceFaultHandler faultHandler = new 
DynamicLoadbalanceFaultHandler(to);
-        if (isFailover()) {
-            synCtx.pushFaultHandler(faultHandler);
-        }
+        SessionInformation sessionInformation = null;
+        Member currentMember = null;
         ConfigurationContext configCtx =
                 ((Axis2MessageContext) 
synCtx).getAxis2MessageContext().getConfigurationContext();
         if (lbMembershipHandler.getConfigurationContext() == null) {
             lbMembershipHandler.setConfigurationContext(configCtx);
         }
-//        algorithmContext.setConfigurationContext(configCtx);
-        sendToApplicationMember(synCtx, to, faultHandler);
+
+        if (isSessionAffinityBasedLB()) {
+            // first check if this message is associated with a session. if 
so, get the member
+            // associated with that session.
+            sessionInformation =
+                    (SessionInformation) synCtx.getProperty(
+                            
SynapseConstants.PROP_SAL_CURRENT_SESSION_INFORMATION);
+
+            currentMember = (Member) synCtx.getProperty(
+                    SynapseConstants.PROP_SAL_ENDPOINT_CURRENT_MEMBER);
+
+            if (sessionInformation == null && currentMember == null) {
+                sessionInformation = dispatcher.getSession(synCtx);
+                if (sessionInformation != null) {
+
+                    if (log.isDebugEnabled()) {
+                        log.debug("Current session id : " + 
sessionInformation.getId());
+                    }
+
+                    currentMember = sessionInformation.getMember();
+                    synCtx.setProperty(
+                            SynapseConstants.PROP_SAL_ENDPOINT_CURRENT_MEMBER, 
currentMember);
+                    // This allows the session information to be reliably 
recovered if the cleaner
+                    // removes it while the response is still pending.
+                    // This is not costly, as session information is not a 
heavy data structure
+                    synCtx.setProperty(
+                            
SynapseConstants.PROP_SAL_CURRENT_SESSION_INFORMATION, sessionInformation);
+                }
+            }
+
+        }
+        
+        if (sessionInformation != null && currentMember != null) {
+            //send message on current session
+            sendToApplicationMember(synCtx, currentMember, false);
+        } else {
+            // prepare for a new session
+            currentMember = 
lbMembershipHandler.getNextApplicationMember(algorithmContext);
+            if (currentMember == null) {
+                String msg = "No application members available";
+                log.error(msg);
+                throw new SynapseException(msg);
+            }
+            sendToApplicationMember(synCtx, currentMember, true);
+        }
     }
 
     public void setName(String name) {
@@ -96,62 +161,136 @@ public class DynamicLoadbalanceEndpoint 
 //        algorithmContext.setContextID(name);
     }
 
+  public Dispatcher getDispatcher() {
+        return dispatcher;
+    }
+
+    public void setDispatcher(Dispatcher dispatcher) {
+        this.dispatcher = dispatcher;
+    }
+
+    public long getSessionTimeout() {
+        return sessionTimeout;
+    }
+
+    public void setSessionTimeout(long sessionTimeout) {
+        this.sessionTimeout = sessionTimeout;
+    }
+
+    public void setSessionAffinity(boolean sessionAffinity){
+        this.sessionAffinity = sessionAffinity;
+    }
+
+    public boolean isSessionAffinityBasedLB(){
+        return sessionAffinity;
+    }
+
     private void sendToApplicationMember(MessageContext synCtx,
-                                         EndpointReference to,
-                                         DynamicLoadbalanceFaultHandler 
faultHandler) {
+                                         Member currentMember, boolean 
newSession) {
+        //Rewriting the URL
         org.apache.axis2.context.MessageContext axis2MsgCtx =
                 ((Axis2MessageContext) synCtx).getAxis2MessageContext();
-
         String transport = axis2MsgCtx.getTransportIn().getName();
-        Member currentMember =
-                lbMembershipHandler.getNextApplicationMember(algorithmContext);
+        String address = synCtx.getTo().getAddress();
+        EndpointReference to = 
getEndpointReferenceAfterURLRewrite(currentMember,
+                transport, address);
+        synCtx.setTo(to);
+
+        DynamicLoadbalanceFaultHandler faultHandler = new 
DynamicLoadbalanceFaultHandler(to);
         faultHandler.setCurrentMember(currentMember);
-        if (currentMember != null) {
+        if (isFailover()) {
+            synCtx.pushFaultHandler(faultHandler);
+            synCtx.getEnvelope().build();
+        }
 
-            // URL rewrite
-            if (transport.equals("http") || transport.equals("https")) {
-                String address = to.getAddress();
-                if (address.indexOf(":") != -1) {
-                    try {
-                        address = new URL(address).getPath();
-                    } catch (MalformedURLException e) {
-                        String msg = "URL " + address + " is malformed";
-                        log.error(msg, e);
-                        throw new SynapseException(msg, e);
-                    }
-                }
+        Endpoint endpoint = getEndpoint(to, synCtx);
+        if (isSessionAffinityBasedLB() && newSession) {
+            prepareEndPointSequence(synCtx, endpoint);
+            
synCtx.setProperty(SynapseConstants.PROP_SAL_ENDPOINT_CURRENT_MEMBER, 
currentMember);
+            
synCtx.setProperty(SynapseConstants.PROP_SAL_ENDPOINT_CURRENT_DISPATCHER, 
dispatcher);
+            // we should also indicate that this is the first message in the 
session. so that
+            // onFault(...) method can resend only the failed attempts for the 
first message.
+            
synCtx.setProperty(SynapseConstants.PROP_SAL_ENDPOINT_FIRST_MESSAGE_IN_SESSION,
+                    Boolean.TRUE);
+        }
+        endpoint.send(synCtx);
+    }
 
-                EndpointReference epr =
-                        new EndpointReference(transport + "://" + 
currentMember.getHostName() +
-                                ":" + ("http".equals(transport) ? 
currentMember.getHttpPort() :
-                                currentMember.getHttpsPort()) + address);
-                synCtx.setTo(epr);
-                if (isFailover()) {
-                    synCtx.getEnvelope().build();
-                }
+    /*
+    * Preparing the endpoint sequence for a new session establishment request
+    */
+    private void prepareEndPointSequence(MessageContext synCtx, Endpoint 
endpoint) {
+
+        Object o = 
synCtx.getProperty(SynapseConstants.PROP_SAL_ENDPOINT_ENDPOINT_LIST);
+        List<Endpoint> endpointList;
+        if (o instanceof List) {
+            endpointList = (List<Endpoint>) o;
+            endpointList.add(this);
 
-                AddressEndpoint endpoint = new AddressEndpoint();
-                endpoint.setName("DynamicLoadBalanceAddressEndpoint-" + 
Math.random());
-                EndpointDefinition definition = new EndpointDefinition();
-                definition.setAddress(epr.getAddress());
-                endpoint.setDefinition(definition);
-                endpoint.init((SynapseEnvironment)
-                        ((Axis2MessageContext) 
synCtx).getAxis2MessageContext().
-                        getConfigurationContext().getAxisConfiguration().
-                        getParameterValue(SynapseConstants.SYNAPSE_ENV));
-                endpoint.send(synCtx);
-            } else {
-                log.error("Cannot load balance for non-HTTP/S transport " + 
transport);
+        } else {
+            // this is the first endpoint in the hierarchy. so create the 
queue and
+            // insert this as the first element.
+            endpointList = new ArrayList<Endpoint>();
+            endpointList.add(this);
+            
synCtx.setProperty(SynapseConstants.PROP_SAL_ENDPOINT_ENDPOINT_LIST, 
endpointList);
+        }
+
+        // if the next endpoint is not a session affinity one, endpoint 
sequence ends
+        // here. but we have to add the next endpoint to the list.
+        if (!(endpoint instanceof DynamicLoadbalanceEndpoint)) {
+            endpointList.add(endpoint);
+            // Clear out any session information carried with the current 
message
+            if (dispatcher.isServerInitiatedSession()) {
+                dispatcher.removeSessionID(synCtx);
             }
+        }
+    }
+
+    private EndpointReference getEndpointReferenceAfterURLRewrite(Member 
currentMember,
+                                                                  String 
transport,
+                                                                  String 
address) {
+        // URL rewrite
+        if (transport.equals("http") || transport.equals("https")) {
+            if (address.indexOf(":") != -1) {
+                try {
+                    address = new URL(address).getPath();
+                } catch (MalformedURLException e) {
+                    String msg = "URL " + address + " is malformed";
+                    log.error(msg, e);
+                    throw new SynapseException(msg, e);
+                }
+            }
+
+            return new EndpointReference(transport + "://" + 
currentMember.getHostName() +
+                    ":" + ("http".equals(transport) ? 
currentMember.getHttpPort() :
+                    currentMember.getHttpsPort()) + address);
         } else {
-            synCtx.getFaultStack().pop(); // Remove the 
DynamicLoadbalanceFaultHandler
-            String msg = "No application members available";
+            String msg = "Cannot load balance for non-HTTP/S transport " + 
transport;
             log.error(msg);
             throw new SynapseException(msg);
         }
     }
 
     /**
+     *
+     * @param to endpoint reference whose address is used for the created endpoint
+     * @param synCtx synapse context
+     * @return the created endpoint
+     */
+    private Endpoint getEndpoint(EndpointReference to, MessageContext synCtx) {
+        AddressEndpoint endpoint = new AddressEndpoint();
+        endpoint.setName("DynamicLoadBalanceAddressEndpoint-" + Math.random());
+        EndpointDefinition definition = new EndpointDefinition();
+        definition.setAddress(to.getAddress());
+        endpoint.setDefinition(definition);
+        endpoint.init((SynapseEnvironment)
+                ((Axis2MessageContext) synCtx).getAxis2MessageContext().
+                        getConfigurationContext().getAxisConfiguration().
+                        getParameterValue(SynapseConstants.SYNAPSE_ENV));
+        return endpoint;
+    }
+
+    /**
      * This FaultHandler will try to resend the message to another member if 
an error occurs
      * while sending to some member. This is a failover mechanism
      */
@@ -172,8 +311,23 @@ public class DynamicLoadbalanceEndpoint 
             if (currentMember == null) {
                 return;
             }
-            synCtx.pushFaultHandler(this);
-            sendToApplicationMember(synCtx, to, this);
+            synCtx.getFaultStack().pop(); // Remove the LoadbalanceFaultHandler
+            currentMember = 
lbMembershipHandler.getNextApplicationMember(algorithmContext);
+            if(currentMember == null){
+                String msg = "No application members available";
+                log.error(msg);
+                throw new SynapseException(msg);
+            }
+            synCtx.setTo(to);
+            if(isSessionAffinityBasedLB()){
+                // We are sending this message on a new session,
+                // hence we need to remove previous session information
+                Set pros = synCtx.getPropertyKeySet();
+                if (pros != null) {
+                    
pros.remove(SynapseConstants.PROP_SAL_CURRENT_SESSION_INFORMATION);
+                }
+            }
+            sendToApplicationMember(synCtx, currentMember, true);
         }
     }
 }

Modified: 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/dispatch/SALSessions.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/dispatch/SALSessions.java?rev=997983&r1=997982&r2=997983&view=diff
==============================================================================
--- 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/dispatch/SALSessions.java
 (original)
+++ 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/dispatch/SALSessions.java
 Fri Sep 17 05:41:55 2010
@@ -20,6 +20,7 @@
 package org.apache.synapse.endpoints.dispatch;
 
 import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.clustering.Member;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.synapse.MessageContext;
@@ -28,6 +29,7 @@ import org.apache.synapse.SynapseExcepti
 import org.apache.synapse.endpoints.Endpoint;
 import org.apache.synapse.endpoints.IndirectEndpoint;
 import org.apache.synapse.endpoints.SALoadbalanceEndpoint;
+import org.apache.synapse.endpoints.DynamicLoadbalanceEndpoint;
 import org.apache.synapse.util.Replicator;
 
 import java.util.*;
@@ -141,6 +143,7 @@ public class SALSessions {
                 SynapseConstants.PROP_SAL_CURRENT_SESSION_INFORMATION);
 
         List<Endpoint> endpoints = null;
+        Member currentMember = null; 
 
         if (oldSession == null) {
 
@@ -149,8 +152,11 @@ public class SALSessions {
             }
             endpoints = (List<Endpoint>) synCtx.getProperty(
                     SynapseConstants.PROP_SAL_ENDPOINT_ENDPOINT_LIST);
-            createSession = true;
 
+            currentMember = (Member) synCtx.getProperty(
+                    SynapseConstants.PROP_SAL_ENDPOINT_CURRENT_MEMBER);
+            
+            createSession = true;
         } else {
 
             String oldSessionID = oldSession.getId();
@@ -162,6 +168,7 @@ public class SALSessions {
                 }
                 removeSession(oldSessionID);
                 endpoints = oldSession.getEndpointList();
+                currentMember = oldSession.getMember();
                 createSession = true;
 
             } else {
@@ -174,6 +181,7 @@ public class SALSessions {
                         log.debug("Recovering lost session information for 
session id " + sessionID);
                     }
                     endpoints = oldSession.getEndpointList();
+                    currentMember = oldSession.getMember();
                     createSession = true;
                 } else {
                     if (log.isDebugEnabled()) {
@@ -184,9 +192,12 @@ public class SALSessions {
         }
 
         if (createSession) {
-
-            SessionInformation newInformation = createSessionInformation(
-                    synCtx, sessionID, endpoints);
+            SessionInformation newInformation;
+            if(currentMember == null){
+                newInformation = createSessionInformation(synCtx, sessionID, 
endpoints);
+            } else {
+                newInformation = createSessionInformation(synCtx, sessionID, 
currentMember);
+            }
 
             if (log.isDebugEnabled()) {
                 log.debug("Establishing a session with id :" +
@@ -420,7 +431,8 @@ public class SALSessions {
     }
 
     /*
-     * Helper method to get a list of endpoints from a list of endpoint name 
maps - This is for clustered env.
+     * Helper method to get a list of endpoints from a list of endpoint name 
maps -
+     * This is for clustered env.
      */
     private List<Endpoint> getEndpoints(List<String> endpointNames, String 
root) {
 
@@ -507,9 +519,11 @@ public class SALSessions {
     }
 
     /*
-     * Factory method to create a session information using given endpoint 
list , session id and other informations
+     * Factory method to create a session information using given endpoint 
list,
+     * session id and other informations
      */
-    private SessionInformation createSessionInformation(MessageContext synCtx, 
String id, List<Endpoint> endpoints) {
+    private SessionInformation createSessionInformation(MessageContext synCtx,
+                                                        String id, 
List<Endpoint> endpoints) {
 
         if (endpoints == null || endpoints.isEmpty()) {
             handleException("Invalid request to create sessions . Cannot find 
a endpoint sequence.");
@@ -559,4 +573,61 @@ public class SALSessions {
         }
         return information;
     }
+
+    /*
+     * Factory method to create session information using a given member 
node,
+     * session id and other information
+     */
+    private SessionInformation createSessionInformation(MessageContext synCtx,
+                                                        String id, Member 
currentMember) {
+
+        if (currentMember == null) {
+            handleException("Invalid request to create sessions.");
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug("Creating a session information for given session id  " 
+ id
+                    + " with member Host:" + currentMember.getHostName() + " 
Port:"
+                    + currentMember.getPort());
+        }
+
+        long expireTimeWindow = -1;
+        List<Endpoint> endpoints = (List<Endpoint>)synCtx.getProperty(
+                SynapseConstants.PROP_SAL_ENDPOINT_ENDPOINT_LIST);
+
+        assert endpoints != null;
+        for (Endpoint endpoint : endpoints) {
+            if (endpoint instanceof DynamicLoadbalanceEndpoint) {
+                long sessionsTimeout = ((DynamicLoadbalanceEndpoint) 
endpoint).getSessionTimeout();
+                if (expireTimeWindow == -1) {
+                    expireTimeWindow = sessionsTimeout;
+                } else if (expireTimeWindow > sessionsTimeout) {
+                    expireTimeWindow = sessionsTimeout;
+                }
+            }
+        }
+
+        if (expireTimeWindow == -1) {
+            expireTimeWindow = synCtx.getConfiguration().getProperty(
+                    SynapseConstants.PROP_SAL_ENDPOINT_DEFAULT_SESSION_TIMEOUT,
+                    SynapseConstants.SAL_ENDPOINTS_DEFAULT_SESSION_TIMEOUT);
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug("For session with id " + id +
+                    " : expiry time interval : " + expireTimeWindow);
+        }
+
+        long expiryTime = System.currentTimeMillis() + expireTimeWindow;
+
+        Endpoint rootEndpoint = endpoints.get(0);
+
+        SessionInformation information = new SessionInformation(id,
+                currentMember, expiryTime);
+
+        if (isClustered) {
+            information.setRootEndpointName(getEndpointName(rootEndpoint));
+        }
+        return information;
+    }
 }
\ No newline at end of file

Modified: 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/dispatch/SessionInformation.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/dispatch/SessionInformation.java?rev=997983&r1=997982&r2=997983&view=diff
==============================================================================
--- 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/dispatch/SessionInformation.java
 (original)
+++ 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/dispatch/SessionInformation.java
 Fri Sep 17 05:41:55 2010
@@ -21,6 +21,7 @@ package org.apache.synapse.endpoints.dis
 
 
 import org.apache.synapse.endpoints.Endpoint;
+import org.apache.axis2.clustering.Member;
 
 import java.io.Serializable;
 import java.util.List;
@@ -35,7 +36,8 @@ public class SessionInformation implemen
     private String rootEndpointName;
     private List<String> path;
     private long expiryTime;
-    private final transient List<Endpoint> endpointList;
+    private transient List<Endpoint> endpointList;
+    private transient Member member;
 
     public SessionInformation(String id, List<Endpoint> endpointList, long 
expiryTime) {
         this.id = id;
@@ -43,6 +45,12 @@ public class SessionInformation implemen
         this.expiryTime = expiryTime;
     }
 
+    public SessionInformation(String id, Member member, long expiryTime) {
+        this.id = id;
+        this.member = member;
+        this.expiryTime = expiryTime;
+    }
+
     public String getId() {
         return id;
     }
@@ -83,5 +91,8 @@ public class SessionInformation implemen
         return expiryTime < System.currentTimeMillis();
     }
 
+    public Member getMember(){
+        return member;
+    }
 }
 


Reply via email to