This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/airavata.git
The following commit(s) were added to refs/heads/develop by this push: new f912d39 In an experiment cancel request, marking the experiment is cancelled once the job is cancelled, completed or failed f912d39 is described below commit f912d39d37e85d0ac9b3a5c4a027714d17e208f2 Author: dimuthu <dimuthu.upeks...@gmail.com> AuthorDate: Mon Apr 9 19:20:51 2018 -0400 In an experiment cancel request, marking the experiment is cancelled once the job is cancelled, completed or failed --- .../airavata/helix/impl/task/AiravataTask.java | 1 + .../impl/task/cancel/CancelCompletingTask.java | 11 +++- .../task/cancel/RemoteJobCancellationTask.java | 58 ++++++++++++++++++---- .../helix/impl/workflow/PostWorkflowManager.java | 19 ++++++- .../helix/impl/workflow/PreWorkflowManager.java | 30 ++--------- .../helix/impl/workflow/WorkflowManager.java | 35 +++++++++++++ 6 files changed, 116 insertions(+), 38 deletions(-) diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java index 6eb6456..c108bd3 100644 --- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java +++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java @@ -81,6 +81,7 @@ public abstract class AiravataTask extends AbstractTask { private boolean skipTaskStatusPublish = false; protected TaskResult onSuccess(String message) { + logger.info(message); if (!skipTaskStatusPublish) { publishTaskState(TaskState.COMPLETED); } diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/cancel/CancelCompletingTask.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/cancel/CancelCompletingTask.java index 308cc0e..c8c39a7 100644 --- 
a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/cancel/CancelCompletingTask.java +++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/cancel/CancelCompletingTask.java @@ -17,7 +17,16 @@ public class CancelCompletingTask extends AiravataTask { public TaskResult onRun(TaskHelper helper, TaskContext taskContext) { logger.info("Starting cancel completing task for task " + getTaskId() + ", experiment id " + getExperimentId()); logger.info("Process " + getProcessId() + " successfully cancelled"); - saveAndPublishProcessStatus(ProcessState.CANCELED); + String cancelled = getContextVariable(RemoteJobCancellationTask.JOB_ALREADY_CANCELLED_OR_NOT_AVAILABLE); + + if ("true".equals(cancelled)) { + // make the experiment state as cancelled if it is already being cancelled or similar state. + // Otherwise wait for the post workflow to cancel the experiment + logger.info("Making process as cancelled as the job is already being cancelled or not available"); + saveAndPublishProcessStatus(ProcessState.CANCELED); + } else { + logger.info("Not updating process as cancelled as the job is not cancelled yet"); + } return onSuccess("Process " + getProcessId() + " successfully cancelled"); } diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/cancel/RemoteJobCancellationTask.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/cancel/RemoteJobCancellationTask.java index a4aa2ac..3c5803e 100644 --- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/cancel/RemoteJobCancellationTask.java +++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/cancel/RemoteJobCancellationTask.java @@ -9,6 +9,7 @@ import org.apache.airavata.helix.impl.task.submission.config.JobManagerConfigura import 
org.apache.airavata.helix.impl.task.submission.config.RawCommandInfo; import org.apache.airavata.helix.task.api.TaskHelper; import org.apache.airavata.helix.task.api.annotation.TaskDef; +import org.apache.airavata.model.status.JobStatus; import org.apache.helix.HelixManager; import org.apache.helix.task.TaskResult; import org.slf4j.Logger; @@ -21,6 +22,8 @@ public class RemoteJobCancellationTask extends AiravataTask { private final static Logger logger = LoggerFactory.getLogger(RemoteJobCancellationTask.class); + public static final String JOB_ALREADY_CANCELLED_OR_NOT_AVAILABLE = "job-already-cancelled"; + @Override public void init(HelixManager manager, String workflowName, String jobName, String taskName) { super.init(manager, workflowName, jobName, taskName); @@ -35,6 +38,7 @@ public class RemoteJobCancellationTask extends AiravataTask { logger.info("Fetching jobs for process " + getProcessId()); if (jobs == null || jobs.size() == 0) { + setContextVariable(JOB_ALREADY_CANCELLED_OR_NOT_AVAILABLE, "true"); return onSuccess("Can not find running jobs for process " + getProcessId()); } @@ -48,29 +52,63 @@ public class RemoteJobCancellationTask extends AiravataTask { getTaskContext().getJobSubmissionProtocol(), getTaskContext().getPreferredJobSubmissionInterface())); + AgentAdaptor adaptor = taskHelper.getAdaptorSupport().fetchAdaptor( + getTaskContext().getGatewayId(), + getTaskContext().getComputeResourceId(), + getTaskContext().getJobSubmissionProtocol(), + getTaskContext().getComputeResourceCredentialToken(), + getTaskContext().getComputeResourceLoginUserName()); + for (String jobId : jobs) { try { + logger.info("Fetching current job status for job id " + jobId); + RawCommandInfo monitorCommand = jobManagerConfiguration.getMonitorCommand(jobId); + + CommandOutput jobMonitorOutput = adaptor.executeCommand(monitorCommand.getRawCommand(), null); + + if (jobMonitorOutput.getExitCode() == 0) { + JobStatus jobStatus = 
jobManagerConfiguration.getParser().parseJobStatus(jobId, jobMonitorOutput.getStdOut()); + if (jobStatus != null) { + logger.info("Job " + jobId + " state is " + jobStatus.getJobState().name()); + switch (jobStatus.getJobState()) { + case COMPLETE: + case CANCELED: + case SUSPENDED: + case FAILED: + // if the job already is in above states, there is no use of trying cancellation + // setting context variable to be used in the Cancel Completing Task + setContextVariable(JOB_ALREADY_CANCELLED_OR_NOT_AVAILABLE, "true"); + return onSuccess("Job already is in a saturated state"); + } + } else { + logger.warn("Job status for job " + jobId + " is null. Std out " + jobMonitorOutput.getStdOut() + + ". Std err " + jobMonitorOutput.getStdError() + ". Job monitor command " + monitorCommand.getRawCommand()); + } + } else { + logger.warn("Error while fetching the job " + jobId + " status. Std out " + jobMonitorOutput.getStdOut() + + ". Std err " + jobMonitorOutput.getStdError() + ". Job monitor command " + monitorCommand.getRawCommand()); + } + } catch (Exception e) { + logger.error("Unknown error while fetching the job status but continuing..", e); + } + + try { logger.info("Cancelling job " + jobId + " of process " + getProcessId()); RawCommandInfo cancelCommand = jobManagerConfiguration.getCancelCommand(jobId); logger.info("Command to cancel the job " + jobId + " : " + cancelCommand.getRawCommand()); - AgentAdaptor adaptor = taskHelper.getAdaptorSupport().fetchAdaptor( - getTaskContext().getGatewayId(), - getTaskContext().getComputeResourceId(), - getTaskContext().getJobSubmissionProtocol(), - getTaskContext().getComputeResourceCredentialToken(), - getTaskContext().getComputeResourceLoginUserName()); + logger.info("Running cancel command on compute host"); - CommandOutput commandOutput = adaptor.executeCommand(cancelCommand.getRawCommand(), null); + CommandOutput jobCancelOutput = adaptor.executeCommand(cancelCommand.getRawCommand(), null); - if (commandOutput.getExitCode() 
!= 0) { + if (jobCancelOutput.getExitCode() != 0) { logger.error("Failed to execute job cancellation command for job " + jobId + " Sout : " + - commandOutput.getStdOut() + ", Serr : " + commandOutput.getStdError()); + jobCancelOutput.getStdOut() + ", Serr : " + jobCancelOutput.getStdError()); return onFail("Failed to execute job cancellation command for job " + jobId + " Sout : " + - commandOutput.getStdOut() + ", Serr : " + commandOutput.getStdError(), true, null); + jobCancelOutput.getStdOut() + ", Serr : " + jobCancelOutput.getStdError(), true, null); } } catch (Exception ex) { logger.error("Unknown error while canceling job " + jobId + " of process " + getProcessId()); diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java index c8a3f8d..7c85d31 100644 --- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java +++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java @@ -28,6 +28,8 @@ import org.apache.airavata.helix.impl.task.*; import org.apache.airavata.helix.impl.task.completing.CompletingTask; import org.apache.airavata.helix.impl.task.staging.ArchiveTask; import org.apache.airavata.helix.impl.task.staging.OutputDataStagingTask; +import org.apache.airavata.model.status.ProcessState; +import org.apache.airavata.model.status.ProcessStatus; import org.apache.airavata.monitor.JobStateValidator; import org.apache.airavata.monitor.JobStatusResult; import org.apache.airavata.monitor.kafka.JobStatusResultDeserializer; @@ -196,8 +198,21 @@ public class PostWorkflowManager extends WorkflowManager { // TODO get cluster lock before that if ("cancel".equals(processStatus)) { - logger.info("Cancelled post workflow for process " + processId); - // 
TODO to be implemented + logger.info("Cancelled post workflow for process " + processId + " in experiment " + experimentId); + // This will mark an cancelling Experiment into a cancelled status for a set of valid job statuses + switch (jobStatusResult.getState()) { + case FAILED: + case SUSPENDED: + case CANCELED: + case COMPLETE: + logger.info("Job " + jobStatusResult.getJobId() + " status is " + jobStatusResult.getState() + + " so marking experiment " + experimentId + " as cancelled" ); + publishProcessStatus(processId, experimentId, gateway, ProcessState.CANCELED); + break; + default: + logger.warn("Job " + jobStatusResult.getJobId() + " status " + jobStatusResult.getState() + + " is invalid to mark experiment " + experimentId + " as cancelled"); + } } else { if (jobStatusResult.getState() == JobState.COMPLETE || jobStatusResult.getState() == JobState.FAILED) { diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java index 78aa1a6..dce98c3 100644 --- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java +++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java @@ -253,20 +253,21 @@ public class PreWorkflowManager extends WorkflowManager { } String processId = event.getProcessId(); + String experimentId = event.getExperimentId(); String gateway = event.getGatewayId(); - logger.info("Received process launch message for process " + processId + " in gateway " + gateway); + logger.info("Received process launch message for process " + processId + " of experiment " + experimentId + " in gateway " + gateway); try { - logger.info("Launching the pre workflow for process " + processId + " in gateway " + gateway); + logger.info("Launching the pre workflow 
for process " + processId + " of experiment " + experimentId + " in gateway " + gateway); String workflowName = createAndLaunchPreWorkflow(processId); - logger.info("Completed launching the pre workflow " + workflowName + " for process " + processId + " in gateway " + gateway); + logger.info("Completed launching the pre workflow " + workflowName + " for process" + processId + " of experiment " + experimentId + " in gateway " + gateway); // updating the process status ProcessStatus status = new ProcessStatus(); status.setState(ProcessState.STARTED); status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis()); - publishProcessStatus(event, status); + publishProcessStatus(processId, experimentId, gateway, ProcessState.STARTED); subscriber.sendAck(messageContext.getDeliveryTag()); } catch (Exception e) { logger.error("Failed to launch the pre workflow for process " + processId + " in gateway " + gateway, e); @@ -305,25 +306,4 @@ public class PreWorkflowManager extends WorkflowManager { } } } - - private void publishProcessStatus(ProcessSubmitEvent event, ProcessStatus status) throws AiravataException { - - RegistryService.Client registryClient = getRegistryClientPool().getResource(); - - try { - registryClient.updateProcessStatus(status, event.getProcessId()); - getRegistryClientPool().returnResource(registryClient); - - } catch (Exception e) { - logger.error("Failed to update process status " + event.getProcessId(), e); - getRegistryClientPool().returnBrokenResource(registryClient); - } - - ProcessIdentifier identifier = new ProcessIdentifier(event.getProcessId(), event.getExperimentId(), event.getGatewayId()); - ProcessStatusChangeEvent processStatusChangeEvent = new ProcessStatusChangeEvent(status.getState(), identifier); - MessageContext msgCtx = new MessageContext(processStatusChangeEvent, MessageType.PROCESS, - AiravataUtils.getId(MessageType.PROCESS.name()), event.getGatewayId()); - msgCtx.setUpdatedTime(AiravataUtils.getCurrentTimestamp()); - 
getStatusPublisher().publish(msgCtx); - } } diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/WorkflowManager.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/WorkflowManager.java index c8fe1be..7413ad5 100644 --- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/WorkflowManager.java +++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/WorkflowManager.java @@ -2,12 +2,19 @@ package org.apache.airavata.helix.impl.workflow; import org.apache.airavata.common.exception.AiravataException; import org.apache.airavata.common.exception.ApplicationSettingsException; +import org.apache.airavata.common.utils.AiravataUtils; import org.apache.airavata.common.utils.ServerSettings; import org.apache.airavata.common.utils.ThriftClientPool; import org.apache.airavata.helix.workflow.WorkflowOperator; +import org.apache.airavata.messaging.core.MessageContext; import org.apache.airavata.messaging.core.MessagingFactory; import org.apache.airavata.messaging.core.Publisher; import org.apache.airavata.messaging.core.Type; +import org.apache.airavata.model.messaging.event.MessageType; +import org.apache.airavata.model.messaging.event.ProcessIdentifier; +import org.apache.airavata.model.messaging.event.ProcessStatusChangeEvent; +import org.apache.airavata.model.status.ProcessState; +import org.apache.airavata.model.status.ProcessStatus; import org.apache.airavata.registry.api.RegistryService; import org.apache.commons.pool.impl.GenericObjectPool; import org.apache.curator.RetryPolicy; @@ -17,6 +24,8 @@ import org.apache.curator.retry.ExponentialBackoffRetry; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Calendar; + public class WorkflowManager { private final static Logger logger = LoggerFactory.getLogger(WorkflowManager.class); @@ -86,4 +95,30 @@ public class 
WorkflowManager { public ThriftClientPool<RegistryService.Client> getRegistryClientPool() { return registryClientPool; } + + public void publishProcessStatus(String processId, String experimentId, String gatewayId, ProcessState state) + throws AiravataException { + + ProcessStatus status = new ProcessStatus(); + status.setState(state); + status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis()); + + RegistryService.Client registryClient = getRegistryClientPool().getResource(); + + try { + registryClient.updateProcessStatus(status, processId); + getRegistryClientPool().returnResource(registryClient); + + } catch (Exception e) { + logger.error("Failed to update process status " + processId, e); + getRegistryClientPool().returnBrokenResource(registryClient); + } + + ProcessIdentifier identifier = new ProcessIdentifier(processId, experimentId, gatewayId); + ProcessStatusChangeEvent processStatusChangeEvent = new ProcessStatusChangeEvent(status.getState(), identifier); + MessageContext msgCtx = new MessageContext(processStatusChangeEvent, MessageType.PROCESS, + AiravataUtils.getId(MessageType.PROCESS.name()), gatewayId); + msgCtx.setUpdatedTime(AiravataUtils.getCurrentTimestamp()); + getStatusPublisher().publish(msgCtx); + } } -- To stop receiving notification emails like this one, please contact dimuthu...@apache.org.