Repository: incubator-ariatosca Updated Branches: refs/heads/ARIA-74-process-executor-hook [created] fdbed5359
ARIA-74 Add process executor extension registration hooks Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/fdbed535 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/fdbed535 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/fdbed535 Branch: refs/heads/ARIA-74-process-executor-hook Commit: fdbed53598e6692ac6d3da3b3515749ad08b1124 Parents: eaf9974 Author: Dan Kilman <[email protected]> Authored: Mon Jan 23 16:55:41 2017 +0200 Committer: Dan Kilman <[email protected]> Committed: Mon Jan 23 17:26:07 2017 +0200 ---------------------------------------------------------------------- aria/extension.py | 15 ++++ aria/orchestrator/workflows/executor/process.py | 5 ++ .../executor/test_process_executor_extension.py | 80 ++++++++++++++++++++ 3 files changed, 100 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/fdbed535/aria/extension.py ---------------------------------------------------------------------- diff --git a/aria/extension.py b/aria/extension.py index ddb7c25..6e40038 100644 --- a/aria/extension.py +++ b/aria/extension.py @@ -118,8 +118,23 @@ class _ParserExtensionRegistration(_ExtensionRegistration): parser = _ParserExtensionRegistration() +class _ProcessExecutorExtensionRegistration(_ExtensionRegistration): + """Process executor extension class decorator""" + + @_registrar + def decorate(self): + """ + The operation function executed by the process executor will be decorated with the function + returned from decorate(). + """ + return [] + +executor = _ProcessExecutorExtensionRegistration() + + def init(): """ Initialize all registrars by calling all registered functions """ parser.init() + executor.init() http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/fdbed535/aria/orchestrator/workflows/executor/process.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/process.py b/aria/orchestrator/workflows/executor/process.py index 2cc9178..770a060 100644 --- a/aria/orchestrator/workflows/executor/process.py +++ b/aria/orchestrator/workflows/executor/process.py @@ -39,6 +39,8 @@ import Queue import jsonpickle +import aria +from aria.extension import executor from aria.utils import imports from aria.utils import exceptions from aria.orchestrator.workflows.executor import base @@ -291,6 +293,9 @@ def _main(): try: ctx = serialization.operation_context_from_dict(context_dict) task_func = imports.load_attribute(operation_mapping) + aria.install_aria_extensions() + for decorate in executor.decorate(): + task_func = decorate(task_func) task_func(ctx=ctx, **operation_inputs) messenger.succeeded(tracked_changes=instrument.tracked_changes) except BaseException as e: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/fdbed535/tests/orchestrator/workflows/executor/test_process_executor_extension.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/executor/test_process_executor_extension.py b/tests/orchestrator/workflows/executor/test_process_executor_extension.py new file mode 100644 index 0000000..fd46d14 --- /dev/null +++ b/tests/orchestrator/workflows/executor/test_process_executor_extension.py @@ -0,0 +1,80 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest + +from aria import extension +from aria.orchestrator.workflows import api +from aria.orchestrator.workflows.core import engine +from aria.orchestrator.workflows.executor import process +from aria.orchestrator import workflow, operation + +import tests +from tests import mock +from tests import storage + + +def test_decorate_extension(context, executor): + inputs = {'input1': 1, 'input2': 2} + + def get_node_instance(ctx): + return ctx.model.node_instance.get_by_name(mock.models.DEPENDENCY_NODE_INSTANCE_NAME) + + @workflow + def mock_workflow(ctx, graph): + node_instance = get_node_instance(ctx) + op = 'test.op' + op_dict = {'operation': '{0}.{1}'.format(__name__, _mock_operation.__name__)} + node_instance.node.operations['test.op'] = op_dict + task = api.task.OperationTask.node_instance(instance=node_instance, name=op, inputs=inputs) + graph.add_tasks(task) + return graph + graph = mock_workflow(ctx=context) # pylint: disable=no-value-for-parameter + eng = engine.Engine(executor=executor, workflow_context=context, tasks_graph=graph) + eng.execute() + out = get_node_instance(context).runtime_properties['out'] + assert out['wrapper_inputs'] == inputs + assert out['function_inputs'] == inputs + + [email protected] +class MockProcessExecutorExtension(object): + + def decorate(self): + def decorator(function): + def wrapper(ctx, **operation_inputs): + ctx.node_instance.runtime_properties['out'] = {'wrapper_inputs': operation_inputs} + function(ctx=ctx, **operation_inputs) + return wrapper + return decorator + + +@operation +def _mock_operation(ctx, **operation_inputs): + ctx.node_instance.runtime_properties['out']['function_inputs'] = operation_inputs + + [email protected] +def executor(): + result = process.ProcessExecutor(python_path=[tests.ROOT_DIR]) + yield result + result.close() + + [email protected] +def context(tmpdir): + result = mock.context.simple(storage.get_sqlite_api_kwargs(str(tmpdir))) + yield result + storage.release_sqlite_storage(result.model)
