Save jobModel with processId and recovery in monitoring state
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/b98f6599 Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/b98f6599 Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/b98f6599 Branch: refs/heads/master Commit: b98f65997304eded6ee59f82dd236efc6b2fe091 Parents: bf56b43 Author: Shameera Rathanyaka <[email protected]> Authored: Fri Aug 21 11:34:45 2015 -0400 Committer: Shameera Rathanyaka <[email protected]> Committed: Fri Aug 21 11:34:45 2015 -0400 ---------------------------------------------------------------------- .../apache/airavata/gfac/core/GFacUtils.java | 14 ++- .../gfac/core/context/ProcessContext.java | 1 + .../airavata/gfac/impl/GFacEngineImpl.java | 11 ++- .../apache/airavata/gfac/impl/GFacWorker.java | 94 ++++++++++++++------ .../gfac/impl/task/SSHJobSubmissionTask.java | 2 + .../catalog/impl/ExperimentRegistry.java | 84 +++++++++-------- .../core/experiment/catalog/model/Job.java | 31 ++++--- .../core/experiment/catalog/model/JobPK.java | 16 ++-- .../experiment/catalog/model/JobStatus.java | 12 +-- .../experiment/catalog/model/JobStatusPK.java | 16 ++-- .../resources/AbstractExpCatResource.java | 4 +- .../catalog/resources/JobResource.java | 18 +++- .../catalog/resources/JobStatusResource.java | 16 ++-- .../catalog/resources/ProcessResource.java | 51 ++++++++++- .../catalog/resources/TaskResource.java | 50 +---------- .../src/main/resources/expcatalog-derby.sql | 11 +-- .../src/main/resources/expcatalog-mysql.sql | 11 +-- .../airavata/registry/cpi/utils/Constants.java | 2 +- 18 files changed, 262 insertions(+), 182 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/b98f6599/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java index 8461216..d313ac0 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java @@ -241,7 +241,7 @@ public class GFacUtils { ExperimentCatalog experimentCatalog = processContext.getExperimentCatalog(); jobModel.setJobStatus(jobStatus); jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); - CompositeIdentifier ids = new CompositeIdentifier(jobModel.getTaskId(), jobModel.getJobId()); + CompositeIdentifier ids = new CompositeIdentifier(jobModel.getProcessId(), jobModel.getJobId()); experimentCatalog.add(ExpCatChildDataType.JOB_STATUS, jobStatus, ids); JobIdentifier identifier = new JobIdentifier(jobModel.getJobId(), jobModel.getTaskId(), processContext.getProcessId(), processContext.getProcessModel().getExperimentId(), @@ -1093,4 +1093,16 @@ public class GFacUtils { byte[] bytes = curatorClient.getData().forPath(deliveryTagPath); return GFacUtils.bytesToLong(bytes); } + + public static void saveJobModel(ProcessContext processContext, JobModel jobModel) throws GFacException { + try { + ExperimentCatalog experimentCatalog = processContext.getExperimentCatalog(); + experimentCatalog.add(ExpCatChildDataType.JOB, jobModel, processContext.getProcessId()); + } catch (RegistryException e) { + String msg = "expId: " + processContext.getExperimentId() + " processId: " + processContext.getProcessId() + + " jobId: " + jobModel.getJobId() + " : - Error while saving Job Model"; + throw new GFacException(msg, e); + } + } + } http://git-wip-us.apache.org/repos/asf/airavata/blob/b98f6599/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java index 37f8b20..553a388 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java @@ -275,6 +275,7 @@ public class ProcessContext { public JobModel getJobModel() { if (jobModel == null) { jobModel = new JobModel(); + jobModel.setProcessId(processId); jobModel.setWorkingDir(getWorkingDir()); jobModel.setCreationTime(AiravataUtils.getCurrentTimestamp().getTime()); } http://git-wip-us.apache.org/repos/asf/airavata/blob/b98f6599/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java index 33e354b..1f5526a 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java @@ -43,6 +43,7 @@ import org.apache.airavata.model.appcatalog.gatewayprofile.GatewayResourceProfil import org.apache.airavata.model.application.io.DataType; import org.apache.airavata.model.application.io.InputDataObjectType; import org.apache.airavata.model.application.io.OutputDataObjectType; +import org.apache.airavata.model.job.JobModel; import org.apache.airavata.model.process.ProcessModel; import org.apache.airavata.model.status.ProcessState; import org.apache.airavata.model.status.ProcessStatus; @@ -109,6 +110,14 @@ public class GFacEngineImpl implements GFacEngine { processContext.setLocalWorkingDir((inputPath.endsWith("/") ? inputPath : inputPath + "/") + processContext.getProcessId()); } + + List<Object> jobModels = expCatalog.get(ExperimentCatalogModelType.JOB, "processId", processId); + if (jobModels != null && !jobModels.isEmpty()) { + if (jobModels.size() > 1) { + log.warn("Process has more than one job model, take first one"); + } + processContext.setJobModel(((JobModel) jobModels.get(0))); + } return processContext; } catch (AppCatalogException e) { throw new GFacException("App catalog access exception ", e); @@ -267,7 +276,7 @@ public class GFacEngineImpl implements GFacEngine { @Override public void recoverProcessOutflow(ProcessContext processContext) throws GFacException { - + runProcessOutflow(processContext); // TODO implement recover steps } @Override http://git-wip-us.apache.org/repos/asf/airavata/blob/b98f6599/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java index 6bbf159..35b0ed6 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java @@ -82,26 +82,31 @@ public class GFacWorker implements Runnable { try { switch (type) { case NEW: - exectuteProcess(engine); + executeProcess(); break; case RECOVER: - recoverProcess(engine); + recoverProcess(); break; -// case RECOVER_MONITORING: + case RECOVER_MONITORING: + monitorProcess(); // TODO get monitor mode from process and get correct monitor service instead default service. + break; case RUN_OUTFLOW: // run the outflow task - engine.runProcessOutflow(processContext); - processContext.setProcessStatus(new ProcessStatus(ProcessState.COMPLETED)); - GFacUtils.saveAndPublishProcessStatus(processContext); - sendAck(); + runProcessOutflow(); break; case RECOVER_OUTFLOW: // recover outflow task; - engine.recoverProcessOutflow(processContext); - processContext.setProcessStatus(new ProcessStatus(ProcessState.COMPLETED)); - GFacUtils.saveAndPublishProcessStatus(processContext); - sendAck(); + recoverProcessOutflow(); + break; + case COMPLETED: + completeProcess(); + break; + case CANCELED: + // TODO - implement cancel scenario + break; + case FAILED: + // TODO - implement failed scenario break; default: throw new GFacException("process Id : " + processId + " Couldn't identify process type"); @@ -114,12 +119,21 @@ public class GFacWorker implements Runnable { case RECOVER: log.error("Process recover error ", e); break; + case RECOVER_MONITORING: + log.error("Process monitoring recovery error", e); + break; case RUN_OUTFLOW: log.error("Process outflow execution error", e); break; case RECOVER_OUTFLOW: log.error("Process outflow recover error", e); break; + case COMPLETED: + log.error("Process completion error", e); + break; + case CANCELED: // TODO - implement cancel scenario + case FAILED: // TODO - implement failed scenario + break; } throw e; } @@ -136,32 +150,50 @@ public class GFacWorker implements Runnable { } } - private void recoverProcess(GFacEngine engine) throws GFacException { + private void completeProcess() throws GFacException { + processContext.setProcessStatus(new ProcessStatus(ProcessState.COMPLETED)); + GFacUtils.saveAndPublishProcessStatus(processContext); + sendAck(); + Factory.getGfacContext().remoteProcess(processContext.getProcessId()); + } + + private void recoverProcessOutflow() throws GFacException { + engine.recoverProcessOutflow(processContext); + completeProcess(); + } + + private void runProcessOutflow() throws GFacException { + engine.runProcessOutflow(processContext); + completeProcess(); + } + + private void recoverProcess() throws GFacException { // recover the process // engine.recoverProcess(processContext); - exectuteProcess(engine); // TODO - implement recover process. + executeProcess(); // TODO - implement recover process. } - private void exectuteProcess(GFacEngine engine) throws GFacException { + private void executeProcess() throws GFacException { if (processContext.isHandOver()) { return; } engine.executeProcess(processContext); - if (processContext.getMonitorMode() == null) { - engine.runProcessOutflow(processContext); - } else { - try { - JobMonitor monitorService = Factory.getMonitorService(processContext.getMonitorMode()); - if (monitorService != null) { - monitorService.monitor(processContext.getJobModel().getJobId(), processContext); - processContext.setProcessStatus(new ProcessStatus(ProcessState.MONITORING)); - } else { - // we directly invoke outflow - engine.runProcessOutflow(processContext); - } - } catch (AiravataException e) { - throw new GFacException("Error while retrieving moniot service", e); + monitorProcess(); + } + + private void monitorProcess() throws GFacException { + try { + JobMonitor monitorService = Factory.getMonitorService(processContext.getMonitorMode()); + if (monitorService != null) { + monitorService.monitor(processContext.getJobModel().getJobId(), processContext); + processContext.setProcessStatus(new ProcessStatus(ProcessState.MONITORING)); + GFacUtils.saveAndPublishProcessStatus(processContext); + } else { + // we directly invoke outflow + runProcessOutflow(); } + } catch (AiravataException e) { + throw new GFacException("Error while retrieving moniot service", e); } } @@ -199,9 +231,11 @@ public class GFacWorker implements Runnable { case POST_PROCESSING: return ProcessType.RECOVER_OUTFLOW; case COMPLETED: + return ProcessType.COMPLETED; case CANCELED: + return ProcessType.CANCELED; case FAILED: - return ProcessType.COMPLETED; + return ProcessType.FAILED; //case CANCELLING: // TODO: handle this default: // this will never hit as we have handle all states in cases. @@ -216,6 +250,8 @@ public class GFacWorker implements Runnable { RECOVER_MONITORING, RUN_OUTFLOW, RECOVER_OUTFLOW, + CANCELED, + FAILED, COMPLETED } } http://git-wip-us.apache.org/repos/asf/airavata/blob/b98f6599/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHJobSubmissionTask.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHJobSubmissionTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHJobSubmissionTask.java index b47e5d2..e69cfa5 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHJobSubmissionTask.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHJobSubmissionTask.java @@ -76,6 +76,7 @@ public class SSHJobSubmissionTask implements JobSubmissionTask { String jobId = remoteCluster.submitBatchJob(jobFile.getPath(), processContext.getWorkingDir()); if (jobId != null && !jobId.isEmpty()) { jobModel.setJobId(jobId); + GFacUtils.saveJobModel(processContext, jobModel); jobStatus.setJobState(JobState.SUBMITTED); jobStatus.setReason("Successfully Submitted to " + taskContext.getParentProcessContext() .getComputeResourceDescription().getHostName()); @@ -97,6 +98,7 @@ public class SSHJobSubmissionTask implements JobSubmissionTask { // JobStatus either changed from SUBMITTED to QUEUED or directly to QUEUED jobId = verifyJobId; jobModel.setJobId(jobId); + GFacUtils.saveJobModel(processContext,jobModel); jobStatus.setJobState(JobState.QUEUED); jobStatus.setReason("Verification step succeeded"); jobModel.setJobStatus(jobStatus); http://git-wip-us.apache.org/repos/asf/airavata/blob/b98f6599/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/experiment/catalog/impl/ExperimentRegistry.java ---------------------------------------------------------------------- diff --git a/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/experiment/catalog/impl/ExperimentRegistry.java b/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/experiment/catalog/impl/ExperimentRegistry.java index 1f9e7f0..ae3d73b 100644 --- a/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/experiment/catalog/impl/ExperimentRegistry.java +++ b/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/experiment/catalog/impl/ExperimentRegistry.java @@ -476,29 +476,27 @@ public class ExperimentRegistry { return taskId; } - public String addJob(JobModel job, String taskId) throws RegistryException { + public String addJob(JobModel job, String processId) throws RegistryException { try { JobResource jobResource = new JobResource(); jobResource.setJobId(job.getJobId()); - jobResource.setTaskId(taskId); + jobResource.setProcessId(processId); + jobResource.setTaskId(job.getTaskId()); jobResource.setJobDescription(job.getJobDescription()); jobResource.setCreationTime(AiravataUtils.getTime(job.getCreationTime())); jobResource.setComputeResourceConsumed(job.getComputeResourceConsumed()); jobResource.setJobName(job.getJobName()); jobResource.setWorkingDir(job.getWorkingDir()); - - TaskStatus taskStatus = new TaskStatus(); - taskStatus.setState(TaskState.CREATED); - addTaskStatus(taskStatus, job.getJobId()); + jobResource.save(); } catch (Exception e) { - logger.error(taskId, "Error while adding task...", e); + logger.error(processId, "Error while adding task...", e); throw new RegistryException(e); } - return taskId; + return processId; } public String addJobStatus(JobStatus jobStatus, CompositeIdentifier cis) throws RegistryException { - String taskID = (String)cis.getTopLevelIdentifier(); + String processId = (String)cis.getTopLevelIdentifier(); String jobID = (String)cis.getSecondLevelIdentifier(); try { JobResource jobResource = new JobResource(); @@ -509,7 +507,7 @@ public class ExperimentRegistry { } status.setStatusId(getStatusID(jobID)); status.setJobId(jobID); - status.setTaskId(taskID); + status.setProcessId(processId); status.setTimeOfStateChange(AiravataUtils.getTime(jobStatus.getTimeOfStateChange())); status.setState(jobStatus.getJobState().toString()); status.setReason(jobStatus.getReason()); @@ -838,23 +836,21 @@ public class ExperimentRegistry { } public String updateJob(JobModel job, CompositeIdentifier cis) throws RegistryException { - String taskId = (String) cis.getTopLevelIdentifier(); + String processId = (String) cis.getTopLevelIdentifier(); String jobId = (String) cis.getSecondLevelIdentifier(); try { - TaskResource taskResource = new TaskResource(); - taskResource.setTaskId(taskId); - JobResource jobResource = taskResource.getJob(jobId); - jobResource.setJobId(jobId); - jobResource.setTaskId(job.getTaskId()); - jobResource.setJobDescription(job.getJobDescription()); - jobResource.setCreationTime(AiravataUtils.getTime(job.getCreationTime())); - jobResource.setComputeResourceConsumed(job.getComputeResourceConsumed()); - jobResource.setJobName(job.getJobName()); - jobResource.setWorkingDir(job.getWorkingDir()); - - TaskStatus taskStatus = new TaskStatus(); - taskStatus.setState(TaskState.CREATED); - addTaskStatus(taskStatus, job.getJobId()); + ProcessResource processResource = new ProcessResource(); + processResource.setProcessId(processId); + JobResource jobResource = processResource.getJob(jobId); + jobResource.setJobId(jobId); + jobResource.setTaskId(job.getTaskId()); + jobResource.setProcessId(processId); + jobResource.setJobDescription(job.getJobDescription()); + jobResource.setCreationTime(AiravataUtils.getTime(job.getCreationTime())); + jobResource.setComputeResourceConsumed(job.getComputeResourceConsumed()); + jobResource.setJobName(job.getJobName()); + jobResource.setWorkingDir(job.getWorkingDir()); + jobResource.save(); } catch (Exception e) { logger.error(jobId, "Error while adding job...", e); throw new RegistryException(e); @@ -1064,19 +1060,19 @@ public class ExperimentRegistry { } public Object getJob(CompositeIdentifier cis, String fieldName) throws RegistryException { - String taskID = (String) cis.getTopLevelIdentifier(); + String processId = (String) cis.getTopLevelIdentifier(); String jobId = (String) cis.getSecondLevelIdentifier(); try { - TaskResource taskResource = new TaskResource(); - taskResource.setTaskId(taskID); - JobResource resource = taskResource.getJob(jobId); - if (fieldName == null) { - return ThriftDataModelConversion.getJobModel(resource); - } else if (fieldName.equals(Constants.FieldConstants.JobConstants.JOB_STATUS)) { - return ThriftDataModelConversion.getJobStatus(resource.getJobStatus()); - } else { - logger.error("Unsupported field name for job basic data.."); - } + ProcessResource processResource = new ProcessResource(); + processResource.setProcessId(processId); + JobResource resource = processResource.getJob(jobId); + if (fieldName == null) { + return ThriftDataModelConversion.getJobModel(resource); + } else if (fieldName.equals(Constants.FieldConstants.JobConstants.JOB_STATUS)) { + return ThriftDataModelConversion.getJobStatus(resource.getJobStatus()); + } else { + logger.error("Unsupported field name for job basic data.."); + } }catch (Exception e) { logger.error("Error while getting job data..", e); throw new RegistryException(e); @@ -1173,10 +1169,10 @@ public class ExperimentRegistry { public List<JobModel> getJobList(String fieldName, Object value) throws RegistryException { List<JobModel> jobs = new ArrayList<JobModel>(); try { - if (fieldName.equals(Constants.FieldConstants.JobConstants.TASK_ID)) { - TaskResource taskResource = new TaskResource(); - taskResource.setTaskId((String) value); - List<JobResource> resources = taskResource.getJobList(); + if (fieldName.equals(Constants.FieldConstants.JobConstants.PROCESS_ID)) { + ProcessResource processResource = new ProcessResource(); + processResource.setProcessId((String) value); + List<JobResource> resources = processResource.getJobList(); for (JobResource jobResource : resources) { JobModel jobModel = ThriftDataModelConversion.getJobModel(jobResource); jobs.add(jobModel); @@ -1438,11 +1434,11 @@ public class ExperimentRegistry { public void removeJob(CompositeIdentifier cis) throws RegistryException { try { - String taskId = (String) cis.getTopLevelIdentifier(); + String processId = (String) cis.getTopLevelIdentifier(); String jobId = (String) cis.getSecondLevelIdentifier(); - TaskResource taskResource = new TaskResource(); - taskResource.setTaskId(taskId); - taskResource.remove(ResourceType.JOB, jobId); + ProcessResource process = new ProcessResource(); + process.setProcessId(processId); + process.remove(ResourceType.JOB, jobId); } catch (Exception e) { logger.error("Error while removing task details..", e); throw new RegistryException(e); http://git-wip-us.apache.org/repos/asf/airavata/blob/b98f6599/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/experiment/catalog/model/Job.java ---------------------------------------------------------------------- diff --git a/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/experiment/catalog/model/Job.java b/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/experiment/catalog/model/Job.java index 3347263..4943c3a 100644 --- a/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/experiment/catalog/model/Job.java +++ b/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/experiment/catalog/model/Job.java @@ -33,12 +33,13 @@ public class Job { private final static Logger logger = LoggerFactory.getLogger(Job.class); private String jobId; private String taskId; + private String processId; private String jobDescription; private Timestamp creationTime; private String computeResourceConsumed; private String jobName; private String workingDir; - private Task task; + private Process process; private Collection<JobStatus> jobStatuses; @Id @@ -51,7 +52,17 @@ public class Job { this.jobId = jobId; } - @Id + @Id + @Column(name = "PROCESS_ID") + public String getProcessId() { + return processId; + } + + public void setProcessId(String processId) { + this.processId = processId; + } + + @Basic @Column(name = "TASK_ID") public String getTaskId() { return taskId; @@ -143,15 +154,15 @@ public class Job { // return result; // } - @ManyToOne - @JoinColumn(name = "TASK_ID", referencedColumnName = "TASK_ID") - public Task getTask() { - return task; - } + @OneToOne + @JoinColumn(name = "PROCESS_ID", referencedColumnName = "PROCESS_ID") + public Process getProcess() { + return process; + } - public void setTask(Task taskByTaskId) { - this.task = taskByTaskId; - } + public void setProcess(Process processByProcessId) { + this.process = processByProcessId; + } @OneToMany(mappedBy = "job") public Collection<JobStatus> getJobStatuses() { http://git-wip-us.apache.org/repos/asf/airavata/blob/b98f6599/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/experiment/catalog/model/JobPK.java ---------------------------------------------------------------------- diff --git a/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/experiment/catalog/model/JobPK.java b/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/experiment/catalog/model/JobPK.java index eb704d5..e1f8da2 100644 --- a/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/experiment/catalog/model/JobPK.java +++ b/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/experiment/catalog/model/JobPK.java @@ -30,7 +30,7 @@ import java.io.Serializable; public class JobPK implements Serializable { private final static Logger logger = LoggerFactory.getLogger(JobPK.class); private String jobId; - private String taskId; + private String processId; @Id @Column(name = "JOB_ID") @@ -43,13 +43,13 @@ public class JobPK implements Serializable { } @Id - @Column(name = "TASK_ID") - public String getTaskId() { - return taskId; + @Column(name = "PROCESS_ID") + public String getProcessId() { + return processId; } - public void setTaskId(String taskId) { - this.taskId = taskId; + public void setProcessId(String processId) { + this.processId = processId; } @Override @@ -60,7 +60,7 @@ public class JobPK implements Serializable { JobPK that = (JobPK) o; if (getJobId() != null ? !getJobId().equals(that.getJobId()) : that.getJobId() != null) return false; - if (getTaskId() != null ? !getTaskId().equals(that.getTaskId()) : that.getTaskId() != null) return false; + if (getProcessId() != null ? !getProcessId().equals(that.getProcessId()) : that.getProcessId() != null) return false; return true; } @@ -68,7 +68,7 @@ public class JobPK implements Serializable { @Override public int hashCode() { int result = getJobId() != null ? getJobId().hashCode() : 0; - result = 31 * result + (getTaskId() != null ? getTaskId().hashCode() : 0); + result = 31 * result + (getProcessId() != null ? getProcessId().hashCode() : 0); return result; } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/airavata/blob/b98f6599/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/experiment/catalog/model/JobStatus.java ---------------------------------------------------------------------- diff --git a/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/experiment/catalog/model/JobStatus.java b/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/experiment/catalog/model/JobStatus.java index c93d1cb..b4a4905 100644 --- a/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/experiment/catalog/model/JobStatus.java +++ b/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/experiment/catalog/model/JobStatus.java @@ -33,7 +33,7 @@ public class JobStatus { private final static Logger logger = LoggerFactory.getLogger(JobStatus.class); private String statusId; private String jobId; - private String taskId; + private String processId; private String state; private Timestamp timeOfStateChange; private String reason; @@ -60,13 +60,13 @@ public class JobStatus { } @Id - @Column(name = "TASK_ID") - public String getTaskId() { - return taskId; + @Column(name = "PROCESS_ID") + public String getProcessId() { + return processId; } - public void setTaskId(String taskId) { - this.taskId = taskId; + public void setProcessId(String processId) { + this.processId = processId; } @Basic http://git-wip-us.apache.org/repos/asf/airavata/blob/b98f6599/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/experiment/catalog/model/JobStatusPK.java ---------------------------------------------------------------------- diff --git a/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/experiment/catalog/model/JobStatusPK.java b/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/experiment/catalog/model/JobStatusPK.java index cac3c52..f63cbfa 100644 --- a/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/experiment/catalog/model/JobStatusPK.java +++ b/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/experiment/catalog/model/JobStatusPK.java @@ -31,7 +31,7 @@ public class JobStatusPK implements Serializable { private final static Logger logger = LoggerFactory.getLogger(JobStatusPK.class); private String statusId; private String jobId; - private String taskId; + private String processId; @Id @Column(name = "STATUS_ID") @@ -54,13 +54,13 @@ public class JobStatusPK implements Serializable { } @Id - @Column(name = "TASK_ID") - public String getTaskId() { - return taskId; + @Column(name = "PROCESS_ID") + public String getProcessId() { + return processId; } - public void setTaskId(String taskId) { - this.taskId = taskId; + public void setProcessId(String processId) { + this.processId = processId; } @Override @@ -72,7 +72,7 @@ public class JobStatusPK implements Serializable { if (getStatusId() != null ? !getStatusId().equals(that.getStatusId()) : that.getStatusId() != null) return false; if (getJobId() != null ? !getJobId().equals(that.getJobId()) : that.getJobId() != null) return false; - if (getTaskId() != null ? !getTaskId().equals(that.getTaskId()) : that.getTaskId() != null) return false; + if (getProcessId() != null ? !getProcessId().equals(that.getProcessId()) : that.getProcessId() != null) return false; return true; } @@ -81,7 +81,7 @@ public class JobStatusPK implements Serializable { public int hashCode() { int result = getStatusId() != null ? getStatusId().hashCode() : 0; result = 31 * result + (getJobId() != null ? getJobId().hashCode() : 0); - result = 31 * result + (getTaskId() != null ? getTaskId().hashCode() : 0); + result = 31 * result + (getProcessId() != null ? getProcessId().hashCode() : 0); return result; } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/airavata/blob/b98f6599/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/experiment/catalog/resources/AbstractExpCatResource.java ---------------------------------------------------------------------- diff --git a/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/experiment/catalog/resources/AbstractExpCatResource.java b/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/experiment/catalog/resources/AbstractExpCatResource.java index 2c12f03..b9bdf4b 100644 --- a/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/experiment/catalog/resources/AbstractExpCatResource.java +++ b/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/experiment/catalog/resources/AbstractExpCatResource.java @@ -220,14 +220,14 @@ public abstract class AbstractExpCatResource implements ExperimentCatResource { //Job Table public final class JobConstants { public static final String JOB_ID = "jobId"; - public static final String TASK_ID = "taskId"; + public static final String PROCESS_ID = "processId"; } // Job Status table public final class JobStatusConstants { public static final String STATUS_ID = "statusId"; public static final String JOB_ID = "jobId"; - public static final String TASK_ID = "taskId"; + public static final String PROCESS_ID = "processId"; public static final String STATE = "state"; public static final String TIME_OF_STATE_CHANGE = "timeOfStateChange"; public static final String REASON = "reason"; http://git-wip-us.apache.org/repos/asf/airavata/blob/b98f6599/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/experiment/catalog/resources/JobResource.java ---------------------------------------------------------------------- diff --git a/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/experiment/catalog/resources/JobResource.java b/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/experiment/catalog/resources/JobResource.java index 6ed5a54..99a152d 100644 --- a/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/experiment/catalog/resources/JobResource.java +++ b/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/experiment/catalog/resources/JobResource.java @@ -42,6 +42,7 @@ public class JobResource extends AbstractExpCatResource { private static final Logger logger = LoggerFactory.getLogger(JobResource.class); private String jobId; private String taskId; + private String processId; private String jobDescription; private Timestamp creationTime; private String computeResourceConsumed; @@ -64,7 +65,15 @@ public class JobResource extends AbstractExpCatResource { this.taskId = taskId; } - public String getJobDescription() { + public String getProcessId() { + return processId; + } + + public void setProcessId(String processId) { + this.processId = processId; + } + + public String getJobDescription() { return jobDescription; } @@ -201,7 +210,7 @@ public class JobResource extends AbstractExpCatResource { case JOB_STATUS: generator = new QueryGenerator(JOB_STATUS); generator.setParameter(JobStatusConstants.JOB_ID, jobId); - generator.setParameter(JobStatusConstants.TASK_ID, taskId); + generator.setParameter(JobStatusConstants.PROCESS_ID, processId); q = generator.selectQuery(em); results = q.getResultList(); if (results.size() != 0) { @@ -243,14 +252,15 @@ public class JobResource extends AbstractExpCatResource { em.getTransaction().begin(); JobPK jobPK = new JobPK(); jobPK.setJobId(jobId); - jobPK.setTaskId(taskId); + jobPK.setProcessId(processId); Job job = em.find(Job.class, jobPK); if(job == null){ job = new Job(); } job.setJobId(jobId); job.setTaskId(taskId); - job.setJobDescription(jobDescription); + job.setProcessId(processId); +// job.setJobDescription(jobDescription); // fixme - Data too long for column 'JOB_DESCRIPTION' job.setCreationTime(creationTime); job.setComputeResourceConsumed(computeResourceConsumed); job.setJobName(jobName); http://git-wip-us.apache.org/repos/asf/airavata/blob/b98f6599/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/experiment/catalog/resources/JobStatusResource.java ---------------------------------------------------------------------- diff --git a/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/experiment/catalog/resources/JobStatusResource.java b/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/experiment/catalog/resources/JobStatusResource.java index 4085a3b..b41a071 100644 --- a/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/experiment/catalog/resources/JobStatusResource.java +++ b/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/experiment/catalog/resources/JobStatusResource.java @@ -38,7 +38,7 @@ public class JobStatusResource extends AbstractExpCatResource { private static final Logger logger = LoggerFactory.getLogger(JobStatusResource.class); private String statusId; private String jobId; - private String taskId; + private String processId; private String state; private Timestamp timeOfStateChange; private String reason; @@ -59,12 +59,12 @@ public class JobStatusResource extends AbstractExpCatResource { this.jobId = jobId; } - public String getTaskId() { - return taskId; + public String getProcessId() { + return processId; } - public void setTaskId(String taskId) { - this.taskId = taskId; + public void setProcessId(String processId) { + this.processId = processId; } public String getState() { @@ -121,20 +121,20 @@ public class JobStatusResource extends AbstractExpCatResource { em = ExpCatResourceUtils.getEntityManager(); em.getTransaction().begin(); JobStatus jobStatus; - if(jobId == null || statusId == null || taskId == null){ + if(jobId == null || statusId == null || processId == null){ throw new RegistryException("Does not have the job id or status id or task id"); } JobStatusPK jobStatusPK = new JobStatusPK(); jobStatusPK.setJobId(jobId); jobStatusPK.setStatusId(statusId); - jobStatusPK.setTaskId(taskId); + jobStatusPK.setProcessId(processId); jobStatus = em.find(JobStatus.class, jobStatusPK); if(jobStatus == null){ jobStatus = new JobStatus(); } jobStatus.setStatusId(statusId); jobStatus.setJobId(jobId); - jobStatus.setTaskId(taskId); + jobStatus.setProcessId(processId); jobStatus.setState(state); jobStatus.setReason(reason); em.persist(jobStatus); http://git-wip-us.apache.org/repos/asf/airavata/blob/b98f6599/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/experiment/catalog/resources/ProcessResource.java ---------------------------------------------------------------------- diff --git a/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/experiment/catalog/resources/ProcessResource.java b/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/experiment/catalog/resources/ProcessResource.java index 00cf8c7..58b286a 100644 --- a/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/experiment/catalog/resources/ProcessResource.java +++ b/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/experiment/catalog/resources/ProcessResource.java @@ -174,6 +174,10 @@ public class ProcessResource extends AbstractExpCatResource { TaskResource taskResource = new TaskResource(); taskResource.setParentProcessId(processId); return taskResource; + case JOB: + JobResource jobResource = new JobResource(); + jobResource.setProcessId(processId); + return jobResource; default: logger.error("Unsupported resource type for process resource.", new IllegalArgumentException()); throw new IllegalArgumentException("Unsupported resource type for process resource."); @@ -227,6 +231,13 @@ public class ProcessResource extends AbstractExpCatResource { q = generator.deleteQuery(em); q.executeUpdate(); break; + case JOB: + generator = new QueryGenerator(JOB); + generator.setParameter(JobConstants.JOB_ID, name); + generator.setParameter(JobConstants.PROCESS_ID, processId); + q = generator.deleteQuery(em); + q.executeUpdate(); + break; default: logger.error("Unsupported resource type for process detail resource.", new IllegalArgumentException()); break; @@ -312,7 +323,17 @@ public class ProcessResource extends AbstractExpCatResource { em.getTransaction().commit(); em.close(); return taskResource; - default: + case JOB: + generator = new QueryGenerator(JOB); + generator.setParameter(JobConstants.JOB_ID, name); + generator.setParameter(JobConstants.PROCESS_ID, processId); + q = generator.selectQuery(em); + Job job = (Job) q.getSingleResult(); + JobResource jobResource = (JobResource) Utils.getResource(ResourceType.JOB, job); + em.getTransaction().commit(); + em.close(); + return jobResource; + default: em.getTransaction().commit(); em.close(); logger.error("Unsupported resource type for process resource.", new IllegalArgumentException()); @@ -411,6 +432,20 @@ public class ProcessResource extends AbstractExpCatResource { } } break; + case JOB: + generator = new QueryGenerator(JOB); + generator.setParameter(JobConstants.PROCESS_ID, processId); + q = generator.selectQuery(em); + results = q.getResultList(); + if (results.size() != 0) { + for (Object result : results) { + Job job = (Job) result; + JobResource jobResource = + (JobResource) Utils.getResource(ResourceType.JOB, job); + resourceList.add(jobResource); + } + } + break; default: em.getTransaction().commit(); em.close(); @@ -562,4 +597,18 @@ public class ProcessResource extends AbstractExpCatResource { ExperimentCatResource resource = get(ResourceType.TASK, taskId); return (TaskResource)resource; } + + public JobResource getJob(String jobId) throws RegistryException { + return (JobResource) get(ResourceType.JOB, jobId); + } + + public List<JobResource> getJobList() throws RegistryException { + List<JobResource> jobResources = new ArrayList(); + List<ExperimentCatResource> resources = get(ResourceType.JOB); + for (ExperimentCatResource resource : resources) { + JobResource jobResource = (JobResource) resource; + jobResources.add(jobResource); + } + return jobResources; + } } http://git-wip-us.apache.org/repos/asf/airavata/blob/b98f6599/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/experiment/catalog/resources/TaskResource.java ---------------------------------------------------------------------- diff --git a/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/experiment/catalog/resources/TaskResource.java b/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/experiment/catalog/resources/TaskResource.java index f0bc5fe..962f0d0 100644 --- a/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/experiment/catalog/resources/TaskResource.java +++ b/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/experiment/catalog/resources/TaskResource.java @@ -115,10 +115,7 @@ public class TaskResource extends AbstractExpCatResource { TaskErrorResource taskErrorResource = new TaskErrorResource(); taskErrorResource.setTaskId(taskId); return taskErrorResource; - case JOB: - JobResource jobResource = new JobResource(); - jobResource.setTaskId(taskId); - return jobResource; + default: logger.error("Unsupported resource type for task data resource.", new UnsupportedOperationException()); throw new UnsupportedOperationException(); @@ -146,13 +143,6 @@ public class TaskResource extends AbstractExpCatResource { q = generator.deleteQuery(em); q.executeUpdate(); break; - case JOB: - generator = new QueryGenerator(JOB); - generator.setParameter(JobConstants.JOB_ID, name); - generator.setParameter(JobConstants.TASK_ID, taskId); - q = generator.deleteQuery(em); - q.executeUpdate(); - break; default: logger.error("Unsupported resource type for job details resource.", new IllegalArgumentException()); break; @@ -201,16 +191,6 @@ public class TaskResource extends AbstractExpCatResource { em.getTransaction().commit(); em.close(); return errorResource; - case JOB: - generator = new QueryGenerator(JOB); - generator.setParameter(JobConstants.JOB_ID, name); - generator.setParameter(JobConstants.TASK_ID, taskId); - q = generator.selectQuery(em); - Job job = (Job) q.getSingleResult(); - JobResource jobResource = (JobResource) Utils.getResource(ResourceType.JOB, job); - em.getTransaction().commit(); - em.close(); - return jobResource; default: em.getTransaction().commit(); em.close(); @@ -269,20 +249,6 @@ public class TaskResource extends AbstractExpCatResource { } } break; - case JOB: - generator = new QueryGenerator(JOB); - generator.setParameter(JobConstants.TASK_ID, taskId); - q = generator.selectQuery(em); - results = q.getResultList(); - if (results.size() != 0) { - for (Object result : results) { - Job job = (Job) result; - JobResource jobResource = - (JobResource) Utils.getResource(ResourceType.JOB, job); - resourceList.add(jobResource); - } - } - break; default: em.getTransaction().commit(); em.close(); @@ -387,18 +353,4 @@ public class TaskResource extends AbstractExpCatResource { return max; } } - - public JobResource getJob(String jobId) throws RegistryException { - return (JobResource) get(ResourceType.JOB, jobId); - } - - public List<JobResource> getJobList() throws RegistryException { - List<JobResource> jobResources = new ArrayList(); - List<ExperimentCatResource> resources = get(ResourceType.JOB); - for (ExperimentCatResource resource : resources) { - JobResource jobResource = (JobResource) resource; - jobResources.add(jobResource); - } - return jobResources; - } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/airavata/blob/b98f6599/modules/registry/registry-core/src/main/resources/expcatalog-derby.sql ---------------------------------------------------------------------- diff --git a/modules/registry/registry-core/src/main/resources/expcatalog-derby.sql b/modules/registry/registry-core/src/main/resources/expcatalog-derby.sql index 80334e5..480b595 100644 --- a/modules/registry/registry-core/src/main/resources/expcatalog-derby.sql +++ b/modules/registry/registry-core/src/main/resources/expcatalog-derby.sql @@ -300,24 +300,25 @@ CREATE TABLE TASK_ERROR ( CREATE TABLE JOB ( JOB_ID varchar(255), TASK_ID varchar(255), + PROCESS_ID varchar(255), JOB_DESCRIPTION varchar(255), CREATION_TIME timestamp DEFAULT '0000-00-00 00:00:00', COMPUTE_RESOURCE_CONSUMED varchar(255), JOB_NAME varchar(255), WORKING_DIR varchar(255), - PRIMARY KEY (JOB_ID, TASK_ID), - FOREIGN KEY (TASK_ID) REFERENCES TASK(TASK_ID) ON DELETE CASCADE + PRIMARY KEY (JOB_ID, PROCESS_ID), + FOREIGN KEY (PROCESS_ID) REFERENCES PROCESS(PROCESS_ID) ON DELETE CASCADE ); CREATE TABLE JOB_STATUS ( STATUS_ID varchar(255), JOB_ID varchar(255), - TASK_ID varchar(255), + PROCESS_ID varchar(255), STATE varchar(255), TIME_OF_STATE_CHANGE timestamp DEFAULT '0000-00-00 00:00:00', REASON CLOB, - PRIMARY KEY (STATUS_ID, JOB_ID, TASK_ID), - FOREIGN KEY (JOB_ID, TASK_ID) REFERENCES JOB(JOB_ID, TASK_ID) ON DELETE CASCADE + PRIMARY KEY (STATUS_ID, JOB_ID, PROCESS_ID), + FOREIGN KEY (JOB_ID, PROCESS_ID) REFERENCES JOB(JOB_ID, PROCESS_ID) ON DELETE CASCADE ); CREATE TABLE CONFIGURATION http://git-wip-us.apache.org/repos/asf/airavata/blob/b98f6599/modules/registry/registry-core/src/main/resources/expcatalog-mysql.sql ---------------------------------------------------------------------- diff --git a/modules/registry/registry-core/src/main/resources/expcatalog-mysql.sql b/modules/registry/registry-core/src/main/resources/expcatalog-mysql.sql index 1c8dc48..724e8a7 100644 --- a/modules/registry/registry-core/src/main/resources/expcatalog-mysql.sql +++ b/modules/registry/registry-core/src/main/resources/expcatalog-mysql.sql @@ -302,24 +302,25 @@ CREATE TABLE TASK_ERROR ( CREATE TABLE JOB ( JOB_ID varchar(255), TASK_ID varchar(255), + PROCESS_ID varchar(255), JOB_DESCRIPTION varchar(255), CREATION_TIME timestamp DEFAULT NOW(), COMPUTE_RESOURCE_CONSUMED varchar(255), JOB_NAME varchar(255), WORKING_DIR varchar(255), - PRIMARY KEY (JOB_ID, TASK_ID), - FOREIGN KEY (TASK_ID) REFERENCES TASK(TASK_ID) ON DELETE CASCADE + PRIMARY KEY (JOB_ID, PROCESS_ID), + FOREIGN KEY (PROCESS_ID) REFERENCES PROCESS(PROCESS_ID) ON DELETE CASCADE ); CREATE TABLE JOB_STATUS ( STATUS_ID varchar(255), JOB_ID varchar(255), - TASK_ID varchar(255), + PROCESS_ID varchar(255), STATE varchar(255), TIME_OF_STATE_CHANGE timestamp DEFAULT '0000-00-00 00:00:00', REASON LONGTEXT, - PRIMARY KEY (STATUS_ID, JOB_ID, TASK_ID), - FOREIGN KEY (JOB_ID, TASK_ID) REFERENCES JOB(JOB_ID, TASK_ID) ON DELETE CASCADE + PRIMARY KEY (STATUS_ID, JOB_ID, PROCESS_ID), + FOREIGN KEY (JOB_ID, PROCESS_ID) REFERENCES JOB(JOB_ID, PROCESS_ID) ON DELETE CASCADE ); http://git-wip-us.apache.org/repos/asf/airavata/blob/b98f6599/modules/registry/registry-cpi/src/main/java/org/apache/airavata/registry/cpi/utils/Constants.java ---------------------------------------------------------------------- diff --git a/modules/registry/registry-cpi/src/main/java/org/apache/airavata/registry/cpi/utils/Constants.java b/modules/registry/registry-cpi/src/main/java/org/apache/airavata/registry/cpi/utils/Constants.java index ab56285..c4f4d15 100644 --- a/modules/registry/registry-cpi/src/main/java/org/apache/airavata/registry/cpi/utils/Constants.java +++ b/modules/registry/registry-cpi/src/main/java/org/apache/airavata/registry/cpi/utils/Constants.java @@ -81,7 +81,7 @@ public class Constants { public final class JobConstants { public static final String JOB_ID = "jobId"; - public static final String TASK_ID = "taskId"; + public static final String PROCESS_ID = "processId"; public static final String JOB_STATUS = "taskStatus"; } }
