pdellarciprete commented on issue #57618:
URL: https://github.com/apache/airflow/issues/57618#issuecomment-3654781912
Hello @ephraimbuddy,
I tested patch #59234 and it looks good so far. I no longer see multiple
schedulers picking the same task concurrently, and I haven’t noticed any
regressions in other scenarios.
However, I’ve observed another unexpected behavior that seems to cause
wasted task attempts.
Below is a concrete example captured from the logs, summarized in a timeline:
| Time (UTC) | Scheduler ID | Action / Source | Try / Run Status | Significance of Event |
|------------|--------------|-----------------|------------------|-----------------------|
| 04:42:29.146 | A (nzlm5) | Add task | TI: Try 1 (→ QUEUED) | Scheduler A dispatches the first attempt. |
| 04:42:29.886 | A | Received executor event with state queued | TI: Try 1 (QUEUED) | A confirms the task is in the executor queue. |
| 04:43:58.258 | B (rkv75) | Reset the following 12 orphaned TaskInstances | TI: Try 1 (→ SCHEDULED/None) | **Critical Failure 1:** Scheduler B/C incorrectly declared A's task (Try 1) as "orphaned" and reset its state, making it available for rescheduling. |
| 04:44:01.747 | B (rkv75) | 7 tasks up for execution | TI: Try 1 (SCHEDULED) | B picks up the just-reset task as a new job candidate. |
| 04:44:01.790 | B (rkv75) | Add task | TI: Try 2 (→ QUEUED) | **Critical Failure 2:** B re-schedules the task, creating a concurrent run (Try 2 is now running/queued). |
| 04:44:49.985 | Worker Pod (w5n0w48v) | Executing workload | TI: Try 2 (RUNNING) | Worker starts Try 2. |
| 04:45:19.337 | A (nzlm5) | Marking run successful | DagRun: SUCCESS | **Critical Failure 3:** The DagRun completes successfully while Try 2 is running and Try 1 is still technically running/failing in the background. |
| 04:45:59.754 | Worker Pod (k01e9t75) | Executing workload | TI: Try 1 (RUNNING) | The worker pod for the original Try 1 finally starts, minutes after the DagRun was marked successful. |
| 04:46:04.784 | A | Received executor event with state failed | TI: Try 1 (FAILED) | Scheduler A receives the failure event for the originally dispatched task, further confirming the state confusion. |
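In case it helps with cross-checking other environments, here is a rough sketch of a log scan that flags the same pattern as the table above, i.e. a second `Add task` dispatch for an already-dispatched TaskInstanceKey with a different try_number. The regex only approximates the scheduler log lines quoted below (it is not based on any official log format), so please treat it as illustrative:

```python
# Illustrative sketch only: flag TaskInstanceKeys that get dispatched
# ("Add task") more than once with different try_numbers, which is the
# wasted-attempt pattern shown in the timeline above. The regex merely
# approximates the scheduler log lines quoted in this comment.
import re
import sys
from collections import defaultdict

ADD_TASK = re.compile(
    r"Add task\s+TaskInstanceKey\(dag_id='(?P<dag_id>[^']+)', "
    r"task_id='(?P<task_id>[^']+)', run_id='(?P<run_id>[^']+)', "
    r"try_number=(?P<try_number>\d+)"
)

def find_duplicate_dispatches(lines):
    # (dag_id, task_id, run_id) -> set of try_numbers seen in "Add task" lines
    tries = defaultdict(set)
    for line in lines:
        m = ADD_TASK.search(line)
        if m:
            key = (m["dag_id"], m["task_id"], m["run_id"])
            tries[key].add(int(m["try_number"]))
    return {key: sorted(nums) for key, nums in tries.items() if len(nums) > 1}

if __name__ == "__main__":
    # Hypothetical usage: python find_duplicate_dispatches.py < scheduler.log
    for key, attempts in find_duplicate_dispatches(sys.stdin).items():
        print(f"{key}: dispatched tries {attempts}")
```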
### Observed behavior
In short:
- A task attempt (Try 1) is queued and acknowledged by Scheduler A
- Another scheduler later incorrectly resets it as orphaned
- This leads to a second attempt (Try 2) being scheduled and executed
- The DagRun is marked successful while an attempt is still running
- The original attempt eventually runs and fails after the DagRun has
already succeeded
This doesn’t reintroduce the original concurrency issue, but it does result
in:
- Unnecessary retries
- Extra executor / worker usage
- Confusing final task state transitions
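If it helps quantify the "DagRun succeeded while an attempt was still running" part, below is a rough metadata-DB heuristic I could run: it lists successful DagRuns that have a task instance whose end_date is later than the run's own end_date. This is only a sketch, assuming direct metadata-DB access from a scheduler-side environment where the ORM models are importable; the fields are simply the ones the scheduler logs above print, so it may need adjusting for this Airflow version.

```python
# Rough heuristic only (assumes scheduler-side metadata-DB access; the fields
# are taken from what the scheduler logs print, not from a vetted schema):
# list successful DagRuns that still have a task instance finishing *after*
# the run's own end_date, i.e. the "run succeeded while an attempt was still
# in flight" symptom described above.
from airflow.models import DagRun, TaskInstance
from airflow.utils.session import create_session
from airflow.utils.state import DagRunState

with create_session() as session:
    rows = (
        session.query(
            DagRun.dag_id, DagRun.run_id, DagRun.end_date,
            TaskInstance.task_id, TaskInstance.try_number,
            TaskInstance.state, TaskInstance.end_date,
        )
        .join(
            TaskInstance,
            (TaskInstance.dag_id == DagRun.dag_id)
            & (TaskInstance.run_id == DagRun.run_id),
        )
        .filter(DagRun.state == DagRunState.SUCCESS)
        .filter(TaskInstance.end_date > DagRun.end_date)
        .all()
    )

for dag_id, run_id, run_end, task_id, try_number, ti_state, ti_end in rows:
    print(
        f"{dag_id}/{run_id}: {task_id} try {try_number} ({ti_state}) "
        f"ended {ti_end}, after the run ended {run_end}"
    )
```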
For completeness, here is the full log excerpt showing the sequence above:
```
scheduler A (workflow-manager-scheduler-77d8bd4d7f-nzlm5)
2025-12-15T04:42:29.146414Z [info ] Add task
TaskInstanceKey(dag_id='sadp_dag_128', task_id='task_5',
run_id='scheduled__2025-12-15T03:15:00+00:00', try_number=1, map_index=-1) with
command [ExecuteTask(token='eyJ***',
ti=TaskInstance(id=UUID('019b2045-cf95-7196-9df7-4fce08dbe45a'),
dag_version_id=UUID('019b1df2-bb41-7ada-b061-87fe50795ffe'), task_id='task_5',
dag_id='sadp_dag_128', run_id='scheduled__2025-12-15T03:15:00+00:00',
try_number=1, map_index=-1, pool_slots=1, queue='default', priority_weight=1,
executor_config={}, parent_context_carrier={}, context_carrier={}),
dag_rel_path=PosixPath('sadp_dag_generator.py'),
bundle_info=BundleInfo(name='dags-folder', version=None),
log_path='dag_id=sadp_dag_128/run_id=scheduled__2025-12-15T03:15:00+00:00/task_id=task_5/attempt=1.log',
type='ExecuteTask')]
[airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor]
loc=kubernetes_executor.py:266
scheduler A (workflow-manager-scheduler-77d8bd4d7f-nzlm5)
2025-12-15T04:42:29.886794Z [info ] Received executor event with state
queued for task instance TaskInstanceKey(dag_id='sadp_dag_128',
task_id='task_5', run_id='scheduled__2025-12-15T03:15:00+00:00', try_number=1,
map_index=-1) [airflow.jobs.scheduler_job_runner.SchedulerJobRunner]
loc=scheduler_job_runner.py:818
scheduler A (workflow-manager-scheduler-77d8bd4d7f-nzlm5)
2025-12-15T04:42:29.919003Z [info ] Setting external_executor_id for
<TaskInstance: sadp_dag_128.task_5 scheduled__2025-12-15T03:15:00+00:00
[queued]> to 14061 [airflow.jobs.scheduler_job_runner.SchedulerJobRunner]
loc=scheduler_job_runner.py:854
scheduler B (workflow-manager-scheduler-77d8bd4d7f-rkv75)
2025-12-15T04:43:48.198405Z [info ] Adopting or resetting orphaned tasks
for active dag runs [airflow.jobs.scheduler_job_runner.SchedulerJobRunner]
loc=scheduler_job_runner.py:2276
scheduler C (workflow-manager-scheduler-77d8bd4d7f-rkv75)
2025-12-15T04:43:58.258654Z [info ] Reset the following 12 orphaned
TaskInstances: <TaskInstance: sadp_dag_128.task_5
scheduled__2025-12-15T03:15:00+00:00 [queued]> ... (11 more tasks)
[airflow.jobs.scheduler_job_runner.SchedulerJobRunner]
loc=scheduler_job_runner.py:2342
scheduler B (workflow-manager-scheduler-77d8bd4d7f-rkv75)
2025-12-15T04:44:01.747212Z [info ] 7 tasks up for execution: ...
<TaskInstance: sadp_dag_128.task_5 scheduled__2025-12-15T03:15:00+00:00
[scheduled]> [airflow.jobs.scheduler_job_runner.SchedulerJobRunner]
loc=scheduler_job_runner.py:444
scheduler B (workflow-manager-scheduler-77d8bd4d7f-rkv75)
2025-12-15T04:44:01.790234Z [info ] Add task
TaskInstanceKey(dag_id='sadp_dag_128', task_id='task_5',
run_id='scheduled__2025-12-15T03:15:00+00:00', try_number=2, map_index=-1) with
command [ExecuteTask(token='eyJ***',
ti=TaskInstance(id=UUID('019b2045-cf95-7196-9df7-4fce08dbe45a'),
dag_version_id=UUID('019b1df2-bb41-7ada-b061-87fe50795ffe'), task_id='task_5',
dag_id='sadp_dag_128', run_id='scheduled__2025-12-15T03:15:00+00:00',
try_number=2, map_index=-1, pool_slots=1, queue='default', priority_weight=1,
executor_config={}, parent_context_carrier={}, context_carrier={}),
dag_rel_path=PosixPath('sadp_dag_generator.py'),
bundle_info=BundleInfo(name='dags-folder', version=None),
log_path='dag_id=sadp_dag_128/run_id=scheduled__2025-12-15T03:15:00+00:00/task_id=task_5/attempt=2.log',
type='ExecuteTask')]
[airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor]
loc=kubernetes_executor.py:266
scheduler B (workflow-manager-scheduler-77d8bd4d7f-rkv75)
2025-12-15T04:44:02.860218Z [info ] Received executor event with state
queued for task instance TaskInstanceKey(dag_id='sadp_dag_128',
task_id='task_5', run_id='scheduled__2025-12-15T03:15:00+00:00', try_number=2,
map_index=-1) [airflow.jobs.scheduler_job_runner.SchedulerJobRunner]
loc=scheduler_job_runner.py:818
scheduler B (workflow-manager-scheduler-77d8bd4d7f-rkv75)
2025-12-15T04:44:02.933014Z [info ] Setting external_executor_id for
<TaskInstance: sadp_dag_128.task_5 scheduled__2025-12-15T03:15:00+00:00
[queued]> to 14069 [airflow.jobs.scheduler_job_runner.SchedulerJobRunner]
loc=scheduler_job_runner.py:854
scheduler B (workflow-manager-scheduler-77d8bd4d7f-rkv75)
2025-12-15T04:44:13.348541Z [info ] Creating kubernetes pod for job is
TaskInstanceKey(dag_id='sadp_dag_128', task_id='task_5',
run_id='scheduled__2025-12-15T03:15:00+00:00', try_number=2, map_index=-1),
with pod name sadp-dag-128-task-5-w5n0w48v, annotations: <omitted>
[airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.AirflowKubernetesScheduler]
loc=kubernetes_executor_utils.py:590
worker pod (sadp-dag-128-task-5-w5n0w48v)
{"timestamp":"2025-12-15T04:44:49.985783Z","level":"info","event":"Executing
workload","workload":"ExecuteTask(token='eyJ***',
ti=TaskInstance(id=UUID('019b2045-cf95-7196-9df7-4fce08dbe45a'),
dag_version_id=UUID('019b1df2-bb41-7ada-b061-87fe50795ffe'), task_id='task_5',
dag_id='sadp_dag_128', run_id='scheduled__2025-12-15T03:15:00+00:00',
try_number=2, map_index=-1, pool_slots=1, queue='default', priority_weight=1,
executor_config=None, parent_context_carrier={}, context_carrier={}),
dag_rel_path=PurePosixPath('sadp_dag_generator.py'),
bundle_info=BundleInfo(name='dags-folder', version=None),
log_path='dag_id=sadp_dag_128/run_id=scheduled__2025-12-15T03:15:00+00:00/task_id=task_5/attempt=2.log',
type='ExecuteTask')","logger":"__main__","filename":"execute_workload.py","lineno":56}.
scheduler A (workflow-manager-scheduler-77d8bd4d7f-nzlm5)
2025-12-15T04:45:19.337468Z [info ] Marking run <DagRun sadp_dag_128 @
2025-12-15 03:15:00+00:00: scheduled__2025-12-15T03:15:00+00:00, state:running,
queued_at: 2025-12-15 04:30:04.672690+00:00. run_type: scheduled> successful
[airflow.models.dagrun.DagRun] loc=dagrun.py:1207
scheduler A (workflow-manager-scheduler-77d8bd4d7f-nzlm5)
2025-12-15T04:45:19.337899Z [info ] DagRun Finished:
dag_id=sadp_dag_128, logical_date=2025-12-15 03:15:00+00:00,
run_id=scheduled__2025-12-15T03:15:00+00:00, run_start_date=2025-12-15
04:30:04.739972+00:00, run_end_date=2025-12-15 04:45:19.337690+00:00,
run_duration=914.597718, state=success, run_type=scheduled,
data_interval_start=2025-12-15 03:15:00+00:00, data_interval_end=2025-12-15
04:15:00+00:00, [airflow.models.dagrun.DagRun] loc=dagrun.py:1277
scheduler A (workflow-manager-scheduler-77d8bd4d7f-nzlm5)
2025-12-15T04:45:20.037421Z [info ] Applying patches to pod with dag_id:
sadp_dag_128 [root] loc=entrypoint.py:99
scheduler A (workflow-manager-scheduler-77d8bd4d7f-nzlm5)
2025-12-15T04:45:20.038291Z [info ] Creating kubernetes pod for job is
TaskInstanceKey(dag_id='sadp_dag_128', task_id='task_5',
run_id='scheduled__2025-12-15T03:15:00+00:00', try_number=1, map_index=-1),
with pod name sadp-dag-128-task-5-k01e9t75, annotations: <omitted>
[airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.AirflowKubernetesScheduler]
loc=kubernetes_executor_utils.py:590
worker pod (sadp-dag-128-task-5-k01e9t75)
{"timestamp":"2025-12-15T04:45:59.754063Z","level":"info","event":"Executing
workload","workload":"ExecuteTask(token='eyJ***',
ti=TaskInstance(id=UUID('019b2045-cf95-7196-9df7-4fce08dbe45a'),
dag_version_id=UUID('019b1df2-bb41-7ada-b061-87fe50795ffe'), task_id='task_5',
dag_id='sadp_dag_128', run_id='scheduled__2025-12-15T03:15:00+00:00',
try_number=1, map_index=-1, pool_slots=1, queue='default', priority_weight=1,
executor_config=None, parent_context_carrier={}, context_carrier={}),
dag_rel_path=PurePosixPath('sadp_dag_generator.py'),
bundle_info=BundleInfo(name='dags-folder', version=None),
log_path='dag_id=sadp_dag_128/run_id=scheduled__2025-12-15T03:15:00+00:00/task_id=task_5/attempt=1.log',
type='ExecuteTask')","logger":"__main__","filename":"execute_workload.py","lineno":56}
scheduler A (workflow-manager-scheduler-77d8bd4d7f-nzlm5)
2025-12-15T04:46:02.705511Z [warning ] Event: sadp-dag-128-task-5-k01e9t75
Failed, task: sadp_dag_128.task_5.1, annotations: <omitted>
[airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.KubernetesJobWatcher]
loc=kubernetes_executor_utils.py:304
scheduler A (workflow-manager-scheduler-77d8bd4d7f-nzlm5)
2025-12-15T04:46:04.782995Z [info ] Changing state of
KubernetesResults(key=TaskInstanceKey(dag_id='sadp_dag_128', task_id='task_5',
run_id='scheduled__2025-12-15T03:15:00+00:00', try_number=1, map_index=-1),
state=<TaskInstanceState.FAILED: 'failed'>,
pod_name='sadp-dag-128-task-5-k01e9t75',
namespace='workflow-manager-experimentation-priority',
resource_version='1765773962632719016', failure_details={'pod_status':
'Failed', 'pod_reason': None, 'pod_message': None, 'container_state':
'terminated', 'container_reason': 'Error', 'container_message': None,
'exit_code': 1, 'container_type': 'main', 'container_name': 'base'}) to failed
[airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor]
loc=kubernetes_executor.py:332
scheduler A (workflow-manager-scheduler-77d8bd4d7f-nzlm5)
2025-12-15T04:46:04.783295Z [warning ] Task sadp_dag_128.task_5.1 failed in
pod workflow-manager-experimentation-priority/sadp-dag-128-task-5-k01e9t75. Pod
phase: Failed, reason: None, message: None, container_type: main,
container_name: base, container_state: terminated, container_reason: Error,
container_message: None, exit_code: 1
[airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor]
loc=kubernetes_executor.py:445
scheduler A (workflow-manager-scheduler-77d8bd4d7f-nzlm5)
2025-12-15T04:46:04.784337Z [info ] Received executor event with state
failed for task instance TaskInstanceKey(dag_id='sadp_dag_128',
task_id='task_5', run_id='scheduled__2025-12-15T03:15:00+00:00', try_number=1,
map_index=-1) [airflow.jobs.scheduler_job_runner.SchedulerJobRunner]
loc=scheduler_job_runner.py:818
scheduler A (workflow-manager-scheduler-77d8bd4d7f-nzlm5)
2025-12-15T04:46:04.811357Z [info ] TaskInstance Finished:
dag_id=sadp_dag_128, task_id=task_5,
run_id=scheduled__2025-12-15T03:15:00+00:00, map_index=-1,
run_start_date=2025-12-15 04:44:50.091613+00:00, run_end_date=2025-12-15
04:45:09.418923+00:00, run_duration=19.32731, state=success,
executor=KubernetesExecutor(parallelism=128), executor_state=failed,
try_number=1, max_tries=3, pool=default_pool, queue=default, priority_weight=1,
operator=PythonOperator, queued_dttm=2025-12-15 04:44:01.754200+00:00,
scheduled_dttm=2025-12-15 04:44:01.273310+00:00,queued_by_job_id=14069, pid=14
[airflow.jobs.scheduler_job_runner.SchedulerJobRunner]
loc=scheduler_job_runner.py:864
```
Let me know if this aligns with expected behavior or if you’d like me to
test a specific configuration or scenario. Happy to dig further if useful.