dlstadther-pl opened a new issue, #32434:
URL: https://github.com/apache/airflow/issues/32434
### Apache Airflow version
2.6.2
### What happened
After migrating from Airflow 2.2.3 to 2.6.2, we saw a large (~5-10x)
increase in DAG File Processing time for our dags. While we have some
anti-patterns with dag generation (dynamic dag generation and usage of 5
Airflow Variables), we have isolated the increase in processing duration to the
existence of Dag Params (see "How to Reproduce", below).
We're experiencing this issue in our most complex dag file. This dag file
creates 1 "main" dag which runs a TriggerDagRunOperator for each of the
"client-specific" dags that it generates dynamically. Each client-specific
dag is assigned 5 Dag Params (which describe certain characteristics of the
client) and about 400 tasks.
Dag files which used to take 0.58s now take 2.88s; 3s now take 30s; 95s now
take 985s.
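As a quick check on the "~5-10x" figure, the three before/after timings quoted above can be turned into slowdown factors. This is only arithmetic on the numbers already stated; the labels `small`, `medium`, and `large` are illustrative placeholders, not names of real dag files.

```python
# (before, after) dag file processing times in seconds, as quoted above.
# The keys are hypothetical labels used only for this illustration.
before_after = {
    "small": (0.58, 2.88),
    "medium": (3.0, 30.0),
    "large": (95.0, 985.0),
}

# Slowdown factor = time on 2.6.2 divided by time on 2.2.3.
slowdowns = {name: after / before for name, (before, after) in before_after.items()}

for name, factor in slowdowns.items():
    print(f"{name}: {factor:.1f}x")
```

The factors come out at roughly 5x for the fastest file and a little over 10x for the two slower ones.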
### What you think should happen instead
I believe DAG Processing is inefficient at serializing dag params during the
serialization of tasks.
(However, I have been unable to pinpoint a commit which caused a significant
change to the serialization code in `DagBag.sync_to_db()`.)
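One way to test this hypothesis would be to time the serialization step on its own, separately from parsing the dag file. The following is a minimal sketch, assuming `SerializedDAG.to_dict` from `airflow.serialization.serialized_objects` is the relevant entry point; `best_of` and `serialization_seconds` are hypothetical helper names, and this is not how the dag processor itself measures runtime.

```python
import time
from typing import Callable


def best_of(fn: Callable[[], object], repeat: int = 3) -> float:
    """Return the fastest wall-clock duration (in seconds) of `repeat` calls to fn."""
    durations = []
    for _ in range(repeat):
        start = time.perf_counter()
        fn()
        durations.append(time.perf_counter() - start)
    return min(durations)


def serialization_seconds(dag, repeat: int = 3) -> float:
    """Time only the DAG-to-dict serialization step for a single DAG object."""
    # Imported lazily so that best_of() stays usable without Airflow installed.
    from airflow.serialization.serialized_objects import SerializedDAG

    return best_of(lambda: SerializedDAG.to_dict(dag), repeat=repeat)
```

With the reproduction code below, calling `serialization_seconds(build_abstract_dag(CLIENTS_METADATA[0]))` once with the `params` dict populated and once with it empty should show whether the difference comes from serialization rather than from building the dag.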
### How to reproduce
I have reproduced the situation we experience locally with a representative
(but dumb) dag example which shows dag file processing runtimes increasing
as the qty of Dag Params increases.
I realize this dag may be a bit complex and so I've also included the visual
representation of how the dags relate to each other and many of the Dag File
Processing times for 2.2.3 and 2.6.2 when using various qty of Dag Params in
the client-specific dag definitions.
## Code
```python
import datetime
import random
import time
from typing import Any, Dict, List
from airflow.models import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.utils.state import State
from airflow.utils.task_group import TaskGroup
# in practice, this CLIENTS_METADATA comes from an Airflow Variable which is
# set by a different DAG which runs multiple times per day
# e.g. CLIENTS_METADATA: List[Dict[str, Any]] =
#   Variable.get('CLIENTS_METADATA', deserialize_json=True, default_var=list())
# for the sake of an example, we'll create fake client metadata and allow
# the client qty to be configurable, but deterministic
client_qty = 150
CLIENTS_METADATA: List[Dict[str, Any]] = list()
for i in range(client_qty):
    # if even, set certain values
    if i % 2 == 0:
        client_group = "4"
        shared_resource_group = "us-01"
        client_size = "normal"
    else:
        client_group = "3"
        shared_resource_group = "us-02"
        client_size = "large"
    CLIENTS_METADATA.append(
        {
            "clientId": f"client{i}",
            "clientGroup": client_group,
            "sharedResourceGroup": shared_resource_group,
            "isDataIsolated": False,
            "resourceTags": {"size": client_size},
        }
    )
DE_STACK_NAME = "de-prod-us-01"
ARGS = {
    'owner': 'airflow',
    'start_date': datetime.datetime(2000, 1, 1, 0, 0, 0),
}


def task(**kwargs):
    # would normally receive dag params and use them in the job,
    # but this is a dumb example dag that doesn't actually do anything
    rand_int = random.randint(1, 10)
    print(f"Sleeping: {rand_int}s")
    time.sleep(rand_int)
    return rand_int
def build_abstract_dag(client: dict) -> DAG:
    """One copy of the graph per client"""
    client_id = client.get('clientId')
    client_group = client.get('clientGroup')
    resource_tags = client.get('resourceTags')
    resource_group = client.get('sharedResourceGroup')
    dag_id = 'client-specific-dag-{}'.format(client_id)
    dag = DAG(
        dag_id=dag_id,
        default_args=ARGS,
        schedule_interval=None,
        is_paused_upon_creation=False,
        catchup=False,
        # CHANGE THE QTY OF PARAMS HERE TO SEE THE IMPACT TO DAG PROCESSING
        # RUNTIME
        params={
            'client_id': client_id,
            'client_group': client_group,
            'resource_tags': resource_tags,
            'resource_group': resource_group,
            'airflow_stack': DE_STACK_NAME,
        },
    )
    # mimic multiple sets of jobs
    qty_task_groups = 100
    for i in range(qty_task_groups):
        with TaskGroup(f"task_group_{i}", dag=dag):
            # mimic our generic job structure
            # data generation job (t1)
            # distribution jobs (t2 and t3)
            # visualization or validation job (t4)
            t1 = PythonOperator(dag=dag, task_id="task_1",
                                python_callable=task, retries=0)
            t2 = PythonOperator(dag=dag, task_id="task_2",
                                python_callable=task, retries=0)
            t3 = PythonOperator(dag=dag, task_id="task_3",
                                python_callable=task, retries=0)
            t4 = PythonOperator(dag=dag, task_id="task_4",
                                python_callable=task, retries=0)
            [t3, t2] << t1
            t4 << t1
    return dag
# after all clients complete, go back and requeue anything which didn't
# succeed
# addresses network-related transient errors which aren't covered by default
# retry config
def retry_all_main_graph_tasks(retry_meta: list, **context):
    print(f"Retrying: {retry_meta}")


dag_meta = DAG(
    dag_id='a-trigger-dag',
    default_args=ARGS,
    dagrun_timeout=datetime.timedelta(hours=8),
    is_paused_upon_creation=True,
    catchup=False,
    schedule_interval='00 05 * * *',  # daily, at 0500 UTC
)
trigger_dag_tasks = list()
# List containing meta around the retry tasks and triggered dag per client
# Each list record is a dict {'trigger_task_id': <str>, 'dag': <Dag>}
retry_main_graph_tasks_meta = list()
# Create 1 Abstract DAG per client
for client_data in CLIENTS_METADATA:
    abstract_dag = build_abstract_dag(
        client=client_data,
    )
    trigger_dag_id = abstract_dag.dag_id
    globals()[trigger_dag_id] = abstract_dag
    trigger_operator = TriggerDagRunOperator(
        dag=dag_meta,
        task_id='trigger_{}'.format(trigger_dag_id),
        retries=1,
        trigger_dag_id=trigger_dag_id,
        wait_for_completion=True,
        # "success" or "failed" states means the client DAG was triggered
        # successfully.
        # trigger_l3_main_{CLIENT_ID} task failures indicate the DAG was
        # never triggered.
        allowed_states=[State.SUCCESS, State.FAILED],
        # https://airflow.apache.org/docs/apache-airflow/stable/_modules/airflow/operators/trigger_dagrun.html
        # we cannot set failed_states to None or empty list, as it will set
        # failed_states = ['failed']
        # in the BaseOperator initialization. The code snippet:
        # `self.failed_states = failed_states or [State.FAILED]`
        failed_states=['not-a-state'],
        execution_date='{{ data_interval_start }}',
        execution_timeout=datetime.timedelta(hours=2),
        pool='main_graph_trigger',
    )
    trigger_dag_tasks.append(trigger_operator)
    retry_main_graph_tasks_meta.append(
        dict(
            trigger_task_id=trigger_operator.task_id,
            dag=abstract_dag,
        )
    )
retry_main_graph_tasks = PythonOperator(
    dag=dag_meta,
    task_id='retry_all_main_graph_tasks',
    retries=1,
    trigger_rule='all_done',
    op_kwargs={
        'retry_meta': retry_main_graph_tasks_meta,
    },
    pool='batch',
    python_callable=retry_all_main_graph_tasks,
)
retry_main_graph_tasks << trigger_dag_tasks
```
Creates: (image: visual representation of how the dags relate to each other)

## Runtimes
* `client_qty = 1`

| Dag Param Qty | Runtime (2.2.3) | Runtime (2.6.2) |
| ------------- | --------------- | --------------- |
| 0 | 0.3s | 1s |
| 1 | | 2s |
| 2 | | 3s |
| 3 | | 4s |
| 4 | | 5s |
| 5 | 0.4s | 6s |

* `client_qty = 10`

| Dag Param Qty | Runtime (2.2.3) | Runtime (2.6.2) |
| ------------- | --------------- | --------------- |
| 0 | 0.3s | 4s |
| 1 | | 20s |
| 2 | | 25s |
| 3 | | 30s |
| 4 | | 35s |
| 5 | 3s | 40s |

* `client_qty = 150`

| Dag Param Qty | Runtime (2.2.3) | Runtime (2.6.2) |
| ------------- | --------------- | --------------- |
| 0 | 25s | 50s |
| 5 | 50s | 900s (15m) |
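
To make the trend in these tables easier to read, the sketch below computes how many seconds each additional Dag Param adds on 2.6.2, and compares the 0-to-5-param growth between versions at `client_qty = 150`. It only restates the measurements above; `per_param_increments` is a hypothetical helper name.

```python
# 2.6.2 runtimes in seconds, keyed by client_qty and then by Dag Param qty
# (values copied from the tables above).
runtimes_262 = {
    1: {0: 1, 1: 2, 2: 3, 3: 4, 4: 5, 5: 6},
    10: {0: 4, 1: 20, 2: 25, 3: 30, 4: 35, 5: 40},
}


def per_param_increments(by_param_qty):
    """Seconds added by each additional Dag Param, in order of param qty."""
    qtys = sorted(by_param_qty)
    return [by_param_qty[b] - by_param_qty[a] for a, b in zip(qtys, qtys[1:])]


for client_qty, series in runtimes_262.items():
    print(client_qty, per_param_increments(series))

# client_qty = 150: growth factor when going from 0 to 5 Dag Params.
growth_223 = 50 / 25   # on 2.2.3
growth_262 = 900 / 50  # on 2.6.2
print(growth_223, growth_262)
```

On 2.6.2 each extra param adds about 1s with 1 client and about 5s with 10 clients (after a 16s jump for the first param), which suggests the cost grows with both the number of params and the number of tasks. At 150 clients, adding 5 params doubles the runtime on 2.2.3 but multiplies it by 18 on 2.6.2.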
### Operating System
Debian 11 (bullseye)
### Versions of Apache Airflow Providers
_No response_
### Deployment
Official Apache Airflow Helm Chart
### Deployment details
* kubernetes 1.24
* official helm chart 1.9.0
* standalone dag processor
* celery executor
* postgres database (with pgbouncer)
### Anything else
I've also recreated the same issue (with the sample code provided above)
using the [Airflow Docker Compose
setup](https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#).
Our issue differs from https://github.com/apache/airflow/issues/30593 and
https://github.com/apache/airflow/issues/30884 , as we are already on 2.6.x and
use the default value (5s) for `job_heartbeat_sec`.
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)