Author: supun
Date: Fri Sep 17 05:41:55 2010
New Revision: 997983
URL: http://svn.apache.org/viewvc?rev=997983&view=rev
Log:
SYNAPSE-683
Modified:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/DynamicLoadbalanceEndpoint.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/dispatch/SALSessions.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/dispatch/SessionInformation.java
Modified:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/DynamicLoadbalanceEndpoint.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/DynamicLoadbalanceEndpoint.java?rev=997983&r1=997982&r2=997983&view=diff
==============================================================================
---
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/DynamicLoadbalanceEndpoint.java
(original)
+++
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/DynamicLoadbalanceEndpoint.java
Fri Sep 17 05:41:55 2010
@@ -32,19 +32,38 @@ import org.apache.synapse.core.SynapseEn
import org.apache.synapse.core.axis2.Axis2MessageContext;
import org.apache.synapse.core.axis2.Axis2SynapseEnvironment;
import org.apache.synapse.endpoints.algorithms.AlgorithmContext;
+import org.apache.synapse.endpoints.dispatch.Dispatcher;
+import org.apache.synapse.endpoints.dispatch.SALSessions;
+import org.apache.synapse.endpoints.dispatch.SessionInformation;
import java.net.MalformedURLException;
import java.net.URL;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Set;
/**
- * Represents a dynamic load balance endpoint. The application membership is
not static, but discovered
- * through some mechanism such as using a GCF
+ * Represents a dynamic load balance endpoint. The application membership is
not static,
+ * but discovered through some mechanism such as using a GCF
*/
public class DynamicLoadbalanceEndpoint extends LoadbalanceEndpoint {
private static final Log log =
LogFactory.getLog(DynamicLoadbalanceEndpoint.class);
/**
+ * Flag to enable session affinity based load balancing.
+ */
+ private boolean sessionAffinity = false;
+
+ /**
+ * Dispatcher used for session affinity.
+ */
+ private Dispatcher dispatcher = null;
+
+ /* Sessions time out interval*/
+ private long sessionTimeout = -1;
+
+ /**
* The algorithm context , place holder for keep any runtime states
related to the load balance
* algorithm
*/
@@ -59,6 +78,12 @@ public class DynamicLoadbalanceEndpoint
if (algorithmContext == null) {
algorithmContext = new AlgorithmContext(isClusteringEnabled,
cc, getName());
}
+
+ // Initialize the SAL Sessions if already has not been initialized.
+ SALSessions salSessions = SALSessions.getInstance();
+ if (!salSessions.isInitialized()) {
+ salSessions.initialize(isClusteringEnabled, cc);
+ }
}
log.info("Dynamic load balance endpoint initialized");
}
@@ -77,18 +102,58 @@ public class DynamicLoadbalanceEndpoint
}
public void send(MessageContext synCtx) {
- EndpointReference to = synCtx.getTo();
- DynamicLoadbalanceFaultHandler faultHandler = new
DynamicLoadbalanceFaultHandler(to);
- if (isFailover()) {
- synCtx.pushFaultHandler(faultHandler);
- }
+ SessionInformation sessionInformation = null;
+ Member currentMember = null;
ConfigurationContext configCtx =
((Axis2MessageContext)
synCtx).getAxis2MessageContext().getConfigurationContext();
if (lbMembershipHandler.getConfigurationContext() == null) {
lbMembershipHandler.setConfigurationContext(configCtx);
}
-// algorithmContext.setConfigurationContext(configCtx);
- sendToApplicationMember(synCtx, to, faultHandler);
+
+ if (isSessionAffinityBasedLB()) {
+ // first check if this session is associated with a session. if
so, get the endpoint
+ // associated for that session.
+ sessionInformation =
+ (SessionInformation) synCtx.getProperty(
+
SynapseConstants.PROP_SAL_CURRENT_SESSION_INFORMATION);
+
+ currentMember = (Member) synCtx.getProperty(
+ SynapseConstants.PROP_SAL_ENDPOINT_CURRENT_MEMBER);
+
+ if (sessionInformation == null && currentMember == null) {
+ sessionInformation = dispatcher.getSession(synCtx);
+ if (sessionInformation != null) {
+
+ if (log.isDebugEnabled()) {
+ log.debug("Current session id : " +
sessionInformation.getId());
+ }
+
+ currentMember = sessionInformation.getMember();
+ synCtx.setProperty(
+ SynapseConstants.PROP_SAL_ENDPOINT_CURRENT_MEMBER,
currentMember);
+ // This is for reliably recovery any session information
if while response is getting ,
+ // session information has been removed by cleaner.
+ // This will not be a cost as session information a not
heavy data structure
+ synCtx.setProperty(
+
SynapseConstants.PROP_SAL_CURRENT_SESSION_INFORMATION, sessionInformation);
+ }
+ }
+
+ }
+
+ if (sessionInformation != null && currentMember != null) {
+ //send message on current session
+ sendToApplicationMember(synCtx, currentMember, false);
+ } else {
+ // prepare for a new session
+ currentMember =
lbMembershipHandler.getNextApplicationMember(algorithmContext);
+ if (currentMember == null) {
+ String msg = "No application members available";
+ log.error(msg);
+ throw new SynapseException(msg);
+ }
+ sendToApplicationMember(synCtx, currentMember, true);
+ }
}
public void setName(String name) {
@@ -96,62 +161,136 @@ public class DynamicLoadbalanceEndpoint
// algorithmContext.setContextID(name);
}
+ public Dispatcher getDispatcher() {
+ return dispatcher;
+ }
+
+ public void setDispatcher(Dispatcher dispatcher) {
+ this.dispatcher = dispatcher;
+ }
+
+ public long getSessionTimeout() {
+ return sessionTimeout;
+ }
+
+ public void setSessionTimeout(long sessionTimeout) {
+ this.sessionTimeout = sessionTimeout;
+ }
+
+ public void setSessionAffinity(boolean sessionAffinity){
+ this.sessionAffinity = sessionAffinity;
+ }
+
+ public boolean isSessionAffinityBasedLB(){
+ return sessionAffinity;
+ }
+
private void sendToApplicationMember(MessageContext synCtx,
- EndpointReference to,
- DynamicLoadbalanceFaultHandler
faultHandler) {
+ Member currentMember, boolean
newSession) {
+ //Rewriting the URL
org.apache.axis2.context.MessageContext axis2MsgCtx =
((Axis2MessageContext) synCtx).getAxis2MessageContext();
-
String transport = axis2MsgCtx.getTransportIn().getName();
- Member currentMember =
- lbMembershipHandler.getNextApplicationMember(algorithmContext);
+ String address = synCtx.getTo().getAddress();
+ EndpointReference to =
getEndpointReferenceAfterURLRewrite(currentMember,
+ transport, address);
+ synCtx.setTo(to);
+
+ DynamicLoadbalanceFaultHandler faultHandler = new
DynamicLoadbalanceFaultHandler(to);
faultHandler.setCurrentMember(currentMember);
- if (currentMember != null) {
+ if (isFailover()) {
+ synCtx.pushFaultHandler(faultHandler);
+ synCtx.getEnvelope().build();
+ }
- // URL rewrite
- if (transport.equals("http") || transport.equals("https")) {
- String address = to.getAddress();
- if (address.indexOf(":") != -1) {
- try {
- address = new URL(address).getPath();
- } catch (MalformedURLException e) {
- String msg = "URL " + address + " is malformed";
- log.error(msg, e);
- throw new SynapseException(msg, e);
- }
- }
+ Endpoint endpoint = getEndpoint(to, synCtx);
+ if (isSessionAffinityBasedLB() && newSession) {
+ prepareEndPointSequence(synCtx, endpoint);
+
synCtx.setProperty(SynapseConstants.PROP_SAL_ENDPOINT_CURRENT_MEMBER,
currentMember);
+
synCtx.setProperty(SynapseConstants.PROP_SAL_ENDPOINT_CURRENT_DISPATCHER,
dispatcher);
+ // we should also indicate that this is the first message in the
session. so that
+ // onFault(...) method can resend only the failed attempts for the
first message.
+
synCtx.setProperty(SynapseConstants.PROP_SAL_ENDPOINT_FIRST_MESSAGE_IN_SESSION,
+ Boolean.TRUE);
+ }
+ endpoint.send(synCtx);
+ }
- EndpointReference epr =
- new EndpointReference(transport + "://" +
currentMember.getHostName() +
- ":" + ("http".equals(transport) ?
currentMember.getHttpPort() :
- currentMember.getHttpsPort()) + address);
- synCtx.setTo(epr);
- if (isFailover()) {
- synCtx.getEnvelope().build();
- }
+ /*
+ * Preparing the endpoint sequence for a new session establishment request
+ */
+ private void prepareEndPointSequence(MessageContext synCtx, Endpoint
endpoint) {
+
+ Object o =
synCtx.getProperty(SynapseConstants.PROP_SAL_ENDPOINT_ENDPOINT_LIST);
+ List<Endpoint> endpointList;
+ if (o instanceof List) {
+ endpointList = (List<Endpoint>) o;
+ endpointList.add(this);
- AddressEndpoint endpoint = new AddressEndpoint();
- endpoint.setName("DynamicLoadBalanceAddressEndpoint-" +
Math.random());
- EndpointDefinition definition = new EndpointDefinition();
- definition.setAddress(epr.getAddress());
- endpoint.setDefinition(definition);
- endpoint.init((SynapseEnvironment)
- ((Axis2MessageContext)
synCtx).getAxis2MessageContext().
- getConfigurationContext().getAxisConfiguration().
- getParameterValue(SynapseConstants.SYNAPSE_ENV));
- endpoint.send(synCtx);
- } else {
- log.error("Cannot load balance for non-HTTP/S transport " +
transport);
+ } else {
+ // this is the first endpoint in the hierarchy. so create the
queue and
+ // insert this as the first element.
+ endpointList = new ArrayList<Endpoint>();
+ endpointList.add(this);
+
synCtx.setProperty(SynapseConstants.PROP_SAL_ENDPOINT_ENDPOINT_LIST,
endpointList);
+ }
+
+ // if the next endpoint is not a session affinity one, endpoint
sequence ends
+ // here. but we have to add the next endpoint to the list.
+ if (!(endpoint instanceof DynamicLoadbalanceEndpoint)) {
+ endpointList.add(endpoint);
+ // Clearing out if there any any session information with current
message
+ if (dispatcher.isServerInitiatedSession()) {
+ dispatcher.removeSessionID(synCtx);
}
+ }
+ }
+
+ private EndpointReference getEndpointReferenceAfterURLRewrite(Member
currentMember,
+ String
transport,
+ String
address) {
+ // URL rewrite
+ if (transport.equals("http") || transport.equals("https")) {
+ if (address.indexOf(":") != -1) {
+ try {
+ address = new URL(address).getPath();
+ } catch (MalformedURLException e) {
+ String msg = "URL " + address + " is malformed";
+ log.error(msg, e);
+ throw new SynapseException(msg, e);
+ }
+ }
+
+ return new EndpointReference(transport + "://" +
currentMember.getHostName() +
+ ":" + ("http".equals(transport) ?
currentMember.getHttpPort() :
+ currentMember.getHttpsPort()) + address);
} else {
- synCtx.getFaultStack().pop(); // Remove the
DynamicLoadbalanceFaultHandler
- String msg = "No application members available";
+ String msg = "Cannot load balance for non-HTTP/S transport " +
transport;
log.error(msg);
throw new SynapseException(msg);
}
}
/**
+ *
+ * @param to get an endpoint to send the information
+ * @param synCtx synapse context
+ * @return the created endpoint
+ */
+ private Endpoint getEndpoint(EndpointReference to, MessageContext synCtx) {
+ AddressEndpoint endpoint = new AddressEndpoint();
+ endpoint.setName("DynamicLoadBalanceAddressEndpoint-" + Math.random());
+ EndpointDefinition definition = new EndpointDefinition();
+ definition.setAddress(to.getAddress());
+ endpoint.setDefinition(definition);
+ endpoint.init((SynapseEnvironment)
+ ((Axis2MessageContext) synCtx).getAxis2MessageContext().
+ getConfigurationContext().getAxisConfiguration().
+ getParameterValue(SynapseConstants.SYNAPSE_ENV));
+ return endpoint;
+ }
+
+ /**
* This FaultHandler will try to resend the message to another member if
an error occurs
* while sending to some member. This is a failover mechanism
*/
@@ -172,8 +311,23 @@ public class DynamicLoadbalanceEndpoint
if (currentMember == null) {
return;
}
- synCtx.pushFaultHandler(this);
- sendToApplicationMember(synCtx, to, this);
+ synCtx.getFaultStack().pop(); // Remove the LoadbalanceFaultHandler
+ currentMember =
lbMembershipHandler.getNextApplicationMember(algorithmContext);
+ if(currentMember == null){
+ String msg = "No application members available";
+ log.error(msg);
+ throw new SynapseException(msg);
+ }
+ synCtx.setTo(to);
+ if(isSessionAffinityBasedLB()){
+ //We are sending the this message on a new session,
+ // hence we need to remove previous session information
+ Set pros = synCtx.getPropertyKeySet();
+ if (pros != null) {
+
pros.remove(SynapseConstants.PROP_SAL_CURRENT_SESSION_INFORMATION);
+ }
+ }
+ sendToApplicationMember(synCtx, currentMember, true);
}
}
}
Modified:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/dispatch/SALSessions.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/dispatch/SALSessions.java?rev=997983&r1=997982&r2=997983&view=diff
==============================================================================
---
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/dispatch/SALSessions.java
(original)
+++
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/dispatch/SALSessions.java
Fri Sep 17 05:41:55 2010
@@ -20,6 +20,7 @@
package org.apache.synapse.endpoints.dispatch;
import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.clustering.Member;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.synapse.MessageContext;
@@ -28,6 +29,7 @@ import org.apache.synapse.SynapseExcepti
import org.apache.synapse.endpoints.Endpoint;
import org.apache.synapse.endpoints.IndirectEndpoint;
import org.apache.synapse.endpoints.SALoadbalanceEndpoint;
+import org.apache.synapse.endpoints.DynamicLoadbalanceEndpoint;
import org.apache.synapse.util.Replicator;
import java.util.*;
@@ -141,6 +143,7 @@ public class SALSessions {
SynapseConstants.PROP_SAL_CURRENT_SESSION_INFORMATION);
List<Endpoint> endpoints = null;
+ Member currentMember = null;
if (oldSession == null) {
@@ -149,8 +152,11 @@ public class SALSessions {
}
endpoints = (List<Endpoint>) synCtx.getProperty(
SynapseConstants.PROP_SAL_ENDPOINT_ENDPOINT_LIST);
- createSession = true;
+ currentMember = (Member) synCtx.getProperty(
+ SynapseConstants.PROP_SAL_ENDPOINT_CURRENT_MEMBER);
+
+ createSession = true;
} else {
String oldSessionID = oldSession.getId();
@@ -162,6 +168,7 @@ public class SALSessions {
}
removeSession(oldSessionID);
endpoints = oldSession.getEndpointList();
+ currentMember = oldSession.getMember();
createSession = true;
} else {
@@ -174,6 +181,7 @@ public class SALSessions {
log.debug("Recovering lost session information for
session id " + sessionID);
}
endpoints = oldSession.getEndpointList();
+ currentMember = oldSession.getMember();
createSession = true;
} else {
if (log.isDebugEnabled()) {
@@ -184,9 +192,12 @@ public class SALSessions {
}
if (createSession) {
-
- SessionInformation newInformation = createSessionInformation(
- synCtx, sessionID, endpoints);
+ SessionInformation newInformation;
+ if(currentMember == null){
+ newInformation = createSessionInformation(synCtx, sessionID,
endpoints);
+ } else {
+ newInformation = createSessionInformation(synCtx, sessionID,
currentMember);
+ }
if (log.isDebugEnabled()) {
log.debug("Establishing a session with id :" +
@@ -420,7 +431,8 @@ public class SALSessions {
}
/*
- * Helper method to get a list of endpoints from a list of endpoint name
maps - This is for clustered env.
+ * Helper method to get a list of endpoints from a list of endpoint name
maps -
+ * This is for clustered env.
*/
private List<Endpoint> getEndpoints(List<String> endpointNames, String
root) {
@@ -507,9 +519,11 @@ public class SALSessions {
}
/*
- * Factory method to create a session information using given endpoint
list , session id and other informations
+ * Factory method to create a session information using given endpoint
list,
+ * session id and other informations
*/
- private SessionInformation createSessionInformation(MessageContext synCtx,
String id, List<Endpoint> endpoints) {
+ private SessionInformation createSessionInformation(MessageContext synCtx,
+ String id,
List<Endpoint> endpoints) {
if (endpoints == null || endpoints.isEmpty()) {
handleException("Invalid request to create sessions . Cannot find
a endpoint sequence.");
@@ -559,4 +573,61 @@ public class SALSessions {
}
return information;
}
+
+ /*
+ * Factory method to create a session information using a given member
node ,
+ * session id and other informations
+ */
+ private SessionInformation createSessionInformation(MessageContext synCtx,
+ String id, Member
currentMember) {
+
+ if (currentMember == null) {
+ handleException("Invalid request to create sessions.");
+ }
+
+ if (log.isDebugEnabled()) {
+ log.debug("Creating a session information for given session id "
+ id
+ + " with member Host:" + currentMember.getHostName() + "
Port:"
+ + currentMember.getPort());
+ }
+
+ long expireTimeWindow = -1;
+ List<Endpoint> endpoints = (List<Endpoint>)synCtx.getProperty(
+ SynapseConstants.PROP_SAL_ENDPOINT_ENDPOINT_LIST);
+
+ assert endpoints != null;
+ for (Endpoint endpoint : endpoints) {
+ if (endpoint instanceof DynamicLoadbalanceEndpoint) {
+ long sessionsTimeout = ((DynamicLoadbalanceEndpoint)
endpoint).getSessionTimeout();
+ if (expireTimeWindow == -1) {
+ expireTimeWindow = sessionsTimeout;
+ } else if (expireTimeWindow > sessionsTimeout) {
+ expireTimeWindow = sessionsTimeout;
+ }
+ }
+ }
+
+ if (expireTimeWindow == -1) {
+ expireTimeWindow = synCtx.getConfiguration().getProperty(
+ SynapseConstants.PROP_SAL_ENDPOINT_DEFAULT_SESSION_TIMEOUT,
+ SynapseConstants.SAL_ENDPOINTS_DEFAULT_SESSION_TIMEOUT);
+ }
+
+ if (log.isDebugEnabled()) {
+ log.debug("For session with id " + id +
+ " : expiry time interval : " + expireTimeWindow);
+ }
+
+ long expiryTime = System.currentTimeMillis() + expireTimeWindow;
+
+ Endpoint rootEndpoint = endpoints.get(0);
+
+ SessionInformation information = new SessionInformation(id,
+ currentMember, expiryTime);
+
+ if (isClustered) {
+ information.setRootEndpointName(getEndpointName(rootEndpoint));
+ }
+ return information;
+ }
}
\ No newline at end of file
Modified:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/dispatch/SessionInformation.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/dispatch/SessionInformation.java?rev=997983&r1=997982&r2=997983&view=diff
==============================================================================
---
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/dispatch/SessionInformation.java
(original)
+++
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/dispatch/SessionInformation.java
Fri Sep 17 05:41:55 2010
@@ -21,6 +21,7 @@ package org.apache.synapse.endpoints.dis
import org.apache.synapse.endpoints.Endpoint;
+import org.apache.axis2.clustering.Member;
import java.io.Serializable;
import java.util.List;
@@ -35,7 +36,8 @@ public class SessionInformation implemen
private String rootEndpointName;
private List<String> path;
private long expiryTime;
- private final transient List<Endpoint> endpointList;
+ private transient List<Endpoint> endpointList;
+ private transient Member member;
public SessionInformation(String id, List<Endpoint> endpointList, long
expiryTime) {
this.id = id;
@@ -43,6 +45,12 @@ public class SessionInformation implemen
this.expiryTime = expiryTime;
}
+ public SessionInformation(String id, Member member, long expiryTime) {
+ this.id = id;
+ this.member = member;
+ this.expiryTime = expiryTime;
+ }
+
public String getId() {
return id;
}
@@ -83,5 +91,8 @@ public class SessionInformation implemen
return expiryTime < System.currentTimeMillis();
}
+ public Member getMember(){
+ return member;
+ }
}