This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a commit to branch staging
in repository https://gitbox.apache.org/repos/asf/airavata.git


The following commit(s) were added to refs/heads/staging by this push:
     new 2edfb83  Load balancing workflows across multiple clusters
2edfb83 is described below

commit 2edfb83bcd579e44da65f6bd3e69611f74d10699
Author: Dimuthu Wannipurage <[email protected]>
AuthorDate: Fri Apr 5 14:15:04 2019 -0400

    Load balancing workflows across multiple clusters
---
 .../scigap/staging/group_vars/all/vars.yml         |  2 +
 .../scigap/testing/group_vars/all/vars.yml         |  2 +
 .../post-wm/airavata-server.properties.j2          |  1 +
 .../templates/pre-wm/airavata-server.properties.j2 |  1 +
 .../helix/impl/workflow/ParserWorkflowManager.java |  3 +-
 .../helix/impl/workflow/PostWorkflowManager.java   |  3 +-
 .../helix/impl/workflow/PreWorkflowManager.java    |  3 +-
 .../helix/impl/workflow/WorkflowManager.java       | 53 ++++++++++++++++++----
 .../src/main/resources/airavata-server.properties  |  2 +
 9 files changed, 58 insertions(+), 12 deletions(-)

diff --git a/dev-tools/ansible/inventories/scigap/staging/group_vars/all/vars.yml b/dev-tools/ansible/inventories/scigap/staging/group_vars/all/vars.yml
index ce9c1d2..a49db72 100644
--- a/dev-tools/ansible/inventories/scigap/staging/group_vars/all/vars.yml
+++ b/dev-tools/ansible/inventories/scigap/staging/group_vars/all/vars.yml
@@ -148,7 +148,9 @@ snapshot_version: "0.17-SNAPSHOT"
 helix_controller_name: "helixcontroller"
 helix_participant_name: "helixparticipant"
 helix_pre_wm_name: "prewm"
+helix_pre_wm_load_balance_clusters: "false"
 helix_post_wm_name: "postwm"
+helix_post_wm_load_balance_clusters: "false"
 
 # Listening port for kafka installations
 kafka_listener_port: 9092
diff --git a/dev-tools/ansible/inventories/scigap/testing/group_vars/all/vars.yml b/dev-tools/ansible/inventories/scigap/testing/group_vars/all/vars.yml
index b1a7b18..562d3ff 100644
--- a/dev-tools/ansible/inventories/scigap/testing/group_vars/all/vars.yml
+++ b/dev-tools/ansible/inventories/scigap/testing/group_vars/all/vars.yml
@@ -148,7 +148,9 @@ snapshot_version: "0.18-SNAPSHOT"
 helix_controller_name: "helixcontroller"
 helix_participant_name: "helixparticipant"
 helix_pre_wm_name: "prewm"
+helix_pre_wm_load_balance_clusters: "false"
 helix_post_wm_name: "postwm"
+helix_post_wm_load_balance_clusters: "false"
 
 # Listening port for kafka installations
 kafka_listener_port: 9092
diff --git a/dev-tools/ansible/roles/helix_setup/templates/post-wm/airavata-server.properties.j2 b/dev-tools/ansible/roles/helix_setup/templates/post-wm/airavata-server.properties.j2
index 0085310..259e52c 100644
--- a/dev-tools/ansible/roles/helix_setup/templates/post-wm/airavata-server.properties.j2
+++ b/dev-tools/ansible/roles/helix_setup/templates/post-wm/airavata-server.properties.j2
@@ -30,6 +30,7 @@ kafka.broker.topic={{ job_monitor_broker_topic }}
 kafka.broker.consumer.group={{ job_monitor_broker_consumer_group }}
 helix.cluster.name={{ helix_cluster_name }}
 post.workflow.manager.name={{ helix_post_wm_name }}
+post.workflow.manager.loadbalance.clusters={{ helix_post_wm_load_balance_clusters }}
 
 ###########################################################################
 # AMQP Notification Configuration
diff --git a/dev-tools/ansible/roles/helix_setup/templates/pre-wm/airavata-server.properties.j2 b/dev-tools/ansible/roles/helix_setup/templates/pre-wm/airavata-server.properties.j2
index 220fba7..cabede9 100644
--- a/dev-tools/ansible/roles/helix_setup/templates/pre-wm/airavata-server.properties.j2
+++ b/dev-tools/ansible/roles/helix_setup/templates/pre-wm/airavata-server.properties.j2
@@ -27,6 +27,7 @@ regserver.server.port={{ registry_port }}
 ###########################################################################
 helix.cluster.name={{ helix_cluster_name }}
 pre.workflow.manager.name={{ helix_pre_wm_name }}
