This is an automated email from the ASF dual-hosted git repository.
uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 0949a8d2be Don't queue runs on DatasetEvent for disabled DAGs (#38891)
0949a8d2be is described below
commit 0949a8d2be751b9bfb9281373ab9ac9f785bc907
Author: Tzu-ping Chung <[email protected]>
AuthorDate: Fri Apr 19 09:33:36 2024 +0800
Don't queue runs on DatasetEvent for disabled DAGs (#38891)
---
airflow/datasets/manager.py | 38 ++++++++++++++++-----------
airflow/models/dataset.py | 2 ++
newsfragments/38891.significant.rst | 10 +++++++
tests/datasets/test_manager.py | 4 +--
tests/jobs/test_scheduler_job.py | 52 ++++++++++++++++++++++++++++++++++++-
5 files changed, 88 insertions(+), 18 deletions(-)
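In short, the dataset manager now checks each consuming DAG's is_active and is_paused flags before queueing a DatasetDagRunQueue row for it. Below is a condensed, illustrative sketch of that gating logic (paraphrased from the diff that follows, not the exact committed code; it assumes an Airflow 2.9+ environment with an open SQLAlchemy session, and the helper name is made up):

from __future__ import annotations

from sqlalchemy import exc

from airflow.models.dag import DagModel
from airflow.models.dataset import DatasetDagRunQueue, DatasetModel


def queue_if_consumer_enabled(dag: DagModel, dataset: DatasetModel, session) -> str | None:
    # Paused or deactivated (removed) DAGs no longer get a queue entry for the event.
    if not dag.is_active or dag.is_paused:
        return None
    item = DatasetDagRunQueue(target_dag_id=dag.dag_id, dataset_id=dataset.id)
    try:
        # A savepoint keeps a duplicate-row conflict from aborting the whole transaction.
        with session.begin_nested():
            session.merge(item)
    except exc.IntegrityError:
        pass  # Already queued for this DAG/dataset pair.
    return dag.dag_id

The PostgreSQL fast path in the diff applies the same is_active/is_paused filter before issuing its bulk INSERT ... ON CONFLICT DO NOTHING.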
diff --git a/airflow/datasets/manager.py b/airflow/datasets/manager.py
index e0c2db9e1f..0a6aad50a0 100644
--- a/airflow/datasets/manager.py
+++ b/airflow/datasets/manager.py
@@ -25,13 +25,14 @@ from sqlalchemy.orm import joinedload
from airflow.configuration import conf
from airflow.datasets import Dataset
from airflow.listeners.listener import get_listener_manager
-from airflow.models.dataset import DatasetDagRunQueue, DatasetEvent, DatasetModel
+from airflow.models.dataset import DagScheduleDatasetReference, DatasetDagRunQueue, DatasetEvent, DatasetModel
from airflow.stats import Stats
from airflow.utils.log.logging_mixin import LoggingMixin
if TYPE_CHECKING:
from sqlalchemy.orm.session import Session
+ from airflow.models.dag import DagModel
from airflow.models.taskinstance import TaskInstance
@@ -73,7 +74,7 @@ class DatasetManager(LoggingMixin):
dataset_model = session.scalar(
select(DatasetModel)
.where(DatasetModel.uri == dataset.uri)
- .options(joinedload(DatasetModel.consuming_dags))
+ .options(joinedload(DatasetModel.consuming_dags).joinedload(DagScheduleDatasetReference.dag))
)
if not dataset_model:
self.log.warning("DatasetModel %s not found", dataset)
@@ -99,8 +100,7 @@ class DatasetManager(LoggingMixin):
self.notify_dataset_changed(dataset=dataset)
Stats.incr("dataset.updates")
- if dataset_model.consuming_dags:
- self._queue_dagruns(dataset_model, session)
+ self._queue_dagruns(dataset_model, session)
session.flush()
return dataset_event
@@ -127,27 +127,35 @@ class DatasetManager(LoggingMixin):
return self._slow_path_queue_dagruns(dataset, session)
def _slow_path_queue_dagruns(self, dataset: DatasetModel, session: Session) -> None:
- consuming_dag_ids = [x.dag_id for x in dataset.consuming_dags]
- self.log.debug("consuming dag ids %s", consuming_dag_ids)
-
- # Don't error whole transaction when a single RunQueue item conflicts.
- # https://docs.sqlalchemy.org/en/14/orm/session_transaction.html#using-savepoint
- for dag_id in consuming_dag_ids:
- item = DatasetDagRunQueue(target_dag_id=dag_id, dataset_id=dataset.id)
+ def _queue_dagrun_if_needed(dag: DagModel) -> str | None:
+ if not dag.is_active or dag.is_paused:
+ return None
+ item = DatasetDagRunQueue(target_dag_id=dag.dag_id, dataset_id=dataset.id)
+ # Don't error whole transaction when a single RunQueue item conflicts.
+ # https://docs.sqlalchemy.org/en/14/orm/session_transaction.html#using-savepoint
try:
with session.begin_nested():
session.merge(item)
except exc.IntegrityError:
self.log.debug("Skipping record %s", item, exc_info=True)
+ return dag.dag_id
+
+ queued_results = (_queue_dagrun_if_needed(ref.dag) for ref in dataset.consuming_dags)
+ if queued_dag_ids := [r for r in queued_results if r is not None]:
+ self.log.debug("consuming dag ids %s", queued_dag_ids)
def _postgres_queue_dagruns(self, dataset: DatasetModel, session: Session) -> None:
from sqlalchemy.dialects.postgresql import insert
+ values = [
+ {"target_dag_id": dag.dag_id}
+ for dag in (r.dag for r in dataset.consuming_dags)
+ if dag.is_active and not dag.is_paused
+ ]
+ if not values:
+ return
stmt = insert(DatasetDagRunQueue).values(dataset_id=dataset.id).on_conflict_do_nothing()
- session.execute(
- stmt,
- [{"target_dag_id": target_dag.dag_id} for target_dag in dataset.consuming_dags],
- )
+ session.execute(stmt, values)
def resolve_dataset_manager() -> DatasetManager:
diff --git a/airflow/models/dataset.py b/airflow/models/dataset.py
index aa10eb3809..19ebce0897 100644
--- a/airflow/models/dataset.py
+++ b/airflow/models/dataset.py
@@ -112,6 +112,8 @@ class DagScheduleDatasetReference(Base):
updated_at = Column(UtcDateTime, default=timezone.utcnow, onupdate=timezone.utcnow, nullable=False)
dataset = relationship("DatasetModel", back_populates="consuming_dags")
+ dag = relationship("DagModel")
+
queue_records = relationship(
"DatasetDagRunQueue",
primaryjoin="""and_(
diff --git a/newsfragments/38891.significant.rst b/newsfragments/38891.significant.rst
new file mode 100644
index 0000000000..82caa4cdfc
--- /dev/null
+++ b/newsfragments/38891.significant.rst
@@ -0,0 +1,10 @@
+Datasets no longer trigger inactive DAGs
+
+Previously, when a DAG was paused or removed, incoming dataset events would still
+trigger it, and the DAG would run when it was unpaused or added back in a DAG
+file. This has been changed; a DAG's dataset schedule can now only be satisfied
+by events that occur when the DAG is active. While this is a breaking change,
+the previous behavior is considered a bug.
+
+The behavior of time-based scheduling is unchanged, including the timetable part
+of ``DatasetOrTimeSchedule``.
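To make the user-facing effect concrete, consider a hypothetical pair of DAGs like the sketch below (illustrative DAG IDs and dataset URI, assuming Airflow 2.9+). Dataset events produced while the "consumer" DAG is paused or its file is removed no longer accumulate in its queue, so re-enabling it does not immediately trigger a run for the missed events; only events received while it is enabled count toward its dataset schedule.

from __future__ import annotations

import datetime

from airflow.datasets import Dataset
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator

example_ds = Dataset("s3://bucket/example.csv")  # illustrative URI

with DAG(dag_id="producer", schedule="@daily", start_date=datetime.datetime(2024, 1, 1)):
    # Each successful run emits a dataset event for example_ds.
    BashOperator(task_id="update", bash_command="echo 1", outlets=[example_ds])

with DAG(dag_id="consumer", schedule=[example_ds], start_date=datetime.datetime(2024, 1, 1)):
    # Runs only for events that arrive while this DAG is active and unpaused.
    BashOperator(task_id="consume", bash_command="echo consumed")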
diff --git a/tests/datasets/test_manager.py b/tests/datasets/test_manager.py
index 4acb3922f4..25f0d9e52b 100644
--- a/tests/datasets/test_manager.py
+++ b/tests/datasets/test_manager.py
@@ -70,8 +70,8 @@ class TestDatasetManager:
dsem = DatasetManager()
ds = Dataset(uri="test_dataset_uri")
- dag1 = DagModel(dag_id="dag1")
- dag2 = DagModel(dag_id="dag2")
+ dag1 = DagModel(dag_id="dag1", is_active=True)
+ dag2 = DagModel(dag_id="dag2", is_active=True)
session.add_all([dag1, dag2])
dsm = DatasetModel(uri="test_dataset_uri")
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 70fb64df1d..389172bed1 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -30,7 +30,7 @@ from unittest.mock import MagicMock, PropertyMock, patch
import psutil
import pytest
import time_machine
-from sqlalchemy import func
+from sqlalchemy import func, select, update
import airflow.example_dags
from airflow import settings
@@ -39,6 +39,7 @@ from airflow.callbacks.database_callback_sink import DatabaseCallbackSink
from airflow.callbacks.pipe_callback_sink import PipeCallbackSink
from airflow.dag_processing.manager import DagFileProcessorAgent
from airflow.datasets import Dataset
+from airflow.datasets.manager import DatasetManager
from airflow.exceptions import AirflowException
from airflow.executors.base_executor import BaseExecutor
from airflow.executors.executor_constants import MOCK_EXECUTOR
@@ -3784,6 +3785,55 @@ class TestSchedulerJob:
assert dag3.get_last_dagrun().creating_job_id == scheduler_job.id
+ @pytest.mark.need_serialized_dag
+ @pytest.mark.parametrize(
+ "disable, enable",
+ [
+ pytest.param({"is_active": False}, {"is_active": True}, id="active"),
+ pytest.param({"is_paused": True}, {"is_paused": False}, id="paused"),
+ ],
+ )
+ def test_no_create_dag_runs_when_dag_disabled(self, session, dag_maker, disable, enable):
+ ds = Dataset("ds")
+ with dag_maker(dag_id="consumer", schedule=[ds], session=session):
+ pass
+ with dag_maker(dag_id="producer", schedule="@daily", session=session):
+ BashOperator(task_id="task", bash_command="echo 1", outlets=ds)
+ dsm = DatasetManager()
+
+ ds_id = session.scalars(select(DatasetModel.id).filter_by(uri=ds.uri)).one()
+
+ dse_q = select(DatasetEvent).where(DatasetEvent.dataset_id == ds_id).order_by(DatasetEvent.timestamp)
+ ddrq_q = select(DatasetDagRunQueue).where(
+ DatasetDagRunQueue.dataset_id == ds_id, DatasetDagRunQueue.target_dag_id == "consumer"
+ )
+
+ # Simulate the consumer DAG being disabled.
+ session.execute(update(DagModel).where(DagModel.dag_id == "consumer").values(**disable))
+
+ # A DDRQ is not scheduled although an event is emitted.
+ dr1: DagRun = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED)
+ dsm.register_dataset_change(
+ task_instance=dr1.get_task_instance("task", session=session),
+ dataset=ds,
+ session=session,
+ )
+ assert session.scalars(dse_q).one().source_run_id == dr1.run_id
+ assert session.scalars(ddrq_q).one_or_none() is None
+
+ # Simulate the consumer DAG being enabled.
+ session.execute(update(DagModel).where(DagModel.dag_id == "consumer").values(**enable))
+
+ # A DDRQ should be scheduled for the new event, but not the previous one.
+ dr2: DagRun = dag_maker.create_dagrun_after(dr1, run_type=DagRunType.SCHEDULED)
+ dsm.register_dataset_change(
+ task_instance=dr2.get_task_instance("task", session=session),
+ dataset=ds,
+ session=session,
+ )
+ assert [e.source_run_id for e in session.scalars(dse_q)] == [dr1.run_id, dr2.run_id]
+ assert session.scalars(ddrq_q).one().target_dag_id == "consumer"
+
@time_machine.travel(DEFAULT_DATE + datetime.timedelta(days=1, seconds=9), tick=False)
@mock.patch("airflow.jobs.scheduler_job_runner.Stats.timing")
def test_start_dagruns(self, stats_timing, dag_maker):