HAProxy per network partition support

Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/39c4d76d
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/39c4d76d
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/39c4d76d

Branch: refs/heads/master
Commit: 39c4d76d0858f8d60dbfd1a854bf6a1890d3af1a
Parents: 3a4f550
Author: Dinesh Bandara <[email protected]>
Authored: Thu Sep 18 12:11:21 2014 +0530
Committer: Nirmal Fernando <[email protected]>
Committed: Wed Sep 24 15:50:29 2014 +0530

----------------------------------------------------------------------
 .../src/main/bin/haproxy-extension.sh           |   3 +-
 .../stratos/haproxy/extension/Constants.java    |   9 ++
 .../haproxy/extension/HAProxyConfigWriter.java  | 156 +++++++++++--------
 .../haproxy/extension/HAProxyContext.java       |   8 +
 .../extension/HAProxyStatisticsReader.java      |  32 ++--
 tools/puppet3/modules/haproxy/manifests/init.pp |   1 +
 .../templates/bin/haproxy-extension.sh.erb      |   4 +-
 7 files changed, 133 insertions(+), 80 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/39c4d76d/extensions/load-balancer/haproxy-extension/src/main/bin/haproxy-extension.sh
----------------------------------------------------------------------
diff --git 
a/extensions/load-balancer/haproxy-extension/src/main/bin/haproxy-extension.sh 
b/extensions/load-balancer/haproxy-extension/src/main/bin/haproxy-extension.sh
index 717b6e7..43fa639 100755
--- 
a/extensions/load-balancer/haproxy-extension/src/main/bin/haproxy-extension.sh
+++ 
b/extensions/load-balancer/haproxy-extension/src/main/bin/haproxy-extension.sh
@@ -39,7 +39,8 @@ properties="-Dhaproxy.private.ip=127.0.0.1
             -Dthrift.receiver.ip=127.0.0.1
             -Dthrift.receiver.port=7615
             -Dnetwork.partition.id=network-partition-1
-            -Dstratos.messaging.topology.member.filter="
+            -Dcluster.id=cluster
+
 
 # Uncomment below line to enable remote debugging
 #debug="-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005"

http://git-wip-us.apache.org/repos/asf/stratos/blob/39c4d76d/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/Constants.java
----------------------------------------------------------------------
diff --git 
a/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/Constants.java
 
