Repository: incubator-ariatosca Updated Branches: refs/heads/ARIA-276-Support-model-instrumentation-for-workflows 2b29c80ff -> 132bc55e0
test fixes Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/132bc55e Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/132bc55e Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/132bc55e Branch: refs/heads/ARIA-276-Support-model-instrumentation-for-workflows Commit: 132bc55e0de9427f0503bd3bfdc7b3358ca25858 Parents: 2b29c80 Author: max-orlov <[email protected]> Authored: Sun Jun 11 11:46:04 2017 +0300 Committer: max-orlov <[email protected]> Committed: Sun Jun 11 11:46:04 2017 +0300 ---------------------------------------------------------------------- aria/orchestrator/workflows/api/task.py | 4 +- aria/orchestrator/workflows/core/task.py | 12 ++--- aria/storage/collection_instrumentation.py | 47 +++++++++++--------- .../orchestrator/execution_plugin/test_local.py | 25 ++++++----- .../workflows/builtin/test_execute_operation.py | 9 ++-- 5 files changed, 52 insertions(+), 45 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/132bc55e/aria/orchestrator/workflows/api/task.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/api/task.py b/aria/orchestrator/workflows/api/task.py index bcba56e..342448f 100644 --- a/aria/orchestrator/workflows/api/task.py +++ b/aria/orchestrator/workflows/api/task.py @@ -107,8 +107,8 @@ class OperationTask(BaseTask): :raises aria.orchestrator.workflows.exceptions.OperationNotFoundException: if ``interface_name`` and ``operation_name`` to not refer to an operation on the actor """ - - assert isinstance(actor, (models.Node, models.Relationship)) + # If the actor is wrapped we retrieve the original class, o/w we compare the actor itself + assert isinstance(getattr(actor, '_wrapped', actor), (models.Node, models.Relationship)) # Creating OperationTask directly should raise an error when there is no # interface/operation. http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/132bc55e/aria/orchestrator/workflows/core/task.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/task.py b/aria/orchestrator/workflows/core/task.py index 72d83ea..d732f09 100644 --- a/aria/orchestrator/workflows/core/task.py +++ b/aria/orchestrator/workflows/core/task.py @@ -124,20 +124,22 @@ class OperationTask(BaseTask): self.operation_name = api_task.operation_name model_storage = api_task._workflow_context.model + actor = getattr(api_task.actor, '_wrapped', api_task.actor) + base_task_model = model_storage.task.model_cls - if isinstance(api_task.actor, models.Node): + if isinstance(actor, models.Node): context_cls = operation_context.NodeOperationContext create_task_model = base_task_model.for_node - elif isinstance(api_task.actor, models.Relationship): + elif isinstance(actor, models.Relationship): context_cls = operation_context.RelationshipOperationContext create_task_model = base_task_model.for_relationship else: raise RuntimeError('No operation context could be created for {actor.model_cls}' - .format(actor=api_task.actor)) + .format(actor=actor)) task_model = create_task_model( name=api_task.name, - actor=api_task.actor, + actor=actor, status=base_task_model.PENDING, max_attempts=api_task.max_attempts, retry_interval=api_task.retry_interval, @@ -156,7 +158,7 @@ class OperationTask(BaseTask): resource_storage=self._workflow_context.resource, service_id=self._workflow_context._service_id, task_id=task_model.id, - actor_id=api_task.actor.id, + actor_id=actor.id, execution_id=self._workflow_context._execution_id, workdir=self._workflow_context._workdir) self._task_id = task_model.id http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/132bc55e/aria/storage/collection_instrumentation.py ---------------------------------------------------------------------- diff --git a/aria/storage/collection_instrumentation.py b/aria/storage/collection_instrumentation.py index 27d8322..b8f656c 100644 --- a/aria/storage/collection_instrumentation.py +++ b/aria/storage/collection_instrumentation.py @@ -12,6 +12,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +from functools import partial from . import exceptions @@ -198,23 +199,28 @@ class _InstrumentedList(_InstrumentedCollection, list): return list(self) -class _InstrumentedModel(object): +class _WrappedBase(object): - def __init__(self, original_model, mapi, instrumentation): + def __init__(self, wrapped, instrumentation): + self._wrapped = wrapped + self._instrumentation = instrumentation + + +class _InstrumentedModel(_WrappedBase): + + def __init__(self, mapi, *args, **kwargs): """ The original model - :param original_model: the model to be instrumented + :param wrapped: the model to be instrumented :param mapi: the mapi for that model """ - super(_InstrumentedModel, self).__init__() - self._original_model = original_model + super(_InstrumentedModel, self).__init__(*args, **kwargs) self._mapi = mapi - self._instrumentation = instrumentation self._apply_instrumentation() def __getattr__(self, item): - return_value = getattr(self._original_model, item) - if isinstance(return_value, self._original_model.__class__): + return_value = getattr(self._wrapped, item) + if isinstance(return_value, self._wrapped.__class__): return _create_instrumented_model(return_value, self._mapi, self._instrumentation) if isinstance(return_value, (list, dict)): return _create_wrapped_model(return_value, self._mapi, self._instrumentation) @@ -224,7 +230,7 @@ class _InstrumentedModel(object): for field in self._instrumentation: field_name = field.key field_cls = field.mapper.class_ - field = getattr(self._original_model, field_name) + field = getattr(self._wrapped, field_name) # Preserve the original value. e.g. original attributes would be located under # _attributes @@ -241,20 +247,20 @@ class _InstrumentedModel(object): "ARIA supports instrumentation for dict and list. Field {field} of the " "class {model} is of {type} type.".format( field=field, - model=self._original_model, + model=self._wrapped, type=type(field))) instrumented_class = instrumentation_cls(seq=field, - parent=self._original_model, + parent=self._wrapped, mapi=self._mapi, field_name=field_name, field_cls=field_cls) setattr(self, field_name, instrumented_class) -class _WrappedModel(object): +class _WrappedModel(_WrappedBase): - def __init__(self, wrapped, instrumentation, **kwargs): + def __init__(self, instrumentation_kwargs, *args, **kwargs): """ :param instrumented_cls: The class to be instrumented @@ -262,9 +268,8 @@ class _WrappedModel(object): :param wrapped: the currently wrapped instance :param kwargs: and kwargs to the passed to the instrumented class. """ - self._kwargs = kwargs - self._instrumentation = instrumentation - self._wrapped = wrapped + super(_WrappedModel, self).__init__(*args, **kwargs) + self._kwargs = instrumentation_kwargs def _wrap(self, value): if value.__class__ in (class_.class_ for class_ in self._instrumentation): @@ -286,16 +291,18 @@ class _WrappedModel(object): return self._wrap(self._wrapped[item]) -def _create_instrumented_model(original_model, mapi, instrumentation, **kwargs): +def _create_instrumented_model(original_model, mapi, instrumentation): return type('Instrumented{0}'.format(original_model.__class__.__name__), (_InstrumentedModel,), - {})(original_model, mapi, instrumentation, **kwargs) + {})(wrapped=original_model, instrumentation=instrumentation, mapi=mapi) -def _create_wrapped_model(original_model, mapi, instrumentation, **kwargs): +def _create_wrapped_model(original_model, mapi, instrumentation): return type('Wrapped{0}'.format(original_model.__class__.__name__), (_WrappedModel, ), - {})(original_model, instrumentation, mapi=mapi, **kwargs) + {})(wrapped=original_model, + instrumentation=instrumentation, + instrumentation_kwargs=dict(mapi=mapi)) def instrument(instrumentation, original_model, mapi): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/132bc55e/tests/orchestrator/execution_plugin/test_local.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/execution_plugin/test_local.py b/tests/orchestrator/execution_plugin/test_local.py index d792a57..853c416 100644 --- a/tests/orchestrator/execution_plugin/test_local.py +++ b/tests/orchestrator/execution_plugin/test_local.py @@ -477,20 +477,21 @@ if __name__ == '__main__': 'input_as_env_var': env_var }) + node = workflow_context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME) + interface = mock.models.create_interface( + node.service, + 'test', + 'op', + operation_kwargs=dict( + function='{0}.{1}'.format( + operations.__name__, + operations.run_script_locally.__name__), + arguments=arguments) + ) + node.interfaces[interface.name] = interface + @workflow def mock_workflow(ctx, graph): - node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME) - interface = mock.models.create_interface( - node.service, - 'test', - 'op', - operation_kwargs=dict( - function='{0}.{1}'.format( - operations.__name__, - operations.run_script_locally.__name__), - arguments=arguments) - ) - node.interfaces[interface.name] = interface graph.add_tasks(api.task.OperationTask( node, interface_name='test', http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/132bc55e/tests/orchestrator/workflows/builtin/test_execute_operation.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/builtin/test_execute_operation.py b/tests/orchestrator/workflows/builtin/test_execute_operation.py index 88818ca..8713e3c 100644 --- a/tests/orchestrator/workflows/builtin/test_execute_operation.py +++ b/tests/orchestrator/workflows/builtin/test_execute_operation.py @@ -56,12 +56,9 @@ def test_execute_operation(ctx): ) assert len(execute_tasks) == 1 - assert execute_tasks[0].name == task.OperationTask.NAME_FORMAT.format( - type='node', - name=node.name, - interface=interface_name, - operation=operation_name - ) + assert getattr(execute_tasks[0].actor, '_wrapped', execute_tasks[0].actor) == node + assert execute_tasks[0].operation_name == operation_name + assert execute_tasks[0].interface_name == interface_name # TODO: add more scenarios
