ashb commented on a change in pull request #16182:
URL: https://github.com/apache/airflow/pull/16182#discussion_r663752836



##########
File path: airflow/config_templates/config.yml
##########
@@ -1709,6 +1709,14 @@
       type: float
       example: ~
       default: "15.0"
+    - name: clean_tis_without_dag_interval
+      description: |
+        How often (in seconds) to check for and fail task instances and dagruns whose corresponding
+        DAG is missing
+      version_added: 2.1.1

Review comment:
    ```suggestion
      version_added: 2.1.2
    ```

##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -807,6 +810,37 @@ def _clean_tis_without_dagrun(self, session):
                     raise
             guard.commit()
 
+    @provide_session
+    def _clean_tis_without_dag(self, session: Session = None):
+        """Fails task instances and DagRuns of DAGs that no longer exist in 
the dagbag/DB"""
+        states_to_check = State.unfinished - frozenset([State.NONE, 
State.SHUTDOWN])
+        tis = session.query(TI).filter(TI.state.in_(states_to_check)).all()
+        missing_dags = {}
+        dag_runs = set()
+
+        for ti in tis:
+            if ti.dag_id in missing_dags:
+                ti.set_state(State.FAILED, session=session)
+                continue
+            # Dag no longer in dagbag?
+            if not self.dagbag.has_dag(ti.dag_id, session=session):
+                ti.set_state(State.FAILED, session=session)
+                dag_runs.add(ti.dag_run)
+                missing_dags[ti.dag_id] = [ti]
+                continue
+        if missing_dags:
+            self.log.warning(
+                "The following Dags are missing, therefore the DAG's "
+                "task instances have been set to failed: \t\n%s",
+                [
+                    (f"Missing DAGs: {dag_name}", f"Failed TaskInstances: 
[{ti}]")
+                    for dag_name, ti in missing_dags.items()
+                ],
+            )
+            self.log.warning("Failing the corresponding DagRuns of the missing 
DAGs. DagRuns: %s", dag_runs)
+            for dr in dag_runs:
+                dr.set_state(State.FAILED)

Review comment:
    ```suggestion
                dr.state = State.FAILED
    ```

##########
File path: airflow/models/dagbag.py
##########
@@ -166,6 +166,34 @@ def dag_ids(self) -> List[str]:
         """
         return list(self.dags.keys())
 
+    @provide_session
+    def has_dag(self, dag_id: str, session: Session = None):
+        """
+        Checks the "local" cache, if it exists, and is not older than the 
configured cache time, return True
+        else check in the DB if the dag exists, return True, False otherwise
+        """
+        from airflow.models.serialized_dag import SerializedDagModel
+
+        sd_last_updated_datetime = SerializedDagModel.get_last_updated_datetime(
+            dag_id=dag_id,
+            session=session,
+        )
+        sd_has_dag = sd_last_updated_datetime is not None
+        if dag_id not in self.dags:
+            return sd_has_dag
+        if dag_id not in self.dags_last_fetched:
+            return sd_has_dag
+        min_serialized_dag_fetch_secs = timedelta(seconds=settings.MIN_SERIALIZED_DAG_FETCH_INTERVAL)
+        if timezone.utcnow() < self.dags_last_fetched[dag_id] + min_serialized_dag_fetch_secs:
+            return sd_has_dag
+        if sd_has_dag:
+            return True

Review comment:
    We should refactor this to delay the DB check until we need it -- for instance, if we have the dag locally and it was fetched less than the configured timeout ago, then we don't need to ask the DB.

##########
File path: airflow/models/dagbag.py
##########
@@ -166,6 +166,34 @@ def dag_ids(self) -> List[str]:
         """
         return list(self.dags.keys())
 
+    @provide_session
+    def has_dag(self, dag_id: str, session: Session = None):
+        """
+        Checks the "local" cache, if it exists, and is not older than the 
configured cache time, return True
+        else check in the DB if the dag exists, return True, False otherwise
+        """
+        from airflow.models.serialized_dag import SerializedDagModel
+
+        sd_last_updated_datetime = SerializedDagModel.get_last_updated_datetime(
+            dag_id=dag_id,
+            session=session,
+        )
+        sd_has_dag = sd_last_updated_datetime is not None
+        if dag_id not in self.dags:
+            return sd_has_dag
+        if dag_id not in self.dags_last_fetched:
+            return sd_has_dag
+        min_serialized_dag_fetch_secs = timedelta(seconds=settings.MIN_SERIALIZED_DAG_FETCH_INTERVAL)
+        if timezone.utcnow() < self.dags_last_fetched[dag_id] + min_serialized_dag_fetch_secs:
+            return sd_has_dag
+        if sd_has_dag:
+            return True

Review comment:
       So something like this (pseudo-python):
   
   ```python
        if dag_id in self.dags and timezone.utcnow() < self.dags_last_fetched[dag_id] + min_serialized_dag_fetch_secs:
               return True
   
        sd_last_updated_datetime = SerializedDagModel.get_last_updated_datetime(
               dag_id=dag_id,
               session=session,
           )
           # etc ...
   ```
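
Fleshed out, the whole method under that refactor might read as follows -- a sketch continuing the pseudo-code above, reusing the names already in the diff (`dags_last_fetched`, `MIN_SERIALIZED_DAG_FETCH_INTERVAL`), not a final implementation:

```python
# Sketch of the refactored DagBag.has_dag: the DB is consulted only when the
# local cache cannot answer.
@provide_session
def has_dag(self, dag_id: str, session: Session = None) -> bool:
    """Check the local cache first; fall back to the DB only when the cache is stale."""
    from airflow.models.serialized_dag import SerializedDagModel

    # Fresh local copy: answer from the cache, no DB round-trip needed.
    if dag_id in self.dags and dag_id in self.dags_last_fetched:
        min_serialized_dag_fetch_secs = timedelta(seconds=settings.MIN_SERIALIZED_DAG_FETCH_INTERVAL)
        if timezone.utcnow() < self.dags_last_fetched[dag_id] + min_serialized_dag_fetch_secs:
            return True

    # Stale or not cached locally: consult the serialized-dag table.
    return SerializedDagModel.get_last_updated_datetime(dag_id=dag_id, session=session) is not None
```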

##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -807,6 +810,37 @@ def _clean_tis_without_dagrun(self, session):
                     raise
             guard.commit()
 
+    @provide_session
+    def _clean_tis_without_dag(self, session: Session = None):
+        """Fails task instances and DagRuns of DAGs that no longer exist in 
the dagbag/DB"""
+        states_to_check = State.unfinished - frozenset([State.NONE, 
State.SHUTDOWN])
+        tis = session.query(TI).filter(TI.state.in_(states_to_check)).all()
+        missing_dags = {}
+        dag_runs = set()
+
+        for ti in tis:
+            if ti.dag_id in missing_dags:
+                ti.set_state(State.FAILED, session=session)
+                continue
+            # Dag no longer in dagbag?
+            if not self.dagbag.has_dag(ti.dag_id, session=session):
+                ti.set_state(State.FAILED, session=session)

Review comment:
    I wonder if this (and L823) should be set to State.REMOVED? It might be clearer for the user when debugging than a failure without any logs.
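
Concretely, the suggestion would amount to a one-line change along these lines (a sketch; whether `State.REMOVED` is the right terminal state here is exactly the open question):

```python
# Hypothetical variant of the line above: mark the TI as removed rather than
# failed, so users can tell "the DAG disappeared" apart from "the task failed".
ti.set_state(State.REMOVED, session=session)
```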

##########
File path: tests/models/test_dagbag.py
##########
@@ -859,6 +859,39 @@ def test_get_dag_with_dag_serialization(self):
         assert set(updated_ser_dag_1.tags) == {"example", "example2", 
"new_tag"}
         assert updated_ser_dag_1_update_time > ser_dag_1_update_time
 
+    @patch("airflow.models.dagbag.settings.MIN_SERIALIZED_DAG_FETCH_INTERVAL", 
5)
+    def test_has_dag_method(self):
+        """Test has_dag method"""
+        dag_id = "example_bash_operator"
+        with freeze_time(tz.datetime(2020, 1, 5, 0, 0, 0)):
+            example_bash_op_dag = DagBag(include_examples=True).dags.get(dag_id)
+            SerializedDagModel.write_dag(dag=example_bash_op_dag)
+
+            dag_bag = DagBag(read_dags_from_db=True)
+            dag_bag.get_dag(dag_id)  # Add dag to self.dags
+            assert dag_bag.has_dag(dag_id)

Review comment:
    We should put a query count assertion around this line to ensure the query count is 0.
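
For example, assuming the `assert_queries_count` context manager that lives in the repo's `tests.test_utils.asserts`:

```python
from tests.test_utils.asserts import assert_queries_count

# The dag was fetched within MIN_SERIALIZED_DAG_FETCH_INTERVAL, so has_dag
# should answer from the local cache without querying the DB.
with assert_queries_count(0):
    assert dag_bag.has_dag(dag_id)
```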

##########
File path: tests/models/test_dagbag.py
##########
@@ -859,6 +859,39 @@ def test_get_dag_with_dag_serialization(self):
         assert set(updated_ser_dag_1.tags) == {"example", "example2", 
"new_tag"}
         assert updated_ser_dag_1_update_time > ser_dag_1_update_time
 
+    @patch("airflow.models.dagbag.settings.MIN_SERIALIZED_DAG_FETCH_INTERVAL", 
5)
+    def test_has_dag_method(self):
+        """Test has_dag method"""
+        dag_id = "example_bash_operator"
+        with freeze_time(tz.datetime(2020, 1, 5, 0, 0, 0)):
+            example_bash_op_dag = DagBag(include_examples=True).dags.get(dag_id)
+            SerializedDagModel.write_dag(dag=example_bash_op_dag)
+
+            dag_bag = DagBag(read_dags_from_db=True)
+            dag_bag.get_dag(dag_id)  # Add dag to self.dags
+            assert dag_bag.has_dag(dag_id)
+            assert not dag_bag.has_dag("non_added_dag_id")
+
+        # Check that min_serialized_dag_fetch_interval has passed but
+        # dag is in SerializedDagModel and we return True
+        with freeze_time(tz.datetime(2020, 1, 5, 0, 0, 6)):
+            assert dag_bag.has_dag(dag_id)
+
+        # We remove the dag from dag_bag.dags and ensure it returns True
+        dag_bag.dags.pop(dag_id)
+        assert not dag_bag.dags.get(dag_id)

Review comment:
    ```suggestion
    ```
   
    This line was just testing python core semantics, no need to do it in our unit tests :)

##########
File path: tests/jobs/test_scheduler_job.py
##########
@@ -1475,6 +1475,55 @@ def test_scheduler_loop_should_change_state_for_tis_without_dagrun(
             assert ti.start_date == ti.end_date
             assert ti.duration is not None
 
+    def test_scheduler_loop_should_fail_tasks_with_missing_dag_in_dagbag_dags(self):
+        """Tests that task instances whose dag is missing from the dagbag are failed"""
+        session = settings.Session()
+        dag_id = 'test_execute_helper_should_fail_tasks_with_missing_dag'
+        dag = DAG(dag_id, start_date=DEFAULT_DATE, default_args={'owner': 'owner1'})
+
+        with dag:
+            op1 = BashOperator(task_id='op1', bash_command='sleep 5')
+
+        # Write Dag to DB
+        dagbag = DagBag(dag_folder="/dev/null", include_examples=False, read_dags_from_db=False)
+        dagbag.bag_dag(dag, root_dag=dag)
+        dagbag.sync_to_db()
+
+        # Mock dagbag.has_dag
+        dagbag.has_dag = mock.MagicMock(return_value=False)
+
+        dag = DagBag(read_dags_from_db=True, include_examples=False).get_dag(dag_id)
+        dr = dag.create_dagrun(
+            run_type=DagRunType.SCHEDULED,
+            state=State.RUNNING,
+            execution_date=DEFAULT_DATE + timedelta(days=1),
+            start_date=DEFAULT_DATE + timedelta(days=1),
+            session=session,
+        )
+        ti = dr.get_task_instance(task_id=op1.task_id, session=session)
+        ti.state = State.RUNNING
+        session.commit()
+
+        # This poll interval is large, but the scheduler doesn't sleep that
+        # long; instead we hit the clean_tis_without_dag interval first
+        self.scheduler_job = SchedulerJob(num_runs=2, processor_poll_interval=30)
+        self.scheduler_job.dagbag = dagbag
+        executor = MockExecutor(do_update=False)
+        self.scheduler_job.executor = executor
+        processor = mock.MagicMock()
+        processor.done = False
+        self.scheduler_job.processor_agent = processor
+
+        with mock.patch.object(settings, "USE_JOB_SCHEDULE", False), conf_vars(
+            {('scheduler', 'clean_tis_without_dag_interval'): '0.001'}
+        ):
+            self.scheduler_job._run_scheduler_loop()

Review comment:
    Not sure we need to run the whole scheduler loop here -- we could just call `self.scheduler_job._clean_tis_without_dag()` directly.

    By calling scheduler_loop, the only extra thing we check is that we've added this to the timer, and we can see that pretty easily.
   
   Dunno :)
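
If we went that way, the end of the test could collapse to something like this sketch (same setup as above; `session` and `ti` are already in scope):

```python
# Invoke the cleanup directly instead of driving the whole scheduler loop.
self.scheduler_job._clean_tis_without_dag(session=session)

session.refresh(ti)
assert ti.state == State.FAILED
```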



