Repository: airavata
Updated Branches:
  refs/heads/master 36922c9fc -> b98f65997


Fixing AIRAVATA-1797 , process redelivery handling


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/92243599
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/92243599
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/92243599

Branch: refs/heads/master
Commit: 92243599278db75e56f19da2b4d03d7f029ce17f
Parents: 4f6e8c5
Author: Shameera Rathanyaka <[email protected]>
Authored: Tue Aug 18 11:35:46 2015 -0400
Committer: Shameera Rathanyaka <[email protected]>
Committed: Tue Aug 18 11:35:46 2015 -0400

----------------------------------------------------------------------
 .../apache/airavata/gfac/core/GFacUtils.java    | 28 +------
 .../airavata/gfac/core/context/GFacContext.java | 60 ++++++++++++++
 .../gfac/core/context/ProcessContext.java       | 11 ++-
 .../gfac/core/watcher/CancelRequestWatcher.java |  8 +-
 .../core/watcher/RedeliveryRequestWatcher.java  |  8 +-
 .../org/apache/airavata/gfac/impl/Factory.java  | 25 ++++++
 .../airavata/gfac/impl/GFacEngineImpl.java      | 27 +++++++
 .../apache/airavata/gfac/impl/GFacWorker.java   | 35 +++++----
 .../impl/watcher/CancelRequestWatcherImpl.java  | 31 ++++++++
 .../watcher/RedeliveryRequestWatcherImpl.java   | 69 ++++++++++++++++
 .../airavata/gfac/server/GfacServerHandler.java | 82 +++++++++++++++++---
 .../airavata/messaging/core/MessageContext.java |  9 +++
 .../impl/RabbitMQProcessLaunchConsumer.java     |  2 +-
 .../core/impl/RabbitMQStatusConsumer.java       |  2 +
 14 files changed, 333 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/92243599/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
 
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
index 45af599..8461216 100644
--- 
a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
+++ 
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java
@@ -1025,7 +1025,8 @@ public class GFacUtils {
         try {
             GwyResourceProfile gatewayProfile = 
context.getAppCatalog().getGatewayProfile();
             String resourceHostId = 
context.getComputeResourceDescription().getComputeResourceId();
-            ComputeResourcePreference preference = 
gatewayProfile.getComputeResourcePreference(context.getGatewayId(), 
resourceHostId);
+            ComputeResourcePreference preference = 
gatewayProfile.getComputeResourcePreference(context.getGatewayId()
+                           , resourceHostId);
             return preference.getPreferredJobSubmissionProtocol();
         } catch (AppCatalogException e) {
             log.error("Error occurred while initializing app catalog", e);
@@ -1086,31 +1087,6 @@ public class GFacUtils {
                return GFacConstants.ZOOKEEPER_EXPERIMENT_NODE + File.separator 
+ experimentId;
        }
 
-       public static void createProcessZKNode(CuratorFramework curatorClient, 
String gfacServerName, String
-                       processId, long deliveryTag, String token) throws 
Exception {
-               // TODO - To handle multiple processes per experiment, need to 
create a /experiment/{expId}/{processId} node
-               // create /experiments/{processId} node and set data - 
serverName, add redelivery listener
-               String zkProcessNodePath = 
ZKPaths.makePath(GFacConstants.ZOOKEEPER_EXPERIMENT_NODE, processId);
-               
ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), 
zkProcessNodePath);
-               
curatorClient.setData().withVersion(-1).forPath(zkProcessNodePath, 
gfacServerName.getBytes());
-               curatorClient.getData().usingWatcher(new 
RedeliveryRequestWatcher()).forPath(zkProcessNodePath);
-
-               // create /experiments/{processId}/deliveryTag node and set 
data - deliveryTag
-               String deliveryTagPath = ZKPaths.makePath(zkProcessNodePath, 
GFacConstants.ZOOKEEPER_DELIVERYTAG_NODE);
-               
ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), 
deliveryTagPath);
-               
curatorClient.setData().withVersion(-1).forPath(deliveryTagPath, 
GFacUtils.longToBytes(deliveryTag));
-
-               // create /experiments/{processId}/token node and set data - 
token
-               String tokenNodePath = ZKPaths.makePath(processId, 
GFacConstants.ZOOKEEPER_TOKEN_NODE);
-               
ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), 
tokenNodePath);
-               curatorClient.setData().withVersion(-1).forPath(tokenNodePath, 
token.getBytes());
-
-               // create /experiments/{processId}/cancelListener node and set 
watcher for data changes
-               String cancelListenerNode = ZKPaths.makePath(zkProcessNodePath, 
GFacConstants.ZOOKEEPER_CANCEL_LISTENER_NODE);
-               
ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), 
cancelListenerNode);
-               curatorClient.getData().usingWatcher(new 
CancelRequestWatcher()).forPath(cancelListenerNode);
-       }
-
        public static long getProcessDeliveryTag(CuratorFramework 
curatorClient, String processId) throws Exception {
                String deliveryTagPath = 
GFacConstants.ZOOKEEPER_EXPERIMENT_NODE + "/" + processId + GFacConstants
                                .ZOOKEEPER_DELIVERYTAG_NODE;

http://git-wip-us.apache.org/repos/asf/airavata/blob/92243599/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/GFacContext.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/GFacContext.java
 
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/GFacContext.java
new file mode 100644
index 0000000..9bca09f
--- /dev/null
+++ 
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/GFacContext.java
@@ -0,0 +1,60 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.gfac.core.context;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * This is a singleton class, which store all required details of running 
processes.
+ */
+public class GFacContext {
+
+       private Map<String,ProcessContext> processes;
+       private static GFacContext gfacContext;
+
+       private GFacContext() {
+               processes = new HashMap<>();
+       }
+
+       public static GFacContext getInstance() {
+               if (gfacContext == null) {
+                       synchronized (GFacContext.class) {
+                               if (gfacContext == null) {
+                                       gfacContext = new GFacContext();
+                               }
+                       }
+               }
+               return gfacContext;
+       }
+
+       public void addProcess(ProcessContext processContext) {
+               processes.put(processContext.getProcessId(), processContext);
+       }
+
+       public ProcessContext getProcess(String processId) {
+               return processes.get(processId);
+       }
+
+       public void remoteProcess(String processId) {
+               processes.remove(processId);
+       }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/92243599/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
 
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
index 4004787..37f8b20 100644
--- 
a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
+++ 
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
@@ -22,7 +22,6 @@
 package org.apache.airavata.gfac.core.context;
 
 import org.apache.airavata.common.utils.AiravataUtils;
-import org.apache.airavata.gfac.core.GFacUtils;
 import org.apache.airavata.gfac.core.cluster.RemoteCluster;
 import org.apache.airavata.messaging.core.Publisher;
 import 
org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription;
@@ -36,7 +35,6 @@ import 
org.apache.airavata.model.appcatalog.gatewayprofile.ComputeResourcePrefer
 import 
org.apache.airavata.model.appcatalog.gatewayprofile.GatewayResourceProfile;
 import org.apache.airavata.model.job.JobModel;
 import org.apache.airavata.model.process.ProcessModel;
-import org.apache.airavata.model.status.JobStatus;
 import org.apache.airavata.model.status.ProcessState;
 import org.apache.airavata.model.status.ProcessStatus;
 import org.apache.airavata.registry.cpi.AppCatalog;
@@ -79,6 +77,7 @@ public class ProcessContext {
        private ComputeResourcePreference computeResourcePreference;
        private MonitorMode monitorMode;
        private ResourceJobManager resourceJobManager;
+       private boolean handOver;
 
        /**
         * Note: process context property use lazy loading approach. In runtime 
you will see some properties as null
@@ -341,4 +340,12 @@ public class ProcessContext {
        public String getExperimentId() {
                return processModel.getExperimentId();
        }
+
+       public boolean isHandOver() {
+               return handOver;
+       }
+
+       public void setHandOver(boolean handOver) {
+               this.handOver = handOver;
+       }
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/92243599/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/watcher/CancelRequestWatcher.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/watcher/CancelRequestWatcher.java
 
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/watcher/CancelRequestWatcher.java
index 41bb0f0..2bdc770 100644
--- 
a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/watcher/CancelRequestWatcher.java
+++ 
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/watcher/CancelRequestWatcher.java
@@ -21,11 +21,7 @@
 package org.apache.airavata.gfac.core.watcher;
 
 import org.apache.curator.framework.api.CuratorWatcher;
-import org.apache.zookeeper.WatchedEvent;
 
-public class CancelRequestWatcher implements CuratorWatcher {
-       @Override
-       public void process(WatchedEvent watchedEvent) throws Exception {
-               // this watcher change data in cancel listener node in the 
experiment node
-       }
+public interface CancelRequestWatcher extends CuratorWatcher {
+
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/92243599/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/watcher/RedeliveryRequestWatcher.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/watcher/RedeliveryRequestWatcher.java
 
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/watcher/RedeliveryRequestWatcher.java
index c9ad139..d200bf5 100644
--- 
a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/watcher/RedeliveryRequestWatcher.java
+++ 
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/watcher/RedeliveryRequestWatcher.java
@@ -21,11 +21,7 @@
 package org.apache.airavata.gfac.core.watcher;
 
 import org.apache.curator.framework.api.CuratorWatcher;
-import org.apache.zookeeper.WatchedEvent;
 
-public class RedeliveryRequestWatcher implements CuratorWatcher {
-       @Override
-       public void process(WatchedEvent watchedEvent) throws Exception {
-               // get the data in experiment node and compare with gfac server 
name.
-       }
+public interface RedeliveryRequestWatcher extends CuratorWatcher {
+
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/92243599/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
index 4bfb6cf..e660196 100644
--- 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
+++ 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
@@ -41,11 +41,14 @@ import 
org.apache.airavata.gfac.core.config.DataTransferTaskConfig;
 import org.apache.airavata.gfac.core.config.GFacYamlConfigruation;
 import org.apache.airavata.gfac.core.config.JobSubmitterTaskConfig;
 import org.apache.airavata.gfac.core.config.ResourceConfig;
+import org.apache.airavata.gfac.core.context.GFacContext;
 import org.apache.airavata.gfac.core.context.ProcessContext;
 import org.apache.airavata.gfac.core.monitor.JobMonitor;
 import org.apache.airavata.gfac.core.scheduler.HostScheduler;
 import org.apache.airavata.gfac.core.task.JobSubmissionTask;
 import org.apache.airavata.gfac.core.task.Task;
+import org.apache.airavata.gfac.core.watcher.CancelRequestWatcher;
+import org.apache.airavata.gfac.core.watcher.RedeliveryRequestWatcher;
 import org.apache.airavata.gfac.impl.job.LSFJobConfiguration;
 import org.apache.airavata.gfac.impl.job.LSFOutputParser;
 import org.apache.airavata.gfac.impl.job.PBSJobConfiguration;
@@ -54,6 +57,8 @@ import 
org.apache.airavata.gfac.impl.job.SlurmJobConfiguration;
 import org.apache.airavata.gfac.impl.job.SlurmOutputParser;
 import org.apache.airavata.gfac.impl.job.UGEJobConfiguration;
 import org.apache.airavata.gfac.impl.job.UGEOutputParser;
+import org.apache.airavata.gfac.impl.watcher.CancelRequestWatcherImpl;
+import org.apache.airavata.gfac.impl.watcher.RedeliveryRequestWatcherImpl;
 import org.apache.airavata.gfac.monitor.email.EmailBasedMonitor;
 import org.apache.airavata.messaging.core.Publisher;
 import org.apache.airavata.messaging.core.impl.RabbitMQProcessLaunchConsumer;
@@ -98,6 +103,7 @@ public abstract class Factory {
        }*/
 
        private static GFacEngine engine;
+       private static GFacContext gfacContext;
        private static Publisher statusPublisher;
        private static CuratorFramework curatorClient;
        private static EmailBasedMonitor emailBasedMonitor;
@@ -120,6 +126,13 @@ public abstract class Factory {
                return engine;
        }
 
+       public static GFacContext getGfacContext() {
+               if (gfacContext == null) {
+                       gfacContext = GFacContext.getInstance();
+               }
+               return gfacContext;
+       }
+
        public static ExperimentCatalog getDefaultExpCatalog() throws 
RegistryException {
                return RegistryFactory.getDefaultExpCatalog();
        }
@@ -304,6 +317,18 @@ public abstract class Factory {
                return jobMonitor;
        }
 
+       public static JobMonitor getDefaultMonitorService() throws 
AiravataException {
+               return 
getMonitorService(MonitorMode.JOB_EMAIL_NOTIFICATION_MONITOR);
+       }
+
+       public static RedeliveryRequestWatcher getRedeliveryReqeustWatcher() {
+               return new RedeliveryRequestWatcherImpl();
+       }
+
+       public static CancelRequestWatcher getCancelRequestWatcher() {
+               return new CancelRequestWatcherImpl();
+       }
+
        public static Session getSSHSession(AuthenticationInfo 
authenticationInfo, ServerInfo serverInfo) throws AiravataException {
                SSHKeyAuthentication authentication = null;
                String key = serverInfo.getUserName() + "_" + 
serverInfo.getHost() + "_" + serverInfo.getPort();

http://git-wip-us.apache.org/repos/asf/airavata/blob/92243599/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
index a4c8381..33e354b 100644
--- 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
+++ 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
@@ -121,6 +121,9 @@ public class GFacEngineImpl implements GFacEngine {
 
        @Override
        public void executeProcess(ProcessContext processContext) throws 
GFacException {
+               if (processContext.isHandOver()) {
+                       return;
+               }
                TaskContext taskCtx = null;
                List<TaskContext> taskChain = new ArrayList<>();
                processContext.setProcessStatus(new 
ProcessStatus(ProcessState.CONFIGURING_WORKSPACE));
@@ -138,6 +141,9 @@ public class GFacEngineImpl implements GFacEngine {
                                        ().name(), taskStatus.getReason());
                        throw new GFacException("Error while environment 
setup");
                }
+               if (processContext.isHandOver()) {
+                       return;
+               }
                // execute process inputs
                processContext.setProcessStatus(new 
ProcessStatus(ProcessState.INPUT_DATA_STAGING));
                GFacUtils.saveAndPublishProcessStatus(processContext);
@@ -145,6 +151,9 @@ public class GFacEngineImpl implements GFacEngine {
                sortByInputOrder(processInputs);
                if (processInputs != null) {
                        for (InputDataObjectType processInput : processInputs) {
+                               if (processContext.isHandOver()) {
+                                       return;
+                               }
                                DataType type = processInput.getType();
                                switch (type) {
                                        case STDERR:
@@ -175,17 +184,26 @@ public class GFacEngineImpl implements GFacEngine {
                                }
                        }
                }
+               if (processContext.isHandOver()) {
+                       return;
+               }
                processContext.setProcessStatus(new 
ProcessStatus(ProcessState.EXECUTING));
                GFacUtils.saveAndPublishProcessStatus(processContext);
                taskCtx = getJobSubmissionTaskContext(processContext);
                saveTaskModel(taskCtx);
                GFacUtils.saveAndPublishTaskStatus(taskCtx);
                JobSubmissionTask jobSubmissionTask = 
Factory.getJobSubmissionTask(processContext.getJobSubmissionProtocol());
+               if (processContext.isHandOver()) {
+                       return;
+               }
                taskStatus = executeTask(taskCtx, jobSubmissionTask);
                if (taskStatus.getState() == TaskState.FAILED) {
                        throw new GFacException("Job submission task failed");
                }
                processContext.setTaskChain(taskChain);
+               if (processContext.isHandOver()) {
+                       return;
+               }
        }
 
 
@@ -196,11 +214,17 @@ public class GFacEngineImpl implements GFacEngine {
 
        @Override
        public void runProcessOutflow(ProcessContext processContext) throws 
GFacException {
+               if (processContext.isHandOver()) {
+                       return;
+               }
                TaskContext taskCtx = null;
                processContext.setProcessStatus(new 
ProcessStatus(ProcessState.OUTPUT_DATA_STAGING));
                GFacUtils.saveAndPublishProcessStatus(processContext);
                List<OutputDataObjectType> processOutputs = 
processContext.getProcessModel().getProcessOutputs();
                for (OutputDataObjectType processOutput : processOutputs) {
+                       if (processContext.isHandOver()) {
+                               return;
+                       }
                        DataType type = processOutput.getType();
                        switch (type) {
                                case STDERR:
@@ -232,6 +256,9 @@ public class GFacEngineImpl implements GFacEngine {
                                        break;
                        }
                }
+               if (processContext.isHandOver()) {
+                       return;
+               }
                processContext.setProcessStatus(new 
ProcessStatus(ProcessState.POST_PROCESSING));
                GFacUtils.saveAndPublishProcessStatus(processContext);
 //             taskCtx = getEnvCleanupTaskContext(processContext);

http://git-wip-us.apache.org/repos/asf/airavata/blob/92243599/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
index a759f90..6bbf159 100644
--- 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
+++ 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
@@ -22,7 +22,6 @@
 package org.apache.airavata.gfac.impl;
 
 import org.apache.airavata.common.exception.AiravataException;
-import org.apache.airavata.gfac.core.GFac;
 import org.apache.airavata.gfac.core.GFacEngine;
 import org.apache.airavata.gfac.core.GFacException;
 import org.apache.airavata.gfac.core.GFacUtils;
@@ -38,11 +37,12 @@ import java.text.MessageFormat;
 public class GFacWorker implements Runnable {
 
        private static final Logger log = 
LoggerFactory.getLogger(GFacWorker.class);
+       private GFacEngine engine;
        private ProcessContext processContext;
        private String processId;
        private String gatewayId;
        private String tokenId;
-       private boolean isProcessContextPopulated = false;
+       private boolean runOutflow = false;
 
 
        /**
@@ -55,7 +55,9 @@ public class GFacWorker implements Runnable {
                this.processId = processContext.getProcessId();
                this.gatewayId = processContext.getGatewayId();
                this.tokenId = processContext.getTokenId();
+               engine = Factory.getGFacEngine();
                this.processContext = processContext;
+               runOutflow = true;
        }
 
        /**
@@ -65,16 +67,17 @@ public class GFacWorker implements Runnable {
                this.processId = processId;
                this.gatewayId = gatewayId;
                this.tokenId = tokenId;
+               engine = Factory.getGFacEngine();
+               this.processContext = engine.populateProcessContext(processId, 
gatewayId, tokenId);
+               Factory.getGfacContext().addProcess(this.processContext);
        }
 
        @Override
        public void run() {
+               if (processContext.isHandOver()) {
+                       return;
+               }
                try {
-                       GFacEngine engine = Factory.getGFacEngine();
-                       if (processContext == null) {
-                               processContext = 
engine.populateProcessContext(processId, gatewayId, tokenId);
-                               isProcessContextPopulated = true;
-                       }
                        ProcessType type = getProcessType(processContext);
                        try {
                                switch (type) {
@@ -84,7 +87,9 @@ public class GFacWorker implements Runnable {
                                        case RECOVER:
                                                recoverProcess(engine);
                                                break;
-                                       case OUTFLOW:
+//                                     case RECOVER_MONITORING:
+                                               // TODO get monitor mode from 
process and get correct monitor service instead default service.
+                                       case RUN_OUTFLOW:
                                                // run the outflow task
                                                
engine.runProcessOutflow(processContext);
                                                
processContext.setProcessStatus(new ProcessStatus(ProcessState.COMPLETED));
@@ -109,7 +114,7 @@ public class GFacWorker implements Runnable {
                                        case RECOVER:
                                                log.error("Process recover 
error ", e);
                                                break;
-                                       case OUTFLOW:
+                                       case RUN_OUTFLOW:
                                                log.error("Process outflow 
execution error", e);
                                                break;
                                        case RECOVER_OUTFLOW:
@@ -138,6 +143,9 @@ public class GFacWorker implements Runnable {
        }
 
        private void exectuteProcess(GFacEngine engine) throws GFacException {
+               if (processContext.isHandOver()) {
+                       return;
+               }
                engine.executeProcess(processContext);
                if (processContext.getMonitorMode() == null) {
                        engine.runProcessOutflow(processContext);
@@ -182,10 +190,10 @@ public class GFacWorker implements Runnable {
                        case EXECUTING:
                                return ProcessType.RECOVER;
                        case MONITORING:
-                               if (isProcessContextPopulated) {
-                                       return ProcessType.RECOVER; // hand 
over to monitor task
+                               if (runOutflow) {
+                                       return ProcessType.RUN_OUTFLOW; // 
execute outflow
                                } else {
-                                       return ProcessType.OUTFLOW; // execute 
outflow
+                                       return ProcessType.RECOVER_MONITORING; 
// hand over to monitor task
                                }
                        case OUTPUT_DATA_STAGING:
                        case POST_PROCESSING:
@@ -205,7 +213,8 @@ public class GFacWorker implements Runnable {
        private enum ProcessType {
                NEW,
                RECOVER,
-               OUTFLOW,
+               RECOVER_MONITORING,
+               RUN_OUTFLOW,
                RECOVER_OUTFLOW,
                COMPLETED
        }

http://git-wip-us.apache.org/repos/asf/airavata/blob/92243599/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/CancelRequestWatcherImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/CancelRequestWatcherImpl.java
 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/CancelRequestWatcherImpl.java
new file mode 100644
index 0000000..bc49316
--- /dev/null
+++ 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/CancelRequestWatcherImpl.java
@@ -0,0 +1,31 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.gfac.impl.watcher;
+
+import org.apache.airavata.gfac.core.watcher.CancelRequestWatcher;
+import org.apache.zookeeper.WatchedEvent;
+
+public class CancelRequestWatcherImpl implements CancelRequestWatcher {
+       @Override
+       public void process(WatchedEvent watchedEvent) throws Exception {
+               // this watcher change data in cancel listener node in the 
experiment node
+       }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/92243599/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/RedeliveryRequestWatcherImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/RedeliveryRequestWatcherImpl.java
 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/RedeliveryRequestWatcherImpl.java
new file mode 100644
index 0000000..c459f0c
--- /dev/null
+++ 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/RedeliveryRequestWatcherImpl.java
@@ -0,0 +1,69 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.gfac.impl.watcher;
+
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.gfac.core.context.ProcessContext;
+import org.apache.airavata.gfac.core.watcher.RedeliveryRequestWatcher;
+import org.apache.airavata.gfac.impl.Factory;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.slf4j.Logger;
+
+public class RedeliveryRequestWatcherImpl implements RedeliveryRequestWatcher {
+
+       private static final Logger log = 
org.slf4j.LoggerFactory.getLogger(RedeliveryRequestWatcherImpl.class);
+
+       @Override
+       public void process(WatchedEvent watchedEvent) throws Exception {
+               String path = watchedEvent.getPath();
+               Watcher.Event.EventType eventType = watchedEvent.getType();
+               log.info("Redelivery request came for zk path {} event type {} 
", path, eventType.name());
+               switch (eventType) {
+                       case NodeDataChanged:
+                               CuratorFramework curatorClient = 
Factory.getCuratorClient();
+                               byte[] bytes = 
curatorClient.getData().forPath(path);
+                               String serverName = new String(bytes);
+                               String processId = 
path.substring(path.lastIndexOf("/") + 1);
+                               if 
(ServerSettings.getGFacServerName().trim().equals(serverName)) {
+                                       
curatorClient.getData().usingWatcher(this).forPath(path);
+                                       log.info("processId: {}, change data 
with same server name : {}" , processId, serverName);
+                               } else {
+                                       ProcessContext processContext = 
Factory.getGfacContext().getProcess(processId);
+                                       if (processContext != null) {
+                                               
processContext.setHandOver(true);
+                                               log.info("procesId : {}, 
handing over to new server instance : {}", processId, serverName);
+                                       } else {
+                                               log.info("Redelivery request 
came for processId {} but couldn't find process context");
+                                       }
+                               }
+                               break;
+                       case NodeDeleted:
+                       case NodeCreated:
+                       case NodeChildrenChanged:
+                       case None:
+                               // not yet implemented
+                       default:
+                               break;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/92243599/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
 
b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
index 3fbe245..9ebfa05 100644
--- 
a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
+++ 
b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
@@ -30,6 +30,7 @@ import 
org.apache.airavata.common.utils.listener.AbstractActivityListener;
 import org.apache.airavata.gfac.core.GFacConstants;
 import org.apache.airavata.gfac.core.GFacException;
 import org.apache.airavata.gfac.core.GFacUtils;
+import org.apache.airavata.gfac.core.watcher.CancelRequestWatcher;
 import org.apache.airavata.gfac.cpi.GfacService;
 import org.apache.airavata.gfac.cpi.gfac_cpi_serviceConstants;
 import org.apache.airavata.gfac.impl.Factory;
@@ -211,25 +212,51 @@ public class GfacServerHandler implements 
GfacService.Iface {
 
         public void onMessage(MessageContext message) {
             log.info(" Message Received with message id '" + 
message.getMessageId()
-                    + "' and with message type '" + message.getType());
+                           + "' and with message type '" + message.getType());
             if (message.getType().equals(MessageType.LAUNCHPROCESS)) {
+                   ProcessStatus status = new ProcessStatus();
+                   status.setState(ProcessState.EXECUTING);
                 try {
                     ProcessSubmitEvent event = new ProcessSubmitEvent();
                     TBase messageEvent = message.getEvent();
                     byte[] bytes = 
ThriftUtils.serializeThriftObject(messageEvent);
                     ThriftUtils.createThriftFromBytes(bytes, event);
+                       if (message.isRedeliver()) {
+                               // check the process is already active in this 
instance.
+                               if 
(Factory.getGfacContext().getProcess(event.getProcessId()) != null) {
+                                       // update deliver tag
+                                       try {
+                                               
updateDeliveryTag(curatorClient, gfacServerName, event.getProcessId(), message
+                                                               
.getDeliveryTag());
+                                               return;
+                                       } catch (Exception e) {
+                                               log.error("Error while updating 
delivery tag for redelivery message , messageId : " +
+                                                               
message.getMessageId(), e);
+                                               
rabbitMQProcessLaunchConsumer.sendAck(message.getDeliveryTag());
+                                       }
+                               } else {
+                                       // give time to complete handover logic 
in previous instance.
+                                       try {
+                                               Thread.sleep(60000);
+                                       } catch (InterruptedException e) {
+                                               // ignore
+                                       }
+                                       // read process status from registry
+                                       ProcessStatus processStatus = 
((ProcessStatus) Factory.getDefaultExpCatalog().get(ExperimentCatalogModelType
+                                                                       
.PROCESS_STATUS,
+                                                       event.getProcessId()));
+                                       
status.setState(processStatus.getState());
+                               }
+                       }
                     // update process status to executing
-                    ProcessStatus status = new ProcessStatus();
-                    status.setState(ProcessState.EXECUTING);
-                    
status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis());
-                    
Factory.getDefaultExpCatalog().update(ExperimentCatalogModelType.PROCESS_STATUS,
 status, event
-                                   .getProcessId());
+                       
status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis());
+                       
Factory.getDefaultExpCatalog().update(ExperimentCatalogModelType.PROCESS_STATUS,
 status, event
+                                       .getProcessId());
                        publishProcessStatus(event, status);
                     try {
-                           GFacUtils.createProcessZKNode(curatorClient, 
gfacServerName, event.getProcessId(), message
-                                                           .getDeliveryTag(),
-                                           event.getTokenId());
-                        submitProcess(event.getProcessId(), 
event.getGatewayId(), event.getTokenId());
+                           createProcessZKNode(curatorClient, gfacServerName, 
event.getProcessId(), message
+                                           .getDeliveryTag(), 
event.getTokenId());
+                           submitProcess(event.getProcessId(), 
event.getGatewayId(), event.getTokenId());
                     } catch (Exception e) {
                         log.error(e.getMessage(), e);
                         
rabbitMQProcessLaunchConsumer.sendAck(message.getDeliveryTag());
@@ -278,4 +305,39 @@ public class GfacServerHandler implements 
GfacService.Iface {
                msgCtx.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
                statusPublisher.publish(msgCtx);
        }
+
+       private void createProcessZKNode(CuratorFramework curatorClient, String 
gfacServerName, String
+                       processId, long deliveryTag, String token) throws 
Exception {
+               // TODO - To handle multiple processes per experiment, need to 
create a /experiments/{expId}/{processId} node
+               // create /experiments/{processId} node and set data - 
serverName, add redelivery listener
+               String zkProcessNodePath = 
ZKPaths.makePath(GFacConstants.ZOOKEEPER_EXPERIMENT_NODE, processId);
+               
ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), 
zkProcessNodePath);
+               
curatorClient.setData().withVersion(-1).forPath(zkProcessNodePath, 
gfacServerName.getBytes());
+               
curatorClient.getData().usingWatcher(Factory.getRedeliveryReqeustWatcher()).forPath(zkProcessNodePath);
+
+               // create /experiments/{processId}/deliveryTag node and set 
data - deliveryTag
+               String deliveryTagPath = ZKPaths.makePath(zkProcessNodePath, 
GFacConstants.ZOOKEEPER_DELIVERYTAG_NODE);
+               
ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), 
deliveryTagPath);
+               
curatorClient.setData().withVersion(-1).forPath(deliveryTagPath, 
GFacUtils.longToBytes(deliveryTag));
+
+               // create /experiments/{processId}/token node and set data - 
token
+               String tokenNodePath = ZKPaths.makePath(processId, 
GFacConstants.ZOOKEEPER_TOKEN_NODE);
+               
ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), 
tokenNodePath);
+               curatorClient.setData().withVersion(-1).forPath(tokenNodePath, 
token.getBytes());
+
+               // create /experiments/{processId}/cancelListener node and set 
watcher for data changes
+               String cancelListenerNode = ZKPaths.makePath(zkProcessNodePath, 
GFacConstants.ZOOKEEPER_CANCEL_LISTENER_NODE);
+               
ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), 
cancelListenerNode);
+               
curatorClient.getData().usingWatcher(Factory.getCancelRequestWatcher()).forPath(cancelListenerNode);
+       }
+
+       private void updateDeliveryTag(CuratorFramework curatorClient, String 
gfacServerName, String processId, long
+                       deliveryTag) throws Exception {
+               // create /experiments/{processId} node and set data - 
serverName, add redelivery listener
+               String zkProcessNodePath = 
ZKPaths.makePath(GFacConstants.ZOOKEEPER_EXPERIMENT_NODE, processId);
+               // create /experiments/{processId}/deliveryTag node and set 
data - deliveryTag
+               String deliveryTagPath = ZKPaths.makePath(zkProcessNodePath, 
GFacConstants.ZOOKEEPER_DELIVERYTAG_NODE);
+               
curatorClient.setData().withVersion(-1).forPath(deliveryTagPath, 
GFacUtils.longToBytes(deliveryTag));
+       }
+
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/92243599/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageContext.java
----------------------------------------------------------------------
diff --git 
a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageContext.java
 
b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageContext.java
index b53de0d..a161e2a 100644
--- 
a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageContext.java
+++ 
b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageContext.java
@@ -33,6 +33,7 @@ public class MessageContext {
     private final String gatewayId;
     private Timestamp updatedTime;
     private long deliveryTag;
+       private boolean isRedeliver;
 
 
     public MessageContext(TBase event, MessageType type, String messageId, 
String gatewayId) {
@@ -81,4 +82,12 @@ public class MessageContext {
     public void setDeliveryTag(long deliveryTag) {
         this.deliveryTag = deliveryTag;
     }
+
+       public void setIsRedeliver(boolean isRedeliver) {
+               this.isRedeliver = isRedeliver;
+       }
+
+       public boolean isRedeliver() {
+               return isRedeliver;
+       }
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/92243599/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessLaunchConsumer.java
----------------------------------------------------------------------
diff --git 
a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessLaunchConsumer.java
 
b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessLaunchConsumer.java
index 850128e..ce697da 100644
--- 
a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessLaunchConsumer.java
+++ 
b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessLaunchConsumer.java
@@ -275,7 +275,7 @@ public class RabbitMQProcessLaunchConsumer {
 
     public void sendAck(long deliveryTag){
         try {
-            channel.basicAck(deliveryTag,false); //todo move this logic to 
monitoring component to ack when the job is done
+            channel.basicAck(deliveryTag,false);
         } catch (IOException e) {
             logger.error(e.getMessage(), e);
         }

http://git-wip-us.apache.org/repos/asf/airavata/blob/92243599/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusConsumer.java
----------------------------------------------------------------------
diff --git 
a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusConsumer.java
 
b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusConsumer.java
index 46784bf..561cde2 100644
--- 
a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusConsumer.java
+++ 
b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusConsumer.java
@@ -154,6 +154,7 @@ public class RabbitMQStatusConsumer implements Consumer {
                         ThriftUtils.createThriftFromBytes(body, message);
                         TBase event = null;
                         String gatewayId = null;
+
                         if 
(message.getMessageType().equals(MessageType.EXPERIMENT)) {
                             ExperimentStatusChangeEvent 
experimentStatusChangeEvent = new ExperimentStatusChangeEvent();
                             
ThriftUtils.createThriftFromBytes(message.getEvent(), 
experimentStatusChangeEvent);
@@ -211,6 +212,7 @@ public class RabbitMQStatusConsumer implements Consumer {
                         }
                         MessageContext messageContext = new 
MessageContext(event, message.getMessageType(), message.getMessageId(), 
gatewayId);
                         
messageContext.setUpdatedTime(AiravataUtils.getTime(message.getUpdatedTime()));
+                           
messageContext.setIsRedeliver(envelope.isRedeliver());
                         handler.onMessage(messageContext);
                     } catch (TException e) {
                         String msg = "Failed to de-serialize the thrift 
message, from routing keys and queueName " + id;

Reply via email to