vchiapaikeo commented on PR #33172: URL: https://github.com/apache/airflow/pull/33172#issuecomment-1677501450
So it turns out that this issue actually stemmed from something completely
unrelated. Our team will post a discussion / issue about that later. Long story
short, we were using `AIRFLOW__DATABASE__SQL_ALCHEMY_CONN_CMD` instead of
`AIRFLOW__DATABASE__SQL_ALCHEMY_CONN` to define our database connection string.
A trace showed that during a bulk fetch query (a simple `.all()` on the
`trigger` table), we were repeatedly calling into `configuration.py`, which
spawned a subprocess to fetch the connection string for EVERY returned record,
just to check whether the database supported JSON 😠. In that trace, fetching
125 rows took about 29.9 seconds, nearly all of it spent waiting on those 125
child processes. Defining the connection with
`AIRFLOW__DATABASE__SQL_ALCHEMY_CONN` instead resolved our issues and gave us
huge performance boosts in both our scheduler and triggerer processes.
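The comment doesn't show how `configuration.py` resolves these variables beyond the profiled `run_command` and `shlex.split` calls. The sketch below is a hypothetical, simplified resolver (the name `get_env_option` and the `SPAWNED` counter are made up for illustration) that mirrors the behavior described: a plain `AIRFLOW__DATABASE__SQL_ALCHEMY_CONN` value is a cheap environment lookup, while the `_CMD` variant runs its command again on every call. It is not Airflow's implementation, and Airflow's real rules (for example, which options accept `_CMD`) may differ.

```python
import os
import shlex
import subprocess
import sys
from typing import Optional

# Counts child processes spawned, to make the per-lookup cost visible.
SPAWNED = {"count": 0}


def get_env_option(section: str, key: str) -> Optional[str]:
    """Hypothetical, simplified resolver for AIRFLOW__{SECTION}__{KEY}[_CMD].

    Not Airflow's code: it only mirrors the behavior described in the comment.
    """
    env_var = f"AIRFLOW__{section.upper()}__{key.upper()}"
    if env_var in os.environ:
        # Plain value: a cheap lookup in the process environment.
        return os.environ[env_var]
    command = os.environ.get(f"{env_var}_CMD")
    if command is not None:
        # Command value: fork/exec a child and read its stdout on EVERY call.
        SPAWNED["count"] += 1
        result = subprocess.run(
            shlex.split(command), capture_output=True, text=True, check=True
        )
        return result.stdout.strip()
    return None


if __name__ == "__main__":
    os.environ.pop("AIRFLOW__DATABASE__SQL_ALCHEMY_CONN", None)
    os.environ["AIRFLOW__DATABASE__SQL_ALCHEMY_CONN_CMD"] = (
        f"{shlex.quote(sys.executable)} -c \"print('mysql://user@host/airflow')\""
    )
    for _ in range(3):
        get_env_option("database", "sql_alchemy_conn")
    print(SPAWNED["count"])  # three lookups, three child processes
```

Setting the plain variable short-circuits before any process is spawned, which is why switching variables removes the cost without touching code.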
```
65686 function calls (64626 primitive calls) in 29.928 seconds

   Ordered by: cumulative time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        1    0.000    0.000   29.928   29.928 /home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/query.py:2757(all)
        1    0.000    0.000   29.923   29.923 /home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/result.py:1468(all)
        1    0.000    0.000   29.923   29.923 /home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/result.py:395(_allrows)
        1    0.000    0.000   29.923   29.923 /home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/result.py:1388(_fetchall_impl)
        1    0.000    0.000   29.923   29.923 /home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/result.py:1808(_fetchall_impl)
        2    0.000    0.000   29.922   14.961 /home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/loading.py:135(chunks)
        1    0.000    0.000   29.921   29.921 /home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/result.py:390(_raw_all_rows)
        1    0.001    0.001   29.921   29.921 /home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/result.py:393(<listcomp>)
      125    0.000    0.000   29.919    0.239 /home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/sql/type_api.py:1711(process)
      125    0.002    0.000   29.915    0.239 /home/airflow/.local/lib/python3.10/site-packages/airflow/utils/sqlalchemy.py:146(process_result_value)
      125    0.001    0.000   29.909    0.239 /home/airflow/.local/lib/python3.10/site-packages/airflow/utils/sqlalchemy.py:122(db_supports_json)
      125    0.001    0.000   29.908    0.239 /home/airflow/.local/lib/python3.10/site-packages/airflow/configuration.py:562(get)
      125    0.000    0.000   29.907    0.239 /home/airflow/.local/lib/python3.10/site-packages/airflow/configuration.py:732(_get_environment_variables)
      125    0.002    0.000   29.907    0.239 /home/airflow/.local/lib/python3.10/site-packages/airflow/configuration.py:478(_get_env_var_option)
      125    0.002    0.000   29.902    0.239 /home/airflow/.local/lib/python3.10/site-packages/airflow/configuration.py:103(run_command)
      125    0.001    0.000   29.786    0.238 /usr/local/lib/python3.10/subprocess.py:1110(communicate)
      125    0.006    0.000   29.785    0.238 /usr/local/lib/python3.10/subprocess.py:1952(_communicate)
      250    0.003    0.000   29.762    0.119 /usr/local/lib/python3.10/selectors.py:403(select)
      250   29.758    0.119   29.758    0.119 {method 'poll' of 'select.poll' objects}
      125    0.002    0.000    0.100    0.001 /usr/local/lib/python3.10/subprocess.py:758(__init__)
      125    0.004    0.000    0.094    0.001 /usr/local/lib/python3.10/subprocess.py:1687(_execute_child)
      125    0.069    0.001    0.069    0.001 {built-in method _posixsubprocess.fork_exec}
      125    0.001    0.000    0.013    0.000 /usr/local/lib/python3.10/shlex.py:305(split)
      500    0.001    0.000    0.010    0.000 /usr/local/lib/python3.10/shlex.py:299(__next__)
      500    0.001    0.000    0.010    0.000 /usr/local/lib/python3.10/shlex.py:101(get_token)
      500    0.007    0.000    0.009    0.000 /usr/local/lib/python3.10/shlex.py:133(read_token)
     1375    0.001    0.000    0.006    0.000 /usr/local/lib/python3.10/subprocess.py:1775(<genexpr>)
      125    0.002    0.000    0.006    0.000 /usr/local/lib/python3.10/os.py:620(get_exec_path)
      250    0.000    0.000    0.005    0.000 /usr/local/lib/python3.10/subprocess.py:1204(wait)
        1    0.000    0.000    0.005    0.005 /home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/query.py:2911(_iter)
        1    0.000    0.000    0.005    0.005 /home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/session.py:1563(execute)
      250    0.001    0.000    0.005    0.000 /usr/local/lib/python3.10/subprocess.py:1911(_wait)
      125    0.001    0.000    0.004    0.000 /usr/local/lib/python3.10/subprocess.py:1227(_close_pipe_fds)
 1057/125    0.002    0.000    0.004    0.000 /home/airflow/.local/lib/python3.10/site-packages/airflow/serialization/serialized_objects.py:458(deserialize)
        1    0.000    0.000    0.004    0.004 /home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/base.py:1691(_execute_20)
        1    0.000    0.000    0.004    0.004 /home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/sql/elements.py:330(_execute_on_connection)
        1    0.000    0.000    0.004    0.004 /home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/base.py:1523(_execute_clauseelement)
      125    0.000    0.000    0.004    0.000 /usr/local/lib/python3.10/subprocess.py:1898(_try_wait)
     1250    0.003    0.000    0.004    0.000 /usr/local/lib/python3.10/posixpath.py:71(join)
        1    0.000    0.000    0.004    0.004 /home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/base.py:1768(_execute_context)
      125    0.000    0.000    0.004    0.000 /home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/sql/sqltypes.py:2675(process)
      125    0.004    0.000    0.004    0.000 {built-in method posix.waitpid}
      125    0.001    0.000    0.003    0.000 /usr/local/lib/python3.10/json/__init__.py:299(loads)
        1    0.000    0.000    0.003    0.003 /home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/default.py:735(do_execute)
        1    0.000    0.000    0.003    0.003 /home/airflow/.local/lib/python3.10/site-packages/MySQLdb/cursors.py:171(execute)
      250    0.001    0.000    0.003    0.000 /usr/local/lib/python3.10/selectors.py:366(unregister)
      625    0.001    0.000    0.003    0.000 /usr/local/lib/python3.10/os.py:675(__getitem__)
      250    0.001    0.000    0.003    0.000 /usr/local/lib/python3.10/selectors.py:352(register)
      125    0.001    0.000    0.003    0.000 /usr/local/lib/python3.10/json/decoder.py:332(decode)
  240/125    0.001    0.000    0.003    0.000 /home/airflow/.local/lib/python3.10/site-packages/airflow/serialization/serialized_objects.py:476(<dictcomp>)
[...truncated]
```
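The comment doesn't say how the trace was captured. One way to get a report in this shape using only the standard library is `cProfile` plus `pstats`, sorted by cumulative time. In this sketch, `profile_call` and `fetch_rows` are hypothetical helpers; `fetch_rows` merely stands in for the `.all()` query being investigated.

```python
import cProfile
import io
import pstats
from typing import Any, Callable, Tuple


def profile_call(func: Callable[..., Any], *args: Any, limit: int = 40) -> Tuple[Any, str]:
    """Run func(*args) under cProfile and return (result, report text)."""
    profiler = cProfile.Profile()
    result = profiler.runcall(func, *args)
    buffer = io.StringIO()
    # Sort by cumulative time so the most expensive call chains come first;
    # `limit` caps how many rows are printed.
    pstats.Stats(profiler, stream=buffer).sort_stats("cumulative").print_stats(limit)
    return result, buffer.getvalue()


def fetch_rows(n: int) -> list:
    # Hypothetical stand-in for something like session.query(Trigger).all().
    return [{"id": i} for i in range(n)]


if __name__ == "__main__":
    rows, report = profile_call(fetch_rows, 125)
    print(report)
```

Wrapping the real query call the same way should produce a report with the same columns, where an unexpectedly large `cumtime` on a configuration or subprocess frame stands out near the top.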
Relevant callsites:
- `process_result_value`, which checks whether the DB supports JSON for EVERY row:
  https://github.com/apache/airflow/blob/2.5.3/airflow/utils/sqlalchemy.py#L146-L156
- `db_supports_json`, the conditional evaluated for EVERY row:
  https://github.com/apache/airflow/blob/2.5.3/airflow/utils/sqlalchemy.py#L122-L124
- The `_CMD` handling in `configuration.py` that spawns a subprocess to fetch the connection string again for EVERY row:
  https://github.com/apache/airflow/blob/2.5.3/airflow/configuration.py#L485-L488
- `Trigger` model:
  https://github.com/apache/airflow/blob/2.5.3/airflow/models/trigger.py#L57
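To make the per-row cost concrete, here is a hypothetical sketch (none of these names are Airflow's) of a result processor that asks "does the DB support JSON?" once per row. The check is assumed here to be "the connection string does not start with `mssql`", and `read_conn_string` stands in for the expensive `_CMD` lookup by simply counting its calls. With 125 rows, matching the 125 `process_result_value` calls in the profile, the per-call version performs 125 lookups. The memoized version, shown only as one possible mitigation and not as any upstream change, performs one.

```python
import functools
import json
from typing import Any, Callable, Iterable, List

# Counts how often the (hypothetically expensive) connection lookup runs.
LOOKUPS = {"count": 0}


def read_conn_string() -> str:
    """Hypothetical stand-in for resolving the connection via a *_CMD subprocess."""
    LOOKUPS["count"] += 1
    return "mysql://user@host/airflow"


def supports_json_per_call() -> bool:
    # Re-reads the connection string every time the question is asked.
    return not read_conn_string().startswith("mssql")


@functools.lru_cache(maxsize=None)
def supports_json_memoized() -> bool:
    # Reads the connection string once per process and reuses the answer.
    return not read_conn_string().startswith("mssql")


def process_result_values(
    values: Iterable[Any], supports_json: Callable[[], bool]
) -> List[Any]:
    """Mimic a per-row result processor: the check runs once for each row."""
    processed = []
    for value in values:
        # Without native JSON support the value would arrive as a JSON string.
        processed.append(value if supports_json() else json.loads(value))
    return processed


if __name__ == "__main__":
    rows = [{"kwargs": {}}] * 125
    process_result_values(rows, supports_json_per_call)
    print(LOOKUPS["count"])  # 125
    LOOKUPS["count"] = 0
    process_result_values(rows, supports_json_memoized)
    print(LOOKUPS["count"])  # 1
```

Memoizing assumes the connection string cannot change for the life of the process, which is a judgment call. The workaround described in the comment, setting `AIRFLOW__DATABASE__SQL_ALCHEMY_CONN` directly, avoids the subprocess without changing any code.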
