This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new d2b61976a4 Share data loader to across asyncio boto sessions (#40658)
d2b61976a4 is described below
commit d2b61976a4f8f73286906e2a6884d836a11fe4fb
Author: Daniel Standish <[email protected]>
AuthorDate: Mon Jul 8 13:03:10 2024 -0700
Share data loader to across asyncio boto sessions (#40658)
By default, a botocore session creates and caches an instance of JSONDecoder,
which consumes a lot of memory. This issue was reported here:
https://github.com/boto/botocore/issues/3078.
In the context of triggers which use boto sessions, this can result in
excessive memory usage and, as a result, reduced capacity on the triggerer.
We can reduce the memory footprint by sharing the loader instance across the
sessions.
---
airflow/providers/amazon/aws/hooks/base_aws.py | 17 ++++++++++++++++-
1 file changed, 16 insertions(+), 1 deletion(-)
diff --git a/airflow/providers/amazon/aws/hooks/base_aws.py b/airflow/providers/amazon/aws/hooks/base_aws.py
index 0d88ded9b9..0d07bb1649 100644
--- a/airflow/providers/amazon/aws/hooks/base_aws.py
+++ b/airflow/providers/amazon/aws/hooks/base_aws.py
@@ -70,6 +70,19 @@ if TYPE_CHECKING:
from airflow.models.connection import Connection # Avoid circular imports.
+_loader = botocore.loaders.Loader()
+"""
+botocore data loader to be used with async sessions
+
+By default, a botocore session creates and caches an instance of JSONDecoder which
+consumes a lot of memory. This issue was reported here https://github.com/boto/botocore/issues/3078.
+In the context of triggers which use boto sessions, this can result in excessive
+memory usage and as a result reduced capacity on the triggerer. We can reduce
+memory footprint by sharing the loader instance across the sessions.
+
+:meta private:
+"""
+
class BaseSessionFactory(LoggingMixin):
"""
@@ -155,7 +168,9 @@ class BaseSessionFactory(LoggingMixin):
def get_async_session(self):
from aiobotocore.session import get_session as async_get_session
- return async_get_session()
+ session = async_get_session()
+ session.register_component("data_loader", _loader)
+ return session
def create_session(
self, deferrable: bool = False