This is an automated email from the ASF dual-hosted git repository.

uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 0949a8d2be Don't queue runs on DatasetEvent for disabled DAGs (#38891)
0949a8d2be is described below

commit 0949a8d2be751b9bfb9281373ab9ac9f785bc907
Author: Tzu-ping Chung <[email protected]>
AuthorDate: Fri Apr 19 09:33:36 2024 +0800

    Don't queue runs on DatasetEvent for disabled DAGs (#38891)
---
 airflow/datasets/manager.py         | 38 ++++++++++++++++-----------
 airflow/models/dataset.py           |  2 ++
 newsfragments/38891.significant.rst | 10 +++++++
 tests/datasets/test_manager.py      |  4 +--
 tests/jobs/test_scheduler_job.py    | 52 ++++++++++++++++++++++++++++++++++++-
 5 files changed, 88 insertions(+), 18 deletions(-)
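
For context, the core of the change is a per-DAG eligibility check applied before any DatasetDagRunQueue row is created. A minimal sketch of that check, assuming only the two DagModel fields the diff below relies on (the helper name here is illustrative, not part of the commit):

    def _dag_accepts_dataset_events(dag) -> bool:
        # Queue dataset-driven runs only for DAGs that still exist in a
        # DAG file (is_active) and are not paused by the user (is_paused).
        return dag.is_active and not dag.is_paused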

diff --git a/airflow/datasets/manager.py b/airflow/datasets/manager.py
index e0c2db9e1f..0a6aad50a0 100644
--- a/airflow/datasets/manager.py
+++ b/airflow/datasets/manager.py
@@ -25,13 +25,14 @@ from sqlalchemy.orm import joinedload
 from airflow.configuration import conf
 from airflow.datasets import Dataset
 from airflow.listeners.listener import get_listener_manager
-from airflow.models.dataset import DatasetDagRunQueue, DatasetEvent, DatasetModel
+from airflow.models.dataset import DagScheduleDatasetReference, DatasetDagRunQueue, DatasetEvent, DatasetModel
 from airflow.stats import Stats
 from airflow.utils.log.logging_mixin import LoggingMixin
 
 if TYPE_CHECKING:
     from sqlalchemy.orm.session import Session
 
+    from airflow.models.dag import DagModel
     from airflow.models.taskinstance import TaskInstance
 
 
@@ -73,7 +74,7 @@ class DatasetManager(LoggingMixin):
         dataset_model = session.scalar(
             select(DatasetModel)
             .where(DatasetModel.uri == dataset.uri)
-            .options(joinedload(DatasetModel.consuming_dags))
+            .options(joinedload(DatasetModel.consuming_dags).joinedload(DagScheduleDatasetReference.dag))
         )
         if not dataset_model:
             self.log.warning("DatasetModel %s not found", dataset)
@@ -99,8 +100,7 @@ class DatasetManager(LoggingMixin):
         self.notify_dataset_changed(dataset=dataset)
 
         Stats.incr("dataset.updates")
-        if dataset_model.consuming_dags:
-            self._queue_dagruns(dataset_model, session)
+        self._queue_dagruns(dataset_model, session)
         session.flush()
         return dataset_event
 
@@ -127,27 +127,35 @@ class DatasetManager(LoggingMixin):
         return self._slow_path_queue_dagruns(dataset, session)
 
     def _slow_path_queue_dagruns(self, dataset: DatasetModel, session: Session) -> None:
-        consuming_dag_ids = [x.dag_id for x in dataset.consuming_dags]
-        self.log.debug("consuming dag ids %s", consuming_dag_ids)
-
-        # Don't error whole transaction when a single RunQueue item conflicts.
-        # https://docs.sqlalchemy.org/en/14/orm/session_transaction.html#using-savepoint
-        for dag_id in consuming_dag_ids:
-            item = DatasetDagRunQueue(target_dag_id=dag_id, dataset_id=dataset.id)
+        def _queue_dagrun_if_needed(dag: DagModel) -> str | None:
+            if not dag.is_active or dag.is_paused:
+                return None
+            item = DatasetDagRunQueue(target_dag_id=dag.dag_id, dataset_id=dataset.id)
+            # Don't error whole transaction when a single RunQueue item conflicts.
+            # https://docs.sqlalchemy.org/en/14/orm/session_transaction.html#using-savepoint
             try:
                 with session.begin_nested():
                     session.merge(item)
             except exc.IntegrityError:
                 self.log.debug("Skipping record %s", item, exc_info=True)
+            return dag.dag_id
+
+        queued_results = (_queue_dagrun_if_needed(ref.dag) for ref in dataset.consuming_dags)
+        if queued_dag_ids := [r for r in queued_results if r is not None]:
+            self.log.debug("consuming dag ids %s", queued_dag_ids)
 
     def _postgres_queue_dagruns(self, dataset: DatasetModel, session: Session) -> None:
         from sqlalchemy.dialects.postgresql import insert
 
+        values = [
+            {"target_dag_id": dag.dag_id}
+            for dag in (r.dag for r in dataset.consuming_dags)
+            if dag.is_active and not dag.is_paused
+        ]
+        if not values:
+            return
         stmt = insert(DatasetDagRunQueue).values(dataset_id=dataset.id).on_conflict_do_nothing()
-        session.execute(
-            stmt,
-            [{"target_dag_id": target_dag.dag_id} for target_dag in dataset.consuming_dags],
-        )
+        session.execute(stmt, values)
 
 
 def resolve_dataset_manager() -> DatasetManager:
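
As a side note, the PostgreSQL fast path above bakes the constant dataset_id into the statement and passes the per-row target_dag_id values separately at execution time, so duplicate queue rows are skipped by ON CONFLICT DO NOTHING instead of raising. A rough standalone sketch of that SQLAlchemy pattern (the session and the values list are assumed, as in the diff):

    from sqlalchemy.dialects.postgresql import insert

    # One dict per eligible consuming DAG, e.g.
    # values = [{"target_dag_id": "consumer_a"}, {"target_dag_id": "consumer_b"}]
    stmt = insert(DatasetDagRunQueue).values(dataset_id=dataset.id).on_conflict_do_nothing()
    session.execute(stmt, values)  # executemany-style; conflicting rows are ignored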
diff --git a/airflow/models/dataset.py b/airflow/models/dataset.py
index aa10eb3809..19ebce0897 100644
--- a/airflow/models/dataset.py
+++ b/airflow/models/dataset.py
@@ -112,6 +112,8 @@ class DagScheduleDatasetReference(Base):
     updated_at = Column(UtcDateTime, default=timezone.utcnow, onupdate=timezone.utcnow, nullable=False)
 
     dataset = relationship("DatasetModel", back_populates="consuming_dags")
+    dag = relationship("DagModel")
+
     queue_records = relationship(
         "DatasetDagRunQueue",
         primaryjoin="""and_(
diff --git a/newsfragments/38891.significant.rst b/newsfragments/38891.significant.rst
new file mode 100644
index 0000000000..82caa4cdfc
--- /dev/null
+++ b/newsfragments/38891.significant.rst
@@ -0,0 +1,10 @@
+Datasets no longer trigger inactive DAGs
+
+Previously, when a DAG is paused or removed, incoming dataset events would still
+trigger it, and the DAG would run when it is unpaused or added back in a DAG
+file. This has been changed; a DAG's dataset schedule can now only be satisfied
+by events that occur when the DAG is active. While this is a breaking change,
+the previous behavior is considered a bug.
+
+The behavior of time-based scheduling is unchanged, including the timetable part
+of ``DatasetOrTimeSchedule``.
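
To make the affected scheduling mode concrete, a dataset-driven consumer is declared roughly as below; the DAG id, dataset URI, and task are illustrative only and not taken from this commit:

    import pendulum
    from airflow.datasets import Dataset
    from airflow.decorators import dag, task

    example_ds = Dataset("s3://example-bucket/example.csv")

    @dag(schedule=[example_ds], start_date=pendulum.datetime(2024, 1, 1), catchup=False)
    def consumer():
        @task
        def process():
            ...

        process()

    consumer()

With this change, events emitted for example_ds while such a consumer is paused or deleted no longer leave queued runs behind; only events that arrive while the DAG is active count toward its dataset schedule.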
diff --git a/tests/datasets/test_manager.py b/tests/datasets/test_manager.py
index 4acb3922f4..25f0d9e52b 100644
--- a/tests/datasets/test_manager.py
+++ b/tests/datasets/test_manager.py
@@ -70,8 +70,8 @@ class TestDatasetManager:
         dsem = DatasetManager()
 
         ds = Dataset(uri="test_dataset_uri")
-        dag1 = DagModel(dag_id="dag1")
-        dag2 = DagModel(dag_id="dag2")
+        dag1 = DagModel(dag_id="dag1", is_active=True)
+        dag2 = DagModel(dag_id="dag2", is_active=True)
         session.add_all([dag1, dag2])
 
         dsm = DatasetModel(uri="test_dataset_uri")
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 70fb64df1d..389172bed1 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -30,7 +30,7 @@ from unittest.mock import MagicMock, PropertyMock, patch
 import psutil
 import pytest
 import time_machine
-from sqlalchemy import func
+from sqlalchemy import func, select, update
 
 import airflow.example_dags
 from airflow import settings
@@ -39,6 +39,7 @@ from airflow.callbacks.database_callback_sink import DatabaseCallbackSink
 from airflow.callbacks.pipe_callback_sink import PipeCallbackSink
 from airflow.dag_processing.manager import DagFileProcessorAgent
 from airflow.datasets import Dataset
+from airflow.datasets.manager import DatasetManager
 from airflow.exceptions import AirflowException
 from airflow.executors.base_executor import BaseExecutor
 from airflow.executors.executor_constants import MOCK_EXECUTOR
@@ -3784,6 +3785,55 @@ class TestSchedulerJob:
 
         assert dag3.get_last_dagrun().creating_job_id == scheduler_job.id
 
+    @pytest.mark.need_serialized_dag
+    @pytest.mark.parametrize(
+        "disable, enable",
+        [
+            pytest.param({"is_active": False}, {"is_active": True}, id="active"),
+            pytest.param({"is_paused": True}, {"is_paused": False}, id="paused"),
+        ],
+    )
+    def test_no_create_dag_runs_when_dag_disabled(self, session, dag_maker, disable, enable):
+        ds = Dataset("ds")
+        with dag_maker(dag_id="consumer", schedule=[ds], session=session):
+            pass
+        with dag_maker(dag_id="producer", schedule="@daily", session=session):
+            BashOperator(task_id="task", bash_command="echo 1", outlets=ds)
+        dsm = DatasetManager()
+
+        ds_id = session.scalars(select(DatasetModel.id).filter_by(uri=ds.uri)).one()
+
+        dse_q = select(DatasetEvent).where(DatasetEvent.dataset_id == ds_id).order_by(DatasetEvent.timestamp)
+        ddrq_q = select(DatasetDagRunQueue).where(
+            DatasetDagRunQueue.dataset_id == ds_id, DatasetDagRunQueue.target_dag_id == "consumer"
+        )
+
+        # Simulate the consumer DAG being disabled.
+        session.execute(update(DagModel).where(DagModel.dag_id == "consumer").values(**disable))
+
+        # A DDRQ is not scheduled although an event is emitted.
+        dr1: DagRun = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED)
+        dsm.register_dataset_change(
+            task_instance=dr1.get_task_instance("task", session=session),
+            dataset=ds,
+            session=session,
+        )
+        assert session.scalars(dse_q).one().source_run_id == dr1.run_id
+        assert session.scalars(ddrq_q).one_or_none() is None
+
+        # Simulate the consumer DAG being enabled.
+        session.execute(update(DagModel).where(DagModel.dag_id == "consumer").values(**enable))
+
+        # A DDRQ should be scheduled for the new event, but not the previous one.
+        dr2: DagRun = dag_maker.create_dagrun_after(dr1, run_type=DagRunType.SCHEDULED)
+        dsm.register_dataset_change(
+            task_instance=dr2.get_task_instance("task", session=session),
+            dataset=ds,
+            session=session,
+        )
+        assert [e.source_run_id for e in session.scalars(dse_q)] == [dr1.run_id, dr2.run_id]
+        assert session.scalars(ddrq_q).one().target_dag_id == "consumer"
+
     @time_machine.travel(DEFAULT_DATE + datetime.timedelta(days=1, seconds=9), tick=False)
     @mock.patch("airflow.jobs.scheduler_job_runner.Stats.timing")
     def test_start_dagruns(self, stats_timing, dag_maker):
