Repository: airavata
Updated Branches:
  refs/heads/master 4f4c79963 -> 34cd927c3


Refactor Cancel job request and remove RPC job submitter class.


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/d2108729
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/d2108729
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/d2108729

Branch: refs/heads/master
Commit: d21087291855c68c692ca53601baa28629f1e7c0
Parents: 1b84883
Author: shamrath <[email protected]>
Authored: Tue May 12 19:27:04 2015 -0400
Committer: shamrath <[email protected]>
Committed: Tue May 12 19:27:04 2015 -0400

----------------------------------------------------------------------
 .../airavata/gfac/server/GfacServerHandler.java |   2 +-
 .../airavata/gfac/core/cpi/BetterGfacImpl.java  |  62 ++----
 .../airavata/gfac/core/utils/GFacUtils.java     | 206 ++++--------------
 .../gfac/monitor/email/EmailBasedMonitor.java   |   6 +-
 .../gfac/ssh/provider/impl/SSHProvider.java     |  10 +-
 .../server/OrchestratorServerHandler.java       |   4 +-
 .../core/impl/GFACRPCJobSubmitter.java          | 212 -------------------
 7 files changed, 73 insertions(+), 429 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/d2108729/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
 
b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
index b90c731..f944d91 100644
--- 
a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
+++ 
b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
@@ -403,7 +403,7 @@ public class GfacServerHandler implements 
GfacService.Iface, Watcher {
                     TBase messageEvent = message.getEvent();
                     byte[] bytes = 
ThriftUtils.serializeThriftObject(messageEvent);
                     ThriftUtils.createThriftFromBytes(bytes, event);
-                    GFacUtils.setExperimentCancel(event.getExperimentId(), 
event.getTaskId(), zk);
+                    GFacUtils.setExperimentCancel(event.getExperimentId(), 
event.getTaskId(), zk, experimentNode, nodeName, event.getTokenId(), 
message.getDeliveryTag());
                     AiravataZKUtils.getExpStatePath(event.getExperimentId());
                     cancelJob(event.getExperimentId(), event.getTaskId(), 
event.getGatewayId(), event.getTokenId());
                     System.out.println(" Message Received with message id '" + 
message.getMessageId()

http://git-wip-us.apache.org/repos/asf/airavata/blob/d2108729/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
 
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
index 82798d1..32317f3 100644
--- 
a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
+++ 
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
@@ -220,8 +220,16 @@ public class BetterGfacImpl implements GFac,Watcher {
             StringWriter errors = new StringWriter();
             e.printStackTrace(new PrintWriter(errors));
             GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), 
CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+            // FIXME: Here we need to update the Experiment status to Failed; because 
we use a chained update approach, updating
+            // the task status will cause the Experiment status to update. Remove this 
chained update approach and fix this correctly (update the experiment status directly)
             if(jobExecutionContext!=null){
                 monitorPublisher.publish(new 
GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), 
GfacExperimentState.FAILED));
+                TaskIdentifier taskIdentity = new 
TaskIdentifier(jobExecutionContext.getTaskData().getTaskID(),
+                        
jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
+                        jobExecutionContext.getExperimentID(),
+                        jobExecutionContext.getGatewayID());
+                TaskStatusChangeRequestEvent event = new 
TaskStatusChangeRequestEvent(TaskState.FAILED, taskIdentity);
+                monitorPublisher.publish(event);
             }
             throw new GFacException(e);
         }finally {
@@ -565,10 +573,6 @@ public class BetterGfacImpl implements GFac,Watcher {
 
     private boolean cancel(JobExecutionContext jobExecutionContext) throws 
GFacException {
         try {
-            // we cannot call GFacUtils.getZKExperimentStateValue because 
experiment might be running in some other node
-            String expPath = 
GFacUtils.findExperimentEntry(jobExecutionContext.getExperimentID(), zk);
-            Stat exists = zk.exists(expPath + File.separator + "operation", 
false);
-            zk.getData(expPath + File.separator + "operation", this, exists);
             GfacExperimentState gfacExpState = 
GFacUtils.getZKExperimentState(zk, jobExecutionContext);   // this is the 
original state came, if we query again it might be different,so we preserve 
this state in the environment
             monitorPublisher.publish(new GfacExperimentStateChangeRequest(new 
MonitorID(jobExecutionContext)
                     , GfacExperimentState.ACCEPTED));                  // 
immediately we get the request we update the status
@@ -578,42 +582,14 @@ public class BetterGfacImpl implements GFac,Watcher {
             }
             // Register log event listener. This is required in all scenarios.
             jobExecutionContext.getNotificationService().registerListener(new 
LoggingListener());
-            if (isNewJob(gfacExpState)) {
-                log.info("Job is not yet submitted, so nothing much to do 
except changing the registry entry " +
-                        " and stop the execution chain");
-            } else if (isCompletedJob(gfacExpState)) {
-                log.error("This experiment is almost finished, so cannot 
cancel this experiment");
-                ZKUtil.deleteRecursive(zk,
-                        
AiravataZKUtils.getExpZnodePath(jobExecutionContext.getExperimentID()));
-            } else {
+            if (gfacExpState == GfacExperimentState.PROVIDERINVOKING) { // we 
already have changed registry status, we need to handle job canceling scenario.
                 log.info("Job is in a position to perform a proper 
cancellation");
                 try {
                     Scheduler.schedule(jobExecutionContext);
                     invokeProviderCancel(jobExecutionContext);
-                } catch (Exception e) {
-                    try {
-                        // we make the experiment as failed due to exception 
scenario
-                        monitorPublisher.publish(new 
GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), 
GfacExperimentState.FAILED));
-                        JobStatusChangeRequestEvent changeRequestEvent = new 
JobStatusChangeRequestEvent();
-                        changeRequestEvent.setState(JobState.FAILED);
-                        JobIdentifier jobIdentifier = new 
JobIdentifier(jobExecutionContext.getJobDetails().getJobID(),
-                                jobExecutionContext.getTaskData().getTaskID(),
-                                
jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
-                                jobExecutionContext.getExperimentID(),
-                                jobExecutionContext.getGatewayID());
-                        changeRequestEvent.setJobIdentity(jobIdentifier);
-                        monitorPublisher.publish(changeRequestEvent);
-                    } catch (NullPointerException e1) {
-                        log.error("Error occured during updating the statuses 
of Experiments,tasks or Job statuses to failed, "
-                                + "NullPointerException occurred because at 
this point there might not have Job Created", e1, e);
-                        // Updating the task status if there's any task 
associated
-                        monitorPublisher.publish(new 
TaskStatusChangeRequestEvent(TaskState.FAILED,
-                                new 
TaskIdentifier(jobExecutionContext.getTaskData().getTaskID(),
-                                        
jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
-                                        jobExecutionContext.getExperimentID(),
-                                        jobExecutionContext.getGatewayID())));
-
-                    }
+                } catch (GFacException e) {
+                    // we make the experiment as failed due to exception 
scenario
+                    monitorPublisher.publish(new 
GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), 
GfacExperimentState.FAILED));
                     jobExecutionContext.setProperty(ERROR_SENT, "true");
                     jobExecutionContext.getNotifier().publish(new 
ExecutionFailEvent(e.getCause()));
                     throw new GFacException(e.getMessage(), e);
@@ -794,7 +770,7 @@ public class BetterGfacImpl implements GFac,Watcher {
         GFacProvider provider = jobExecutionContext.getProvider();
         if (provider != null) {
             monitorPublisher.publish(new GfacExperimentStateChangeRequest(new 
MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKING));
-            GFacUtils.createPluginZnode(zk, jobExecutionContext, 
provider.getClass().getName());
+            GFacUtils.createHandlerZnode(zk, jobExecutionContext, 
provider.getClass().getName());
             initProvider(provider, jobExecutionContext);
             executeProvider(provider, jobExecutionContext);
             disposeProvider(provider, jobExecutionContext);
@@ -811,7 +787,7 @@ public class BetterGfacImpl implements GFac,Watcher {
         if (provider != null) {
             monitorPublisher.publish(new GfacExperimentStateChangeRequest(new 
MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKING));
             GfacHandlerState plState = GFacUtils.getHandlerState(zk, 
jobExecutionContext, provider.getClass().getName());
-            GFacUtils.createPluginZnode(zk, jobExecutionContext, 
provider.getClass().getName());
+            GFacUtils.createHandlerZnode(zk, jobExecutionContext, 
provider.getClass().getName());
             if (plState != null && plState == GfacHandlerState.INVOKING) {    
// this will make sure if a plugin crashes it will not launch from the scratch, 
but plugins have to save their invoked state
                 initProvider(provider, jobExecutionContext);
                 executeProvider(provider, jobExecutionContext);
@@ -831,14 +807,12 @@ public class BetterGfacImpl implements GFac,Watcher {
 
     }
 
-    private void invokeProviderCancel(JobExecutionContext jobExecutionContext) 
throws GFacException, ApplicationSettingsException, InterruptedException, 
KeeperException {
+    private void invokeProviderCancel(JobExecutionContext jobExecutionContext) 
throws GFacException {
         GFacProvider provider = jobExecutionContext.getProvider();
         if (provider != null) {
-            monitorPublisher.publish(new GfacExperimentStateChangeRequest(new 
MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKING));
             initProvider(provider, jobExecutionContext);
             cancelProvider(provider, jobExecutionContext);
             disposeProvider(provider, jobExecutionContext);
-            monitorPublisher.publish(new GfacExperimentStateChangeRequest(new 
MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKED));
         }
         if (GFacUtils.isSynchronousMode(jobExecutionContext)) {
             invokeOutFlowHandlers(jobExecutionContext);
@@ -851,7 +825,7 @@ public class BetterGfacImpl implements GFac,Watcher {
         if (provider != null) {
             monitorPublisher.publish(new GfacExperimentStateChangeRequest(new 
MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKING));
             GfacHandlerState plState = GFacUtils.getHandlerState(zk, 
jobExecutionContext, provider.getClass().getName());
-            GFacUtils.createPluginZnode(zk, jobExecutionContext, 
provider.getClass().getName());
+            GFacUtils.createHandlerZnode(zk, jobExecutionContext, 
provider.getClass().getName());
             if (plState == GfacHandlerState.UNKNOWN || plState == 
GfacHandlerState.INVOKING) {    // this will make sure if a plugin crashes it 
will not launch from the scratch, but plugins have to save their invoked state
                 initProvider(provider, jobExecutionContext);
                 cancelProvider(provider, jobExecutionContext);
@@ -923,7 +897,7 @@ public class BetterGfacImpl implements GFac,Watcher {
                     Class<? extends GFacHandler> handlerClass;
                     GFacHandler handler;
                     try {
-                        GFacUtils.createPluginZnode(zk, jobExecutionContext, 
handlerClassName.getClassName());
+                        GFacUtils.createHandlerZnode(zk, jobExecutionContext, 
handlerClassName.getClassName());
                         handlerClass = 
Class.forName(handlerClassName.getClassName().trim()).asSubclass(GFacHandler.class);
                         handler = handlerClass.newInstance();
                         
handler.initProperties(handlerClassName.getProperties());
@@ -1001,7 +975,7 @@ public class BetterGfacImpl implements GFac,Watcher {
                         Class<? extends GFacHandler> handlerClass;
                         GFacHandler handler;
                         try {
-                            
GFacUtils.createPluginZnode(jobExecutionContext.getZk(), jobExecutionContext, 
handlerClassName.getClassName());
+                            
GFacUtils.createHandlerZnode(jobExecutionContext.getZk(), jobExecutionContext, 
handlerClassName.getClassName());
                             handlerClass = 
Class.forName(handlerClassName.getClassName().trim()).asSubclass(GFacHandler.class);
                             handler = handlerClass.newInstance();
                             
handler.initProperties(handlerClassName.getProperties());

http://git-wip-us.apache.org/repos/asf/airavata/blob/d2108729/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
 
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
index 9861cdc..4cd850d 100644
--- 
a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
+++ 
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
@@ -930,8 +930,8 @@ public class GFacUtils {
         return -1;
     }
 
-       public static boolean createPluginZnode(ZooKeeper zk,
-                       JobExecutionContext jobExecutionContext, String 
className)
+       public static boolean createHandlerZnode(ZooKeeper zk,
+                                             JobExecutionContext 
jobExecutionContext, String className)
                        throws ApplicationSettingsException, KeeperException,
                        InterruptedException {
                String expState = AiravataZKUtils.getExpZnodeHandlerPath(
@@ -1048,129 +1048,22 @@ public class GFacUtils {
        }
 
        // This method is dangerous because of moving the experiment data
-       public static boolean createExperimentEntryForRPC(String experimentID,
-                                                                               
                          String taskID, ZooKeeper zk, String experimentNode,
-                                                                               
                          String pickedChild, String tokenId) throws 
KeeperException,
-                       InterruptedException {
-               String experimentPath = experimentNode + File.separator + 
pickedChild;
-               String newExpNode = experimentPath + File.separator + 
experimentID;
-        Stat exists1 = zk.exists(newExpNode, false);
-        String experimentEntry = GFacUtils.findExperimentEntry(experimentID, 
zk);
-        String foundExperimentPath = null;
-               if (exists1 == null && experimentEntry == null) {  // this 
means this is a very new experiment
-                       List<String> runningGfacNodeNames = AiravataZKUtils
-                                       .getAllGfacNodeNames(zk); // here we 
take old gfac servers
-                                                                               
                // too
-                       for (String gfacServerNode : runningGfacNodeNames) {
-                               if (!gfacServerNode.equals(pickedChild)) {
-                                       foundExperimentPath = experimentNode + 
File.separator
-                                                       + gfacServerNode + 
File.separator + experimentID;
-                                       exists1 = 
zk.exists(foundExperimentPath, false);
-                                       if (exists1 != null) { // when the 
experiment is found we
-                                                                               
        // break the loop
-                                               break;
-                                       }
-                               }
-                       }
-                       if (exists1 == null) { // OK this is a pretty new 
experiment so we
-                                                                       // are 
going to create a new node
-                               log.info("This is a new Job, so creating all 
the experiment docs from the scratch");
-                               zk.create(newExpNode, new byte[0], 
ZooDefs.Ids.OPEN_ACL_UNSAFE,
-                                               CreateMode.PERSISTENT);
-
-                               Stat expParent = zk.exists(newExpNode, false);
-                               if (tokenId != null && expParent != null) {
-                                       zk.setData(newExpNode, 
tokenId.getBytes(),
-                                                       expParent.getVersion());
-                               }
-                               zk.create(newExpNode + File.separator + 
"state", String
-                                               
.valueOf(GfacExperimentState.LAUNCHED.getValue())
-                                               .getBytes(), 
ZooDefs.Ids.OPEN_ACL_UNSAFE,
-                                               CreateMode.PERSISTENT);
-                zk.create(newExpNode + File.separator + 
"operation","submit".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
-                        CreateMode.PERSISTENT);
-
-                       } else {
-                               // ohhh this node exists in some other failed 
gfac folder, we
-                               // have to move it to this gfac experiment 
list,safely
-                               log.info("This is an old Job, so copying data 
from old experiment location");
-                               zk.create(newExpNode,
-                                               zk.getData(foundExperimentPath, 
false, exists1),
-                                               ZooDefs.Ids.OPEN_ACL_UNSAFE, 
CreateMode.PERSISTENT);
-
-                               List<String> children = 
zk.getChildren(foundExperimentPath,
-                                               false);
-                               for (String childNode1 : children) {
-                                       String level1 = foundExperimentPath + 
File.separator
-                                                       + childNode1;
-                                       Stat exists2 = zk.exists(level1, 
false); // no need to check
-                                                                               
                                                // exists
-                                       String newLeve1 = newExpNode + 
File.separator + childNode1;
-                                       log.info("Creating new znode: " + 
newLeve1); // these has to
-                                                                               
                                                        // be info
-                                                                               
                                                        // logs
-                                       zk.create(newLeve1, zk.getData(level1, 
false, exists2),
-                                                       
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-                                       for (String childNode2 : 
zk.getChildren(level1, false)) {
-                                               String level2 = level1 + 
File.separator + childNode2;
-                                               Stat exists3 = 
zk.exists(level2, false); // no need to
-                                                                               
                                                        // check
-                                                                               
                                                        // exists
-                                               String newLeve2 = newLeve1 + 
File.separator
-                                                               + childNode2;
-                                               log.info("Creating new znode: " 
+ newLeve2);
-                                               zk.create(newLeve2, 
zk.getData(level2, false, exists3),
-                                                               
ZooDefs.Ids.OPEN_ACL_UNSAFE,
-                                                               
CreateMode.PERSISTENT);
-                                       }
-                               }
-                               // After all the files are successfully 
transfered we delete the
-                               // old experiment,otherwise we do
-                               // not delete a single file
-                               log.info("After a successful copying of 
experiment data for an old experiment we delete the old data");
-                               log.info("Deleting experiment data: " + 
foundExperimentPath);
-                               ZKUtil.deleteRecursive(zk, foundExperimentPath);
-                       }
-               }else if(experimentEntry != null && 
GFacUtils.isCancelled(experimentID,zk) ){
-            // this happens when a cancel request comes to a differnt gfac 
node, in this case we do not move gfac experiment
-            // node to gfac node specific location, because original request 
execution will fail with errors
-            log.error("This experiment is already cancelled and its already 
executing the cancel operation so cannot submit again !");
-            return false;
-        } else {
-            log.error("ExperimentID: " + experimentID + " taskID: " + taskID
-                    + " is already running by this Gfac instance");
-            List<String> runningGfacNodeNames = AiravataZKUtils
-                    .getAllGfacNodeNames(zk); // here we take old gfac servers
-            // too
-            for (String gfacServerNode : runningGfacNodeNames) {
-                if (!gfacServerNode.equals(pickedChild)) {
-                    foundExperimentPath = experimentNode + File.separator
-                            + gfacServerNode + File.separator + experimentID;
-                    break;
-                }
-            }
-            ZKUtil.deleteRecursive(zk, foundExperimentPath);
-        }
-        return true;
-       }
-
-       // This method is dangerous because of moving the experiment data
        public static boolean createExperimentEntryForPassive(String 
experimentID,
                                                                                
                                  String taskID, ZooKeeper zk, String 
experimentNode,
                                                                                
                                  String pickedChild, String tokenId, long 
deliveryTag) throws KeeperException,
                        InterruptedException, ApplicationSettingsException {
                String experimentPath = experimentNode + File.separator + 
pickedChild;
-               String newExpNode = experimentPath + File.separator + 
experimentID;
-               Stat exists1 = zk.exists(newExpNode, false);
-               String experimentEntry = 
GFacUtils.findExperimentEntry(experimentID, zk);
-               if (experimentEntry == null) {  // this means this is a very 
new experiment
+               String newExperimentPath = experimentPath + File.separator + 
experimentID;
+               Stat exists1 = zk.exists(newExperimentPath, false);
+               String oldExperimentPath = 
GFacUtils.findExperimentEntry(experimentID, zk);
+               if (oldExperimentPath == null) {  // this means this is a very 
new experiment
                        // are going to create a new node
                        log.info("This is a new Job, so creating all the 
experiment docs from the scratch");
 
-                       zk.create(newExpNode, new byte[0], 
ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                       zk.create(newExperimentPath, new byte[0], 
ZooDefs.Ids.OPEN_ACL_UNSAFE,
                                        CreateMode.PERSISTENT);
 
-            String s = zk.create(newExpNode + File.separator + "state", String
+            String s = zk.create(newExperimentPath + File.separator + "state", 
String
                                                        
.valueOf(GfacExperimentState.LAUNCHED.getValue())
                                                        .getBytes(), 
ZooDefs.Ids.OPEN_ACL_UNSAFE,
                                        CreateMode.PERSISTENT);
@@ -1180,65 +1073,49 @@ public class GFacUtils {
                        }else{
                                log.error("Error creating node: "+s+" 
successfully !");
                        }
-
-                       String s1 = zk.create(newExpNode + File.separator + 
"operation", "submit".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
-                                       CreateMode.PERSISTENT);
-                       zk.exists(s1, false);// we want to know when this node 
get deleted
-                       zk.create(newExpNode + 
AiravataZKUtils.DELIVERY_TAG_POSTFIX, longToBytes(deliveryTag), 
ZooDefs.Ids.OPEN_ACL_UNSAFE,  // here we store the value of delivery message
+                       zk.create(newExperimentPath + 
AiravataZKUtils.DELIVERY_TAG_POSTFIX, longToBytes(deliveryTag), 
ZooDefs.Ids.OPEN_ACL_UNSAFE,  // here we store the value of delivery message
                                        CreateMode.PERSISTENT);
                } else {
                        log.error("ExperimentID: " + experimentID + " taskID: " 
+ taskID
                                        + " was running by some Gfac 
instance,but it failed");
-                       if(newExpNode.equals(experimentEntry)){
+                       if(newExperimentPath.equals(oldExperimentPath)){
                                log.info("Re-launch experiment came to the same 
GFac instance");
                        }else {
                                log.info("Re-launch experiment came to a new 
GFac instance so we are moving data to new gfac node");
-                               zk.create(newExpNode,
-                                               zk.getData(experimentEntry, 
false, exists1),
-                                               ZooDefs.Ids.OPEN_ACL_UNSAFE, 
CreateMode.PERSISTENT);
-
-                               List<String> children = 
zk.getChildren(experimentEntry,
-                                               false);
-                               for (String childNode1 : children) {
-                                       String level1 = experimentEntry + 
File.separator
-                                                       + childNode1;
-                                       Stat exists2 = zk.exists(level1, 
false); // no need to check exists
-                                       String newLeve1 = newExpNode + 
File.separator + childNode1;
-                                       log.info("Creating new znode: " + 
newLeve1); // these has to be info logs
-                                       zk.create(newLeve1, zk.getData(level1, 
false, exists2),
-                                                       
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-                                       for (String childNode2 : 
zk.getChildren(level1, false)) {
-                                               String level2 = level1 + 
File.separator + childNode2;
-                                               Stat exists3 = 
zk.exists(level2, false); // no need to check exists
-                                               String newLeve2 = newLeve1 + 
File.separator
-                                                               + childNode2;
-                                               log.info("Creating new znode: " 
+ newLeve2);
-                                               zk.create(newLeve2, 
zk.getData(level2, false, exists3),
-                                                               
ZooDefs.Ids.OPEN_ACL_UNSAFE,
-                                                               
CreateMode.PERSISTENT);
-                                       }
-                               }
-
-
-                               String oldDeliveryTag = experimentEntry + 
AiravataZKUtils.DELIVERY_TAG_POSTFIX;
+                               zk.create(newExperimentPath, 
zk.getData(oldExperimentPath, false, exists1),
+                                               ZooDefs.Ids.OPEN_ACL_UNSAFE, 
CreateMode.PERSISTENT); // recursively copy children
+                copyChildren(zk, oldExperimentPath, newExperimentPath, 2); // 
we need to copy children up to depth 2
+                               String oldDeliveryTag = oldExperimentPath + 
AiravataZKUtils.DELIVERY_TAG_POSTFIX;
                                Stat exists = zk.exists(oldDeliveryTag, false);
                                if(exists!=null) {
-                                       zk.create(newExpNode + 
AiravataZKUtils.DELIVERY_TAG_POSTFIX,
+                                       zk.create(newExperimentPath + 
AiravataZKUtils.DELIVERY_TAG_POSTFIX,
                                                        
zk.getData(oldDeliveryTag,null,exists),ZooDefs.Ids.OPEN_ACL_UNSAFE, 
CreateMode.PERSISTENT);
                                        
ZKUtil.deleteRecursive(zk,oldDeliveryTag);
                                }
-                               // After all the files are successfully 
transfered we delete the
-                               // old experiment,otherwise we do
+                               // After all the files are successfully 
transferred we delete the old experiment; otherwise we do
                                // not delete a single file
                                log.info("After a successful copying of 
experiment data for an old experiment we delete the old data");
-                               log.info("Deleting experiment data: " + 
experimentEntry);
-                               ZKUtil.deleteRecursive(zk, experimentEntry);
+                               log.info("Deleting experiment data: " + 
oldExperimentPath);
+                               ZKUtil.deleteRecursive(zk, oldExperimentPath);
                        }
                }
                return true;
        }
 
-       /**
+    private static void copyChildren(ZooKeeper zk, String oldPath, String 
newPath, int depth) throws KeeperException, InterruptedException {
+        for (String childNode : zk.getChildren(oldPath, false)) {
+            String oldChildPath = oldPath + File.separator + childNode;
+            Stat stat = zk.exists(oldChildPath, false); // no need to check 
exists
+            String newChildPath = newPath + File.separator + childNode;
+            log.info("Creating new znode: " + newChildPath);
+            zk.create(newChildPath, zk.getData(oldChildPath, false, stat), 
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            if (--depth > 0) {
+                copyChildren(zk , oldChildPath, newChildPath, depth );
+            }
+        }
+    }
+
+    /**
         * This will return a value if the server is down because we iterate 
through existing experiment nodes, not
         * through gfac-server nodes
         * @param experimentID
@@ -1292,12 +1169,21 @@ public class GFacUtils {
                return null;
        }
 
-    public static void setExperimentCancel(String experimentId,String 
taskId,ZooKeeper zk)throws KeeperException,
+    public static void setExperimentCancel(String experimentId, String taskId, 
ZooKeeper zk, String experimentNode,
+                                           String pickedChild, String tokenId, 
long deliveryTag)throws KeeperException,
             InterruptedException {
+        // TODO : remove this if all went well
+ /*       String experimentPath = experimentNode + File.separator + 
pickedChild;
+        String newExpNode = experimentPath + File.separator + experimentId;
         String experimentEntry = GFacUtils.findExperimentEntry(experimentId, 
zk);
-        if(experimentEntry == null){
-            log.error("Cannot find the experiment Entry, so cancel operation 
cannot be performed !!!");
-        }else {
+        if (experimentEntry == null) {
+            // This should be handled in the validation request. GFac shouldn't get 
any invalid experiment.
+            log.error("Cannot find the experiment Entry, so cancel operation 
cannot be performed. " +
+                    "This happen when experiment completed and already removed 
from the zookeeper");
+        } else {
+            if (newExpNode.equals(experimentEntry)) {
+                log.info("Cancel experiment come to ");
+            }
             Stat operation = zk.exists(experimentEntry + File.separator + 
"operation", false);
             if (operation == null) { // if there is no entry, this will come 
when a user immediately cancel a job
                 zk.create(experimentEntry + File.separator + "operation", 
"cancel".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
@@ -1305,7 +1191,7 @@ public class GFacUtils {
             } else { // if user submit the job to gfac then cancel during 
execution
                 zk.setData(experimentEntry + File.separator + "operation", 
"cancel".getBytes(), operation.getVersion());
             }
-        }
+        }*/
 
     }
     public static boolean isCancelled(String experimentID, ZooKeeper zk

http://git-wip-us.apache.org/repos/asf/airavata/blob/d2108729/modules/gfac/gfac-monitor/gfac-email-monitor/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-monitor/gfac-email-monitor/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
 
b/modules/gfac/gfac-monitor/gfac-email-monitor/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
index 6a294af..0c94daa 100644
--- 
a/modules/gfac/gfac-monitor/gfac-email-monitor/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
+++ 
b/modules/gfac/gfac-monitor/gfac-email-monitor/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java
@@ -285,17 +285,17 @@ public class EmailBasedMonitor implements Runnable{
             log.info("[EJM]: Job failed email received , removed job from job 
monitoring. " + jobDetails);
         }else if (resultState == JobState.CANCELED) {
             jobMonitorMap.remove(jobStatusResult.getJobId());
-            runOutHandlers = true;
+            runOutHandlers = false; // Do we need to run out handlers in 
canceled case?
             log.info("[EJM]: Job canceled mail received, removed job from job 
monitoring. " + jobDetails);
 
         }
+        log.info("[EJM]: Publishing status changes to amqp. " + jobDetails);
+        publishJobStatusChange(jEC);
 
         if (runOutHandlers) {
             log.info("[EJM]: Calling Out Handler chain of " + jobDetails);
             GFacThreadPoolExecutor.getCachedThreadPool().execute(new 
OutHandlerWorker(jEC, BetterGfacImpl.getMonitorPublisher()));
         }
-        log.info("[EJM]: Publishing status changes to amqp. " + jobDetails);
-        publishJobStatusChange(jEC);
     }
 
     private void publishJobStatusChange(JobExecutionContext 
jobExecutionContext) {

http://git-wip-us.apache.org/repos/asf/airavata/blob/d2108729/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
 
b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
index 485029f..ca24502 100644
--- 
a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
+++ 
b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
@@ -243,22 +243,20 @@ public class SSHProvider extends AbstractProvider {
             }
             // This installed path is a mandatory field, because this could 
change based on the computing resource
             if (jobDetails == null) {
-                log.error("There is not JobDetails so cancelations cannot 
perform !!!");
+                log.error("There are no JobDetails, so the cancel request can't be 
performed !!!");
                 return;
             }
             try {
                 if (jobDetails.getJobID() != null) {
                     cluster.cancelJob(jobDetails.getJobID());
+                    GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, 
JobState.CANCELED);
                 } else {
                     log.error("No Job Id is set, so cannot perform the cancel 
operation !!!");
-                    return;
+                    throw new GFacProviderException("Cancel request failed to 
cancel job as JobId is null in Job Execution Context");
                 }
-                GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, 
JobState.CANCELED);
             } catch (SSHApiException e) {
                 String error = "Error submitting the job to host " + 
jobExecutionContext.getHostName() + " message: " + e.getMessage();
                 log.error(error);
-                jobDetails.setJobID("none");
-                GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, 
JobState.FAILED);
                 StringWriter errors = new StringWriter();
                 e.printStackTrace(new PrintWriter(errors));
                 GFacUtils.saveErrorDetails(jobExecutionContext, 
errors.toString(), CorrectiveAction.CONTACT_SUPPORT, 
ErrorCategory.AIRAVATA_INTERNAL_ERROR);
@@ -266,8 +264,6 @@ public class SSHProvider extends AbstractProvider {
             } catch (Exception e) {
                 String error = "Error submitting the job to host " + 
jobExecutionContext.getHostName() + " message: " + e.getMessage();
                 log.error(error);
-                jobDetails.setJobID("none");
-                GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, 
JobState.FAILED);
                 StringWriter errors = new StringWriter();
                 e.printStackTrace(new PrintWriter(errors));
                 GFacUtils.saveErrorDetails(jobExecutionContext, 
errors.toString(), CorrectiveAction.CONTACT_SUPPORT, 
ErrorCategory.AIRAVATA_INTERNAL_ERROR);

http://git-wip-us.apache.org/repos/asf/airavata/blob/d2108729/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
 
b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
index ff6eab1..3da1e47 100644
--- 
a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
+++ 
b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
@@ -566,7 +566,7 @@ public class OrchestratorServerHandler implements 
OrchestratorService.Iface,
                         taskDetails.setTaskStatus(taskStatus);
                         registry.update(RegistryModelType.TASK_DETAIL, o,
                                 taskDetails);
-                        GFacUtils.setExperimentCancel(experimentId, 
taskDetails.getTaskID(), zk);
+//                        GFacUtils.setExperimentCancel(experimentId, 
taskDetails.getTaskID(), zk, experimentNode, nodeName, event.getTokenId(), 
message.getDeliveryTag());
                     }
                 }
             }else {
@@ -617,7 +617,7 @@ public class OrchestratorServerHandler implements 
OrchestratorService.Iface,
                             taskDetails.setTaskStatus(taskStatus);
                             registry.update(RegistryModelType.TASK_DETAIL, o,
                                     taskDetails.getTaskID());
-                            GFacUtils.setExperimentCancel(experimentId, 
taskDetails.getTaskID(), zk);
+//                            GFacUtils.setExperimentCancel(experimentId, 
taskDetails.getTaskID(), zk, experimentNode, nodeName, event.getTokenId(), 
message.getDeliveryTag());
                         }
                        // iterate through all the generated tasks and 
perform the
                        // job submission + monitoring

http://git-wip-us.apache.org/repos/asf/airavata/blob/d2108729/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACRPCJobSubmitter.java
----------------------------------------------------------------------
diff --git 
a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACRPCJobSubmitter.java
 
b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACRPCJobSubmitter.java
deleted file mode 100644
index 64ced47..0000000
--- 
a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACRPCJobSubmitter.java
+++ /dev/null
@@ -1,212 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.airavata.orchestrator.core.impl;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.List;
-import java.util.Random;
-
-import org.apache.airavata.common.exception.ApplicationSettingsException;
-import org.apache.airavata.common.utils.AiravataZKUtils;
-import org.apache.airavata.common.utils.Constants;
-import org.apache.airavata.common.utils.ServerSettings;
-import org.apache.airavata.credential.store.store.CredentialReader;
-import org.apache.airavata.gfac.client.GFACInstance;
-import org.apache.airavata.gfac.client.GFacClientFactory;
-import org.apache.airavata.gfac.core.utils.GFacUtils;
-import org.apache.airavata.gfac.cpi.GfacService;
-import org.apache.airavata.orchestrator.core.context.OrchestratorContext;
-import org.apache.airavata.orchestrator.core.exception.OrchestratorException;
-import org.apache.airavata.orchestrator.core.job.JobSubmitter;
-import org.apache.thrift.TException;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/*
- * this class is responsible for submitting a job to gfac in service mode,
- * it will select a gfac instance based on the incoming request and submit to 
that
- * gfac instance.
- */
-public class GFACRPCJobSubmitter implements JobSubmitter, Watcher {
-       private final static Logger logger = 
LoggerFactory.getLogger(GFACRPCJobSubmitter.class);
-       public static final String IP = "ip";
-
-       private OrchestratorContext orchestratorContext;
-
-       private static Integer mutex = -1;
-
-       public void initialize(OrchestratorContext orchestratorContext) throws 
OrchestratorException {
-               this.orchestratorContext = orchestratorContext;
-       }
-
-       public GFACInstance selectGFACInstance() throws OrchestratorException {
-               // currently we only support one instance but future we have to 
pick an
-               // instance
-               return null;
-       }
-
-       public boolean submit(String experimentID, String taskID) throws 
OrchestratorException {
-               return this.submit(experimentID, taskID, null);
-       }
-
-       public boolean submit(String experimentID, String taskID, String 
tokenId) throws OrchestratorException {
-               ZooKeeper zk = orchestratorContext.getZk();
-        GfacService.Client gfacClient = null;
-               try {
-                       if (zk == null || !zk.getState().isConnected()) {
-                               String zkhostPort = 
AiravataZKUtils.getZKhostPort();
-                               zk = new ZooKeeper(zkhostPort, 
AiravataZKUtils.getZKTimeout(), this);
-                               synchronized (mutex) {
-                                       mutex.wait();
-                               }
-                       }
-                       String gfacServer = 
ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NODE, "/gfac-server");
-                       String experimentNode = 
ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, 
"/gfac-experiments");
-                       List<String> children = zk.getChildren(gfacServer, 
this);
-                       
-                       if (children.size() == 0) {
-                // Zookeeper data need cleaning
-                throw new OrchestratorException("There is no active GFac 
instance to route the request");
-            } else {
-                               String pickedChild = children.get(new 
Random().nextInt(Integer.MAX_VALUE) % children.size());
-                               // here we are not using an index because the 
getChildren does not return the same order everytime
-                               String gfacNodeData = new 
String(zk.getData(gfacServer + File.separator + pickedChild, false, null));
-                               logger.info("GFAC instance node data: " + 
gfacNodeData);
-                               String[] split = gfacNodeData.split(":");
-                               gfacClient = 
GFacClientFactory.createGFacClient(split[0], Integer.parseInt(split[1]));
-                               if (zk.exists(gfacServer + File.separator + 
pickedChild, false) != null) {
-                                       // before submitting the job we check 
again the state of the node
-                                       if 
(GFacUtils.createExperimentEntryForRPC(experimentID, taskID, zk, 
experimentNode, pickedChild, tokenId)) {
-                                                String gatewayId = null;
-                        CredentialReader credentialReader = 
GFacUtils.getCredentialReader();
-                         if (credentialReader != null) {
-                             try {
-                                gatewayId = 
credentialReader.getGatewayID(tokenId);
-                             } catch (Exception e) {
-                                 logger.error(e.getLocalizedMessage());
-                             }
-                         }
-                        if(gatewayId == null || gatewayId.isEmpty()){
-                         gatewayId = ServerSettings.getDefaultUserGateway();
-                        }
-                                               return 
gfacClient.submitJob(experimentID, taskID, gatewayId);
-                                       }
-                               }
-                       }
-               } catch (TException e) {
-            logger.error(e.getMessage(), e);
-                       throw new OrchestratorException(e);
-               } catch (InterruptedException e) {
-            logger.error(e.getMessage(), e);
-            throw new OrchestratorException(e);
-               } catch (KeeperException e) {
-            logger.error(e.getMessage(), e);
-            throw new OrchestratorException(e);
-               } catch (ApplicationSettingsException e) {
-            logger.error(e.getMessage(), e);
-            throw new OrchestratorException(e);
-               } catch (IOException e) {
-            logger.error(e.getMessage(), e);
-            throw new OrchestratorException(e);
-               } catch (Exception e) {
-            logger.error(e.getMessage(), e);
-            throw new OrchestratorException(e);
-               }finally {
-            gfacClient.getOutputProtocol().getTransport().close();
-        }
-        return false;
-       }
-
-    public boolean terminate(String experimentID, String taskID, String 
tokenId) throws OrchestratorException {
-        ZooKeeper zk = orchestratorContext.getZk();
-        GfacService.Client localhost = null;
-        try {
-            if (zk == null || !zk.getState().isConnected()) {
-                String zkhostPort = AiravataZKUtils.getZKhostPort();
-                zk = new ZooKeeper(zkhostPort, AiravataZKUtils.getZKTimeout(), 
this);
-                synchronized (mutex) {
-                    mutex.wait();
-                }
-            }
-            String gfacServer = 
ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NODE, "/gfac-server");
-            String experimentNode = 
ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, 
"/gfac-experiments");
-            List<String> children = zk.getChildren(gfacServer, this);
-
-            if (children.size() == 0) {
-                // Zookeeper data need cleaning
-                throw new OrchestratorException("There is no active GFac 
instance to route the request");
-            } else {
-                String pickedChild = children.get(new 
Random().nextInt(Integer.MAX_VALUE) % children.size());
-                // here we are not using an index because the getChildren does 
not return the same order everytime
-                String gfacNodeData = new String(zk.getData(gfacServer + 
File.separator + pickedChild, false, null));
-                logger.info("GFAC instance node data: " + gfacNodeData);
-                String[] split = gfacNodeData.split(":");
-                localhost = GFacClientFactory.createGFacClient(split[0], 
Integer.parseInt(split[1]));
-                if (zk.exists(gfacServer + File.separator + pickedChild, 
false) != null) {
-                    // before submitting the job we check again the state of 
the node
-                    if (GFacUtils.createExperimentEntryForRPC(experimentID, 
taskID, zk, experimentNode, pickedChild, null)) {
-                        return localhost.cancelJob(experimentID, taskID);
-                    }
-                }
-            }
-        } catch (TException e) {
-            logger.error(e.getMessage(), e);
-            throw new OrchestratorException(e);
-        } catch (InterruptedException e) {
-            logger.error(e.getMessage(), e);
-            throw new OrchestratorException(e);
-        } catch (KeeperException e) {
-            logger.error(e.getMessage(), e);
-            throw new OrchestratorException(e);
-        } catch (ApplicationSettingsException e) {
-            logger.error(e.getMessage(), e);
-            throw new OrchestratorException(e);
-        } catch (IOException e) {
-            logger.error(e.getMessage(), e);
-            throw new OrchestratorException(e);
-        } catch (Exception e) {
-            logger.error(e.getMessage(), e);
-            throw new OrchestratorException(e);
-        }finally {
-
-        }
-        return false;
-    }
-
-    synchronized public void process(WatchedEvent event) {
-               synchronized (mutex) {
-                       switch (event.getState()) {
-                       case SyncConnected:
-                               mutex.notify();
-                       }
-                       switch (event.getType()) {
-                       case NodeCreated:
-                               mutex.notify();
-                               break;
-                       }
-               }
-       }
-}

Reply via email to