o-nikolas commented on code in PR #68108:
URL: https://github.com/apache/airflow/pull/68108#discussion_r3376007188
##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -394,36 +397,35 @@ def _get_team_names_for_dag_ids(
if not dag_ids:
return {}
- try:
- # Query all team names for the given DAG IDs in a single query
- query_results = session.execute(
- select(DagModel.dag_id, Team.name)
- .join(DagBundleModel.teams) # Join Team to DagBundleModel via
association table
- .join(
- DagModel, DagModel.bundle_name == DagBundleModel.name
- ) # Join DagBundleModel to DagModel
- .where(DagModel.dag_id.in_(dag_ids))
- ).all()
-
- # Create mapping from results
- dag_id_to_team_name = {dag_id: team_name for dag_id, team_name in
query_results}
-
- # Ensure all requested dag_ids are in the result (with None for
those not found)
- result = {dag_id: dag_id_to_team_name.get(dag_id) for dag_id in
dag_ids}
+ missing = [dag_id for dag_id in dag_ids if dag_id not in
self._dag_id_to_team_name]
+ if missing:
+ try:
+ # Query all team names for the given DAG IDs in a single query
+ query_results = session.execute(
Review Comment:
FWIW: I think it's useful to cache this Dag-to-team mapping for the duration of
each scheduler loop, since there are a few places during the loop where we need
this association. But the cached entry for a Dag should be refreshed on every
loop in which that Dag is being scheduled. That way we don't need to worry about
a TTL.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]