This is an automated email from the ASF dual-hosted git repository. isjarana pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/airavata-data-lake.git
commit a49c3d4bb383682616d734b8a23c25c60fbf6260 Author: Dimuthu Wannipurage <[email protected]> AuthorDate: Thu May 20 15:42:06 2021 -0400 Workflow engine initial implementation --- .gitignore | 3 + data-orchestrator/pom.xml | 1 + data-orchestrator/workflow-engine/README.md | 13 ++ data-orchestrator/workflow-engine/pom.xml | 72 ++++++ .../workflow/engine/monitor/AsyncEventMonitor.java | 21 ++ .../engine/services/controller/Controller.java | 94 ++++++++ .../engine/services/participant/Participant.java | 251 +++++++++++++++++++++ .../services/wm/DataSyncWorkflowManager.java | 73 ++++++ .../engine/services/wm/WorkflowOperator.java | 182 +++++++++++++++ .../workflow/engine/task/AbstractTask.java | 158 +++++++++++++ .../workflow/engine/task/BlockingTask.java | 42 ++++ .../workflow/engine/task/NonBlockingTask.java | 36 +++ .../orchestrator/workflow/engine/task/OutPort.java | 31 +++ .../workflow/engine/task/TaskParamType.java | 23 ++ .../engine/task/annotation/BlockingTaskDef.java | 29 +++ .../engine/task/annotation/NonBlockingSection.java | 29 +++ .../engine/task/annotation/NonBlockingTaskDef.java | 29 +++ .../engine/task/annotation/TaskOutPort.java | 29 +++ .../workflow/engine/task/annotation/TaskParam.java | 31 +++ .../engine/task/impl/ExampleBlockingTask.java | 36 +++ .../engine/task/impl/ExampleNonBlockingTask.java | 28 +++ .../src/main/resources/application.properties | 27 +++ .../workflow-engine/src/main/resources/logback.xml | 50 ++++ .../src/main/resources/task-list.yaml | 5 + pom.xml | 2 + 25 files changed, 1295 insertions(+) diff --git a/.gitignore b/.gitignore index ab08b32..3583487 100644 --- a/.gitignore +++ b/.gitignore @@ -20,3 +20,6 @@ data-orchestrator/data-orchestrator-api/target /metadata-service/db-service/client/client.iml /metadata-service/db-service/server/server.iml /metadata-service/db-service/stub/stub.iml +/metadata-service/data-builders/data-builders.iml +/metadata-service/db-service/db-service.iml 
+/data-orchestrator/workflow-engine/workflow-engine.iml \ No newline at end of file diff --git a/data-orchestrator/pom.xml b/data-orchestrator/pom.xml index 2d3ee54..ee7f550 100644 --- a/data-orchestrator/pom.xml +++ b/data-orchestrator/pom.xml @@ -35,6 +35,7 @@ <modules> <module>data-orchestrator-api</module> <module>data-orchestrator-core</module> + <module>workflow-engine</module> </modules> diff --git a/data-orchestrator/workflow-engine/README.md b/data-orchestrator/workflow-engine/README.md new file mode 100644 index 0000000..0efb458 --- /dev/null +++ b/data-orchestrator/workflow-engine/README.md @@ -0,0 +1,13 @@ +### Service Execution Order + +* org.apache.airavata.datalake.orchestrator.workflow.engine.services.controller.Controller +* org.apache.airavata.datalake.orchestrator.workflow.engine.services.participant.Participant +* org.apache.airavata.datalake.orchestrator.workflow.engine.services.wm.DataSyncWorkflowManager + +### Configure the participant with new tasks + +* Create a task class that extends BlockingTask or NonBlockingTask +* Implement the required methods +* Annotate the class with the @BlockingTaskDef or @NonBlockingTaskDef annotation. See ExampleBlockingTask and ExampleNonBlockingTask +* Register the task class in src/main/resources/task-list.yaml + diff --git a/data-orchestrator/workflow-engine/pom.xml b/data-orchestrator/workflow-engine/pom.xml new file mode 100644 index 0000000..ef96c4f --- /dev/null +++ b/data-orchestrator/workflow-engine/pom.xml @@ -0,0 +1,72 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>data-orchestrator</artifactId> + <groupId>org.apache.airavata.data.lake</groupId> + <version>0.01-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>workflow-engine</artifactId> + + <properties> + <maven.compiler.source>11</maven.compiler.source> + <maven.compiler.target>11</maven.compiler.target> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.helix</groupId> + <artifactId>helix-core</artifactId> + <version>1.0.1</version> + <exclusions> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + <exclusion> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.springframework.boot</groupId> + <artifactId>spring-boot-starter</artifactId> + <version>${spring.boot.version}</version> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>log4j-over-slf4j</artifactId> + <version>${log4j.over.slf4j}</version> + </dependency> + <dependency> + <groupId>org.yaml</groupId> + <artifactId>snakeyaml</artifactId> + <version>${yaml.version}</version> + </dependency> + </dependencies> +</project> \ No newline at end of file diff --git 
a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/monitor/AsyncEventMonitor.java b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/monitor/AsyncEventMonitor.java new file mode 100644 index 0000000..4afa566 --- /dev/null +++ b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/monitor/AsyncEventMonitor.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.airavata.datalake.orchestrator.workflow.engine.monitor;
+
+/**
+ * Empty placeholder class: it declares no fields, constructors or methods.
+ *
+ * NOTE(review): judging by the name only, this is presumably reserved for
+ * asynchronous event monitoring of the workflow engine - confirm the intended
+ * responsibilities before building on it.
+ */
+public class AsyncEventMonitor {
+}
diff --git a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/controller/Controller.java b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/controller/Controller.java
new file mode 100644
index 0000000..83e6af5
--- /dev/null
+++ b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/controller/Controller.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.airavata.datalake.orchestrator.workflow.engine.services.controller; + +import org.apache.helix.controller.HelixControllerMain; +import org.apache.helix.manager.zk.ZKHelixAdmin; +import org.apache.helix.manager.zk.ZNRecordSerializer; +import org.apache.helix.manager.zk.ZkClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.boot.CommandLineRunner; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.annotation.ComponentScan; + +import java.util.concurrent.CountDownLatch; + +@SpringBootApplication() +@ComponentScan("org.apache.airavata.datalake.orchestrator.workflow.engine.services.controller") +public class Controller implements CommandLineRunner { + + private final static Logger logger = LoggerFactory.getLogger(Controller.class); + + @org.springframework.beans.factory.annotation.Value("${cluster.name}") + private String clusterName; + + @org.springframework.beans.factory.annotation.Value("${controller.name}") + private String controllerName; + + @org.springframework.beans.factory.annotation.Value("${zookeeper.connection}") + private String zkAddress; + + private org.apache.helix.HelixManager zkHelixManager; + + private CountDownLatch startLatch = new CountDownLatch(1); + private CountDownLatch stopLatch = new CountDownLatch(1); + + @Override + public void run(String... 
args) throws Exception { + logger.info("Starting Cluster Controller ......"); + + try { + ZkClient zkClient = new ZkClient(zkAddress, ZkClient.DEFAULT_SESSION_TIMEOUT, + ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer()); + ZKHelixAdmin zkHelixAdmin = new ZKHelixAdmin(zkClient); + + // Creates the zk cluster if not available + if (!zkHelixAdmin.getClusters().contains(clusterName)) { + zkHelixAdmin.addCluster(clusterName, true); + } + + zkHelixAdmin.close(); + zkClient.close(); + + logger.info("Connection to helix cluster : " + clusterName + " with name : " + controllerName); + logger.info("Zookeeper connection string " + zkAddress); + + zkHelixManager = HelixControllerMain.startHelixController(zkAddress, clusterName, + controllerName, HelixControllerMain.STANDALONE); + startLatch.countDown(); + stopLatch.await(); + } catch (Exception ex) { + logger.error("Error in running the Controller: {}", controllerName, ex); + } finally { + disconnect(); + } + } + + private void disconnect() { + if (zkHelixManager != null) { + logger.info("Controller: {}, has disconnected from cluster: {}", controllerName, clusterName); + zkHelixManager.disconnect(); + } + } + + public static void main(String args[]) throws Exception { + SpringApplication.run(Controller.class); + } +} diff --git a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/participant/Participant.java b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/participant/Participant.java new file mode 100644 index 0000000..a4b9bdd --- /dev/null +++ b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/participant/Participant.java @@ -0,0 +1,251 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.airavata.datalake.orchestrator.workflow.engine.services.participant; + +import org.apache.airavata.datalake.orchestrator.workflow.engine.task.BlockingTask; +import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.BlockingTaskDef; +import org.apache.helix.InstanceType; +import org.apache.helix.examples.OnlineOfflineStateModelFactory; +import org.apache.helix.manager.zk.ZKHelixAdmin; +import org.apache.helix.manager.zk.ZKHelixManager; +import org.apache.helix.manager.zk.ZNRecordSerializer; +import org.apache.helix.manager.zk.ZkClient; +import org.apache.helix.model.BuiltInStateModelDefinitions; +import org.apache.helix.model.InstanceConfig; +import org.apache.helix.participant.StateMachineEngine; +import org.apache.helix.task.TaskFactory; +import org.apache.helix.task.TaskStateModelFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.boot.CommandLineRunner; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.annotation.ComponentScan; +import org.yaml.snakeyaml.Yaml; + +import java.io.*; +import java.util.*; + +@SpringBootApplication() 
+@ComponentScan("org.apache.airavata.datalake.orchestrator.workflow.engine.services.participant") +public class Participant implements CommandLineRunner { + + private final static Logger logger = LoggerFactory.getLogger(Participant.class); + + private int shutdownGracePeriod = 30000; + private int shutdownGraceRetries = 2; + + @org.springframework.beans.factory.annotation.Value("${zookeeper.connection}") + private String zkAddress; + + @org.springframework.beans.factory.annotation.Value("${cluster.name}") + private String clusterName; + + @org.springframework.beans.factory.annotation.Value("${participant.name}") + private String participantName; + + @org.springframework.beans.factory.annotation.Value("${task.list.file}") + private String taskListFile; + + private ZKHelixManager zkHelixManager; + + private List<String> blockingTaskClasses = new ArrayList<>(); + private List<String> nonBlockingTaskClasses = new ArrayList<>(); + + private final List<String> runningTasks = Collections.synchronizedList(new ArrayList<String>()); + + @Override + public void run(String... 
args) throws Exception { + logger.info("Staring Participant ....."); + + loadTasks(); + + ZkClient zkClient = null; + try { + zkClient = new ZkClient(zkAddress, ZkClient.DEFAULT_SESSION_TIMEOUT, + ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer()); + ZKHelixAdmin zkHelixAdmin = new ZKHelixAdmin(zkClient); + + List<String> nodesInCluster = zkHelixAdmin.getInstancesInCluster(clusterName); + + if (!nodesInCluster.contains(participantName)) { + InstanceConfig instanceConfig = new InstanceConfig(participantName); + instanceConfig.setHostName("localhost"); + instanceConfig.setInstanceEnabled(true); + zkHelixAdmin.addInstance(clusterName, instanceConfig); + logger.info("Participant: " + participantName + " has been added to cluster: " + clusterName); + + } else { + zkHelixAdmin.enableInstance(clusterName, participantName, true); + logger.debug("Participant: " + participantName + " has been re-enabled at the cluster: " + clusterName); + } + + Runtime.getRuntime().addShutdownHook( + new Thread(() -> { + logger.debug("Participant: " + participantName + " shutdown hook called"); + try { + zkHelixAdmin.enableInstance(clusterName, participantName, false); + } catch (Exception e) { + logger.warn("Participant: " + participantName + " was not disabled normally", e); + } + disconnect(); + }) + ); + + connect(); + + } catch (Exception ex) { + logger.error("Error running Participant: " + participantName + ", reason: " + ex, ex); + } finally { + if (zkClient != null) { + zkClient.close(); + } + } + } + + private void loadTasks() throws Exception { + + try { + Yaml yaml = new Yaml(); + File listFile = new File(taskListFile); + + InputStream stream; + if (listFile.exists()) { + logger.info("Loading task list file {} from absolute path", taskListFile); + stream = new FileInputStream(taskListFile); + } else { + logger.info("Loading task list file {} from class path", taskListFile); + stream = Participant.class.getClassLoader().getResourceAsStream(taskListFile); + } + + Object 
load = yaml.load(stream); + + if (load == null) { + throw new Exception("Did not load the configuration from file " + taskListFile); + } + + if (load instanceof Map) { + Map rootMap = (Map) load; + if (rootMap.containsKey("tasks")) { + Object tasksObj = rootMap.get("tasks"); + if (tasksObj instanceof Map) { + Map tasksMap = (Map) tasksObj; + if (tasksMap.containsKey("blocking")) { + Object blockingTaskObj = tasksMap.get("blocking"); + if (blockingTaskObj instanceof List) { + blockingTaskClasses = (List<String>) blockingTaskObj; + blockingTaskClasses.forEach(taskClz -> { + logger.info("Loading blocking task " + taskClz); + }); + } + } + + if (tasksMap.containsKey("nonBlocking")) { + Object nonBlockingTaskObj = tasksMap.get("nonBlocking"); + if (nonBlockingTaskObj instanceof List) { + nonBlockingTaskClasses = (List<String>) nonBlockingTaskObj; + nonBlockingTaskClasses.forEach(taskClz -> { + logger.info("Loading non blocking task " + taskClz); + }); + } + } + } + } + } + } catch (FileNotFoundException e) { + logger.error("Failed to load task list from file {}", taskListFile, e); + throw e; + } + } + + private Map<String, TaskFactory> getTaskFactory() throws Exception { + + Map<String, TaskFactory> taskMap = new HashMap<>(); + + for (String className : blockingTaskClasses) { + try { + logger.info("Loading blocking task {}", className); + Class<?> taskClz = Class.forName(className); + Object taskObj = taskClz.getConstructor().newInstance(); + BlockingTask blockingTask = (BlockingTask) taskObj; + TaskFactory taskFactory = context -> { + blockingTask.setCallbackContext(context); + return blockingTask; + }; + BlockingTaskDef btDef = blockingTask.getClass().getAnnotation(BlockingTaskDef.class); + taskMap.put(btDef.name(), taskFactory); + + } catch (ClassNotFoundException e) { + logger.error("Couldn't find a class with name {}", className); + throw e; + } + } + return taskMap; + } + + private void connect() { + try { + zkHelixManager = new ZKHelixManager(clusterName, 
participantName, InstanceType.PARTICIPANT, zkAddress); + // register online-offline model + StateMachineEngine machineEngine = zkHelixManager.getStateMachineEngine(); + OnlineOfflineStateModelFactory factory = new OnlineOfflineStateModelFactory(participantName); + machineEngine.registerStateModelFactory(BuiltInStateModelDefinitions.OnlineOffline.name(), factory); + + // register task model + machineEngine.registerStateModelFactory("Task", new TaskStateModelFactory(zkHelixManager, getTaskFactory())); + + logger.debug("Participant: " + participantName + ", registered state model factories."); + + zkHelixManager.connect(); + logger.info("Participant: " + participantName + ", has connected to cluster: " + clusterName); + + Thread.currentThread().join(); + } catch (InterruptedException ex) { + logger.error("Participant: " + participantName + ", is interrupted! reason: " + ex, ex); + } catch (Exception ex) { + logger.error("Error in connect() for Participant: " + participantName + ", reason: " + ex, ex); + } finally { + disconnect(); + } + } + + private void disconnect() { + logger.info("Shutting down participant. 
Currently available tasks " + runningTasks.size()); + if (zkHelixManager != null) { + if (runningTasks.size() > 0) { + for (int i = 0; i <= shutdownGraceRetries; i++) { + logger.info("Shutting down gracefully [RETRY " + i + "]"); + try { + Thread.sleep(shutdownGracePeriod); + } catch (InterruptedException e) { + logger.warn("Waiting for running tasks failed [RETRY " + i + "]", e); + } + if (runningTasks.size() == 0) { + break; + } + } + } + logger.info("Participant: " + participantName + ", has disconnected from cluster: " + clusterName); + zkHelixManager.disconnect(); + } + } + + public static void main(String args[]) throws Exception { + SpringApplication.run(Participant.class); + } +} diff --git a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/DataSyncWorkflowManager.java b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/DataSyncWorkflowManager.java new file mode 100644 index 0000000..1e987a6 --- /dev/null +++ b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/DataSyncWorkflowManager.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.airavata.datalake.orchestrator.workflow.engine.services.wm; + +import org.apache.airavata.datalake.orchestrator.workflow.engine.task.AbstractTask; +import org.apache.airavata.datalake.orchestrator.workflow.engine.task.OutPort; +import org.apache.airavata.datalake.orchestrator.workflow.engine.task.impl.ExampleBlockingTask; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.boot.CommandLineRunner; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.annotation.ComponentScan; + +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; + +@SpringBootApplication() +@ComponentScan("org.apache.airavata.datalake.orchestrator.workflow.engine.services.wm") +public class DataSyncWorkflowManager implements CommandLineRunner { + + private final static Logger logger = LoggerFactory.getLogger(DataSyncWorkflowManager.class); + + @org.springframework.beans.factory.annotation.Value("${cluster.name}") + private String clusterName; + + @org.springframework.beans.factory.annotation.Value("${datasync.wm.name}") + private String workflowManagerName; + + @org.springframework.beans.factory.annotation.Value("${zookeeper.connection}") + private String zkAddress; + + @Override + public void run(String... 
args) throws Exception { + WorkflowOperator workflowOperator = new WorkflowOperator(); + workflowOperator.init(clusterName, workflowManagerName, zkAddress); + + ExampleBlockingTask bt1 = new ExampleBlockingTask(); + bt1.setTaskId("bt1-" + UUID.randomUUID()); + + ExampleBlockingTask bt2 = new ExampleBlockingTask(); + bt2.setTaskId("bt2-" + UUID.randomUUID()); + + // Setting dependency + bt1.setOutPort(new OutPort().setNextTaskId(bt2.getTaskId())); + + Map<String, AbstractTask> taskMap = new HashMap<>(); + taskMap.put(bt1.getTaskId(), bt1); + taskMap.put(bt2.getTaskId(), bt2); + String workflowId = workflowOperator.buildAndRunWorkflow(taskMap, bt1.getTaskId()); + logger.info("Launched workflow {}", workflowId); + } + + public static void main(String args[]) throws Exception { + SpringApplication.run(DataSyncWorkflowManager.class); + } +} diff --git a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/WorkflowOperator.java b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/WorkflowOperator.java new file mode 100644 index 0000000..8590eb5 --- /dev/null +++ b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/services/wm/WorkflowOperator.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.airavata.datalake.orchestrator.workflow.engine.services.wm; + +import org.apache.airavata.datalake.orchestrator.workflow.engine.task.AbstractTask; +import org.apache.airavata.datalake.orchestrator.workflow.engine.task.OutPort; +import org.apache.airavata.datalake.orchestrator.workflow.engine.task.TaskParamType; +import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.BlockingTaskDef; +import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.TaskOutPort; +import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.TaskParam; +import org.apache.helix.HelixManager; +import org.apache.helix.HelixManagerFactory; +import org.apache.helix.InstanceType; +import org.apache.helix.task.*; + +import java.lang.reflect.Field; +import java.util.*; + +public class WorkflowOperator { + + private static final long WORKFLOW_EXPIRY_TIME = 1 * 1000; + private static final long TASK_EXPIRY_TIME = 24 * 60 * 60 * 1000; + + private TaskDriver taskDriver; + private HelixManager helixManager; + + public void init(String clusterName, String workflowManagerName, String zkAddress) throws Exception { + helixManager = HelixManagerFactory.getZKHelixManager(clusterName, workflowManagerName, + InstanceType.SPECTATOR, zkAddress); + helixManager.connect(); + + Runtime.getRuntime().addShutdownHook( + new Thread() { + @Override + public void run() { + if (helixManager != null && helixManager.isConnected()) { + helixManager.disconnect(); + } + } + } + ); + + taskDriver = new 
TaskDriver(helixManager); + } + + public void destroy() { + if (helixManager != null) { + helixManager.disconnect(); + } + } + + public String buildAndRunWorkflow(Map<String, AbstractTask> taskMap, String startTaskId) throws Exception { + + if (taskDriver == null) { + throw new Exception("Workflow operator needs to be initialized"); + } + + String workflowName = UUID.randomUUID().toString(); + Workflow.Builder workflowBuilder = new Workflow.Builder(workflowName).setExpiry(0); + buildWorkflowRecursively(workflowBuilder, startTaskId, taskMap); + + WorkflowConfig.Builder config = new WorkflowConfig.Builder().setFailureThreshold(0); + workflowBuilder.setWorkflowConfig(config.build()); + workflowBuilder.setExpiry(WORKFLOW_EXPIRY_TIME); + Workflow workflow = workflowBuilder.build(); + + taskDriver.start(workflow); + return workflowName; + } + + private void buildWorkflowRecursively(Workflow.Builder workflowBuilder, String nextTaskId, Map<String, AbstractTask> taskMap) + throws Exception{ + AbstractTask currentTask = taskMap.get(nextTaskId); + String taskType = currentTask.getClass().getAnnotation(BlockingTaskDef.class).name(); + TaskConfig.Builder taskBuilder = new TaskConfig.Builder() + .setTaskId(currentTask.getTaskId()) + .setCommand(taskType); + + Map<String, String> paramMap = serializeTaskData(currentTask); + paramMap.forEach(taskBuilder::addConfig); + + List<TaskConfig> taskBuilds = new ArrayList<>(); + taskBuilds.add(taskBuilder.build()); + + JobConfig.Builder job = new JobConfig.Builder() + .addTaskConfigs(taskBuilds) + .setFailureThreshold(0) + .setExpiry(WORKFLOW_EXPIRY_TIME) + .setTimeoutPerTask(TASK_EXPIRY_TIME) + .setNumConcurrentTasksPerInstance(20) + .setMaxAttemptsPerTask(currentTask.getRetryCount()); + + workflowBuilder.addJob(currentTask.getTaskId(), job); + + List<OutPort> outPorts = getOutPortsOfTask(currentTask); + + for (OutPort outPort : outPorts) { + if (outPort != null) { + workflowBuilder.addParentChildDependency(currentTask.getTaskId(), 
outPort.getNextTaskId()); + buildWorkflowRecursively(workflowBuilder, outPort.getNextTaskId(), taskMap); + } + } + } + + public String getWorkflowStatus(String workflowName) { + WorkflowContext workflowContext = taskDriver.getWorkflowContext(workflowName); + TaskState workflowState = workflowContext.getWorkflowState(); + return workflowState.name(); + } + + public void stopWorkflow(String workflowName) { + taskDriver.stop(workflowName); + } + + public void resumeWorkflow(String workflowName) { + taskDriver.resume(workflowName); + } + + public void deleteWorkflow(String workflowName) { + taskDriver.delete(workflowName); + } + + private <T extends AbstractTask> Map<String, String> serializeTaskData(T data) throws IllegalAccessException { + + Map<String, String> result = new HashMap<>(); + for (Class<?> c = data.getClass(); c != null; c = c.getSuperclass()) { + Field[] fields = c.getDeclaredFields(); + for (Field classField : fields) { + TaskParam parm = classField.getAnnotation(TaskParam.class); + if (parm != null) { + classField.setAccessible(true); + if (classField.get(data) instanceof TaskParamType) { + result.put(parm.name(), TaskParamType.class.cast(classField.get(data)).serialize()); + } else { + result.put(parm.name(), classField.get(data).toString()); + } + } + + TaskOutPort outPort = classField.getAnnotation(TaskOutPort.class); + if (outPort != null) { + classField.setAccessible(true); + if (classField.get(data) != null) { + result.put(outPort.name(), ((OutPort) classField.get(data)).getNextTaskId().toString()); + } + } + } + } + return result; + } + + private <T extends AbstractTask> List<OutPort> getOutPortsOfTask(T taskObj) throws IllegalAccessException { + + List<OutPort> outPorts = new ArrayList<>(); + for (Class<?> c = taskObj.getClass(); c != null; c = c.getSuperclass()) { + Field[] fields = c.getDeclaredFields(); + for (Field field : fields) { + TaskOutPort outPortAnnotation = field.getAnnotation(TaskOutPort.class); + if (outPortAnnotation != null) { 
+ field.setAccessible(true); + OutPort outPort = (OutPort) field.get(taskObj); + outPorts.add(outPort); + } + } + } + return outPorts; + } +} diff --git a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/AbstractTask.java b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/AbstractTask.java new file mode 100644 index 0000000..c9ceee9 --- /dev/null +++ b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/AbstractTask.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.airavata.datalake.orchestrator.workflow.engine.task; + +import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.TaskOutPort; +import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.TaskParam; +import org.apache.helix.task.Task; +import org.apache.helix.task.TaskCallbackContext; +import org.apache.helix.task.TaskResult; +import org.apache.helix.task.UserContentStore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.reflect.Constructor; +import java.lang.reflect.Field; +import java.lang.reflect.InvocationTargetException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +public abstract class AbstractTask extends UserContentStore implements Task { + + private final static Logger logger = LoggerFactory.getLogger(AbstractTask.class); + + private TaskCallbackContext callbackContext; + + @TaskOutPort(name = "nextTask") + private OutPort outPort; + + @TaskParam(name = "taskId") + private String taskId; + + @TaskParam(name = "retryCount") + private int retryCount = 3; + + public AbstractTask() { + + } + + @Override + public TaskResult run() { + try { + String helixTaskId = this.callbackContext.getTaskConfig().getId(); + logger.info("Running task {}", helixTaskId); + deserializeTaskData(this, this.callbackContext.getTaskConfig().getConfigMap()); + } catch (Exception e) { + logger.error("Failed at deserializing task data", e); + return new TaskResult(TaskResult.Status.FAILED, "Failed in deserializing task data"); + } + return onRun(); + } + + @Override + public void cancel() { + onCancel(); + } + + public abstract TaskResult onRun(); + + public abstract void onCancel(); + + public OutPort getOutPort() { + return outPort; + } + + public void setOutPort(OutPort outPort) { + this.outPort = outPort; + } + + public int getRetryCount() { + return retryCount; + } + + public void setRetryCount(int retryCount) { + this.retryCount = retryCount; + } + 
+ public TaskCallbackContext getCallbackContext() { + return callbackContext; + } + + public String getTaskId() { + return taskId; + } + + public void setTaskId(String taskId) { + this.taskId = taskId; + } + + public void setCallbackContext(TaskCallbackContext callbackContext) { + this.callbackContext = callbackContext; + } + + private <T extends AbstractTask> void deserializeTaskData(T instance, Map<String, String> params) throws IllegalAccessException, NoSuchMethodException, InvocationTargetException, InstantiationException { + + List<Field> allFields = new ArrayList<>(); + Class genericClass = instance.getClass(); + + while (AbstractTask.class.isAssignableFrom(genericClass)) { + Field[] declaredFields = genericClass.getDeclaredFields(); + for (Field declaredField : declaredFields) { + allFields.add(declaredField); + } + genericClass = genericClass.getSuperclass(); + } + + for (Field classField : allFields) { + TaskParam param = classField.getAnnotation(TaskParam.class); + if (param != null) { + if (params.containsKey(param.name())) { + classField.setAccessible(true); + if (classField.getType().isAssignableFrom(String.class)) { + classField.set(instance, params.get(param.name())); + } else if (classField.getType().isAssignableFrom(Integer.class) || + classField.getType().isAssignableFrom(Integer.TYPE)) { + classField.set(instance, Integer.parseInt(params.get(param.name()))); + } else if (classField.getType().isAssignableFrom(Long.class) || + classField.getType().isAssignableFrom(Long.TYPE)) { + classField.set(instance, Long.parseLong(params.get(param.name()))); + } else if (classField.getType().isAssignableFrom(Boolean.class) || + classField.getType().isAssignableFrom(Boolean.TYPE)) { + classField.set(instance, Boolean.parseBoolean(params.get(param.name()))); + } else if (TaskParamType.class.isAssignableFrom(classField.getType())) { + Class<?> clazz = classField.getType(); + Constructor<?> ctor = clazz.getConstructor(); + Object obj = ctor.newInstance(); + 
((TaskParamType)obj).deserialize(params.get(param.name())); + classField.set(instance, obj); + } + } + } + } + + for (Field classField : allFields) { + TaskOutPort outPort = classField.getAnnotation(TaskOutPort.class); + if (outPort != null) { + classField.setAccessible(true); + OutPort op = new OutPort(); + op.setNextTaskId(params.get(outPort.name())); + } + } + } +} diff --git a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/BlockingTask.java b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/BlockingTask.java new file mode 100644 index 0000000..9033f1a --- /dev/null +++ b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/BlockingTask.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.airavata.datalake.orchestrator.workflow.engine.task; + +import org.apache.helix.task.TaskResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class BlockingTask extends AbstractTask { + + private final static Logger logger = LoggerFactory.getLogger(BlockingTask.class); + + public BlockingTask() { + } + + @Override + public TaskResult onRun() { + return runBlockingCode(); + } + + public abstract TaskResult runBlockingCode(); + + @Override + public void onCancel() { + + } +} diff --git a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/NonBlockingTask.java b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/NonBlockingTask.java new file mode 100644 index 0000000..9d2532c --- /dev/null +++ b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/NonBlockingTask.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.airavata.datalake.orchestrator.workflow.engine.task; + +import org.apache.helix.task.TaskResult; + +public class NonBlockingTask extends AbstractTask { + + public NonBlockingTask() { + } + + @Override + public TaskResult onRun() { + return null; + } + + @Override + public void onCancel() { + + } +} diff --git a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/OutPort.java b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/OutPort.java new file mode 100644 index 0000000..2b4bb09 --- /dev/null +++ b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/OutPort.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Holds the id of the task that an out port points to.
 */
public class OutPort {

    /** Id of the next task; null until {@link #setNextTaskId(String)} is called. */
    private String nextTaskId;

    /**
     * Stores the id of the next task.
     *
     * @param nextTaskId id to store, may be null
     * @return this instance, so calls can be chained
     */
    public OutPort setNextTaskId(String nextTaskId) {
        this.nextTaskId = nextTaskId;
        return this;
    }

    /**
     * @return the stored next task id, or null when none was set
     */
    public String getNextTaskId() {
        return this.nextTaskId;
    }
}
/**
 * Contract for task parameter values that convert themselves to and from a
 * string representation.
 */
public interface TaskParamType {

    /**
     * @return the string form of this value
     */
    String serialize();

    /**
     * Restores this value from its string form.
     *
     * @param content string previously produced by {@link #serialize()}
     */
    void deserialize(String content);
}
+ */ + +package org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +@Retention(RetentionPolicy.RUNTIME) +@Target(ElementType.TYPE) +public @interface BlockingTaskDef { + public String name(); +} diff --git a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/annotation/NonBlockingSection.java b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/annotation/NonBlockingSection.java new file mode 100644 index 0000000..8047a9b --- /dev/null +++ b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/annotation/NonBlockingSection.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +@Retention(RetentionPolicy.RUNTIME) +@Target(ElementType.METHOD) +public @interface NonBlockingSection { + public int order(); +} diff --git a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/annotation/NonBlockingTaskDef.java b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/annotation/NonBlockingTaskDef.java new file mode 100644 index 0000000..d38dc30 --- /dev/null +++ b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/annotation/NonBlockingTaskDef.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +@Retention(RetentionPolicy.RUNTIME) +@Target(ElementType.TYPE) +public @interface NonBlockingTaskDef { + public String name(); +} diff --git a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/annotation/TaskOutPort.java b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/annotation/TaskOutPort.java new file mode 100644 index 0000000..ce8e9ae --- /dev/null +++ b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/annotation/TaskOutPort.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +@Retention(RetentionPolicy.RUNTIME) +@Target(ElementType.FIELD) +public @interface TaskOutPort { + public String name(); +} diff --git a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/annotation/TaskParam.java b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/annotation/TaskParam.java new file mode 100644 index 0000000..fe94273 --- /dev/null +++ b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/annotation/TaskParam.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +@Retention(RetentionPolicy.RUNTIME) +@Target(ElementType.FIELD) +public @interface TaskParam { + public String name(); + public String defaultValue() default ""; + public boolean mandatory() default false; +} diff --git a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/ExampleBlockingTask.java b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/ExampleBlockingTask.java new file mode 100644 index 0000000..93ec010 --- /dev/null +++ b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/ExampleBlockingTask.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.airavata.datalake.orchestrator.workflow.engine.task.impl; + +import org.apache.airavata.datalake.orchestrator.workflow.engine.task.BlockingTask; +import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.BlockingTaskDef; +import org.apache.helix.task.TaskResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@BlockingTaskDef(name = "ExampleBlockingTask") +public class ExampleBlockingTask extends BlockingTask { + + private final static Logger logger = LoggerFactory.getLogger(ExampleBlockingTask.class); + + @Override + public TaskResult runBlockingCode() { + logger.info("Running example blocking task {}", getTaskId()); + return new TaskResult(TaskResult.Status.COMPLETED, "Success"); + } +} diff --git a/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/ExampleNonBlockingTask.java b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/ExampleNonBlockingTask.java new file mode 100644 index 0000000..527d0a2 --- /dev/null +++ b/data-orchestrator/workflow-engine/src/main/java/org/apache/airavata/datalake/orchestrator/workflow/engine/task/impl/ExampleNonBlockingTask.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.airavata.datalake.orchestrator.workflow.engine.task.impl; + +import org.apache.airavata.datalake.orchestrator.workflow.engine.task.NonBlockingTask; +import org.apache.airavata.datalake.orchestrator.workflow.engine.task.annotation.NonBlockingTaskDef; + +@NonBlockingTaskDef(name = "ExampleNonBlockingTask") +public class ExampleNonBlockingTask extends NonBlockingTask { + + public ExampleNonBlockingTask() { + } +} diff --git a/data-orchestrator/workflow-engine/src/main/resources/application.properties b/data-orchestrator/workflow-engine/src/main/resources/application.properties new file mode 100644 index 0000000..ea188ec --- /dev/null +++ b/data-orchestrator/workflow-engine/src/main/resources/application.properties @@ -0,0 +1,27 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# + +cluster.name=datalake +controller.name=datalake_controller +zookeeper.connection=localhost:2181 + +participant.name=datalake_participant +task.list.file=task-list.yaml + +datasync.wm.name=datasync_wf \ No newline at end of file diff --git a/data-orchestrator/workflow-engine/src/main/resources/logback.xml b/data-orchestrator/workflow-engine/src/main/resources/logback.xml new file mode 100644 index 0000000..3afe661 --- /dev/null +++ b/data-orchestrator/workflow-engine/src/main/resources/logback.xml @@ -0,0 +1,50 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. 
<configuration>

    <!-- Console output: timestamp, thread, level, logger (abbreviated to about 30 chars), message, MDC. -->
    <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%d [%t] %-5p %c{30} %m [%X]%n</pattern>
        </encoder>
    </appender>

    <!-- Daily rolling file; the path is relative to the working directory of the process. -->
    <appender name="LOGFILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <File>../logs/airavata.log</File>
        <Append>true</Append>
        <encoder>
            <pattern>%d [%t] %-5p %c{30} %m [%X]%n</pattern>
        </encoder>
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <fileNamePattern>../logs/airavata.log.%d{yyyy-MM-dd}</fileNamePattern>
            <maxHistory>30</maxHistory>
            <totalSizeCap>1GB</totalSizeCap>
        </rollingPolicy>
    </appender>

    <logger name="ch.qos.logback" level="WARN"/>
    <logger name="org.apache.zookeeper" level="ERROR"/>
    <!-- org.apache.helix used to be declared twice (WARN, then ERROR). Only the later
         ERROR entry is kept; logger elements appear to be applied in document order,
         which would make ERROR the level that was already in effect. -->
    <logger name="org.apache.helix" level="ERROR"/>
    <logger name="org.apache.airavata" level="INFO"/>
    <logger name="org.hibernate" level="ERROR"/>
    <logger name="net.schmizz.sshj" level="WARN"/>

    <root level="INFO">
        <appender-ref ref="CONSOLE"/>
        <appender-ref ref="LOGFILE"/>
    </root>
</configuration>
<spring.boot.version>2.2.1.RELEASE</spring.boot.version> </properties>
