pdellarciprete commented on issue #57618:
URL: https://github.com/apache/airflow/issues/57618#issuecomment-3654781912

   Hello @ephraimbuddy, 
   I tested patch #59234 and it looks good so far: I no longer see multiple 
schedulers picking up the same task concurrently, and I haven’t noticed any 
regressions in other scenarios.
   
   However, I’ve observed another unexpected behavior that seems to cause 
wasted task attempts.
   Below is a concrete example captured from the logs, summarized in a timeline:
   
   | Time (UTC) | Scheduler ID | Action / Source | Try / Run Status | Significance of Event |
   |------------|--------------|-----------------|------------------|-----------------------|
   | 04:42:29.146 | A (nzlm5) | Add task | TI: Try 1 (→ QUEUED) | Scheduler A dispatches the first attempt. |
   | 04:42:29.886 | A | Received executor event with state queued | TI: Try 1 (QUEUED) | A confirms the task is in the executor queue. |
   | 04:43:58.258 | B (rkv75) | Reset the following 12 orphaned TaskInstances | TI: Try 1 (→ SCHEDULED/None) | **Critical Failure 1:** Scheduler B incorrectly declared A's task (Try 1) as "orphaned" and reset its state, making it available for rescheduling. |
   | 04:44:01.747 | B (rkv75) | 7 tasks up for execution | TI: Try 1 (SCHEDULED) | B picks up the just-reset task as a new job candidate. |
   | 04:44:01.790 | B (rkv75) | Add task | TI: Try 2 (→ QUEUED) | **Critical Failure 2:** B re-schedules the task, creating a concurrent run (Try 2 is now running/queued). |
   | 04:44:49.985 | Worker Pod (w5n0w48v) | Executing workload | TI: Try 2 (RUNNING) | Worker starts Try 2. |
   | 04:45:19.337 | A (nzlm5) | Marking run successful | DagRun: SUCCESS | **Critical Failure 3:** The DagRun completes successfully while the original Try 1 is still in flight in the background. |
   | 04:45:59.754 | Worker Pod (k01e9t75) | Executing workload | TI: Try 1 (RUNNING) | The worker pod for the original Try 1 finally starts, after the DagRun has already been marked successful. |
   | 04:46:04.784 | A | Received executor event with state failed | TI: Try 1 (FAILED) | Scheduler A receives the failure event for the originally dispatched task, further confirming the state confusion. |
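   
   If it helps with reproducing or verifying this, below is a minimal sketch for 
pulling the per-try records of the affected task instance from the metadata DB. 
It assumes a recent Airflow version where earlier attempts are archived in 
`TaskInstanceHistory`; model and column names may differ between releases.
   
   ```python
   # Sketch (not taken from the environment above): list current + archived
   # attempts for the affected task instance. Assumes TaskInstanceHistory is
   # available; adjust imports/columns to your Airflow version.
   from airflow.models.taskinstance import TaskInstance
   from airflow.models.taskinstancehistory import TaskInstanceHistory
   from airflow.utils.session import create_session

   DAG_ID = "sadp_dag_128"
   TASK_ID = "task_5"
   RUN_ID = "scheduled__2025-12-15T03:15:00+00:00"

   with create_session() as session:
       current = (
           session.query(TaskInstance)
           .filter_by(dag_id=DAG_ID, task_id=TASK_ID, run_id=RUN_ID)
           .all()
       )
       archived = (
           session.query(TaskInstanceHistory)
           .filter_by(dag_id=DAG_ID, task_id=TASK_ID, run_id=RUN_ID)
           .all()
       )
       for ti in archived + current:
           # queued_by_job_id shows which scheduler job queued each attempt.
           print(ti.try_number, ti.state, ti.queued_by_job_id,
                 ti.start_date, ti.end_date)
   ```
   
   The `queued_by_job_id` column should make it easy to tell which scheduler job 
queued each attempt (e.g. the 14069 visible in the final log line below).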
   
   ### Observed behavior
   
   In short:
   
   - A task attempt (Try 1) is queued and acknowledged by Scheduler A
   - Another scheduler later resets it as "orphaned", which appears incorrect
   - This leads to a second attempt (Try 2) being scheduled and executed
   - The DagRun is marked successful while the original attempt is still in flight
   - The original attempt eventually runs and fails after the DagRun has 
already succeeded
   
   This doesn’t reintroduce the original concurrency issue, but it does result 
in:
   
   - Unnecessary retries
   - Extra executor / worker usage
   - Confusing final task state transitions
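   
   In case it helps correlate the timing: the reset at 04:43:58 immediately 
follows an "Adopting or resetting orphaned tasks" pass (visible in the excerpt 
below), whose cadence is configuration-driven. Here is a minimal sketch to print 
the scheduler settings I'd expect to matter, assuming the standard option names 
from recent releases (please verify against your version):
   
   ```python
   # Sketch: print the scheduler settings that should govern the
   # "Adopting or resetting orphaned tasks" pass seen in the logs below.
   # Option names follow recent Airflow releases; verify against your version.
   from airflow.configuration import conf

   # Seconds between adopt/reset passes for orphaned task instances.
   print(conf.getfloat("scheduler", "orphaned_tasks_check_interval"))

   # Seconds a task may sit in QUEUED before being treated as stuck.
   print(conf.getfloat("scheduler", "task_queued_timeout"))
   ```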
   
   For completeness, here is the full log excerpt showing the sequence above:
   
   ```
   scheduler A (workflow-manager-scheduler-77d8bd4d7f-nzlm5)
   2025-12-15T04:42:29.146414Z [info     ] Add task 
TaskInstanceKey(dag_id='sadp_dag_128', task_id='task_5', 
run_id='scheduled__2025-12-15T03:15:00+00:00', try_number=1, map_index=-1) with 
command [ExecuteTask(token='eyJ***', 
ti=TaskInstance(id=UUID('019b2045-cf95-7196-9df7-4fce08dbe45a'), 
dag_version_id=UUID('019b1df2-bb41-7ada-b061-87fe50795ffe'), task_id='task_5', 
dag_id='sadp_dag_128', run_id='scheduled__2025-12-15T03:15:00+00:00', 
try_number=1, map_index=-1, pool_slots=1, queue='default', priority_weight=1, 
executor_config={}, parent_context_carrier={}, context_carrier={}), 
dag_rel_path=PosixPath('sadp_dag_generator.py'), 
bundle_info=BundleInfo(name='dags-folder', version=None), 
log_path='dag_id=sadp_dag_128/run_id=scheduled__2025-12-15T03:15:00+00:00/task_id=task_5/attempt=1.log',
 type='ExecuteTask')] 
[airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor]
 loc=kubernetes_executor.py:266
   scheduler A (workflow-manager-scheduler-77d8bd4d7f-nzlm5)
   2025-12-15T04:42:29.886794Z [info     ] Received executor event with state 
queued for task instance TaskInstanceKey(dag_id='sadp_dag_128', 
task_id='task_5', run_id='scheduled__2025-12-15T03:15:00+00:00', try_number=1, 
map_index=-1) [airflow.jobs.scheduler_job_runner.SchedulerJobRunner] 
loc=scheduler_job_runner.py:818
   scheduler A (workflow-manager-scheduler-77d8bd4d7f-nzlm5)
   2025-12-15T04:42:29.919003Z [info     ] Setting external_executor_id for 
<TaskInstance: sadp_dag_128.task_5 scheduled__2025-12-15T03:15:00+00:00 
[queued]> to 14061 [airflow.jobs.scheduler_job_runner.SchedulerJobRunner] 
loc=scheduler_job_runner.py:854
   scheduler B (workflow-manager-scheduler-77d8bd4d7f-rkv75)
   2025-12-15T04:43:48.198405Z [info     ] Adopting or resetting orphaned tasks 
for active dag runs [airflow.jobs.scheduler_job_runner.SchedulerJobRunner] 
loc=scheduler_job_runner.py:2276
   scheduler B (workflow-manager-scheduler-77d8bd4d7f-rkv75)
   2025-12-15T04:43:58.258654Z [info     ] Reset the following 12 orphaned 
TaskInstances:    <TaskInstance: sadp_dag_128.task_5 
scheduled__2025-12-15T03:15:00+00:00 [queued]> ... (11 more tasks) 
[airflow.jobs.scheduler_job_runner.SchedulerJobRunner] 
loc=scheduler_job_runner.py:2342
   scheduler B (workflow-manager-scheduler-77d8bd4d7f-rkv75)
   2025-12-15T04:44:01.747212Z [info     ] 7 tasks up for execution: ... 
<TaskInstance: sadp_dag_128.task_5 scheduled__2025-12-15T03:15:00+00:00 
[scheduled]> [airflow.jobs.scheduler_job_runner.SchedulerJobRunner] 
loc=scheduler_job_runner.py:444
   scheduler B (workflow-manager-scheduler-77d8bd4d7f-rkv75)
   2025-12-15T04:44:01.790234Z [info     ] Add task 
TaskInstanceKey(dag_id='sadp_dag_128', task_id='task_5', 
run_id='scheduled__2025-12-15T03:15:00+00:00', try_number=2, map_index=-1) with 
command [ExecuteTask(token='eyJ***', 
ti=TaskInstance(id=UUID('019b2045-cf95-7196-9df7-4fce08dbe45a'), 
dag_version_id=UUID('019b1df2-bb41-7ada-b061-87fe50795ffe'), task_id='task_5', 
dag_id='sadp_dag_128', run_id='scheduled__2025-12-15T03:15:00+00:00', 
try_number=2, map_index=-1, pool_slots=1, queue='default', priority_weight=1, 
executor_config={}, parent_context_carrier={}, context_carrier={}), 
dag_rel_path=PosixPath('sadp_dag_generator.py'), 
bundle_info=BundleInfo(name='dags-folder', version=None), 
log_path='dag_id=sadp_dag_128/run_id=scheduled__2025-12-15T03:15:00+00:00/task_id=task_5/attempt=2.log',
 type='ExecuteTask')] 
[airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor]
 loc=kubernetes_executor.py:266
   scheduler B (workflow-manager-scheduler-77d8bd4d7f-rkv75)
   2025-12-15T04:44:02.860218Z [info     ] Received executor event with state 
queued for task instance TaskInstanceKey(dag_id='sadp_dag_128', 
task_id='task_5', run_id='scheduled__2025-12-15T03:15:00+00:00', try_number=2, 
map_index=-1) [airflow.jobs.scheduler_job_runner.SchedulerJobRunner] 
loc=scheduler_job_runner.py:818
   scheduler B (workflow-manager-scheduler-77d8bd4d7f-rkv75)
   2025-12-15T04:44:02.933014Z [info     ] Setting external_executor_id for 
<TaskInstance: sadp_dag_128.task_5 scheduled__2025-12-15T03:15:00+00:00 
[queued]> to 14069 [airflow.jobs.scheduler_job_runner.SchedulerJobRunner] 
loc=scheduler_job_runner.py:854
   scheduler B (workflow-manager-scheduler-77d8bd4d7f-rkv75)
   2025-12-15T04:44:13.348541Z [info     ] Creating kubernetes pod for job is 
TaskInstanceKey(dag_id='sadp_dag_128', task_id='task_5', 
run_id='scheduled__2025-12-15T03:15:00+00:00', try_number=2, map_index=-1), 
with pod name sadp-dag-128-task-5-w5n0w48v, annotations: <omitted> 
[airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.AirflowKubernetesScheduler]
 loc=kubernetes_executor_utils.py:590
   worker pod (sadp-dag-128-task-5-w5n0w48v) 
   {"timestamp":"2025-12-15T04:44:49.985783Z","level":"info","event":"Executing 
workload","workload":"ExecuteTask(token='eyJ***', 
ti=TaskInstance(id=UUID('019b2045-cf95-7196-9df7-4fce08dbe45a'), 
dag_version_id=UUID('019b1df2-bb41-7ada-b061-87fe50795ffe'), task_id='task_5', 
dag_id='sadp_dag_128', run_id='scheduled__2025-12-15T03:15:00+00:00', 
try_number=2, map_index=-1, pool_slots=1, queue='default', priority_weight=1, 
executor_config=None, parent_context_carrier={}, context_carrier={}), 
dag_rel_path=PurePosixPath('sadp_dag_generator.py'), 
bundle_info=BundleInfo(name='dags-folder', version=None), 
log_path='dag_id=sadp_dag_128/run_id=scheduled__2025-12-15T03:15:00+00:00/task_id=task_5/attempt=2.log',
 
type='ExecuteTask')","logger":"__main__","filename":"execute_workload.py","lineno":56}.
         
   scheduler A (workflow-manager-scheduler-77d8bd4d7f-nzlm5)
   2025-12-15T04:45:19.337468Z [info     ] Marking run <DagRun sadp_dag_128 @ 
2025-12-15 03:15:00+00:00: scheduled__2025-12-15T03:15:00+00:00, state:running, 
queued_at: 2025-12-15 04:30:04.672690+00:00. run_type: scheduled> successful 
[airflow.models.dagrun.DagRun] loc=dagrun.py:1207
   scheduler A (workflow-manager-scheduler-77d8bd4d7f-nzlm5)
   2025-12-15T04:45:19.337899Z [info     ] DagRun Finished: 
dag_id=sadp_dag_128, logical_date=2025-12-15 03:15:00+00:00, 
run_id=scheduled__2025-12-15T03:15:00+00:00, run_start_date=2025-12-15 
04:30:04.739972+00:00, run_end_date=2025-12-15 04:45:19.337690+00:00, 
run_duration=914.597718, state=success, run_type=scheduled, 
data_interval_start=2025-12-15 03:15:00+00:00, data_interval_end=2025-12-15 
04:15:00+00:00, [airflow.models.dagrun.DagRun] loc=dagrun.py:1277
   scheduler A (workflow-manager-scheduler-77d8bd4d7f-nzlm5)
   2025-12-15T04:45:20.037421Z [info     ] Applying patches to pod with dag_id: 
sadp_dag_128 [root] loc=entrypoint.py:99
   scheduler A (workflow-manager-scheduler-77d8bd4d7f-nzlm5)
   2025-12-15T04:45:20.038291Z [info     ] Creating kubernetes pod for job is 
TaskInstanceKey(dag_id='sadp_dag_128', task_id='task_5', 
run_id='scheduled__2025-12-15T03:15:00+00:00', try_number=1, map_index=-1), 
with pod name sadp-dag-128-task-5-k01e9t75, annotations: <omitted> 
[airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.AirflowKubernetesScheduler]
 loc=kubernetes_executor_utils.py:590
   worker pod (sadp-dag-128-task-5-k01e9t75) 
   {"timestamp":"2025-12-15T04:45:59.754063Z","level":"info","event":"Executing 
workload","workload":"ExecuteTask(token='eyJ***', 
ti=TaskInstance(id=UUID('019b2045-cf95-7196-9df7-4fce08dbe45a'), 
dag_version_id=UUID('019b1df2-bb41-7ada-b061-87fe50795ffe'), task_id='task_5', 
dag_id='sadp_dag_128', run_id='scheduled__2025-12-15T03:15:00+00:00', 
try_number=1, map_index=-1, pool_slots=1, queue='default', priority_weight=1, 
executor_config=None, parent_context_carrier={}, context_carrier={}), 
dag_rel_path=PurePosixPath('sadp_dag_generator.py'), 
bundle_info=BundleInfo(name='dags-folder', version=None), 
log_path='dag_id=sadp_dag_128/run_id=scheduled__2025-12-15T03:15:00+00:00/task_id=task_5/attempt=1.log',
 
type='ExecuteTask')","logger":"__main__","filename":"execute_workload.py","lineno":56}
   scheduler A (workflow-manager-scheduler-77d8bd4d7f-nzlm5)
   2025-12-15T04:46:02.705511Z [warning  ] Event: sadp-dag-128-task-5-k01e9t75 
Failed, task: sadp_dag_128.task_5.1, annotations: <omitted> 
[airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.KubernetesJobWatcher]
 loc=kubernetes_executor_utils.py:304
   scheduler A (workflow-manager-scheduler-77d8bd4d7f-nzlm5)
   2025-12-15T04:46:04.782995Z [info     ] Changing state of 
KubernetesResults(key=TaskInstanceKey(dag_id='sadp_dag_128', task_id='task_5', 
run_id='scheduled__2025-12-15T03:15:00+00:00', try_number=1, map_index=-1), 
state=<TaskInstanceState.FAILED: 'failed'>, 
pod_name='sadp-dag-128-task-5-k01e9t75', 
namespace='workflow-manager-experimentation-priority', 
resource_version='1765773962632719016', failure_details={'pod_status': 
'Failed', 'pod_reason': None, 'pod_message': None, 'container_state': 
'terminated', 'container_reason': 'Error', 'container_message': None, 
'exit_code': 1, 'container_type': 'main', 'container_name': 'base'}) to failed 
[airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor]
 loc=kubernetes_executor.py:332
   scheduler A (workflow-manager-scheduler-77d8bd4d7f-nzlm5)
   2025-12-15T04:46:04.783295Z [warning  ] Task sadp_dag_128.task_5.1 failed in 
pod workflow-manager-experimentation-priority/sadp-dag-128-task-5-k01e9t75. Pod 
phase: Failed, reason: None, message: None, container_type: main, 
container_name: base, container_state: terminated, container_reason: Error, 
container_message: None, exit_code: 1 
[airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor]
 loc=kubernetes_executor.py:445
   scheduler A (workflow-manager-scheduler-77d8bd4d7f-nzlm5)
   2025-12-15T04:46:04.784337Z [info     ] Received executor event with state 
failed for task instance TaskInstanceKey(dag_id='sadp_dag_128', 
task_id='task_5', run_id='scheduled__2025-12-15T03:15:00+00:00', try_number=1, 
map_index=-1) [airflow.jobs.scheduler_job_runner.SchedulerJobRunner] 
loc=scheduler_job_runner.py:818
   scheduler A (workflow-manager-scheduler-77d8bd4d7f-nzlm5)
   2025-12-15T04:46:04.811357Z [info     ] TaskInstance Finished: 
dag_id=sadp_dag_128, task_id=task_5, 
run_id=scheduled__2025-12-15T03:15:00+00:00, map_index=-1, 
run_start_date=2025-12-15 04:44:50.091613+00:00, run_end_date=2025-12-15 
04:45:09.418923+00:00, run_duration=19.32731, state=success, 
executor=KubernetesExecutor(parallelism=128), executor_state=failed, 
try_number=1, max_tries=3, pool=default_pool, queue=default, priority_weight=1, 
operator=PythonOperator, queued_dttm=2025-12-15 04:44:01.754200+00:00, 
scheduled_dttm=2025-12-15 04:44:01.273310+00:00,queued_by_job_id=14069, pid=14 
[airflow.jobs.scheduler_job_runner.SchedulerJobRunner] 
loc=scheduler_job_runner.py:864
   ```
   
   Let me know if this aligns with expected behavior or if you’d like me to 
test a specific configuration or scenario. Happy to dig further if useful.
   

