Copilot commented on code in PR #64997:
URL: https://github.com/apache/airflow/pull/64997#discussion_r3066465827


##########
providers/celery/src/airflow/providers/celery/executors/celery_executor.py:
##########
@@ -39,6 +39,8 @@
 
 from airflow.exceptions import AirflowProviderDeprecationWarning
 from airflow.executors.base_executor import BaseExecutor
+from airflow.models.taskinstance import TaskInstance
+from airflow.models.taskinstancekey import TaskInstanceKey
 from airflow.providers.celery.executors import (

Review Comment:
   Importing ORM models (TaskInstance/TaskInstanceKey) at module import time 
makes `airflow.providers.celery.executors.celery_executor` heavier to import. 
This module is imported by Celery CLI commands to access `app` (see 
`providers/celery/.../celery_command.py`), and the module already uses 
`__getattr__` to avoid import-time costs. Consider lazily importing these 
models inside `_persist_task_external_executor_id` (or another narrow code 
path) so CLI/worker paths that only need `app` don't pay the ORM import cost / 
risk DB initialization side effects.
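The suggested shape, sketched with a stand-in: here `fractions` plays the role of the heavy module (in the PR, `airflow.models.taskinstance`), and the import moves from module top level into the one function that needs it, so importers that never call the function never pay for it.

```python
def as_ratio(numerator: int, denominator: int):
    # Deferred import: the module loads only when this function runs,
    # not when this file is imported. In the PR the same move would put
    # the TaskInstance/TaskInstanceKey imports inside
    # _persist_task_external_executor_id.
    from fractions import Fraction

    return Fraction(numerator, denominator)
```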



##########
providers/celery/src/airflow/providers/celery/executors/celery_executor.py:
##########
@@ -225,12 +225,39 @@ def _send_workloads(self, workload_tuples_to_send: Sequence[WorkloadInCelery]):
                 result.backend = cached_celery_backend
                 self.running.add(key)
                 self.workloads[key] = result
+                # Persist the Celery task_id before scheduler event handling so a replacement
+                # scheduler can still adopt the task after a crash/restart.
+                self._persist_task_external_executor_id(key, result.task_id)

                 # Store the Celery task_id (workload execution ID) in the event buffer. This will get "overwritten" if the task
                 # has another event, but that is fine, because the only other events are success/failed at
                 # which point we don't need the ID anymore anyway.
                 self.event_buffer[key] = (TaskInstanceState.QUEUED, result.task_id)

+    def _persist_task_external_executor_id(self, key: WorkloadKey, task_id: str) -> None:
+        """Persist Celery task ids for task workloads so they survive scheduler restarts."""
+        from sqlalchemy import update
+
+        from airflow.utils.session import create_session
+
+        if not isinstance(key, TaskInstanceKey):
+            return
+
+        with create_session() as session:
+            result = session.execute(
+                update(TaskInstance)
+                .where(
+                    TaskInstance.dag_id == key.dag_id,

Review Comment:
   If the DB update/commit fails here (e.g., transient DB outage/lock), the 
exception will propagate even though the Celery task has already been 
successfully sent. That can break the scheduler/executor loop and leave the 
task in-flight but not tracked. Consider catching SQLAlchemy/DB exceptions 
around this block, logging at warning/debug, and continuing so task submission 
isn’t treated as failed solely due to the best-effort persistence step.
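A minimal sketch of the suggested guard, with hypothetical names (`persist` stands in for the DB write in `_persist_task_external_executor_id`):

```python
import logging

log = logging.getLogger(__name__)


def persist_best_effort(persist, key, task_id):
    """Run the persistence step, but never let it fail task submission."""
    try:
        persist(key, task_id)
    except Exception:
        # The Celery task is already in flight; losing this write only means
        # a replacement scheduler may not be able to adopt the task later.
        log.warning(
            "Could not persist external_executor_id for %s; continuing",
            key,
            exc_info=True,
        )
```

In the real code the `except` would likely be narrowed to `sqlalchemy.exc.SQLAlchemyError` rather than bare `Exception`.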



##########
providers/celery/tests/unit/celery/executors/test_celery_executor.py:
##########
@@ -285,6 +285,45 @@ def test_try_adopt_task_instances(self, clean_dags_dagruns_and_dagbundles, testi
         assert executor.workloads == {key_1: AsyncResult("231"), key_2: AsyncResult("232")}
         assert not_adopted_tis == []
 
+    @pytest.mark.backend("mysql", "postgres")
+    def test_send_workloads_persists_external_executor_id(
+        self, clean_dags_dagruns_and_dagbundles, testing_dag_bundle
+    ):
+        from sqlalchemy import select
+
+        from airflow.utils.session import create_session
+

Review Comment:
   These imports are inside the test function but don't need to be conditional/lazy here, and Python caches modules anyway, so there's no repeated-import cost to avoid. Moving them to the module import section keeps the test file consistent with the rest of the test code.



##########
providers/celery/src/airflow/providers/celery/executors/celery_executor.py:
##########
@@ -225,12 +225,39 @@ def _send_workloads(self, workload_tuples_to_send: Sequence[WorkloadInCelery]):
                 result.backend = cached_celery_backend
                 self.running.add(key)
                 self.workloads[key] = result
+                # Persist the Celery task_id before scheduler event handling so a replacement
+                # scheduler can still adopt the task after a crash/restart.
+                self._persist_task_external_executor_id(key, result.task_id)
 

Review Comment:
   `_persist_task_external_executor_id` opens and commits a new DB session for 
every workload in `_send_workloads`. On busy schedulers this adds extra 
per-task latency and DB load (in addition to the existing event-buffer 
reconciliation write). Consider batching these updates (e.g., collect task 
keys/task_ids and run a single session/transaction per `_send_workloads` call, 
or use an executemany-style update) to reduce overhead.
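One way to sketch the batching, using stdlib `sqlite3` as a stand-in for the SQLAlchemy session (the table and column names mirror Airflow's `task_instance` but are assumptions here): collect all `(key, celery_task_id)` pairs from one `_send_workloads` call and issue a single executemany UPDATE in one transaction.

```python
import sqlite3


def persist_external_executor_ids(conn: sqlite3.Connection, entries) -> None:
    """Batched variant: one transaction per _send_workloads call.

    entries: iterable of ((dag_id, task_id, run_id, map_index), celery_task_id).
    """
    conn.executemany(
        "UPDATE task_instance SET external_executor_id = ? "
        "WHERE dag_id = ? AND task_id = ? AND run_id = ? AND map_index = ?",
        [(celery_id, *key) for key, celery_id in entries],
    )
    conn.commit()
```

With SQLAlchemy the equivalent is passing a list of parameter dicts to a single `session.execute()` of one `update(TaskInstance)` statement built with `bindparam()` criteria, which the driver runs in executemany mode.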



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
