Fixed AIRAVATA-1888
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/5ce750d5 Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/5ce750d5 Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/5ce750d5 Branch: refs/heads/develop Commit: 5ce750d53bc4e8def467b9024c491a463e62203d Parents: fc3f979 Author: Shameera Rathnayaka <[email protected]> Authored: Wed Jan 6 14:15:35 2016 -0500 Committer: Shameera Rathnayaka <[email protected]> Committed: Wed Jan 6 14:15:35 2016 -0500 ---------------------------------------------------------------------- .../airavata/gfac/impl/GFacEngineImpl.java | 3 +- .../apache/airavata/gfac/impl/GFacWorker.java | 32 +++----------- .../airavata/gfac/server/GfacServerHandler.java | 44 +++++++++++++------- 3 files changed, 38 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/5ce750d5/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java index 9e6a522..dd10c12 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java @@ -509,8 +509,9 @@ public class GFacEngineImpl implements GFacEngine { processContext.setTaskExecutionOrder(taskExecutionOrder); Map<String, TaskModel> taskMap = processContext.getTaskMap(); String recoverTaskId = null; + TaskModel taskModel = null; for (String taskId : taskExecutionOrder) { - TaskModel taskModel = taskMap.get(taskId); + taskModel = taskMap.get(taskId); TaskState state = taskModel.getTaskStatus().getState(); if (state == TaskState.CREATED || state == TaskState.EXECUTING) { recoverTaskId = taskId; http://git-wip-us.apache.org/repos/asf/airavata/blob/5ce750d5/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java index c701ed5..e0664a5 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java @@ -78,14 +78,6 @@ public class GFacWorker implements Runnable { @Override public void run() { try { - if (processContext.isInterrupted()) { - GFacUtils.handleProcessInterrupt(processContext); - if (processContext.isCancel()) { - sendAck(); - Factory.getGfacContext().removeProcess(processContext.getProcessId()); - } - return; - } ProcessState processState = processContext.getProcessStatus().getState(); switch (processState) { case CREATED: @@ -206,7 +198,13 @@ public class GFacWorker implements Runnable { } private void executeProcess() throws GFacException { + // checkpoint + if (processContext.isInterrupted()) { + return; + } + engine.executeProcess(processContext); + // checkpoint if (processContext.isInterrupted()) { return; } @@ -216,24 +214,6 @@ public class GFacWorker implements Runnable { } } -// private void monitorProcess() throws GFacException { -// try { -// JobMonitor monitorService = Factory.getMonitorService(processContext.getMonitorMode()); -// if (monitorService != null) { -// monitorService.monitor(processContext.getJobModel().getJobId(), processContext); -// ProcessStatus status = new ProcessStatus(ProcessState.MONITORING); -// status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); -// processContext.setProcessStatus(status); -// GFacUtils.saveAndPublishProcessStatus(processContext); -// } else { -// // we directly invoke outflow -// continueTaskExecution(); -// } -// } catch (AiravataException e) { -// throw new GFacException("Error while retrieving moniot service", e); -// } -// } - private void sendAck() { // this ensure, gfac doesn't send ack more than once for a process. which cause to remove gfac rabbitmq consumer from rabbitmq server. if (!processContext.isAcknowledge()) { http://git-wip-us.apache.org/repos/asf/airavata/blob/5ce750d5/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java index c1c08a5..10da052 100644 --- a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java +++ b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java @@ -28,10 +28,8 @@ import org.apache.airavata.common.utils.ServerSettings; import org.apache.airavata.common.utils.ThriftUtils; import org.apache.airavata.common.utils.ZkConstants; import org.apache.airavata.common.utils.listener.AbstractActivityListener; -import org.apache.airavata.gfac.core.GFacConstants; import org.apache.airavata.gfac.core.GFacException; import org.apache.airavata.gfac.core.GFacUtils; -import org.apache.airavata.gfac.core.watcher.CancelRequestWatcher; import org.apache.airavata.gfac.cpi.GfacService; import org.apache.airavata.gfac.cpi.gfac_cpiConstants; import org.apache.airavata.gfac.impl.Factory; @@ -250,7 +248,13 @@ public class GfacServerHandler implements GfacService.Iface { publishProcessStatus(event, status); try { createProcessZKNode(curatorClient, gfacServerName, event, message); - submitProcess(event.getProcessId(), event.getGatewayId(), event.getTokenId()); + boolean isCancel = setCancelWatcher(curatorClient, event.getExperimentId(), event.getProcessId()); + submitProcess(event.getProcessId(), event.getGatewayId(), event.getTokenId()); + if (isCancel) { + // Need to trigger process cancel watcher, wait till process recover and then set zk data. + Thread.sleep(10000); + setCancelData(event.getExperimentId()); + } } catch (Exception e) { log.error(e.getMessage(), e); rabbitMQProcessLaunchConsumer.sendAck(message.getDeliveryTag()); @@ -262,7 +266,9 @@ public class GfacServerHandler implements GfacService.Iface { } catch (AiravataException e) { log.error("Error while publishing process status", e); } - } else if (message.getType().equals(MessageType.TERMINATEPROCESS)) { + } + // TODO - Now there is no process termination type messages, use zookeeper instead of rabbitmq to do that. it is safe to remove this else part. + else if (message.getType().equals(MessageType.TERMINATEPROCESS)) { ProcessTerminateEvent event = new ProcessTerminateEvent(); TBase messageEvent = message.getEvent(); try { @@ -289,7 +295,26 @@ public class GfacServerHandler implements GfacService.Iface { } } - private void publishProcessStatus(ProcessSubmitEvent event, ProcessStatus status) throws AiravataException { + private void setCancelData(String experimentId) throws Exception { + String expCancelNodePath = ZKPaths.makePath(ZKPaths.makePath(ZkConstants.ZOOKEEPER_EXPERIMENT_NODE, + experimentId), ZkConstants.ZOOKEEPER_CANCEL_LISTENER_NODE); + curatorClient.setData().withVersion(-1).forPath(expCancelNodePath, ZkConstants.ZOOKEEPER_CANCEL_REQEUST + .getBytes()); + } + + private boolean setCancelWatcher(CuratorFramework curatorClient, + String experimentId, + String processId) throws Exception { + + String experimentNodePath = GFacUtils.getExperimentNodePath(experimentId); + // create /experiments/{experimentId}/cancel node and set watcher for data changes + String experimentCancelNode = ZKPaths.makePath(experimentNodePath, ZkConstants.ZOOKEEPER_CANCEL_LISTENER_NODE); + byte[] bytes = curatorClient.getData().usingWatcher(Factory.getCancelRequestWatcher(experimentId, processId)).forPath(experimentCancelNode); + return bytes != null && new String(bytes).equalsIgnoreCase(ZkConstants.ZOOKEEPER_CANCEL_REQEUST); + + } + + private void publishProcessStatus(ProcessSubmitEvent event, ProcessStatus status) throws AiravataException { ProcessIdentifier identifier = new ProcessIdentifier(event.getProcessId(), event.getExperimentId(), event.getGatewayId()); @@ -324,15 +349,6 @@ public class GfacServerHandler implements GfacService.Iface { ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), tokenNodePath); curatorClient.setData().withVersion(-1).forPath(tokenNodePath, token.getBytes()); - // create /experiments/{experimentId}/{processId}/cancelListener node and set watcher for data changes -/* String cancelListenerNode = ZKPaths.makePath(zkProcessNodePath, GFacConstants.ZOOKEEPER_CANCEL_LISTENER_NODE); - ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), cancelListenerNode); - curatorClient.getData().usingWatcher(Factory.getCancelRequestWatcher()).forPath(cancelListenerNode);*/ - - // create /experiments/{experimentId}/cancel node and set watcher for data changes - String experimentCancelNode = ZKPaths.makePath(experimentNodePath, ZkConstants.ZOOKEEPER_CANCEL_LISTENER_NODE); - curatorClient.getData().usingWatcher(Factory.getCancelRequestWatcher(experimentId, processId)).forPath (experimentCancelNode); - } private void updateDeliveryTag(CuratorFramework curatorClient, String gfacServerName, ProcessSubmitEvent event,
