uranusjr commented on code in PR #58289:
URL: https://github.com/apache/airflow/pull/58289#discussion_r2536789080
##########
airflow-core/src/airflow/assets/manager.py:
##########
@@ -243,13 +284,77 @@ def _queue_dagruns(cls, asset_id: int, dags_to_queue: set[DagModel], session: Se
# "fallback" to running this in a nested transaction. This is needed
# so that the adding of these rows happens in the same transaction
# where `ti.state` is changed.
- if not dags_to_queue:
- return
-
if get_dialect_name(session) == "postgresql":
return cls._postgres_queue_dagruns(asset_id, dags_to_queue, session)
return cls._slow_path_queue_dagruns(asset_id, dags_to_queue, session)
+ @classmethod
+ def _queue_partitioned_dags(
+ cls,
+ asset_id: int,
+ all_dags: set[DagModel],
+ event: AssetEvent,
+ partition_key: str,
+ session: Session,
+ ) -> list[DagModel]:
+ # todo: AIP-76 there may be a better way to identify that timetable is partition-driven
+ partition_dags = [x for x in all_dags if x.timetable_summary == "Partitioned Asset"]
Review Comment:
I see you access the timetable from serdag below anyway. So maybe there’s a
way to just merge this logic somewhere below instead? It would be a bit less
performant (need to deserialise unrelated dags) though. Alternatively, maybe we
can just add a bool flag on DagModel to indicate if the dag has a
PartitionedAssetTimetable?
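
Roughly what the flag option could look like, as a sketch only. `DagModelStub` is a stand-in dataclass, not the real `DagModel`, and the field name `is_partition_driven` is made up for illustration; the real column name and where it gets populated would need to be decided in the PR.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class DagModelStub:
    """Stand-in for DagModel, used only to illustrate the idea."""

    dag_id: str
    timetable_summary: str
    # Hypothetical flag: would be set when the dag is parsed/serialized
    # and its timetable is a PartitionedAssetTimetable.
    is_partition_driven: bool = False


def select_partition_dags(all_dags: set[DagModelStub]) -> list[DagModelStub]:
    # Filter on the explicit flag instead of comparing the
    # human-readable timetable_summary string.
    return sorted(
        (dag for dag in all_dags if dag.is_partition_driven),
        key=lambda dag: dag.dag_id,
    )
```

This avoids both the string comparison and deserialising unrelated dags, at the cost of one more column to keep in sync.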