This is an automated email from the ASF dual-hosted git repository.
dimuthuupe pushed a commit to branch staging
in repository https://gitbox.apache.org/repos/asf/airavata.git
The following commit(s) were added to refs/heads/staging by this push:
new 2edfb83 Load balancing workflows across multiple clusters
2edfb83 is described below
commit 2edfb83bcd579e44da65f6bd3e69611f74d10699
Author: Dimuthu Wannipurage <[email protected]>
AuthorDate: Fri Apr 5 14:15:04 2019 -0400
Load balancing workflows across multiple clusters
---
.../scigap/staging/group_vars/all/vars.yml | 2 +
.../scigap/testing/group_vars/all/vars.yml | 2 +
.../post-wm/airavata-server.properties.j2 | 1 +
.../templates/pre-wm/airavata-server.properties.j2 | 1 +
.../helix/impl/workflow/ParserWorkflowManager.java | 3 +-
.../helix/impl/workflow/PostWorkflowManager.java | 3 +-
.../helix/impl/workflow/PreWorkflowManager.java | 3 +-
.../helix/impl/workflow/WorkflowManager.java | 53 ++++++++++++++++++----
.../src/main/resources/airavata-server.properties | 2 +
9 files changed, 58 insertions(+), 12 deletions(-)
diff --git
a/dev-tools/ansible/inventories/scigap/staging/group_vars/all/vars.yml
b/dev-tools/ansible/inventories/scigap/staging/group_vars/all/vars.yml
index ce9c1d2..a49db72 100644
--- a/dev-tools/ansible/inventories/scigap/staging/group_vars/all/vars.yml
+++ b/dev-tools/ansible/inventories/scigap/staging/group_vars/all/vars.yml
@@ -148,7 +148,9 @@ snapshot_version: "0.17-SNAPSHOT"
helix_controller_name: "helixcontroller"
helix_participant_name: "helixparticipant"
helix_pre_wm_name: "prewm"
+helix_pre_wm_load_balance_clusters: "false"
helix_post_wm_name: "postwm"
+helix_post_wm_load_balance_clusters: "false"
# Listening port for kafka installations
kafka_listener_port: 9092
diff --git
a/dev-tools/ansible/inventories/scigap/testing/group_vars/all/vars.yml
b/dev-tools/ansible/inventories/scigap/testing/group_vars/all/vars.yml
index b1a7b18..562d3ff 100644
--- a/dev-tools/ansible/inventories/scigap/testing/group_vars/all/vars.yml
+++ b/dev-tools/ansible/inventories/scigap/testing/group_vars/all/vars.yml
@@ -148,7 +148,9 @@ snapshot_version: "0.18-SNAPSHOT"
helix_controller_name: "helixcontroller"
helix_participant_name: "helixparticipant"
helix_pre_wm_name: "prewm"
+helix_pre_wm_load_balance_clusters: "false"
helix_post_wm_name: "postwm"
+helix_post_wm_load_balance_clusters: "false"
# Listening port for kafka installations
kafka_listener_port: 9092
diff --git
a/dev-tools/ansible/roles/helix_setup/templates/post-wm/airavata-server.properties.j2
b/dev-tools/ansible/roles/helix_setup/templates/post-wm/airavata-server.properties.j2
index 0085310..259e52c 100644
---
a/dev-tools/ansible/roles/helix_setup/templates/post-wm/airavata-server.properties.j2
+++
b/dev-tools/ansible/roles/helix_setup/templates/post-wm/airavata-server.properties.j2
@@ -30,6 +30,7 @@ kafka.broker.topic={{ job_monitor_broker_topic }}
kafka.broker.consumer.group={{ job_monitor_broker_consumer_group }}
helix.cluster.name={{ helix_cluster_name }}
post.workflow.manager.name={{ helix_post_wm_name }}
+post.workflow.manager.loadbalance.clusters={{
helix_post_wm_load_balance_clusters }}
###########################################################################
# AMQP Notification Configuration
diff --git
a/dev-tools/ansible/roles/helix_setup/templates/pre-wm/airavata-server.properties.j2
b/dev-tools/ansible/roles/helix_setup/templates/pre-wm/airavata-server.properties.j2
index 220fba7..cabede9 100644
---
a/dev-tools/ansible/roles/helix_setup/templates/pre-wm/airavata-server.properties.j2
+++
b/dev-tools/ansible/roles/helix_setup/templates/pre-wm/airavata-server.properties.j2
@@ -27,6 +27,7 @@ regserver.server.port={{ registry_port }}
###########################################################################
helix.cluster.name={{ helix_cluster_name }}
pre.workflow.manager.name={{ helix_pre_wm_name }}
+pre.workflow.manager.loadbalance.clusters={{
helix_pre_wm_load_balance_clusters }}
###########################################################################
# AMQP Notification Configuration
diff --git
a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/ParserWorkflowManager.java
b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/ParserWorkflowManager.java
index 3fc9008..9b4fd74 100644
---
a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/ParserWorkflowManager.java
+++
b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/ParserWorkflowManager.java
@@ -47,7 +47,8 @@ public class ParserWorkflowManager extends WorkflowManager {
private ParserRequest parserRequest;
public ParserWorkflowManager(ParserRequest parserRequest) throws
ApplicationSettingsException {
- super(ServerSettings.getSetting("parser.workflow.manager.name"));
+ super(ServerSettings.getSetting("parser.workflow.manager.name"),
+
Boolean.parseBoolean(ServerSettings.getSetting("post.workflow.manager.loadbalance.clusters")));
this.parserRequest = parserRequest;
}
diff --git
a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
index 78a4cb3..bd522f2 100644
---
a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
+++
b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
@@ -59,7 +59,8 @@ public class PostWorkflowManager extends WorkflowManager {
private final static Logger logger =
LoggerFactory.getLogger(PostWorkflowManager.class);
public PostWorkflowManager() throws ApplicationSettingsException {
- super(ServerSettings.getSetting("post.workflow.manager.name"));
+ super(ServerSettings.getSetting("post.workflow.manager.name"),
+
Boolean.parseBoolean(ServerSettings.getSetting("post.workflow.manager.loadbalance.clusters")));
}
private void init() throws Exception {
diff --git
a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java
b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java
index 15d6735..9b716c1 100644
---
a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java
+++
b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java
@@ -57,7 +57,8 @@ public class PreWorkflowManager extends WorkflowManager {
private Subscriber subscriber;
public PreWorkflowManager() throws ApplicationSettingsException {
- super(ServerSettings.getSetting("pre.workflow.manager.name"));
+ super(ServerSettings.getSetting("pre.workflow.manager.name"),
+
Boolean.parseBoolean(ServerSettings.getSetting("pre.workflow.manager.loadbalance.clusters")));
}
private void initAllComponents() throws Exception {
diff --git
a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/WorkflowManager.java
b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/WorkflowManager.java
index 8d963fa..97f7c02 100644
---
a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/WorkflowManager.java
+++
b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/WorkflowManager.java
@@ -22,10 +22,15 @@ import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
import java.util.Calendar;
+import java.util.List;
public class WorkflowManager {
@@ -33,26 +38,47 @@ public class WorkflowManager {
private Publisher statusPublisher;
private CuratorFramework curatorClient = null;
- private WorkflowOperator workflowOperator;
+ private List<WorkflowOperator> workflowOperators = new ArrayList<>();
private ThriftClientPool<RegistryService.Client> registryClientPool;
private String workflowManagerName;
+ private ZKHelixAdmin zkHelixAdmin;
+ private boolean loadBalanceClusters;
- public WorkflowManager(String workflowManagerName) {
+ private int currentOperator = 0;
+
+ public WorkflowManager(String workflowManagerName, boolean
loadBalanceClusters) {
this.workflowManagerName = workflowManagerName;
+ this.loadBalanceClusters = loadBalanceClusters;
}
protected void initComponents() throws Exception {
initRegistryClientPool();
- initWorkflowOperatorr();
+ initHelixAdmin();
+ initWorkflowOperators();
initStatusPublisher();
initCuratorClient();
+
}
- private void initWorkflowOperatorr() throws Exception {
- workflowOperator = new WorkflowOperator(
- ServerSettings.getSetting("helix.cluster.name"),
- workflowManagerName,
- ServerSettings.getZookeeperConnection());
+ private void initWorkflowOperators() throws Exception {
+
+ if (!loadBalanceClusters) {
+ logger.info("Using default cluster " +
ServerSettings.getSetting("helix.cluster.name") + " to submit workflows");
+ workflowOperators.add(new WorkflowOperator(
+ ServerSettings.getSetting("helix.cluster.name"),
+ workflowManagerName,
+ ServerSettings.getZookeeperConnection()));
+ } else {
+ logger.info("Load balancing workflows among existing clusters");
+ List<String> clusters = zkHelixAdmin.getClusters();
+ logger.info("Total available clusters " + clusters.size());
+ for (String cluster : clusters) {
+ workflowOperators.add(new WorkflowOperator(
+ cluster,
+ workflowManagerName,
+ ServerSettings.getZookeeperConnection()));
+ }
+ }
}
private void initStatusPublisher() throws AiravataException {
@@ -65,6 +91,11 @@ public class WorkflowManager {
this.curatorClient.start();
}
+ private void initHelixAdmin() throws ApplicationSettingsException {
+ ZkClient zkClient = new
ZkClient(ServerSettings.getZookeeperConnection(),
ZkClient.DEFAULT_SESSION_TIMEOUT,
+ ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
+ zkHelixAdmin = new ZKHelixAdmin(zkClient);
+ }
private void initRegistryClientPool() throws ApplicationSettingsException {
GenericObjectPool.Config poolConfig = new GenericObjectPool.Config();
@@ -90,7 +121,11 @@ public class WorkflowManager {
}
public WorkflowOperator getWorkflowOperator() {
- return workflowOperator;
+ currentOperator++;
+ if (workflowOperators.size() >= currentOperator) {
+ currentOperator = 0;
+ }
+ return workflowOperators.get(currentOperator);
}
public ThriftClientPool<RegistryService.Client> getRegistryClientPool() {
diff --git
a/modules/airavata-helix/helix-spectator/src/main/resources/airavata-server.properties
b/modules/airavata-helix/helix-spectator/src/main/resources/airavata-server.properties
index a5bde66..f1bd06b 100644
---
a/modules/airavata-helix/helix-spectator/src/main/resources/airavata-server.properties
+++
b/modules/airavata-helix/helix-spectator/src/main/resources/airavata-server.properties
@@ -43,7 +43,9 @@ kafka.broker.topic=parsed-data
kafka.broker.consumer.group=MonitoringConsumer
helix.cluster.name=AiravataDemoCluster
pre.workflow.manager.name=prewm
+pre.workflow.manager.loadbalance.clusters=false
post.workflow.manager.name=postwm
+post.workflow.manager.loadbalance.clusters=false
parser.workflow.manager.name=parserwm
helix.controller.name=helixcontroller
helix.participant.name=helixparticipant