b/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/Constants.java
index 1167172..6fe77ca 100644
--- 
a/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/Constants.java
+++ 
b/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/Constants.java
@@ -34,4 +34,13 @@ public class Constants {
     public static final String THRIFT_RECEIVER_IP = "thrift.receiver.ip";
     public static final String THRIFT_RECEIVER_PORT = "thrift.receiver.port";
     public static final String NETWORK_PARTITION_ID = "network.partition.id";
+    public static final String CLUSTER_ID = "cluster.id";
+
+    public static final String LOAD_BALANCER = "load.balancer";
+    public static final String LOAD_BALANCER_REF = "load.balancer.ref";
+    public static final String NO_LOAD_BALANCER = "no.load.balancer";
+    public static final String LB_SERVICE_TYPE ="load.balanced.service.type";
+    public static final String DEFAULT_LOAD_BALANCER ="default.load.balancer";
+    public static final String STATIC_LOAD_BALANCER ="static.load.balancer";
+    public static final String SERVICE_LOAD_BALANCER = 
"service.aware.load.balancer";
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/39c4d76d/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyConfigWriter.java
----------------------------------------------------------------------
diff --git 
a/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyConfigWriter.java
 
b/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyConfigWriter.java
index b767fe0..77f86af 100644
--- 
a/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyConfigWriter.java
+++ 
b/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyConfigWriter.java
@@ -19,6 +19,7 @@
 
 package org.apache.stratos.haproxy.extension;
 
+import org.apache.commons.lang.ObjectUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.messaging.domain.topology.*;
@@ -44,6 +45,21 @@ public class HAProxyConfigWriter {
     private String confFilePath;
     private String statsSocketFilePath;
 
+    // Prepare frontend http collection
+    private StringBuilder frontEndHttp = new StringBuilder();
+    // Prepare frontend https collection
+    private StringBuilder frontEndHttps = new StringBuilder();
+    // Prepare backend http collection
+    private StringBuilder backEndHttp = new StringBuilder();
+    // Prepare backend https collection
+    private StringBuilder backEndHttps = new StringBuilder();
+
+    private String frontEndHttpId, frontEndHttpsId;
+    private boolean frontEndHttpAdded, frontEndHttpsAdded;
+
+    private String loadBalancerType; // Load Balancer type (default, service 
aware or static)
+    String loadBalancedServiceType;  // Service type if load balancer is a 
service aware
+
     public HAProxyConfigWriter(String templatePath, String templateName, 
String confFilePath, String statsSocketFilePath) {
         this.templatePath = templatePath;
         this.templateName = templateName;
@@ -57,72 +73,32 @@ public class HAProxyConfigWriter {
         globalParameters.append("stats socket ");
         globalParameters.append(statsSocketFilePath);
 
-        // Prepare frontend http collection
-        StringBuilder frontEndHttp = new StringBuilder();
-        // Prepare frontend https collection
-        StringBuilder frontEndHttps = new StringBuilder();
-        // Prepare backend http collection
-        StringBuilder backEndHttp = new StringBuilder();
-        // Prepare backend https collection
-        StringBuilder backEndHttps = new StringBuilder();
-
-        String frontEndHttpId = "http_frontend";
-        String frontEndHttpsId = "https_frontend";
-        boolean frontEndHttpAdded = false;
-        boolean frontEndHttpsAdded = false;
+        frontEndHttpId = "http_frontend";
+        frontEndHttpsId = "https_frontend";
+        frontEndHttpAdded = false;
+        frontEndHttpsAdded = false;
 
         for (Service service : topology.getServices()) {
-            for (Cluster cluster : service.getClusters()) {
-
-                if(cluster.getServiceName().equals("haproxy"))
-                    continue;
-
-                if ((service.getPorts() == null) || (service.getPorts().size() 
== 0)) {
-                    throw new RuntimeException(String.format("No ports found 
in service: %s", service.getServiceName()));
+            if(service.getServiceName().equals("haproxy")) {
+                for (Cluster cluster : service.getClusters()) {
+                    
if(cluster.getClusterId().equals(HAProxyContext.getInstance().getClusterId()))
+                    {
+                        loadBalancerType = 
cluster.getProperties().getProperty(Constants.LOAD_BALANCER_REF);
+                        
if(cluster.getProperties().getProperty(Constants.LB_SERVICE_TYPE) != null)
+                            loadBalancedServiceType = 
cluster.getProperties().getProperty(Constants.LB_SERVICE_TYPE);
+                        break;
+                    }
                 }
+            }
+        }
 
-                for (Port port : service.getPorts()) {
-                    if (port.getProtocol().equals("http")){
-                        if (!frontEndHttpAdded) {
-                            frontEndHttp.append("frontend 
").append(frontEndHttpId).append(NEW_LINE);
-                            frontEndHttp.append("\tbind 
").append(HAProxyContext.getInstance().getHAProxyPrivateIp()).append(":").append(port.getProxy()).append(NEW_LINE);
-                            frontEndHttp.append("\tmode 
").append(port.getProtocol()).append(NEW_LINE);
-                            frontEndHttpAdded = true;
-                        }
-
-                        for(String hostname : cluster.getHostNames()) {
-                            frontEndHttp.append("\tacl 
").append("is_").append(hostname).append(" hdr_beg(host) -i 
").append(hostname).append(NEW_LINE);
-                            frontEndHttp.append("\tuse_backend 
").append(hostname).append("-http-members if 
is_").append(hostname).append(NEW_LINE);
-
-                            // Backend block
-                            backEndHttp.append("backend 
").append(hostname).append("-http-members").append(NEW_LINE);
-                            backEndHttp.append("\tmode 
").append("http").append(NEW_LINE);
-                            for (Member member : cluster.getMembers()) {
-                                backEndHttp.append("\tserver 
").append(member.getMemberId()).append(" ")
-                                        
.append(member.getMemberIp()).append(":").append(port.getValue()).append(NEW_LINE);
-                            }
-                            backEndHttp.append(NEW_LINE);
-                        }
-                    } else if (port.getProtocol().equals("https")){
-                        if (!frontEndHttpsAdded) {
-                            frontEndHttp.append("frontend 
").append(frontEndHttpsId).append(NEW_LINE);
-                            frontEndHttp.append("\tbind 
").append(HAProxyContext.getInstance().getHAProxyPrivateIp()).append(":").append(port.getProxy()).append(NEW_LINE);
-                            frontEndHttp.append("\tmode 
").append("http").append(NEW_LINE);
-                            frontEndHttpsAdded = true;
-                        }
-
-                        for(String hostname : cluster.getHostNames()) {
-                            frontEndHttps.append("\tacl 
").append("is_").append(hostname).append(" hdr_beg(host) -i 
").append(hostname).append(NEW_LINE);
-                            frontEndHttps.append("\tuse_backend 
").append(hostname).append("-https-members if 
is_").append(hostname).append(NEW_LINE);
-
-                            // Backend block
-                            backEndHttps.append("backend 
").append(hostname).append("-http-members").append(NEW_LINE);
-                            backEndHttps.append("\tmode 
").append("https").append(NEW_LINE);
-                            for (Member member : cluster.getMembers()) {
-                                backEndHttps.append("\tserver 
").append(member.getMemberId()).append(" ")
-                                        
.append(member.getMemberIp()).append(":").append(port.getValue()).append(NEW_LINE);
-                            }
-                            backEndHttps.append(NEW_LINE);
+        for (Service service : topology.getServices()) {
+            for (Cluster cluster : service.getClusters()) {
+                if 
((cluster.getProperties().getProperty(Constants.LOAD_BALANCER) == null)) {
+                    if 
((cluster.getProperties().getProperty(Constants.LOAD_BALANCER_REF) != null)) {
+                        
if(!(cluster.getProperties().getProperty(Constants.LOAD_BALANCER_REF).equals(Constants.NO_LOAD_BALANCER))
 &&
+                                
cluster.getProperties().getProperty(Constants.LOAD_BALANCER_REF).equals(loadBalancerType))
 {
+                            createConfig(service, cluster);
                         }
                     }
                 }
@@ -166,4 +142,60 @@ public class HAProxyConfigWriter {
             throw new RuntimeException(e);
         }
     }
+
+    private void createConfig(Service service, Cluster cluster) {
+        if ((service.getPorts() == null) || (service.getPorts().size() == 0)) {
+            throw new RuntimeException(String.format("No ports found in 
service: %s", service.getServiceName()));
+        }
+
+        for (Port port : service.getPorts()) {
+            if (port.getProtocol().equals("http")) {
+                if (!frontEndHttpAdded) {
+                    frontEndHttp.append("frontend 
").append(frontEndHttpId).append(NEW_LINE);
+                    frontEndHttp.append("\tbind 
").append(HAProxyContext.getInstance().getHAProxyPrivateIp()).append(":").append(port.getProxy()).append(NEW_LINE);
+                    frontEndHttp.append("\tmode 
").append(port.getProtocol()).append(NEW_LINE);
+                    frontEndHttpAdded = true;
+                }
+
+                for (String hostname : cluster.getHostNames()) {
+                    frontEndHttp.append("\tacl 
").append("is_").append(hostname).append(" hdr_beg(host) -i 
").append(hostname).append(NEW_LINE);
+                    frontEndHttp.append("\tuse_backend 
").append(hostname).append("-http-members if 
is_").append(hostname).append(NEW_LINE);
+
+                    // Backend block
+                    backEndHttp.append("backend 
").append(hostname).append("-http-members").append(NEW_LINE);
+                    backEndHttp.append("\tmode 
").append("http").append(NEW_LINE);
+                    for (Member member : cluster.getMembers()) {
+                        if 
(member.getNetworkPartitionId().equals(HAProxyContext.getInstance().getNetworkPartitionId()))
 {
+                            backEndHttp.append("\tserver 
").append(member.getMemberId()).append(" ")
+                                    
.append(member.getMemberIp()).append(":").append(port.getValue()).append(NEW_LINE);
+                        }
+                    }
+                    backEndHttp.append(NEW_LINE);
+                }
+            } else if (port.getProtocol().equals("https")) {
+                if (!frontEndHttpsAdded) {
+                    frontEndHttp.append("frontend 
").append(frontEndHttpsId).append(NEW_LINE);
+                    frontEndHttp.append("\tbind 
").append(HAProxyContext.getInstance().getHAProxyPrivateIp()).append(":").append(port.getProxy()).append(NEW_LINE);
+                    frontEndHttp.append("\tmode 
").append("http").append(NEW_LINE);
+                    frontEndHttpsAdded = true;
+                }
+
+                for (String hostname : cluster.getHostNames()) {
+                    frontEndHttps.append("\tacl 
").append("is_").append(hostname).append(" hdr_beg(host) -i 
").append(hostname).append(NEW_LINE);
+                    frontEndHttps.append("\tuse_backend 
").append(hostname).append("-https-members if 
is_").append(hostname).append(NEW_LINE);
+
+                    // Backend block
+                    backEndHttps.append("backend 
").append(hostname).append("-http-members").append(NEW_LINE);
+                    backEndHttps.append("\tmode 
").append("https").append(NEW_LINE);
+                    for (Member member : cluster.getMembers()) {
+                        if 
(member.getNetworkPartitionId().equals(HAProxyContext.getInstance().getNetworkPartitionId()))
 {
+                            backEndHttps.append("\tserver 
").append(member.getMemberId()).append(" ")
+                                    
.append(member.getMemberIp()).append(":").append(port.getValue()).append(NEW_LINE);
+                        }
+                    }
+                    backEndHttps.append(NEW_LINE);
+                }
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/39c4d76d/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyContext.java
----------------------------------------------------------------------
diff --git 
a/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyContext.java
 
b/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyContext.java
index c835f5d..55dfecc 100644
--- 
a/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyContext.java
+++ 
b/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyContext.java
@@ -41,6 +41,7 @@ public class HAProxyContext {
     private String thriftReceiverIp;
     private String thriftReceiverPort;
     private String networkPartitionId;
+    private String clusterId;
 
     private HAProxyContext() {
         this.haProxyPrivateIp = 
System.getProperty(Constants.HAPROXY_PRIVATE_IP);
@@ -54,6 +55,7 @@ public class HAProxyContext {
         this.thriftReceiverIp = 
System.getProperty(Constants.THRIFT_RECEIVER_IP);
         this.thriftReceiverPort = 
System.getProperty(Constants.THRIFT_RECEIVER_PORT);
         this.networkPartitionId = 
System.getProperty(Constants.NETWORK_PARTITION_ID);
+        this.clusterId = System.getProperty(Constants.CLUSTER_ID);
 
         if (log.isDebugEnabled()) {
             log.debug(Constants.HAPROXY_PRIVATE_IP + " = " + haProxyPrivateIp);
@@ -67,6 +69,7 @@ public class HAProxyContext {
             log.debug(Constants.THRIFT_RECEIVER_IP + " = " + thriftReceiverIp);
             log.debug(Constants.THRIFT_RECEIVER_PORT + " = " + 
thriftReceiverPort);
             log.debug(Constants.NETWORK_PARTITION_ID + " = " + 
networkPartitionId);
+            log.debug(Constants.CLUSTER_ID + " = " + clusterId);
         }
     }
 
@@ -90,6 +93,7 @@ public class HAProxyContext {
         validateSystemProperty(Constants.CONF_FILE_PATH);
         validateSystemProperty(Constants.STATS_SOCKET_FILE_PATH);
         validateSystemProperty(Constants.CEP_STATS_PUBLISHER_ENABLED);
+        validateSystemProperty(Constants.CLUSTER_ID);
 
         if(cepStatsPublisherEnabled) {
             validateSystemProperty(Constants.THRIFT_RECEIVER_IP);
@@ -136,4 +140,8 @@ public class HAProxyContext {
     public boolean isCEPStatsPublisherEnabled() {
         return cepStatsPublisherEnabled;
     }
+
+    public String getNetworkPartitionId() { return networkPartitionId; };
+
+    public String getClusterId() { return clusterId; };
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/39c4d76d/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyStatisticsReader.java
----------------------------------------------------------------------
diff --git 
a/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyStatisticsReader.java
 
b/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyStatisticsReader.java
index f564e7c..f29e1c6 100644
--- 
a/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyStatisticsReader.java
+++ 
b/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyStatisticsReader.java
@@ -63,23 +63,25 @@ public class HAProxyStatisticsReader implements 
LoadBalancerStatisticsReader {
                         for(String hostname : cluster.getHostNames()) {
                             backendId = hostname+"-http-members";
                             for (Member member : cluster.getMembers()) {
-                                // echo "get weight <backend>/<server>" | 
socat stdio <stats-socket>
-                                command = String.format("%s/get-weight.sh %s 
%s %s", scriptsPath, backendId, member.getMemberId(), statsSocketFilePath);
-                                try {
-                                    output = 
CommandUtils.executeCommand(command);
-                                    if ((output != null) && (output.length() > 
0)) {
-                                        array = output.split(" ");
-                                        if ((array != null) && (array.length > 
0)) {
-                                            weight = 
Integer.parseInt(array[0]);
-                                            if (log.isDebugEnabled()) {
-                                                
log.debug(String.format("Member weight found: [cluster] %s [member] %s [weight] 
%d", member.getClusterId(), member.getMemberId(), weight));
+                                
if(member.getNetworkPartitionId().equals(HAProxyContext.getInstance().getNetworkPartitionId()))
 {
+                                    // echo "get weight <backend>/<server>" | 
socat stdio <stats-socket>
+                                    command = String.format("%s/get-weight.sh 
%s %s %s", scriptsPath, backendId, member.getMemberId(), statsSocketFilePath);
+                                    try {
+                                        output = 
CommandUtils.executeCommand(command);
+                                        if ((output != null) && 
(output.length() > 0)) {
+                                            array = output.split(" ");
+                                            if ((array != null) && 
(array.length > 0)) {
+                                                weight = 
Integer.parseInt(array[0]);
+                                                if (log.isDebugEnabled()) {
+                                                    
log.debug(String.format("Member weight found: [cluster] %s [member] %s [weight] 
%d", member.getClusterId(), member.getMemberId(), weight));
+                                                }
+                                                totalWeight += weight;
                                             }
-                                            totalWeight += weight;
                                         }
-                                    }
-                                } catch (IOException e) {
-                                    if (log.isErrorEnabled()) {
-                                        log.error(e);
+                                    } catch (IOException e) {
+                                        if (log.isErrorEnabled()) {
+                                            log.error(e);
+                                        }
                                     }
                                 }
                             }

http://git-wip-us.apache.org/repos/asf/stratos/blob/39c4d76d/tools/puppet3/modules/haproxy/manifests/init.pp
----------------------------------------------------------------------
diff --git a/tools/puppet3/modules/haproxy/manifests/init.pp 
b/tools/puppet3/modules/haproxy/manifests/init.pp
index dfbc4f9..3187e04 100755
--- a/tools/puppet3/modules/haproxy/manifests/init.pp
+++ b/tools/puppet3/modules/haproxy/manifests/init.pp
@@ -22,6 +22,7 @@
 class haproxy(
   $network_partition_id = $stratos_network_partition_id,
   $service_filter       = $stratos_load_balanced_service_type,
+  $cluster_id           = $stratos_cluster_id,
   $version              = '4.0.0',
   $owner                = 'root',
   $group                = 'root',

http://git-wip-us.apache.org/repos/asf/stratos/blob/39c4d76d/tools/puppet3/modules/haproxy/templates/bin/haproxy-extension.sh.erb
----------------------------------------------------------------------
diff --git 
a/tools/puppet3/modules/haproxy/templates/bin/haproxy-extension.sh.erb 
b/tools/puppet3/modules/haproxy/templates/bin/haproxy-extension.sh.erb
index 7f17d2e..4cb44d7 100755
--- a/tools/puppet3/modules/haproxy/templates/bin/haproxy-extension.sh.erb
+++ b/tools/puppet3/modules/haproxy/templates/bin/haproxy-extension.sh.erb
@@ -39,8 +39,8 @@ properties="-Dhaproxy.private.ip=0.0.0.0
             -Dthrift.receiver.ip=<%= @cep_ip %>
             -Dthrift.receiver.port=<%= @cep_port %>
             -Dnetwork.partition.id=<%= @network_partition_id %>
-            <% if @stratos_lb_category == 'service.aware.load.balancer' 
%>-Dstratos.topology.service.filter=service-name=<%= @service_filter %><% end 
%>            
-            -Dcep.stats.publisher.enabled=true"
+            -Dcluster.id=<%= @cluster_id %>
+           -Dcep.stats.publisher.enabled=true"
 
 # Uncomment below line to enable remote debugging
 #debug="-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005"

Reply via email to