Repository: airavata
Updated Branches:
  refs/heads/airavata-0.15-release-branch f18e34307 -> 1951bfd10


Trying to push outputs to registry so that PGA can show the outputs,
still in progress..

Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/1951bfd1
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/1951bfd1
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/1951bfd1

Branch: refs/heads/airavata-0.15-release-branch
Commit: 1951bfd107bb0811516415cddb720183ca6c92a4
Parents: f18e343
Author: msmemon <[email protected]>
Authored: Tue Jun 23 15:10:27 2015 +0200
Committer: msmemon <[email protected]>
Committed: Tue Jun 23 15:10:27 2015 +0200

----------------------------------------------------------------------
 .../gfac/bes/provider/impl/BESProvider.java     |   6 +-
 .../gfac/bes/utils/DataTransferrer.java         | 174 +++++++++----------
 2 files changed, 85 insertions(+), 95 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/1951bfd1/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java
 
b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java
index 3ed08cb..69e4915 100644
--- 
a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java
+++ 
b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java
@@ -49,6 +49,7 @@ import 
org.apache.airavata.model.messaging.event.JobIdentifier;
 import org.apache.airavata.model.messaging.event.JobStatusChangeRequestEvent;
 import org.apache.airavata.model.workspace.experiment.JobDetails;
 import org.apache.airavata.model.workspace.experiment.JobState;
+import org.apache.airavata.registry.cpi.ChildDataType;
 import org.apache.xmlbeans.XmlCursor;
 import org.bouncycastle.asn1.x500.style.BCStyle;
 import org.ggf.schemas.bes.x2006.x08.besFactory.ActivityStateEnumeration;
@@ -141,8 +142,6 @@ public class BESProvider extends AbstractProvider 
implements GFacProvider,
             
             log.info("Submitted JSDL: " + jobDefinition.getJobDescription());
             
-            
-
             // upload files if any
             DataTransferrer dt = new DataTransferrer(jobExecutionContext, sc);
             dt.uploadLocalFiles();
@@ -228,7 +227,8 @@ public class BESProvider extends AbstractProvider 
implements GFacProvider,
         } catch (Exception e) {
             log.error("Cannot create storage..");
             throw new GFacProviderException("Cannot create storage..", e);
-        } 
+        }
+        
     }
        
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/1951bfd1/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/utils/DataTransferrer.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/utils/DataTransferrer.java
 
b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/utils/DataTransferrer.java
index 2dbf637..7de0a11 100644
--- 
a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/utils/DataTransferrer.java
+++ 
b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/utils/DataTransferrer.java
@@ -26,25 +26,21 @@ import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.FileReader;
 import java.io.IOException;
-import java.net.MalformedURLException;
 import java.net.URI;
 import java.net.URISyntaxException;
-import java.net.URL;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
 
 import org.apache.airavata.gfac.Constants;
 import org.apache.airavata.gfac.core.context.JobExecutionContext;
 import org.apache.airavata.gfac.core.provider.GFacProviderException;
