Add implementation for BESJobSubmissionTask

Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/d231956e
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/d231956e
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/d231956e

Branch: refs/heads/feature-workload-mgmt
Commit: d231956e853aca385f6187f5ded5978a21d6548f
Parents: 9f0e45b
Author: Gourav Shenoy <[email protected]>
Authored: Tue May 2 13:50:17 2017 -0400
Committer: Gourav Shenoy <[email protected]>
Committed: Tue May 2 13:50:17 2017 -0400

----------------------------------------------------------------------
 modules/worker/task-jobsubmission/pom.xml       |  33 ++
 .../impl/BESJobSubmissionTask.java              |  83 +--
 .../jobsubmission/utils/bes/ActivityInfo.java   |  50 ++
 .../utils/bes/ApplicationProcessor.java         | 221 ++++++++
 .../jobsubmission/utils/bes/BESConstants.java   |  45 ++
 .../utils/bes/DataTransferrer.java              | 328 ++++++++++++
 .../jobsubmission/utils/bes/FileDownloader.java | 255 +++++++++
 .../utils/bes/FileTransferBase.java             | 223 ++++++++
 .../jobsubmission/utils/bes/FileUploader.java   | 242 +++++++++
 .../jobsubmission/utils/bes/JSDLGenerator.java  | 115 ++++
 .../task/jobsubmission/utils/bes/JSDLUtils.java | 517 ++++++++++++++++++
 .../task/jobsubmission/utils/bes/Mode.java      |  45 ++
 .../jobsubmission/utils/bes/MyProxyLogon.java   | 465 ++++++++++++++++
 .../task/jobsubmission/utils/bes/OSType.java    | 124 +++++
 .../utils/bes/ProcessorRequirement.java         |  61 +++
 .../jobsubmission/utils/bes/RangeValueType.java | 271 ++++++++++
 .../utils/bes/ResourceProcessor.java            |  97 ++++
 .../utils/bes/ResourceRequirement.java          |  34 ++
 .../jobsubmission/utils/bes/SPMDVariations.java |  52 ++
 .../jobsubmission/utils/bes/SecurityUtils.java  | 160 ++++++
 .../jobsubmission/utils/bes/StorageCreator.java | 207 ++++++++
 .../utils/bes/UASDataStagingProcessor.java      | 182 +++++++
 .../utils/bes/UNICORESecurityContext.java       | 195 +++++++
 .../task/jobsubmission/utils/bes/URIUtils.java  | 121 +++++
 .../utils/bes/X509SecurityContext.java          | 340 ++++++++++++
 modules/worker/worker-core/pom.xml              |   6 +
 .../airavata/worker/core/RequestData.java       | 149 ++++++
 .../airavata/worker/core/SecurityContext.java   |  24 +
 .../core/context/AbstractSecurityContext.java   |  57 ++
 .../airavata/worker/core/utils/SSHUtils.java    | 524 +++++++++++++++++++
 .../worker/core/utils/WorkerFactory.java        |  12 +
 .../airavata/worker/core/utils/WorkerUtils.java |  64 +++
 32 files changed, 5262 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/d231956e/modules/worker/task-jobsubmission/pom.xml
----------------------------------------------------------------------
diff --git a/modules/worker/task-jobsubmission/pom.xml 
b/modules/worker/task-jobsubmission/pom.xml
index 7d9506e..45d720e 100644
--- a/modules/worker/task-jobsubmission/pom.xml
+++ b/modules/worker/task-jobsubmission/pom.xml
@@ -41,6 +41,39 @@
             <artifactId>aurora-client</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>eu.unicore</groupId>
+            <artifactId>unicore-client-wrapper</artifactId>
+            <version>1.7.2_1</version>
+            <exclusions>
+                <!-- <exclusion>
+                    <groupId>org.apache.santuario</groupId>
+                    <artifactId>xmlsec</artifactId>
+                </exclusion> -->
+                <exclusion>
+                    <groupId>net.sf.saxon</groupId>
+                    <artifactId>saxon</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>net.sf.saxon</groupId>
+                    <artifactId>saxon-dom</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>net.sf.saxon</groupId>
+                    <artifactId>saxon-xpath</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>net.sf.saxon</groupId>
+            <artifactId>Saxon-HE</artifactId>
+            <version>9.6.0-1</version>
+        </dependency>
+        <dependency>
+            <groupId>commons-httpclient</groupId>
+            <artifactId>commons-httpclient</artifactId>
+            <version>3.1</version>
+        </dependency>
     </dependencies>
 
 </project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/airavata/blob/d231956e/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/impl/BESJobSubmissionTask.java
----------------------------------------------------------------------
diff --git 
a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/impl/BESJobSubmissionTask.java
 
b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/impl/BESJobSubmissionTask.java
index 2c6b984..65668c0 100644
--- 
a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/impl/BESJobSubmissionTask.java
+++ 
b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/impl/BESJobSubmissionTask.java
@@ -31,17 +31,6 @@ import de.fzj.unicore.wsrflite.xmlbeans.WSUtilities;
 import eu.unicore.util.httpclient.DefaultClientConfiguration;
 import org.apache.airavata.common.exception.ApplicationSettingsException;
 import org.apache.airavata.credential.store.store.CredentialStoreException;
-import org.apache.airavata.gfac.core.GFacException;
-import org.apache.airavata.gfac.core.GFacUtils;
-import org.apache.airavata.gfac.core.SSHApiException;
-import org.apache.airavata.gfac.core.authentication.AuthenticationInfo;
-import org.apache.airavata.gfac.core.cluster.ServerInfo;
-import org.apache.airavata.gfac.core.context.ProcessContext;
-import org.apache.airavata.gfac.core.context.TaskContext;
-import org.apache.airavata.gfac.core.task.JobSubmissionTask;
-import org.apache.airavata.gfac.core.task.TaskException;
-import org.apache.airavata.gfac.impl.Factory;
-import org.apache.airavata.gfac.impl.SSHUtils;
 import 
org.apache.airavata.model.appcatalog.computeresource.JobSubmissionInterface;
 import 
org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol;
 import 
org.apache.airavata.model.appcatalog.computeresource.UnicoreJobSubmission;
@@ -59,7 +48,21 @@ import org.apache.airavata.model.task.TaskTypes;
 import org.apache.airavata.registry.cpi.AppCatalogException;
 import org.apache.airavata.registry.cpi.ExperimentCatalogModelType;
 import org.apache.airavata.registry.cpi.RegistryException;
+import org.apache.airavata.worker.core.authentication.AuthenticationInfo;
+import org.apache.airavata.worker.core.cluster.ServerInfo;
+import org.apache.airavata.worker.core.context.ProcessContext;
+import org.apache.airavata.worker.core.context.TaskContext;
+import org.apache.airavata.worker.core.exceptions.SSHApiException;
+import org.apache.airavata.worker.core.exceptions.WorkerException;
+import org.apache.airavata.worker.core.task.TaskException;
+import org.apache.airavata.worker.core.utils.SSHUtils;
+import org.apache.airavata.worker.core.utils.WorkerFactory;
+import org.apache.airavata.worker.core.utils.WorkerUtils;
+import org.apache.airavata.worker.task.jobsubmission.JobSubmissionTask;
+import org.apache.airavata.worker.task.jobsubmission.utils.JobSubmissionUtils;
+import org.apache.airavata.worker.task.jobsubmission.utils.bes.*;
 import org.apache.xmlbeans.XmlCursor;
+import org.ggf.schemas.bes.x2006.x08.besFactory.*;
 import org.ggf.schemas.jsdl.x2005.x11.jsdl.JobDefinitionType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -105,7 +108,7 @@ public class BESJobSubmissionTask implements 
