Repository: incubator-ariatosca Updated Branches: refs/heads/ARIA-48-aria-cli 4c168db8e -> 277c4ae82
created dry-run executor Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/277c4ae8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/277c4ae8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/277c4ae8 Branch: refs/heads/ARIA-48-aria-cli Commit: 277c4ae82a00833ecb533db18ac694d8727c5fa3 Parents: 4c168db Author: Ran Ziv <r...@gigaspaces.com> Authored: Tue Apr 4 12:02:11 2017 +0300 Committer: Ran Ziv <r...@gigaspaces.com> Committed: Tue Apr 4 12:02:11 2017 +0300 ---------------------------------------------------------------------- aria/cli/cli/aria.py | 5 +++ aria/cli/cli/helptexts.py | 2 + aria/cli/commands/executions.py | 15 +++++-- aria/orchestrator/workflow_runner.py | 5 ++- aria/orchestrator/workflows/executor/dry.py | 54 ++++++++++++++++++++++++ 5 files changed, 76 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/277c4ae8/aria/cli/cli/aria.py ---------------------------------------------------------------------- diff --git a/aria/cli/cli/aria.py b/aria/cli/cli/aria.py index baa72eb..1664ce5 100644 --- a/aria/cli/cli/aria.py +++ b/aria/cli/cli/aria.py @@ -304,6 +304,11 @@ class Options(object): is_flag=True, help=helptexts.JSON_OUTPUT) + self.dry_execution = click.option( + '--dry', + is_flag=True, + help=helptexts.DRY_EXECUTION) + self.init_hard_reset = click.option( '--hard', is_flag=True, http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/277c4ae8/aria/cli/cli/helptexts.py ---------------------------------------------------------------------- diff --git a/aria/cli/cli/helptexts.py b/aria/cli/cli/helptexts.py index 02519cb..0d66d6b 100644 --- a/aria/cli/cli/helptexts.py +++ b/aria/cli/cli/helptexts.py @@ -32,6 +32,8 @@ HARD_RESET = "Hard reset the configuration, including coloring and loggers" ENABLE_COLORS = "Enable colors in logger (use --hard when working with" \ " an initialized environment) [default: False]" +DRY_EXECUTION = "Execute a workflow dry run (prints operations information without causing side " \ + "effects)" SERVICE_TEMPLATE_FILENAME = ( "The name of the archive's main service template file. " "This is only relevant if uploading an archive") http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/277c4ae8/aria/cli/commands/executions.py ---------------------------------------------------------------------- diff --git a/aria/cli/commands/executions.py b/aria/cli/commands/executions.py index 6d8b949..82ee51a 100644 --- a/aria/cli/commands/executions.py +++ b/aria/cli/commands/executions.py @@ -18,6 +18,7 @@ from ..table import print_data from ..cli import aria from ...modeling.models import Execution from ...orchestrator.workflow_runner import WorkflowRunner +from ...orchestrator.workflows.executor.dry import DryExecutor from ...utils import formatting from ...utils import threading @@ -101,6 +102,7 @@ def list(service_name, @aria.argument('workflow-name') @aria.options.service_name(required=True) @aria.options.inputs +@aria.options.dry_execution @aria.options.task_max_attempts() @aria.options.task_retry_interval() @aria.options.verbose() @@ -111,6 +113,7 @@ def list(service_name, def start(workflow_name, service_name, inputs, + dry, task_max_attempts, task_retry_interval, model_storage, @@ -119,19 +122,21 @@ def start(workflow_name, logger): """Execute a workflow - `WORKFLOW_ID` is the id of the workflow to execute (e.g. `uninstall`) + `WORKFLOW_NAME` is the name of the workflow to execute (e.g. `uninstall`) """ + executor = DryExecutor() if dry else None # use WorkflowRunner's default executor + workflow_runner = \ WorkflowRunner(workflow_name, service_name, inputs, model_storage, resource_storage, plugin_manager, - task_max_attempts, task_retry_interval) + executor, task_max_attempts, task_retry_interval) execution_thread_name = '{0}_{1}'.format(service_name, workflow_name) execution_thread = threading.ExceptionThread(target=workflow_runner.execute, name=execution_thread_name) execution_thread.daemon = True # allows force-cancel to exit immediately - logger.info('Starting execution. Press Ctrl+C cancel') + logger.info('Starting {0}execution. Press Ctrl+C cancel'.format('dry ' if dry else '')) execution_thread.start() try: while execution_thread.is_alive(): @@ -148,6 +153,10 @@ def start(workflow_name, if execution.status == Execution.FAILED: logger.info('Execution error:\n{0}'.format(execution.error)) + if dry: + # remove traces of the dry execution (including tasks, logs, inputs..) + model_storage.execution.delete(execution) + def _cancel_execution(workflow_runner, execution_thread, logger): logger.info('Cancelling execution. Press Ctrl+C again to force-cancel') http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/277c4ae8/aria/orchestrator/workflow_runner.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflow_runner.py b/aria/orchestrator/workflow_runner.py index 78b17b8..982dff1 100644 --- a/aria/orchestrator/workflow_runner.py +++ b/aria/orchestrator/workflow_runner.py @@ -40,7 +40,7 @@ class WorkflowRunner(object): def __init__(self, workflow_name, service_name, inputs, model_storage, resource_storage, plugin_manager, - task_max_attempts=DEFAULT_TASK_MAX_ATTEMPTS, + executor=None, task_max_attempts=DEFAULT_TASK_MAX_ATTEMPTS, task_retry_interval=DEFAULT_TASK_RETRY_INTERVAL): self._model_storage = model_storage @@ -71,8 +71,9 @@ class WorkflowRunner(object): execution_inputs_dict = models.Parameter.unwrap_dict(self.execution.inputs) self._tasks_graph = workflow_fn(ctx=workflow_context, **execution_inputs_dict) + executor = executor or ProcessExecutor(plugin_manager=plugin_manager) self._engine = Engine( - executor=ProcessExecutor(plugin_manager=plugin_manager), + executor=executor, workflow_context=workflow_context, tasks_graph=self._tasks_graph) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/277c4ae8/aria/orchestrator/workflows/executor/dry.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/dry.py b/aria/orchestrator/workflows/executor/dry.py new file mode 100644 index 0000000..69ce53c --- /dev/null +++ b/aria/orchestrator/workflows/executor/dry.py @@ -0,0 +1,54 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Dry executor +""" + +from datetime import datetime + +from .base import BaseExecutor +from ....modeling.models import Parameter + + +class DryExecutor(BaseExecutor): + """ + Executor which dry runs tasks - prints task information without causing any side effects + """ + + def execute(self, task): + # updating the task manually instead of calling self._task_started(task), + # to avoid any side effects raising that event might cause + with task._update(): + task.started_at = datetime.utcnow() + task.status = task.STARTED + + actor_type = type(task.actor).__name__.lower() + implementation = '{0} > '.format(task.plugin) if task.plugin else '' + implementation += task.implementation + inputs = Parameter.unwrap_dict(task.inputs) + + self.logger.info( + 'Executing {actor_type} {actor_name} operation {interface_name} {operation_name}: ' + '{implementation} (Inputs: {inputs})' + .format(actor_type=actor_type, actor_name=task.actor.name, + interface_name=task.interface_name, operation_name=task.operation_name, + implementation=implementation, inputs=inputs)) + + # updating the task manually instead of calling self._task_succeeded(task), + # to avoid any side effects raising that event might cause + with task._update(): + task.ended_at = datetime.utcnow() + task.status = task.SUCCESS