dlstadther-pl opened a new issue, #32434:
URL: https://github.com/apache/airflow/issues/32434

   ### Apache Airflow version
   
   2.6.2
   
   ### What happened
   
   After migrating from Airflow 2.2.3 to 2.6.2, we saw a large (~5-10x) increase in DAG File Processing time for our dags. While we have some anti-patterns in our dag generation (dynamic dag generation and the use of 5 Airflow Variables), we have isolated the increase in processing duration to the existence of Dag Params (see "How to reproduce", below).
   
   We're experiencing this issue in our most complex dag file. This dag file creates 1 "main" dag, which runs a TriggerDagRunOperator for each of the "client-specific" dags it generates dynamically. Each client-specific dag is assigned 5 Dag Params (which describe certain characteristics of the client) and about 400 tasks.
   
   Dag files which used to take 0.58s now take 2.88s; 3s now take 30s; 95s now 
take 985s.
   
   ### What you think should happen instead
   
   I believe DAG Processing serializes dag params inefficiently during the serialization of tasks.
   
   (However, I have been unable to pinpoint a commit that significantly changed the serialization code under `DagBag.sync_to_db()`.)
   
   ### How to reproduce
   
   I have reproduced the situation we experience locally with a representative (but dumb) example dag, which shows dag file processing runtime growing as the qty of Dag Params increases.
   
   I realize this dag may be a bit complex, so I've also included a visual representation of how the dags relate to each other (with a sketch of one way to time this locally below the diagram) and many of the Dag File Processing times for 2.2.3 and 2.6.2 when using various qty of Dag Params in the client-specific dag definitions.
   
   ## Code
   ```python
   import datetime
   import random
   import time
   from typing import Any, Dict, List
   
   from airflow.models import DAG
   from airflow.operators.python import PythonOperator
   from airflow.operators.trigger_dagrun import TriggerDagRunOperator
   from airflow.utils.state import State
   from airflow.utils.task_group import TaskGroup
   
   
   # in practice, this CLIENTS_METADATA comes from an Airflow Variable which is set by a different DAG which runs multiple times per day
   # e.g. CLIENTS_METADATA: List[Dict[str, Any]] = Variable.get('CLIENTS_METADATA', deserialize_json=True, default_var=list())
   # for the sake of an example, we'll create fake client metadata and allow the client qty to be configurable, but deterministic
   client_qty = 150
   CLIENTS_METADATA: List[Dict[str, Any]] = list()
   for i in range(client_qty):
       # if even, set certain values
       if i % 2 == 0:
           client_group = "4"
           shared_resource_group = "us-01"
           client_size = "normal"
       else:
           client_group = "3"
           shared_resource_group = "us-02"
           client_size = "large"
       CLIENTS_METADATA.append(
           {
               "clientId": f"client{i}",
               "clientGroup": client_group,
               "sharedResourceGroup": shared_resource_group,
               "isDataIsolated": False,
               "resourceTags": {"size": client_size},
           }
       )
   
   DE_STACK_NAME = "de-prod-us-01"
   
   
   ARGS = {
       'owner': 'airflow',
       'start_date': datetime.datetime(2000, 1, 1, 0, 0, 0),
   }
   
   
   def task(**kwargs):
       # would normally receive dag params and use them in the job,
       # but this is a dumb example dag that doesn't actually do anything
       rand_int = random.randint(1, 10)
       print(f"Sleeping: {rand_int}s")
       time.sleep(rand_int)
       return rand_int
   
   
   def build_abstract_dag(client: dict) -> DAG:
       """One copy of the graph per client"""
       client_id = client.get('clientId')
       client_group = client.get('clientGroup')
       resource_tags = client.get('resourceTags')
       resource_group = client.get('sharedResourceGroup')
   
       dag_id = 'client-specific-dag-{}'.format(client_id)
   
       dag = DAG(
           dag_id=dag_id,
           default_args=ARGS,
           schedule_interval=None,
           is_paused_upon_creation=False,
           catchup=False,
           # CHANGE THE QTY OF PARAMS HERE TO SEE THE IMPACT TO DAG PROCESSING RUNTIME
           params={
               'client_id': client_id,
               'client_group': client_group,
               'resource_tags': resource_tags,
               'resource_group': resource_group,
               'airflow_stack': DE_STACK_NAME,
           },
       )
   
       # mimic multiple sets of jobs
       qty_task_groups = 100
       for i in range(qty_task_groups):
           with TaskGroup(f"task_group_{i}", dag=dag):
               # mimic our generic job structure
               #   data generation job (t1)
               #   distribution jobs (t2 and t3)
               #   visualization or validation job (t4)
               t1 = PythonOperator(dag=dag, task_id="task_1", python_callable=task, retries=0)
               t2 = PythonOperator(dag=dag, task_id="task_2", python_callable=task, retries=0)
               t3 = PythonOperator(dag=dag, task_id="task_3", python_callable=task, retries=0)
               t4 = PythonOperator(dag=dag, task_id="task_4", python_callable=task, retries=0)
   
               [t3, t2] << t1
               t4 << t1
   
       return dag
   
   
   # after all clients complete, go back and requeue anything which didn't succeed
   # addresses network-related transient errors which aren't covered by default retry config
   def retry_all_main_graph_tasks(retry_meta: list, **context):
       print(f"Retrying: {retry_meta}")
   
   
   dag_meta = DAG(
       dag_id='a-trigger-dag',
       default_args=ARGS,
       dagrun_timeout=datetime.timedelta(hours=8),
       is_paused_upon_creation=True,
       catchup=False,
       schedule_interval='00 05 * * *',  # daily, at 0500 UTC
   )
   
   trigger_dag_tasks = list()
   # List containing meta around the retry tasks and triggered dag per client
   # Each list record is a dict {'trigger_task_id': <str>, 'dag': <Dag>}
   retry_main_graph_tasks_meta = list()
   
   # Create 1 Abstract DAG per client
   for client_data in CLIENTS_METADATA:
       abstract_dag = build_abstract_dag(
           client=client_data,
       )
       trigger_dag_id = abstract_dag.dag_id
       globals()[trigger_dag_id] = abstract_dag
   
       trigger_operator = TriggerDagRunOperator(
           dag=dag_meta,
           task_id='trigger_{}'.format(trigger_dag_id),
           retries=1,
           trigger_dag_id=trigger_dag_id,
           wait_for_completion=True,
           # "success" or "failed" states means the client DAG was triggered 
successfully.
           # trigger_l3_main_{CLIENT_ID} task failures indicate the DAG was 
never triggered.
           allowed_states=[State.SUCCESS, State.FAILED],
           # 
https://airflow.apache.org/docs/apache-airflow/stable/_modules/airflow/operators/trigger_dagrun.html
           # we cannot set failed_states to None or empty list, as it will set 
failed_states = ['failed']
           # in the BaseOperator initialization. The code snippet:
           # `self.failed_states = failed_states or [State.FAILED]`
           failed_states=['not-a-state'],
           execution_date='{{ data_interval_start }}',
           execution_timeout=datetime.timedelta(hours=2),
           pool='main_graph_trigger',
       )
       trigger_dag_tasks.append(trigger_operator)
       retry_main_graph_tasks_meta.append(
           dict(
               trigger_task_id=trigger_operator.task_id,
               dag=abstract_dag,
           )
       )
   
   retry_main_graph_tasks = PythonOperator(
       dag=dag_meta,
       task_id='retry_all_main_graph_tasks',
       retries=1,
       trigger_rule='all_done',
       op_kwargs={
           'retry_meta': retry_main_graph_tasks_meta,
       },
       pool='batch',
       python_callable=retry_all_main_graph_tasks,
   )
   retry_main_graph_tasks << trigger_dag_tasks
   ```
   Creates:
   
![airflow-sample-dag](https://github.com/apache/airflow/assets/55100432/2dbce1ef-da76-4e98-b5d0-92681d3dd0b4)
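   
   For anyone trying to reproduce the numbers below, here is one way to approximate the measurement locally (a sketch; the path is a placeholder, and `DagBag` parsing plus manual serialization is only a proxy for what the standalone dag processor measures):
   
   ```python
   # Sketch: parse the example dag file and serialize every resulting dag,
   # timing each phase separately to see where the time goes.
   import time
   
   from airflow.models import DagBag
   from airflow.serialization.serialized_objects import SerializedDAG
   
   DAG_FILE = "/opt/airflow/dags/example_dag_params.py"  # placeholder path
   
   start = time.perf_counter()
   bag = DagBag(dag_folder=DAG_FILE, include_examples=False)
   print(f"parse: {time.perf_counter() - start:.2f}s ({len(bag.dags)} dags)")
   
   start = time.perf_counter()
   for dag in bag.dags.values():
       SerializedDAG.to_dict(dag)
   print(f"serialize: {time.perf_counter() - start:.2f}s")
   ```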
   
   
   ## Runtimes
   * `client_qty = 1`
   
   | Dag Param Qty | Runtime (2.2.3) | Runtime (2.6.2) |
   | --- | --- | --- |
   | 0 | 0.3s | 1s |
   | 1 | | 2s |
   | 2 | | 3s |
   | 3 | | 4s |
   | 4 | | 5s |
   | 5 | 0.4s | 6s |
   
   * `client_qty = 10`
   
   | Dag Param Qty | Runtime (2.2.3) | Runtime (2.6.2) |
   | --- | --- | --- |
   | 0 | 0.3s | 4s |
   | 1 | | 20s |
   | 2 | | 25s |
   | 3 | | 30s |
   | 4 | | 35s |
   | 5 | 3s | 40s |
   
   * `client_qty = 150`
   
   | Dag Param Qty | Runtime (2.2.3) | Runtime (2.6.2) |
   | --- | --- | --- |
   | 0 | 25s | 50s |
   | 5 | 50s | 900s (15m) |
   
   
   ### Operating System
   
   Debian 11 (bullseye)
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Official Apache Airflow Helm Chart
   
   ### Deployment details
   
   * kubernetes 1.24
   * official helm chart 1.9.0
   * standalone dag processor
   * celery executor
   * postgres database (with pgbouncer)
   
   ### Anything else
   
   I've also recreated the same issue (with the sample code provided above) 
using the [Airflow Docker Compose 
setup](https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#).
   
   Our issue differs from https://github.com/apache/airflow/issues/30593 and 
https://github.com/apache/airflow/issues/30884 , as we are already on 2.6.x and 
use the default value (5s) for `job_heartbeat_sec`.
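   
   (The effective heartbeat value can be confirmed with a one-liner like the following; a trivial sketch using the public `airflow.configuration.conf` accessor:)
   
   ```python
   # Sketch: print the effective job_heartbeat_sec (the default is 5).
   from airflow.configuration import conf
   
   print(conf.get("scheduler", "job_heartbeat_sec"))
   ```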
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

