Repository: airavata Updated Branches: refs/heads/airavata-0.15-release-branch f18e34307 -> 1951bfd10
Trying to push outputs to registry so that PGA can show the outputs, still in progress.. Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/1951bfd1 Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/1951bfd1 Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/1951bfd1 Branch: refs/heads/airavata-0.15-release-branch Commit: 1951bfd107bb0811516415cddb720183ca6c92a4 Parents: f18e343 Author: msmemon <[email protected]> Authored: Tue Jun 23 15:10:27 2015 +0200 Committer: msmemon <[email protected]> Committed: Tue Jun 23 15:10:27 2015 +0200 ---------------------------------------------------------------------- .../gfac/bes/provider/impl/BESProvider.java | 6 +- .../gfac/bes/utils/DataTransferrer.java | 174 +++++++++---------- 2 files changed, 85 insertions(+), 95 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/1951bfd1/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java index 3ed08cb..69e4915 100644 --- a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java +++ b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java @@ -49,6 +49,7 @@ import org.apache.airavata.model.messaging.event.JobIdentifier; import org.apache.airavata.model.messaging.event.JobStatusChangeRequestEvent; import org.apache.airavata.model.workspace.experiment.JobDetails; import org.apache.airavata.model.workspace.experiment.JobState; +import org.apache.airavata.registry.cpi.ChildDataType; import org.apache.xmlbeans.XmlCursor; import org.bouncycastle.asn1.x500.style.BCStyle; import org.ggf.schemas.bes.x2006.x08.besFactory.ActivityStateEnumeration; @@ -141,8 +142,6 @@ public class BESProvider extends AbstractProvider implements GFacProvider, log.info("Submitted JSDL: " + jobDefinition.getJobDescription()); - - // upload files if any DataTransferrer dt = new DataTransferrer(jobExecutionContext, sc); dt.uploadLocalFiles(); @@ -228,7 +227,8 @@ public class BESProvider extends AbstractProvider implements GFacProvider, } catch (Exception e) { log.error("Cannot create storage.."); throw new GFacProviderException("Cannot create storage..", e); - } + } + } http://git-wip-us.apache.org/repos/asf/airavata/blob/1951bfd1/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/utils/DataTransferrer.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/utils/DataTransferrer.java b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/utils/DataTransferrer.java index 2dbf637..7de0a11 100644 --- a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/utils/DataTransferrer.java +++ b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/utils/DataTransferrer.java @@ -26,25 +26,21 @@ import java.io.File; import java.io.FileNotFoundException; import java.io.FileReader; import java.io.IOException; -import java.net.MalformedURLException; import java.net.URI; import java.net.URISyntaxException; -import java.net.URL; import java.util.ArrayList; import java.util.List; -import java.util.Map; -import java.util.Set; import org.apache.airavata.gfac.Constants; import org.apache.airavata.gfac.core.context.JobExecutionContext; import org.apache.airavata.gfac.core.provider.GFacProviderException; -import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription; import org.apache.airavata.model.appcatalog.appinterface.DataType; import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType; import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType; import org.apache.airavata.model.workspace.experiment.TaskDetails; -import org.apache.commons.io.FileUtils; -import org.apache.commons.io.FilenameUtils; +import org.apache.airavata.registry.cpi.ChildDataType; +import org.apache.airavata.registry.cpi.Registry; +import org.apache.airavata.registry.cpi.RegistryException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,11 +54,42 @@ public class DataTransferrer { private StorageClient storageClient; + private List<OutputDataObjectType> resultantOutputsLst; + + private String downloadLocation, stdoutLocation, stderrLocation; + public DataTransferrer(JobExecutionContext jobContext, StorageClient storageClient) { this.jobContext = jobContext; this.storageClient = storageClient; + resultantOutputsLst = new ArrayList<OutputDataObjectType>(); + initStdoutsLocation(); } + private void initStdoutsLocation() { + + downloadLocation = getDownloadLocation(); + + String stdout = jobContext.getStandardOutput(); + String stderr = jobContext.getStandardError(); + if(stdout != null) { + stdout = stdout.substring(stdout.lastIndexOf('/')+1); + } + + if(stderr != null) { + stderr = stderr.substring(stderr.lastIndexOf('/')+1); + } + + String stdoutFileName = (stdout == null || stdout.equals("")) ? "stdout" + : stdout; + String stderrFileName = (stdout == null || stderr.equals("")) ? "stderr" + : stderr; + + stdoutLocation = downloadLocation+File.separator+stdoutFileName; + + stderrLocation = downloadLocation+File.separator+stderrFileName; + + + } public void uploadLocalFiles() throws GFacProviderException { List<String> inFilePrms = extractInFileParams(); @@ -86,43 +113,13 @@ public class DataTransferrer { } + /** * This method will download all the remote files specified in the output * context of a job. * */ public void downloadRemoteFiles() throws GFacProviderException { - String downloadLocation = getDownloadLocation(); - - File file = new File(downloadLocation); - if(!file.exists()){ - file.mkdirs(); - } - -// Map<String, Object> output = jobContext.getOutMessageContext().getParameters(); -// Set<String> keys = output.keySet(); -// -// for (String outPrm : keys) { -// OutputDataObjectType actualParameter = (OutputDataObjectType) output.get(outPrm); -// if (DataType.STDERR == actualParameter.getType() || -// DataType.STDOUT == actualParameter.getType() || -// DataType.URI == actualParameter.getType()) { -// continue; -// } -// -// String value = actualParameter.getValue(); -// FileDownloader fileDownloader = new FileDownloader(value,downloadLocation, Mode.overwrite); -// try { -// fileDownloader.perform(storageClient); -// String outputPath = downloadLocation + File.separator + value.substring(value.lastIndexOf('/')+1); -// actualParameter.setValue(outputPath); -// actualParameter.setType(DataType.URI); -// jobContext.addOutputFile(outputPath); -// } catch (Exception e) { -// throw new GFacProviderException(e.getLocalizedMessage(),e); -// } -// } - if(log.isDebugEnabled()) { log.debug("Download location is:"+downloadLocation); } @@ -136,48 +133,36 @@ public class DataTransferrer { if(output.getType().equals(DataType.STRING)) { String value = output.getValue(); - FileDownloader fileDownloader = new FileDownloader(value,downloadLocation, Mode.overwrite); + String outputPath = downloadLocation + File.separator + value; + FileDownloader fileDownloader = new FileDownloader(value,outputPath, Mode.overwrite); try { fileDownloader.perform(storageClient); - String outputPath = downloadLocation + File.separator + value; - jobContext.addOutputFile(outputPath); } catch (Exception e) { - log.error("Cannot download remote files.."); + log.error("Error downloading remote files.."); throw new GFacProviderException(e.getLocalizedMessage(),e); } + resultantOutputsLst.add(output); + jobContext.addOutputFile(outputPath); + } + + if(output.getType().equals(DataType.STDOUT)) { + resultantOutputsLst.add(output); + } + + if(output.getType().equals(DataType.STDERR)) { + resultantOutputsLst.add(output); } } } - downloadStdOuts(); } - public void downloadStdOuts() throws GFacProviderException{ - String downloadLocation = getDownloadLocation(); - File file = new File(downloadLocation); - if(!file.exists()){ - file.mkdirs(); - } - String stdout = jobContext.getStandardOutput(); - String stderr = jobContext.getStandardError(); - if(stdout != null) { - stdout = stdout.substring(stdout.lastIndexOf('/')+1); - } + String stdoutFileName = new File(stdoutLocation).getName(); - if(stderr != null) { - stderr = stderr.substring(stderr.lastIndexOf('/')+1); - } + String stderrFileName = new File(stderrLocation).getName(); - String stdoutFileName = (stdout == null || stdout.equals("")) ? "stdout" - : stdout; - String stderrFileName = (stdout == null || stderr.equals("")) ? "stderr" - : stderr; - - ApplicationDeploymentDescription application = jobContext.getApplicationContext().getApplicationDeploymentDescription(); - - String stdoutLocation = downloadLocation+File.separator+stdoutFileName; FileDownloader f1 = new FileDownloader(stdoutFileName,stdoutLocation, Mode.overwrite); try { f1.perform(storageClient); @@ -185,47 +170,45 @@ public class DataTransferrer { String stdoutput = readFile(stdoutLocation); jobContext.addOutputFile(stdoutLocation); jobContext.setStandardOutput(stdoutLocation); - log.info("Stdout downloaded to -> "+stdoutLocation); + log.info(stdoutFileName + " -> "+stdoutLocation); if(UASDataStagingProcessor.isUnicoreEndpoint(jobContext)) { String scriptExitCodeFName = "UNICORE_SCRIPT_EXIT_CODE"; String scriptCodeLocation = downloadLocation+File.separator+scriptExitCodeFName; f1.setFrom(scriptExitCodeFName); f1.setTo(scriptCodeLocation); f1.perform(storageClient); - log.info("UNICORE_SCRIPT_EXIT_CODE downloaded to "+scriptCodeLocation); + jobContext.addOutputFile(scriptCodeLocation); + log.info("UNICORE_SCRIPT_EXIT_CODE -> "+scriptCodeLocation); } - String stderrLocation = downloadLocation+File.separator+stderrFileName; + f1.setFrom(stderrFileName); f1.setTo(stderrLocation); f1.perform(storageClient); String stderror = readFile(stderrLocation); jobContext.addOutputFile(stderrLocation); jobContext.setStandardError(stderrLocation); - log.info("Stderr downloaded to -> "+stderrLocation); + log.info(stderrFileName + " -> " + stderrLocation); } catch (Exception e) { - throw new GFacProviderException(e.getLocalizedMessage(),e); } + publishFinalOutputs(); } - public List<String> extractOutParams(JobExecutionContext context) { - List<String> outPrmsList = new ArrayList<String>(); - List<OutputDataObjectType> applicationOutputs = jobContext.getTaskData().getApplicationOutputs(); - if (applicationOutputs != null && !applicationOutputs.isEmpty()){ - for (OutputDataObjectType output : applicationOutputs){ - if(output.getType().equals(DataType.STRING)) { - outPrmsList.add(output.getValue()); - } - else if(output.getType().equals(DataType.FLOAT) || output.getType().equals(DataType.INTEGER)) { - outPrmsList.add(String.valueOf(output.getValue())); - - } - } - } - return outPrmsList; + protected void publishFinalOutputs() throws GFacProviderException { + try { + if(!resultantOutputsLst.isEmpty()) { + Registry registry = jobContext.getRegistry(); + registry.add(ChildDataType.EXPERIMENT_OUTPUT, resultantOutputsLst, jobContext.getExperimentID()); + } + } catch (RegistryException e) { + throw new GFacProviderException("Cannot publish outputs to the registry."); + } + + } + public List<String> extractInFileParams() { List<String> filePrmsList = new ArrayList<String>(); List<InputDataObjectType> applicationInputs = jobContext.getTaskData().getApplicationInputs(); @@ -274,11 +257,6 @@ public class DataTransferrer { else { - try { - URI u = new URI(outputDataDir); - } catch (URISyntaxException e) { - return getTempPath(jobContext.getExperimentID()); - } // in case of remote locations use the tmp location if (outputDataDir.startsWith("scp:") || outputDataDir.startsWith("ftp:") || @@ -289,12 +267,24 @@ public class DataTransferrer { outputDataDir = getTempPath(jobContext.getExperimentID()); } else { - outputDataDir = taskData.getAdvancedOutputDataHandling() - .getOutputDataDir(); + try { + URI u = new URI(outputDataDir); + outputDataDir = u.getPath(); + } catch (URISyntaxException e) { + outputDataDir = getTempPath(jobContext.getExperimentID()); + } + + } } } + + File file = new File(outputDataDir); + if(!file.exists()){ + file.mkdirs(); + } + return outputDataDir; }
