eskarimov opened a new issue #19929: URL: https://github.com/apache/airflow/issues/19929
### Apache Airflow version

2.2.2 (latest released)

### Operating System

Debian GNU/Linux 10 (buster)

### Versions of Apache Airflow Providers

_No response_

### Deployment

Docker-Compose

### Deployment details

```bash
./breeze start-airflow --python 3.7 --backend postgres --use-airflow-version 2.2.2
```

### What happened

When a task was killed while being executed by `Triggerer` process, there's no log available for this task after kill. Also, `on_kill` function of Operator isn't called.

Later on, if the same task is started again, it finishes immediately, like it was continued after being deferred.

### What you expected to happen

Task is killed:

Log is empty:

Log appears if I clear the task, the task instance is finished right away:

<details>
<summary>Log file content</summary>

```
*** Reading local file: /root/airflow/logs/example_time_delta_async/time_delta_async/2021-12-01T11:11:29.762988+00:00/1.log
[2021-12-01, 12:11:31 CET] {taskinstance.py:1035} INFO - Dependencies all met for <TaskInstance: example_time_delta_async.time_delta_async manual__2021-12-01T11:11:29.762988+00:00 [queued]>
[2021-12-01, 12:11:31 CET] {taskinstance.py:1035} INFO - Dependencies all met for <TaskInstance: example_time_delta_async.time_delta_async manual__2021-12-01T11:11:29.762988+00:00 [queued]>
[2021-12-01, 12:11:31 CET] {taskinstance.py:1239} INFO - --------------------------------------------------------------------------------
[2021-12-01, 12:11:31 CET] {taskinstance.py:1240} INFO - Starting attempt 1 of 1
[2021-12-01, 12:11:31 CET] {taskinstance.py:1241} INFO - --------------------------------------------------------------------------------
[2021-12-01, 12:11:31 CET] {taskinstance.py:1260} INFO - Executing <Task(TimeDeltaSensorAsync): time_delta_async> on 2021-12-01 11:11:29.762988+00:00
[2021-12-01, 12:11:31 CET] {standard_task_runner.py:52} INFO - Started process 442 to run task
[2021-12-01, 12:11:31 CET] {standard_task_runner.py:76} INFO - Running: ['***', 'tasks', 'run', 'example_time_delta_async', 'time_delta_async', 'manual__2021-12-01T11:11:29.762988+00:00', '--job-id', '9', '--raw', '--subdir', 'DAGS_FOLDER/example_time_delta_async.py', '--cfg-path', '/tmp/tmpgdon4hbb', '--error-file', '/tmp/tmpqa1kftrc']
[2021-12-01, 12:11:31 CET] {standard_task_runner.py:77} INFO - Job 9: Subtask time_delta_async
[2021-12-01, 12:11:31 CET] {logging_mixin.py:109} INFO - Running <TaskInstance: example_time_delta_async.time_delta_async manual__2021-12-01T11:11:29.762988+00:00 [running]> on host e403e88ccb5c
[2021-12-01, 12:11:31 CET] {taskinstance.py:1427} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=***
AIRFLOW_CTX_DAG_ID=example_time_delta_async
AIRFLOW_CTX_TASK_ID=time_delta_async
AIRFLOW_CTX_EXECUTION_DATE=2021-12-01T11:11:29.762988+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2021-12-01T11:11:29.762988+00:00
[2021-12-01, 12:11:31 CET] {taskinstance.py:1343} INFO - Pausing task as DEFERRED. dag_id=example_time_delta_async, task_id=time_delta_async, execution_date=20211201T111129, start_date=20211201T111131
[2021-12-01, 12:11:31 CET] {local_task_job.py:154} INFO - Task exited with return code 0
[2021-12-01, 12:11:31 CET] {local_task_job.py:264} INFO - 0 downstream tasks scheduled from follow-on schedule check
[2021-12-01, 12:14:00 CET] {taskinstance.py:1035} INFO - Dependencies all met for <TaskInstance: example_time_delta_async.time_delta_async manual__2021-12-01T11:11:29.762988+00:00 [queued]>
[2021-12-01, 12:14:00 CET] {taskinstance.py:1035} INFO - Dependencies all met for <TaskInstance: example_time_delta_async.time_delta_async manual__2021-12-01T11:11:29.762988+00:00 [queued]>
[2021-12-01, 12:14:00 CET] {taskinstance.py:1239} INFO - --------------------------------------------------------------------------------
[2021-12-01, 12:14:00 CET] {taskinstance.py:1240} INFO - Starting attempt 1 of 1
[2021-12-01, 12:14:00 CET] {taskinstance.py:1241} INFO - --------------------------------------------------------------------------------
[2021-12-01, 12:14:00 CET] {taskinstance.py:1260} INFO - Executing <Task(TimeDeltaSensorAsync): time_delta_async> on 2021-12-01 11:11:29.762988+00:00
[2021-12-01, 12:14:00 CET] {standard_task_runner.py:52} INFO - Started process 519 to run task
[2021-12-01, 12:14:00 CET] {standard_task_runner.py:76} INFO - Running: ['***', 'tasks', 'run', 'example_time_delta_async', 'time_delta_async', 'manual__2021-12-01T11:11:29.762988+00:00', '--job-id', '11', '--raw', '--subdir', 'DAGS_FOLDER/example_time_delta_async.py', '--cfg-path', '/tmp/tmpikbpc_re', '--error-file', '/tmp/tmpqzcatu80']
[2021-12-01, 12:14:00 CET] {standard_task_runner.py:77} INFO - Job 11: Subtask time_delta_async
[2021-12-01, 12:14:00 CET] {logging_mixin.py:109} INFO - Running <TaskInstance: example_time_delta_async.time_delta_async manual__2021-12-01T11:11:29.762988+00:00 [running]> on host e403e88ccb5c
[2021-12-01, 12:14:00 CET] {taskinstance.py:1427} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=***
AIRFLOW_CTX_DAG_ID=example_time_delta_async
AIRFLOW_CTX_TASK_ID=time_delta_async
AIRFLOW_CTX_EXECUTION_DATE=2021-12-01T11:11:29.762988+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2021-12-01T11:11:29.762988+00:00
[2021-12-01, 12:14:00 CET] {taskinstance.py:1278} INFO - Marking task as SUCCESS. dag_id=example_time_delta_async, task_id=time_delta_async, execution_date=20211201T111129, start_date=20211201T111400, end_date=20211201T111400
[2021-12-01, 12:14:00 CET] {local_task_job.py:154} INFO - Task exited with return code 0
[2021-12-01, 12:14:00 CET] {local_task_job.py:264} INFO - 0 downstream tasks scheduled from follow-on schedule check
```

</details>

### How to reproduce

- Create a DAG file with the following content:
```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.sensors.time_delta import TimeDeltaSensorAsync

with DAG(
    dag_id='example_time_delta_async',
    start_date=datetime(2021, 1, 1),
    catchup=False,
    dagrun_timeout=timedelta(minutes=60),
) as dag:
    time_delta_async_sensor = TimeDeltaSensorAsync(
        task_id='time_delta_async',
        delta=timedelta(seconds=60),
        execution_timeout=timedelta(seconds=60),
    )

if __name__ == "__main__":
    dag.cli()
```
- Start the DAG
- Mark task `time_delta_async` as Failed while it's being deferred

### Anything else

_No response_

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
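The log in this report records a deferral at 12:11:31 and a SUCCESS at 12:14:00, with nothing written for the kill in between. A minimal sketch for pulling those state-change messages out of a task log is below. The regex and the `state_transitions` helper are illustrative assumptions based only on the log lines quoted in this report; they are not part of Airflow, and the sample text is an abridged copy of three of those lines.

```python
import re
from datetime import datetime

# Assumed log shape, inferred from the lines quoted in this report, e.g.:
# [2021-12-01, 12:11:31 CET] {taskinstance.py:1343} INFO - Pausing task as DEFERRED. ...
STATE_CHANGE = re.compile(
    r"\[(?P<ts>\d{4}-\d{2}-\d{2}, \d{2}:\d{2}:\d{2}) \w+\] "
    r"\{(?P<src>[^}]+)\} (?P<level>\w+) - "
    r"(?:Pausing|Marking) task as (?P<state>[A-Z_]+)"
)


def state_transitions(log_text):
    """Return (timestamp, state) pairs for every state-change message found."""
    return [
        (datetime.strptime(m["ts"], "%Y-%m-%d, %H:%M:%S"), m["state"])
        for m in STATE_CHANGE.finditer(log_text)
    ]


# Abridged sample: three lines copied from the report's log, fields shortened.
sample = (
    "[2021-12-01, 12:11:31 CET] {taskinstance.py:1343} INFO - Pausing task as DEFERRED. "
    "dag_id=example_time_delta_async, task_id=time_delta_async\n"
    "[2021-12-01, 12:11:31 CET] {local_task_job.py:154} INFO - Task exited with return code 0\n"
    "[2021-12-01, 12:14:00 CET] {taskinstance.py:1278} INFO - Marking task as SUCCESS. "
    "dag_id=example_time_delta_async, task_id=time_delta_async\n"
)

transitions = state_transitions(sample)
for ts, state in transitions:
    print(ts.time(), state)
```

Run against the full log file, the same helper would list only DEFERRED followed by SUCCESS, which makes the missing kill entry easy to spot.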
