[ 
https://issues.apache.org/jira/browse/AIRFLOW-6986?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Naresh Edla reassigned AIRFLOW-6986:
------------------------------------

    Assignee: Naresh Edla

> Update NONE_FAILED and add a new trigger rule
> ---------------------------------------------
>
>                 Key: AIRFLOW-6986
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-6986
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: utils
>    Affects Versions: 1.10.9
>            Reporter: Naresh Edla
>            Assignee: Naresh Edla
>            Priority: Major
>   Original Estimate: 1h
>  Remaining Estimate: 1h
>
> Hi Team,
>  I think any task with the trigger rule NONE_FAILED should not be skipped if all of 
> its upstream tasks have skipped; for this case, another trigger 
> rule should be added, as given below. 
> {code:python}
>     @provide_session
>     def _evaluate_trigger_rule(  # pylint: disable=too-many-branches
>             self,
>             ti,
>             successes,
>             skipped,
>             failed,
>             upstream_failed,
>             done,
>             flag_upstream_failed,
>             session):
>         """
>         Yields a dependency status that indicate whether the given task 
> instance's trigger
>         rule was met.
>         :param ti: the task instance to evaluate the trigger rule of
>         :type ti: airflow.models.TaskInstance
>         :param successes: Number of successful upstream tasks
>         :type successes: int
>         :param skipped: Number of skipped upstream tasks
>         :type skipped: int
>         :param failed: Number of failed upstream tasks
>         :type failed: int
>         :param upstream_failed: Number of upstream_failed upstream tasks
>         :type upstream_failed: int
>         :param done: Number of completed upstream tasks
>         :type done: int
>         :param flag_upstream_failed: This is a hack to generate
>             the upstream_failed state creation while checking to see
>             whether the task instance is runnable. It was the shortest
>             path to add the feature
>         :type flag_upstream_failed: bool
>         :param session: database session
>         :type session: sqlalchemy.orm.session.Session
>         """
>         TR = airflow.utils.trigger_rule.TriggerRule
>         task = ti.task
>         upstream = len(task.upstream_task_ids)
>         trigger_rule = task.trigger_rule
>         upstream_done = done >= upstream
>         upstream_tasks_state = {
>             "total": upstream, "successes": successes, "skipped": skipped,
>             "failed": failed, "upstream_failed": upstream_failed, "done": done
>         }
>         # TODO(aoen): Ideally each individual trigger rules would be its own 
> class, but
>         # this isn't very feasible at the moment since the database queries 
> need to be
>         # bundled together for efficiency.
>         # handling instant state assignment based on trigger rules
>         if flag_upstream_failed:
>             if trigger_rule == TR.ALL_SUCCESS:
>                 if upstream_failed or failed:
>                     ti.set_state(State.UPSTREAM_FAILED, session)
>                 elif skipped:
>                     ti.set_state(State.SKIPPED, session)
>             elif trigger_rule == TR.ALL_FAILED:
>                 if successes or skipped:
>                     ti.set_state(State.SKIPPED, session)
>             elif trigger_rule == TR.ONE_SUCCESS:
>                 if upstream_done and not successes:
>                     ti.set_state(State.SKIPPED, session)
>             elif trigger_rule == TR.ONE_FAILED:
>                 if upstream_done and not (failed or upstream_failed):
>                     ti.set_state(State.SKIPPED, session)
>             elif trigger_rule == TR.NONE_FAILED:
>                 if upstream_failed or failed:
>                     ti.set_state(State.UPSTREAM_FAILED, session)
>             elif trigger_rule == TR.NONE_SKIPPED:
>                 if skipped:
>                     ti.set_state(State.SKIPPED, session)
>             elif trigger_rule == TR.NONE_FAILED_AND_ONE_SUCCESS:
>                 if upstream_failed or failed:
>                     ti.set_state(State.UPSTREAM_FAILED, session)
>                 elif skipped == upstream:
>                     ti.set_state(State.SKIPPED, session)
>         if trigger_rule == TR.ONE_SUCCESS:
>             if successes <= 0:
>                 yield self._failing_status(
>                     reason="Task's trigger rule '{0}' requires one upstream "
>                     "task success, but none were found. "
>                     "upstream_tasks_state={1}, upstream_task_ids={2}"
>                     .format(trigger_rule, upstream_tasks_state, 
> task.upstream_task_ids))
>         elif trigger_rule == TR.ONE_FAILED:
>             if not failed and not upstream_failed:
>                 yield self._failing_status(
>                     reason="Task's trigger rule '{0}' requires one upstream "
>                     "task failure, but none were found. "
>                     "upstream_tasks_state={1}, upstream_task_ids={2}"
>                     .format(trigger_rule, upstream_tasks_state, 
> task.upstream_task_ids))
>         elif trigger_rule == TR.ALL_SUCCESS:
>             num_failures = upstream - successes
>             if num_failures > 0:
>                 yield self._failing_status(
>                     reason="Task's trigger rule '{0}' requires all upstream "
>                     "tasks to have succeeded, but found {1} non-success(es). "
>                     "upstream_tasks_state={2}, upstream_task_ids={3}"
>                     .format(trigger_rule, num_failures, upstream_tasks_state,
>                             task.upstream_task_ids))
>         elif trigger_rule == TR.ALL_FAILED:
>             num_successes = upstream - failed - upstream_failed
>             if num_successes > 0:
>                 yield self._failing_status(
>                     reason="Task's trigger rule '{0}' requires all upstream "
>                     "tasks to have failed, but found {1} non-failure(s). "
>                     "upstream_tasks_state={2}, upstream_task_ids={3}"
>                     .format(trigger_rule, num_successes, upstream_tasks_state,
>                             task.upstream_task_ids))
>         elif trigger_rule == TR.ALL_DONE:
>             if not upstream_done:
>                 yield self._failing_status(
>                     reason="Task's trigger rule '{0}' requires all upstream "
>                     "tasks to have completed, but found {1} task(s) that "
>                     "weren't done. upstream_tasks_state={2}, "
>                     "upstream_task_ids={3}"
>                     .format(trigger_rule, upstream_done, upstream_tasks_state,
>                             task.upstream_task_ids))
>         elif trigger_rule == TR.NONE_FAILED:
>             num_failures = upstream - successes - skipped
>             if num_failures > 0:
>                 yield self._failing_status(
>                     reason="Task's trigger rule '{0}' requires all upstream "
>                     "tasks to have succeeded or been skipped, but found {1} 
> non-success(es). "
>                     "upstream_tasks_state={2}, upstream_task_ids={3}"
>                     .format(trigger_rule, num_failures, upstream_tasks_state,
>                             task.upstream_task_ids))
>         elif trigger_rule == TR.NONE_SKIPPED:
>             if not upstream_done or (skipped > 0):
>                 yield self._failing_status(
>                     reason="Task's trigger rule '{0}' requires all upstream "
>                     "tasks to not have been skipped, but found {1} task(s) 
> skipped. "
>                     "upstream_tasks_state={2}, upstream_task_ids={3}"
>                     .format(trigger_rule, skipped, upstream_tasks_state,
>                             task.upstream_task_ids))
>         elif trigger_rule == TR.NONE_FAILED_AND_ONE_SUCCESS:
>             num_failures = upstream - successes - skipped
>             if num_failures > 0:
>                 yield self._failing_status(
>                     reason="Task's trigger rule '{0}' requires all upstream "
>                     "tasks to have succeeded or been skipped, but found {1} 
> non-success(es). "
>                     "upstream_tasks_state={2}, upstream_task_ids={3}"
>                     .format(trigger_rule, num_failures, upstream_tasks_state,
>                             task.upstream_task_ids))
>         else:
>             yield self._failing_status(
>                 reason="No strategy to evaluate trigger rule 
> '{0}'.".format(trigger_rule))
> {code}
>  
> Files to be updated: 
>  * 
> [trigger_rule_dep.py|https://github.com/apache/airflow/tree/master/airflow/ti_deps/deps/trigger_rule_dep.py]
>  * 
> [trigger_rule.py|https://github.com/apache/airflow/tree/master/airflow/utils/trigger_rule.py]
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to