Repository: incubator-ariatosca Updated Branches: refs/heads/ARIA-12-runner [created] f17426025
ARIA-12 New "Runner" class for simple workflow executions Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/f1742602 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/f1742602 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/f1742602 Branch: refs/heads/ARIA-12-runner Commit: f17426025dea534325b6f2f23d0bdf4af54174a3 Parents: db9ae9c Author: Tal Liron <tal.li...@gmail.com> Authored: Tue Jan 24 17:57:50 2017 +0200 Committer: Tal Liron <tal.li...@gmail.com> Committed: Tue Jan 24 19:01:33 2017 +0200 ---------------------------------------------------------------------- aria/orchestrator/runner.py | 112 +++++++++++++++++++++++++++++++++ tests/mock/__init__.py | 2 +- tests/mock/context.py | 38 ++--------- tests/mock/topology.py | 97 ++++++++++++++++++++++++++++ tests/orchestrator/test_runner.py | 74 ++++++++++++++++++++++ 5 files changed, 288 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f1742602/aria/orchestrator/runner.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/runner.py b/aria/orchestrator/runner.py new file mode 100644 index 0000000..80f4c07 --- /dev/null +++ b/aria/orchestrator/runner.py @@ -0,0 +1,112 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Workflow runner +""" + +import platform +import os + +from sqlalchemy import (create_engine, orm) # @UnresolvedImport +from sqlalchemy.pool import StaticPool # @UnresolvedImport + +from .context.workflow import WorkflowContext +from .workflows.core.engine import Engine +from .workflows.executor.thread import ThreadExecutor +from ..storage import model +from ..storage.sql_mapi import SQLAlchemyModelAPI +from ..storage.filesystem_rapi import FileSystemResourceAPI +from .. import (application_model_storage, application_resource_storage) + + +SQLITE_IN_MEMORY = 'sqlite:///:memory:' + + +class Runner(object): + """ + Runs workflows on a deployment. + + Handles the initialization of the storage engine and provides convenience methods for + sub-classes to create tasks. + """ + + def __init__(self, workflow_name, workflow_fn, inputs, initialize_model_storage_fn, + deployment_id, in_memory=False): + self._path = None if in_memory else '/tmp/aria.db' + + workflow_context = self.create_workflow_context(workflow_name, deployment_id, + initialize_model_storage_fn) + + tasks_graph = workflow_fn(ctx=workflow_context, **inputs) + + self._engine = Engine( + executor=ThreadExecutor(), + workflow_context=workflow_context, + tasks_graph=tasks_graph) + + def run(self): + self._engine.execute() + + def create_workflow_context(self, workflow_name, deployment_id, initialize_model_storage_fn): + model_storage = self.create_sqlite_model_storage(self._path, True) + initialize_model_storage_fn(model_storage) + resource_storage = self.create_fs_resource_storage() + return WorkflowContext( + name=workflow_name, + model_storage=model_storage, + resource_storage=resource_storage, + deployment_id=deployment_id, + workflow_name=self.__class__.__name__, + task_max_attempts=1, + task_retry_interval=1) + + def create_sqlite_model_storage(self, path=None, fresh=False): # pylint: disable=no-self-use + # Engine + if path is not None: + if fresh and os.path.isfile(path): + os.remove(path) + path_prefix = '' if 'Windows' in platform.system() else '/' + sqlite_engine = create_engine('sqlite:///%s%s' % (path_prefix, path)) + else: + # Causes serious threading problems: + # https://gehrcke.de/2015/05/in-memory-sqlite-database-and-flask-a-threading-trap/ + sqlite_engine = create_engine( + SQLITE_IN_MEMORY, + connect_args={'check_same_thread': False}, + poolclass=StaticPool) + + # Models + model.DeclarativeBase.metadata.create_all(bind=sqlite_engine) # @UndefinedVariable + + # Session + sqlite_session_factory = orm.sessionmaker(bind=sqlite_engine) + if path is not None: + sqlite_session = orm.scoped_session(session_factory=sqlite_session_factory) + else: + sqlite_session = sqlite_session_factory() + + # Storage + sqlite_kwargs = dict(engine=sqlite_engine, session=sqlite_session) + return application_model_storage( + SQLAlchemyModelAPI, + api_kwargs=sqlite_kwargs) + + def create_fs_resource_storage(self, directory='.'): # pylint: disable=no-self-use + fs_kwargs = dict(directory=directory) + return application_resource_storage( + FileSystemResourceAPI, + api_kwargs=fs_kwargs) + \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f1742602/tests/mock/__init__.py ---------------------------------------------------------------------- diff --git a/tests/mock/__init__.py b/tests/mock/__init__.py index 14541d0..9004b4c 100644 --- a/tests/mock/__init__.py +++ b/tests/mock/__init__.py @@ -13,4 +13,4 @@ # See the License for the specific language governing permissions and # limitations under the License. -from . import models, context, operations +from . import models, context, topology, operations http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f1742602/tests/mock/context.py ---------------------------------------------------------------------- diff --git a/tests/mock/context.py b/tests/mock/context.py index 5559675..d45a855 100644 --- a/tests/mock/context.py +++ b/tests/mock/context.py @@ -19,43 +19,13 @@ from aria.storage.filesystem_rapi import FileSystemResourceAPI from aria.storage.sql_mapi import SQLAlchemyModelAPI from . import models +from .topology import create_simple_topology_two_nodes def simple(mapi_kwargs, resources_dir=None, **kwargs): model_storage = aria.application_model_storage(SQLAlchemyModelAPI, api_kwargs=mapi_kwargs) - blueprint = models.get_blueprint() - model_storage.blueprint.put(blueprint) - deployment = models.get_deployment(blueprint) - model_storage.deployment.put(deployment) - - ################################################################################# - # Creating a simple deployment with node -> node as a graph - - dependency_node = models.get_dependency_node(deployment) - model_storage.node.put(dependency_node) - storage_dependency_node = model_storage.node.get(dependency_node.id) - - dependency_node_instance = models.get_dependency_node_instance(storage_dependency_node) - model_storage.node_instance.put(dependency_node_instance) - storage_dependency_node_instance = model_storage.node_instance.get(dependency_node_instance.id) - - dependent_node = models.get_dependent_node(deployment) - model_storage.node.put(dependent_node) - storage_dependent_node = model_storage.node.get(dependent_node.id) - - dependent_node_instance = models.get_dependent_node_instance(storage_dependent_node) - model_storage.node_instance.put(dependent_node_instance) - storage_dependent_node_instance = model_storage.node_instance.get(dependent_node_instance.id) - - relationship = models.get_relationship(storage_dependent_node, storage_dependency_node) - model_storage.relationship.put(relationship) - storage_relationship = model_storage.relationship.get(relationship.id) - relationship_instance = models.get_relationship_instance( - relationship=storage_relationship, - target_instance=storage_dependency_node_instance, - source_instance=storage_dependent_node_instance - ) - model_storage.relationship_instance.put(relationship_instance) + + deployment_id = create_simple_topology_two_nodes(model_storage) # pytest tmpdir if resources_dir: @@ -70,7 +40,7 @@ def simple(mapi_kwargs, resources_dir=None, **kwargs): name='simple_context', model_storage=model_storage, resource_storage=resource_storage, - deployment_id=deployment.id, + deployment_id=deployment_id, workflow_name=models.WORKFLOW_NAME, task_max_attempts=models.TASK_MAX_ATTEMPTS, task_retry_interval=models.TASK_RETRY_INTERVAL http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f1742602/tests/mock/topology.py ---------------------------------------------------------------------- diff --git a/tests/mock/topology.py b/tests/mock/topology.py new file mode 100644 index 0000000..9d197f1 --- /dev/null +++ b/tests/mock/topology.py @@ -0,0 +1,97 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from datetime import datetime + +from aria.storage import model + +from . import models + + +def create_simple_topology_single_node(model_storage, deployment_id, create_operation): + now = datetime.utcnow() + + blueprint = model.Blueprint(name='mock-blueprint', + created_at=now, + updated_at=now, + plan={}, + main_file_name='mock-file') + model_storage.blueprint.put(blueprint) + + deployment = model.Deployment(name='mock-deployment-%d' % deployment_id, + blueprint_fk=blueprint.id, + created_at=now, + updated_at=now) + model_storage.deployment.put(deployment) + + node = model.Node(name='mock-node', + type='tosca.nodes.Compute', + operations={ + 'tosca.interfaces.node.lifecycle.Standard.create': { + 'operation': create_operation, + 'inputs': { + 'key': 'create', + 'value': True}}}, + number_of_instances=1, + planned_number_of_instances=1, + deploy_number_of_instances=1, + min_number_of_instances=1, + max_number_of_instances=1, + deployment_fk=deployment.id) + model_storage.node.put(node) + + node_instance = model.NodeInstance(name='mock-node-instance', + state='', + node_fk=node.id) + model_storage.node_instance.put(node_instance) + + +def create_simple_topology_two_nodes(model_storage): + blueprint = models.get_blueprint() + model_storage.blueprint.put(blueprint) + deployment = models.get_deployment(blueprint) + model_storage.deployment.put(deployment) + + ################################################################################# + # Creating a simple deployment with node -> node as a graph + + dependency_node = models.get_dependency_node(deployment) + model_storage.node.put(dependency_node) + storage_dependency_node = model_storage.node.get(dependency_node.id) + + dependency_node_instance = models.get_dependency_node_instance(storage_dependency_node) + model_storage.node_instance.put(dependency_node_instance) + storage_dependency_node_instance = model_storage.node_instance.get(dependency_node_instance.id) + + dependent_node = models.get_dependent_node(deployment) + model_storage.node.put(dependent_node) + storage_dependent_node = model_storage.node.get(dependent_node.id) + + dependent_node_instance = models.get_dependent_node_instance(storage_dependent_node) + model_storage.node_instance.put(dependent_node_instance) + storage_dependent_node_instance = model_storage.node_instance.get(dependent_node_instance.id) + + relationship = models.get_relationship(storage_dependent_node, storage_dependency_node) + model_storage.relationship.put(relationship) + storage_relationship = model_storage.relationship.get(relationship.id) + relationship_instance = models.get_relationship_instance( + relationship=storage_relationship, + target_instance=storage_dependency_node_instance, + source_instance=storage_dependent_node_instance + ) + model_storage.relationship_instance.put(relationship_instance) + + return deployment.id + http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f1742602/tests/orchestrator/test_runner.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/test_runner.py b/tests/orchestrator/test_runner.py new file mode 100644 index 0000000..d5d689e --- /dev/null +++ b/tests/orchestrator/test_runner.py @@ -0,0 +1,74 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from aria import workflow +from aria.orchestrator import operation +from aria.orchestrator.workflows.api.task import OperationTask +from aria.orchestrator.runner import Runner + +from tests import mock + +import pytest + + +OPERATION_RESULTS = {} + + +@operation +def mock_create_operation(ctx, key, value, **kwargs): + OPERATION_RESULTS[key] = value + + +@pytest.fixture(autouse=True) +def cleanup(): + OPERATION_RESULTS.clear() + + +def test_runner_no_tasks(): + @workflow + def workflow_fn(ctx, graph): + pass + + _test_runner(workflow_fn) + + +def test_runner_tasks(): + @workflow + def workflow_fn(ctx, graph): + for node_instance in ctx.model.node_instance.iter(): + graph.add_tasks( + OperationTask.node_instance(instance=node_instance, + name='tosca.interfaces.node.lifecycle.Standard.create')) + + _test_runner(workflow_fn) + + assert OPERATION_RESULTS.get('create') is True + + +def _initialize_model_storage_fn(model_storage): + mock.topology.create_simple_topology_single_node( + model_storage, + 1, + '%s.%s' % (__name__, mock_create_operation.__name__) + ) + + +def _test_runner(workflow_fn): + runner = Runner(workflow_name='runner workflow', + workflow_fn=workflow_fn, + inputs={}, + initialize_model_storage_fn=_initialize_model_storage_fn, + deployment_id=1) + runner.run()