Repository: airavata
Updated Branches:
  refs/heads/master e290cfe17 -> b7e914ee3


Send acknowledgements for processed processes, handle the task status after
executing each task, and handle failure scenarios.


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/b7e914ee
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/b7e914ee
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/b7e914ee

Branch: refs/heads/master
Commit: b7e914ee3c4c90b77676bd2c217bec3b3183dc76
Parents: e290cfe
Author: Shameera Rathanyaka <[email protected]>
Authored: Thu Jul 16 15:57:12 2015 -0400
Committer: Shameera Rathanyaka <[email protected]>
Committed: Thu Jul 16 15:57:12 2015 -0400

----------------------------------------------------------------------
 .../apache/airavata/gfac/core/GFacUtils.java    |  45 ++--
 .../gfac/core/context/ProcessContext.java       |  17 +-
 .../airavata/gfac/core/context/TaskContext.java |   3 +
 .../apache/airavata/gfac/core/task/Task.java    |  11 +-
 .../org/apache/airavata/gfac/impl/Factory.java  |   9 +
 .../airavata/gfac/impl/GFacEngineImpl.java      | 169 ++++++++-------
 .../apache/airavata/gfac/impl/GFacWorker.java   |  28 +++
 .../gfac/impl/task/AbstractSCPTask.java         |   2 -
 .../gfac/impl/task/ForkJobSubmissionTask.java   |   5 +-
 .../gfac/impl/task/LocalJobSubmissionTask.java  |  15 +-
 .../gfac/impl/task/SCPDataStageTask.java        |  81 ++++---
 .../gfac/impl/task/SCPInputDataStageTask.java   |  14 +-
 .../gfac/impl/task/SCPOutputDataStatgeTask.java |  10 +-
 .../gfac/impl/task/SSHEnvironmentSetupTask.java |  25 ++-
 .../gfac/impl/task/SSHJobSubmissionTask.java    | 212 ++++++++++++-------
 .../airavata/gfac/server/GfacServerHandler.java |   6 +-
 16 files changed, 413 insertions(+), 239 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/b7e914ee/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
 
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
index b00240b..af10218 100644
--- 
a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
+++ 
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
@@ -266,9 +266,7 @@ public class GFacUtils {
                // first we save job jobModel to the registry for sa and then 
save the job status.
                ProcessContext processContext = 
taskContext.getParentProcessContext();
                ExperimentCatalog experimentCatalog = 
processContext.getExperimentCatalog();
-               TaskStatus status = new TaskStatus();
-               status.setState(state);
-               taskContext.setTaskStatus(status);
+               TaskStatus status = taskContext.getTaskStatus();
                
status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
                experimentCatalog.add(ExpCatChildDataType.TASK_STATUS, status, 
taskContext.getTaskId());
                TaskIdentifier identifier = new 
TaskIdentifier(taskContext.getTaskId(),
@@ -286,20 +284,17 @@ public class GFacUtils {
         }
     }
 
-    public static void saveProcessStatus(ProcessContext processContext,
-                                      ProcessState state) throws GFacException 
{
+    public static void saveAndPublishProcessStatus(ProcessContext 
processContext) throws GFacException {
         try {
             // first we save job jobModel to the registry for sa and then save 
the job status.
             ExperimentCatalog experimentCatalog = 
processContext.getExperimentCatalog();
-            ProcessStatus status = new ProcessStatus();
-            status.setState(state);
-            processContext.getProcessModel().setProcessStatus(status);
+            ProcessStatus status = processContext.getProcessStatus();
             
status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
             experimentCatalog.add(ExpCatChildDataType.PROCESS_STATUS, status, 
processContext.getProcessId());
             ProcessIdentifier identifier = new 
ProcessIdentifier(processContext.getProcessId(),
                                                                  
processContext.getProcessModel().getExperimentId(),
                                                                  
processContext.getGatewayId());
-            ProcessStatusChangeEvent processStatusChangeEvent = new 
ProcessStatusChangeEvent(state, identifier);
+            ProcessStatusChangeEvent processStatusChangeEvent = new 
ProcessStatusChangeEvent(status.getState(), identifier);
                MessageContext msgCtx = new 
MessageContext(processStatusChangeEvent, MessageType.PROCESS,
                                
AiravataUtils.getId(MessageType.PROCESS.name()), processContext.getGatewayId());
                msgCtx.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
@@ -1094,27 +1089,35 @@ public class GFacUtils {
                return GFacConstants.ZOOKEEPER_EXPERIMENT_NODE + File.separator 
+ experimentId;
        }
 
-       public static void createExperimentNode(CuratorFramework curatorClient, 
String gfacServerName, String
+       public static void createProcessZKNode(CuratorFramework curatorClient, 
String gfacServerName, String
                        processId, long deliveryTag, String token) throws 
Exception {
-               // create /experiments/processId node and set data - 
serverName, add redelivery listener
-               String experimentPath = 
ZKPaths.makePath(GFacConstants.ZOOKEEPER_EXPERIMENT_NODE, processId);
-               
ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), 
experimentPath);
-               curatorClient.setData().withVersion(-1).forPath(experimentPath, 
gfacServerName.getBytes());
-               curatorClient.getData().usingWatcher(new 
RedeliveryRequestWatcher()).forPath(experimentPath);
-
-               // create /experiments/processId/deliveryTag node and set data 
- deliveryTag
-               String deliveryTagPath = ZKPaths.makePath(experimentPath, 
GFacConstants.ZOOKEEPER_DELIVERYTAG_NODE);
+               // TODO - To handle multiple processes per experiment, need to 
create a /experiment/{expId}/{processId} node
+               // create /experiments/{processId} node and set data - 
serverName, add redelivery listener
+               String zkProcessNodePath = 
ZKPaths.makePath(GFacConstants.ZOOKEEPER_EXPERIMENT_NODE, processId);
+               
ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), 
zkProcessNodePath);
+               
curatorClient.setData().withVersion(-1).forPath(zkProcessNodePath, 
gfacServerName.getBytes());
+               curatorClient.getData().usingWatcher(new 
RedeliveryRequestWatcher()).forPath(zkProcessNodePath);
+
+               // create /experiments/{processId}/deliveryTag node and set 
data - deliveryTag
+               String deliveryTagPath = ZKPaths.makePath(zkProcessNodePath, 
GFacConstants.ZOOKEEPER_DELIVERYTAG_NODE);
                
ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), 
deliveryTagPath);
                
curatorClient.setData().withVersion(-1).forPath(deliveryTagPath, 
GFacUtils.longToBytes(deliveryTag));
 
-               // create /experiments/processId/token node and set data - token
+               // create /experiments/{processId}/token node and set data - 
token
                String tokenNodePath = ZKPaths.makePath(processId, 
GFacConstants.ZOOKEEPER_TOKEN_NODE);
                
ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), 
tokenNodePath);
                curatorClient.setData().withVersion(-1).forPath(tokenNodePath, 
token.getBytes());
 
-               // create /experiments/processId/cancelListener node and set 
watcher for data changes
-               String cancelListenerNode = ZKPaths.makePath(experimentPath, 
GFacConstants.ZOOKEEPER_CANCEL_LISTENER_NODE);
+               // create /experiments/{processId}/cancelListener node and set 
watcher for data changes
+               String cancelListenerNode = ZKPaths.makePath(zkProcessNodePath, 
GFacConstants.ZOOKEEPER_CANCEL_LISTENER_NODE);
                
ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), 
cancelListenerNode);
                curatorClient.getData().usingWatcher(new 
CancelRequestWatcher()).forPath(cancelListenerNode);
        }
