[https://issues.apache.org/jira/browse/AIRFLOW-6986?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel]
Naresh Edla reassigned AIRFLOW-6986:
------------------------------------
Assignee: Naresh Edla
> Update NONE_FAILED and add a new trigger rule
> ---------------------------------------------
>
> Key: AIRFLOW-6986
> URL: https://issues.apache.org/jira/browse/AIRFLOW-6986
> Project: Apache Airflow
> Issue Type: Bug
> Components: utils
> Affects Versions: 1.10.9
> Reporter: Naresh Edla
> Assignee: Naresh Edla
> Priority: Major
> Original Estimate: 1h
> Remaining Estimate: 1h
>
> Hi Team,
> I think a task whose trigger rule is NONE_FAILED should not be skipped when
> all of its upstream tasks have been skipped. The skip-when-all-upstream-skipped
> behavior should instead come from a new trigger rule, as shown below.
> {code:python}
@provide_session
def _evaluate_trigger_rule(  # pylint: disable=too-many-branches
        self,
        ti,
        successes,
        skipped,
        failed,
        upstream_failed,
        done,
        flag_upstream_failed,
        session):
    """
    Yields a dependency status that indicates whether the given task
    instance's trigger rule was met.

    When ``flag_upstream_failed`` is true, the task instance's state may also
    be set immediately (via ``ti.set_state``) to UPSTREAM_FAILED or SKIPPED
    when the upstream counts already decide the outcome for its trigger rule.

    :param ti: the task instance to evaluate the trigger rule of
    :type ti: airflow.models.TaskInstance
    :param successes: Number of successful upstream tasks
    :type successes: int
    :param skipped: Number of skipped upstream tasks
    :type skipped: int
    :param failed: Number of failed upstream tasks
    :type failed: int
    :param upstream_failed: Number of upstream_failed upstream tasks
    :type upstream_failed: int
    :param done: Number of completed upstream tasks
    :type done: int
    :param flag_upstream_failed: This is a hack to generate
        the upstream_failed state creation while checking to see
        whether the task instance is runnable. It was the shortest
        path to add the feature
    :type flag_upstream_failed: bool
    :param session: database session
    :type session: sqlalchemy.orm.session.Session
    """
    TR = airflow.utils.trigger_rule.TriggerRule
    task = ti.task
    upstream = len(task.upstream_task_ids)
    trigger_rule = task.trigger_rule
    upstream_done = done >= upstream
    upstream_tasks_state = {
        "total": upstream, "successes": successes, "skipped": skipped,
        "failed": failed, "upstream_failed": upstream_failed, "done": done
    }
    # TODO(aoen): Ideally each individual trigger rules would be its own class, but
    # this isn't very feasible at the moment since the database queries need to be
    # bundled together for efficiency.

    # handling instant state assignment based on trigger rules
    if flag_upstream_failed:
        if trigger_rule == TR.ALL_SUCCESS:
            if upstream_failed or failed:
                ti.set_state(State.UPSTREAM_FAILED, session)
            elif skipped:
                ti.set_state(State.SKIPPED, session)
        elif trigger_rule == TR.ALL_FAILED:
            if successes or skipped:
                ti.set_state(State.SKIPPED, session)
        elif trigger_rule == TR.ONE_SUCCESS:
            if upstream_done and not successes:
                ti.set_state(State.SKIPPED, session)
        elif trigger_rule == TR.ONE_FAILED:
            if upstream_done and not (failed or upstream_failed):
                ti.set_state(State.SKIPPED, session)
        elif trigger_rule == TR.NONE_FAILED:
            # Deliberately no SKIPPED branch: a NONE_FAILED task still runs
            # when every upstream task was skipped (AIRFLOW-6986).
            if upstream_failed or failed:
                ti.set_state(State.UPSTREAM_FAILED, session)
        elif trigger_rule == TR.NONE_SKIPPED:
            if skipped:
                ti.set_state(State.SKIPPED, session)
        elif trigger_rule == TR.NONE_FAILED_AND_ONE_SUCCESS:
            if upstream_failed or failed:
                ti.set_state(State.UPSTREAM_FAILED, session)
            elif skipped == upstream:
                # Every upstream task was skipped, so no success can arrive.
                ti.set_state(State.SKIPPED, session)

    if trigger_rule == TR.ONE_SUCCESS:
        if successes <= 0:
            yield self._failing_status(
                reason="Task's trigger rule '{0}' requires one upstream "
                "task success, but none were found. "
                "upstream_tasks_state={1}, upstream_task_ids={2}"
                .format(trigger_rule, upstream_tasks_state,
                        task.upstream_task_ids))
    elif trigger_rule == TR.ONE_FAILED:
        if not failed and not upstream_failed:
            yield self._failing_status(
                reason="Task's trigger rule '{0}' requires one upstream "
                "task failure, but none were found. "
                "upstream_tasks_state={1}, upstream_task_ids={2}"
                .format(trigger_rule, upstream_tasks_state,
                        task.upstream_task_ids))
    elif trigger_rule == TR.ALL_SUCCESS:
        num_failures = upstream - successes
        if num_failures > 0:
            yield self._failing_status(
                reason="Task's trigger rule '{0}' requires all upstream "
                "tasks to have succeeded, but found {1} non-success(es). "
                "upstream_tasks_state={2}, upstream_task_ids={3}"
                .format(trigger_rule, num_failures, upstream_tasks_state,
                        task.upstream_task_ids))
    elif trigger_rule == TR.ALL_FAILED:
        num_successes = upstream - failed - upstream_failed
        if num_successes > 0:
            yield self._failing_status(
                reason="Task's trigger rule '{0}' requires all upstream "
                "tasks to have failed, but found {1} non-failure(s). "
                "upstream_tasks_state={2}, upstream_task_ids={3}"
                .format(trigger_rule, num_successes, upstream_tasks_state,
                        task.upstream_task_ids))
    elif trigger_rule == TR.ALL_DONE:
        if not upstream_done:
            # Report how many upstream tasks are unfinished; the boolean
            # ``upstream_done`` is not a task count.
            num_not_done = upstream - done
            yield self._failing_status(
                reason="Task's trigger rule '{0}' requires all upstream "
                "tasks to have completed, but found {1} task(s) that "
                "weren't done. upstream_tasks_state={2}, "
                "upstream_task_ids={3}"
                .format(trigger_rule, num_not_done, upstream_tasks_state,
                        task.upstream_task_ids))
    elif trigger_rule == TR.NONE_FAILED:
        # Skipped upstream tasks count as acceptable, so a fully skipped
        # upstream satisfies this rule.
        num_failures = upstream - successes - skipped
        if num_failures > 0:
            yield self._failing_status(
                reason="Task's trigger rule '{0}' requires all upstream "
                "tasks to have succeeded or been skipped, but found {1} non-success(es). "
                "upstream_tasks_state={2}, upstream_task_ids={3}"
                .format(trigger_rule, num_failures, upstream_tasks_state,
                        task.upstream_task_ids))
    elif trigger_rule == TR.NONE_SKIPPED:
        if not upstream_done or (skipped > 0):
            yield self._failing_status(
                reason="Task's trigger rule '{0}' requires all upstream "
                "tasks to not have been skipped, but found {1} task(s) skipped. "
                "upstream_tasks_state={2}, upstream_task_ids={3}"
                .format(trigger_rule, skipped, upstream_tasks_state,
                        task.upstream_task_ids))
    elif trigger_rule == TR.NONE_FAILED_AND_ONE_SUCCESS:
        num_failures = upstream - successes - skipped
        if num_failures > 0:
            yield self._failing_status(
                reason="Task's trigger rule '{0}' requires all upstream "
                "tasks to have succeeded or been skipped, but found {1} non-success(es). "
                "upstream_tasks_state={2}, upstream_task_ids={3}"
                .format(trigger_rule, num_failures, upstream_tasks_state,
                        task.upstream_task_ids))
        elif successes <= 0:
            # Every upstream task finished without failing but none succeeded
            # (i.e. all were skipped). Checked here, not only in the
            # flag_upstream_failed branch above, so the "one success" half of
            # the rule holds even when that flag is off.
            yield self._failing_status(
                reason="Task's trigger rule '{0}' requires at least one "
                "upstream task success, but none were found. "
                "upstream_tasks_state={1}, upstream_task_ids={2}"
                .format(trigger_rule, upstream_tasks_state,
                        task.upstream_task_ids))
    else:
        yield self._failing_status(
            reason="No strategy to evaluate trigger rule '{0}'.".format(trigger_rule))
> {code}
>
> Files to be updated:
> * [trigger_rule_dep.py|https://github.com/apache/airflow/tree/master/airflow/ti_deps/deps/trigger_rule_dep.py]
> * [trigger_rule.py|https://github.com/apache/airflow/tree/master/airflow/utils/trigger_rule.py]
>
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)