+pre.workflow.manager.loadbalance.clusters={{ helix_pre_wm_load_balance_clusters }}
 
 ###########################################################################
 # AMQP Notification Configuration
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/ParserWorkflowManager.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/ParserWorkflowManager.java
index 3fc9008..9b4fd74 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/ParserWorkflowManager.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/ParserWorkflowManager.java
@@ -47,7 +47,8 @@ public class ParserWorkflowManager extends WorkflowManager {
     private ParserRequest parserRequest;
 
     public ParserWorkflowManager(ParserRequest parserRequest) throws ApplicationSettingsException {
-        super(ServerSettings.getSetting("parser.workflow.manager.name"));
+        super(ServerSettings.getSetting("parser.workflow.manager.name"),
+                Boolean.parseBoolean(ServerSettings.getSetting("post.workflow.manager.loadbalance.clusters")));
         this.parserRequest = parserRequest;
     }
 
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
index 78a4cb3..bd522f2 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
@@ -59,7 +59,8 @@ public class PostWorkflowManager extends WorkflowManager {
     private final static Logger logger = LoggerFactory.getLogger(PostWorkflowManager.class);
 
     public PostWorkflowManager() throws ApplicationSettingsException {
-        super(ServerSettings.getSetting("post.workflow.manager.name"));
+        super(ServerSettings.getSetting("post.workflow.manager.name"),
+                Boolean.parseBoolean(ServerSettings.getSetting("post.workflow.manager.loadbalance.clusters")));
     }
 
     private void init() throws Exception {
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java
index 15d6735..9b716c1 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java
@@ -57,7 +57,8 @@ public class PreWorkflowManager extends WorkflowManager {
     private Subscriber subscriber;
 
     public PreWorkflowManager() throws ApplicationSettingsException {
-        super(ServerSettings.getSetting("pre.workflow.manager.name"));
+        super(ServerSettings.getSetting("pre.workflow.manager.name"),
+                Boolean.parseBoolean(ServerSettings.getSetting("pre.workflow.manager.loadbalance.clusters")));
     }
 
     private void initAllComponents() throws Exception {
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/WorkflowManager.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/WorkflowManager.java
index 8d963fa..97f7c02 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/WorkflowManager.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/WorkflowManager.java
@@ -22,10 +22,15 @@ import org.apache.curator.RetryPolicy;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.Calendar;
+import java.util.List;
 
 public class WorkflowManager {
 
@@ -33,26 +38,47 @@ public class WorkflowManager {
 
     private Publisher statusPublisher;
     private CuratorFramework curatorClient = null;
-    private WorkflowOperator workflowOperator;
+    private List<WorkflowOperator> workflowOperators = new ArrayList<>();
     private ThriftClientPool<RegistryService.Client> registryClientPool;
     private String workflowManagerName;
+    private ZKHelixAdmin zkHelixAdmin;
+    private boolean loadBalanceClusters;
 
-    public WorkflowManager(String workflowManagerName) {
+    private int currentOperator = 0;
+
+    public WorkflowManager(String workflowManagerName, boolean loadBalanceClusters) {
         this.workflowManagerName = workflowManagerName;
+        this.loadBalanceClusters = loadBalanceClusters;
     }
 
     protected void initComponents() throws Exception {
         initRegistryClientPool();
-        initWorkflowOperatorr();
+        initHelixAdmin();
+        initWorkflowOperators();
         initStatusPublisher();
         initCuratorClient();
+
     }
 
-    private void initWorkflowOperatorr() throws Exception {
-        workflowOperator = new WorkflowOperator(
-                ServerSettings.getSetting("helix.cluster.name"),
-                workflowManagerName,
-                ServerSettings.getZookeeperConnection());
+    private void initWorkflowOperators() throws Exception {
+
+        if (!loadBalanceClusters) {
+            logger.info("Using default cluster " + ServerSettings.getSetting("helix.cluster.name") + " to submit workflows");
+            workflowOperators.add(new WorkflowOperator(
+                    ServerSettings.getSetting("helix.cluster.name"),
+                    workflowManagerName,
+                    ServerSettings.getZookeeperConnection()));
+        } else {
+            logger.info("Load balancing workflows among existing clusters");
+            List<String> clusters = zkHelixAdmin.getClusters();
+            logger.info("Total available clusters " + clusters.size());
+            for (String cluster : clusters) {
+                workflowOperators.add(new WorkflowOperator(
+                        cluster,
+                        workflowManagerName,
+                        ServerSettings.getZookeeperConnection()));
+            }
+        }
     }
 
     private void initStatusPublisher() throws AiravataException {
@@ -65,6 +91,11 @@ public class WorkflowManager {
         this.curatorClient.start();
     }
 
+    private void initHelixAdmin() throws ApplicationSettingsException {
+        ZkClient zkClient = new ZkClient(ServerSettings.getZookeeperConnection(), ZkClient.DEFAULT_SESSION_TIMEOUT,
+                ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
+        zkHelixAdmin = new ZKHelixAdmin(zkClient);
+    }
     private void initRegistryClientPool() throws ApplicationSettingsException {
 
         GenericObjectPool.Config poolConfig = new GenericObjectPool.Config();
@@ -90,7 +121,11 @@ public class WorkflowManager {
     }
 
     public WorkflowOperator getWorkflowOperator() {
-        return workflowOperator;
+        currentOperator++; // advance the cursor so successive calls hand out successive operators
+        if (currentOperator >= workflowOperators.size()) { // ran past the last operator: wrap to the first
+            currentOperator = 0;
+        }
+        return workflowOperators.get(currentOperator);
     }
 
     public ThriftClientPool<RegistryService.Client> getRegistryClientPool() {
diff --git a/modules/airavata-helix/helix-spectator/src/main/resources/airavata-server.properties b/modules/airavata-helix/helix-spectator/src/main/resources/airavata-server.properties
index a5bde66..f1bd06b 100644
--- a/modules/airavata-helix/helix-spectator/src/main/resources/airavata-server.properties
+++ b/modules/airavata-helix/helix-spectator/src/main/resources/airavata-server.properties
@@ -43,7 +43,9 @@ kafka.broker.topic=parsed-data
 kafka.broker.consumer.group=MonitoringConsumer
 helix.cluster.name=AiravataDemoCluster
 pre.workflow.manager.name=prewm
+pre.workflow.manager.loadbalance.clusters=false
 post.workflow.manager.name=postwm
+post.workflow.manager.loadbalance.clusters=false
 parser.workflow.manager.name=parserwm
 helix.controller.name=helixcontroller
 helix.participant.name=helixparticipant

Reply via email to