Repository: airavata Updated Branches: refs/heads/master 83cb4bf21 -> 4fbe57dac
fixing stampede job cancel issue Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/4fbe57da Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/4fbe57da Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/4fbe57da Branch: refs/heads/master Commit: 4fbe57dac658f0b24141c85f43ae41c8d1feaa44 Parents: 83cb4bf Author: lahiru <[email protected]> Authored: Wed Aug 20 12:02:18 2014 +0530 Committer: lahiru <[email protected]> Committed: Wed Aug 20 12:02:18 2014 +0530 ---------------------------------------------------------------------- .../airavata/gfac/core/cpi/BetterGfacImpl.java | 24 +++++++++---- .../gsissh/provider/impl/GSISSHProvider.java | 4 +-- .../handlers/GridPullMonitorHandler.java | 36 +++++++++++++++---- .../monitor/impl/pull/qstat/HPCPullMonitor.java | 24 +++++++++++++ .../gfac/ssh/provider/impl/SSHProvider.java | 38 +++++++++++++++++++- .../gsi/ssh/impl/GSISSHAbstractCluster.java | 17 +++++---- 6 files changed, 121 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/4fbe57da/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java index 9415625..b917542 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java @@ -689,8 +689,14 @@ public class BetterGfacImpl implements GFac,Watcher { jobExecutionContext.setProperty(ERROR_SENT, "true"); jobExecutionContext.getNotifier().publish(new ExecutionFailEvent(e.getCause())); throw new GFacException(e.getMessage(), e); - } - } + }finally { + try { + zk.close(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } private void launch(JobExecutionContext jobExecutionContext) throws GFacException { // Scheduler will decide the execution flow of handlers and provider @@ -756,8 +762,14 @@ public class BetterGfacImpl implements GFac,Watcher { jobExecutionContext.setProperty(ERROR_SENT, "true"); jobExecutionContext.getNotifier().publish(new ExecutionFailEvent(e.getCause())); throw new GFacException(e.getMessage(), e); - } - } + }finally { + try { + zk.close(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } private void invokeProviderExecute(JobExecutionContext jobExecutionContext) throws GFacException, ApplicationSettingsException, InterruptedException, KeeperException { GFacProvider provider = jobExecutionContext.getProvider(); @@ -1176,8 +1188,8 @@ public class BetterGfacImpl implements GFac,Watcher { public void process(WatchedEvent watchedEvent) { if(Event.EventType.NodeDataChanged.equals(watchedEvent.getType())){ // node data is changed, this means node is cancelled. - System.out.println(watchedEvent.getPath()); - System.out.println("Experiment is cancelled with this path"); + log.info("Experiment is cancelled with this path:"); + log.info(watchedEvent.getPath()); this.cancelled = true; } } http://git-wip-us.apache.org/repos/asf/airavata/blob/4fbe57da/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java index d6981f3..28c792d 100644 --- a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java +++ b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java @@ -184,7 +184,6 @@ public class GSISSHProvider extends AbstractRecoverableProvider { if ("org.apache.airavata.gfac.monitor.handlers.GridPullMonitorHandler".equals(threadedHandler.getClass().getName())) { pullMonitorHandler = threadedHandler; if ("".equals(monitorMode) || monitorMode == null || org.apache.airavata.common.utils.Constants.PULL.equals(monitorMode)) { - log.info("Job is launched successfully now parsing it to monitoring in pull mode, JobID Returned: " + jobID); jobExecutionContext.setProperty("cancel","true"); pullMonitorHandler.invoke(jobExecutionContext); } else { @@ -194,7 +193,6 @@ public class GSISSHProvider extends AbstractRecoverableProvider { } else if ("org.apache.airavata.gfac.monitor.handlers.GridPushMonitorHandler".equals(threadedHandler.getClass().getName())) { pushMonitorHandler = threadedHandler; if ("".equals(monitorMode) || monitorMode == null || org.apache.airavata.common.utils.Constants.PUSH.equals(monitorMode)) { - log.info("Job is launched successfully now parsing it to monitoring in push mode, JobID Returned: " + jobID); pushMonitorHandler.invoke(jobExecutionContext); } else { log.error("Currently we only support Pull and Push monitoring and monitorMode should be PUSH" + @@ -218,7 +216,6 @@ public class GSISSHProvider extends AbstractRecoverableProvider { log.info("canceling the job status in GSISSHProvider!!!!!"); HostDescriptionType host = jobExecutionContext.getApplicationContext(). getHostDescription().getType(); - StringBuffer data = new StringBuffer(); JobDetails jobDetails = jobExecutionContext.getJobDetails(); try { Cluster cluster = null; @@ -242,6 +239,7 @@ public class GSISSHProvider extends AbstractRecoverableProvider { log.error("No Job Id is set, so cannot perform the cancel operation !!!"); return; } + removeFromMonitorHandlers(jobExecutionContext, (GsisshHostType)host, jobDetails.getJobID()); GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.CANCELED); // we know this host is type GsiSSHHostType } catch (SSHApiException e) { http://git-wip-us.apache.org/repos/asf/airavata/blob/4fbe57da/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java index 3899538..5cd929d 100644 --- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java +++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java @@ -28,15 +28,22 @@ import org.apache.airavata.gfac.core.cpi.GFacImpl; import org.apache.airavata.gfac.core.handler.GFacHandlerException; import org.apache.airavata.gfac.core.handler.ThreadedHandler; import org.apache.airavata.gfac.core.monitor.MonitorID; +import org.apache.airavata.gfac.core.utils.GFacUtils; import org.apache.airavata.gfac.monitor.HPCMonitorID; import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException; import org.apache.airavata.gfac.monitor.impl.pull.qstat.HPCPullMonitor; import org.apache.airavata.gfac.monitor.util.CommonUtils; import org.apache.airavata.gsi.ssh.api.authentication.AuthenticationInfo; import org.apache.airavata.gsi.ssh.impl.authentication.MyProxyAuthenticationInfo; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; import java.util.Properties; /** @@ -45,7 +52,7 @@ import java.util.Properties; * commands like qstat,squeue and this supports sun grid enging monitoring too * which is a slight variation of qstat monitoring. */ -public class GridPullMonitorHandler extends ThreadedHandler { +public class GridPullMonitorHandler extends ThreadedHandler implements Watcher{ private final static Logger logger = LoggerFactory.getLogger(GridPullMonitorHandler.class); private HPCPullMonitor hpcPullMonitor; @@ -83,6 +90,19 @@ public class GridPullMonitorHandler extends ThreadedHandler { if ("true".equals(jobExecutionContext.getProperty("cancel"))) { removeJobFromMonitoring(jobExecutionContext); } else { + ZooKeeper zk = jobExecutionContext.getZk(); + try { + String experimentEntry = GFacUtils.findExperimentEntry(jobExecutionContext.getExperimentID(), jobExecutionContext.getTaskData().getTaskID(), zk); + String path = experimentEntry + File.separator + "operation"; + Stat exists = zk.exists(path, this); + if(exists != null){ + zk.getData(path,this,exists); // watching the operations node + } + } catch (KeeperException e) { + e.printStackTrace(); + } catch (InterruptedException e) { + e.printStackTrace(); + } CommonUtils.addMonitortoQueue(hpcPullMonitor.getQueue(), monitorID); } } catch (AiravataMonitorException e) { @@ -92,11 +112,7 @@ public class GridPullMonitorHandler extends ThreadedHandler { public void removeJobFromMonitoring(JobExecutionContext jobExecutionContext)throws GFacHandlerException { MonitorID monitorID = new HPCMonitorID(getAuthenticationInfo(),jobExecutionContext); - try { - CommonUtils.removeMonitorFromQueue(hpcPullMonitor.getQueue(),monitorID); - } catch (AiravataMonitorException e) { - throw new GFacHandlerException(e); - } + hpcPullMonitor.getCancelJobList().add(monitorID); } public AuthenticationInfo getAuthenticationInfo() { return authenticationInfo; @@ -117,4 +133,12 @@ public class GridPullMonitorHandler extends ThreadedHandler { public void recover(JobExecutionContext jobExecutionContext) throws GFacHandlerException { this.removeJobFromMonitoring(jobExecutionContext); } + + public void process(WatchedEvent watchedEvent) { + if(Event.EventType.NodeDataChanged.equals(watchedEvent.getType())){ + // node data is changed, this means node is cancelled. + logger.info("Experiment is cancelled with this path:"); + logger.info(watchedEvent.getPath()); + } + } } http://git-wip-us.apache.org/repos/asf/airavata/blob/4fbe57da/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java index 2fea154..a2ead4d 100644 --- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java +++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java @@ -71,6 +71,8 @@ public class HPCPullMonitor extends PullMonitor { private MonitorPublisher publisher; + private List<MonitorID> cancelJobList; + private GFac gfac; @@ -80,6 +82,7 @@ public class HPCPullMonitor extends PullMonitor { connections = new HashMap<String, ResourceConnection>(); this.queue = new LinkedBlockingDeque<UserMonitorData>(); publisher = new MonitorPublisher(new EventBus()); + cancelJobList = new ArrayList<MonitorID>(); } public HPCPullMonitor(MonitorPublisher monitorPublisher, AuthenticationInfo authInfo) { @@ -87,12 +90,14 @@ public class HPCPullMonitor extends PullMonitor { this.queue = new LinkedBlockingDeque<UserMonitorData>(); publisher = monitorPublisher; authenticationInfo = authInfo; + cancelJobList = new ArrayList<MonitorID>(); } public HPCPullMonitor(BlockingQueue<UserMonitorData> queue, MonitorPublisher publisher) { this.queue = queue; this.publisher = publisher; connections = new HashMap<String, ResourceConnection>(); + cancelJobList = new ArrayList<MonitorID>(); } @@ -159,7 +164,18 @@ public class HPCPullMonitor extends PullMonitor { connection = new ResourceConnection(iHostMonitorData,getAuthenticationInfo()); connections.put(hostName, connection); } + // before we get the statuses, we check the cancel job list and remove them permanently List<MonitorID> monitorID = iHostMonitorData.getMonitorIDs(); + for(MonitorID iMonitorID:monitorID){ + for(MonitorID cancelMId:cancelJobList){ + if(iMonitorID.getJobID().equals(cancelMId.getJobID()) + && iMonitorID.getExperimentID().equals(cancelMId.getExperimentID()) + && iMonitorID.getTaskID().equals(cancelMId.getTaskID())){ + completedJobs.add(iMonitorID); + cancelJobList.remove(cancelMId); // once we found we delte the cancel job, so we don't have to do this check again and again + } + } + } Map<String, JobState> jobStatuses = connection.getJobStatuses(monitorID); for (MonitorID iMonitorID : monitorID) { currentMonitorID = iMonitorID; @@ -340,4 +356,12 @@ public class HPCPullMonitor extends PullMonitor { public void setAuthenticationInfo(AuthenticationInfo authenticationInfo) { this.authenticationInfo = authenticationInfo; } + + public List<MonitorID> getCancelJobList() { + return cancelJobList; + } + + public void setCancelJobList(List<MonitorID> cancelJobList) { + this.cancelJobList = cancelJobList; + } } http://git-wip-us.apache.org/repos/asf/airavata/blob/4fbe57da/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java index 4db72a4..2ad26a3 100644 --- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java +++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java @@ -37,6 +37,7 @@ import org.apache.airavata.gfac.GFacException; import org.apache.airavata.gfac.core.context.JobExecutionContext; import org.apache.airavata.gfac.core.context.MessageContext; import org.apache.airavata.gfac.core.cpi.BetterGfacImpl; +import org.apache.airavata.gfac.core.cpi.GFacImpl; import org.apache.airavata.gfac.core.handler.GFacHandlerException; import org.apache.airavata.gfac.core.handler.ThreadedHandler; import org.apache.airavata.gfac.core.notification.events.StartExecutionEvent; @@ -231,6 +232,7 @@ public class SSHProvider extends AbstractProvider { log.error("No Job Id is set, so cannot perform the cancel operation !!!"); return; } + removeFromMonitorHandlers(jobExecutionContext, (GsisshHostType)host, jobDetails.getJobID()); GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.CANCELED); } catch (SSHApiException e) { String error = "Error submitting the job to host " + host.getHostAddress() + " message: " + e.getMessage(); @@ -251,7 +253,41 @@ public class SSHProvider extends AbstractProvider { } } - + public void removeFromMonitorHandlers(JobExecutionContext jobExecutionContext, GsisshHostType host, String jobID) throws GFacHandlerException { + List<ThreadedHandler> daemonHandlers = GFacImpl.getDaemonHandlers(); + if (daemonHandlers == null) { + daemonHandlers = BetterGfacImpl.getDaemonHandlers(); + } + ThreadedHandler pullMonitorHandler = null; + ThreadedHandler pushMonitorHandler = null; + String monitorMode = host.getMonitorMode(); + for (ThreadedHandler threadedHandler : daemonHandlers) { + if ("org.apache.airavata.gfac.monitor.handlers.GridPullMonitorHandler".equals(threadedHandler.getClass().getName())) { + pullMonitorHandler = threadedHandler; + if ("".equals(monitorMode) || monitorMode == null || org.apache.airavata.common.utils.Constants.PULL.equals(monitorMode)) { + jobExecutionContext.setProperty("cancel","true"); + pullMonitorHandler.invoke(jobExecutionContext); + } else { + log.error("Currently we only support Pull and Push monitoring and monitorMode should be PULL" + + " to handle by the GridPullMonitorHandler"); + } + } else if ("org.apache.airavata.gfac.monitor.handlers.GridPushMonitorHandler".equals(threadedHandler.getClass().getName())) { + pushMonitorHandler = threadedHandler; + if ("".equals(monitorMode) || monitorMode == null || org.apache.airavata.common.utils.Constants.PUSH.equals(monitorMode)) { + log.info("Job is launched successfully now parsing it to monitoring in push mode, JobID Returned: " + jobID); + pushMonitorHandler.invoke(jobExecutionContext); + } else { + log.error("Currently we only support Pull and Push monitoring and monitorMode should be PUSH" + + " to handle by the GridPushMonitorHandler"); + } + } + // have to handle the GridPushMonitorHandler logic + } + if (pullMonitorHandler == null && pushMonitorHandler == null && ExecutionMode.ASYNCHRONOUS.equals(jobExecutionContext.getGFacConfiguration().getExecutionMode())) { + log.error("No Daemon handler is configured in gfac-config.xml, either pull or push, so monitoring will not invoked" + + ", execution is configured as asynchronous, so Outhandler will not be invoked"); + } + } private File createShellScript(JobExecutionContext context) throws IOException { ApplicationDeploymentDescriptionType app = context.getApplicationContext() .getApplicationDeploymentDescription().getType(); http://git-wip-us.apache.org/repos/asf/airavata/blob/4fbe57da/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/GSISSHAbstractCluster.java ---------------------------------------------------------------------- diff --git a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/GSISSHAbstractCluster.java b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/GSISSHAbstractCluster.java index 855c9dc..3639ddd 100644 --- a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/GSISSHAbstractCluster.java +++ b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/GSISSHAbstractCluster.java @@ -225,12 +225,17 @@ public class GSISSHAbstractCluster implements Cluster { String outputifAvailable = getOutputifAvailable(stdOutReader, "Error reading output of job submission",rawCommandInfo.getBaseCommand(jobManagerConfiguration.getInstalledPath())); // this might not be the case for all teh resources, if so Cluster implementation can override this method // because here after cancelling we try to get the job description and return it back - JobDescriptor jobById = this.getJobDescriptorById(jobID); - if (CommonUtils.isJobFinished(jobById)) { - log.debug("Job Cancel operation was successful !"); - return jobById; - } else { - log.debug("Job Cancel operation was not successful !"); + try { + JobDescriptor jobById = this.getJobDescriptorById(jobID); + if (CommonUtils.isJobFinished(jobById)) { + log.debug("Job Cancel operation was successful !"); + return jobById; + } else { + log.debug("Job Cancel operation was not successful !"); + return null; + } + }catch (Exception e){ + //its ok to fail to get status when the job is gone return null; } }
