Github user mxmrlv commented on a diff in the pull request:
https://github.com/apache/incubator-ariatosca/pull/156#discussion_r122955823
--- Diff: aria/orchestrator/workflows/core/events_handler.py ---
@@ -30,120 +30,122 @@
@events.sent_task_signal.connect
-def _task_sent(task, *args, **kwargs):
- with task._update():
- task.status = task.SENT
+def _task_sent(ctx, *args, **kwargs):
+ with ctx.track_changes:
+ ctx.task.status = ctx.task.SENT
@events.start_task_signal.connect
-def _task_started(task, *args, **kwargs):
- with task._update():
- task.started_at = datetime.utcnow()
- task.status = task.STARTED
- _update_node_state_if_necessary(task, is_transitional=True)
+def _task_started(ctx, *args, **kwargs):
+ with ctx.track_changes:
+ ctx.task.started_at = datetime.utcnow()
+ ctx.task.status = ctx.task.STARTED
+ _update_node_state_if_necessary(ctx, is_transitional=True)
@events.on_failure_task_signal.connect
-def _task_failed(task, exception, *args, **kwargs):
- with task._update():
+def _task_failed(ctx, exception, *args, **kwargs):
+ with ctx.track_changes:
should_retry = all([
not isinstance(exception, exceptions.TaskAbortException),
- task.attempts_count < task.max_attempts or task.max_attempts == task.INFINITE_RETRIES,
- # ignore_failure check here means the task will not be retries and it will be marked
+ ctx.task.attempts_count < ctx.task.max_attempts or ctx.task.max_attempts == ctx.task.INFINITE_RETRIES,
+ # ignore_failure check here means the task will not be retried and it will be marked
# as failed. The engine will also look at ignore_failure so it won't fail the
# workflow.
- not task.ignore_failure
+ not ctx.task.ignore_failure
])
if should_retry:
retry_interval = None
if isinstance(exception, exceptions.TaskRetryException):
retry_interval = exception.retry_interval
if retry_interval is None:
- retry_interval = task.retry_interval
- task.status = task.RETRYING
- task.attempts_count += 1
- task.due_at = datetime.utcnow() + timedelta(seconds=retry_interval)
+ retry_interval = ctx.task.retry_interval
+ ctx.task.status = ctx.task.RETRYING
+ ctx.task.attempts_count += 1
+ ctx.task.due_at = datetime.utcnow() + timedelta(seconds=retry_interval)
else:
- task.ended_at = datetime.utcnow()
- task.status = task.FAILED
+ ctx.task.ended_at = datetime.utcnow()
+ ctx.task.status = ctx.task.FAILED
@events.on_success_task_signal.connect
-def _task_succeeded(task, *args, **kwargs):
- with task._update():
- task.ended_at = datetime.utcnow()
- task.status = task.SUCCESS
+def _task_succeeded(ctx, *args, **kwargs):
+ with ctx.track_changes:
+ ctx.task.ended_at = datetime.utcnow()
+ ctx.task.status = ctx.task.SUCCESS
- _update_node_state_if_necessary(task)
+ _update_node_state_if_necessary(ctx)
@events.start_workflow_signal.connect
def _workflow_started(workflow_context, *args, **kwargs):
- execution = workflow_context.execution
- # the execution may already be in the process of cancelling
- if execution.status in (execution.CANCELLING, execution.CANCELLED):
- return
- execution.status = execution.STARTED
- execution.started_at = datetime.utcnow()
- workflow_context.execution = execution
+ with workflow_context.track_changes:
+ execution = workflow_context.execution
+ # the execution may already be in the process of cancelling
+ if execution.status in (execution.CANCELLING, execution.CANCELLED):
+ return
+ execution.status = execution.STARTED
+ execution.started_at = datetime.utcnow()
@events.on_failure_workflow_signal.connect
def _workflow_failed(workflow_context, exception, *args, **kwargs):
- execution = workflow_context.execution
- execution.error = str(exception)
- execution.status = execution.FAILED
- execution.ended_at = datetime.utcnow()
- workflow_context.execution = execution
+ with workflow_context.track_changes:
+ execution = workflow_context.execution
+ execution.error = str(exception)
+ execution.status = execution.FAILED
+ execution.ended_at = datetime.utcnow()
@events.on_success_workflow_signal.connect
def _workflow_succeeded(workflow_context, *args, **kwargs):
- execution = workflow_context.execution
- execution.status = execution.SUCCEEDED
- execution.ended_at = datetime.utcnow()
- workflow_context.execution = execution
+ with workflow_context.track_changes:
+ execution = workflow_context.execution
+ execution.status = execution.SUCCEEDED
+ execution.ended_at = datetime.utcnow()
@events.on_cancelled_workflow_signal.connect
def _workflow_cancelled(workflow_context, *args, **kwargs):
- execution = workflow_context.execution
- # _workflow_cancelling function may have called this function already
- if execution.status == execution.CANCELLED:
- return
- # the execution may have already been finished
- elif execution.status in (execution.SUCCEEDED, execution.FAILED):
- _log_tried_to_cancel_execution_but_it_already_ended(workflow_context, execution.status)
- else:
- execution.status = execution.CANCELLED
- execution.ended_at = datetime.utcnow()
- workflow_context.execution = execution
+ with workflow_context.track_changes:
+ execution = workflow_context.execution
+ # _workflow_cancelling function may have called this function already
+ if execution.status == execution.CANCELLED:
+ return
+ # the execution may have already been finished
+ elif execution.status in (execution.SUCCEEDED, execution.FAILED):
+ _log_tried_to_cancel_execution_but_it_already_ended(workflow_context, execution.status)
+ else:
+ execution.status = execution.CANCELLED
+ execution.ended_at = datetime.utcnow()
@events.on_cancelling_workflow_signal.connect
def _workflow_cancelling(workflow_context, *args, **kwargs):
- execution = workflow_context.execution
- if execution.status == execution.PENDING:
- return _workflow_cancelled(workflow_context=workflow_context)
- # the execution may have already been finished
- elif execution.status in (execution.SUCCEEDED, execution.FAILED):
- _log_tried_to_cancel_execution_but_it_already_ended(workflow_context, execution.status)
- else:
- execution.status = execution.CANCELLING
- workflow_context.execution = execution
-
-
-def _update_node_state_if_necessary(task, is_transitional=False):
+ with workflow_context.track_changes:
+ execution = workflow_context.execution
+ if execution.status == execution.PENDING:
+ return _workflow_cancelled(workflow_context=workflow_context)
+ # the execution may have already been finished
+ elif execution.status in (execution.SUCCEEDED, execution.FAILED):
+ _log_tried_to_cancel_execution_but_it_already_ended(workflow_context, execution.status)
+ else:
+ execution.status = execution.CANCELLING
+ workflow_context.execution = execution
--- End diff --
remove
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---