ashb commented on a change in pull request #17719:
URL: https://github.com/apache/airflow/pull/17719#discussion_r692777320



##########
File path: airflow/ti_deps/deps/dagrun_exists_dep.py
##########
@@ -29,27 +29,9 @@ class DagrunRunningDep(BaseTIDep):
 
     @provide_session
     def _get_dep_statuses(self, ti, session, dep_context):
-        dag = ti.task.dag
-        dagrun = ti.get_dagrun(session)
-        if not dagrun:
-            # The import is needed here to avoid a circular dependency
-            from airflow.models.dagrun import DagRun
-
-            running_dagruns = DagRun.find(
-                dag_id=dag.dag_id, state=State.RUNNING, 
external_trigger=False, session=session
+        dr = ti.get_dagrun(session)
+        if dr.state != State.RUNNING:
+            yield self._failing_status(
+                reason="Task instance's dagrun was not in the 'running' state 
but in "
+                "the state '{}'.".format(dr.state)
             )
-
-            if len(running_dagruns) >= dag.max_active_runs:
-                reason = (
-                    "The maximum number of active dag runs ({}) for this task "
-                    "instance's DAG '{}' has been 
reached.".format(dag.max_active_runs, ti.dag_id)
-                )
-            else:
-                reason = "Unknown reason"
-            yield self._failing_status(reason=f"Task instance's dagrun did not 
exist: {reason}.")

Review comment:
       This removed branch was only reached when the task instance had no DagRun,
which is no longer possible.
   
   (The scheduler now manages the number of active DagRuns, so the removed `max_active_runs` check is no longer needed here.)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to