Repository: airavata
Updated Branches:
  refs/heads/master e9a451dc0 -> 4a978d4f1


Saving data in ZooKeeper when an experiment is terminated


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/4a978d4f
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/4a978d4f
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/4a978d4f

Branch: refs/heads/master
Commit: 4a978d4f1347aa683189f7ea5e814d125daaecb1
Parents: e9a451d
Author: Chathuri Wimalasena <[email protected]>
Authored: Mon May 11 15:52:17 2015 -0400
Committer: Chathuri Wimalasena <[email protected]>
Committed: Mon May 11 15:52:17 2015 -0400

----------------------------------------------------------------------
 .../airavata/gfac/server/GfacServerHandler.java | 11 +++
 .../airavata/gfac/core/cpi/BetterGfacImpl.java  | 84 +++++++-------------
 2 files changed, 39 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/4a978d4f/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
 
b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
index 76497ba..b90c731 100644
--- 
a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
+++ 
b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
@@ -403,12 +403,23 @@ public class GfacServerHandler implements 
GfacService.Iface, Watcher {
                     TBase messageEvent = message.getEvent();
                     byte[] bytes = 
ThriftUtils.serializeThriftObject(messageEvent);
                     ThriftUtils.createThriftFromBytes(bytes, event);
+                    GFacUtils.setExperimentCancel(event.getExperimentId(), 
event.getTaskId(), zk);
+                    AiravataZKUtils.getExpStatePath(event.getExperimentId());
                     cancelJob(event.getExperimentId(), event.getTaskId(), 
event.getGatewayId(), event.getTokenId());
                     System.out.println(" Message Received with message id '" + 
message.getMessageId()
                             + "' and with message type '" + message.getType());
                 } catch (TException e) {
                     logger.error(e.getMessage(), e); //nobody is listening so 
nothing to throw
                     
rabbitMQTaskLaunchConsumer.sendAck(message.getDeliveryTag());
+                } catch (InterruptedException e) {
+                    logger.error(e.getMessage(), e);
+                    
rabbitMQTaskLaunchConsumer.sendAck(message.getDeliveryTag());
+                } catch (ApplicationSettingsException e) {
+                    logger.error(e.getMessage(), e);
+                    
rabbitMQTaskLaunchConsumer.sendAck(message.getDeliveryTag());
+                } catch (KeeperException e) {
+                    logger.error(e.getMessage(), e);
+                    
rabbitMQTaskLaunchConsumer.sendAck(message.getDeliveryTag());
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/airavata/blob/4a978d4f/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
 
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
index 6eeef28..dd82fa7 100644
--- 
a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
+++ 
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
@@ -564,97 +564,69 @@ public class BetterGfacImpl implements GFac,Watcher {
     }
 
     private boolean cancel(JobExecutionContext jobExecutionContext) throws 
GFacException {
-        // We need to check whether this job is submitted as a part of a large 
workflow. If yes,
-        // we need to setup workflow tracking listener.
         try {
-               // we cannot call GFacUtils.getZKExperimentStateValue because 
experiment might be running in some other node
-//            String expPath = 
GFacUtils.findExperimentEntry(jobExecutionContext.getExperimentID(), zk);
-//            int stateVal = 0;
-//            if(expPath != null){
-//            Stat exists = zk.exists(expPath + File.separator + "operation", 
false);
-//            zk.getData(expPath + File.separator + "operation", this, exists);
-//            stateVal = GFacUtils.getZKExperimentStateValue(zk, expPath);   
// this is the original state came, if we query again it might be different,so 
we preserve this state in the environment
-//            }
+            // we cannot call GFacUtils.getZKExperimentStateValue because 
experiment might be running in some other node
+            String expPath = 
GFacUtils.findExperimentEntry(jobExecutionContext.getExperimentID(), zk);
+            Stat exists = zk.exists(expPath + File.separator + "operation", 
false);
+            zk.getData(expPath + File.separator + "operation", this, exists);
+            GfacExperimentState gfacExpState = 
GFacUtils.getZKExperimentState(zk, jobExecutionContext);   // this is the 
original state came, if we query again it might be different,so we preserve 
this state in the environment
             monitorPublisher.publish(new GfacExperimentStateChangeRequest(new 
MonitorID(jobExecutionContext)
                     , GfacExperimentState.ACCEPTED));                  // 
immediately we get the request we update the status
             String workflowInstanceID = null;
             if ((workflowInstanceID = (String) 
jobExecutionContext.getProperty(Constants.PROP_WORKFLOW_INSTANCE_ID)) != null) {
-                // This mean we need to register workflow tracking listener.
                 //todo implement WorkflowTrackingListener properly
-//                registerWorkflowTrackingListener(workflowInstanceID, 
jobExecutionContext);
             }
             // Register log event listener. This is required in all scenarios.
             jobExecutionContext.getNotificationService().registerListener(new 
LoggingListener());
-//            if (stateVal < 2) {
-//                // In this scenario We do everything from the beginning
-//                log.info("Job is not yet submitted, so nothing much to do 
except changing the registry entry " +
-//                        " and stop the execution chain");
-//            } else if (stateVal >= 8) {
-//                log.error("This experiment is almost finished, so cannot 
cancel this experiment");
-//                ZKUtil.deleteRecursive(zk,
-//                        
AiravataZKUtils.getExpZnodePath(jobExecutionContext.getExperimentID(), 
jobExecutionContext.getTaskData().getTaskID()));
-//            } else {
+            if (isNewJob(gfacExpState)) {
+                log.info("Job is not yet submitted, so nothing much to do 
except changing the registry entry " +
+                        " and stop the execution chain");
+            } else if (isCompletedJob(gfacExpState)) {
+                log.error("This experiment is almost finished, so cannot 
cancel this experiment");
+                ZKUtil.deleteRecursive(zk,
+                        
AiravataZKUtils.getExpZnodePath(jobExecutionContext.getExperimentID()));
+            } else {
                 log.info("Job is in a position to perform a proper 
cancellation");
                 try {
                     Scheduler.schedule(jobExecutionContext);
-
                     invokeProviderCancel(jobExecutionContext);
-
                 } catch (Exception e) {
                     try {
                         // we make the experiment as failed due to exception 
scenario
                         monitorPublisher.publish(new 
GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), 
GfacExperimentState.FAILED));
-                        // monitorPublisher.publish(new
-                        // ExperimentStatusChangedEvent(new
-                        // 
ExperimentIdentity(jobExecutionContext.getExperimentID()),
-                        // ExperimentState.FAILED));
-                        // Updating the task status if there's any task 
associated
-                        // monitorPublisher.publish(new 
TaskStatusChangeRequest(
-                        // new 
TaskIdentity(jobExecutionContext.getExperimentID(),
-                        // 
jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
-                        // jobExecutionContext.getTaskData().getTaskID()),
-                        // TaskState.FAILED
-                        // ));
                         JobStatusChangeRequestEvent changeRequestEvent = new 
JobStatusChangeRequestEvent();
                         changeRequestEvent.setState(JobState.FAILED);
                         JobIdentifier jobIdentifier = new 
JobIdentifier(jobExecutionContext.getJobDetails().getJobID(),
-                                                                        
jobExecutionContext.getTaskData().getTaskID(),
-                                                                        
jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
-                                                                        
jobExecutionContext.getExperimentID(),
-                                                                        
jobExecutionContext.getGatewayID());
+                                jobExecutionContext.getTaskData().getTaskID(),
+                                
jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
+                                jobExecutionContext.getExperimentID(),
+                                jobExecutionContext.getGatewayID());
                         changeRequestEvent.setJobIdentity(jobIdentifier);
                         monitorPublisher.publish(changeRequestEvent);
                     } catch (NullPointerException e1) {
                         log.error("Error occured during updating the statuses 
of Experiments,tasks or Job statuses to failed, "
                                 + "NullPointerException occurred because at 
this point there might not have Job Created", e1, e);
-                        //monitorPublisher.publish(new 
ExperimentStatusChangedEvent(new 
ExperimentIdentity(jobExecutionContext.getExperimentID()), 
ExperimentState.FAILED));
                         // Updating the task status if there's any task 
associated
                         monitorPublisher.publish(new 
TaskStatusChangeRequestEvent(TaskState.FAILED,
-                                                                               
   new TaskIdentifier(jobExecutionContext.getTaskData().getTaskID(),
-                                                                               
                      
jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
-                                                                               
                      jobExecutionContext.getExperimentID(),
-                                                                               
                      jobExecutionContext.getGatewayID())));
+                                new 
TaskIdentifier(jobExecutionContext.getTaskData().getTaskID(),
+                                        
jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
+                                        jobExecutionContext.getExperimentID(),
+                                        jobExecutionContext.getGatewayID())));
 
                     }
                     jobExecutionContext.setProperty(ERROR_SENT, "true");
                     jobExecutionContext.getNotifier().publish(new 
ExecutionFailEvent(e.getCause()));
                     throw new GFacException(e.getMessage(), e);
                 }
-//            }
+            }
             return true;
-//        } catch (ApplicationSettingsException e) {
-//            log.error("Error occured while cancelling job for experiment : " 
+ jobExecutionContext.getExperimentID(), e);
-//            throw new GFacException(e.getMessage(), e);
-//        } catch (KeeperException e) {
-//            log.error("Error occured while cancelling job for experiment : " 
+ jobExecutionContext.getExperimentID(), e);
-//            throw new GFacException(e.getMessage(), e);
-        } catch (Exception e) {
-            log.error("Error occured while cancelling job for experiment : " + 
jobExecutionContext.getExperimentID(), e);
-            throw new GFacException(e.getMessage(), e);
-        }finally {
-            closeZK(jobExecutionContext);
+            }catch(Exception e){
+                log.error("Error occured while cancelling job for experiment : 
" + jobExecutionContext.getExperimentID(), e);
+                throw new GFacException(e.getMessage(), e);
+            }finally{
+                closeZK(jobExecutionContext);
+            }
         }
-    }
 
        private void reLaunch(JobExecutionContext jobExecutionContext, 
GfacExperimentState state) throws GFacException {
                // Scheduler will decide the execution flow of handlers and 
provider

Reply via email to