This is an automated email from the ASF dual-hosted git repository.
uranusjr pushed a commit to branch v2-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v2-10-test by this push:
new 35264c1114 fix(datasets/managers): fix error handling fileloc when dataset alias resolved into new datasets (#42733)
35264c1114 is described below
commit 35264c11148370286bc6ffafe999d5556c2d5096
Author: Wei Lee <[email protected]>
AuthorDate: Tue Oct 8 12:25:16 2024 +0800
fix(datasets/managers): fix error handling fileloc when dataset alias resolved into new datasets (#42733)
* fix(datasets/managers): fix error handling fileloc when dataset alias resolved into new datasets
* test(datasets/manager): add test case test_register_dataset_change_with_alias
* refactor(datasets/manager): simplify for loop
Co-authored-by: Tzu-ping Chung <[email protected]>
---------
Co-authored-by: Tzu-ping Chung <[email protected]>
---
airflow/datasets/manager.py | 5 ++--
tests/datasets/test_manager.py | 55 ++++++++++++++++++++++++++++++++++++++++--
2 files changed, 56 insertions(+), 4 deletions(-)
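Why the fix works: the old code in manager.py wrapped the call to _send_dag_priority_parsing_request_if_needed in a generator expression and then discarded it, and since generator expressions are lazy, the call inside never ran. A minimal, self-contained sketch of that pitfall (send() below is a hypothetical stand-in for the real helper):

    sent = []

    def send(fileloc: str) -> None:
        sent.append(fileloc)

    file_locs = ["dag1.py", "dag2.py"]

    # Before the fix: this builds a generator object and throws it away;
    # send() never runs because nothing iterates the generator.
    (send(fileloc) for fileloc in file_locs)
    assert sent == []

    # After the fix: a plain for loop actually performs each call.
    for fileloc in file_locs:
        send(fileloc)
    assert sent == ["dag1.py", "dag2.py"]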
diff --git a/airflow/datasets/manager.py b/airflow/datasets/manager.py
index 058eef6ab8..306781daba 100644
--- a/airflow/datasets/manager.py
+++ b/airflow/datasets/manager.py
@@ -226,14 +226,15 @@ class DatasetManager(LoggingMixin):
                return None
            return req.fileloc

-        (_send_dag_priority_parsing_request_if_needed(fileloc) for fileloc in file_locs)
+        for fileloc in file_locs:
+            _send_dag_priority_parsing_request_if_needed(fileloc)

    @classmethod
    def _postgres_send_dag_priority_parsing_request(cls, file_locs: Iterable[str], session: Session) -> None:
        from sqlalchemy.dialects.postgresql import insert

        stmt = insert(DagPriorityParsingRequest).on_conflict_do_nothing()
-        session.execute(stmt, {"fileloc": fileloc for fileloc in file_locs})
+        session.execute(stmt, [{"fileloc": fileloc} for fileloc in file_locs])


def resolve_dataset_manager() -> DatasetManager:
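The second hunk fixes the parameters passed to session.execute on the PostgreSQL path. A dict comprehension with a fixed key produces a single dict whose "fileloc" entry is overwritten on every iteration, so at most one row could ever be inserted; a list with one dict per fileloc is what SQLAlchemy expects for an executemany-style insert. A sketch of the difference using plain dicts (no database involved):

    file_locs = ["dag1.py", "dag2.py"]

    # Before: one dict, with "fileloc" rebound on each pass -- only the
    # last value survives, so at most one parsing request is recorded.
    params = {"fileloc": fileloc for fileloc in file_locs}
    assert params == {"fileloc": "dag2.py"}

    # After: one dict per fileloc, which session.execute(stmt, [...])
    # treats as an executemany, inserting a row for each file location.
    params = [{"fileloc": fileloc} for fileloc in file_locs]
    assert params == [{"fileloc": "dag1.py"}, {"fileloc": "dag2.py"}]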
diff --git a/tests/datasets/test_manager.py b/tests/datasets/test_manager.py
index 1e7b4fda40..65ebbf0006 100644
--- a/tests/datasets/test_manager.py
+++ b/tests/datasets/test_manager.py
@@ -24,11 +24,19 @@ from unittest import mock
import pytest
from sqlalchemy import delete
-from airflow.datasets import Dataset
+from airflow.datasets import Dataset, DatasetAlias
from airflow.datasets.manager import DatasetManager
from airflow.listeners.listener import get_listener_manager
from airflow.models.dag import DagModel
-from airflow.models.dataset import DagScheduleDatasetReference, DatasetDagRunQueue, DatasetEvent, DatasetModel
+from airflow.models.dagbag import DagPriorityParsingRequest
+from airflow.models.dataset import (
+    DagScheduleDatasetAliasReference,
+    DagScheduleDatasetReference,
+    DatasetAliasModel,
+    DatasetDagRunQueue,
+    DatasetEvent,
+    DatasetModel,
+)
from airflow.serialization.pydantic.taskinstance import TaskInstancePydantic
from tests.listeners import dataset_listener
@@ -38,6 +46,15 @@ pytestmark = pytest.mark.db_test
pytest.importorskip("pydantic", minversion="2.0.0")
[email protected]
+def clear_datasets():
+ from tests.test_utils.db import clear_db_datasets
+
+ clear_db_datasets()
+ yield
+ clear_db_datasets()
+
+
@pytest.fixture
def mock_task_instance():
return TaskInstancePydantic(
@@ -127,6 +144,40 @@ class TestDatasetManager:
        assert session.query(DatasetEvent).filter_by(dataset_id=dsm.id).count() == 1
        assert session.query(DatasetDagRunQueue).count() == 2

+    @pytest.mark.usefixtures("clear_datasets")
+    def test_register_dataset_change_with_alias(self, session, dag_maker, mock_task_instance):
+        consumer_dag_1 = DagModel(dag_id="consumer_1", is_active=True, fileloc="dag1.py")
+        consumer_dag_2 = DagModel(dag_id="consumer_2", is_active=True, fileloc="dag2.py")
+        session.add_all([consumer_dag_1, consumer_dag_2])
+
+        dsm = DatasetModel(uri="test_dataset_uri")
+        session.add(dsm)
+
+        dsam = DatasetAliasModel(name="test_dataset_name")
+        session.add(dsam)
+        dsam.consuming_dags = [
+            DagScheduleDatasetAliasReference(dag_id=dag.dag_id) for dag in (consumer_dag_1, consumer_dag_2)
+        ]
+        session.execute(delete(DatasetDagRunQueue))
+        session.flush()
+
+        dataset = Dataset(uri="test_dataset_uri")
+        dataset_alias = DatasetAlias(name="test_dataset_name")
+        dataset_manager = DatasetManager()
+        dataset_manager.register_dataset_change(
+            task_instance=mock_task_instance,
+            dataset=dataset,
+            aliases=[dataset_alias],
+            source_alias_names=["test_dataset_name"],
+            session=session,
+        )
+        session.flush()
+
+        # Ensure we've created a dataset event
+        assert session.query(DatasetEvent).filter_by(dataset_id=dsm.id).count() == 1
+        assert session.query(DatasetDagRunQueue).count() == 2
+        assert session.query(DagPriorityParsingRequest).count() == 2
+
    def test_register_dataset_change_no_downstreams(self, session, mock_task_instance):
        dsem = DatasetManager()