ft2898 commented on issue #52276:
URL: https://github.com/apache/airflow/issues/52276#issuecomment-3035389846
@sjyangkevin
I have a DAG named example_bash_operator1. Initially, I added a task with
task_id runme_3501. While runme_3501 was running, I added another task with
task_id runme_3502. However, I observed that when runme_3501 finished, Airflow
marked runme_3502 as removed. I traced this behavior to the following piece of
code:
```python
def _filter_tis_and_exclude_removed(dag: DAG, tis: list[TI]) -> Iterable[TI]:
    """Populate ``ti.task`` while excluding those missing one, marking them as REMOVED."""
    for ti in tis:
        try:
            ti.task = dag.get_task(ti.task_id)
        except TaskNotFound:
            if ti.state != TaskInstanceState.REMOVED:
                self.log.error("Failed to get task for ti %s. Marking it as removed.", ti)
                ti.state = TaskInstanceState.REMOVED
                session.flush()
        else:
            yield ti
```
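For illustration, the branching of this helper can be reproduced outside Airflow with minimal stub classes (everything below is a hypothetical stand-in, not Airflow's real models): a task instance whose task_id is absent from the parsed DAG raises, gets its state set to removed, and is silently dropped from the yielded result.

```python
# Hypothetical stand-ins for Airflow's exception, DAG, and TaskInstance models.
class TaskNotFound(Exception):
    pass

class StubDag:
    def __init__(self, task_ids):
        self._tasks = {tid: object() for tid in task_ids}

    def get_task(self, task_id):
        try:
            return self._tasks[task_id]
        except KeyError:
            raise TaskNotFound(task_id)

class StubTI:
    def __init__(self, task_id, state="scheduled"):
        self.task_id = task_id
        self.state = state
        self.task = None

def filter_tis_and_exclude_removed(dag, tis):
    # Mirrors the quoted helper: TIs whose task_id is missing from the
    # parsed DAG are marked removed and excluded from the yielded result.
    for ti in tis:
        try:
            ti.task = dag.get_task(ti.task_id)
        except TaskNotFound:
            ti.state = "removed"
        else:
            yield ti

# The DAG object only knows runme_3501; runme_3502 was added later.
dag = StubDag(["runme_3501"])
tis = [StubTI("runme_3501"), StubTI("runme_3502")]
kept = list(filter_tis_and_exclude_removed(dag, tis))
print([ti.task_id for ti in kept])  # ['runme_3501']
print(tis[1].state)                 # removed
```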
It seems that the DAG's tasks are cached during the execution of runme_3501.
When the task completes, Airflow verifies the current state of all task
instances in the DAG against that cached data; any task_id that does not match
raises a TaskNotFound exception, and the associated task instance is marked as
removed. This change in the database state for runme_3502 consequently causes
it to be rescheduled.
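The timing described above can be sketched as a simple partition check (a minimal sketch under my reading of the issue; `verify`, `snapshot_at_start`, and `dag_file_now` are hypothetical names, not Airflow internals): the completion handler checks task instances against the DAG version parsed before runme_3502 existed, while a fresh parse would keep both tasks.

```python
# DAG as parsed when runme_3501 began, vs. the DAG file after the edit.
snapshot_at_start = {"runme_3501"}
dag_file_now = {"runme_3501", "runme_3502"}

def verify(known_task_ids, ti_task_ids):
    """Partition TIs the way the completion check does: kept vs. removed."""
    kept = [t for t in ti_task_ids if t in known_task_ids]
    removed = [t for t in ti_task_ids if t not in known_task_ids]
    return kept, removed

# Checking against the stale snapshot wrongly removes runme_3502 ...
print(verify(snapshot_at_start, ["runme_3501", "runme_3502"]))
# ... while checking against the freshly parsed file would keep both.
print(verify(dag_file_now, ["runme_3501", "runme_3502"]))
```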
Below are the logs from runme_3501. Please pay special attention to the
critical log line:
> [2025-07-04, 17:55:34 CST] {dagrun.py:977} ERROR - Failed to get task for
ti . Marking it as removed.
Full Logs:
> [2025-07-04, 17:44:47 CST] {local_task_job_runner.py:123} ▼ Pre task
execution logs
> [2025-07-04, 17:44:47 CST] {taskinstance.py:2614} INFO - Dependencies all
met for dep_context=non-requeueable deps ti=
> [2025-07-04, 17:44:48 CST] {taskinstance.py:2614} INFO - Dependencies all
met for dep_context=requeueable deps ti=
> [2025-07-04, 17:44:48 CST] {taskinstance.py:2867} INFO - Starting attempt
1 of 1
> [2025-07-04, 17:44:48 CST] {taskinstance.py:2890} INFO - Executing on
2025-07-02 16:00:00+00:00
> [2025-07-04, 17:44:48 CST] {standard_task_runner.py:72} INFO - Started
process 91058 to run task
> [2025-07-04, 17:44:48 CST] {standard_task_runner.py:104} INFO - Running:
['airflow', 'tasks', 'run', 'example_bash_operator1', 'runme_3501',
'scheduled__2025-07-02T16:00:00+00:00', '--job-id', '1465677', '--raw',
'--subdir', 'DAGS_FOLDER/example_bash_operator1.py', '--cfg-path',
'/tmp/tmp_7nmg8fj']
> [2025-07-04, 17:44:48 CST] {standard_task_runner.py:105} INFO - Job
1465677: Subtask runme_3501
> [2025-07-04, 17:44:48 CST] {task_command.py:467} INFO - Running on host
centos-hadoop3dn-480896.intsig.internal
> [2025-07-04, 17:45:07 CST] {taskinstance.py:3134} INFO - Exporting env
vars: AIRFLOW_CTX_DAG_OWNER='airflow'
AIRFLOW_CTX_DAG_ID='example_bash_operator1' AIRFLOW_CTX_TASK_ID='runme_3501'
AIRFLOW_CTX_EXECUTION_DATE='2025-07-02T16:00:00+00:00'
AIRFLOW_CTX_TRY_NUMBER='1'
AIRFLOW_CTX_DAG_RUN_ID='scheduled__2025-07-02T16:00:00+00:00'
> [2025-07-04, 17:45:07 CST] {taskinstance.py:732} ▲▲▲ Log group end
> [2025-07-04, 17:45:07 CST] {subprocess.py:78} INFO - Tmp dir root
location: /tmp
> [2025-07-04, 17:45:07 CST] {subprocess.py:88} INFO - Running command:
['/usr/bin/bash', '-c', 'echo "example_bash_operator1__runme_3501__20250702" &&
sleep 600']
> [2025-07-04, 17:45:07 CST] {subprocess.py:99} INFO - Output:
> [2025-07-04, 17:45:07 CST] {subprocess.py:116} INFO -
example_bash_operator1__runme_3501__20250702
> [2025-07-04, 17:55:07 CST] {subprocess.py:136} INFO - Command exited with
return code 0
> [2025-07-04, 17:55:07 CST] {taskinstance.py:341} ▼ Post task execution logs
> [2025-07-04, 17:55:07 CST] {taskinstance.py:353} INFO - Marking task as
SUCCESS. dag_id=example_bash_operator1, task_id=runme_3501,
run_id=scheduled__2025-07-02T16:00:00+00:00, execution_date=20250702T160000,
start_date=20250704T094447, end_date=20250704T095507
> [2025-07-04, 17:55:34 CST] {local_task_job_runner.py:266} INFO - Task
exited with return code 0
> [2025-07-04, 17:55:34 CST] {dagrun.py:977} ERROR - Failed to get task for
ti . Marking it as removed.
> [2025-07-04, 17:55:38 CST] {standard_task_runner.py:217} INFO - Process
not found (most likely exited), stop collecting metrics
> [2025-07-04, 17:55:38 CST] {taskinstance.py:3901} INFO - 0 downstream
tasks scheduled from follow-on schedule check
> [2025-07-04, 17:55:38 CST] {local_task_job_runner.py:245} ▲▲▲ Log group
end
Looking forward to further clarification or suggestions for resolving this
issue!