http://git-wip-us.apache.org/repos/asf/airavata/blob/914799c1/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java index 88a3ada..66998c3 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java @@ -480,7 +480,7 @@ public class GFacUtils { public static String getZKGfacServersParentPath() { return ZKPaths.makePath(ZkConstants.ZOOKEEPER_SERVERS_NODE, ZkConstants.ZOOKEEPER_GFAC_SERVER_NODE); } - public static GroovyMap createGroovyMap(ProcessContext processContext) + public static GroovyMap crateGroovyMap(ProcessContext processContext) throws ApplicationSettingsException, AppCatalogException, GFacException { return createGroovyMap(processContext, null); } @@ -488,146 +488,140 @@ public class GFacUtils { throws GFacException, AppCatalogException, ApplicationSettingsException { GroovyMap groovyMap = new GroovyMap(); - try { - ProcessModel processModel = processContext.getProcessModel(); - ResourceJobManager resourceJobManager = processContext.getResourceJobManager(); - setMailAddresses(processContext, groovyMap); // set email options and addresses - - groovyMap.add(Script.INPUT_DIR, processContext.getInputDir()); - groovyMap.add(Script.OUTPUT_DIR, processContext.getOutputDir()); - groovyMap.add(Script.EXECUTABLE_PATH, processContext.getApplicationDeploymentDescription().getExecutablePath()); - groovyMap.add(Script.STANDARD_OUT_FILE, processContext.getStdoutLocation()); - groovyMap.add(Script.STANDARD_ERROR_FILE, processContext.getStderrLocation()); - groovyMap.add(Script.SCRATCH_LOCATION, processContext.getScratchLocation()); - groovyMap.add(Script.GATEWAY_ID, 
processContext.getGatewayId()); - groovyMap.add(Script.GATEWAY_USER_NAME, processContext.getProcessModel().getUserName()); - groovyMap.add(Script.APPLICATION_NAME, processContext.getApplicationInterfaceDescription().getApplicationName()); - - ComputeResourcePreference crp = getComputeResourcePreference(processContext); - if (isValid(crp.getAllocationProjectNumber())) { - groovyMap.add(Script.ACCOUNT_STRING, crp.getAllocationProjectNumber()); + ProcessModel processModel = processContext.getProcessModel(); + ResourceJobManager resourceJobManager = getResourceJobManager(processContext); + setMailAddresses(processContext, groovyMap); // set email options and addresses + + groovyMap.add(Script.INPUT_DIR, processContext.getInputDir()); + groovyMap.add(Script.OUTPUT_DIR, processContext.getOutputDir()); + groovyMap.add(Script.EXECUTABLE_PATH, processContext.getApplicationDeploymentDescription().getExecutablePath()); + groovyMap.add(Script.STANDARD_OUT_FILE, processContext.getStdoutLocation()); + groovyMap.add(Script.STANDARD_ERROR_FILE, processContext.getStderrLocation()); + groovyMap.add(Script.SCRATCH_LOCATION, processContext.getScratchLocation()); + groovyMap.add(Script.GATEWAY_ID, processContext.getGatewayId()); + groovyMap.add(Script.GATEWAY_USER_NAME, processContext.getProcessModel().getUserName()); + groovyMap.add(Script.APPLICATION_NAME, processContext.getApplicationInterfaceDescription().getApplicationName()); + + ComputeResourcePreference crp = getComputeResourcePreference(processContext); + if (isValid(crp.getAllocationProjectNumber())) { + groovyMap.add(Script.ACCOUNT_STRING, crp.getAllocationProjectNumber()); + } + groovyMap.add(Script.RESERVATION, getReservation(crp)); + + // To make job name alpha numeric + groovyMap.add(Script.JOB_NAME, "A" + String.valueOf(generateJobName())); + groovyMap.add(Script.WORKING_DIR, processContext.getWorkingDir()); + + List<String> inputValues = getProcessInputValues(processModel.getProcessInputs()); + 
inputValues.addAll(getProcessOutputValues(processModel.getProcessOutputs())); + groovyMap.add(Script.INPUTS, inputValues); + + groovyMap.add(Script.USER_NAME, processContext.getJobSubmissionRemoteCluster().getServerInfo().getUserName()); + groovyMap.add(Script.SHELL_NAME, "/bin/bash"); + // get walltime + if (taskContext != null) { + try { + JobSubmissionTaskModel jobSubmissionTaskModel = ((JobSubmissionTaskModel) taskContext.getSubTaskModel()); + if (jobSubmissionTaskModel.getWallTime() > 0) { + groovyMap.add(Script.MAX_WALL_TIME, + GFacUtils.maxWallTimeCalculator(jobSubmissionTaskModel.getWallTime())); + } + } catch (TException e) { + log.error("Error while getting job submission sub task model", e); } - groovyMap.add(Script.RESERVATION, getReservation(crp)); - - // To make job name alpha numeric - groovyMap.add(Script.JOB_NAME, "A" + String.valueOf(generateJobName())); - groovyMap.add(Script.WORKING_DIR, processContext.getWorkingDir()); + } - List<String> inputValues = getProcessInputValues(processModel.getProcessInputs()); - inputValues.addAll(getProcessOutputValues(processModel.getProcessOutputs())); - groovyMap.add(Script.INPUTS, inputValues); + // NOTE: Give precedence to data comes with experiment + ComputationalResourceSchedulingModel scheduling = processModel.getProcessResourceSchedule(); + if (scheduling != null) { + int totalNodeCount = scheduling.getNodeCount(); + int totalCPUCount = scheduling.getTotalCPUCount(); - groovyMap.add(Script.USER_NAME, processContext.getJobSubmissionRemoteCluster().getServerInfo().getUserName()); - groovyMap.add(Script.SHELL_NAME, "/bin/bash"); - // get walltime - if (taskContext != null) { - try { - JobSubmissionTaskModel jobSubmissionTaskModel = ((JobSubmissionTaskModel) taskContext.getSubTaskModel()); - if (jobSubmissionTaskModel.getWallTime() > 0) { + if (isValid(scheduling.getQueueName())) { + groovyMap.add(Script.QUEUE_NAME, scheduling.getQueueName()); + } + if (totalNodeCount > 0) { + groovyMap.add(Script.NODES, 
totalNodeCount); + } + // qos per queue + String qoS = getQoS(crp.getQualityOfService(), scheduling.getQueueName()); + if (qoS != null) { + groovyMap.add(Script.QUALITY_OF_SERVICE, qoS); + } + if (totalCPUCount > 0) { + int ppn = totalCPUCount / totalNodeCount; + groovyMap.add(Script.PROCESS_PER_NODE, ppn); + groovyMap.add(Script.CPU_COUNT, totalCPUCount); + } + // max wall time may be set before this level if jobsubmission task has wall time configured to this job, + // if so we ignore scheduling configuration. + if (scheduling.getWallTimeLimit() > 0 && groovyMap.get(Script.MAX_WALL_TIME) == null) { + groovyMap.add(Script.MAX_WALL_TIME, + GFacUtils.maxWallTimeCalculator(scheduling.getWallTimeLimit())); + if (resourceJobManager != null) { + if (resourceJobManager.getResourceJobManagerType().equals(ResourceJobManagerType.LSF)) { groovyMap.add(Script.MAX_WALL_TIME, - GFacUtils.maxWallTimeCalculator(jobSubmissionTaskModel.getWallTime())); + GFacUtils.maxWallTimeCalculator(scheduling.getWallTimeLimit())); } - } catch (TException e) { - log.error("Error while getting job submission sub task model", e); } } - - // NOTE: Give precedence to data comes with experiment - ComputationalResourceSchedulingModel scheduling = processModel.getProcessResourceSchedule(); - if (scheduling != null) { - int totalNodeCount = scheduling.getNodeCount(); - int totalCPUCount = scheduling.getTotalCPUCount(); - - if (isValid(scheduling.getQueueName())) { - groovyMap.add(Script.QUEUE_NAME, scheduling.getQueueName()); - } - if (totalNodeCount > 0) { - groovyMap.add(Script.NODES, totalNodeCount); - } - // qos per queue - String qoS = getQoS(crp.getQualityOfService(), scheduling.getQueueName()); - if (qoS != null) { - groovyMap.add(Script.QUALITY_OF_SERVICE, qoS); - } - if (totalCPUCount > 0) { - int ppn = totalCPUCount / totalNodeCount; - groovyMap.add(Script.PROCESS_PER_NODE, ppn); - groovyMap.add(Script.CPU_COUNT, totalCPUCount); - } - // max wall time may be set before this level if 
jobsubmission task has wall time configured to this job, - // if so we ignore scheduling configuration. - if (scheduling.getWallTimeLimit() > 0 && groovyMap.get(Script.MAX_WALL_TIME) == null) { - groovyMap.add(Script.MAX_WALL_TIME, - GFacUtils.maxWallTimeCalculator(scheduling.getWallTimeLimit())); - if (resourceJobManager != null) { - if (resourceJobManager.getResourceJobManagerType().equals(ResourceJobManagerType.LSF)) { - groovyMap.add(Script.MAX_WALL_TIME, - GFacUtils.maxWallTimeCalculator(scheduling.getWallTimeLimit())); - } - } - } - if (scheduling.getTotalPhysicalMemory() > 0) { - groovyMap.add(Script.USED_MEM, scheduling.getTotalPhysicalMemory()); - } - if (isValid(scheduling.getOverrideLoginUserName())) { - groovyMap.add(Script.USER_NAME, scheduling.getOverrideLoginUserName()); - } - if (isValid(scheduling.getOverrideAllocationProjectNumber())) { - groovyMap.add(Script.ACCOUNT_STRING, scheduling.getOverrideAllocationProjectNumber()); - } - if (isValid(scheduling.getStaticWorkingDir())) { - groovyMap.add(Script.WORKING_DIR, scheduling.getStaticWorkingDir()); - } - } else { - log.error("Task scheduling cannot be null at this point.."); + if (scheduling.getTotalPhysicalMemory() > 0) { + groovyMap.add(Script.USED_MEM, scheduling.getTotalPhysicalMemory()); } - - ApplicationDeploymentDescription appDepDescription = processContext.getApplicationDeploymentDescription(); - List<CommandObject> moduleCmds = appDepDescription.getModuleLoadCmds(); - if (moduleCmds != null) { - List<String> modulesCmdCollect = moduleCmds.stream() - .sorted((e1, e2) -> e1.getCommandOrder() - e2.getCommandOrder()) - .map(map -> map.getCommand()) - .collect(Collectors.toList()); - groovyMap.add(Script.MODULE_COMMANDS, modulesCmdCollect); + if (isValid(scheduling.getOverrideLoginUserName())) { + groovyMap.add(Script.USER_NAME, scheduling.getOverrideLoginUserName()); } - - List<CommandObject> preJobCommands = appDepDescription.getPreJobCommands(); - if (preJobCommands != null) { - 
List<String> preJobCmdCollect = preJobCommands.stream() - .sorted((e1, e2) -> e1.getCommandOrder() - e2.getCommandOrder()) - .map(map -> parseCommands(map.getCommand(), groovyMap)) - .collect(Collectors.toList()); - groovyMap.add(Script.PRE_JOB_COMMANDS, preJobCmdCollect); + if (isValid(scheduling.getOverrideAllocationProjectNumber())) { + groovyMap.add(Script.ACCOUNT_STRING, scheduling.getOverrideAllocationProjectNumber()); } - - List<CommandObject> postJobCommands = appDepDescription.getPostJobCommands(); - if (postJobCommands != null) { - List<String> postJobCmdCollect = postJobCommands.stream() - .sorted((e1, e2) -> e1.getCommandOrder() - e2.getCommandOrder()) - .map(map -> parseCommands(map.getCommand(), groovyMap)) - .collect(Collectors.toList()); - groovyMap.add(Script.POST_JOB_COMMANDS, postJobCmdCollect); + if (isValid(scheduling.getStaticWorkingDir())) { + groovyMap.add(Script.WORKING_DIR, scheduling.getStaticWorkingDir()); } + } else { + log.error("Task scheduling cannot be null at this point.."); + } - ApplicationParallelismType parallelism = appDepDescription.getParallelism(); - - if (parallelism != null && resourceJobManager != null) { - Map<ApplicationParallelismType, String> parallelismPrefix = resourceJobManager.getParallelismPrefix(); - if (parallelism != ApplicationParallelismType.SERIAL) { - if (parallelismPrefix != null){ - String parallelismCommand = parallelismPrefix.get(parallelism); - if (parallelismCommand != null){ - groovyMap.add(Script.JOB_SUBMITTER_COMMAND, parallelismCommand); - }else { - throw new GFacException("Parallelism prefix is not defined for given parallelism type " + parallelism + ".. 
Please define the parallelism prefix at App Catalog"); - } + ApplicationDeploymentDescription appDepDescription = processContext.getApplicationDeploymentDescription(); + List<CommandObject> moduleCmds = appDepDescription.getModuleLoadCmds(); + if (moduleCmds != null) { + List<String> modulesCmdCollect = moduleCmds.stream() + .sorted((e1, e2) -> e1.getCommandOrder() - e2.getCommandOrder()) + .map(map -> map.getCommand()) + .collect(Collectors.toList()); + groovyMap.add(Script.MODULE_COMMANDS, modulesCmdCollect); + } + + List<CommandObject> preJobCommands = appDepDescription.getPreJobCommands(); + if (preJobCommands != null) { + List<String> preJobCmdCollect = preJobCommands.stream() + .sorted((e1, e2) -> e1.getCommandOrder() - e2.getCommandOrder()) + .map(map -> parseCommands(map.getCommand(), groovyMap)) + .collect(Collectors.toList()); + groovyMap.add(Script.PRE_JOB_COMMANDS, preJobCmdCollect); + } + + List<CommandObject> postJobCommands = appDepDescription.getPostJobCommands(); + if (postJobCommands != null) { + List<String> postJobCmdCollect = postJobCommands.stream() + .sorted((e1, e2) -> e1.getCommandOrder() - e2.getCommandOrder()) + .map(map -> parseCommands(map.getCommand(), groovyMap)) + .collect(Collectors.toList()); + groovyMap.add(Script.POST_JOB_COMMANDS, postJobCmdCollect); + } + + ApplicationParallelismType parallelism = appDepDescription.getParallelism(); + Map<ApplicationParallelismType, String> parallelismPrefix = processContext.getResourceJobManager().getParallelismPrefix(); + if (parallelism != null) { + if (parallelism != ApplicationParallelismType.SERIAL) { + if (parallelismPrefix != null){ + String parallelismCommand = parallelismPrefix.get(parallelism); + if (parallelismCommand != null){ + groovyMap.add(Script.JOB_SUBMITTER_COMMAND, parallelismCommand); + }else { + throw new GFacException("Parallelism prefix is not defined for given parallelism type " + parallelism + ".. 
Please define the parallelism prefix at App Catalog"); } - } // FIXME what if type if SERIAL type + } } - } catch (Exception e) { - log.error("Error while creating groovy map", e); - throw e; } return groovyMap; } @@ -903,51 +897,26 @@ public class GFacUtils { } public static File createJobFile(GroovyMap groovyMap, TaskContext tc, JobManagerConfiguration jMC) - throws GFacException { - try { - int number = new SecureRandom().nextInt(); - number = (number < 0 ? -number : number); - File tempJobFile = new File(GFacUtils.getLocalDataDir(tc), "job_" + Integer.toString(number) + jMC.getScriptExtension()); - FileUtils.writeStringToFile(tempJobFile, generateScript(groovyMap, jMC.getJobDescriptionTemplateName())); - return tempJobFile; - } catch (IOException e) { - throw new GFacException("Error while writing script content to temp file"); - } - } + throws GFacException{ - public static String generateScript(GroovyMap groovyMap, String templateName) throws GFacException { - URL templateUrl = ApplicationSettings.loadFile(templateName); + URL templateUrl = ApplicationSettings.loadFile(jMC.getJobDescriptionTemplateName()); if (templateUrl == null) { - String error = "Template file '" + templateName + "' not found"; + String error = "System configuration file '" + jMC.getJobDescriptionTemplateName() + + "' not found in the classpath"; throw new GFacException(error); } - File template = new File(templateUrl.getPath()); - TemplateEngine engine = new GStringTemplateEngine(); - Writable make; try { - make = engine.createTemplate(template).make(groovyMap); - } catch (Exception e) { - throw new GFacException("Error while generating script using groovy map"); - } - return make.toString(); - } + File template = new File(templateUrl.getPath()); + TemplateEngine engine = new GStringTemplateEngine(); + Writable make = engine.createTemplate(template).make(groovyMap); - public static String getTemplateFileName(ResourceJobManagerType resourceJobManagerType) { - switch 
(resourceJobManagerType) { - case FORK: - return "UGE_Groovy.template"; - case PBS: - return "PBS_Groovy.template"; - case SLURM: - return "SLURM_Groovy.template"; - case UGE: - return "UGE_Groovy.template"; - case LSF: - return "LSF_Groovy.template"; - case CLOUD: - return "CLOUD_Groovy.template"; - default: - return null; + int number = new SecureRandom().nextInt(); + number = (number < 0 ? -number : number); + File tempJobFile = new File(GFacUtils.getLocalDataDir(tc), "job_" + Integer.toString(number) + jMC.getScriptExtension()); + FileUtils.writeStringToFile(tempJobFile, make.toString()); + return tempJobFile; + } catch (ClassNotFoundException | IOException e) { + throw new GFacException("Error while parsing template and generating script file"); } }
http://git-wip-us.apache.org/repos/asf/airavata/blob/914799c1/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GroovyMap.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GroovyMap.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GroovyMap.java index a1f8132..1abc878 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GroovyMap.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GroovyMap.java @@ -63,15 +63,6 @@ public class GroovyMap extends HashMap<String, Object> { return get(script.name); } - public String getStringValue(Script script) { - Object obj = get(script); - if (obj instanceof String) { - return ((String) obj); - }else { - throw new ClassCastException("Value is not type for String"); - } - } - private void addDefaultValues() { this.add(Script.SHELL_NAME, null) .add(Script.QUEUE_NAME, null) http://git-wip-us.apache.org/repos/asf/airavata/blob/914799c1/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java index 2a5afc1..5e8de6d 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java @@ -199,25 +199,22 @@ public abstract class Factory { .getResourceJobManagerType().name()); } - String templateFileName = GFacUtils.getTemplateFileName(resourceJobManager.getResourceJobManagerType()); - switch (resourceJobManager.getResourceJobManagerType()) { case PBS: - return new PBSJobConfiguration(templateFileName, ".pbs", resourceJobManager.getJobManagerBinPath(), + return new 
PBSJobConfiguration("PBS_Groovy.template", ".pbs", resourceJobManager.getJobManagerBinPath(), resourceJobManager.getJobManagerCommands(), outputParser); case SLURM: - return new SlurmJobConfiguration(templateFileName, ".slurm", resourceJobManager + return new SlurmJobConfiguration("SLURM_Groovy.template", ".slurm", resourceJobManager .getJobManagerBinPath(), resourceJobManager.getJobManagerCommands(), outputParser); case LSF: - return new LSFJobConfiguration(templateFileName, ".lsf", resourceJobManager.getJobManagerBinPath(), + return new LSFJobConfiguration("LSF_Groovy.template", ".lsf", resourceJobManager.getJobManagerBinPath(), resourceJobManager.getJobManagerCommands(), outputParser); case UGE: - return new UGEJobConfiguration(templateFileName, ".pbs", resourceJobManager.getJobManagerBinPath(), + return new UGEJobConfiguration("UGE_Groovy.template", ".pbs", resourceJobManager.getJobManagerBinPath(), resourceJobManager.getJobManagerCommands(), outputParser); case FORK: - return new ForkJobConfiguration(templateFileName, ".sh", resourceJobManager.getJobManagerBinPath(), + return new ForkJobConfiguration("FORK_Groovy.template", ".sh", resourceJobManager.getJobManagerBinPath(), resourceJobManager.getJobManagerCommands(), outputParser); - // We don't have a job configuration manager for CLOUD type default: return null; } http://git-wip-us.apache.org/repos/asf/airavata/blob/914799c1/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AuroraJobSubmission.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AuroraJobSubmission.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AuroraJobSubmission.java new file mode 100644 index 0000000..0941c85 --- /dev/null +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AuroraJobSubmission.java @@ -0,0 +1,148 @@ +/* + * + * Licensed to the Apache 
Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.airavata.gfac.impl.task; + +import java.util.Arrays; +import java.util.LinkedHashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.airavata.cloud.aurora.client.AuroraThriftClient; +import org.apache.airavata.cloud.aurora.client.bean.IdentityBean; +import org.apache.airavata.cloud.aurora.client.bean.JobConfigBean; +import org.apache.airavata.cloud.aurora.client.bean.JobKeyBean; +import org.apache.airavata.cloud.aurora.client.bean.ProcessBean; +import org.apache.airavata.cloud.aurora.client.bean.ResourceBean; +import org.apache.airavata.cloud.aurora.client.bean.ResponseBean; +import org.apache.airavata.cloud.aurora.client.bean.TaskConfigBean; +import org.apache.airavata.cloud.aurora.util.AuroraThriftClientUtil; +import org.apache.airavata.common.utils.AiravataUtils; +import org.apache.airavata.gfac.core.GFacException; +import org.apache.airavata.gfac.core.GFacUtils; +import org.apache.airavata.gfac.core.context.ProcessContext; +import org.apache.airavata.gfac.core.context.TaskContext; +import org.apache.airavata.gfac.core.task.JobSubmissionTask; +import org.apache.airavata.gfac.core.task.TaskException; +import 
org.apache.airavata.gfac.impl.AuroraUtils; +import org.apache.airavata.model.commons.ErrorModel; +import org.apache.airavata.model.job.JobModel; +import org.apache.airavata.model.status.JobState; +import org.apache.airavata.model.status.JobStatus; +import org.apache.airavata.model.status.TaskState; +import org.apache.airavata.model.status.TaskStatus; +import org.apache.airavata.model.task.TaskTypes; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AuroraJobSubmission implements JobSubmissionTask{ + + private static final Logger log = LoggerFactory.getLogger(AuroraJobSubmission.class); + + @Override + public JobStatus cancel(TaskContext taskcontext) throws TaskException { + JobStatus jobStatus = new JobStatus(); + jobStatus.setJobState(JobState.ACTIVE); + return jobStatus; + } + + @Override + public void init(Map<String, String> propertyMap) throws TaskException { + + } + + @Override + public TaskStatus execute(TaskContext taskContext) { + TaskStatus taskStatus = new TaskStatus(TaskState.COMPLETED); // set to completed. 
+ ProcessContext processContext = taskContext.getParentProcessContext(); + JobModel jobModel = processContext.getJobModel(); + jobModel.setTaskId(taskContext.getTaskId()); + String jobIdAndName = "A" + GFacUtils.generateJobName(); + jobModel.setJobName(jobIdAndName); + JobStatus jobStatus = new JobStatus(); + jobStatus.setJobState(JobState.SUBMITTED); + + try { + JobKeyBean jobKey = new JobKeyBean(AuroraUtils.ENVIRONMENT, AuroraUtils.ROLE, jobIdAndName); + IdentityBean owner = new IdentityBean(AuroraUtils.ROLE); + // only autodoc vina + String workingDir = taskContext.getWorkingDir(); +// ProcessBean proc1 = new ProcessBean("process_1", "mkdir -p " + workingDir, false); +// ProcessBean proc2 = new ProcessBean("process_2", "cp -rf /home/centos/efs-mount-point/autodock-vina/* " + workingDir , false); + String executablePath = processContext.getApplicationDeploymentDescription().getExecutablePath(); + ProcessBean proc3 = new ProcessBean("process_3", "cd " + workingDir + " && sh " + executablePath, false); + Set<ProcessBean> processes = new LinkedHashSet<>(); +// processes.add(proc1); +// processes.add(proc2); + processes.add(proc3); + + ResourceBean resources = new ResourceBean(1.5, 512, 512); + + TaskConfigBean taskConfig = new TaskConfigBean("Airavata-Aurora-" + jobIdAndName, processes, resources); + JobConfigBean jobConfig = new JobConfigBean(jobKey, owner, taskConfig, AuroraUtils.CLUSTER); + + String executorConfigJson = AuroraThriftClientUtil.getExecutorConfigJson(jobConfig); + log.info("Executor Config for Job {} , {}", jobIdAndName, executorConfigJson); + + AuroraThriftClient client = AuroraThriftClient.getAuroraThriftClient(); + ResponseBean response = client.createJob(jobConfig); + log.info("Response for job {}, {}", jobIdAndName, response); + jobModel.setJobDescription(resources.toString()); + + jobModel.setJobId(jobIdAndName); + jobStatus.setReason("Successfully Submitted"); + jobModel.setJobStatuses(Arrays.asList(jobStatus )); + 
jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); + taskContext.getParentProcessContext().setJobModel(jobModel); + + GFacUtils.saveJobModel(processContext, jobModel); + GFacUtils.saveJobStatus(processContext, jobModel); + taskStatus.setReason("Successfully submitted job to Aurora"); + } catch (Exception e) { + String msg = "Error occurred while submitting the job"; + log.error(msg, e); + taskStatus.setState(TaskState.FAILED); + taskStatus.setReason(msg); + taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); + ErrorModel errorModel = new ErrorModel(); + errorModel.setActualErrorMessage(e.getMessage()); + errorModel.setUserFriendlyMessage(msg); + taskContext.getTaskModel().setTaskErrors(Arrays.asList(errorModel)); + } + + taskContext.setTaskStatus(taskStatus); + try { + GFacUtils.saveAndPublishTaskStatus(taskContext); + } catch (GFacException e) { + log.error("Error while saving task status", e); + } + return taskStatus; + } + + @Override + public TaskStatus recover(TaskContext taskContext) { + return execute(taskContext); + } + + @Override + public TaskTypes getType() { + return TaskTypes.JOB_SUBMISSION; + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/914799c1/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AuroraJobSubmissionTask.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AuroraJobSubmissionTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AuroraJobSubmissionTask.java deleted file mode 100644 index a987559..0000000 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/AuroraJobSubmissionTask.java +++ /dev/null @@ -1,147 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.airavata.gfac.impl.task; - -import java.util.Arrays; -import java.util.LinkedHashSet; -import java.util.Map; -import java.util.Set; - -import org.apache.airavata.cloud.aurora.client.AuroraThriftClient; -import org.apache.airavata.cloud.aurora.client.bean.IdentityBean; -import org.apache.airavata.cloud.aurora.client.bean.JobConfigBean; -import org.apache.airavata.cloud.aurora.client.bean.JobKeyBean; -import org.apache.airavata.cloud.aurora.client.bean.ProcessBean; -import org.apache.airavata.cloud.aurora.client.bean.ResourceBean; -import org.apache.airavata.cloud.aurora.client.bean.ResponseBean; -import org.apache.airavata.cloud.aurora.client.bean.TaskConfigBean; -import org.apache.airavata.cloud.aurora.util.AuroraThriftClientUtil; -import org.apache.airavata.common.utils.AiravataUtils; -import org.apache.airavata.gfac.core.GFacException; -import org.apache.airavata.gfac.core.GFacUtils; -import org.apache.airavata.gfac.core.GroovyMap; -import org.apache.airavata.gfac.core.Script; -import org.apache.airavata.gfac.core.context.ProcessContext; -import org.apache.airavata.gfac.core.context.TaskContext; -import org.apache.airavata.gfac.core.task.JobSubmissionTask; -import org.apache.airavata.gfac.core.task.TaskException; -import 
org.apache.airavata.gfac.impl.AuroraUtils; -import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManagerType; -import org.apache.airavata.model.commons.ErrorModel; -import org.apache.airavata.model.job.JobModel; -import org.apache.airavata.model.status.JobState; -import org.apache.airavata.model.status.JobStatus; -import org.apache.airavata.model.status.TaskState; -import org.apache.airavata.model.status.TaskStatus; -import org.apache.airavata.model.task.TaskTypes; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class AuroraJobSubmissionTask implements JobSubmissionTask{ - - private static final Logger log = LoggerFactory.getLogger(AuroraJobSubmissionTask.class); - - @Override - public JobStatus cancel(TaskContext taskcontext) throws TaskException { - JobStatus jobStatus = new JobStatus(); - jobStatus.setJobState(JobState.ACTIVE); - return jobStatus; - } - - @Override - public void init(Map<String, String> propertyMap) throws TaskException { - - } - - @Override - public TaskStatus execute(TaskContext taskContext) { - TaskStatus taskStatus = new TaskStatus(TaskState.COMPLETED); // set to completed. 
- ProcessContext processContext = taskContext.getParentProcessContext(); - JobModel jobModel = processContext.getJobModel(); - jobModel.setTaskId(taskContext.getTaskId()); - String jobIdAndName = "A" + GFacUtils.generateJobName(); - jobModel.setJobName(jobIdAndName); - JobStatus jobStatus = new JobStatus(); - jobStatus.setJobState(JobState.SUBMITTED); - - try { - JobKeyBean jobKey = new JobKeyBean(AuroraUtils.ENVIRONMENT, AuroraUtils.ROLE, jobIdAndName); - IdentityBean owner = new IdentityBean(AuroraUtils.ROLE); - GroovyMap groovyMap = GFacUtils.createGroovyMap(processContext, taskContext); - groovyMap.add(Script.JOB_SUBMITTER_COMMAND, "sh"); - String templateFileName = GFacUtils.getTemplateFileName(ResourceJobManagerType.CLOUD); - String script = GFacUtils.generateScript(groovyMap, templateFileName); - ProcessBean process_1 = new ProcessBean("process_1", script, false); - - Set<ProcessBean> processes = new LinkedHashSet<>(); - processes.add(process_1); - ResourceBean resources = new ResourceBean(1.5, 512, 512); - TaskConfigBean taskConfig = new TaskConfigBean("Airavata-Aurora-" + jobIdAndName, processes, resources); - JobConfigBean jobConfig = new JobConfigBean(jobKey, owner, taskConfig, AuroraUtils.CLUSTER); - - String executorConfigJson = AuroraThriftClientUtil.getExecutorConfigJson(jobConfig); - log.info("Executor Config for Job {} , {}", jobIdAndName, executorConfigJson); - - AuroraThriftClient client = AuroraThriftClient.getAuroraThriftClient(); - ResponseBean response = client.createJob(jobConfig); - log.info("Response for job {}, {}", jobIdAndName, response); - jobModel.setJobDescription(resources.toString()); - - jobModel.setJobId(jobIdAndName); - jobStatus.setReason("Successfully Submitted"); - jobModel.setJobStatuses(Arrays.asList(jobStatus )); - jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); - taskContext.getParentProcessContext().setJobModel(jobModel); - - GFacUtils.saveJobModel(processContext, jobModel); - 
GFacUtils.saveJobStatus(processContext, jobModel); - taskStatus.setReason("Successfully submitted job to Aurora"); - } catch (Throwable e) { - String msg = "Error occurred while submitting Aurora job"; - log.error(msg, e); - taskStatus.setState(TaskState.FAILED); - taskStatus.setReason(msg); - taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); - ErrorModel errorModel = new ErrorModel(); - errorModel.setActualErrorMessage(e.getMessage()); - errorModel.setUserFriendlyMessage(msg); - taskContext.getTaskModel().setTaskErrors(Arrays.asList(errorModel)); - } - - taskContext.setTaskStatus(taskStatus); - try { - GFacUtils.saveAndPublishTaskStatus(taskContext); - } catch (GFacException e) { - log.error("Error while saving task status", e); - } - return taskStatus; - } - - @Override - public TaskStatus recover(TaskContext taskContext) { - return execute(taskContext); - } - - @Override - public TaskTypes getType() { - return TaskTypes.JOB_SUBMISSION; - } -} http://git-wip-us.apache.org/repos/asf/airavata/blob/914799c1/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DefaultJobSubmissionTask.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DefaultJobSubmissionTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DefaultJobSubmissionTask.java index deabb95..657de00 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DefaultJobSubmissionTask.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/DefaultJobSubmissionTask.java @@ -73,7 +73,7 @@ public class DefaultJobSubmissionTask implements JobSubmissionTask { jobModel.setTaskId(taskContext.getTaskId()); RemoteCluster remoteCluster = processContext.getJobSubmissionRemoteCluster(); GroovyMap groovyMap = GFacUtils.createGroovyMap(processContext, taskContext); - 
jobModel.setJobName(groovyMap.getStringValue(Script.JOB_NAME)); + jobModel.setJobName(groovyMap.get(Script.JOB_NAME).toString()); ResourceJobManager resourceJobManager = GFacUtils.getResourceJobManager(processContext); JobManagerConfiguration jConfig = null; if (resourceJobManager != null) { @@ -278,16 +278,8 @@ public class DefaultJobSubmissionTask implements JobSubmissionTask { errorModel.setActualErrorMessage(e.getMessage()); errorModel.setUserFriendlyMessage(msg); taskContext.getTaskModel().setTaskErrors(Arrays.asList(errorModel)); - } catch (Throwable e) { - String msg = "JobSubmission failed"; - log.error(msg, e); - taskStatus.setState(TaskState.FAILED); - taskStatus.setReason(msg); - taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); - ErrorModel errorModel = new ErrorModel(); - errorModel.setActualErrorMessage(e.getMessage()); - errorModel.setUserFriendlyMessage(msg); - taskContext.getTaskModel().setTaskErrors(Arrays.asList(errorModel)); + } catch (RegistryException e) { + e.printStackTrace(); } taskContext.setTaskStatus(taskStatus); http://git-wip-us.apache.org/repos/asf/airavata/blob/914799c1/modules/orchestrator/orchestrator-client/src/main/java/org/apache/airavata/orchestrator/cpi/OrchestratorService.java ---------------------------------------------------------------------- diff --git a/modules/orchestrator/orchestrator-client/src/main/java/org/apache/airavata/orchestrator/cpi/OrchestratorService.java b/modules/orchestrator/orchestrator-client/src/main/java/org/apache/airavata/orchestrator/cpi/OrchestratorService.java index fc8362e..ccc309f 100644 --- a/modules/orchestrator/orchestrator-client/src/main/java/org/apache/airavata/orchestrator/cpi/OrchestratorService.java +++ b/modules/orchestrator/orchestrator-client/src/main/java/org/apache/airavata/orchestrator/cpi/OrchestratorService.java @@ -22,35 +22,23 @@ */ package org.apache.airavata.orchestrator.cpi; +import org.apache.thrift.EncodingUtils; +import 
org.apache.thrift.TException; +import org.apache.thrift.async.AsyncMethodCallback; +import org.apache.thrift.protocol.TTupleProtocol; import org.apache.thrift.scheme.IScheme; import org.apache.thrift.scheme.SchemeFactory; import org.apache.thrift.scheme.StandardScheme; - import org.apache.thrift.scheme.TupleScheme; -import org.apache.thrift.protocol.TTupleProtocol; -import org.apache.thrift.protocol.TProtocolException; -import org.apache.thrift.EncodingUtils; -import org.apache.thrift.TException; -import org.apache.thrift.async.AsyncMethodCallback; -import org.apache.thrift.server.AbstractNonblockingServer.*; -import java.util.List; -import java.util.ArrayList; -import java.util.Map; -import java.util.HashMap; -import java.util.EnumMap; -import java.util.Set; -import java.util.HashSet; -import java.util.EnumSet; -import java.util.Collections; -import java.util.BitSet; -import java.nio.ByteBuffer; -import java.util.Arrays; -import javax.annotation.Generated; +import org.apache.thrift.server.AbstractNonblockingServer.AsyncFrameBuffer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Generated; +import java.util.*; + @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"}) -@Generated(value = "Autogenerated by Thrift Compiler (0.9.3)", date = "2016-08-09") +@Generated(value = "Autogenerated by Thrift Compiler (0.9.3)", date = "2016-11-08") public class OrchestratorService { public interface Iface { http://git-wip-us.apache.org/repos/asf/airavata/blob/914799c1/modules/orchestrator/orchestrator-service/pom.xml ---------------------------------------------------------------------- diff --git a/modules/orchestrator/orchestrator-service/pom.xml b/modules/orchestrator/orchestrator-service/pom.xml index 44b11a4..ecb8325 100644 --- a/modules/orchestrator/orchestrator-service/pom.xml +++ b/modules/orchestrator/orchestrator-service/pom.xml @@ -75,6 +75,11 @@ <artifactId>airavata-commons</artifactId> <version>${project.version}</version> 
</dependency> + <dependency> + <groupId>org.apache.airavata</groupId> + <artifactId>cluster-monitoring</artifactId> + <version>${project.version}</version> + </dependency> </dependencies> <properties> http://git-wip-us.apache.org/repos/asf/airavata/blob/914799c1/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServer.java ---------------------------------------------------------------------- diff --git a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServer.java b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServer.java index 7b33e82..f025e67 100644 --- a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServer.java +++ b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServer.java @@ -21,8 +21,8 @@ package org.apache.airavata.orchestrator.server; -import java.net.InetSocketAddress; - +import org.apache.airavata.cluster.monitoring.ClusterStatusMonitorJobScheduler; +import org.apache.airavata.common.exception.ApplicationSettingsException; import org.apache.airavata.common.utils.IServer; import org.apache.airavata.common.utils.ServerSettings; import org.apache.airavata.orchestrator.cpi.OrchestratorService; @@ -32,9 +32,12 @@ import org.apache.thrift.server.TThreadPoolServer; import org.apache.thrift.transport.TServerSocket; import org.apache.thrift.transport.TServerTransport; import org.apache.thrift.transport.TTransportException; +import org.quartz.SchedulerException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.net.InetSocketAddress; + public class OrchestratorServer implements IServer { private final static Logger logger = LoggerFactory.getLogger(OrchestratorServer.class); @@ -45,6 +48,8 @@ public class OrchestratorServer implements IServer { 
private TServer server; + private ClusterStatusMonitorJobScheduler clusterStatusMonitorJobScheduler; + public OrchestratorServer() { setStatus(ServerStatus.STOPPED); } @@ -95,6 +100,11 @@ public class OrchestratorServer implements IServer { } } + public void startClusterStatusMonitoring() throws SchedulerException, ApplicationSettingsException { + clusterStatusMonitorJobScheduler = new ClusterStatusMonitorJobScheduler(); + clusterStatusMonitorJobScheduler.scheduleClusterStatusMonitoring(); + } + public static void main(String[] args) { try { new OrchestratorServer().start(); @@ -105,6 +115,9 @@ public class OrchestratorServer implements IServer { @Override public void start() throws Exception { + //starting cluster status monitoring + startClusterStatusMonitoring(); + setStatus(ServerStatus.STARTING); OrchestratorService.Processor<OrchestratorServerHandler> orchestratorService = new OrchestratorService.Processor<OrchestratorServerHandler>(new OrchestratorServerHandler()); http://git-wip-us.apache.org/repos/asf/airavata/blob/914799c1/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/experiment/catalog/ResourceType.java ---------------------------------------------------------------------- diff --git a/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/experiment/catalog/ResourceType.java b/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/experiment/catalog/ResourceType.java index 1d7aeba..b1370c9 100644 --- a/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/experiment/catalog/ResourceType.java +++ b/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/experiment/catalog/ResourceType.java @@ -43,5 +43,6 @@ public enum ResourceType { TASK_ERROR, TASK_STATUS, JOB, - JOB_STATUS + JOB_STATUS, + QUEUE_STATUS } 
http://git-wip-us.apache.org/repos/asf/airavata/blob/914799c1/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/experiment/catalog/impl/ExperimentCatalogImpl.java ---------------------------------------------------------------------- diff --git a/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/experiment/catalog/impl/ExperimentCatalogImpl.java b/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/experiment/catalog/impl/ExperimentCatalogImpl.java index 5264158..a8e2338 100644 --- a/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/experiment/catalog/impl/ExperimentCatalogImpl.java +++ b/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/experiment/catalog/impl/ExperimentCatalogImpl.java @@ -32,10 +32,7 @@ import org.apache.airavata.model.experiment.UserConfigurationDataModel; import org.apache.airavata.model.job.JobModel; import org.apache.airavata.model.process.ProcessModel; import org.apache.airavata.model.scheduling.ComputationalResourceSchedulingModel; -import org.apache.airavata.model.status.ExperimentStatus; -import org.apache.airavata.model.status.JobStatus; -import org.apache.airavata.model.status.ProcessStatus; -import org.apache.airavata.model.status.TaskStatus; +import org.apache.airavata.model.status.*; import org.apache.airavata.model.task.TaskModel; import org.apache.airavata.model.workspace.Gateway; import org.apache.airavata.model.workspace.Notification; @@ -50,6 +47,7 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; public class ExperimentCatalogImpl implements ExperimentCatalog { private GatewayResource gatewayResource; @@ -129,6 +127,8 @@ public class ExperimentCatalogImpl implements ExperimentCatalog { return gatewayRegistry.addGateway((Gateway)newObjectToAdd); case NOTIFICATION: return 
notificationRegistry.createNotification((Notification)newObjectToAdd); + case QUEUE_STATUS: + return experimentRegistry.createQueueStatuses((List<QueueStatusModel>) newObjectToAdd); default: logger.error("Unsupported top level type..", new UnsupportedOperationException()); throw new UnsupportedOperationException(); @@ -466,16 +466,16 @@ public class ExperimentCatalogImpl implements ExperimentCatalog { case PROJECT: List<Project> projectList = projectRegistry .getProjectList(fieldName, value, limit, offset, orderByIdentifier, resultOrderType); - for (Project project : projectList ){ - result.add(project); - } + result.addAll(projectList.stream().collect(Collectors.toList())); return result; case EXPERIMENT: List<ExperimentModel> experimentList = experimentRegistry.getExperimentList(fieldName, value, limit, offset, orderByIdentifier, resultOrderType); - for (ExperimentModel experiment : experimentList) { - result.add(experiment); - } + result.addAll(experimentList.stream().collect(Collectors.toList())); + return result; + case QUEUE_STATUS: + List<QueueStatusModel> queueStatusModelsList = experimentRegistry.getLatestQueueStatuses(); + result.addAll(queueStatusModelsList.stream().collect(Collectors.toList())); return result; default: logger.error("Unsupported data type...", new UnsupportedOperationException()); http://git-wip-us.apache.org/repos/asf/airavata/blob/914799c1/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/experiment/catalog/impl/ExperimentRegistry.java ---------------------------------------------------------------------- diff --git a/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/experiment/catalog/impl/ExperimentRegistry.java b/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/experiment/catalog/impl/ExperimentRegistry.java index 8fdf299..5599a4b 100644 --- 
a/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/experiment/catalog/impl/ExperimentRegistry.java +++ b/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/experiment/catalog/impl/ExperimentRegistry.java @@ -35,6 +35,7 @@ import org.apache.airavata.model.scheduling.ComputationalResourceSchedulingModel import org.apache.airavata.model.status.*; import org.apache.airavata.model.task.TaskModel; import org.apache.airavata.registry.core.experiment.catalog.ExpCatResourceUtils; +import org.apache.airavata.registry.core.experiment.catalog.ExperimentCatResource; import org.apache.airavata.registry.core.experiment.catalog.ResourceType; import org.apache.airavata.registry.core.experiment.catalog.resources.*; import org.apache.airavata.registry.core.experiment.catalog.utils.ThriftDataModelConversion; @@ -1687,6 +1688,38 @@ public class ExperimentRegistry { } } + public boolean createQueueStatuses(List<QueueStatusModel> queueStatusModels) throws RegistryException { + for(QueueStatusModel qModel : queueStatusModels){ + QueueStatusResource queueStatusResource = new QueueStatusResource(); + queueStatusResource.setHostName(qModel.getHostName()); + queueStatusResource.setQueueName(qModel.getQueueName()); + queueStatusResource.setTime(qModel.getTime()); + queueStatusResource.setQueueUp(qModel.isQueueUp()); + queueStatusResource.setRunningJobs(qModel.getRunningJobs()); + queueStatusResource.setQueuedJobs(qModel.getQueuedJobs()); + + queueStatusResource.save(); + } + return true; + } + + public List<QueueStatusModel> getLatestQueueStatuses() throws RegistryException { + List<QueueStatusModel> queueStatusModels = new ArrayList<>(); + List<ExperimentCatResource> queueStatusResources = (new QueueStatusResource()).get(ResourceType.QUEUE_STATUS); + for(ExperimentCatResource r : queueStatusResources){ + QueueStatusResource qResource = (QueueStatusResource) r; + QueueStatusModel queueStatusModel = new QueueStatusModel(); + 
queueStatusModel.setHostName(qResource.getHostName()); + queueStatusModel.setQueueName(qResource.getQueueName()); + queueStatusModel.setTime(qResource.getTime()); + queueStatusModel.setQueueUp(qResource.getQueueUp()); + queueStatusModel.setRunningJobs(qResource.getRunningJobs()); + queueStatusModel.setQueuedJobs(qResource.getQueuedJobs()); + queueStatusModels.add(queueStatusModel); + } + return queueStatusModels; + } + public String getStatusID(String parentId) { String status = parentId.replaceAll("\\s", ""); return status + "_" + UUID.randomUUID(); http://git-wip-us.apache.org/repos/asf/airavata/blob/914799c1/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/experiment/catalog/model/QueueStatus.java ---------------------------------------------------------------------- diff --git a/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/experiment/catalog/model/QueueStatus.java b/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/experiment/catalog/model/QueueStatus.java new file mode 100644 index 0000000..2fdaedb --- /dev/null +++ b/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/experiment/catalog/model/QueueStatus.java @@ -0,0 +1,99 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + * +*/ +package org.apache.airavata.registry.core.experiment.catalog.model; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.persistence.*; + +@Entity +@Table(name = "QUEUE_STATUS") +@IdClass(QueueStatusPK.class) +public class QueueStatus { + private final static Logger logger = LoggerFactory.getLogger(QueueStatus.class); + private String hostName; + private String queueName; + private Boolean queueUp; + private int runningJobs; + private int queuedJobs; + private Long time; + + @Id + @Column(name = "HOST_NAME") + public String getHostName() { + return hostName; + } + + public void setHostName(String hostName) { + this.hostName = hostName; + } + + @Id + @Column(name = "QUEUE_NAME") + public String getQueueName() { + return queueName; + } + + public void setQueueName(String queueName) { + this.queueName = queueName; + } + + @Id + @Column(name = "CREATED_TIME") + public Long getTime() { + return time; + } + + public void setTime(Long time) { + this.time = time; + } + + @Basic + @Column(name = "QUEUE_UP") + public Boolean getQueueUp() { + return queueUp; + } + + public void setQueueUp(Boolean queueUp) { + this.queueUp = queueUp; + } + + @Basic + @Column(name = "RUNNING_JOBS") + public int getRunningJobs() { + return runningJobs; + } + + public void setRunningJobs(int runningJobs) { + this.runningJobs = runningJobs; + } + + @Basic + @Column(name = "QUEUED_JOBS") + public int getQueuedJobs() { + return queuedJobs; + } + + public void setQueuedJobs(int queuedJobs) { + this.queuedJobs = queuedJobs; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/airavata/blob/914799c1/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/experiment/catalog/model/QueueStatusPK.java ---------------------------------------------------------------------- diff --git 
a/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/experiment/catalog/model/QueueStatusPK.java b/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/experiment/catalog/model/QueueStatusPK.java new file mode 100644 index 0000000..417971f --- /dev/null +++ b/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/experiment/catalog/model/QueueStatusPK.java @@ -0,0 +1,88 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * +*/ +package org.apache.airavata.registry.core.experiment.catalog.model; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.persistence.Column; +import javax.persistence.Id; +import java.io.Serializable; + +public class QueueStatusPK implements Serializable { + private final static Logger logger = LoggerFactory.getLogger(QueueStatusPK.class); + private String hostName; + private String queueName; + private Long time; + + + @Column(name = "HOST_NAME") + @Id + public String getHostName() { + return hostName; + } + + public void setHostName(String hostName) { + this.hostName = hostName; + } + + @Column(name = "QUEUE_NAME") + @Id + public String getQueueName() { + return queueName; + } + + public void setQueueName(String queueName) { + this.queueName = queueName; + } + + @Column(name = "CREATED_TIME") + @Id + public Long getTime() { + return time; + } + + public void setTime(Long time) { + this.time = time; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + QueueStatusPK that = (QueueStatusPK) o; + + if (getHostName() != null ? !getHostName().equals(that.getHostName()) : that.getHostName() != null) return false; + if (getQueueName() != null ? !getQueueName().equals(that.getQueueName()) : that.getQueueName() != null) return false; + if (getTime() != null ? !getTime().equals(that.getTime()) : that.getTime() != null) return false; + + return true; + } + + @Override + public int hashCode() { + int result = getHostName() != null ? getHostName().hashCode() : 0; + result = 31 * result + (getQueueName() != null ? getQueueName().hashCode() : 0); + result = 31 * result + (getTime() != null ? 
getTime().hashCode() : 0); + return result; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/airavata/blob/914799c1/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/experiment/catalog/resources/QueueStatusResource.java ---------------------------------------------------------------------- diff --git a/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/experiment/catalog/resources/QueueStatusResource.java b/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/experiment/catalog/resources/QueueStatusResource.java new file mode 100644 index 0000000..620d3c0 --- /dev/null +++ b/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/experiment/catalog/resources/QueueStatusResource.java @@ -0,0 +1,208 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * +*/ +package org.apache.airavata.registry.core.experiment.catalog.resources; + +import org.apache.airavata.registry.core.experiment.catalog.ExpCatResourceUtils; +import org.apache.airavata.registry.core.experiment.catalog.ExperimentCatResource; +import org.apache.airavata.registry.core.experiment.catalog.ResourceType; +import org.apache.airavata.registry.core.experiment.catalog.model.QueueStatus; +import org.apache.airavata.registry.cpi.RegistryException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.persistence.EntityManager; +import javax.persistence.Query; +import java.util.ArrayList; +import java.util.List; + +public class QueueStatusResource extends AbstractExpCatResource { + private final static Logger logger = LoggerFactory.getLogger(QueueStatusResource.class); + private String hostName; + private String queueName; + private Long time; + private Boolean queueUp; + private Integer runningJobs; + private Integer queuedJobs; + + public String getHostName() { + return hostName; + } + + public void setHostName(String hostName) { + this.hostName = hostName; + } + + public String getQueueName() { + return queueName; + } + + public void setQueueName(String queueName) { + this.queueName = queueName; + } + + public Long getTime() { + return time; + } + + public void setTime(Long time) { + this.time = time; + } + + public Boolean getQueueUp() { + return queueUp; + } + + public void setQueueUp(Boolean queueUp) { + this.queueUp = queueUp; + } + + public Integer getRunningJobs() { + return runningJobs; + } + + public void setRunningJobs(Integer runningJobs) { + this.runningJobs = runningJobs; + } + + public Integer getQueuedJobs() { + return queuedJobs; + } + + public void setQueuedJobs(Integer queuedJobs) { + this.queuedJobs = queuedJobs; + } + + + /** + * This method will create associate resource objects for the given resource type. 
+ * + * @param type child resource type + * @return associate child resource + */ + @Override + public ExperimentCatResource create(ResourceType type) throws RegistryException { + throw new RegistryException("Method not supported...!!!"); + } + + /** + * This method will remove the given child resource from the database + * + * @param type child resource type + * @param name child resource name + */ + @Override + public void remove(ResourceType type, Object name) throws RegistryException { + throw new RegistryException("Method not supported...!!!"); + } + + /** + * This method will return the given child resource from the database + * + * @param type child resource type + * @param name child resource name + * @return associate child resource + */ + @Override + public ExperimentCatResource get(ResourceType type, Object name) throws RegistryException { + throw new RegistryException("Method not supported...!!!"); + } + + /** + * This method will list all the child resources for the given resource type + * + * @param type child resource type + * @return list of child resources of the given child resource type + */ + @Override + public List<ExperimentCatResource> get(ResourceType type) throws RegistryException { + List<ExperimentCatResource> result = new ArrayList<>(); + EntityManager em = null; + try { + String query = "SELECT q from QueueStatus q WHERE q.time IN (SELECT MAX(q2.time) FROM QueueStatus q2 GROUP BY q2.hostName, q2.queueName)"; + em = ExpCatResourceUtils.getEntityManager(); + em.getTransaction().begin(); + Query q = em.createQuery(query); + List resultList = q.getResultList(); + for (Object o : resultList) { + QueueStatus queueStatus = (QueueStatus) o; + QueueStatusResource queueStatusResource = new QueueStatusResource(); + queueStatusResource.setHostName(queueStatus.getHostName()); + queueStatusResource.setQueueName(queueStatus.getQueueName()); + queueStatusResource.setTime(queueStatus.getTime()); + 
queueStatusResource.setQueueUp(queueStatus.getQueueUp()); + queueStatusResource.setQueuedJobs(queueStatus.getQueuedJobs()); + queueStatusResource.setRunningJobs(queueStatus.getRunningJobs()); + result.add(queueStatusResource); + } + em.getTransaction().commit(); + em.close(); + + } catch (Exception e) { + logger.error(e.getMessage(), e); + throw new RegistryException(e); + } finally { + if (em != null && em.isOpen()) { + if (em.getTransaction().isActive()) { + em.getTransaction().rollback(); + } + em.close(); + } + } + return result; + } + + /** + * This method will save the resource to the database. + */ + @Override + public void save() throws RegistryException { + EntityManager em = null; + try { + em = ExpCatResourceUtils.getEntityManager(); + QueueStatus queueStatus = new QueueStatus(); + queueStatus.setHostName(hostName); + queueStatus.setQueueName(queueName); + queueStatus.setTime(time); + queueStatus.setQueueUp(queueUp); + queueStatus.setRunningJobs(runningJobs); + queueStatus.setQueuedJobs(queuedJobs); + em.getTransaction().begin(); + em.persist(queueStatus); + em.getTransaction().commit(); + if (em.isOpen()) { + if (em.getTransaction().isActive()){ + em.getTransaction().rollback(); + } + em.close(); + } + } catch (Exception e) { + logger.error(e.getMessage(), e); + throw new RegistryException(e); + } finally { + if (em != null && em.isOpen()) { + if (em.getTransaction().isActive()){ + em.getTransaction().rollback(); + } + em.close(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/914799c1/modules/registry/registry-core/src/main/resources/META-INF/persistence.xml ---------------------------------------------------------------------- diff --git a/modules/registry/registry-core/src/main/resources/META-INF/persistence.xml b/modules/registry/registry-core/src/main/resources/META-INF/persistence.xml index d3558a5..c3f1a0f 100644 --- a/modules/registry/registry-core/src/main/resources/META-INF/persistence.xml +++ 
b/modules/registry/registry-core/src/main/resources/META-INF/persistence.xml @@ -95,6 +95,7 @@ <class>org.apache.airavata.registry.core.experiment.catalog.model.Job</class> <class>org.apache.airavata.registry.core.experiment.catalog.model.JobStatus</class> <class>org.apache.airavata.registry.core.experiment.catalog.model.Notification</class> + <class>org.apache.airavata.registry.core.experiment.catalog.model.QueueStatus</class> <exclude-unlisted-classes>true</exclude-unlisted-classes> </persistence-unit> <persistence-unit name="replicacatalog_data"> http://git-wip-us.apache.org/repos/asf/airavata/blob/914799c1/modules/registry/registry-core/src/main/resources/expcatalog-derby.sql ---------------------------------------------------------------------- diff --git a/modules/registry/registry-core/src/main/resources/expcatalog-derby.sql b/modules/registry/registry-core/src/main/resources/expcatalog-derby.sql index 5fd86e7..c1dcb05 100644 --- a/modules/registry/registry-core/src/main/resources/expcatalog-derby.sql +++ b/modules/registry/registry-core/src/main/resources/expcatalog-derby.sql @@ -371,6 +371,16 @@ CREATE TABLE JOB_STATUS ( FOREIGN KEY (JOB_ID, TASK_ID) REFERENCES JOB(JOB_ID, TASK_ID) ON DELETE CASCADE ); +CREATE TABLE QUEUE_STATUS( + HOST_NAME VARCHAR(255), + QUEUE_NAME VARCHAR(255), + CREATED_TIME BIGINT, + QUEUE_UP BOOLEAN, + RUNNING_JOBS INT, + QUEUED_JOBS INT, + PRIMARY KEY (HOST_NAME, QUEUE_NAME, CREATED_TIME) +); + CREATE TABLE CONFIGURATION ( CONFIG_KEY VARCHAR(255), http://git-wip-us.apache.org/repos/asf/airavata/blob/914799c1/modules/registry/registry-core/src/main/resources/expcatalog-mysql.sql ---------------------------------------------------------------------- diff --git a/modules/registry/registry-core/src/main/resources/expcatalog-mysql.sql b/modules/registry/registry-core/src/main/resources/expcatalog-mysql.sql index 78bdb99..3b7fc38 100644 --- a/modules/registry/registry-core/src/main/resources/expcatalog-mysql.sql +++ 
b/modules/registry/registry-core/src/main/resources/expcatalog-mysql.sql
@@ -374,6 +374,15 @@ CREATE TABLE JOB_STATUS (
     FOREIGN KEY (JOB_ID, TASK_ID) REFERENCES JOB(JOB_ID, TASK_ID) ON DELETE CASCADE
 );
 
+CREATE TABLE QUEUE_STATUS(
+        HOST_NAME VARCHAR(255),
+        QUEUE_NAME VARCHAR(255),
+        CREATED_TIME BIGINT(20), -- 64-bit: millisecond timestamps overflow a 32-bit INT; expcatalog-derby.sql uses BIGINT too
+        QUEUE_UP TINYINT(1),
+        RUNNING_JOBS INT(11),
+        QUEUED_JOBS INT(11),
+        PRIMARY KEY (HOST_NAME, QUEUE_NAME, CREATED_TIME)
+);
 
 CREATE TABLE CONFIGURATION
 (
http://git-wip-us.apache.org/repos/asf/airavata/blob/914799c1/modules/registry/registry-core/src/test/java/org/apache/airavata/experiment/catalog/QueueStatusResourceTest.java
----------------------------------------------------------------------
diff --git a/modules/registry/registry-core/src/test/java/org/apache/airavata/experiment/catalog/QueueStatusResourceTest.java b/modules/registry/registry-core/src/test/java/org/apache/airavata/experiment/catalog/QueueStatusResourceTest.java
new file mode 100644
index 0000000..2c76dac
--- /dev/null
+++ b/modules/registry/registry-core/src/test/java/org/apache/airavata/experiment/catalog/QueueStatusResourceTest.java
@@ -0,0 +1,76 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+* +*/ + +package org.apache.airavata.experiment.catalog; + +import junit.framework.Assert; +import org.apache.airavata.registry.core.experiment.catalog.ExperimentCatResource; +import org.apache.airavata.registry.core.experiment.catalog.ResourceType; +import org.apache.airavata.registry.core.experiment.catalog.resources.QueueStatusResource; +import org.apache.airavata.registry.cpi.RegistryException; +import org.junit.Test; + +import java.util.List; + +public class QueueStatusResourceTest extends AbstractResourceTest { + + @Test + public void test(){ + QueueStatusResource queueStatusResource1 = new QueueStatusResource(); + queueStatusResource1.setHostName("bigred2.uits.iu.edu"); + queueStatusResource1.setQueueName("cpu"); + queueStatusResource1.setTime((long) 1 + System.currentTimeMillis()); + queueStatusResource1.setQueueUp(true); + queueStatusResource1.setRunningJobs(3); + queueStatusResource1.setQueuedJobs(4); + try { + queueStatusResource1.save(); + } catch (RegistryException e) { + e.printStackTrace(); + Assert.fail(); + } + + QueueStatusResource queueStatusResource2 = new QueueStatusResource(); + queueStatusResource2.setHostName("bigred2.uits.iu.edu"); + queueStatusResource2.setQueueName("cpu"); + queueStatusResource2.setTime((long) 2 + System.currentTimeMillis()); + queueStatusResource2.setQueueUp(true); + queueStatusResource2.setRunningJobs(33); + queueStatusResource2.setQueuedJobs(44); + try { + queueStatusResource2.save(); + } catch (RegistryException e) { + e.printStackTrace(); + Assert.fail(); + } + + try { + List<ExperimentCatResource> experimentCatResources = queueStatusResource1.get(ResourceType.QUEUE_STATUS); + Assert.assertTrue(experimentCatResources.size()==1); + QueueStatusResource queueStatusResource = (QueueStatusResource) experimentCatResources.get(0); + Assert.assertEquals(queueStatusResource2.getTime(), queueStatusResource.getTime()); + } catch (RegistryException e) { + e.printStackTrace(); + Assert.fail(); + } + + } +} 
http://git-wip-us.apache.org/repos/asf/airavata/blob/914799c1/modules/registry/registry-cpi/src/main/java/org/apache/airavata/registry/cpi/ExpCatParentDataType.java ---------------------------------------------------------------------- diff --git a/modules/registry/registry-cpi/src/main/java/org/apache/airavata/registry/cpi/ExpCatParentDataType.java b/modules/registry/registry-cpi/src/main/java/org/apache/airavata/registry/cpi/ExpCatParentDataType.java index b9eca68..5ac404a 100644 --- a/modules/registry/registry-cpi/src/main/java/org/apache/airavata/registry/cpi/ExpCatParentDataType.java +++ b/modules/registry/registry-cpi/src/main/java/org/apache/airavata/registry/cpi/ExpCatParentDataType.java @@ -26,7 +26,8 @@ public enum ExpCatParentDataType { PROJECT, EXPERIMENT, GATEWAY, - NOTIFICATION + NOTIFICATION, + QUEUE_STATUS } http://git-wip-us.apache.org/repos/asf/airavata/blob/914799c1/modules/registry/registry-cpi/src/main/java/org/apache/airavata/registry/cpi/ExperimentCatalogModelType.java ---------------------------------------------------------------------- diff --git a/modules/registry/registry-cpi/src/main/java/org/apache/airavata/registry/cpi/ExperimentCatalogModelType.java b/modules/registry/registry-cpi/src/main/java/org/apache/airavata/registry/cpi/ExperimentCatalogModelType.java index 433ce60..f9b7dcc 100644 --- a/modules/registry/registry-cpi/src/main/java/org/apache/airavata/registry/cpi/ExperimentCatalogModelType.java +++ b/modules/registry/registry-cpi/src/main/java/org/apache/airavata/registry/cpi/ExperimentCatalogModelType.java @@ -43,5 +43,6 @@ public enum ExperimentCatalogModelType { TASK_STATUS, TASK_ERROR, JOB, - JOB_STATUS + JOB_STATUS, + QUEUE_STATUS } http://git-wip-us.apache.org/repos/asf/airavata/blob/914799c1/modules/registry/registry-server/registry-api-service/src/main/java/org/apache/airavata/registry/api/service/handler/RegistryServerHandler.java ---------------------------------------------------------------------- diff --git 
a/modules/registry/registry-server/registry-api-service/src/main/java/org/apache/airavata/registry/api/service/handler/RegistryServerHandler.java b/modules/registry/registry-server/registry-api-service/src/main/java/org/apache/airavata/registry/api/service/handler/RegistryServerHandler.java index 822da35..535b265 100644 --- a/modules/registry/registry-server/registry-api-service/src/main/java/org/apache/airavata/registry/api/service/handler/RegistryServerHandler.java +++ b/modules/registry/registry-server/registry-api-service/src/main/java/org/apache/airavata/registry/api/service/handler/RegistryServerHandler.java @@ -20,6 +20,7 @@ */ package org.apache.airavata.registry.api.service.handler; +import org.apache.airavata.common.exception.ApplicationSettingsException; import org.apache.airavata.common.utils.AiravataUtils; import org.apache.airavata.common.utils.ServerSettings; import org.apache.airavata.model.WorkflowModel; @@ -30,10 +31,10 @@ import org.apache.airavata.model.appcatalog.computeresource.*; import org.apache.airavata.model.appcatalog.gatewayprofile.ComputeResourcePreference; import org.apache.airavata.model.appcatalog.gatewayprofile.GatewayResourceProfile; import org.apache.airavata.model.appcatalog.gatewayprofile.StoragePreference; +import org.apache.airavata.model.appcatalog.storageresource.StorageResourceDescription; +import org.apache.airavata.model.appcatalog.userresourceprofile.UserComputeResourcePreference; import org.apache.airavata.model.appcatalog.userresourceprofile.UserResourceProfile; import org.apache.airavata.model.appcatalog.userresourceprofile.UserStoragePreference; -import org.apache.airavata.model.appcatalog.userresourceprofile.UserComputeResourcePreference; -import org.apache.airavata.model.appcatalog.storageresource.StorageResourceDescription; import org.apache.airavata.model.application.io.InputDataObjectType; import org.apache.airavata.model.application.io.OutputDataObjectType; import 
org.apache.airavata.model.data.movement.DMType; @@ -48,6 +49,7 @@ import org.apache.airavata.model.scheduling.ComputationalResourceSchedulingModel import org.apache.airavata.model.status.ExperimentState; import org.apache.airavata.model.status.ExperimentStatus; import org.apache.airavata.model.status.JobStatus; +import org.apache.airavata.model.status.QueueStatusModel; import org.apache.airavata.model.task.TaskModel; import org.apache.airavata.model.workspace.Gateway; import org.apache.airavata.model.workspace.Notification; @@ -66,7 +68,6 @@ import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import java.util.*; public class RegistryServerHandler implements RegistryService.Iface { @@ -4147,6 +4148,41 @@ public class RegistryServerHandler implements RegistryService.Iface { } /** + * * Get queue statuses of all compute resources + * * + */ + @Override + public List<QueueStatusModel> getLatestQueueStatuses() throws RegistryServiceException, TException { + try { + experimentCatalog = RegistryFactory.getExperimentCatalog(ServerSettings.getDefaultUserGateway()); + List<Object> temp = experimentCatalog.get(ExperimentCatalogModelType.QUEUE_STATUS, null, null, -1, 0, null, null); + List<QueueStatusModel> queueStatusModels = new ArrayList<>(); + temp.stream().forEach(t->{ + queueStatusModels.add((QueueStatusModel)t); + }); + return queueStatusModels; + } catch (RegistryException | ApplicationSettingsException e) { + logger.error("Error while reading queue status models....", e); + RegistryServiceException exception = new RegistryServiceException(); + exception.setMessage("Error while reading queue status models.... 
: " + e.getMessage()); + throw exception; + } + } + + @Override + public void registerQueueStatuses(List<QueueStatusModel> queueStatuses) throws RegistryServiceException, TException { + try { + experimentCatalog = RegistryFactory.getExperimentCatalog(ServerSettings.getDefaultUserGateway()); + experimentCatalog.add(ExpCatParentDataType.QUEUE_STATUS, queueStatuses, null); + } catch (RegistryException | ApplicationSettingsException e) { + logger.error("Error while storing queue status models....", e); + RegistryServiceException exception = new RegistryServiceException(); + exception.setMessage("Error while storing queue status models.... : " + e.getMessage()); + throw exception; + } + } + + /** * Fetch all User Compute Resource Preferences of a registered User Resource Profile. * * @param userId