+
+       public static long getProcessDeliveryTag(CuratorFramework 
curatorClient, String processId) throws Exception {
+               String deliveryTagPath = 
GFacConstants.ZOOKEEPER_EXPERIMENT_NODE + "/" + processId + GFacConstants
+                               .ZOOKEEPER_DELIVERYTAG_NODE;
+               byte[] bytes = curatorClient.getData().forPath(deliveryTagPath);
+               return GFacUtils.bytesToLong(bytes);
+       }
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/b7e914ee/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
 
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
index dc8dace..9e1ac06 100644
--- 
a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
+++ 
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
@@ -21,7 +21,7 @@
 
 package org.apache.airavata.gfac.core.context;
 
-import org.apache.airavata.common.utils.LocalEventPublisher;
+import org.apache.airavata.gfac.core.GFacUtils;
 import org.apache.airavata.gfac.core.cluster.RemoteCluster;
 import org.apache.airavata.messaging.core.Publisher;
 import 
org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription;
@@ -40,11 +40,15 @@ import org.apache.airavata.model.status.ProcessStatus;
 import org.apache.airavata.registry.cpi.AppCatalog;
 import org.apache.airavata.registry.cpi.ExperimentCatalog;
 import org.apache.curator.framework.CuratorFramework;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.List;
 import java.util.Map;
 
 public class ProcessContext {
+
+       private static final Logger log = 
LoggerFactory.getLogger(ProcessContext.class);
        // process model
        private ExperimentCatalog experimentCatalog;
        private AppCatalog appCatalog;
@@ -289,11 +293,16 @@ public class ProcessContext {
 
        public void setProcessStatus(ProcessStatus status) {
                if (status != null) {
+                       log.info("expId: {}, processId: {} :- Status changed {} 
-> {}", getExperimentId(), processId,
+                                       getProcessState().name(), 
status.getState().name());
                        processModel.setProcessStatus(status);
-                       // TODO publish process status change.
                }
        }
 
+       public ProcessStatus getProcessStatus(){
+               return processModel.getProcessStatus();
+       }
+
        public String getComputeResourceId() {
                return getComputeResourceDescription().getComputeResourceId();
        }
@@ -321,4 +330,8 @@ public class ProcessContext {
        public void setLocalWorkingDir(String localWorkingDir) {
                this.localWorkingDir = localWorkingDir;
        }
+
+       public String getExperimentId() {
+               return processModel.getExperimentId();
+       }
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/b7e914ee/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/TaskContext.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/TaskContext.java
 
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/TaskContext.java
index 95d2fb9..597fd2e 100644
--- 
a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/TaskContext.java
+++ 
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/TaskContext.java
@@ -54,6 +54,9 @@ public class TaskContext {
        }
 
        public void setTaskStatus(TaskStatus taskStatus) {
+               log.info("expId: {}, processId: {}, taskId: {}, type: {}:- 
Status changed {} -> {}", parentProcessContext
+                               .getExperimentId(), 
parentProcessContext.getProcessId(), getTaskId(), getTaskType().name(),
+                               getTaskState().name(), taskStatus 
.getState().name());
                taskModel.setTaskStatus(taskStatus);
        }
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/b7e914ee/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/task/Task.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/task/Task.java
 
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/task/Task.java
index 62c069a..f4eec85 100644
--- 
a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/task/Task.java
+++ 
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/task/Task.java
@@ -22,6 +22,7 @@ package org.apache.airavata.gfac.core.task;
 
 import org.apache.airavata.gfac.core.context.TaskContext;
 import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.model.status.TaskStatus;
 import org.apache.airavata.model.task.TaskTypes;
 
 import java.util.Map;
@@ -42,19 +43,17 @@ public interface Task {
         * This method will be called at the first time of task chain 
execution. This method should called before recover
         * method. For a given task chain execute method only call one time. 
recover method may be called more than once.
         * @param taskContext
-        * @throws TaskException
-        * @return
+        * @return completed task status if success otherwise failed task 
status.
         */
-       public TaskState execute(TaskContext taskContext) throws TaskException;
+       public TaskStatus execute(TaskContext taskContext);
 
        /**
         * This methond will be invoked at recover path.Before this method is 
invoked, execute method should be invoked.
         * This method may be called zero or few time in a process chain.
         * @param taskContext
-        * @throws TaskException
-        * @return
+        * @return completed task status if success otherwise failed task 
status.
         */
-       public TaskState recover(TaskContext taskContext) throws TaskException;
+       public TaskStatus recover(TaskContext taskContext);
 
        /**
         * Task type will be used to identify the task behaviour. eg : 
DATA_STAGING , JOB_SUBMISSION

http://git-wip-us.apache.org/repos/asf/airavata/blob/b7e914ee/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
index 51db6f3..a5fa5ed 100644
--- 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
+++ 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
@@ -52,6 +52,7 @@ import org.apache.airavata.gfac.impl.job.UGEJobConfiguration;
 import org.apache.airavata.gfac.impl.job.UGEOutputParser;
 import org.apache.airavata.gfac.monitor.email.EmailBasedMonitor;
 import org.apache.airavata.messaging.core.Publisher;
+import org.apache.airavata.messaging.core.impl.RabbitMQProcessLaunchConsumer;
 import org.apache.airavata.messaging.core.impl.RabbitMQStatusPublisher;
 import 
org.apache.airavata.model.appcatalog.computeresource.DataMovementProtocol;
 import 
org.apache.airavata.model.appcatalog.computeresource.JobSubmissionInterface;
@@ -101,6 +102,7 @@ public abstract class Factory {
        private static Map<DataMovementProtocol, Task> dataMovementTask = new 
HashMap<>();
        private static Map<ResourceJobManagerType, ResourceConfig> resources = 
new HashMap<>();
        private static Map<MonitorMode, JobMonitor> jobMonitorServices = new 
HashMap<>();
+       private static RabbitMQProcessLaunchConsumer processLaunchConsumer;
 
        public static GFacEngine getGFacEngine() throws GFacException {
                if (engine == null) {
@@ -145,6 +147,13 @@ public abstract class Factory {
                return curatorClient;
        }
 
+       public static RabbitMQProcessLaunchConsumer getProcessLaunchConsumer() 
throws AiravataException {
+               if (processLaunchConsumer == null) {
+                       processLaunchConsumer = new 
RabbitMQProcessLaunchConsumer();
+               }
+               return processLaunchConsumer;
+       }
+
        public static JobManagerConfiguration 
getJobManagerConfiguration(ResourceJobManager resourceJobManager) throws 
GFacException {
                ResourceConfig resourceConfig = 
Factory.getResourceConfig(resourceJobManager.getResourceJobManagerType());
                OutputParser outputParser;

http://git-wip-us.apache.org/repos/asf/airavata/blob/b7e914ee/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
index c7eba99..a4c8381 100644
--- 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
+++ 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
@@ -124,14 +124,23 @@ public class GFacEngineImpl implements GFacEngine {
                TaskContext taskCtx = null;
                List<TaskContext> taskChain = new ArrayList<>();
                processContext.setProcessStatus(new 
ProcessStatus(ProcessState.CONFIGURING_WORKSPACE));
+               GFacUtils.saveAndPublishProcessStatus(processContext);
                // Run all environment setup tasks
                taskCtx = getEnvSetupTaskContext(processContext);
                saveTaskModel(taskCtx);
                GFacUtils.saveAndPublishTaskStatus(taskCtx);
                SSHEnvironmentSetupTask envSetupTask = new 
SSHEnvironmentSetupTask();
-               executeTask(taskCtx, envSetupTask);
+               TaskStatus taskStatus = executeTask(taskCtx, envSetupTask);
+               if (taskStatus.getState() == TaskState.FAILED) {
+                       log.error("expId: {}, processId: {}, taskId: {} type: 
{},:- Input statging failed, " +
+                                       "reason:" + " {}", 
taskCtx.getParentProcessContext().getExperimentId(), taskCtx
+                                       
.getParentProcessContext().getProcessId(), taskCtx.getTaskId(), 
envSetupTask.getType
+                                       ().name(), taskStatus.getReason());
+                       throw new GFacException("Error while environment 
setup");
+               }
                // execute process inputs
                processContext.setProcessStatus(new 
ProcessStatus(ProcessState.INPUT_DATA_STAGING));
+               GFacUtils.saveAndPublishProcessStatus(processContext);
                List<InputDataObjectType> processInputs = 
processContext.getProcessModel().getProcessInputs();
                sortByInputOrder(processInputs);
                if (processInputs != null) {
@@ -151,7 +160,14 @@ public class GFacEngineImpl implements GFacEngine {
                                                saveTaskModel(taskCtx);
                                                
GFacUtils.saveAndPublishTaskStatus(taskCtx);
                                                Task dMoveTask = 
Factory.getDataMovementTask(processContext.getDataMovementProtocol());
-                                               executeTask(taskCtx, dMoveTask);
+                                               taskStatus = 
executeTask(taskCtx, dMoveTask);
+                                               if (taskStatus.getState() == 
TaskState.FAILED) {
+                                                       log.error("expId: {}, 
processId: {}, taskId: {} type: {},:- Input statging failed, " +
+                                                                       
"reason:" + " {}", taskCtx.getParentProcessContext().getExperimentId(), taskCtx
+                                                                       
.getParentProcessContext().getProcessId(), taskCtx.getTaskId(), 
dMoveTask.getType
+                                                                       
().name(), taskStatus.getReason());
+                                                       throw new 
GFacException("Error while staging input data");
+                                               }
                                                break;
                                        default:
                                                // nothing to do
@@ -160,29 +176,87 @@ public class GFacEngineImpl implements GFacEngine {
                        }
                }
                processContext.setProcessStatus(new 
ProcessStatus(ProcessState.EXECUTING));
+               GFacUtils.saveAndPublishProcessStatus(processContext);
                taskCtx = getJobSubmissionTaskContext(processContext);
+               saveTaskModel(taskCtx);
                GFacUtils.saveAndPublishTaskStatus(taskCtx);
                JobSubmissionTask jobSubmissionTask = 
Factory.getJobSubmissionTask(processContext.getJobSubmissionProtocol());
-               executeTask(taskCtx, jobSubmissionTask);
+               taskStatus = executeTask(taskCtx, jobSubmissionTask);
+               if (taskStatus.getState() == TaskState.FAILED) {
+                       throw new GFacException("Job submission task failed");
+               }
                processContext.setTaskChain(taskChain);
        }
 
-       private void executeTask(TaskContext taskCtx, Task task) throws 
GFacException {
-               try {
-                       taskCtx.setTaskStatus(new 
TaskStatus(TaskState.EXECUTING));
-                       GFacUtils.saveAndPublishTaskStatus(taskCtx);
-                       task.execute(taskCtx);
-                       taskCtx.setTaskStatus(new 
TaskStatus(TaskState.COMPLETED));
-                       GFacUtils.saveAndPublishTaskStatus(taskCtx);
-               } catch (TaskException e) {
-                       TaskStatus status = new TaskStatus(TaskState.FAILED);
-                       status.setReason(taskCtx.getTaskType().toString() + " 
Task Failed to execute");
-                       taskCtx.setTaskStatus(status);
-                       GFacUtils.saveAndPublishTaskStatus(taskCtx);
+
+       @Override
+       public void recoverProcess(ProcessContext processContext) throws 
GFacException {
+
+       }
+
+       @Override
+       public void runProcessOutflow(ProcessContext processContext) throws 
GFacException {
+               TaskContext taskCtx = null;
+               processContext.setProcessStatus(new 
ProcessStatus(ProcessState.OUTPUT_DATA_STAGING));
+               GFacUtils.saveAndPublishProcessStatus(processContext);
+               List<OutputDataObjectType> processOutputs = 
processContext.getProcessModel().getProcessOutputs();
+               for (OutputDataObjectType processOutput : processOutputs) {
+                       DataType type = processOutput.getType();
+                       switch (type) {
+                               case STDERR:
+                                       break;
+                               case STDOUT:
+                                       break;
+                               case URI:
+                                       try {
+                                               taskCtx = 
getDataStagingTaskContext(processContext, processOutput);
+                                       } catch (TException e) {
+                                               throw new GFacException("Thrift 
model to byte[] convertion issue", e);
+                                       }
+                                       File localWorkingdir = new 
File(taskCtx.getLocalWorkingDir());
+                                       localWorkingdir.mkdirs(); // make local 
dir if not exist
+                                       saveTaskModel(taskCtx);
+                                       
GFacUtils.saveAndPublishTaskStatus(taskCtx);
+                                       Task dMoveTask = 
Factory.getDataMovementTask(processContext.getDataMovementProtocol());
+                                       TaskStatus taskStatus = 
executeTask(taskCtx, dMoveTask);
+                                       if (taskStatus.getState() == 
TaskState.FAILED) {
+                                               log.error("expId: {}, 
processId: {}, taskId: {} type: {},:- Input statging failed, " +
+                                                               "reason:" + " 
{}", taskCtx.getParentProcessContext().getExperimentId(), taskCtx
+                                                               
.getParentProcessContext().getProcessId(), taskCtx.getTaskId(), 
dMoveTask.getType
+                                                               ().name(), 
taskStatus.getReason());
+                                               throw new GFacException("Error 
while staging input data");
+                                       }
+                                       break;
+                               default:
+                                       // nothing to do
+                                       break;
+                       }
                }
+               processContext.setProcessStatus(new 
ProcessStatus(ProcessState.POST_PROCESSING));
+               GFacUtils.saveAndPublishProcessStatus(processContext);
+//             taskCtx = getEnvCleanupTaskContext(processContext);
+
+       }
+
+       @Override
+       public void recoverProcessOutflow(ProcessContext processContext) throws 
GFacException {
+
+       }
+
+       @Override
+       public void cancelProcess() throws GFacException {
 
        }
 
+       private TaskStatus executeTask(TaskContext taskCtx, Task task) throws 
GFacException {
+               taskCtx.setTaskStatus(new TaskStatus(TaskState.EXECUTING));
+               GFacUtils.saveAndPublishTaskStatus(taskCtx);
+               TaskStatus taskStatus = task.execute(taskCtx);
+               taskCtx.setTaskStatus(taskStatus);
+               GFacUtils.saveAndPublishTaskStatus(taskCtx);
+               return taskCtx.getTaskStatus();
+       }
+
        private TaskContext getJobSubmissionTaskContext(ProcessContext 
processContext) throws GFacException {
                TaskContext taskCtx = new TaskContext();
                taskCtx.setParentProcessContext(processContext);
@@ -197,7 +271,8 @@ public class GFacEngineImpl implements GFacEngine {
                return taskCtx;
        }
 
-       private TaskContext getDataStagingTaskContext(ProcessContext 
processContext, InputDataObjectType processInput) throws TException {
+       private TaskContext getDataStagingTaskContext(ProcessContext 
processContext, InputDataObjectType processInput)
+                       throws TException {
                TaskContext taskCtx = new TaskContext();
                taskCtx.setParentProcessContext(processContext);
                // create new task model for this task
@@ -210,13 +285,15 @@ public class GFacEngineImpl implements GFacEngine {
                // create data staging sub task model
                DataStagingTaskModel submodel = new DataStagingTaskModel();
                submodel.setSource(processInput.getValue());
-               
submodel.setDestination(processContext.getDataMovementProtocol().name() + ":" + 
processContext.getWorkingDir());
+               
submodel.setDestination(processContext.getDataMovementProtocol().name() + ":" + 
processContext.getWorkingDir
+                               ());
                
taskModel.setSubTaskModel(ThriftUtils.serializeThriftObject(submodel));
                taskCtx.setTaskModel(taskModel);
                return taskCtx;
        }
 
-       private TaskContext getDataStagingTaskContext(ProcessContext 
processContext, OutputDataObjectType processOutput) throws TException {
+       private TaskContext getDataStagingTaskContext(ProcessContext 
processContext, OutputDataObjectType processOutput)
+                       throws TException {
                TaskContext taskCtx = new TaskContext();
                taskCtx.setParentProcessContext(processContext);
                // create new task model for this task
@@ -241,7 +318,6 @@ public class GFacEngineImpl implements GFacEngine {
 
        /**
         * Persist task model
-        * @param taskContext
         */
        private void saveTaskModel(TaskContext taskContext) throws 
GFacException {
                try {
@@ -266,56 +342,6 @@ public class GFacEngineImpl implements GFacEngine {
                return taskCtx;
        }
 
-       @Override
-       public void recoverProcess(ProcessContext processContext) throws 
GFacException {
-
-       }
-
-       @Override
-       public void runProcessOutflow(ProcessContext processContext) throws 
GFacException {
-               TaskContext taskCtx = null;
-               processContext.setProcessStatus(new 
ProcessStatus(ProcessState.OUTPUT_DATA_STAGING));
-               List<OutputDataObjectType> processOutputs = 
processContext.getProcessModel().getProcessOutputs();
-               for (OutputDataObjectType processOutput : processOutputs) {
-                       DataType type = processOutput.getType();
-                       switch (type) {
-                               case STDERR:
-                                       break;
-                               case STDOUT:
-                                       break;
-                               case URI:
-                                       // TODO : Provide data staging data 
model
-                                       try {
-                                               taskCtx = 
getDataStagingTaskContext(processContext, processOutput);
-                                       } catch (TException e) {
-                                               throw new GFacException("Thrift 
model to byte[] convertion issue", e);
-                                       }
-                                       File localWorkingdir = new 
File(taskCtx.getLocalWorkingDir());
-                                       localWorkingdir.mkdirs(); // make local 
dir if not exist
-                                       saveTaskModel(taskCtx);
-                                       
GFacUtils.saveAndPublishTaskStatus(taskCtx);
-                                       Task dMoveTask = 
Factory.getDataMovementTask(processContext.getDataMovementProtocol());
-                                       executeTask(taskCtx, dMoveTask);
-                                       break;
-                               default:
-                                       // nothing to do
-                                       break;
-                       }
-               }
-               processContext.setProcessStatus(new 
ProcessStatus(ProcessState.POST_PROCESSING));
-//             taskCtx = getEnvCleanupTaskContext(processContext);
-
-       }
-
-       @Override
-       public void recoverProcessOutflow(ProcessContext processContext) throws 
GFacException {
-
-       }
-
-       @Override
-       public void cancelProcess() throws GFacException {
-
-       }
 
        /**
         * Sort input data type by input order.
@@ -331,7 +357,7 @@ public class GFacEngineImpl implements GFacEngine {
 
        public static ResourceJobManager getResourceJobManager(ProcessContext 
processCtx) throws AppCatalogException {
                List<JobSubmissionInterface> jobSubmissionInterfaces = 
Factory.getDefaultAppCatalog().getComputeResource()
-                               
.getComputeResource(processCtx.getComputeResourceId()) 
.getJobSubmissionInterfaces();
+                               
.getComputeResource(processCtx.getComputeResourceId()).getJobSubmissionInterfaces();
 
                ResourceJobManager resourceJobManager = null;
                JobSubmissionInterface jsInterface = null;
@@ -346,7 +372,8 @@ public class GFacEngineImpl implements GFacEngine {
                } else if (jsInterface.getJobSubmissionProtocol() == 
JobSubmissionProtocol.SSH) {
                        SSHJobSubmission sshJobSubmission = 
Factory.getDefaultAppCatalog().getComputeResource().getSSHJobSubmission
                                        
(jsInterface.getJobSubmissionInterfaceId());
-                       
processCtx.setMonitorMode(sshJobSubmission.getMonitorMode()); // fixme - Move 
this to populate process context method.
+                       
processCtx.setMonitorMode(sshJobSubmission.getMonitorMode()); // fixme - Move 
this to populate process
+                       // context method.
                        resourceJobManager = 
sshJobSubmission.getResourceJobManager();
                } else if (jsInterface.getJobSubmissionProtocol() == 
JobSubmissionProtocol.LOCAL) {
                        LOCALSubmission localSubmission = 
Factory.getDefaultAppCatalog().getComputeResource().getLocalJobSubmission

http://git-wip-us.apache.org/repos/asf/airavata/blob/b7e914ee/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
index 899f684..a759f90 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
@@ -25,6 +25,7 @@ import org.apache.airavata.common.exception.AiravataException;
 import org.apache.airavata.gfac.core.GFac;
 import org.apache.airavata.gfac.core.GFacEngine;
 import org.apache.airavata.gfac.core.GFacException;
+import org.apache.airavata.gfac.core.GFacUtils;
 import org.apache.airavata.gfac.core.context.ProcessContext;
 import org.apache.airavata.gfac.core.monitor.JobMonitor;
 import org.apache.airavata.model.status.ProcessState;
@@ -32,6 +33,8 @@ import org.apache.airavata.model.status.ProcessStatus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.text.MessageFormat;
+
 public class GFacWorker implements Runnable {
 
        private static final Logger log = 
LoggerFactory.getLogger(GFacWorker.class);
@@ -85,11 +88,15 @@ public class GFacWorker implements Runnable {
                                                // run the outflow task
                                                
engine.runProcessOutflow(processContext);
                                                
processContext.setProcessStatus(new ProcessStatus(ProcessState.COMPLETED));
+                                               
GFacUtils.saveAndPublishProcessStatus(processContext);
+                                               sendAck();
                                                break;
                                        case RECOVER_OUTFLOW:
                                                // recover  outflow task;
                                                
engine.recoverProcessOutflow(processContext);
                                                
processContext.setProcessStatus(new ProcessStatus(ProcessState.COMPLETED));
+                                               
GFacUtils.saveAndPublishProcessStatus(processContext);
+                                               sendAck();
                                                break;
                                        default:
                                                throw new 
GFacException("process Id : " + processId + " Couldn't identify process type");
@@ -113,6 +120,14 @@ public class GFacWorker implements Runnable {
                        }
                } catch (GFacException e) {
                        log.error("GFac Worker throws an exception", e);
+                       processContext.setProcessStatus(new 
ProcessStatus(ProcessState.FAILED));
+                       try {
+                               
GFacUtils.saveAndPublishProcessStatus(processContext);
+                       } catch (GFacException e1) {
+                               log.error("expId: {}, processId: {} :- Couldn't 
save and publish process status {}", processContext
+                                               .getExperimentId(), 
processContext.getProcessId(), processContext.getProcessState());
+                       }
+                       sendAck();
                }
        }
 
@@ -142,6 +157,19 @@ public class GFacWorker implements Runnable {
                }
        }
 
+       private void sendAck() {
+               try {
+                       long processDeliveryTag = 
GFacUtils.getProcessDeliveryTag(processContext.getCuratorClient(), processId);
+                       
Factory.getProcessLaunchConsumer().sendAck(processDeliveryTag);
+                       log.info("expId: {}, procesId: {} :- Sent ack for 
deliveryTag {}", processContext.getExperimentId(),
+                                       processId, processDeliveryTag);
+               } catch (Exception e1) {
+                       String format = MessageFormat.format("expId: {0}, 
processId: {1} :- Couldn't send ack for deliveryTag ",
+                                       processContext .getExperimentId(), 
processId);
+                       log.error(format, e1);
+               }
+       }
+
        private ProcessType getProcessType(ProcessContext processContext) {
                // check the status and return correct type of process.
                switch (processContext.getProcessState()) {

http://git-wip-us.apache.org/repos/asf/airavata/blob/b7e914ee/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AbstractSCPTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AbstractSCPTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AbstractSCPTask.java
index a34997c..17746f4 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AbstractSCPTask.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AbstractSCPTask.java
@@ -20,10 +20,8 @@
  */
 package org.apache.airavata.gfac.impl.task;
 
-import org.apache.airavata.gfac.core.context.TaskContext;
 import org.apache.airavata.gfac.core.task.Task;
 import org.apache.airavata.gfac.core.task.TaskException;
-import org.apache.airavata.model.status.TaskState;
 
 import java.util.Map;
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/b7e914ee/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/ForkJobSubmissionTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/ForkJobSubmissionTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/ForkJobSubmissionTask.java
index dbc1a97..f8ef0ea 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/ForkJobSubmissionTask.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/ForkJobSubmissionTask.java
@@ -25,6 +25,7 @@ import org.apache.airavata.gfac.core.context.TaskContext;
 import org.apache.airavata.gfac.core.task.JobSubmissionTask;
 import org.apache.airavata.gfac.core.task.TaskException;
 import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.model.status.TaskStatus;
 import org.apache.airavata.model.task.TaskTypes;
 
 import java.util.Map;
@@ -36,12 +37,12 @@ public class ForkJobSubmissionTask implements 
JobSubmissionTask {
     }
 
     @Override
-    public TaskState execute(TaskContext taskContext) throws TaskException {
+    public TaskStatus execute(TaskContext taskContext) {
         return null;
     }
 
     @Override
-    public TaskState recover(TaskContext taskContext) throws TaskException {
+    public TaskStatus recover(TaskContext taskContext) {
         return null;
     }
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/b7e914ee/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/LocalJobSubmissionTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/LocalJobSubmissionTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/LocalJobSubmissionTask.java
index ad7ab6d..5201de6 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/LocalJobSubmissionTask.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/LocalJobSubmissionTask.java
@@ -36,6 +36,7 @@ import 
org.apache.airavata.model.application.io.InputDataObjectType;
 import org.apache.airavata.model.job.JobModel;
 import org.apache.airavata.model.status.JobState;
 import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.model.status.TaskStatus;
 import org.apache.airavata.model.task.TaskTypes;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -53,8 +54,8 @@ public class LocalJobSubmissionTask implements 
JobSubmissionTask{
     }
 
     @Override
-    public TaskState execute(TaskContext taskContext) throws TaskException {
-        try {
+    public TaskStatus execute(TaskContext taskContext) {
+     /*   try {
             ProcessContext processContext = 
taskContext.getParentProcessContext();
             // build command with all inputs
             List<String> cmdList = buildCommand(processContext);
@@ -97,10 +98,10 @@ public class LocalJobSubmissionTask implements 
JobSubmissionTask{
             standardOutWriter.join();
             standardErrorWriter.join();
 
-            /*
+            *//*
              * check return value. usually not very helpful to draw 
conclusions based on return values so don't bother.
              * just provide warning in the log messages
-             */
+             *//*
             if (returnValue != 0) {
                 log.error("Process finished with non zero return value. 
Process may have failed");
             } else {
@@ -124,12 +125,12 @@ public class LocalJobSubmissionTask implements 
JobSubmissionTask{
         } catch (IOException e) {
             log.error("Error while submitting local job", e);
             throw new TaskException("Error while submitting local job", e);
-        }
-        return TaskState.COMPLETED;
+        }*/
+           return new TaskStatus(TaskState.COMPLETED);
     }
 
     @Override
-    public TaskState recover(TaskContext taskContext) throws TaskException {
+    public TaskStatus recover(TaskContext taskContext) {
         return null;
     }
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/b7e914ee/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java
index 089535e..b2a83ed 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java
@@ -20,63 +20,88 @@
  */
 package org.apache.airavata.gfac.impl.task;
 
-import com.jcraft.jsch.JSch;
-import com.jcraft.jsch.JSchException;
-import com.jcraft.jsch.Session;
 import org.apache.airavata.common.utils.ThriftUtils;
 import org.apache.airavata.gfac.core.SSHApiException;
 import org.apache.airavata.gfac.core.context.TaskContext;
 import org.apache.airavata.gfac.core.task.Task;
 import org.apache.airavata.gfac.core.task.TaskException;
-import org.apache.airavata.gfac.impl.SSHUtils;
+import org.apache.airavata.model.commons.ErrorModel;
 import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.model.status.TaskStatus;
 import org.apache.airavata.model.task.DataStagingTaskModel;
 import org.apache.airavata.model.task.TaskTypes;
 import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.Map;
 
 public class SCPDataStageTask implements Task {
+       private static final Logger log = 
LoggerFactory.getLogger(SCPDataStageTask.class);
+
        @Override
        public void init(Map<String, String> propertyMap) throws TaskException {
 
        }
 
        @Override
-       public TaskState execute(TaskContext taskContext) throws TaskException {
-
+       public TaskStatus execute(TaskContext taskContext) {
+               TaskStatus status = new TaskStatus(TaskState.COMPLETED);
                if (taskContext.getTaskModel().getTaskType() != 
TaskTypes.DATA_STAGING) {
-                       throw new TaskException("Invalid task call, expected " 
+ TaskTypes.DATA_STAGING.toString() + " but found "
+                       status.setState(TaskState.FAILED);
+                       status.setReason("Invalid task call, expected " + 
TaskTypes.DATA_STAGING.toString() + " but found "
                                        + 
taskContext.getTaskModel().getTaskType().toString());
-               }
-               try {
-                       DataStagingTaskModel subTaskModel = 
(DataStagingTaskModel) ThriftUtils.getSubTaskModel(taskContext
-                                       .getTaskModel());
-                       URI sourceURI = new URI(subTaskModel.getSource());
-                       URI destinationURI = new 
URI(subTaskModel.getDestination());
+               } else {
+                       try {
+                               DataStagingTaskModel subTaskModel = 
(DataStagingTaskModel) ThriftUtils.getSubTaskModel(taskContext
+                                               .getTaskModel());
+                               URI sourceURI = new 
URI(subTaskModel.getSource());
+                               URI destinationURI = new 
URI(subTaskModel.getDestination());
 
-                       if (sourceURI.getScheme().equalsIgnoreCase("file")) {  
//  Airavata --> RemoteCluster
-                               
taskContext.getParentProcessContext().getRemoteCluster().scpTo(sourceURI.getPath(),
 destinationURI
-                                               .getPath());
-                       } else { // RemoteCluster --> Airavata
-                               
taskContext.getParentProcessContext().getRemoteCluster().scpFrom(sourceURI.getPath(),
 destinationURI
-                                               .getPath());
+                               if 
(sourceURI.getScheme().equalsIgnoreCase("file")) {  //  Airavata --> 
RemoteCluster
+                                       
taskContext.getParentProcessContext().getRemoteCluster().scpTo(sourceURI.getPath(),
 destinationURI
+                                                       .getPath());
+                               } else { // RemoteCluster --> Airavata
+                                       
taskContext.getParentProcessContext().getRemoteCluster().scpFrom(sourceURI.getPath(),
 destinationURI
+                                                       .getPath());
+                               }
+                               status.setReason("Successfully staged data");
+                       } catch (SSHApiException e) {
+                               String msg = "Scp attempt failed";
+                               log.error(msg, e);
+                               status.setState(TaskState.FAILED);
+                               status.setReason(msg);
+                               ErrorModel errorModel = new ErrorModel();
+                               
errorModel.setActualErrorMessage(e.getMessage());
+                               errorModel.setUserFriendlyMessage(msg);
+                               
taskContext.getTaskModel().setTaskError(errorModel);
+                       } catch (TException e) {
+                               String msg = "Invalid task invocation";
+                               log.error(msg, e);
+                               status.setState(TaskState.FAILED);
+                               status.setReason(msg);
+                               ErrorModel errorModel = new ErrorModel();
+                               
errorModel.setActualErrorMessage(e.getMessage());
+                               errorModel.setUserFriendlyMessage(msg);
+                               
taskContext.getTaskModel().setTaskError(errorModel);
+                       } catch (URISyntaxException e) {
+                               String msg = "source or destination is not a 
valid URI";
+                               log.error(msg, e);
+                               status.setState(TaskState.FAILED);
+                               status.setReason(msg);
+                               ErrorModel errorModel = new ErrorModel();
+                               
errorModel.setActualErrorMessage(e.getMessage());
+                               errorModel.setUserFriendlyMessage(msg);
+                               
taskContext.getTaskModel().setTaskError(errorModel);
                        }
-               } catch (SSHApiException e) {
-                       throw new TaskException("Scp attempt failed", e);
-               } catch (TException e) {
-                       throw new TaskException("Invalid task invocation");
-               } catch (URISyntaxException e) {
-                       throw new TaskException("source or destination is not a 
valid URI");
                }
-               return null;
+               return status;
        }
 
        @Override
-       public TaskState recover(TaskContext taskContext) throws TaskException {
+       public TaskStatus recover(TaskContext taskContext) {
                return null;
        }
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/b7e914ee/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPInputDataStageTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPInputDataStageTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPInputDataStageTask.java
index 332a0aa..fc4d634 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPInputDataStageTask.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPInputDataStageTask.java
@@ -23,23 +23,19 @@ package org.apache.airavata.gfac.impl.task;
 import com.jcraft.jsch.JSch;
 import com.jcraft.jsch.JSchException;
 import com.jcraft.jsch.Session;
-import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.common.utils.ThriftUtils;
 import org.apache.airavata.gfac.core.SSHApiException;
 import org.apache.airavata.gfac.core.context.TaskContext;
 import org.apache.airavata.gfac.core.task.TaskException;
-import org.apache.airavata.gfac.impl.Factory;
 import org.apache.airavata.gfac.impl.SSHUtils;
-import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.model.status.TaskStatus;
 import org.apache.airavata.model.task.DataStagingTaskModel;
 import org.apache.airavata.model.task.TaskTypes;
 import org.apache.thrift.TException;
 
 import java.io.IOException;
-import java.net.MalformedURLException;
 import java.net.URI;
 import java.net.URISyntaxException;
-import java.net.URL;
 
 public class SCPInputDataStageTask extends AbstractSCPTask {
 
@@ -47,9 +43,9 @@ public class SCPInputDataStageTask extends AbstractSCPTask {
        }
 
        @Override
-       public TaskState execute(TaskContext taskContext) throws TaskException {
+       public TaskStatus execute(TaskContext taskContext) {
 
-               if (taskContext.getTaskModel().getTaskType() != 
TaskTypes.DATA_STAGING) {
+/*             if (taskContext.getTaskModel().getTaskType() != 
TaskTypes.DATA_STAGING) {
                        throw new TaskException("Invalid task call, expected " 
+ TaskTypes.DATA_STAGING.toString() + " but found "
                                        + 
taskContext.getTaskModel().getTaskType().toString());
                }
@@ -81,12 +77,12 @@ public class SCPInputDataStageTask extends AbstractSCPTask {
                        throw new TaskException("Invalid task invocation");
                } catch (URISyntaxException e) {
                        e.printStackTrace();
-               }
+               }*/
                return null;
        }
 
        @Override
-       public TaskState recover(TaskContext taskContext) throws TaskException {
+       public TaskStatus recover(TaskContext taskContext) {
                return null;
        }
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/b7e914ee/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPOutputDataStatgeTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPOutputDataStatgeTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPOutputDataStatgeTask.java
index 72e071c..6fa87c4 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPOutputDataStatgeTask.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPOutputDataStatgeTask.java
@@ -28,7 +28,7 @@ import org.apache.airavata.gfac.core.SSHApiException;
 import org.apache.airavata.gfac.core.context.TaskContext;
 import org.apache.airavata.gfac.core.task.TaskException;
 import org.apache.airavata.gfac.impl.SSHUtils;
-import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.model.status.TaskStatus;
 import org.apache.airavata.model.task.DataStagingTaskModel;
 import org.apache.airavata.model.task.TaskTypes;
 import org.apache.thrift.TException;
@@ -41,8 +41,8 @@ public class SCPOutputDataStatgeTask extends AbstractSCPTask {
 
 
        @Override
-       public TaskState execute(TaskContext taskContext) throws TaskException {
-               if (taskContext.getTaskModel().getTaskType() != 
TaskTypes.DATA_STAGING) {
+       public TaskStatus execute(TaskContext taskContext) {
+/*             if (taskContext.getTaskModel().getTaskType() != 
TaskTypes.DATA_STAGING) {
                        throw new TaskException("Invalid task call, expected " 
+ TaskTypes.DATA_STAGING.toString() + " but found "
                                        + 
taskContext.getTaskModel().getTaskType().toString());
                }
@@ -69,12 +69,12 @@ public class SCPOutputDataStatgeTask extends 
AbstractSCPTask {
                        throw new TaskException("Scp failed", e);
                } catch (TException e) {
                        throw new TaskException("Invalid task invocation");
-               }
+               }*/
                return null;
        }
 
        @Override
-       public TaskState recover(TaskContext taskContext) throws TaskException {
+       public TaskStatus recover(TaskContext taskContext) {
                return null;
        }
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/b7e914ee/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHEnvironmentSetupTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHEnvironmentSetupTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHEnvironmentSetupTask.java
index e541644..74f5826 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHEnvironmentSetupTask.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHEnvironmentSetupTask.java
@@ -20,37 +20,50 @@
  */
 package org.apache.airavata.gfac.impl.task;
 
-import org.apache.airavata.gfac.core.task.TaskException;
 import org.apache.airavata.gfac.core.SSHApiException;
 import org.apache.airavata.gfac.core.cluster.RemoteCluster;
 import org.apache.airavata.gfac.core.context.TaskContext;
 import org.apache.airavata.gfac.core.task.Task;
+import org.apache.airavata.gfac.core.task.TaskException;
+import org.apache.airavata.model.commons.ErrorModel;
 import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.model.status.TaskStatus;
 import org.apache.airavata.model.task.TaskTypes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Map;
 
 public class SSHEnvironmentSetupTask implements Task {
 
+       private static final Logger log = 
LoggerFactory.getLogger(SSHEnvironmentSetupTask.class);
        @Override
        public void init(Map<String, String> propertyMap) throws TaskException {
 
        }
 
        @Override
-       public TaskState execute(TaskContext taskContext) throws TaskException {
-
+       public TaskStatus execute(TaskContext taskContext) {
+               TaskStatus status = new TaskStatus(TaskState.COMPLETED);
                try {
                        RemoteCluster remoteCluster = 
taskContext.getParentProcessContext().getRemoteCluster();
                        
remoteCluster.makeDirectory(taskContext.getParentProcessContext().getWorkingDir());
+                       status.setReason("Successfully createded environment");
                } catch (SSHApiException e) {
-                       throw new TaskException("Error while environment 
setup", e);
+                       String msg = "Error while environment setup";
+                       log.error(msg, e);
+                       status.setState(TaskState.FAILED);
+                       status.setReason(msg);
+                       ErrorModel errorModel = new ErrorModel();
+                       errorModel.setActualErrorMessage(e.getMessage());
+                       errorModel.setUserFriendlyMessage(msg);
+                       taskContext.getTaskModel().setTaskError(errorModel);
                }
-               return null;
+               return status;
        }
 
        @Override
-       public TaskState recover(TaskContext taskContext) throws TaskException {
+       public TaskStatus recover(TaskContext taskContext) {
                return execute(taskContext);
        }
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/b7e914ee/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHJobSubmissionTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHJobSubmissionTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHJobSubmissionTask.java
index ff3a6f8..c282f17 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHJobSubmissionTask.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SSHJobSubmissionTask.java
@@ -31,10 +31,12 @@ import org.apache.airavata.gfac.core.task.JobSubmissionTask;
 import org.apache.airavata.gfac.core.task.TaskException;
 import org.apache.airavata.gfac.impl.Factory;
 import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManager;
+import org.apache.airavata.model.commons.ErrorModel;
 import org.apache.airavata.model.job.JobModel;
 import org.apache.airavata.model.status.JobState;
 import org.apache.airavata.model.status.JobStatus;
 import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.model.status.TaskStatus;
 import org.apache.airavata.model.task.TaskTypes;
 import org.apache.airavata.registry.cpi.AppCatalogException;
 import org.apache.commons.io.FileUtils;
@@ -53,87 +55,143 @@ public class SSHJobSubmissionTask implements 
JobSubmissionTask {
     }
 
     @Override
-    public TaskState execute(TaskContext taskContext) throws TaskException {
-        try {
-            ProcessContext processContext = 
taskContext.getParentProcessContext();
-            JobModel jobModel = processContext.getJobModel();
-            if (jobModel == null){
-                jobModel = new JobModel();
-                   jobModel.setWorkingDir(processContext.getWorkingDir());
-                   jobModel.setTaskId(taskContext.getTaskId());
-                   
jobModel.setCreationTime(AiravataUtils.getCurrentTimestamp().getTime());
-            }
-            RemoteCluster remoteCluster = processContext.getRemoteCluster();
-            JobDescriptor jobDescriptor = 
GFacUtils.createJobDescriptor(processContext);
-            jobModel.setJobName(jobDescriptor.getJobName());
-            ResourceJobManager resourceJobManager = 
GFacUtils.getResourceJobManager(processContext);
-            JobManagerConfiguration jConfig = null;
-            if (resourceJobManager != null) {
-                jConfig = 
Factory.getJobManagerConfiguration(resourceJobManager);
-            }
-            File jobFile = GFacUtils.createJobFile(jobDescriptor, jConfig);
-            if (jobFile != null && jobFile.exists()){
-                
jobModel.setJobDescription(FileUtils.readFileToString(jobFile));
-                String jobId = remoteCluster.submitBatchJob(jobFile.getPath(), 
processContext.getWorkingDir());
-                if (jobId != null && !jobId.isEmpty()) {
-                    jobModel.setJobId(jobId);
-                    GFacUtils.saveJobStatus(taskContext, jobModel, 
JobState.SUBMITTED);
+    public TaskStatus execute(TaskContext taskContext){
+           TaskStatus status = new TaskStatus(TaskState.COMPLETED); // set to 
completed.
+           try {
+                   ProcessContext processContext = 
taskContext.getParentProcessContext();
+                   JobModel jobModel = processContext.getJobModel();
+                   if (jobModel == null) {
+                           jobModel = new JobModel();
+                           
jobModel.setWorkingDir(processContext.getWorkingDir());
+                           jobModel.setTaskId(taskContext.getTaskId());
+                           
jobModel.setCreationTime(AiravataUtils.getCurrentTimestamp().getTime());
+                   }
+                   RemoteCluster remoteCluster = 
processContext.getRemoteCluster();
+                   JobDescriptor jobDescriptor = 
GFacUtils.createJobDescriptor(processContext);
+                   jobModel.setJobName(jobDescriptor.getJobName());
+                   ResourceJobManager resourceJobManager = 
GFacUtils.getResourceJobManager(processContext);
+                   JobManagerConfiguration jConfig = null;
+                   if (resourceJobManager != null) {
+                           jConfig = 
Factory.getJobManagerConfiguration(resourceJobManager);
+                   }
+                   File jobFile = GFacUtils.createJobFile(jobDescriptor, 
jConfig);
+                   if (jobFile != null && jobFile.exists()) {
+                           
jobModel.setJobDescription(FileUtils.readFileToString(jobFile));
+                           String jobId = 
remoteCluster.submitBatchJob(jobFile.getPath(), processContext.getWorkingDir());
+                           if (jobId != null && !jobId.isEmpty()) {
+                                   jobModel.setJobId(jobId);
+                                   GFacUtils.saveJobStatus(taskContext, 
jobModel, JobState.SUBMITTED);
 //                    publisher.publish(new 
GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
 //                            , GfacExperimentState.JOBSUBMITTED));
-                    processContext.setJobModel(jobModel);
-                    if (verifyJobSubmissionByJobId(remoteCluster, jobId)) {
+                                   processContext.setJobModel(jobModel);
+                                   if 
(verifyJobSubmissionByJobId(remoteCluster, jobId)) {
 //                        publisher.publish(new 
GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
 //                                , GfacExperimentState.JOBSUBMITTED));
-                        GFacUtils.saveJobStatus(taskContext, jobModel, 
JobState.QUEUED);
-                    }
-                } else {
-                    processContext.setJobModel(jobModel);
-                    int verificationTryCount = 0;
-                    while (verificationTryCount++ < 3) {
-                        String verifyJobId = 
verifyJobSubmission(remoteCluster, jobModel);
-                        if (verifyJobId != null && !verifyJobId.isEmpty()) {
-                            // JobStatus either changed from SUBMITTED to 
QUEUED or directly to QUEUED
-                            jobId = verifyJobId;
-                            jobModel.setJobId(jobId);
+                                           
GFacUtils.saveJobStatus(taskContext, jobModel, JobState.QUEUED);
+                                   }
+                                   status = new 
TaskStatus(TaskState.COMPLETED);
+                                   status.setReason("Submitted job to compute 
resource");
+                           } else {
+                                   processContext.setJobModel(jobModel);
+                                   int verificationTryCount = 0;
+                                   while (verificationTryCount++ < 3) {
+                                           String verifyJobId = 
verifyJobSubmission(remoteCluster, jobModel);
+                                           if (verifyJobId != null && 
!verifyJobId.isEmpty()) {
+                                                   // JobStatus either changed 
from SUBMITTED to QUEUED or directly to QUEUED
+                                                   jobId = verifyJobId;
+                                                   jobModel.setJobId(jobId);
 //                            publisher.publish(new 
GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
 //                                    , GfacExperimentState.JOBSUBMITTED));
-                            GFacUtils.saveJobStatus(taskContext, jobModel, 
JobState.QUEUED);
-                            break;
-                        }
-                        Thread.sleep(verificationTryCount * 1000);
-                    }
-                }
+                                                   
GFacUtils.saveJobStatus(taskContext, jobModel, JobState.QUEUED);
+                                                   
status.setState(TaskState.COMPLETED);
+                                                   status.setReason("Submitted 
job to compute resource");
+                                                   break;
+                                           }
+                                           Thread.sleep(verificationTryCount * 
1000);
+                                   }
+                           }
 
-                if (jobId == null || jobId.isEmpty()) {
-                    String msg = "expId:" + 
processContext.getProcessModel().getExperimentId() + " Couldn't find remote 
jobId for JobName:"
-                            + jobModel.getJobName() + ", both submit and 
verify steps doesn't return a valid JobId. Hence changing experiment state to 
Failed";
-                    log.error(msg);
-                    GFacUtils.saveErrorDetails(processContext, msg);
-                    // FIXME : Need to handle according to status update chain
-//                    GFacUtils.publishTaskStatus(jobExecutionContext, 
publisher, TaskState.FAILED);
-                    return TaskState.FAILED;
-                }
-            }
-            return TaskState.COMPLETED;
-        } catch (AppCatalogException e) {
-            log.error("Error while instatiating app catalog",e);
-            throw new TaskException("Error while instatiating app catalog", e);
-        } catch (ApplicationSettingsException e) {
-            log.error("Error occurred while creating job descriptor", e);
-            throw new TaskException("Error occurred while creating job 
descriptor", e);
-        } catch (GFacException e) {
-            log.error("Error occurred while creating job descriptor", e);
-            throw new TaskException("Error occurred while creating job 
descriptor", e);
-        } catch (SSHApiException e) {
-            log.error("Error occurred while submitting the job", e);
-            throw new TaskException("Error occurred while submitting the job", 
e);
-        } catch (IOException e) {
-            log.error("Error while reading the content of the job file", e);
-            throw new TaskException("Error while reading the content of the 
job file", e);
-        } catch (InterruptedException e) {
-            log.error("Error occurred while verifying the job submission", e);
-            throw new TaskException("Error occurred while verifying the job 
submission", e);
-        }
+                           if (jobId == null || jobId.isEmpty()) {
+                                   String msg = "expId:" + 
processContext.getProcessModel().getExperimentId() + " Couldn't find " +
+                                                   "remote jobId for JobName:" 
+ jobModel.getJobName() + ", both submit and verify steps " +
+                                                   "doesn't return a valid 
JobId. " + "Hence changing experiment state to Failed";
+                                   log.error(msg);
+                                   GFacUtils.saveErrorDetails(processContext, 
msg);
+                                   status.setState(TaskState.FAILED);
+                                   status.setReason("Couldn't find job id in 
both submitted and verified steps");
+                           }
+                   } else {
+                           status.setState(TaskState.FAILED);
+                           if (jobFile == null) {
+                                   status.setReason("JobFile is null");
+                           } else {
+                                   status.setReason("Job file doesn't exist");
+                           }
+                   }
+
+           } catch (AppCatalogException e) {
+                   String msg = "Error while instatiating app catalog";
+                   log.error(msg, e);
+                   status.setState(TaskState.FAILED);
+                   status.setReason(msg);
+                   ErrorModel errorModel = new ErrorModel();
+                   errorModel.setActualErrorMessage(e.getMessage());
+                   errorModel.setUserFriendlyMessage(msg);
+                   taskContext.getTaskModel().setTaskError(errorModel);
+           } catch (ApplicationSettingsException e) {
+                   String msg = "Error occurred while creating job descriptor";
+                   log.error(msg, e);
+                   status.setState(TaskState.FAILED);
+                   status.setReason(msg);
+                   ErrorModel errorModel = new ErrorModel();
+                   errorModel.setActualErrorMessage(e.getMessage());
+                   errorModel.setUserFriendlyMessage(msg);
+                   taskContext.getTaskModel().setTaskError(errorModel);
+           } catch (GFacException e) {
+                   String msg = "Error occurred while creating job descriptor";
+                   log.error(msg, e);
+                   status.setState(TaskState.FAILED);
+                   status.setReason(msg);
+                   ErrorModel errorModel = new ErrorModel();
+                   errorModel.setActualErrorMessage(e.getMessage());
+                   errorModel.setUserFriendlyMessage(msg);
+                   taskContext.getTaskModel().setTaskError(errorModel);
+           } catch (SSHApiException e) {
+                   String msg = "Error occurred while submitting the job";
+                   log.error(msg, e);
+                   status.setState(TaskState.FAILED);
+                   status.setReason(msg);
+                   ErrorModel errorModel = new ErrorModel();
+                   errorModel.setActualErrorMessage(e.getMessage());
+                   errorModel.setUserFriendlyMessage(msg);
+                   taskContext.getTaskModel().setTaskError(errorModel);
+           } catch (IOException e) {
+                   String msg = "Error while reading the content of the job 
file";
+                   log.error(msg, e);
+                   status.setState(TaskState.FAILED);
+                   status.setReason(msg);
+                   ErrorModel errorModel = new ErrorModel();
+                   errorModel.setActualErrorMessage(e.getMessage());
+                   errorModel.setUserFriendlyMessage(msg);
+                   taskContext.getTaskModel().setTaskError(errorModel);
+           } catch (InterruptedException e) {
+                   String msg = "Error occurred while verifying the job 
submission";
+                   log.error(msg, e);
+                   status.setState(TaskState.FAILED);
+                   status.setReason(msg);
+                   ErrorModel errorModel = new ErrorModel();
+                   errorModel.setActualErrorMessage(e.getMessage());
+                   errorModel.setUserFriendlyMessage(msg);
+                   taskContext.getTaskModel().setTaskError(errorModel);
+           }
+
+           taskContext.setTaskStatus(status);
+           try {
+                   GFacUtils.saveAndPublishTaskStatus(taskContext);
+           } catch (GFacException e) {
+                   log.error("Error while saving task status", e);
+           }
+           return status;
     }
 
     private boolean verifyJobSubmissionByJobId(RemoteCluster remoteCluster, 
String jobID) throws SSHApiException {
@@ -154,15 +212,15 @@ public class SSHJobSubmissionTask implements JobSubmissionTask {
 
 
     @Override
-    public TaskState recover(TaskContext taskContext) throws TaskException {
+    public TaskStatus recover(TaskContext taskContext) {
             ProcessContext processContext = 
taskContext.getParentProcessContext();
             JobModel jobModel = processContext.getJobModel();
             // original job failed before submitting
             if (jobModel == null || jobModel.getJobId() == null ){
                 return execute(taskContext);
             }else {
-                // job is already submitted and monitor should handle the 
recovery
-                return TaskState.COMPLETED;
+                   // job is already submitted and monitor should handle the 
recovery
+                   return new TaskStatus(TaskState.COMPLETED);
             }
     }
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/b7e914ee/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
index 6fa1288..56d9d9c 100644
--- a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
+++ b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
@@ -38,7 +38,6 @@ import org.apache.airavata.messaging.core.MessageHandler;
 import org.apache.airavata.messaging.core.MessagingConstants;
 import org.apache.airavata.messaging.core.Publisher;
 import org.apache.airavata.messaging.core.impl.RabbitMQProcessLaunchConsumer;
-import org.apache.airavata.messaging.core.impl.RabbitMQStatusPublisher;
 import org.apache.airavata.model.messaging.event.*;
 import org.apache.airavata.model.status.ProcessState;
 import org.apache.airavata.model.status.ProcessStatus;
@@ -92,7 +91,7 @@ public class GfacServerHandler implements GfacService.Iface {
     }
 
     private void initAMQPClient() throws AiravataException {
-        rabbitMQProcessLaunchConsumer = new RabbitMQProcessLaunchConsumer();
+        rabbitMQProcessLaunchConsumer = Factory.getProcessLaunchConsumer();
         rabbitMQProcessLaunchConsumer.listen(new 
ProcessLaunchMessageHandler());
     }
 
@@ -220,7 +219,8 @@ public class GfacServerHandler implements GfacService.Iface {
                     
status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis());
                     
Factory.getDefaultExpCatalog().update(ExperimentCatalogModelType.PROCESS_STATUS,
 status, event.getProcessId());
                     try {
-                           GFacUtils.createExperimentNode(curatorClient, 
gfacServerName, event.getProcessId(), message.getDeliveryTag(),
+                           GFacUtils.createProcessZKNode(curatorClient, 
gfacServerName, event.getProcessId(), message
+                                                           .getDeliveryTag(),
                                            event.getTokenId());
                         submitProcess(event.getProcessId(), 
event.getGatewayId(), event.getTokenId());
                     } catch (Exception e) {

Reply via email to