This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/airavata.git


The following commit(s) were added to refs/heads/develop by this push:
     new f912d39  In an experiment cancel request, marking the experiment as 
cancelled once the job is cancelled, completed or failed
f912d39 is described below

commit f912d39d37e85d0ac9b3a5c4a027714d17e208f2
Author: dimuthu <dimuthu.upeks...@gmail.com>
AuthorDate: Mon Apr 9 19:20:51 2018 -0400

    In an experiment cancel request, marking the experiment as cancelled once 
the job is cancelled, completed or failed
---
 .../airavata/helix/impl/task/AiravataTask.java     |  1 +
 .../impl/task/cancel/CancelCompletingTask.java     | 11 +++-
 .../task/cancel/RemoteJobCancellationTask.java     | 58 ++++++++++++++++++----
 .../helix/impl/workflow/PostWorkflowManager.java   | 19 ++++++-
 .../helix/impl/workflow/PreWorkflowManager.java    | 30 ++---------
 .../helix/impl/workflow/WorkflowManager.java       | 35 +++++++++++++
 6 files changed, 116 insertions(+), 38 deletions(-)

diff --git 
a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java
 
b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java
index 6eb6456..c108bd3 100644
--- 
a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java
+++ 
b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java
@@ -81,6 +81,7 @@ public abstract class AiravataTask extends AbstractTask {
     private boolean skipTaskStatusPublish = false;
 
     protected TaskResult onSuccess(String message) {
+        logger.info(message);
         if (!skipTaskStatusPublish) {
             publishTaskState(TaskState.COMPLETED);
         }
diff --git 
a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/cancel/CancelCompletingTask.java
 
b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/cancel/CancelCompletingTask.java
index 308cc0e..c8c39a7 100644
--- 
a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/cancel/CancelCompletingTask.java
+++ 
b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/cancel/CancelCompletingTask.java
@@ -17,7 +17,16 @@ public class CancelCompletingTask extends AiravataTask {
     public TaskResult onRun(TaskHelper helper, TaskContext taskContext) {
         logger.info("Starting cancel completing task for task " + getTaskId() 
+ ", experiment id " + getExperimentId());
         logger.info("Process " + getProcessId() + " successfully cancelled");
-        saveAndPublishProcessStatus(ProcessState.CANCELED);
+        String cancelled = 
getContextVariable(RemoteJobCancellationTask.JOB_ALREADY_CANCELLED_OR_NOT_AVAILABLE);
+
+        if ("true".equals(cancelled)) {
+            // make  the experiment state as cancelled if it is already being 
cancelled or similar state.
+            // Otherwise wait for the post workflow to cancel the experiment
+            logger.info("Making process as cancelled as the job is already 
being cancelled or not available");
+            saveAndPublishProcessStatus(ProcessState.CANCELED);
+        } else {
+            logger.info("Not updating process as cancelled as the job is not 
cancelled yet");
+        }
         return onSuccess("Process " + getProcessId() + " successfully 
cancelled");
     }
 
diff --git 
a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/cancel/RemoteJobCancellationTask.java
 
b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/cancel/RemoteJobCancellationTask.java
index a4aa2ac..3c5803e 100644
--- 
a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/cancel/RemoteJobCancellationTask.java
+++ 
b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/cancel/RemoteJobCancellationTask.java
@@ -9,6 +9,7 @@ import 
org.apache.airavata.helix.impl.task.submission.config.JobManagerConfigura
 import org.apache.airavata.helix.impl.task.submission.config.RawCommandInfo;
 import org.apache.airavata.helix.task.api.TaskHelper;
 import org.apache.airavata.helix.task.api.annotation.TaskDef;
+import org.apache.airavata.model.status.JobStatus;
 import org.apache.helix.HelixManager;
 import org.apache.helix.task.TaskResult;
 import org.slf4j.Logger;
@@ -21,6 +22,8 @@ public class RemoteJobCancellationTask extends AiravataTask {
 
     private final static Logger logger = 
LoggerFactory.getLogger(RemoteJobCancellationTask.class);
 
+    public static final String JOB_ALREADY_CANCELLED_OR_NOT_AVAILABLE = 
"job-already-cancelled";
+
     @Override
     public void init(HelixManager manager, String workflowName, String 
jobName, String taskName) {
         super.init(manager, workflowName, jobName, taskName);
@@ -35,6 +38,7 @@ public class RemoteJobCancellationTask extends AiravataTask {
             logger.info("Fetching jobs for process " + getProcessId());
 
             if (jobs == null || jobs.size() == 0) {
+                setContextVariable(JOB_ALREADY_CANCELLED_OR_NOT_AVAILABLE, 
"true");
                 return onSuccess("Can not find running jobs for process " + 
getProcessId());
             }
 
@@ -48,29 +52,63 @@ public class RemoteJobCancellationTask extends AiravataTask 
{
                             getTaskContext().getJobSubmissionProtocol(),
                             
getTaskContext().getPreferredJobSubmissionInterface()));
 
+            AgentAdaptor adaptor = taskHelper.getAdaptorSupport().fetchAdaptor(
+                    getTaskContext().getGatewayId(),
+                    getTaskContext().getComputeResourceId(),
+                    getTaskContext().getJobSubmissionProtocol(),
+                    getTaskContext().getComputeResourceCredentialToken(),
+                    getTaskContext().getComputeResourceLoginUserName());
+
             for (String jobId : jobs) {
 
                 try {
+                    logger.info("Fetching current job status for job id " + 
jobId);
+                    RawCommandInfo monitorCommand = 
jobManagerConfiguration.getMonitorCommand(jobId);
+
+                    CommandOutput jobMonitorOutput = 
adaptor.executeCommand(monitorCommand.getRawCommand(), null);
+
+                    if (jobMonitorOutput.getExitCode() == 0) {
+                        JobStatus jobStatus = 
jobManagerConfiguration.getParser().parseJobStatus(jobId, 
jobMonitorOutput.getStdOut());
+                        if (jobStatus != null) {
+                            logger.info("Job " + jobId + " state is " + 
jobStatus.getJobState().name());
+                            switch (jobStatus.getJobState()) {
+                                case COMPLETE:
+                                case CANCELED:
+                                case SUSPENDED:
+                                case FAILED:
+                                    // if the job already is in above states, 
there is no use of trying cancellation
+                                    // setting context variable to be used in 
the Cancel Completing Task
+                                    
setContextVariable(JOB_ALREADY_CANCELLED_OR_NOT_AVAILABLE, "true");
+                                    return onSuccess("Job already is in a 
saturated state");
+                            }
+                        } else {
+                            logger.warn("Job status for job " + jobId + " is 
null. Std out " + jobMonitorOutput.getStdOut() +
+                                    ". Std err " + 
jobMonitorOutput.getStdError() + ". Job monitor command " + 
monitorCommand.getRawCommand());
+                        }
+                    } else {
+                        logger.warn("Error while fetching the job " + jobId + 
" status. Std out " + jobMonitorOutput.getStdOut() +
+                                ". Std err " + jobMonitorOutput.getStdError() 
+ ". Job monitor command " + monitorCommand.getRawCommand());
+                    }
+                } catch (Exception e) {
+                    logger.error("Unknown error while fetching the job status 
but continuing..", e);
+                }
+
+                try {
                     logger.info("Cancelling job " + jobId + " of process " + 
getProcessId());
                     RawCommandInfo cancelCommand = 
jobManagerConfiguration.getCancelCommand(jobId);
 
                     logger.info("Command to cancel the job " + jobId + " : " + 
cancelCommand.getRawCommand());
 
-                    AgentAdaptor adaptor = 
taskHelper.getAdaptorSupport().fetchAdaptor(
-                            getTaskContext().getGatewayId(),
-                            getTaskContext().getComputeResourceId(),
-                            getTaskContext().getJobSubmissionProtocol(),
-                            
getTaskContext().getComputeResourceCredentialToken(),
-                            
getTaskContext().getComputeResourceLoginUserName());
+
 
                     logger.info("Running cancel command on compute host");
-                    CommandOutput commandOutput = 
adaptor.executeCommand(cancelCommand.getRawCommand(), null);
+                    CommandOutput jobCancelOutput = 
adaptor.executeCommand(cancelCommand.getRawCommand(), null);
 
-                    if (commandOutput.getExitCode() != 0) {
+                    if (jobCancelOutput.getExitCode() != 0) {
                         logger.error("Failed to execute job cancellation 
command for job " + jobId + " Sout : " +
-                                commandOutput.getStdOut() + ", Serr : " + 
commandOutput.getStdError());
+                                jobCancelOutput.getStdOut() + ", Serr : " + 
jobCancelOutput.getStdError());
                         return onFail("Failed to execute job cancellation 
command for job " + jobId + " Sout : " +
-                                commandOutput.getStdOut() + ", Serr : " + 
commandOutput.getStdError(), true, null);
+                                jobCancelOutput.getStdOut() + ", Serr : " + 
jobCancelOutput.getStdError(), true, null);
                     }
                 } catch (Exception ex) {
                     logger.error("Unknown error while canceling job " + jobId 
+ " of process " + getProcessId());
diff --git 
a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
 
b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
index c8a3f8d..7c85d31 100644
--- 
a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
+++ 
b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
@@ -28,6 +28,8 @@ import org.apache.airavata.helix.impl.task.*;
 import org.apache.airavata.helix.impl.task.completing.CompletingTask;
 import org.apache.airavata.helix.impl.task.staging.ArchiveTask;
 import org.apache.airavata.helix.impl.task.staging.OutputDataStagingTask;
+import org.apache.airavata.model.status.ProcessState;
+import org.apache.airavata.model.status.ProcessStatus;
 import org.apache.airavata.monitor.JobStateValidator;
 import org.apache.airavata.monitor.JobStatusResult;
 import org.apache.airavata.monitor.kafka.JobStatusResultDeserializer;
@@ -196,8 +198,21 @@ public class PostWorkflowManager extends WorkflowManager {
 
                 // TODO get cluster lock before that
                 if ("cancel".equals(processStatus)) {
-                    logger.info("Cancelled post workflow for process " + 
processId);
-                    // TODO to be implemented
+                    logger.info("Cancelled post workflow for process " + 
processId + " in experiment " + experimentId);
+                    // This will mark an cancelling Experiment into a 
cancelled status for a set of valid job statuses
+                    switch (jobStatusResult.getState()) {
+                        case FAILED:
+                        case SUSPENDED:
+                        case CANCELED:
+                        case COMPLETE:
+                            logger.info("Job " + jobStatusResult.getJobId() + 
" status is " + jobStatusResult.getState() +
+                                    " so marking experiment " + experimentId + 
" as cancelled" );
+                            publishProcessStatus(processId, experimentId, 
gateway, ProcessState.CANCELED);
+                            break;
+                        default:
+                            logger.warn("Job " + jobStatusResult.getJobId() + 
" status " + jobStatusResult.getState() +
+                                    " is invalid to mark experiment " + 
experimentId + " as cancelled");
+                    }
                 } else {
 
                     if (jobStatusResult.getState() == JobState.COMPLETE || 
jobStatusResult.getState() == JobState.FAILED) {
diff --git 
a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java
 
b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java
index 78aa1a6..dce98c3 100644
--- 
a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java
+++ 
b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java
@@ -253,20 +253,21 @@ public class PreWorkflowManager extends WorkflowManager {
                 }
 
                 String processId = event.getProcessId();
+                String experimentId = event.getExperimentId();
                 String gateway = event.getGatewayId();
 
-                logger.info("Received process launch message for process " + 
processId + " in gateway " + gateway);
+                logger.info("Received process launch message for process " + 
processId + " of experiment " + experimentId + " in gateway " + gateway);
 
                 try {
-                    logger.info("Launching the pre workflow for process " + 
processId + " in gateway " + gateway);
+                    logger.info("Launching the pre workflow for process " + 
processId + " of experiment " + experimentId + " in gateway " + gateway);
                     String workflowName = 
createAndLaunchPreWorkflow(processId);
-                    logger.info("Completed launching the pre workflow " + 
workflowName + " for process " + processId + " in gateway " + gateway);
+                    logger.info("Completed launching the pre workflow " + 
workflowName + " for process" + processId + " of experiment " + experimentId + 
" in gateway " + gateway);
 
                     // updating the process status
                     ProcessStatus status = new ProcessStatus();
                     status.setState(ProcessState.STARTED);
                     
status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis());
-                    publishProcessStatus(event, status);
+                    publishProcessStatus(processId, experimentId, gateway, 
ProcessState.STARTED);
                     subscriber.sendAck(messageContext.getDeliveryTag());
                 } catch (Exception e) {
                     logger.error("Failed to launch the pre workflow for 
process " + processId + " in gateway " + gateway, e);
@@ -305,25 +306,4 @@ public class PreWorkflowManager extends WorkflowManager {
             }
         }
     }
-
-    private void publishProcessStatus(ProcessSubmitEvent event, ProcessStatus 
status) throws AiravataException {
-
-        RegistryService.Client registryClient = 
getRegistryClientPool().getResource();
-
-        try {
-            registryClient.updateProcessStatus(status, event.getProcessId());
-            getRegistryClientPool().returnResource(registryClient);
-
-        } catch (Exception e) {
-            logger.error("Failed to update process status " + 
event.getProcessId(), e);
-            getRegistryClientPool().returnBrokenResource(registryClient);
-        }
-
-        ProcessIdentifier identifier = new 
ProcessIdentifier(event.getProcessId(), event.getExperimentId(), 
event.getGatewayId());
-        ProcessStatusChangeEvent processStatusChangeEvent = new 
ProcessStatusChangeEvent(status.getState(), identifier);
-        MessageContext msgCtx = new MessageContext(processStatusChangeEvent, 
MessageType.PROCESS,
-                AiravataUtils.getId(MessageType.PROCESS.name()), 
event.getGatewayId());
-        msgCtx.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
-        getStatusPublisher().publish(msgCtx);
-    }
 }
diff --git 
a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/WorkflowManager.java
 
b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/WorkflowManager.java
index c8fe1be..7413ad5 100644
--- 
a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/WorkflowManager.java
+++ 
b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/WorkflowManager.java
@@ -2,12 +2,19 @@ package org.apache.airavata.helix.impl.workflow;
 
 import org.apache.airavata.common.exception.AiravataException;
 import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.AiravataUtils;
 import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.common.utils.ThriftClientPool;
 import org.apache.airavata.helix.workflow.WorkflowOperator;
+import org.apache.airavata.messaging.core.MessageContext;
 import org.apache.airavata.messaging.core.MessagingFactory;
 import org.apache.airavata.messaging.core.Publisher;
 import org.apache.airavata.messaging.core.Type;
+import org.apache.airavata.model.messaging.event.MessageType;
+import org.apache.airavata.model.messaging.event.ProcessIdentifier;
+import org.apache.airavata.model.messaging.event.ProcessStatusChangeEvent;
+import org.apache.airavata.model.status.ProcessState;
+import org.apache.airavata.model.status.ProcessStatus;
 import org.apache.airavata.registry.api.RegistryService;
 import org.apache.commons.pool.impl.GenericObjectPool;
 import org.apache.curator.RetryPolicy;
@@ -17,6 +24,8 @@ import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Calendar;
+
 public class WorkflowManager {
 
     private final static Logger logger = 
LoggerFactory.getLogger(WorkflowManager.class);
@@ -86,4 +95,30 @@ public class WorkflowManager {
     public ThriftClientPool<RegistryService.Client> getRegistryClientPool() {
         return registryClientPool;
     }
+
+    public void publishProcessStatus(String processId, String experimentId, 
String gatewayId, ProcessState state)
+            throws AiravataException {
+
+        ProcessStatus status = new ProcessStatus();
+        status.setState(state);
+        status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis());
+
+        RegistryService.Client registryClient = 
getRegistryClientPool().getResource();
+
+        try {
+            registryClient.updateProcessStatus(status, processId);
+            getRegistryClientPool().returnResource(registryClient);
+
+        } catch (Exception e) {
+            logger.error("Failed to update process status " + processId, e);
+            getRegistryClientPool().returnBrokenResource(registryClient);
+        }
+
+        ProcessIdentifier identifier = new ProcessIdentifier(processId, 
experimentId, gatewayId);
+        ProcessStatusChangeEvent processStatusChangeEvent = new 
ProcessStatusChangeEvent(status.getState(), identifier);
+        MessageContext msgCtx = new MessageContext(processStatusChangeEvent, 
MessageType.PROCESS,
+                AiravataUtils.getId(MessageType.PROCESS.name()), gatewayId);
+        msgCtx.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
+        getStatusPublisher().publish(msgCtx);
+    }
 }

-- 
To stop receiving notification emails like this one, please contact
dimuthu...@apache.org.

Reply via email to