This is an automated email from the ASF dual-hosted git repository. dimuthuupe pushed a commit to branch helix-integration in repository https://gitbox.apache.org/repos/asf/airavata.git
commit 1c3a5d4eca7a3fd455adea1ce56437ec21499bd7 Author: dimuthu <[email protected]> AuthorDate: Mon Mar 5 08:46:53 2018 -0500 Improving status publishing --- .../helix/impl/participant/GlobalParticipant.java | 1 + .../airavata/helix/impl/task/AiravataTask.java | 136 ++++++++++++++++++--- .../airavata/helix/impl/task/CompletingTask.java | 26 ++++ .../airavata/helix/impl/task/EnvSetupTask.java | 3 + .../helix/impl/task/InputDataStagingTask.java | 7 +- .../helix/impl/task/OutputDataStagingTask.java | 3 + .../airavata/helix/impl/task/TaskContext.java | 16 +++ .../submission/task/DefaultJobSubmissionTask.java | 15 +-- .../submission/task/ForkJobSubmissionTask.java | 2 +- .../task/submission/task/JobSubmissionTask.java | 43 +------ .../submission/task/LocalJobSubmissionTask.java | 4 +- .../helix/impl/workflow/PostWorkflowManager.java | 88 +++++++++++-- .../job/monitor/kafka/MessageProducer.java | 1 + 13 files changed, 266 insertions(+), 79 deletions(-) diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/participant/GlobalParticipant.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/participant/GlobalParticipant.java index 984b277..fc3fbcb 100644 --- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/participant/GlobalParticipant.java +++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/participant/GlobalParticipant.java @@ -18,6 +18,7 @@ public class GlobalParticipant extends HelixParticipant { "org.apache.airavata.helix.impl.task.EnvSetupTask", "org.apache.airavata.helix.impl.task.InputDataStagingTask", "org.apache.airavata.helix.impl.task.OutputDataStagingTask", + "org.apache.airavata.helix.impl.task.CompletingTask", "org.apache.airavata.helix.impl.task.submission.task.ForkJobSubmissionTask", "org.apache.airavata.helix.impl.task.submission.task.DefaultJobSubmissionTask", "org.apache.airavata.helix.impl.task.submission.task.LocalJobSubmissionTask" diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java index e15195d..03dedf3 100644 --- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java +++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java @@ -1,25 +1,21 @@ package org.apache.airavata.helix.impl.task; +import org.apache.airavata.common.exception.AiravataException; import org.apache.airavata.common.utils.AiravataUtils; import org.apache.airavata.helix.core.AbstractTask; import org.apache.airavata.helix.core.OutPort; import org.apache.airavata.helix.task.api.annotation.TaskOutPort; import org.apache.airavata.helix.task.api.annotation.TaskParam; import org.apache.airavata.messaging.core.MessageContext; +import org.apache.airavata.messaging.core.MessagingFactory; import org.apache.airavata.messaging.core.Publisher; +import org.apache.airavata.messaging.core.Type; +import org.apache.airavata.messaging.core.impl.RabbitMQPublisher; import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription; -import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionInterface; -import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol; -import org.apache.airavata.model.appcatalog.gatewayprofile.ComputeResourcePreference; -import org.apache.airavata.model.appcatalog.gatewayprofile.GatewayResourceProfile; -import org.apache.airavata.model.appcatalog.userresourceprofile.UserComputeResourcePreference; -import org.apache.airavata.model.appcatalog.userresourceprofile.UserResourceProfile; -import org.apache.airavata.model.messaging.event.MessageType; -import org.apache.airavata.model.messaging.event.TaskIdentifier; -import org.apache.airavata.model.messaging.event.TaskStatusChangeEvent; +import org.apache.airavata.model.commons.ErrorModel; +import org.apache.airavata.model.messaging.event.*; import org.apache.airavata.model.process.ProcessModel; -import org.apache.airavata.model.status.TaskState; -import org.apache.airavata.model.status.TaskStatus; +import org.apache.airavata.model.status.*; import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory; import org.apache.airavata.registry.cpi.*; import org.apache.helix.HelixManager; @@ -27,7 +23,8 @@ import org.apache.helix.task.TaskResult; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; -import java.util.*; +import java.io.PrintWriter; +import java.io.StringWriter; public abstract class AiravataTask extends AbstractTask { @@ -61,17 +58,128 @@ public abstract class AiravataTask extends AbstractTask { } protected TaskResult onFail(String reason, boolean fatal, Throwable error) { + String errorMessage; + ProcessStatus status = new ProcessStatus(ProcessState.FAILED); + StringWriter errors = new StringWriter(); if (error == null) { errorMessage = "Task " + getTaskId() + " failed due to " + reason; + errors.write(errorMessage); + status.setReason(errorMessage); logger.error(errorMessage); + } else { errorMessage = "Task " + getTaskId() + " failed due to " + reason + ", " + error.getMessage(); + status.setReason(errorMessage); + error.printStackTrace(new PrintWriter(errors)); logger.error(errorMessage, error); } + status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); + getTaskContext().setProcessStatus(status); + + ErrorModel errorModel = new ErrorModel(); + errorModel.setUserFriendlyMessage("GFac Worker throws an exception"); + errorModel.setActualErrorMessage(errors.toString()); + errorModel.setCreationTime(AiravataUtils.getCurrentTimestamp().getTime()); + + saveAndPublishProcessStatus(); + saveExperimentError(errorModel); + saveProcessError(errorModel); return new TaskResult(fatal ? TaskResult.Status.FATAL_FAILED : TaskResult.Status.FAILED, errorMessage); + } + public void saveAndPublishProcessStatus(ProcessState state) { + ProcessStatus processStatus = new ProcessStatus(state); + processStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); + getTaskContext().setProcessStatus(processStatus); + saveAndPublishProcessStatus(); + } + + public void saveAndPublishProcessStatus() { + try { + ProcessStatus status = taskContext.getProcessStatus(); + if (status.getTimeOfStateChange() == 0 || status.getTimeOfStateChange() > 0 ){ + status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); + }else { + status.setTimeOfStateChange(status.getTimeOfStateChange()); + } + experimentCatalog.add(ExpCatChildDataType.PROCESS_STATUS, status, getProcessId()); + ProcessIdentifier identifier = new ProcessIdentifier(getProcessId(), getExperimentId(), getGatewayId()); + ProcessStatusChangeEvent processStatusChangeEvent = new ProcessStatusChangeEvent(status.getState(), identifier); + MessageContext msgCtx = new MessageContext(processStatusChangeEvent, MessageType.PROCESS, + AiravataUtils.getId(MessageType.PROCESS.name()), getGatewayId()); + msgCtx.setUpdatedTime(AiravataUtils.getCurrentTimestamp()); + getStatusPublisher().publish(msgCtx); + } catch (Exception e) { + logger.error("Failed to save process status of process " + getProcessId(), e); + } + } + + public void saveAndPublishTaskStatus() { + try { + TaskState state = getTaskContext().getTaskState(); + // first we save job jobModel to the registry for sa and then save the job status. + TaskStatus status = getTaskContext().getTaskStatus(); + if (status.getTimeOfStateChange() == 0 || status.getTimeOfStateChange() > 0 ){ + status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); + }else { + status.setTimeOfStateChange(status.getTimeOfStateChange()); + } + experimentCatalog.add(ExpCatChildDataType.TASK_STATUS, status, getTaskId()); + TaskIdentifier identifier = new TaskIdentifier(getTaskId(), getProcessId(), getExperimentId(), getGatewayId()); + TaskStatusChangeEvent taskStatusChangeEvent = new TaskStatusChangeEvent(state, + identifier); + MessageContext msgCtx = new MessageContext(taskStatusChangeEvent, MessageType.TASK, AiravataUtils.getId + (MessageType.TASK.name()), getGatewayId()); + msgCtx.setUpdatedTime(AiravataUtils.getCurrentTimestamp()); + getStatusPublisher().publish(msgCtx); + } catch (Exception e) { + logger.error("Failed to publist task status of task " + getTaskId()); + } + } + + public void saveExperimentError(ErrorModel errorModel) { + try { + errorModel.setErrorId(AiravataUtils.getId("EXP_ERROR")); + getExperimentCatalog().add(ExpCatChildDataType.EXPERIMENT_ERROR, errorModel, experimentId); + } catch (RegistryException e) { + String msg = "expId: " + getExperimentId() + " processId: " + getProcessId() + " : - Error while updating experiment errors"; + logger.error(msg, e); + } + } + + public void saveProcessError(ErrorModel errorModel) { + try { + errorModel.setErrorId(AiravataUtils.getId("PROCESS_ERROR")); + experimentCatalog.add(ExpCatChildDataType.PROCESS_ERROR, errorModel, getProcessId()); + } catch (RegistryException e) { + String msg = "expId: " + getExperimentId() + " processId: " + getProcessId() + + " : - Error while updating process errors"; + logger.error(msg, e); + } + } + + public void saveTaskError(ErrorModel errorModel) throws Exception { + try { + errorModel.setErrorId(AiravataUtils.getId("TASK_ERROR")); + getExperimentCatalog().add(ExpCatChildDataType.TASK_ERROR, errorModel, getTaskId()); + } catch (RegistryException e) { + String msg = "expId: " + getExperimentId() + " processId: " + getProcessId() + " taskId: " + getTaskId() + + " : - Error while updating task errors"; + throw new Exception(msg, e); + } + } + + public Publisher getStatusPublisher() throws AiravataException { + if (statusPublisher == null) { + synchronized (RabbitMQPublisher.class) { + if (statusPublisher == null) { + statusPublisher = MessagingFactory.getPublisher(Type.STATUS); + } + } + } + return statusPublisher; } @Override @@ -145,10 +253,6 @@ public abstract class AiravataTask extends AbstractTask { return experimentCatalog; } - public Publisher getStatusPublisher() { - return statusPublisher; - } - public String getProcessId() { return processId; } diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/CompletingTask.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/CompletingTask.java new file mode 100644 index 0000000..9ec2909 --- /dev/null +++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/CompletingTask.java @@ -0,0 +1,26 @@ +package org.apache.airavata.helix.impl.task; + +import org.apache.airavata.helix.task.api.TaskHelper; +import org.apache.airavata.helix.task.api.annotation.TaskDef; +import org.apache.airavata.model.status.ProcessState; +import org.apache.helix.task.TaskResult; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +@TaskDef(name = "Completing Task") +public class CompletingTask extends AiravataTask { + + private static final Logger logger = LogManager.getLogger(CompletingTask.class); + + @Override + public TaskResult onRun(TaskHelper helper) { + logger.info("Process " + getProcessId() + " successfully completed"); + saveAndPublishProcessStatus(ProcessState.COMPLETED); + return onSuccess("Process " + getProcessId() + " successfully completed"); + } + + @Override + public void onCancel() { + + } +} diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/EnvSetupTask.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/EnvSetupTask.java index ddba5f2..abdc1bf 100644 --- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/EnvSetupTask.java +++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/EnvSetupTask.java @@ -3,6 +3,7 @@ package org.apache.airavata.helix.impl.task; import org.apache.airavata.agents.api.AgentAdaptor; import org.apache.airavata.helix.task.api.TaskHelper; import org.apache.airavata.helix.task.api.annotation.TaskDef; +import org.apache.airavata.model.status.ProcessState; import org.apache.airavata.model.status.TaskState; import org.apache.airavata.registry.cpi.RegistryException; import org.apache.helix.task.TaskResult; @@ -17,6 +18,8 @@ public class EnvSetupTask extends AiravataTask { @Override public TaskResult onRun(TaskHelper taskHelper) { try { + + saveAndPublishProcessStatus(ProcessState.CONFIGURING_WORKSPACE); publishTaskState(TaskState.EXECUTING); AgentAdaptor adaptor = taskHelper.getAdaptorSupport().fetchAdaptor( getTaskContext().getGatewayId(), diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/InputDataStagingTask.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/InputDataStagingTask.java index 30eeec0..ed143dd 100644 --- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/InputDataStagingTask.java +++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/InputDataStagingTask.java @@ -3,20 +3,17 @@ package org.apache.airavata.helix.impl.task; import org.apache.airavata.agents.api.AgentAdaptor; import org.apache.airavata.agents.api.AgentException; import org.apache.airavata.agents.api.StorageResourceAdaptor; -import org.apache.airavata.common.utils.ServerSettings; import org.apache.airavata.helix.task.api.TaskHelper; import org.apache.airavata.helix.task.api.annotation.TaskDef; import org.apache.airavata.model.appcatalog.storageresource.StorageResourceDescription; import org.apache.airavata.model.application.io.InputDataObjectType; +import org.apache.airavata.model.status.ProcessState; import org.apache.airavata.model.task.DataStagingTaskModel; -import org.apache.airavata.registry.cpi.AppCatalogException; -import org.apache.commons.io.FileUtils; import org.apache.helix.task.TaskResult; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import java.io.File; -import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; @@ -29,6 +26,8 @@ public class InputDataStagingTask extends DataStagingTask { public TaskResult onRun(TaskHelper taskHelper) { logger.info("Starting Input Data Staging Task " + getTaskId()); + saveAndPublishProcessStatus(ProcessState.INPUT_DATA_STAGING); + try { // Get and validate data staging task model DataStagingTaskModel dataStagingTaskModel = getDataStagingTaskModel(); diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/OutputDataStagingTask.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/OutputDataStagingTask.java index f33523c..ff8fd2e 100644 --- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/OutputDataStagingTask.java +++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/OutputDataStagingTask.java @@ -7,6 +7,7 @@ import org.apache.airavata.helix.task.api.TaskHelper; import org.apache.airavata.helix.task.api.annotation.TaskDef; import org.apache.airavata.model.appcatalog.storageresource.StorageResourceDescription; import org.apache.airavata.model.application.io.OutputDataObjectType; +import org.apache.airavata.model.status.ProcessState; import org.apache.airavata.model.task.DataStagingTaskModel; import org.apache.airavata.registry.cpi.ExpCatChildDataType; import org.apache.airavata.registry.cpi.RegistryException; @@ -29,6 +30,8 @@ public class OutputDataStagingTask extends DataStagingTask { public TaskResult onRun(TaskHelper taskHelper) { logger.info("Starting output data staging task " + getTaskId()); + saveAndPublishProcessStatus(ProcessState.OUTPUT_DATA_STAGING); + try { // Get and validate data staging task model DataStagingTaskModel dataStagingTaskModel = getDataStagingTaskModel(); diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/TaskContext.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/TaskContext.java index 489a196..6be1d36 100644 --- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/TaskContext.java +++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/TaskContext.java @@ -21,6 +21,8 @@ import org.apache.airavata.model.process.ProcessModel; import org.apache.airavata.model.scheduling.ComputationalResourceSchedulingModel; import org.apache.airavata.model.status.ProcessState; import org.apache.airavata.model.status.ProcessStatus; +import org.apache.airavata.model.status.TaskState; +import org.apache.airavata.model.status.TaskStatus; import org.apache.airavata.model.task.TaskModel; import org.apache.airavata.registry.cpi.AppCatalog; import org.apache.airavata.registry.cpi.AppCatalogException; @@ -385,6 +387,20 @@ public class TaskContext { return null; } + public TaskState getTaskState() { + if(getCurrentTaskModel().getTaskStatuses() != null) + return getCurrentTaskModel().getTaskStatuses().get(0).getState(); + else + return null; + } + + public TaskStatus getTaskStatus() { + if(getCurrentTaskModel().getTaskStatuses() != null) + return getCurrentTaskModel().getTaskStatuses().get(0); + else + return null; + } + public String getComputeResourceId() { if (isUseUserCRPref() && userComputeResourcePreference != null && diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/DefaultJobSubmissionTask.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/DefaultJobSubmissionTask.java index 31b6f30..688f894 100644 --- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/DefaultJobSubmissionTask.java +++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/DefaultJobSubmissionTask.java @@ -5,24 +5,18 @@ import org.apache.airavata.agents.api.JobSubmissionOutput; import org.apache.airavata.common.utils.AiravataUtils; import org.apache.airavata.helix.impl.task.submission.GroovyMapBuilder; import org.apache.airavata.helix.impl.task.submission.GroovyMapData; -import org.apache.airavata.helix.impl.task.submission.SubmissionUtil; import org.apache.airavata.helix.impl.task.submission.config.RawCommandInfo; import org.apache.airavata.helix.task.api.TaskHelper; import org.apache.airavata.helix.task.api.annotation.TaskDef; import org.apache.airavata.model.commons.ErrorModel; import org.apache.airavata.model.experiment.ExperimentModel; import org.apache.airavata.model.job.JobModel; -import org.apache.airavata.model.status.JobState; -import org.apache.airavata.model.status.JobStatus; -import org.apache.airavata.model.status.TaskState; -import org.apache.airavata.model.status.TaskStatus; +import org.apache.airavata.model.status.*; import org.apache.airavata.registry.cpi.ExperimentCatalogModelType; -import org.apache.commons.io.FileUtils; import org.apache.helix.task.TaskResult; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; -import java.io.File; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -38,6 +32,7 @@ public class DefaultJobSubmissionTask extends JobSubmissionTask { public TaskResult onRun(TaskHelper taskHelper) { try { + saveAndPublishProcessStatus(ProcessState.EXECUTING); GroovyMapData mapData = new GroovyMapBuilder(getTaskContext()).build(); @@ -133,14 +128,14 @@ public class DefaultJobSubmissionTask extends JobSubmissionTask { jobStatus.setReason("Successfully Submitted to " + getComputeResourceDescription().getHostName()); jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); jobModel.setJobStatuses(Arrays.asList(jobStatus)); - saveJobStatus(jobModel); + saveAndPublishJobStatus(jobModel); if (verifyJobSubmissionByJobId(adaptor, jobId)) { jobStatus.setJobState(JobState.QUEUED); jobStatus.setReason("Verification step succeeded"); jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); jobModel.setJobStatuses(Arrays.asList(jobStatus)); - saveJobStatus(jobModel); + saveAndPublishJobStatus(jobModel); createMonitoringNode(jobId); } @@ -172,7 +167,7 @@ public class DefaultJobSubmissionTask extends JobSubmissionTask { jobStatus.setReason("Verification step succeeded"); jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); jobModel.setJobStatuses(Arrays.asList(jobStatus)); - saveJobStatus(jobModel); + saveAndPublishJobStatus(jobModel); //taskStatus.setState(TaskState.COMPLETED); //taskStatus.setReason("Submitted job to compute resource"); //taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/ForkJobSubmissionTask.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/ForkJobSubmissionTask.java index 2e4a052..e3b5447 100644 --- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/ForkJobSubmissionTask.java +++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/ForkJobSubmissionTask.java @@ -58,7 +58,7 @@ public class ForkJobSubmissionTask extends JobSubmissionTask { jobStatus.setReason("Successfully Submitted to " + getComputeResourceDescription().getHostName()); jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); jobModel.setJobStatuses(Arrays.asList(jobStatus)); - saveJobStatus(jobModel); + saveAndPublishJobStatus(jobModel); return null; } else { diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/JobSubmissionTask.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/JobSubmissionTask.java index afa2630..4fed22d 100644 --- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/JobSubmissionTask.java +++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/JobSubmissionTask.java @@ -73,6 +73,8 @@ public abstract class JobSubmissionTask extends AiravataTask { this.curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/monitoring/" + jobId + "/lock", new byte[0]); this.curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/monitoring/" + jobId + "/gateway", getGatewayId().getBytes()); this.curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/monitoring/" + jobId + "/process", getProcessId().getBytes()); + this.curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/monitoring/" + jobId + "/task", getTaskId().getBytes()); + this.curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/monitoring/" + jobId + "/experiment", getExperimentId().getBytes()); this.curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/monitoring/" + jobId + "/status", "pending".getBytes()); } @@ -146,48 +148,11 @@ public abstract class JobSubmissionTask extends AiravataTask { return jobManagerConfiguration.getParser().parseJobId(jobName, commandOutput.getStdOut()); } - //////////////////////////////// - - - ///////////////////////////////////////////// - public void saveExperimentError(ErrorModel errorModel) throws Exception { - try { - errorModel.setErrorId(AiravataUtils.getId("EXP_ERROR")); - getExperimentCatalog().add(ExpCatChildDataType.EXPERIMENT_ERROR, errorModel, getExperimentId()); - } catch (RegistryException e) { - String msg = "expId: " + getExperimentId() + " processId: " + getProcessId() - + " : - Error while updating experiment errors"; - throw new Exception(msg, e); - } - } - - public void saveProcessError(ErrorModel errorModel) throws Exception { - try { - errorModel.setErrorId(AiravataUtils.getId("PROCESS_ERROR")); - getExperimentCatalog().add(ExpCatChildDataType.PROCESS_ERROR, errorModel, getProcessId()); - } catch (RegistryException e) { - String msg = "expId: " + getExperimentId() + " processId: " + getProcessId() - + " : - Error while updating process errors"; - throw new Exception(msg, e); - } - } - - public void saveTaskError(ErrorModel errorModel) throws Exception { - try { - errorModel.setErrorId(AiravataUtils.getId("TASK_ERROR")); - getExperimentCatalog().add(ExpCatChildDataType.TASK_ERROR, errorModel, getTaskId()); - } catch (RegistryException e) { - String msg = "expId: " + getExperimentId() + " processId: " + getProcessId() + " taskId: " + getTaskId() - + " : - Error while updating task errors"; - throw new Exception(msg, e); - } - } - public void saveJobModel(JobModel jobModel) throws RegistryException { getExperimentCatalog().add(ExpCatChildDataType.JOB, jobModel, getProcessId()); } - public void saveJobStatus(JobModel jobModel) throws Exception { + public void saveAndPublishJobStatus(JobModel jobModel) throws Exception { try { // first we save job jobModel to the registry for sa and then save the job status. JobStatus jobStatus = null; @@ -213,7 +178,7 @@ public abstract class JobSubmissionTask extends AiravataTask { MessageContext msgCtx = new MessageContext(jobStatusChangeEvent, MessageType.JOB, AiravataUtils.getId (MessageType.JOB.name()), getGatewayId()); msgCtx.setUpdatedTime(AiravataUtils.getCurrentTimestamp()); - //getStatusPublisher().publish(msgCtx); + getStatusPublisher().publish(msgCtx); } catch (Exception e) { throw new Exception("Error persisting job status " + e.getLocalizedMessage(), e); } diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/LocalJobSubmissionTask.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/LocalJobSubmissionTask.java index e3ae4fa..cea6750 100644 --- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/LocalJobSubmissionTask.java +++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/LocalJobSubmissionTask.java @@ -58,7 +58,7 @@ public class LocalJobSubmissionTask extends JobSubmissionTask { jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); jobModel.setJobStatuses(Arrays.asList(jobStatus)); - saveJobStatus(jobModel); + saveAndPublishJobStatus(jobModel); jobModel.setExitCode(submissionOutput.getExitCode()); jobModel.setStdErr(submissionOutput.getStdErr()); @@ -69,7 +69,7 @@ public class LocalJobSubmissionTask extends JobSubmissionTask { jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); jobModel.setJobStatuses(Arrays.asList(jobStatus)); - saveJobStatus(jobModel); + saveAndPublishJobStatus(jobModel); return null; } diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java index 383fe37..07a9aee 100644 --- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java +++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java @@ -1,26 +1,33 @@ package org.apache.airavata.helix.impl.workflow; +import org.apache.airavata.common.exception.AiravataException; import org.apache.airavata.common.exception.ApplicationSettingsException; +import org.apache.airavata.common.utils.AiravataUtils; import org.apache.airavata.common.utils.ServerSettings; import org.apache.airavata.helix.core.OutPort; -import org.apache.airavata.helix.impl.task.AiravataTask; -import org.apache.airavata.helix.impl.task.EnvSetupTask; -import org.apache.airavata.helix.impl.task.InputDataStagingTask; -import org.apache.airavata.helix.impl.task.OutputDataStagingTask; +import org.apache.airavata.helix.impl.task.*; import org.apache.airavata.helix.impl.task.submission.task.DefaultJobSubmissionTask; import org.apache.airavata.helix.impl.task.submission.task.JobSubmissionTask; import org.apache.airavata.helix.workflow.WorkflowManager; import org.apache.airavata.job.monitor.kafka.JobStatusResultDeserializer; import org.apache.airavata.job.monitor.parser.JobStatusResult; +import org.apache.airavata.messaging.core.MessageContext; +import org.apache.airavata.messaging.core.MessagingFactory; +import org.apache.airavata.messaging.core.Publisher; +import org.apache.airavata.messaging.core.Type; +import org.apache.airavata.messaging.core.impl.RabbitMQPublisher; import org.apache.airavata.model.experiment.ExperimentModel; +import org.apache.airavata.model.job.JobModel; +import org.apache.airavata.model.messaging.event.JobIdentifier; +import org.apache.airavata.model.messaging.event.JobStatusChangeEvent; +import org.apache.airavata.model.messaging.event.MessageType; import org.apache.airavata.model.process.ProcessModel; import org.apache.airavata.model.status.JobState; +import org.apache.airavata.model.status.JobStatus; import org.apache.airavata.model.task.TaskModel; import org.apache.airavata.model.task.TaskTypes; import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory; -import org.apache.airavata.registry.cpi.AppCatalog; -import org.apache.airavata.registry.cpi.ExperimentCatalog; -import org.apache.airavata.registry.cpi.ExperimentCatalogModelType; +import org.apache.airavata.registry.cpi.*; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; @@ -46,6 +53,7 @@ public class PostWorkflowManager { private final String TOPIC = "parsed-data"; private CuratorFramework curatorClient = null; + private Publisher statusPublisher; private void init() throws ApplicationSettingsException { RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); @@ -66,6 +74,18 @@ public class PostWorkflowManager { return consumer; } + private String getExperimentIdByJobId(String jobId) throws Exception { + byte[] processBytes = this.curatorClient.getData().forPath("/monitoring/" + jobId + "/experiment"); + String process = new String(processBytes); + return process; + } + + private String getTaskIdByJobId(String jobId) throws Exception { + byte[] processBytes = this.curatorClient.getData().forPath("/monitoring/" + jobId + "/task"); + String process = new String(processBytes); + return process; + } + private String getProcessIdByJobId(String jobId) throws Exception { byte[] processBytes = this.curatorClient.getData().forPath("/monitoring/" + jobId + "/process"); String process = new String(processBytes); @@ -101,6 +121,8 @@ public class PostWorkflowManager { if (hasMonitoringRegistered(jobStatusResult.getJobId())) { String gateway = getGatewayByJobId(jobStatusResult.getJobId()); String processId = getProcessIdByJobId(jobStatusResult.getJobId()); + String experimentId = getExperimentIdByJobId(jobStatusResult.getJobId()); + String task = getTaskIdByJobId(jobStatusResult.getJobId()); String status = getStatusByJobId(jobStatusResult.getJobId()); logger.info("Starting the post workflow for job id : " + jobStatusResult.getJobId() + " with process id " @@ -111,6 +133,8 @@ public class PostWorkflowManager { } else { + saveAndPublishJobStatus(jobStatusResult.getJobId(), task, processId, experimentId, gateway, jobStatusResult.getState()); + if (jobStatusResult.getState() == JobState.COMPLETE) { logger.info("Job " + jobStatusResult.getJobId() + " was completed"); @@ -151,6 +175,14 @@ public class PostWorkflowManager { } } } + + CompletingTask completingTask = new CompletingTask(); + completingTask.setGatewayId(experimentModel.getGatewayId()); + completingTask.setExperimentId(experimentModel.getExperimentId()); + completingTask.setProcessId(processModel.getProcessId()); + completingTask.setTaskId("Completing-Task"); + allTasks.add(completingTask); + WorkflowManager workflowManager = new WorkflowManager("AiravataDemoCluster", "wm-23", ServerSettings.getZookeeperConnection()); @@ -189,6 +221,48 @@ public class PostWorkflowManager { } } + public void saveAndPublishJobStatus(String jobId, String taskId, String processId, String experimentId, String gateway, + JobState jobState) throws Exception { + try { + + JobStatus jobStatus = new JobStatus(); + jobStatus.setReason(jobState.name()); + jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); + jobStatus.setJobState(jobState); + + if (jobStatus.getTimeOfStateChange() == 0 || jobStatus.getTimeOfStateChange() > 0 ) { + jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); + } else { + jobStatus.setTimeOfStateChange(jobStatus.getTimeOfStateChange()); + } + + CompositeIdentifier ids = new CompositeIdentifier(taskId, jobId); + ExperimentCatalog experimentCatalog = RegistryFactory.getExperimentCatalog(gateway); + experimentCatalog.add(ExpCatChildDataType.JOB_STATUS, jobStatus, ids); + JobIdentifier identifier = new JobIdentifier(jobId, taskId, + processId, experimentId, gateway); + + JobStatusChangeEvent jobStatusChangeEvent = new JobStatusChangeEvent(jobStatus.getJobState(), identifier); + MessageContext msgCtx = new MessageContext(jobStatusChangeEvent, MessageType.JOB, AiravataUtils.getId + (MessageType.JOB.name()), gateway); + msgCtx.setUpdatedTime(AiravataUtils.getCurrentTimestamp()); + getStatusPublisher().publish(msgCtx); + } catch (Exception e) { + throw new Exception("Error persisting job status " + e.getLocalizedMessage(), e); + } + } + + public Publisher getStatusPublisher() throws AiravataException { + if (statusPublisher == null) { + synchronized (RabbitMQPublisher.class) { + if (statusPublisher == null) { + statusPublisher = MessagingFactory.getPublisher(Type.STATUS); + } + } + } + return statusPublisher; + } + public static void main(String[] args) throws Exception { PostWorkflowManager postManager = new PostWorkflowManager(); diff --git a/modules/job-monitor/src/main/java/org/apache/airavata/job/monitor/kafka/MessageProducer.java b/modules/job-monitor/src/main/java/org/apache/airavata/job/monitor/kafka/MessageProducer.java index 748a533..9f6d7b8 100644 --- a/modules/job-monitor/src/main/java/org/apache/airavata/job/monitor/kafka/MessageProducer.java +++ b/modules/job-monitor/src/main/java/org/apache/airavata/job/monitor/kafka/MessageProducer.java @@ -32,5 +32,6 @@ public class MessageProducer { public void submitMessageToQueue(JobStatusResult jobStatusResult) throws ExecutionException, InterruptedException { final ProducerRecord<String, JobStatusResult> record = new ProducerRecord<>(TOPIC, jobStatusResult); RecordMetadata recordMetadata = producer.send(record).get(); + producer.flush(); } } -- To stop receiving notification emails like this one, please contact [email protected].
