Repository: stratos Updated Branches: refs/heads/stratos-4.1.x 4ead24e10 -> 42a965473
Refactor the aws lb extension Project: http://git-wip-us.apache.org/repos/asf/stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/52966c04 Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/52966c04 Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/52966c04 Branch: refs/heads/stratos-4.1.x Commit: 52966c044e1f325f8b96c4f05a10e61ee7f9ea73 Parents: 4ead24e Author: gayangunarathne <[email protected]> Authored: Tue Dec 8 10:41:57 2015 +0530 Committer: gayangunarathne <[email protected]> Committed: Mon Dec 21 10:24:28 2015 +0530 ---------------------------------------------------------------------- .../apache/stratos/aws/extension/AWSHelper.java | 9 +- .../stratos/aws/extension/AWSLoadBalancer.java | 259 +++++++++---------- .../org/apache/stratos/aws/extension/Main.java | 24 +- 3 files changed, 136 insertions(+), 156 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/52966c04/extensions/load-balancer/modules/aws-extension/src/main/java/org/apache/stratos/aws/extension/AWSHelper.java ---------------------------------------------------------------------- diff --git a/extensions/load-balancer/modules/aws-extension/src/main/java/org/apache/stratos/aws/extension/AWSHelper.java b/extensions/load-balancer/modules/aws-extension/src/main/java/org/apache/stratos/aws/extension/AWSHelper.java index 8078252..a12fc96 100644 --- a/extensions/load-balancer/modules/aws-extension/src/main/java/org/apache/stratos/aws/extension/AWSHelper.java +++ b/extensions/load-balancer/modules/aws-extension/src/main/java/org/apache/stratos/aws/extension/AWSHelper.java @@ -471,20 +471,17 @@ public class AWSHelper { public List<Instance> getAttachedInstances(String loadBalancerName, String region) { try { - LoadBalancerDescription lbDescription = getLoadBalancerDescription( - loadBalancerName, region); + LoadBalancerDescription lbDescription = getLoadBalancerDescription(loadBalancerName, region); if (lbDescription == null) { - log.warn("Could not find description of load balancer " - + loadBalancerName); + log.warn("Could not find description of load balancer "+ loadBalancerName); return null; } return lbDescription.getInstances(); } catch (AmazonClientException e) { - log.error("Could not find instances attached load balancer " - + loadBalancerName, e); + log.error("Could not find instances attached load balancer "+ loadBalancerName, e); } return null; http://git-wip-us.apache.org/repos/asf/stratos/blob/52966c04/extensions/load-balancer/modules/aws-extension/src/main/java/org/apache/stratos/aws/extension/AWSLoadBalancer.java ---------------------------------------------------------------------- diff --git a/extensions/load-balancer/modules/aws-extension/src/main/java/org/apache/stratos/aws/extension/AWSLoadBalancer.java b/extensions/load-balancer/modules/aws-extension/src/main/java/org/apache/stratos/aws/extension/AWSLoadBalancer.java index 62f9882..5cd5556 100644 --- a/extensions/load-balancer/modules/aws-extension/src/main/java/org/apache/stratos/aws/extension/AWSLoadBalancer.java +++ b/extensions/load-balancer/modules/aws-extension/src/main/java/org/apache/stratos/aws/extension/AWSLoadBalancer.java @@ -80,63 +80,13 @@ public class AWSLoadBalancer implements LoadBalancer { // Get the load balancer and update it. if (log.isDebugEnabled()) { - log.debug("Load balancer for cluster " + cluster.getClusterId() + " is already present."); + log.debug(String.format("Load balancer for cluster %s is already present.", cluster.getClusterId())); } - LoadBalancerInfo loadBalancerInfo = clusterIdToLoadBalancerMap.get(cluster.getClusterId()); + if(updateExistingLoadBalancer(cluster)){ + activeClusters.add(cluster.getClusterId()); + } - String loadBalancerName = loadBalancerInfo.getName(); - String region = loadBalancerInfo.getRegion(); - - // Get all the instances attached - Attach newly added instances to load balancer - - // attachedInstances list is useful in finding out what are the new instances which - // should be attached to this load balancer. - List<Instance> attachedInstances = awsHelper.getAttachedInstances(loadBalancerName, region); - - // clusterMembers stores all the members of a cluster. - Collection<Member> clusterMembers = cluster.getMembers(); - - if (clusterMembers.size() > 0) { - activeClusters.add(cluster.getClusterId()); - - List<Instance> instancesToAddToLoadBalancer = new ArrayList<Instance>(); - List<String> availabilityZones = new ArrayList<String>(); - - for (Member member : clusterMembers) { - // if instance id of member is not in - // attachedInstances - // add this to instancesToAddToLoadBalancer - Instance instance = new Instance(awsHelper.getAWSInstanceName(member.getInstanceId())); - - if (attachedInstances == null || !attachedInstances.contains(instance)) { - instancesToAddToLoadBalancer.add(instance); - - if (log.isDebugEnabled()) { - log.debug("Instance " + awsHelper.getAWSInstanceName(member.getInstanceId()) + - " needs to be registered to load balancer " + loadBalancerName); - } - - // LB Common Member has a property 'EC2_AVAILABILITY_ZONE' points to the ec2 availability zone - // for this member. Use the property value to update the LB about the relevant zone - String availabilityZone = getEC2AvaialbilityZoneOfMember(member); - if (availabilityZone != null) { - availabilityZones.add(availabilityZone); - } - } - } - - if (instancesToAddToLoadBalancer.size() > 0) { - awsHelper.registerInstancesToLoadBalancer( - loadBalancerName, - instancesToAddToLoadBalancer, region); - } - - // update LB with the zones - if (!availabilityZones.isEmpty() && !AWSExtensionContext.getInstance().isOperatingInVPC()) { - awsHelper.addAvailabilityZonesForLoadBalancer(loadBalancerName, availabilityZones, region); - } - } } else { // Create a new load balancer for this cluster @@ -161,8 +111,7 @@ public class AWSLoadBalancer implements LoadBalancer { if (initialZones.isEmpty()) { // initial availability zones not defined // use the default (<region>a) - initialAvailabilityZones.add(awsHelper.getAvailabilityZoneFromRegion - (region)); + initialAvailabilityZones.add(awsHelper.getAvailabilityZoneFromRegion(region)); } else { // prepend the region and construct the availability zone list with // full names (<region> + <zone>) @@ -170,98 +119,29 @@ public class AWSLoadBalancer implements LoadBalancer { initialAvailabilityZones.add(region + zone); } } + String loadBalancerDNSName = + createAWSLoadBalancer(loadBalancerName, region, listenersForThisCluster,initialAvailabilityZones); - // Returns DNS name of load balancer which was created. - // This is used in the domain mapping of this cluster. - String loadBalancerDNSName = awsHelper.createLoadBalancer(loadBalancerName, listenersForThisCluster, - region, initialAvailabilityZones, AWSExtensionContext.getInstance().isOperatingInVPC()); - - // enable connection draining (default) and cross zone load balancing (if specified in aws-extension.sh) - awsHelper.modifyLBAttributes(loadBalancerName, region, AWSExtensionContext.getInstance(). - isCrossZoneLoadBalancingEnabled(), true); - - // Add the inbound rule the security group of the load balancer - // For each listener, add a new rule with load balancer port as allowed protocol in the security group. - // if security group id is defined, directly use that - for (Listener listener : listenersForThisCluster) { - int port = listener.getLoadBalancerPort(); - - if (awsHelper.getLbSecurityGroupIdDefinedInConfiguration() != null && !awsHelper. - getLbSecurityGroupIdDefinedInConfiguration().isEmpty()) { - for (String protocol : awsHelper.getAllowedProtocolsForLBSecurityGroup()) { - awsHelper.addInboundRuleToSecurityGroup(awsHelper.getLbSecurityGroupIdDefinedInConfiguration(), - region, protocol, port); - } - } else if (awsHelper.getLbSecurityGroupName() != null && !awsHelper - .getLbSecurityGroupName().isEmpty()) { - for (String protocol : awsHelper.getAllowedProtocolsForLBSecurityGroup()) { - awsHelper.addInboundRuleToSecurityGroup(awsHelper.getSecurityGroupId(awsHelper - .getLbSecurityGroupName(), region), region, protocol, port); - } - } - } - - log.info("Load balancer '" + loadBalancerDNSName + "' created for cluster '" + cluster.getClusterId()); - - // Register instances in the cluster to load balancer - List<Instance> instances = new ArrayList<Instance>(); - List<String> availabilityZones = new ArrayList<String>(); - - for (Member member : clusterMembers) { - String instanceId = member.getInstanceId(); - - if (log.isDebugEnabled()) { - log.debug("Instance " + awsHelper.getAWSInstanceName(instanceId) + " needs to be registered to load balancer " - + loadBalancerName); - } - - Instance instance = new Instance(); - instance.setInstanceId(awsHelper.getAWSInstanceName(instanceId)); - instances.add(instance); - // LB Common Member has a property 'EC2_AVAILABILITY_ZONE' which points to the ec2 availability - // zone for this member. Use the property value to update the LB about the relevant zone - String availabilityZone = getEC2AvaialbilityZoneOfMember(member); - if (availabilityZone != null) { - availabilityZones.add(availabilityZone); - } - } + log.info(String.format("Load balancer %s created for cluster %s " , loadBalancerDNSName, cluster.getClusterId())); - awsHelper.registerInstancesToLoadBalancer(loadBalancerName, instances, region); - - // update LB with the zones - if (!availabilityZones.isEmpty() && !AWSExtensionContext.getInstance().isOperatingInVPC()) { - awsHelper.addAvailabilityZonesForLoadBalancer(loadBalancerName, availabilityZones, region); - } - - // add stickiness policy - if (awsHelper.getAppStickySessionCookie() != null && !awsHelper.getAppStickySessionCookie().isEmpty()) { - CreateAppCookieStickinessPolicyResult result = awsHelper.createStickySessionPolicy(loadBalancerName, - awsHelper.getAppStickySessionCookie(), Constants.STICKINESS_POLICY, region); - - if (result != null) { - // Take a single port mapping from a member, and apply the policy for - // the LB Listener port (Proxy port of the port mapping) - awsHelper.applyPolicyToLBListenerPorts(aMember.getPorts(), loadBalancerName, - Constants.STICKINESS_POLICY, region); - } - } + if(addClusterMembersInfo(clusterMembers, loadBalancerName, region)){ + activeClusters.add(cluster.getClusterId()); + } // persist LB info try { persistenceManager.persist(new LBInfoDTO(loadBalancerName, cluster.getClusterId(), region)); } catch (PersistenceException e) { - log.error("Unable to persist LB Information for " + loadBalancerName + ", cluster id " + - cluster.getClusterId()); + log.error(String.format( + "Unable to persist LB Information for %s , cluster id %s " + loadBalancerName, + cluster.getClusterId())); } - LoadBalancerInfo loadBalancerInfo = new LoadBalancerInfo( - loadBalancerName, region); + LoadBalancerInfo loadBalancerInfo = new LoadBalancerInfo(loadBalancerName, region); + clusterIdToLoadBalancerMap.put(cluster.getClusterId(),loadBalancerInfo); - clusterIdToLoadBalancerMap.put(cluster.getClusterId(), - loadBalancerInfo); - activeClusters.add(cluster.getClusterId()); } pause(3000); @@ -311,7 +191,111 @@ public class AWSLoadBalancer implements LoadBalancer { return true; } - private String getEC2AvaialbilityZoneOfMember(Member member) { + private Boolean addClusterMembersInfo(Collection<Member> clusterMembers, String loadBalancerName, String region) { + Boolean isUpdated=false; + // Register instances in the cluster to load balancer + List<Instance> instances = new ArrayList<Instance>(); + List<String> availabilityZones = new ArrayList<String>(); + + for (Member member : clusterMembers) { + isUpdated=true; + String instanceId = member.getInstanceId(); + + if (log.isDebugEnabled()) { + log.debug("Instance " + awsHelper.getAWSInstanceName(instanceId) + " needs to be registered to load balancer " + + loadBalancerName); + } + + Instance instance = new Instance(); + instance.setInstanceId(awsHelper.getAWSInstanceName(instanceId)); + + instances.add(instance); + // LB Common Member has a property 'EC2_AVAILABILITY_ZONE' which points to the ec2 availability + // zone for this member. Use the property value to update the LB about the relevant zone + String availabilityZone = getEC2AvaialbilityZoneOfMember(member); + if (availabilityZone != null) { + availabilityZones.add(availabilityZone); + } + + // add stickiness policy + if (awsHelper.getAppStickySessionCookie() != null && !awsHelper.getAppStickySessionCookie().isEmpty()) { + CreateAppCookieStickinessPolicyResult result = awsHelper.createStickySessionPolicy(loadBalancerName, awsHelper.getAppStickySessionCookie(), + Constants.STICKINESS_POLICY, + region); + + if (result != null) { + // Take a single port mapping from a member, and apply the policy for + // the LB Listener port (Proxy port of the port mapping) + awsHelper.applyPolicyToLBListenerPorts(member.getPorts(), loadBalancerName, + Constants.STICKINESS_POLICY, region); + } + } + + } + + awsHelper.registerInstancesToLoadBalancer(loadBalancerName, instances, region); + + // update LB with the zones + if (!availabilityZones.isEmpty() && !AWSExtensionContext.getInstance().isOperatingInVPC()) { + awsHelper.addAvailabilityZonesForLoadBalancer(loadBalancerName, availabilityZones, region); + } + return isUpdated; + } + + private String createAWSLoadBalancer(String loadBalancerName, String region, List<Listener> listenersForThisCluster, + Set<String> initialAvailabilityZones) throws LoadBalancerExtensionException { + // Returns DNS name of load balancer which was created. + // This is used in the domain mapping of this cluster. + String loadBalancerDNSName = awsHelper.createLoadBalancer(loadBalancerName, listenersForThisCluster, + region, initialAvailabilityZones, AWSExtensionContext.getInstance().isOperatingInVPC()); + + // enable connection draining (default) and cross zone load balancing (if specified in aws-extension.sh) + awsHelper.modifyLBAttributes(loadBalancerName, region, AWSExtensionContext.getInstance(). + isCrossZoneLoadBalancingEnabled(), true); + // Add the inbound rule the security group of the load balancer + // For each listener, add a new rule with load balancer port as allowed protocol in the security group. + // if security group id is defined, directly use that + for (Listener listener : listenersForThisCluster) { + int port = listener.getLoadBalancerPort(); + + if (awsHelper.getLbSecurityGroupIdDefinedInConfiguration() != null && !awsHelper.getLbSecurityGroupIdDefinedInConfiguration().isEmpty()) { + for (String protocol : awsHelper.getAllowedProtocolsForLBSecurityGroup()) { + awsHelper.addInboundRuleToSecurityGroup(awsHelper.getLbSecurityGroupIdDefinedInConfiguration(), + region, protocol, port); + } + } else if (awsHelper.getLbSecurityGroupName() != null && !awsHelper + .getLbSecurityGroupName().isEmpty()) { + for (String protocol : awsHelper.getAllowedProtocolsForLBSecurityGroup()) { + awsHelper.addInboundRuleToSecurityGroup(awsHelper.getSecurityGroupId(awsHelper.getLbSecurityGroupName(),region), region, protocol,port); + } + } + } + + return loadBalancerDNSName; + } + + private Boolean updateExistingLoadBalancer(Cluster cluster) { + Boolean isUpdated=false; + LoadBalancerInfo loadBalancerInfo = clusterIdToLoadBalancerMap.get(cluster.getClusterId()); + + String loadBalancerName = loadBalancerInfo.getName(); + String region = loadBalancerInfo.getRegion(); + + // Get all the instances attached - Attach newly added instances to load balancer + + // attachedInstances list is useful in finding out what are the new instances which + // should be attached to this load balancer. + List<Instance> attachedInstances = awsHelper.getAttachedInstances(loadBalancerName, region); + + // clusterMembers stores all the members of a cluster. + Collection<Member> clusterMembers = cluster.getMembers(); + + isUpdated= addClusterMembersInfo(clusterMembers, loadBalancerName, region); + + return isUpdated; + } + + private String getEC2AvaialbilityZoneOfMember(Member member) { if (member.getProperties() != null) { return member.getProperties().getProperty(Constants.EC2_AVAILABILITY_ZONE_PROPERTY); } @@ -324,7 +308,6 @@ public class AWSLoadBalancer implements LoadBalancer { * nothing but logs the message. */ public void start() throws LoadBalancerExtensionException { - log.info("AWS load balancer extension started."); } http://git-wip-us.apache.org/repos/asf/stratos/blob/52966c04/extensions/load-balancer/modules/aws-extension/src/main/java/org/apache/stratos/aws/extension/Main.java ---------------------------------------------------------------------- diff --git a/extensions/load-balancer/modules/aws-extension/src/main/java/org/apache/stratos/aws/extension/Main.java b/extensions/load-balancer/modules/aws-extension/src/main/java/org/apache/stratos/aws/extension/Main.java index 73fa971..80b6481 100644 --- a/extensions/load-balancer/modules/aws-extension/src/main/java/org/apache/stratos/aws/extension/Main.java +++ b/extensions/load-balancer/modules/aws-extension/src/main/java/org/apache/stratos/aws/extension/Main.java @@ -25,6 +25,7 @@ import org.apache.log4j.PropertyConfigurator; import org.apache.stratos.common.threading.StratosThreadPool; import org.apache.stratos.load.balancer.common.topology.TopologyProvider; import org.apache.stratos.load.balancer.extension.api.LoadBalancerExtension; +import org.apache.stratos.load.balancer.extension.api.exception.LoadBalancerExtensionException; import java.util.concurrent.ExecutorService; @@ -34,6 +35,8 @@ import java.util.concurrent.ExecutorService; public class Main { private static final Log log = LogFactory.getLog(Main.class); + public static final String AWS_EXTENSION_THREAD_POOL = "aws.extension.thread.pool"; + public static final int THREAD_POOL_SIZE = 10; private static ExecutorService executorService; public static void main(String[] args) { @@ -41,23 +44,20 @@ public class Main { LoadBalancerExtension extension = null; try { // Configure log4j properties - PropertyConfigurator.configure(System - .getProperty("log4j.properties.file.path")); + PropertyConfigurator.configure(System.getProperty("log4j.properties.file.path")); if (log.isInfoEnabled()) { log.info("AWS extension started"); } - executorService = StratosThreadPool.getExecutorService( - "aws.extension.thread.pool", 10); + executorService = StratosThreadPool.getExecutorService(AWS_EXTENSION_THREAD_POOL, THREAD_POOL_SIZE); // Validate runtime parameters AWSExtensionContext.getInstance().validate(); TopologyProvider topologyProvider = new TopologyProvider(); - AWSStatisticsReader statisticsReader = AWSExtensionContext - .getInstance().isCEPStatsPublisherEnabled() ? new AWSStatisticsReader( - topologyProvider) : null; - extension = new LoadBalancerExtension(new AWSLoadBalancer(), - statisticsReader, topologyProvider); + AWSStatisticsReader statisticsReader = + AWSExtensionContext.getInstance().isCEPStatsPublisherEnabled() ? new AWSStatisticsReader( + topologyProvider) : null; + extension = new LoadBalancerExtension(new AWSLoadBalancer(), statisticsReader, topologyProvider); extension.setExecutorService(executorService); extension.execute(); @@ -68,7 +68,7 @@ public class Main { public void run() { try { if (finalExtension != null) { - log.info("Shutting aws extension..."); + log.info("Shutting aws LB extension..."); finalExtension.stop(); } mainThread.join(); @@ -77,9 +77,9 @@ public class Main { } } }); - } catch (Exception e) { + } catch (LoadBalancerExtensionException e) { if (log.isErrorEnabled()) { - log.error(e); + log.error("Error occurred while running the aws lb extension"); } if (extension != null) { log.info("Shutting aws extension...");
