Fixed AIRAVATA-1888

Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/5ce750d5
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/5ce750d5
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/5ce750d5

Branch: refs/heads/develop
Commit: 5ce750d53bc4e8def467b9024c491a463e62203d
Parents: fc3f979
Author: Shameera Rathnayaka <[email protected]>
Authored: Wed Jan 6 14:15:35 2016 -0500
Committer: Shameera Rathnayaka <[email protected]>
Committed: Wed Jan 6 14:15:35 2016 -0500

----------------------------------------------------------------------
 .../airavata/gfac/impl/GFacEngineImpl.java      |  3 +-
 .../apache/airavata/gfac/impl/GFacWorker.java   | 32 +++-----------
 .../airavata/gfac/server/GfacServerHandler.java | 44 +++++++++++++-------
 3 files changed, 38 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/5ce750d5/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
index 9e6a522..dd10c12 100644
--- 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
+++ 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
@@ -509,8 +509,9 @@ public class GFacEngineImpl implements GFacEngine {
         processContext.setTaskExecutionOrder(taskExecutionOrder);
         Map<String, TaskModel> taskMap = processContext.getTaskMap();
         String recoverTaskId = null;
+        TaskModel taskModel = null;
         for (String taskId : taskExecutionOrder) {
-            TaskModel taskModel = taskMap.get(taskId);
+            taskModel = taskMap.get(taskId);
             TaskState state = taskModel.getTaskStatus().getState();
             if (state == TaskState.CREATED || state == TaskState.EXECUTING) {
                 recoverTaskId = taskId;

http://git-wip-us.apache.org/repos/asf/airavata/blob/5ce750d5/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
index c701ed5..e0664a5 100644
--- 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
+++ 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
@@ -78,14 +78,6 @@ public class GFacWorker implements Runnable {
        @Override
        public void run() {
                try {
-                       if (processContext.isInterrupted()) {
-                               
GFacUtils.handleProcessInterrupt(processContext);
-                               if (processContext.isCancel()) {
-                                       sendAck();
-                                       
Factory.getGfacContext().removeProcess(processContext.getProcessId());
-                               }
-                               return;
-                       }
                        ProcessState processState = 
processContext.getProcessStatus().getState();
                        switch (processState) {
                                case CREATED:
@@ -206,7 +198,13 @@ public class GFacWorker implements Runnable {
     }
 
        private void executeProcess() throws GFacException {
+               // checkpoint
+               if (processContext.isInterrupted()) {
+                       return;
+               }
+
                engine.executeProcess(processContext);
+               // checkpoint
                if (processContext.isInterrupted()) {
                        return;
                }
@@ -216,24 +214,6 @@ public class GFacWorker implements Runnable {
         }
        }
 
-//     private void monitorProcess() throws GFacException {
-//             try {
-//                     JobMonitor monitorService = 
Factory.getMonitorService(processContext.getMonitorMode());
-//                     if (monitorService != null) {
-//                             
monitorService.monitor(processContext.getJobModel().getJobId(), processContext);
-//                ProcessStatus status = new 
ProcessStatus(ProcessState.MONITORING);
-//                
status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
-//                processContext.setProcessStatus(status);
-//                             
GFacUtils.saveAndPublishProcessStatus(processContext);
-//                     } else {
-//                             // we directly invoke outflow
-//                             continueTaskExecution();
-//                     }
-//             } catch (AiravataException e) {
-//                     throw new GFacException("Error while retrieving moniot 
service", e);
-//             }
-//     }
-
        private void sendAck() {
                // this ensure, gfac doesn't send ack more than once for a 
process. which cause to remove gfac rabbitmq consumer from rabbitmq server.
                if (!processContext.isAcknowledge()) {

http://git-wip-us.apache.org/repos/asf/airavata/blob/5ce750d5/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
 
b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
index c1c08a5..10da052 100644
--- 
a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
+++ 
b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
@@ -28,10 +28,8 @@ import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.common.utils.ThriftUtils;
 import org.apache.airavata.common.utils.ZkConstants;
 import org.apache.airavata.common.utils.listener.AbstractActivityListener;
-import org.apache.airavata.gfac.core.GFacConstants;
 import org.apache.airavata.gfac.core.GFacException;
 import org.apache.airavata.gfac.core.GFacUtils;
-import org.apache.airavata.gfac.core.watcher.CancelRequestWatcher;
 import org.apache.airavata.gfac.cpi.GfacService;
 import org.apache.airavata.gfac.cpi.gfac_cpiConstants;
 import org.apache.airavata.gfac.impl.Factory;
@@ -250,7 +248,13 @@ public class GfacServerHandler implements 
GfacService.Iface {
                        publishProcessStatus(event, status);
                     try {
                            createProcessZKNode(curatorClient, gfacServerName, 
event, message);
-                           submitProcess(event.getProcessId(), 
event.getGatewayId(), event.getTokenId());
+                        boolean isCancel = setCancelWatcher(curatorClient, 
event.getExperimentId(), event.getProcessId());
+                        submitProcess(event.getProcessId(), 
event.getGatewayId(), event.getTokenId());
+                        if (isCancel) {
+                            // Need to trigger the process cancel watcher: wait till the process recovers and then set the zk data.
+                            Thread.sleep(10000);
+                            setCancelData(event.getExperimentId());
+                        }
                     } catch (Exception e) {
                         log.error(e.getMessage(), e);
                         
rabbitMQProcessLaunchConsumer.sendAck(message.getDeliveryTag());
@@ -262,7 +266,9 @@ public class GfacServerHandler implements GfacService.Iface 
{
                 } catch (AiravataException e) {
                        log.error("Error while publishing process status", e);
                 }
-            } else if (message.getType().equals(MessageType.TERMINATEPROCESS)) 
{
+            }
+            // TODO - Process termination messages are no longer sent; ZooKeeper is used instead of RabbitMQ for that, so it is safe to remove this else part.
+            else if (message.getType().equals(MessageType.TERMINATEPROCESS)) {
                 ProcessTerminateEvent event = new ProcessTerminateEvent();
                 TBase messageEvent = message.getEvent();
                 try {
@@ -289,7 +295,26 @@ public class GfacServerHandler implements 
GfacService.Iface {
         }
     }
 
-       private void publishProcessStatus(ProcessSubmitEvent event, 
ProcessStatus status) throws AiravataException {
+    private void setCancelData(String experimentId) throws Exception {
+        String expCancelNodePath = 
ZKPaths.makePath(ZKPaths.makePath(ZkConstants.ZOOKEEPER_EXPERIMENT_NODE,
+                experimentId), ZkConstants.ZOOKEEPER_CANCEL_LISTENER_NODE);
+        curatorClient.setData().withVersion(-1).forPath(expCancelNodePath, 
ZkConstants.ZOOKEEPER_CANCEL_REQEUST
+                .getBytes());
+    }
+
+    private boolean setCancelWatcher(CuratorFramework curatorClient,
+                                     String experimentId,
+                                     String processId) throws Exception {
+
+        String experimentNodePath = 
GFacUtils.getExperimentNodePath(experimentId);
+        // create /experiments/{experimentId}/cancel node and set watcher for data changes
+        String experimentCancelNode = ZKPaths.makePath(experimentNodePath, 
ZkConstants.ZOOKEEPER_CANCEL_LISTENER_NODE);
+        byte[] bytes = 
curatorClient.getData().usingWatcher(Factory.getCancelRequestWatcher(experimentId,
 processId)).forPath(experimentCancelNode);
+        return bytes != null && new 
String(bytes).equalsIgnoreCase(ZkConstants.ZOOKEEPER_CANCEL_REQEUST);
+
+    }
+
+    private void publishProcessStatus(ProcessSubmitEvent event, ProcessStatus 
status) throws AiravataException {
                ProcessIdentifier identifier = new 
ProcessIdentifier(event.getProcessId(),
                                event.getExperimentId(),
                                event.getGatewayId());
@@ -324,15 +349,6 @@ public class GfacServerHandler implements 
GfacService.Iface {
                
ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), 
tokenNodePath);
                curatorClient.setData().withVersion(-1).forPath(tokenNodePath, 
token.getBytes());
 
-               // create 
/experiments/{experimentId}/{processId}/cancelListener node and set watcher for 
data changes
-/*             String cancelListenerNode = ZKPaths.makePath(zkProcessNodePath, 
GFacConstants.ZOOKEEPER_CANCEL_LISTENER_NODE);
-               
ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), 
cancelListenerNode);
-               
curatorClient.getData().usingWatcher(Factory.getCancelRequestWatcher()).forPath(cancelListenerNode);*/
-
-               // create /experiments/{experimentId}/cancel node and set 
watcher for data changes
-               String experimentCancelNode = 
ZKPaths.makePath(experimentNodePath, 
ZkConstants.ZOOKEEPER_CANCEL_LISTENER_NODE);
-               
curatorClient.getData().usingWatcher(Factory.getCancelRequestWatcher(experimentId,
 processId)).forPath (experimentCancelNode);
-
        }
 
        private void updateDeliveryTag(CuratorFramework curatorClient, String 
gfacServerName, ProcessSubmitEvent event,

Reply via email to