Add implementation for BESJobSubmissionTask
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/d231956e Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/d231956e Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/d231956e Branch: refs/heads/feature-workload-mgmt Commit: d231956e853aca385f6187f5ded5978a21d6548f Parents: 9f0e45b Author: Gourav Shenoy <[email protected]> Authored: Tue May 2 13:50:17 2017 -0400 Committer: Gourav Shenoy <[email protected]> Committed: Tue May 2 13:50:17 2017 -0400 ---------------------------------------------------------------------- modules/worker/task-jobsubmission/pom.xml | 33 ++ .../impl/BESJobSubmissionTask.java | 83 +-- .../jobsubmission/utils/bes/ActivityInfo.java | 50 ++ .../utils/bes/ApplicationProcessor.java | 221 ++++++++ .../jobsubmission/utils/bes/BESConstants.java | 45 ++ .../utils/bes/DataTransferrer.java | 328 ++++++++++++ .../jobsubmission/utils/bes/FileDownloader.java | 255 +++++++++ .../utils/bes/FileTransferBase.java | 223 ++++++++ .../jobsubmission/utils/bes/FileUploader.java | 242 +++++++++ .../jobsubmission/utils/bes/JSDLGenerator.java | 115 ++++ .../task/jobsubmission/utils/bes/JSDLUtils.java | 517 ++++++++++++++++++ .../task/jobsubmission/utils/bes/Mode.java | 45 ++ .../jobsubmission/utils/bes/MyProxyLogon.java | 465 ++++++++++++++++ .../task/jobsubmission/utils/bes/OSType.java | 124 +++++ .../utils/bes/ProcessorRequirement.java | 61 +++ .../jobsubmission/utils/bes/RangeValueType.java | 271 ++++++++++ .../utils/bes/ResourceProcessor.java | 97 ++++ .../utils/bes/ResourceRequirement.java | 34 ++ .../jobsubmission/utils/bes/SPMDVariations.java | 52 ++ .../jobsubmission/utils/bes/SecurityUtils.java | 160 ++++++ .../jobsubmission/utils/bes/StorageCreator.java | 207 ++++++++ .../utils/bes/UASDataStagingProcessor.java | 182 +++++++ .../utils/bes/UNICORESecurityContext.java | 195 +++++++ .../task/jobsubmission/utils/bes/URIUtils.java | 121 +++++ 
.../utils/bes/X509SecurityContext.java | 340 ++++++++++++ modules/worker/worker-core/pom.xml | 6 + .../airavata/worker/core/RequestData.java | 149 ++++++ .../airavata/worker/core/SecurityContext.java | 24 + .../core/context/AbstractSecurityContext.java | 57 ++ .../airavata/worker/core/utils/SSHUtils.java | 524 +++++++++++++++++++ .../worker/core/utils/WorkerFactory.java | 12 + .../airavata/worker/core/utils/WorkerUtils.java | 64 +++ 32 files changed, 5262 insertions(+), 40 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/d231956e/modules/worker/task-jobsubmission/pom.xml ---------------------------------------------------------------------- diff --git a/modules/worker/task-jobsubmission/pom.xml b/modules/worker/task-jobsubmission/pom.xml index 7d9506e..45d720e 100644 --- a/modules/worker/task-jobsubmission/pom.xml +++ b/modules/worker/task-jobsubmission/pom.xml @@ -41,6 +41,39 @@ <artifactId>aurora-client</artifactId> <version>${project.version}</version> </dependency> + <dependency> + <groupId>eu.unicore</groupId> + <artifactId>unicore-client-wrapper</artifactId> + <version>1.7.2_1</version> + <exclusions> + <!-- <exclusion> + <groupId>org.apache.santuario</groupId> + <artifactId>xmlsec</artifactId> + </exclusion> --> + <exclusion> + <groupId>net.sf.saxon</groupId> + <artifactId>saxon</artifactId> + </exclusion> + <exclusion> + <groupId>net.sf.saxon</groupId> + <artifactId>saxon-dom</artifactId> + </exclusion> + <exclusion> + <groupId>net.sf.saxon</groupId> + <artifactId>saxon-xpath</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>net.sf.saxon</groupId> + <artifactId>Saxon-HE</artifactId> + <version>9.6.0-1</version> + </dependency> + <dependency> + <groupId>commons-httpclient</groupId> + <artifactId>commons-httpclient</artifactId> + <version>3.1</version> + </dependency> </dependencies> </project> \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/airavata/blob/d231956e/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/impl/BESJobSubmissionTask.java ---------------------------------------------------------------------- diff --git a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/impl/BESJobSubmissionTask.java b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/impl/BESJobSubmissionTask.java index 2c6b984..65668c0 100644 --- a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/impl/BESJobSubmissionTask.java +++ b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/impl/BESJobSubmissionTask.java @@ -31,17 +31,6 @@ import de.fzj.unicore.wsrflite.xmlbeans.WSUtilities; import eu.unicore.util.httpclient.DefaultClientConfiguration; import org.apache.airavata.common.exception.ApplicationSettingsException; import org.apache.airavata.credential.store.store.CredentialStoreException; -import org.apache.airavata.gfac.core.GFacException; -import org.apache.airavata.gfac.core.GFacUtils; -import org.apache.airavata.gfac.core.SSHApiException; -import org.apache.airavata.gfac.core.authentication.AuthenticationInfo; -import org.apache.airavata.gfac.core.cluster.ServerInfo; -import org.apache.airavata.gfac.core.context.ProcessContext; -import org.apache.airavata.gfac.core.context.TaskContext; -import org.apache.airavata.gfac.core.task.JobSubmissionTask; -import org.apache.airavata.gfac.core.task.TaskException; -import org.apache.airavata.gfac.impl.Factory; -import org.apache.airavata.gfac.impl.SSHUtils; import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionInterface; import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol; import org.apache.airavata.model.appcatalog.computeresource.UnicoreJobSubmission; @@ -59,7 +48,21 @@ import 
org.apache.airavata.model.task.TaskTypes; import org.apache.airavata.registry.cpi.AppCatalogException; import org.apache.airavata.registry.cpi.ExperimentCatalogModelType; import org.apache.airavata.registry.cpi.RegistryException; +import org.apache.airavata.worker.core.authentication.AuthenticationInfo; +import org.apache.airavata.worker.core.cluster.ServerInfo; +import org.apache.airavata.worker.core.context.ProcessContext; +import org.apache.airavata.worker.core.context.TaskContext; +import org.apache.airavata.worker.core.exceptions.SSHApiException; +import org.apache.airavata.worker.core.exceptions.WorkerException; +import org.apache.airavata.worker.core.task.TaskException; +import org.apache.airavata.worker.core.utils.SSHUtils; +import org.apache.airavata.worker.core.utils.WorkerFactory; +import org.apache.airavata.worker.core.utils.WorkerUtils; +import org.apache.airavata.worker.task.jobsubmission.JobSubmissionTask; +import org.apache.airavata.worker.task.jobsubmission.utils.JobSubmissionUtils; +import org.apache.airavata.worker.task.jobsubmission.utils.bes.*; import org.apache.xmlbeans.XmlCursor; +import org.ggf.schemas.bes.x2006.x08.besFactory.*; import org.ggf.schemas.jsdl.x2005.x11.jsdl.JobDefinitionType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -105,7 +108,7 @@ public class BESJobSubmissionTask implements JobSubmissionTask { // con't reuse if UserDN has been changed. secProperties = getSecurityConfig(processContext); // try secProperties = secProperties.clone() if we can't use already initialized ClientConfigurations. 
- } catch (GFacException e) { + } catch (WorkerException e) { String msg = "Unicorn security context initialization error"; log.error(msg, e); taskStatus.setState(TaskState.FAILED); @@ -115,10 +118,10 @@ public class BESJobSubmissionTask implements JobSubmissionTask { try { JobSubmissionProtocol protocol = processContext.getJobSubmissionProtocol(); - JobSubmissionInterface jobSubmissionInterface = GFacUtils.getPreferredJobSubmissionInterface(processContext); + JobSubmissionInterface jobSubmissionInterface = JobSubmissionUtils.getPreferredJobSubmissionInterface(processContext); String factoryUrl = null; if (protocol.equals(JobSubmissionProtocol.UNICORE)) { - UnicoreJobSubmission unicoreJobSubmission = GFacUtils.getUnicoreJobSubmission( + UnicoreJobSubmission unicoreJobSubmission = JobSubmissionUtils.getUnicoreJobSubmission( jobSubmissionInterface.getJobSubmissionInterfaceId()); factoryUrl = unicoreJobSubmission.getUnicoreEndPointURL(); } @@ -167,8 +170,8 @@ public class BESJobSubmissionTask implements JobSubmissionTask { jobDetails.setJobDescription(activityEpr.toString()); jobDetails.setJobStatuses(Arrays.asList(new JobStatus(JobState.SUBMITTED))); processContext.setJobModel(jobDetails); - GFacUtils.saveJobModel(processContext, jobDetails); - GFacUtils.saveJobStatus(processContext, jobDetails); + JobSubmissionUtils.saveJobModel(processContext, jobDetails); + WorkerUtils.saveJobStatus(processContext, jobDetails); log.info(formatStatusMessage(activityEpr.getAddress() .getStringValue(), factory.getActivityStatus(activityEpr) .toString())); @@ -205,8 +208,8 @@ public class BESJobSubmissionTask implements JobSubmissionTask { } else if (activityStatus.getState() == ActivityStateEnumeration.CANCELLED) { JobState applicationJobStatus = JobState.CANCELED; jobDetails.setJobStatuses(Arrays.asList(new JobStatus(applicationJobStatus))); - GFacUtils.saveJobStatus(processContext, jobDetails); - throw new GFacException( + WorkerUtils.saveJobStatus(processContext, jobDetails); + 
throw new WorkerException( processContext.getExperimentId() + "Job Canceled"); } else if (activityStatus.getState() == ActivityStateEnumeration.FINISHED) { try { @@ -215,7 +218,7 @@ public class BESJobSubmissionTask implements JobSubmissionTask { } JobState applicationJobStatus = JobState.COMPLETE; jobDetails.setJobStatuses(Arrays.asList(new JobStatus(applicationJobStatus))); - GFacUtils.saveJobStatus(processContext, jobDetails); + WorkerUtils.saveJobStatus(processContext, jobDetails); log.info("Job Id: {}, exit code: {}, exit status: {}", jobDetails.getJobId(), activityStatus.getExitCode(), ActivityStateEnumeration.FINISHED.toString()); @@ -228,7 +231,7 @@ public class BESJobSubmissionTask implements JobSubmissionTask { if (copyOutput != null) { copyOutputFilesToStorage(taskContext, copyOutput); for (OutputDataObjectType outputDataObjectType : copyOutput) { - GFacUtils.saveExperimentOutput(processContext, outputDataObjectType.getName(), outputDataObjectType.getValue()); + WorkerUtils.saveExperimentOutput(processContext, outputDataObjectType.getName(), outputDataObjectType.getValue()); } } // dt.publishFinalOutputs(); @@ -244,13 +247,13 @@ public class BESJobSubmissionTask implements JobSubmissionTask { return taskStatus; } - private void copyOutputFilesToStorage(TaskContext taskContext, List<OutputDataObjectType> copyOutput) throws GFacException { + private void copyOutputFilesToStorage(TaskContext taskContext, List<OutputDataObjectType> copyOutput) throws WorkerException { ProcessContext pc = taskContext.getParentProcessContext(); String remoteFilePath = null, fileName = null, localFilePath = null; try { - authenticationInfo = Factory.getStorageSSHKeyAuthentication(pc); + authenticationInfo = WorkerFactory.getStorageSSHKeyAuthentication(pc); ServerInfo serverInfo = pc.getComputeResourceServerInfo(); - Session sshSession = Factory.getSSHSession(authenticationInfo, serverInfo); + Session sshSession = WorkerFactory.getSSHSession(authenticationInfo, serverInfo); for 
(OutputDataObjectType output : copyOutput) { switch (output.getType()) { case STDERR: case STDOUT: case STRING: case URI: @@ -259,7 +262,7 @@ public class BESJobSubmissionTask implements JobSubmissionTask { localFilePath = localFilePath.substring(localFilePath.indexOf("://") + 2, localFilePath.length()); } fileName = localFilePath.substring(localFilePath.lastIndexOf("/") + 1); - URI destinationURI = TaskUtils.getDestinationURI(taskContext, hostName, inputPath, fileName); + URI destinationURI = WorkerUtils.getDestinationURI(taskContext, hostName, inputPath, fileName); remoteFilePath = destinationURI.getPath(); log.info("SCP local file :{} -> from remote :{}", localFilePath, remoteFilePath); SSHUtils.scpTo(localFilePath, remoteFilePath, sshSession); @@ -271,18 +274,18 @@ public class BESJobSubmissionTask implements JobSubmissionTask { } } catch (IOException | JSchException | SSHApiException | URISyntaxException | CredentialStoreException e) { log.error("Error while coping local file " + localFilePath + " to remote " + remoteFilePath, e); - throw new GFacException("Error while scp output files to remote storage file location", e); + throw new WorkerException("Error while scp output files to remote storage file location", e); } } - private void copyInputFilesToLocal(TaskContext taskContext) throws GFacException { + private void copyInputFilesToLocal(TaskContext taskContext) throws WorkerException { ProcessContext pc = taskContext.getParentProcessContext(); StorageResourceDescription storageResource = pc.getStorageResource(); if (storageResource != null) { hostName = storageResource.getHostName(); } else { - throw new GFacException("Storage Resource is null"); + throw new WorkerException("Storage Resource is null"); } inputPath = pc.getStorageFileSystemRootLocation(); inputPath = (inputPath.endsWith(File.separator) ? 
inputPath : inputPath + File.separator); @@ -290,9 +293,9 @@ public class BESJobSubmissionTask implements JobSubmissionTask { String remoteFilePath = null, fileName = null, localFilePath = null; URI remoteFileURI = null; try { - authenticationInfo = Factory.getStorageSSHKeyAuthentication(pc); + authenticationInfo = WorkerFactory.getStorageSSHKeyAuthentication(pc); ServerInfo serverInfo = pc.getStorageResourceServerInfo(); - Session sshSession = Factory.getSSHSession(authenticationInfo, serverInfo); + Session sshSession = WorkerFactory.getSSHSession(authenticationInfo, serverInfo); List<InputDataObjectType> processInputs = pc.getProcessModel().getProcessInputs(); for (InputDataObjectType input : processInputs) { @@ -308,11 +311,11 @@ public class BESJobSubmissionTask implements JobSubmissionTask { } } catch (IOException | JSchException | SSHApiException | URISyntaxException e) { log.error("Error while coping remote file " + remoteFilePath + " to local " + localFilePath, e); - throw new GFacException("Error while scp input files to local file location", e); + throw new WorkerException("Error while scp input files to local file location", e); } catch (CredentialStoreException e) { String msg = "Authentication issue, make sure you are passing valid credential token"; log.error(msg, e); - throw new GFacException(msg, e); + throw new WorkerException(msg, e); } } @@ -324,7 +327,7 @@ public class BESJobSubmissionTask implements JobSubmissionTask { processContext.setOutputDir(localPath); } - private DefaultClientConfiguration getSecurityConfig(ProcessContext pc) throws GFacException { + private DefaultClientConfiguration getSecurityConfig(ProcessContext pc) throws WorkerException { DefaultClientConfiguration clientConfig = null; try { UNICORESecurityContext unicoreSecurityContext = SecurityUtils.getSecurityContext(pc); @@ -339,9 +342,9 @@ public class BESJobSubmissionTask implements JobSubmissionTask { clientConfig = unicoreSecurityContext.getDefaultConfiguration(false); } 
} catch (RegistryException e) { - throw new GFacException("Error! reading user configuration data from registry", e); + throw new WorkerException("Error! reading user configuration data from registry", e); } catch (ApplicationSettingsException e) { - throw new GFacException("Error! retrieving default client configurations", e); + throw new WorkerException("Error! retrieving default client configurations", e); } return clientConfig; @@ -380,8 +383,8 @@ public class BESJobSubmissionTask implements JobSubmissionTask { } } - private void sendNotification(ProcessContext processContext, JobModel jobModel) throws GFacException { - GFacUtils.saveJobStatus(processContext, jobModel); + private void sendNotification(ProcessContext processContext, JobModel jobModel) throws WorkerException { + WorkerUtils.saveJobStatus(processContext, jobModel); } @Override @@ -467,9 +470,9 @@ public class BESJobSubmissionTask implements JobSubmissionTask { * EndpointReference need to be saved to make cancel work. * * @param processContext - * @throws GFacException + * @throws WorkerException */ - public boolean cancelJob(ProcessContext processContext) throws GFacException { + public boolean cancelJob(ProcessContext processContext) throws WorkerException { try { String activityEpr = processContext.getJobModel().getJobDescription(); // initSecurityProperties(processContext); @@ -479,7 +482,7 @@ public class BESJobSubmissionTask implements JobSubmissionTask { String interfaceId = processContext.getApplicationInterfaceDescription().getApplicationInterfaceId(); String factoryUrl = null; if (protocol.equals(JobSubmissionProtocol.UNICORE)) { - UnicoreJobSubmission unicoreJobSubmission = GFacUtils.getUnicoreJobSubmission(interfaceId); + UnicoreJobSubmission unicoreJobSubmission = JobSubmissionUtils.getUnicoreJobSubmission(interfaceId); factoryUrl = unicoreJobSubmission.getUnicoreEndPointURL(); } EndpointReferenceType epr = EndpointReferenceType.Factory @@ -490,7 +493,7 @@ public class 
BESJobSubmissionTask implements JobSubmissionTask { factory.terminateActivity(eprt); return true; } catch (Exception e) { - throw new GFacException(e.getLocalizedMessage(), e); + throw new WorkerException(e.getLocalizedMessage(), e); } } http://git-wip-us.apache.org/repos/asf/airavata/blob/d231956e/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/ActivityInfo.java ---------------------------------------------------------------------- diff --git a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/ActivityInfo.java b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/ActivityInfo.java new file mode 100644 index 0000000..22cf4db --- /dev/null +++ b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/ActivityInfo.java @@ -0,0 +1,50 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. 
+* +*/ +package org.apache.airavata.worker.task.jobsubmission.utils.bes; + +import org.ggf.schemas.bes.x2006.x08.besFactory.ActivityStatusType; +import org.w3.x2005.x08.addressing.EndpointReferenceType; + +import java.io.Serializable; + +public class ActivityInfo implements Serializable{ + + private static final long serialVersionUID = 1L; + + private EndpointReferenceType activityEPR; + + private ActivityStatusType activityStatusDoc; + + + public EndpointReferenceType getActivityEPR() { + return activityEPR; + } + public void setActivityEPR(EndpointReferenceType activityEPR) { + this.activityEPR = activityEPR; + } + public ActivityStatusType getActivityStatus() { + return activityStatusDoc; + } + public void setActivityStatusDoc(ActivityStatusType activityStatusDoc) { + this.activityStatusDoc = activityStatusDoc; + } + +} http://git-wip-us.apache.org/repos/asf/airavata/blob/d231956e/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/ApplicationProcessor.java ---------------------------------------------------------------------- diff --git a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/ApplicationProcessor.java b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/ApplicationProcessor.java new file mode 100644 index 0000000..7fb442f --- /dev/null +++ b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/ApplicationProcessor.java @@ -0,0 +1,221 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.airavata.worker.task.jobsubmission.utils.bes; + +import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription; +import org.apache.airavata.model.application.io.InputDataObjectType; +import org.apache.airavata.model.parallelism.ApplicationParallelismType; +import org.apache.airavata.worker.core.context.ProcessContext; +import org.ggf.schemas.jsdl.x2005.x11.jsdl.ApplicationType; +import org.ggf.schemas.jsdl.x2005.x11.jsdl.JobDefinitionType; +import org.ggf.schemas.jsdl.x2005.x11.jsdlPosix.FileNameType; +import org.ggf.schemas.jsdl.x2005.x11.jsdlPosix.UserNameType; +import org.ogf.schemas.jsdl.x2007.x02.jsdlSpmd.NumberOfProcessesType; +import org.ogf.schemas.jsdl.x2007.x02.jsdlSpmd.ProcessesPerHostType; +import org.ogf.schemas.jsdl.x2007.x02.jsdlSpmd.ThreadsPerProcessType; + +import java.util.Iterator; +import java.util.List; + + +public class ApplicationProcessor { + + public static void generateJobSpecificAppElements(JobDefinitionType value, ProcessContext context){ + + String userName = getUserNameFromContext(context); +// if (userName.equalsIgnoreCase("admin")){ +// userName = "CN=zdv575, O=Ultrascan Gateway, C=DE"; +// } + + ApplicationDeploymentDescription appDep= context.getApplicationDeploymentDescription(); + String appname = context.getApplicationInterfaceDescription().getApplicationName(); + ApplicationParallelismType parallelism = appDep.getParallelism(); + ApplicationType appType = JSDLUtils.getOrCreateApplication(value); + appType.setApplicationName(appname); + + +// if 
(appDep.getSetEnvironment().size() > 0) { +// createApplicationEnvironment(value, appDep.getSetEnvironment(), parallelism); +// } +// + + String stdout = context.getStdoutLocation(); + String stderr = context.getStderrLocation(); + if(stdout != null) { + stdout = stdout.substring(stdout.lastIndexOf('/')+1); + } + + if(stderr != null) { + stderr = stderr.substring(stderr.lastIndexOf('/')+1); + } + + stdout = (stdout == null || stdout.equals("")) ? "stdout":stdout; + stderr = (stdout == null || stderr.equals("")) ? "stderr":stderr; + + if (appDep.getExecutablePath() != null) { + FileNameType fNameType = FileNameType.Factory.newInstance(); + fNameType.setStringValue(appDep.getExecutablePath()); + if(isParallelJob(context)) { + JSDLUtils.getOrCreateSPMDApplication(value).setExecutable(fNameType); + if (parallelism.equals(ApplicationParallelismType.OPENMP_MPI)){ + JSDLUtils.getSPMDApplication(value).setSPMDVariation(SPMDVariations.OpenMPI.value()); + }else if (parallelism.equals(ApplicationParallelismType.MPI)){ + JSDLUtils.getSPMDApplication(value).setSPMDVariation(SPMDVariations.MPI.value()); + } + + // setting number of processes + try { + String np = getInputAsString(context, BESConstants.NUMBER_OF_PROCESSES); + if((np != null) && (Integer.parseInt(np) > 0)){ + NumberOfProcessesType num = NumberOfProcessesType.Factory.newInstance(); + num.setStringValue(np); + JSDLUtils.getSPMDApplication(value).setNumberOfProcesses(num); + } + + }catch(RuntimeException np) { + // do nothing + } + + + try { + // setting processes per host + String pphost = getInputAsString(context, BESConstants.PROCESSES_PER_HOST); + if((pphost != null) && (Integer.parseInt(pphost) > 0)){ + ProcessesPerHostType pph = ProcessesPerHostType.Factory.newInstance(); + pph.setStringValue(String.valueOf(pphost)); + JSDLUtils.getSPMDApplication(value).setProcessesPerHost(pph); + } + }catch(RuntimeException np) { + // do nothing + } + + int totalThreadCount = 
context.getProcessModel().getProcessResourceSchedule().getNumberOfThreads(); + // we take it as threads per processes + if(totalThreadCount > 0){ + ThreadsPerProcessType tpp = ThreadsPerProcessType.Factory.newInstance(); + tpp.setStringValue(String.valueOf(totalThreadCount)); + JSDLUtils.getSPMDApplication(value).setThreadsPerProcess(tpp); + } + + if(userName != null) { + UserNameType userNameType = UserNameType.Factory.newInstance(); + userNameType.setStringValue(userName); + JSDLUtils.getSPMDApplication(value).setUserName(userNameType); + } + if (stdout != null){ + FileNameType fName = FileNameType.Factory.newInstance(); + fName.setStringValue(stdout); + JSDLUtils.getOrCreateSPMDApplication(value).setOutput(fName); + } + if (stderr != null){ + FileNameType fName = FileNameType.Factory.newInstance(); + fName.setStringValue(stderr); + JSDLUtils.getOrCreateSPMDApplication(value).setError(fName); + } + + + } + else { + JSDLUtils.getOrCreatePOSIXApplication(value).setExecutable(fNameType); + if(userName != null) { + UserNameType userNameType = UserNameType.Factory.newInstance(); + userNameType.setStringValue(userName); + JSDLUtils.getOrCreatePOSIXApplication(value).setUserName(userNameType); + } + if (stdout != null){ + FileNameType fName = FileNameType.Factory.newInstance(); + fName.setStringValue(stdout); + JSDLUtils.getOrCreatePOSIXApplication(value).setOutput(fName); + } + if (stderr != null){ + FileNameType fName = FileNameType.Factory.newInstance(); + fName.setStringValue(stderr); + JSDLUtils.getOrCreatePOSIXApplication(value).setError(fName); + } + } + } + } + + public static String getUserNameFromContext(ProcessContext jobContext) { + if(jobContext.getProcessModel() == null) + return null; + //TODO: Extend unicore model to specify optional unix user id (allocation account) + return "admin"; + } + + public static void addApplicationArgument(JobDefinitionType value, ProcessContext context, String stringPrm) { + if(isParallelJob(context)){ + 
JSDLUtils.getOrCreateSPMDApplication(value).addNewArgument().setStringValue(stringPrm); + } + else { + JSDLUtils.getOrCreatePOSIXApplication(value).addNewArgument().setStringValue(stringPrm); + } + } + + public static String getApplicationStdOut(JobDefinitionType value, ProcessContext context) throws RuntimeException { + if (isParallelJob(context)) return JSDLUtils.getOrCreateSPMDApplication(value).getOutput().getStringValue(); + else return JSDLUtils.getOrCreatePOSIXApplication(value).getOutput().getStringValue(); + } + + public static String getApplicationStdErr(JobDefinitionType value, ProcessContext context) throws RuntimeException { + if (isParallelJob(context)) return JSDLUtils.getOrCreateSPMDApplication(value).getError().getStringValue(); + else return JSDLUtils.getOrCreatePOSIXApplication(value).getError().getStringValue(); + } + + public static void createGenericApplication(JobDefinitionType value, String appName) { + ApplicationType appType = JSDLUtils.getOrCreateApplication(value); + appType.setApplicationName(appName); + } + + public static boolean isParallelJob(ProcessContext context) { + + ApplicationDeploymentDescription appDep = context.getApplicationDeploymentDescription(); + ApplicationParallelismType parallelism = appDep.getParallelism(); + + boolean isParallel = false; + + if(parallelism.equals(ApplicationParallelismType.MPI) || + parallelism.equals(ApplicationParallelismType.OPENMP_MPI) || + parallelism.equals(ApplicationParallelismType.OPENMP )) { + isParallel = true; + } + + return isParallel; + } + + private static String getInputAsString(ProcessContext context, String name) { + List<InputDataObjectType> inputList = context.getProcessModel().getProcessInputs(); + String value = null; + for (Iterator<InputDataObjectType> iterator = inputList.iterator(); iterator.hasNext();) { + InputDataObjectType inputDataObjectType = iterator + .next(); + if (inputDataObjectType.getName().equals(name)) { + value = inputDataObjectType.getValue(); + break; + 
} + } + return value; + } + + + +} http://git-wip-us.apache.org/repos/asf/airavata/blob/d231956e/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/BESConstants.java ---------------------------------------------------------------------- diff --git a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/BESConstants.java b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/BESConstants.java new file mode 100644 index 0000000..5f3991e --- /dev/null +++ b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/BESConstants.java @@ -0,0 +1,45 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. 
/**
 * String constants shared by the BES job-submission utilities.
 *
 * <p>Fields declared in an interface are implicitly {@code public static final}, so the
 * modifiers are omitted; the constants keep the same names, types and values.
 *
 * <p>NOTE(review): the {@code PROP_*} values look like property/lookup keys, but their
 * consumers are not visible from this file; confirm usage before changing any value.
 */
public interface BESConstants {

    /** Key {@code "unicore.sms.epr"}. */
    String PROP_SMS_EPR = "unicore.sms.epr";

    /** Key {@code "bes.factory.url"}. */
    String PROP_BES_URL = "bes.factory.url";

    /** Key {@code "bes.activity.info"}. */
    String PROP_ACTIVITY_INFO = "bes.activity.info";

    /** Key {@code "bes.client.config"}. */
    String PROP_CLIENT_CONF = "bes.client.config";

    /** Key {@code "bes.ca.cert.path"}. */
    String PROP_CA_CERT_PATH = "bes.ca.cert.path";

    /** Key {@code "bes.ca.key.path"}. */
    String PROP_CA_KEY_PATH = "bes.ca.key.path";

    /** Key {@code "bes.ca.key.pass"}. */
    String PROP_CA_KEY_PASS = "bes.ca.key.pass";

    /** Name of the process input that carries the number of processes of a parallel job. */
    String NUMBER_OF_PROCESSES = "NumberOfProcesses";

    /** Name of the process input that carries the processes-per-host count of a parallel job. */
    String PROCESSES_PER_HOST = "ProcessesPerHost";
}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.airavata.worker.task.jobsubmission.utils.bes; + +import de.fzj.unicore.uas.client.StorageClient; +import org.apache.airavata.common.utils.Constants; +import org.apache.airavata.model.application.io.DataType; +import org.apache.airavata.model.application.io.InputDataObjectType; +import org.apache.airavata.model.application.io.OutputDataObjectType; +import org.apache.airavata.model.process.ProcessModel; +import org.apache.airavata.registry.cpi.ExpCatChildDataType; +import org.apache.airavata.registry.cpi.ExperimentCatalog; +import org.apache.airavata.registry.cpi.RegistryException; +import org.apache.airavata.worker.core.context.ProcessContext; +import org.apache.airavata.worker.core.exceptions.WorkerException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.*; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.List; + +/** + * Data movement utility class for transferring files before and after the job execution phase. 
+ * + * */ +public class DataTransferrer { + + protected final Logger log = LoggerFactory.getLogger(this.getClass()); + + protected ProcessContext processContext; + + protected StorageClient storageClient; + + protected List<OutputDataObjectType> resultantOutputsLst; + + protected String gatewayDownloadLocation, stdoutLocation, stderrLocation; + + public DataTransferrer(ProcessContext processContext, StorageClient storageClient) { + this.processContext = processContext; + this.storageClient = storageClient; + resultantOutputsLst = new ArrayList<OutputDataObjectType>(); + initStdoutsLocation(); + } + + private void initStdoutsLocation() { + + gatewayDownloadLocation = getDownloadLocation(); + + String stdout = processContext.getStdoutLocation(); + String stderr = processContext.getStderrLocation(); + + if(stdout != null) { + stdout = stdout.substring(stdout.lastIndexOf('/')+1); + } + + if(stderr != null) { + stderr = stderr.substring(stderr.lastIndexOf('/')+1); + } + + String stdoutFileName = (stdout == null || stdout.equals("")) ? "stdout" + : stdout; + String stderrFileName = (stdout == null || stderr.equals("")) ? "stderr" + : stderr; + + stdoutLocation = gatewayDownloadLocation+File.separator+stdoutFileName; + + stderrLocation = gatewayDownloadLocation+File.separator+stderrFileName; + + List<OutputDataObjectType> processOutputs = processContext.getProcessModel().getProcessOutputs(); + if (processOutputs != null && !processOutputs.isEmpty()){ + for (OutputDataObjectType processOutput : processOutputs){ + if (processOutput.getType().equals(DataType.STDOUT)){ + processOutput.setValue(stdoutLocation); + } + if (processOutput.getType().equals(DataType.STDERR)){ + processOutput.setValue(stderrLocation); + } + + } + } + } + + public void uploadLocalFiles() throws WorkerException { + List<String> inFilePrms = new ArrayList<>(); + // FIXME - remove hard coded file path. 
+ inFilePrms.addAll(extractInFileParams()); +// inFilePrms.add("file://home/airavata/test/hpcinput-localhost-uslims3_cauma3d-00950.tar"); + for (String uri : inFilePrms) { + String fileName = new File(uri).getName(); + if (uri.startsWith("file")) { + try { + String uriWithoutProtocol = uri.substring(uri.lastIndexOf("://") + 2, uri.length()); + FileUploader fileUploader = new FileUploader(uriWithoutProtocol, fileName, Mode.overwrite, false); + log.info("Uploading file {}", fileName); + fileUploader.perform(storageClient); + } catch (FileNotFoundException e3) { + throw new WorkerException( + "Error while staging-in, local file "+fileName+" not found", e3); + } catch (Exception e) { + throw new WorkerException("Cannot upload files", e); + + } + + } + } + } + + public List<String> extractInFileParams() { + List<String> filePrmsList = new ArrayList<String>(); + List<InputDataObjectType> applicationInputs = processContext.getProcessModel().getProcessInputs(); + if (applicationInputs != null && !applicationInputs.isEmpty()){ + for (InputDataObjectType output : applicationInputs){ + if(output.getType().equals(DataType.URI)) { + filePrmsList.add(output.getValue()); + } + } + } + return filePrmsList; + } + + public void setStorageClient(StorageClient sc){ + storageClient = sc; + } + + public void downloadStdOuts() throws WorkerException{ + + String stdoutFileName = new File(stdoutLocation).getName(); + + String stderrFileName = new File(stderrLocation).getName(); + + FileDownloader f1 = null; + log.info("Downloading stdout and stderr.."); + log.info(stdoutFileName + " -> " + stdoutLocation); + + f1 = new FileDownloader(stdoutFileName, stdoutLocation, Mode.overwrite); + try { + f1.perform(storageClient); +// String stdoutput = readFile(stdoutLocation); + } catch (Exception e) { + log.error("Error while downloading " + stdoutFileName + " to location " + stdoutLocation, e); + } + + log.info(stderrFileName + " -> " + stderrLocation); + f1.setFrom(stderrFileName); + 
f1.setTo(stderrLocation); + try { + f1.perform(storageClient); +// String stderror = readFile(stderrLocation); + } catch (Exception e) { + log.error("Error while downloading " + stderrFileName + " to location " + stderrLocation); + } + String scriptExitCodeFName = "UNICORE_SCRIPT_EXIT_CODE"; + String scriptCodeLocation = gatewayDownloadLocation + File.separator + scriptExitCodeFName; + if (UASDataStagingProcessor.isUnicoreEndpoint(processContext)) { + f1.setFrom(scriptExitCodeFName); + f1.setTo(scriptCodeLocation); + try { + f1.perform(storageClient); + OutputDataObjectType output = new OutputDataObjectType(); + output.setName(scriptExitCodeFName); + output.setValue(scriptCodeLocation); + output.setType(DataType.URI); + output.setIsRequired(true); + processContext.getProcessModel().getProcessOutputs().add(output); + log.info("UNICORE_SCRIPT_EXIT_CODE -> " + scriptCodeLocation); + log.info("EXIT CODE: " + readFile(scriptCodeLocation)); + } catch (Exception e) { + log.error("Error downloading file " + scriptExitCodeFName + " to location " + scriptCodeLocation, e); + } + } + } + + private String readFile(String localFile) throws IOException { + BufferedReader instream = new BufferedReader(new FileReader(localFile)); + StringBuffer buff = new StringBuffer(); + String temp = null; + while ((temp = instream.readLine()) != null) { + buff.append(temp); + buff.append(Constants.NEWLINE); + } + + log.info("finish read file:" + localFile); + + return buff.toString(); + } + + private String getDownloadLocation() { + ProcessModel processModel = processContext.getProcessModel(); + String outputDataDir = ""; + + if (processContext.getOutputDir() != null ) { + + outputDataDir = processContext.getOutputDir(); + + + if ("".equals(outputDataDir)) { + outputDataDir = getTempPath(); + } + + else { + + // in case of remote locations use the tmp location + if (outputDataDir.startsWith("scp:") || + outputDataDir.startsWith("ftp:") || + outputDataDir.startsWith("gsiftp:")) { + outputDataDir 
= getTempPath(); + } else if ( outputDataDir.startsWith("file:") && + outputDataDir.contains("@")){ + outputDataDir = getTempPath(); + + } else { + try { + URI u = new URI(outputDataDir); + outputDataDir = u.getPath(); + } catch (URISyntaxException e) { + outputDataDir = getTempPath(); + } + } + } + } + + File file = new File(outputDataDir); + if(!file.exists()){ + file.mkdirs(); + } + + + return outputDataDir; + } + + private String getTempPath() { + String tmpOutputDir = File.separator + "tmp" + File.separator + + processContext.getProcessId(); + (new File(tmpOutputDir)).mkdirs(); + return tmpOutputDir; + } + + public List<OutputDataObjectType> downloadRemoteFiles() throws WorkerException { + + if(log.isDebugEnabled()) { + log.debug("Download location is:" + gatewayDownloadLocation); + } + + List<OutputDataObjectType> applicationOutputs = processContext.getProcessModel().getProcessOutputs(); + if (applicationOutputs != null && !applicationOutputs.isEmpty()){ + for (OutputDataObjectType output : applicationOutputs){ + if("".equals(output.getValue()) || output.getValue() == null) { + continue; + } + if(output.getType().equals(DataType.STDOUT)) { + output.setValue(stdoutLocation); + resultantOutputsLst.add(output); + } else if(output.getType().equals(DataType.STDERR)) { + output.setValue(stderrLocation); + resultantOutputsLst.add(output); + } else if (output.getType().equals(DataType.URI)) { + String value = null; + if (!output.getLocation().isEmpty()) { + value = output.getLocation() + File.separator + output.getValue(); + } else { + value = output.getValue(); + } + String outputPath = gatewayDownloadLocation + File.separator + output.getValue(); + File f = new File(gatewayDownloadLocation); + if (!f.exists()) + f.mkdirs(); + + FileDownloader fileDownloader = new FileDownloader(value, outputPath, Mode.overwrite); + try { + log.info("Downloading file {}", value); + fileDownloader.perform(storageClient); + output.setType(DataType.URI); + output.setValue(outputPath); 
+ resultantOutputsLst.add(output); + } catch (Exception e) { + log.error("Error downloading " + value + " from job working directory. "); +// throw new WorkerException(e.getLocalizedMessage(),e); + } + } else { + log.info("Ignore output file {}, type {}", output.getValue(), output.getType().toString()); + } + + } + + } + + downloadStdOuts(); + return resultantOutputsLst; + + } + + public void publishFinalOutputs() throws WorkerException { + try { + if(!resultantOutputsLst.isEmpty()) { + log.debug("Publishing the list of outputs to the registry instance.."); + ExperimentCatalog experimentCatalog = processContext.getExperimentCatalog(); + experimentCatalog.add(ExpCatChildDataType.EXPERIMENT_OUTPUT, resultantOutputsLst, processContext.getExperimentId()); + } + } catch (RegistryException e) { + throw new WorkerException("Cannot publish outputs to the registry."); + } + + + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/airavata/blob/d231956e/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/FileDownloader.java ---------------------------------------------------------------------- diff --git a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/FileDownloader.java b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/FileDownloader.java new file mode 100644 index 0000000..937b1c1 --- /dev/null +++ b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/FileDownloader.java @@ -0,0 +1,255 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.airavata.worker.task.jobsubmission.utils.bes; + +import de.fzj.unicore.uas.client.FileTransferClient; +import de.fzj.unicore.uas.client.StorageClient; +import de.fzj.unicore.uas.client.UFTPConstants; +import de.fzj.unicore.uas.client.UFTPFileTransferClient; +import de.fzj.unicore.uas.fts.FiletransferOptions.IMonitorable; +import de.fzj.unicore.uas.fts.FiletransferOptions.SupportsPartialRead; +import org.unigrids.services.atomic.types.GridFileType; +import org.unigrids.services.atomic.types.ProtocolType; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.util.Map; + +/** + * helper that exports remote files from a UNICORE Storage + * to the local client machine.<br/> + * Simple wildcards ("*" and "?") and download of + * directories are supported. 
+ * + * TODO this should be refactored so the single-file download logic + * is separated from the wildcard/directory/provided outputStream logic + * + * @author schuller + */
public class FileDownloader extends FileTransferBase{

    /** Gates the (currently disabled) progress reporting in {@code download}. */
    private boolean showProgress=true;

    /** When set, {@link #perform(StorageClient)} rejects a directory source. */
    private boolean forceFileOnly=false;

    /** Optional sink; when non-null, data is written here instead of to a local file. */
    private OutputStream targetStream=null;

    /**
     * Creates a downloader with {@code failOnError} set to {@code true}.
     *
     * @param from remote source path (may contain {@code *} / {@code ?})
     * @param to   local target path
     * @param mode creation mode for the local file
     */
    public FileDownloader(String from, String to, Mode mode){
        this(from,to,mode,true);
    }

    /**
     * @param from        remote source path (may contain {@code *} / {@code ?})
     * @param to          local target path
     * @param mode        creation mode for the local file
     * @param failOnError stored in the inherited {@code failOnError} flag
     */
    public FileDownloader(String from, String to, Mode mode, boolean failOnError){
        this.to=to;
        this.from=from;
        this.mode=mode;
        this.failOnError=failOnError;
    }

    /**
     * Runs the download: a wildcard export when {@code from} contains wildcards,
     * a directory export when the remote source is a directory, otherwise a
     * single-file download.
     *
     * @param sms storage to download from
     * @throws IOException if the source is a directory and file-only mode is forced
     * @throws Exception   on any storage or transfer failure
     */
    public void perform(StorageClient sms)throws Exception{
        if(hasWildCards(from)){
            performWildCardExport(sms);
            return;
        }
        //check if source is a directory
        GridFileType gridSource=sms.listProperties(from);
        if(gridSource.getIsDirectory()){
            if(forceFileOnly){
                throw new IOException("Source is a directory");
            }
            performDirectoryExport(gridSource, new File(to), sms);
        }
        else{
            download(gridSource,new File(to),sms);
        }
    }

    /**
     * Downloads the content of a remote directory into an existing, writable
     * local directory. Sub-directories are skipped unless {@code recurse} is set.
     *
     * @param directory       remote directory descriptor
     * @param targetDirectory local directory to write into
     * @param sms             storage to download from
     * @throws IOException if the target is missing, not writable, not a directory,
     *                     or a local sub-directory cannot be created
     */
    protected void performDirectoryExport(GridFileType directory, File targetDirectory, StorageClient sms)throws Exception{
        if(!targetDirectory.exists()|| !targetDirectory.canWrite()){
            throw new IOException("Target directory <"+to+"> does not exist or is not writable!");
        }
        if(!targetDirectory.isDirectory()){
            throw new IOException("Target <"+to+"> is not a directory!");
        }
        GridFileType[]gridFiles=sms.listDirectory(directory.getPath());
        for(GridFileType file: gridFiles){
            File localTarget=new File(targetDirectory,getName(file.getPath()));
            if(!file.getIsDirectory()){
                download(file, localTarget, sms);
                continue;
            }
            if(!recurse){
                System.out.println("Skipping directory "+file.getPath());
                continue;
            }
            // mkdirs() returns false when the directory already exists, so an
            // existing directory must not be reported as a failure.
            if(!localTarget.isDirectory() && !localTarget.mkdirs()){
                throw new IOException("Can't create directory: "+localTarget.getAbsolutePath());
            }
            performDirectoryExport(file, localTarget, sms);
        }
    }

    /**
     * Downloads every remote file matching the wildcard expression in {@code from}.
     * Unless a target stream is set, {@code to} must be an existing local directory.
     *
     * @param sms storage to download from
     * @throws IOException if no target stream is set and {@code to} is not a directory
     */
    protected void performWildCardExport(StorageClient sms)throws Exception{
        String dir=getDir(from);
        if(dir==null)dir="/";
        GridFileType[] files=sms.find(dir, false, from, false, null, null);
        File targetDir=null;
        if(targetStream==null){
            targetDir=new File(to);
            if(!targetDir.isDirectory())throw new IOException("Target is not a directory.");
        }
        for(GridFileType f: files){
            download(f, targetDir, sms);
        }
    }

    /** Parent part of {@code path}, or {@code null} if it has none. */
    private String getDir(String path){
        return new File(path).getParent();
    }

    /** Last name element of {@code path}. */
    private String getName(String path){
        return new File(path).getName();
    }

    /**
     * download a single regular file
     *
     * @param source - grid file descriptor
     * @param localFile - local file or directory to write to; may be
     *        <code>null</code> when a target stream is set
     * @param sms - storage to download from
     * @throws IllegalStateException if the source is <code>null</code> or a directory
     * @throws Exception on transfer failure
     */
    private void download(GridFileType source, File localFile, StorageClient sms)throws Exception{
        if(source==null || source.getIsDirectory()){
            throw new IllegalStateException("Source="+source);
        }

        OutputStream os=targetStream;
        FileTransferClient ftc=null;
        try{
            String path=source.getPath();
            if(targetStream==null){
                if(localFile.isDirectory()){
                    localFile=new File(localFile,getName(source.getPath()));
                }
                // Constant-first comparison, as in FileUploader, so a null mode cannot throw.
                if(Mode.nooverwrite.equals(mode) && localFile.exists()){
                    System.out.println("File exists and creation mode was set to 'nooverwrite'.");
                    return;
                }
                System.out.println("Downloading remote file '"+sms.getUrl()+"#/"+path+"' -> "+localFile.getAbsolutePath());
                os=new FileOutputStream(localFile.getAbsolutePath(), Mode.append.equals(mode));
            }

            chosenProtocol=sms.findSupportedProtocol(preferredProtocols.toArray(new ProtocolType.Enum[preferredProtocols.size()]));
            Map<String,String>extraParameters=makeExtraParameters(chosenProtocol);
            ftc=sms.getExport(path,extraParameters,chosenProtocol);
            configure(ftc, extraParameters);
            System.out.println("DEB:File transfer URL : "+ftc.getUrl());
            if(ftc instanceof IMonitorable && showProgress){
                // TODO: progress listener is not wired up yet; the size is computed
                // but not used until a progress bar is attached here.
                long size=ftc.getSourceFileSize();
                if(isRange()){
                    size=getRangeSize();
                }
            }
            long startTime=System.currentTimeMillis();
            if(isRange()){
                if(!(ftc instanceof SupportsPartialRead)){
                    throw new Exception("Byte range is defined but protocol does not allow " +
                            "partial read! Please choose a different protocol!");
                }
                System.out.println("Byte range: "+startByte+" - "+(getRangeSize()>0?endByte:""));
                SupportsPartialRead pReader=(SupportsPartialRead)ftc;
                pReader.readPartial(startByte, endByte-startByte+1, os);
            }
            else{
                ftc.readAllData(os);
            }
            // The rate is derived from the local file, so it is only reported when
            // we wrote one; with a target stream localFile may be null.
            if(timing && targetStream==null){
                long duration=System.currentTimeMillis()-startTime;
                double rate=(double)localFile.length()/(double)duration;
                System.out.println("Rate: " +rate+ " kB/sec.");
            }
            if(targetStream==null)copyProperties(source, localFile);
        }
        finally{
            // A caller-supplied target stream is left open; only our own stream is closed.
            try{
                if(targetStream==null && os!=null){
                    os.close();
                }
            }catch(Exception ignored){
                // best-effort cleanup
            }
            if(ftc!=null){
                try{
                    ftc.destroy();
                }catch(Exception ignored){
                    // best-effort cleanup of the transfer client
                }
            }
        }
    }

    /**
     * if possible, copy the remote executable flag to the local file
     * (failures are ignored)
     * @throws Exception declared for callers; failures are swallowed in the body
     */
    private void copyProperties(GridFileType source, File localFile)throws Exception{
        try{
            localFile.setExecutable(source.getPermissions().getExecutable());
        }
        catch(Exception ignored){
            //TODO: logging
        }
    }

    /** Passes the UFTP secret from {@code params} to the client when it is a UFTP client. */
    private void configure(FileTransferClient ftc, Map<String,String>params){
        if(ftc instanceof UFTPFileTransferClient){
            UFTPFileTransferClient u=(UFTPFileTransferClient)ftc;
            String secret=params.get(UFTPConstants.PARAM_SECRET);
            u.setSecret(secret);
        }
    }

    public void setShowProgress(boolean showProgress) {
        this.showProgress = showProgress;
    }

    public void setForceFileOnly(boolean forceFileOnly) {
        this.forceFileOnly = forceFileOnly;
    }

    public void setTargetStream(OutputStream targetStream) {
        this.targetStream = targetStream;
    }

}
http://git-wip-us.apache.org/repos/asf/airavata/blob/d231956e/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/FileTransferBase.java ---------------------------------------------------------------------- diff --git a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/FileTransferBase.java b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/FileTransferBase.java new file mode 100644 index 0000000..c76ab74 --- /dev/null +++ b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/FileTransferBase.java @@ -0,0 +1,223 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ * + */

package org.apache.airavata.worker.task.jobsubmission.utils.bes;

import de.fzj.unicore.uas.client.StorageClient;
import de.fzj.unicore.uas.util.PropertyHelper;
import org.unigrids.services.atomic.types.GridFileType;
import org.unigrids.services.atomic.types.ProtocolType;

import java.io.File;
import java.io.FilenameFilter;
import java.util.*;
import java.util.regex.Pattern;

/**
 * Common state and helpers for the file upload and download helpers: source and
 * target paths, creation mode, optional byte range, preferred protocols and
 * wildcard handling.
 */
public class FileTransferBase {

    /** Optional source of protocol-specific extra parameters; may be {@code null}. */
    protected Properties extraParameterSource;

    /** When set, subclasses print a transfer rate after each transfer. */
    protected boolean timing=false;

    /** When set, sub-directories are transferred recursively. */
    protected boolean recurse=false;

    protected String from;

    protected String to;

    //index of first byte to download
    protected Long startByte;

    //index of last byte to download
    protected Long endByte;

    /**
     * the creation mode
     */
    protected Mode mode;

    /**
     * whether the job processing should fail if an error occurs
     */
    protected boolean failOnError;

    protected List<ProtocolType.Enum> preferredProtocols=new ArrayList<ProtocolType.Enum>();

    /** Protocol selected by the most recent transfer, or {@code null} before any transfer. */
    protected ProtocolType.Enum chosenProtocol=null;

    /** Creates a base with BFT as the only preferred protocol. */
    public FileTransferBase(){
        preferredProtocols.add(ProtocolType.BFT);
    }

    /**
     * Builds the extra parameters for the given protocol. The map is empty when
     * no parameter source is set; otherwise it is the source filtered by the
     * protocol name and its lower-case form.
     *
     * @param protocol protocol the parameters are meant for
     * @return the parameter map, never {@code null} when no source is configured
     */
    protected Map<String,String>makeExtraParameters(ProtocolType.Enum protocol){
        Map<String, String> res;
        if(extraParameterSource==null){
            res=new HashMap<String, String>();
        }
        else{
            String p=String.valueOf(protocol);
            PropertyHelper ph=new PropertyHelper(extraParameterSource, new String[]{p,p.toLowerCase()});
            res= ph.getFilteredMap();
        }
        if(res.size()>0){
            // TODO: change it to logger
            System.out.println("Have "+res.size()+" extra parameters for protocol "+protocol);
        }
        return res;
    }

    public String getTo() {
        return to;
    }

    public String getFrom() {
        return from;
    }

    public void setTo(String to) {
        this.to = to;
    }

    public void setFrom(String from) {
        this.from = from;
    }

    public Mode getMode() {
        return mode;
    }

    public boolean isFailOnError() {
        return failOnError;
    }

    public boolean isTiming() {
        return timing;
    }

    public void setTiming(boolean timing) {
        this.timing = timing;
    }

    public void setFailOnError(boolean failOnError) {
        this.failOnError = failOnError;
    }

    public List<ProtocolType.Enum> getPreferredProtocols() {
        return preferredProtocols;
    }

    public void setPreferredProtocols(List<ProtocolType.Enum> preferredProtocols) {
        this.preferredProtocols = preferredProtocols;
    }

    public void setExtraParameterSource(Properties properties){
        this.extraParameterSource=properties;
    }

    public void setRecurse(boolean recurse) {
        this.recurse = recurse;
    }

    /**
     * check if the given path denotes a valid remote directory
     * @param remotePath - the path
     * @param sms - the storage
     * @return <code>true</code> if the path is "/" or ".", or the remote entry
     *         exists and is a directory; <code>false</code> if the lookup fails
     */
    protected boolean isValidDirectory(String remotePath, StorageClient sms){
        if("/".equals(remotePath) || ".".equals(remotePath)){
            return true;
        }
        try{
            GridFileType gft=sms.listProperties(remotePath);
            return gft.getIsDirectory();
        }catch(Exception ex){
            // a failed lookup is treated as "not a directory"
            return false;
        }
    }

    /**
     * Expands a local file specification whose name may contain wildcards.
     *
     * @param original file specification; only the name part is checked for wildcards
     * @return an array holding just {@code original} when its name has no
     *         wildcards, otherwise the entries of the parent directory (or "."
     *         when there is no parent) whose names match; as with
     *         {@link File#listFiles(FilenameFilter)} this may be {@code null}
     */
    public File[] resolveWildCards(File original){
        if(!hasWildCards(original))return new File[]{original};
        File parent=original.getParentFile();
        if(parent==null)parent=new File(".");
        final Pattern pattern=createPattern(original.getName());
        return parent.listFiles(new FilenameFilter(){
            @Override
            public boolean accept(File dir, String name){
                return pattern.matcher(name).matches();
            }
        });
    }

    protected boolean hasWildCards(File file){
        return hasWildCards(file.getName());
    }

    /** @return {@code true} if the name contains {@code *} or {@code ?} */
    public boolean hasWildCards(String name){
        return name.contains("*") || name.contains("?");
    }

    /**
     * Translates a wildcard expression into a regular expression: {@code ?}
     * matches one character, {@code *} matches any run of characters, and every
     * other character is matched literally. Literal runs are quoted so that
     * regex metacharacters in file names (for example the dot in
     * {@code *.txt}) keep their literal meaning.
     */
    private Pattern createPattern(String nameWithWildcards){
        StringBuilder regex=new StringBuilder();
        StringBuilder literal=new StringBuilder();
        for(int i=0;i<nameWithWildcards.length();i++){
            char c=nameWithWildcards.charAt(i);
            if(c=='*' || c=='?'){
                if(literal.length()>0){
                    regex.append(Pattern.quote(literal.toString()));
                    literal.setLength(0);
                }
                regex.append(c=='*' ? ".*" : ".");
            }
            else{
                literal.append(c);
            }
        }
        if(literal.length()>0){
            regex.append(Pattern.quote(literal.toString()));
        }
        return Pattern.compile(regex.toString());
    }

    public ProtocolType.Enum getChosenProtocol(){
        return chosenProtocol;
    }

    public Long getStartByte() {
        return startByte;
    }

    public void setStartByte(Long startByte) {
        this.startByte = startByte;
    }

    public Long getEndByte() {
        return endByte;
    }

    public void setEndByte(Long endByte) {
        this.endByte = endByte;
    }

    /**
     * checks if a byte range is defined
     * @return <code>true</code> iff both startByte and endByte are defined
     */
    protected boolean isRange(){
        return startByte!=null && endByte!=null;
    }

    /**
     * get the number of bytes in the byte range, or "-1" if the range is open-ended.
     * Must only be called when {@link #isRange()} is <code>true</code>. Both
     * indices are inclusive, so the size is <code>endByte-startByte+1</code>,
     * which is the same count the subclasses pass to the transfer client.
     * @return the number of bytes in the range, or -1 if endByte is Long.MAX_VALUE
     */
    protected long getRangeSize(){
        if(Long.MAX_VALUE==endByte)return -1;
        return endByte-startByte+1;
    }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/d231956e/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/FileUploader.java ---------------------------------------------------------------------- diff --git a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/FileUploader.java b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/FileUploader.java new file mode 100644 index 0000000..d899b37 --- /dev/null +++ b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/FileUploader.java @@ -0,0 +1,242 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */

package org.apache.airavata.worker.task.jobsubmission.utils.bes;

import de.fzj.unicore.uas.client.FileTransferClient;
import de.fzj.unicore.uas.client.StorageClient;
import de.fzj.unicore.uas.client.UFTPConstants;
import de.fzj.unicore.uas.client.UFTPFileTransferClient;
import de.fzj.unicore.uas.fts.FiletransferOptions.IMonitorable;
import org.unigrids.services.atomic.types.ProtocolType;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Map;

/**
 * upload local file(s) to a remote location
 *
 * @author schuller
 */
public class FileUploader extends FileTransferBase{

    /**
     * Creates an uploader with {@code failOnError} set to {@code true}.
     *
     * @param from local source path (may contain {@code *} / {@code ?})
     * @param to   remote target path
     * @param mode creation mode for the remote file
     * @throws FileNotFoundException if the local source does not exist
     */
    public FileUploader(String from, String to, Mode mode)throws FileNotFoundException{
        this(from,to,mode,true);
    }

    /**
     * @param from        local source path (may contain {@code *} / {@code ?})
     * @param to          remote target path
     * @param mode        creation mode for the remote file
     * @param failOnError when {@code true}, the local source is checked for existence
     * @throws FileNotFoundException if {@code failOnError} is set and the local
     *                               source does not exist
     */
    public FileUploader(String from, String to, Mode mode, boolean failOnError)throws FileNotFoundException{
        this.to=to;
        this.from=from;
        this.mode=mode;
        this.failOnError=failOnError;
        checkOK();
    }

    @Override
    public String getFrom() {
        return from;
    }

    @Override
    public String getTo() {
        return to;
    }

    /**
     * Runs the upload. A single regular file is uploaded to {@code to}. A
     * directory, or a wildcard expression, is uploaded file by file into a
     * remote directory that is created first.
     *
     * @param sms storage to upload to
     * @throws FileNotFoundException if the local file set cannot be listed
     * @throws IOException           if the remote target is not a directory
     * @throws Exception             on any storage or transfer failure
     */
    public void perform(StorageClient sms)throws Exception{
        File fileSpec=new File(from);
        boolean isDirectory=fileSpec.isDirectory();
        boolean hasWildCards=!isDirectory && hasWildCards(fileSpec);

        chosenProtocol=sms.findSupportedProtocol(preferredProtocols.toArray(new ProtocolType.Enum[preferredProtocols.size()]));
        Map<String,String>extraParameters=makeExtraParameters(chosenProtocol);

        if(!hasWildCards && !isDirectory){
            //single regular file
            uploadFile(fileSpec,to,sms,chosenProtocol,extraParameters);
            return;
        }

        //handle wildcards or directory
        File[] fileset=hasWildCards ? resolveWildCards(fileSpec) : fileSpec.listFiles();
        if(fileset==null){
            // both listing calls return null when the directory cannot be read
            throw new FileNotFoundException("Local import '"+from+"' cannot be listed.");
        }

        // The default must be applied before the target is validated; it used to
        // be applied only after the validation had already seen the null.
        if(to==null)to="/";
        if(!isValidDirectory(to, sms)){
            throw new IOException("The specified remote target '"+to+"' is not a directory");
        }
        String target=isDirectory?to+"/"+fileSpec.getName():to;
        sms.createDirectory(target);
        uploadFiles(fileset,target,sms,chosenProtocol,extraParameters);
    }

    /**
     * upload a set of files to a remote directory (which must exist)
     *
     * @param files - local files; directories are skipped unless recursion is enabled
     * @param remoteDirectory - remote directory to upload into
     * @param sms - storage to upload to
     * @param protocol - transfer protocol
     * @param extraParameters - protocol-specific parameters
     * @throws IOException if a local sub-directory cannot be listed
     * @throws Exception on transfer failure
     */
    private void uploadFiles(File[]files, String remoteDirectory, StorageClient sms, ProtocolType.Enum protocol,
            Map<String,String>extraParameters)throws Exception{
        for(File localFile: files){
            String target=remoteDirectory+"/"+localFile.getName();
            if(!localFile.isDirectory()){
                uploadFile(localFile,target,sms,protocol,extraParameters);
                continue;
            }
            if(!recurse){
                System.out.println("Skipping directory "+localFile.getAbsolutePath());
                continue;
            }
            File[] fileset=localFile.listFiles();
            if(fileset==null){
                throw new IOException("Cannot list local directory '"+localFile.getAbsolutePath()+"'");
            }
            sms.createDirectory(target);
            uploadFiles(fileset,target,sms,protocol,extraParameters);
        }
    }

    /**
     * uploads a single regular file
     *
     * @param localFile - file to upload
     * @param remotePath - remote target; <code>null</code> means "/" plus the local
     *        name, and a trailing "/" gets the local name appended
     * @param sms - storage to upload to
     * @param protocol - transfer protocol
     * @param extraParameters - protocol-specific parameters
     * @throws IOException if the start of a byte range lies beyond the end of the file
     * @throws Exception on transfer failure
     */
    private void uploadFile(File localFile, String remotePath, StorageClient sms, ProtocolType.Enum protocol,
            Map<String,String>extraParameters) throws Exception{
        long startTime=System.currentTimeMillis();
        FileInputStream is=null;
        FileTransferClient ftc=null;
        try{
            if(remotePath==null){
                remotePath="/"+localFile.getName();
            }
            else if(remotePath.endsWith("/")){
                remotePath+=localFile.getName();
            }
            System.out.println("Uploading local file '"+localFile.getAbsolutePath()+"' -> '"+sms.getUrl()+"#"+remotePath+"'");
            is=new FileInputStream(localFile.getAbsolutePath());
            boolean append=Mode.append.equals(mode);
            ftc=sms.getImport(remotePath, append, extraParameters, protocol);
            configure(ftc, extraParameters);
            if(append)ftc.setAppend(true);
            String url=ftc.getUrl();
            System.out.println("File transfer URL : "+url);
            if(ftc instanceof IMonitorable){
                // TODO: progress listener is not wired up yet; the size is computed
                // but not used until a progress bar is attached here.
                long size=localFile.length();
                if(isRange()){
                    size=getRangeSize();
                }
            }
            if(isRange()){
                System.out.println("Byte range: "+startByte+" - "+(getRangeSize()>0?endByte:""));
                skipFully(is, startByte);
                ftc.writeAllData(is, endByte-startByte+1);
            }else{
                ftc.writeAllData(is);
            }
            copyProperties(localFile, sms, remotePath);
        }finally{
            if(ftc!=null){
                try{
                    ftc.destroy();
                }catch(Exception ignored){
                    // best-effort cleanup of the transfer client
                }
            }
            try{ if(is!=null)is.close(); }catch(Exception ignored){
                // best-effort cleanup
            }
        }
        if(timing){
            long duration=System.currentTimeMillis()-startTime;
            double rate=(double)localFile.length()/(double)duration;
            System.out.println("Rate: "+rate+ " kB/sec.");
        }
    }

    /**
     * Skips exactly {@code count} bytes of the stream. Each call asks only for
     * the bytes still missing (the previous loop asked for the full count every
     * time and could skip too far), and a skip that makes no progress is
     * resolved with a single read so the loop cannot spin forever.
     *
     * @throws IOException if the stream ends before {@code count} bytes were skipped
     */
    private static void skipFully(FileInputStream is, long count)throws IOException{
        long remaining=count;
        while(remaining>0){
            long skipped=is.skip(remaining);
            if(skipped<=0){
                if(is.read()<0){
                    throw new IOException("Start byte "+count+" is beyond the end of the file");
                }
                skipped=1;
            }
            remaining-=skipped;
        }
    }

    /**
     * if possible, copy the local executable flag to the remote file
     * (failures are ignored)
     * @param sourceFile - local file
     * @param sms - storage holding the uploaded file
     * @param target - remote path of the uploaded file
     * @throws Exception declared for callers; failures are swallowed in the body
     */
    private void copyProperties(File sourceFile, StorageClient sms, String target)throws Exception{
        boolean x=sourceFile.canExecute();
        try{
            if(x){
                sms.changePermissions(target, true, true, x);
            }
        }catch(Exception ignored){
            // setting the flag is optional
        }
    }

    /**
     * When {@code failOnError} is set, verifies that the local source (after
     * wildcard expansion, relative paths resolved against {@code user.dir}) exists.
     *
     * @throws FileNotFoundException if the source, or any expanded file, is missing
     */
    private void checkOK()throws FileNotFoundException{
        if(!failOnError){
            return;
        }
        File orig=new File(from);
        if(!orig.isAbsolute()){
            orig=new File(System.getProperty("user.dir"),from);
        }
        File[] files=resolveWildCards(orig);
        if(files==null){
            throw new FileNotFoundException("Local import '"+from+"' does not exist.");
        }
        for(File f: files){
            if(!f.exists())throw new FileNotFoundException("Local import '"+from+"' does not exist.");
        }
    }

    /** Passes the UFTP secret from {@code params} to the client when it is a UFTP client. */
    private void configure(FileTransferClient ftc, Map<String,String>params){
        if(ftc instanceof UFTPFileTransferClient){
            UFTPFileTransferClient u=(UFTPFileTransferClient)ftc;
            String secret=params.get(UFTPConstants.PARAM_SECRET);
            u.setSecret(secret);
        }
    }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/d231956e/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/JSDLGenerator.java ---------------------------------------------------------------------- diff --git a/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/JSDLGenerator.java b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/JSDLGenerator.java new file mode 100644 index 0000000..de96104 --- /dev/null +++ b/modules/worker/task-jobsubmission/src/main/java/org/apache/airavata/worker/task/jobsubmission/utils/bes/JSDLGenerator.java @@ -0,0 +1,115 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.airavata.worker.task.jobsubmission.utils.bes; + +import org.apache.airavata.worker.core.context.ProcessContext; +import org.ggf.schemas.jsdl.x2005.x11.jsdl.JobDefinitionDocument; +import org.ggf.schemas.jsdl.x2005.x11.jsdl.JobDefinitionType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * + * Utility class generates a JSDL instance from JobExecutionContext instance + * + * @author shahbaz memon + * + * */ + +public class JSDLGenerator implements BESConstants { + + protected final Logger log = LoggerFactory.getLogger(this.getClass()); + + public synchronized static JobDefinitionDocument buildJSDLInstance(ProcessContext context) throws Exception { + + JobDefinitionDocument jobDefDoc = JobDefinitionDocument.Factory + .newInstance(); + JobDefinitionType value = jobDefDoc.addNewJobDefinition(); + + + // build Identification + createJobIdentification(value, context); + + ResourceProcessor.generateResourceElements(value, context); + + ApplicationProcessor.generateJobSpecificAppElements(value, context); + + + return jobDefDoc; + } + + public synchronized static JobDefinitionDocument buildJSDLInstance(ProcessContext context, String smsUrl) throws Exception { + + JobDefinitionDocument jobDefDoc = JobDefinitionDocument.Factory + .newInstance(); + JobDefinitionType value = jobDefDoc.addNewJobDefinition(); + + + // build Identification + createJobIdentification(value, context); + + ResourceProcessor.generateResourceElements(value, context); + + ApplicationProcessor.generateJobSpecificAppElements(value, 
context); + + UASDataStagingProcessor.generateDataStagingElements(value, context, smsUrl); + + return jobDefDoc; + } + + public synchronized static JobDefinitionDocument buildJSDLInstance( + ProcessContext context, String smsUrl, Object jobDirectoryMode) + throws Exception { + + JobDefinitionDocument jobDefDoc = JobDefinitionDocument.Factory + .newInstance(); + JobDefinitionType value = jobDefDoc.addNewJobDefinition(); + + // build Identification + createJobIdentification(value, context); + + ResourceProcessor.generateResourceElements(value, context); + + ApplicationProcessor.generateJobSpecificAppElements(value, context); + + UASDataStagingProcessor.generateDataStagingElements(value, context, + smsUrl); + + return jobDefDoc; + } + + private static void createJobIdentification(JobDefinitionType value, ProcessContext context) { + + if (context != null) { + if (context.getAllocationProjectNumber() != null) + JSDLUtils.addProjectName(value, context.getAllocationProjectNumber()); + + if (context.getApplicationInterfaceDescription() != null && context.getApplicationInterfaceDescription().getApplicationDescription() != null) + JSDLUtils.getOrCreateJobIdentification(value).setDescription(context.getApplicationInterfaceDescription().getApplicationDescription()); + + if (context.getApplicationInterfaceDescription() != null && context.getApplicationInterfaceDescription().getApplicationName() != null) + JSDLUtils.getOrCreateJobIdentification(value).setJobName(context.getApplicationInterfaceDescription().getApplicationName()); + } + } + + +} \ No newline at end of file
