o-nikolas commented on code in PR #44456:
URL: https://github.com/apache/airflow/pull/44456#discussion_r1866566263
##########
tests/dag_processing/test_collection.py:
##########
@@ -62,3 +76,57 @@ def test_statement_latest_runs_many_dag():
"WHERE dag_run.dag_id = anon_1.dag_id AND dag_run.logical_date =
anon_1.max_logical_date",
]
assert actual == expected, compiled_stmt
+
+
+@pytest.mark.db_test
+class TestAssetModelOperation:
+ @staticmethod
+ def clean_db():
+ clear_db_dags()
+ clear_db_assets()
+ clear_db_triggers()
+
+ @pytest.fixture(autouse=True)
+ def per_test(self) -> Generator:
+ self.clean_db()
+ yield
+ self.clean_db()
Review Comment:
Why do we need to clean before and after? Shouldn't cleaning only after each test be sufficient?
##########
tests/dag_processing/test_collection.py:
##########
@@ -62,3 +76,57 @@ def test_statement_latest_runs_many_dag():
"WHERE dag_run.dag_id = anon_1.dag_id AND dag_run.logical_date =
anon_1.max_logical_date",
]
assert actual == expected, compiled_stmt
+
+
+@pytest.mark.db_test
+class TestAssetModelOperation:
+ @staticmethod
+ def clean_db():
+ clear_db_dags()
+ clear_db_assets()
+ clear_db_triggers()
+
+ @pytest.fixture(autouse=True)
+ def per_test(self) -> Generator:
+ self.clean_db()
+ yield
+ self.clean_db()
+
+ @pytest.mark.parametrize(
+ "is_active, is_paused, expected_num_triggers",
+ [
+ (True, True, 0),
+ (True, False, 1),
+ (False, True, 0),
+ (False, False, 0),
+ ],
+ )
+ def test_add_asset_trigger_references(self, is_active, is_paused,
expected_num_triggers, dag_maker):
+ trigger = TimeDeltaTrigger(timedelta(seconds=0))
+ asset = Asset("test_add_asset_trigger_references_asset",
watchers=[trigger])
+
+ with dag_maker(dag_id="test_add_asset_trigger_references_dag",
schedule=[asset]) as dag:
+ EmptyOperator(task_id="mytask")
+
+ asset_op =
AssetModelOperation.collect({"test_add_asset_trigger_references_dag": dag})
+
+ with create_session() as session:
+ # Update `is_active` and `is_paused` properties from DAG
+ dags = session.query(DagModel).all()
+ for dag in dags:
+ dag.is_active = is_active
+ dag.is_paused = is_paused
+ session.commit()
+
+ orm_assets = asset_op.add_assets(session=session)
+ # Create AssetActive objects from assets. It is usually done in
the scheduler
+ for asset in orm_assets.values():
+ session.add(AssetActive.for_asset(asset))
+ session.commit()
+
+ asset_op.add_asset_trigger_references(orm_assets, session=session)
+
+ session.commit()
Review Comment:
Do we need all the commits above, or can we keep just this final one? I'm not
sure what the speed impact is :shrug:
##########
airflow/dag_processing/collection.py:
##########
@@ -435,14 +456,17 @@ def add_asset_trigger_references(
refs_to_add: dict[tuple[str, str], set[str]] = {}
refs_to_remove: dict[tuple[str, str], set[str]] = {}
triggers: dict[str, BaseTrigger] = {}
+ active_assets = _find_active_assets(self.assets.keys(),
session=session)
Review Comment:
Do we know the DB impact of this? I'm not sure how often the
`add_asset_trigger_references` method is called, but it now runs a query with a
few joins each time it's called.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
users@infra.apache.org