vapiravfif opened a new issue #16023:
URL: https://github.com/apache/airflow/issues/16023


   
   **Apache Airflow version**:
   2.0.2, 2.1.0
   
   **Kubernetes version (if you are using kubernetes)** (use `kubectl version`):
   Server Version: version.Info{Major:"1", Minor:"17+", GitVersion:"v1.17.12-eks-7684af", GitCommit:"7684af4ac41370dd109ac13817023cb8063e3d45", GitTreeState:"clean", BuildDate:"2020-10-20T22:57:40Z", GoVersion:"go1.13.15", Compiler:"gc", Platform:"linux/amd64"}
   
   **Environment**:
   
   - **Cloud provider or hardware configuration**:
   AWS EKS
   - **Others**:
   Helm chart - 8.0.8, 8.1.0
   Executor - CeleryExecutor
   
   **What happened**:
   
   When a DAG is paused and long-running PythonOperator tasks are triggered manually (with "Ignore All Deps" checked, then "Run"), they fail with this error:
   ```
   [2021-05-24 08:49:02,166] {logging_mixin.py:104} INFO - hi there, try 6, going to sleep for 15 secs
   [2021-05-24 08:49:03,808] {local_task_job.py:188} WARNING - State of this instance has been externally set to None. Terminating instance.
   [2021-05-24 08:49:03,810] {process_utils.py:100} INFO - Sending Signals.SIGTERM to GPID 172
   [2021-05-24 08:49:03,812] {taskinstance.py:1265} ERROR - Received SIGTERM. Terminating subprocesses.
   ```
   And in the scheduler logs there is this message:
   ```
   [2021-05-24 08:48:59,471] {scheduler_job.py:1854} INFO - Resetting orphaned tasks for active dag runs
   [2021-05-24 08:48:59,485] {scheduler_job.py:1921} INFO - Reset the following 2 orphaned TaskInstances:
        <TaskInstance: timeout_testing.run_param_all 2021-05-23 13:46:13.840235+00:00 [running]>
        <TaskInstance: timeout_testing.sleep_well 2021-05-23 13:46:13.840235+00:00 [running]>
   ```
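   
   Putting the two logs together, my reading (a guess on my part, not the actual Airflow internals) is: the scheduler resets the task instance state to None, the job supervising the running task notices on its next heartbeat that the state stored in the DB is no longer "running", and it kills the task process. A minimal, self-contained sketch of that supervising check, with made-up names:
   ```
   RUNNING = "running"

   def heartbeat_check(db_state, terminate):
       # db_state: the task instance state currently stored in the metadata DB.
       # If something else has changed it away from "running", stop supervising
       # and kill the task process (the WARNING + SIGTERM in the task log above).
       if db_state != RUNNING:
           print(f"State externally set to {db_state}, terminating")
           terminate()

   # The scheduler has just reset the state to None:
   heartbeat_check(db_state=None, terminate=lambda: print("SIGTERM sent to task process"))
   ```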
   
   **What you expected to happen**:
   
   These tasks are alive and well, and shouldn't be killed :)
   It looks like something in `reset_state_for_orphaned_tasks` is wrongly marking running task instances as orphaned; a rough sketch of what I suspect is happening is below.
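   
   A toy model of what I suspect the orphan sweep does (my guess, not the real `scheduler_job.py` code; names and fields are illustrative): take running/queued task instances in active dag runs that were not queued by a healthy scheduler job and reset their state to None. A task started manually with "Ignore All Deps" in a paused DAG was never queued by the scheduler at all, so it matches and gets reset too:
   ```
   from dataclasses import dataclass
   from typing import Optional

   @dataclass
   class TI:
       task_id: str
       state: Optional[str]
       queued_by_job_id: Optional[int]  # scheduler job that queued it, if any

   def reset_orphaned(tis, alive_scheduler_job_ids):
       reset = []
       for ti in tis:
           if ti.state in ("running", "queued") and ti.queued_by_job_id not in alive_scheduler_job_ids:
               ti.state = None  # "Reset the following N orphaned TaskInstances"
               reset.append(ti)
       return reset

   # Manually triggered task in a paused DAG: never queued by any scheduler job.
   manual_ti = TI("sleep_well", "running", queued_by_job_id=None)
   print(reset_orphaned([manual_ti], alive_scheduler_job_ids={1}))  # -> gets reset, then killed
   ```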
   
   **How to reproduce it**:
   ```
   import os
   import time
   from datetime import datetime, timedelta

   from airflow import DAG
   from airflow.operators.python import PythonOperator

   dag = DAG(os.path.basename(__file__).replace('.py', ''),
             start_date=datetime(2021, 5, 11),
             schedule_interval=timedelta(days=1))


   def sleep_tester(time_out, retries):
       for i in range(retries):
           print(f'hi there, try {i}, going to sleep for {time_out} secs')
           time.sleep(time_out)
           print("Aaah, good times, see ya soon")


   sleeping = PythonOperator(task_id="sleep_well",
                             python_callable=sleep_tester,
                             op_kwargs={'time_out': 15, 'retries': 50},
                             dag=dag)
   ```
   Create a DAG with the task above, verify it is paused, trigger a DAG run manually from the UI, then trigger the task manually ("Ignore All Deps" checked, then "Run"). The task should fail after a few loop iterations.
   
   **Anything else we need to know**:
   This might happen only if the DAG was never unpaused ("ON"), though I couldn't verify that.
   
   

