Repository: airavata Updated Branches: refs/heads/master 34a840108 -> e4be39e81
refactored , submitJob and relaunch methods logic with states. Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/4819dbb2 Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/4819dbb2 Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/4819dbb2 Branch: refs/heads/master Commit: 4819dbb2c4f05f8ec4790bcbda641556e19944d1 Parents: 7023991 Author: shamrath <[email protected]> Authored: Sun May 10 11:10:41 2015 -0400 Committer: shamrath <[email protected]> Committed: Sun May 10 11:10:41 2015 -0400 ---------------------------------------------------------------------- .../airavata/gfac/core/cpi/BetterGfacImpl.java | 109 +++++++++++-------- 1 file changed, 65 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/4819dbb2/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java index fcb1394..98ba942 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java @@ -494,7 +494,7 @@ public class BetterGfacImpl implements GFac,Watcher { String experimentEntry = GFacUtils.findExperimentEntry(jobExecutionContext.getExperimentID(), zk); Stat exists = zk.exists(experimentEntry + File.separator + "operation", false); zk.getData(experimentEntry + File.separator + "operation", this, exists); - int stateVal = GFacUtils.getZKExperimentStateValue(zk, jobExecutionContext); // this is the original state came, if we query again it might be different,so we preserve this state in the environment + GfacExperimentState gfacExpState = GFacUtils.getZKExperimentState(zk, jobExecutionContext); // this is the original state came, if we query again it might be different,so we preserve this state in the environment monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext) , GfacExperimentState.ACCEPTED)); // immediately we get the request we update the status String workflowInstanceID = null; @@ -505,17 +505,17 @@ public class BetterGfacImpl implements GFac,Watcher { } // Register log event listener. This is required in all scenarios. jobExecutionContext.getNotificationService().registerListener(new LoggingListener()); - if (stateVal < 2) { + if (isNewJob(gfacExpState)) { // In this scenario We do everything from the beginning launch(jobExecutionContext); - } else if (stateVal >= 8) { + } else if (isCompletedJob(gfacExpState)) { log.info("There is nothing to recover in this job so we do not re-submit"); ZKUtil.deleteRecursive(zk, AiravataZKUtils.getExpZnodePath(jobExecutionContext.getExperimentID())); } else { // Now we know this is an old Job, so we have to handle things gracefully log.info("Re-launching the job in GFac because this is re-submitted to GFac"); - reLaunch(jobExecutionContext, stateVal); + reLaunch(jobExecutionContext, gfacExpState); } return true; } catch (ApplicationSettingsException e) { @@ -530,6 +530,27 @@ public class BetterGfacImpl implements GFac,Watcher { } } + private boolean isCompletedJob(GfacExperimentState gfacExpState) { + switch (gfacExpState) { + case COMPLETED: + case FAILED: + return true; + default: + return false; + } + } + + private boolean isNewJob(GfacExperimentState stateVal) { + switch (stateVal) { + case UNKNOWN: + case LAUNCHED: + case ACCEPTED: + return true; + default: + return false; + } + } + public boolean cancel(String experimentID, String taskID, String gatewayID) throws GFacException { JobExecutionContext jobExecutionContext = null; try { @@ -635,7 +656,7 @@ public class BetterGfacImpl implements GFac,Watcher { } } - private void reLaunch(JobExecutionContext jobExecutionContext, int stateVal) throws GFacException { + private void reLaunch(JobExecutionContext jobExecutionContext, GfacExperimentState state) throws GFacException { // Scheduler will decide the execution flow of handlers and provider // which handles // the job. @@ -648,47 +669,46 @@ public class BetterGfacImpl implements GFac,Watcher { // here we do not skip handler if some handler does not have to be // run again during re-run it can implement // that logic in to the handler - reInvokeInFlowHandlers(jobExecutionContext); - // After executing the in handlers provider instance should be set - // to job execution context. - // We get the provider instance and execute it. - if (stateVal == 2 || stateVal == 3) { - invokeProviderExecute(jobExecutionContext); // provider never ran in - // previous invocation - } else if (stateVal == 4) { // whether sync or async job have to - // invoke the recovering because it - // crashed in the Handler - reInvokeProviderExecute(jobExecutionContext); - } else if (stateVal >= 5 && GFacUtils.isSynchronousMode(jobExecutionContext)) { - // In this case we do nothing because provider ran successfully, - // no need to re-run the job - log.info("Provider does not have to be recovered because it ran successfully for experiment: " + experimentID); - } else if (stateVal == 5 && !GFacUtils.isSynchronousMode(jobExecutionContext)) { - // this is async mode where monitoring of jobs is hapenning, we have to recover - reInvokeProviderExecute(jobExecutionContext); - } else if (stateVal == 6) { - reInvokeOutFlowHandlers(jobExecutionContext); - } else { - log.info("We skip invoking Handler, because the experiment:" + stateVal + " state is beyond the Provider Invocation !!!"); - log.info("ExperimentId: " + experimentID + " taskId: " + jobExecutionContext.getTaskData().getTaskID()); - } + // After executing the in handlers provider instance should be set + // to job execution context. + // We get the provider instance and execute it. + switch (state) { + case INHANDLERSINVOKING: + reInvokeInFlowHandlers(jobExecutionContext); + case INHANDLERSINVOKED: + invokeProviderExecute(jobExecutionContext); + break; + case PROVIDERINVOKING: + reInvokeProviderExecute(jobExecutionContext); + break; + case PROVIDERINVOKED: + // no need to re-run the job + log.info("Provider does not have to be recovered because it ran successfully for experiment: " + experimentID); + if (!GFacUtils.isSynchronousMode(jobExecutionContext)) { + monitorJob(jobExecutionContext); + } else { + // TODO - Need to handle this correctly , for now we will invoke ouput handlers. + invokeOutFlowHandlers(jobExecutionContext); + } + break; + case OUTHANDLERSINVOKING: + reInvokeOutFlowHandlers(jobExecutionContext); + break; + case OUTHANDLERSINVOKED: + case COMPLETED: + case FAILED: + case UNKNOWN: + log.info("All output handlers are invoked successfully, ExperimentId: " + experimentID + " taskId: " + jobExecutionContext.getTaskData().getTaskID()); + break; + default: + throw new GFacException("Un-handled GfacExperimentState : " + state.name()); + } } catch (Exception e) { log.error(e.getMessage(),e); try { // we make the experiment as failed due to exception scenario monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.FAILED)); - // monitorPublisher.publish(new - // ExperimentStatusChangedEvent(new - // ExperimentIdentity(jobExecutionContext.getExperimentID()), - // ExperimentState.FAILED)); - // Updating the task status if there's any task associated - // monitorPublisher.publish(new TaskStatusChangedEvent( - // new TaskIdentity(jobExecutionContext.getExperimentID(), - // jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(), - // jobExecutionContext.getTaskData().getTaskID()), - // TaskState.FAILED - // )); JobIdentifier jobIdentity = new JobIdentifier( jobExecutionContext.getJobDetails().getJobID(), jobExecutionContext.getTaskData().getTaskID(), jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(), @@ -699,9 +719,6 @@ public class BetterGfacImpl implements GFac,Watcher { } catch (NullPointerException e1) { log.error("Error occured during updating the statuses of Experiments,tasks or Job statuses to failed, " + "NullPointerException occurred because at this point there might not have Job Created", e1, e); -// monitorPublisher -// .publish(new ExperimentStatusChangedEvent(new ExperimentIdentity(jobExecutionContext.getExperimentID()), ExperimentState.FAILED)); - // Updating the task status if there's any task associated TaskIdentifier taskIdentity = new TaskIdentifier(jobExecutionContext.getTaskData().getTaskID(), jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(), jobExecutionContext.getExperimentID(), @@ -716,7 +733,11 @@ public class BetterGfacImpl implements GFac,Watcher { } } - private void launch(JobExecutionContext jobExecutionContext) throws GFacException { + private void monitorJob(JobExecutionContext jobExecutionContext) { + // TODO - Auto generated message. + } + + private void launch(JobExecutionContext jobExecutionContext) throws GFacException { // Scheduler will decide the execution flow of handlers and provider // which handles // the job.
