Repository: airavata Updated Branches: refs/heads/auroraMesosIntegration 325d1630c -> 21b1923da
Added job submission task and aurora montoring services Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/21b1923d Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/21b1923d Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/21b1923d Branch: refs/heads/auroraMesosIntegration Commit: 21b1923da2184d6df71a855caf9aea80ce33b300 Parents: 325d163 Author: Shameera Rathnayaka <[email protected]> Authored: Mon Oct 31 19:36:17 2016 -0400 Committer: Shameera Rathnayaka <[email protected]> Committed: Mon Oct 31 19:36:17 2016 -0400 ---------------------------------------------------------------------- modules/cloud/aurora-client/pom.xml | 8 + .../cloud/aurora/client/AuroraThriftClient.java | 2 +- .../main/resources/aurora-scheduler.properties | 2 +- modules/cloud/cloud-provisioning/pom.xml | 7 + .../apache/airavata/gfac/core/GFacUtils.java | 88 ++++++- modules/gfac/gfac-impl/pom.xml | 5 + .../apache/airavata/gfac/impl/AuroraUtils.java | 30 +++ .../gfac/impl/task/AuroraJobSubmission.java | 146 +++++++++++ .../gfac/monitor/cloud/AuroraJobMonitor.java | 247 +++++++++++++++++++ .../core/utils/OrchestratorUtils.java | 16 ++ .../cpi/impl/SimpleOrchestratorImpl.java | 5 +- pom.xml | 2 +- 12 files changed, 543 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/21b1923d/modules/cloud/aurora-client/pom.xml ---------------------------------------------------------------------- diff --git a/modules/cloud/aurora-client/pom.xml b/modules/cloud/aurora-client/pom.xml index 4cac7d9..8188c49 100644 --- a/modules/cloud/aurora-client/pom.xml +++ b/modules/cloud/aurora-client/pom.xml @@ -70,7 +70,15 @@ <directory>src/test/resources</directory> </testResource> </testResources> + <plugins> + <plugin> + <groupId>com.mycila</groupId> + <artifactId>license-maven-plugin</artifactId> + <configuration> + 
<header>../../../apache-license-header.txt</header> + </configuration> + </plugin> </plugins> </build> </project> http://git-wip-us.apache.org/repos/asf/airavata/blob/21b1923d/modules/cloud/aurora-client/src/main/java/org/apache/airavata/cloud/aurora/client/AuroraThriftClient.java ---------------------------------------------------------------------- diff --git a/modules/cloud/aurora-client/src/main/java/org/apache/airavata/cloud/aurora/client/AuroraThriftClient.java b/modules/cloud/aurora-client/src/main/java/org/apache/airavata/cloud/aurora/client/AuroraThriftClient.java index 3fb2468..0e0b36e 100644 --- a/modules/cloud/aurora-client/src/main/java/org/apache/airavata/cloud/aurora/client/AuroraThriftClient.java +++ b/modules/cloud/aurora-client/src/main/java/org/apache/airavata/cloud/aurora/client/AuroraThriftClient.java @@ -193,7 +193,7 @@ public class AuroraThriftClient { * @return the job details * @throws Exception the exception */ - public ResponseBean getJobDetails(JobKeyBean jobKeyBean) throws Exception { + public JobDetailsResponseBean getJobDetails(JobKeyBean jobKeyBean) throws Exception { JobDetailsResponseBean response = null; try { if(jobKeyBean != null) { http://git-wip-us.apache.org/repos/asf/airavata/blob/21b1923d/modules/cloud/aurora-client/src/main/resources/aurora-scheduler.properties ---------------------------------------------------------------------- diff --git a/modules/cloud/aurora-client/src/main/resources/aurora-scheduler.properties b/modules/cloud/aurora-client/src/main/resources/aurora-scheduler.properties index 6cb1fe9..0e1cc95 100644 --- a/modules/cloud/aurora-client/src/main/resources/aurora-scheduler.properties +++ b/modules/cloud/aurora-client/src/main/resources/aurora-scheduler.properties @@ -1,6 +1,6 @@ #Aurora scheduler properties -aurora.scheduler.host=mesos-master-1 +aurora.scheduler.host=52.15.129.208 aurora.scheduler.port=8081 
http://git-wip-us.apache.org/repos/asf/airavata/blob/21b1923d/modules/cloud/cloud-provisioning/pom.xml ---------------------------------------------------------------------- diff --git a/modules/cloud/cloud-provisioning/pom.xml b/modules/cloud/cloud-provisioning/pom.xml index c08cfb9..7981a10 100644 --- a/modules/cloud/cloud-provisioning/pom.xml +++ b/modules/cloud/cloud-provisioning/pom.xml @@ -76,6 +76,13 @@ </testResource> </testResources> <plugins> + <plugin> + <groupId>com.mycila</groupId> + <artifactId>license-maven-plugin</artifactId> + <configuration> + <header>../../../apache-license-header.txt</header> + </configuration> + </plugin> </plugins> </build> </project> http://git-wip-us.apache.org/repos/asf/airavata/blob/21b1923d/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java index b69764e..66998c3 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java @@ -21,10 +21,14 @@ package org.apache.airavata.gfac.core; import groovy.lang.Writable; import groovy.text.GStringTemplateEngine; -import groovy.text.SimpleTemplateEngine; import groovy.text.TemplateEngine; import org.apache.airavata.common.exception.ApplicationSettingsException; -import org.apache.airavata.common.utils.*; +import org.apache.airavata.common.utils.AiravataUtils; +import org.apache.airavata.common.utils.AiravataZKUtils; +import org.apache.airavata.common.utils.ApplicationSettings; +import org.apache.airavata.common.utils.DBUtil; +import org.apache.airavata.common.utils.ServerSettings; +import org.apache.airavata.common.utils.ZkConstants; import 
org.apache.airavata.credential.store.store.CredentialReader; import org.apache.airavata.credential.store.store.impl.CredentialReaderImpl; import org.apache.airavata.gfac.core.context.ProcessContext; @@ -32,23 +36,54 @@ import org.apache.airavata.gfac.core.context.TaskContext; import org.apache.airavata.messaging.core.MessageContext; import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription; import org.apache.airavata.model.appcatalog.appdeployment.CommandObject; -import org.apache.airavata.model.appcatalog.computeresource.*; +import org.apache.airavata.model.appcatalog.computeresource.CloudJobSubmission; +import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription; +import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionInterface; +import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol; +import org.apache.airavata.model.appcatalog.computeresource.LOCALSubmission; +import org.apache.airavata.model.appcatalog.computeresource.MonitorMode; +import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManager; +import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManagerType; +import org.apache.airavata.model.appcatalog.computeresource.SSHJobSubmission; +import org.apache.airavata.model.appcatalog.computeresource.UnicoreJobSubmission; import org.apache.airavata.model.appcatalog.gatewayprofile.ComputeResourcePreference; import org.apache.airavata.model.application.io.DataType; import org.apache.airavata.model.application.io.InputDataObjectType; import org.apache.airavata.model.application.io.OutputDataObjectType; import org.apache.airavata.model.commons.ErrorModel; -import org.apache.airavata.model.data.replica.*; +import org.apache.airavata.model.data.replica.DataProductModel; +import org.apache.airavata.model.data.replica.DataProductType; +import org.apache.airavata.model.data.replica.DataReplicaLocationModel; +import 
org.apache.airavata.model.data.replica.ReplicaLocationCategory; +import org.apache.airavata.model.data.replica.ReplicaPersistentType; import org.apache.airavata.model.experiment.ExperimentModel; import org.apache.airavata.model.job.JobModel; -import org.apache.airavata.model.messaging.event.*; +import org.apache.airavata.model.messaging.event.JobIdentifier; +import org.apache.airavata.model.messaging.event.JobStatusChangeEvent; +import org.apache.airavata.model.messaging.event.MessageType; +import org.apache.airavata.model.messaging.event.ProcessIdentifier; +import org.apache.airavata.model.messaging.event.ProcessStatusChangeEvent; +import org.apache.airavata.model.messaging.event.TaskIdentifier; +import org.apache.airavata.model.messaging.event.TaskStatusChangeEvent; import org.apache.airavata.model.parallelism.ApplicationParallelismType; import org.apache.airavata.model.process.ProcessModel; import org.apache.airavata.model.scheduling.ComputationalResourceSchedulingModel; -import org.apache.airavata.model.status.*; +import org.apache.airavata.model.status.JobStatus; +import org.apache.airavata.model.status.ProcessState; +import org.apache.airavata.model.status.ProcessStatus; +import org.apache.airavata.model.status.TaskState; +import org.apache.airavata.model.status.TaskStatus; import org.apache.airavata.model.task.JobSubmissionTaskModel; import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory; -import org.apache.airavata.registry.cpi.*; +import org.apache.airavata.registry.cpi.AppCatalog; +import org.apache.airavata.registry.cpi.AppCatalogException; +import org.apache.airavata.registry.cpi.CompositeIdentifier; +import org.apache.airavata.registry.cpi.ExpCatChildDataType; +import org.apache.airavata.registry.cpi.ExperimentCatalog; +import org.apache.airavata.registry.cpi.ExperimentCatalogModelType; +import org.apache.airavata.registry.cpi.GwyResourceProfile; +import org.apache.airavata.registry.cpi.RegistryException; +import 
org.apache.airavata.registry.cpi.ReplicaCatalog; import org.apache.airavata.registry.cpi.utils.Constants; import org.apache.commons.io.FileUtils; import org.apache.curator.framework.CuratorFramework; @@ -65,15 +100,35 @@ import org.w3c.dom.Element; import org.w3c.dom.Node; import org.w3c.dom.NodeList; -import javax.xml.xpath.*; -import java.io.*; +import javax.xml.xpath.XPath; +import javax.xml.xpath.XPathConstants; +import javax.xml.xpath.XPathExpression; +import javax.xml.xpath.XPathExpressionException; +import javax.xml.xpath.XPathFactory; +import java.io.BufferedReader; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileReader; +import java.io.IOException; +import java.io.InputStream; import java.net.InetAddress; import java.net.URISyntaxException; import java.net.URL; import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.security.SecureRandom; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Calendar; +import java.util.Collections; +import java.util.Comparator; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.TreeSet; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -374,6 +429,17 @@ public class GFacUtils { } } + public static CloudJobSubmission getCloudJobSubmission(String submissionId) throws RegistryException { + try { + AppCatalog appCatalog = RegistryFactory.getAppCatalog(); + return appCatalog.getComputeResource().getCloudJobSubmission(submissionId); + } catch (Exception e) { + String errorMsg = "Error while retrieving cloud job submission with submission id : " + submissionId; + log.error(errorMsg, e); + throw new RegistryException(errorMsg, e); + } + } + /** * To convert list to separated value * @@ -700,7 +766,7 @@ public class GFacUtils { return null; } - private static int 
generateJobName() { + public static int generateJobName() { Random random = new Random(); int i = random.nextInt(Integer.MAX_VALUE); i = i + 99999999; http://git-wip-us.apache.org/repos/asf/airavata/blob/21b1923d/modules/gfac/gfac-impl/pom.xml ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/pom.xml b/modules/gfac/gfac-impl/pom.xml index 2a0a949..60e3259 100644 --- a/modules/gfac/gfac-impl/pom.xml +++ b/modules/gfac/gfac-impl/pom.xml @@ -122,5 +122,10 @@ <artifactId>commons-httpclient</artifactId> <version>3.1</version> </dependency> + <dependency> + <groupId>org.apache.airavata</groupId> + <artifactId>aurora-client</artifactId> + <version>${project.version}</version> + </dependency> </dependencies> </project> http://git-wip-us.apache.org/repos/asf/airavata/blob/21b1923d/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/AuroraUtils.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/AuroraUtils.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/AuroraUtils.java new file mode 100644 index 0000000..4412694 --- /dev/null +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/AuroraUtils.java @@ -0,0 +1,30 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.airavata.gfac.impl; + +public class AuroraUtils { + + public static final String ENVIRONMENT = "devel"; + public static final String ROLE = "centos"; + public static final String AURORA_SCHEDULER_PROP_FILE = "aurora-scheduler.properties"; + public static final String CLUSTER = "example"; + +} http://git-wip-us.apache.org/repos/asf/airavata/blob/21b1923d/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AuroraJobSubmission.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AuroraJobSubmission.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AuroraJobSubmission.java new file mode 100644 index 0000000..c2e12d5 --- /dev/null +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AuroraJobSubmission.java @@ -0,0 +1,146 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.airavata.gfac.impl.task; + +import org.apache.airavata.cloud.aurora.client.AuroraThriftClient; +import org.apache.airavata.cloud.aurora.client.bean.IdentityBean; +import org.apache.airavata.cloud.aurora.client.bean.JobConfigBean; +import org.apache.airavata.cloud.aurora.client.bean.JobKeyBean; +import org.apache.airavata.cloud.aurora.client.bean.ProcessBean; +import org.apache.airavata.cloud.aurora.client.bean.ResourceBean; +import org.apache.airavata.cloud.aurora.client.bean.ResponseBean; +import org.apache.airavata.cloud.aurora.client.bean.TaskConfigBean; +import org.apache.airavata.cloud.aurora.util.AuroraThriftClientUtil; +import org.apache.airavata.cloud.aurora.util.Constants; +import org.apache.airavata.common.utils.AiravataUtils; +import org.apache.airavata.gfac.core.GFacException; +import org.apache.airavata.gfac.core.GFacUtils; +import org.apache.airavata.gfac.core.context.ProcessContext; +import org.apache.airavata.gfac.core.context.TaskContext; +import org.apache.airavata.gfac.core.task.JobSubmissionTask; +import org.apache.airavata.gfac.core.task.TaskException; +import org.apache.airavata.gfac.impl.AuroraUtils; +import org.apache.airavata.model.commons.ErrorModel; +import org.apache.airavata.model.job.JobModel; +import org.apache.airavata.model.status.JobState; +import org.apache.airavata.model.status.JobStatus; +import org.apache.airavata.model.status.TaskState; +import org.apache.airavata.model.status.TaskStatus; +import org.apache.airavata.model.task.TaskTypes; +import org.slf4j.Logger; +import 
org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.Map; +import java.util.Set; + +public class AuroraJobSubmission implements JobSubmissionTask{ + + private static final Logger log = LoggerFactory.getLogger(AuroraJobSubmission.class); + + @Override + public JobStatus cancel(TaskContext taskcontext) throws TaskException { + return null; + } + + @Override + public void init(Map<String, String> propertyMap) throws TaskException { + + } + + @Override + public TaskStatus execute(TaskContext taskContext) { + TaskStatus taskStatus = new TaskStatus(TaskState.COMPLETED); // set to completed. + ProcessContext processContext = taskContext.getParentProcessContext(); + JobModel jobModel = processContext.getJobModel(); + jobModel.setTaskId(taskContext.getTaskId()); + String jobIdAndName = "A" + GFacUtils.generateJobName(); + jobModel.setJobName(jobIdAndName); + JobStatus jobStatus = new JobStatus(); + jobStatus.setJobState(JobState.SUBMITTED); + + try { + JobKeyBean jobKey = new JobKeyBean(AuroraUtils.ENVIRONMENT, AuroraUtils.ROLE, jobIdAndName); + IdentityBean owner = new IdentityBean(AuroraUtils.ROLE); + // NOTE: hard-coded AutoDock Vina workflow only; process names are kept distinct since Thermos identifies processes by name + String workingDir = taskContext.getWorkingDir(); + ProcessBean proc1 = new ProcessBean("process_1", "mkdir -p " + workingDir, false); + ProcessBean proc2 = new ProcessBean("process_2", "cp -rf /home/centos/efs-mount-point/autodock-vina/* " + workingDir , false); + ProcessBean proc3 = new ProcessBean("process_3", "cd " + workingDir + " && ./vina_screenA.sh", false); + Set<ProcessBean> processes = new LinkedHashSet<>(); + processes.add(proc1); + processes.add(proc2); + processes.add(proc3); + + ResourceBean resources = new ResourceBean(1.5, 512, 512); + + TaskConfigBean taskConfig = new TaskConfigBean("Airavata-Aurora-" + jobIdAndName, processes, resources); + JobConfigBean jobConfig = new JobConfigBean(jobKey, owner, taskConfig, AuroraUtils.CLUSTER); + + String 
executorConfigJson = AuroraThriftClientUtil.getExecutorConfigJson(jobConfig); + log.info("Executor Config for Job {} , {}", jobIdAndName, executorConfigJson); + + AuroraThriftClient client = AuroraThriftClient.getAuroraThriftClient(Constants.AURORA_SCHEDULER_PROP_FILE); + ResponseBean response = client.createJob(jobConfig); + log.info("Response for job {}, {}", jobIdAndName, response); + + jobModel.setJobId(jobIdAndName); + jobStatus.setReason("Successfully Submitted"); + jobModel.setJobStatuses(Arrays.asList(jobStatus )); + jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); + taskContext.getParentProcessContext().setJobModel(jobModel); + + GFacUtils.saveJobModel(processContext, jobModel); + GFacUtils.saveJobStatus(processContext, jobModel); + taskStatus.setReason("Successfully submitted job to Aurora"); + } catch (Exception e) { + String msg = "Error occurred while submitting the job"; + log.error(msg, e); + taskStatus.setState(TaskState.FAILED); + taskStatus.setReason(msg); + taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); + ErrorModel errorModel = new ErrorModel(); + errorModel.setActualErrorMessage(e.getMessage()); + errorModel.setUserFriendlyMessage(msg); + taskContext.getTaskModel().setTaskErrors(Arrays.asList(errorModel)); + } + taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); + taskContext.setTaskStatus(taskStatus); + try { + GFacUtils.saveAndPublishTaskStatus(taskContext); + } catch (GFacException e) { + log.error("Error while saving task status", e); + } + return taskStatus; + } + + @Override + public TaskStatus recover(TaskContext taskContext) { + return null; + } + + @Override + public TaskTypes getType() { + return TaskTypes.JOB_SUBMISSION; + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/21b1923d/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/cloud/AuroraJobMonitor.java ---------------------------------------------------------------------- diff --git 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/cloud/AuroraJobMonitor.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/cloud/AuroraJobMonitor.java new file mode 100644 index 0000000..5fe9dd8 --- /dev/null +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/cloud/AuroraJobMonitor.java @@ -0,0 +1,247 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ +package org.apache.airavata.gfac.monitor.cloud; + +import org.apache.airavata.cloud.aurora.client.AuroraThriftClient; +import org.apache.airavata.cloud.aurora.client.bean.JobDetailsResponseBean; +import org.apache.airavata.cloud.aurora.client.bean.JobKeyBean; +import org.apache.airavata.cloud.aurora.client.sdk.ScheduledTask; +import org.apache.airavata.common.utils.AiravataUtils; +import org.apache.airavata.gfac.core.GFacException; +import org.apache.airavata.gfac.core.GFacThreadPoolExecutor; +import org.apache.airavata.gfac.core.GFacUtils; +import org.apache.airavata.gfac.core.context.ProcessContext; +import org.apache.airavata.gfac.core.context.TaskContext; +import org.apache.airavata.gfac.core.monitor.JobMonitor; +import org.apache.airavata.gfac.impl.AuroraUtils; +import org.apache.airavata.gfac.impl.GFacWorker; +import org.apache.airavata.model.job.JobModel; +import org.apache.airavata.model.status.JobState; +import org.apache.airavata.model.status.JobStatus; +import org.apache.airavata.model.status.ProcessState; +import org.apache.airavata.model.status.ProcessStatus; +import org.apache.airavata.model.status.TaskState; +import org.apache.airavata.model.status.TaskStatus; +import org.apache.airavata.registry.cpi.ExperimentCatalog; +import org.apache.airavata.registry.cpi.ExperimentCatalogModelType; +import org.apache.airavata.registry.cpi.RegistryException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public class AuroraJobMonitor implements JobMonitor, Runnable { + private static final Logger log = LoggerFactory.getLogger(AuroraJobMonitor.class); + + + + private static volatile AuroraJobMonitor auroraJobMonitor; + private Timer timer; + private Map<String,TaskContext> 
jobMonitoringMap; + private AuroraJobMonitor(){ + jobMonitoringMap = new ConcurrentHashMap<>(); + timer = new Timer("Aurora status poll timer"); + + } + + public static AuroraJobMonitor getInstance(){ + if (auroraJobMonitor == null) { + synchronized (AuroraJobMonitor.class){ + if (auroraJobMonitor == null) { + auroraJobMonitor = new AuroraJobMonitor(); + } + } + } + return auroraJobMonitor; + } + @Override + public void run() { + AuroraTimer task = null; + try { + task = new AuroraTimer(); + timer.schedule(task, 5000, 2000); + } catch (Exception e) { + log.error("Error couldn't run Aurora status poll timer task", e); + } + } + + @Override + public void monitor(String jobId, TaskContext taskContext) { + jobMonitoringMap.put(jobId, taskContext); + log.info("Added JobId : {} to Aurora Job Monitoring map", jobId); + taskContext.getParentProcessContext().setPauseTaskExecution(true); + + } + + @Override + public void stopMonitor(String jobId, boolean runOutFlow) { + jobMonitoringMap.remove(jobId); + } + + @Override + public boolean isMonitoring(String jobId) { + return jobMonitoringMap.get(jobId) != null; + } + + @Override + public void canceledJob(String jobId) { + throw new IllegalStateException("Method not yet implemented"); + } + + class AuroraTimer extends TimerTask { + + AuroraThriftClient client; + public AuroraTimer() throws Exception { + client = AuroraThriftClient.getAuroraThriftClient(AuroraUtils.AURORA_SCHEDULER_PROP_FILE); + + } + + + @Override + + public void run() { + JobKeyBean jobKeyBean = new JobKeyBean(AuroraUtils.ENVIRONMENT, AuroraUtils.ROLE, "dummy"); + Iterator<Map.Entry<String, TaskContext>> iterator = jobMonitoringMap.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry<String, TaskContext> currentEntry = iterator.next(); + try { + jobKeyBean.setName(currentEntry.getKey()); + JobDetailsResponseBean jobDetailsResponseBean = client.getJobDetails(jobKeyBean); + List<ScheduledTask> tasks = jobDetailsResponseBean.getTasks(); + switch 
(tasks.get(0).getStatus()) { + case FINISHED: + iterator.remove(); + processJob(currentEntry.getKey(), currentEntry.getValue(), JobState.COMPLETE); + break; + case FAILED: + iterator.remove(); + processJob(currentEntry.getKey(), currentEntry.getValue(), JobState.FAILED); + break; + case RUNNING: + updateStatus(currentEntry.getKey(), currentEntry.getValue(), JobState.ACTIVE); + break; + default: + log.info("Job {} is in {} state", currentEntry.getKey(), tasks.get(0).getStatus().name()); + break; + } + } catch (Exception e) { + log.error("Error while getting response for job : {}", currentEntry.getKey(), e); + } + } + } + + private void updateStatus(String jobKey, TaskContext taskContext, JobState jobState) { + ProcessContext pc = taskContext.getParentProcessContext(); + ExperimentCatalog experimentCatalog = pc.getExperimentCatalog(); + List<Object> objects = null; + try { + objects = experimentCatalog.get(ExperimentCatalogModelType.JOB_STATUS, taskContext.getTaskId(), jobKey); + } catch (RegistryException e) { + log.error("Error while getting job statuses for job : {} , task : {}, process : {}", jobKey, + taskContext.getTaskId(), pc.getProcessId(), e); + return; + } + List<JobState> jobStatuses = objects.stream() + .map(o -> ((JobStatus) o).getJobState()) + .collect(Collectors.toList()); + if (!jobStatuses.contains(jobState)) { + JobStatus jobStatus = new JobStatus(jobState); + jobStatus.setReason("Aurora return " + jobState.name()); + jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); + JobModel jobModel = pc.getJobModel(); + jobModel.setJobStatuses(Arrays.asList(jobStatus)); + try { + GFacUtils.saveJobStatus(pc, jobModel); + } catch (GFacException e) { + log.error("Error while saving job status {}, job : {}, task :{}, process:{} exp:{}", + jobState.name(), jobKey, taskContext.getTaskId(), pc.getProcessId(), pc.getExperimentId()); + } + } + } + + private void processJob(String jobKey, TaskContext taskContext, JobState jobState) { + JobStatus jobStatus 
= new JobStatus(); + jobStatus.setJobState(jobState); + if (jobState == JobState.COMPLETE) { + jobStatus.setReason("Aurora Job completed"); + } else if (jobState == JobState.FAILED) { + jobStatus.setReason("Aurora Job Failed"); + } + ProcessContext pc = taskContext.getParentProcessContext(); + JobModel jobModel = pc.getJobModel(); + jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); + jobModel.setJobStatuses(Arrays.asList(jobStatus)); + try { + GFacUtils.saveJobStatus(pc, jobModel); + } catch (GFacException e) { + log.error("Error while saving job status for job : {} ", jobKey); + } + + TaskStatus taskStatus = new TaskStatus(TaskState.COMPLETED); + taskStatus.setReason("Job monitoring completed with final state: " + TaskState.COMPLETED.name()); + taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); + taskContext.setTaskStatus(taskStatus); + try { + GFacUtils.saveAndPublishTaskStatus(taskContext); + } catch (GFacException e) { + log.error("Error while saving task status for exp : {} , process : {} , task : {} , job : {}", + taskContext.getExperimentId(), taskContext.getProcessId(), taskContext.getTaskId(), jobKey); + } + + if (pc.isCancel()) { + ProcessStatus processStatus = new ProcessStatus(ProcessState.CANCELLING); + processStatus.setReason("Process has been cancelled"); + pc.setProcessStatus(processStatus); + try { + GFacUtils.saveAndPublishProcessStatus(pc); + } catch (GFacException e) { + log.error("Error while cancelling process, exp : {}, process : {}", pc.getExperimentId(), pc.getProcessId()); + } + } + + try { + GFacThreadPoolExecutor.getCachedThreadPool().execute(new GFacWorker(pc)); + } catch (GFacException e) { + log.error("Error while running output tasks for exp : {} , process : {}", taskContext.getExperimentId(), pc.getProcessId()); + + ProcessStatus processStatus = new ProcessStatus(ProcessState.FAILED); + processStatus.setReason("Failed to run output tasks"); + 
processStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); + pc.setProcessStatus(processStatus); + try { + GFacUtils.saveAndPublishProcessStatus(pc); + } catch (GFacException ex) { + log.error("Error while updating process status to FAILED, exp : {}, process : {}", pc.getExperimentId(), pc.getProcessId()); + } + } + } + + } + +} http://git-wip-us.apache.org/repos/asf/airavata/blob/21b1923d/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorUtils.java ---------------------------------------------------------------------- diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorUtils.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorUtils.java index 74bd2db..83c9273 100644 --- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorUtils.java +++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorUtils.java @@ -262,6 +262,11 @@ public class OrchestratorUtils { if (sshJobSubmission != null) { return sshJobSubmission.getSecurityProtocol(); } + } else if (submissionProtocol == JobSubmissionProtocol.CLOUD) { + CloudJobSubmission cloudJobSubmission = getCloudJobSubmission(context, jobSubmissionInterface.getJobSubmissionInterfaceId()); + if (cloudJobSubmission != null) { + return cloudJobSubmission.getSecurityProtocol(); + } } } catch (RegistryException e) { logger.error("Error occurred while retrieving security protocol", e); @@ -302,6 +307,17 @@ public class OrchestratorUtils { } } + public static CloudJobSubmission getCloudJobSubmission(OrchestratorContext context, String submissionId) throws RegistryException { + try { + AppCatalog appCatalog = context.getRegistry().getAppCatalog(); + return 
appCatalog.getComputeResource().getCloudJobSubmission(submissionId); + } catch (Exception e) { + String errorMsg = "Error while retrieving cloud job submission with submission id : " + submissionId; + logger.error(errorMsg, e); + throw new RegistryException(errorMsg, e); + } + } + public static SCPDataMovement getSCPDataMovement(OrchestratorContext context, String dataMoveId) throws RegistryException { try { AppCatalog appCatalog = context.getRegistry().getAppCatalog(); http://git-wip-us.apache.org/repos/asf/airavata/blob/21b1923d/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java ---------------------------------------------------------------------- diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java index dec9b2c..b97e79a 100644 --- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java +++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java @@ -481,6 +481,8 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{ monitorMode = MonitorMode.FORK; } else if(jobSubmissionProtocol == JobSubmissionProtocol.LOCAL){ monitorMode = MonitorMode.LOCAL; + } else if (jobSubmissionProtocol == JobSubmissionProtocol.CLOUD) { + monitorMode = MonitorMode.CLOUD_JOB_MONITOR; }else { logger.error("expId : {}, processId : {} :- Unsupported Job submission protocol {}.", processModel.getExperimentId(), processModel.getProcessId(), jobSubmissionProtocol.name()); @@ -507,7 +509,8 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{ submissionTaskIds.add(taskModel.getTaskId()); // create monitor task for this Email based monitor mode job - if 
(monitorMode == MonitorMode.JOB_EMAIL_NOTIFICATION_MONITOR) { + if (monitorMode == MonitorMode.JOB_EMAIL_NOTIFICATION_MONITOR + || monitorMode == MonitorMode.CLOUD_JOB_MONITOR) { TaskModel monitorTaskModel = new TaskModel(); monitorTaskModel.setParentProcessId(processModel.getProcessId()); monitorTaskModel.setCreationTime(new Date().getTime()); http://git-wip-us.apache.org/repos/asf/airavata/blob/21b1923d/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 71fc9ba..81b258f 100644 --- a/pom.xml +++ b/pom.xml @@ -626,7 +626,7 @@ <module>modules/orchestrator</module> <module>modules/monitoring</module> <module>modules/user-profile</module> - <!--<module>modules/cloud</module>--> + <module>modules/cloud</module> <module>modules/server</module> <module>modules/workflow</module> <module>modules/test-suite</module>
