ft2898 commented on issue #52276:
URL: https://github.com/apache/airflow/issues/52276#issuecomment-3035432389

   @sjyangkevin I have a DAG named example_bash_operator1. Initially it contained a task with task_id runme_3501. While runme_3501 was running, I added another task with task_id runme_3502. However, when runme_3501 finished, it marked runme_3502 as removed. I traced this behavior to the following piece of code:
   
   
   ```python
   # Nested helper in DagRun.task_instance_scheduling_decisions (airflow/models/dagrun.py);
   # ``self`` and ``session`` come from the enclosing method's scope.
   def _filter_tis_and_exclude_removed(dag: DAG, tis: list[TI]) -> Iterable[TI]:
       """Populate ``ti.task`` while excluding those missing one, marking them as REMOVED."""
       for ti in tis:
           try:
               ti.task = dag.get_task(ti.task_id)
           except TaskNotFound:
               if ti.state != TaskInstanceState.REMOVED:
                   self.log.error("Failed to get task for ti %s. Marking it as removed.", ti)
                   ti.state = TaskInstanceState.REMOVED
                   session.flush()
           else:
               yield ti
   ```
   It seems the DAG's task set is cached while runme_3501 is executing. When the task completes, Airflow verifies the current state of all tasks in the DAG run. For any task_id that does not match the cached data, a TaskNotFound exception is raised and the associated task instance is marked as removed. This change to the database state of runme_3502 consequently causes it to be rescheduled.
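   If that reading is right, the behavior can be reproduced in miniature. Below is a hypothetical sketch (plain dicts and a stub exception, not Airflow's actual classes) of the same filtering logic applied to a stale DAG snapshot that predates runme_3502:

```python
from typing import Iterable

class TaskNotFound(Exception):
    """Stand-in for airflow.exceptions.TaskNotFound."""

REMOVED = "removed"

def filter_tis(cached_task_ids: set[str], tis: list[dict]) -> Iterable[dict]:
    """Simplified analogue of _filter_tis_and_exclude_removed.

    Any task instance whose task_id is absent from the cached DAG
    snapshot is marked REMOVED instead of being yielded.
    """
    for ti in tis:
        try:
            if ti["task_id"] not in cached_task_ids:
                raise TaskNotFound(ti["task_id"])  # mimics dag.get_task()
        except TaskNotFound:
            if ti["state"] != REMOVED:
                ti["state"] = REMOVED
        else:
            yield ti

# DAG snapshot cached when runme_3501 started; runme_3502 was added later.
cached_ids = {"runme_3501"}
tis = [
    {"task_id": "runme_3501", "state": "success"},
    {"task_id": "runme_3502", "state": "success"},
]
kept = list(filter_tis(cached_ids, tis))
print([ti["task_id"] for ti in kept])  # ['runme_3501']
print(tis[1]["state"])                 # removed
```

   The successful runme_3502 instance is flipped to removed purely because it is missing from the stale snapshot, matching the ERROR line in the logs below.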
   
   Below are the logs from runme_3501. Please pay special attention to the 
critical log line:
   
   
   > [2025-07-04, 17:55:34 CST] {dagrun.py:977} ERROR - Failed to get task for 
ti <TaskInstance: example_bash_operator1.runme_3502 
scheduled__2025-07-02T16:00:00+00:00 [success]>. Marking it as removed.
   
   
   Full Logs:
   
   > [2025-07-04, 17:44:47 CST] {local_task_job_runner.py:123} ▼ Pre task 
execution logs
   > [2025-07-04, 17:44:47 CST] {taskinstance.py:2614} INFO - Dependencies all 
met for dep_context=non-requeueable deps ti=<TaskInstance: 
example_bash_operator1.runme_3501 scheduled__2025-07-02T16:00:00+00:00 [queued]>
   > [2025-07-04, 17:44:48 CST] {taskinstance.py:2614} INFO - Dependencies all 
met for dep_context=requeueable deps ti=<TaskInstance: 
example_bash_operator1.runme_3501 scheduled__2025-07-02T16:00:00+00:00 [queued]>
   > [2025-07-04, 17:44:48 CST] {taskinstance.py:2867} INFO - Starting attempt 
1 of 1
   > [2025-07-04, 17:44:48 CST] {taskinstance.py:2890} INFO - Executing 
<Task(BashOperator): runme_3501> on 2025-07-02 16:00:00+00:00
   > [2025-07-04, 17:44:48 CST] {standard_task_runner.py:72} INFO - Started 
process 91058 to run task
   > [2025-07-04, 17:44:48 CST] {standard_task_runner.py:104} INFO - Running: 
['airflow', 'tasks', 'run', 'example_bash_operator1', 'runme_3501', 
'scheduled__2025-07-02T16:00:00+00:00', '--job-id', '1465677', '--raw', 
'--subdir', 'DAGS_FOLDER/example_bash_operator1.py', '--cfg-path', 
'/tmp/tmp_7nmg8fj']
   > [2025-07-04, 17:44:48 CST] {standard_task_runner.py:105} INFO - Job 
1465677: Subtask runme_3501
   > [2025-07-04, 17:44:48 CST] {task_command.py:467} INFO - Running 
<TaskInstance: example_bash_operator1.runme_3501 
scheduled__2025-07-02T16:00:00+00:00 [running]> on host 
centos-hadoop3dn-480896.intsig.internal
   > [2025-07-04, 17:45:07 CST] {taskinstance.py:3134} INFO - Exporting env 
vars: AIRFLOW_CTX_DAG_OWNER='airflow' 
AIRFLOW_CTX_DAG_ID='example_bash_operator1' AIRFLOW_CTX_TASK_ID='runme_3501' 
AIRFLOW_CTX_EXECUTION_DATE='2025-07-02T16:00:00+00:00' 
AIRFLOW_CTX_TRY_NUMBER='1' 
AIRFLOW_CTX_DAG_RUN_ID='scheduled__2025-07-02T16:00:00+00:00'
   > [2025-07-04, 17:45:07 CST] {taskinstance.py:732} ▲▲▲ Log group end
   > [2025-07-04, 17:45:07 CST] {subprocess.py:78} INFO - Tmp dir root 
location: /tmp
   > [2025-07-04, 17:45:07 CST] {subprocess.py:88} INFO - Running command: 
['/usr/bin/bash', '-c', 'echo "example_bash_operator1__runme_3501__20250702" && 
sleep 600']
   > [2025-07-04, 17:45:07 CST] {subprocess.py:99} INFO - Output:
   > [2025-07-04, 17:45:07 CST] {subprocess.py:116} INFO - 
example_bash_operator1__runme_3501__20250702
   > [2025-07-04, 17:55:07 CST] {subprocess.py:136} INFO - Command exited with 
return code 0
   > [2025-07-04, 17:55:07 CST] {taskinstance.py:341} ▼ Post task execution logs
   > [2025-07-04, 17:55:07 CST] {taskinstance.py:353} INFO - Marking task as 
SUCCESS. dag_id=example_bash_operator1, task_id=runme_3501, 
run_id=scheduled__2025-07-02T16:00:00+00:00, execution_date=20250702T160000, 
start_date=20250704T094447, end_date=20250704T095507
   > [2025-07-04, 17:55:34 CST] {local_task_job_runner.py:266} INFO - Task 
exited with return code 0
   > [2025-07-04, 17:55:34 CST] {dagrun.py:977} ERROR - Failed to get task for 
ti <TaskInstance: example_bash_operator1.runme_3502 
scheduled__2025-07-02T16:00:00+00:00 [success]>. Marking it as removed.
   > [2025-07-04, 17:55:38 CST] {standard_task_runner.py:217} INFO - Process 
not found (most likely exited), stop collecting metrics
   > [2025-07-04, 17:55:38 CST] {taskinstance.py:3901} INFO - 0 downstream 
tasks scheduled from follow-on schedule check
   > [2025-07-04, 17:55:38 CST] {local_task_job_runner.py:245} ▲▲▲ Log group end
   
   Looking forward to further clarification or suggestions for resolving this 
issue!

