Repository: airavata Updated Branches: refs/heads/master b98f65997 -> 1de22283b
Fixed process recovery, fixed job status update and retrieval issues. Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/1de22283 Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/1de22283 Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/1de22283 Branch: refs/heads/master Commit: 1de22283bce653c76f111125af429869fc05b60f Parents: b98f659 Author: Shameera Rathanyaka <[email protected]> Authored: Mon Aug 24 17:30:11 2015 -0400 Committer: Shameera Rathanyaka <[email protected]> Committed: Mon Aug 24 17:30:11 2015 -0400 ---------------------------------------------------------------------- .../airavata/gfac/impl/GFacEngineImpl.java | 160 ++++++++++++++----- .../apache/airavata/gfac/impl/GFacWorker.java | 143 +++++------------ .../gfac/monitor/email/EmailBasedMonitor.java | 4 +- .../catalog/impl/ExperimentRegistry.java | 19 ++- .../experiment/catalog/resources/Utils.java | 5 +- .../utils/ThriftDataModelConversion.java | 5 +- 6 files changed, 183 insertions(+), 153 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/1de22283/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java index 1f5526a..b97915c 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java @@ -133,27 +133,48 @@ public class GFacEngineImpl implements GFacEngine { if (processContext.isHandOver()) { return; } - TaskContext taskCtx = null; - List<TaskContext> taskChain = new ArrayList<>(); - processContext.setProcessStatus(new ProcessStatus(ProcessState.CONFIGURING_WORKSPACE)); +// List<TaskContext> taskChain = new ArrayList<>(); + if (configureWorkspace(processContext, false)) return; + + // exit if process is handed over to another instance while input staging + if (inputDataStaging(processContext, false)) return; + + // exit if process is handed orver to another instance while job submission. + if (executeJobSubmission(processContext)) return; +// processContext.setTaskChain(taskChain); + if (processContext.isHandOver()) { + return; + } + } + + private boolean executeJobSubmission(ProcessContext processContext) throws GFacException { + if (processContext.isHandOver()) { + return true; + } + TaskContext taskCtx; + TaskStatus taskStatus; + processContext.setProcessStatus(new ProcessStatus(ProcessState.EXECUTING)); + JobSubmissionTask jobSubmissionTask = Factory.getJobSubmissionTask(processContext.getJobSubmissionProtocol()); + if (processContext.isHandOver()) { + return true; + } GFacUtils.saveAndPublishProcessStatus(processContext); - // Run all environment setup tasks - taskCtx = getEnvSetupTaskContext(processContext); + taskCtx = getJobSubmissionTaskContext(processContext); saveTaskModel(taskCtx); GFacUtils.saveAndPublishTaskStatus(taskCtx); - SSHEnvironmentSetupTask envSetupTask = new SSHEnvironmentSetupTask(); - TaskStatus taskStatus = executeTask(taskCtx, envSetupTask); + taskStatus = executeTask(taskCtx, jobSubmissionTask, false); if (taskStatus.getState() == TaskState.FAILED) { - log.error("expId: {}, processId: {}, taskId: {} type: {},:- Input statging failed, " + - "reason:" + " {}", taskCtx.getParentProcessContext().getExperimentId(), taskCtx - .getParentProcessContext().getProcessId(), taskCtx.getTaskId(), envSetupTask.getType - ().name(), taskStatus.getReason()); - throw new GFacException("Error while environment setup"); + throw new GFacException("Job submission task failed"); } + return processContext.isHandOver(); + } + + private boolean inputDataStaging(ProcessContext processContext, boolean recover) throws GFacException { if (processContext.isHandOver()) { - return; + return true; } - // execute process inputs + TaskContext taskCtx; + TaskStatus taskStatus;// execute process inputs processContext.setProcessStatus(new ProcessStatus(ProcessState.INPUT_DATA_STAGING)); GFacUtils.saveAndPublishProcessStatus(processContext); List<InputDataObjectType> processInputs = processContext.getProcessModel().getProcessInputs(); @@ -161,7 +182,7 @@ public class GFacEngineImpl implements GFacEngine { if (processInputs != null) { for (InputDataObjectType processInput : processInputs) { if (processContext.isHandOver()) { - return; + return true; } DataType type = processInput.getType(); switch (type) { @@ -178,7 +199,7 @@ public class GFacEngineImpl implements GFacEngine { saveTaskModel(taskCtx); GFacUtils.saveAndPublishTaskStatus(taskCtx); Task dMoveTask = Factory.getDataMovementTask(processContext.getDataMovementProtocol()); - taskStatus = executeTask(taskCtx, dMoveTask); + taskStatus = executeTask(taskCtx, dMoveTask, false); if (taskStatus.getState() == TaskState.FAILED) { log.error("expId: {}, processId: {}, taskId: {} type: {},:- Input statging failed, " + "reason:" + " {}", taskCtx.getParentProcessContext().getExperimentId(), taskCtx @@ -193,32 +214,57 @@ public class GFacEngineImpl implements GFacEngine { } } } + return processContext.isHandOver(); + } + + private boolean configureWorkspace(ProcessContext processContext, boolean recover) throws GFacException { if (processContext.isHandOver()) { - return; + return true; } - processContext.setProcessStatus(new ProcessStatus(ProcessState.EXECUTING)); + TaskContext taskCtx; + processContext.setProcessStatus(new ProcessStatus(ProcessState.CONFIGURING_WORKSPACE)); GFacUtils.saveAndPublishProcessStatus(processContext); - taskCtx = getJobSubmissionTaskContext(processContext); + // Run all environment setup tasks + taskCtx = getEnvSetupTaskContext(processContext); saveTaskModel(taskCtx); GFacUtils.saveAndPublishTaskStatus(taskCtx); - JobSubmissionTask jobSubmissionTask = Factory.getJobSubmissionTask(processContext.getJobSubmissionProtocol()); - if (processContext.isHandOver()) { - return; - } - taskStatus = executeTask(taskCtx, jobSubmissionTask); + SSHEnvironmentSetupTask envSetupTask = new SSHEnvironmentSetupTask(); + TaskStatus taskStatus = executeTask(taskCtx, envSetupTask, recover); if (taskStatus.getState() == TaskState.FAILED) { - throw new GFacException("Job submission task failed"); - } - processContext.setTaskChain(taskChain); - if (processContext.isHandOver()) { - return; + log.error("expId: {}, processId: {}, taskId: {} type: {},:- Input statging failed, " + + "reason:" + " {}", taskCtx.getParentProcessContext().getExperimentId(), taskCtx + .getParentProcessContext().getProcessId(), taskCtx.getTaskId(), envSetupTask.getType + ().name(), taskStatus.getReason()); + throw new GFacException("Error while environment setup"); } + return processContext.isHandOver(); } @Override public void recoverProcess(ProcessContext processContext) throws GFacException { - + ProcessState state = processContext.getProcessStatus().getState(); + switch (state) { + case CREATED: + case VALIDATED: + executeProcess(processContext); + break; + case PRE_PROCESSING: + case CONFIGURING_WORKSPACE: + if (configureWorkspace(processContext, true)) return; + if (inputDataStaging(processContext, false)) return; + if (executeJobSubmission(processContext)) return; + break; + case INPUT_DATA_STAGING: + if (inputDataStaging(processContext, true)) return; + if (executeJobSubmission(processContext)) return; + break; + case EXECUTING: + if (executeJobSubmission(processContext)) return; + break; + default: + throw new GFacException("Invalid process recovery invocation"); + } } @Override @@ -226,13 +272,35 @@ public class GFacEngineImpl implements GFacEngine { if (processContext.isHandOver()) { return; } - TaskContext taskCtx = null; + // exit if process is handed over to another instance while output staging. + if (outpuDataStaging(processContext, false)) return; + + if (processContext.isHandOver()) { + return; + } + + postProcessing(processContext,false); + + if (processContext.isHandOver()) { + return; + } + } + + private boolean postProcessing(ProcessContext processContext, boolean recovery) throws GFacException { + processContext.setProcessStatus(new ProcessStatus(ProcessState.POST_PROCESSING)); + GFacUtils.saveAndPublishProcessStatus(processContext); +// taskCtx = getEnvCleanupTaskContext(processContext); + return processContext.isHandOver(); + } + + private boolean outpuDataStaging(ProcessContext processContext, boolean recovery) throws GFacException { + TaskContext taskCtx; processContext.setProcessStatus(new ProcessStatus(ProcessState.OUTPUT_DATA_STAGING)); GFacUtils.saveAndPublishProcessStatus(processContext); List<OutputDataObjectType> processOutputs = processContext.getProcessModel().getProcessOutputs(); for (OutputDataObjectType processOutput : processOutputs) { if (processContext.isHandOver()) { - return; + return true; } DataType type = processOutput.getType(); switch (type) { @@ -251,7 +319,7 @@ public class GFacEngineImpl implements GFacEngine { saveTaskModel(taskCtx); GFacUtils.saveAndPublishTaskStatus(taskCtx); Task dMoveTask = Factory.getDataMovementTask(processContext.getDataMovementProtocol()); - TaskStatus taskStatus = executeTask(taskCtx, dMoveTask); + TaskStatus taskStatus = executeTask(taskCtx, dMoveTask, recovery); if (taskStatus.getState() == TaskState.FAILED) { log.error("expId: {}, processId: {}, taskId: {} type: {},:- Input statging failed, " + "reason:" + " {}", taskCtx.getParentProcessContext().getExperimentId(), taskCtx @@ -265,17 +333,20 @@ public class GFacEngineImpl implements GFacEngine { break; } } - if (processContext.isHandOver()) { - return; - } - processContext.setProcessStatus(new ProcessStatus(ProcessState.POST_PROCESSING)); - GFacUtils.saveAndPublishProcessStatus(processContext); -// taskCtx = getEnvCleanupTaskContext(processContext); - + return false; } @Override public void recoverProcessOutflow(ProcessContext processContext) throws GFacException { + ProcessState processState = processContext.getProcessStatus().getState(); + switch (processState) { + case OUTPUT_DATA_STAGING: + if (outpuDataStaging(processContext, true)) return; + if (postProcessing(processContext, false)) return; + case POST_PROCESSING: + postProcessing(processContext, true); + break; + } runProcessOutflow(processContext); // TODO implement recover steps } @@ -284,10 +355,15 @@ public class GFacEngineImpl implements GFacEngine { } - private TaskStatus executeTask(TaskContext taskCtx, Task task) throws GFacException { + private TaskStatus executeTask(TaskContext taskCtx, Task task, boolean recover) throws GFacException { taskCtx.setTaskStatus(new TaskStatus(TaskState.EXECUTING)); GFacUtils.saveAndPublishTaskStatus(taskCtx); - TaskStatus taskStatus = task.execute(taskCtx); + TaskStatus taskStatus = null; + if (recover) { + taskStatus = task.recover(taskCtx); + } else { + taskStatus = task.execute(taskCtx); + } taskCtx.setTaskStatus(taskStatus); GFacUtils.saveAndPublishTaskStatus(taskCtx); return taskCtx.getTaskStatus(); http://git-wip-us.apache.org/repos/asf/airavata/blob/1de22283/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java index 35b0ed6..8d3c5e0 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java @@ -78,68 +78,46 @@ public class GFacWorker implements Runnable { return; } try { - ProcessType type = getProcessType(processContext); - try { - switch (type) { - case NEW: - executeProcess(); - break; - case RECOVER: - recoverProcess(); - break; - case RECOVER_MONITORING: - monitorProcess(); - // TODO get monitor mode from process and get correct monitor service instead default service. - break; - case RUN_OUTFLOW: - // run the outflow task + ProcessState processState = processContext.getProcessStatus().getState(); + switch (processState) { + case CREATED: + case VALIDATED: + executeProcess(); + break; + case PRE_PROCESSING: + case CONFIGURING_WORKSPACE: + case INPUT_DATA_STAGING: + case EXECUTING: + recoverProcess(); + break; + case MONITORING: + if (runOutflow) { runProcessOutflow(); - break; - case RECOVER_OUTFLOW: - // recover outflow task; - recoverProcessOutflow(); - break; - case COMPLETED: - completeProcess(); - break; - case CANCELED: - // TODO - implement cancel scenario - break; - case FAILED: - // TODO - implement failed scenario - break; - default: - throw new GFacException("process Id : " + processId + " Couldn't identify process type"); - } - } catch (GFacException e) { - switch (type) { - case NEW: - log.error("Process execution error", e); - break; - case RECOVER: - log.error("Process recover error ", e); - break; - case RECOVER_MONITORING: - log.error("Process monitoring recovery error", e); - break; - case RUN_OUTFLOW: - log.error("Process outflow execution error", e); - break; - case RECOVER_OUTFLOW: - log.error("Process outflow recover error", e); - break; - case COMPLETED: - log.error("Process completion error", e); - break; - case CANCELED: // TODO - implement cancel scenario - case FAILED: // TODO - implement failed scenario - break; - } - throw e; + } else { + monitorProcess(); + } + break; + case OUTPUT_DATA_STAGING: + case POST_PROCESSING: + recoverProcessOutflow(); + break; + case COMPLETED: + completeProcess(); + break; + case CANCELED: + // TODO - implement cancel scenario + break; + case FAILED: + // TODO - implement failed scenario + break; + default: + throw new GFacException("process Id : " + processId + " Couldn't identify process type"); } } catch (GFacException e) { log.error("GFac Worker throws an exception", e); - processContext.setProcessStatus(new ProcessStatus(ProcessState.FAILED)); + ProcessStatus status = new ProcessStatus(ProcessState.FAILED); + status.setReason(e.getMessage()); + processContext.setProcessStatus(status); try { GFacUtils.saveAndPublishProcessStatus(processContext); } catch (GFacException e1) { @@ -168,9 +146,8 @@ public class GFacWorker implements Runnable { } private void recoverProcess() throws GFacException { - // recover the process - // engine.recoverProcess(processContext); - executeProcess(); // TODO - implement recover process. + engine.recoverProcess(processContext); + monitorProcess(); } private void executeProcess() throws GFacException { @@ -210,48 +187,4 @@ public class GFacWorker implements Runnable { } } - private ProcessType getProcessType(ProcessContext processContext) { - // check the status and return correct type of process. - switch (processContext.getProcessState()) { - case CREATED: - case VALIDATED: - return ProcessType.NEW; - case PRE_PROCESSING: - case CONFIGURING_WORKSPACE: - case INPUT_DATA_STAGING: - case EXECUTING: - return ProcessType.RECOVER; - case MONITORING: - if (runOutflow) { - return ProcessType.RUN_OUTFLOW; // execute outflow - } else { - return ProcessType.RECOVER_MONITORING; // hand over to monitor task - } - case OUTPUT_DATA_STAGING: - case POST_PROCESSING: - return ProcessType.RECOVER_OUTFLOW; - case COMPLETED: - return ProcessType.COMPLETED; - case CANCELED: - return ProcessType.CANCELED; - case FAILED: - return ProcessType.FAILED; - //case CANCELLING: // TODO: handle this - default: - // this will never hit as we have handle all states in cases. - return ProcessType.NEW; - } - } - - - private enum ProcessType { - NEW, - RECOVER, - RECOVER_MONITORING, - RUN_OUTFLOW, - RECOVER_OUTFLOW, - CANCELED, - FAILED, - COMPLETED - } } http://git-wip-us.apache.org/repos/asf/airavata/blob/1de22283/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java index efa0641..bd0b541 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java @@ -274,8 +274,8 @@ public class EmailBasedMonitor implements JobMonitor, Runnable{ log.info("[EJM]: Job Queued email received, " + jobDetails); }else if (resultState == JobState.ACTIVE) { // nothing special thing to do, update the status change to rabbit mq at the end of this method. - jobStatus.setJobState(JobState.QUEUED); - jobStatus.setReason("Queued email received"); + jobStatus.setJobState(JobState.ACTIVE); + jobStatus.setReason("Active email received"); log.info("[EJM]: Job Active email received, " + jobDetails); }else if (resultState == JobState.FAILED) { jobMonitorMap.remove(jobStatusResult.getJobId()); http://git-wip-us.apache.org/repos/asf/airavata/blob/1de22283/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/experiment/catalog/impl/ExperimentRegistry.java ---------------------------------------------------------------------- diff --git a/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/experiment/catalog/impl/ExperimentRegistry.java b/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/experiment/catalog/impl/ExperimentRegistry.java index ae3d73b..5030846 100644 --- a/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/experiment/catalog/impl/ExperimentRegistry.java +++ b/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/experiment/catalog/impl/ExperimentRegistry.java @@ -35,6 +35,7 @@ import org.apache.airavata.model.scheduling.ComputationalResourceSchedulingModel import org.apache.airavata.model.status.*; import org.apache.airavata.model.task.TaskModel; import org.apache.airavata.registry.core.experiment.catalog.ExpCatResourceUtils; +import org.apache.airavata.registry.core.experiment.catalog.ExperimentCatResource; import org.apache.airavata.registry.core.experiment.catalog.ResourceType; import org.apache.airavata.registry.core.experiment.catalog.resources.*; import org.apache.airavata.registry.core.experiment.catalog.utils.ThriftDataModelConversion; @@ -1175,7 +1176,23 @@ public class ExperimentRegistry { List<JobResource> resources = processResource.getJobList(); for (JobResource jobResource : resources) { JobModel jobModel = ThriftDataModelConversion.getJobModel(jobResource); - jobs.add(jobModel); + List<ExperimentCatResource> jobStatuses = jobResource.get(ResourceType.JOB_STATUS); + JobStatusResource latestSR = null; + for (ExperimentCatResource jobStatuse : jobStatuses) { + JobStatusResource jobSR = (JobStatusResource) jobStatuse; + if (latestSR == null) { + latestSR = jobSR; + } else { + latestSR = (jobSR.getTimeOfStateChange().after(latestSR.getTimeOfStateChange()) ? jobSR : + latestSR); + } + } + if (latestSR != null) { + JobStatus jobStatus = new JobStatus(JobState.valueOf(latestSR.getState())); + jobStatus.setReason(latestSR.getReason()); + jobModel.setJobStatus(jobStatus); + } + jobs.add(jobModel); } return jobs; } else { http://git-wip-us.apache.org/repos/asf/airavata/blob/1de22283/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/experiment/catalog/resources/Utils.java ---------------------------------------------------------------------- diff --git a/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/experiment/catalog/resources/Utils.java b/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/experiment/catalog/resources/Utils.java index d7def01..8fba449 100644 --- a/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/experiment/catalog/resources/Utils.java +++ b/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/experiment/catalog/resources/Utils.java @@ -648,6 +648,7 @@ public class Utils { JobResource jobResource = new JobResource(); if (o != null){ jobResource.setJobId(o.getJobId()); + jobResource.setProcessId(o.getProcessId()); jobResource.setTaskId(o.getTaskId()); jobResource.setCreationTime(o.getCreationTime()); jobResource.setJobDescription(o.getJobDescription()); @@ -662,7 +663,9 @@ public class Utils { JobStatusResource jobStatusResource = new JobStatusResource(); if (o != null){ jobStatusResource.setJobId(o.getJobId()); - jobStatusResource.setState(o.getState()); + jobStatusResource.setStatusId(o.getStatusId()); + jobStatusResource.setProcessId(o.getProcessId()); + jobStatusResource.setState(o.getState()); jobStatusResource.setTimeOfStateChange(o.getTimeOfStateChange()); jobStatusResource.setReason(o.getReason()); } http://git-wip-us.apache.org/repos/asf/airavata/blob/1de22283/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/experiment/catalog/utils/ThriftDataModelConversion.java ---------------------------------------------------------------------- diff --git a/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/experiment/catalog/utils/ThriftDataModelConversion.java b/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/experiment/catalog/utils/ThriftDataModelConversion.java index e1ebdeb..0c954f7 100644 --- a/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/experiment/catalog/utils/ThriftDataModelConversion.java +++ b/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/experiment/catalog/utils/ThriftDataModelConversion.java @@ -394,8 +394,9 @@ public class ThriftDataModelConversion { public static JobModel getJobModel (JobResource jobResource) throws RegistryException { JobModel model = new JobModel(); - model.setJobId(jobResource.getJobId()); - model.setTaskId(jobResource.getTaskId()); + model.setJobId(jobResource.getJobId()); + model.setProcessId(jobResource.getProcessId()); + model.setTaskId(jobResource.getTaskId()); model.setJobDescription(jobResource.getJobDescription()); model.setCreationTime(jobResource.getCreationTime().getTime()); model.setComputeResourceConsumed(jobResource.getComputeResourceConsumed());
