Updated JobExecutionContext with Thrift data model types
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/73e21be4 Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/73e21be4 Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/73e21be4 Branch: refs/heads/gfac_appcatalog_int Commit: 73e21be4cf423dc35fb5f1a3363f989a40e067e7 Parents: 96a673f Author: shamrath <[email protected]> Authored: Thu Oct 30 15:57:33 2014 -0400 Committer: Chathuri Wimalasena <[email protected]> Committed: Wed Nov 5 11:16:15 2014 -0500 ---------------------------------------------------------------------- .../gfac/core/context/JobExecutionContext.java | 91 +++++++++++++++----- .../airavata/gfac/core/cpi/BetterGfacImpl.java | 37 ++++---- 2 files changed, 85 insertions(+), 43 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/73e21be4/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java index 2b2255f..3616b42 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java @@ -33,6 +33,9 @@ import org.apache.airavata.gfac.SecurityContext; import org.apache.airavata.gfac.core.cpi.GFac; import org.apache.airavata.gfac.core.notification.GFacNotifier; import org.apache.airavata.gfac.core.provider.GFacProvider; +import org.apache.airavata.model.appcatalog.computeresource.DataMovementProtocol; +import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionInterface; +import 
org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol; import org.apache.airavata.model.workspace.experiment.Experiment; import org.apache.airavata.model.workspace.experiment.JobDetails; import org.apache.airavata.model.workspace.experiment.TaskDetails; @@ -67,16 +70,42 @@ public class JobExecutionContext extends AbstractContext implements Serializable private ZooKeeper zk; private String credentialStoreToken; - + /** + * User defined working directory. + */ private String workingDir; - + /** + * Input data directory + */ private String inputDir; + /** + * Output data directory + */ private String outputDir; - private String standaredOutput; - private String standaredError; - private String prefferedJobSubmissionProtocal; - private String prefferedDataMovementProtocal; - + /** + * standard output file path + */ + private String standardOutput; + /** + * standard error file path + */ + private String standardError; + /** + * User preferred job submission protocol. + */ + private JobSubmissionProtocol preferredJobSubmissionProtocol; + /** + * User preferred data movement protocol. + */ + private DataMovementProtocol preferredDataMovementProtocol; + /** + * List of job submission protocols sorted by priority order. + */ + private List<JobSubmissionInterface> hostPrioritizedJobSubmissionInterfaces; + /** + * use preferred job submission protocol. 
+ */ + private JobSubmissionInterface preferredJobSubmissionInterface; // private ContextHeaderDocument.ContextHeader contextHeader; @@ -354,35 +383,51 @@ public class JobExecutionContext extends AbstractContext implements Serializable this.outputDir = outputDir; } - public String getStandaredOutput() { - return standaredOutput; + public String getStandardOutput() { + return standardOutput; + } + + public void setStandardOutput(String standardOutput) { + this.standardOutput = standardOutput; + } + + public String getStandardError() { + return standardError; + } + + public void setStandardError(String standardError) { + this.standardError = standardError; + } + + public JobSubmissionProtocol getPreferredJobSubmissionProtocol() { + return preferredJobSubmissionProtocol; } - public void setStandaredOutput(String standaredOutput) { - this.standaredOutput = standaredOutput; + public void setPreferredJobSubmissionProtocol(JobSubmissionProtocol preferredJobSubmissionProtocol) { + this.preferredJobSubmissionProtocol = preferredJobSubmissionProtocol; } - public String getStandaredError() { - return standaredError; + public DataMovementProtocol getPreferredDataMovementProtocol() { + return preferredDataMovementProtocol; } - public void setStandaredError(String standaredError) { - this.standaredError = standaredError; + public void setPreferredDataMovementProtocol(DataMovementProtocol preferredDataMovementProtocol) { + this.preferredDataMovementProtocol = preferredDataMovementProtocol; } - public String getPrefferedJobSubmissionProtocal() { - return prefferedJobSubmissionProtocal; + public List<JobSubmissionInterface> getHostPrioritizedJobSubmissionInterfaces() { + return hostPrioritizedJobSubmissionInterfaces; } - public void setPrefferedJobSubmissionProtocal(String prefferedJobSubmissionProtocal) { - this.prefferedJobSubmissionProtocal = prefferedJobSubmissionProtocal; + public void setHostPrioritizedJobSubmissionInterfaces(List<JobSubmissionInterface> 
hostPrioritizedJobSubmissionInterfaces) { + this.hostPrioritizedJobSubmissionInterfaces = hostPrioritizedJobSubmissionInterfaces; } - public String getPrefferedDataMovementProtocal() { - return prefferedDataMovementProtocal; + public JobSubmissionInterface getPreferredJobSubmissionInterface() { + return preferredJobSubmissionInterface; } - public void setPrefferedDataMovementProtocal(String prefferedDataMovementProtocal) { - this.prefferedDataMovementProtocal = prefferedDataMovementProtocal; + public void setPreferredJobSubmissionInterface(JobSubmissionInterface preferredJobSubmissionInterface) { + this.preferredJobSubmissionInterface = preferredJobSubmissionInterface; } } http://git-wip-us.apache.org/repos/asf/airavata/blob/73e21be4/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java index fd43c65..696b61b 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java @@ -53,6 +53,7 @@ import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentD import org.apache.airavata.model.appcatalog.appinterface.ApplicationInterfaceDescription; import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription; import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionInterface; +import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol; import org.apache.airavata.model.appcatalog.gatewayprofile.ComputeResourcePreference; import org.apache.airavata.model.messaging.event.*; import org.apache.airavata.model.workspace.experiment.*; @@ -70,6 +71,8 @@ import 
java.io.File; import java.io.IOException; import java.net.URL; import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; import java.util.List; import java.util.Properties; @@ -319,31 +322,25 @@ public class BetterGfacImpl implements GFac,Watcher { /* * Stdout and Stderr for Shell */ - jobExecutionContext.setStandaredOutput(workingDir + File.separator + applicationInterface.getApplicationName().replaceAll("\\s+", "") + ".stdout"); - jobExecutionContext.setStandaredError(workingDir + File.separator + applicationInterface.getApplicationName().replaceAll("\\s+", "") + ".stderr"); + jobExecutionContext.setStandardOutput(workingDir + File.separator + applicationInterface.getApplicationName().replaceAll("\\s+", "") + ".stdout"); + jobExecutionContext.setStandardError(workingDir + File.separator + applicationInterface.getApplicationName().replaceAll("\\s+", "") + ".stderr"); + + jobExecutionContext.setPreferredJobSubmissionProtocol(gatewayResourcePreferences.getPreferredJobSubmissionProtocol()); } List<JobSubmissionInterface> jobSubmissionInterfaces = computeResource.getJobSubmissionInterfaces(); - String preferredJobSubmissionProtocol = gatewayResourcePreferences.getPreferredJobSubmissionProtocol(); - String hostClass; - if (preferredJobSubmissionProtocol != null){ - hostClass = preferredJobSubmissionProtocol; - }else { - if (jobSubmissionInterfaces != null && !jobSubmissionInterfaces.isEmpty()){ - int lowestPriority = jobSubmissionInterfaces.get(0).getPriorityOrder(); - String selectedHost = null; - for (int i = 0; i < jobSubmissionInterfaces.size() - 1; i++){ - if (jobSubmissionInterfaces.get(i+1).getPriorityOrder() < lowestPriority ){ - lowestPriority = jobSubmissionInterfaces.get(i+1).getPriorityOrder(); - selectedHost = jobSubmissionInterfaces.get(i+1).getJobSubmissionProtocol().toString(); - } + if (jobSubmissionInterfaces != null && !jobSubmissionInterfaces.isEmpty()){ + Collections.sort(jobSubmissionInterfaces, new 
Comparator<JobSubmissionInterface>() { + @Override + public int compare(JobSubmissionInterface jobSubmissionInterface, JobSubmissionInterface jobSubmissionInterface2) { + return jobSubmissionInterface.getPriorityOrder() - jobSubmissionInterface2.getPriorityOrder(); } - hostClass = selectedHost; - }else { - throw new GFacException("Compute resource should have atleast one job submission interface defined..."); - } + }); + + jobExecutionContext.setHostPrioritizedJobSubmissionInterfaces(jobSubmissionInterfaces); + }else { + throw new GFacException("Compute resource should have at least one job submission interface defined..."); } - jobExecutionContext.setPrefferedJobSubmissionProtocal(hostClass); return jobExecutionContext; }
