Repository: incubator-ariatosca
Updated Branches:
  refs/heads/ARIA-12-runner [created] f17426025


ARIA-12 New "Runner" class for simple workflow executions


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/f1742602
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/f1742602
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/f1742602

Branch: refs/heads/ARIA-12-runner
Commit: f17426025dea534325b6f2f23d0bdf4af54174a3
Parents: db9ae9c
Author: Tal Liron <tal.li...@gmail.com>
Authored: Tue Jan 24 17:57:50 2017 +0200
Committer: Tal Liron <tal.li...@gmail.com>
Committed: Tue Jan 24 19:01:33 2017 +0200

----------------------------------------------------------------------
 aria/orchestrator/runner.py       | 112 +++++++++++++++++++++++++++++++++
 tests/mock/__init__.py            |   2 +-
 tests/mock/context.py             |  38 ++---------
 tests/mock/topology.py            |  97 ++++++++++++++++++++++++++++
 tests/orchestrator/test_runner.py |  74 ++++++++++++++++++++++
 5 files changed, 288 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f1742602/aria/orchestrator/runner.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/runner.py b/aria/orchestrator/runner.py
new file mode 100644
index 0000000..80f4c07
--- /dev/null
+++ b/aria/orchestrator/runner.py
@@ -0,0 +1,112 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Workflow runner
+"""
+
+import platform
+import os
+
+from sqlalchemy import (create_engine, orm) # @UnresolvedImport
+from sqlalchemy.pool import StaticPool # @UnresolvedImport
+
+from .context.workflow import WorkflowContext
+from .workflows.core.engine import Engine
+from .workflows.executor.thread import ThreadExecutor
+from ..storage import model
+from ..storage.sql_mapi import SQLAlchemyModelAPI
+from ..storage.filesystem_rapi import FileSystemResourceAPI
+from .. import (application_model_storage, application_resource_storage)
+
+
+SQLITE_IN_MEMORY = 'sqlite:///:memory:'
+
+
+class Runner(object):
+    """
+    Runs workflows on a deployment.
+
+    Handles the initialization of the storage engine and provides convenience 
methods for
+    sub-classes to create tasks.
+    """
+
+    def __init__(self, workflow_name, workflow_fn, inputs, 
initialize_model_storage_fn,
+                 deployment_id, in_memory=False):
+        self._path = None if in_memory else '/tmp/aria.db'
+
+        workflow_context = self.create_workflow_context(workflow_name, 
deployment_id,
+                                                        
initialize_model_storage_fn)
+
+        tasks_graph = workflow_fn(ctx=workflow_context, **inputs)
+
+        self._engine = Engine(
+            executor=ThreadExecutor(),
+            workflow_context=workflow_context,
+            tasks_graph=tasks_graph)
+
+    def run(self):
+        self._engine.execute()
+
+    def create_workflow_context(self, workflow_name, deployment_id, 
initialize_model_storage_fn):
+        model_storage = self.create_sqlite_model_storage(self._path, True)
+        initialize_model_storage_fn(model_storage)
+        resource_storage = self.create_fs_resource_storage()
+        return WorkflowContext(
+            name=workflow_name,
+            model_storage=model_storage,
+            resource_storage=resource_storage,
+            deployment_id=deployment_id,
+            workflow_name=self.__class__.__name__,
+            task_max_attempts=1,
+            task_retry_interval=1)
+
+    def create_sqlite_model_storage(self, path=None, fresh=False): # pylint: 
disable=no-self-use
+        # Engine
+        if path is not None:
+            if fresh and os.path.isfile(path):
+                os.remove(path)
+            path_prefix = '' if 'Windows' in platform.system() else '/'
+            sqlite_engine = create_engine('sqlite:///%s%s' % (path_prefix, 
path))
+        else:
+            # Causes serious threading problems:
+            # 
https://gehrcke.de/2015/05/in-memory-sqlite-database-and-flask-a-threading-trap/
+            sqlite_engine = create_engine(
+                SQLITE_IN_MEMORY,
+                connect_args={'check_same_thread': False},
+                poolclass=StaticPool)
+
+        # Models
+        model.DeclarativeBase.metadata.create_all(bind=sqlite_engine) # 
@UndefinedVariable
+
+        # Session
+        sqlite_session_factory = orm.sessionmaker(bind=sqlite_engine)
+        if path is not None:
+            sqlite_session = 
orm.scoped_session(session_factory=sqlite_session_factory)
+        else:
+            sqlite_session = sqlite_session_factory()
+
+        # Storage
+        sqlite_kwargs = dict(engine=sqlite_engine, session=sqlite_session)
+        return application_model_storage(
+            SQLAlchemyModelAPI,
+            api_kwargs=sqlite_kwargs)
+
+    def create_fs_resource_storage(self, directory='.'): # pylint: 
disable=no-self-use
+        fs_kwargs = dict(directory=directory)
+        return application_resource_storage(
+            FileSystemResourceAPI,
+            api_kwargs=fs_kwargs)
+    
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f1742602/tests/mock/__init__.py
----------------------------------------------------------------------
diff --git a/tests/mock/__init__.py b/tests/mock/__init__.py
index 14541d0..9004b4c 100644
--- a/tests/mock/__init__.py
+++ b/tests/mock/__init__.py
@@ -13,4 +13,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from . import models, context, operations
+from . import models, context, topology, operations

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f1742602/tests/mock/context.py
----------------------------------------------------------------------
diff --git a/tests/mock/context.py b/tests/mock/context.py
index 5559675..d45a855 100644
--- a/tests/mock/context.py
+++ b/tests/mock/context.py
@@ -19,43 +19,13 @@ from aria.storage.filesystem_rapi import 
FileSystemResourceAPI
 from aria.storage.sql_mapi import SQLAlchemyModelAPI
 
 from . import models
+from .topology import create_simple_topology_two_nodes
 
 
 def simple(mapi_kwargs, resources_dir=None, **kwargs):
     model_storage = aria.application_model_storage(SQLAlchemyModelAPI, 
api_kwargs=mapi_kwargs)
-    blueprint = models.get_blueprint()
-    model_storage.blueprint.put(blueprint)
-    deployment = models.get_deployment(blueprint)
-    model_storage.deployment.put(deployment)
-
-    
#################################################################################
-    # Creating a simple deployment with node -> node as a graph
-
-    dependency_node = models.get_dependency_node(deployment)
-    model_storage.node.put(dependency_node)
-    storage_dependency_node = model_storage.node.get(dependency_node.id)
-
-    dependency_node_instance = 
models.get_dependency_node_instance(storage_dependency_node)
-    model_storage.node_instance.put(dependency_node_instance)
-    storage_dependency_node_instance = 
model_storage.node_instance.get(dependency_node_instance.id)
-
-    dependent_node = models.get_dependent_node(deployment)
-    model_storage.node.put(dependent_node)
-    storage_dependent_node = model_storage.node.get(dependent_node.id)
-
-    dependent_node_instance = 
models.get_dependent_node_instance(storage_dependent_node)
-    model_storage.node_instance.put(dependent_node_instance)
-    storage_dependent_node_instance = 
model_storage.node_instance.get(dependent_node_instance.id)
-
-    relationship = models.get_relationship(storage_dependent_node, 
storage_dependency_node)
-    model_storage.relationship.put(relationship)
-    storage_relationship = model_storage.relationship.get(relationship.id)
-    relationship_instance = models.get_relationship_instance(
-        relationship=storage_relationship,
-        target_instance=storage_dependency_node_instance,
-        source_instance=storage_dependent_node_instance
-    )
-    model_storage.relationship_instance.put(relationship_instance)
+    
+    deployment_id = create_simple_topology_two_nodes(model_storage)
 
     # pytest tmpdir
     if resources_dir:
@@ -70,7 +40,7 @@ def simple(mapi_kwargs, resources_dir=None, **kwargs):
         name='simple_context',
         model_storage=model_storage,
         resource_storage=resource_storage,
-        deployment_id=deployment.id,
+        deployment_id=deployment_id,
         workflow_name=models.WORKFLOW_NAME,
         task_max_attempts=models.TASK_MAX_ATTEMPTS,
         task_retry_interval=models.TASK_RETRY_INTERVAL

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f1742602/tests/mock/topology.py
----------------------------------------------------------------------
diff --git a/tests/mock/topology.py b/tests/mock/topology.py
new file mode 100644
index 0000000..9d197f1
--- /dev/null
+++ b/tests/mock/topology.py
@@ -0,0 +1,97 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from datetime import datetime
+
+from aria.storage import model
+
+from . import models
+
+
+def create_simple_topology_single_node(model_storage, deployment_id, 
create_operation):
+    now = datetime.utcnow()
+    
+    blueprint = model.Blueprint(name='mock-blueprint',
+                                created_at=now,
+                                updated_at=now,
+                                plan={},
+                                main_file_name='mock-file')
+    model_storage.blueprint.put(blueprint)
+    
+    deployment = model.Deployment(name='mock-deployment-%d' % deployment_id,
+                                  blueprint_fk=blueprint.id,
+                                  created_at=now,
+                                  updated_at=now)
+    model_storage.deployment.put(deployment)
+    
+    node = model.Node(name='mock-node',
+                      type='tosca.nodes.Compute',
+                      operations={
+                          'tosca.interfaces.node.lifecycle.Standard.create': {
+                              'operation': create_operation,
+                              'inputs': {
+                                  'key': 'create',
+                                  'value': True}}},
+                      number_of_instances=1,
+                      planned_number_of_instances=1,
+                      deploy_number_of_instances=1,
+                      min_number_of_instances=1,
+                      max_number_of_instances=1,
+                      deployment_fk=deployment.id)
+    model_storage.node.put(node)
+    
+    node_instance = model.NodeInstance(name='mock-node-instance',
+                                       state='',
+                                       node_fk=node.id)
+    model_storage.node_instance.put(node_instance)
+
+
+def create_simple_topology_two_nodes(model_storage):
+    blueprint = models.get_blueprint()
+    model_storage.blueprint.put(blueprint)
+    deployment = models.get_deployment(blueprint)
+    model_storage.deployment.put(deployment)
+
+    
#################################################################################
+    # Creating a simple deployment with node -> node as a graph
+
+    dependency_node = models.get_dependency_node(deployment)
+    model_storage.node.put(dependency_node)
+    storage_dependency_node = model_storage.node.get(dependency_node.id)
+
+    dependency_node_instance = 
models.get_dependency_node_instance(storage_dependency_node)
+    model_storage.node_instance.put(dependency_node_instance)
+    storage_dependency_node_instance = 
model_storage.node_instance.get(dependency_node_instance.id)
+
+    dependent_node = models.get_dependent_node(deployment)
+    model_storage.node.put(dependent_node)
+    storage_dependent_node = model_storage.node.get(dependent_node.id)
+
+    dependent_node_instance = 
models.get_dependent_node_instance(storage_dependent_node)
+    model_storage.node_instance.put(dependent_node_instance)
+    storage_dependent_node_instance = 
model_storage.node_instance.get(dependent_node_instance.id)
+
+    relationship = models.get_relationship(storage_dependent_node, 
storage_dependency_node)
+    model_storage.relationship.put(relationship)
+    storage_relationship = model_storage.relationship.get(relationship.id)
+    relationship_instance = models.get_relationship_instance(
+        relationship=storage_relationship,
+        target_instance=storage_dependency_node_instance,
+        source_instance=storage_dependent_node_instance
+    )
+    model_storage.relationship_instance.put(relationship_instance)
+    
+    return deployment.id
+

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f1742602/tests/orchestrator/test_runner.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/test_runner.py 
b/tests/orchestrator/test_runner.py
new file mode 100644
index 0000000..d5d689e
--- /dev/null
+++ b/tests/orchestrator/test_runner.py
@@ -0,0 +1,74 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from aria import workflow
+from aria.orchestrator import operation
+from aria.orchestrator.workflows.api.task import OperationTask
+from aria.orchestrator.runner import Runner
+
+from tests import mock
+
+import pytest
+
+
+OPERATION_RESULTS = {}
+
+
+@operation
+def mock_create_operation(ctx, key, value, **kwargs):
+    OPERATION_RESULTS[key] = value
+
+
+@pytest.fixture(autouse=True)
+def cleanup():
+    OPERATION_RESULTS.clear()
+
+
+def test_runner_no_tasks():
+    @workflow
+    def workflow_fn(ctx, graph):
+        pass
+
+    _test_runner(workflow_fn)
+
+
+def test_runner_tasks():
+    @workflow
+    def workflow_fn(ctx, graph):
+        for node_instance in ctx.model.node_instance.iter():
+            graph.add_tasks(
+                OperationTask.node_instance(instance=node_instance,
+                                            
name='tosca.interfaces.node.lifecycle.Standard.create'))
+
+    _test_runner(workflow_fn)
+    
+    assert OPERATION_RESULTS.get('create') is True
+
+
+def _initialize_model_storage_fn(model_storage):
+    mock.topology.create_simple_topology_single_node(
+        model_storage,
+        1,
+       '%s.%s' % (__name__, mock_create_operation.__name__)
+    )
+
+
+def _test_runner(workflow_fn):
+    runner = Runner(workflow_name='runner workflow',
+                    workflow_fn=workflow_fn,
+                    inputs={},
+                    initialize_model_storage_fn=_initialize_model_storage_fn,
+                    deployment_id=1)
+    runner.run()

Reply via email to