nirben82 opened a new issue, #36090:
URL: https://github.com/apache/airflow/issues/36090

   ### Apache Airflow version
   
   Other Airflow 2 version (please specify below)
   
   ### What happened
   
   When marking a task that uses a deferrable operator as failed, or clearing it, the `on_kill` method is never called and the remote job is never stopped.
   
   Task log output:
   ```
   *** Found local files:
   ***   * 
/usr/local/google/home/nirben/airflow/logs/dag_id=dag_test/run_id=manual__2023-12-06T13:22:54.698489+00:00/task_id=bigquery_task/attempt=30.log
   ***   * 
/usr/local/google/home/nirben/airflow/logs/dag_id=dag_test/run_id=manual__2023-12-06T13:22:54.698489+00:00/task_id=bigquery_task/attempt=30.log.trigger.13880.log
   [2023-12-06, 14:14:11 UTC] {taskinstance.py:1103} INFO - Dependencies all 
met for dep_context=non-requeueable deps ti=<TaskInstance: 
dag_test.bigquery_task manual__2023-12-06T13:22:54.698489+00:00 [queued]>
   [2023-12-06, 14:14:11 UTC] {taskinstance.py:1103} INFO - Dependencies all 
met for dep_context=requeueable deps ti=<TaskInstance: dag_test.bigquery_task 
manual__2023-12-06T13:22:54.698489+00:00 [queued]>
   [2023-12-06, 14:14:11 UTC] {taskinstance.py:1308} INFO - Starting attempt 30 
of 30
   [2023-12-06, 14:14:11 UTC] {taskinstance.py:1327} INFO - Executing 
<Task(BigQueryInsertJobOperator): bigquery_task> on 2023-12-06 
13:22:54.698489+00:00
   [2023-12-06, 14:14:11 UTC] {standard_task_runner.py:57} INFO - Started 
process 629728 to run task
   [2023-12-06, 14:14:11 UTC] {standard_task_runner.py:84} INFO - Running: 
['airflow', 'tasks', 'run', 'dag_test', 'bigquery_task', 
'manual__2023-12-06T13:22:54.698489+00:00', '--job-id', '13896', '--raw', 
'--subdir', 'DAGS_FOLDER/dag_test.py', '--cfg-path', '/tmp/tmpxnqe4ysz']
   [2023-12-06, 14:14:11 UTC] {standard_task_runner.py:85} INFO - Job 13896: 
Subtask bigquery_task
   [2023-12-06, 14:14:11 UTC] {task_command.py:410} INFO - Running 
<TaskInstance: dag_test.bigquery_task manual__2023-12-06T13:22:54.698489+00:00 
[running]> on host nirben-ws1.tlv.corp.google.com
   [2023-12-06, 14:14:11 UTC] {taskinstance.py:1545} INFO - Exporting env vars: 
AIRFLOW_CTX_DAG_OWNER='some-owner' AIRFLOW_CTX_DAG_ID='dag_test' 
AIRFLOW_CTX_TASK_ID='bigquery_task' 
AIRFLOW_CTX_EXECUTION_DATE='2023-12-06T13:22:54.698489+00:00' 
AIRFLOW_CTX_TRY_NUMBER='30' 
AIRFLOW_CTX_DAG_RUN_ID='manual__2023-12-06T13:22:54.698489+00:00'
   [2023-12-06, 14:14:11 UTC] {base.py:73} INFO - Using connection ID 
'some-connection' for task execution.
   [2023-12-06, 14:14:11 UTC] {bigquery.py:2799} INFO - Executing: {'query': {'query': 'SELECT * FROM table', 'useLegacySql': False, 'priority': 'batch', 'allowLargeResults': True, 'destinationTable': {'datasetId': 'datasetId', 'projectId': 'projectId', 'tableId': 'some_table_test'}, 'flattenResults': False, 'writeDisposition': 'WRITE_TRUNCATE', 'createDisposition': 'CREATE_IF_NEEDED'}}
   [2023-12-06, 14:14:11 UTC] {credentials_provider.py:353} INFO - Getting 
connection using `google.auth.default()` since no explicit credentials are 
provided.
   [2023-12-06, 14:14:11 UTC] {logging_mixin.py:150} WARNING - 
/usr/local/google/home/nirben/venv/waze-data/lib/python3.8/site-packages/google/auth/_default.py:78
 UserWarning: Your application has authenticated using end user credentials 
from Google Cloud SDK without a quota project. You might receive a "quota 
exceeded" or "API not enabled" error. See the following page for 
troubleshooting: 
https://cloud.google.com/docs/authentication/adc-troubleshooting/user-creds.
   [2023-12-06, 14:14:12 UTC] {bigquery.py:1596} INFO - Inserting job 
airflow_dag_test_bigquery_task_2023_12_06T13_22_54_698489_00_00_5bf2e5098a664fd1d54ec9b9b75d077b
   [2023-12-06, 14:14:13 UTC] {bigquery.py:51} INFO - Using the connection  
some-connection .
   [2023-12-06, 14:14:13 UTC] {taskinstance.py:1415} INFO - Pausing task as 
DEFERRED. dag_id=dag_test, task_id=bigquery_task, 
execution_date=20231206T132254, start_date=20231206T141411
   [2023-12-06, 14:14:13 UTC] {local_task_job_runner.py:222} INFO - Task exited 
with return code 100 (task deferral)
   [2023-12-06, 14:14:14 UTC] {base.py:73} INFO - Using connection ID 
'google_cloud_pipelines' for task execution.
   [2023-12-06, 14:14:15 UTC] {bigquery.py:93} INFO - Bigquery job status is 
running. Sleeping for 4.0 seconds.
   [2023-12-06, 14:14:19 UTC] {bigquery.py:93} INFO - Bigquery job status is 
running. Sleeping for 4.0 seconds.
   [2023-12-06, 14:14:21 UTC] {triggerer_job_runner.py:625} ERROR - Trigger cancelled; message=
   ```
   
   ### What you think should happen instead
   
   I think the trigger should be stopped and the task instance should follow the same [behavior](https://github.com/apache/airflow/blob/d2514b408cb98f792289a5d032aaf85fe605350d/airflow/models/taskinstance.py#L2452) as any non-deferrable task.
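   A likely mitigation, sketched below with stdlib-only stand-ins (`FakeJobClient` and `poll_job` are illustrative names, not the provider's actual code), is for the trigger's polling loop to catch `asyncio.CancelledError` and cancel the remote job before re-raising, since the triggerer cancels the trigger coroutine when the task is cleared or externally marked failed:

```python
import asyncio

class FakeJobClient:
    """Stand-in for a BigQuery client (hypothetical, for illustration only)."""
    def __init__(self):
        self.cancelled = False

    async def get_status(self, job_id):
        return "running"

    async def cancel(self, job_id):
        self.cancelled = True

async def poll_job(client, job_id, interval=0.01):
    """Poll the remote job; cancel it if the trigger itself is cancelled."""
    try:
        while await client.get_status(job_id) == "running":
            await asyncio.sleep(interval)
    except asyncio.CancelledError:
        # The triggerer cancels this coroutine when the task is cleared or
        # marked failed; clean up the remote job before re-raising.
        await client.cancel(job_id)
        raise

async def main():
    client = FakeJobClient()
    task = asyncio.create_task(poll_job(client, "job-123"))
    await asyncio.sleep(0.05)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return client.cancelled

print(asyncio.run(main()))  # True: the remote job was cancelled
```

   With this pattern the deferred path would mirror what `on_kill` already does for non-deferred tasks.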
   
   ### How to reproduce
   
   1. Invoke a run of the DAG below and, once the task is in the `deferred` state, mark it as `failed` or clear it.
   
       The task log ends with the below text and the job in BQ does not stop.
       ```
       [2023-12-06, 14:14:14 UTC] {base.py:73} INFO - Using connection ID 
'google_cloud_pipelines' for task execution.
       [2023-12-06, 14:14:15 UTC] {bigquery.py:93} INFO - Bigquery job status 
is running. Sleeping for 4.0 seconds.
       [2023-12-06, 14:14:19 UTC] {bigquery.py:93} INFO - Bigquery job status 
is running. Sleeping for 4.0 seconds.
       [2023-12-06, 14:14:21 UTC] {triggerer_job_runner.py:625} ERROR - Trigger 
cancelled; message=
       ```
   
   2. Change to `deferrable=False`, invoke a run of the DAG and, once the task is in the `running` state and the job has started in BQ, mark it as `failed` or clear it.
   
       The task log ends with the below text and the job in BQ stops.
       ```
       [2023-12-06, 14:13:06 UTC] {bigquery.py:1596} INFO - Inserting job 
airflow_dag_test_bigquery_task_2023_12_06T13_22_54_698489_00_00_9238ba6363d9b8cca1e4a1fd26b7e985
       [2023-12-06, 14:13:36 UTC] {local_task_job_runner.py:291} WARNING - 
State of this instance has been externally set to failed. Terminating instance.
       [2023-12-06, 14:13:36 UTC] {process_utils.py:131} INFO - Sending 
Signals.SIGTERM to group 629540. PIDs of all processes in the group: [629540]
       [2023-12-06, 14:13:36 UTC] {process_utils.py:86} INFO - Sending the 
signal Signals.SIGTERM to group 629540
       [2023-12-06, 14:13:36 UTC] {taskinstance.py:1517} ERROR - Received 
SIGTERM. Terminating subprocesses.
       [2023-12-06, 14:13:36 UTC] {bigquery.py:1487} INFO - Attempting to 
cancel job : project-id, 
airflow_dag_test_bigquery_task_2023_12_06T13_22_54_698489_00_00_9238ba6363d9b8cca1e4a1fd26b7e985
       [2023-12-06, 14:13:37 UTC] {bigquery.py:1508} INFO - Waiting for 
canceled job project-id, 
airflow_dag_test_bigquery_task_2023_12_06T13_22_54_698489_00_00_9238ba6363d9b8cca1e4a1fd26b7e985
 to finish.
       [2023-12-06, 14:13:43 UTC] {bigquery.py:1499} INFO - Job successfully 
canceled: project-id, 
airflow_dag_test_bigquery_task_2023_12_06T13_22_54_698489_00_00_9238ba6363d9b8cca1e4a1fd26b7e985
       [2023-12-06, 14:13:43 UTC] {process_utils.py:79} INFO - Process 
psutil.Process(pid=629540, status='terminated', exitcode=0, started='16:13:05') 
(629540) terminated with exit code 0
       ```
   
   ```
   from datetime import datetime, timedelta
   from airflow import DAG
   from airflow.providers.google.cloud.operators.bigquery import 
BigQueryInsertJobOperator
   
   default_args = {
       'owner': 'owner',
       'start_date': datetime(2023, 12, 5),
       'email_on_failure': False,
       'email_on_retry': False,
       'retries': 0,
       'retry_delay': timedelta(minutes=15)
   }
   
   dag = DAG('dag_test', default_args=default_args, catchup=False)
   
   query_that_runs_for_a_few_minutes = """
   SELECT * FROM some-large-table
   """
   
   client_driver_events_task = BigQueryInsertJobOperator(
       task_id='bigquery_task',
       gcp_conn_id='google_cloud_pipelines',
       dag=dag,
       configuration={
           'query': {
               'query': query_that_runs_for_a_few_minutes.strip(),
               'useLegacySql': False,
               'priority': 'batch',
               "allowLargeResults": True,
               "destinationTable": {
                   "datasetId": "datasetId",
                   "projectId": "projectId",
                   "tableId": "tableId"
               },
               'flattenResults': False,
               'writeDisposition': 'WRITE_TRUNCATE',
               'createDisposition': 'CREATE_IF_NEEDED'
           }
       },
       deferrable=True)
   ```
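   For comparison, the working non-deferrable path relies on the task runner sending SIGTERM to the worker process, which gives the operator's `on_kill` a chance to cancel the remote job (visible in the second log above). A minimal stdlib sketch of that signal-driven cleanup, using hypothetical names rather than the provider's real classes:

```python
import os
import signal
import time

class FakeOperator:
    """Stand-in for an operator (hypothetical, for illustration only)."""
    def __init__(self):
        self.job_cancelled = False

    def on_kill(self):
        # In the real operator this would call the hook to cancel the job.
        self.job_cancelled = True

op = FakeOperator()

def handle_sigterm(signum, frame):
    # The task runner translates the externally set "failed" state into
    # SIGTERM; the operator gets a chance to clean up before exiting.
    op.on_kill()

signal.signal(signal.SIGTERM, handle_sigterm)
os.kill(os.getpid(), signal.SIGTERM)  # simulate the external kill
time.sleep(0.01)                      # give the handler time to run
print(op.job_cancelled)  # True
```

   When the task is deferred, no worker process exists to receive this signal, which is why the cleanup never happens in the deferrable case.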
   
   ### Operating System
   
   Debian GNU
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow==2.6.2
   apache-airflow-providers-google==10.9.0
   
   ### Deployment
   
   Official Apache Airflow Helm Chart
   
   ### Deployment details
   
   _No response_
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

