Repository: airavata Updated Branches: refs/heads/master 4568832b3 -> e290cfe17
Fixed master branch to job submission with local file. Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/e290cfe1 Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/e290cfe1 Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/e290cfe1 Branch: refs/heads/master Commit: e290cfe17a5c4f6f0422f1830067637cb81b0494 Parents: 4568832 Author: Shameera Rathanyaka <[email protected]> Authored: Wed Jul 15 18:01:23 2015 -0400 Committer: Shameera Rathanyaka <[email protected]> Committed: Wed Jul 15 18:01:23 2015 -0400 ---------------------------------------------------------------------- .../airavata/common/utils/ServerSettings.java | 10 +- .../server/src/main/resources/gfac-config.yaml | 15 +- .../gfac/core/config/GFacYamlConfigruation.java | 2 +- .../gfac/core/context/ProcessContext.java | 29 ++++ .../airavata/gfac/core/context/TaskContext.java | 4 + .../org/apache/airavata/gfac/impl/Factory.java | 145 +++++++++--------- .../airavata/gfac/impl/GFacEngineImpl.java | 63 +++++++- .../apache/airavata/gfac/impl/GFacWorker.java | 150 +++++++++++-------- .../gfac/impl/task/SCPDataStageTask.java | 87 +++++++++++ .../gfac/impl/task/SCPInputDataStageTask.java | 24 +-- .../gfac/monitor/email/EmailBasedMonitor.java | 84 +++++------ .../airavata/gfac/server/GfacServerHandler.java | 1 + 12 files changed, 414 insertions(+), 200 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/e290cfe1/modules/commons/src/main/java/org/apache/airavata/common/utils/ServerSettings.java ---------------------------------------------------------------------- diff --git a/modules/commons/src/main/java/org/apache/airavata/common/utils/ServerSettings.java b/modules/commons/src/main/java/org/apache/airavata/common/utils/ServerSettings.java index 37521f6..b898d96 100644 --- a/modules/commons/src/main/java/org/apache/airavata/common/utils/ServerSettings.java +++ b/modules/commons/src/main/java/org/apache/airavata/common/utils/ServerSettings.java @@ -34,7 +34,8 @@ public class ServerSettings extends ApplicationSettings { private static final String DEFAULT_USER = "default.registry.user"; private static final String DEFAULT_USER_PASSWORD = "default.registry.password"; - private static final String DEFAULT_USER_GATEWAY = "default.registry.gateway"; + private static final String DEFAULT_USER_GATEWAY = "default.registry.gateway"; + private static final String OUTPUT_LOCATION = "out.location"; private static final String SERVER_CONTEXT_ROOT = "server.context-root"; public static final String IP = "ip"; @@ -98,8 +99,9 @@ public class ServerSettings extends ApplicationSettings { private static boolean stopAllThreads = false; private static boolean emailBaseNotificationEnable; + private static String outputLocation; - public static String getDefaultUser() throws ApplicationSettingsException { + public static String getDefaultUser() throws ApplicationSettingsException { return getSetting(DEFAULT_USER); } @@ -346,4 +348,8 @@ public class ServerSettings extends ApplicationSettings { public static String getSecurityManagerClassName() throws ApplicationSettingsException { return getSetting(Constants.SECURITY_MANAGER_CLASS); } + + public static String getOutputLocation() { + return getSetting(OUTPUT_LOCATION, System.getProperty("java.io.tmpdir")); + } } http://git-wip-us.apache.org/repos/asf/airavata/blob/e290cfe1/modules/configuration/server/src/main/resources/gfac-config.yaml ---------------------------------------------------------------------- diff --git a/modules/configuration/server/src/main/resources/gfac-config.yaml b/modules/configuration/server/src/main/resources/gfac-config.yaml index 46ece9c..31fd63a 100644 --- a/modules/configuration/server/src/main/resources/gfac-config.yaml +++ b/modules/configuration/server/src/main/resources/gfac-config.yaml @@ -41,12 +41,15 @@ commonTasks: fileTransferTasks: - transferProtocol: SCP - taskClass: org.apache.airavata.gfac.impl.task.SCPInputDataStageTask - properties: - - password: pwd123 - passPhrase: test - privateKey: key - publicKey: pubkey + taskClass: org.apache.airavata.gfac.impl.task.SCPDataStageTask + +# - transferProtocol: SCP +# taskClass: org.apache.airavata.gfac.impl.task.SCPInputDataStageTask +# properties: +# - password: pwd123 +# passPhrase: test +# privateKey: key +# publicKey: pubkey #- transferProtocol: SFTP # taskClass: org.apache.airavata.task.adapters.SFTPFileTransferTask http://git-wip-us.apache.org/repos/asf/airavata/blob/e290cfe1/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/config/GFacYamlConfigruation.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/config/GFacYamlConfigruation.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/config/GFacYamlConfigruation.java index fa8ce32..5101b41 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/config/GFacYamlConfigruation.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/config/GFacYamlConfigruation.java @@ -45,7 +45,7 @@ public class GFacYamlConfigruation { private static final String JOB_MANAGER_TYPE = "jobManagerType"; private static final String COMMAND_OUTPUT_PARSER = "commandOutputParser"; private static final String EMAIL_PARSER = "emailParser"; - private static final String RESOURCE_EMAIL_ADDRESS = "resourceEmailAddress"; + private static final String RESOURCE_EMAIL_ADDRESS = "resourceEmailAddresses"; private static final String PROPERTIES = "properties"; private List<JobSubmitterTaskConfig> jobSubmitters = new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/airavata/blob/e290cfe1/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java index b7b02c1..dc8dace 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java @@ -29,6 +29,8 @@ import org.apache.airavata.model.appcatalog.appinterface.ApplicationInterfaceDes import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription; import org.apache.airavata.model.appcatalog.computeresource.DataMovementProtocol; import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol; +import org.apache.airavata.model.appcatalog.computeresource.MonitorMode; +import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManager; import org.apache.airavata.model.appcatalog.gatewayprofile.ComputeResourcePreference; import org.apache.airavata.model.appcatalog.gatewayprofile.GatewayResourceProfile; import org.apache.airavata.model.job.JobModel; @@ -55,6 +57,7 @@ public class ProcessContext { private String workingDir; private String inputDir; private String outputDir; + private String localWorkingDir; private List<TaskContext> taskChain; private GatewayResourceProfile gatewayResourceProfile; private ComputeResourceDescription computeResourceDescription; @@ -68,6 +71,8 @@ public class ProcessContext { private DataMovementProtocol dataMovementProtocol; private JobModel jobModel; private ComputeResourcePreference computeResourcePreference; + private MonitorMode monitorMode; + private ResourceJobManager resourceJobManager; /** * Note: process context property use lazy loading approach. In runtime you will see some properties as null @@ -292,4 +297,28 @@ public class ProcessContext { public String getComputeResourceId() { return getComputeResourceDescription().getComputeResourceId(); } + + public void setMonitorMode(MonitorMode monitorMode) { + this.monitorMode = monitorMode; + } + + public MonitorMode getMonitorMode() { + return monitorMode; + } + + public void setResourceJobManager(ResourceJobManager resourceJobManager) { + this.resourceJobManager = resourceJobManager; + } + + public ResourceJobManager getResourceJobManager() { + return resourceJobManager; + } + + public String getLocalWorkingDir() { + return localWorkingDir; + } + + public void setLocalWorkingDir(String localWorkingDir) { + this.localWorkingDir = localWorkingDir; + } } http://git-wip-us.apache.org/repos/asf/airavata/blob/e290cfe1/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/TaskContext.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/TaskContext.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/TaskContext.java index 1be5142..95d2fb9 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/TaskContext.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/TaskContext.java @@ -72,4 +72,8 @@ public class TaskContext { public String getTaskId() { return taskModel.getTaskId(); } + + public String getLocalWorkingDir() { + return getParentProcessContext().getLocalWorkingDir(); + } } http://git-wip-us.apache.org/repos/asf/airavata/blob/e290cfe1/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java index b8a3d4e..51db6f3 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java @@ -30,6 +30,7 @@ import org.apache.airavata.gfac.core.GFacException; import org.apache.airavata.gfac.core.JobManagerConfiguration; import org.apache.airavata.gfac.core.authentication.AuthenticationInfo; import org.apache.airavata.gfac.core.authentication.SSHKeyAuthentication; +import org.apache.airavata.gfac.core.cluster.OutputParser; import org.apache.airavata.gfac.core.cluster.RemoteCluster; import org.apache.airavata.gfac.core.cluster.ServerInfo; import org.apache.airavata.gfac.core.config.DataTransferTaskConfig; @@ -56,6 +57,7 @@ import org.apache.airavata.model.appcatalog.computeresource.DataMovementProtocol import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionInterface; import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol; import org.apache.airavata.model.appcatalog.computeresource.LOCALSubmission; +import org.apache.airavata.model.appcatalog.computeresource.MonitorMode; import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManager; import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManagerType; import org.apache.airavata.model.appcatalog.computeresource.SSHJobSubmission; @@ -68,8 +70,11 @@ import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; import java.util.Calendar; import java.util.Date; import java.util.HashMap; @@ -78,16 +83,24 @@ import java.util.Map; public abstract class Factory { + private static final Logger log = LoggerFactory.getLogger(Factory.class); +/* static{ + try { + loadConfiguration(); + } catch (GFacException e) { + log.error("Error while loading configurations"); + } + }*/ + private static GFacEngine engine; private static Publisher statusPublisher; private static CuratorFramework curatorClient; private static EmailBasedMonitor emailBasedMonitor; - private static Date startMonitorDate = Calendar.getInstance().getTime(); private static Map<String, RemoteCluster> remoteClusterMap = new HashMap<>(); private static Map<JobSubmissionProtocol, JobSubmissionTask> jobSubmissionTask = new HashMap<>(); private static Map<DataMovementProtocol, Task> dataMovementTask = new HashMap<>(); private static Map<ResourceJobManagerType, ResourceConfig> resources = new HashMap<>(); - private static boolean readConfig = false; + private static Map<MonitorMode, JobMonitor> jobMonitorServices = new HashMap<>(); public static GFacEngine getGFacEngine() throws GFacException { if (engine == null) { @@ -132,41 +145,36 @@ public abstract class Factory { return curatorClient; } - public static JobMonitor getJobMonitor(ResourceJobManagerType resourceJobManagerType) throws AiravataException { - if (resourceJobManagerType == ResourceJobManagerType.FORK) { - return null; // TODO write a job monitor for this. - } else { - if (emailBasedMonitor == null) { - synchronized (EmailBasedMonitor.class) { - if (emailBasedMonitor == null) { - emailBasedMonitor = new EmailBasedMonitor(resourceJobManagerType); - emailBasedMonitor.setDate(startMonitorDate); - new Thread(emailBasedMonitor).start(); - } - } - } - return emailBasedMonitor; + public static JobManagerConfiguration getJobManagerConfiguration(ResourceJobManager resourceJobManager) throws GFacException { + ResourceConfig resourceConfig = Factory.getResourceConfig(resourceJobManager.getResourceJobManagerType()); + OutputParser outputParser; + try { + Class<? extends OutputParser> aClass = Class.forName(resourceConfig.getCommandOutputParser()).asSubclass + (OutputParser.class); + outputParser = aClass.getConstructor().newInstance(); + } catch (Exception e) { + throw new GFacException("Error while instantiating output parser for " + resourceJobManager + .getResourceJobManagerType().name()); } - } - public static JobManagerConfiguration getJobManagerConfiguration(ResourceJobManager resourceJobManager) { switch (resourceJobManager.getResourceJobManagerType()) { + case PBS: return new PBSJobConfiguration("PBSTemplate.xslt", ".pbs", resourceJobManager.getJobManagerBinPath(), - resourceJobManager.getJobManagerCommands(), new PBSOutputParser()); + resourceJobManager.getJobManagerCommands(), outputParser); case SLURM: return new SlurmJobConfiguration("SLURMTemplate.xslt", ".slurm", resourceJobManager - .getJobManagerBinPath(), resourceJobManager.getJobManagerCommands(), new SlurmOutputParser()); + .getJobManagerBinPath(), resourceJobManager.getJobManagerCommands(), outputParser); case LSF: return new LSFJobConfiguration("LSFTemplate.xslt", ".lsf", resourceJobManager.getJobManagerBinPath(), - resourceJobManager.getJobManagerCommands(), new LSFOutputParser()); + resourceJobManager.getJobManagerCommands(), outputParser); case UGE: return new UGEJobConfiguration("UGETemplate.xslt", ".pbs", resourceJobManager.getJobManagerBinPath(), - resourceJobManager.getJobManagerCommands(), new UGEOutputParser()); - + resourceJobManager.getJobManagerCommands(), outputParser); default: return null; } + } public static HostScheduler getHostScheduler() { @@ -174,45 +182,27 @@ public abstract class Factory { } - public static RemoteCluster getRemoteCluster(ProcessContext processCtx) throws GFacException, + /** + * Factory class manage reomete cluster map, this will solve too many connections/ sessions issues with cluster + * communications. + * @param jobSubmissionProtocol + * @param computeResourceId + * @param resourceJobManager + * @return + * @throws GFacException + * @throws AppCatalogException + * @throws AiravataException + */ + public static RemoteCluster getRemoteCluster(JobSubmissionProtocol jobSubmissionProtocol, String computeResourceId, + ResourceJobManager resourceJobManager) throws GFacException, AppCatalogException, AiravataException { - String key = processCtx.getJobSubmissionProtocol().toString() + ":" + processCtx.getComputeResourceId(); + String key = jobSubmissionProtocol.toString() + ":" + computeResourceId; RemoteCluster remoteCluster = remoteClusterMap.get(key); if (remoteCluster == null) { - String hostName = Factory.getDefaultAppCatalog().getComputeResource().getComputeResource(processCtx - .getComputeResourceId()).getHostName(); + String hostName = Factory.getDefaultAppCatalog().getComputeResource().getComputeResource(computeResourceId).getHostName(); // fixme - read login user name from computeResourcePreference ServerInfo serverInfo = new ServerInfo(ServerSettings.getSetting("ssh.username"), hostName); - List<JobSubmissionInterface> jobSubmissionInterfaces = Factory.getDefaultAppCatalog().getComputeResource() - .getComputeResource(processCtx.getComputeResourceId()) .getJobSubmissionInterfaces(); - - ResourceJobManager resourceJobManager = null; - JobSubmissionInterface jsInterface = null; - for (JobSubmissionInterface jobSubmissionInterface : jobSubmissionInterfaces) { - if (jobSubmissionInterface.getJobSubmissionProtocol() == processCtx.getJobSubmissionProtocol()) { - jsInterface = jobSubmissionInterface; - break; - } - } - if (jsInterface == null) { - // TODO: throw an exception. - } else if (jsInterface.getJobSubmissionProtocol() == JobSubmissionProtocol.SSH) { - SSHJobSubmission sshJobSubmission = getDefaultAppCatalog().getComputeResource().getSSHJobSubmission - (jsInterface.getJobSubmissionInterfaceId()); - resourceJobManager = sshJobSubmission.getResourceJobManager(); - } else if (jsInterface.getJobSubmissionProtocol() == JobSubmissionProtocol.LOCAL) { - LOCALSubmission localSubmission = getDefaultAppCatalog().getComputeResource().getLocalJobSubmission - (jsInterface.getJobSubmissionInterfaceId()); - resourceJobManager = localSubmission.getResourceJobManager(); - } else { - // TODO : throw an not supported jobsubmission protocol exception. we only support SSH and LOCAL - } - - if (resourceJobManager == null) { - // TODO throw an exception - } - JobManagerConfiguration jobManagerConfiguration = getJobManagerConfiguration(resourceJobManager); AuthenticationInfo authenticationInfo = getSSHKeyAuthentication(); remoteCluster = new HPCRemoteCluster(serverInfo, jobManagerConfiguration, authenticationInfo); @@ -236,30 +226,23 @@ public abstract class Factory { return sshKA; } - public static JobSubmissionTask getJobSubmissionTask(JobSubmissionProtocol jobSubmissionProtocol) throws - GFacException { - if (!readConfig) { - loadConfiguration(); - } + public static JobSubmissionTask getJobSubmissionTask(JobSubmissionProtocol jobSubmissionProtocol) { return jobSubmissionTask.get(jobSubmissionProtocol); } - public static Task getDataMovementTask(DataMovementProtocol dataMovementProtocol) throws GFacException { - if (!readConfig) { - loadConfiguration(); - } + public static Task getDataMovementTask(DataMovementProtocol dataMovementProtocol){ return dataMovementTask.get(dataMovementProtocol); } - public static ResourceConfig getResourceConfig(ResourceJobManagerType resourceJobManagerType) throws - GFacException { - if (!readConfig) { - loadConfiguration(); - } + public static ResourceConfig getResourceConfig(ResourceJobManagerType resourceJobManagerType) { return resources.get(resourceJobManagerType); } - private static void loadConfiguration() throws GFacException { + public static Map<ResourceJobManagerType, ResourceConfig> getResourceConfig() { + return resources; + } + + public static void loadConfiguration() throws GFacException { GFacYamlConfigruation config = new GFacYamlConfigruation(); try { for (JobSubmitterTaskConfig jobSubmitterTaskConfig : config.getJobSbumitters()) { @@ -283,10 +266,28 @@ public abstract class Factory { for (ResourceConfig resourceConfig : config.getResourceConfiguration()) { resources.put(resourceConfig.getJobManagerType(), resourceConfig); } - readConfig = true; } catch (Exception e) { throw new GFacException("Gfac config issue", e); } } + public static JobMonitor getMonitorService(MonitorMode monitorMode) throws AiravataException { + JobMonitor jobMonitor = jobMonitorServices.get(monitorMode); + if (jobMonitor == null) { + synchronized (JobMonitor.class) { + jobMonitor = jobMonitorServices.get(monitorMode); + if (jobMonitor == null) { + switch (monitorMode) { + case JOB_EMAIL_NOTIFICATION_MONITOR: + EmailBasedMonitor emailBasedMonitor = new EmailBasedMonitor(Factory.getResourceConfig()); + jobMonitorServices.put(MonitorMode.JOB_EMAIL_NOTIFICATION_MONITOR, emailBasedMonitor); + jobMonitor = ((JobMonitor) emailBasedMonitor); + new Thread(emailBasedMonitor).start(); + } + } + } + } + return jobMonitor; + } + } http://git-wip-us.apache.org/repos/asf/airavata/blob/e290cfe1/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java index 3bea455..c7eba99 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java @@ -23,6 +23,7 @@ package org.apache.airavata.gfac.impl; import org.apache.airavata.common.exception.AiravataException; import org.apache.airavata.common.utils.AiravataUtils; +import org.apache.airavata.common.utils.ServerSettings; import org.apache.airavata.common.utils.ThriftUtils; import org.apache.airavata.gfac.core.GFacEngine; import org.apache.airavata.gfac.core.GFacException; @@ -33,6 +34,11 @@ import org.apache.airavata.gfac.core.task.JobSubmissionTask; import org.apache.airavata.gfac.core.task.Task; import org.apache.airavata.gfac.core.task.TaskException; import org.apache.airavata.gfac.impl.task.SSHEnvironmentSetupTask; +import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionInterface; +import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol; +import org.apache.airavata.model.appcatalog.computeresource.LOCALSubmission; +import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManager; +import org.apache.airavata.model.appcatalog.computeresource.SSHJobSubmission; import org.apache.airavata.model.appcatalog.gatewayprofile.GatewayResourceProfile; import org.apache.airavata.model.application.io.DataType; import org.apache.airavata.model.application.io.InputDataObjectType; @@ -55,6 +61,7 @@ import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; @@ -80,6 +87,7 @@ public class GFacEngineImpl implements GFacEngine { processContext.setExperimentCatalog(expCatalog); processContext.setCuratorClient(Factory.getCuratorClient()); processContext.setStatusPublisher(Factory.getStatusPublisher()); + ProcessModel processModel = (ProcessModel) expCatalog.get(ExperimentCatalogModelType.PROCESS, processId); processContext.setProcessModel(processModel); GatewayResourceProfile gatewayProfile = appCatalog.getGatewayProfile().getGatewayProfile(gatewayId); @@ -92,9 +100,15 @@ public class GFacEngineImpl implements GFacEngine { .getApplicationDeployement(processModel.getApplicationDeploymentId())); processContext.setApplicationInterfaceDescription(appCatalog.getApplicationInterface() .getApplicationInterface(processModel.getApplicationInterfaceId())); - processContext.setRemoteCluster(Factory.getRemoteCluster(processContext)); + processContext.setResourceJobManager(getResourceJobManager(processContext)); + processContext.setRemoteCluster(Factory.getRemoteCluster(processContext.getJobSubmissionProtocol(), + processContext.getComputeResourceId(), processContext.getResourceJobManager())); - // + String inputPath = ServerSettings.getOutputLocation(); + if (inputPath != null) { + processContext.setLocalWorkingDir((inputPath.endsWith("/") ? inputPath : inputPath + "/") + + processContext.getProcessId()); + } return processContext; } catch (AppCatalogException e) { throw new GFacException("App catalog access exception ", e); @@ -134,6 +148,7 @@ public class GFacEngineImpl implements GFacEngine { } catch (TException e) { throw new GFacException("Error while serializing data staging sub task model"); } + saveTaskModel(taskCtx); GFacUtils.saveAndPublishTaskStatus(taskCtx); Task dMoveTask = Factory.getDataMovementTask(processContext.getDataMovementProtocol()); executeTask(taskCtx, dMoveTask); @@ -195,7 +210,7 @@ public class GFacEngineImpl implements GFacEngine { // create data staging sub task model DataStagingTaskModel submodel = new DataStagingTaskModel(); submodel.setSource(processInput.getValue()); - submodel.setDestination(processContext.getWorkingDir()); + submodel.setDestination(processContext.getDataMovementProtocol().name() + ":" + processContext.getWorkingDir()); taskModel.setSubTaskModel(ThriftUtils.serializeThriftObject(submodel)); taskCtx.setTaskModel(taskModel); return taskCtx; @@ -212,9 +227,13 @@ public class GFacEngineImpl implements GFacEngine { taskModel.setTaskStatus(new TaskStatus(TaskState.CREATED)); taskModel.setTaskType(TaskTypes.DATA_STAGING); // create data staging sub task model + String remoteOutputDir = processContext.getOutputDir(); + remoteOutputDir = remoteOutputDir.endsWith("/") ? remoteOutputDir : remoteOutputDir + "/"; DataStagingTaskModel submodel = new DataStagingTaskModel(); - submodel.setSource(processOutput.getValue()); - submodel.setDestination(processContext.getWorkingDir()); + submodel.setSource(processContext.getDataMovementProtocol().name() + ":" + remoteOutputDir + processOutput + .getValue()); + String localWorkingDir = processContext.getLocalWorkingDir(); + submodel.setDestination("file://" + localWorkingDir); taskModel.setSubTaskModel(ThriftUtils.serializeThriftObject(submodel)); taskCtx.setTaskModel(taskModel); return taskCtx; @@ -271,6 +290,9 @@ public class GFacEngineImpl implements GFacEngine { } catch (TException e) { throw new GFacException("Thrift model to byte[] convertion issue", e); } + File localWorkingdir = new File(taskCtx.getLocalWorkingDir()); + localWorkingdir.mkdirs(); // make local dir if not exist + saveTaskModel(taskCtx); GFacUtils.saveAndPublishTaskStatus(taskCtx); Task dMoveTask = Factory.getDataMovementTask(processContext.getDataMovementProtocol()); executeTask(taskCtx, dMoveTask); @@ -307,5 +329,36 @@ public class GFacEngineImpl implements GFacEngine { }); } + public static ResourceJobManager getResourceJobManager(ProcessContext processCtx) throws AppCatalogException { + List<JobSubmissionInterface> jobSubmissionInterfaces = Factory.getDefaultAppCatalog().getComputeResource() + .getComputeResource(processCtx.getComputeResourceId()) .getJobSubmissionInterfaces(); + + ResourceJobManager resourceJobManager = null; + JobSubmissionInterface jsInterface = null; + for (JobSubmissionInterface jobSubmissionInterface : jobSubmissionInterfaces) { + if (jobSubmissionInterface.getJobSubmissionProtocol() == processCtx.getJobSubmissionProtocol()) { + jsInterface = jobSubmissionInterface; + break; + } + } + if (jsInterface == null) { + // TODO: throw an exception. + } else if (jsInterface.getJobSubmissionProtocol() == JobSubmissionProtocol.SSH) { + SSHJobSubmission sshJobSubmission = Factory.getDefaultAppCatalog().getComputeResource().getSSHJobSubmission + (jsInterface.getJobSubmissionInterfaceId()); + processCtx.setMonitorMode(sshJobSubmission.getMonitorMode()); // fixme - Move this to populate process context method. + resourceJobManager = sshJobSubmission.getResourceJobManager(); + } else if (jsInterface.getJobSubmissionProtocol() == JobSubmissionProtocol.LOCAL) { + LOCALSubmission localSubmission = Factory.getDefaultAppCatalog().getComputeResource().getLocalJobSubmission + (jsInterface.getJobSubmissionInterfaceId()); + resourceJobManager = localSubmission.getResourceJobManager(); + } else { + // TODO : throw an not supported jobsubmission protocol exception. we only support SSH and LOCAL + } + if (resourceJobManager == null) { + // TODO throw an exception + } + return resourceJobManager; + } } http://git-wip-us.apache.org/repos/asf/airavata/blob/e290cfe1/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java index 51aa8f8..899f684 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java @@ -21,9 +21,14 @@ package org.apache.airavata.gfac.impl; +import org.apache.airavata.common.exception.AiravataException; +import org.apache.airavata.gfac.core.GFac; import org.apache.airavata.gfac.core.GFacEngine; import org.apache.airavata.gfac.core.GFacException; import org.apache.airavata.gfac.core.context.ProcessContext; +import org.apache.airavata.gfac.core.monitor.JobMonitor; +import org.apache.airavata.model.status.ProcessState; +import org.apache.airavata.model.status.ProcessStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,22 +44,19 @@ public class GFacWorker implements Runnable { /** * This will be called by monitoring service. - * @param processContext - * @throws GFacException */ - public GFacWorker(ProcessContext processContext) throws GFacException { - if (processContext == null) { - throw new GFacException("Worker must initialize with valide processContext, Process context is null"); - } - this.processContext = processContext; - } + public GFacWorker(ProcessContext processContext) throws GFacException { + if (processContext == null) { + throw new GFacException("Worker must initialize with valide processContext, Process context is null"); + } + this.processId = processContext.getProcessId(); + this.gatewayId = processContext.getGatewayId(); + this.tokenId = processContext.getTokenId(); + this.processContext = processContext; + } /** * This constructor will be called when new or recovery request comes. - * @param processId - * @param gatewayId - * @param tokenId - * @throws GFacException */ public GFacWorker(String processId, String gatewayId, String tokenId) throws GFacException { this.processId = processId; @@ -62,53 +64,83 @@ public class GFacWorker implements Runnable { this.tokenId = tokenId; } - @Override - public void run() { - try { - GFacEngine engine = Factory.getGFacEngine(); - if (processContext == null) { - processContext = engine.populateProcessContext(processId, gatewayId, tokenId); - isProcessContextPopulated = true; - } - ProcessType type = getProcessType(processContext); - try { - switch (type) { - case NEW: - engine.executeProcess(processContext); - break; - case RECOVER: - // recover the process -// engine.recoverProcess(processContext); - engine.executeProcess(processContext); - break; - case OUTFLOW: - // run the outflow task - engine.runProcessOutflow(processContext); - break; - case RECOVER_OUTFLOW: - // recover outflow task; - engine.recoverProcessOutflow(processContext); - } - } catch (GFacException e) { - switch (type) { - case NEW: - log.error("Process execution error", e); - break; - case RECOVER: - log.error("Process recover error ", e); - break; - case OUTFLOW: - log.error("Process outflow execution error", e); - break; - case RECOVER_OUTFLOW: - log.error("Process outflow recover error", e); - break; - } - } - } catch (GFacException e) { - log.error("GFac Worker throws an exception", e); - } - } + @Override + public void run() { + try { + GFacEngine engine = Factory.getGFacEngine(); + if (processContext == null) { + processContext = engine.populateProcessContext(processId, gatewayId, tokenId); + isProcessContextPopulated = true; + } + ProcessType type = getProcessType(processContext); + try { + switch (type) { + case NEW: + exectuteProcess(engine); + break; + case RECOVER: + recoverProcess(engine); + break; + case OUTFLOW: + // run the outflow task + engine.runProcessOutflow(processContext); + processContext.setProcessStatus(new ProcessStatus(ProcessState.COMPLETED)); + break; + case RECOVER_OUTFLOW: + // recover outflow task; + engine.recoverProcessOutflow(processContext); + processContext.setProcessStatus(new ProcessStatus(ProcessState.COMPLETED)); + break; + default: + throw new GFacException("process Id : " + processId + " Couldn't identify process type"); + } + } catch (GFacException e) { + switch (type) { + case NEW: + log.error("Process execution error", e); + break; + case RECOVER: + log.error("Process recover error ", e); + break; + case OUTFLOW: + log.error("Process outflow execution error", e); + break; + case RECOVER_OUTFLOW: + log.error("Process outflow recover error", e); + break; + } + throw e; + } + } catch (GFacException e) { + log.error("GFac Worker throws an exception", e); + } + } + + private void recoverProcess(GFacEngine engine) throws GFacException { + // recover the process + // engine.recoverProcess(processContext); + exectuteProcess(engine); // TODO - implement recover process. + } + + private void exectuteProcess(GFacEngine engine) throws GFacException { + engine.executeProcess(processContext); + if (processContext.getMonitorMode() == null) { + engine.runProcessOutflow(processContext); + } else { + try { + JobMonitor monitorService = Factory.getMonitorService(processContext.getMonitorMode()); + if (monitorService != null) { + monitorService.monitor(processContext.getJobModel().getJobId(), processContext); + processContext.setProcessStatus(new ProcessStatus(ProcessState.MONITORING)); + } else { + // we directly invoke outflow + engine.runProcessOutflow(processContext); + } + } catch (AiravataException e) { + throw new GFacException("Error while retrieving moniot service", e); + } + } + } private ProcessType getProcessType(ProcessContext processContext) { // check the status and return correct type of process. http://git-wip-us.apache.org/repos/asf/airavata/blob/e290cfe1/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java new file mode 100644 index 0000000..089535e --- /dev/null +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPDataStageTask.java @@ -0,0 +1,87 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.airavata.gfac.impl.task; + +import com.jcraft.jsch.JSch; +import com.jcraft.jsch.JSchException; +import com.jcraft.jsch.Session; +import org.apache.airavata.common.utils.ThriftUtils; +import org.apache.airavata.gfac.core.SSHApiException; +import org.apache.airavata.gfac.core.context.TaskContext; +import org.apache.airavata.gfac.core.task.Task; +import org.apache.airavata.gfac.core.task.TaskException; +import org.apache.airavata.gfac.impl.SSHUtils; +import org.apache.airavata.model.status.TaskState; +import org.apache.airavata.model.task.DataStagingTaskModel; +import org.apache.airavata.model.task.TaskTypes; +import org.apache.thrift.TException; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Map; + +public class SCPDataStageTask implements Task { + @Override + public void init(Map<String, String> propertyMap) throws TaskException { + + } + + @Override + public TaskState execute(TaskContext taskContext) throws TaskException { + + if (taskContext.getTaskModel().getTaskType() != TaskTypes.DATA_STAGING) { + throw new TaskException("Invalid task call, expected " + TaskTypes.DATA_STAGING.toString() + " but found " + + taskContext.getTaskModel().getTaskType().toString()); + } + try { + DataStagingTaskModel subTaskModel = (DataStagingTaskModel) ThriftUtils.getSubTaskModel(taskContext + .getTaskModel()); + URI sourceURI = new URI(subTaskModel.getSource()); + URI destinationURI = new URI(subTaskModel.getDestination()); + + if (sourceURI.getScheme().equalsIgnoreCase("file")) { // Airavata --> RemoteCluster + taskContext.getParentProcessContext().getRemoteCluster().scpTo(sourceURI.getPath(), destinationURI + .getPath()); + } else { // RemoteCluster --> Airavata + taskContext.getParentProcessContext().getRemoteCluster().scpFrom(sourceURI.getPath(), destinationURI + .getPath()); + } + } catch (SSHApiException e) { + throw new TaskException("Scp attempt failed", e); + } catch (TException e) { + throw new TaskException("Invalid task invocation"); + } catch (URISyntaxException e) { + throw new TaskException("source or destination is not a valid URI"); + } + return null; + } + + @Override + public TaskState recover(TaskContext taskContext) throws TaskException { + return null; + } + + @Override + public TaskTypes getType() { + return null; + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/e290cfe1/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPInputDataStageTask.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPInputDataStageTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPInputDataStageTask.java index 76be7a0..332a0aa 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPInputDataStageTask.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/SCPInputDataStageTask.java @@ -23,10 +23,12 @@ package org.apache.airavata.gfac.impl.task; import com.jcraft.jsch.JSch; import com.jcraft.jsch.JSchException; import com.jcraft.jsch.Session; +import org.apache.airavata.common.utils.ServerSettings; import org.apache.airavata.common.utils.ThriftUtils; import org.apache.airavata.gfac.core.SSHApiException; import org.apache.airavata.gfac.core.context.TaskContext; import org.apache.airavata.gfac.core.task.TaskException; +import org.apache.airavata.gfac.impl.Factory; import org.apache.airavata.gfac.impl.SSHUtils; import org.apache.airavata.model.status.TaskState; import org.apache.airavata.model.task.DataStagingTaskModel; @@ -35,6 +37,8 @@ import org.apache.thrift.TException; import java.io.IOException; import java.net.MalformedURLException; +import java.net.URI; +import java.net.URISyntaxException; import java.net.URL; public class SCPInputDataStageTask extends AbstractSCPTask { @@ -50,31 +54,33 @@ public class SCPInputDataStageTask extends AbstractSCPTask { + taskContext.getTaskModel().getTaskType().toString()); } try { - DataStagingTaskModel subTaskModel = (DataStagingTaskModel) ThriftUtils.getSubTaskModel(taskContext.getTaskModel()); - URL sourceURL = new URL(subTaskModel.getSource()); - URL destinationURL = new URL(subTaskModel.getDestination()); + DataStagingTaskModel subTaskModel = (DataStagingTaskModel) ThriftUtils.getSubTaskModel(taskContext + .getTaskModel()); + URI sourceURI = new URI(subTaskModel.getSource()); + URI destinationURI = new URI(subTaskModel.getDestination()); - if (sourceURL.getProtocol().equalsIgnoreCase("file")) { // local --> Airavata --> RemoteCluster - taskContext.getParentProcessContext().getRemoteCluster().scpTo(sourceURL.getPath(), + if (sourceURI.getScheme().equalsIgnoreCase("file")) { // local --> Airavata --> RemoteCluster + taskContext.getParentProcessContext().getRemoteCluster().scpTo(sourceURI.getPath(), subTaskModel.getDestination()); } else { // PGA(client) --> Airavata --> RemoteCluster // PGA(client) --> Airavata JSch jsch = new JSch(); jsch.addIdentity(privateKeyPath, passPhrase); Session session = jsch.getSession(userName, hostName, DEFAULT_SSH_PORT); - SSHUtils.scpFrom(sourceURL.getPath(), inputPath, session); + SSHUtils.scpFrom(sourceURI.getPath(), taskContext.getLocalWorkingDir() , session); // Airavata --> RemoteCluster - taskContext.getParentProcessContext().getRemoteCluster().scpTo(destinationURL.getPath(), inputPath); + taskContext.getParentProcessContext().getRemoteCluster().scpTo(destinationURI.getPath(), + taskContext.getLocalWorkingDir()); } - } catch (MalformedURLException e) { - throw new TaskException("Wrong source or destination file path.", e); } catch (SSHApiException e) { throw new TaskException("Scp attempt failed", e); } catch (JSchException | IOException e) { throw new TaskException("Scp failed", e); } catch (TException e) { throw new TaskException("Invalid task invocation"); + } catch (URISyntaxException e) { + e.printStackTrace(); } return null; } http://git-wip-us.apache.org/repos/asf/airavata/blob/e290cfe1/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java index 08f8423..9ecd700 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/monitor/email/EmailBasedMonitor.java @@ -24,6 +24,7 @@ import org.apache.airavata.common.exception.AiravataException; import org.apache.airavata.common.utils.ServerSettings; import org.apache.airavata.gfac.core.GFacException; import org.apache.airavata.gfac.core.GFacThreadPoolExecutor; +import org.apache.airavata.gfac.core.config.ResourceConfig; import org.apache.airavata.gfac.core.context.ProcessContext; import org.apache.airavata.gfac.core.monitor.EmailParser; import org.apache.airavata.gfac.core.monitor.JobMonitor; @@ -47,6 +48,7 @@ import javax.mail.Session; import javax.mail.Store; import javax.mail.search.FlagTerm; import javax.mail.search.SearchTerm; +import java.lang.reflect.InvocationTargetException; import java.util.ArrayList; import java.util.Date; import java.util.HashMap; @@ -71,12 +73,15 @@ public class EmailBasedMonitor implements JobMonitor, Runnable{ private String host, emailAddress, password, storeProtocol, folderName ; private Date monitorStartDate; private Map<ResourceJobManagerType, EmailParser> emailParserMap = new HashMap<ResourceJobManagerType, EmailParser>(); + private Map<String, ResourceJobManagerType> addressMap = new HashMap<>(); - public EmailBasedMonitor(ResourceJobManagerType type) throws AiravataException { - init(); - } - private void init() throws AiravataException { + public EmailBasedMonitor(Map<ResourceJobManagerType, ResourceConfig> resourceConfigs) throws AiravataException { + init(); + populateAddressAndParserMap(resourceConfigs); + } + + private void init() throws AiravataException { host = ServerSettings.getEmailBasedMonitorHost(); emailAddress = ServerSettings.getEmailBasedMonitorAddress(); password = ServerSettings.getEmailBasedMonitorPassword(); @@ -90,6 +95,24 @@ public class EmailBasedMonitor implements JobMonitor, Runnable{ properties.put("mail.store.protocol", storeProtocol); } + private void populateAddressAndParserMap(Map<ResourceJobManagerType, ResourceConfig> resourceConfigs) throws AiravataException { + for (Map.Entry<ResourceJobManagerType, ResourceConfig> resourceConfigEntry : resourceConfigs.entrySet()) { + ResourceJobManagerType type = resourceConfigEntry.getKey(); + ResourceConfig config = resourceConfigEntry.getValue(); + List<String> resourceEmailAddresses = config.getResourceEmailAddresses(); + for (String resourceEmailAddress : resourceEmailAddresses) { + addressMap.put(resourceEmailAddress, type); + } + try { + Class<? extends EmailParser> emailParserClass = Class.forName(config.getEmailParser()).asSubclass(EmailParser.class); + EmailParser emailParser = emailParserClass.getConstructor().newInstance(); + emailParserMap.put(type, emailParser); + } catch (Exception e) { + throw new AiravataException("Error while instantiation email parsers", e); + } + } + + } @Override public void monitor(String jobId, ProcessContext processContext) { log.info("[EJM]: Added monitor Id : " + jobId + " to email based monitor map"); @@ -106,52 +129,21 @@ public class EmailBasedMonitor implements JobMonitor, Runnable{ String addressStr = fromAddress.toString(); ResourceJobManagerType jobMonitorType = getJobMonitorType(addressStr); EmailParser emailParser = emailParserMap.get(jobMonitorType); - if (emailParser == null) { - switch (jobMonitorType) { - case PBS: - emailParser = new PBSEmailParser(); - break; - case SLURM: - emailParser = new SLURMEmailParser(); - break; - case LSF: - emailParser = new LSFEmailParser(); - break; - case UGE: - emailParser = new UGEEmailParser(); - break; - default: - throw new AiravataException("[EJM]: Un-handle resource job manager type: " + jobMonitorType - .toString() + " for email monitoring --> " + addressStr); - } - - emailParserMap.put(jobMonitorType, emailParser); - } + if (emailParser == null) { + throw new AiravataException("[EJM]: Un-handle resource job manager type: " + jobMonitorType + .toString() + " for email monitoring --> " + addressStr); + } return emailParser.parseEmail(message); } private ResourceJobManagerType getJobMonitorType(String addressStr) throws AiravataException { - System.out.println("*********** address ******** : " + addressStr); - switch (addressStr) { - case "[email protected]": // trestles , gordan - case "[email protected]": // bigred2 - case "root <[email protected]>": // bigred2 - case "root <[email protected]>": // alamo - return ResourceJobManagerType.PBS; - case "SDSC Admin <[email protected]>": // comet - case "[email protected]": // stampede - case "slurm user <[email protected]>": - return ResourceJobManagerType.SLURM; -// case "lsf": -// return ResourceJobManagerType.LSF; - default: - if (addressStr.contains("ls4.tacc.utexas.edu>")) { // lonestar - return ResourceJobManagerType.UGE; - } else { - throw new AiravataException("[EJM]: Couldn't identify Resource job manager type from address " + addressStr); - } - } - +// System.out.println("*********** address ******** : " + addressStr); + for (Map.Entry<String, ResourceJobManagerType> addressEntry : addressMap.entrySet()) { + if (addressEntry.getKey().matches(addressStr)) { + return addressEntry.getValue(); + } + } + throw new AiravataException("[EJM]: Couldn't identify Resource job manager type from address " + addressStr); } @Override http://git-wip-us.apache.org/repos/asf/airavata/blob/e290cfe1/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java index cda910e..6fa1288 100644 --- a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java +++ b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java @@ -80,6 +80,7 @@ public class GfacServerHandler implements GfacService.Iface { public GfacServerHandler() throws AiravataStartupException { try { + Factory.loadConfiguration(); startCuratorClient(); initZkDataStructure(); initAMQPClient();
