amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected 
trigger rule dep check per dag run
URL: https://github.com/apache/airflow/pull/4751#discussion_r302166463
 
 

 ##########
 File path: airflow/ti_deps/deps/trigger_rule_dep.py
 ##########
 @@ -49,33 +50,46 @@ def _get_dep_statuses(self, ti, session, dep_context):
             yield self._passing_status(reason="The task had a dummy trigger 
rule set.")
             return
 
-        # TODO(unknown): this query becomes quite expensive with dags that 
have many
-        # tasks. It should be refactored to let the task report to the dag run 
and get the
-        # aggregates from there.
-        qry = (
-            session
-            .query(
-                func.coalesce(func.sum(
-                    case([(TI.state == State.SUCCESS, 1)], else_=0)), 0),
-                func.coalesce(func.sum(
-                    case([(TI.state == State.SKIPPED, 1)], else_=0)), 0),
-                func.coalesce(func.sum(
-                    case([(TI.state == State.FAILED, 1)], else_=0)), 0),
-                func.coalesce(func.sum(
-                    case([(TI.state == State.UPSTREAM_FAILED, 1)], else_=0)), 
0),
-                func.count(TI.task_id),
+        successes, skipped, failed, upstream_failed, done = 0, 0, 0, 0, 0
+        if dep_context.finished_tasks is None:
+            qry = (
+                session
+                .query(
+                    func.coalesce(func.sum(
+                        case([(TI.state == State.SUCCESS, 1)], else_=0)), 0),
+                    func.coalesce(func.sum(
+                        case([(TI.state == State.SKIPPED, 1)], else_=0)), 0),
+                    func.coalesce(func.sum(
+                        case([(TI.state == State.FAILED, 1)], else_=0)), 0),
+                    func.coalesce(func.sum(
+                        case([(TI.state == State.UPSTREAM_FAILED, 1)], 
else_=0)), 0),
+                    func.count(TI.task_id),
+                )
+                .filter(
+                    TI.dag_id == ti.dag_id,
+                    TI.task_id.in_(ti.task.upstream_task_ids),
+                    TI.execution_date == ti.execution_date,
+                    TI.state.in_(State.finished()),
+                )
             )
-            .filter(
-                TI.dag_id == ti.dag_id,
-                TI.task_id.in_(ti.task.upstream_task_ids),
-                TI.execution_date == ti.execution_date,
-                TI.state.in_([
-                    State.SUCCESS, State.FAILED,
-                    State.UPSTREAM_FAILED, State.SKIPPED]),
-            )
-        )
+            successes, skipped, failed, upstream_failed, done = qry.first()
+        else:
+            # see if the task name is in the task upstream for our task
+            upstream_tasks = [finished_task for finished_task in 
dep_context.finished_tasks
+                              if finished_task.task_id in 
ti.task.upstream_task_ids]
+            if upstream_tasks:
 
 Review comment:
   Yes, because we need to share the SQL query results for more than one purpose.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to