Repository: airavata Updated Branches: refs/heads/master 0b75afd8a -> 6d338d07f
Fixing issues with cancelation with recovery Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/6d338d07 Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/6d338d07 Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/6d338d07 Branch: refs/heads/master Commit: 6d338d07f039bcc30de2d9324a8a37f268ea3b2b Parents: 0b75afd Author: Shameera Rathnayaka <[email protected]> Authored: Thu Jan 7 18:32:18 2016 -0500 Committer: Shameera Rathnayaka <[email protected]> Committed: Thu Jan 7 18:32:18 2016 -0500 ---------------------------------------------------------------------- .../gfac/core/context/ProcessContext.java | 9 +++ .../airavata/gfac/impl/GFacEngineImpl.java | 75 +++++++++++++++++++- .../airavata/gfac/server/GfacServerHandler.java | 50 +++++++++---- .../server/OrchestratorServerHandler.java | 15 ++-- 4 files changed, 131 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/6d338d07/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java index 02d4cbc..2880551 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java @@ -97,6 +97,7 @@ public class ProcessContext { private TaskModel currentExecutingTaskModel; // current execution task model in case we pause process execution we need this to continue process exectuion again private boolean acknowledge; private SSHKeyAuthentication sshKeyAuthentication; + private boolean recoveryWithCancel = false; /** * Note: process context property use lazy loading approach. In runtime you will see some properties as null @@ -501,5 +502,13 @@ public class ProcessContext { public void setSshKeyAuthentication(SSHKeyAuthentication sshKeyAuthentication) { this.sshKeyAuthentication = sshKeyAuthentication; } + + public boolean isRecoveryWithCancel() { + return recoveryWithCancel; + } + + public void setRecoveryWithCancel(boolean recoveryWithCancel) { + this.recoveryWithCancel = recoveryWithCancel; + } } http://git-wip-us.apache.org/repos/asf/airavata/blob/6d338d07/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java index dd10c12..00d920d 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java @@ -25,6 +25,7 @@ import org.apache.airavata.common.exception.AiravataException; import org.apache.airavata.common.utils.AiravataUtils; import org.apache.airavata.common.utils.ServerSettings; import org.apache.airavata.common.utils.ThriftUtils; +import org.apache.airavata.common.utils.ZkConstants; import org.apache.airavata.gfac.core.GFacEngine; import org.apache.airavata.gfac.core.GFacException; import org.apache.airavata.gfac.core.GFacUtils; @@ -61,6 +62,9 @@ import org.apache.airavata.registry.cpi.ExpCatChildDataType; import org.apache.airavata.registry.cpi.ExperimentCatalog; import org.apache.airavata.registry.cpi.ExperimentCatalogModelType; import org.apache.airavata.registry.cpi.RegistryException; +import org.apache.airavata.registry.cpi.utils.Constants; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.utils.ZKPaths; import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -91,6 +95,14 @@ public class GFacEngineImpl implements GFacEngine { ProcessModel processModel = (ProcessModel) expCatalog.get(ExperimentCatalogModelType.PROCESS, processId); processContext.setProcessModel(processModel); + + try { + checkRecoveryWithCancel(processContext); + } catch (Exception e) { + log.error("expId: {}, processId: {}, Error while checking process cancel data in zookeeper", + processContext.getExperimentId(), processContext.getProcessId()); + } + GatewayResourceProfile gatewayProfile = appCatalog.getGatewayProfile().getGatewayProfile(gatewayId); processContext.setGatewayResourceProfile(gatewayProfile); processContext.setComputeResourcePreference(appCatalog.getGatewayProfile().getComputeResourcePreference @@ -176,6 +188,19 @@ public class GFacEngineImpl implements GFacEngine { } } + private void checkRecoveryWithCancel(ProcessContext processContext) throws Exception { + CuratorFramework curatorClient = processContext.getCuratorClient(); + String experimentId = processContext.getExperimentId(); + String processId = processContext.getProcessId(); + String processCancelNodePath = ZKPaths.makePath(ZKPaths.makePath(ZKPaths.makePath( + ZkConstants.ZOOKEEPER_EXPERIMENT_NODE, experimentId), processId), ZkConstants.ZOOKEEPER_CANCEL_LISTENER_NODE); + log.info("expId: {}, processId: {}, get process cancel data from zookeeper node {}", experimentId, processId, processCancelNodePath); + byte[] bytes = curatorClient.getData().forPath(processCancelNodePath); + if (bytes != null && new String(bytes).equalsIgnoreCase(ZkConstants.ZOOKEEPER_CANCEL_REQEUST)) { + processContext.setRecoveryWithCancel(true); + } + } + @Override public void executeProcess(ProcessContext processContext) throws GFacException { if (processContext.isInterrupted()) { @@ -509,6 +534,7 @@ public class GFacEngineImpl implements GFacEngine { processContext.setTaskExecutionOrder(taskExecutionOrder); Map<String, TaskModel> taskMap = processContext.getTaskMap(); String recoverTaskId = null; + String previousTaskId = null; TaskModel taskModel = null; for (String taskId : taskExecutionOrder) { taskModel = taskMap.get(taskId); @@ -517,9 +543,56 @@ public class GFacEngineImpl implements GFacEngine { recoverTaskId = taskId; break; } + previousTaskId = taskId; + } + final String rTaskId = recoverTaskId; + final String pTaskId = previousTaskId; + if (recoverTaskId != null) { + if (processContext.isRecoveryWithCancel()) { + cancelJobSubmission(processContext, rTaskId, pTaskId); + } + continueProcess(processContext, recoverTaskId); } + } - continueProcess(processContext, recoverTaskId); + private void cancelJobSubmission(ProcessContext processContext, String rTaskId, String pTaskId) { + new Thread(() -> { + try { + processContext.setCancel(true); + ProcessState processState = processContext.getProcessState(); + List<Object> jobModels = null; + switch (processState) { + case EXECUTING: + jobModels = processContext.getExperimentCatalog().get( + ExperimentCatalogModelType.JOB, Constants.FieldConstants.TaskConstants.TASK_ID, + rTaskId); + break; + case MONITORING: + if (pTaskId != null) { + jobModels = processContext.getExperimentCatalog().get( + ExperimentCatalogModelType.JOB, Constants.FieldConstants.TaskConstants.TASK_ID, + pTaskId); + } + } + + if (jobModels != null && !jobModels.isEmpty()) { + JobModel jobModel = (JobModel) jobModels.get(jobModels.size() - 1); + processContext.setJobModel(jobModel); + log.info("expId: {}, processId: {}, Canceling jobId {}", processContext.getExperimentId(), + processContext.getProcessId(), jobModel.getJobId()); + cancelProcess(processContext); + log.info("expId: {}, processId: {}, Canceled jobId {}", processContext.getExperimentId(), + processContext.getProcessId(), jobModel.getJobId()); + } + } catch (GFacException e) { + log.error("expId: {}, processId: {}, Error while canceling process which is in recovery mode", + processContext.getExperimentId(), processContext.getProcessId()); + } catch (RegistryException e) { + log.error("expId: {}, processId: {}, Error while getting job model for taskId {}, " + + "couldn't cancel process which is in recovery mode", processContext.getExperimentId(), + processContext.getProcessId(), rTaskId); + } + }).start(); } private JobModel getJobModel(ProcessContext processContext) { http://git-wip-us.apache.org/repos/asf/airavata/blob/6d338d07/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java index 10da052..c59e199 100644 --- a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java +++ b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java @@ -28,6 +28,7 @@ import org.apache.airavata.common.utils.ServerSettings; import org.apache.airavata.common.utils.ThriftUtils; import org.apache.airavata.common.utils.ZkConstants; import org.apache.airavata.common.utils.listener.AbstractActivityListener; +import org.apache.airavata.gfac.core.GFac; import org.apache.airavata.gfac.core.GFacException; import org.apache.airavata.gfac.core.GFacUtils; import org.apache.airavata.gfac.cpi.GfacService; @@ -247,14 +248,29 @@ public class GfacServerHandler implements GfacService.Iface { .getProcessId()); publishProcessStatus(event, status); try { - createProcessZKNode(curatorClient, gfacServerName, event, message); + createProcessZKNode(curatorClient, gfacServerName, event, message); boolean isCancel = setCancelWatcher(curatorClient, event.getExperimentId(), event.getProcessId()); - submitProcess(event.getProcessId(), event.getGatewayId(), event.getTokenId()); if (isCancel) { - // Need to trigger process cancel watcher, wait till process recover and then set zk data. - Thread.sleep(10000); - setCancelData(event.getExperimentId()); + if (status.getState() == ProcessState.STARTED) { + status.setState(ProcessState.CANCELLING); + status.setReason("Process Cancel is triggered"); + status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); + Factory.getDefaultExpCatalog().update(ExperimentCatalogModelType.PROCESS_STATUS, status, event.getProcessId()); + publishProcessStatus(event, status); + + // do cancel operation here + + status.setState(ProcessState.CANCELED); + status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); + Factory.getDefaultExpCatalog().update(ExperimentCatalogModelType.PROCESS_STATUS, status, event.getProcessId()); + publishProcessStatus(event, status); + rabbitMQProcessLaunchConsumer.sendAck(message.getDeliveryTag()); + return; + } else { + setCancelData(event.getExperimentId(),event.getProcessId()); + } } + submitProcess(event.getProcessId(), event.getGatewayId(), event.getTokenId()); } catch (Exception e) { log.error(e.getMessage(), e); rabbitMQProcessLaunchConsumer.sendAck(message.getDeliveryTag()); @@ -295,10 +311,11 @@ public class GfacServerHandler implements GfacService.Iface { } } - private void setCancelData(String experimentId) throws Exception { - String expCancelNodePath = ZKPaths.makePath(ZKPaths.makePath(ZkConstants.ZOOKEEPER_EXPERIMENT_NODE, - experimentId), ZkConstants.ZOOKEEPER_CANCEL_LISTENER_NODE); - curatorClient.setData().withVersion(-1).forPath(expCancelNodePath, ZkConstants.ZOOKEEPER_CANCEL_REQEUST + private void setCancelData(String experimentId, String processId) throws Exception { + String processCancelNodePath = ZKPaths.makePath(ZKPaths.makePath(ZKPaths.makePath( + ZkConstants.ZOOKEEPER_EXPERIMENT_NODE, experimentId), processId), ZkConstants.ZOOKEEPER_CANCEL_LISTENER_NODE); + log.info("expId: {}, processId: {}, set process cancel data to zookeeper node {}", experimentId, processId, processCancelNodePath); + curatorClient.setData().withVersion(-1).forPath(processCancelNodePath, ZkConstants.ZOOKEEPER_CANCEL_REQEUST .getBytes()); } @@ -307,10 +324,15 @@ public class GfacServerHandler implements GfacService.Iface { String processId) throws Exception { String experimentNodePath = GFacUtils.getExperimentNodePath(experimentId); - // create /experiments/{experimentId}/cancel node and set watcher for data changes + // /experiments/{experimentId}/cancelListener, set watcher for data changes String experimentCancelNode = ZKPaths.makePath(experimentNodePath, ZkConstants.ZOOKEEPER_CANCEL_LISTENER_NODE); - byte[] bytes = curatorClient.getData().usingWatcher(Factory.getCancelRequestWatcher(experimentId, processId)).forPath(experimentCancelNode); - return bytes != null && new String(bytes).equalsIgnoreCase(ZkConstants.ZOOKEEPER_CANCEL_REQEUST); + byte[] bytes = curatorClient.getData().forPath(experimentCancelNode); + if (bytes != null && new String(bytes).equalsIgnoreCase(ZkConstants.ZOOKEEPER_CANCEL_REQEUST)) { + return true; + } else { + bytes = curatorClient.getData().usingWatcher(Factory.getCancelRequestWatcher(experimentId, processId)).forPath(experimentCancelNode); + return bytes != null && new String(bytes).equalsIgnoreCase(ZkConstants.ZOOKEEPER_CANCEL_REQEUST); + } } @@ -339,6 +361,10 @@ public class GfacServerHandler implements GfacService.Iface { curatorClient.setData().withVersion(-1).forPath(zkProcessNodePath, gfacServerName.getBytes()); curatorClient.getData().usingWatcher(Factory.getRedeliveryReqeustWatcher(experimentId, processId)).forPath(zkProcessNodePath); + // create /experiments//{experimentId}{processId}/cancelListener + String zkProcessCancelPath = ZKPaths.makePath(zkProcessNodePath, ZkConstants.ZOOKEEPER_CANCEL_LISTENER_NODE); + ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), zkProcessCancelPath); + // create /experiments/{experimentId}/{processId}/deliveryTag node and set data - deliveryTag String deliveryTagPath = ZKPaths.makePath(zkProcessNodePath, ZkConstants.ZOOKEEPER_DELIVERYTAG_NODE); ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), deliveryTagPath); http://git-wip-us.apache.org/repos/asf/airavata/blob/6d338d07/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java ---------------------------------------------------------------------- diff --git a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java index 9e39c47..2cda709 100644 --- a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java +++ b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java @@ -408,16 +408,21 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface { private boolean launchSingleAppExperiment() throws TException { try { - List<String> processIds = experimentCatalog.getIds(ExperimentCatalogModelType.PROCESS, AbstractExpCatResource.ProcessConstants.EXPERIMENT_ID, experimentId); + List<String> processIds = experimentCatalog.getIds(ExperimentCatalogModelType.PROCESS, + AbstractExpCatResource.ProcessConstants.EXPERIMENT_ID, experimentId); for (String processId : processIds) { launchProcess(processId, airavataCredStoreToken, gatewayId); } - - } catch (Exception e) { + ExperimentStatus status = new ExperimentStatus(ExperimentState.LAUNCHED); + status.setReason("submitted all processes"); + status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); + OrchestratorUtils.updageExperimentStatus(experimentId, status); + log.info("expId: {}, Launched experiment ", experimentId); + } catch (Exception e) { ExperimentStatus status = new ExperimentStatus(ExperimentState.FAILED); status.setReason("Error while updating task status"); OrchestratorUtils.updageExperimentStatus(experimentId, status); - log.error(experimentId, "Error while updating task status, hence updated experiment status to " + + log.error("expId: " + experimentId + ", Error while updating task status, hence updated experiment status to " + ExperimentState.FAILED, e); throw new TException(e); } @@ -465,7 +470,7 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface { .getExperimentId()); if (stat.getState() == ExperimentState.CANCELING) { status.setState(ExperimentState.CANCELING); - status.setReason("Process competed but experiment cancelling is triggered"); + status.setReason("Process started but experiment cancelling is triggered"); } else { status.setState(ExperimentState.EXECUTING); status.setReason("process started");
