Repository: airavata Updated Branches: refs/heads/master e290cfe17 -> b7e914ee3
Send acknowledgement to processed processes, Handle task status after execute each task and handle failed scenarios. Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/b7e914ee Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/b7e914ee Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/b7e914ee Branch: refs/heads/master Commit: b7e914ee3c4c90b77676bd2c217bec3b3183dc76 Parents: e290cfe Author: Shameera Rathanyaka <[email protected]> Authored: Thu Jul 16 15:57:12 2015 -0400 Committer: Shameera Rathanyaka <[email protected]> Committed: Thu Jul 16 15:57:12 2015 -0400 ---------------------------------------------------------------------- .../apache/airavata/gfac/core/GFacUtils.java | 45 ++-- .../gfac/core/context/ProcessContext.java | 17 +- .../airavata/gfac/core/context/TaskContext.java | 3 + .../apache/airavata/gfac/core/task/Task.java | 11 +- .../org/apache/airavata/gfac/impl/Factory.java | 9 + .../airavata/gfac/impl/GFacEngineImpl.java | 169 ++++++++------- .../apache/airavata/gfac/impl/GFacWorker.java | 28 +++ .../gfac/impl/task/AbstractSCPTask.java | 2 - .../gfac/impl/task/ForkJobSubmissionTask.java | 5 +- .../gfac/impl/task/LocalJobSubmissionTask.java | 15 +- .../gfac/impl/task/SCPDataStageTask.java | 81 ++++--- .../gfac/impl/task/SCPInputDataStageTask.java | 14 +- .../gfac/impl/task/SCPOutputDataStatgeTask.java | 10 +- .../gfac/impl/task/SSHEnvironmentSetupTask.java | 25 ++- .../gfac/impl/task/SSHJobSubmissionTask.java | 212 ++++++++++++------- .../airavata/gfac/server/GfacServerHandler.java | 6 +- 16 files changed, 413 insertions(+), 239 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/b7e914ee/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java index b00240b..af10218 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java @@ -266,9 +266,7 @@ public class GFacUtils { // first we save job jobModel to the registry for sa and then save the job status. ProcessContext processContext = taskContext.getParentProcessContext(); ExperimentCatalog experimentCatalog = processContext.getExperimentCatalog(); - TaskStatus status = new TaskStatus(); - status.setState(state); - taskContext.setTaskStatus(status); + TaskStatus status = taskContext.getTaskStatus(); status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); experimentCatalog.add(ExpCatChildDataType.TASK_STATUS, status, taskContext.getTaskId()); TaskIdentifier identifier = new TaskIdentifier(taskContext.getTaskId(), @@ -286,20 +284,17 @@ public class GFacUtils { } } - public static void saveProcessStatus(ProcessContext processContext, - ProcessState state) throws GFacException { + public static void saveAndPublishProcessStatus(ProcessContext processContext) throws GFacException { try { // first we save job jobModel to the registry for sa and then save the job status. ExperimentCatalog experimentCatalog = processContext.getExperimentCatalog(); - ProcessStatus status = new ProcessStatus(); - status.setState(state); - processContext.getProcessModel().setProcessStatus(status); + ProcessStatus status = processContext.getProcessStatus(); status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); experimentCatalog.add(ExpCatChildDataType.PROCESS_STATUS, status, processContext.getProcessId()); ProcessIdentifier identifier = new ProcessIdentifier(processContext.getProcessId(), processContext.getProcessModel().getExperimentId(), processContext.getGatewayId()); - ProcessStatusChangeEvent processStatusChangeEvent = new ProcessStatusChangeEvent(state, identifier); + ProcessStatusChangeEvent processStatusChangeEvent = new ProcessStatusChangeEvent(status.getState(), identifier); MessageContext msgCtx = new MessageContext(processStatusChangeEvent, MessageType.PROCESS, AiravataUtils.getId(MessageType.PROCESS.name()), processContext.getGatewayId()); msgCtx.setUpdatedTime(AiravataUtils.getCurrentTimestamp()); @@ -1094,27 +1089,35 @@ public class GFacUtils { return GFacConstants.ZOOKEEPER_EXPERIMENT_NODE + File.separator + experimentId; } - public static void createExperimentNode(CuratorFramework curatorClient, String gfacServerName, String + public static void createProcessZKNode(CuratorFramework curatorClient, String gfacServerName, String processId, long deliveryTag, String token) throws Exception { - // create /experiments/processId node and set data - serverName, add redelivery listener - String experimentPath = ZKPaths.makePath(GFacConstants.ZOOKEEPER_EXPERIMENT_NODE, processId); - ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), experimentPath); - curatorClient.setData().withVersion(-1).forPath(experimentPath, gfacServerName.getBytes()); - curatorClient.getData().usingWatcher(new RedeliveryRequestWatcher()).forPath(experimentPath); - - // create /experiments/processId/deliveryTag node and set data - deliveryTag - String deliveryTagPath = ZKPaths.makePath(experimentPath, GFacConstants.ZOOKEEPER_DELIVERYTAG_NODE); + // TODO - To handle multiple processes per experiment, need to create a /experiment/{expId}/{processId} node + // create /experiments/{processId} node and set data - serverName, add redelivery listener + String zkProcessNodePath = ZKPaths.makePath(GFacConstants.ZOOKEEPER_EXPERIMENT_NODE, processId); + ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), zkProcessNodePath); + curatorClient.setData().withVersion(-1).forPath(zkProcessNodePath, gfacServerName.getBytes()); + curatorClient.getData().usingWatcher(new RedeliveryRequestWatcher()).forPath(zkProcessNodePath); + + // create /experiments/{processId}/deliveryTag node and set data - deliveryTag + String deliveryTagPath = ZKPaths.makePath(zkProcessNodePath, GFacConstants.ZOOKEEPER_DELIVERYTAG_NODE); ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), deliveryTagPath); curatorClient.setData().withVersion(-1).forPath(deliveryTagPath, GFacUtils.longToBytes(deliveryTag)); - // create /experiments/processId/token node and set data - token + // create /experiments/{processId}/token node and set data - token String tokenNodePath = ZKPaths.makePath(processId, GFacConstants.ZOOKEEPER_TOKEN_NODE); ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), tokenNodePath); curatorClient.setData().withVersion(-1).forPath(tokenNodePath, token.getBytes()); - // create /experiments/processId/cancelListener node and set watcher for data changes - String cancelListenerNode = ZKPaths.makePath(experimentPath, GFacConstants.ZOOKEEPER_CANCEL_LISTENER_NODE); + // create /experiments/{processId}/cancelListener node and set watcher for data changes + String cancelListenerNode = ZKPaths.makePath(zkProcessNodePath, GFacConstants.ZOOKEEPER_CANCEL_LISTENER_NODE); ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), cancelListenerNode); curatorClient.getData().usingWatcher(new CancelRequestWatcher()).forPath(cancelListenerNode); } + + public static long getProcessDeliveryTag(CuratorFramework curatorClient, String processId) throws Exception { + String deliveryTagPath = GFacConstants.ZOOKEEPER_EXPERIMENT_NODE + "/" + processId + GFacConstants + .ZOOKEEPER_DELIVERYTAG_NODE; + byte[] bytes = curatorClient.getData().forPath(deliveryTagPath); + return GFacUtils.bytesToLong(bytes); + } } http://git-wip-us.apache.org/repos/asf/airavata/blob/b7e914ee/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java index dc8dace..9e1ac06 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java @@ -21,7 +21,7 @@ package org.apache.airavata.gfac.core.context; -import org.apache.airavata.common.utils.LocalEventPublisher; +import org.apache.airavata.gfac.core.GFacUtils; import org.apache.airavata.gfac.core.cluster.RemoteCluster; import org.apache.airavata.messaging.core.Publisher; import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription; @@ -40,11 +40,15 @@ import org.apache.airavata.model.status.ProcessStatus; import org.apache.airavata.registry.cpi.AppCatalog; import org.apache.airavata.registry.cpi.ExperimentCatalog; import org.apache.curator.framework.CuratorFramework; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.List; import java.util.Map; public class ProcessContext { + + private static final Logger log = LoggerFactory.getLogger(ProcessContext.class); // process model private ExperimentCatalog experimentCatalog; private AppCatalog appCatalog; @@ -289,11 +293,16 @@ public class ProcessContext { public void setProcessStatus(ProcessStatus status) { if (status != null) { + log.info("expId: {}, processId: {} :- Status changed {} -> {}", getExperimentId(), processId, + getProcessState().name(), status.getState().name()); processModel.setProcessStatus(status); - // TODO publish process status change. } } + public ProcessStatus getProcessStatus(){ + return processModel.getProcessStatus(); + } + public String getComputeResourceId() { return getComputeResourceDescription().getComputeResourceId(); } @@ -321,4 +330,8 @@ public class ProcessContext { public void setLocalWorkingDir(String localWorkingDir) { this.localWorkingDir = localWorkingDir; } + + public String getExperimentId() { + return processModel.getExperimentId(); + } } http://git-wip-us.apache.org/repos/asf/airavata/blob/b7e914ee/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/TaskContext.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/TaskContext.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/TaskContext.java index 95d2fb9..597fd2e 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/TaskContext.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/TaskContext.java @@ -54,6 +54,9 @@ public class TaskContext { } public void setTaskStatus(TaskStatus taskStatus) { + log.info("expId: {}, processId: {}, taskId: {}, type: {}:- Status changed {} -> {}", parentProcessContext + .getExperimentId(), parentProcessContext.getProcessId(), getTaskId(), getTaskType().name(), + getTaskState().name(), taskStatus .getState().name()); taskModel.setTaskStatus(taskStatus); } http://git-wip-us.apache.org/repos/asf/airavata/blob/b7e914ee/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/task/Task.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/task/Task.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/task/Task.java index 62c069a..f4eec85 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/task/Task.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/task/Task.java @@ -22,6 +22,7 @@ package org.apache.airavata.gfac.core.task; import org.apache.airavata.gfac.core.context.TaskContext; import org.apache.airavata.model.status.TaskState; +import org.apache.airavata.model.status.TaskStatus; import org.apache.airavata.model.task.TaskTypes; import java.util.Map; @@ -42,19 +43,17 @@ public interface Task { * This method will be called at the first time of task chain execution. This method should called before recover * method. For a given task chain execute method only call one time. recover method may be called more than once. * @param taskContext - * @throws TaskException - * @return + * @return completed task status if success otherwise failed task status. */ - public TaskState execute(TaskContext taskContext) throws TaskException; + public TaskStatus execute(TaskContext taskContext); /** * This methond will be invoked at recover path.Before this method is invoked, execute method should be invoked. * This method may be called zero or few time in a process chain. * @param taskContext - * @throws TaskException - * @return + * @return completed task status if success otherwise failed task status. */ - public TaskState recover(TaskContext taskContext) throws TaskException; + public TaskStatus recover(TaskContext taskContext); /** * Task type will be used to identify the task behaviour. eg : DATA_STAGING , JOB_SUBMISSION http://git-wip-us.apache.org/repos/asf/airavata/blob/b7e914ee/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java index 51db6f3..a5fa5ed 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java @@ -52,6 +52,7 @@ import org.apache.airavata.gfac.impl.job.UGEJobConfiguration; import org.apache.airavata.gfac.impl.job.UGEOutputParser; import org.apache.airavata.gfac.monitor.email.EmailBasedMonitor; import org.apache.airavata.messaging.core.Publisher; +import org.apache.airavata.messaging.core.impl.RabbitMQProcessLaunchConsumer; import org.apache.airavata.messaging.core.impl.RabbitMQStatusPublisher; import org.apache.airavata.model.appcatalog.computeresource.DataMovementProtocol; import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionInterface; @@ -101,6 +102,7 @@ public abstract class Factory { private static Map<DataMovementProtocol, Task> dataMovementTask = new HashMap<>(); private static Map<ResourceJobManagerType, ResourceConfig> resources = new HashMap<>(); private static Map<MonitorMode, JobMonitor> jobMonitorServices = new HashMap<>(); + private static RabbitMQProcessLaunchConsumer processLaunchConsumer; public static GFacEngine getGFacEngine() throws GFacException { if (engine == null) { @@ -145,6 +147,13 @@ public abstract class Factory { return curatorClient; } + public static RabbitMQProcessLaunchConsumer getProcessLaunchConsumer() throws AiravataException { + if (processLaunchConsumer == null) { + processLaunchConsumer = new RabbitMQProcessLaunchConsumer(); + } + return processLaunchConsumer; + } + public static JobManagerConfiguration getJobManagerConfiguration(ResourceJobManager resourceJobManager) throws GFacException { ResourceConfig resourceConfig = Factory.getResourceConfig(resourceJobManager.getResourceJobManagerType()); OutputParser outputParser; http://git-wip-us.apache.org/repos/asf/airavata/blob/b7e914ee/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java index c7eba99..a4c8381 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java @@ -124,14 +124,23 @@ public class GFacEngineImpl implements GFacEngine { TaskContext taskCtx = null; List<TaskContext> taskChain = new ArrayList<>(); processContext.setProcessStatus(new ProcessStatus(ProcessState.CONFIGURING_WORKSPACE)); + GFacUtils.saveAndPublishProcessStatus(processContext); // Run all environment setup tasks taskCtx = getEnvSetupTaskContext(processContext); saveTaskModel(taskCtx); GFacUtils.saveAndPublishTaskStatus(taskCtx); SSHEnvironmentSetupTask envSetupTask = new SSHEnvironmentSetupTask(); - executeTask(taskCtx, envSetupTask); + TaskStatus taskStatus = executeTask(taskCtx, envSetupTask); + if (taskStatus.getState() == TaskState.FAILED) { + log.error("expId: {}, processId: {}, taskId: {} type: {},:- Input statging failed, " + + "reason:" + " {}", taskCtx.getParentProcessContext().getExperimentId(), taskCtx + .getParentProcessContext().getProcessId(), taskCtx.getTaskId(), envSetupTask.getType + ().name(), taskStatus.getReason()); + throw new GFacException("Error while environment setup"); + } // execute process inputs processContext.setProcessStatus(new ProcessStatus(ProcessState.INPUT_DATA_STAGING)); + GFacUtils.saveAndPublishProcessStatus(processContext); List<InputDataObjectType> processInputs = processContext.getProcessModel().getProcessInputs(); sortByInputOrder(processInputs); if (processInputs != null) { @@ -151,7 +160,14 @@ public class GFacEngineImpl implements GFacEngine { saveTaskModel(taskCtx); GFacUtils.saveAndPublishTaskStatus(taskCtx); Task dMoveTask = Factory.getDataMovementTask(processContext.getDataMovementProtocol()); - executeTask(taskCtx, dMoveTask); + taskStatus = executeTask(taskCtx, dMoveTask); + if (taskStatus.getState() == TaskState.FAILED) { + log.error("expId: {}, processId: {}, taskId: {} type: {},:- Input statging failed, " + + "reason:" + " {}", taskCtx.getParentProcessContext().getExperimentId(), taskCtx + .getParentProcessContext().getProcessId(), taskCtx.getTaskId(), dMoveTask.getType + ().name(), taskStatus.getReason()); + throw new GFacException("Error while staging input data"); + } break; default: // nothing to do @@ -160,29 +176,87 @@ public class GFacEngineImpl implements GFacEngine { } } processContext.setProcessStatus(new ProcessStatus(ProcessState.EXECUTING)); + GFacUtils.saveAndPublishProcessStatus(processContext); taskCtx = getJobSubmissionTaskContext(processContext); + saveTaskModel(taskCtx); GFacUtils.saveAndPublishTaskStatus(taskCtx); JobSubmissionTask jobSubmissionTask = Factory.getJobSubmissionTask(processContext.getJobSubmissionProtocol()); - executeTask(taskCtx, jobSubmissionTask); + taskStatus = executeTask(taskCtx, jobSubmissionTask); + if (taskStatus.getState() == TaskState.FAILED) { + throw new GFacException("Job submission task failed"); + } processContext.setTaskChain(taskChain); } - private void executeTask(TaskContext taskCtx, Task task) throws GFacException { - try { - taskCtx.setTaskStatus(new TaskStatus(TaskState.EXECUTING)); - GFacUtils.saveAndPublishTaskStatus(taskCtx); - task.execute(taskCtx); - taskCtx.setTaskStatus(new TaskStatus(TaskState.COMPLETED)); - GFacUtils.saveAndPublishTaskStatus(taskCtx); - } catch (TaskException e) { - TaskStatus status = new TaskStatus(TaskState.FAILED); - status.setReason(taskCtx.getTaskType().toString() + " Task Failed to execute"); - taskCtx.setTaskStatus(status); - GFacUtils.saveAndPublishTaskStatus(taskCtx); + + @Override + public void recoverProcess(ProcessContext processContext) throws GFacException { + + } + + @Override + public void runProcessOutflow(ProcessContext processContext) throws GFacException { + TaskContext taskCtx = null; + processContext.setProcessStatus(new ProcessStatus(ProcessState.OUTPUT_DATA_STAGING)); + GFacUtils.saveAndPublishProcessStatus(processContext); + List<OutputDataObjectType> processOutputs = processContext.getProcessModel().getProcessOutputs(); + for (OutputDataObjectType processOutput : processOutputs) { + DataType type = processOutput.getType(); + switch (type) { + case STDERR: + break; + case STDOUT: + break; + case URI: + try { + taskCtx = getDataStagingTaskContext(processContext, processOutput); + } catch (TException e) { + throw new GFacException("Thrift model to byte[] convertion issue", e); + } + File localWorkingdir = new File(taskCtx.getLocalWorkingDir()); + localWorkingdir.mkdirs(); // make local dir if not exist + saveTaskModel(taskCtx); + GFacUtils.saveAndPublishTaskStatus(taskCtx); + Task dMoveTask = Factory.getDataMovementTask(processContext.getDataMovementProtocol()); + TaskStatus taskStatus = executeTask(taskCtx, dMoveTask); + if (taskStatus.getState() == TaskState.FAILED) { + log.error("expId: {}, processId: {}, taskId: {} type: {},:- Input statging failed, " + + "reason:" + " {}", taskCtx.getParentProcessContext().getExperimentId(), taskCtx + .getParentProcessContext().getProcessId(), taskCtx.getTaskId(), dMoveTask.getType + ().name(), taskStatus.getReason()); + throw new GFacException("Error while staging input data"); + } + break; + default: + // nothing to do + break; + } } + processContext.setProcessStatus(new ProcessStatus(ProcessState.POST_PROCESSING)); + GFacUtils.saveAndPublishProcessStatus(processContext); +// taskCtx = getEnvCleanupTaskContext(processContext); + + } + + @Override + public void recoverProcessOutflow(ProcessContext processContext) throws GFacException { + + } + + @Override + public void cancelProcess() throws GFacException { } + private TaskStatus executeTask(TaskContext taskCtx, Task task) throws GFacException { + taskCtx.setTaskStatus(new TaskStatus(TaskState.EXECUTING)); + GFacUtils.saveAndPublishTaskStatus(taskCtx); + TaskStatus taskStatus = task.execute(taskCtx); + taskCtx.setTaskStatus(taskStatus); + GFacUtils.saveAndPublishTaskStatus(taskCtx); + return taskCtx.getTaskStatus(); + } + private TaskContext getJobSubmissionTaskContext(ProcessContext processContext) throws GFacException { TaskContext taskCtx = new TaskContext(); taskCtx.setParentProcessContext(processContext); @@ -197,7 +271,8 @@ public class GFacEngineImpl implements GFacEngine { return taskCtx; } - private TaskContext getDataStagingTaskContext(ProcessContext processContext, InputDataObjectType processInput) throws TException { + private TaskContext getDataStagingTaskContext(ProcessContext processContext, InputDataObjectType processInput) + throws TException { TaskContext taskCtx = new TaskContext(); taskCtx.setParentProcessContext(processContext); // create new task model for this task @@ -210,13 +285,15 @@ public class GFacEngineImpl implements GFacEngine { // create data staging sub task model DataStagingTaskModel submodel = new DataStagingTaskModel(); submodel.setSource(processInput.getValue()); - submodel.setDestination(processContext.getDataMovementProtocol().name() + ":" + processContext.getWorkingDir()); + submodel.setDestination(processContext.getDataMovementProtocol().name() + ":" + processContext.getWorkingDir + ()); taskModel.setSubTaskModel(ThriftUtils.serializeThriftObject(submodel)); taskCtx.setTaskModel(taskModel); return taskCtx; } - private TaskContext getDataStagingTaskContext(ProcessContext processContext, OutputDataObjectType processOutput) throws TException { + private TaskContext getDataStagingTaskContext(ProcessContext processContext, OutputDataObjectType processOutput) + throws TException { TaskContext taskCtx = new TaskContext(); taskCtx.setParentProcessContext(processContext); // create new task model for this task @@ -241,7 +318,6 @@ public class GFacEngineImpl implements GFacEngine { /** * Persist task model - * @param taskContext */ private void saveTaskModel(TaskContext taskContext) throws GFacException { try { @@ -266,56 +342,6 @@ public class GFacEngineImpl implements GFacEngine { return taskCtx; } - @Override - public void recoverProcess(ProcessContext processContext) throws GFacException { - - } - - @Override - public void runProcessOutflow(ProcessContext processContext) throws GFacException { - TaskContext taskCtx = null; - processContext.setProcessStatus(new ProcessStatus(ProcessState.OUTPUT_DATA_STAGING)); - List<OutputDataObjectType> processOutputs = processContext.getProcessModel().getProcessOutputs(); - for (OutputDataObjectType processOutput : processOutputs) { - DataType type = processOutput.getType(); - switch (type) { - case STDERR: - break; - case STDOUT: - break; - case URI: - // TODO : Provide data staging data model - try { - taskCtx = getDataStagingTaskContext(processContext, processOutput); - } catch (TException e) { - throw new GFacException("Thrift model to byte[] convertion issue", e); - } - File localWorkingdir = new File(taskCtx.getLocalWorkingDir()); - localWorkingdir.mkdirs(); // make local dir if not exist - saveTaskModel(taskCtx); - GFacUtils.saveAndPublishTaskStatus(taskCtx); - Task dMoveTask = Factory.getDataMovementTask(processContext.getDataMovementProtocol()); - executeTask(taskCtx, dMoveTask); - break; - default: - // nothing to do - break; - } - } - processContext.setProcessStatus(new ProcessStatus(ProcessState.POST_PROCESSING)); -// taskCtx = getEnvCleanupTaskContext(processContext); - - } - - @Override - public void recoverProcessOutflow(ProcessContext processContext) throws GFacException { - - } - - @Override - public void cancelProcess() throws GFacException { - - } /** * Sort input data type by input order. @@ -331,7 +357,7 @@ public class GFacEngineImpl implements GFacEngine { public static ResourceJobManager getResourceJobManager(ProcessContext processCtx) throws AppCatalogException { List<JobSubmissionInterface> jobSubmissionInterfaces = Factory.getDefaultAppCatalog().getComputeResource() - .getComputeResource(processCtx.getComputeResourceId()) .getJobSubmissionInterfaces(); + .getComputeResource(processCtx.getComputeResourceId()).getJobSubmissionInterfaces(); ResourceJobManager resourceJobManager = null; JobSubmissionInterface jsInterface = null; @@ -346,7 +372,8 @@ public class GFacEngineImpl implements GFacEngine { } else if (jsInterface.getJobSubmissionProtocol() == JobSubmissionProtocol.SSH) { SSHJobSubmission sshJobSubmission = Factory.getDefaultAppCatalog().getComputeResource().getSSHJobSubmission (jsInterface.getJobSubmissionInterfaceId()); - processCtx.setMonitorMode(sshJobSubmission.getMonitorMode()); // fixme - Move this to populate process context method. + processCtx.setMonitorMode(sshJobSubmission.getMonitorMode()); // fixme - Move this to populate process + // context method. resourceJobManager = sshJobSubmission.getResourceJobManager(); } else if (jsInterface.getJobSubmissionProtocol() == JobSubmissionProtocol.LOCAL) { LOCALSubmission localSubmission = Factory.getDefaultAppCatalog().getComputeResource().getLocalJobSubmission http://git-wip-us.apache.org/repos/asf/airavata/blob/b7e914ee/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java index 899f684..a759f90 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java @@ -25,6 +25,7 @@ import org.apache.airavata.common.exception.AiravataException; import org.apache.airavata.gfac.core.GFac; import org.apache.airavata.gfac.core.GFacEngine; import org.apache.airavata.gfac.core.GFacException; +import org.apache.airavata.gfac.core.GFacUtils; import org.apache.airavata.gfac.core.context.ProcessContext; import org.apache.airavata.gfac.core.monitor.JobMonitor; import org.apache.airavata.model.status.ProcessState; @@ -32,6 +33,8 @@ import org.apache.airavata.model.status.ProcessStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.text.MessageFormat; + public class GFacWorker implements Runnable { private static final Logger log = LoggerFactory.getLogger(GFacWorker.class); @@ -85,11 +88,15 @@ public class GFacWorker implements Runnable { // run the outflow task engine.runProcessOutflow(processContext); processContext.setProcessStatus(new ProcessStatus(ProcessState.COMPLETED)); + GFacUtils.saveAndPublishProcessStatus(processContext); + sendAck(); break; case RECOVER_OUTFLOW: // recover outflow task; engine.recoverProcessOutflow(processContext); processContext.setProcessStatus(new ProcessStatus(ProcessState.COMPLETED)); + GFacUtils.saveAndPublishProcessStatus(processContext); + sendAck(); break; default: throw new GFacException("process Id : " + processId + " Couldn't identify process type"); @@ -113,6 +120,14 @@ public class GFacWorker implements Runnable { } } catch (GFacException e) { log.error("GFac Worker throws an exception", e); + processContext.setProcessStatus(new ProcessStatus(ProcessState.FAILED)); + try { + GFacUtils.saveAndPublishProcessStatus(processContext); + } catch (GFacException e1) { + log.error("expId: {}, processId: {} :- Couldn't save and publish process status {}", processContext + .getExperimentId(), processContext.getProcessId(), processContext.getProcessState()); + } + sendAck(); } } @@ -142,6 +157,19 @@ public class GFacWorker implements Runnable { } } + private void sendAck() { + try { + long processDeliveryTag = GFacUtils.getProcessDeliveryTag(processContext.getCuratorClient(), processId); + Factory.getProcessLaunchConsumer().sendAck(processDeliveryTag); + log.info("expId: {}, procesId: {} :- Sent ack for deliveryTag {}", processContext.getExperimentId(), + processId, processDeliveryTag); + } catch (Exception e1) { + String format = MessageFormat.format("expId: {0}, processId: {1} :- Couldn't send ack for deliveryTag ", + processContext .getExperimentId(), processId); + log.error(format, e1); + } + } + private ProcessType getProcessType(ProcessContext processContext) { // check the status and return correct type of process. switch (processContext.getProcessState()) { http://git-wip-us.apache.org/repos/asf/airavata/blob/b7e914ee/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AbstractSCPTask.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AbstractSCPTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AbstractSCPTask.java index a34997c..17746f4 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AbstractSCPTask.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AbstractSCPTask.java @@ -20,10 +20,8 @@ */ package org.apache.airavata.gfac.impl.task; -import org.apache.airavata.gfac.core.context.TaskContext; import org.apache.airavata.gfac.core.task.Task; import org.apache.airavata.gfac.core.task.TaskException; -import org.apache.airavata.model.status.TaskState; import java.util.Map; http://git-wip-us.apache.org/repos/asf/airavata/blob/b7e914ee/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/ForkJobSubmissionTask.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/ForkJobSubmissionTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/ForkJobSubmissionTask.java index dbc1a97..f8ef0ea 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/ForkJobSubmissionTask.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/ForkJobSubmissionTask.java @@ -25,6 +25,7 @@ import org.apache.airavata.gfac.core.context.TaskContext; import org.apache.airavata.gfac.core.task.JobSubmissionTask; import org.apache.airavata.gfac.core.task.TaskException; import org.apache.airavata.model.status.TaskState; +import org.apache.airavata.model.status.TaskStatus; import org.apache.airavata.model.task.TaskTypes; import java.util.Map; @@ -36,12 +37,12 @@ public class ForkJobSubmissionTask implements JobSubmissionTask { } @Override - public TaskState execute(TaskContext taskContext) throws TaskException { + public TaskStatus execute(TaskContext taskContext) { return null; } @Override - public TaskState recover(TaskContext taskContext) throws TaskException { + public TaskStatus recover(TaskContext taskContext) { return null; } http://git-wip-us.apache.org/repos/asf/airavata/blob/b7e914ee/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/LocalJobSubmissionTask.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/LocalJobSubmissionTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/LocalJobSubmissionTask.java index ad7ab6d..5201de6 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/LocalJobSubmissionTask.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/LocalJobSubmissionTask.java @@ -36,6 +36,7 @@ import org.apache.airavata.model.application.io.InputDataObjectType; import org.apache.airavata.model.job.JobModel; import org.apache.airavata.model.status.JobState; import org.apache.airavata.model.status.TaskState; +import org.apache.airavata.model.status.TaskStatus; import org.apache.airavata.model.task.TaskTypes; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,8 +54,8 @@ public class LocalJobSubmissionTask implements JobSubmissionTask{ } @Override - public TaskState execute(TaskContext taskContext) throws TaskException { - try { + public TaskStatus execute(TaskContext taskContext) { + /* try { ProcessContext processContext = taskContext.getParentProcessContext(); // build command with all inputs List<String> cmdList = buildCommand(processContext); @@ -97,10 +98,10 @@ public class LocalJobSubmissionTask implements JobSubmissionTask{ standardOutWriter.join(); standardErrorWriter.join(); - /* + *//* * check return value. usually not very helpful to draw conclusions based on return values so don't bother. * just provide warning in the log messages - */ + *//* if (returnValue != 0) { log.error("Process finished with non zero return value. Process may have failed"); } else { @@ -124,12 +125,12 @@ public class LocalJobSubmissionTask implements JobSubmissionTask{ } catch (IOException e) { log.error("Error while submitting local job", e); throw new TaskException("Error while submitting local job", e); - } - return TaskState.COMPLETED; + }*/ + return new TaskStatus(TaskState.COMPLETED); } @Override - public TaskState recover(TaskContext taskContext) throws TaskException { + public TaskStatus recover(TaskContext taskContext) { return null; } http://git-wip-us.apache.org/repos/asf/airavata/blob/b7e914ee/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java index 089535e..b2a83ed 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java @@ -20,63 +20,88 @@ */ package org.apache.airavata.gfac.impl.task; -import com.jcraft.jsch.JSch; -import com.jcraft.jsch.JSchException; -import com.jcraft.jsch.Session; import org.apache.airavata.common.utils.ThriftUtils; import org.apache.airavata.gfac.core.SSHApiException; import org.apache.airavata.gfac.core.context.TaskContext; import org.apache.airavata.gfac.core.task.Task; import org.apache.airavata.gfac.core.task.TaskException; -import org.apache.airavata.gfac.impl.SSHUtils; +import org.apache.airavata.model.commons.ErrorModel; import org.apache.airavata.model.status.TaskState; +import org.apache.airavata.model.status.TaskStatus; import org.apache.airavata.model.task.DataStagingTaskModel; import org.apache.airavata.model.task.TaskTypes; import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import java.util.Map; public class SCPDataStageTask implements Task { + private static final Logger log = LoggerFactory.getLogger(SCPDataStageTask.class); + @Override public void init(Map<String, String> propertyMap) throws TaskException { } @Override - public TaskState execute(TaskContext taskContext) throws TaskException { - + public TaskStatus execute(TaskContext taskContext) { + TaskStatus status = new TaskStatus(TaskState.COMPLETED); if (taskContext.getTaskModel().getTaskType() != TaskTypes.DATA_STAGING) { - throw new TaskException("Invalid task call, expected " + TaskTypes.DATA_STAGING.toString() + " but found " + status.setState(TaskState.FAILED); + status.setReason("Invalid task call, expected " + TaskTypes.DATA_STAGING.toString() + " but found " + taskContext.getTaskModel().getTaskType().toString()); - } - try { - DataStagingTaskModel subTaskModel = (DataStagingTaskModel) ThriftUtils.getSubTaskModel(taskContext - .getTaskModel()); - URI sourceURI = new URI(subTaskModel.getSource()); - URI destinationURI = new URI(subTaskModel.getDestination()); + } else { + try { + DataStagingTaskModel subTaskModel = (DataStagingTaskModel) ThriftUtils.getSubTaskModel(taskContext + .getTaskModel()); + URI sourceURI = new URI(subTaskModel.getSource()); + URI destinationURI = new URI(subTaskModel.getDestination()); - if (sourceURI.getScheme().equalsIgnoreCase("file")) { // Airavata --> RemoteCluster - taskContext.getParentProcessContext().getRemoteCluster().scpTo(sourceURI.getPath(), destinationURI - .getPath()); - } else { // RemoteCluster --> Airavata - taskContext.getParentProcessContext().getRemoteCluster().scpFrom(sourceURI.getPath(), destinationURI - .getPath()); + if (sourceURI.getScheme().equalsIgnoreCase("file")) { // Airavata --> RemoteCluster + taskContext.getParentProcessContext().getRemoteCluster().scpTo(sourceURI.getPath(), destinationURI + .getPath()); + } else { // RemoteCluster --> Airavata + taskContext.getParentProcessContext().getRemoteCluster().scpFrom(sourceURI.getPath(), destinationURI + .getPath()); + } + status.setReason("Successfully staged data"); + } catch (SSHApiException e) { + String msg = "Scp attempt failed"; + log.error(msg, e); + status.setState(TaskState.FAILED); + status.setReason(msg); + ErrorModel errorModel = new ErrorModel(); + errorModel.setActualErrorMessage(e.getMessage()); + errorModel.setUserFriendlyMessage(msg); + taskContext.getTaskModel().setTaskError(errorModel); + } catch (TException e) { + String msg = "Invalid task invocation"; + log.error(msg, e); + status.setState(TaskState.FAILED); + status.setReason(msg); + ErrorModel errorModel = new ErrorModel(); + errorModel.setActualErrorMessage(e.getMessage()); + errorModel.setUserFriendlyMessage(msg); + taskContext.getTaskModel().setTaskError(errorModel); + } catch (URISyntaxException e) { + String msg = "source or destination is not a valid URI"; + log.error(msg, e); + status.setState(TaskState.FAILED); + status.setReason(msg); + ErrorModel errorModel = new ErrorModel(); + errorModel.setActualErrorMessage(e.getMessage()); + errorModel.setUserFriendlyMessage(msg); + taskContext.getTaskModel().setTaskError(errorModel); } - } catch (SSHApiException e) { - throw new TaskException("Scp attempt failed", e); - } catch (TException e) { - throw new TaskException("Invalid task invocation"); - } catch (URISyntaxException e) { - throw new TaskException("source or destination is not a valid URI"); } - return null; + return status; } @Override - public TaskState recover(TaskContext taskContext) throws TaskException { + public TaskStatus recover(TaskContext taskContext) { return null; } http://git-wip-us.apache.org/repos/asf/airavata/blob/b7e914ee/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPInputDataStageTask.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPInputDataStageTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPInputDataStageTask.java index 332a0aa..fc4d634 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPInputDataStageTask.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPInputDataStageTask.java @@ -23,23 +23,19 @@ package org.apache.airavata.gfac.impl.task; import com.jcraft.jsch.JSch; import com.jcraft.jsch.JSchException; import com.jcraft.jsch.Session; -import org.apache.airavata.common.utils.ServerSettings; import org.apache.airavata.common.utils.ThriftUtils; import org.apache.airavata.gfac.core.SSHApiException; import org.apache.airavata.gfac.core.context.TaskContext; import org.apache.airavata.gfac.core.task.TaskException; -import org.apache.airavata.gfac.impl.Factory; import org.apache.airavata.gfac.impl.SSHUtils; -import org.apache.airavata.model.status.TaskState; +import org.apache.airavata.model.status.TaskStatus; import org.apache.airavata.model.task.DataStagingTaskModel; import org.apache.airavata.model.task.TaskTypes; import org.apache.thrift.TException; import java.io.IOException; -import java.net.MalformedURLException; import java.net.URI; import java.net.URISyntaxException; -import java.net.URL; public class SCPInputDataStageTask extends AbstractSCPTask { @@ -47,9 +43,9 @@ public class SCPInputDataStageTask extends AbstractSCPTask { } @Override - public TaskState execute(TaskContext taskContext) throws TaskException { + public TaskStatus execute(TaskContext taskContext) { - if (taskContext.getTaskModel().getTaskType() != TaskTypes.DATA_STAGING) { +/* if (taskContext.getTaskModel().getTaskType() != TaskTypes.DATA_STAGING) { throw new TaskException("Invalid task call, expected " + TaskTypes.DATA_STAGING.toString() + " but found " + taskContext.getTaskModel().getTaskType().toString()); } @@ -81,12 +77,12 @@ public class SCPInputDataStageTask extends AbstractSCPTask { throw new TaskException("Invalid task invocation"); } catch (URISyntaxException e) { e.printStackTrace(); - } + }*/ return null; } @Override - public TaskState recover(TaskContext taskContext) throws TaskException { + public TaskStatus recover(TaskContext taskContext) { return null; } http://git-wip-us.apache.org/repos/asf/airavata/blob/b7e914ee/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPOutputDataStatgeTask.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPOutputDataStatgeTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPOutputDataStatgeTask.java index 72e071c..6fa87c4 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPOutputDataStatgeTask.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPOutputDataStatgeTask.java @@ -28,7 +28,7 @@ import org.apache.airavata.gfac.core.SSHApiException; import org.apache.airavata.gfac.core.context.TaskContext; import org.apache.airavata.gfac.core.task.TaskException; import org.apache.airavata.gfac.impl.SSHUtils; -import org.apache.airavata.model.status.TaskState; +import org.apache.airavata.model.status.TaskStatus; import org.apache.airavata.model.task.DataStagingTaskModel; import org.apache.airavata.model.task.TaskTypes; import org.apache.thrift.TException; @@ -41,8 +41,8 @@ public class SCPOutputDataStatgeTask extends AbstractSCPTask { @Override - public TaskState execute(TaskContext taskContext) throws TaskException { - if (taskContext.getTaskModel().getTaskType() != TaskTypes.DATA_STAGING) { + public TaskStatus execute(TaskContext taskContext) { +/* if (taskContext.getTaskModel().getTaskType() != TaskTypes.DATA_STAGING) { throw new TaskException("Invalid task call, expected " + TaskTypes.DATA_STAGING.toString() + " but found " + taskContext.getTaskModel().getTaskType().toString()); } @@ -69,12 +69,12 @@ public class SCPOutputDataStatgeTask extends AbstractSCPTask { throw new TaskException("Scp failed", e); } catch (TException e) { throw new TaskException("Invalid task invocation"); - } + }*/ return null; } @Override - public TaskState recover(TaskContext taskContext) throws TaskException { + public TaskStatus recover(TaskContext taskContext) { return null; } http://git-wip-us.apache.org/repos/asf/airavata/blob/b7e914ee/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHEnvironmentSetupTask.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHEnvironmentSetupTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHEnvironmentSetupTask.java index e541644..74f5826 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHEnvironmentSetupTask.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHEnvironmentSetupTask.java @@ -20,37 +20,50 @@ */ package org.apache.airavata.gfac.impl.task; -import org.apache.airavata.gfac.core.task.TaskException; import org.apache.airavata.gfac.core.SSHApiException; import org.apache.airavata.gfac.core.cluster.RemoteCluster; import org.apache.airavata.gfac.core.context.TaskContext; import org.apache.airavata.gfac.core.task.Task; +import org.apache.airavata.gfac.core.task.TaskException; +import org.apache.airavata.model.commons.ErrorModel; import org.apache.airavata.model.status.TaskState; +import org.apache.airavata.model.status.TaskStatus; import org.apache.airavata.model.task.TaskTypes; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.Map; public class SSHEnvironmentSetupTask implements Task { + private static final Logger log = LoggerFactory.getLogger(SSHEnvironmentSetupTask.class); @Override public void init(Map<String, String> propertyMap) throws TaskException { } @Override - public TaskState execute(TaskContext taskContext) throws TaskException { - + public TaskStatus execute(TaskContext taskContext) { + TaskStatus status = new TaskStatus(TaskState.COMPLETED); try { RemoteCluster remoteCluster = taskContext.getParentProcessContext().getRemoteCluster(); remoteCluster.makeDirectory(taskContext.getParentProcessContext().getWorkingDir()); + status.setReason("Successfully createded environment"); } catch (SSHApiException e) { - throw new TaskException("Error while environment setup", e); + String msg = "Error while environment setup"; + log.error(msg, e); + status.setState(TaskState.FAILED); + status.setReason(msg); + ErrorModel errorModel = new ErrorModel(); + errorModel.setActualErrorMessage(e.getMessage()); + errorModel.setUserFriendlyMessage(msg); + taskContext.getTaskModel().setTaskError(errorModel); } - return null; + return status; } @Override - public TaskState recover(TaskContext taskContext) throws TaskException { + public TaskStatus recover(TaskContext taskContext) { return execute(taskContext); } http://git-wip-us.apache.org/repos/asf/airavata/blob/b7e914ee/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHJobSubmissionTask.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHJobSubmissionTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHJobSubmissionTask.java index ff3a6f8..c282f17 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHJobSubmissionTask.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHJobSubmissionTask.java @@ -31,10 +31,12 @@ import org.apache.airavata.gfac.core.task.JobSubmissionTask; import org.apache.airavata.gfac.core.task.TaskException; import org.apache.airavata.gfac.impl.Factory; import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManager; +import org.apache.airavata.model.commons.ErrorModel; import org.apache.airavata.model.job.JobModel; import org.apache.airavata.model.status.JobState; import org.apache.airavata.model.status.JobStatus; import org.apache.airavata.model.status.TaskState; +import org.apache.airavata.model.status.TaskStatus; import org.apache.airavata.model.task.TaskTypes; import org.apache.airavata.registry.cpi.AppCatalogException; import org.apache.commons.io.FileUtils; @@ -53,87 +55,143 @@ public class SSHJobSubmissionTask implements JobSubmissionTask { } @Override - public TaskState execute(TaskContext taskContext) throws TaskException { - try { - ProcessContext processContext = taskContext.getParentProcessContext(); - JobModel jobModel = processContext.getJobModel(); - if (jobModel == null){ - jobModel = new JobModel(); - jobModel.setWorkingDir(processContext.getWorkingDir()); - jobModel.setTaskId(taskContext.getTaskId()); - jobModel.setCreationTime(AiravataUtils.getCurrentTimestamp().getTime()); - } - RemoteCluster remoteCluster = processContext.getRemoteCluster(); - JobDescriptor jobDescriptor = GFacUtils.createJobDescriptor(processContext); - jobModel.setJobName(jobDescriptor.getJobName()); - ResourceJobManager resourceJobManager = GFacUtils.getResourceJobManager(processContext); - JobManagerConfiguration jConfig = null; - if (resourceJobManager != null) { - jConfig = Factory.getJobManagerConfiguration(resourceJobManager); - } - File jobFile = GFacUtils.createJobFile(jobDescriptor, jConfig); - if (jobFile != null && jobFile.exists()){ - jobModel.setJobDescription(FileUtils.readFileToString(jobFile)); - String jobId = remoteCluster.submitBatchJob(jobFile.getPath(), processContext.getWorkingDir()); - if (jobId != null && !jobId.isEmpty()) { - jobModel.setJobId(jobId); - GFacUtils.saveJobStatus(taskContext, jobModel, JobState.SUBMITTED); + public TaskStatus execute(TaskContext taskContext){ + TaskStatus status = new TaskStatus(TaskState.COMPLETED); // set to completed. + try { + ProcessContext processContext = taskContext.getParentProcessContext(); + JobModel jobModel = processContext.getJobModel(); + if (jobModel == null) { + jobModel = new JobModel(); + jobModel.setWorkingDir(processContext.getWorkingDir()); + jobModel.setTaskId(taskContext.getTaskId()); + jobModel.setCreationTime(AiravataUtils.getCurrentTimestamp().getTime()); + } + RemoteCluster remoteCluster = processContext.getRemoteCluster(); + JobDescriptor jobDescriptor = GFacUtils.createJobDescriptor(processContext); + jobModel.setJobName(jobDescriptor.getJobName()); + ResourceJobManager resourceJobManager = GFacUtils.getResourceJobManager(processContext); + JobManagerConfiguration jConfig = null; + if (resourceJobManager != null) { + jConfig = Factory.getJobManagerConfiguration(resourceJobManager); + } + File jobFile = GFacUtils.createJobFile(jobDescriptor, jConfig); + if (jobFile != null && jobFile.exists()) { + jobModel.setJobDescription(FileUtils.readFileToString(jobFile)); + String jobId = remoteCluster.submitBatchJob(jobFile.getPath(), processContext.getWorkingDir()); + if (jobId != null && !jobId.isEmpty()) { + jobModel.setJobId(jobId); + GFacUtils.saveJobStatus(taskContext, jobModel, JobState.SUBMITTED); // publisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext) // , GfacExperimentState.JOBSUBMITTED)); - processContext.setJobModel(jobModel); - if (verifyJobSubmissionByJobId(remoteCluster, jobId)) { + processContext.setJobModel(jobModel); + if (verifyJobSubmissionByJobId(remoteCluster, jobId)) { // publisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext) // , GfacExperimentState.JOBSUBMITTED)); - GFacUtils.saveJobStatus(taskContext, jobModel, JobState.QUEUED); - } - } else { - processContext.setJobModel(jobModel); - int verificationTryCount = 0; - while (verificationTryCount++ < 3) { - String verifyJobId = verifyJobSubmission(remoteCluster, jobModel); - if (verifyJobId != null && !verifyJobId.isEmpty()) { - // JobStatus either changed from SUBMITTED to QUEUED or directly to QUEUED - jobId = verifyJobId; - jobModel.setJobId(jobId); + GFacUtils.saveJobStatus(taskContext, jobModel, JobState.QUEUED); + } + status = new TaskStatus(TaskState.COMPLETED); + status.setReason("Submitted job to compute resource"); + } else { + processContext.setJobModel(jobModel); + int verificationTryCount = 0; + while (verificationTryCount++ < 3) { + String verifyJobId = verifyJobSubmission(remoteCluster, jobModel); + if (verifyJobId != null && !verifyJobId.isEmpty()) { + // JobStatus either changed from SUBMITTED to QUEUED or directly to QUEUED + jobId = verifyJobId; + jobModel.setJobId(jobId); // publisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext) // , GfacExperimentState.JOBSUBMITTED)); - GFacUtils.saveJobStatus(taskContext, jobModel, JobState.QUEUED); - break; - } - Thread.sleep(verificationTryCount * 1000); - } - } + GFacUtils.saveJobStatus(taskContext, jobModel, JobState.QUEUED); + status.setState(TaskState.COMPLETED); + status.setReason("Submitted job to compute resource"); + break; + } + Thread.sleep(verificationTryCount * 1000); + } + } - if (jobId == null || jobId.isEmpty()) { - String msg = "expId:" + processContext.getProcessModel().getExperimentId() + " Couldn't find remote jobId for JobName:" - + jobModel.getJobName() + ", both submit and verify steps doesn't return a valid JobId. Hence changing experiment state to Failed"; - log.error(msg); - GFacUtils.saveErrorDetails(processContext, msg); - // FIXME : Need to handle according to status update chain -// GFacUtils.publishTaskStatus(jobExecutionContext, publisher, TaskState.FAILED); - return TaskState.FAILED; - } - } - return TaskState.COMPLETED; - } catch (AppCatalogException e) { - log.error("Error while instatiating app catalog",e); - throw new TaskException("Error while instatiating app catalog", e); - } catch (ApplicationSettingsException e) { - log.error("Error occurred while creating job descriptor", e); - throw new TaskException("Error occurred while creating job descriptor", e); - } catch (GFacException e) { - log.error("Error occurred while creating job descriptor", e); - throw new TaskException("Error occurred while creating job descriptor", e); - } catch (SSHApiException e) { - log.error("Error occurred while submitting the job", e); - throw new TaskException("Error occurred while submitting the job", e); - } catch (IOException e) { - log.error("Error while reading the content of the job file", e); - throw new TaskException("Error while reading the content of the job file", e); - } catch (InterruptedException e) { - log.error("Error occurred while verifying the job submission", e); - throw new TaskException("Error occurred while verifying the job submission", e); - } + if (jobId == null || jobId.isEmpty()) { + String msg = "expId:" + processContext.getProcessModel().getExperimentId() + " Couldn't find " + + "remote jobId for JobName:" + jobModel.getJobName() + ", both submit and verify steps " + + "doesn't return a valid JobId. " + "Hence changing experiment state to Failed"; + log.error(msg); + GFacUtils.saveErrorDetails(processContext, msg); + status.setState(TaskState.FAILED); + status.setReason("Couldn't find job id in both submitted and verified steps"); + } + } else { + status.setState(TaskState.FAILED); + if (jobFile == null) { + status.setReason("JobFile is null"); + } else { + status.setReason("Job file doesn't exist"); + } + } + + } catch (AppCatalogException e) { + String msg = "Error while instatiating app catalog"; + log.error(msg, e); + status.setState(TaskState.FAILED); + status.setReason(msg); + ErrorModel errorModel = new ErrorModel(); + errorModel.setActualErrorMessage(e.getMessage()); + errorModel.setUserFriendlyMessage(msg); + taskContext.getTaskModel().setTaskError(errorModel); + } catch (ApplicationSettingsException e) { + String msg = "Error occurred while creating job descriptor"; + log.error(msg, e); + status.setState(TaskState.FAILED); + status.setReason(msg); + ErrorModel errorModel = new ErrorModel(); + errorModel.setActualErrorMessage(e.getMessage()); + errorModel.setUserFriendlyMessage(msg); + taskContext.getTaskModel().setTaskError(errorModel); + } catch (GFacException e) { + String msg = "Error occurred while creating job descriptor"; + log.error(msg, e); + status.setState(TaskState.FAILED); + status.setReason(msg); + ErrorModel errorModel = new ErrorModel(); + errorModel.setActualErrorMessage(e.getMessage()); + errorModel.setUserFriendlyMessage(msg); + taskContext.getTaskModel().setTaskError(errorModel); + } catch (SSHApiException e) { + String msg = "Error occurred while submitting the job"; + log.error(msg, e); + status.setState(TaskState.FAILED); + status.setReason(msg); + ErrorModel errorModel = new ErrorModel(); + errorModel.setActualErrorMessage(e.getMessage()); + errorModel.setUserFriendlyMessage(msg); + taskContext.getTaskModel().setTaskError(errorModel); + } catch (IOException e) { + String msg = "Error while reading the content of the job file"; + log.error(msg, e); + status.setState(TaskState.FAILED); + status.setReason(msg); + ErrorModel errorModel = new ErrorModel(); + errorModel.setActualErrorMessage(e.getMessage()); + errorModel.setUserFriendlyMessage(msg); + taskContext.getTaskModel().setTaskError(errorModel); + } catch (InterruptedException e) { + String msg = "Error occurred while verifying the job submission"; + log.error(msg, e); + status.setState(TaskState.FAILED); + status.setReason(msg); + ErrorModel errorModel = new ErrorModel(); + errorModel.setActualErrorMessage(e.getMessage()); + errorModel.setUserFriendlyMessage(msg); + taskContext.getTaskModel().setTaskError(errorModel); + } + + taskContext.setTaskStatus(status); + try { + GFacUtils.saveAndPublishTaskStatus(taskContext); + } catch (GFacException e) { + log.error("Error while saving task status", e); + } + return status; } private boolean verifyJobSubmissionByJobId(RemoteCluster remoteCluster, String jobID) throws SSHApiException { @@ -154,15 +212,15 @@ public class SSHJobSubmissionTask implements JobSubmissionTask { @Override - public TaskState recover(TaskContext taskContext) throws TaskException { + public TaskStatus recover(TaskContext taskContext) { ProcessContext processContext = taskContext.getParentProcessContext(); JobModel jobModel = processContext.getJobModel(); // original job failed before submitting if (jobModel == null || jobModel.getJobId() == null ){ return execute(taskContext); }else { - // job is already submitted and monitor should handle the recovery - return TaskState.COMPLETED; + // job is already submitted and monitor should handle the recovery + return new TaskStatus(TaskState.COMPLETED); } } http://git-wip-us.apache.org/repos/asf/airavata/blob/b7e914ee/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java index 6fa1288..56d9d9c 100644 --- a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java +++ b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java @@ -38,7 +38,6 @@ import org.apache.airavata.messaging.core.MessageHandler; import org.apache.airavata.messaging.core.MessagingConstants; import org.apache.airavata.messaging.core.Publisher; import org.apache.airavata.messaging.core.impl.RabbitMQProcessLaunchConsumer; -import org.apache.airavata.messaging.core.impl.RabbitMQStatusPublisher; import org.apache.airavata.model.messaging.event.*; import org.apache.airavata.model.status.ProcessState; import org.apache.airavata.model.status.ProcessStatus; @@ -92,7 +91,7 @@ public class GfacServerHandler implements GfacService.Iface { } private void initAMQPClient() throws AiravataException { - rabbitMQProcessLaunchConsumer = new RabbitMQProcessLaunchConsumer(); + rabbitMQProcessLaunchConsumer = Factory.getProcessLaunchConsumer(); rabbitMQProcessLaunchConsumer.listen(new ProcessLaunchMessageHandler()); } @@ -220,7 +219,8 @@ public class GfacServerHandler implements GfacService.Iface { status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis()); Factory.getDefaultExpCatalog().update(ExperimentCatalogModelType.PROCESS_STATUS, status, event.getProcessId()); try { - GFacUtils.createExperimentNode(curatorClient, gfacServerName, event.getProcessId(), message.getDeliveryTag(), + GFacUtils.createProcessZKNode(curatorClient, gfacServerName, event.getProcessId(), message + .getDeliveryTag(), event.getTokenId()); submitProcess(event.getProcessId(), event.getGatewayId(), event.getTokenId()); } catch (Exception e) {
