vchiapaikeo opened a new issue, #27296:
URL: https://github.com/apache/airflow/issues/27296

   ### Apache Airflow version
   
   2.4.2
   
   ### What happened
   
   We are load testing our Airflow environment with the KubernetesExecutor and 
will observe lock wait timeouts happening every so often. This happens after 
the job itself completes (in this case, sleeping for a random amount of time) 
and marks the task as a failure without performing any retries.
   
   ```
   [2022-10-26, 00:04:07 UTC] {taskinstance.py:1165} INFO - Dependencies all 
met for <TaskInstance: factory_load_test_v53_31.longrun2 
scheduled__2022-10-25T00:00:00+00:00 [queued]>
   [2022-10-26, 00:04:07 UTC] {taskinstance.py:1165} INFO - Dependencies all 
met for <TaskInstance: factory_load_test_v53_31.longrun2 
scheduled__2022-10-25T00:00:00+00:00 [queued]>
   [2022-10-26, 00:04:07 UTC] {taskinstance.py:1362} INFO - 
   
--------------------------------------------------------------------------------
   [2022-10-26, 00:04:07 UTC] {taskinstance.py:1363} INFO - Starting attempt 1 
of 2
   [2022-10-26, 00:04:07 UTC] {taskinstance.py:1364} INFO - 
   
--------------------------------------------------------------------------------
   [2022-10-26, 00:04:07 UTC] {taskinstance.py:1383} INFO - Executing 
<Task(BashOperator): longrun2> on 2022-10-25 00:00:00+00:00
   [2022-10-26, 00:04:07 UTC] {standard_task_runner.py:55} INFO - Started 
process 162 to run task
   [2022-10-26, 00:04:07 UTC] {standard_task_runner.py:82} INFO - Running: 
['airflow', 'tasks', 'run', 'factory_load_test_v53_31', 'longrun2', 
'scheduled__2022-10-25T00:00:00+00:00', '--job-id', '56234', '--raw', 
'--subdir', 'DAGS_FOLDER/factory_load_test_v53.py', '--cfg-path', 
'/tmp/tmp82sxf8im']
   [2022-10-26, 00:04:07 UTC] {standard_task_runner.py:83} INFO - Job 56234: 
Subtask longrun2
   [2022-10-26, 00:04:08 UTC] {task_command.py:376} INFO - Running 
<TaskInstance: factory_load_test_v53_31.longrun2 
scheduled__2022-10-25T00:00:00+00:00 [running]> on host 
factoryloadtestv5331longrun2-be2fa24c168a4614aace786ac9cb6e78
   [2022-10-26, 00:04:09 UTC] {taskinstance.py:1590} INFO - Exporting the 
following env vars:
   AIRFLOW_CTX_DAG_OWNER=gcp-data-platform
   AIRFLOW_CTX_DAG_ID=factory_load_test_v53_31
   AIRFLOW_CTX_TASK_ID=longrun2
   AIRFLOW_CTX_EXECUTION_DATE=2022-10-25T00:00:00+00:00
   AIRFLOW_CTX_TRY_NUMBER=1
   AIRFLOW_CTX_DAG_RUN_ID=scheduled__2022-10-25T00:00:00+00:00
   [2022-10-26, 00:04:09 UTC] {subprocess.py:63} INFO - Tmp dir root location: 
    /tmp
   [2022-10-26, 00:04:09 UTC] {subprocess.py:75} INFO - Running command: 
['/bin/bash', '-c', 'sleep 777']
   [2022-10-26, 00:04:09 UTC] {subprocess.py:86} INFO - Output:
   [2022-10-26, 00:17:06 UTC] {subprocess.py:97} INFO - Command exited with 
return code 0
   [2022-10-26, 00:17:06 UTC] {datahub.py:122} ERROR - 'BashOperator' object 
has no attribute '_inlets'
   [2022-10-26, 00:17:06 UTC] {datahub.py:123} INFO - Supressing error because 
graceful_exceptions is set
   [2022-10-26, 00:17:06 UTC] {taskinstance.py:1401} INFO - Marking task as 
SUCCESS. dag_id=factory_load_test_v53_31, task_id=longrun2, 
execution_date=20221025T000000, start_date=20221026T000407, 
end_date=20221026T001706
   [2022-10-26, 00:17:58 UTC] {standard_task_runner.py:100} ERROR - Failed to 
execute job 56234 for task longrun2 ((MySQLdb.OperationalError) (1205, 'Lock 
wait timeout exceeded; try restarting transaction')
   [SQL: UPDATE dag_run SET last_scheduling_decision=%s WHERE dag_run.id = %s]
   [parameters: (datetime.datetime(2022, 10, 26, 0, 3, 41, 951161), 8265)]
   (Background on this error at: https://sqlalche.me/e/14/e3q8); 162)
   [2022-10-26, 00:17:58 UTC] {local_task_job.py:164} INFO - Task exited with 
return code 1
   ```
   
   <img width="1436" alt="image" 
src="https://user-images.githubusercontent.com/9200263/198032136-2f9c3e12-6ecd-4147-bd8b-2848e3689af3.png";>
   
   
   ### What you think should happen instead
   
   Updates to the dag_run table's last_scheduling_decision should be part of a 
