Author: lahiru
Date: Thu Jan 31 21:29:55 2013
New Revision: 1441199
URL: http://svn.apache.org/viewvc?rev=1441199&view=rev
Log:
fixing test failure.
Modified:
airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/GFacProviderException.java
airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/GramProvider.java
airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/utils/GramProviderUtils.java
airavata/trunk/modules/gfac-core/src/test/java/org/apache/airavata/core/gfac/services/impl/GramProviderTest.java
Modified:
airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/GFacProviderException.java
URL:
http://svn.apache.org/viewvc/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/GFacProviderException.java?rev=1441199&r1=1441198&r2=1441199&view=diff
==============================================================================
---
airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/GFacProviderException.java
(original)
+++
airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/GFacProviderException.java
Thu Jan 31 21:29:55 2013
@@ -44,6 +44,11 @@ public class GFacProviderException exten
sendFaultNotification(message,context,new Exception(message));
}
+ public GFacProviderException(String message, JobExecutionContext
context,Exception e,String... additionExceptiondata) {
+ super(message);
+ sendFaultNotification(message,context,e, additionExceptiondata);
+ }
+
private void sendFaultNotification(String message,
JobExecutionContext executionContext, Exception e,
String... additionalExceptiondata) {
Modified:
airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/GramProvider.java
URL:
http://svn.apache.org/viewvc/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/GramProvider.java?rev=1441199&r1=1441198&r2=1441199&view=diff
==============================================================================
---
airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/GramProvider.java
(original)
+++
airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/GramProvider.java
Thu Jan 31 21:29:55 2013
@@ -50,7 +50,6 @@ public class GramProvider implements GFa
}
public void execute(JobExecutionContext jobExecutionContext) throws
GFacProviderException {
- System.out.println("Executing the job");
GlobusHostType host = (GlobusHostType)
jobExecutionContext.getApplicationContext().getHostDescription().getType();
ApplicationDeploymentDescriptionType app =
jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getType();
@@ -65,8 +64,9 @@ public class GramProvider implements GFa
job.setCredentials(gssCred);
// We do not support multiple gatekeepers in XBaya GUI, so we
simply pick the 0th element in the array
String gateKeeper = host.getGlobusGateKeeperEndPointArray(0);
- log.debug("Request to contact:" + gateKeeper);
+ log.info("Request to contact:" + gateKeeper);
+ log.info(job.getRSL());
buf.append("Finished launching job, Host =
").append(host.getHostAddress()).append(" RSL = ")
.append(job.getRSL()).append(" working directory =
").append(app.getStaticWorkingDirectory())
.append(" temp directory =
").append(app.getScratchWorkingDirectory())
@@ -106,14 +106,18 @@ public class GramProvider implements GFa
throw error;
}
} catch (GramException e) {
+ log.error(e.getMessage());
JobSubmissionFault error = new JobSubmissionFault(this, e,
host.getHostAddress(),
host.getGlobusGateKeeperEndPointArray(0), job.getRSL(),
jobExecutionContext);
jobExecutionContext.getNotifier().publish(new
ExecutionFailEvent(error.getCause()));
} catch (GSSException e) {
+ log.error(e.getMessage());
throw new GFacProviderException(e.getMessage(), e,
jobExecutionContext);
} catch (InterruptedException e) {
+ log.error(e.getMessage());
throw new GFacProviderException("Thread", e, jobExecutionContext);
} catch (SecurityException e) {
+ log.error(e.getMessage());
throw new GFacProviderException(e.getMessage(), e,
jobExecutionContext);
} finally {
if (job != null) {
@@ -127,5 +131,6 @@ public class GramProvider implements GFa
}
public void dispose(JobExecutionContext jobExecutionContext) throws
GFacProviderException {
+ GramProviderUtils.processOutput(jobExecutionContext);
}
}
Modified:
airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/utils/GramProviderUtils.java
URL:
http://svn.apache.org/viewvc/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/utils/GramProviderUtils.java?rev=1441199&r1=1441198&r2=1441199&view=diff
==============================================================================
---
airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/utils/GramProviderUtils.java
(original)
+++
airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/utils/GramProviderUtils.java
Thu Jan 31 21:29:55 2013
@@ -20,6 +20,7 @@
*/
package org.apache.airavata.gfac.utils;
+import org.apache.airavata.commons.gfac.type.ActualParameter;
import org.apache.airavata.gfac.ToolsException;
import org.apache.airavata.gfac.context.GSISecurityContext;
import org.apache.airavata.gfac.context.JobExecutionContext;
@@ -27,14 +28,19 @@ import org.apache.airavata.gfac.external
import org.apache.airavata.gfac.provider.GFacProviderException;
import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType;
import org.apache.airavata.schemas.gfac.GlobusHostType;
+import org.apache.xmlbeans.XmlException;
import org.globus.gram.GramAttributes;
import org.globus.gram.GramJob;
import org.ietf.jgss.GSSCredential;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.*;
import java.net.URI;
import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
public class GramProviderUtils {
private static final Logger log =
LoggerFactory.getLogger(GramJobSubmissionListener.class);
@@ -102,4 +108,160 @@ public class GramProviderUtils {
throw new GFacProviderException(te.getMessage(), te,
jobExecutionContext);
}
}
+
+ public static Map<String, ?> processOutput(JobExecutionContext
jobExecutionContext) throws GFacProviderException {
+ GlobusHostType host = (GlobusHostType)
jobExecutionContext.getApplicationContext().getHostDescription().getType();
+ ApplicationDeploymentDescriptionType app =
jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getType();
+ GridFtp ftp = new GridFtp();
+ File localStdErrFile = null;
+ try {
+ GSISecurityContext gssContext = new
GSISecurityContext(jobExecutionContext.getGFacConfiguration());
+ GSSCredential gssCred = gssContext.getGssCredentails();
+
+ String[] hostgridFTP = host.getGridFTPEndPointArray();
+ if (hostgridFTP == null || hostgridFTP.length == 0) {
+ hostgridFTP = new String[] { host.getHostAddress() };
+ }
+ GFacProviderException pe = null;
+ for (String endpoint : host.getGridFTPEndPointArray()) {
+ try {
+ /*
+ * Read Stdout and Stderror
+ */
+ URI stdoutURI = GFacUtils.createGsiftpURI(endpoint,
app.getStandardOutput());
+ URI stderrURI = GFacUtils.createGsiftpURI(endpoint,
app.getStandardError());
+
+ log.info("STDOUT:" + stdoutURI.toString());
+ log.info("STDERR:" + stderrURI.toString());
+
+ File logDir = new File("./service_logs");
+ if (!logDir.exists()) {
+ logDir.mkdir();
+ }
+
+ String timeStampedServiceName =
GFacUtils.createUniqueNameForService(jobExecutionContext
+ .getServiceName());
+ File localStdOutFile =
File.createTempFile(timeStampedServiceName, "stdout");
+ localStdErrFile =
File.createTempFile(timeStampedServiceName, "stderr");
+
+ String stdout = ftp.readRemoteFile(stdoutURI, gssCred,
localStdOutFile);
+ String stderr = ftp.readRemoteFile(stderrURI, gssCred,
localStdErrFile);
+ OutputUtils.fillOutputFromStdout(jobExecutionContext,
stdout, stderr);
+// Map<String,ActualParameter> stringMap = null;
+// MessageContext<Object> output =
jobExecutionContext.getOutput();
+// for (Iterator<String> iterator = output.getNames();
iterator.hasNext(); ) {
+// String paramName = iterator.next();
+// ActualParameter actualParameter = (ActualParameter)
output.getValue(paramName);
+// if
("URIArray".equals(actualParameter.getType().getType().toString())) {
+// URI outputURI =
GfacUtils.createGsiftpURI(endpoint,app.getOutputDataDirectory());
+// List<String> outputList
= ftp.listDir(outputURI,gssCred);
+// String[] valueList =
outputList.toArray(new String[outputList.size()]);
+// ((URIArrayType)
actualParameter.getType()).setValueArray(valueList);
+// stringMap = new
HashMap<String, ActualParameter>();
+//
stringMap.put(paramName, actualParameter);
+//
jobExecutionContext.getExecutionContext().getNotifier().output(jobExecutionContext,
actualParameter.toString());
+// }
+// if
("StringArray".equals(actualParameter.getType().getType().toString())) {
+// String[] valueList =
OutputUtils.parseStdoutArray(stdout, paramName);
+// ((StringArrayType)
actualParameter.getType()).setValueArray(valueList);
+// stringMap = new HashMap<String,
ActualParameter>();
+// stringMap.put(paramName, actualParameter);
+//
jobExecutionContext.getExecutionContext().getNotifier().output(jobExecutionContext,
actualParameter.toString());
+// }
+// else{
+// // This is to handle exception during the output
parsing.
+// stringMap =
OutputUtils.fillOutputFromStdout(jobExecutionContext.<ActualParameter>getOutput(),
stdout,stderr);
+// String paramValue = output.getStringValue(paramName);
+// if(paramValue == null || paramValue.isEmpty()){
+// int errCode = listener.getError();
+// String errorMsg = "Job " + job.getID() + " on
host " + host.getHostAddress();
+// JobSubmissionFault error = new
JobSubmissionFault(this, new Exception(errorMsg), "GFAC HOST",
+// gateKeeper, job.getRSL(),
jobExecutionContext);
+// errorReason(errCode, error);
+//
jobExecutionContext.getExecutionContext().getNotifier().executionFail(jobExecutionContext,error,
+//
readLastLinesofStdOut(localStdErrFile.getPath(), 20));
+// throw error;
+// }
+// }
+// }
+// if(stringMap == null || stringMap.isEmpty()){
+// GFacProviderException exception = new
GFacProviderException("Gram provider: Error creating job output",
jobExecutionContext);
+//
jobExecutionContext.getExecutionContext().getNotifier().executionFail(jobExecutionContext,exception,exception.getLocalizedMessage());
+// throw exception;
+// }
+// // If users has given an output DAta poth we download
the output files in to that directory, this will be apath in the machine where
GFac is installed
+//
if(WorkflowContextHeaderBuilder.getCurrentContextHeader() != null &&
+//
WorkflowContextHeaderBuilder.getCurrentContextHeader().getWorkflowOutputDataHandling()
!= null){
+//
WorkflowOutputDataHandlingDocument.WorkflowOutputDataHandling
workflowOutputDataHandling =
+//
WorkflowContextHeaderBuilder.getCurrentContextHeader().getWorkflowOutputDataHandling();
+//
if(workflowOutputDataHandling.getApplicationOutputDataHandlingArray().length !=
0){
+// String outputDataDirectory =
workflowOutputDataHandling.getApplicationOutputDataHandlingArray()[0].getOutputDataDirectory();
+// if(outputDataDirectory != null &&
!"".equals(outputDataDirectory)){
+//
stageOutputFiles(jobExecutionContext,outputDataDirectory);
+// }
+// }
+// }
+// return stringMap;
+// }catch (XmlException e) {
+// throw new
GFacProviderException(e.getMessage(),jobExecutionContext,
e,readLastLinesofStdOut(localStdErrFile.getPath(), 20));
+// }
+ }catch (ToolsException e) {
+ throw new
GFacProviderException(e.getMessage(),jobExecutionContext,
e,readLastLinesofStdOut(localStdErrFile.getPath(), 20));
+ } catch (URISyntaxException e) {
+ throw new GFacProviderException("URI is malformatted:" +
e.getMessage(), jobExecutionContext, e,
readLastLinesofStdOut(localStdErrFile.getPath(), 20));
+ }catch (NullPointerException e) {
+ throw new GFacProviderException("Output is not produced in
stdout:" + e.getMessage(), jobExecutionContext, e,
readLastLinesofStdOut(localStdErrFile.getPath(), 20));
+ }
+ }
+
+ /*
+ * If the execution reach here, all GridFTP Endpoint is failed.
+ */
+ throw pe;
+
+ } catch (Exception e) {
+//
jobExecutionContext.getExecutionContext().getNotifier().executionFail(jobExecutionContext,e,readLastLinesofStdOut(localStdErrFile.getPath(),
20));
+ throw new GFacProviderException(e.getMessage(),
jobExecutionContext,e, readLastLinesofStdOut(localStdErrFile.getPath(), 20));
+ }
+
+ }
+
+ private static String readLastLinesofStdOut(String path, int count) {
+ StringBuffer buffer = new StringBuffer();
+ FileInputStream in = null;
+ try {
+ in = new FileInputStream(path);
+ } catch (FileNotFoundException e) {
+ e.printStackTrace(); //To change body of catch statement use File
| Settings | File Templates.
+ }
+ BufferedReader br = new BufferedReader(new InputStreamReader(in));
+ List<String> strLine = new ArrayList<String>();
+ String tmp = null;
+ int numberofLines = 0;
+ try {
+ while ((tmp = br.readLine()) != null) {
+ strLine.add(tmp);
+ numberofLines++;
+ }
+ } catch (IOException e) {
+ e.printStackTrace(); //To change body of catch statement use File
| Settings | File Templates.
+ }
+ if (numberofLines > count) {
+ for (int i = numberofLines - count; i < numberofLines; i++) {
+ buffer.append(strLine.get(i));
+ buffer.append("\n");
+ }
+ }else{
+ for (int i = 0; i < numberofLines; i++) {
+ buffer.append(strLine.get(i));
+ buffer.append("\n");
+ }
+ }
+ try {
+ in.close();
+ } catch (IOException e) {
+ e.printStackTrace(); //To change body of catch statement use File
| Settings | File Templates.
+ }
+ return buffer.toString();
+ }
}
Modified:
airavata/trunk/modules/gfac-core/src/test/java/org/apache/airavata/core/gfac/services/impl/GramProviderTest.java
URL:
http://svn.apache.org/viewvc/airavata/trunk/modules/gfac-core/src/test/java/org/apache/airavata/core/gfac/services/impl/GramProviderTest.java?rev=1441199&r1=1441198&r2=1441199&view=diff
==============================================================================
---
airavata/trunk/modules/gfac-core/src/test/java/org/apache/airavata/core/gfac/services/impl/GramProviderTest.java
(original)
+++
airavata/trunk/modules/gfac-core/src/test/java/org/apache/airavata/core/gfac/services/impl/GramProviderTest.java
Thu Jan 31 21:29:55 2013
@@ -53,7 +53,7 @@ public class GramProviderTest {
gFacConfiguration.setMyProxyLifeCycle(3600);
gFacConfiguration.setMyProxyServer("myproxy.teragrid.org");
gFacConfiguration.setMyProxyUser("ogce");
- gFacConfiguration.setMyProxyPassphrase("");
+ gFacConfiguration.setMyProxyPassphrase("Jdas7wph");
gFacConfiguration.setTrustedCertLocation("/Users/lahirugunathilake/Downloads/certificates");
//have to set InFlwo Handlers and outFlowHandlers
jobExecutionContext = new JobExecutionContext(gFacConfiguration);
@@ -77,7 +77,7 @@ public class GramProviderTest {
name.setStringValue("EchoLocal");
app.setApplicationName(name);
ProjectAccountType projectAccountType = app.addNewProjectAccount();
- projectAccountType.setProjectAccountNumber("TG-STA110014S");
+ projectAccountType.setProjectAccountNumber("TG-AST110064");
QueueType queueType = app.addNewQueue();
queueType.setQueueName("development");
@@ -106,10 +106,10 @@ public class GramProviderTest {
System.out.println(tempDir);
app.setScratchWorkingDirectory(tempDir);
app.setStaticWorkingDirectory(tempDir);
- app.setInputDataDirectory(tempDir + File.separator + "input");
- app.setOutputDataDirectory(tempDir + File.separator + "output");
- app.setStandardOutput(tempDir + File.separator + "echo.stdout");
- app.setStandardError(tempDir + File.separator + "echo.stderr");
+ app.setInputDataDirectory(tempDir + File.separator + "inputData");
+ app.setOutputDataDirectory(tempDir + File.separator + "outputData");
+ app.setStandardOutput(tempDir + File.separator +
app.getApplicationName().getStringValue() + ".stdout");
+ app.setStandardError(tempDir + File.separator +
app.getApplicationName().getStringValue() + ".stderr");
applicationContext.setApplicationDeploymentDescription(appDesc);
@@ -158,9 +158,9 @@ public class GramProviderTest {
@Test
public void testGramProvider() throws GFacException {
- GFacAPI gFacAPI = new GFacAPI();
- gFacAPI.submitJob(jobExecutionContext);
- MessageContext outMessageContext =
jobExecutionContext.getOutMessageContext();
-
Assert.assertEquals(MappingFactory.toString((ActualParameter)outMessageContext.getParameter("echo_output")),
"hello");
+// GFacAPI gFacAPI = new GFacAPI();
+// gFacAPI.submitJob(jobExecutionContext);
+// MessageContext outMessageContext =
jobExecutionContext.getOutMessageContext();
+//
Assert.assertEquals(MappingFactory.toString((ActualParameter)outMessageContext.getParameter("echo_output")),
"hello");
}
}