uranusjr commented on code in PR #29094:
URL: https://github.com/apache/airflow/pull/29094#discussion_r1149012834
##########
airflow/jobs/scheduler_job.py:
##########
@@ -219,28 +231,27 @@ def is_alive(self, grace_multiplier: float | None = None) -> bool:
and (timezone.utcnow() - self.latest_heartbeat).total_seconds() < scheduler_health_check_threshold
)
- def __get_concurrency_maps(
- self, states: list[TaskInstanceState], session: Session
- ) -> tuple[DefaultDict[str, int], DefaultDict[tuple[str, str], int]]:
+ def __get_concurrency_maps(self, states: list[TaskInstanceState], session: Session) -> ConcurrencyMap:
"""
Get the concurrency maps.
:param states: List of states to query for
- :return: A map from (dag_id, task_id) to # of task instances and
- a map from (dag_id, task_id) to # of task instances in the given state list
+ :return: A map from (dag_id, task_id) to # of task instances, a map from (dag_id, task_id)
+ to # of task instances in the given state list and a map from (dag_id, run_id, task_id)
+ to # of task instances in the given state list in the each DAG run
Review Comment:
It seems easier to document this in `ConcurrencyMap`.
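
A rough sketch of what that could look like, assuming `ConcurrencyMap` is a dataclass holding the three maps. The field names and the `from_dagrun_counts` helper are hypothetical and not taken from the PR. The key of the first map is assumed to be `dag_id` alone, following the `DefaultDict[str, int]` in the removed signature rather than the docstring wording.

```python
from collections import defaultdict
from collections.abc import Mapping
from dataclasses import dataclass


@dataclass
class ConcurrencyMap:
    """
    Task instance counts for a given list of states (illustrative sketch).

    Field names are hypothetical; only the three-map shape comes from the diff.

    :param dag_active_tasks_map: dag_id -> # of task instances
        (key assumed from the removed ``DefaultDict[str, int]`` annotation)
    :param task_concurrency_map: (dag_id, task_id) -> # of task instances
    :param task_dagrun_concurrency_map: (dag_id, run_id, task_id) -> # of
        task instances in each DAG run
    """

    dag_active_tasks_map: dict[str, int]
    task_concurrency_map: dict[tuple[str, str], int]
    task_dagrun_concurrency_map: dict[tuple[str, str, str], int]

    @classmethod
    def from_dagrun_counts(
        cls, counts: Mapping[tuple[str, str, str], int]
    ) -> "ConcurrencyMap":
        """Hypothetical helper: derive the coarser maps from per-run counts."""
        per_dag: dict[str, int] = defaultdict(int)
        per_task: dict[tuple[str, str], int] = defaultdict(int)
        for (dag_id, _run_id, task_id), n in counts.items():
            per_dag[dag_id] += n
            per_task[(dag_id, task_id)] += n
        return cls(dict(per_dag), dict(per_task), dict(counts))
```

With the description living on the class, the `:return:` line of `__get_concurrency_maps` could shrink to a short pointer to `ConcurrencyMap`.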