Author: lahiru
Date: Thu Jan 31 19:52:57 2013
New Revision: 1441167
URL: http://svn.apache.org/viewvc?rev=1441167&view=rev
Log:
adding GramProvider in to new gfac structure.
Added:
airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/JobSubmissionFault.java
airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/GramProvider.java
airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/utils/GramRSLGenerator.java
airavata/trunk/modules/gfac-core/src/test/java/org/apache/airavata/core/gfac/services/impl/GramProviderTest.java
Modified:
airavata/trunk/modules/gfac-core/pom.xml
airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/Scheduler.java
airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/context/MessageContext.java
airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/phoebus/PhoebusGridConfigurationHandler.java
airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/utils/GramProviderUtils.java
Modified: airavata/trunk/modules/gfac-core/pom.xml
URL:
http://svn.apache.org/viewvc/airavata/trunk/modules/gfac-core/pom.xml?rev=1441167&r1=1441166&r2=1441167&view=diff
==============================================================================
--- airavata/trunk/modules/gfac-core/pom.xml (original)
+++ airavata/trunk/modules/gfac-core/pom.xml Thu Jan 31 19:52:57 2013
@@ -41,26 +41,26 @@
</dependency>
-->
<dependency>
- <groupId>cog-4_1_6_rc2</groupId>
+ <groupId>lead-security</groupId>
<artifactId>puretls</artifactId>
<type>jar</type>
- <version>cog-4_1_6</version>
+ <version>0.9b4-1</version>
</dependency>
- <!--dependency>
- <groupId>cog-4_1_6_rc2</groupId>
+ <dependency>
+ <groupId>lead-security</groupId>
<artifactId>cryptix32</artifactId>
- <version>cog-4_1_6</version>
+ <version>versionless</version>
</dependency>
<dependency>
- <groupId>cog-4_1_6_rc2</groupId>
+ <groupId>lead-security</groupId>
<artifactId>cryptix-asn1</artifactId>
- <version>cog-4_1_6</version>
- </dependency-->
- <!--dependency>
- <groupId>bouncycastle</groupId>
+ <version>versionless</version>
+ </dependency>
+ <dependency>
+ <groupId>org.bouncycastle</groupId>
<artifactId>bcprov-jdk15</artifactId>
- <version>143</version>
- </dependency-->
+ <version>1.45</version>
+ </dependency>
<dependency>
<groupId>commons-configuration</groupId>
<artifactId>commons-configuration</artifactId>
Added:
airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/JobSubmissionFault.java
URL:
http://svn.apache.org/viewvc/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/JobSubmissionFault.java?rev=1441167&view=auto
==============================================================================
---
airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/JobSubmissionFault.java
(added)
+++
airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/JobSubmissionFault.java
Thu Jan 31 19:52:57 2013
@@ -0,0 +1,48 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac;
+
+import org.apache.airavata.gfac.context.JobExecutionContext;
+import org.apache.airavata.gfac.provider.GFacProvider;
+import org.apache.airavata.gfac.provider.GFacProviderException;
+
+public class JobSubmissionFault extends GFacProviderException{
+
+ public static final String JOB_CANCEL = "JOB_CANCEL";
+
+ public static final String JOB_FAILED = "JOB_FAILED";
+
+ private String reason;
+
+ public JobSubmissionFault(GFacProvider provider, Throwable cause, String
submitHost, String contact, String rsl, JobExecutionContext
jobExecutionContext) {
+ super(cause.getMessage(), cause,jobExecutionContext);
+ }
+
+ public void setReason(String reason) {
+ this.reason = reason;
+ }
+
+ public void sendFaultNotification(String message,
+ JobExecutionContext jobExecutionContext, Exception e,
+ String... additionalExceptiondata) {
+
+ }
+}
Modified:
airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/Scheduler.java
URL:
http://svn.apache.org/viewvc/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/Scheduler.java?rev=1441167&r1=1441166&r2=1441167&view=diff
==============================================================================
---
airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/Scheduler.java
(original)
+++
airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/Scheduler.java
Thu Jan 31 19:52:57 2013
@@ -24,7 +24,9 @@ package org.apache.airavata.gfac;
import org.apache.airavata.commons.gfac.type.HostDescription;
import org.apache.airavata.gfac.context.JobExecutionContext;
import org.apache.airavata.gfac.provider.GFacProvider;
+import org.apache.airavata.gfac.provider.GramProvider;
import org.apache.airavata.gfac.provider.impl.LocalProvider;
+import org.apache.airavata.schemas.gfac.GlobusHostType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,8 +60,11 @@ public class Scheduler {
*/
private static GFacProvider getProvider(JobExecutionContext
jobExecutionContext){
HostDescription hostDescription =
jobExecutionContext.getApplicationContext().getHostDescription();
-
- return new LocalProvider();
+ if(hostDescription.getType() instanceof GlobusHostType){
+ return new GramProvider();
+ }else{
+ return new LocalProvider();
+ }
}
Modified:
airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/context/MessageContext.java
URL:
http://svn.apache.org/viewvc/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/context/MessageContext.java?rev=1441167&r1=1441166&r2=1441167&view=diff
==============================================================================
---
airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/context/MessageContext.java
(original)
+++
airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/context/MessageContext.java
Thu Jan 31 19:52:57 2013
@@ -44,5 +44,8 @@ public class MessageContext extends Abst
parameters.put(name, value);
}
+ public Map<String,Object> getParameters(){
+ return parameters;
+ }
}
Modified:
airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/phoebus/PhoebusGridConfigurationHandler.java
URL:
http://svn.apache.org/viewvc/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/phoebus/PhoebusGridConfigurationHandler.java?rev=1441167&r1=1441166&r2=1441167&view=diff
==============================================================================
---
airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/phoebus/PhoebusGridConfigurationHandler.java
(original)
+++
airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/phoebus/PhoebusGridConfigurationHandler.java
Thu Jan 31 19:52:57 2013
@@ -26,14 +26,14 @@ import org.globus.ftp.DataChannelAuthent
import org.globus.ftp.GridFTPClient;
public class PhoebusGridConfigurationHandler implements
GridConfigurationHandler{
- @Override
+
public void handleSourceFTPClient(GridFTPClient client) throws Exception {
if
(PhoebusUtils.isPhoebusDriverConfigurationsDefined(client.getHost())) {
client.setDataChannelAuthentication(DataChannelAuthentication.NONE);
client.site("SITE SETNETSTACK phoebus:" +
PhoebusUtils.getPhoebusDataChannelXIODriverParameters(client.getHost()));
}
}
- @Override
+
public void handleDestinationFTPClient(GridFTPClient client)
throws Exception {
Added:
airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/GramProvider.java
URL:
http://svn.apache.org/viewvc/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/GramProvider.java?rev=1441167&view=auto
==============================================================================
---
airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/GramProvider.java
(added)
+++
airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/GramProvider.java
Thu Jan 31 19:52:57 2013
@@ -0,0 +1,131 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.provider;
+
+import org.apache.airavata.gfac.JobSubmissionFault;
+import org.apache.airavata.gfac.context.GSISecurityContext;
+import org.apache.airavata.gfac.context.JobExecutionContext;
+import org.apache.airavata.gfac.notification.events.ExecutionFailEvent;
+import org.apache.airavata.gfac.utils.GramJobSubmissionListener;
+import org.apache.airavata.gfac.utils.GramProviderUtils;
+import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType;
+import org.apache.airavata.schemas.gfac.GlobusHostType;
+import org.globus.gram.GramException;
+import org.globus.gram.GramJob;
+import org.ietf.jgss.GSSCredential;
+import org.ietf.jgss.GSSException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class GramProvider implements GFacProvider {
+ private static final Logger log =
LoggerFactory.getLogger(GramJobSubmissionListener.class);
+
+ private GramJob job;
+ private GramJobSubmissionListener listener;
+
+ // This method precpare the environment before the application invocation.
+ public void initialize(JobExecutionContext jobExecutionContext) throws
GFacProviderException {
+ GramProviderUtils.makeDirectory(jobExecutionContext);
+ job = GramProviderUtils.setupEnvironment(jobExecutionContext);
+ listener = new GramJobSubmissionListener(job, jobExecutionContext);
+ job.addListener(listener);
+ }
+
+ public void execute(JobExecutionContext jobExecutionContext) throws
GFacProviderException {
+ System.out.println("Executing the job");
+ GlobusHostType host = (GlobusHostType)
jobExecutionContext.getApplicationContext().getHostDescription().getType();
+ ApplicationDeploymentDescriptionType app =
jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getType();
+
+ StringBuffer buf = new StringBuffer();
+ try {
+
+ /*
+ * Set Security
+ */
+ GSISecurityContext gssContext = new
GSISecurityContext(jobExecutionContext.getGFacConfiguration());
+ GSSCredential gssCred = gssContext.getGssCredentails();
+ job.setCredentials(gssCred);
+ // We do not support multiple gatekeepers in XBaya GUI, so we
simply pick the 0th element in the array
+ String gateKeeper = host.getGlobusGateKeeperEndPointArray(0);
+ log.debug("Request to contact:" + gateKeeper);
+
+ buf.append("Finished launching job, Host =
").append(host.getHostAddress()).append(" RSL = ")
+ .append(job.getRSL()).append(" working directory =
").append(app.getStaticWorkingDirectory())
+ .append(" temp directory =
").append(app.getScratchWorkingDirectory())
+ .append(" Globus GateKeeper Endpoint =
").append(gateKeeper);
+
+ /*
+ * The first boolean is to specify the job is a batch job - use
true for interactive and false for batch.
+ * The second boolean is to specify to use the full proxy and not
delegate a limited proxy.
+ */
+ job.request(gateKeeper, false, false);
+ String gramJobid = job.getIDAsString();
+ log.info("JobID = " + gramJobid);
+
+ log.info(buf.toString());
+ /*
+ * Block untill job is done
+ */
+ listener.waitFor();
+
+ /*
+ * Remove listener
+ */
+ job.removeListener(listener);
+
+ /*
+ * Fail job
+ */
+ int jobStatus = listener.getStatus();
+
+ if (job.getExitCode() != 0 || jobStatus == GramJob.STATUS_FAILED) {
+ int errCode = listener.getError();
+ String errorMsg = "Job " + job.getID() + " on host " +
host.getHostAddress() + " Job Exit Code = "
+ + listener.getError();
+ JobSubmissionFault error = new JobSubmissionFault(this, new
Exception(errorMsg), "GFAC HOST",
+ gateKeeper, job.getRSL(), jobExecutionContext);
+ jobExecutionContext.getNotifier().publish(new
ExecutionFailEvent(error.getCause()));
+ throw error;
+ }
+ } catch (GramException e) {
+ JobSubmissionFault error = new JobSubmissionFault(this, e,
host.getHostAddress(),
+ host.getGlobusGateKeeperEndPointArray(0), job.getRSL(),
jobExecutionContext);
+ jobExecutionContext.getNotifier().publish(new
ExecutionFailEvent(error.getCause()));
+ } catch (GSSException e) {
+ throw new GFacProviderException(e.getMessage(), e,
jobExecutionContext);
+ } catch (InterruptedException e) {
+ throw new GFacProviderException("Thread", e, jobExecutionContext);
+ } catch (SecurityException e) {
+ throw new GFacProviderException(e.getMessage(), e,
jobExecutionContext);
+ } finally {
+ if (job != null) {
+ try {
+ job.cancel();
+ } catch (Exception e) {
+ }
+ }
+ }
+
+ }
+
+ public void dispose(JobExecutionContext jobExecutionContext) throws
GFacProviderException {
+ }
+}
Modified:
airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/utils/GramProviderUtils.java
URL:
http://svn.apache.org/viewvc/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/utils/GramProviderUtils.java?rev=1441167&r1=1441166&r2=1441167&view=diff
==============================================================================
---
airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/utils/GramProviderUtils.java
(original)
+++
airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/utils/GramProviderUtils.java
Thu Jan 31 19:52:57 2013
@@ -20,5 +20,86 @@
*/
package org.apache.airavata.gfac.utils;
+import org.apache.airavata.gfac.ToolsException;
+import org.apache.airavata.gfac.context.GSISecurityContext;
+import org.apache.airavata.gfac.context.JobExecutionContext;
+import org.apache.airavata.gfac.external.GridFtp;
+import org.apache.airavata.gfac.provider.GFacProviderException;
+import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType;
+import org.apache.airavata.schemas.gfac.GlobusHostType;
+import org.globus.gram.GramAttributes;
+import org.globus.gram.GramJob;
+import org.ietf.jgss.GSSCredential;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+
public class GramProviderUtils {
+ private static final Logger log =
LoggerFactory.getLogger(GramJobSubmissionListener.class);
+
+ public static void makeDirectory(JobExecutionContext jobExecutionContext)
throws GFacProviderException {
+ GlobusHostType host = (GlobusHostType)
jobExecutionContext.getApplicationContext().getHostDescription().getType();
+ ApplicationDeploymentDescriptionType app =
jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getType();
+ GridFtp ftp = new GridFtp();
+
+ try {
+ GSISecurityContext gssContext = new
GSISecurityContext(jobExecutionContext.getGFacConfiguration());
+ GSSCredential gssCred = gssContext.getGssCredentails();
+ String[] hostgridFTP = host.getGridFTPEndPointArray();
+ if (hostgridFTP == null || hostgridFTP.length == 0) {
+ hostgridFTP = new String[]{host.getHostAddress()};
+ }
+ boolean success = false;
+ GFacProviderException pe = null;// = new ProviderException("");
+ for (String endpoint : host.getGridFTPEndPointArray()) {
+ try {
+
+ URI tmpdirURI = GFacUtils.createGsiftpURI(endpoint,
app.getScratchWorkingDirectory());
+ URI workingDirURI = GFacUtils.createGsiftpURI(endpoint,
app.getStaticWorkingDirectory());
+ URI inputURI = GFacUtils.createGsiftpURI(endpoint,
app.getInputDataDirectory());
+ URI outputURI = GFacUtils.createGsiftpURI(endpoint,
app.getOutputDataDirectory());
+
+ log.info("Host FTP = " + hostgridFTP[0]);
+ log.info("temp directory = " + tmpdirURI);
+ log.info("Working directory = " + workingDirURI);
+ log.info("Input directory = " + inputURI);
+ log.info("Output directory = " + outputURI);
+
+ ftp.makeDir(tmpdirURI, gssCred);
+ ftp.makeDir(workingDirURI, gssCred);
+ ftp.makeDir(inputURI, gssCred);
+ ftp.makeDir(outputURI, gssCred);
+
+ success = true;
+ break;
+ } catch (URISyntaxException e) {
+ pe = new GFacProviderException("URI is malformatted:" +
e.getMessage(), e, jobExecutionContext);
+
+ } catch (ToolsException e) {
+ pe = new GFacProviderException(e.getMessage(), e,
jobExecutionContext);
+ }
+ }
+ if (success == false) {
+ throw pe;
+ }
+ } catch (SecurityException e) {
+ throw new GFacProviderException(e.getMessage(), e,
jobExecutionContext);
+ }
+ }
+
+ public static GramJob setupEnvironment(JobExecutionContext
jobExecutionContext) throws GFacProviderException {
+ log.debug("Searching for Gate Keeper");
+ try {
+ GramAttributes jobAttr =
GramRSLGenerator.configureRemoteJob(jobExecutionContext);
+ String rsl = jobAttr.toRSL();
+
+ log.debug("RSL = " + rsl);
+ GramJob job = new GramJob(rsl);
+ return job;
+ } catch (ToolsException te) {
+ throw new GFacProviderException(te.getMessage(), te,
jobExecutionContext);
+ }
+ }
}
Added:
airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/utils/GramRSLGenerator.java
URL:
http://svn.apache.org/viewvc/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/utils/GramRSLGenerator.java?rev=1441167&view=auto
==============================================================================
---
airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/utils/GramRSLGenerator.java
(added)
+++
airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/utils/GramRSLGenerator.java
Thu Jan 31 19:52:57 2013
@@ -0,0 +1,204 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.utils;
+
+import org.apache.airavata.commons.gfac.type.ActualParameter;
+import org.apache.airavata.commons.gfac.type.MappingFactory;
+import org.apache.airavata.gfac.Constants;
+import org.apache.airavata.gfac.ToolsException;
+import org.apache.airavata.gfac.context.JobExecutionContext;
+import org.apache.airavata.gfac.context.MessageContext;
+import org.apache.airavata.schemas.gfac.HpcApplicationDeploymentType;
+import org.apache.airavata.schemas.gfac.NameValuePairType;
+import org.apache.airavata.schemas.gfac.URIArrayType;
+import org.globus.gram.GramAttributes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+public class GramRSLGenerator {
+ protected static final Logger log =
LoggerFactory.getLogger(GramRSLGenerator.class);
+
+ private enum JobType {
+ SERIAL, SINGLE, MPI, MULTIPLE, CONDOR
+ }
+
+ ;
+
+ public static GramAttributes configureRemoteJob(JobExecutionContext
context) throws ToolsException {
+ HpcApplicationDeploymentType app = (HpcApplicationDeploymentType)
context.getApplicationContext().getApplicationDeploymentDescription().getType();
+ GramAttributes jobAttr = new GramAttributes();
+ jobAttr.setExecutable(app.getExecutableLocation());
+ jobAttr.setDirectory(app.getStaticWorkingDirectory());
+ jobAttr.setStdout(app.getStandardOutput());
+ jobAttr.setStderr(app.getStandardError());
+
+ /*
+ * The env here contains the env of the host and the application. i.e
the env specified in the host description
+ * and application description documents
+ */
+ NameValuePairType[] env = app.getApplicationEnvironmentArray();
+ if (env.length != 0) {
+ Map<String, String> nv = new HashMap<String, String>();
+ for (int i = 0; i < env.length; i++) {
+ String key = env[i].getName();
+ String value = env[i].getValue();
+ nv.put(key, value);
+ }
+
+ for (Map.Entry<String, String> entry : nv.entrySet()) {
+ jobAttr.addEnvVariable(entry.getKey(), entry.getValue());
+ }
+ }
+ jobAttr.addEnvVariable(Constants.INPUT_DATA_DIR_VAR_NAME,
app.getInputDataDirectory());
+ jobAttr.addEnvVariable(Constants.OUTPUT_DATA_DIR_VAR_NAME,
app.getOutputDataDirectory());
+
+ if (app.getMaxWallTime() > 0) {
+ log.debug("Setting max wall clock time to " +
app.getMaxWallTime());
+
+ if (app.getMaxWallTime() > 30 && app.getQueue() != null &&
app.getQueue().getQueueName().equals("debug")) {
+ throw new ToolsException("NCSA debug Queue only support jobs <
30 minutes");
+ }
+
+ jobAttr.setMaxWallTime(app.getMaxWallTime());
+ jobAttr.set("proxy_timeout", "1");
+ } else {
+ jobAttr.setMaxWallTime(30);
+ }
+
+ if (app.getStandardInput() != null &&
!"".equals(app.getStandardInput())) {
+ jobAttr.setStdin(app.getStandardInput());
+ } else {
+ MessageContext input = context.getInMessageContext();;
+ Map<String,Object> inputs = input.getParameters();
+ Set<String> keys = inputs.keySet();
+ for (String paramName : keys ) {
+ ActualParameter actualParameter = (ActualParameter)
inputs.get(paramName);
+ if
("URIArray".equals(actualParameter.getType().getType().toString())) {
+ String[] values = ((URIArrayType)
actualParameter.getType()).getValueArray();
+ for (String value : values) {
+ jobAttr.addArgument(value);
+ }
+ } else {
+ String paramValue =
MappingFactory.toString(actualParameter);
+ jobAttr.addArgument(paramValue);
+ }
+ }
+ }
+ // Using the workflowContext Header values if user provided them in
the request and overwrite the default values in DD
+ //todo finish the scheduling based on workflow execution context
+// ContextHeaderDocument.ContextHeader currentContextHeader =
WorkflowContextHeaderBuilder.getCurrentContextHeader();
+// if (currentContextHeader.getWorkflowSchedulingContext() != null) {
+// if (currentContextHeader != null &&
+//
currentContextHeader.getWorkflowSchedulingContext().getApplicationSchedulingContextArray()
!= null &&
+//
currentContextHeader.getWorkflowSchedulingContext().getApplicationSchedulingContextArray().length
> 0) {
+// try {
+// int cpuCount =
currentContextHeader.getWorkflowSchedulingContext().getApplicationSchedulingContextArray()[0].getCpuCount();
+// if(cpuCount>0){
+// app.setCpuCount(cpuCount);
+// }
+// } catch (NullPointerException e) {
+// log.debug("No Value sent in WorkflowContextHeader for
CPU Count, value in the Deployment Descriptor will be used");
+// context.getNotifier().publish(new
ExecutionFailEvent(context, e, "No Value sent in WorkflowContextHeader for Node
Count, value in the Deployment Descriptor will be used");
+// }
+// try {
+// int nodeCount =
currentContextHeader.getWorkflowSchedulingContext().getApplicationSchedulingContextArray()[0].getNodeCount();
+// if(nodeCount>0){
+// app.setNodeCount(nodeCount);
+// }
+// } catch (NullPointerException e) {
+// log.debug("No Value sent in WorkflowContextHeader for
Node Count, value in the Deployment Descriptor will be used");
+//
context.getExecutionContext().getNotifier().executionFail(context, e, "No Value
sent in WorkflowContextHeader for Node Count, value in the Deployment
Descriptor will be used");
+// }
+// try {
+// String queueName =
currentContextHeader.getWorkflowSchedulingContext().getApplicationSchedulingContextArray()[0].getQueueName();
+// if (queueName != null) {
+// if(app.getQueue() == null){
+// QueueType queueType = app.addNewQueue();
+// queueType.setQueueName(queueName);
+// }else{
+// app.getQueue().setQueueName(queueName);
+// }
+// }
+// } catch (NullPointerException e) {
+// log.debug("No Value sent in WorkflowContextHeader for
Node Count, value in the Deployment Descriptor will be used");
+//
context.getExecutionContext().getNotifier().executionFail(context, e, "No Value
sent in WorkflowContextHeader for Node Count, value in the Deployment
Descriptor will be used");
+// }
+// }
+// }
+// if(currentContextHeader.getWorkflowOutputDataHandling() != null){
+//
if(currentContextHeader.getWorkflowOutputDataHandling().getApplicationOutputDataHandlingArray().length
!= 0)
+//
app.setOutputDataDirectory(currentContextHeader.getWorkflowOutputDataHandling().getApplicationOutputDataHandlingArray()[0].getOutputDataDirectory());
+// }
+ if (app.getNodeCount() > 0) {
+ jobAttr.set("hostCount", String.valueOf(app.getNodeCount()));
+ log.debug("Setting number of Nodes to " + app.getCpuCount());
+ }
+ if (app.getCpuCount() > 0) {
+ log.debug("Setting number of procs to " + app.getCpuCount());
+ jobAttr.setNumProcs(app.getCpuCount());
+ }
+ if (app.getMinMemory() > 0) {
+ log.debug("Setting minimum memory to " + app.getMinMemory());
+ jobAttr.setMinMemory(app.getMinMemory());
+ }
+ if (app.getMaxMemory() > 0) {
+ log.debug("Setting maximum memory to " + app.getMaxMemory());
+ jobAttr.setMaxMemory(app.getMaxMemory());
+ }
+ if (app.getProjectAccount() != null) {
+ if (app.getProjectAccount().getProjectAccountNumber() != null) {
+ log.debug("Setting project to " +
app.getProjectAccount().getProjectAccountNumber());
+
jobAttr.setProject(app.getProjectAccount().getProjectAccountNumber());
+ }
+ }
+ if (app.getQueue() != null) {
+ if (app.getQueue().getQueueName() != null) {
+ log.debug("Setting job queue to " +
app.getQueue().getQueueName());
+ jobAttr.setQueue(app.getQueue().getQueueName());
+ }
+ }
+ String jobType = JobType.SINGLE.toString();
+ if (app.getJobType() != null) {
+ jobType = app.getJobType().toString();
+ }
+ if (jobType.equalsIgnoreCase(JobType.SINGLE.toString())) {
+ log.debug("Setting job type to single");
+ jobAttr.setJobType(GramAttributes.JOBTYPE_SINGLE);
+ } if (jobType.equalsIgnoreCase(JobType.SERIAL.toString())) {
+ log.debug("Setting job type to single");
+ jobAttr.setJobType(GramAttributes.JOBTYPE_SINGLE);
+ } else if (jobType.equalsIgnoreCase(JobType.MPI.toString())) {
+ log.debug("Setting job type to mpi");
+ jobAttr.setJobType(GramAttributes.JOBTYPE_MPI);
+ } else if (jobType.equalsIgnoreCase(JobType.MULTIPLE.toString())) {
+ log.debug("Setting job type to multiple");
+ jobAttr.setJobType(GramAttributes.JOBTYPE_MULTIPLE);
+ } else if (jobType.equalsIgnoreCase(JobType.CONDOR.toString())) {
+ jobAttr.setJobType(GramAttributes.JOBTYPE_CONDOR);
+ }
+
+ return jobAttr;
+ }
+}
Added:
airavata/trunk/modules/gfac-core/src/test/java/org/apache/airavata/core/gfac/services/impl/GramProviderTest.java
URL:
http://svn.apache.org/viewvc/airavata/trunk/modules/gfac-core/src/test/java/org/apache/airavata/core/gfac/services/impl/GramProviderTest.java?rev=1441167&view=auto
==============================================================================
---
airavata/trunk/modules/gfac-core/src/test/java/org/apache/airavata/core/gfac/services/impl/GramProviderTest.java
(added)
+++
airavata/trunk/modules/gfac-core/src/test/java/org/apache/airavata/core/gfac/services/impl/GramProviderTest.java
Thu Jan 31 19:52:57 2013
@@ -0,0 +1,166 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.core.gfac.services.impl;
+
+import com.amazonaws.services.importexport.model.JobType;
+import org.apache.airavata.client.AiravataAPIFactory;
+import org.apache.airavata.client.api.AiravataAPI;
+import org.apache.airavata.commons.gfac.type.*;
+import org.apache.airavata.gfac.Constants;
+import org.apache.airavata.gfac.GFacAPI;
+import org.apache.airavata.gfac.GFacConfiguration;
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.context.ApplicationContext;
+import org.apache.airavata.gfac.context.JobExecutionContext;
+import org.apache.airavata.gfac.context.MessageContext;
+import org.apache.airavata.schemas.gfac.*;
+import org.apache.commons.lang.SystemUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.UUID;
+
+public class GramProviderTest {
+ private JobExecutionContext jobExecutionContext;
+ @Before
+ public void setUp() throws Exception {
+
+ GFacConfiguration gFacConfiguration = new GFacConfiguration(null);
+ gFacConfiguration.setMyProxyLifeCycle(3600);
+ gFacConfiguration.setMyProxyServer("myproxy.teragrid.org");
+ gFacConfiguration.setMyProxyUser("ogce");
+ gFacConfiguration.setMyProxyPassphrase("Jdas7wph");
+
gFacConfiguration.setTrustedCertLocation("/Users/lahirugunathilake/Downloads/certificates");
+ //have to set InFlwo Handlers and outFlowHandlers
+ jobExecutionContext = new JobExecutionContext(gFacConfiguration);
+ ApplicationContext applicationContext = new ApplicationContext();
+ jobExecutionContext.setApplicationContext(applicationContext);
+ /*
+ * Host
+ */
+ HostDescription host = new HostDescription(GlobusHostType.type);
+ host.getType().setHostAddress("ranger.tacc.teragrid.org");
+ host.getType().setHostName("ranger");
+ ((GlobusHostType)host.getType()).setGlobusGateKeeperEndPointArray(new
String[]{"gatekeeper.ranger.tacc.teragrid.org:2119/jobmanager-sge"});
+ ((GlobusHostType)host.getType()).setGridFTPEndPointArray(new
String[]{"gsiftp://gridftp.ranger.tacc.teragrid.org:2811/"});
+ applicationContext.setHostDescription(host);
+ /*
+ * App
+ */
+ ApplicationDescription appDesc = new
ApplicationDescription(HpcApplicationDeploymentType.type);
+ HpcApplicationDeploymentType app =
(HpcApplicationDeploymentType)appDesc.getType();
+ ApplicationDeploymentDescriptionType.ApplicationName name =
ApplicationDeploymentDescriptionType.ApplicationName.Factory.newInstance();
+ name.setStringValue("EchoLocal");
+ app.setApplicationName(name);
+ ProjectAccountType projectAccountType = app.addNewProjectAccount();
+ projectAccountType.setProjectAccountNumber("TG-STA110014S");
+
+ QueueType queueType = app.addNewQueue();
+ queueType.setQueueName("development");
+
+ app.setCpuCount(1);
+ app.setJobType(JobTypeType.SERIAL);
+ app.setNodeCount(1);
+ app.setProcessorsPerNode(1);
+
+ /*
+ * Use bat file if it is compiled on Windows
+ */
+ app.setExecutableLocation("/bin/echo");
+
+ /*
+ * Default tmp location
+ */
+ String tempDir = "/scratch/01437/ogce/test";
+ String date = (new Date()).toString();
+ date = date.replaceAll(" ", "_");
+ date = date.replaceAll(":", "_");
+
+ tempDir = tempDir + File.separator
+ + "SimpleEcho" + "_" + date + "_" + UUID.randomUUID();
+
+ System.out.println(tempDir);
+ app.setScratchWorkingDirectory(tempDir);
+ app.setStaticWorkingDirectory(tempDir);
+ app.setInputDataDirectory(tempDir + File.separator + "input");
+ app.setOutputDataDirectory(tempDir + File.separator + "output");
+ app.setStandardOutput(tempDir + File.separator + "echo.stdout");
+ app.setStandardError(tempDir + File.separator + "echo.stderr");
+
+ applicationContext.setApplicationDeploymentDescription(appDesc);
+
+ /*
+ * Service
+ */
+ ServiceDescription serv = new ServiceDescription();
+ serv.getType().setName("SimpleEcho");
+
+ List<InputParameterType> inputList = new
ArrayList<InputParameterType>();
+ InputParameterType input = InputParameterType.Factory.newInstance();
+ input.setParameterName("echo_input");
+ input.setParameterType(StringParameterType.Factory.newInstance());
+ inputList.add(input);
+ InputParameterType[] inputParamList = inputList.toArray(new
InputParameterType[inputList
+ .size()]);
+
+ List<OutputParameterType> outputList = new
ArrayList<OutputParameterType>();
+ OutputParameterType output = OutputParameterType.Factory.newInstance();
+ output.setParameterName("echo_output");
+ output.setParameterType(StringParameterType.Factory.newInstance());
+ outputList.add(output);
+ OutputParameterType[] outputParamList = outputList
+ .toArray(new OutputParameterType[outputList.size()]);
+
+ serv.getType().setInputParametersArray(inputParamList);
+ serv.getType().setOutputParametersArray(outputParamList);
+
+ applicationContext.setServiceDescription(serv);
+
+ MessageContext inMessage = new MessageContext();
+ ActualParameter echo_input = new ActualParameter();
+
((StringParameterType)echo_input.getType()).setValue("echo_output=hello");
+ inMessage.addParameter("echo_input", echo_input);
+
+ jobExecutionContext.setInMessageContext(inMessage);
+
+ MessageContext outMessage = new MessageContext();
+ ActualParameter echo_out = new ActualParameter();
+//
((StringParameterType)echo_input.getType()).setValue("echo_output=hello");
+ outMessage.addParameter("echo_output", echo_out);
+
+ jobExecutionContext.setOutMessageContext(outMessage);
+
+ }
+
+ @Test
+ public void testGramProvider() throws GFacException {
+ GFacAPI gFacAPI = new GFacAPI();
+ gFacAPI.submitJob(jobExecutionContext);
+ MessageContext outMessageContext =
jobExecutionContext.getOutMessageContext();
+
Assert.assertEquals(MappingFactory.toString((ActualParameter)outMessageContext.getParameter("echo_output")),
"hello");
+ }
+}