JobSubmissionTask {
             // con't reuse if UserDN has been changed.
             secProperties = getSecurityConfig(processContext);
             // try secProperties = secProperties.clone() if we can't use 
already initialized ClientConfigurations.
-        } catch (GFacException e) {
+        } catch (WorkerException e) {
             String msg = "Unicorn security context initialization error";
             log.error(msg, e);
             taskStatus.setState(TaskState.FAILED);
@@ -115,10 +118,10 @@ public class BESJobSubmissionTask implements 
JobSubmissionTask {
 
         try {
             JobSubmissionProtocol protocol = 
processContext.getJobSubmissionProtocol();
-            JobSubmissionInterface jobSubmissionInterface = 
GFacUtils.getPreferredJobSubmissionInterface(processContext);
+            JobSubmissionInterface jobSubmissionInterface = 
JobSubmissionUtils.getPreferredJobSubmissionInterface(processContext);
             String factoryUrl = null;
             if (protocol.equals(JobSubmissionProtocol.UNICORE)) {
-                UnicoreJobSubmission unicoreJobSubmission = 
GFacUtils.getUnicoreJobSubmission(
+                UnicoreJobSubmission unicoreJobSubmission = 
JobSubmissionUtils.getUnicoreJobSubmission(
                         jobSubmissionInterface.getJobSubmissionInterfaceId());
                 factoryUrl = unicoreJobSubmission.getUnicoreEndPointURL();
             }
@@ -167,8 +170,8 @@ public class BESJobSubmissionTask implements 
JobSubmissionTask {
             jobDetails.setJobDescription(activityEpr.toString());
             jobDetails.setJobStatuses(Arrays.asList(new 
JobStatus(JobState.SUBMITTED)));
             processContext.setJobModel(jobDetails);
-            GFacUtils.saveJobModel(processContext, jobDetails);
-            GFacUtils.saveJobStatus(processContext, jobDetails);
+            JobSubmissionUtils.saveJobModel(processContext, jobDetails);
+            WorkerUtils.saveJobStatus(processContext, jobDetails);
             log.info(formatStatusMessage(activityEpr.getAddress()
                     .getStringValue(), factory.getActivityStatus(activityEpr)
                     .toString()));
@@ -205,8 +208,8 @@ public class BESJobSubmissionTask implements 
JobSubmissionTask {
             } else if (activityStatus.getState() == 
ActivityStateEnumeration.CANCELLED) {
                 JobState applicationJobStatus = JobState.CANCELED;
                 jobDetails.setJobStatuses(Arrays.asList(new 
JobStatus(applicationJobStatus)));
-                GFacUtils.saveJobStatus(processContext, jobDetails);
-                throw new GFacException(
+                WorkerUtils.saveJobStatus(processContext, jobDetails);
+                throw new WorkerException(
                         processContext.getExperimentId() + "Job Canceled");
             } else if (activityStatus.getState() == 
ActivityStateEnumeration.FINISHED) {
                 try {
@@ -215,7 +218,7 @@ public class BESJobSubmissionTask implements 
JobSubmissionTask {
                 }
                 JobState applicationJobStatus = JobState.COMPLETE;
                 jobDetails.setJobStatuses(Arrays.asList(new 
JobStatus(applicationJobStatus)));
-                GFacUtils.saveJobStatus(processContext, jobDetails);
+                WorkerUtils.saveJobStatus(processContext, jobDetails);
                 log.info("Job Id: {}, exit code: {}, exit status: {}", 
jobDetails.getJobId(),
                         activityStatus.getExitCode(), 
ActivityStateEnumeration.FINISHED.toString());
 
@@ -228,7 +231,7 @@ public class BESJobSubmissionTask implements 
JobSubmissionTask {
             if (copyOutput != null) {
                 copyOutputFilesToStorage(taskContext, copyOutput);
                 for (OutputDataObjectType outputDataObjectType : copyOutput) {
-                    GFacUtils.saveExperimentOutput(processContext, 
outputDataObjectType.getName(), outputDataObjectType.getValue());
+                    WorkerUtils.saveExperimentOutput(processContext, 
outputDataObjectType.getName(), outputDataObjectType.getValue());
                 }
             }
 //            dt.publishFinalOutputs();
@@ -244,13 +247,13 @@ public class BESJobSubmissionTask implements 
JobSubmissionTask {
         return taskStatus;
     }
 
-    private void copyOutputFilesToStorage(TaskContext taskContext, 
List<OutputDataObjectType> copyOutput) throws GFacException {
+    private void copyOutputFilesToStorage(TaskContext taskContext, 
List<OutputDataObjectType> copyOutput) throws WorkerException {
         ProcessContext pc = taskContext.getParentProcessContext();
         String remoteFilePath = null, fileName = null, localFilePath = null;
         try {
-            authenticationInfo = Factory.getStorageSSHKeyAuthentication(pc);
+            authenticationInfo = 
WorkerFactory.getStorageSSHKeyAuthentication(pc);
             ServerInfo serverInfo = pc.getComputeResourceServerInfo();
-            Session sshSession = Factory.getSSHSession(authenticationInfo, 
serverInfo);
+            Session sshSession = 
WorkerFactory.getSSHSession(authenticationInfo, serverInfo);
             for (OutputDataObjectType output : copyOutput) {
                 switch (output.getType()) {
                     case STDERR: case STDOUT: case STRING: case URI:
@@ -259,7 +262,7 @@ public class BESJobSubmissionTask implements 
JobSubmissionTask {
                             localFilePath = 
localFilePath.substring(localFilePath.indexOf("://") + 2, 
localFilePath.length());
                         }
                         fileName = 
localFilePath.substring(localFilePath.lastIndexOf("/") + 1);
-                        URI destinationURI = 
TaskUtils.getDestinationURI(taskContext, hostName, inputPath, fileName);
+                        URI destinationURI = 
WorkerUtils.getDestinationURI(taskContext, hostName, inputPath, fileName);
                         remoteFilePath = destinationURI.getPath();
                         log.info("SCP local file :{} -> from remote :{}", 
localFilePath, remoteFilePath);
                         SSHUtils.scpTo(localFilePath, remoteFilePath, 
sshSession);
@@ -271,18 +274,18 @@ public class BESJobSubmissionTask implements 
JobSubmissionTask {
             }
         } catch (IOException | JSchException | SSHApiException | 
URISyntaxException | CredentialStoreException e) {
             log.error("Error while coping local file " + localFilePath + " to 
remote " + remoteFilePath, e);
-            throw new GFacException("Error while scp output files to remote 
storage file location", e);
+            throw new WorkerException("Error while scp output files to remote 
storage file location", e);
         }
     }
 
-    private void copyInputFilesToLocal(TaskContext taskContext) throws 
GFacException {
+    private void copyInputFilesToLocal(TaskContext taskContext) throws 
WorkerException {
         ProcessContext pc = taskContext.getParentProcessContext();
         StorageResourceDescription storageResource = pc.getStorageResource();
 
         if (storageResource != null) {
             hostName = storageResource.getHostName();
         } else {
-            throw new GFacException("Storage Resource is null");
+            throw new WorkerException("Storage Resource is null");
         }
         inputPath = pc.getStorageFileSystemRootLocation();
         inputPath = (inputPath.endsWith(File.separator) ? inputPath : 
inputPath + File.separator);
@@ -290,9 +293,9 @@ public class BESJobSubmissionTask implements 
JobSubmissionTask {
         String remoteFilePath = null, fileName = null, localFilePath = null;
         URI remoteFileURI = null;
         try {
-            authenticationInfo = Factory.getStorageSSHKeyAuthentication(pc);
+            authenticationInfo = 
WorkerFactory.getStorageSSHKeyAuthentication(pc);
             ServerInfo serverInfo = pc.getStorageResourceServerInfo();
-            Session sshSession = Factory.getSSHSession(authenticationInfo, 
serverInfo);
+            Session sshSession = 
WorkerFactory.getSSHSession(authenticationInfo, serverInfo);
 
             List<InputDataObjectType> processInputs = 
pc.getProcessModel().getProcessInputs();
             for (InputDataObjectType input : processInputs) {
@@ -308,11 +311,11 @@ public class BESJobSubmissionTask implements 
JobSubmissionTask {
             }
         } catch (IOException | JSchException | SSHApiException | 
URISyntaxException e) {
             log.error("Error while coping remote file " + remoteFilePath + " 
to local " + localFilePath, e);
-            throw new GFacException("Error while scp input files to local file 
location", e);
+            throw new WorkerException("Error while scp input files to local 
file location", e);
         } catch (CredentialStoreException e) {
             String msg = "Authentication issue, make sure you are passing 
valid credential token";
             log.error(msg, e);
-            throw new GFacException(msg, e);
+            throw new WorkerException(msg, e);
         }
     }
 
@@ -324,7 +327,7 @@ public class BESJobSubmissionTask implements 
JobSubmissionTask {
         processContext.setOutputDir(localPath);
     }
 
-    private DefaultClientConfiguration getSecurityConfig(ProcessContext pc) 
throws GFacException {
+    private DefaultClientConfiguration getSecurityConfig(ProcessContext pc) 
throws WorkerException {
         DefaultClientConfiguration clientConfig = null;
         try {
             UNICORESecurityContext unicoreSecurityContext = 
SecurityUtils.getSecurityContext(pc);
@@ -339,9 +342,9 @@ public class BESJobSubmissionTask implements 
JobSubmissionTask {
                 clientConfig = 
unicoreSecurityContext.getDefaultConfiguration(false);
             }
         } catch (RegistryException e) {
-            throw new GFacException("Error! reading user configuration data 
from registry", e);
+            throw new WorkerException("Error! reading user configuration data 
from registry", e);
         } catch (ApplicationSettingsException e) {
-            throw new GFacException("Error! retrieving default client 
configurations", e);
+            throw new WorkerException("Error! retrieving default client 
configurations", e);
         }
 
         return clientConfig;
@@ -380,8 +383,8 @@ public class BESJobSubmissionTask implements 
JobSubmissionTask {
         }
     }
 
-    private void sendNotification(ProcessContext processContext,  JobModel 
jobModel) throws GFacException {
-        GFacUtils.saveJobStatus(processContext, jobModel);
+    private void sendNotification(ProcessContext processContext,  JobModel 
jobModel) throws WorkerException {
+        WorkerUtils.saveJobStatus(processContext, jobModel);
     }
 
     @Override
@@ -467,9 +470,9 @@ public class BESJobSubmissionTask implements 
JobSubmissionTask {
      * EndpointReference need to be saved to make cancel work.
      *
      * @param processContext
-     * @throws GFacException
+     * @throws WorkerException
      */
-    public boolean cancelJob(ProcessContext processContext) throws 
GFacException {
+    public boolean cancelJob(ProcessContext processContext) throws 
WorkerException {
         try {
             String activityEpr = 
processContext.getJobModel().getJobDescription();
             // initSecurityProperties(processContext);
@@ -479,7 +482,7 @@ public class BESJobSubmissionTask implements 
JobSubmissionTask {
             String interfaceId = 
processContext.getApplicationInterfaceDescription().getApplicationInterfaceId();
             String factoryUrl = null;
             if (protocol.equals(JobSubmissionProtocol.UNICORE)) {
-                UnicoreJobSubmission unicoreJobSubmission = 
GFacUtils.getUnicoreJobSubmission(interfaceId);
+                UnicoreJobSubmission unicoreJobSubmission = 
JobSubmissionUtils.getUnicoreJobSubmission(interfaceId);
                 factoryUrl = unicoreJobSubmission.getUnicoreEndPointURL();
             }
             EndpointReferenceType epr = EndpointReferenceType.Factory
@@ -490,7 +493,7 @@ public class BESJobSubmissionTask implements 
JobSubmissionTask {
             factory.terminateActivity(eprt);
             return true;
         } catch (Exception e) {
-            throw new GFacException(e.getLocalizedMessage(), e);
+            throw new WorkerException(e.getLocalizedMessage(), e);
         }
 
     }

http://git-wip-us.apache.org/repos/asf/airavata/blob/d231956e/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/ActivityInfo.java
----------------------------------------------------------------------
diff --git 
a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/ActivityInfo.java
 
b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/ActivityInfo.java
new file mode 100644
index 0000000..22cf4db
--- /dev/null
+++ 
b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/ActivityInfo.java
@@ -0,0 +1,50 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+package org.apache.airavata.worker.task.jobsubmission.utils.bes;
+
+import org.ggf.schemas.bes.x2006.x08.besFactory.ActivityStatusType;
+import org.w3.x2005.x08.addressing.EndpointReferenceType;
+
+import java.io.Serializable;
+
+public class ActivityInfo implements Serializable{
+       
+       private static final long serialVersionUID = 1L;
+
+       private EndpointReferenceType activityEPR;
+       
+       private ActivityStatusType activityStatusDoc;
+       
+
+       public EndpointReferenceType getActivityEPR() {
+               return activityEPR;
+       }
+       public void setActivityEPR(EndpointReferenceType activityEPR) {
+               this.activityEPR = activityEPR;
+       }
+       public ActivityStatusType getActivityStatus() {
+               return activityStatusDoc;
+       }
+       public void setActivityStatusDoc(ActivityStatusType activityStatusDoc) {
+               this.activityStatusDoc = activityStatusDoc;
+       }
+       
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/d231956e/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/ApplicationProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/ApplicationProcessor.java
 
b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/ApplicationProcessor.java
new file mode 100644
index 0000000..7fb442f
--- /dev/null
+++ 
b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/ApplicationProcessor.java
@@ -0,0 +1,221 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.worker.task.jobsubmission.utils.bes;
+
+import 
org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription;
+import org.apache.airavata.model.application.io.InputDataObjectType;
+import org.apache.airavata.model.parallelism.ApplicationParallelismType;
+import org.apache.airavata.worker.core.context.ProcessContext;
+import org.ggf.schemas.jsdl.x2005.x11.jsdl.ApplicationType;
+import org.ggf.schemas.jsdl.x2005.x11.jsdl.JobDefinitionType;
+import org.ggf.schemas.jsdl.x2005.x11.jsdlPosix.FileNameType;
+import org.ggf.schemas.jsdl.x2005.x11.jsdlPosix.UserNameType;
+import org.ogf.schemas.jsdl.x2007.x02.jsdlSpmd.NumberOfProcessesType;
+import org.ogf.schemas.jsdl.x2007.x02.jsdlSpmd.ProcessesPerHostType;
+import org.ogf.schemas.jsdl.x2007.x02.jsdlSpmd.ThreadsPerProcessType;
+
+import java.util.Iterator;
+import java.util.List;
+
+
+public class ApplicationProcessor {
+       
+       public static void generateJobSpecificAppElements(JobDefinitionType 
value, ProcessContext context){
+               
+               String userName = getUserNameFromContext(context);
+//             if (userName.equalsIgnoreCase("admin")){
+//                     userName = "CN=zdv575, O=Ultrascan Gateway, C=DE";
+//             }
+               
+               ApplicationDeploymentDescription appDep= 
context.getApplicationDeploymentDescription();
+        String appname = 
context.getApplicationInterfaceDescription().getApplicationName();
+        ApplicationParallelismType parallelism = appDep.getParallelism();
+        ApplicationType appType = JSDLUtils.getOrCreateApplication(value);
+        appType.setApplicationName(appname);
+        
+
+//             if (appDep.getSetEnvironment().size() > 0) {
+//            createApplicationEnvironment(value, appDep.getSetEnvironment(), 
parallelism);
+//             }
+//
+        
+               String stdout = context.getStdoutLocation();
+               String stderr = context.getStderrLocation();
+               if(stdout != null) {
+                       stdout = stdout.substring(stdout.lastIndexOf('/')+1);
+               }
+               
+               if(stderr != null) {
+                       stderr = stderr.substring(stderr.lastIndexOf('/')+1);
+               }
+               
+               stdout = (stdout == null || stdout.equals("")) ? 
"stdout":stdout;
+               stderr = (stdout == null || stderr.equals("")) ? 
"stderr":stderr;
+
+        if (appDep.getExecutablePath() != null) {
+                       FileNameType fNameType = 
FileNameType.Factory.newInstance();
+                       fNameType.setStringValue(appDep.getExecutablePath());
+                       if(isParallelJob(context)) {
+                               
JSDLUtils.getOrCreateSPMDApplication(value).setExecutable(fNameType);
+                if (parallelism.equals(ApplicationParallelismType.OPENMP_MPI)){
+                    
JSDLUtils.getSPMDApplication(value).setSPMDVariation(SPMDVariations.OpenMPI.value());
+                }else if (parallelism.equals(ApplicationParallelismType.MPI)){
+                    
JSDLUtils.getSPMDApplication(value).setSPMDVariation(SPMDVariations.MPI.value());
+                }
+                
+                // setting number of processes
+                try { 
+                       String np = getInputAsString(context, 
BESConstants.NUMBER_OF_PROCESSES);
+                       if((np != null)  && (Integer.parseInt(np) > 0)){
+                                               NumberOfProcessesType num = 
NumberOfProcessesType.Factory.newInstance();
+                           num.setStringValue(np);
+                                               
JSDLUtils.getSPMDApplication(value).setNumberOfProcesses(num);
+                                       }
+       
+                }catch(RuntimeException np) {
+                       // do nothing
+                }
+
+
+                try { 
+                       // setting processes per host
+                       String pphost = getInputAsString(context, 
BESConstants.PROCESSES_PER_HOST);
+                       if((pphost != null)  && (Integer.parseInt(pphost) > 0)){
+                           ProcessesPerHostType pph = 
ProcessesPerHostType.Factory.newInstance();
+                           pph.setStringValue(String.valueOf(pphost));
+                           
JSDLUtils.getSPMDApplication(value).setProcessesPerHost(pph);
+                       }
+                }catch(RuntimeException np) {
+                       // do nothing
+                }
+                
+                int totalThreadCount = 
context.getProcessModel().getProcessResourceSchedule().getNumberOfThreads();
+                // we take it as threads per processes
+                if(totalThreadCount > 0){
+                                       ThreadsPerProcessType tpp = 
ThreadsPerProcessType.Factory.newInstance();
+                                       
tpp.setStringValue(String.valueOf(totalThreadCount));
+                                       
JSDLUtils.getSPMDApplication(value).setThreadsPerProcess(tpp);
+                               }
+                               
+                               if(userName != null) {
+                                       UserNameType userNameType = 
UserNameType.Factory.newInstance();
+                                       userNameType.setStringValue(userName);
+                                       
JSDLUtils.getSPMDApplication(value).setUserName(userNameType);
+                               }
+                if (stdout != null){
+                    FileNameType fName = FileNameType.Factory.newInstance();
+                    fName.setStringValue(stdout);
+                    
JSDLUtils.getOrCreateSPMDApplication(value).setOutput(fName);
+                }
+                if (stderr != null){
+                    FileNameType fName = FileNameType.Factory.newInstance();
+                    fName.setStringValue(stderr);
+                    
JSDLUtils.getOrCreateSPMDApplication(value).setError(fName);
+                }
+
+
+                       }
+                       else {
+                               
JSDLUtils.getOrCreatePOSIXApplication(value).setExecutable(fNameType);
+                               if(userName != null) {
+                                       UserNameType userNameType = 
UserNameType.Factory.newInstance();
+                                       userNameType.setStringValue(userName);
+                                       
JSDLUtils.getOrCreatePOSIXApplication(value).setUserName(userNameType);
+                               }
+                if (stdout != null){
+                    FileNameType fName = FileNameType.Factory.newInstance();
+                    fName.setStringValue(stdout);
+                    
JSDLUtils.getOrCreatePOSIXApplication(value).setOutput(fName);
+                }
+                if (stderr != null){
+                    FileNameType fName = FileNameType.Factory.newInstance();
+                    fName.setStringValue(stderr);
+                    
JSDLUtils.getOrCreatePOSIXApplication(value).setError(fName);
+                }
+                       }
+               }
+       }
+       
+       public static String getUserNameFromContext(ProcessContext jobContext) {
+               if(jobContext.getProcessModel() == null)
+                       return null;
+               //TODO: Extend unicore model to specify optional unix user id 
(allocation account) 
+               return "admin";
+       }
+
+       public static void addApplicationArgument(JobDefinitionType value, 
ProcessContext context, String stringPrm) {
+               if(isParallelJob(context)){             
+                       
JSDLUtils.getOrCreateSPMDApplication(value).addNewArgument().setStringValue(stringPrm);
+               }
+               else { 
+                   
JSDLUtils.getOrCreatePOSIXApplication(value).addNewArgument().setStringValue(stringPrm);
+               }
+       }
+       
+       public static String getApplicationStdOut(JobDefinitionType value, 
ProcessContext context) throws RuntimeException {
+               if (isParallelJob(context)) return 
JSDLUtils.getOrCreateSPMDApplication(value).getOutput().getStringValue();
+               else return 
JSDLUtils.getOrCreatePOSIXApplication(value).getOutput().getStringValue();
+       }
+       
+       public static String getApplicationStdErr(JobDefinitionType value, 
ProcessContext context) throws RuntimeException {
+               if (isParallelJob(context)) return 
JSDLUtils.getOrCreateSPMDApplication(value).getError().getStringValue();
+               else return 
JSDLUtils.getOrCreatePOSIXApplication(value).getError().getStringValue();
+       }
+       
+       public static void createGenericApplication(JobDefinitionType value, 
String appName) {
+        ApplicationType appType = JSDLUtils.getOrCreateApplication(value);
+        appType.setApplicationName(appName);
+    }
+       
+       public static boolean isParallelJob(ProcessContext context) {
+               
+               ApplicationDeploymentDescription appDep = 
context.getApplicationDeploymentDescription();
+               ApplicationParallelismType parallelism = 
appDep.getParallelism();
+               
+               boolean isParallel = false;
+               
+               if(parallelism.equals(ApplicationParallelismType.MPI) ||
+                  parallelism.equals(ApplicationParallelismType.OPENMP_MPI) ||
+                  parallelism.equals(ApplicationParallelismType.OPENMP )) {
+                       isParallel = true;
+               }
+               
+               return isParallel;
+       }
+       
+       private static String getInputAsString(ProcessContext context, String 
name) {
+               List<InputDataObjectType> inputList = 
context.getProcessModel().getProcessInputs();
+               String value = null;
+               for (Iterator<InputDataObjectType> iterator = 
inputList.iterator(); iterator.hasNext();) {
+                       InputDataObjectType inputDataObjectType = iterator
+                                       .next();
+                       if (inputDataObjectType.getName().equals(name)) {
+                               value = inputDataObjectType.getValue();
+                               break;
+                       }
+               }
+               return value;
+       }
+       
+       
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/d231956e/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/BESConstants.java
----------------------------------------------------------------------
diff --git 
a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/BESConstants.java
 
b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/BESConstants.java
new file mode 100644
index 0000000..5f3991e
--- /dev/null
+++ 
b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/BESConstants.java
@@ -0,0 +1,45 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+
+package org.apache.airavata.worker.task.jobsubmission.utils.bes;
+
+public interface BESConstants {
+       
+       public static final String PROP_SMS_EPR = "unicore.sms.epr";
+       
+       public static final String PROP_BES_URL = "bes.factory.url";
+
+       public static final String PROP_ACTIVITY_INFO = "bes.activity.info";
+       
+       public static final String PROP_CLIENT_CONF = "bes.client.config";
+       
+       public static final String PROP_CA_CERT_PATH = "bes.ca.cert.path";
+       
+       public static final String PROP_CA_KEY_PATH = "bes.ca.key.path";
+       
+       public static final String PROP_CA_KEY_PASS = "bes.ca.key.pass";
+       
+       public static final String NUMBER_OF_PROCESSES = "NumberOfProcesses";
+       
+       public static final String PROCESSES_PER_HOST = "ProcessesPerHost";
+
+       
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/airavata/blob/d231956e/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/DataTransferrer.java
----------------------------------------------------------------------
diff --git 
a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/DataTransferrer.java
 
b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/DataTransferrer.java
new file mode 100644
index 0000000..736d982
--- /dev/null
+++ 
b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/DataTransferrer.java
@@ -0,0 +1,328 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.worker.task.jobsubmission.utils.bes;
+
+import de.fzj.unicore.uas.client.StorageClient;
+import org.apache.airavata.common.utils.Constants;
+import org.apache.airavata.model.application.io.DataType;
+import org.apache.airavata.model.application.io.InputDataObjectType;
+import org.apache.airavata.model.application.io.OutputDataObjectType;
+import org.apache.airavata.model.process.ProcessModel;
+import org.apache.airavata.registry.cpi.ExpCatChildDataType;
+import org.apache.airavata.registry.cpi.ExperimentCatalog;
+import org.apache.airavata.registry.cpi.RegistryException;
+import org.apache.airavata.worker.core.context.ProcessContext;
+import org.apache.airavata.worker.core.exceptions.WorkerException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Data movement utility class for transferring files before and after the job 
execution phase.   
+ * 
+ * */
+public class DataTransferrer {
+   
+       protected final Logger log = LoggerFactory.getLogger(this.getClass());
+
+       protected ProcessContext processContext;
+       
+       protected StorageClient storageClient;
+       
+       protected List<OutputDataObjectType> resultantOutputsLst;
+       
+       protected String gatewayDownloadLocation, stdoutLocation, 
stderrLocation;
+       
+       public DataTransferrer(ProcessContext processContext, StorageClient 
storageClient) {
+               this.processContext = processContext;
+               this.storageClient = storageClient;
+               resultantOutputsLst = new ArrayList<OutputDataObjectType>();
+               initStdoutsLocation();
+       }
+       
+       private void initStdoutsLocation() {
+
+               gatewayDownloadLocation = getDownloadLocation();
+               
+               String stdout = processContext.getStdoutLocation();
+               String stderr = processContext.getStderrLocation();
+
+               if(stdout != null) {
+                       stdout = stdout.substring(stdout.lastIndexOf('/')+1);
+               }
+               
+               if(stderr != null) {
+                       stderr = stderr.substring(stderr.lastIndexOf('/')+1);
+               }
+               
+               String stdoutFileName = (stdout == null || stdout.equals("")) ? 
"stdout"
+                               : stdout;
+               String stderrFileName = (stdout == null || stderr.equals("")) ? 
"stderr"
+                               : stderr;
+               
+               stdoutLocation = 
gatewayDownloadLocation+File.separator+stdoutFileName;
+               
+               stderrLocation = 
gatewayDownloadLocation+File.separator+stderrFileName;
+
+        List<OutputDataObjectType> processOutputs = 
processContext.getProcessModel().getProcessOutputs();
+        if (processOutputs != null && !processOutputs.isEmpty()){
+            for (OutputDataObjectType processOutput : processOutputs){
+                if (processOutput.getType().equals(DataType.STDOUT)){
+                    processOutput.setValue(stdoutLocation);
+                }
+                if (processOutput.getType().equals(DataType.STDERR)){
+                    processOutput.setValue(stderrLocation);
+                }
+
+            }
+        }
+       }
+
+    public void uploadLocalFiles() throws WorkerException {
+        List<String> inFilePrms = new ArrayList<>();
+        // FIXME - remove hard coded file path.
+        inFilePrms.addAll(extractInFileParams());
+//        
inFilePrms.add("file://home/airavata/test/hpcinput-localhost-uslims3_cauma3d-00950.tar");
+        for (String uri : inFilePrms) {
+            String fileName = new File(uri).getName();
+            if (uri.startsWith("file")) {
+                try {
+                    String uriWithoutProtocol = 
uri.substring(uri.lastIndexOf("://") + 2, uri.length());
+                    FileUploader fileUploader = new 
FileUploader(uriWithoutProtocol, fileName, Mode.overwrite, false);
+                    log.info("Uploading file {}", fileName);
+                    fileUploader.perform(storageClient);
+                } catch (FileNotFoundException e3) {
+                    throw new WorkerException(
+                            "Error while staging-in, local file "+fileName+" 
not found", e3);
+                } catch (Exception e) {
+                    throw new WorkerException("Cannot upload files", e);
+
+                }
+
+            }
+        }
+    }
+
+    public List<String> extractInFileParams() {
+        List<String> filePrmsList = new ArrayList<String>();
+        List<InputDataObjectType> applicationInputs = 
processContext.getProcessModel().getProcessInputs();
+        if (applicationInputs != null && !applicationInputs.isEmpty()){
+            for (InputDataObjectType output : applicationInputs){
+                if(output.getType().equals(DataType.URI)) {
+                    filePrmsList.add(output.getValue());
+                }
+            }
+        }
+        return filePrmsList;
+    }
+
+    public void setStorageClient(StorageClient sc){
+        storageClient = sc;
+    }
+
+    public void downloadStdOuts()  throws WorkerException{
+
+        String stdoutFileName = new File(stdoutLocation).getName();
+
+        String stderrFileName = new File(stderrLocation).getName();
+
+        FileDownloader f1 = null;
+        log.info("Downloading stdout and stderr..");
+        log.info(stdoutFileName + " -> " + stdoutLocation);
+
+        f1 = new FileDownloader(stdoutFileName, stdoutLocation, 
Mode.overwrite);
+        try {
+            f1.perform(storageClient);
+//            String stdoutput = readFile(stdoutLocation);
+        } catch (Exception e) {
+            log.error("Error while downloading " + stdoutFileName + " to 
location " + stdoutLocation, e);
+        }
+
+        log.info(stderrFileName + " -> " + stderrLocation);
+        f1.setFrom(stderrFileName);
+        f1.setTo(stderrLocation);
+        try {
+            f1.perform(storageClient);
+//            String stderror = readFile(stderrLocation);
+        } catch (Exception e) {
+            log.error("Error while downloading " + stderrFileName + " to 
location " + stderrLocation);
+        }
+        String scriptExitCodeFName = "UNICORE_SCRIPT_EXIT_CODE";
+        String scriptCodeLocation = gatewayDownloadLocation + File.separator + 
scriptExitCodeFName;
+        if (UASDataStagingProcessor.isUnicoreEndpoint(processContext)) {
+            f1.setFrom(scriptExitCodeFName);
+            f1.setTo(scriptCodeLocation);
+            try {
+                f1.perform(storageClient);
+                OutputDataObjectType output = new OutputDataObjectType();
+                output.setName(scriptExitCodeFName);
+                output.setValue(scriptCodeLocation);
+                output.setType(DataType.URI);
+                output.setIsRequired(true);
+                
processContext.getProcessModel().getProcessOutputs().add(output);
+                log.info("UNICORE_SCRIPT_EXIT_CODE -> " + scriptCodeLocation);
+                log.info("EXIT CODE: " + readFile(scriptCodeLocation));
+            } catch (Exception e) {
+                log.error("Error downloading file " + scriptExitCodeFName + " 
to location " + scriptCodeLocation, e);
+            }
+        }
+    }
+
+    private String readFile(String localFile) throws IOException {
+        BufferedReader instream = new BufferedReader(new 
FileReader(localFile));
+        StringBuffer buff = new StringBuffer();
+        String temp = null;
+        while ((temp = instream.readLine()) != null) {
+            buff.append(temp);
+            buff.append(Constants.NEWLINE);
+        }
+
+        log.info("finish read file:" + localFile);
+
+        return buff.toString();
+    }
+
+       private String getDownloadLocation() {
+               ProcessModel processModel = processContext.getProcessModel();
+               String outputDataDir = "";
+
+               if (processContext.getOutputDir() != null ) {
+
+                       outputDataDir = processContext.getOutputDir();
+                       
+                       
+                       if ("".equals(outputDataDir)) {
+                               outputDataDir = getTempPath();
+                       }
+
+                       else {
+                               
+                               // in case of remote locations use the tmp 
location
+                               if (outputDataDir.startsWith("scp:") || 
+                                               
outputDataDir.startsWith("ftp:") ||
+                                               
outputDataDir.startsWith("gsiftp:")) {
+                                               outputDataDir = getTempPath();
+                               } else if ( outputDataDir.startsWith("file:")  
&& 
+                                                    
outputDataDir.contains("@")){
+                                                       outputDataDir = 
getTempPath();
+                                       
+                               } else {
+                                       try {
+                                               URI u = new URI(outputDataDir);
+                                               outputDataDir = u.getPath();
+                                       } catch (URISyntaxException e) {
+                                               outputDataDir = getTempPath();
+                                       }
+                               }
+                       }
+               }
+               
+               File file = new File(outputDataDir);
+               if(!file.exists()){
+                       file.mkdirs();  
+               }
+
+               
+               return outputDataDir;
+       }
+
+       private String getTempPath() {
+               String tmpOutputDir = File.separator + "tmp" + File.separator
+                               + processContext.getProcessId();
+               (new File(tmpOutputDir)).mkdirs();
+               return tmpOutputDir;
+       }
+
+    public List<OutputDataObjectType> downloadRemoteFiles() throws 
WorkerException {
+
+        if(log.isDebugEnabled()) {
+            log.debug("Download location is:" + gatewayDownloadLocation);
+        }
+
+        List<OutputDataObjectType> applicationOutputs = 
processContext.getProcessModel().getProcessOutputs();
+        if (applicationOutputs != null && !applicationOutputs.isEmpty()){
+            for (OutputDataObjectType output : applicationOutputs){
+                if("".equals(output.getValue()) || output.getValue() == null) {
+                    continue;
+                }
+                if(output.getType().equals(DataType.STDOUT)) {
+                    output.setValue(stdoutLocation);
+                    resultantOutputsLst.add(output);
+                } else if(output.getType().equals(DataType.STDERR)) {
+                    output.setValue(stderrLocation);
+                    resultantOutputsLst.add(output);
+                } else if (output.getType().equals(DataType.URI)) {
+                    String value = null;
+                    if (!output.getLocation().isEmpty()) {
+                        value = output.getLocation() + File.separator + 
output.getValue();
+                    } else {
+                        value = output.getValue();
+                    }
+                    String outputPath = gatewayDownloadLocation + 
File.separator + output.getValue();
+                    File f = new File(gatewayDownloadLocation);
+                    if (!f.exists())
+                        f.mkdirs();
+
+                    FileDownloader fileDownloader = new FileDownloader(value, 
outputPath, Mode.overwrite);
+                    try {
+                        log.info("Downloading file {}", value);
+                        fileDownloader.perform(storageClient);
+                        output.setType(DataType.URI);
+                        output.setValue(outputPath);
+                        resultantOutputsLst.add(output);
+                    } catch (Exception e) {
+                        log.error("Error downloading " + value + " from job 
working directory. ");
+//                        throw new WorkerException(e.getLocalizedMessage(),e);
+                    }
+                } else {
+                    log.info("Ignore output file {}, type {}", 
output.getValue(), output.getType().toString());
+                }
+
+            }
+
+        }
+
+        downloadStdOuts();
+        return resultantOutputsLst;
+
+    }
+
+    public void publishFinalOutputs() throws WorkerException {
+        try {
+            if(!resultantOutputsLst.isEmpty()) {
+                log.debug("Publishing the list of outputs to the registry 
instance..");
+                ExperimentCatalog experimentCatalog = 
processContext.getExperimentCatalog();
+                experimentCatalog.add(ExpCatChildDataType.EXPERIMENT_OUTPUT, 
resultantOutputsLst, processContext.getExperimentId());
+            }
+        } catch (RegistryException e) {
+            throw new WorkerException("Cannot publish outputs to the 
registry.");
+        }
+
+
+    }
+       
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/airavata/blob/d231956e/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/FileDownloader.java
----------------------------------------------------------------------
diff --git 
a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/FileDownloader.java
 
b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/FileDownloader.java
new file mode 100644
index 0000000..937b1c1
--- /dev/null
+++ 
b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/FileDownloader.java
@@ -0,0 +1,255 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.worker.task.jobsubmission.utils.bes;
+
+import de.fzj.unicore.uas.client.FileTransferClient;
+import de.fzj.unicore.uas.client.StorageClient;
+import de.fzj.unicore.uas.client.UFTPConstants;
+import de.fzj.unicore.uas.client.UFTPFileTransferClient;
+import de.fzj.unicore.uas.fts.FiletransferOptions.IMonitorable;
+import de.fzj.unicore.uas.fts.FiletransferOptions.SupportsPartialRead;
+import org.unigrids.services.atomic.types.GridFileType;
+import org.unigrids.services.atomic.types.ProtocolType;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Map;
+
+/**
+ * helper that exports remote files from a UNICORE Storage 
+ * to the local client machine.<br/>
+ * Simple wildcards ("*" and "?") and download of 
+ * directories are supported.
+ * 
+ * TODO this should be refactored so the single-file download logic 
+ * is separated from the wildcard/directory/provided outputStream logic
+ * 
+ * @author schuller
+ */
+public class FileDownloader extends FileTransferBase{
+
+       private boolean showProgress=true;
+
+       private boolean forceFileOnly=false;
+       
+       private OutputStream targetStream=null;
+       
+       public FileDownloader(String from, String to, Mode mode){
+               this(from,to,mode,true);
+       }
+       
+       public FileDownloader(String from, String to, Mode mode, boolean 
failOnError){
+               this.to=to;
+               this.from=from;
+               this.mode=mode;
+               this.failOnError=failOnError;
+       }
+       
+       public void perform(StorageClient sms)throws Exception{
+               boolean isWildcard=hasWildCards(from);
+               boolean isDirectory=false;
+               GridFileType gridSource=null;
+               if(isWildcard){
+                       performWildCardExport(sms);
+               }
+               else {
+                       //check if source is a directory
+                       gridSource=sms.listProperties(from);
+                       isDirectory=gridSource.getIsDirectory();
+                       if(isDirectory){
+                               if(forceFileOnly){
+                                       throw new IOException("Source is a 
directory");
+                               }
+                               performDirectoryExport(gridSource, new 
File(to), sms);
+                       }
+                       else{
+                               download(gridSource,new File(to),sms);
+                       }
+               }       
+       }
+       
+       protected void performDirectoryExport(GridFileType directory, File 
targetDirectory, StorageClient sms)throws Exception{
+               if(!targetDirectory.exists()|| !targetDirectory.canWrite()){
+                       throw new IOException("Target directory <"+to+"> does 
not exist or is not writable!");
+               }
+               if(!targetDirectory.isDirectory()){
+                       throw new IOException("Target <"+to+"> is not a 
directory!");
+               }
+               GridFileType[]gridFiles=sms.listDirectory(directory.getPath());
+               for(GridFileType file: gridFiles){
+                       if(file.getIsDirectory()){
+                               if(!recurse) {
+                                       System.out.println("Skipping directory 
"+file.getPath());
+                                       continue;
+                               }
+                               else{
+                                       File newTargetDirectory=new 
File(targetDirectory,getName(file.getPath()));
+                                       boolean 
success=newTargetDirectory.mkdirs();
+                                       if(!success)throw new IOException("Can 
create directory: "+newTargetDirectory.getAbsolutePath());
+                                       performDirectoryExport(file, 
newTargetDirectory, sms);
+                                       continue;
+                               }
+                       }
+                       download(file, new 
File(targetDirectory,getName(file.getPath())), sms);
+               }
+       }
+       
+       protected void performWildCardExport(StorageClient sms)throws Exception{
+               String dir=getDir(from);
+               if(dir==null)dir="/";
+               GridFileType[] files=sms.find(dir, false, from, false, null, 
null);
+               File targetDir=targetStream==null?new File(to):null;
+               if(targetStream==null){
+                       if(!targetDir.isDirectory())throw new 
IOException("Target is not a directory.");
+               }
+               for(GridFileType f: files){
+                       download(f, targetDir, sms);
+               }
+       }       
+       
+       private String getDir(String path){
+               return new File(path).getParent();
+       }
+       
+       private String getName(String path){
+               return new File(path).getName();
+       }
+       
+       /**
+        * download a single regular file
+        * 
+        * @param source - grid file descriptor
+        * @param localFile - local file or directory to write to
+        * @param sms
+        * @throws Exception
+        */
+       private void download(GridFileType source, File localFile, 
StorageClient sms)throws Exception{
+               if(source==null || source.getIsDirectory()){
+                       throw new IllegalStateException("Source="+source); 
+               }
+               
+               OutputStream os=targetStream!=null?targetStream:null;
+               FileTransferClient ftc=null;
+               try{
+                       String path=source.getPath();
+                       if(targetStream==null){
+                               if(localFile.isDirectory()){
+                                       localFile=new 
File(localFile,getName(source.getPath()));
+                               }
+                               if(mode.equals(Mode.nooverwrite) && 
localFile.exists()){
+                                       System.out.println("File exists and 
creation mode was set to 'nooverwrite'.");
+                                       return; 
+                               }
+                               System.out.println("Downloading remote file 
'"+sms.getUrl()+"#/"+path+"' -> "+localFile.getAbsolutePath());
+                               os=new 
FileOutputStream(localFile.getAbsolutePath(), mode.equals(Mode.append));
+                       }
+                       
+                       
chosenProtocol=sms.findSupportedProtocol(preferredProtocols.toArray(new 
ProtocolType.Enum[preferredProtocols.size()]));
+                       
Map<String,String>extraParameters=makeExtraParameters(chosenProtocol);
+                       ftc=sms.getExport(path,extraParameters,chosenProtocol);
+                       configure(ftc, extraParameters);
+                       System.out.println("DEB:File transfer URL : 
"+ftc.getUrl());
+//                     ProgressBar p=null;
+                       if(ftc instanceof IMonitorable  && showProgress){
+                               long size=ftc.getSourceFileSize();
+                               if(isRange()){
+                                       size=getRangeSize();
+                               }
+//                             p=new ProgressBar(localFile.getName(),size,msg);
+//                             ((IMonitorable) ftc).setProgressListener(p);
+                       }
+                       long startTime=System.currentTimeMillis();
+                       if(isRange()){
+                               if(!(ftc instanceof SupportsPartialRead)){
+                                       throw new Exception("Byte range is 
defined but protocol does not allow " +
+                                                       "partial read! Please 
choose a different protocol!");
+                               }
+                               System.out.println("Byte range: "+startByte+" - 
"+(getRangeSize()>0?endByte:""));
+                               SupportsPartialRead 
pReader=(SupportsPartialRead)ftc;
+                               pReader.readPartial(startByte, 
endByte-startByte+1, os);
+                       }
+                       else{
+                               ftc.readAllData(os);
+                       }
+//                     if(p!=null){
+//                             p.finish();
+//                     }
+                       if(timing){
+                               long 
duration=System.currentTimeMillis()-startTime;
+                               double 
rate=(double)localFile.length()/(double)duration;
+                               System.out.println("Rate: " +rate+ " kB/sec.");
+                       }
+                       if(targetStream==null)copyProperties(source, localFile);
+               }
+               finally{
+                       try{ 
+                               if(targetStream==null && os!=null){
+                                       os.close();
+                               }
+                       }catch(Exception ignored){}
+                       if(ftc!=null){
+                               try{
+                                       ftc.destroy();
+                               }catch(Exception e1){
+//                                     System.out.println("Could not destroy 
the filetransfer client",e1);
+                               }
+                       }
+               }
+       }
+
+       /**
+        * if possible, copy the remote executable flag to the local file
+        * @throws Exception
+        */
+       private void copyProperties(GridFileType source, File localFile)throws 
Exception{
+               try{
+                       
localFile.setExecutable(source.getPermissions().getExecutable());
+               }
+               catch(Exception ex){
+                       //TODO: logging
+//                     ("Can't set 'executable' flag for 
"+localFile.getName(), ex);
+               }
+       }
+       
+       private void configure(FileTransferClient ftc, 
Map<String,String>params){
+               if(ftc instanceof UFTPFileTransferClient){
+                       UFTPFileTransferClient u=(UFTPFileTransferClient)ftc;
+                       String secret=params.get(UFTPConstants.PARAM_SECRET);
+                       u.setSecret(secret);
+               }
+       }
+
+       public void setShowProgress(boolean showProgress) {
+               this.showProgress = showProgress;
+       }
+
+       public void setForceFileOnly(boolean forceFileOnly) {
+               this.forceFileOnly = forceFileOnly;
+       }
+
+       public void setTargetStream(OutputStream targetStream) {
+               this.targetStream = targetStream;
+       }
+       
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/d231956e/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/FileTransferBase.java
----------------------------------------------------------------------
diff --git 
a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/FileTransferBase.java
 
b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/FileTransferBase.java
new file mode 100644
index 0000000..c76ab74
--- /dev/null
+++ 
b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/FileTransferBase.java
@@ -0,0 +1,223 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.worker.task.jobsubmission.utils.bes;
+
+import de.fzj.unicore.uas.client.StorageClient;
+import de.fzj.unicore.uas.util.PropertyHelper;
+import org.unigrids.services.atomic.types.GridFileType;
+import org.unigrids.services.atomic.types.ProtocolType;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.util.*;
+import java.util.regex.Pattern;
+
+public class FileTransferBase {
+
+       protected Properties extraParameterSource;
+
+       protected boolean timing=false;
+
+       protected boolean recurse=false;
+
+       protected String from;
+
+       protected String to;
+
+       //index of first byte to download
+       protected Long startByte;
+       
+       //index of last byte to download
+       protected Long endByte;
+       
+       /**
+        * the creation mode
+        */
+       protected Mode mode;
+
+       /**
+        * whether the job processing should fail if an error occurs
+        */
+       protected boolean failOnError;
+
+       protected List<ProtocolType.Enum> preferredProtocols=new 
ArrayList<ProtocolType.Enum>();
+
+       public FileTransferBase(){
+               preferredProtocols.add(ProtocolType.BFT);
+       }
+
+       protected Map<String,String>makeExtraParameters(ProtocolType.Enum 
protocol){
+               Map<String, String> res;
+               if(extraParameterSource==null){
+                       res=new HashMap<String, String>();
+               }
+               else{
+                       String p=String.valueOf(protocol);
+                       PropertyHelper ph=new 
PropertyHelper(extraParameterSource, new String[]{p,p.toLowerCase()});
+                       res= ph.getFilteredMap();
+               }
+               if(res.size()>0){
+                       // TODO: change it to logger 
+                       System.out.println("Have "+res.size()+" extra 
parameters for protocol "+protocol);
+               }
+               return res;
+       }
+       
+       
+       public String getTo() {
+               return to;
+       }
+
+       public String getFrom() {
+               return from;
+       }
+
+       public void setTo(String to) {
+               this.to = to;
+       }
+
+       public void setFrom(String from) {
+               this.from = from;
+       }
+
+       public Mode getMode() {
+               return mode;
+       }
+
+       public boolean isFailOnError() {
+               return failOnError;
+       }
+
+       public boolean isTiming() {
+               return timing;
+       }
+
+       public void setTiming(boolean timing) {
+               this.timing = timing;
+       }
+
+       public void setFailOnError(boolean failOnError) {
+               this.failOnError = failOnError;
+       }
+
+       public List<ProtocolType.Enum> getPreferredProtocols() {
+               return preferredProtocols;
+       }
+
+       public void setPreferredProtocols(List<ProtocolType.Enum> 
preferredProtocols) {
+               this.preferredProtocols = preferredProtocols;
+       }
+
+       public void setExtraParameterSource(Properties properties){
+               this.extraParameterSource=properties;
+       }
+
+       public void setRecurse(boolean recurse) {
+               this.recurse = recurse;
+       }
+       /**
+        * check if the given path denotes a valid remote directory
+        * @param remotePath - the path
+        * @param sms - the storage
+        * @return <code>true</code> if the remote directory exists and is a 
directory
+        */
+       protected boolean isValidDirectory(String remotePath, StorageClient 
sms){
+               boolean result=false;
+               if(! ("/".equals(remotePath) || ".".equals(remotePath)) ){
+                       try{
+                               GridFileType gft=sms.listProperties(remotePath);
+                               result=gft.getIsDirectory();
+                       }catch(Exception ex){
+                               result=false;
+                       }
+               }
+               else result=true;
+               
+               return result;
+       }
+       
+       public File[] resolveWildCards(File original){
+               final String name=original.getName();
+               if(!hasWildCards(original))return new File[]{original};
+               File parent=original.getParentFile();
+               if(parent==null)parent=new File(".");
+               FilenameFilter filter=new FilenameFilter(){
+                       Pattern p=createPattern(name);
+                       public boolean accept(File file, String name){
+                               return p.matcher(name).matches();
+                       }
+               };
+               return parent.listFiles(filter);
+       }
+
+       protected boolean hasWildCards(File file){
+               return hasWildCards(file.getName());
+       }
+
+       public boolean hasWildCards(String name){
+               return name.contains("*") || name.contains("?");
+       }
+
+       private Pattern createPattern(String nameWithWildcards){
+               String regex=nameWithWildcards.replace("?",".").replace("*", 
".*");
+               return Pattern.compile(regex);
+       }
+       
+       protected ProtocolType.Enum chosenProtocol=null;
+       
+       public ProtocolType.Enum getChosenProtocol(){
+               return chosenProtocol;
+       }
+
+       public Long getStartByte() {
+               return startByte;
+       }
+
+       public void setStartByte(Long startByte) {
+               this.startByte = startByte;
+       }
+
+       public Long getEndByte() {
+               return endByte;
+       }
+
+       public void setEndByte(Long endByte) {
+               this.endByte = endByte;
+       }
+       
+       /**
+        * checks if a byte range is defined
+        * @return <code>true</code> iff both startByte and endByte are defined
+        */
+       protected boolean isRange(){
+               return startByte!=null && endByte!=null;
+       }
+       
+       /**
+        * get the number of bytes in the byte range, or "-1" if the range is 
open-ended
+        * @return
+        */
+       protected long getRangeSize(){
+               if(Long.MAX_VALUE==endByte)return -1;
+               return endByte-startByte;
+       }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/d231956e/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/FileUploader.java
----------------------------------------------------------------------
diff --git 
a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/FileUploader.java
 
b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/FileUploader.java
new file mode 100644
index 0000000..d899b37
--- /dev/null
+++ 
b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/FileUploader.java
@@ -0,0 +1,242 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.worker.task.jobsubmission.utils.bes;
+
+import de.fzj.unicore.uas.client.FileTransferClient;
+import de.fzj.unicore.uas.client.StorageClient;
+import de.fzj.unicore.uas.client.UFTPConstants;
+import de.fzj.unicore.uas.client.UFTPFileTransferClient;
+import de.fzj.unicore.uas.fts.FiletransferOptions.IMonitorable;
+import org.unigrids.services.atomic.types.ProtocolType;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * upload local file(s) to a remote location
+ *
+ * @author schuller
+ */
+public class FileUploader extends FileTransferBase{
+
+       public FileUploader(String from, String to, Mode mode)throws 
FileNotFoundException{
+               this(from,to,mode,true);
+       }
+
+       public FileUploader(String from, String to, Mode mode, boolean 
failOnError)throws FileNotFoundException{
+               this.to=to;
+               this.from=from;
+               this.mode=mode;
+               this.failOnError=failOnError;
+               checkOK();
+       }
+
+       public String getFrom() {
+               return from;
+       }
+
+       public String getTo() {
+               return to;
+       }
+
+
+       public void perform(StorageClient sms)throws Exception{
+               File fileSpec=new File(from);
+               boolean hasWildCards=false;
+               boolean isDirectory=fileSpec.isDirectory();
+               File[] fileset=null;
+               
+               if(!isDirectory){
+                       hasWildCards=hasWildCards(fileSpec);
+               }
+               
+               
chosenProtocol=sms.findSupportedProtocol(preferredProtocols.toArray(new 
ProtocolType.Enum[preferredProtocols.size()]));
+               
Map<String,String>extraParameters=makeExtraParameters(chosenProtocol);
+
+               if(!hasWildCards && !isDirectory){
+                       //single regular file
+                       
uploadFile(fileSpec,to,sms,chosenProtocol,extraParameters);
+                       return;
+               }
+               
+               //handle wildcards or directory
+               if(hasWildCards){
+                       fileset=resolveWildCards(fileSpec);
+               }
+               else{
+                       fileset=fileSpec.listFiles();
+               }
+               
+               if(!isValidDirectory(to, sms)){
+                       throw new IOException("The specified remote target 
'"+to+"' is not a directory");
+               }
+               if(to==null)to="/";
+               String target=isDirectory?to+"/"+fileSpec.getName():to;
+               sms.createDirectory(target);
+               uploadFiles(fileset,target,sms,chosenProtocol,extraParameters);
+       }
+
+       /**
+        * upload a set of files to a remote directory (which must exist)
+        * 
+        * @param files
+        * @param remoteDirectory
+        * @param sms
+        * @param protocol
+        * @param extraParameters
+        * @throws Exception
+        */
+       private void uploadFiles(File[]files, String remoteDirectory, 
StorageClient sms, ProtocolType.Enum protocol, 
+                       Map<String,String>extraParameters)throws Exception{
+               for(File localFile: files){
+                       String target=remoteDirectory+"/"+localFile.getName();
+                       if(localFile.isDirectory()){
+                               if(!recurse){
+                                       System.out.println("Skipping directory 
"+localFile.getAbsolutePath());
+                               }else{
+                                       File[] fileset=localFile.listFiles();
+                                       sms.createDirectory(target);
+                                       
uploadFiles(fileset,target,sms,protocol,extraParameters);
+                               }
+                       }else{
+                               
uploadFile(localFile,target,sms,protocol,extraParameters);
+                       }
+               }
+       }
+
+       /**
+        * uploads a single regular file
+        * 
+        * @param localFile
+        * @param remotePath
+        * @param sms
+        * @param protocol
+        * @param extraParameters
+        * @throws Exception
+        */
+       private void uploadFile(File localFile, String remotePath, 
StorageClient sms, ProtocolType.Enum protocol, 
+                       Map<String,String>extraParameters) throws Exception{
+               long startTime=System.currentTimeMillis();
+               FileInputStream is=null;
+               FileTransferClient ftc=null;
+               try{
+                       if(remotePath==null){
+                               remotePath="/"+localFile.getName();
+                       }
+                       else if(remotePath.endsWith("/")){
+                               remotePath+=localFile.getName();
+                       }
+                       System.out.println("Uploading local file 
'"+localFile.getAbsolutePath()+"' -> '"+sms.getUrl()+"#"+remotePath+"'");
+                       is=new FileInputStream(localFile.getAbsolutePath());
+                       boolean append=Mode.append.equals(mode);
+                       ftc=sms.getImport(remotePath, append, extraParameters, 
protocol);
+                       configure(ftc, extraParameters);
+                       if(append)ftc.setAppend(true);
+                       String url=ftc.getUrl();
+                       System.out.println("File transfer URL : "+url);
+//                     ProgressBar p=null;
+                       if(ftc instanceof IMonitorable){
+                               long size=localFile.length();
+                               if(isRange()){
+                                       size=getRangeSize();
+                               }
+//                             p=new ProgressBar(localFile.getName(),size,msg);
+//                             ((IMonitorable) ftc).setProgressListener(p);
+                       }
+                       if(isRange()){
+                               System.out.println("Byte range: "+startByte+" - 
"+(getRangeSize()>0?endByte:""));
+                               long skipped=0;
+                               while(skipped<startByte){
+                                       skipped+=is.skip(startByte);
+                               }
+                               ftc.writeAllData(is, endByte-startByte+1);
+                               
+                       }else{
+                               ftc.writeAllData(is);
+                       }
+                       copyProperties(localFile, sms, remotePath);
+                       
+//                     if(ftc instanceof IMonitorable){
+//                             p.finish();
+//                     }
+                       
+               }finally{
+                       if(ftc!=null){
+                               try{
+                                       ftc.destroy();
+                               }catch(Exception e1){
+//                                     msg.error("Could not clean-up the 
filetransfer at <"+ftc.getUrl()+">",e1);
+                               }
+                       }
+                       try{ if(is!=null)is.close(); }catch(Exception ignored){}
+               }
+               if(timing){
+                       long duration=System.currentTimeMillis()-startTime;
+                       double rate=(double)localFile.length()/(double)duration;
+                       System.out.println("Rate: "+rate+ " kB/sec.");
+               }
+       }
+
+       /**
+        * if possible, copy the local executable flag to the remote file
+        * @param sourceFile - local file
+        * @throws Exception
+        */
+       private void copyProperties(File sourceFile, StorageClient sms, String 
target)throws Exception{
+               boolean x=sourceFile.canExecute();
+               try{
+                       if(x){
+                               sms.changePermissions(target, true, true, x);
+                       }
+               }catch(Exception ex){
+//                     System.out.println("Can't set exectuable flag on remote 
file.",ex);
+               }
+       }
+
+       private void checkOK()throws FileNotFoundException{
+               if(!failOnError){
+                       return;
+               }
+               File orig=new File(from);
+               if(!orig.isAbsolute()){
+                       orig=new File(System.getProperty("user.dir"),from);
+               }
+               File[] files=resolveWildCards(orig);
+               if(files==null){
+                       throw new FileNotFoundException("Local import 
'"+from+"' does not exist.");
+               }
+               for(File f: files){
+                       if(!f.exists())throw new FileNotFoundException("Local 
import '"+from+"' does not exist.");
+               }
+       }
+       
+       private void configure(FileTransferClient ftc, 
Map<String,String>params){
+               if(ftc instanceof UFTPFileTransferClient){
+                       UFTPFileTransferClient u=(UFTPFileTransferClient)ftc;
+                       String secret=params.get(UFTPConstants.PARAM_SECRET);
+                       u.setSecret(secret);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/d231956e/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/JSDLGenerator.java
----------------------------------------------------------------------
diff --git 
a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/JSDLGenerator.java
 
b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/JSDLGenerator.java
new file mode 100644
index 0000000..de96104
--- /dev/null
+++ 
b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/JSDLGenerator.java
@@ -0,0 +1,115 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.worker.task.jobsubmission.utils.bes;
+
+import org.apache.airavata.worker.core.context.ProcessContext;
+import org.ggf.schemas.jsdl.x2005.x11.jsdl.JobDefinitionDocument;
+import org.ggf.schemas.jsdl.x2005.x11.jsdl.JobDefinitionType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * 
+ * Utility class generates a JSDL instance from JobExecutionContext instance
+ * 
+ * @author shahbaz memon
+ * 
+ * */
+
+public class JSDLGenerator implements BESConstants {
+
+       protected final Logger log = LoggerFactory.getLogger(this.getClass());
+
+       public synchronized static JobDefinitionDocument 
buildJSDLInstance(ProcessContext context) throws Exception {
+
+               JobDefinitionDocument jobDefDoc = JobDefinitionDocument.Factory
+                               .newInstance();
+               JobDefinitionType value = jobDefDoc.addNewJobDefinition();
+
+               
+               // build Identification
+               createJobIdentification(value, context);
+
+               ResourceProcessor.generateResourceElements(value, context);
+
+               ApplicationProcessor.generateJobSpecificAppElements(value, 
context);
+
+               
+               return jobDefDoc;
+       }
+
+       public synchronized static JobDefinitionDocument 
buildJSDLInstance(ProcessContext context, String smsUrl) throws Exception {
+
+               JobDefinitionDocument jobDefDoc = JobDefinitionDocument.Factory
+                               .newInstance();
+               JobDefinitionType value = jobDefDoc.addNewJobDefinition();
+
+               
+               // build Identification
+               createJobIdentification(value, context);
+
+               ResourceProcessor.generateResourceElements(value, context);
+
+               ApplicationProcessor.generateJobSpecificAppElements(value, 
context);
+
+               UASDataStagingProcessor.generateDataStagingElements(value, 
context, smsUrl);
+
+               return jobDefDoc;
+       }
+
+       public synchronized static JobDefinitionDocument buildJSDLInstance(
+            ProcessContext context, String smsUrl, Object jobDirectoryMode)
+                       throws Exception {
+
+               JobDefinitionDocument jobDefDoc = JobDefinitionDocument.Factory
+                               .newInstance();
+               JobDefinitionType value = jobDefDoc.addNewJobDefinition();
+
+               // build Identification
+               createJobIdentification(value, context);
+
+               ResourceProcessor.generateResourceElements(value, context);
+
+               ApplicationProcessor.generateJobSpecificAppElements(value, 
context);
+
+               UASDataStagingProcessor.generateDataStagingElements(value, 
context,
+                               smsUrl);
+
+               return jobDefDoc;
+       }
+
+       private static void createJobIdentification(JobDefinitionType value, 
ProcessContext context) {
+
+               if (context != null) {
+                       if (context.getAllocationProjectNumber() != null)
+                               JSDLUtils.addProjectName(value, 
context.getAllocationProjectNumber());
+                       
+                       if (context.getApplicationInterfaceDescription() != 
null && 
context.getApplicationInterfaceDescription().getApplicationDescription() != 
null)
+                               
JSDLUtils.getOrCreateJobIdentification(value).setDescription(context.getApplicationInterfaceDescription().getApplicationDescription());
+                       
+                       if (context.getApplicationInterfaceDescription() != 
null && context.getApplicationInterfaceDescription().getApplicationName() != 
null)
+                               
JSDLUtils.getOrCreateJobIdentification(value).setJobName(context.getApplicationInterfaceDescription().getApplicationName());
+               }
+       }
+
+
+}
\ No newline at end of file

Reply via email to