vchiapaikeo opened a new issue, #27296:
URL: https://github.com/apache/airflow/issues/27296
### Apache Airflow version
2.4.2
### What happened
We are load testing our Airflow environment with the KubernetesExecutor and
observe occasional lock wait timeouts. They occur after the job itself
completes (in this case, sleeping for a random amount of time) and mark the
task as a failure without performing any retries.
```
[2022-10-26, 00:04:07 UTC] {taskinstance.py:1165} INFO - Dependencies all
met for <TaskInstance: factory_load_test_v53_31.longrun2
scheduled__2022-10-25T00:00:00+00:00 [queued]>
[2022-10-26, 00:04:07 UTC] {taskinstance.py:1165} INFO - Dependencies all
met for <TaskInstance: factory_load_test_v53_31.longrun2
scheduled__2022-10-25T00:00:00+00:00 [queued]>
[2022-10-26, 00:04:07 UTC] {taskinstance.py:1362} INFO -
--------------------------------------------------------------------------------
[2022-10-26, 00:04:07 UTC] {taskinstance.py:1363} INFO - Starting attempt 1
of 2
[2022-10-26, 00:04:07 UTC] {taskinstance.py:1364} INFO -
--------------------------------------------------------------------------------
[2022-10-26, 00:04:07 UTC] {taskinstance.py:1383} INFO - Executing
<Task(BashOperator): longrun2> on 2022-10-25 00:00:00+00:00
[2022-10-26, 00:04:07 UTC] {standard_task_runner.py:55} INFO - Started
process 162 to run task
[2022-10-26, 00:04:07 UTC] {standard_task_runner.py:82} INFO - Running:
['airflow', 'tasks', 'run', 'factory_load_test_v53_31', 'longrun2',
'scheduled__2022-10-25T00:00:00+00:00', '--job-id', '56234', '--raw',
'--subdir', 'DAGS_FOLDER/factory_load_test_v53.py', '--cfg-path',
'/tmp/tmp82sxf8im']
[2022-10-26, 00:04:07 UTC] {standard_task_runner.py:83} INFO - Job 56234:
Subtask longrun2
[2022-10-26, 00:04:08 UTC] {task_command.py:376} INFO - Running
<TaskInstance: factory_load_test_v53_31.longrun2
scheduled__2022-10-25T00:00:00+00:00 [running]> on host
factoryloadtestv5331longrun2-be2fa24c168a4614aace786ac9cb6e78
[2022-10-26, 00:04:09 UTC] {taskinstance.py:1590} INFO - Exporting the
following env vars:
AIRFLOW_CTX_DAG_OWNER=gcp-data-platform
AIRFLOW_CTX_DAG_ID=factory_load_test_v53_31
AIRFLOW_CTX_TASK_ID=longrun2
AIRFLOW_CTX_EXECUTION_DATE=2022-10-25T00:00:00+00:00
AIRFLOW_CTX_TRY_NUMBER=1
AIRFLOW_CTX_DAG_RUN_ID=scheduled__2022-10-25T00:00:00+00:00
[2022-10-26, 00:04:09 UTC] {subprocess.py:63} INFO - Tmp dir root location:
/tmp
[2022-10-26, 00:04:09 UTC] {subprocess.py:75} INFO - Running command:
['/bin/bash', '-c', 'sleep 777']
[2022-10-26, 00:04:09 UTC] {subprocess.py:86} INFO - Output:
[2022-10-26, 00:17:06 UTC] {subprocess.py:97} INFO - Command exited with
return code 0
[2022-10-26, 00:17:06 UTC] {datahub.py:122} ERROR - 'BashOperator' object
has no attribute '_inlets'
[2022-10-26, 00:17:06 UTC] {datahub.py:123} INFO - Supressing error because
graceful_exceptions is set
[2022-10-26, 00:17:06 UTC] {taskinstance.py:1401} INFO - Marking task as
SUCCESS. dag_id=factory_load_test_v53_31, task_id=longrun2,
execution_date=20221025T000000, start_date=20221026T000407,
end_date=20221026T001706
[2022-10-26, 00:17:58 UTC] {standard_task_runner.py:100} ERROR - Failed to
execute job 56234 for task longrun2 ((MySQLdb.OperationalError) (1205, 'Lock
wait timeout exceeded; try restarting transaction')
[SQL: UPDATE dag_run SET last_scheduling_decision=%s WHERE dag_run.id = %s]
[parameters: (datetime.datetime(2022, 10, 26, 0, 3, 41, 951161), 8265)]
(Background on this error at: https://sqlalche.me/e/14/e3q8); 162)
[2022-10-26, 00:17:58 UTC] {local_task_job.py:164} INFO - Task exited with
return code 1
```
<img width="1436" alt="image"
src="https://user-images.githubusercontent.com/9200263/198032136-2f9c3e12-6ecd-4147-bd8b-2848e3689af3.png">
### What you think should happen instead
Updates to the dag_run table's last_scheduling_decision column should be part
of a DB retry strategy (like `run_with_db_retries`):
https://github.com/apache/airflow/blob/777b57f0c6a8ca16df2b96fd17c26eab56b3f268/airflow/models/dagrun.py#L553
Additionally, no Airflow-level retry is attempted, even though the task has
retries configured.
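As a rough sketch of that retry strategy (the helper and exception classes below are stand-ins for illustration, not Airflow's actual implementation, which builds a similar loop with tenacity), the update could be retried on transient lock errors before giving up:

```python
import time

class OperationalError(Exception):
    """Stand-in for MySQLdb.OperationalError (1205, 'Lock wait timeout exceeded')."""

def with_db_retries(fn, max_retries=3, backoff=0.0):
    """Retry fn on OperationalError, re-raising only after max_retries attempts."""
    for attempt in range(1, max_retries + 1):
        try:
            return fn(attempt)
        except OperationalError:
            if attempt == max_retries:
                raise
            time.sleep(backoff)  # real code would use exponential backoff

calls = []

def update_last_scheduling_decision(attempt):
    # Simulate the "UPDATE dag_run SET last_scheduling_decision=..." statement
    # hitting a lock wait timeout on the first attempt only.
    calls.append(attempt)
    if attempt == 1:
        raise OperationalError(1205, "Lock wait timeout exceeded; try restarting transaction")
    return "ok"

result = with_db_retries(update_last_scheduling_decision)
```

With a wrapper like this, a single transient 1205 during the post-success bookkeeping would not fail the task outright.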
### How to reproduce
We use MySQL 8 deployed via GCP Cloud SQL, with the transaction isolation
level set to READ_COMMITTED and the default innodb_lock_wait_timeout of 50s.
```
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
from random import randint

DEFAULT_TASK_ARGS = {
    "owner": "gcp-data-platform",
    "start_date": "2022-09-26",
    "retries": 1,
    "retry_delay": 300,
}

dag_id_prefix = "factory_load_test_v53_"

def generate_dag(key):
    with DAG(
        schedule_interval="@daily",
        max_active_runs=1,
        max_active_tasks=5,
        catchup=False,
        dag_id=f"{dag_id_prefix}{key}",
        default_args=DEFAULT_TASK_ARGS,
    ) as dag:
        start = EmptyOperator(task_id="start")
        long_running_task_1 = BashOperator(
            task_id="longrun1",
            bash_command=f"sleep {randint(0, 1800)}",
        )
        long_running_task_2 = BashOperator(
            task_id="longrun2",
            bash_command=f"sleep {randint(0, 1200)}",
        )
        start >> [long_running_task_1, long_running_task_2]
    return dag

for key in range(0, 1000):
    globals()[f"{dag_id_prefix}{key}"] = generate_dag(key)
```
### Operating System
Debian 11
### Versions of Apache Airflow Providers
apache-airflow-providers-google==8.4.0
### Deployment
Official Apache Airflow Helm Chart
### Deployment details
KubernetesExecutor with the following configurations:
```
AIRFLOW__CORE__DEFAULT_POOL_TASK_SLOT_COUNT: "100000"
# DAG factories can take a long time to process. Bump from default 50s to
# 90s to ensure that we aren't stuck in timeout dag processor loops
AIRFLOW__CORE__DAG_FILE_PROCESSOR_TIMEOUT: "90"
# Reduces load on the db with less frequent updates; we double the health
# check threshold as well, in line with fewer heartbeats
AIRFLOW__SCHEDULER__SCHEDULER_HEARTBEAT_SEC: "10"
# Keep the job heartbeat at a high rate to avoid jobs being mistaken as
# stuck in the queued state (even though they are actually running); 5s is
# the default
AIRFLOW__SCHEDULER__JOB_HEARTBEAT_SEC: "5"
# Lower from default 512 to 100 to reduce the load on the DB
AIRFLOW__SCHEDULER__MAX_TIS_PER_QUERY: "100"
# Bump from 2 to 4 to speed up dag processing per scheduler loop
AIRFLOW__SCHEDULER__PARSING_PROCESSES: "4"
# Interval at which to run adopt_or_reset_orphaned_tasks in
# scheduler_job.py, which externally marks SchedulerJobs as failed or resets
# LocalJob tasks to None. Default is 300s. Increase to avoid orphaning tasks
AIRFLOW__SCHEDULER__ORPHANED_TASKS_CHECK_INTERVAL: "600"
AIRFLOW__KUBERNETES__WORKER_PODS_CREATION_BATCH_SIZE: "5"
# Allow pods up to 20 mins to schedule themselves - defaults to 5 mins.
# During periods of high load (like midnight UTC), pods may be slow to
# schedule, so allow them more time to do so
AIRFLOW__KUBERNETES__WORKER_PODS_PENDING_TIMEOUT: "1200"
AIRFLOW__KUBERNETES__WORKER_PODS_PENDING_TIMEOUT_BATCH_SIZE: "200"
# Num seconds between checks for task instances stuck in `pending` status
# without a pod. Default is 120s but during periods of high load (like
# midnight UTC), this can actually take a long time, so increase to 480s
AIRFLOW__KUBERNETES__WORKER_PODS_PENDING_TIMEOUT_CHECK_INTERVAL: "480"
# Num seconds between checks for task instances stuck in `queued` status
# without a pod. Default is 60s but during periods of high load (like
# midnight UTC), this can actually take a long time, so increase to 900s.
# Note, this check will attempt to move pods from queued state to scheduled
# state
AIRFLOW__KUBERNETES__WORKER_PODS_QUEUED_CHECK_INTERVAL: "900"
# Note that if no _request_timeout is specified, the kubernetes client will
# wait indefinitely for kubernetes api responses, which will cause the
# scheduler to hang. The timeout is specified as [connect timeout, read
# timeout]
AIRFLOW__KUBERNETES__KUBE_CLIENT_REQUEST_ARGS: '{"_request_timeout": [60,60]}'
```
### Anything else
With 1k DAGs starting at midnight, this happens approximately twice every night.
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)