Repository: airavata Updated Branches: refs/heads/master 36922c9fc -> b98f65997
Fixing AIRAVATA-1797, process redelivery handling Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/92243599 Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/92243599 Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/92243599 Branch: refs/heads/master Commit: 92243599278db75e56f19da2b4d03d7f029ce17f Parents: 4f6e8c5 Author: Shameera Rathanyaka <[email protected]> Authored: Tue Aug 18 11:35:46 2015 -0400 Committer: Shameera Rathanyaka <[email protected]> Committed: Tue Aug 18 11:35:46 2015 -0400 ---------------------------------------------------------------------- .../apache/airavata/gfac/core/GFacUtils.java | 28 +------ .../airavata/gfac/core/context/GFacContext.java | 60 ++++++++++++++ .../gfac/core/context/ProcessContext.java | 11 ++- .../gfac/core/watcher/CancelRequestWatcher.java | 8 +- .../core/watcher/RedeliveryRequestWatcher.java | 8 +- .../org/apache/airavata/gfac/impl/Factory.java | 25 ++++++ .../airavata/gfac/impl/GFacEngineImpl.java | 27 +++++++ .../apache/airavata/gfac/impl/GFacWorker.java | 35 +++++---- .../impl/watcher/CancelRequestWatcherImpl.java | 31 ++++++++ .../watcher/RedeliveryRequestWatcherImpl.java | 69 ++++++++++++++++ .../airavata/gfac/server/GfacServerHandler.java | 82 +++++++++++++++++--- .../airavata/messaging/core/MessageContext.java | 9 +++ .../impl/RabbitMQProcessLaunchConsumer.java | 2 +- .../core/impl/RabbitMQStatusConsumer.java | 2 + 14 files changed, 333 insertions(+), 64 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/92243599/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java 
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java index 45af599..8461216 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java @@ -1025,7 +1025,8 @@ public class GFacUtils { try { GwyResourceProfile gatewayProfile = context.getAppCatalog().getGatewayProfile(); String resourceHostId = context.getComputeResourceDescription().getComputeResourceId(); - ComputeResourcePreference preference = gatewayProfile.getComputeResourcePreference(context.getGatewayId(), resourceHostId); + ComputeResourcePreference preference = gatewayProfile.getComputeResourcePreference(context.getGatewayId() + , resourceHostId); return preference.getPreferredJobSubmissionProtocol(); } catch (AppCatalogException e) { log.error("Error occurred while initializing app catalog", e); @@ -1086,31 +1087,6 @@ public class GFacUtils { return GFacConstants.ZOOKEEPER_EXPERIMENT_NODE + File.separator + experimentId; } - public static void createProcessZKNode(CuratorFramework curatorClient, String gfacServerName, String - processId, long deliveryTag, String token) throws Exception { - // TODO - To handle multiple processes per experiment, need to create a /experiment/{expId}/{processId} node - // create /experiments/{processId} node and set data - serverName, add redelivery listener - String zkProcessNodePath = ZKPaths.makePath(GFacConstants.ZOOKEEPER_EXPERIMENT_NODE, processId); - ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), zkProcessNodePath); - curatorClient.setData().withVersion(-1).forPath(zkProcessNodePath, gfacServerName.getBytes()); - curatorClient.getData().usingWatcher(new RedeliveryRequestWatcher()).forPath(zkProcessNodePath); - - // create /experiments/{processId}/deliveryTag node and set data - deliveryTag - String deliveryTagPath = ZKPaths.makePath(zkProcessNodePath, GFacConstants.ZOOKEEPER_DELIVERYTAG_NODE); - 
ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), deliveryTagPath); - curatorClient.setData().withVersion(-1).forPath(deliveryTagPath, GFacUtils.longToBytes(deliveryTag)); - - // create /experiments/{processId}/token node and set data - token - String tokenNodePath = ZKPaths.makePath(processId, GFacConstants.ZOOKEEPER_TOKEN_NODE); - ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), tokenNodePath); - curatorClient.setData().withVersion(-1).forPath(tokenNodePath, token.getBytes()); - - // create /experiments/{processId}/cancelListener node and set watcher for data changes - String cancelListenerNode = ZKPaths.makePath(zkProcessNodePath, GFacConstants.ZOOKEEPER_CANCEL_LISTENER_NODE); - ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), cancelListenerNode); - curatorClient.getData().usingWatcher(new CancelRequestWatcher()).forPath(cancelListenerNode); - } - public static long getProcessDeliveryTag(CuratorFramework curatorClient, String processId) throws Exception { String deliveryTagPath = GFacConstants.ZOOKEEPER_EXPERIMENT_NODE + "/" + processId + GFacConstants .ZOOKEEPER_DELIVERYTAG_NODE; http://git-wip-us.apache.org/repos/asf/airavata/blob/92243599/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/GFacContext.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/GFacContext.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/GFacContext.java new file mode 100644 index 0000000..9bca09f --- /dev/null +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/GFacContext.java @@ -0,0 +1,60 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.airavata.gfac.core.context; + +import java.util.HashMap; +import java.util.Map; + +/** + * This is a singleton class, which store all required details of running processes. + */ +public class GFacContext { + + private Map<String,ProcessContext> processes; + private static GFacContext gfacContext; + + private GFacContext() { + processes = new HashMap<>(); + } + + public static GFacContext getInstance() { + if (gfacContext == null) { + synchronized (GFacContext.class) { + if (gfacContext == null) { + gfacContext = new GFacContext(); + } + } + } + return gfacContext; + } + + public void addProcess(ProcessContext processContext) { + processes.put(processContext.getProcessId(), processContext); + } + + public ProcessContext getProcess(String processId) { + return processes.get(processId); + } + + public void remoteProcess(String processId) { + processes.remove(processId); + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/92243599/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java index 4004787..37f8b20 100644 --- 
a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java @@ -22,7 +22,6 @@ package org.apache.airavata.gfac.core.context; import org.apache.airavata.common.utils.AiravataUtils; -import org.apache.airavata.gfac.core.GFacUtils; import org.apache.airavata.gfac.core.cluster.RemoteCluster; import org.apache.airavata.messaging.core.Publisher; import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription; @@ -36,7 +35,6 @@ import org.apache.airavata.model.appcatalog.gatewayprofile.ComputeResourcePrefer import org.apache.airavata.model.appcatalog.gatewayprofile.GatewayResourceProfile; import org.apache.airavata.model.job.JobModel; import org.apache.airavata.model.process.ProcessModel; -import org.apache.airavata.model.status.JobStatus; import org.apache.airavata.model.status.ProcessState; import org.apache.airavata.model.status.ProcessStatus; import org.apache.airavata.registry.cpi.AppCatalog; @@ -79,6 +77,7 @@ public class ProcessContext { private ComputeResourcePreference computeResourcePreference; private MonitorMode monitorMode; private ResourceJobManager resourceJobManager; + private boolean handOver; /** * Note: process context property use lazy loading approach. 
In runtime you will see some properties as null @@ -341,4 +340,12 @@ public class ProcessContext { public String getExperimentId() { return processModel.getExperimentId(); } + + public boolean isHandOver() { + return handOver; + } + + public void setHandOver(boolean handOver) { + this.handOver = handOver; + } } http://git-wip-us.apache.org/repos/asf/airavata/blob/92243599/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/watcher/CancelRequestWatcher.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/watcher/CancelRequestWatcher.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/watcher/CancelRequestWatcher.java index 41bb0f0..2bdc770 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/watcher/CancelRequestWatcher.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/watcher/CancelRequestWatcher.java @@ -21,11 +21,7 @@ package org.apache.airavata.gfac.core.watcher; import org.apache.curator.framework.api.CuratorWatcher; -import org.apache.zookeeper.WatchedEvent; -public class CancelRequestWatcher implements CuratorWatcher { - @Override - public void process(WatchedEvent watchedEvent) throws Exception { - // this watcher change data in cancel listener node in the experiment node - } +public interface CancelRequestWatcher extends CuratorWatcher { + } http://git-wip-us.apache.org/repos/asf/airavata/blob/92243599/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/watcher/RedeliveryRequestWatcher.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/watcher/RedeliveryRequestWatcher.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/watcher/RedeliveryRequestWatcher.java index c9ad139..d200bf5 100644 --- 
a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/watcher/RedeliveryRequestWatcher.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/watcher/RedeliveryRequestWatcher.java @@ -21,11 +21,7 @@ package org.apache.airavata.gfac.core.watcher; import org.apache.curator.framework.api.CuratorWatcher; -import org.apache.zookeeper.WatchedEvent; -public class RedeliveryRequestWatcher implements CuratorWatcher { - @Override - public void process(WatchedEvent watchedEvent) throws Exception { - // get the data in experiment node and compare with gfac server name. - } +public interface RedeliveryRequestWatcher extends CuratorWatcher { + } http://git-wip-us.apache.org/repos/asf/airavata/blob/92243599/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java index 4bfb6cf..e660196 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java @@ -41,11 +41,14 @@ import org.apache.airavata.gfac.core.config.DataTransferTaskConfig; import org.apache.airavata.gfac.core.config.GFacYamlConfigruation; import org.apache.airavata.gfac.core.config.JobSubmitterTaskConfig; import org.apache.airavata.gfac.core.config.ResourceConfig; +import org.apache.airavata.gfac.core.context.GFacContext; import org.apache.airavata.gfac.core.context.ProcessContext; import org.apache.airavata.gfac.core.monitor.JobMonitor; import org.apache.airavata.gfac.core.scheduler.HostScheduler; import org.apache.airavata.gfac.core.task.JobSubmissionTask; import org.apache.airavata.gfac.core.task.Task; +import org.apache.airavata.gfac.core.watcher.CancelRequestWatcher; +import 
org.apache.airavata.gfac.core.watcher.RedeliveryRequestWatcher; import org.apache.airavata.gfac.impl.job.LSFJobConfiguration; import org.apache.airavata.gfac.impl.job.LSFOutputParser; import org.apache.airavata.gfac.impl.job.PBSJobConfiguration; @@ -54,6 +57,8 @@ import org.apache.airavata.gfac.impl.job.SlurmJobConfiguration; import org.apache.airavata.gfac.impl.job.SlurmOutputParser; import org.apache.airavata.gfac.impl.job.UGEJobConfiguration; import org.apache.airavata.gfac.impl.job.UGEOutputParser; +import org.apache.airavata.gfac.impl.watcher.CancelRequestWatcherImpl; +import org.apache.airavata.gfac.impl.watcher.RedeliveryRequestWatcherImpl; import org.apache.airavata.gfac.monitor.email.EmailBasedMonitor; import org.apache.airavata.messaging.core.Publisher; import org.apache.airavata.messaging.core.impl.RabbitMQProcessLaunchConsumer; @@ -98,6 +103,7 @@ public abstract class Factory { }*/ private static GFacEngine engine; + private static GFacContext gfacContext; private static Publisher statusPublisher; private static CuratorFramework curatorClient; private static EmailBasedMonitor emailBasedMonitor; @@ -120,6 +126,13 @@ public abstract class Factory { return engine; } + public static GFacContext getGfacContext() { + if (gfacContext == null) { + gfacContext = GFacContext.getInstance(); + } + return gfacContext; + } + public static ExperimentCatalog getDefaultExpCatalog() throws RegistryException { return RegistryFactory.getDefaultExpCatalog(); } @@ -304,6 +317,18 @@ public abstract class Factory { return jobMonitor; } + public static JobMonitor getDefaultMonitorService() throws AiravataException { + return getMonitorService(MonitorMode.JOB_EMAIL_NOTIFICATION_MONITOR); + } + + public static RedeliveryRequestWatcher getRedeliveryReqeustWatcher() { + return new RedeliveryRequestWatcherImpl(); + } + + public static CancelRequestWatcher getCancelRequestWatcher() { + return new CancelRequestWatcherImpl(); + } + public static Session getSSHSession(AuthenticationInfo 
authenticationInfo, ServerInfo serverInfo) throws AiravataException { SSHKeyAuthentication authentication = null; String key = serverInfo.getUserName() + "_" + serverInfo.getHost() + "_" + serverInfo.getPort(); http://git-wip-us.apache.org/repos/asf/airavata/blob/92243599/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java index a4c8381..33e354b 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java @@ -121,6 +121,9 @@ public class GFacEngineImpl implements GFacEngine { @Override public void executeProcess(ProcessContext processContext) throws GFacException { + if (processContext.isHandOver()) { + return; + } TaskContext taskCtx = null; List<TaskContext> taskChain = new ArrayList<>(); processContext.setProcessStatus(new ProcessStatus(ProcessState.CONFIGURING_WORKSPACE)); @@ -138,6 +141,9 @@ public class GFacEngineImpl implements GFacEngine { ().name(), taskStatus.getReason()); throw new GFacException("Error while environment setup"); } + if (processContext.isHandOver()) { + return; + } // execute process inputs processContext.setProcessStatus(new ProcessStatus(ProcessState.INPUT_DATA_STAGING)); GFacUtils.saveAndPublishProcessStatus(processContext); @@ -145,6 +151,9 @@ public class GFacEngineImpl implements GFacEngine { sortByInputOrder(processInputs); if (processInputs != null) { for (InputDataObjectType processInput : processInputs) { + if (processContext.isHandOver()) { + return; + } DataType type = processInput.getType(); switch (type) { case STDERR: @@ -175,17 +184,26 @@ public class GFacEngineImpl implements GFacEngine { } } } + if 
(processContext.isHandOver()) { + return; + } processContext.setProcessStatus(new ProcessStatus(ProcessState.EXECUTING)); GFacUtils.saveAndPublishProcessStatus(processContext); taskCtx = getJobSubmissionTaskContext(processContext); saveTaskModel(taskCtx); GFacUtils.saveAndPublishTaskStatus(taskCtx); JobSubmissionTask jobSubmissionTask = Factory.getJobSubmissionTask(processContext.getJobSubmissionProtocol()); + if (processContext.isHandOver()) { + return; + } taskStatus = executeTask(taskCtx, jobSubmissionTask); if (taskStatus.getState() == TaskState.FAILED) { throw new GFacException("Job submission task failed"); } processContext.setTaskChain(taskChain); + if (processContext.isHandOver()) { + return; + } } @@ -196,11 +214,17 @@ public class GFacEngineImpl implements GFacEngine { @Override public void runProcessOutflow(ProcessContext processContext) throws GFacException { + if (processContext.isHandOver()) { + return; + } TaskContext taskCtx = null; processContext.setProcessStatus(new ProcessStatus(ProcessState.OUTPUT_DATA_STAGING)); GFacUtils.saveAndPublishProcessStatus(processContext); List<OutputDataObjectType> processOutputs = processContext.getProcessModel().getProcessOutputs(); for (OutputDataObjectType processOutput : processOutputs) { + if (processContext.isHandOver()) { + return; + } DataType type = processOutput.getType(); switch (type) { case STDERR: @@ -232,6 +256,9 @@ public class GFacEngineImpl implements GFacEngine { break; } } + if (processContext.isHandOver()) { + return; + } processContext.setProcessStatus(new ProcessStatus(ProcessState.POST_PROCESSING)); GFacUtils.saveAndPublishProcessStatus(processContext); // taskCtx = getEnvCleanupTaskContext(processContext); http://git-wip-us.apache.org/repos/asf/airavata/blob/92243599/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java ---------------------------------------------------------------------- diff --git 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java index a759f90..6bbf159 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java @@ -22,7 +22,6 @@ package org.apache.airavata.gfac.impl; import org.apache.airavata.common.exception.AiravataException; -import org.apache.airavata.gfac.core.GFac; import org.apache.airavata.gfac.core.GFacEngine; import org.apache.airavata.gfac.core.GFacException; import org.apache.airavata.gfac.core.GFacUtils; @@ -38,11 +37,12 @@ import java.text.MessageFormat; public class GFacWorker implements Runnable { private static final Logger log = LoggerFactory.getLogger(GFacWorker.class); + private GFacEngine engine; private ProcessContext processContext; private String processId; private String gatewayId; private String tokenId; - private boolean isProcessContextPopulated = false; + private boolean runOutflow = false; /** @@ -55,7 +55,9 @@ public class GFacWorker implements Runnable { this.processId = processContext.getProcessId(); this.gatewayId = processContext.getGatewayId(); this.tokenId = processContext.getTokenId(); + engine = Factory.getGFacEngine(); this.processContext = processContext; + runOutflow = true; } /** @@ -65,16 +67,17 @@ public class GFacWorker implements Runnable { this.processId = processId; this.gatewayId = gatewayId; this.tokenId = tokenId; + engine = Factory.getGFacEngine(); + this.processContext = engine.populateProcessContext(processId, gatewayId, tokenId); + Factory.getGfacContext().addProcess(this.processContext); } @Override public void run() { + if (processContext.isHandOver()) { + return; + } try { - GFacEngine engine = Factory.getGFacEngine(); - if (processContext == null) { - processContext = engine.populateProcessContext(processId, gatewayId, tokenId); - 
isProcessContextPopulated = true; - } ProcessType type = getProcessType(processContext); try { switch (type) { @@ -84,7 +87,9 @@ public class GFacWorker implements Runnable { case RECOVER: recoverProcess(engine); break; - case OUTFLOW: +// case RECOVER_MONITORING: + // TODO get monitor mode from process and get correct monitor service instead default service. + case RUN_OUTFLOW: // run the outflow task engine.runProcessOutflow(processContext); processContext.setProcessStatus(new ProcessStatus(ProcessState.COMPLETED)); @@ -109,7 +114,7 @@ public class GFacWorker implements Runnable { case RECOVER: log.error("Process recover error ", e); break; - case OUTFLOW: + case RUN_OUTFLOW: log.error("Process outflow execution error", e); break; case RECOVER_OUTFLOW: @@ -138,6 +143,9 @@ public class GFacWorker implements Runnable { } private void exectuteProcess(GFacEngine engine) throws GFacException { + if (processContext.isHandOver()) { + return; + } engine.executeProcess(processContext); if (processContext.getMonitorMode() == null) { engine.runProcessOutflow(processContext); @@ -182,10 +190,10 @@ public class GFacWorker implements Runnable { case EXECUTING: return ProcessType.RECOVER; case MONITORING: - if (isProcessContextPopulated) { - return ProcessType.RECOVER; // hand over to monitor task + if (runOutflow) { + return ProcessType.RUN_OUTFLOW; // execute outflow } else { - return ProcessType.OUTFLOW; // execute outflow + return ProcessType.RECOVER_MONITORING; // hand over to monitor task } case OUTPUT_DATA_STAGING: case POST_PROCESSING: @@ -205,7 +213,8 @@ public class GFacWorker implements Runnable { private enum ProcessType { NEW, RECOVER, - OUTFLOW, + RECOVER_MONITORING, + RUN_OUTFLOW, RECOVER_OUTFLOW, COMPLETED } http://git-wip-us.apache.org/repos/asf/airavata/blob/92243599/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/CancelRequestWatcherImpl.java ---------------------------------------------------------------------- diff --git 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/CancelRequestWatcherImpl.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/CancelRequestWatcherImpl.java new file mode 100644 index 0000000..bc49316 --- /dev/null +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/CancelRequestWatcherImpl.java @@ -0,0 +1,31 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ +package org.apache.airavata.gfac.impl.watcher; + +import org.apache.airavata.gfac.core.watcher.CancelRequestWatcher; +import org.apache.zookeeper.WatchedEvent; + +public class CancelRequestWatcherImpl implements CancelRequestWatcher { + @Override + public void process(WatchedEvent watchedEvent) throws Exception { + // this watcher change data in cancel listener node in the experiment node + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/92243599/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/RedeliveryRequestWatcherImpl.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/RedeliveryRequestWatcherImpl.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/RedeliveryRequestWatcherImpl.java new file mode 100644 index 0000000..c459f0c --- /dev/null +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/RedeliveryRequestWatcherImpl.java @@ -0,0 +1,69 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ +package org.apache.airavata.gfac.impl.watcher; + +import org.apache.airavata.common.utils.ServerSettings; +import org.apache.airavata.gfac.core.context.ProcessContext; +import org.apache.airavata.gfac.core.watcher.RedeliveryRequestWatcher; +import org.apache.airavata.gfac.impl.Factory; +import org.apache.curator.framework.CuratorFramework; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.slf4j.Logger; + +public class RedeliveryRequestWatcherImpl implements RedeliveryRequestWatcher { + + private static final Logger log = org.slf4j.LoggerFactory.getLogger(RedeliveryRequestWatcherImpl.class); + + @Override + public void process(WatchedEvent watchedEvent) throws Exception { + String path = watchedEvent.getPath(); + Watcher.Event.EventType eventType = watchedEvent.getType(); + log.info("Redelivery request came for zk path {} event type {} ", path, eventType.name()); + switch (eventType) { + case NodeDataChanged: + CuratorFramework curatorClient = Factory.getCuratorClient(); + byte[] bytes = curatorClient.getData().forPath(path); + String serverName = new String(bytes); + String processId = path.substring(path.lastIndexOf("/") + 1); + if (ServerSettings.getGFacServerName().trim().equals(serverName)) { + curatorClient.getData().usingWatcher(this).forPath(path); + log.info("processId: {}, change data with same server name : {}" , processId, serverName); + } else { + ProcessContext processContext = Factory.getGfacContext().getProcess(processId); + if (processContext != null) { + processContext.setHandOver(true); + log.info("procesId : {}, handing over to new server instance : {}", processId, serverName); + } else { + log.info("Redelivery request came for processId {} but couldn't find process context"); + } + } + break; + case NodeDeleted: + case NodeCreated: + case NodeChildrenChanged: + case None: + // not yet implemented + default: + break; + } + } +} 
http://git-wip-us.apache.org/repos/asf/airavata/blob/92243599/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java index 3fbe245..9ebfa05 100644 --- a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java +++ b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java @@ -30,6 +30,7 @@ import org.apache.airavata.common.utils.listener.AbstractActivityListener; import org.apache.airavata.gfac.core.GFacConstants; import org.apache.airavata.gfac.core.GFacException; import org.apache.airavata.gfac.core.GFacUtils; +import org.apache.airavata.gfac.core.watcher.CancelRequestWatcher; import org.apache.airavata.gfac.cpi.GfacService; import org.apache.airavata.gfac.cpi.gfac_cpi_serviceConstants; import org.apache.airavata.gfac.impl.Factory; @@ -211,25 +212,51 @@ public class GfacServerHandler implements GfacService.Iface { public void onMessage(MessageContext message) { log.info(" Message Received with message id '" + message.getMessageId() - + "' and with message type '" + message.getType()); + + "' and with message type '" + message.getType()); if (message.getType().equals(MessageType.LAUNCHPROCESS)) { + ProcessStatus status = new ProcessStatus(); + status.setState(ProcessState.EXECUTING); try { ProcessSubmitEvent event = new ProcessSubmitEvent(); TBase messageEvent = message.getEvent(); byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent); ThriftUtils.createThriftFromBytes(bytes, event); + if (message.isRedeliver()) { + // check the process is already active in this instance. 
+ if (Factory.getGfacContext().getProcess(event.getProcessId()) != null) { + // update deliver tag + try { + updateDeliveryTag(curatorClient, gfacServerName, event.getProcessId(), message + .getDeliveryTag()); + return; + } catch (Exception e) { + log.error("Error while updating delivery tag for redelivery message , messageId : " + + message.getMessageId(), e); + rabbitMQProcessLaunchConsumer.sendAck(message.getDeliveryTag()); + } + } else { + // give time to complete handover logic in previous instance. + try { + Thread.sleep(60000); + } catch (InterruptedException e) { + // ignore + } + // read process status from registry + ProcessStatus processStatus = ((ProcessStatus) Factory.getDefaultExpCatalog().get(ExperimentCatalogModelType + .PROCESS_STATUS, + event.getProcessId())); + status.setState(processStatus.getState()); + } + } // update process status to executing - ProcessStatus status = new ProcessStatus(); - status.setState(ProcessState.EXECUTING); - status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis()); - Factory.getDefaultExpCatalog().update(ExperimentCatalogModelType.PROCESS_STATUS, status, event - .getProcessId()); + status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis()); + Factory.getDefaultExpCatalog().update(ExperimentCatalogModelType.PROCESS_STATUS, status, event + .getProcessId()); publishProcessStatus(event, status); try { - GFacUtils.createProcessZKNode(curatorClient, gfacServerName, event.getProcessId(), message - .getDeliveryTag(), - event.getTokenId()); - submitProcess(event.getProcessId(), event.getGatewayId(), event.getTokenId()); + createProcessZKNode(curatorClient, gfacServerName, event.getProcessId(), message + .getDeliveryTag(), event.getTokenId()); + submitProcess(event.getProcessId(), event.getGatewayId(), event.getTokenId()); } catch (Exception e) { log.error(e.getMessage(), e); rabbitMQProcessLaunchConsumer.sendAck(message.getDeliveryTag()); @@ -278,4 +305,39 @@ public class GfacServerHandler implements 
GfacService.Iface { msgCtx.setUpdatedTime(AiravataUtils.getCurrentTimestamp()); statusPublisher.publish(msgCtx); } + + private void createProcessZKNode(CuratorFramework curatorClient, String gfacServerName, String + processId, long deliveryTag, String token) throws Exception { + // TODO - To handle multiple processes per experiment, need to create a /experiments/{expId}/{processId} node + // create /experiments/{processId} node and set data - serverName, add redelivery listener + String zkProcessNodePath = ZKPaths.makePath(GFacConstants.ZOOKEEPER_EXPERIMENT_NODE, processId); + ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), zkProcessNodePath); + curatorClient.setData().withVersion(-1).forPath(zkProcessNodePath, gfacServerName.getBytes()); + curatorClient.getData().usingWatcher(Factory.getRedeliveryReqeustWatcher()).forPath(zkProcessNodePath); + + // create /experiments/{processId}/deliveryTag node and set data - deliveryTag + String deliveryTagPath = ZKPaths.makePath(zkProcessNodePath, GFacConstants.ZOOKEEPER_DELIVERYTAG_NODE); + ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), deliveryTagPath); + curatorClient.setData().withVersion(-1).forPath(deliveryTagPath, GFacUtils.longToBytes(deliveryTag)); + + // create /experiments/{processId}/token node (child of the process node, like deliveryTag) and set data - token + String tokenNodePath = ZKPaths.makePath(zkProcessNodePath, GFacConstants.ZOOKEEPER_TOKEN_NODE); + ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), tokenNodePath); + curatorClient.setData().withVersion(-1).forPath(tokenNodePath, token.getBytes()); + + // create /experiments/{processId}/cancelListener node and set watcher for data changes + String cancelListenerNode = ZKPaths.makePath(zkProcessNodePath, GFacConstants.ZOOKEEPER_CANCEL_LISTENER_NODE); + ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), cancelListenerNode); + curatorClient.getData().usingWatcher(Factory.getCancelRequestWatcher()).forPath(cancelListenerNode); + } + + private void
updateDeliveryTag(CuratorFramework curatorClient, String gfacServerName, String processId, long + deliveryTag) throws Exception { + // resolve the /experiments/{processId} node path (nothing is created or written for it here) + String zkProcessNodePath = ZKPaths.makePath(GFacConstants.ZOOKEEPER_EXPERIMENT_NODE, processId); + // set data of /experiments/{processId}/deliveryTag to the new deliveryTag (no mkdirs here: node presumably exists already - TODO confirm) + String deliveryTagPath = ZKPaths.makePath(zkProcessNodePath, GFacConstants.ZOOKEEPER_DELIVERYTAG_NODE); + curatorClient.setData().withVersion(-1).forPath(deliveryTagPath, GFacUtils.longToBytes(deliveryTag)); + } + } http://git-wip-us.apache.org/repos/asf/airavata/blob/92243599/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageContext.java ---------------------------------------------------------------------- diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageContext.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageContext.java index b53de0d..a161e2a 100644 --- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageContext.java +++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageContext.java @@ -33,6 +33,7 @@ public class MessageContext { private final String gatewayId; private Timestamp updatedTime; private long deliveryTag; + private boolean isRedeliver; public MessageContext(TBase event, MessageType type, String messageId, String gatewayId) { @@ -81,4 +82,12 @@ public class MessageContext { public void setDeliveryTag(long deliveryTag) { this.deliveryTag = deliveryTag; } + + public void setIsRedeliver(boolean isRedeliver) { + this.isRedeliver = isRedeliver; + } + + public boolean isRedeliver() { + return isRedeliver; + } } http://git-wip-us.apache.org/repos/asf/airavata/blob/92243599/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessLaunchConsumer.java
---------------------------------------------------------------------- diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessLaunchConsumer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessLaunchConsumer.java index 850128e..ce697da 100644 --- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessLaunchConsumer.java +++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessLaunchConsumer.java @@ -275,7 +275,7 @@ public class RabbitMQProcessLaunchConsumer { public void sendAck(long deliveryTag){ try { - channel.basicAck(deliveryTag,false); //todo move this logic to monitoring component to ack when the job is done + channel.basicAck(deliveryTag,false); } catch (IOException e) { logger.error(e.getMessage(), e); } http://git-wip-us.apache.org/repos/asf/airavata/blob/92243599/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusConsumer.java ---------------------------------------------------------------------- diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusConsumer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusConsumer.java index 46784bf..561cde2 100644 --- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusConsumer.java +++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusConsumer.java @@ -154,6 +154,7 @@ public class RabbitMQStatusConsumer implements Consumer { ThriftUtils.createThriftFromBytes(body, message); TBase event = null; String gatewayId = null; + if (message.getMessageType().equals(MessageType.EXPERIMENT)) { ExperimentStatusChangeEvent experimentStatusChangeEvent = new ExperimentStatusChangeEvent(); ThriftUtils.createThriftFromBytes(message.getEvent(), 
experimentStatusChangeEvent); @@ -211,6 +212,7 @@ public class RabbitMQStatusConsumer implements Consumer { } MessageContext messageContext = new MessageContext(event, message.getMessageType(), message.getMessageId(), gatewayId); messageContext.setUpdatedTime(AiravataUtils.getTime(message.getUpdatedTime())); + messageContext.setIsRedeliver(envelope.isRedeliver()); handler.onMessage(messageContext); } catch (TException e) { String msg = "Failed to de-serialize the thrift message, from routing keys and queueName " + id;
