Repository: incubator-ariatosca
Updated Branches:
  refs/heads/wf-wip f30032440 -> 602734363
Fix pylint issues in aria.workflows.core package Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/2419c043 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/2419c043 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/2419c043 Branch: refs/heads/wf-wip Commit: 2419c043c1d60c393fdf90fc1e9daf52db975553 Parents: f300324 Author: Ran Ziv <[email protected]> Authored: Wed Oct 19 17:24:19 2016 +0300 Committer: Ran Ziv <[email protected]> Committed: Wed Oct 19 18:01:19 2016 +0300 ---------------------------------------------------------------------- aria/workflows/core/engine.py | 12 ++++++- aria/workflows/core/executor.py | 35 ++++++++++++++++---- aria/workflows/core/tasks.py | 57 +++++++++++++++++++++++++++++---- aria/workflows/core/translation.py | 33 ++++++++++++++++--- 4 files changed, 118 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2419c043/aria/workflows/core/engine.py ---------------------------------------------------------------------- diff --git a/aria/workflows/core/engine.py b/aria/workflows/core/engine.py index 7cc4781..9c6eff8 100644 --- a/aria/workflows/core/engine.py +++ b/aria/workflows/core/engine.py @@ -13,6 +13,10 @@ # See the License for the specific language governing permissions and # limitations under the License. +""" +The workflow engine. Executes workflows +""" + import time import networkx @@ -23,6 +27,9 @@ from . import translation class Engine(logger.LoggerMixin): + """ + The workflow engine. 
Executes workflows + """ def __init__(self, executor, workflow_context, tasks_graph, **kwargs): super(Engine, self).__init__(**kwargs) @@ -35,6 +42,9 @@ class Engine(logger.LoggerMixin): execution_graph=self._execution_graph) def execute(self): + """ + execute the workflow + """ try: events.start_workflow_signal.send(self._workflow_context) while True: @@ -65,7 +75,7 @@ class Engine(logger.LoggerMixin): return len(self._execution_graph.succ.get(task.id, {})) > 0 def _all_tasks_consumed(self): - len(self._execution_graph.node) == 0 + return len(self._execution_graph.node) == 0 def _tasks_iter(self): return (data['task'] for _, data in self._execution_graph.nodes_iter(data=True)) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2419c043/aria/workflows/core/executor.py ---------------------------------------------------------------------- diff --git a/aria/workflows/core/executor.py b/aria/workflows/core/executor.py index 0c3aeb1..ace445a 100644 --- a/aria/workflows/core/executor.py +++ b/aria/workflows/core/executor.py @@ -13,6 +13,10 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+""" +Executors for workflow tasks +""" + import threading import multiprocessing import Queue @@ -24,14 +28,24 @@ from aria.tools import module class BaseExecutor(object): + """ + Base class for executors for running tasks + """ def __init__(self, *args, **kwargs): pass def execute(self, task): + """ + Execute a task + :param task: task to execute + """ raise NotImplementedError def close(self): + """ + Close the executor + """ pass @staticmethod @@ -48,6 +62,9 @@ class BaseExecutor(object): class CurrentThreadBlockingExecutor(BaseExecutor): + """ + Executor which runs tasks in the current thread (blocking) + """ def execute(self, task): self._task_started(task) @@ -61,6 +78,9 @@ class CurrentThreadBlockingExecutor(BaseExecutor): class ThreadExecutor(BaseExecutor): + """ + Executor which runs tasks in a separate thread + """ def __init__(self, pool_size=1, *args, **kwargs): super(ThreadExecutor, self).__init__(*args, **kwargs) @@ -96,11 +116,14 @@ class ThreadExecutor(BaseExecutor): except BaseException as e: self._task_failed(task, exception=e) # Daemon threads - except: + except BaseException: pass class MultiprocessExecutor(BaseExecutor): + """ + Executor which runs tasks in a multiprocess environment + """ def __init__(self, pool_size=1, *args, **kwargs): super(MultiprocessExecutor, self).__init__(*args, **kwargs) @@ -108,9 +131,9 @@ class MultiprocessExecutor(BaseExecutor): self._manager = multiprocessing.Manager() self._queue = self._manager.Queue() self._tasks = {} - self._listener = threading.Thread(target=self._listener) - self._listener.daemon = True - self._listener.start() + self._listener_thread = threading.Thread(target=self._listener) + self._listener_thread.daemon = True + self._listener_thread.start() self._pool = multiprocessing.Pool(processes=pool_size, maxtasksperchild=1) @@ -126,7 +149,7 @@ class MultiprocessExecutor(BaseExecutor): self._pool.close() self._stopped = True self._pool.join() - self._listener.join() + self._listener_thread.join() 
def _listener(self): while not self._stopped: @@ -143,7 +166,7 @@ class MultiprocessExecutor(BaseExecutor): # TODO: something raise RuntimeError() # Daemon threads - except: + except BaseException: pass def _remove_task(self, task_id): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2419c043/aria/workflows/core/tasks.py ---------------------------------------------------------------------- diff --git a/aria/workflows/core/tasks.py b/aria/workflows/core/tasks.py index 83b4263..a101968 100644 --- a/aria/workflows/core/tasks.py +++ b/aria/workflows/core/tasks.py @@ -13,43 +13,87 @@ # See the License for the specific language governing permissions and # limitations under the License. +""" +Workflow tasks. These tasks are meant to be used with the task_graph's API for executing workflows. +""" + class BaseTask(object): + """ + Base class for Task objects + """ def __init__(self, id, name, context): - self.id = id - self.name = name - self.context = context + self._id = id + self._name = name + self._context = context + + @property + def id(self): + """ + :return: the task's id + """ + return self._id + + @property + def name(self): + """ + :return: the task's name + """ + return self._name + + @property + def context(self): + """ + :return: the task's context + """ + return self._context class StartWorkflowTask(BaseTask): + """ + Tasks marking a workflow start + """ pass class EndWorkflowTask(BaseTask): + """ + Tasks marking a workflow end + """ pass class StartSubWorkflowTask(BaseTask): + """ + Tasks marking a subworkflow start + """ pass class EndSubWorkflowTask(BaseTask): + """ + Tasks marking a subworkflow end + """ pass class OperationTask(BaseTask): + """ + Operation tasks + """ + def __init__(self, *args, **kwargs): super(OperationTask, self).__init__(*args, **kwargs) self._create_operation_in_storage() def _create_operation_in_storage(self): - Operation = self.context.storage.operation.model_cls - operation = Operation( + operation_cls = 
self.context.storage.operation.model_cls + operation = operation_cls( id=self.context.id, execution_id=self.context.execution_id, max_retries=self.context.parameters.get('max_retries', 1), - status=Operation.PENDING, + status=operation_cls.PENDING, ) self.context.operation = operation @@ -58,4 +102,3 @@ class OperationTask(BaseTask): return getattr(self.context, attr) except AttributeError: return super(OperationTask, self).__getattribute__(attr) - http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2419c043/aria/workflows/core/translation.py ---------------------------------------------------------------------- diff --git a/aria/workflows/core/translation.py b/aria/workflows/core/translation.py index 71d7bcd..1af5274 100644 --- a/aria/workflows/core/translation.py +++ b/aria/workflows/core/translation.py @@ -13,6 +13,10 @@ # See the License for the specific language governing permissions and # limitations under the License. +""" +Translation of user graph's API to the execution graph +""" + from aria import contexts from . 
import tasks @@ -25,6 +29,15 @@ def build_execution_graph( start_cls=tasks.StartWorkflowTask, end_cls=tasks.EndWorkflowTask, depends_on=()): + """ + Translates the user graph to the execution graph + :param task_graph: The user's graph + :param workflow_context: The workflow + :param execution_graph: The execution graph that is being built + :param start_cls: internal use + :param end_cls: internal use + :param depends_on: internal use + """ # Insert start marker start_task = start_cls(id=_start_graph_suffix(task_graph.id), name=_start_graph_suffix(task_graph.name), @@ -32,7 +45,10 @@ def build_execution_graph( _add_task_and_dependencies(execution_graph, start_task, depends_on) for operation_or_workflow, dependencies in task_graph.task_tree(reverse=True): - operation_dependencies = _get_tasks_from_dependencies(execution_graph, dependencies, default=[start_task]) + operation_dependencies = _get_tasks_from_dependencies( + execution_graph, + dependencies, + default=[start_task]) if _is_operation(operation_or_workflow): # Add the task an the dependencies @@ -52,8 +68,14 @@ def build_execution_graph( ) # Insert end marker - workflow_dependencies = _get_tasks_from_dependencies(execution_graph, task_graph.leaf_tasks, default=[start_task]) - end_task = end_cls(id=_end_graph_suffix(task_graph.id), name=_end_graph_suffix(task_graph.name), context=workflow_context) + workflow_dependencies = _get_tasks_from_dependencies( + execution_graph, + task_graph.leaf_tasks, + default=[start_task]) + end_task = end_cls( + id=_end_graph_suffix(task_graph.id), + name=_end_graph_suffix(task_graph.name), + context=workflow_context) _add_task_and_dependencies(execution_graph, end_task, workflow_dependencies) @@ -67,8 +89,9 @@ def _get_tasks_from_dependencies(execution_graph, dependencies, default=()): """ Returns task list from dependencies. 
""" - return [execution_graph.node[dependency.id if _is_operation(dependency) else _end_graph_suffix(dependency.id)] - ['task'] for dependency in dependencies] or default + return [execution_graph.node[dependency.id if _is_operation(dependency) + else _end_graph_suffix(dependency.id)]['task'] + for dependency in dependencies] or default def _is_operation(task):
