Make AuroraThriftClient a synchronized singleton and reconnect to the Aurora scheduler on leader rotation


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/946143a7
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/946143a7
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/946143a7

Branch: refs/heads/develop
Commit: 946143a74c6737ed4adcdbb545148aff7dcab241
Parents: 593331b
Author: Gourav Shenoy <[email protected]>
Authored: Thu Nov 3 14:12:08 2016 -0400
Committer: Gourav Shenoy <[email protected]>
Committed: Thu Nov 3 14:12:08 2016 -0400

----------------------------------------------------------------------
 .../cloud/aurora/client/AuroraThriftClient.java | 233 ++++++++++++++-----
 .../aurora/util/AuroraThriftClientUtil.java     |   2 +-
 2 files changed, 176 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/946143a7/modules/cloud/aurora-client/src/main/java/org/apache/airavata/cloud/aurora/client/AuroraThriftClient.java
----------------------------------------------------------------------
diff --git 
a/modules/cloud/aurora-client/src/main/java/org/apache/airavata/cloud/aurora/client/AuroraThriftClient.java
 
b/modules/cloud/aurora-client/src/main/java/org/apache/airavata/cloud/aurora/client/AuroraThriftClient.java
index 977479b..e955dc5 100644
--- 
a/modules/cloud/aurora-client/src/main/java/org/apache/airavata/cloud/aurora/client/AuroraThriftClient.java
+++ 
b/modules/cloud/aurora-client/src/main/java/org/apache/airavata/cloud/aurora/client/AuroraThriftClient.java
@@ -39,6 +39,7 @@ import 
org.apache.airavata.cloud.aurora.util.AuroraThriftClientUtil;
 import org.apache.airavata.cloud.aurora.util.Constants;
 import org.apache.airavata.cloud.aurora.util.ResponseResultType;
 import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.thrift.transport.TTransportException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -73,31 +74,101 @@ public class AuroraThriftClient {
        public static AuroraThriftClient getAuroraThriftClient() throws 
Exception {
                try {
                        if(thriftClient == null) {
-                               thriftClient = new AuroraThriftClient();
-                               
-                               // construct connection url for scheduler
-                               String auroraHosts = 
ServerSettings.getAuroraSchedulerHosts();
-                               Integer connectTimeout = 
ServerSettings.getAuroraSchedulerTimeout();
-                               
-                               // check reachable scheduler host
+                               synchronized(AuroraThriftClient.class) {
+                                       if(thriftClient == null) {
+                                               thriftClient = new 
AuroraThriftClient();
+                                               
+                                               // construct connection url for 
scheduler
+                                               String auroraHosts = 
ServerSettings.getAuroraSchedulerHosts();
+                                               Integer connectTimeout = 
ServerSettings.getAuroraSchedulerTimeout();
+                                               
+                                               // check reachable scheduler 
host
+                                               if(auroraHosts != null) {
+                                                       for(String auroraHost : 
auroraHosts.split(",")) {
+                                                               // malformed 
host string, should be of form <host:port>
+                                                               
if(auroraHost.split(":").length != 2) {
+                                                                       throw 
new Exception("Scheduler Host String: " + auroraHost + ", is malformed. Should 
be of form <hostname:port>!");
+                                                               }
+                                                               
+                                                               // read 
hostname, port & construct connection-url
+                                                               String hostname 
= auroraHost.split(":")[0];
+                                                               String port = 
auroraHost.split(":")[1];
+                                                               String 
connectionUrl = MessageFormat.format(Constants.AURORA_SCHEDULER_CONNECTION_URL, 
hostname, port);
+                                                               
+                                                               // verify if 
connection succeeds
+                                                               
if(AuroraThriftClientUtil.isSchedulerHostReachable(connectionUrl, 
connectTimeout)) {
+                                                                       
thriftClient.readOnlySchedulerClient = 
AuroraSchedulerClientFactory.createReadOnlySchedulerClient(connectionUrl, 
connectTimeout);
+                                                                       
thriftClient.auroraSchedulerManagerClient = 
AuroraSchedulerClientFactory.createSchedulerManagerClient(connectionUrl, 
connectTimeout);
+                                                                       break;
+                                                               }
+                                                       }
+                                                       
+                                                       // check if scheduler 
connection successful
+                                                       
if(thriftClient.auroraSchedulerManagerClient == null || 
+                                                                       
thriftClient.readOnlySchedulerClient == null) {
+                                                               throw new 
Exception("None of the Aurora scheduler hosts were reachable, hence connection 
not established!");
+                                                       }
+                                               } else {
+                                                       // aurora hosts not 
defined in the properties file
+                                                       throw new 
Exception("Aurora hosts not specified in airavata-server.properties file.");
+                                               }
+                                       }
+                               }
+                       }
+               } catch(Exception ex) {
+                       logger.error(ex.getMessage(), ex);
+                       throw ex;
+               }
+               return thriftClient;
+       }
+       
+       
+       /**
+        * Reconnect with aurora scheduler.
+        *
+        * @return true, if successful
+        */
+       private boolean reconnectWithAuroraScheduler() {
+               boolean connectionSuccess = false;
+               
+               try {
+                       // construct connection url for scheduler
+                       String auroraHosts = 
ServerSettings.getAuroraSchedulerHosts();
+                       Integer connectTimeout = 
ServerSettings.getAuroraSchedulerTimeout();
+                       
+                       // check reachable scheduler host
+                       if(auroraHosts != null) {
                                for(String auroraHost : auroraHosts.split(",")) 
{
+                                       // malformed host string, should be of 
form <host:port>
+                                       if(auroraHost.split(":").length != 2) {
+                                               throw new Exception("Scheduler 
Host String: " + auroraHost + ", is malformed. Should be of form 
<hostname:port>!");
+                                       }
+                                       
+                                       // read hostname, port & construct 
connection-url
                                        String hostname = 
auroraHost.split(":")[0];
                                        String port = auroraHost.split(":")[1];
                                        String connectionUrl = 
MessageFormat.format(Constants.AURORA_SCHEDULER_CONNECTION_URL, hostname, port);
                                        
+                                       // verify if connection succeeds
                                        
if(AuroraThriftClientUtil.isSchedulerHostReachable(connectionUrl, 
connectTimeout)) {
                                                
thriftClient.readOnlySchedulerClient = 
AuroraSchedulerClientFactory.createReadOnlySchedulerClient(connectionUrl, 
connectTimeout);
                                                
thriftClient.auroraSchedulerManagerClient = 
AuroraSchedulerClientFactory.createSchedulerManagerClient(connectionUrl, 
connectTimeout);
+                                               
+                                               // set connection-success flag
+                                               connectionSuccess = true;
                                        }
                                }
+                       } else {
+                               // aurora hosts not defined in the properties 
file
+                               throw new Exception("Aurora hosts not specified 
in airavata-server.properties file.");
                        }
                } catch(Exception ex) {
                        logger.error(ex.getMessage(), ex);
-                       throw ex;
                }
-               return thriftClient;
+               return connectionSuccess;
        }
        
+       
        /**
         * Creates the job.
         *
@@ -107,16 +178,26 @@ public class AuroraThriftClient {
         */
        public ResponseBean createJob(JobConfigBean jobConfigBean) throws 
Exception {
                ResponseBean response = null;
-               try {
-                       if(jobConfigBean != null) {
-                               JobConfiguration jobConfig = 
AuroraThriftClientUtil.getAuroraJobConfig(jobConfigBean);
-                               Response createJobResponse = 
this.auroraSchedulerManagerClient.createJob(jobConfig);
-                               response = 
AuroraThriftClientUtil.getResponseBean(createJobResponse, 
ResponseResultType.CREATE_JOB);
+               // try till we get response or scheduler connection not found
+               while(response == null) {
+                       try {
+                               if(jobConfigBean != null) {
+                                       JobConfiguration jobConfig = 
AuroraThriftClientUtil.getAuroraJobConfig(jobConfigBean);
+                                       Response createJobResponse = 
this.auroraSchedulerManagerClient.createJob(jobConfig);
+                                       response = 
AuroraThriftClientUtil.getResponseBean(createJobResponse, 
ResponseResultType.CREATE_JOB);
+                               }
+                       } catch(Exception ex) {
+                               if (ex instanceof TTransportException) {
+                                       // if re-connection success, retry 
command
+                                       if 
(this.reconnectWithAuroraScheduler()) {
+                                               continue;
+                                       }
+                               }
+                               logger.error(ex.getMessage(), ex);
+                               throw ex;
                        }
-               } catch(Exception ex) {
-                       logger.error(ex.getMessage(), ex);
-                       throw ex;
                }
+               
                return response;
        }
        
@@ -130,15 +211,24 @@ public class AuroraThriftClient {
         */
        public ResponseBean killTasks(JobKeyBean jobKeyBean, Set<Integer> 
instances) throws Exception {
                ResponseBean response = null;
-               try {
-                       if(jobKeyBean != null) {
-                               JobKey jobKey = 
AuroraThriftClientUtil.getAuroraJobKey(jobKeyBean);
-                               Response killTaskResponse = 
this.auroraSchedulerManagerClient.killTasks(jobKey, instances);
-                               response = 
AuroraThriftClientUtil.getResponseBean(killTaskResponse, 
ResponseResultType.KILL_TASKS);
+               // try till we get response or scheduler connection not found
+               while(response == null) {
+                       try {
+                               if(jobKeyBean != null) {
+                                       JobKey jobKey = 
AuroraThriftClientUtil.getAuroraJobKey(jobKeyBean);
+                                       Response killTaskResponse = 
this.auroraSchedulerManagerClient.killTasks(jobKey, instances);
+                                       response = 
AuroraThriftClientUtil.getResponseBean(killTaskResponse, 
ResponseResultType.KILL_TASKS);
+                               }
+                       } catch(Exception ex) {
+                               if (ex instanceof TTransportException) {
+                                       // if re-connection success, retry 
command
+                                       if 
(this.reconnectWithAuroraScheduler()) {
+                                               continue;
+                                       }
+                               }
+                               logger.error(ex.getMessage(), ex);
+                               throw ex;
                        }
-               } catch(Exception ex) {
-                       logger.error(ex.getMessage(), ex);
-                       throw ex;
                }
                return response;
        }
@@ -152,12 +242,21 @@ public class AuroraThriftClient {
         */
        public GetJobsResponseBean getJobList(String ownerRole) throws 
Exception {
                GetJobsResponseBean response = null;
-               try {
-                               Response jobListResponse = 
this.readOnlySchedulerClient.getJobs(ownerRole);
-                               response = (GetJobsResponseBean) 
AuroraThriftClientUtil.getResponseBean(jobListResponse, 
ResponseResultType.GET_JOBS);
-               } catch(Exception ex) {
-                       logger.error(ex.getMessage(), ex);
-                       throw ex;
+               // try till we get response or scheduler connection not found
+               while(response == null) {
+                       try {
+                                       Response jobListResponse = 
this.readOnlySchedulerClient.getJobs(ownerRole);
+                                       response = (GetJobsResponseBean) 
AuroraThriftClientUtil.getResponseBean(jobListResponse, 
ResponseResultType.GET_JOBS);
+                       } catch(Exception ex) {
+                               if (ex instanceof TTransportException) {
+                                       // if re-connection success, retry 
command
+                                       if 
(this.reconnectWithAuroraScheduler()) {
+                                               continue;
+                                       }
+                               }
+                               logger.error(ex.getMessage(), ex);
+                               throw ex;
+                       }
                }
                return response;
        }
@@ -171,19 +270,28 @@ public class AuroraThriftClient {
         */
        public PendingJobReasonBean getPendingReasonForJob(JobKeyBean 
jobKeyBean) throws Exception {
                PendingJobReasonBean response = null;
-               try {
-                               JobKey jobKey = 
AuroraThriftClientUtil.getAuroraJobKey(jobKeyBean);
-                               Set<JobKey> jobKeySet = new HashSet<>();
-                               jobKeySet.add(jobKey);
-                               
-                               TaskQuery query = new TaskQuery();
-                               query.setJobKeys(jobKeySet);
-                               
-                               Response pendingReasonResponse = 
this.readOnlySchedulerClient.getPendingReason(query);
-                               response = (PendingJobReasonBean) 
AuroraThriftClientUtil.getResponseBean(pendingReasonResponse, 
ResponseResultType.GET_PENDING_JOB_REASON);
-               } catch(Exception ex) {
-                       logger.error(ex.getMessage(), ex);
-                       throw ex;
+               // try till we get response or scheduler connection not found
+               while(response == null) {
+                       try {
+                                       JobKey jobKey = 
AuroraThriftClientUtil.getAuroraJobKey(jobKeyBean);
+                                       Set<JobKey> jobKeySet = new HashSet<>();
+                                       jobKeySet.add(jobKey);
+                                       
+                                       TaskQuery query = new TaskQuery();
+                                       query.setJobKeys(jobKeySet);
+                                       
+                                       Response pendingReasonResponse = 
this.readOnlySchedulerClient.getPendingReason(query);
+                                       response = (PendingJobReasonBean) 
AuroraThriftClientUtil.getResponseBean(pendingReasonResponse, 
ResponseResultType.GET_PENDING_JOB_REASON);
+                       } catch(Exception ex) {
+                               if (ex instanceof TTransportException) {
+                                       // if re-connection success, retry 
command
+                                       if 
(this.reconnectWithAuroraScheduler()) {
+                                               continue;
+                                       }
+                               }
+                               logger.error(ex.getMessage(), ex);
+                               throw ex;
+                       }
                }
                return response;
        }
@@ -197,21 +305,30 @@ public class AuroraThriftClient {
         */
        public JobDetailsResponseBean getJobDetails(JobKeyBean jobKeyBean) 
throws Exception {
                JobDetailsResponseBean response = null;
-               try {
-                       if(jobKeyBean != null) {
-                               JobKey jobKey = 
AuroraThriftClientUtil.getAuroraJobKey(jobKeyBean);
-                               Set<JobKey> jobKeySet = new HashSet<>();
-                               jobKeySet.add(jobKey);
-                               
-                               TaskQuery query = new TaskQuery();
-                               query.setJobKeys(jobKeySet);
-                               
-                               Response jobDetailsResponse = 
this.readOnlySchedulerClient.getTasksStatus(query);
-                               response = (JobDetailsResponseBean) 
AuroraThriftClientUtil.getResponseBean(jobDetailsResponse, 
ResponseResultType.GET_JOB_DETAILS);
+               // try till we get response or scheduler connection not found
+               while(response == null) {
+                       try {
+                               if(jobKeyBean != null) {
+                                       JobKey jobKey = 
AuroraThriftClientUtil.getAuroraJobKey(jobKeyBean);
+                                       Set<JobKey> jobKeySet = new HashSet<>();
+                                       jobKeySet.add(jobKey);
+                                       
+                                       TaskQuery query = new TaskQuery();
+                                       query.setJobKeys(jobKeySet);
+                                       
+                                       Response jobDetailsResponse = 
this.readOnlySchedulerClient.getTasksStatus(query);
+                                       response = (JobDetailsResponseBean) 
AuroraThriftClientUtil.getResponseBean(jobDetailsResponse, 
ResponseResultType.GET_JOB_DETAILS);
+                               }
+                       } catch(Exception ex) {
+                               if (ex instanceof TTransportException) {
+                                       // if re-connection success, retry 
command
+                                       if 
(this.reconnectWithAuroraScheduler()) {
+                                               continue;
+                                       }
+                               }
+                               logger.error(ex.getMessage(), ex);
+                               throw ex;
                        }
-               } catch(Exception ex) {
-                       logger.error(ex.getMessage(), ex);
-                       throw ex;
                }
                return response;
        }

http://git-wip-us.apache.org/repos/asf/airavata/blob/946143a7/modules/cloud/aurora-client/src/main/java/org/apache/airavata/cloud/aurora/util/AuroraThriftClientUtil.java
----------------------------------------------------------------------
diff --git 
a/modules/cloud/aurora-client/src/main/java/org/apache/airavata/cloud/aurora/util/AuroraThriftClientUtil.java
 
b/modules/cloud/aurora-client/src/main/java/org/apache/airavata/cloud/aurora/util/AuroraThriftClientUtil.java
index 7cb03b4..c13ef8f 100644
--- 
a/modules/cloud/aurora-client/src/main/java/org/apache/airavata/cloud/aurora/util/AuroraThriftClientUtil.java
+++ 
b/modules/cloud/aurora-client/src/main/java/org/apache/airavata/cloud/aurora/util/AuroraThriftClientUtil.java
@@ -390,7 +390,7 @@ public class AuroraThriftClientUtil {
                        // host is reachable
                        isReachable = true;
                } catch(Exception ex) {
-                       logger.error("Timed-out connecting to URL: " + 
connectionUrl, ex);
+                       logger.error("Timed-out connecting to URL: " + 
connectionUrl);
                }
                return isReachable;
        }

Reply via email to