-import 
org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription;
 import org.apache.airavata.model.appcatalog.appinterface.DataType;
 import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
 import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType;
 import org.apache.airavata.model.workspace.experiment.TaskDetails;
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.io.FilenameUtils;
+import org.apache.airavata.registry.cpi.ChildDataType;
+import org.apache.airavata.registry.cpi.Registry;
+import org.apache.airavata.registry.cpi.RegistryException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -58,11 +54,42 @@ public class DataTransferrer {
        
        private StorageClient storageClient;
        
+       private List<OutputDataObjectType> resultantOutputsLst;
+       
+       private String downloadLocation, stdoutLocation, stderrLocation;
+       
        public DataTransferrer(JobExecutionContext jobContext, StorageClient 
storageClient) {
                this.jobContext = jobContext;
                this.storageClient = storageClient;
+               resultantOutputsLst = new ArrayList<OutputDataObjectType>();
+               initStdoutsLocation();
        }
        
+       private void initStdoutsLocation() {
+
+               downloadLocation = getDownloadLocation();
+               
+               String stdout = jobContext.getStandardOutput();
+               String stderr = jobContext.getStandardError();
+               if(stdout != null) {
+                       stdout = stdout.substring(stdout.lastIndexOf('/')+1);
+               }
+               
+               if(stderr != null) {
+                       stderr = stderr.substring(stderr.lastIndexOf('/')+1);
+               }
+               
+               String stdoutFileName = (stdout == null || stdout.equals("")) ? 
"stdout"
+                               : stdout;
+               String stderrFileName = (stdout == null || stderr.equals("")) ? 
"stderr"
+                               : stderr;
+               
+               stdoutLocation = downloadLocation+File.separator+stdoutFileName;
+               
+               stderrLocation = downloadLocation+File.separator+stderrFileName;
+
+               
+       }
        
        public void uploadLocalFiles() throws GFacProviderException {
                List<String> inFilePrms = extractInFileParams();
@@ -86,43 +113,13 @@ public class DataTransferrer {
                
        }
        
+
        /**
         * This method will download all the remote files specified in the 
output 
         * context of a job.  
         * */
        public void downloadRemoteFiles() throws GFacProviderException {
                
-               String downloadLocation = getDownloadLocation();
-               
-               File file = new File(downloadLocation);
-               if(!file.exists()){
-                       file.mkdirs();  
-               }
-
-//             Map<String, Object> output = 
jobContext.getOutMessageContext().getParameters();
-//        Set<String> keys = output.keySet();
-//        
-//             for (String outPrm : keys) {
-//                     OutputDataObjectType actualParameter = 
(OutputDataObjectType) output.get(outPrm);
-//                             if (DataType.STDERR == 
actualParameter.getType() ||
-//                                     DataType.STDOUT == 
actualParameter.getType() ||
-//                                     DataType.URI == 
actualParameter.getType())  {
-//                                     continue;
-//                             }
-//                             
-//                             String value = actualParameter.getValue();
-//                             FileDownloader fileDownloader = new 
FileDownloader(value,downloadLocation, Mode.overwrite);
-//                             try {
-//                                     fileDownloader.perform(storageClient);
-//                                     String outputPath = downloadLocation + 
File.separator + value.substring(value.lastIndexOf('/')+1);
-//                                     actualParameter.setValue(outputPath);
-//                                     actualParameter.setType(DataType.URI);
-//                                     jobContext.addOutputFile(outputPath);
-//                             } catch (Exception e) {
-//                                     throw new 
GFacProviderException(e.getLocalizedMessage(),e);
-//                             }
-//             }
-               
                if(log.isDebugEnabled()) {
                        log.debug("Download location is:"+downloadLocation);
                }
@@ -136,48 +133,36 @@ public class DataTransferrer {
 
                        if(output.getType().equals(DataType.STRING)) {
                                        String value = output.getValue();
-                                       FileDownloader fileDownloader = new 
FileDownloader(value,downloadLocation, Mode.overwrite);
+                                       String outputPath = downloadLocation + 
File.separator + value;
+                                       FileDownloader fileDownloader = new 
FileDownloader(value,outputPath, Mode.overwrite);
                                        try {
                                                
fileDownloader.perform(storageClient);
-                                               String outputPath = 
downloadLocation + File.separator + value;
-                                               
jobContext.addOutputFile(outputPath);
                                        } catch (Exception e) {
-                                               log.error("Cannot download 
remote files..");
+                                               log.error("Error downloading 
remote files..");
                                                throw new 
GFacProviderException(e.getLocalizedMessage(),e);
                                        }
+                                       resultantOutputsLst.add(output);
+                                       jobContext.addOutputFile(outputPath);
+                       }
+                       
+                       if(output.getType().equals(DataType.STDOUT)) {
+                               resultantOutputsLst.add(output);
+                       }
+                       
+                       if(output.getType().equals(DataType.STDERR)) {
+                               resultantOutputsLst.add(output);
                        }
             }
                 }
-               
                downloadStdOuts();
        }
        
-       
        public void downloadStdOuts()  throws GFacProviderException{
-               String downloadLocation = getDownloadLocation();
-               File file = new File(downloadLocation);
-               if(!file.exists()){
-                       file.mkdirs();  
-               }
                
-               String stdout = jobContext.getStandardOutput();
-               String stderr = jobContext.getStandardError();
-               if(stdout != null) {
-                       stdout = stdout.substring(stdout.lastIndexOf('/')+1);
-               }
+               String stdoutFileName = new File(stdoutLocation).getName();
                
-               if(stderr != null) {
-                       stderr = stderr.substring(stderr.lastIndexOf('/')+1);
-               }
+               String stderrFileName = new File(stderrLocation).getName();
                
-               String stdoutFileName = (stdout == null || stdout.equals("")) ? 
"stdout"
-                               : stdout;
-               String stderrFileName = (stdout == null || stderr.equals("")) ? 
"stderr"
-                               : stderr;
-               
-               ApplicationDeploymentDescription application = 
jobContext.getApplicationContext().getApplicationDeploymentDescription();
-               
-               String stdoutLocation = 
downloadLocation+File.separator+stdoutFileName;
                FileDownloader f1 = new 
FileDownloader(stdoutFileName,stdoutLocation, Mode.overwrite);
                try {
                        f1.perform(storageClient);
@@ -185,47 +170,45 @@ public class DataTransferrer {
                        String stdoutput = readFile(stdoutLocation);
                        jobContext.addOutputFile(stdoutLocation);
                        jobContext.setStandardOutput(stdoutLocation);
-                       log.info("Stdout downloaded to -> "+stdoutLocation);
+                       log.info(stdoutFileName + " -> "+stdoutLocation);
                        
if(UASDataStagingProcessor.isUnicoreEndpoint(jobContext)) {
                                String scriptExitCodeFName = 
"UNICORE_SCRIPT_EXIT_CODE";
                                String scriptCodeLocation = 
downloadLocation+File.separator+scriptExitCodeFName;
                                f1.setFrom(scriptExitCodeFName);
                                f1.setTo(scriptCodeLocation);
                                f1.perform(storageClient);
-                               log.info("UNICORE_SCRIPT_EXIT_CODE downloaded 
to "+scriptCodeLocation);
+                               jobContext.addOutputFile(scriptCodeLocation);
+                               log.info("UNICORE_SCRIPT_EXIT_CODE -> 
"+scriptCodeLocation);
                        }
-                       String stderrLocation = 
downloadLocation+File.separator+stderrFileName;
+                       
                        f1.setFrom(stderrFileName);
                        f1.setTo(stderrLocation);
                        f1.perform(storageClient);
                        String stderror = readFile(stderrLocation);
                        jobContext.addOutputFile(stderrLocation);
                        jobContext.setStandardError(stderrLocation);
-                       log.info("Stderr downloaded to -> "+stderrLocation);
+                       log.info(stderrFileName + " -> " + stderrLocation);
                } catch (Exception e) {
-                       
                        throw new 
GFacProviderException(e.getLocalizedMessage(),e);
                }
                
+               publishFinalOutputs();
        }
        
-       public List<String> extractOutParams(JobExecutionContext context) {
-               List<String> outPrmsList = new ArrayList<String>();
-               List<OutputDataObjectType> applicationOutputs = 
jobContext.getTaskData().getApplicationOutputs();
-                if (applicationOutputs != null && 
!applicationOutputs.isEmpty()){
-           for (OutputDataObjectType output : applicationOutputs){
-                if(output.getType().equals(DataType.STRING)) {
-                       outPrmsList.add(output.getValue());
-                }
-                else if(output.getType().equals(DataType.FLOAT) || 
output.getType().equals(DataType.INTEGER)) {
-                       outPrmsList.add(String.valueOf(output.getValue()));
-                        
-                }
-           }
-                }
-               return outPrmsList;
+       protected void publishFinalOutputs() throws GFacProviderException {
+        try {
+               if(!resultantOutputsLst.isEmpty()) { 
+                       Registry registry = jobContext.getRegistry();
+                               registry.add(ChildDataType.EXPERIMENT_OUTPUT, 
resultantOutputsLst, jobContext.getExperimentID());
+               }
+               } catch (RegistryException e) {
+                       throw new GFacProviderException("Cannot publish outputs 
to the registry.");
+               }
+
+               
        }
        
+       
        public List<String> extractInFileParams() {
                List<String> filePrmsList = new ArrayList<String>();
                List<InputDataObjectType> applicationInputs = 
jobContext.getTaskData().getApplicationInputs();
@@ -274,11 +257,6 @@ public class DataTransferrer {
 
                        else {
                                
-                               try {
-                                       URI u = new URI(outputDataDir);
-                               } catch (URISyntaxException e) {
-                                       return 
getTempPath(jobContext.getExperimentID());
-                               }
                                // in case of remote locations use the tmp 
location
                                if (outputDataDir.startsWith("scp:") || 
                                                
outputDataDir.startsWith("ftp:") ||
@@ -289,12 +267,24 @@ public class DataTransferrer {
                                                        outputDataDir = 
getTempPath(jobContext.getExperimentID());
                                        
                                } else {
-                                       outputDataDir = 
taskData.getAdvancedOutputDataHandling()
-                                                       .getOutputDataDir();
+                                       try {
+                                               URI u = new URI(outputDataDir);
+                                               outputDataDir = u.getPath();
+                                       } catch (URISyntaxException e) {
+                                               outputDataDir = 
getTempPath(jobContext.getExperimentID());
+                                       }
+
+                                       
                                }
                        }
                }
+               
+               File file = new File(outputDataDir);
+               if(!file.exists()){
+                       file.mkdirs();  
+               }
 
+               
                return outputDataDir;
        }
 

Reply via email to