Repository: airavata
Updated Branches:
  refs/heads/master 545e75344 -> 4792eac6e


Fixed edge case issues with cancel & recovery


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/4792eac6
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/4792eac6
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/4792eac6

Branch: refs/heads/master
Commit: 4792eac6e173c9ec81bab4f510ebb45da341a3ad
Parents: 545e753
Author: Shameera Rathnayaka <[email protected]>
Authored: Fri Jan 8 13:36:44 2016 -0500
Committer: Shameera Rathnayaka <[email protected]>
Committed: Fri Jan 8 13:36:44 2016 -0500

----------------------------------------------------------------------
 .../airavata/gfac/impl/GFacEngineImpl.java      | 28 +++++++++------
 .../apache/airavata/gfac/impl/GFacWorker.java   | 36 +++++++++++++++-----
 .../gfac/monitor/email/EmailBasedMonitor.java   | 18 ++++++----
 3 files changed, 56 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/4792eac6/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
index 00d920d..f264e6c 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
@@ -215,7 +215,7 @@ public class GFacEngineImpl implements GFacEngine {
 
     private void executeTaskListFrom(ProcessContext processContext, String 
startingTaskId) throws GFacException {
         // checkpoint
-        if (processContext.isInterrupted()) {
+        if (processContext.isInterrupted() && processContext.getProcessState() 
!= ProcessState.MONITORING) {
             GFacUtils.handleProcessInterrupt(processContext);
             return;
         }
@@ -552,7 +552,12 @@ public class GFacEngineImpl implements GFacEngine {
                 cancelJobSubmission(processContext, rTaskId, pTaskId);
             }
             continueProcess(processContext, recoverTaskId);
+        } else {
+            log.error("expId: {}, processId: {}, Error while recovering 
process, couldn't find recovery task",
+                    processContext.getExperimentId(), 
processContext.getProcessId());
         }
+
+
     }
 
     private void cancelJobSubmission(ProcessContext processContext, String 
rTaskId, String pTaskId) {
@@ -577,12 +582,17 @@ public class GFacEngineImpl implements GFacEngine {
 
                 if (jobModels != null && !jobModels.isEmpty()) {
                     JobModel jobModel = (JobModel) 
jobModels.get(jobModels.size() - 1);
-                    processContext.setJobModel(jobModel);
-                    log.info("expId: {}, processId: {}, Canceling jobId {}", 
processContext.getExperimentId(),
-                            processContext.getProcessId(), 
jobModel.getJobId());
-                    cancelProcess(processContext);
-                    log.info("expId: {}, processId: {}, Canceled jobId {}", 
processContext.getExperimentId(),
-                            processContext.getProcessId(), 
jobModel.getJobId());
+                    if (jobModel.getJobId() != null) {
+                        processContext.setJobModel(jobModel);
+                        log.info("expId: {}, processId: {}, Canceling jobId 
{}", processContext.getExperimentId(),
+                                processContext.getProcessId(), 
jobModel.getJobId());
+                        cancelProcess(processContext);
+                        log.info("expId: {}, processId: {}, Canceled jobId 
{}", processContext.getExperimentId(),
+                                processContext.getProcessId(), 
jobModel.getJobId());
+                    } else {
+                        log.error("expId: {}, processId: {}, Couldn't find 
jobId in jobModel, aborting process recovery",
+                                processContext.getExperimentId(), 
processContext.getProcessId());
+                    }
                 }
             } catch (GFacException e) {
                 log.error("expId: {}, processId: {}, Error while canceling 
process which is in recovery mode",
@@ -606,10 +616,6 @@ public class GFacEngineImpl implements GFacEngine {
 
     @Override
     public void continueProcess(ProcessContext processContext, String taskId) 
throws GFacException {
-        if (processContext.isInterrupted()) {
-            GFacUtils.handleProcessInterrupt(processContext);
-            return;
-        }
         executeTaskListFrom(processContext, taskId);
     }
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/4792eac6/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
index e0664a5..fd6dad3 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
@@ -29,6 +29,7 @@ import org.apache.airavata.gfac.core.context.ProcessContext;
 import org.apache.airavata.model.commons.ErrorModel;
 import org.apache.airavata.model.status.ProcessState;
 import org.apache.airavata.model.status.ProcessStatus;
+import org.apache.airavata.registry.core.experiment.catalog.model.Process;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -53,7 +54,7 @@ public class GFacWorker implements Runnable {
         */
        public GFacWorker(ProcessContext processContext) throws GFacException {
                if (processContext == null) {
-                       throw new GFacException("Worker must initialize with 
valide processContext, Process context is null");
+                       throw new GFacException("Worker must initialize with 
valid processContext, Process context is null");
                }
                this.processId = processContext.getProcessId();
                this.gatewayId = processContext.getGatewayId();
@@ -78,7 +79,7 @@ public class GFacWorker implements Runnable {
        @Override
        public void run() {
                try {
-                       ProcessState processState = 
processContext.getProcessStatus().getState();
+                       ProcessState processState = 
processContext.getProcessState();
                        switch (processState) {
                                case CREATED:
                                case VALIDATED:
@@ -101,6 +102,9 @@ public class GFacWorker implements Runnable {
                                case COMPLETED:
                                        completeProcess();
                                        break;
+                               case CANCELLING:
+                                       cancelProcess();
+                                       break;
                                case CANCELED:
                                        // TODO - implement cancel scenario
                                        break;
@@ -111,12 +115,18 @@ public class GFacWorker implements Runnable {
                                        throw new GFacException("process Id : " 
+ processId + " Couldn't identify process type");
                        }
                        if (processContext.isCancel()) {
-                               if (processContext.getProcessState() == 
ProcessState.MONITORING
-                                               || 
processContext.getProcessState() == ProcessState.EXECUTING) {
-                                       // don't send ack if the process is in 
MONITORING state, wait until cancel email comes to airavata.
-                               } else {
-                                       sendAck();
-                                       
Factory.getGfacContext().removeProcess(processContext.getProcessId());
+                               processState = processContext.getProcessState();
+                               switch (processState) {
+                                       case MONITORING: case EXECUTING:
+                                               // don't send ack if the 
process is in MONITORING or EXECUTING states, wait until cancel email comes to 
airavata
+                                               break;
+                                       case CANCELLING:
+                                               cancelProcess();
+                                               break;
+                                       default:
+                                               sendAck();
+                                               
Factory.getGfacContext().removeProcess(processContext.getProcessId());
+                                               break;
                                }
                        }
                } catch (GFacException e) {
@@ -143,6 +153,16 @@ public class GFacWorker implements Runnable {
                }
        }
 
+       private void cancelProcess() throws GFacException {
+               // do cleanup works before cancel the process.
+               ProcessStatus processStatus = new 
ProcessStatus(ProcessState.CANCELED);
+               processStatus.setReason("Process cancellation has been 
triggered");
+               processContext.setProcessStatus(processStatus);
+               GFacUtils.saveAndPublishProcessStatus(processContext);
+               sendAck();
+               
Factory.getGfacContext().removeProcess(processContext.getProcessId());
+       }
+
        private void completeProcess() throws GFacException {
         ProcessStatus status = new ProcessStatus(ProcessState.COMPLETED);
         
status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());

http://git-wip-us.apache.org/repos/asf/airavata/blob/4792eac6/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
index 144465b..c7a6875 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
@@ -35,10 +35,7 @@ import org.apache.airavata.gfac.core.monitor.JobStatusResult;
 import org.apache.airavata.gfac.impl.GFacWorker;
 import 
org.apache.airavata.model.appcatalog.computeresource.ResourceJobManagerType;
 import org.apache.airavata.model.job.JobModel;
-import org.apache.airavata.model.status.JobState;
-import org.apache.airavata.model.status.JobStatus;
-import org.apache.airavata.model.status.TaskState;
-import org.apache.airavata.model.status.TaskStatus;
+import org.apache.airavata.model.status.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -333,7 +330,8 @@ public class EmailBasedMonitor implements JobMonitor, Runnable{
         // TODO : update job state on process context
         boolean runOutflowTasks = false;
         JobStatus jobStatus = new JobStatus();
-        JobModel jobModel = 
taskContext.getParentProcessContext().getJobModel();
+        ProcessContext parentProcessContext = 
taskContext.getParentProcessContext();
+        JobModel jobModel = parentProcessContext.getJobModel();
         String jobDetails = "JobName : " + jobStatusResult.getJobName() + ", 
JobId : " + jobStatusResult.getJobId();
         // TODO - Handle all other valid JobStates
         if (resultState == JobState.COMPLETE) {
@@ -374,7 +372,7 @@ public class EmailBasedMonitor implements JobMonitor, Runnable{
                    try {
                            jobModel.setJobStatus(jobStatus);
                            log.info("[EJM]: Publishing status changes to amqp. 
" + jobDetails);
-                           
GFacUtils.saveJobStatus(taskContext.getParentProcessContext(), jobModel);
+                           GFacUtils.saveJobStatus(parentProcessContext, 
jobModel);
                    } catch (GFacException e) {
                            log.error("expId: {}, processId: {}, taskId: {}, 
jobId: {} :- Error while save and publishing Job " +
                         "status {}", taskContext.getExperimentId(), 
taskContext.getProcessId(), jobModel
@@ -390,7 +388,13 @@ public class EmailBasedMonitor implements JobMonitor, Runnable{
                 taskStatus.setReason("Job monitoring completed with final 
state: " + TaskState.COMPLETED.name());
                 taskContext.setTaskStatus(taskStatus);
                 GFacUtils.saveAndPublishTaskStatus(taskContext);
-                       
GFacThreadPoolExecutor.getCachedThreadPool().execute(new 
GFacWorker(taskContext.getParentProcessContext()));
+                if (parentProcessContext.isCancel()) {
+                    ProcessStatus processStatus = new 
ProcessStatus(ProcessState.CANCELLING);
+                    processStatus.setReason("Process has been cancelled");
+                    parentProcessContext.setProcessStatus(processStatus);
+                    
GFacUtils.saveAndPublishProcessStatus(parentProcessContext);
+                }
+                       
GFacThreadPoolExecutor.getCachedThreadPool().execute(new 
GFacWorker(parentProcessContext));
                } catch (GFacException e) {
                        log.info("[EJM]: Error while running output tasks", e);
                }

Reply via email to