BasPH commented on code in PR #36935:
URL: https://github.com/apache/airflow/pull/36935#discussion_r1464581185


##########
airflow/models/dagrun.py:
##########
@@ -498,6 +498,37 @@ def fetch_task_instances(
             tis = tis.where(TI.task_id.in_(task_ids))
         return session.scalars(tis).all()
 
+    @internal_api_call
+    def _check_last_N_dagruns_failed(
+        self,
+        dag_id: str,
+        number_of_dag_runs: int,
+        session: Session
+    ):
+        """Check if last N dags failed."""
+        dag_runs = session.query(DagRun).filter(DagRun.dag_id == dag_id) \
+            .order_by(DagRun.execution_date.desc()) \
+            .limit(number_of_dag_runs).all()
+
+        """ Marking dag as paused, if needed"""

Review Comment:
   ```suggestion
           """ Marking DAG as paused, if needed"""
   ```



##########
airflow/models/dag.py:
##########
@@ -611,12 +614,18 @@ def __init__(
         self.last_loaded: datetime = timezone.utcnow()
         self.safe_dag_id = dag_id.replace(".", "__dot__")
         self.max_active_runs = max_active_runs
+        self.max_failure_runs = max_failure_runs
         if self.timetable.active_runs_limit is not None:
             if self.timetable.active_runs_limit < self.max_active_runs:
                 raise AirflowException(
                     f"Invalid max_active_runs: {type(self.timetable)} "
                     f"requires max_active_runs <= {self.timetable.active_runs_limit}"
                 )
+        if self.max_failure_runs is not None and self.max_failure_runs < 0:
+            raise AirflowException(
+                f"Invalid max_failure_runs: {str(self.max_failure_runs)}"
+                f"requires max_failure_runs >= 0"

Review Comment:
   ```suggestion
                   f"Invalid max_failure_runs: {self.max_failure_runs}. "
                   "Requires max_failure_runs >= 0"
   ```
   
   I don't believe the string conversion is required here. This also adds whitespace between the sentences and removes the unnecessary f-string.
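A quick standalone illustration of both points (plain Python, no Airflow imports; `value` is a hypothetical invalid setting). Adjacent string literals are concatenated with no separator, so the original message runs the two sentences together. `str()` is also redundant, because an f-string replacement field already formats an `int` the same way.

```python
value = -1  # hypothetical invalid max_failure_runs value

# Original form: adjacent literals are joined with no separator,
# and str() inside the replacement field is redundant.
original = (
    f"Invalid max_failure_runs: {str(value)}"
    f"requires max_failure_runs >= 0"
)

# Suggested form: ". " separates the sentences, and the second literal
# has no placeholders, so it needs no f-prefix.
suggested = (
    f"Invalid max_failure_runs: {value}. "
    "Requires max_failure_runs >= 0"
)

print(original)   # Invalid max_failure_runs: -1requires max_failure_runs >= 0
print(suggested)  # Invalid max_failure_runs: -1. Requires max_failure_runs >= 0
```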



##########
airflow/config_templates/config.yml:
##########
@@ -116,6 +116,15 @@ core:
       type: string
       example: ~
       default: "16"
+    max_failure_runs_per_dag:
+      description: |
+        The maximum number of consecutive DAG failures before DAG is turned off. The scheduler will disable the DAG
+        if it reaches the limit. This is configurable at the DAG level with ``max_failure_runs``,
+        which is defaulted as ``max_failure_runs_per_dag``.

Review Comment:
   ```suggestion
           The maximum number of consecutive failed DAG runs before a DAG is automatically paused. This is also configurable per DAG with the argument ``max_consecutive_failed_dag_runs``.
   ```
   
   WDYT of having the same name for the global config and the per-DAG config for clarity? This avoids config name sprawl.
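A minimal sketch of the fallback the description implies, assuming one shared name: the per-DAG argument wins when it is set, otherwise the global default applies, and a value of 0 leaves the check off (mirroring the `if bool(dag.max_failure_runs)` guard in `recalculate`). The function names and the default of 0 are hypothetical; reading the real value from the `[core]` section is not modelled here.

```python
from typing import Optional

# Hypothetical global default standing in for [core] max_consecutive_failed_dag_runs.
GLOBAL_MAX_CONSECUTIVE_FAILED_DAG_RUNS = 0


def resolve_max_consecutive_failed_dag_runs(
    dag_value: Optional[int],
    global_default: int = GLOBAL_MAX_CONSECUTIVE_FAILED_DAG_RUNS,
) -> int:
    """Return the per-DAG value if it was set, else the global default."""
    # "is None" (not falsiness) so an explicit per-DAG 0 can disable the check.
    return global_default if dag_value is None else dag_value


def auto_pause_enabled(limit: int) -> bool:
    """0 disables auto-pausing; negative values are rejected at DAG construction."""
    return limit > 0
```

With a single name, a reader who finds `max_consecutive_failed_dag_runs` on a DAG knows exactly which `[core]` option it overrides.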



##########
airflow/models/dagrun.py:
##########
@@ -498,6 +498,37 @@ def fetch_task_instances(
             tis = tis.where(TI.task_id.in_(task_ids))
         return session.scalars(tis).all()
 
+    @internal_api_call
+    def _check_last_N_dagruns_failed(

Review Comment:
   ```suggestion
       def _check_last_n_dagruns_failed(
   ```
   
   Just keep everything lowercase



##########
airflow/models/dagrun.py:
##########
@@ -498,6 +498,37 @@ def fetch_task_instances(
             tis = tis.where(TI.task_id.in_(task_ids))
         return session.scalars(tis).all()
 
+    @internal_api_call
+    def _check_last_N_dagruns_failed(
+        self,
+        dag_id: str,
+        number_of_dag_runs: int,

Review Comment:
   Can we just give this argument the same name as the config to avoid variable naming sprawl? E.g. `max_consecutive_failed_dag_runs`. This helps code readability.



##########
airflow/models/dagrun.py:
##########
@@ -724,6 +755,13 @@ def recalculate(self) -> _UnfinishedStates:
                     msg="task_failure",
                 )
 
+            # checking if the max_failure_runs has been provided and last consecutivate failures are more
+            # than this number if so we have to mark this dag as off

Review Comment:
   ```suggestion
               # than this number if so we have to pause this DAG
   ```
   
   Let's use the same wording everywhere for readability: "pausing" instead of "marking off".



##########
airflow/models/dagrun.py:
##########
@@ -724,6 +755,13 @@ def recalculate(self) -> _UnfinishedStates:
                     msg="task_failure",
                 )
 
+            # checking if the max_failure_runs has been provided and last consecutivate failures are more
+            # than this number if so we have to mark this dag as off
+            if bool(dag.max_failure_runs):
+                self.log.info("Checking consecutive failed dags for %s, limit is %s", self.dag_id,

Review Comment:
   ```suggestion
                   self.log.info("Checking consecutive failed DAG runs for DAG %s, limit is %s", self.dag_id,
   ```



##########
airflow/models/dagrun.py:
##########
@@ -498,6 +498,37 @@ def fetch_task_instances(
             tis = tis.where(TI.task_id.in_(task_ids))
         return session.scalars(tis).all()
 
+    @internal_api_call
+    def _check_last_N_dagruns_failed(
+        self,
+        dag_id: str,
+        number_of_dag_runs: int,
+        session: Session
+    ):
+        """Check if last N dags failed."""
+        dag_runs = session.query(DagRun).filter(DagRun.dag_id == dag_id) \
+            .order_by(DagRun.execution_date.desc()) \
+            .limit(number_of_dag_runs).all()
+
+        """ Marking dag as paused, if needed"""
+        toBePaused = all(dag_run.state == DagRunState.FAILED for dag_run in dag_runs)
+        if toBePaused:
+            from airflow.models.dag import DagModel
+            self.log.info("Marking Dag %s paused", self.dag_id)

Review Comment:
   ```suggestion
               self.log.info("Pausing DAG %s because last %s DAG runs failed.", self.dag_id, number_of_dag_runs)
   ```
   
   This simplifies the wording and adds the reason for the event, which helps debugging.
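For reference, the check itself can be reduced to a pure function over run states ordered newest first. This is a sketch, not the PR's code: `DagRunState` here is a minimal stand-in enum and the function name is hypothetical. One difference is deliberate. The query in the diff uses `.limit(number_of_dag_runs)`, which returns *at most* N runs, and `all()` returns `True` for an empty list and for any shorter list whose runs all failed, so a DAG with fewer than N runs, all failed, would also be paused. The sketch requires a full window of N runs.

```python
from enum import Enum
from typing import Sequence


class DagRunState(str, Enum):
    """Minimal stand-in for Airflow's DagRunState (sketch only)."""

    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"


def last_n_dag_runs_failed(
    states_newest_first: Sequence[DagRunState],
    max_consecutive_failed_dag_runs: int,
) -> bool:
    """Return True only if the N most recent runs exist and all of them failed."""
    n = max_consecutive_failed_dag_runs
    if n <= 0:
        return False  # 0 means the check is disabled
    window = list(states_newest_first[:n])
    # Require a full window; all() alone is True for a short or empty list.
    return len(window) == n and all(s == DagRunState.FAILED for s in window)
```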



##########
airflow/config_templates/config.yml:
##########
@@ -116,6 +116,15 @@ core:
       type: string
       example: ~
       default: "16"
+    max_failure_runs_per_dag:

Review Comment:
   ```suggestion
       max_consecutive_failed_dag_runs:
   ```
   
   Would add "consecutive" for clarity



##########
airflow/models/dagrun.py:
##########
@@ -724,6 +755,13 @@ def recalculate(self) -> _UnfinishedStates:
                     msg="task_failure",
                 )
 
+            # checking if the max_failure_runs has been provided and last 
consecutivate failures are more

Review Comment:
   ```suggestion
               # Check if the max_failure_runs has been provided and last consecutive failures are more
   ```



##########
airflow/models/dagrun.py:
##########
@@ -498,6 +498,37 @@ def fetch_task_instances(
             tis = tis.where(TI.task_id.in_(task_ids))
         return session.scalars(tis).all()
 
+    @internal_api_call
+    def _check_last_N_dagruns_failed(
+        self,
+        dag_id: str,
+        number_of_dag_runs: int,
+        session: Session
+    ):
+        """Check if last N dags failed."""
+        dag_runs = session.query(DagRun).filter(DagRun.dag_id == dag_id) \
+            .order_by(DagRun.execution_date.desc()) \
+            .limit(number_of_dag_runs).all()
+
+        """ Marking dag as paused, if needed"""
+        toBePaused = all(dag_run.state == DagRunState.FAILED for dag_run in dag_runs)

Review Comment:
   ```suggestion
           to_be_paused = all(dag_run.state == DagRunState.FAILED for dag_run in dag_runs)
   ```



##########
airflow/models/dagrun.py:
##########
@@ -498,6 +498,37 @@ def fetch_task_instances(
             tis = tis.where(TI.task_id.in_(task_ids))
         return session.scalars(tis).all()
 
+    @internal_api_call
+    def _check_last_N_dagruns_failed(
+        self,
+        dag_id: str,
+        number_of_dag_runs: int,
+        session: Session
+    ):
+        """Check if last N dags failed."""
+        dag_runs = session.query(DagRun).filter(DagRun.dag_id == dag_id) \
+            .order_by(DagRun.execution_date.desc()) \
+            .limit(number_of_dag_runs).all()
+
+        """ Marking dag as paused, if needed"""
+        toBePaused = all(dag_run.state == DagRunState.FAILED for dag_run in dag_runs)
+        if toBePaused:
+            from airflow.models.dag import DagModel
+            self.log.info("Marking Dag %s paused", self.dag_id)
+            filter_query = [
+                DagModel.dag_id == self.dag_id,
+                DagModel.root_dag_id == self.dag_id  # for sub-dags
+            ]
+            session.execute(
+                update(DagModel)
+                .where(or_(*filter_query))
+                .values(is_paused=True)
+                .execution_options(synchronize_session="fetch")
+            )
+        else:
+            self.log.info("Limit of consecutive dag failed is not reached, Dag %s is not being paused.",

Review Comment:
   ```suggestion
               self.log.info("Limit of consecutive failed DAG runs is not reached, DAG %s will not be paused.",
   ```
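The `or_` filter in this hunk pauses the DAG itself plus any sub-DAGs whose `root_dag_id` points at it. Below is a stdlib-only sketch of the same statement against SQLite. It assumes a hypothetical minimal `dag` table with just the three columns the update touches (the real `DagModel` table has many more), and it does not model `synchronize_session`.

```python
import sqlite3


def pause_dag_and_subdags(conn: sqlite3.Connection, dag_id: str) -> int:
    """Set is_paused for the DAG and its sub-DAGs; return the number of rows updated."""
    cur = conn.execute(
        # Same predicate as or_(dag_id == x, root_dag_id == x) in the diff.
        "UPDATE dag SET is_paused = 1 WHERE dag_id = ? OR root_dag_id = ?",
        (dag_id, dag_id),
    )
    conn.commit()
    return cur.rowcount


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE dag (dag_id TEXT PRIMARY KEY, root_dag_id TEXT, "
    "is_paused INTEGER NOT NULL DEFAULT 0)"
)
conn.executemany(
    "INSERT INTO dag (dag_id, root_dag_id) VALUES (?, ?)",
    [("parent", None), ("parent.child", "parent"), ("other", None)],
)
updated = pause_dag_and_subdags(conn, "parent")
paused = sorted(row[0] for row in conn.execute("SELECT dag_id FROM dag WHERE is_paused = 1"))
print(updated, paused)  # 2 ['parent', 'parent.child']
```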



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
