Repository: airavata
Updated Branches:
  refs/heads/master 4fbe57dac -> 3a927d855
more improvements for job cancel Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/3a927d85 Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/3a927d85 Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/3a927d85 Branch: refs/heads/master Commit: 3a927d855d02985309dd3f9090b784a50298a05f Parents: 4fbe57d Author: lahiru <[email protected]> Authored: Wed Aug 20 16:58:39 2014 +0530 Committer: lahiru <[email protected]> Committed: Wed Aug 20 16:58:39 2014 +0530 ---------------------------------------------------------------------- .../airavata/gfac/server/GfacServerHandler.java | 2 +- .../airavata/gfac/core/cpi/BetterGfacImpl.java | 19 ++------ .../airavata/gfac/core/monitor/MonitorID.java | 11 ++++- .../airavata/gfac/core/utils/GFacUtils.java | 16 ++++--- .../gsissh/provider/impl/GSISSHProvider.java | 1 - .../handlers/GridPullMonitorHandler.java | 47 +++++++++----------- .../monitor/impl/pull/qstat/HPCPullMonitor.java | 26 ++++++----- .../gfac/ssh/provider/impl/SSHProvider.java | 1 - .../server/OrchestratorServerHandler.java | 2 +- 9 files changed, 60 insertions(+), 65 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/3a927d85/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java index 4d59b32..0ec7e67 100644 --- a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java +++ b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java @@ -124,7 +124,7 @@ public class 
GfacServerHandler implements GfacService.Iface, Watcher{ } String instanceId = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NAME); String instantNode = gfacServer + File.separator + instanceId; - zkStat = zk.exists(instantNode, false); + zkStat = zk.exists(instantNode, true); if (zkStat == null) { zk.create(instantNode, airavataServerHostPort.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, http://git-wip-us.apache.org/repos/asf/airavata/blob/3a927d85/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java index b917542..c4229be 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java @@ -689,13 +689,7 @@ public class BetterGfacImpl implements GFac,Watcher { jobExecutionContext.setProperty(ERROR_SENT, "true"); jobExecutionContext.getNotifier().publish(new ExecutionFailEvent(e.getCause())); throw new GFacException(e.getMessage(), e); - }finally { - try { - zk.close(); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } + } } private void launch(JobExecutionContext jobExecutionContext) throws GFacException { @@ -762,13 +756,7 @@ public class BetterGfacImpl implements GFac,Watcher { jobExecutionContext.setProperty(ERROR_SENT, "true"); jobExecutionContext.getNotifier().publish(new ExecutionFailEvent(e.getCause())); throw new GFacException(e.getMessage(), e); - }finally { - try { - zk.close(); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } + } } private void invokeProviderExecute(JobExecutionContext jobExecutionContext) throws GFacException, ApplicationSettingsException, 
InterruptedException, KeeperException { @@ -1188,8 +1176,7 @@ public class BetterGfacImpl implements GFac,Watcher { public void process(WatchedEvent watchedEvent) { if(Event.EventType.NodeDataChanged.equals(watchedEvent.getType())){ // node data is changed, this means node is cancelled. - log.info("Experiment is cancelled with this path:"); - log.info(watchedEvent.getPath()); + log.info("Experiment is cancelled with this path:"+watchedEvent.getPath()); this.cancelled = true; } } http://git-wip-us.apache.org/repos/asf/airavata/blob/3a927d85/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/MonitorID.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/MonitorID.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/MonitorID.java index f36a188..563db94 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/MonitorID.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/MonitorID.java @@ -65,7 +65,16 @@ public class MonitorID { public MonitorID() { } - + public MonitorID(MonitorID monitorID){ + this.host = monitorID.getHost(); + this.jobStartedTime = new Timestamp((new Date()).getTime()); + this.userName = monitorID.getUserName(); + this.jobID = monitorID.getJobID(); + this.taskID = monitorID.getTaskID(); + this.experimentID = monitorID.getExperimentID(); + this.workflowNodeID = monitorID.getWorkflowNodeID(); + this.jobName = monitorID.getJobName(); + } public MonitorID(HostDescription host, String jobID, String taskID, String workflowNodeID, String experimentID, String userName,String jobName) { this.host = host; this.jobStartedTime = new Timestamp((new Date()).getTime()); http://git-wip-us.apache.org/repos/asf/airavata/blob/3a927d85/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java 
---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java index 413bc13..9043d06 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java @@ -1137,12 +1137,16 @@ public class GFacUtils { public static void setExperimentCancel(String experimentId,String taskId,ZooKeeper zk)throws KeeperException, InterruptedException { String experimentEntry = GFacUtils.findExperimentEntry(experimentId, taskId, zk); - Stat operation = zk.exists(experimentEntry + File.separator + "operation", false); - if (operation == null) { // if there is no entry, this will come when a user immediately cancel a job - zk.create(experimentEntry + File.separator + "operation", "cancel".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT); - } else { // if user submit the job to gfac then cancel during execution - zk.setData(experimentEntry + File.separator + "operation", "cancel".getBytes(), operation.getVersion()); + if(experimentEntry == null){ + log.error("Cannot find the experiment Entry, so cancel operation cannot be performed !!!"); + }else { + Stat operation = zk.exists(experimentEntry + File.separator + "operation", false); + if (operation == null) { // if there is no entry, this will come when a user immediately cancel a job + zk.create(experimentEntry + File.separator + "operation", "cancel".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + } else { // if user submit the job to gfac then cancel during execution + zk.setData(experimentEntry + File.separator + "operation", "cancel".getBytes(), operation.getVersion()); + } } } 
http://git-wip-us.apache.org/repos/asf/airavata/blob/3a927d85/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java index 28c792d..59949ac 100644 --- a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java +++ b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java @@ -239,7 +239,6 @@ public class GSISSHProvider extends AbstractRecoverableProvider { log.error("No Job Id is set, so cannot perform the cancel operation !!!"); return; } - removeFromMonitorHandlers(jobExecutionContext, (GsisshHostType)host, jobDetails.getJobID()); GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.CANCELED); // we know this host is type GsiSSHHostType } catch (SSHApiException e) { http://git-wip-us.apache.org/repos/asf/airavata/blob/3a927d85/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java index 5cd929d..fea2d20 100644 --- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java +++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java @@ -87,33 +87,24 @@ public class GridPullMonitorHandler extends ThreadedHandler implements Watcher{ 
hpcPullMonitor.setGfac(jobExecutionContext.getGfac()); MonitorID monitorID = new HPCMonitorID(getAuthenticationInfo(), jobExecutionContext); try { - if ("true".equals(jobExecutionContext.getProperty("cancel"))) { - removeJobFromMonitoring(jobExecutionContext); - } else { - ZooKeeper zk = jobExecutionContext.getZk(); - try { - String experimentEntry = GFacUtils.findExperimentEntry(jobExecutionContext.getExperimentID(), jobExecutionContext.getTaskData().getTaskID(), zk); - String path = experimentEntry + File.separator + "operation"; - Stat exists = zk.exists(path, this); - if(exists != null){ - zk.getData(path,this,exists); // watching the operations node - } - } catch (KeeperException e) { - e.printStackTrace(); - } catch (InterruptedException e) { - e.printStackTrace(); + ZooKeeper zk = jobExecutionContext.getZk(); + try { + String experimentEntry = GFacUtils.findExperimentEntry(jobExecutionContext.getExperimentID(), jobExecutionContext.getTaskData().getTaskID(), zk); + String path = experimentEntry + File.separator + "operation"; + Stat exists = zk.exists(path, this); + if (exists != null) { + zk.getData(path, this, exists); // watching the operations node } - CommonUtils.addMonitortoQueue(hpcPullMonitor.getQueue(), monitorID); + } catch (KeeperException e) { + e.printStackTrace(); + } catch (InterruptedException e) { + e.printStackTrace(); } + CommonUtils.addMonitortoQueue(hpcPullMonitor.getQueue(), monitorID); } catch (AiravataMonitorException e) { logger.error("Error adding monitorID object to the queue with experiment ", monitorID.getExperimentID()); } } - - public void removeJobFromMonitoring(JobExecutionContext jobExecutionContext)throws GFacHandlerException { - MonitorID monitorID = new HPCMonitorID(getAuthenticationInfo(),jobExecutionContext); - hpcPullMonitor.getCancelJobList().add(monitorID); - } public AuthenticationInfo getAuthenticationInfo() { return authenticationInfo; } @@ -130,15 +121,19 @@ public class GridPullMonitorHandler extends 
ThreadedHandler implements Watcher{ this.hpcPullMonitor = hpcPullMonitor; } - public void recover(JobExecutionContext jobExecutionContext) throws GFacHandlerException { - this.removeJobFromMonitoring(jobExecutionContext); - } public void process(WatchedEvent watchedEvent) { if(Event.EventType.NodeDataChanged.equals(watchedEvent.getType())){ // node data is changed, this means node is cancelled. - logger.info("Experiment is cancelled with this path:"); - logger.info(watchedEvent.getPath()); + logger.info("Experiment is cancelled with this path:"+watchedEvent.getPath()); + + String[] split = watchedEvent.getPath().split("/"); + for(String element:split) { + if (element.contains("+")) { + logger.info("Adding experimentID+TaskID to be removed from monitoring:"+element); + hpcPullMonitor.getCancelJobList().add(element); + } + } } } } http://git-wip-us.apache.org/repos/asf/airavata/blob/3a927d85/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java index a2ead4d..5dd8a15 100644 --- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java +++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java @@ -29,6 +29,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.LinkedBlockingQueue; import org.apache.airavata.common.utils.MonitorPublisher; import org.apache.airavata.common.utils.ServerSettings; @@ -47,6 +48,7 @@ import org.apache.airavata.gfac.monitor.util.CommonUtils; import 
org.apache.airavata.gsi.ssh.api.SSHApiException; import org.apache.airavata.gsi.ssh.api.authentication.AuthenticationInfo; import org.apache.airavata.model.workspace.experiment.JobState; +import org.apache.airavata.model.workspace.experiment.JobStatus; import org.apache.airavata.model.workspace.experiment.TaskState; import org.apache.airavata.schemas.gfac.GsisshHostType; import org.apache.airavata.schemas.gfac.SSHHostType; @@ -71,7 +73,7 @@ public class HPCPullMonitor extends PullMonitor { private MonitorPublisher publisher; - private List<MonitorID> cancelJobList; + private LinkedBlockingQueue<String> cancelJobList; private GFac gfac; @@ -82,7 +84,7 @@ public class HPCPullMonitor extends PullMonitor { connections = new HashMap<String, ResourceConnection>(); this.queue = new LinkedBlockingDeque<UserMonitorData>(); publisher = new MonitorPublisher(new EventBus()); - cancelJobList = new ArrayList<MonitorID>(); + cancelJobList = new LinkedBlockingQueue<String>(); } public HPCPullMonitor(MonitorPublisher monitorPublisher, AuthenticationInfo authInfo) { @@ -90,14 +92,14 @@ public class HPCPullMonitor extends PullMonitor { this.queue = new LinkedBlockingDeque<UserMonitorData>(); publisher = monitorPublisher; authenticationInfo = authInfo; - cancelJobList = new ArrayList<MonitorID>(); + cancelJobList = new LinkedBlockingQueue<String>(); } public HPCPullMonitor(BlockingQueue<UserMonitorData> queue, MonitorPublisher publisher) { this.queue = queue; this.publisher = publisher; connections = new HashMap<String, ResourceConnection>(); - cancelJobList = new ArrayList<MonitorID>(); + cancelJobList = new LinkedBlockingQueue<String>(); } @@ -138,7 +140,7 @@ public class HPCPullMonitor extends PullMonitor { * * @return if the start process is successful return true else false */ - public boolean startPulling() throws AiravataMonitorException { + synchronized public boolean startPulling() throws AiravataMonitorException { // take the top element in the queue and pull the data and 
put that element // at the tail of the queue //todo this polling will not work with multiple usernames but with single user @@ -165,14 +167,14 @@ public class HPCPullMonitor extends PullMonitor { connections.put(hostName, connection); } // before we get the statuses, we check the cancel job list and remove them permanently + List<MonitorID> monitorID = iHostMonitorData.getMonitorIDs(); for(MonitorID iMonitorID:monitorID){ - for(MonitorID cancelMId:cancelJobList){ - if(iMonitorID.getJobID().equals(cancelMId.getJobID()) - && iMonitorID.getExperimentID().equals(cancelMId.getExperimentID()) - && iMonitorID.getTaskID().equals(cancelMId.getTaskID())){ + for(String cancelMId:cancelJobList) { + if (cancelMId.equals(iMonitorID.getExperimentID() + "+" + iMonitorID.getTaskID())) { + logger.info("Found a match in monitoring Queue, so marking this job to remove from monitor queue " + cancelMId); + logger.info("ExperimentID: " + cancelMId.split("\\+")[0]+",TaskID: "+cancelMId.split("\\+")[1]+"JobID"+iMonitorID.getJobID()); completedJobs.add(iMonitorID); - cancelJobList.remove(cancelMId); // once we found we delte the cancel job, so we don't have to do this check again and again } } } @@ -357,11 +359,11 @@ public class HPCPullMonitor extends PullMonitor { this.authenticationInfo = authenticationInfo; } - public List<MonitorID> getCancelJobList() { + public LinkedBlockingQueue<String> getCancelJobList() { return cancelJobList; } - public void setCancelJobList(List<MonitorID> cancelJobList) { + public void setCancelJobList(LinkedBlockingQueue<String> cancelJobList) { this.cancelJobList = cancelJobList; } } http://git-wip-us.apache.org/repos/asf/airavata/blob/3a927d85/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java 
b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java index 2ad26a3..0e5f69e 100644 --- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java +++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java @@ -232,7 +232,6 @@ public class SSHProvider extends AbstractProvider { log.error("No Job Id is set, so cannot perform the cancel operation !!!"); return; } - removeFromMonitorHandlers(jobExecutionContext, (GsisshHostType)host, jobDetails.getJobID()); GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.CANCELED); } catch (SSHApiException e) { String error = "Error submitting the job to host " + host.getHostAddress() + " message: " + e.getMessage(); http://git-wip-us.apache.org/repos/asf/airavata/blob/3a927d85/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java ---------------------------------------------------------------------- diff --git a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java index 7072019..68b84af 100644 --- a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java +++ b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java @@ -520,7 +520,7 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface, experimentId); } ExperimentState experimentState = experiment.getExperimentStatus().getExperimentState(); - if (experimentState.getValue()> 4 && experimentState.getValue()<10){ + if (experimentState.getValue()> 5 && 
experimentState.getValue()<10){ throw new OrchestratorException("Unable to mark experiment as Cancelled, because current state is: " + experiment.getExperimentStatus().getExperimentState().toString()); }else if(experimentState.getValue()<3){
