Fix pylint issues in aria.workflows.core package
Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/177a3f6c Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/177a3f6c Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/177a3f6c Branch: refs/heads/pylint-aria-storage Commit: 177a3f6ce25c2b935025b28634baaafc5261f36e Parents: f300324 Author: Ran Ziv <[email protected]> Authored: Wed Oct 19 17:24:19 2016 +0300 Committer: Ran Ziv <[email protected]> Committed: Wed Oct 19 18:04:00 2016 +0300 ---------------------------------------------------------------------- .pylintrc | 3 +- aria/workflows/core/engine.py | 12 ++++++- aria/workflows/core/executor.py | 35 ++++++++++++++++---- aria/workflows/core/tasks.py | 57 +++++++++++++++++++++++++++++---- aria/workflows/core/translation.py | 33 ++++++++++++++++--- 5 files changed, 119 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/177a3f6c/.pylintrc ---------------------------------------------------------------------- diff --git a/.pylintrc b/.pylintrc index 120a828..7f4f203 100644 --- a/.pylintrc +++ b/.pylintrc @@ -62,8 +62,7 @@ confidence= # --enable=similarities". If you want to run only the classes checker, but have # no Warning level messages displayed, use"--disable=all --enable=classes # --disable=W" -disable=import-star-module-level,old-octal-literal,oct-method,print-statement,unpacking-in-except,parameter-unpacking,backtick,old-raise-syntax,old-ne-operator,long-suffix,dict-view-method,dict-iter-method,metaclass-assignment,next-method-called,raising-string,indexing-exception,raw_input-builtin,long-builtin,file-builtin,execfile-builtin,coerce-builtin,cmp-builtin,buffer-builtin,basestring-builtin,apply-builtin,filter-builtin-not-iterating,using-cmp-argument,useless-suppression,range-builtin-not-iterating,suppressed-message,no-absolute-import,old-division,cmp-method,reload-builtin,zip-builtin-not-iterating,intern-builtin,unichr-builtin,reduce-builtin,standarderror-builtin,unicode-builtin,xrange-builtin,coerce-method,delslice-method,getslice-method,setslice-method,input-builtin,round-builtin,hex-method,nonzero-method,map-builtin-not-iterating -disable=redefined-builtin +disable=import-star-module-level,old-octal-literal,oct-method,print-statement,unpacking-in-except,parameter-unpacking,backtick,old-raise-syntax,old-ne-operator,long-suffix,dict-view-method,dict-iter-method,metaclass-assignment,next-method-called,raising-string,indexing-exception,raw_input-builtin,long-builtin,file-builtin,execfile-builtin,coerce-builtin,cmp-builtin,buffer-builtin,basestring-builtin,apply-builtin,filter-builtin-not-iterating,using-cmp-argument,useless-suppression,range-builtin-not-iterating,suppressed-message,no-absolute-import,old-division,cmp-method,reload-builtin,zip-builtin-not-iterating,intern-builtin,unichr-builtin,reduce-builtin,standarderror-builtin,unicode-builtin,xrange-builtin,coerce-method,delslice-method,getslice-method,setslice-method,input-builtin,round-builtin,hex-method,nonzero-method,map-builtin-not-iterating,redefined-builtin [REPORTS] http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/177a3f6c/aria/workflows/core/engine.py ---------------------------------------------------------------------- diff --git a/aria/workflows/core/engine.py b/aria/workflows/core/engine.py index 7cc4781..9c6eff8 100644 --- a/aria/workflows/core/engine.py +++ b/aria/workflows/core/engine.py @@ -13,6 +13,10 @@ # See the License for the specific language governing permissions and # limitations under the License. +""" +The workflow engine. Executes workflows +""" + import time import networkx @@ -23,6 +27,9 @@ from . import translation class Engine(logger.LoggerMixin): + """ + The workflow engine. Executes workflows + """ def __init__(self, executor, workflow_context, tasks_graph, **kwargs): super(Engine, self).__init__(**kwargs) @@ -35,6 +42,9 @@ class Engine(logger.LoggerMixin): execution_graph=self._execution_graph) def execute(self): + """ + execute the workflow + """ try: events.start_workflow_signal.send(self._workflow_context) while True: @@ -65,7 +75,7 @@ class Engine(logger.LoggerMixin): return len(self._execution_graph.succ.get(task.id, {})) > 0 def _all_tasks_consumed(self): - len(self._execution_graph.node) == 0 + return len(self._execution_graph.node) == 0 def _tasks_iter(self): return (data['task'] for _, data in self._execution_graph.nodes_iter(data=True)) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/177a3f6c/aria/workflows/core/executor.py ---------------------------------------------------------------------- diff --git a/aria/workflows/core/executor.py b/aria/workflows/core/executor.py index 0c3aeb1..ace445a 100644 --- a/aria/workflows/core/executor.py +++ b/aria/workflows/core/executor.py @@ -13,6 +13,10 @@ # See the License for the specific language governing permissions and # limitations under the License. +""" +Executors for workflow tasks +""" + import threading import multiprocessing import Queue @@ -24,14 +28,24 @@ from aria.tools import module class BaseExecutor(object): + """ + Base class for executors for running tasks + """ def __init__(self, *args, **kwargs): pass def execute(self, task): + """ + Execute a task + :param task: task to execute + """ raise NotImplementedError def close(self): + """ + Close the executor + """ pass @staticmethod @@ -48,6 +62,9 @@ class BaseExecutor(object): class CurrentThreadBlockingExecutor(BaseExecutor): + """ + Executor which runs tasks in the current thread (blocking) + """ def execute(self, task): self._task_started(task) @@ -61,6 +78,9 @@ class CurrentThreadBlockingExecutor(BaseExecutor): class ThreadExecutor(BaseExecutor): + """ + Executor which runs tasks in a separate thread + """ def __init__(self, pool_size=1, *args, **kwargs): super(ThreadExecutor, self).__init__(*args, **kwargs) @@ -96,11 +116,14 @@ class ThreadExecutor(BaseExecutor): except BaseException as e: self._task_failed(task, exception=e) # Daemon threads - except: + except BaseException: pass class MultiprocessExecutor(BaseExecutor): + """ + Executor which runs tasks in a multiprocess environment + """ def __init__(self, pool_size=1, *args, **kwargs): super(MultiprocessExecutor, self).__init__(*args, **kwargs) @@ -108,9 +131,9 @@ class MultiprocessExecutor(BaseExecutor): self._manager = multiprocessing.Manager() self._queue = self._manager.Queue() self._tasks = {} - self._listener = threading.Thread(target=self._listener) - self._listener.daemon = True - self._listener.start() + self._listener_thread = threading.Thread(target=self._listener) + self._listener_thread.daemon = True + self._listener_thread.start() self._pool = multiprocessing.Pool(processes=pool_size, maxtasksperchild=1) @@ -126,7 +149,7 @@ class MultiprocessExecutor(BaseExecutor): self._pool.close() self._stopped = True self._pool.join() - self._listener.join() + self._listener_thread.join() def _listener(self): while not self._stopped: @@ -143,7 +166,7 @@ class MultiprocessExecutor(BaseExecutor): # TODO: something raise RuntimeError() # Daemon threads - except: + except BaseException: pass def _remove_task(self, task_id): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/177a3f6c/aria/workflows/core/tasks.py ---------------------------------------------------------------------- diff --git a/aria/workflows/core/tasks.py b/aria/workflows/core/tasks.py index 83b4263..76ae609 100644 --- a/aria/workflows/core/tasks.py +++ b/aria/workflows/core/tasks.py @@ -13,43 +13,87 @@ # See the License for the specific language governing permissions and # limitations under the License. +""" +Workflow tasks +""" + class BaseTask(object): + """ + Base class for Task objects + """ def __init__(self, id, name, context): - self.id = id - self.name = name - self.context = context + self._id = id + self._name = name + self._context = context + + @property + def id(self): + """ + :return: the task's id + """ + return self._id + + @property + def name(self): + """ + :return: the task's name + """ + return self._name + + @property + def context(self): + """ + :return: the task's context + """ + return self._context class StartWorkflowTask(BaseTask): + """ + Tasks marking a workflow start + """ pass class EndWorkflowTask(BaseTask): + """ + Tasks marking a workflow end + """ pass class StartSubWorkflowTask(BaseTask): + """ + Tasks marking a subworkflow start + """ pass class EndSubWorkflowTask(BaseTask): + """ + Tasks marking a subworkflow end + """ pass class OperationTask(BaseTask): + """ + Operation tasks + """ + def __init__(self, *args, **kwargs): super(OperationTask, self).__init__(*args, **kwargs) self._create_operation_in_storage() def _create_operation_in_storage(self): - Operation = self.context.storage.operation.model_cls - operation = Operation( + operation_cls = self.context.storage.operation.model_cls + operation = operation_cls( id=self.context.id, execution_id=self.context.execution_id, max_retries=self.context.parameters.get('max_retries', 1), - status=Operation.PENDING, + status=operation_cls.PENDING, ) self.context.operation = operation @@ -58,4 +102,3 @@ class OperationTask(BaseTask): return getattr(self.context, attr) except AttributeError: return super(OperationTask, self).__getattribute__(attr) - http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/177a3f6c/aria/workflows/core/translation.py ---------------------------------------------------------------------- diff --git a/aria/workflows/core/translation.py b/aria/workflows/core/translation.py index 71d7bcd..dc483c6 100644 --- a/aria/workflows/core/translation.py +++ b/aria/workflows/core/translation.py @@ -13,6 +13,10 @@ # See the License for the specific language governing permissions and # limitations under the License. +""" +Translation of user graph's API to the execution graph +""" + from aria import contexts from . import tasks @@ -25,6 +29,15 @@ def build_execution_graph( start_cls=tasks.StartWorkflowTask, end_cls=tasks.EndWorkflowTask, depends_on=()): + """ + Translates the user graph to the execution graph + :param task_graph: The user's graph + :param workflow_context: The workflow + :param execution_graph: The execution graph that is being built + :param start_cls: internal use + :param end_cls: internal use + :param depends_on: internal use + """ # Insert start marker start_task = start_cls(id=_start_graph_suffix(task_graph.id), name=_start_graph_suffix(task_graph.name), @@ -32,7 +45,10 @@ def build_execution_graph( _add_task_and_dependencies(execution_graph, start_task, depends_on) for operation_or_workflow, dependencies in task_graph.task_tree(reverse=True): - operation_dependencies = _get_tasks_from_dependencies(execution_graph, dependencies, default=[start_task]) + operation_dependencies = _get_tasks_from_dependencies( + execution_graph, + dependencies, + default=[start_task]) if _is_operation(operation_or_workflow): # Add the task an the dependencies @@ -52,8 +68,14 @@ def build_execution_graph( ) # Insert end marker - workflow_dependencies = _get_tasks_from_dependencies(execution_graph, task_graph.leaf_tasks, default=[start_task]) - end_task = end_cls(id=_end_graph_suffix(task_graph.id), name=_end_graph_suffix(task_graph.name), context=workflow_context) + workflow_dependencies = _get_tasks_from_dependencies( + execution_graph, + task_graph.leaf_tasks, + default=[start_task]) + end_task = end_cls( + id=_end_graph_suffix(task_graph.id), + name=_end_graph_suffix(task_graph.name), + context=workflow_context) _add_task_and_dependencies(execution_graph, end_task, workflow_dependencies) @@ -67,8 +89,9 @@ def _get_tasks_from_dependencies(execution_graph, dependencies, default=()): """ Returns task list from dependencies. """ - return [execution_graph.node[dependency.id if _is_operation(dependency) else _end_graph_suffix(dependency.id)] - ['task'] for dependency in dependencies] or default + return [execution_graph.node[dependency.id if _is_operation(dependency) + else _end_graph_suffix(dependency.id)]['task'] + for dependency in dependencies] or default def _is_operation(task):
