http://git-wip-us.apache.org/repos/asf/incubator-ariatosca-website/blob/23d6ba76/apache-ariatosca-0.1.1/aria/orchestrator/workflows/core/engine.py ---------------------------------------------------------------------- diff --git a/apache-ariatosca-0.1.1/aria/orchestrator/workflows/core/engine.py b/apache-ariatosca-0.1.1/aria/orchestrator/workflows/core/engine.py deleted file mode 100644 index d9c77e9..0000000 --- a/apache-ariatosca-0.1.1/aria/orchestrator/workflows/core/engine.py +++ /dev/null @@ -1,182 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Workflow execution. -""" - -import time -from datetime import datetime - -from aria import logger -from aria.modeling import models -from aria.orchestrator import events -from aria.orchestrator.context import operation - -from .. import exceptions -from ..executor.base import StubTaskExecutor -# Import required so all signals are registered -from . import events_handler # pylint: disable=unused-import - - -class Engine(logger.LoggerMixin): - """ - Executes workflows. - """ - - def __init__(self, executors, **kwargs): - super(Engine, self).__init__(**kwargs) - self._executors = executors.copy() - self._executors.setdefault(StubTaskExecutor, StubTaskExecutor()) - - def execute(self, ctx, resuming=False): - """ - Executes the workflow. - """ - if resuming: - events.on_resume_workflow_signal.send(ctx) - - tasks_tracker = _TasksTracker(ctx) - try: - events.start_workflow_signal.send(ctx) - while True: - cancel = self._is_cancel(ctx) - if cancel: - break - for task in tasks_tracker.ended_tasks: - self._handle_ended_tasks(task) - tasks_tracker.finished(task) - for task in tasks_tracker.executable_tasks: - tasks_tracker.executing(task) - self._handle_executable_task(ctx, task) - if tasks_tracker.all_tasks_consumed: - break - else: - time.sleep(0.1) - if cancel: - self._terminate_tasks(tasks_tracker.executing_tasks) - events.on_cancelled_workflow_signal.send(ctx) - else: - events.on_success_workflow_signal.send(ctx) - except BaseException as e: - # Cleanup any remaining tasks - self._terminate_tasks(tasks_tracker.executing_tasks) - events.on_failure_workflow_signal.send(ctx, exception=e) - raise - - def _terminate_tasks(self, tasks): - for task in tasks: - try: - self._executors[task._executor].terminate(task.id) - except BaseException: - pass - - @staticmethod - def cancel_execution(ctx): - """ - Send a cancel request to the engine. If execution already started, execution status - will be modified to ``cancelling`` status. If execution is in pending mode, execution status - will be modified to ``cancelled`` directly. - """ - events.on_cancelling_workflow_signal.send(ctx) - - @staticmethod - def _is_cancel(ctx): - execution = ctx.model.execution.refresh(ctx.execution) - return execution.status in (models.Execution.CANCELLING, models.Execution.CANCELLED) - - def _handle_executable_task(self, ctx, task): - task_executor = self._executors[task._executor] - - # If the task is a stub, a default context is provided, else it should hold the context cls - context_cls = operation.BaseOperationContext if task._stub_type else task._context_cls - op_ctx = context_cls( - model_storage=ctx.model, - resource_storage=ctx.resource, - workdir=ctx._workdir, - task_id=task.id, - actor_id=task.actor.id if task.actor else None, - service_id=task.execution.service.id, - execution_id=task.execution.id, - name=task.name - ) - - if not task._stub_type: - events.sent_task_signal.send(op_ctx) - task_executor.execute(op_ctx) - - @staticmethod - def _handle_ended_tasks(task): - if task.status == models.Task.FAILED and not task.ignore_failure: - raise exceptions.ExecutorException('Workflow failed') - - -class _TasksTracker(object): - def __init__(self, ctx): - self._ctx = ctx - self._tasks = ctx.execution.tasks - self._executed_tasks = [task for task in self._tasks if task.has_ended()] - self._executable_tasks = list(set(self._tasks) - set(self._executed_tasks)) - self._executing_tasks = [] - - @property - def all_tasks_consumed(self): - return len(self._executed_tasks) == len(self._tasks) and len(self._executing_tasks) == 0 - - def executing(self, task): - # Task executing could be retrying (thus removed and added earlier) - if task not in self._executing_tasks: - self._executable_tasks.remove(task) - self._executing_tasks.append(task) - - def finished(self, task): - self._executing_tasks.remove(task) - self._executed_tasks.append(task) - - @property - def ended_tasks(self): - for task in self.executing_tasks: - if task.has_ended(): - yield task - - @property - def executable_tasks(self): - now = datetime.utcnow() - # we need both lists since retrying task are in the executing task list. - for task in self._update_tasks(self._executing_tasks + self._executable_tasks): - if all([task.is_waiting(), - task.due_at <= now, - all(dependency in self._executed_tasks for dependency in task.dependencies) - ]): - yield task - - @property - def executing_tasks(self): - for task in self._update_tasks(self._executing_tasks): - yield task - - @property - def executed_tasks(self): - for task in self._update_tasks(self._executed_tasks): - yield task - - @property - def tasks(self): - for task in self._update_tasks(self._tasks): - yield task - - def _update_tasks(self, tasks): - for task in tasks: - yield self._ctx.model.task.refresh(task)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca-website/blob/23d6ba76/apache-ariatosca-0.1.1/aria/orchestrator/workflows/core/events_handler.py ---------------------------------------------------------------------- diff --git a/apache-ariatosca-0.1.1/aria/orchestrator/workflows/core/events_handler.py b/apache-ariatosca-0.1.1/aria/orchestrator/workflows/core/events_handler.py deleted file mode 100644 index 37801de..0000000 --- a/apache-ariatosca-0.1.1/aria/orchestrator/workflows/core/events_handler.py +++ /dev/null @@ -1,161 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Workflow event handling. -""" - -from datetime import ( - datetime, - timedelta, -) - -from ... import events -from ... import exceptions - - [email protected]_task_signal.connect -def _task_sent(ctx, *args, **kwargs): - with ctx.persist_changes: - ctx.task.status = ctx.task.SENT - - [email protected]_task_signal.connect -def _task_started(ctx, *args, **kwargs): - with ctx.persist_changes: - ctx.task.started_at = datetime.utcnow() - ctx.task.status = ctx.task.STARTED - _update_node_state_if_necessary(ctx, is_transitional=True) - - [email protected]_failure_task_signal.connect -def _task_failed(ctx, exception, *args, **kwargs): - with ctx.persist_changes: - should_retry = all([ - not isinstance(exception, exceptions.TaskAbortException), - ctx.task.attempts_count < ctx.task.max_attempts or - ctx.task.max_attempts == ctx.task.INFINITE_RETRIES, - # ignore_failure check here means the task will not be retried and it will be marked - # as failed. The engine will also look at ignore_failure so it won't fail the - # workflow. - not ctx.task.ignore_failure - ]) - if should_retry: - retry_interval = None - if isinstance(exception, exceptions.TaskRetryException): - retry_interval = exception.retry_interval - if retry_interval is None: - retry_interval = ctx.task.retry_interval - ctx.task.status = ctx.task.RETRYING - ctx.task.attempts_count += 1 - ctx.task.due_at = datetime.utcnow() + timedelta(seconds=retry_interval) - else: - ctx.task.ended_at = datetime.utcnow() - ctx.task.status = ctx.task.FAILED - - [email protected]_success_task_signal.connect -def _task_succeeded(ctx, *args, **kwargs): - with ctx.persist_changes: - ctx.task.ended_at = datetime.utcnow() - ctx.task.status = ctx.task.SUCCESS - - _update_node_state_if_necessary(ctx) - - [email protected]_workflow_signal.connect -def _workflow_started(workflow_context, *args, **kwargs): - with workflow_context.persist_changes: - execution = workflow_context.execution - # the execution may already be in the process of cancelling - if execution.status in (execution.CANCELLING, execution.CANCELLED): - return - execution.status = execution.STARTED - execution.started_at = datetime.utcnow() - - [email protected]_failure_workflow_signal.connect -def _workflow_failed(workflow_context, exception, *args, **kwargs): - with workflow_context.persist_changes: - execution = workflow_context.execution - execution.error = str(exception) - execution.status = execution.FAILED - execution.ended_at = datetime.utcnow() - - [email protected]_success_workflow_signal.connect -def _workflow_succeeded(workflow_context, *args, **kwargs): - with workflow_context.persist_changes: - execution = workflow_context.execution - execution.status = execution.SUCCEEDED - execution.ended_at = datetime.utcnow() - - [email protected]_cancelled_workflow_signal.connect -def _workflow_cancelled(workflow_context, *args, **kwargs): - with workflow_context.persist_changes: - execution = workflow_context.execution - # _workflow_cancelling function may have called this function already - if execution.status == execution.CANCELLED: - return - # the execution may have already been finished - elif execution.status in (execution.SUCCEEDED, execution.FAILED): - _log_tried_to_cancel_execution_but_it_already_ended(workflow_context, execution.status) - else: - execution.status = execution.CANCELLED - execution.ended_at = datetime.utcnow() - - [email protected]_resume_workflow_signal.connect -def _workflow_resume(workflow_context, *args, **kwargs): - with workflow_context.persist_changes: - execution = workflow_context.execution - execution.status = execution.PENDING - # Any non ended task would be put back to pending state - for task in execution.tasks: - if not task.has_ended(): - task.status = task.PENDING - - [email protected]_cancelling_workflow_signal.connect -def _workflow_cancelling(workflow_context, *args, **kwargs): - with workflow_context.persist_changes: - execution = workflow_context.execution - if execution.status == execution.PENDING: - return _workflow_cancelled(workflow_context=workflow_context) - # the execution may have already been finished - elif execution.status in (execution.SUCCEEDED, execution.FAILED): - _log_tried_to_cancel_execution_but_it_already_ended(workflow_context, execution.status) - else: - execution.status = execution.CANCELLING - - -def _update_node_state_if_necessary(ctx, is_transitional=False): - # TODO: this is not the right way to check! the interface name is arbitrary - # and also will *never* be the type name - node = ctx.task.node if ctx.task is not None else None - if (node is not None) and \ - (ctx.task.interface_name in ('Standard', 'tosca.interfaces.node.lifecycle.Standard')): - state = node.determine_state(op_name=ctx.task.operation_name, - is_transitional=is_transitional) - if state: - node.state = state - ctx.model.node.update(node) - - -def _log_tried_to_cancel_execution_but_it_already_ended(workflow_context, status): - workflow_context.logger.info( - "'{workflow_name}' workflow execution {status} before the cancel request" - "was fully processed".format(workflow_name=workflow_context.workflow_name, status=status)) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca-website/blob/23d6ba76/apache-ariatosca-0.1.1/aria/orchestrator/workflows/core/graph_compiler.py ---------------------------------------------------------------------- diff --git a/apache-ariatosca-0.1.1/aria/orchestrator/workflows/core/graph_compiler.py b/apache-ariatosca-0.1.1/aria/orchestrator/workflows/core/graph_compiler.py deleted file mode 100644 index 81543d5..0000000 --- a/apache-ariatosca-0.1.1/aria/orchestrator/workflows/core/graph_compiler.py +++ /dev/null @@ -1,118 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -from ....modeling import models -from .. import executor, api - - -class GraphCompiler(object): - def __init__(self, ctx, default_executor): - self._ctx = ctx - self._default_executor = default_executor - self._stub_executor = executor.base.StubTaskExecutor - self._model_to_api_id = {} - - def compile(self, - task_graph, - start_stub_type=models.Task.START_WORKFLOW, - end_stub_type=models.Task.END_WORKFLOW, - depends_on=()): - """ - Translates the user graph to the execution graph - :param task_graph: The user's graph - :param start_stub_type: internal use - :param end_stub_type: internal use - :param depends_on: internal use - """ - depends_on = list(depends_on) - - # Insert start marker - start_task = self._create_stub_task( - start_stub_type, depends_on, self._start_graph_suffix(task_graph.id), task_graph.name, - ) - - for task in task_graph.topological_order(reverse=True): - dependencies = \ - (self._get_tasks_from_dependencies(task_graph.get_dependencies(task)) - or [start_task]) - - if isinstance(task, api.task.OperationTask): - self._create_operation_task(task, dependencies) - - elif isinstance(task, api.task.WorkflowTask): - # Build the graph recursively while adding start and end markers - self.compile( - task, models.Task.START_SUBWROFKLOW, models.Task.END_SUBWORKFLOW, dependencies - ) - elif isinstance(task, api.task.StubTask): - self._create_stub_task(models.Task.STUB, dependencies, task.id) - else: - raise RuntimeError('Undefined state') - - # Insert end marker - self._create_stub_task( - end_stub_type, - self._get_non_dependent_tasks(self._ctx.execution) or [start_task], - self._end_graph_suffix(task_graph.id), - task_graph.name - ) - - def _create_stub_task(self, stub_type, dependencies, api_id, name=None): - model_task = models.Task( - name=name, - dependencies=dependencies, - execution=self._ctx.execution, - _executor=self._stub_executor, - _stub_type=stub_type) - self._ctx.model.task.put(model_task) - self._model_to_api_id[model_task.id] = api_id - return model_task - - def _create_operation_task(self, api_task, dependencies): - model_task = models.Task.from_api_task( - api_task, self._default_executor, dependencies=dependencies) - self._ctx.model.task.put(model_task) - self._model_to_api_id[model_task.id] = api_task.id - return model_task - - @staticmethod - def _start_graph_suffix(api_id): - return '{0}-Start'.format(api_id) - - @staticmethod - def _end_graph_suffix(api_id): - return '{0}-End'.format(api_id) - - @staticmethod - def _get_non_dependent_tasks(execution): - tasks_with_dependencies = set() - for task in execution.tasks: - tasks_with_dependencies.update(task.dependencies) - return list(set(execution.tasks) - set(tasks_with_dependencies)) - - def _get_tasks_from_dependencies(self, dependencies): - """ - Returns task list from dependencies. - """ - tasks = [] - for dependency in dependencies: - if isinstance(dependency, (api.task.StubTask, api.task.OperationTask)): - dependency_name = dependency.id - else: - dependency_name = self._end_graph_suffix(dependency.id) - tasks.extend(task for task in self._ctx.execution.tasks - if self._model_to_api_id.get(task.id, None) == dependency_name) - return tasks http://git-wip-us.apache.org/repos/asf/incubator-ariatosca-website/blob/23d6ba76/apache-ariatosca-0.1.1/aria/orchestrator/workflows/events_logging.py ---------------------------------------------------------------------- diff --git a/apache-ariatosca-0.1.1/aria/orchestrator/workflows/events_logging.py b/apache-ariatosca-0.1.1/aria/orchestrator/workflows/events_logging.py deleted file mode 100644 index 9eee1e1..0000000 --- a/apache-ariatosca-0.1.1/aria/orchestrator/workflows/events_logging.py +++ /dev/null @@ -1,85 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -""" -Workflow event logging. -""" - -from .. import events -from ... import modeling - - -def _get_task_name(task): - if isinstance(task.actor, modeling.model_bases.service_instance.RelationshipBase): - return '{source_node.name}->{target_node.name}'.format( - source_node=task.actor.source_node, target_node=task.actor.target_node) - else: - return task.actor.name - - [email protected]_task_signal.connect -def _start_task_handler(ctx, **kwargs): - # If the task has no function this is an empty task. - if ctx.task.function: - suffix = 'started...' - logger = ctx.logger.info - else: - suffix = 'has no implementation' - logger = ctx.logger.debug - - logger('{name} {task.interface_name}.{task.operation_name} {suffix}'.format( - name=_get_task_name(ctx.task), task=ctx.task, suffix=suffix)) - - [email protected]_success_task_signal.connect -def _success_task_handler(ctx, **kwargs): - if not ctx.task.function: - return - ctx.logger.info('{name} {task.interface_name}.{task.operation_name} successful' - .format(name=_get_task_name(ctx.task), task=ctx.task)) - - [email protected]_failure_task_signal.connect -def _failure_operation_handler(ctx, traceback, **kwargs): - ctx.logger.error( - '{name} {task.interface_name}.{task.operation_name} failed' - .format(name=_get_task_name(ctx.task), task=ctx.task), extra=dict(traceback=traceback) - ) - - [email protected]_workflow_signal.connect -def _start_workflow_handler(context, **kwargs): - context.logger.info("Starting '{ctx.workflow_name}' workflow execution".format(ctx=context)) - - [email protected]_failure_workflow_signal.connect -def _failure_workflow_handler(context, **kwargs): - context.logger.info("'{ctx.workflow_name}' workflow execution failed".format(ctx=context)) - - [email protected]_success_workflow_signal.connect -def _success_workflow_handler(context, **kwargs): - context.logger.info("'{ctx.workflow_name}' workflow execution succeeded".format(ctx=context)) - - [email protected]_cancelled_workflow_signal.connect -def _cancel_workflow_handler(context, **kwargs): - context.logger.info("'{ctx.workflow_name}' workflow execution canceled".format(ctx=context)) - - [email protected]_cancelling_workflow_signal.connect -def _cancelling_workflow_handler(context, **kwargs): - context.logger.info("Cancelling '{ctx.workflow_name}' workflow execution".format(ctx=context)) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca-website/blob/23d6ba76/apache-ariatosca-0.1.1/aria/orchestrator/workflows/exceptions.py ---------------------------------------------------------------------- diff --git a/apache-ariatosca-0.1.1/aria/orchestrator/workflows/exceptions.py b/apache-ariatosca-0.1.1/aria/orchestrator/workflows/exceptions.py deleted file mode 100644 index 2a1d6b1..0000000 --- a/apache-ariatosca-0.1.1/aria/orchestrator/workflows/exceptions.py +++ /dev/null @@ -1,91 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Workflow exceptions. -""" - -import os - -from .. import exceptions - - -class ExecutorException(exceptions.AriaError): - """ - General executor exception. - """ - pass - - -class ProcessException(ExecutorException): - """ - Raised when subprocess execution fails. - """ - - def __init__(self, command, stderr=None, stdout=None, return_code=None): - """ - Process class Exception - :param list command: child process command - :param str message: custom message - :param str stderr: child process stderr - :param str stdout: child process stdout - :param int return_code: child process exit code - """ - super(ProcessException, self).__init__("child process failed") - self.command = command - self.stderr = stderr - self.stdout = stdout - self.return_code = return_code - - @property - def explanation(self): - """ - Describes the error in detail - """ - return ( - 'Command "{error.command}" executed with an error.{0}' - 'code: {error.return_code}{0}' - 'error: {error.stderr}{0}' - 'output: {error.stdout}'.format(os.linesep, error=self)) - - -class AriaEngineError(exceptions.AriaError): - """ - Raised by the workflow engine. - """ - - -class TaskException(exceptions.AriaError): - """ - Raised by the task. - """ - - -class TaskCreationException(TaskException): - """ - Could not create the task. - """ - - -class OperationNotFoundException(TaskCreationException): - """ - Could not find an operation on the node or relationship. - """ - - -class PluginNotFoundException(TaskCreationException): - """ - Could not find a plugin matching the plugin specification. - """ http://git-wip-us.apache.org/repos/asf/incubator-ariatosca-website/blob/23d6ba76/apache-ariatosca-0.1.1/aria/orchestrator/workflows/executor/base.py ---------------------------------------------------------------------- diff --git a/apache-ariatosca-0.1.1/aria/orchestrator/workflows/executor/base.py b/apache-ariatosca-0.1.1/aria/orchestrator/workflows/executor/base.py deleted file mode 100644 index e7d03ea..0000000 --- a/apache-ariatosca-0.1.1/aria/orchestrator/workflows/executor/base.py +++ /dev/null @@ -1,75 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Base class for task executors. -""" - -from aria import logger -from aria.orchestrator import events - - -class BaseExecutor(logger.LoggerMixin): - """ - Base class for task executors. - """ - def _execute(self, ctx): - raise NotImplementedError - - def execute(self, ctx): - """ - Executes a task. - - :param task: task to execute - """ - if ctx.task.function: - self._execute(ctx) - else: - # In this case the task is missing a function. This task still gets to an - # executor, but since there is nothing to run, we by default simply skip the - # execution itself. - self._task_started(ctx) - self._task_succeeded(ctx) - - def close(self): - """ - Closes the executor. - """ - pass - - def terminate(self, task_id): - """ - Terminate the executing task - :return: - """ - pass - - @staticmethod - def _task_started(ctx): - events.start_task_signal.send(ctx) - - @staticmethod - def _task_failed(ctx, exception, traceback=None): - events.on_failure_task_signal.send(ctx, exception=exception, traceback=traceback) - - @staticmethod - def _task_succeeded(ctx): - events.on_success_task_signal.send(ctx) - - -class StubTaskExecutor(BaseExecutor): # pylint: disable=abstract-method - def execute(self, ctx, *args, **kwargs): - with ctx.persist_changes: - ctx.task.status = ctx.task.SUCCESS http://git-wip-us.apache.org/repos/asf/incubator-ariatosca-website/blob/23d6ba76/apache-ariatosca-0.1.1/aria/orchestrator/workflows/executor/celery.py ---------------------------------------------------------------------- diff --git a/apache-ariatosca-0.1.1/aria/orchestrator/workflows/executor/celery.py b/apache-ariatosca-0.1.1/aria/orchestrator/workflows/executor/celery.py deleted file mode 100644 index 0716e5b..0000000 --- a/apache-ariatosca-0.1.1/aria/orchestrator/workflows/executor/celery.py +++ /dev/null @@ -1,97 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Celery task executor. -""" - -import threading -import Queue - -from aria.orchestrator.workflows.executor import BaseExecutor - - -class CeleryExecutor(BaseExecutor): - """ - Celery task executor. - """ - - def __init__(self, app, *args, **kwargs): - super(CeleryExecutor, self).__init__(*args, **kwargs) - self._app = app - self._started_signaled = False - self._started_queue = Queue.Queue(maxsize=1) - self._tasks = {} - self._results = {} - self._receiver = None - self._stopped = False - self._receiver_thread = threading.Thread(target=self._events_receiver) - self._receiver_thread.daemon = True - self._receiver_thread.start() - self._started_queue.get(timeout=30) - - def _execute(self, ctx): - self._tasks[ctx.id] = ctx - arguments = dict(arg.unwrapped for arg in ctx.arguments.values()) - arguments['ctx'] = ctx.context - self._results[ctx.id] = self._app.send_task( - ctx.operation_mapping, - kwargs=arguments, - task_id=ctx.id, - queue=self._get_queue(ctx)) - - def close(self): - self._stopped = True - if self._receiver: - self._receiver.should_stop = True - self._receiver_thread.join() - - @staticmethod - def _get_queue(task): - return None if task else None # TODO - - def _events_receiver(self): - with self._app.connection() as connection: - self._receiver = self._app.events.Receiver(connection, handlers={ - 'task-started': self._celery_task_started, - 'task-succeeded': self._celery_task_succeeded, - 'task-failed': self._celery_task_failed, - }) - for _ in self._receiver.itercapture(limit=None, timeout=None, wakeup=True): - if not self._started_signaled: - self._started_queue.put(True) - self._started_signaled = True - if self._stopped: - return - - def _celery_task_started(self, event): - self._task_started(self._tasks[event['uuid']]) - - def _celery_task_succeeded(self, event): - task, _ = self._remove_task(event['uuid']) - self._task_succeeded(task) - - def _celery_task_failed(self, event): - task, async_result = self._remove_task(event['uuid']) - try: - exception = async_result.result - except BaseException as e: - exception = RuntimeError( - 'Could not de-serialize exception of task {0} --> {1}: {2}' - .format(task.name, type(e).__name__, str(e))) - self._task_failed(task, exception=exception) - - def _remove_task(self, task_id): - return self._tasks.pop(task_id), self._results.pop(task_id) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca-website/blob/23d6ba76/apache-ariatosca-0.1.1/aria/orchestrator/workflows/executor/dry.py ---------------------------------------------------------------------- diff --git a/apache-ariatosca-0.1.1/aria/orchestrator/workflows/executor/dry.py b/apache-ariatosca-0.1.1/aria/orchestrator/workflows/executor/dry.py deleted file mode 100644 index 9314e5d..0000000 --- a/apache-ariatosca-0.1.1/aria/orchestrator/workflows/executor/dry.py +++ /dev/null @@ -1,54 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Dry task executor. -""" - -from datetime import datetime - -from . import base - - -class DryExecutor(base.BaseExecutor): # pylint: disable=abstract-method - """ - Dry task executor: prints task information without causing any side effects. - """ - def execute(self, ctx): - with ctx.persist_changes: - # updating the task manually instead of calling self._task_started(task), - # to avoid any side effects raising that event might cause - ctx.task.started_at = datetime.utcnow() - ctx.task.status = ctx.task.STARTED - - dry_msg = '<dry> {name} {task.interface_name}.{task.operation_name} {suffix}' - logger = ctx.logger.info if ctx.task.function else ctx.logger.debug - - if hasattr(ctx.task.actor, 'source_node'): - name = '{source_node.name}->{target_node.name}'.format( - source_node=ctx.task.actor.source_node, target_node=ctx.task.actor.target_node) - else: - name = ctx.task.actor.name - - if ctx.task.function: - logger(dry_msg.format(name=name, task=ctx.task, suffix='started...')) - logger(dry_msg.format(name=name, task=ctx.task, suffix='successful')) - else: - logger(dry_msg.format(name=name, task=ctx.task, suffix='has no implementation')) - - # updating the task manually instead of calling self._task_succeeded(task), - # to avoid any side effects raising that event might cause - ctx.task.ended_at = datetime.utcnow() - ctx.task.status = ctx.task.SUCCESS http://git-wip-us.apache.org/repos/asf/incubator-ariatosca-website/blob/23d6ba76/apache-ariatosca-0.1.1/aria/orchestrator/workflows/executor/process.py ---------------------------------------------------------------------- diff --git a/apache-ariatosca-0.1.1/aria/orchestrator/workflows/executor/process.py b/apache-ariatosca-0.1.1/aria/orchestrator/workflows/executor/process.py deleted file mode 100644 index 81da26f..0000000 --- a/apache-ariatosca-0.1.1/aria/orchestrator/workflows/executor/process.py +++ /dev/null @@ -1,350 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Sub-process task executor. -""" - -# pylint: disable=wrong-import-position - -import os -import sys - -# As part of the process executor implementation, subprocess are started with this module as their -# entry point. We thus remove this module's directory from the python path if it happens to be -# there - -from collections import namedtuple - -script_dir = os.path.dirname(__file__) -if script_dir in sys.path: - sys.path.remove(script_dir) - -import contextlib -import io -import threading -import socket -import struct -import subprocess -import tempfile -import Queue -import pickle - -import psutil -import jsonpickle - -import aria -from aria.orchestrator.workflows.executor import base -from aria.extension import process_executor -from aria.utils import ( - imports, - exceptions, - process as process_utils -) - - -_INT_FMT = 'I' -_INT_SIZE = struct.calcsize(_INT_FMT) -UPDATE_TRACKED_CHANGES_FAILED_STR = \ - 'Some changes failed writing to storage. For more info refer to the log.' - - -_Task = namedtuple('_Task', 'proc, ctx') - - -class ProcessExecutor(base.BaseExecutor): - """ - Sub-process task executor. - """ - - def __init__(self, plugin_manager=None, python_path=None, *args, **kwargs): - super(ProcessExecutor, self).__init__(*args, **kwargs) - self._plugin_manager = plugin_manager - - # Optional list of additional directories that should be added to - # subprocesses python path - self._python_path = python_path or [] - - # Flag that denotes whether this executor has been stopped - self._stopped = False - - # Contains reference to all currently running tasks - self._tasks = {} - - self._request_handlers = { - 'started': self._handle_task_started_request, - 'succeeded': self._handle_task_succeeded_request, - 'failed': self._handle_task_failed_request, - } - - # Server socket used to accept task status messages from subprocesses - self._server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self._server_socket.bind(('localhost', 0)) - self._server_socket.listen(10) - self._server_port = self._server_socket.getsockname()[1] - - # Used to send a "closed" message to the listener when this executor is closed - self._messenger = _Messenger(task_id=None, port=self._server_port) - - # Queue object used by the listener thread to notify this constructed it has started - # (see last line of this __init__ method) - self._listener_started = Queue.Queue() - - # Listener thread to handle subprocesses task status messages - self._listener_thread = threading.Thread(target=self._listener) - self._listener_thread.daemon = True - self._listener_thread.start() - - # Wait for listener thread to actually start before returning - self._listener_started.get(timeout=60) - - def close(self): - if self._stopped: - return - self._stopped = True - # Listener thread may be blocked on "accept" call. This will wake it up with an explicit - # "closed" message - self._messenger.closed() - self._server_socket.close() - self._listener_thread.join(timeout=60) - - # we use set(self._tasks) since tasks may change in the process of closing - for task_id in set(self._tasks): - self.terminate(task_id) - - def terminate(self, task_id): - task = self._remove_task(task_id) - # The process might have managed to finish, thus it would not be in the tasks list - if task: - try: - parent_process = psutil.Process(task.proc.pid) - for child_process in reversed(parent_process.children(recursive=True)): - try: - child_process.kill() - except BaseException: - pass - parent_process.kill() - except BaseException: - pass - - def _execute(self, ctx): - self._check_closed() - - # Temporary file used to pass arguments to the started subprocess - file_descriptor, arguments_json_path = tempfile.mkstemp(prefix='executor-', suffix='.json') - os.close(file_descriptor) - with open(arguments_json_path, 'wb') as f: - f.write(pickle.dumps(self._create_arguments_dict(ctx))) - - env = self._construct_subprocess_env(task=ctx.task) - # Asynchronously start the operation in a subprocess - proc = subprocess.Popen( - [ - sys.executable, - os.path.expanduser(os.path.expandvars(__file__)), - os.path.expanduser(os.path.expandvars(arguments_json_path)) - ], - env=env) - - self._tasks[ctx.task.id] = _Task(ctx=ctx, proc=proc) - - def _remove_task(self, task_id): - return self._tasks.pop(task_id, None) - - def _check_closed(self): - if self._stopped: - raise RuntimeError('Executor closed') - - def _create_arguments_dict(self, ctx): - return { - 'task_id': ctx.task.id, - 'function': ctx.task.function, - 'operation_arguments': dict(arg.unwrapped for arg in ctx.task.arguments.values()), - 'port': self._server_port, - 'context': ctx.serialization_dict, - } - - def _construct_subprocess_env(self, task): - env = os.environ.copy() - - if task.plugin_fk and self._plugin_manager: - # If this is a plugin operation, - # load the plugin on the subprocess env we're constructing - self._plugin_manager.load_plugin(task.plugin, env=env) - - # Add user supplied directories to injected PYTHONPATH - if self._python_path: - process_utils.append_to_pythonpath(*self._python_path, env=env) - - return env - - def _listener(self): - # Notify __init__ method this thread has actually started - self._listener_started.put(True) - while not self._stopped: - try: - with self._accept_request() as (request, response): - request_type = request['type'] - if request_type == 'closed': - break - request_handler = self._request_handlers.get(request_type) - if not request_handler: - raise RuntimeError('Invalid request type: {0}'.format(request_type)) - task_id = request['task_id'] - request_handler(task_id=task_id, request=request, response=response) - except BaseException as e: - self.logger.debug('Error in process executor listener: {0}'.format(e)) - - @contextlib.contextmanager - def _accept_request(self): - with contextlib.closing(self._server_socket.accept()[0]) as connection: - message = _recv_message(connection) - response = {} - try: - yield message, response - except BaseException as e: - response['exception'] = exceptions.wrap_if_needed(e) - raise - finally: - _send_message(connection, response) - - def _handle_task_started_request(self, task_id, **kwargs): - self._task_started(self._tasks[task_id].ctx) - - def _handle_task_succeeded_request(self, task_id, **kwargs): - task = self._remove_task(task_id) - if task: - self._task_succeeded(task.ctx) - - def _handle_task_failed_request(self, task_id, request, **kwargs): - task = self._remove_task(task_id) - if task: - self._task_failed( - task.ctx, exception=request['exception'], traceback=request['traceback']) - - -def _send_message(connection, message): - - # Packing the length of the entire msg using struct.pack. - # This enables later reading of the content. - def _pack(data): - return struct.pack(_INT_FMT, len(data)) - - data = jsonpickle.dumps(message) - msg_metadata = _pack(data) - connection.send(msg_metadata) - connection.sendall(data) - - -def _recv_message(connection): - # Retrieving the length of the msg to come. - def _unpack(conn): - return struct.unpack(_INT_FMT, _recv_bytes(conn, _INT_SIZE))[0] - - msg_metadata_len = _unpack(connection) - msg = _recv_bytes(connection, msg_metadata_len) - return jsonpickle.loads(msg) - - -def _recv_bytes(connection, count): - result = io.BytesIO() - while True: - if not count: - return result.getvalue() - read = connection.recv(count) - if not read: - return result.getvalue() - result.write(read) - count -= len(read) - - -class _Messenger(object): - - def __init__(self, task_id, port): - self.task_id = task_id - self.port = port - - def started(self): - """Task started message""" - self._send_message(type='started') - - def succeeded(self): - """Task succeeded message""" - self._send_message(type='succeeded') - - def failed(self, exception): - """Task failed message""" - self._send_message(type='failed', exception=exception) - - def closed(self): - """Executor closed message""" - self._send_message(type='closed') - - def _send_message(self, type, exception=None): - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.connect(('localhost', self.port)) - try: - _send_message(sock, { - 'type': type, - 'task_id': self.task_id, - 'exception': exceptions.wrap_if_needed(exception), - 'traceback': exceptions.get_exception_as_string(*sys.exc_info()), - }) - response = _recv_message(sock) - response_exception = response.get('exception') - if response_exception: - raise response_exception - finally: - sock.close() - - -def _main(): - arguments_json_path = sys.argv[1] - with open(arguments_json_path) as f: - arguments = pickle.loads(f.read()) - - # arguments_json_path is a temporary file created by the parent process. - # so we remove it here - os.remove(arguments_json_path) - - task_id = arguments['task_id'] - port = arguments['port'] - messenger = _Messenger(task_id=task_id, port=port) - - function = arguments['function'] - operation_arguments = arguments['operation_arguments'] - context_dict = arguments['context'] - - try: - ctx = context_dict['context_cls'].instantiate_from_dict(**context_dict['context']) - except BaseException as e: - messenger.failed(e) - return - - try: - messenger.started() - task_func = imports.load_attribute(function) - aria.install_aria_extensions() - for decorate in process_executor.decorate(): - task_func = decorate(task_func) - task_func(ctx=ctx, **operation_arguments) - ctx.close() - messenger.succeeded() - except BaseException as e: - ctx.close() - messenger.failed(e) - -if __name__ == '__main__': - _main() http://git-wip-us.apache.org/repos/asf/incubator-ariatosca-website/blob/23d6ba76/apache-ariatosca-0.1.1/aria/orchestrator/workflows/executor/thread.py ---------------------------------------------------------------------- diff --git a/apache-ariatosca-0.1.1/aria/orchestrator/workflows/executor/thread.py b/apache-ariatosca-0.1.1/aria/orchestrator/workflows/executor/thread.py deleted file mode 100644 index 6cef2c0..0000000 --- a/apache-ariatosca-0.1.1/aria/orchestrator/workflows/executor/thread.py +++ /dev/null @@ -1,79 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Thread task executor. -""" - -import Queue -import threading - -import sys - -from aria.utils import imports, exceptions - -from .base import BaseExecutor - - -class ThreadExecutor(BaseExecutor): - """ - Thread task executor. - - It's easier writing tests using this executor rather than the full-blown sub-process executor. - - Note: This executor is incapable of running plugin operations. - """ - - def __init__(self, pool_size=1, close_timeout=5, *args, **kwargs): - super(ThreadExecutor, self).__init__(*args, **kwargs) - self._stopped = False - self._close_timeout = close_timeout - self._queue = Queue.Queue() - self._pool = [] - for i in range(pool_size): - name = 'ThreadExecutor-{index}'.format(index=i+1) - thread = threading.Thread(target=self._processor, name=name) - thread.daemon = True - thread.start() - self._pool.append(thread) - - def _execute(self, ctx): - self._queue.put(ctx) - - def close(self): - self._stopped = True - for thread in self._pool: - if self._close_timeout is None: - thread.join() - else: - thread.join(self._close_timeout) - - def _processor(self): - while not self._stopped: - try: - ctx = self._queue.get(timeout=1) - self._task_started(ctx) - try: - task_func = imports.load_attribute(ctx.task.function) - arguments = dict(arg.unwrapped for arg in ctx.task.arguments.values()) - task_func(ctx=ctx, **arguments) - self._task_succeeded(ctx) - except BaseException as e: - self._task_failed(ctx, - exception=e, - traceback=exceptions.get_exception_as_string(*sys.exc_info())) - # Daemon threads - except BaseException as e: - pass http://git-wip-us.apache.org/repos/asf/incubator-ariatosca-website/blob/23d6ba76/apache-ariatosca-0.1.1/aria/parser/consumption/consumer.py ---------------------------------------------------------------------- diff --git a/apache-ariatosca-0.1.1/aria/parser/consumption/consumer.py b/apache-ariatosca-0.1.1/aria/parser/consumption/consumer.py deleted file mode 100644 index 4f4c614..0000000 --- a/apache-ariatosca-0.1.1/aria/parser/consumption/consumer.py +++ /dev/null @@ -1,86 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -from ...exceptions import AriaException -from ...utils.exceptions import print_exception -from ..validation import Issue - - -class Consumer(object): - """ - Base class for ARIA consumers. - - Consumers provide useful functionality by consuming presentations. - """ - - def __init__(self, context): - self.context = context - - def consume(self): - pass - - def dump(self): - pass - - def _handle_exception(self, e): - if hasattr(e, 'issue') and isinstance(e.issue, Issue): - self.context.validation.report(issue=e.issue) - else: - self.context.validation.report(exception=e) - if not isinstance(e, AriaException): - print_exception(e) - - -class ConsumerChain(Consumer): - """ - ARIA consumer chain. - - Calls consumers in order, handling exception by calling ``_handle_exception`` on them, and stops - the chain if there are any validation issues. - """ - - def __init__(self, context, consumer_classes=None, handle_exceptions=True): - super(ConsumerChain, self).__init__(context) - self.handle_exceptions = handle_exceptions - self.consumers = [] - if consumer_classes: - for consumer_class in consumer_classes: - self.append(consumer_class) - - def append(self, *consumer_classes): - for consumer_class in consumer_classes: - self.consumers.append(consumer_class(self.context)) - - def consume(self): - for consumer in self.consumers: - try: - consumer.consume() - except BaseException as e: - if self.handle_exceptions: - handle_exception(consumer, e) - else: - raise e - if self.context.validation.has_issues: - break - - -def handle_exception(consumer, e): - if isinstance(e, AriaException) and e.issue: - consumer.context.validation.report(issue=e.issue) - else: - consumer.context.validation.report(exception=e) - if not isinstance(e, AriaException): - print_exception(e) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca-website/blob/23d6ba76/apache-ariatosca-0.1.1/aria/parser/consumption/context.py ---------------------------------------------------------------------- diff --git a/apache-ariatosca-0.1.1/aria/parser/consumption/context.py b/apache-ariatosca-0.1.1/aria/parser/consumption/context.py deleted file mode 100644 index 6fa61f4..0000000 --- a/apache-ariatosca-0.1.1/aria/parser/consumption/context.py +++ /dev/null @@ -1,106 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import sys -import threading - -from ..validation import ValidationContext -from ..loading import LoadingContext -from ..reading import ReadingContext -from ..presentation import PresentationContext -from ..modeling import ModelingContext -from .style import Style - - -_thread_locals = threading.local() - - -class ConsumptionContext(object): - """ - Consumption context. - - :ivar args: runtime arguments (usually provided on the command line) - :ivar out: message output stream (defaults to stdout) - :ivar style: message output style - :vartype style: Style - :ivar validation: validation context - :vartype validation: :class:`ValidationContext` - :ivar loading: loading context - :vartype loading: :class:`LoadingContext` - :ivar reading: reading context - :vartype reading: :class:`ReadingContext` - :ivar presentation: presentation context - :vartype presentation: :class:`PresentationContext` - :ivar modeling: modeling context - :vartype modeling: :class:`ModelingContext` - """ - - @staticmethod - def get_thread_local(): - """ - Gets the context attached to the current thread if there is one. - """ - - return getattr(_thread_locals, 'aria_consumption_context', None) - - def __init__(self, set_thread_local=True): - self.args = [] - self.out = sys.stdout - self.style = Style() - self.validation = ValidationContext() - self.loading = LoadingContext() - self.reading = ReadingContext() - self.presentation = PresentationContext() - self.modeling = ModelingContext() - - if set_thread_local: - self.set_thread_local() - - def set_thread_local(self): - """ - Attaches this context to the current thread. - """ - - _thread_locals.aria_consumption_context = self - - def write(self, string): - """ - Writes to our ``out``, making sure to encode UTF-8 if required. - """ - - try: - self.out.write(string) - except UnicodeEncodeError: - self.out.write(string.encode('utf8')) - - def has_arg_switch(self, name): - name = '--%s' % name - return name in self.args - - def get_arg_value(self, name, default=None): - name = '--%s=' % name - for arg in self.args: - if arg.startswith(name): - return arg[len(name):] - return default - - def get_arg_value_int(self, name, default=None): - value = self.get_arg_value(name) - if value is not None: - try: - return int(value) - except (TypeError, ValueError): - pass - return default http://git-wip-us.apache.org/repos/asf/incubator-ariatosca-website/blob/23d6ba76/apache-ariatosca-0.1.1/aria/parser/consumption/exceptions.py ---------------------------------------------------------------------- diff --git a/apache-ariatosca-0.1.1/aria/parser/consumption/exceptions.py b/apache-ariatosca-0.1.1/aria/parser/consumption/exceptions.py deleted file mode 100644 index 78509cb..0000000 --- a/apache-ariatosca-0.1.1/aria/parser/consumption/exceptions.py +++ /dev/null @@ -1,23 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -from ...exceptions import AriaException - - -class ConsumerException(AriaException): - """ - ARIA consumer exception. - """ http://git-wip-us.apache.org/repos/asf/incubator-ariatosca-website/blob/23d6ba76/apache-ariatosca-0.1.1/aria/parser/consumption/inputs.py ---------------------------------------------------------------------- diff --git a/apache-ariatosca-0.1.1/aria/parser/consumption/inputs.py b/apache-ariatosca-0.1.1/aria/parser/consumption/inputs.py deleted file mode 100644 index fe7e192..0000000 --- a/apache-ariatosca-0.1.1/aria/parser/consumption/inputs.py +++ /dev/null @@ -1,53 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from ...utils.formatting import safe_repr -from ..loading import UriLocation, LiteralLocation -from ..reading import JsonReader -from .consumer import Consumer - - -class Inputs(Consumer): - """ - Fills in the inputs if provided as arguments. - """ - - def consume(self): - inputs = self.context.get_arg_value('inputs') - if inputs is None: - return - - if inputs.endswith('.json') or inputs.endswith('.yaml'): - location = UriLocation(inputs) - else: - location = LiteralLocation(inputs) - - loader = self.context.loading.loader_source.get_loader(self.context.loading, location, None) - - if isinstance(location, LiteralLocation): - reader = JsonReader(self.context.reading, location, loader) - else: - reader = self.context.reading.reader_source.get_reader(self.context.reading, - location, loader) - - inputs = reader.read() - - if not isinstance(inputs, dict): - self.context.validation.report( - 'Inputs consumer: inputs are not a dict: %s' % safe_repr(inputs)) - return - - for name, value in inputs.iteritems(): - self.context.modeling.set_input(name, value) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca-website/blob/23d6ba76/apache-ariatosca-0.1.1/aria/parser/consumption/modeling.py ---------------------------------------------------------------------- diff --git a/apache-ariatosca-0.1.1/aria/parser/consumption/modeling.py b/apache-ariatosca-0.1.1/aria/parser/consumption/modeling.py deleted file mode 100644 index 44027b9..0000000 --- a/apache-ariatosca-0.1.1/aria/parser/consumption/modeling.py +++ /dev/null @@ -1,196 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from ...utils.formatting import json_dumps, yaml_dumps -from .consumer import Consumer, ConsumerChain - - -class DeriveServiceTemplate(Consumer): - """ - Derives the service template from the presenter. - """ - - def consume(self): - if self.context.presentation.presenter is None: - self.context.validation.report('DeriveServiceTemplate consumer: missing presenter') - return - - if not hasattr(self.context.presentation.presenter, '_get_model'): - self.context.validation.report('DeriveServiceTemplate consumer: presenter does not' - ' support "_get_model"') - return - - self.context.modeling.template = \ - self.context.presentation.presenter._get_model(self.context) - - -class CoerceServiceTemplateValues(Consumer): - """ - Coerces values in the service template. - """ - - def consume(self): - self.context.modeling.template.coerce_values(True) - - -class ValidateServiceTemplate(Consumer): - """ - Validates the service template. - """ - - def consume(self): - self.context.modeling.template.validate() - - -class ServiceTemplate(ConsumerChain): - """ - Generates the service template from the presenter. - """ - - def __init__(self, context): - super(ServiceTemplate, self).__init__(context, (DeriveServiceTemplate, - CoerceServiceTemplateValues, - ValidateServiceTemplate)) - - def dump(self): - if self.context.has_arg_switch('yaml'): - indent = self.context.get_arg_value_int('indent', 2) - raw = self.context.modeling.template_as_raw - self.context.write(yaml_dumps(raw, indent=indent)) - elif self.context.has_arg_switch('json'): - indent = self.context.get_arg_value_int('indent', 2) - raw = self.context.modeling.template_as_raw - self.context.write(json_dumps(raw, indent=indent)) - else: - self.context.modeling.template.dump() - - -class Types(Consumer): - """ - Used to just dump the types. - """ - - def dump(self): - if self.context.has_arg_switch('yaml'): - indent = self.context.get_arg_value_int('indent', 2) - raw = self.context.modeling.types_as_raw - self.context.write(yaml_dumps(raw, indent=indent)) - elif self.context.has_arg_switch('json'): - indent = self.context.get_arg_value_int('indent', 2) - raw = self.context.modeling.types_as_raw - self.context.write(json_dumps(raw, indent=indent)) - else: - self.context.modeling.template.dump_types() - - -class InstantiateServiceInstance(Consumer): - """ - Instantiates the service template into a service instance. - """ - - def consume(self): - if self.context.modeling.template is None: - self.context.validation.report('InstantiateServiceInstance consumer: missing service ' - 'template') - return - - self.context.modeling.template.instantiate(None, None, - inputs=dict(self.context.modeling.inputs)) - - -class CoerceServiceInstanceValues(Consumer): - """ - Coerces values in the service instance. - """ - - def consume(self): - self.context.modeling.instance.coerce_values(True) - - -class ValidateServiceInstance(Consumer): - """ - Validates the service instance. - """ - - def consume(self): - self.context.modeling.instance.validate() - - -class SatisfyRequirements(Consumer): - """ - Satisfies node requirements in the service instance. - """ - - def consume(self): - self.context.modeling.instance.satisfy_requirements() - - -class ValidateCapabilities(Consumer): - """ - Validates capabilities in the service instance. - """ - - def consume(self): - self.context.modeling.instance.validate_capabilities() - - -class FindHosts(Consumer): - """ - Find hosts for all nodes in the service instance. - """ - - def consume(self): - self.context.modeling.instance.find_hosts() - - -class ConfigureOperations(Consumer): - """ - Configures all operations in the service instance. - """ - - def consume(self): - self.context.modeling.instance.configure_operations() - - -class ServiceInstance(ConsumerChain): - """ - Generates the service instance by instantiating the service template. - """ - - def __init__(self, context): - super(ServiceInstance, self).__init__(context, (InstantiateServiceInstance, - CoerceServiceInstanceValues, - ValidateServiceInstance, - CoerceServiceInstanceValues, - SatisfyRequirements, - CoerceServiceInstanceValues, - ValidateCapabilities, - FindHosts, - ConfigureOperations, - CoerceServiceInstanceValues)) - - def dump(self): - if self.context.has_arg_switch('graph'): - self.context.modeling.instance.dump_graph() - elif self.context.has_arg_switch('yaml'): - indent = self.context.get_arg_value_int('indent', 2) - raw = self.context.modeling.instance_as_raw - self.context.write(yaml_dumps(raw, indent=indent)) - elif self.context.has_arg_switch('json'): - indent = self.context.get_arg_value_int('indent', 2) - raw = self.context.modeling.instance_as_raw - self.context.write(json_dumps(raw, indent=indent)) - else: - self.context.modeling.instance.dump() http://git-wip-us.apache.org/repos/asf/incubator-ariatosca-website/blob/23d6ba76/apache-ariatosca-0.1.1/aria/parser/consumption/presentation.py ---------------------------------------------------------------------- diff --git a/apache-ariatosca-0.1.1/aria/parser/consumption/presentation.py b/apache-ariatosca-0.1.1/aria/parser/consumption/presentation.py deleted file mode 100644 index 542b3f0..0000000 --- a/apache-ariatosca-0.1.1/aria/parser/consumption/presentation.py +++ /dev/null @@ -1,137 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -from ...utils.threading import FixedThreadPoolExecutor -from ...utils.formatting import json_dumps, yaml_dumps -from ..loading import UriLocation -from ..reading import AlreadyReadException -from ..presentation import PresenterNotFoundError -from .consumer import Consumer - - -class Read(Consumer): - """ - Reads the presentation, handling imports recursively. - - It works by consuming a data source via appropriate :class:`~aria.parser.loading.Loader`, - :class:`~aria.parser.reading.Reader`, and :class:`~aria.parser.presentation.Presenter` - instances. - - It supports agnostic raw data composition for presenters that have - ``_get_import_locations`` and ``_merge_import``. - - To improve performance, loaders are called asynchronously on separate threads. - - Note that parsing may internally trigger more than one loading/reading/presentation - cycle, for example if the agnostic raw data has dependencies that must also be parsed. - """ - - def consume(self): - if self.context.presentation.location is None: - self.context.validation.report('Presentation consumer: missing location') - return - - presenter = None - imported_presentations = None - - executor = FixedThreadPoolExecutor(size=self.context.presentation.threads, - timeout=self.context.presentation.timeout) - executor.print_exceptions = self.context.presentation.print_exceptions - try: - presenter = self._present(self.context.presentation.location, None, None, executor) - executor.drain() - - # Handle exceptions - for e in executor.exceptions: - self._handle_exception(e) - - imported_presentations = executor.returns - finally: - executor.close() - - # Merge imports - if (imported_presentations is not None) and hasattr(presenter, '_merge_import'): - for imported_presentation in imported_presentations: - okay = True - if hasattr(presenter, '_validate_import'): - okay = presenter._validate_import(self.context, imported_presentation) - if okay: - presenter._merge_import(imported_presentation) - - self.context.presentation.presenter = presenter - - def dump(self): - if self.context.has_arg_switch('yaml'): - indent = self.context.get_arg_value_int('indent', 2) - raw = self.context.presentation.presenter._raw - self.context.write(yaml_dumps(raw, indent=indent)) - elif self.context.has_arg_switch('json'): - indent = self.context.get_arg_value_int('indent', 2) - raw = self.context.presentation.presenter._raw - self.context.write(json_dumps(raw, indent=indent)) - else: - self.context.presentation.presenter._dump(self.context) - - def _handle_exception(self, e): - if isinstance(e, AlreadyReadException): - return - super(Read, self)._handle_exception(e) - - def _present(self, location, origin_location, presenter_class, executor): - # Link the context to this thread - self.context.set_thread_local() - - raw = self._read(location, origin_location) - - if self.context.presentation.presenter_class is not None: - # The presenter class we specified in the context overrides everything - presenter_class = self.context.presentation.presenter_class - else: - try: - presenter_class = self.context.presentation.presenter_source.get_presenter(raw) - except PresenterNotFoundError: - if presenter_class is None: - raise - # We'll use the presenter class we were given (from the presenter that imported us) - if presenter_class is None: - raise PresenterNotFoundError('presenter not found') - - presentation = presenter_class(raw=raw) - - if presentation is not None and hasattr(presentation, '_link_locators'): - presentation._link_locators() - - # Submit imports to executor - if hasattr(presentation, '_get_import_locations'): - import_locations = presentation._get_import_locations(self.context) - if import_locations: - for import_location in import_locations: - # The imports inherit the parent presenter class and use the current location as - # their origin location - import_location = UriLocation(import_location) - executor.submit(self._present, import_location, location, presenter_class, - executor) - - return presentation - - def _read(self, location, origin_location): - if self.context.reading.reader is not None: - return self.context.reading.reader.read() - loader = self.context.loading.loader_source.get_loader(self.context.loading, location, - origin_location) - reader = self.context.reading.reader_source.get_reader(self.context.reading, location, - loader) - return reader.read() http://git-wip-us.apache.org/repos/asf/incubator-ariatosca-website/blob/23d6ba76/apache-ariatosca-0.1.1/aria/parser/consumption/style.py ---------------------------------------------------------------------- diff --git a/apache-ariatosca-0.1.1/aria/parser/consumption/style.py b/apache-ariatosca-0.1.1/aria/parser/consumption/style.py deleted file mode 100644 index 72892b9..0000000 --- a/apache-ariatosca-0.1.1/aria/parser/consumption/style.py +++ /dev/null @@ -1,50 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from ...utils.console import Colored, indent -from ...utils.formatting import safe_repr - - -class Style(object): - def __init__(self, indentation=2): - self.indentation = indentation - - @property - def indent(self): - return indent(self.indentation) - - @staticmethod - def section(value): - return Colored.cyan(value, bold=True) - - @staticmethod - def type(value): - return Colored.blue(value, bold=True) - - @staticmethod - def node(value): - return Colored.red(value, bold=True) - - @staticmethod - def property(value): - return Colored.magenta(value, bold=True) - - @staticmethod - def literal(value): - return Colored.magenta(safe_repr(value)) - - @staticmethod - def meta(value): - return Colored.green(value) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca-website/blob/23d6ba76/apache-ariatosca-0.1.1/aria/parser/consumption/validation.py ---------------------------------------------------------------------- diff --git a/apache-ariatosca-0.1.1/aria/parser/consumption/validation.py b/apache-ariatosca-0.1.1/aria/parser/consumption/validation.py deleted file mode 100644 index a7bc3b8..0000000 --- a/apache-ariatosca-0.1.1/aria/parser/consumption/validation.py +++ /dev/null @@ -1,30 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -from .consumer import Consumer - - -class Validate(Consumer): - """ - Validates the presentation. - """ - - def consume(self): - if self.context.presentation.presenter is None: - self.context.validation.report('Validation consumer: missing presenter') - return - - self.context.presentation.presenter._validate(self.context) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca-website/blob/23d6ba76/apache-ariatosca-0.1.1/aria/parser/exceptions.py ---------------------------------------------------------------------- diff --git a/apache-ariatosca-0.1.1/aria/parser/exceptions.py b/apache-ariatosca-0.1.1/aria/parser/exceptions.py deleted file mode 100644 index a1f7012..0000000 --- a/apache-ariatosca-0.1.1/aria/parser/exceptions.py +++ /dev/null @@ -1,33 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Parser exceptions. -""" - -from ..exceptions import AriaException -from .validation import Issue - - -class InvalidValueError(AriaException): - """ - ARIA error: value is invalid. - """ - - def __init__(self, message, cause=None, cause_tb=None, location=None, line=None, column=None, - locator=None, snippet=None, level=Issue.FIELD): - super(InvalidValueError, self).__init__(message, cause, cause_tb) - self.issue = Issue(message, location=location, line=line, column=column, locator=locator, - snippet=snippet, level=level, exception=cause) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca-website/blob/23d6ba76/apache-ariatosca-0.1.1/aria/parser/loading/context.py ---------------------------------------------------------------------- diff --git a/apache-ariatosca-0.1.1/aria/parser/loading/context.py b/apache-ariatosca-0.1.1/aria/parser/loading/context.py deleted file mode 100644 index 59727c9..0000000 --- a/apache-ariatosca-0.1.1/aria/parser/loading/context.py +++ /dev/null @@ -1,33 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -from ...utils.collections import StrictList -from .source import DefaultLoaderSource - - -class LoadingContext(object): - """ - Loading context. - - :ivar loader_source: for finding loader instances - :vartype loader_source: ~aria.parser.loading.LoaderSource - :ivar prefixes: additional prefixes for :class:`UriTextLoader` - :vartype prefixes: [:obj:`basestring`] - """ - - def __init__(self): - self.loader_source = DefaultLoaderSource() - self.prefixes = StrictList(value_class=basestring) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca-website/blob/23d6ba76/apache-ariatosca-0.1.1/aria/parser/loading/exceptions.py ---------------------------------------------------------------------- diff --git a/apache-ariatosca-0.1.1/aria/parser/loading/exceptions.py b/apache-ariatosca-0.1.1/aria/parser/loading/exceptions.py deleted file mode 100644 index 6e8267a..0000000 --- a/apache-ariatosca-0.1.1/aria/parser/loading/exceptions.py +++ /dev/null @@ -1,35 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -from ...exceptions import AriaException - - -class LoaderException(AriaException): - """ - ARIA loader exception. - """ - - -class LoaderNotFoundError(LoaderException): - """ - ARIA loader error: loader not found for source. - """ - - -class DocumentNotFoundException(LoaderException): - """ - ARIA loader exception: document not found. - """
