This is an automated email from the ASF dual-hosted git repository. dimuthuupe pushed a commit to branch helix-integration in repository https://gitbox.apache.org/repos/asf/airavata.git
commit cb54e4df2eb5ae453290a5d29cd3c0a8033c993d Author: dimuthu <dimuthu.upeks...@gmail.com> AuthorDate: Tue Feb 27 11:04:32 2018 -0500 Fixing env setup task --- .../apache/airavata/helix/core/util/TaskUtil.java | 19 ++++---- .../airavata/helix/impl/task/AiravataTask.java | 10 ++-- .../airavata/helix/impl/task/EnvSetupTask.java | 37 ++++----------- .../helix/impl/workflow/SimpleWorkflow.java | 54 ++++++++++++++++++++-- 4 files changed, 75 insertions(+), 45 deletions(-) diff --git a/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/util/TaskUtil.java b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/util/TaskUtil.java index d0f1ab6..218bd94 100644 --- a/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/util/TaskUtil.java +++ b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/util/TaskUtil.java @@ -19,15 +19,18 @@ import java.util.Map; */ public class TaskUtil { - public static <T extends AbstractTask> List<OutPort> getOutPortsOfTask(T task) throws IllegalAccessException { - Field[] fields = task.getClass().getDeclaredFields(); + public static <T extends AbstractTask> List<OutPort> getOutPortsOfTask(T taskObj) throws IllegalAccessException { + List<OutPort> outPorts = new ArrayList<>(); - for (Field field : fields) { - TaskOutPort outPortAnnotation = field.getAnnotation(TaskOutPort.class); - if (outPortAnnotation != null) { - field.setAccessible(true); - OutPort outPort = (OutPort) field.get(task); - outPorts.add(outPort); + for (Class<?> c = taskObj.getClass(); c != null; c = c.getSuperclass()) { + Field[] fields = c.getDeclaredFields(); + for (Field field : fields) { + TaskOutPort outPortAnnotation = field.getAnnotation(TaskOutPort.class); + if (outPortAnnotation != null) { + field.setAccessible(true); + OutPort outPort = (OutPort) field.get(taskObj); + outPorts.add(outPort); + } } } return outPorts; diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java index 315c07c..26361d2 100644 --- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java +++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java @@ -51,14 +51,13 @@ public abstract class AiravataTask extends AbstractTask { @TaskParam(name = "gatewayId") private String gatewayId; - @TaskOutPort(name = "Success Port") - private OutPort onSuccess; - + @TaskOutPort(name = "Next Task") + private OutPort nextTask; protected TaskResult onSuccess(String message) { String successMessage = "Task " + getTaskId() + " completed." + message != null ? " Message : " + message : ""; logger.info(successMessage); - return onSuccess.invoke(new TaskResult(TaskResult.Status.COMPLETED, message)); + return nextTask.invoke(new TaskResult(TaskResult.Status.COMPLETED, message)); } protected TaskResult onFail(String reason, boolean fatal, Throwable error) { @@ -178,4 +177,7 @@ public abstract class AiravataTask extends AbstractTask { return processModel; } + public void setNextTask(OutPort nextTask) { + this.nextTask = nextTask; + } } diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/EnvSetupTask.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/EnvSetupTask.java index cabc014..eafa53d 100644 --- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/EnvSetupTask.java +++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/EnvSetupTask.java @@ -1,23 +1,18 @@ package org.apache.airavata.helix.impl.task; import org.apache.airavata.agents.api.AgentAdaptor; -import org.apache.airavata.helix.core.OutPort; import org.apache.airavata.helix.task.api.TaskHelper; import org.apache.airavata.helix.task.api.annotation.TaskDef; -import org.apache.airavata.helix.task.api.annotation.TaskOutPort; -import org.apache.airavata.helix.task.api.annotation.TaskParam; import org.apache.airavata.model.status.TaskState; import org.apache.airavata.registry.cpi.RegistryException; import org.apache.helix.task.TaskResult; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; @TaskDef(name = "Environment Setup Task") public class EnvSetupTask extends AiravataTask { - @TaskParam(name = "Working Directory") - private String workingDirectory; - - @TaskOutPort(name = "Success Out Port") - private OutPort successPort; + private static final Logger logger = LogManager.getLogger(EnvSetupTask.class); @Override public TaskResult onRun(TaskHelper taskHelper) { @@ -30,18 +25,19 @@ public class EnvSetupTask extends AiravataTask { getTaskContext().getComputeResourceCredentialToken(), getTaskContext().getComputeResourceLoginUserName()); - adaptor.createDirectory(workingDirectory); + logger.info("Creating directory " + getTaskContext().getWorkingDir() + " on compute resource " + getTaskContext().getComputeResourceId()); + adaptor.createDirectory(getTaskContext().getWorkingDir()); publishTaskState(TaskState.COMPLETED); - return successPort.invoke(new TaskResult(TaskResult.Status.COMPLETED, "Successfully completed")); + return onSuccess("Successfully completed"); } catch (Exception e) { try { publishTaskState(TaskState.FAILED); } catch (RegistryException e1) { - publishErrors(e1); + logger.error("Task failed to publish task status", e1); + // ignore silently } - publishErrors(e); - return new TaskResult(TaskResult.Status.FAILED, "Failed the task"); + return onFail("Failed to setup environment of task " + getTaskId(), true, e); } } @@ -50,19 +46,4 @@ public class EnvSetupTask extends AiravataTask { } - public String getWorkingDirectory() { - return workingDirectory; - } - - public void setWorkingDirectory(String workingDirectory) { - this.workingDirectory = workingDirectory; - } - - public OutPort getSuccessPort() { - return successPort; - } - - public void setSuccessPort(OutPort successPort) { - this.successPort = successPort; - } } diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/SimpleWorkflow.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/SimpleWorkflow.java index 397ff45..99db2c4 100644 --- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/SimpleWorkflow.java +++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/SimpleWorkflow.java @@ -1,22 +1,66 @@ package org.apache.airavata.helix.impl.workflow; import org.apache.airavata.helix.core.AbstractTask; +import org.apache.airavata.helix.core.OutPort; +import org.apache.airavata.helix.impl.task.AiravataTask; import org.apache.airavata.helix.impl.task.EnvSetupTask; import org.apache.airavata.helix.impl.task.submission.task.DefaultJobSubmissionTask; import org.apache.airavata.helix.workflow.WorkflowManager; +import org.apache.airavata.model.experiment.ExperimentModel; +import org.apache.airavata.model.process.ProcessModel; +import org.apache.airavata.model.task.TaskModel; +import org.apache.airavata.model.task.TaskTypes; +import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory; +import org.apache.airavata.registry.cpi.AppCatalog; +import org.apache.airavata.registry.cpi.ExperimentCatalog; +import org.apache.airavata.registry.cpi.ExperimentCatalogModelType; import java.util.ArrayList; import java.util.List; +import java.util.Optional; import java.util.UUID; +import java.util.stream.Collectors; public class SimpleWorkflow { public static void main(String[] args) throws Exception { - EnvSetupTask envSetupTask = new EnvSetupTask(); - envSetupTask.setWorkingDirectory("/tmp/a"); + String processId = "PROCESS_438a87cc-2dec-4edc-bfeb-31128df91bb6"; + AppCatalog appCatalog = RegistryFactory.getAppCatalog(); + ExperimentCatalog experimentCatalog = RegistryFactory.getDefaultExpCatalog(); - DefaultJobSubmissionTask defaultJobSubmissionTask = new DefaultJobSubmissionTask(); + ProcessModel processModel = (ProcessModel) experimentCatalog.get(ExperimentCatalogModelType.PROCESS, processId); + ExperimentModel experimentModel = (ExperimentModel) experimentCatalog.get(ExperimentCatalogModelType.EXPERIMENT, processModel.getExperimentId()); + String taskDag = processModel.getTaskDag(); + List<TaskModel> taskList = processModel.getTasks(); + + String[] taskIds = taskDag.split(","); + final List<AiravataTask> allTasks = new ArrayList<>(); + + for (String taskId : taskIds) { + Optional<TaskModel> model = taskList.stream().filter(taskModel -> taskModel.getTaskId().equals(taskId)).findFirst(); + model.ifPresent(taskModel -> { + AiravataTask airavataTask = null; + if (taskModel.getTaskType() == TaskTypes.ENV_SETUP) { + airavataTask = new EnvSetupTask(); + } else if (taskModel.getTaskType() == TaskTypes.JOB_SUBMISSION) { + airavataTask = new DefaultJobSubmissionTask(); + } + + if (airavataTask != null) { + airavataTask.setGatewayId(experimentModel.getGatewayId()); + airavataTask.setExperimentId(experimentModel.getExperimentId()); + airavataTask.setProcessId(processModel.getProcessId()); + airavataTask.setTaskId(taskModel.getTaskId()); + if (allTasks.size() > 0) { + allTasks.get(allTasks.size() -1).setNextTask(new OutPort(airavataTask.getTaskId(), airavataTask)); + } + allTasks.add(airavataTask); + } + }); + } + +/* DefaultJobSubmissionTask defaultJobSubmissionTask = new DefaultJobSubmissionTask(); defaultJobSubmissionTask.setGatewayId("default"); defaultJobSubmissionTask.setExperimentId("Clone_of_Mothur-Test1_0c9f627e-2c32-403e-a28a-2a8b10c21c1a"); defaultJobSubmissionTask.setProcessId("PROCESS_438a87cc-2dec-4edc-bfeb-31128df91bb6"); @@ -24,8 +68,8 @@ public class SimpleWorkflow { List<AbstractTask> tasks = new ArrayList<>(); tasks.add(defaultJobSubmissionTask); - +*/ WorkflowManager workflowManager = new WorkflowManager("AiravataDemoCluster", "wm-22", "localhost:2199"); - workflowManager.launchWorkflow(UUID.randomUUID().toString(), tasks, true); + workflowManager.launchWorkflow(UUID.randomUUID().toString(), allTasks.stream().map(t -> (AiravataTask)t).collect(Collectors.toList()), true); } } -- To stop receiving notification emails like this one, please contact dimuthu...@apache.org.