db retry strategy (like `run_with_db_retries`):
   
   
https://github.com/apache/airflow/blob/777b57f0c6a8ca16df2b96fd17c26eab56b3f268/airflow/models/dagrun.py#L553
   
   Additionally, an Airflow retry is not attempted, even though the task itself 
has retries.
   
   ### How to reproduce
   
   Using MySQL 8 deployed via GCP CloudSQL with transaction isolation level set 
to READ_COMMITTED and a default innodb_lock_wait_timeout of 50s.
   
   ```
   from airflow import DAG
   from airflow.operators.bash import BashOperator
   from airflow.operators.empty import EmptyOperator
   from random import randint
   
   DEFAULT_TASK_ARGS = {
       "owner": "gcp-data-platform",
       "start_date": "2022-09-26",
       "retries": 1,
       "retry_delay": 300,
   }
   
   dag_id_prefix = "factory_load_test_v53_"
   
   
   def generate_dag(key):
   
       with DAG(
           schedule_interval="@daily",
           max_active_runs=1,
           max_active_tasks=5,
           catchup=False,
           dag_id=f"{dag_id_prefix}{key}",
           default_args=DEFAULT_TASK_ARGS,
       ) as dag:
   
           start = EmptyOperator(task_id="start")
   
           long_running_task_1 = BashOperator(
               task_id="longrun1",
               bash_command=f"sleep {randint(0, 1800)}",
           )
   
           long_running_task_2 = BashOperator(
               task_id="longrun2",
               bash_command=f"sleep {randint(0, 1200)}",
           )
   
           start >> [long_running_task_1, long_running_task_2]
           return dag
   
   
   for key in range(0, 1000):
       globals()[f"{dag_id_prefix}{key}"] = generate_dag(key)
   ```
   
   ### Operating System
   
   Debian 11
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow-providers-google==8.4.0
   
   ### Deployment
   
   Official Apache Airflow Helm Chart
   
   ### Deployment details
   
   KubernetesExecutor with the following configurations:
   
   ```
     AIRFLOW__CORE__DEFAULT_POOL_TASK_SLOT_COUNT: "100000"
     # DAG factories can take a long time to process. Bump from default 50s to
     # 90s to ensure that we aren't stuck in timeout dag processor loops
     AIRFLOW__CORE__DAG_FILE_PROCESSOR_TIMEOUT: "90"
     # Reduces load on db with less frequent updates; we double health check
     # threshold as well in line with fewer heartbeats
     AIRFLOW__SCHEDULER__SCHEDULER_HEARTBEAT_SEC: "10"
     # Need to keep job heartbeat at a high rate to avoid jobs being mistakened
     # in queued state (even though they are actually running); 5s is the 
default
     AIRFLOW__SCHEDULER__JOB_HEARTBEAT_SEC: "5"
     # Lower from default 512 to 100 to reduce the load on the DB
     AIRFLOW__SCHEDULER__MAX_TIS_PER_QUERY: "100"
     # Bump from 2 to 4 to speed up dag processing per scheduler loop
     AIRFLOW__SCHEDULER__PARSING_PROCESSES: "4"
     # Interval in which to run adopt_or_reset_orphaned_tasks in 
scheduler_job.py
     # which will externally mark SchedulerJobs as failed or LocalJob tasks to 
None
     # Default is 300s. Increase to avoid orphaning tasks
     AIRFLOW__SCHEDULER__ORPHANED_TASKS_CHECK_INTERVAL: "600"
     AIRFLOW__KUBERNETES__WORKER_PODS_CREATION_BATCH_SIZE: "5"
     # Allow pods up to 20 mins to schedule themselves - defaults to 5 mins
     # During periods of high load (like midnight UTC), pods may be slow to 
schedule so allow
     # them more time to do so
     AIRFLOW__KUBERNETES__WORKER_PODS_PENDING_TIMEOUT: "1200"
     AIRFLOW__KUBERNETES__WORKER_PODS_PENDING_TIMEOUT_BATCH_SIZE: "200"
     # Num seconds to check for task instances stuck in `pending` status without
     # a pod. Default is 120s but during periods of high load (like midnight 
UTC),
     # this can actually be a long time so increase to 480s
     AIRFLOW__KUBERNETES__WORKER_PODS_PENDING_TIMEOUT_CHECK_INTERVAL: "480"
     # Num seconds to check for task instances stuck in `queued` status without
     # a pod. Default is 60s but during periods of high load (like midnight 
UTC),
     # this can actually be a long time so increase to 900s. Note, this check 
will
     # attempt to put pods in queued state to scheduled state
     AIRFLOW__KUBERNETES__WORKER_PODS_QUEUED_CHECK_INTERVAL: "900"
     # Note that if no _request_timeout is specified, the kubernetes client 
will wait
     # indefinitely for kubernetes api responses, which will cause the 
scheduler to hang.
     # The timeout is specified as [connect timeout, read timeout]
     AIRFLOW__KUBERNETES__KUBE_CLIENT_REQUEST_ARGS: '{"_request_timeout": 
[60,60]}'
   ```
   
   ### Anything else
   
   With 1k DAGs starting a midnight, this happens approximately 2 every night.
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to