Repository: incubator-ariatosca Updated Branches: refs/heads/extract_execution_creation_from_workflow_runner 730750f3d -> 9611f6147
wip Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/9611f614 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/9611f614 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/9611f614 Branch: refs/heads/extract_execution_creation_from_workflow_runner Commit: 9611f61474bd5f51baa08f87a83bed24e15442ca Parents: 730750f Author: max-orlov <[email protected]> Authored: Sun Nov 19 10:09:54 2017 +0200 Committer: max-orlov <[email protected]> Committed: Sun Nov 19 10:09:54 2017 +0200 ---------------------------------------------------------------------- aria/cli/commands/executions.py | 24 +++-- aria/orchestrator/workflow_runner.py | 152 +++++++++++++++++------------- 2 files changed, 102 insertions(+), 74 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9611f614/aria/cli/commands/executions.py ---------------------------------------------------------------------- diff --git a/aria/cli/commands/executions.py b/aria/cli/commands/executions.py index cecbbc5..2ab3a33 100644 --- a/aria/cli/commands/executions.py +++ b/aria/cli/commands/executions.py @@ -26,7 +26,7 @@ from .. import logger as cli_logger from .. import execution_logging from ..core import aria from ...modeling.models import Execution -from ...orchestrator.workflow_runner import WorkflowRunner +from ...orchestrator import workflow_runner from ...orchestrator.workflows.executor.dry import DryExecutor from ...utils import formatting from ...utils import threading @@ -143,15 +143,20 @@ def start(workflow_name, service = model_storage.service.get_by_name(service_name) executor = DryExecutor() if dry else None # use WorkflowRunner's default executor - workflow_runner = \ - WorkflowRunner( + execution = workflow_runner.create_execution_model( + service, workflow_name, inputs + ) + model_storage.execution.put(execution) + + wf_runner = \ + workflow_runner.WorkflowRunner( model_storage, resource_storage, plugin_manager, - service_id=service.id, workflow_name=workflow_name, inputs=inputs, executor=executor, + execution=execution, executor=executor, task_max_attempts=task_max_attempts, task_retry_interval=task_retry_interval ) logger.info('Starting {0}execution. Press Ctrl+C cancel'.format('dry ' if dry else '')) - _run_execution(workflow_runner, logger, model_storage, dry, mark_pattern) + _run_execution(wf_runner, logger, model_storage, dry, mark_pattern) @executions.command(name='resume', @@ -188,14 +193,15 @@ def resume(execution_id, .format(execution=execution, valid_status=execution.CANCELLED)) return - workflow_runner = \ - WorkflowRunner( + wf_runner = \ + workflow_runner.WorkflowRunner( model_storage, resource_storage, plugin_manager, - execution_id=execution_id, retry_failed_tasks=retry_failed_tasks, executor=executor, + execution=execution, executor=executor, resume=True, + retry_failed_tasks=retry_failed_tasks, ) logger.info('Resuming {0}execution. Press Ctrl+C cancel'.format('dry ' if dry else '')) - _run_execution(workflow_runner, logger, model_storage, dry, mark_pattern) + _run_execution(wf_runner, logger, model_storage, dry, mark_pattern) def _run_execution(workflow_runner, logger, model_storage, dry, mark_pattern): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9611f614/aria/orchestrator/workflow_runner.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflow_runner.py b/aria/orchestrator/workflow_runner.py index 4dbf29b..aebde8e 100644 --- a/aria/orchestrator/workflow_runner.py +++ b/aria/orchestrator/workflow_runner.py @@ -37,11 +37,18 @@ DEFAULT_TASK_RETRY_INTERVAL = 30 class WorkflowRunner(object): - def __init__(self, model_storage, resource_storage, plugin_manager, - execution_id=None, retry_failed_tasks=False, - service_id=None, workflow_name=None, inputs=None, executor=None, - task_max_attempts=DEFAULT_TASK_MAX_ATTEMPTS, - task_retry_interval=DEFAULT_TASK_RETRY_INTERVAL): + def __init__( + self, + model_storage, + resource_storage, + plugin_manager, + execution, + executor=None, + resume=False, + retry_failed_tasks=False, + task_max_attempts=DEFAULT_TASK_MAX_ATTEMPTS, + task_retry_interval=DEFAULT_TASK_RETRY_INTERVAL + ): """ Manages a single workflow execution on a given service. @@ -57,37 +64,23 @@ class WorkflowRunner(object): :param task_retry_interval: retry interval between retry attempts of a failing task """ - if not (execution_id or (workflow_name and service_id)): - exceptions.InvalidWorkflowRunnerParams( - "Either provide execution id in order to resume a workflow or workflow name " - "and service id with inputs") - - self._is_resume = execution_id is not None + self._is_resume = resume self._retry_failed_tasks = retry_failed_tasks self._model_storage = model_storage self._resource_storage = resource_storage + self._execution = execution # the IDs are stored rather than the models themselves, so this module could be used # by several threads without raising errors on model objects shared between threads - if self._is_resume: - self._execution_id = execution_id - self._service_id = self.execution.service.id - self._workflow_name = model_storage.execution.get(self._execution_id).workflow_name - else: - self._service_id = service_id - self._workflow_name = workflow_name - self._validate_workflow_exists_for_service() - self._execution_id = self._create_execution_model(inputs).id - self._workflow_context = WorkflowContext( name=self.__class__.__name__, model_storage=self._model_storage, resource_storage=resource_storage, - service_id=service_id, - execution_id=self._execution_id, - workflow_name=self._workflow_name, + service_id=self.execution.service.id, + execution_id=self.execution.id, + workflow_name=self.execution.workflow_name, task_max_attempts=task_max_attempts, task_retry_interval=task_retry_interval) @@ -107,7 +100,7 @@ class WorkflowRunner(object): @property def execution_id(self): - return self._execution_id + return self.execution.id @property def execution(self): @@ -115,7 +108,7 @@ class WorkflowRunner(object): @property def service(self): - return self._model_storage.service.get(self._service_id) + return self._model_storage.service.get(self._execution.service.id) def execute(self): self._engine.execute(ctx=self._workflow_context, @@ -125,49 +118,12 @@ class WorkflowRunner(object): def cancel(self): self._engine.cancel_execution(ctx=self._workflow_context) - def _create_execution_model(self, inputs): - execution = models.Execution( - created_at=datetime.utcnow(), - service=self.service, - workflow_name=self._workflow_name, - inputs={}) - - if self._workflow_name in builtin.BUILTIN_WORKFLOWS: - workflow_inputs = dict() # built-in workflows don't have any inputs - else: - workflow_inputs = self.service.workflows[self._workflow_name].inputs - - modeling_utils.validate_no_undeclared_inputs(declared_inputs=workflow_inputs, - supplied_inputs=inputs or {}) - modeling_utils.validate_required_inputs_are_supplied(declared_inputs=workflow_inputs, - supplied_inputs=inputs or {}) - execution.inputs = modeling_utils.merge_parameter_values( - inputs, workflow_inputs, model_cls=models.Input) - # TODO: these two following calls should execute atomically - self._validate_no_active_executions(execution) - self._model_storage.execution.put(execution) - return execution - - def _validate_workflow_exists_for_service(self): - if self._workflow_name not in self.service.workflows and \ - self._workflow_name not in builtin.BUILTIN_WORKFLOWS: - raise exceptions.UndeclaredWorkflowError( - 'No workflow policy {0} declared in service {1}' - .format(self._workflow_name, self.service.name)) - - def _validate_no_active_executions(self, execution): - active_executions = [e for e in self.service.executions if e.is_active()] - if active_executions: - raise exceptions.ActiveExecutionsError( - "Can't start execution; Service {0} has an active execution with ID {1}" - .format(self.service.name, active_executions[0].id)) - def _get_workflow_fn(self): if self._workflow_name in builtin.BUILTIN_WORKFLOWS: return import_fullname('{0}.{1}'.format(builtin.BUILTIN_WORKFLOWS_PATH_PREFIX, - self._workflow_name)) + self.execution.workflow_name)) - workflow = self.service.workflows[self._workflow_name] + workflow = self.service.workflows[self.execution.workflow_name] # TODO: Custom workflow support needs improvement, currently this code uses internal # knowledge of the resource storage; Instead, workflows should probably be loaded @@ -186,3 +142,69 @@ class WorkflowRunner(object): self._workflow_name, workflow.function)) return workflow_fn + + +def create_execution_model(service, workflow_name, inputs): + _validate_workflow_exists_for_service(service, workflow_name) + _validate_no_active_executions(service) + execution = models.Execution( + created_at=datetime.utcnow(), + service_fk=service.id, + workflow_name=workflow_name, + inputs={}) + + if workflow_name in builtin.BUILTIN_WORKFLOWS: + workflow_inputs = dict() # built-in workflows don't have any inputs + else: + workflow_inputs = service.workflows[workflow_name].inputs + + modeling_utils.validate_no_undeclared_inputs(declared_inputs=workflow_inputs, + supplied_inputs=inputs or {}) + modeling_utils.validate_required_inputs_are_supplied(declared_inputs=workflow_inputs, + supplied_inputs=inputs or {}) + execution.inputs = modeling_utils.merge_parameter_values( + inputs, workflow_inputs, model_cls=models.Input) + + return execution + + + +def _create_execution_model(self, inputs): + execution = models.Execution( + created_at=datetime.utcnow(), + service=self.service, + workflow_name=self._workflow_name, + inputs={}) + + if self._workflow_name in builtin.BUILTIN_WORKFLOWS: + workflow_inputs = dict() # built-in workflows don't have any inputs + else: + workflow_inputs = self.service.workflows[self._workflow_name].inputs + + modeling_utils.validate_no_undeclared_inputs(declared_inputs=workflow_inputs, + supplied_inputs=inputs or {}) + modeling_utils.validate_required_inputs_are_supplied(declared_inputs=workflow_inputs, + supplied_inputs=inputs or {}) + execution.inputs = modeling_utils.merge_parameter_values( + inputs, workflow_inputs, model_cls=models.Input) + # TODO: these two following calls should execute atomically + self._validate_no_active_executions(execution) + self._model_storage.execution.put(execution) + return execution + + + +def _validate_no_active_executions(service): + active_executions = [e for e in service.executions if e.is_active()] + if active_executions: + raise exceptions.ActiveExecutionsError( + "Can't start execution; Service {0} has an active execution with ID {1}" + .format(service.name, active_executions[0].id)) + + +def _validate_workflow_exists_for_service(service, workflow_name): + if workflow_name not in service.workflows and \ + workflow_name not in builtin.BUILTIN_WORKFLOWS: + raise exceptions.UndeclaredWorkflowError( + 'No workflow policy {0} declared in service {1}' + .format(workflow_name, service.name)) \ No newline at end of file
