Make AuroraThriftClient a synchronized singleton & Reconnect with Aurora scheduler on leader rotation
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/946143a7 Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/946143a7 Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/946143a7 Branch: refs/heads/develop Commit: 946143a74c6737ed4adcdbb545148aff7dcab241 Parents: 593331b Author: Gourav Shenoy <[email protected]> Authored: Thu Nov 3 14:12:08 2016 -0400 Committer: Gourav Shenoy <[email protected]> Committed: Thu Nov 3 14:12:08 2016 -0400 ---------------------------------------------------------------------- .../cloud/aurora/client/AuroraThriftClient.java | 233 ++++++++++++++----- .../aurora/util/AuroraThriftClientUtil.java | 2 +- 2 files changed, 176 insertions(+), 59 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/946143a7/modules/cloud/aurora-client/src/main/java/org/apache/airavata/cloud/aurora/client/AuroraThriftClient.java ---------------------------------------------------------------------- diff --git a/modules/cloud/aurora-client/src/main/java/org/apache/airavata/cloud/aurora/client/AuroraThriftClient.java b/modules/cloud/aurora-client/src/main/java/org/apache/airavata/cloud/aurora/client/AuroraThriftClient.java index 977479b..e955dc5 100644 --- a/modules/cloud/aurora-client/src/main/java/org/apache/airavata/cloud/aurora/client/AuroraThriftClient.java +++ b/modules/cloud/aurora-client/src/main/java/org/apache/airavata/cloud/aurora/client/AuroraThriftClient.java @@ -39,6 +39,7 @@ import org.apache.airavata.cloud.aurora.util.AuroraThriftClientUtil; import org.apache.airavata.cloud.aurora.util.Constants; import org.apache.airavata.cloud.aurora.util.ResponseResultType; import org.apache.airavata.common.utils.ServerSettings; +import org.apache.thrift.transport.TTransportException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -73,31 +74,101 @@ public class AuroraThriftClient { public static AuroraThriftClient getAuroraThriftClient() throws Exception { try { if(thriftClient == null) { - thriftClient = new AuroraThriftClient(); - - // construct connection url for scheduler - String auroraHosts = ServerSettings.getAuroraSchedulerHosts(); - Integer connectTimeout = ServerSettings.getAuroraSchedulerTimeout(); - - // check reachable scheduler host + synchronized(AuroraThriftClient.class) { + if(thriftClient == null) { + thriftClient = new AuroraThriftClient(); + + // construct connection url for scheduler + String auroraHosts = ServerSettings.getAuroraSchedulerHosts(); + Integer connectTimeout = ServerSettings.getAuroraSchedulerTimeout(); + + // check reachable scheduler host + if(auroraHosts != null) { + for(String auroraHost : auroraHosts.split(",")) { + // malformed host string, should be of form <host:port> + if(auroraHost.split(":").length != 2) { + throw new Exception("Scheduler Host String: " + auroraHost + ", is malformed. Should be of form <hostname:port>!"); + } + + // read hostname, port & construct connection-url + String hostname = auroraHost.split(":")[0]; + String port = auroraHost.split(":")[1]; + String connectionUrl = MessageFormat.format(Constants.AURORA_SCHEDULER_CONNECTION_URL, hostname, port); + + // verify if connection succeeds + if(AuroraThriftClientUtil.isSchedulerHostReachable(connectionUrl, connectTimeout)) { + thriftClient.readOnlySchedulerClient = AuroraSchedulerClientFactory.createReadOnlySchedulerClient(connectionUrl, connectTimeout); + thriftClient.auroraSchedulerManagerClient = AuroraSchedulerClientFactory.createSchedulerManagerClient(connectionUrl, connectTimeout); + break; + } + } + + // check if scheduler connection successful + if(thriftClient.auroraSchedulerManagerClient == null || + thriftClient.readOnlySchedulerClient == null) { + throw new Exception("None of the Aurora scheduler hosts were reachable, hence connection not established!"); + } + } else { + // aurora hosts not defined in the properties file + throw new Exception("Aurora hosts not specified in airavata-server.properties file."); + } + } + } + } + } catch(Exception ex) { + logger.error(ex.getMessage(), ex); + throw ex; + } + return thriftClient; + } + + + /** + * Reconnect with aurora scheduler. + * + * @return true, if successful + */ + private boolean reconnectWithAuroraScheduler() { + boolean connectionSuccess = false; + + try { + // construct connection url for scheduler + String auroraHosts = ServerSettings.getAuroraSchedulerHosts(); + Integer connectTimeout = ServerSettings.getAuroraSchedulerTimeout(); + + // check reachable scheduler host + if(auroraHosts != null) { for(String auroraHost : auroraHosts.split(",")) { + // malformed host string, should be of form <host:port> + if(auroraHost.split(":").length != 2) { + throw new Exception("Scheduler Host String: " + auroraHost + ", is malformed. Should be of form <hostname:port>!"); + } + + // read hostname, port & construct connection-url String hostname = auroraHost.split(":")[0]; String port = auroraHost.split(":")[1]; String connectionUrl = MessageFormat.format(Constants.AURORA_SCHEDULER_CONNECTION_URL, hostname, port); + // verify if connection succeeds if(AuroraThriftClientUtil.isSchedulerHostReachable(connectionUrl, connectTimeout)) { thriftClient.readOnlySchedulerClient = AuroraSchedulerClientFactory.createReadOnlySchedulerClient(connectionUrl, connectTimeout); thriftClient.auroraSchedulerManagerClient = AuroraSchedulerClientFactory.createSchedulerManagerClient(connectionUrl, connectTimeout); + + // set connection-success flag + connectionSuccess = true; } } + } else { + // aurora hosts not defined in the properties file + throw new Exception("Aurora hosts not specified in airavata-server.properties file."); } } catch(Exception ex) { logger.error(ex.getMessage(), ex); - throw ex; } - return thriftClient; + return connectionSuccess; } + /** * Creates the job. * @@ -107,16 +178,26 @@ public class AuroraThriftClient { */ public ResponseBean createJob(JobConfigBean jobConfigBean) throws Exception { ResponseBean response = null; - try { - if(jobConfigBean != null) { - JobConfiguration jobConfig = AuroraThriftClientUtil.getAuroraJobConfig(jobConfigBean); - Response createJobResponse = this.auroraSchedulerManagerClient.createJob(jobConfig); - response = AuroraThriftClientUtil.getResponseBean(createJobResponse, ResponseResultType.CREATE_JOB); + // try till we get response or scheduler connection not found + while(response == null) { + try { + if(jobConfigBean != null) { + JobConfiguration jobConfig = AuroraThriftClientUtil.getAuroraJobConfig(jobConfigBean); + Response createJobResponse = this.auroraSchedulerManagerClient.createJob(jobConfig); + response = AuroraThriftClientUtil.getResponseBean(createJobResponse, ResponseResultType.CREATE_JOB); + } + } catch(Exception ex) { + if (ex instanceof TTransportException) { + // if re-connection success, retry command + if (this.reconnectWithAuroraScheduler()) { + continue; + } + } + logger.error(ex.getMessage(), ex); + throw ex; } - } catch(Exception ex) { - logger.error(ex.getMessage(), ex); - throw ex; } + return response; } @@ -130,15 +211,24 @@ public class AuroraThriftClient { */ public ResponseBean killTasks(JobKeyBean jobKeyBean, Set<Integer> instances) throws Exception { ResponseBean response = null; - try { - if(jobKeyBean != null) { - JobKey jobKey = AuroraThriftClientUtil.getAuroraJobKey(jobKeyBean); - Response killTaskResponse = this.auroraSchedulerManagerClient.killTasks(jobKey, instances); - response = AuroraThriftClientUtil.getResponseBean(killTaskResponse, ResponseResultType.KILL_TASKS); + // try till we get response or scheduler connection not found + while(response == null) { + try { + if(jobKeyBean != null) { + JobKey jobKey = AuroraThriftClientUtil.getAuroraJobKey(jobKeyBean); + Response killTaskResponse = this.auroraSchedulerManagerClient.killTasks(jobKey, instances); + response = AuroraThriftClientUtil.getResponseBean(killTaskResponse, ResponseResultType.KILL_TASKS); + } + } catch(Exception ex) { + if (ex instanceof TTransportException) { + // if re-connection success, retry command + if (this.reconnectWithAuroraScheduler()) { + continue; + } + } + logger.error(ex.getMessage(), ex); + throw ex; } - } catch(Exception ex) { - logger.error(ex.getMessage(), ex); - throw ex; } return response; } @@ -152,12 +242,21 @@ public class AuroraThriftClient { */ public GetJobsResponseBean getJobList(String ownerRole) throws Exception { GetJobsResponseBean response = null; - try { - Response jobListResponse = this.readOnlySchedulerClient.getJobs(ownerRole); - response = (GetJobsResponseBean) AuroraThriftClientUtil.getResponseBean(jobListResponse, ResponseResultType.GET_JOBS); - } catch(Exception ex) { - logger.error(ex.getMessage(), ex); - throw ex; + // try till we get response or scheduler connection not found + while(response == null) { + try { + Response jobListResponse = this.readOnlySchedulerClient.getJobs(ownerRole); + response = (GetJobsResponseBean) AuroraThriftClientUtil.getResponseBean(jobListResponse, ResponseResultType.GET_JOBS); + } catch(Exception ex) { + if (ex instanceof TTransportException) { + // if re-connection success, retry command + if (this.reconnectWithAuroraScheduler()) { + continue; + } + } + logger.error(ex.getMessage(), ex); + throw ex; + } } return response; } @@ -171,19 +270,28 @@ public class AuroraThriftClient { */ public PendingJobReasonBean getPendingReasonForJob(JobKeyBean jobKeyBean) throws Exception { PendingJobReasonBean response = null; - try { - JobKey jobKey = AuroraThriftClientUtil.getAuroraJobKey(jobKeyBean); - Set<JobKey> jobKeySet = new HashSet<>(); - jobKeySet.add(jobKey); - - TaskQuery query = new TaskQuery(); - query.setJobKeys(jobKeySet); - - Response pendingReasonResponse = this.readOnlySchedulerClient.getPendingReason(query); - response = (PendingJobReasonBean) AuroraThriftClientUtil.getResponseBean(pendingReasonResponse, ResponseResultType.GET_PENDING_JOB_REASON); - } catch(Exception ex) { - logger.error(ex.getMessage(), ex); - throw ex; + // try till we get response or scheduler connection not found + while(response == null) { + try { + JobKey jobKey = AuroraThriftClientUtil.getAuroraJobKey(jobKeyBean); + Set<JobKey> jobKeySet = new HashSet<>(); + jobKeySet.add(jobKey); + + TaskQuery query = new TaskQuery(); + query.setJobKeys(jobKeySet); + + Response pendingReasonResponse = this.readOnlySchedulerClient.getPendingReason(query); + response = (PendingJobReasonBean) AuroraThriftClientUtil.getResponseBean(pendingReasonResponse, ResponseResultType.GET_PENDING_JOB_REASON); + } catch(Exception ex) { + if (ex instanceof TTransportException) { + // if re-connection success, retry command + if (this.reconnectWithAuroraScheduler()) { + continue; + } + } + logger.error(ex.getMessage(), ex); + throw ex; + } } return response; } @@ -197,21 +305,30 @@ public class AuroraThriftClient { */ public JobDetailsResponseBean getJobDetails(JobKeyBean jobKeyBean) throws Exception { JobDetailsResponseBean response = null; - try { - if(jobKeyBean != null) { - JobKey jobKey = AuroraThriftClientUtil.getAuroraJobKey(jobKeyBean); - Set<JobKey> jobKeySet = new HashSet<>(); - jobKeySet.add(jobKey); - - TaskQuery query = new TaskQuery(); - query.setJobKeys(jobKeySet); - - Response jobDetailsResponse = this.readOnlySchedulerClient.getTasksStatus(query); - response = (JobDetailsResponseBean) AuroraThriftClientUtil.getResponseBean(jobDetailsResponse, ResponseResultType.GET_JOB_DETAILS); + // try till we get response or scheduler connection not found + while(response == null) { + try { + if(jobKeyBean != null) { + JobKey jobKey = AuroraThriftClientUtil.getAuroraJobKey(jobKeyBean); + Set<JobKey> jobKeySet = new HashSet<>(); + jobKeySet.add(jobKey); + + TaskQuery query = new TaskQuery(); + query.setJobKeys(jobKeySet); + + Response jobDetailsResponse = this.readOnlySchedulerClient.getTasksStatus(query); + response = (JobDetailsResponseBean) AuroraThriftClientUtil.getResponseBean(jobDetailsResponse, ResponseResultType.GET_JOB_DETAILS); + } + } catch(Exception ex) { + if (ex instanceof TTransportException) { + // if re-connection success, retry command + if (this.reconnectWithAuroraScheduler()) { + continue; + } + } + logger.error(ex.getMessage(), ex); + throw ex; } - } catch(Exception ex) { - logger.error(ex.getMessage(), ex); - throw ex; } return response; } http://git-wip-us.apache.org/repos/asf/airavata/blob/946143a7/modules/cloud/aurora-client/src/main/java/org/apache/airavata/cloud/aurora/util/AuroraThriftClientUtil.java ---------------------------------------------------------------------- diff --git a/modules/cloud/aurora-client/src/main/java/org/apache/airavata/cloud/aurora/util/AuroraThriftClientUtil.java b/modules/cloud/aurora-client/src/main/java/org/apache/airavata/cloud/aurora/util/AuroraThriftClientUtil.java index 7cb03b4..c13ef8f 100644 --- a/modules/cloud/aurora-client/src/main/java/org/apache/airavata/cloud/aurora/util/AuroraThriftClientUtil.java +++ b/modules/cloud/aurora-client/src/main/java/org/apache/airavata/cloud/aurora/util/AuroraThriftClientUtil.java @@ -390,7 +390,7 @@ public class AuroraThriftClientUtil { // host is reachable isReachable = true; } catch(Exception ex) { - logger.error("Timed-out connecting to URL: " + connectionUrl, ex); + logger.error("Timed-out connecting to URL: " + connectionUrl); } return isReachable; }
