This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new b0442f3  Handle timetable exception in ``DAG.next_dagrun_info`` (#18729)
b0442f3 is described below

commit b0442f30e296eae5e1f38fb74d0fbb6b1c5cef54
Author: Tzu-ping Chung <[email protected]>
AuthorDate: Tue Oct 5 19:10:38 2021 +0800

    Handle timetable exception in ``DAG.next_dagrun_info`` (#18729)
    
    For now, the exception is simply logged and the DAG is not scheduled.
    
    Currently, an incorrectly implemented timetable may crash the scheduler
    process entirely due to uncaught exceptions. This PR adds exception
    handlers around those calls. A failed infer_manual_data_interval()
    causes the manual DAG run to be skipped, and a failed next_dagrun_info()
    causes the DAG run to not be created and the DAG to no longer be
    scheduled (because None is set on DagModel.next_dagrun) until the DAG
    file is modified.
    
    For now, the exception is simply logged. In the future, we'll add a new
    database model similar to ImportError to hold these errors and display
    them in the web UI. This new model class will also be designed to
    eventually incorporate ImportError.
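    
    For illustration, here is a minimal sketch of the failure mode being
    handled (the FaultyTimetable class is hypothetical, mirroring the tests
    added in this commit; Timetable is the real base class from
    airflow.timetables.base):
    
        from airflow.timetables.base import Timetable
    
        class FaultyTimetable(Timetable):
            """A custom timetable with a bug in its scheduling logic."""
    
            def next_dagrun_info(self, last_automated_data_interval, restriction):
                # Any uncaught exception here used to propagate into the
                # scheduler loop; it is now logged and turned into None.
                raise RuntimeError("bug in custom timetable")
    
    With this change, DAG.next_dagrun_info() catches the RuntimeError, logs
    it via self.log.exception(), and returns None.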
---
 airflow/models/dag.py    | 47 ++++++++++++++++++++++++++------
 tests/models/test_dag.py | 71 ++++++++++++++++++++++++++++++++++++++++++++++--
 2 files changed, 107 insertions(+), 11 deletions(-)

diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 41f7b23..056283b 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -713,10 +713,19 @@ class DAG(LoggingMixin):
             restriction = self._time_restriction
         else:
             restriction = TimeRestriction(earliest=None, latest=None, catchup=True)
-        return self.timetable.next_dagrun_info(
-            last_automated_data_interval=data_interval,
-            restriction=restriction,
-        )
+        try:
+            info = self.timetable.next_dagrun_info(
+                last_automated_data_interval=data_interval,
+                restriction=restriction,
+            )
+        except Exception:
+            self.log.exception(
+                "Failed to fetch run info after data interval %s for DAG %r",
+                data_interval,
+                self.dag_id,
+            )
+            info = None
+        return info
 
     def next_dagrun_after_date(self, date_last_automated_dagrun: Optional[pendulum.DateTime]):
         warnings.warn(
@@ -790,7 +799,19 @@ class DAG(LoggingMixin):
         if self.is_subdag:
             align = False
 
-        info = self.timetable.next_dagrun_info(last_automated_data_interval=None, restriction=restriction)
+        try:
+            info = self.timetable.next_dagrun_info(
+                last_automated_data_interval=None,
+                restriction=restriction,
+            )
+        except Exception:
+            self.log.exception(
+                "Failed to fetch run info after data interval %s for DAG %r",
+                None,
+                self.dag_id,
+            )
+            info = None
+
         if info is None:
             # No runs to be scheduled between the user-supplied timeframe. But
             # if align=False, "invent" a data interval for the timeframe itself.
@@ -806,10 +827,18 @@ class DAG(LoggingMixin):
         # Generate naturally according to schedule.
         while info is not None:
             yield info
-            info = self.timetable.next_dagrun_info(
-                last_automated_data_interval=info.data_interval,
-                restriction=restriction,
-            )
+            try:
+                info = self.timetable.next_dagrun_info(
+                    last_automated_data_interval=info.data_interval,
+                    restriction=restriction,
+                )
+            except Exception:
+                self.log.exception(
+                    "Failed to fetch run info after data interval %s for DAG %r",
+                    info.data_interval,
+                    self.dag_id,
+                )
+                break
 
     def get_run_dates(self, start_date, end_date=None):
         """
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index a853d85..e77f271 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -27,7 +27,7 @@ from contextlib import redirect_stdout
 from datetime import timedelta
 from pathlib import Path
 from tempfile import NamedTemporaryFile
-from typing import Optional
+from typing import List, Optional
 from unittest import mock
 from unittest.mock import patch
 
@@ -51,7 +51,7 @@ from airflow.operators.bash import BashOperator
 from airflow.operators.dummy import DummyOperator
 from airflow.operators.subdag import SubDagOperator
 from airflow.security import permissions
-from airflow.timetables.base import DagRunInfo
+from airflow.timetables.base import DagRunInfo, DataInterval, Timetable
 from airflow.timetables.simple import NullTimetable, OnceTimetable
 from airflow.utils import timezone
 from airflow.utils.file import list_py_file_paths
@@ -1675,6 +1675,40 @@ class TestDag(unittest.TestCase):
         next_info = dag.next_dagrun_info(next_info.data_interval)
         assert next_info and next_info.logical_date == timezone.datetime(2020, 5, 4)
 
+    def test_next_dagrun_info_timetable_exception(self):
+        """Test the DAG does not crash the scheduler if the timetable raises an exception."""
+
+        class FailingTimetable(Timetable):
+            def next_dagrun_info(self, last_automated_data_interval, restriction):
+                raise RuntimeError("this fails")
+
+        dag = DAG(
+            "test_next_dagrun_info_timetable_exception",
+            start_date=timezone.datetime(2020, 5, 1),
+            timetable=FailingTimetable(),
+            catchup=True,
+        )
+
+        def _check_logs(records: List[logging.LogRecord], data_interval: DataInterval) -> None:
+            assert len(records) == 1
+            record = records[0]
+            assert record.exc_info is not None, "Should contain exception"
+            assert record.getMessage() == (
+                f"Failed to fetch run info after data interval {data_interval} "
+                f"for DAG 'test_next_dagrun_info_timetable_exception'"
+            )
+
+        with self.assertLogs(dag.log, level=logging.ERROR) as ctx:
+            next_info = dag.next_dagrun_info(None)
+        assert next_info is None, "failed next_dagrun_info should return None"
+        _check_logs(ctx.records, data_interval=None)
+
+        data_interval = DataInterval(timezone.datetime(2020, 5, 1), timezone.datetime(2020, 5, 2))
+        with self.assertLogs(dag.log, level=logging.ERROR) as ctx:
+            next_info = dag.next_dagrun_info(data_interval)
+        assert next_info is None, "failed next_dagrun_info should return None"
+        _check_logs(ctx.records, data_interval)
+
     def test_next_dagrun_after_auto_align(self):
         """
         Test if the schedule_interval will be auto aligned with the start_date
@@ -2147,3 +2181,36 @@ def test_iter_dagrun_infos_between(start_date, expected_infos):
         align=True,
     )
     assert expected_infos == list(iterator)
+
+
+def test_iter_dagrun_infos_between_error(caplog):
+    start = pendulum.instance(DEFAULT_DATE - datetime.timedelta(hours=1))
+    end = pendulum.instance(DEFAULT_DATE)
+
+    class FailingAfterOneTimetable(Timetable):
+        def next_dagrun_info(self, last_automated_data_interval, restriction):
+            if last_automated_data_interval is None:
+                return DagRunInfo.interval(start, end)
+            raise RuntimeError("this fails")
+
+    dag = DAG(
+        dag_id='test_iter_dagrun_infos_between_error',
+        start_date=DEFAULT_DATE,
+        timetable=FailingAfterOneTimetable(),
+    )
+
+    iterator = dag.iter_dagrun_infos_between(earliest=start, latest=end, align=True)
+    with caplog.at_level(logging.ERROR):
+        infos = list(iterator)
+
+    # The second timetable.next_dagrun_info() call raises an exception, so only the first result is returned.
+    assert infos == [DagRunInfo.interval(start, end)]
+
+    assert caplog.record_tuples == [
+        (
+            "airflow.models.dag.DAG",
+            logging.ERROR,
+            f"Failed to fetch run info after data interval {DataInterval(start, end)} for DAG {dag.dag_id!r}",
+        ),
+    ]
+    assert caplog.records[0].exc_info is not None, "should contain exception context"
