This is an automated email from the ASF dual-hosted git repository. isjarana pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/airavata-data-lake.git
commit 42c9da6378d34e54873a8cdf2f34c59eed2d3a56 Author: Dimuthu Wannipurage <[email protected]> AuthorDate: Mon May 24 04:14:13 2021 -0400 Non blocking task initial framework --- .../engine/services/participant/Participant.java | 21 ++++ .../services/wm/DataSyncWorkflowManager.java | 21 ++-- .../engine/services/wm/WorkflowOperator.java | 107 +++++++++++++++------ .../workflow/engine/task/AbstractTask.java | 8 +- .../workflow/engine/task/NonBlockingTask.java | 41 +++++++- .../engine/task/annotation/NonBlockingSection.java | 2 +- .../engine/task/impl/ExampleBlockingTask.java | 2 +- .../engine/task/impl/ExampleNonBlockingTask.java | 17 +++- 8 files changed, 179 insertions(+), 40 deletions(-) diff --git a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/participant/Participant.java b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/participant/Participant.java index 7b6fd9c..65314ee 100644 --- a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/participant/Participant.java +++ b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/participant/Participant.java @@ -18,7 +18,9 @@ package org.apache.airavata.datalake.orchestrator.workflow.engine.services.participant; import org.apache.airavata.datalake.orchestrator.workflow.engine.task.BlockingTask; +import org.apache.airavata.datalake.orchestrator.workflow.engine.task.NonBlockingTask; import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.BlockingTaskDef; +import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.NonBlockingTaskDef; import org.apache.helix.InstanceType; import org.apache.helix.examples.OnlineOfflineStateModelFactory; import org.apache.helix.manager.zk.ZKHelixAdmin; @@ -196,6 +198,25 @@ public class Participant implements CommandLineRunner { throw e; } } + + for (String className : nonBlockingTaskClasses) { + try { + logger.info("Loading non blocking task {}", className); + Class<?> taskClz = Class.forName(className); + Object taskObj = taskClz.getConstructor().newInstance(); + NonBlockingTask nonBlockingTask = (NonBlockingTask) taskObj; + TaskFactory taskFactory = context -> { + nonBlockingTask.setCallbackContext(context); + return nonBlockingTask; + }; + NonBlockingTaskDef nbtDef = nonBlockingTask.getClass().getAnnotation(NonBlockingTaskDef.class); + taskMap.put(nbtDef.name(), taskFactory); + + } catch (ClassNotFoundException e) { + logger.error("Couldn't find a class with name {}", className); + throw e; + } + } return taskMap; } diff --git a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/DataSyncWorkflowManager.java b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/DataSyncWorkflowManager.java index 869b7dd..9f08751 100644 --- a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/DataSyncWorkflowManager.java +++ b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/DataSyncWorkflowManager.java @@ -20,6 +20,7 @@ package org.apache.airavata.datalake.orchestrator.workflow.engine.services.wm; import org.apache.airavata.datalake.orchestrator.workflow.engine.task.AbstractTask; import org.apache.airavata.datalake.orchestrator.workflow.engine.task.OutPort; import org.apache.airavata.datalake.orchestrator.workflow.engine.task.impl.ExampleBlockingTask; +import org.apache.airavata.datalake.orchestrator.workflow.engine.task.impl.ExampleNonBlockingTask; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.CommandLineRunner; @@ -63,17 +64,23 @@ public class DataSyncWorkflowManager implements CommandLineRunner { ExampleBlockingTask bt4 = new ExampleBlockingTask(); bt4.setTaskId("bt4-" + UUID.randomUUID()); + ExampleNonBlockingTask nbt1 = new ExampleNonBlockingTask(); + nbt1.setTaskId("nbt1-" + UUID.randomUUID()); + nbt1.setCurrentSection(2); + // Setting dependency - bt1.setOutPort(new OutPort().setNextTaskId(bt3.getTaskId())); - bt2.setOutPort(new OutPort().setNextTaskId(bt3.getTaskId())); - bt4.setOutPort(new OutPort().setNextTaskId(bt3.getTaskId())); + bt1.setOutPort(new OutPort().setNextTaskId(nbt1.getTaskId())); + //bt2.setOutPort(new OutPort().setNextTaskId(bt3.getTaskId())); + //bt4.setOutPort(new OutPort().setNextTaskId(bt3.getTaskId())); Map<String, AbstractTask> taskMap = new HashMap<>(); taskMap.put(bt1.getTaskId(), bt1); - taskMap.put(bt2.getTaskId(), bt2); - taskMap.put(bt3.getTaskId(), bt3); - taskMap.put(bt4.getTaskId(), bt4); - String[] startTaskIds = {bt1.getTaskId(), bt2.getTaskId(), bt4.getTaskId()}; + taskMap.put(nbt1.getTaskId(), nbt1); + //taskMap.put(bt2.getTaskId(), bt2); + //taskMap.put(bt3.getTaskId(), bt3); + //taskMap.put(bt4.getTaskId(), bt4); + //String[] startTaskIds = {bt1.getTaskId(), bt2.getTaskId(), bt4.getTaskId()}; + String[] startTaskIds = {bt1.getTaskId()}; String workflowId = workflowOperator.buildAndRunWorkflow(taskMap, startTaskIds); logger.info("Launched workflow {}", workflowId); } diff --git a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/WorkflowOperator.java b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/WorkflowOperator.java index 2e52f05..ba3ef58 100644 --- a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/WorkflowOperator.java +++ b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/WorkflowOperator.java @@ -18,9 +18,11 @@ package org.apache.airavata.datalake.orchestrator.workflow.engine.services.wm; import org.apache.airavata.datalake.orchestrator.workflow.engine.task.AbstractTask; +import org.apache.airavata.datalake.orchestrator.workflow.engine.task.NonBlockingTask; import org.apache.airavata.datalake.orchestrator.workflow.engine.task.OutPort; import org.apache.airavata.datalake.orchestrator.workflow.engine.task.TaskParamType; import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.BlockingTaskDef; +import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.NonBlockingTaskDef; import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.TaskOutPort; import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.TaskParam; import org.apache.commons.beanutils.PropertyUtils; @@ -96,38 +98,82 @@ public class WorkflowOperator { return workflowName; } + private void continueNonBlockingRest(Map<String, AbstractTask> taskMap, String nonBlockingTaskId, int currentSection) { + + } + private void buildWorkflowRecursively(Workflow.Builder workflowBuilder, String nextTaskId, Map<String, AbstractTask> taskMap) throws Exception{ AbstractTask currentTask = taskMap.get(nextTaskId); - String taskType = currentTask.getClass().getAnnotation(BlockingTaskDef.class).name(); - TaskConfig.Builder taskBuilder = new TaskConfig.Builder() - .setTaskId(currentTask.getTaskId()) - .setCommand(taskType); - Map<String, String> paramMap = serializeTaskData(currentTask); - paramMap.forEach(taskBuilder::addConfig); + if (currentTask == null) { + logger.error("Couldn't find a task with id {} in the task map", nextTaskId); + throw new Exception("Couldn't find a task with id " + nextTaskId +" in the task map"); + } - List<TaskConfig> taskBuilds = new ArrayList<>(); - taskBuilds.add(taskBuilder.build()); + BlockingTaskDef blockingTaskDef = currentTask.getClass().getAnnotation(BlockingTaskDef.class); + NonBlockingTaskDef nonBlockingTaskDef = currentTask.getClass().getAnnotation(NonBlockingTaskDef.class); - JobConfig.Builder job = new JobConfig.Builder() - .addTaskConfigs(taskBuilds) - .setFailureThreshold(0) - .setExpiry(WORKFLOW_EXPIRY_TIME) - .setTimeoutPerTask(TASK_EXPIRY_TIME) - .setNumConcurrentTasksPerInstance(20) - .setMaxAttemptsPerTask(currentTask.getRetryCount()); + if (blockingTaskDef != null) { + String taskName = blockingTaskDef.name(); + TaskConfig.Builder taskBuilder = new TaskConfig.Builder() + .setTaskId(currentTask.getTaskId()) + .setCommand(taskName); - workflowBuilder.addJob(currentTask.getTaskId(), job); + Map<String, String> paramMap = serializeTaskData(currentTask); + paramMap.forEach(taskBuilder::addConfig); - List<OutPort> outPorts = getOutPortsOfTask(currentTask); + List<TaskConfig> taskBuilds = new ArrayList<>(); + taskBuilds.add(taskBuilder.build()); - for (OutPort outPort : outPorts) { - if (outPort != null) { - workflowBuilder.addParentChildDependency(currentTask.getTaskId(), outPort.getNextTaskId()); - logger.info("Parent to child dependency {} -> {}", currentTask.getTaskId(), outPort.getNextTaskId()); - buildWorkflowRecursively(workflowBuilder, outPort.getNextTaskId(), taskMap); + JobConfig.Builder job = new JobConfig.Builder() + .addTaskConfigs(taskBuilds) + .setFailureThreshold(0) + .setExpiry(WORKFLOW_EXPIRY_TIME) + .setTimeoutPerTask(TASK_EXPIRY_TIME) + .setNumConcurrentTasksPerInstance(20) + .setMaxAttemptsPerTask(currentTask.getRetryCount()); + + workflowBuilder.addJob(currentTask.getTaskId(), job); + + List<OutPort> outPorts = getOutPortsOfTask(currentTask); + + for (OutPort outPort : outPorts) { + if (outPort != null) { + workflowBuilder.addParentChildDependency(currentTask.getTaskId(), outPort.getNextTaskId()); + logger.info("Parent to child dependency {} -> {}", currentTask.getTaskId(), outPort.getNextTaskId()); + buildWorkflowRecursively(workflowBuilder, outPort.getNextTaskId(), taskMap); + } } + } else if (nonBlockingTaskDef != null) { + + NonBlockingTask nbTask = (NonBlockingTask) currentTask; + + String taskName = nonBlockingTaskDef.name(); + TaskConfig.Builder taskBuilder = new TaskConfig.Builder() + .setTaskId(currentTask.getTaskId()) + .setCommand(taskName); + + Map<String, String> paramMap = serializeTaskData(currentTask); + paramMap.forEach(taskBuilder::addConfig); + + List<TaskConfig> taskBuilds = new ArrayList<>(); + taskBuilds.add(taskBuilder.build()); + + JobConfig.Builder job = new JobConfig.Builder() + .addTaskConfigs(taskBuilds) + .setFailureThreshold(0) + .setExpiry(WORKFLOW_EXPIRY_TIME) + .setTimeoutPerTask(TASK_EXPIRY_TIME) + .setNumConcurrentTasksPerInstance(20) + .setMaxAttemptsPerTask(currentTask.getRetryCount()); + + workflowBuilder.addJob(currentTask.getTaskId(), job); + + continueNonBlockingRest(taskMap, nextTaskId, nbTask.getCurrentSection()); + } else { + logger.error("Couldn't find the task def annotation in class {}", currentTask.getClass().getName()); + throw new Exception("Couldn't find the task def annotation in class " + currentTask.getClass().getName()); } } @@ -156,13 +202,18 @@ public class WorkflowOperator { Field[] fields = c.getDeclaredFields(); for (Field classField : fields) { TaskParam parm = classField.getAnnotation(TaskParam.class); - if (parm != null) { - Object propertyValue = PropertyUtils.getProperty(data, parm.name()); - if (propertyValue instanceof TaskParamType) { - result.put(parm.name(), TaskParamType.class.cast(propertyValue).serialize()); - } else { - result.put(parm.name(), propertyValue.toString()); + try { + if (parm != null) { + Object propertyValue = PropertyUtils.getProperty(data, parm.name()); + if (propertyValue instanceof TaskParamType) { + result.put(parm.name(), TaskParamType.class.cast(propertyValue).serialize()); + } else { + result.put(parm.name(), propertyValue.toString()); + } } + } catch (Exception e) { + logger.error("Failed to serialize task parameter {} in class {}", parm.name(), data.getClass().getName()); + throw e; } TaskOutPort outPort = classField.getAnnotation(TaskOutPort.class); diff --git a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/AbstractTask.java b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/AbstractTask.java index 42d8406..89863fc 100644 --- a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/AbstractTask.java +++ b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/AbstractTask.java @@ -76,7 +76,13 @@ public abstract class AbstractTask extends UserContentStore implements Task { logger.error("Failed at deserializing task data", e); return new TaskResult(TaskResult.Status.FAILED, "Failed in deserializing task data"); } - return onRun(); + + try { + return onRun(); + } catch (Exception e) { + logger.error("Unknown error while running task {}", getTaskId(), e); + return new TaskResult(TaskResult.Status.FAILED, "Failed due to unknown error"); + } } @Override diff --git a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/NonBlockingTask.java b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/NonBlockingTask.java index 9d2532c..912d6a3 100644 --- a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/NonBlockingTask.java +++ b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/NonBlockingTask.java @@ -17,20 +17,59 @@ package org.apache.airavata.datalake.orchestrator.workflow.engine.task; +import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.NonBlockingSection; +import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.TaskParam; +import org.apache.commons.beanutils.PropertyUtils; import org.apache.helix.task.TaskResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.reflect.Method; public class NonBlockingTask extends AbstractTask { + private final static Logger logger = LoggerFactory.getLogger(NonBlockingTask.class); + + @TaskParam(name = "currentSection") + private ThreadLocal<Integer> currentSection = new ThreadLocal<>(); + public NonBlockingTask() { } @Override public TaskResult onRun() { - return null; + Class<?> c = this.getClass(); + Method[] allMethods = c.getMethods(); + for (Method method : allMethods) { + NonBlockingSection nbs = method.getAnnotation(NonBlockingSection.class); + if (nbs != null) { + if (nbs.sectionIndex() == getCurrentSection()) { + try { + Object result = method.invoke(this); + return (TaskResult) result; + } catch (Exception e) { + logger.error("Failed to invoke designated section {}", getCurrentSection(), e); + return new TaskResult(TaskResult.Status.FAILED, + "Failed to invoke designated section " + getCurrentSection()); + } + } + } + } + + logger.error("Couldn't find a section matching section id {}", getCurrentSection()); + return new TaskResult(TaskResult.Status.FAILED, "Couldn't find a section matching section id " + getCurrentSection()); } @Override public void onCancel() { } + + public Integer getCurrentSection() { + return currentSection.get(); + } + + public void setCurrentSection(Integer currentSection) { + this.currentSection.set(currentSection); + } } diff --git a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/annotation/NonBlockingSection.java b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/annotation/NonBlockingSection.java index 8047a9b..6fb97b5 100644 --- a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/annotation/NonBlockingSection.java +++ b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/annotation/NonBlockingSection.java @@ -25,5 +25,5 @@ import java.lang.annotation.Target; @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.METHOD) public @interface NonBlockingSection { - public int order(); + public int sectionIndex(); } diff --git a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/ExampleBlockingTask.java b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/ExampleBlockingTask.java index 0c94839..631d06f 100644 --- a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/ExampleBlockingTask.java +++ b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/ExampleBlockingTask.java @@ -39,7 +39,7 @@ public class ExampleBlockingTask extends BlockingTask { if (getTaskId().startsWith("bt1")) { try { logger.info("Task {} is sleeping", getTaskId()); - Thread.sleep(10000); + Thread.sleep(1000); //return new TaskResult(TaskResult.Status.FAILED, "Fail"); } catch (InterruptedException e) { e.printStackTrace(); diff --git a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/ExampleNonBlockingTask.java b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/ExampleNonBlockingTask.java index 527d0a2..0bdd9c9 100644 --- a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/ExampleNonBlockingTask.java +++ b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/ExampleNonBlockingTask.java @@ -18,11 +18,26 @@ package org.apache.airavata.datalake.orchestrator.workflow.engine.task.impl; import org.apache.airavata.datalake.orchestrator.workflow.engine.task.NonBlockingTask; +import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.NonBlockingSection; import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.NonBlockingTaskDef; +import org.apache.helix.task.TaskResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @NonBlockingTaskDef(name = "ExampleNonBlockingTask") public class ExampleNonBlockingTask extends NonBlockingTask { - public ExampleNonBlockingTask() { + private final static Logger logger = LoggerFactory.getLogger(ExampleNonBlockingTask.class); + + @NonBlockingSection(sectionIndex = 1) + public TaskResult section1() { + logger.info("Running section 1"); + return new TaskResult(TaskResult.Status.COMPLETED, "Completed"); + } + + @NonBlockingSection(sectionIndex = 2) + public TaskResult section2() { + logger.info("Running section 2"); + return new TaskResult(TaskResult.Status.COMPLETED, "Completed"); } }
