Adding new GFac cpi method AIRAVATA-1011
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/029e604d Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/029e604d Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/029e604d Branch: refs/heads/master Commit: 029e604d03eb3c81bb30bf18b14fda3ff5f82e42 Parents: 02606da Author: lahiru <[email protected]> Authored: Wed Feb 12 15:34:25 2014 -0500 Committer: lahiru <[email protected]> Committed: Wed Feb 12 15:34:25 2014 -0500 ---------------------------------------------------------------------- modules/gfac/gfac-core/pom.xml | 95 ++++++++------- .../java/org/apache/airavata/gfac/cpi/GFac.java | 118 +++++++++++++++++++ .../apache/airavata/gfac/utils/GFacUtils.java | 56 +++++---- 3 files changed, 206 insertions(+), 63 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/029e604d/modules/gfac/gfac-core/pom.xml ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/pom.xml b/modules/gfac/gfac-core/pom.xml index 791e16c..e75cf62 100644 --- a/modules/gfac/gfac-core/pom.xml +++ b/modules/gfac/gfac-core/pom.xml @@ -8,7 +8,8 @@ ANY ~ KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. --> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <groupId>org.apache.airavata</groupId> <artifactId>gfac</artifactId> @@ -72,7 +73,16 @@ <artifactId>airavata-workflow-execution-context</artifactId> <version>${project.version}</version> </dependency> - + <dependency> + <groupId>org.apache.airavata</groupId> + <artifactId>airavata-registry-api</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.airavata</groupId> + <artifactId>airavata-registry-cpi</artifactId> + <version>${project.version}</version> + </dependency> <!-- Workflow Tracking --> <dependency> <groupId>org.apache.airavata</groupId> @@ -146,22 +156,22 @@ <scope>test</scope> </dependency> <!-- Unicore dependencies --> - <dependency> - <groupId>eu.unicore</groupId> - <artifactId>ogsabes-client</artifactId> - <version>1.7.0-rc</version> - <exclusions> - <exclusion> - <groupId>org.apache.santuario</groupId> - <artifactId>xmlsec</artifactId> - </exclusion> - <exclusion> + <dependency> + <groupId>eu.unicore</groupId> + <artifactId>ogsabes-client</artifactId> + <version>1.7.0-rc</version> + <exclusions> + <exclusion> + <groupId>org.apache.santuario</groupId> + <artifactId>xmlsec</artifactId> + </exclusion> + <exclusion> <groupId>org.bouncycastle</groupId> <artifactId>bcprov-jdk16</artifactId> </exclusion> </exclusions> - </dependency> - + </dependency> + <!-- Hadoop provider related dependencies --> <dependency> @@ -211,18 +221,23 @@ <version>12.0</version> </dependency> - <!-- gsi-ssh api dependencies --> - <dependency> - <groupId>org.apache.airavata</groupId> - <artifactId>gsissh</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> + <!-- gsi-ssh api dependencies --> + <dependency> + <groupId>org.apache.airavata</groupId> + <artifactId>gsissh</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.airavata</groupId> + <artifactId>airavata-data-models</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> <groupId>com.jcraft</groupId> <artifactId>jsch</artifactId> <version>0.1.50</version> </dependency> - <dependency> + <dependency> <groupId>org.ogce</groupId> <artifactId>bcgss</artifactId> <version>146</version> @@ -250,24 +265,24 @@ <failIfNoTests>false</failIfNoTests> </configuration> </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-dependency-plugin</artifactId> - <executions> - <execution> - <id>copy-dependencies</id> - <phase>package</phase> - <goals> - <goal>copy-dependencies</goal> - </goals> - <configuration> - <outputDirectory>target/lib</outputDirectory> - <overWriteReleases>false</overWriteReleases> - <overWriteSnapshots>true</overWriteSnapshots> - </configuration> - </execution> - </executions> - </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-dependency-plugin</artifactId> + <executions> + <execution> + <id>copy-dependencies</id> + <phase>package</phase> + <goals> + <goal>copy-dependencies</goal> + </goals> + <configuration> + <outputDirectory>target/lib</outputDirectory> + <overWriteReleases>false</overWriteReleases> + <overWriteSnapshots>true</overWriteSnapshots> + </configuration> + </execution> + </executions> + </plugin> </plugins> </build> http://git-wip-us.apache.org/repos/asf/airavata/blob/029e604d/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/cpi/GFac.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/cpi/GFac.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/cpi/GFac.java new file mode 100644 index 0000000..c4ea71b --- /dev/null +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/cpi/GFac.java @@ -0,0 +1,118 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * +*/ +package org.apache.airavata.gfac.cpi; + +import org.apache.airavata.client.api.AiravataAPI; +import org.apache.airavata.common.utils.ServerSettings; +import org.apache.airavata.commons.gfac.type.ApplicationDescription; +import org.apache.airavata.commons.gfac.type.HostDescription; +import org.apache.airavata.commons.gfac.type.ServiceDescription; +import org.apache.airavata.gfac.Constants; +import org.apache.airavata.gfac.GFacConfiguration; +import org.apache.airavata.gfac.GFacException; +import org.apache.airavata.gfac.context.ApplicationContext; +import org.apache.airavata.gfac.context.JobExecutionContext; +import org.apache.airavata.gfac.context.MessageContext; +import org.apache.airavata.gfac.scheduler.HostScheduler; +import org.apache.airavata.gfac.utils.GFacUtils; +import org.apache.airavata.model.experiment.ConfigurationData; +import org.apache.airavata.registry.api.AiravataRegistry2; +import org.apache.airavata.registry.cpi.DataType; +import org.apache.airavata.registry.cpi.Registry; + +import java.io.File; +import java.net.URL; +import java.util.*; + +public class GFac { + + private Registry registry; + + private AiravataAPI airavataAPI; + + private AiravataRegistry2 airavataRegistry2; + + public GFac(Registry registry, AiravataAPI airavataAPI, AiravataRegistry2 airavataRegistry2) { + this.registry = registry; + this.airavataAPI = airavataAPI; + this.airavataRegistry2 = airavataRegistry2; + } + + + public boolean submitJob(String experimentID) throws GFacException { + ConfigurationData configurationData = (ConfigurationData)registry.get(DataType.EXPERIMENT_CONFIGURATION_DATA, experimentID); + String serviceName = configurationData.getApplicationId(); + + if (serviceName == null) { + throw new GFacException("Error executing the job because there is not Application Name in this Experiment"); + } + try { + List<HostDescription> registeredHosts = new ArrayList<HostDescription>(); + Map<String, ApplicationDescription> applicationDescriptors = airavataRegistry2.getApplicationDescriptors(serviceName); + for (String hostDescName : applicationDescriptors.keySet()) { + registeredHosts.add(airavataRegistry2.getHostDescriptor(hostDescName)); + } + Class<? extends HostScheduler> aClass = Class.forName(ServerSettings.getHostScheduler()).asSubclass(HostScheduler.class); + HostScheduler hostScheduler = aClass.newInstance(); + HostDescription hostDescription = hostScheduler.schedule(registeredHosts); + + ServiceDescription serviceDescription = airavataRegistry2.getServiceDescriptor(serviceName); + + ApplicationDescription applicationDescription = airavataRegistry2.getApplicationDescriptors(serviceName, hostDescription.getType().getHostName()); + // When we run getInParameters we set the actualParameter object, this has to be fixed + //FIXME: will these class loaders work correctly in Thrift? + //FIXME: gfac-config.xml is only under src/test. + URL resource = GFac.class.getClassLoader().getResource("gfac-config.xml"); + Properties configurationProperties = ServerSettings.getProperties(); + GFacConfiguration gFacConfiguration = GFacConfiguration.create(new File(resource.getPath()), airavataAPI, configurationProperties); + + JobExecutionContext jobExecutionContext = new JobExecutionContext(gFacConfiguration, serviceName); + //Here we get only the contextheader information sent specific for this node + //Add security context + + + ApplicationContext applicationContext = new ApplicationContext(); + applicationContext.setApplicationDeploymentDescription(applicationDescription); + applicationContext.setHostDescription(hostDescription); + applicationContext.setServiceDescription(serviceDescription); + + jobExecutionContext.setApplicationContext(applicationContext); + + + Map<String, String> experimentInputs = configurationData.getExperimentInputs(); + + jobExecutionContext.setInMessageContext(new MessageContext(GFacUtils.getMessageContext(experimentInputs, + serviceDescription.getType().getInputParametersArray()))); + + HashMap<String, Object> outputData = new HashMap<String, Object>(); + jobExecutionContext.setOutMessageContext(new MessageContext(outputData)); + + jobExecutionContext.setProperty(Constants.PROP_TOPIC, experimentID); + jobExecutionContext.setExperimentID(experimentID); + //FIXME: (MEP) GFacAPI.submitJob() throws a GFacException that isn't caught here. You want to catch this before updating the registry. + GFacAPI gfacAPI1 = new GFacAPI(); + gfacAPI1.submitJob(jobExecutionContext); + }catch (Exception e){ + + } + return true; + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/029e604d/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/utils/GFacUtils.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/utils/GFacUtils.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/utils/GFacUtils.java index bc2a352..3d88499 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/utils/GFacUtils.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/utils/GFacUtils.java @@ -29,19 +29,14 @@ import java.net.InetAddress; import java.net.URI; import java.net.URISyntaxException; import java.net.UnknownHostException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Calendar; -import java.util.Date; -import java.util.Iterator; -import java.util.List; -import java.util.UUID; +import java.util.*; import org.apache.airavata.client.api.AiravataAPI; import org.apache.airavata.client.api.exception.AiravataAPIInvocationException; import org.apache.airavata.common.utils.StringUtil; import org.apache.airavata.commons.gfac.type.ActualParameter; import org.apache.airavata.gfac.Constants; +import org.apache.airavata.gfac.GFacException; import org.apache.airavata.gfac.context.JobExecutionContext; import org.apache.airavata.registry.api.workflow.ApplicationJob; import org.apache.airavata.registry.api.workflow.ApplicationJob.ApplicationJobStatus; @@ -158,7 +153,7 @@ public class GFacUtils { buf.append(localPath); return new URI(buf.toString()); } - + public static String createGsiftpURIAsString(String host, String localPath) throws URISyntaxException { StringBuffer buf = new StringBuffer(); if (!host.startsWith("gsiftp://")) @@ -432,19 +427,19 @@ public class GFacUtils { } return actualParameter; } - - - public static ApplicationJob createApplicationJob( - JobExecutionContext jobExecutionContext) { - ApplicationJob appJob = new ApplicationJob(); - appJob.setExperimentId((String) jobExecutionContext.getProperty(Constants.PROP_TOPIC)); - appJob.setWorkflowExecutionId(appJob.getExperimentId()); - appJob.setNodeId((String)jobExecutionContext.getProperty(Constants.PROP_WORKFLOW_NODE_ID)); - appJob.setServiceDescriptionId(jobExecutionContext.getApplicationContext().getServiceDescription().getType().getName()); - appJob.setHostDescriptionId(jobExecutionContext.getApplicationContext().getHostDescription().getType().getHostName()); - appJob.setApplicationDescriptionId(jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getType().getApplicationName().getStringValue()); - return appJob; - } + + + public static ApplicationJob createApplicationJob( + JobExecutionContext jobExecutionContext) { + ApplicationJob appJob = new ApplicationJob(); + appJob.setExperimentId((String) jobExecutionContext.getProperty(Constants.PROP_TOPIC)); + appJob.setWorkflowExecutionId(appJob.getExperimentId()); + appJob.setNodeId((String) jobExecutionContext.getProperty(Constants.PROP_WORKFLOW_NODE_ID)); + appJob.setServiceDescriptionId(jobExecutionContext.getApplicationContext().getServiceDescription().getType().getName()); + appJob.setHostDescriptionId(jobExecutionContext.getApplicationContext().getHostDescription().getType().getHostName()); + appJob.setApplicationDescriptionId(jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getType().getApplicationName().getStringValue()); + return appJob; + } public static void updateApplicationJobStatusUpdateTime(JobExecutionContext context, String jobId, Date statusUpdateTime) { AiravataAPI airavataAPI = context.getGFacConfiguration().getAiravataAPI(); @@ -562,8 +557,8 @@ public class GFacUtils { updateApplicationJobStatus(context, jobId, status, Calendar.getInstance().getTime()); } - public static ApplicationJobStatus getApplicationJobStatus(int gramStatus){ - switch(gramStatus){ + public static ApplicationJobStatus getApplicationJobStatus(int gramStatus) { + switch (gramStatus) { case GramJob.STATUS_UNSUBMITTED: return ApplicationJobStatus.UN_SUBMITTED; case GramJob.STATUS_ACTIVE: @@ -584,4 +579,19 @@ public class GFacUtils { return ApplicationJobStatus.UNKNOWN; } } + + public static Map<String, Object> getMessageContext(Map<String, String> experimentData, + Parameter[] parameters) throws GFacException { + HashMap<String, Object> stringObjectHashMap = new HashMap<String, Object>(); + + for (int i = 0; i < parameters.length; i++) { + String input = experimentData.get(parameters[i].getParameterName()); + if (input != null) { + stringObjectHashMap.put(parameters[i].getParameterName(), GFacUtils.getInputActualParameter(parameters[i], input)); + } else { + throw new GFacException("Parameter:" + input + "is missing"); + } + } + return stringObjectHashMap; + } }
