eskarimov opened a new issue #19929:
URL: https://github.com/apache/airflow/issues/19929


   ### Apache Airflow version
   
   2.2.2 (latest released)
   
   ### Operating System
   
   Debian GNU/Linux 10 (buster)
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Docker-Compose
   
   ### Deployment details
   
   ```bash
   ./breeze start-airflow --python 3.7 --backend postgres --use-airflow-version 
2.2.2
   ```
   
   ### What happened
   
   When a task was killed while being executed by `Triggerer` process, there's 
no log available for this task after kill.
   Also, `on_kill` function of Operator isn't called.
   Later on, if the same task is started again, it finishes immediately, like 
it was continued after being deferred.
   
   ### What you expected to happen
   
   Task is killed:
   
![AIrflowKilledTimeDeltaAsync](https://user-images.githubusercontent.com/13220923/144225137-89e7b3b0-ed5e-4261-87c6-49deb75fa24e.jpg)
   
   Log is empty:
   
![AirflowEmptyLog](https://user-images.githubusercontent.com/13220923/144225156-9b037c04-923d-423e-817f-db5cd5c713e7.jpg)
   
   Log appears if I clear the task, the task instance is finished right away:
   <details>
   <summary>Log file content</summary>
   
   ```
   *** Reading local file: 
/root/airflow/logs/example_time_delta_async/time_delta_async/2021-12-01T11:11:29.762988+00:00/1.log
   [2021-12-01, 12:11:31 CET] {taskinstance.py:1035} INFO - Dependencies all 
met for <TaskInstance: example_time_delta_async.time_delta_async 
manual__2021-12-01T11:11:29.762988+00:00 [queued]>
   [2021-12-01, 12:11:31 CET] {taskinstance.py:1035} INFO - Dependencies all 
met for <TaskInstance: example_time_delta_async.time_delta_async 
manual__2021-12-01T11:11:29.762988+00:00 [queued]>
   [2021-12-01, 12:11:31 CET] {taskinstance.py:1239} INFO - 
   
--------------------------------------------------------------------------------
   [2021-12-01, 12:11:31 CET] {taskinstance.py:1240} INFO - Starting attempt 1 
of 1
   [2021-12-01, 12:11:31 CET] {taskinstance.py:1241} INFO - 
   
--------------------------------------------------------------------------------
   [2021-12-01, 12:11:31 CET] {taskinstance.py:1260} INFO - Executing 
<Task(TimeDeltaSensorAsync): time_delta_async> on 2021-12-01 
11:11:29.762988+00:00
   [2021-12-01, 12:11:31 CET] {standard_task_runner.py:52} INFO - Started 
process 442 to run task
   [2021-12-01, 12:11:31 CET] {standard_task_runner.py:76} INFO - Running: 
['***', 'tasks', 'run', 'example_time_delta_async', 'time_delta_async', 
'manual__2021-12-01T11:11:29.762988+00:00', '--job-id', '9', '--raw', 
'--subdir', 'DAGS_FOLDER/example_time_delta_async.py', '--cfg-path', 
'/tmp/tmpgdon4hbb', '--error-file', '/tmp/tmpqa1kftrc']
   [2021-12-01, 12:11:31 CET] {standard_task_runner.py:77} INFO - Job 9: 
Subtask time_delta_async
   [2021-12-01, 12:11:31 CET] {logging_mixin.py:109} INFO - Running 
<TaskInstance: example_time_delta_async.time_delta_async 
manual__2021-12-01T11:11:29.762988+00:00 [running]> on host e403e88ccb5c
   [2021-12-01, 12:11:31 CET] {taskinstance.py:1427} INFO - Exporting the 
following env vars:
   AIRFLOW_CTX_DAG_OWNER=***
   AIRFLOW_CTX_DAG_ID=example_time_delta_async
   AIRFLOW_CTX_TASK_ID=time_delta_async
   AIRFLOW_CTX_EXECUTION_DATE=2021-12-01T11:11:29.762988+00:00
   AIRFLOW_CTX_DAG_RUN_ID=manual__2021-12-01T11:11:29.762988+00:00
   [2021-12-01, 12:11:31 CET] {taskinstance.py:1343} INFO - Pausing task as 
DEFERRED. dag_id=example_time_delta_async, task_id=time_delta_async, 
execution_date=20211201T111129, start_date=20211201T111131
   [2021-12-01, 12:11:31 CET] {local_task_job.py:154} INFO - Task exited with 
return code 0
   [2021-12-01, 12:11:31 CET] {local_task_job.py:264} INFO - 0 downstream tasks 
scheduled from follow-on schedule check
   [2021-12-01, 12:14:00 CET] {taskinstance.py:1035} INFO - Dependencies all 
met for <TaskInstance: example_time_delta_async.time_delta_async 
manual__2021-12-01T11:11:29.762988+00:00 [queued]>
   [2021-12-01, 12:14:00 CET] {taskinstance.py:1035} INFO - Dependencies all 
met for <TaskInstance: example_time_delta_async.time_delta_async 
manual__2021-12-01T11:11:29.762988+00:00 [queued]>
   [2021-12-01, 12:14:00 CET] {taskinstance.py:1239} INFO - 
   
--------------------------------------------------------------------------------
   [2021-12-01, 12:14:00 CET] {taskinstance.py:1240} INFO - Starting attempt 1 
of 1
   [2021-12-01, 12:14:00 CET] {taskinstance.py:1241} INFO - 
   
--------------------------------------------------------------------------------
   [2021-12-01, 12:14:00 CET] {taskinstance.py:1260} INFO - Executing 
<Task(TimeDeltaSensorAsync): time_delta_async> on 2021-12-01 
11:11:29.762988+00:00
   [2021-12-01, 12:14:00 CET] {standard_task_runner.py:52} INFO - Started 
process 519 to run task
   [2021-12-01, 12:14:00 CET] {standard_task_runner.py:76} INFO - Running: 
['***', 'tasks', 'run', 'example_time_delta_async', 'time_delta_async', 
'manual__2021-12-01T11:11:29.762988+00:00', '--job-id', '11', '--raw', 
'--subdir', 'DAGS_FOLDER/example_time_delta_async.py', '--cfg-path', 
'/tmp/tmpikbpc_re', '--error-file', '/tmp/tmpqzcatu80']
   [2021-12-01, 12:14:00 CET] {standard_task_runner.py:77} INFO - Job 11: 
Subtask time_delta_async
   [2021-12-01, 12:14:00 CET] {logging_mixin.py:109} INFO - Running 
<TaskInstance: example_time_delta_async.time_delta_async 
manual__2021-12-01T11:11:29.762988+00:00 [running]> on host e403e88ccb5c
   [2021-12-01, 12:14:00 CET] {taskinstance.py:1427} INFO - Exporting the 
following env vars:
   AIRFLOW_CTX_DAG_OWNER=***
   AIRFLOW_CTX_DAG_ID=example_time_delta_async
   AIRFLOW_CTX_TASK_ID=time_delta_async
   AIRFLOW_CTX_EXECUTION_DATE=2021-12-01T11:11:29.762988+00:00
   AIRFLOW_CTX_DAG_RUN_ID=manual__2021-12-01T11:11:29.762988+00:00
   [2021-12-01, 12:14:00 CET] {taskinstance.py:1278} INFO - Marking task as 
SUCCESS. dag_id=example_time_delta_async, task_id=time_delta_async, 
execution_date=20211201T111129, start_date=20211201T111400, 
end_date=20211201T111400
   [2021-12-01, 12:14:00 CET] {local_task_job.py:154} INFO - Task exited with 
return code 0
   [2021-12-01, 12:14:00 CET] {local_task_job.py:264} INFO - 0 downstream tasks 
scheduled from follow-on schedule check
   ```
   </details>
   
   ### How to reproduce
   
   - Create a DAG file with the following content:
   ```python
   from datetime import datetime, timedelta
   
   from airflow import DAG
   from airflow.sensors.time_delta import TimeDeltaSensorAsync
   
   
   with DAG(
       dag_id='example_time_delta_async',
       start_date=datetime(2021, 1, 1),
       catchup=False,
       dagrun_timeout=timedelta(minutes=60),
   ) as dag:
       time_delta_async_sensor = 
TimeDeltaSensorAsync(task_id='time_delta_async',
                                                      
delta=timedelta(seconds=60),
                                                      
execution_timeout=timedelta(seconds=60),
                                                      )
   
   if __name__ == "__main__":
       dag.cli()
   ```
   
   - Start the DAG
   - Mark task `time_delta_async` as Failed while it's being deferred
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to