Repository: airavata
Updated Branches:
  refs/heads/master 34a840108 -> e4be39e81


refactored , submitJob and relaunch methods logic with states.


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/4819dbb2
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/4819dbb2
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/4819dbb2

Branch: refs/heads/master
Commit: 4819dbb2c4f05f8ec4790bcbda641556e19944d1
Parents: 7023991
Author: shamrath <[email protected]>
Authored: Sun May 10 11:10:41 2015 -0400
Committer: shamrath <[email protected]>
Committed: Sun May 10 11:10:41 2015 -0400

----------------------------------------------------------------------
 .../airavata/gfac/core/cpi/BetterGfacImpl.java  | 109 +++++++++++--------
 1 file changed, 65 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/4819dbb2/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
 
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
index fcb1394..98ba942 100644
--- 
a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
+++ 
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
@@ -494,7 +494,7 @@ public class BetterGfacImpl implements GFac,Watcher {
             String experimentEntry = 
GFacUtils.findExperimentEntry(jobExecutionContext.getExperimentID(), zk);
             Stat exists = zk.exists(experimentEntry + File.separator + 
"operation", false);
             zk.getData(experimentEntry + File.separator + "operation", this, 
exists);
-            int stateVal = GFacUtils.getZKExperimentStateValue(zk, 
jobExecutionContext);   // this is the original state came, if we query again 
it might be different,so we preserve this state in the environment
+            GfacExperimentState gfacExpState = 
GFacUtils.getZKExperimentState(zk, jobExecutionContext);   // this is the 
original state came, if we query again it might be different,so we preserve 
this state in the environment
             monitorPublisher.publish(new GfacExperimentStateChangeRequest(new 
MonitorID(jobExecutionContext)
                     , GfacExperimentState.ACCEPTED));                  // 
immediately we get the request we update the status
             String workflowInstanceID = null;
@@ -505,17 +505,17 @@ public class BetterGfacImpl implements GFac,Watcher {
             }
             // Register log event listener. This is required in all scenarios.
             jobExecutionContext.getNotificationService().registerListener(new 
LoggingListener());
-            if (stateVal < 2) {
+            if (isNewJob(gfacExpState)) {
                 // In this scenario We do everything from the beginning
                 launch(jobExecutionContext);
-            } else if (stateVal >= 8) {
+            } else if (isCompletedJob(gfacExpState)) {
                 log.info("There is nothing to recover in this job so we do not 
re-submit");
                 ZKUtil.deleteRecursive(zk,
                         
AiravataZKUtils.getExpZnodePath(jobExecutionContext.getExperimentID()));
             } else {
                 // Now we know this is an old Job, so we have to handle things 
gracefully
                 log.info("Re-launching the job in GFac because this is 
re-submitted to GFac");
-                reLaunch(jobExecutionContext, stateVal);
+                reLaunch(jobExecutionContext, gfacExpState);
             }
             return true;
         } catch (ApplicationSettingsException e) {
@@ -530,6 +530,27 @@ public class BetterGfacImpl implements GFac,Watcher {
         }
     }
 
+    private boolean isCompletedJob(GfacExperimentState gfacExpState) {
+        switch (gfacExpState) {
+            case COMPLETED:
+            case FAILED:
+                return true;
+            default:
+                return false;
+        }
+    }
+
+    private boolean isNewJob(GfacExperimentState stateVal) {
+        switch (stateVal) {
+            case UNKNOWN:
+            case LAUNCHED:
+            case ACCEPTED:
+                return true;
+            default:
+                return false;
+        }
+    }
+
     public boolean cancel(String experimentID, String taskID, String 
gatewayID) throws GFacException {
         JobExecutionContext jobExecutionContext = null;
         try {
@@ -635,7 +656,7 @@ public class BetterGfacImpl implements GFac,Watcher {
         }
     }
 
-       private void reLaunch(JobExecutionContext jobExecutionContext, int 
stateVal) throws GFacException {
+       private void reLaunch(JobExecutionContext jobExecutionContext, 
GfacExperimentState state) throws GFacException {
                // Scheduler will decide the execution flow of handlers and 
provider
                // which handles
                // the job.
@@ -648,47 +669,46 @@ public class BetterGfacImpl implements GFac,Watcher {
                        // here we do not skip handler if some handler does not 
have to be
                        // run again during re-run it can implement
                        // that logic in to the handler
-                       reInvokeInFlowHandlers(jobExecutionContext);
 
-                       // After executing the in handlers provider instance 
should be set
-                       // to job execution context.
-                       // We get the provider instance and execute it.
-                       if (stateVal == 2 || stateVal == 3) {
-                               invokeProviderExecute(jobExecutionContext); // 
provider never ran in
-                                                                               
                                // previous invocation
-                       } else if (stateVal == 4) { // whether sync or async 
job have to
-                                                                               
// invoke the recovering because it
-                                                                               
// crashed in the Handler
-                               reInvokeProviderExecute(jobExecutionContext);
-                       } else if (stateVal >= 5 && 
GFacUtils.isSynchronousMode(jobExecutionContext)) {
-                               // In this case we do nothing because provider 
ran successfully,
-                               // no need to re-run the job
-                               log.info("Provider does not have to be 
recovered because it ran successfully for experiment: " + experimentID);
-                       } else if (stateVal == 5 && 
!GFacUtils.isSynchronousMode(jobExecutionContext)) {
-                               // this is async mode where monitoring of jobs 
is hapenning, we  have to recover
-                               reInvokeProviderExecute(jobExecutionContext);
-                       } else if (stateVal == 6) {
-                               reInvokeOutFlowHandlers(jobExecutionContext);
-                       } else {
-                               log.info("We skip invoking Handler, because the 
experiment:" + stateVal + " state is beyond the Provider Invocation !!!");
-                               log.info("ExperimentId: " + experimentID + " 
taskId: " + jobExecutionContext.getTaskData().getTaskID());
-                       }
+            // After executing the in handlers provider instance should be set
+            // to job execution context.
+            // We get the provider instance and execute it.
+            switch (state) {
+                case INHANDLERSINVOKING:
+                    reInvokeInFlowHandlers(jobExecutionContext);
+                case INHANDLERSINVOKED:
+                    invokeProviderExecute(jobExecutionContext);
+                    break;
+                case PROVIDERINVOKING:
+                    reInvokeProviderExecute(jobExecutionContext);
+                    break;
+                case PROVIDERINVOKED:
+                    // no need to re-run the job
+                    log.info("Provider does not have to be recovered because 
it ran successfully for experiment: " + experimentID);
+                    if (!GFacUtils.isSynchronousMode(jobExecutionContext)) {
+                        monitorJob(jobExecutionContext);
+                    } else {
+                        // TODO - Need to handle this correctly , for now we 
will invoke ouput handlers.
+                        invokeOutFlowHandlers(jobExecutionContext);
+                    }
+                    break;
+                case OUTHANDLERSINVOKING:
+                    reInvokeOutFlowHandlers(jobExecutionContext);
+                    break;
+                case OUTHANDLERSINVOKED:
+                case COMPLETED:
+                case FAILED:
+                case UNKNOWN:
+                    log.info("All output handlers are invoked successfully, 
ExperimentId: " + experimentID + " taskId: " + 
jobExecutionContext.getTaskData().getTaskID());
+                    break;
+                default:
+                    throw new GFacException("Un-handled GfacExperimentState : 
" + state.name());
+            }
                } catch (Exception e) {
             log.error(e.getMessage(),e);
                        try {
                                // we make the experiment as failed due to 
exception scenario
                                monitorPublisher.publish(new 
GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), 
GfacExperimentState.FAILED));
-                               // monitorPublisher.publish(new
-                               // ExperimentStatusChangedEvent(new
-                               // 
ExperimentIdentity(jobExecutionContext.getExperimentID()),
-                               // ExperimentState.FAILED));
-                               // Updating the task status if there's any task 
associated
-                               // monitorPublisher.publish(new 
TaskStatusChangedEvent(
-                               // new 
TaskIdentity(jobExecutionContext.getExperimentID(),
-                               // 
jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
-                               // 
jobExecutionContext.getTaskData().getTaskID()),
-                               // TaskState.FAILED
-                               // ));
                 JobIdentifier jobIdentity = new JobIdentifier(
                         jobExecutionContext.getJobDetails().getJobID(), 
jobExecutionContext.getTaskData().getTaskID(),
                         
jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
@@ -699,9 +719,6 @@ public class BetterGfacImpl implements GFac,Watcher {
                        } catch (NullPointerException e1) {
                                log.error("Error occured during updating the 
statuses of Experiments,tasks or Job statuses to failed, "
                                                + "NullPointerException 
occurred because at this point there might not have Job Created", e1, e);
-//                             monitorPublisher
-//                                             .publish(new 
ExperimentStatusChangedEvent(new 
ExperimentIdentity(jobExecutionContext.getExperimentID()), 
ExperimentState.FAILED));
-                               // Updating the task status if there's any task 
associated
                 TaskIdentifier taskIdentity = new 
TaskIdentifier(jobExecutionContext.getTaskData().getTaskID(),
                         
jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
                         jobExecutionContext.getExperimentID(),
@@ -716,7 +733,11 @@ public class BetterGfacImpl implements GFac,Watcher {
         }
     }
 
-       private void launch(JobExecutionContext jobExecutionContext) throws 
GFacException {
+    private void monitorJob(JobExecutionContext jobExecutionContext) {
+        // TODO - Auto generated message.
+    }
+
+    private void launch(JobExecutionContext jobExecutionContext) throws 
GFacException {
                // Scheduler will decide the execution flow of handlers and 
provider
                // which handles
                // the job.

Reply via email to