dstandish commented on code in PR #58289:
URL: https://github.com/apache/airflow/pull/58289#discussion_r2530000995
##########
airflow-core/src/airflow/assets/manager.py:
##########
@@ -243,13 +284,77 @@ def _queue_dagruns(cls, asset_id: int, dags_to_queue: set[DagModel], session: Se
# "fallback" to running this in a nested transaction. This is needed
# so that the adding of these rows happens in the same transaction
# where `ti.state` is changed.
- if not dags_to_queue:
- return
-
if get_dialect_name(session) == "postgresql":
return cls._postgres_queue_dagruns(asset_id, dags_to_queue, session)
return cls._slow_path_queue_dagruns(asset_id, dags_to_queue, session)
+ @classmethod
+ def _queue_partitioned_dags(
+ cls,
+ asset_id: int,
+ all_dags: set[DagModel],
+ event: AssetEvent,
+ partition_key: str,
+ session: Session,
+ ) -> list[DagModel]:
+ # todo: AIP-76 there may be a better way to identify that timetable is partition-driven
+ partition_dags = [x for x in all_dags if x.timetable_summary == "Partitioned Asset"]
Review Comment:
I realized my question wasn't clear.
What I'm asking is how to identify which timetables are "partitioned". At this point we only have the DagModel, not the full timetable; to get the timetable, we need to load the serdag. So I'm using the timetable summary to filter out the non-partitioned dags. Then, with only the partitioned dags in hand, we load the serdag for each of them and do the partition mapping.
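The two-step approach described above can be sketched roughly as follows. This is a simplified illustration, not the PR's code: `DagModelStub`, `load_timetable`, and `map_partition` are hypothetical stand-ins for `DagModel`, loading the serialized DAG, and the partition-mapping step. The point is that the string pre-filter is cheap, so only the dags that pass it pay for deserialization.

```python
from dataclasses import dataclass
from typing import Callable


# Hypothetical stand-in for DagModel; only the fields this sketch needs.
@dataclass(frozen=True)
class DagModelStub:
    dag_id: str
    timetable_summary: str


# The literal the diff filters on.
PARTITIONED_SUMMARY = "Partitioned Asset"


def queue_partitioned_dags(
    all_dags: set[DagModelStub],
    load_timetable: Callable[[str], object],  # hypothetical: loads the serdag's timetable
    map_partition: Callable[[object, str], None],  # hypothetical partition-mapping step
    partition_key: str,
) -> list[DagModelStub]:
    # Step 1: cheap pre-filter on a column DagModel already has; nothing is deserialized.
    partition_dags = [d for d in all_dags if d.timetable_summary == PARTITIONED_SUMMARY]
    # Step 2: only the survivors pay the cost of loading the serialized DAG.
    for dag in partition_dags:
        timetable = load_timetable(dag.dag_id)
        map_partition(timetable, partition_key)
    return partition_dags


# Usage: record which dags had their (fake) serdag loaded.
loaded: list[str] = []


def fake_load(dag_id: str) -> str:
    loaded.append(dag_id)
    return dag_id


dags = {
    DagModelStub("a", "Partitioned Asset"),
    DagModelStub("b", "@daily"),
    DagModelStub("c", "Asset"),
}
result = queue_partitioned_dags(dags, fake_load, lambda tt, key: None, "2024-01-01")
print([d.dag_id for d in result], loaded)  # ['a'] ['a']
```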
My question is: is the timetable summary robust enough for this? Or should we instead load the serdag and check inheritance, or add some other identifier?
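To make the robustness question concrete, here is a small sketch of the three options. It uses hypothetical classes: `Timetable`, `PartitionedAssetTimetable`, `RegionPartitionedTimetable`, and the `is_partitioned` flag are illustrative only and are not Airflow's API. The sketch shows one way a summary check can fail. A subclass that overrides only its display summary is no longer matched, while the inheritance check and the flag check still recognize it.

```python
# Hypothetical timetable hierarchy; these are NOT Airflow's real classes.
class Timetable:
    summary = "base"
    # Hypothetical explicit identifier, declared once per class.
    is_partitioned = False


class PartitionedAssetTimetable(Timetable):
    summary = "Partitioned Asset"
    is_partitioned = True


class RegionPartitionedTimetable(PartitionedAssetTimetable):
    # A subclass that changes only its human-readable summary.
    summary = "Partitioned by region"


def by_summary(tt: Timetable) -> bool:
    # Option 1: match display text (what the diff does via DagModel.timetable_summary).
    return tt.summary == "Partitioned Asset"


def by_inheritance(tt: Timetable) -> bool:
    # Option 2: needs the full timetable object, i.e. loading the serdag.
    return isinstance(tt, PartitionedAssetTimetable)


def by_flag(tt: Timetable) -> bool:
    # Option 3: an explicit marker that subclasses inherit.
    return getattr(tt, "is_partitioned", False)


custom = RegionPartitionedTimetable()
print(by_summary(custom), by_inheritance(custom), by_flag(custom))  # False True True
```

Under these assumptions, the inheritance check requires the deserialized timetable for every candidate dag. A flag, by contrast, could in principle be recorded alongside `timetable_summary` when the DAG is parsed. That would keep the cheap pre-filter without depending on display text.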
--
This is an automated message from the Apache Git Service.