nirben82 opened a new issue, #36090:
URL: https://github.com/apache/airflow/issues/36090
### Apache Airflow version
Other Airflow 2 version (please specify below)
### What happened
When marking a task that uses a deferrable operator as failed, or clearing it,
the `on_kill` method is not called and the remote job is never stopped.
Task log output:
```
*** Found local files:
*** *
/usr/local/google/home/nirben/airflow/logs/dag_id=dag_test/run_id=manual__2023-12-06T13:22:54.698489+00:00/task_id=bigquery_task/attempt=30.log
*** *
/usr/local/google/home/nirben/airflow/logs/dag_id=dag_test/run_id=manual__2023-12-06T13:22:54.698489+00:00/task_id=bigquery_task/attempt=30.log.trigger.13880.log
[2023-12-06, 14:14:11 UTC] {taskinstance.py:1103} INFO - Dependencies all
met for dep_context=non-requeueable deps ti=<TaskInstance:
dag_test.bigquery_task manual__2023-12-06T13:22:54.698489+00:00 [queued]>
[2023-12-06, 14:14:11 UTC] {taskinstance.py:1103} INFO - Dependencies all
met for dep_context=requeueable deps ti=<TaskInstance: dag_test.bigquery_task
manual__2023-12-06T13:22:54.698489+00:00 [queued]>
[2023-12-06, 14:14:11 UTC] {taskinstance.py:1308} INFO - Starting attempt 30
of 30
[2023-12-06, 14:14:11 UTC] {taskinstance.py:1327} INFO - Executing
<Task(BigQueryInsertJobOperator): bigquery_task> on 2023-12-06
13:22:54.698489+00:00
[2023-12-06, 14:14:11 UTC] {standard_task_runner.py:57} INFO - Started
process 629728 to run task
[2023-12-06, 14:14:11 UTC] {standard_task_runner.py:84} INFO - Running:
['airflow', 'tasks', 'run', 'dag_test', 'bigquery_task',
'manual__2023-12-06T13:22:54.698489+00:00', '--job-id', '13896', '--raw',
'--subdir', 'DAGS_FOLDER/dag_test.py', '--cfg-path', '/tmp/tmpxnqe4ysz']
[2023-12-06, 14:14:11 UTC] {standard_task_runner.py:85} INFO - Job 13896:
Subtask bigquery_task
[2023-12-06, 14:14:11 UTC] {task_command.py:410} INFO - Running
<TaskInstance: dag_test.bigquery_task manual__2023-12-06T13:22:54.698489+00:00
[running]> on host nirben-ws1.tlv.corp.google.com
[2023-12-06, 14:14:11 UTC] {taskinstance.py:1545} INFO - Exporting env vars:
AIRFLOW_CTX_DAG_OWNER='some-owner' AIRFLOW_CTX_DAG_ID='dag_test'
AIRFLOW_CTX_TASK_ID='bigquery_task'
AIRFLOW_CTX_EXECUTION_DATE='2023-12-06T13:22:54.698489+00:00'
AIRFLOW_CTX_TRY_NUMBER='30'
AIRFLOW_CTX_DAG_RUN_ID='manual__2023-12-06T13:22:54.698489+00:00'
[2023-12-06, 14:14:11 UTC] {base.py:73} INFO - Using connection ID
'some-connection' for task execution.
[2023-12-06, 14:14:11 UTC] {bigquery.py:2799} INFO - Executing: {'query':
{'query': 'SELECT * FROM table', 'useLegacySql': False, 'priority': 'batch',
'allowLargeResults': True, 'destinationTable': {'datasetId': 'datasetId',
'projectId': 'projectId', 'tableId': 'some_table_test'}, 'flattenResults':
False, 'writeDisposition': 'WRITE_TRUNCATE', 'createDisposition':
'CREATE_IF_NEEDED'}}
[2023-12-06, 14:14:11 UTC] {credentials_provider.py:353} INFO - Getting
connection using `google.auth.default()` since no explicit credentials are
provided.
[2023-12-06, 14:14:11 UTC] {logging_mixin.py:150} WARNING -
/usr/local/google/home/nirben/venv/waze-data/lib/python3.8/site-packages/google/auth/_default.py:78
UserWarning: Your application has authenticated using end user credentials
from Google Cloud SDK without a quota project. You might receive a "quota
exceeded" or "API not enabled" error. See the following page for
troubleshooting:
https://cloud.google.com/docs/authentication/adc-troubleshooting/user-creds.
[2023-12-06, 14:14:12 UTC] {bigquery.py:1596} INFO - Inserting job
airflow_dag_test_bigquery_task_2023_12_06T13_22_54_698489_00_00_5bf2e5098a664fd1d54ec9b9b75d077b
[2023-12-06, 14:14:13 UTC] {bigquery.py:51} INFO - Using the connection
some-connection .
[2023-12-06, 14:14:13 UTC] {taskinstance.py:1415} INFO - Pausing task as
DEFERRED. dag_id=dag_test, task_id=bigquery_task,
execution_date=20231206T132254, start_date=20231206T141411
[2023-12-06, 14:14:13 UTC] {local_task_job_runner.py:222} INFO - Task exited
with return code 100 (task deferral)
[2023-12-06, 14:14:14 UTC] {base.py:73} INFO - Using connection ID
'google_cloud_pipelines' for task execution.
[2023-12-06, 14:14:15 UTC] {bigquery.py:93} INFO - Bigquery job status is
running. Sleeping for 4.0 seconds.
[2023-12-06, 14:14:19 UTC] {bigquery.py:93} INFO - Bigquery job status is
running. Sleeping for 4.0 seconds.
[2023-12-06, 14:14:21 UTC] {triggerer_job_runner.py:625} ERROR - Trigger
cancelled; message=
```
### What you think should happen instead
I think the trigger should be stopped and the task instance should follow the same
[behavior](https://github.com/apache/airflow/blob/d2514b408cb98f792289a5d032aaf85fe605350d/airflow/models/taskinstance.py#L2452)
as any non-deferrable task.
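When a deferred task is marked failed or cleared, the triggerer cancels the trigger's coroutine, which surfaces as `asyncio.CancelledError` inside its async `run` method (the "Trigger cancelled" line in the log above). A trigger could use that point to stop the remote job. The sketch below is hypothetical (the `CancellableTrigger` class and `demo` driver are stand-ins, not the actual provider code) and only illustrates the pattern:

```python
import asyncio


class CancellableTrigger:
    """Sketch: a trigger whose run() reacts to cancellation (assumed names)."""

    def __init__(self):
        self.remote_job_cancelled = False

    async def run(self):
        try:
            while True:
                # Stand-in for the BigQuery status poll loop
                await asyncio.sleep(0.1)
        except asyncio.CancelledError:
            # Hypothetical cleanup hook: this is where the remote
            # BigQuery job could be cancelled before re-raising.
            self.remote_job_cancelled = True
            raise


async def demo():
    trigger = CancellableTrigger()
    task = asyncio.create_task(trigger.run())
    await asyncio.sleep(0.05)   # let the poll loop start
    task.cancel()               # what the triggerer does on "Trigger cancelled"
    try:
        await task
    except asyncio.CancelledError:
        pass
    return trigger.remote_job_cancelled


print(asyncio.run(demo()))  # → True
```

This only demonstrates that cancellation is observable inside the coroutine; whether the cleanup belongs in the trigger or in a task-instance code path is the design question this issue raises.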
### How to reproduce
1. Invoke a run of the DAG below and, after the task is in the `deferred` state,
mark it as `failed` or clear it.
The task log ends with the text below and the job in BQ does not stop.
```
[2023-12-06, 14:14:14 UTC] {base.py:73} INFO - Using connection ID
'google_cloud_pipelines' for task execution.
[2023-12-06, 14:14:15 UTC] {bigquery.py:93} INFO - Bigquery job status
is running. Sleeping for 4.0 seconds.
[2023-12-06, 14:14:19 UTC] {bigquery.py:93} INFO - Bigquery job status
is running. Sleeping for 4.0 seconds.
[2023-12-06, 14:14:21 UTC] {triggerer_job_runner.py:625} ERROR - Trigger
cancelled; message=
```
2. Change to `deferrable=False`, invoke a run of the DAG and, after the task
is in the `running` state and the job has started in BQ, mark it as `failed` or clear
it.
The task log ends with the text below and the job in BQ stops.
```
[2023-12-06, 14:13:06 UTC] {bigquery.py:1596} INFO - Inserting job
airflow_dag_test_bigquery_task_2023_12_06T13_22_54_698489_00_00_9238ba6363d9b8cca1e4a1fd26b7e985
[2023-12-06, 14:13:36 UTC] {local_task_job_runner.py:291} WARNING -
State of this instance has been externally set to failed. Terminating instance.
[2023-12-06, 14:13:36 UTC] {process_utils.py:131} INFO - Sending
Signals.SIGTERM to group 629540. PIDs of all processes in the group: [629540]
[2023-12-06, 14:13:36 UTC] {process_utils.py:86} INFO - Sending the
signal Signals.SIGTERM to group 629540
[2023-12-06, 14:13:36 UTC] {taskinstance.py:1517} ERROR - Received
SIGTERM. Terminating subprocesses.
[2023-12-06, 14:13:36 UTC] {bigquery.py:1487} INFO - Attempting to
cancel job : project-id,
airflow_dag_test_bigquery_task_2023_12_06T13_22_54_698489_00_00_9238ba6363d9b8cca1e4a1fd26b7e985
[2023-12-06, 14:13:37 UTC] {bigquery.py:1508} INFO - Waiting for
canceled job project-id,
airflow_dag_test_bigquery_task_2023_12_06T13_22_54_698489_00_00_9238ba6363d9b8cca1e4a1fd26b7e985
to finish.
[2023-12-06, 14:13:43 UTC] {bigquery.py:1499} INFO - Job successfully
canceled: project-id,
airflow_dag_test_bigquery_task_2023_12_06T13_22_54_698489_00_00_9238ba6363d9b8cca1e4a1fd26b7e985
[2023-12-06, 14:13:43 UTC] {process_utils.py:79} INFO - Process
psutil.Process(pid=629540, status='terminated', exitcode=0, started='16:13:05')
(629540) terminated with exit code 0
```
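The non-deferrable path works because, once the state is externally set to failed, the local task job sends SIGTERM to the task process and the signal handler invokes the operator's `on_kill`. A minimal sketch of that mechanism (hypothetical `DemoOperator`, not the provider class):

```python
import signal


class DemoOperator:
    """Stand-in for an operator; on_kill would cancel the remote job."""

    def __init__(self):
        self.killed = False

    def on_kill(self):
        # In BigQueryInsertJobOperator this is where the running job
        # gets cancelled (the "Attempting to cancel job" log line above).
        self.killed = True


op = DemoOperator()


def handle_sigterm(signum, frame):
    # Mirrors the pattern in the log: SIGTERM -> terminate -> on_kill
    op.on_kill()


signal.signal(signal.SIGTERM, handle_sigterm)
signal.raise_signal(signal.SIGTERM)  # simulate the scheduler's SIGTERM
print(op.killed)  # → True
```

A deferred task has no running process to receive SIGTERM, which is why this hook never fires in the deferrable case.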
```
from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

default_args = {
    'owner': 'owner',
    'start_date': datetime(2023, 12, 5),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=15),
}

dag = DAG('dag_test', default_args=default_args, catchup=False)

query_that_runs_for_a_few_minutes = """
SELECT * FROM some-large-table
"""

client_driver_events_task = BigQueryInsertJobOperator(
    task_id='bigquery_task',
    gcp_conn_id='google_cloud_pipelines',
    dag=dag,
    configuration={
        'query': {
            'query': query_that_runs_for_a_few_minutes.strip(),
            'useLegacySql': False,
            'priority': 'batch',
            'allowLargeResults': True,
            'destinationTable': {
                'datasetId': 'datasetId',
                'projectId': 'projectId',
                'tableId': 'tableId',
            },
            'flattenResults': False,
            'writeDisposition': 'WRITE_TRUNCATE',
            'createDisposition': 'CREATE_IF_NEEDED',
        }
    },
    deferrable=True,
)
```
### Operating System
Debian GNU/Linux
### Versions of Apache Airflow Providers
apache-airflow==2.6.2
apache-airflow-providers-google==10.9.0
### Deployment
Official Apache Airflow Helm Chart
### Deployment details
_No response_
### Anything else
_No response_
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)