dstandish commented on code in PR #58289:
URL: https://github.com/apache/airflow/pull/58289#discussion_r2577515092
##########
airflow-core/src/airflow/assets/manager.py:
##########
@@ -243,15 +283,94 @@ def _queue_dagruns(cls, asset_id: int, dags_to_queue: set[DagModel], session: Se
# "fallback" to running this in a nested transaction. This is needed
# so that the adding of these rows happens in the same transaction
# where `ti.state` is changed.
- if not dags_to_queue:
- return
-
if get_dialect_name(session) == "postgresql":
- return cls._postgres_queue_dagruns(asset_id, dags_to_queue, session)
- return cls._slow_path_queue_dagruns(asset_id, dags_to_queue, session)
+ return cls._queue_dagruns_nonpartitioned_postgres(asset_id, dags_to_queue, session)
+ return cls._queue_dagruns_nonpartitioned_slow_path(asset_id, dags_to_queue, session)
@classmethod
- def _slow_path_queue_dagruns(cls, asset_id: int, dags_to_queue: set[DagModel], session: Session) -> None:
+ def _queue_partitioned_dags(
+ cls,
+ asset_id: int,
+ all_dags: set[DagModel],
+ event: AssetEvent,
+ partition_key: str | None,
+ session: Session,
+ ) -> list[DagModel]:
+ # TODO: AIP-76 there may be a better way to identify that timetable is partition-driven
+ partition_dags = [x for x in all_dags if x.timetable_summary == "Partitioned Asset"]
+ if partition_dags and not partition_key:
+ # TODO: AIP-76 how to best ensure users can see this? Probably add Log record.
+ log.warning(
+ "Listening dags are partition-aware but run has no partition key",
+ listening_dags=[x.dag_id for x in partition_dags],
+ asset_id=asset_id,
+ run_id=event.source_run_id,
+ dag_id=event.source_dag_id,
+ task_id=event.source_task_id,
+ )
Review Comment:
sure
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]