kaxil commented on code in PR #67127:
URL: https://github.com/apache/airflow/pull/67127#discussion_r3262851195


##########
providers/celery/src/airflow/providers/celery/executors/celery_executor_utils.py:
##########
@@ -160,6 +160,21 @@ def create_celery_app(team_conf: ExecutorConf | AirflowConfigParser) -> Celery:
     return celery_app
 
 
+@lru_cache(maxsize=8)
+def _get_celery_app_for_workload(team_name: str | None) -> Celery:
+    """Return a subprocess-local Celery app cached by team name for task publishing."""

Review Comment:
   The "subprocess-local" claim only holds when `_send_workloads_to_celery` 
actually uses the `ProcessPoolExecutor` branch. In 
`celery_executor.py:243-245`, the single-workload (or `sync_parallelism=1`) 
path runs `send_workload_to_executor` inline via `map(...)` in the scheduler 
process. In that case this cache lives in the scheduler itself and keeps the 
cached Celery apps' broker connections open there too. It's worth either 
updating the docstring to say "scheduler process or publisher subprocess, 
depending on path", or stating explicitly that this is intentional.
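   A minimal sketch of the inline path, assuming a hypothetical 
`get_app_for_team` that stands in for `_get_celery_app_for_workload` (no 
Celery involved; it only records which process created each cached object). 
When publishing goes through plain `map(...)`, the `lru_cache` fills up in 
the calling process, so that is where the cached objects, and for real Celery 
apps their broker connections, stay alive:

```python
from __future__ import annotations

import os
from functools import lru_cache


# Hypothetical stand-in for _get_celery_app_for_workload: instead of building
# a Celery app (and opening a broker connection), it records which process
# created the cached object.
@lru_cache(maxsize=8)
def get_app_for_team(team_name: str | None) -> dict:
    return {"team": team_name, "created_in_pid": os.getpid()}


def send_workload(team_name: str | None) -> int:
    """Hypothetical publisher: returns the PID that owns the cached app."""
    return get_app_for_team(team_name)["created_in_pid"]


def publish_inline(team_names: list[str | None]) -> list[int]:
    # Mirrors the single-workload / sync_parallelism=1 path: a plain map(...)
    # runs in the calling (scheduler) process, so the cache is filled here.
    return list(map(send_workload, team_names))


if __name__ == "__main__":
    get_app_for_team.cache_clear()
    owners = publish_inline(["team-a", None, "team-a"])
    print(set(owners) == {os.getpid()})  # True: every app belongs to this process
    print(get_app_for_team.cache_info().currsize)  # 2: both apps stay cached here
```

In the `ProcessPoolExecutor` branch the same calls would populate each worker 
process's own copy of the cache instead, which is the case the 
"subprocess-local" wording describes.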



##########
providers/celery/src/airflow/providers/celery/executors/celery_executor_utils.py:
##########
@@ -160,6 +160,21 @@ def create_celery_app(team_conf: ExecutorConf | AirflowConfigParser) -> Celery:
     return celery_app
 
 
+@lru_cache(maxsize=8)
+def _get_celery_app_for_workload(team_name: str | None) -> Celery:
+    """Return a subprocess-local Celery app cached by team name for task publishing."""
+    if TYPE_CHECKING:
+        _conf: ExecutorConf | AirflowConfigParser

Review Comment:
   This `if TYPE_CHECKING: _conf: ExecutorConf | AirflowConfigParser` is a 
copy-paste leftover from the old `send_workload_to_executor`. The annotation 
never escapes this function and `_conf` already gets a concrete type from the 
if/else assignment below, so this whole two-line block can be dropped.



##########
providers/celery/tests/unit/celery/executors/test_celery_executor.py:
##########
@@ -77,6 +77,13 @@
 pytestmark = pytest.mark.db_test
 
 
+@pytest.fixture(autouse=True)
+def clear_cached_workload_celery_apps():
+    celery_executor_utils._get_celery_app_for_workload.cache_clear()
+    yield
+    celery_executor_utils._get_celery_app_for_workload.cache_clear()

Review Comment:
   The two new tests cover `team-a` reuse and `team-a` vs `team-b` separation, 
but not `team_name=None`, which is the path the vast majority of deployments 
hit (no multi-team config). A third parametrized case asserting that `None` 
also hits the cache would catch a regression where `lru_cache` started 
treating `None` specially.
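   A minimal sketch of that third case. It uses a hypothetical stand-in, 
`get_app_for_team` (an `lru_cache`d function returning a fresh `object()`), 
rather than the real `_get_celery_app_for_workload`, and a plain loop in 
place of `@pytest.mark.parametrize` so it runs without pytest:

```python
from __future__ import annotations

from functools import lru_cache


# Hypothetical stand-in for celery_executor_utils._get_celery_app_for_workload;
# a fresh object() plays the role of a newly created Celery app.
@lru_cache(maxsize=8)
def get_app_for_team(team_name: str | None) -> object:
    return object()


def check_same_team_reuses_app(team_name: str | None) -> None:
    # In the real suite this would be one @pytest.mark.parametrize case and
    # the autouse fixture would do the cache_clear(); it is inlined here.
    get_app_for_team.cache_clear()
    first = get_app_for_team(team_name)
    second = get_app_for_team(team_name)
    assert first is second  # same cached app returned
    assert get_app_for_team.cache_info().hits == 1  # second call was a cache hit


# None is exercised exactly like a named team.
for case in ["team-a", None]:
    check_same_team_reuses_app(case)
```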



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.