Nataneljpwd commented on code in PR #61680:
URL: https://github.com/apache/airflow/pull/61680#discussion_r2786139185
##########
airflow-core/src/airflow/dag_processing/collection.py:
##########
@@ -713,18 +728,33 @@ def _find_all_asset_aliases(dags: Iterable[LazyDeserializedDAG]) -> Iterator[Ser
def _find_active_assets(name_uri_assets: Iterable[tuple[str, str]], session: Session) -> set[tuple[str, str]]:
- return {
- (str(row[0]), str(row[1]))
- for row in session.execute(
- select(AssetModel.name, AssetModel.uri).where(
- tuple_(AssetModel.name, AssetModel.uri).in_(name_uri_assets),
- AssetModel.active.has(),
- AssetModel.scheduled_dags.any(
- DagScheduleAssetReference.dag.has(~DagModel.is_stale & ~DagModel.is_paused)
- ),
+ # Convert to list for chunking to avoid large IN clauses
+ name_uri_list = list(name_uri_assets)
+ if not name_uri_list:
+ return set()
+
+ log.debug(
+ "Finding active assets with chunked queries: total_assets=%d",
+ len(name_uri_list),
+ )
+
+ results = set()
+ for chunk in _chunk_list(name_uri_list, chunk_size=300):
Review Comment:
Why pass `chunk_size=300` explicitly if that is already the default?
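For reference, assuming `_chunk_list` is declared as `_chunk_list(items, chunk_size=300)` (hypothetical signature), the call site could simply rely on the default:

```python
# hypothetical call site, relying on the assumed default chunk_size=300
for chunk in _chunk_list(name_uri_list):
    ...
```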
##########
airflow-core/tests/unit/dag_processing/test_collection.py:
##########
@@ -228,6 +230,176 @@ def test_change_asset_alias_property_sync_group(self, dag_maker, session):
assert len(orm_aliases) == 1
assert next(iter(orm_aliases.values())).group == "new_group"
+ def test_chunk_list_utility(self):
Review Comment:
Usually, private helpers like this are not tested directly; it would be better to exercise the chunking behavior through the code that uses it (sketch below).
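A rough sketch of testing through `_find_active_assets` instead of the helper, assuming the `session` fixture used elsewhere in this file:

```python
from airflow.dag_processing.collection import _find_active_assets

def test_find_active_assets_with_more_than_one_chunk(self, session):
    # 301 pairs forces at least two chunks with chunk_size=300
    pairs = {(f"asset_{i}", f"test://asset_{i}/") for i in range(301)}
    # with no matching active assets in the database, the result is empty
    assert _find_active_assets(pairs, session=session) == set()
```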
##########
airflow-core/src/airflow/dag_processing/collection.py:
##########
@@ -713,18 +728,33 @@ def _find_all_asset_aliases(dags: Iterable[LazyDeserializedDAG]) -> Iterator[Ser
def _find_active_assets(name_uri_assets: Iterable[tuple[str, str]], session: Session) -> set[tuple[str, str]]:
- return {
- (str(row[0]), str(row[1]))
- for row in session.execute(
- select(AssetModel.name, AssetModel.uri).where(
- tuple_(AssetModel.name, AssetModel.uri).in_(name_uri_assets),
- AssetModel.active.has(),
- AssetModel.scheduled_dags.any(
- DagScheduleAssetReference.dag.has(~DagModel.is_stale & ~DagModel.is_paused)
- ),
+ # Convert to list for chunking to avoid large IN clauses
+ name_uri_list = list(name_uri_assets)
Review Comment:
Do we have to materialize the whole iterable into a list here?
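Not necessarily; a chunker can consume any iterable lazily. A minimal sketch with `itertools.islice` (on Python 3.12+, `itertools.batched` does this out of the box):

```python
from itertools import islice
from typing import Iterable, Iterator

def _chunk_iter(iterable: Iterable, chunk_size: int = 300) -> Iterator[list]:
    """Yield lists of up to chunk_size items without materializing the input."""
    it = iter(iterable)
    while chunk := list(islice(it, chunk_size)):
        yield chunk
```

The `len()` in the debug log would have to go, though, since a lazy iterable has no length.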
##########
airflow-core/src/airflow/dag_processing/collection.py:
##########
@@ -713,18 +728,33 @@ def _find_all_asset_aliases(dags: Iterable[LazyDeserializedDAG]) -> Iterator[Ser
def _find_active_assets(name_uri_assets: Iterable[tuple[str, str]], session: Session) -> set[tuple[str, str]]:
- return {
- (str(row[0]), str(row[1]))
- for row in session.execute(
- select(AssetModel.name, AssetModel.uri).where(
- tuple_(AssetModel.name, AssetModel.uri).in_(name_uri_assets),
- AssetModel.active.has(),
- AssetModel.scheduled_dags.any(
- DagScheduleAssetReference.dag.has(~DagModel.is_stale & ~DagModel.is_paused)
- ),
+ # Convert to list for chunking to avoid large IN clauses
+ name_uri_list = list(name_uri_assets)
+ if not name_uri_list:
+ return set()
+
+ log.debug(
+ "Finding active assets with chunked queries: total_assets=%d",
+ len(name_uri_list),
+ )
+
+ results = set()
+ for chunk in _chunk_list(name_uri_list, chunk_size=300):
+ chunk_results = {
+ (str(row[0]), str(row[1]))
Review Comment:
Why convert to a string and not stay as an ORM object?
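Note these rows are plain column tuples rather than ORM instances, since the select is on `AssetModel.name` / `AssetModel.uri`; those come back as strings already, so the `str()` calls look redundant. A sketch of keeping the row values as-is (`stmt` standing in for the chunked select above):

```python
# Row objects from a column select expose the selected columns by name
chunk_results = {(row.name, row.uri) for row in session.execute(stmt)}
```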
##########
airflow-core/src/airflow/dag_processing/collection.py:
##########
@@ -779,12 +809,23 @@ def sync_assets(self, *, session: Session) -> dict[tuple[str, str], AssetModel]:
# Optimization: skip all database calls if no assets were collected.
if not self.assets:
return {}
- orm_assets: dict[tuple[str, str], AssetModel] = {
- (am.name, am.uri): am
- for am in session.scalars(
- select(AssetModel).where(tuple_(AssetModel.name, AssetModel.uri).in_(self.assets))
- )
- }
+
+ log.debug(
+ "Syncing assets with chunked queries: total_assets=%d",
+ len(self.assets),
+ )
+
+ # Query existing assets in chunks to avoid large IN clauses
+ asset_keys = list(self.assets.keys())
+ orm_assets: dict[tuple[str, str], AssetModel] = {}
+ for chunk in _chunk_list(asset_keys, chunk_size=300):
+ chunk_assets = {
+ (am.name, am.uri): am
+ for am in session.scalars(
+ select(AssetModel).where(tuple_(AssetModel.name, AssetModel.uri).in_(chunk))
+ )
+ }
+ orm_assets.update(chunk_assets)
Review Comment:
Same concern as above: why do it this way, updating a dictionary across many sequential queries? All the extra round trips will make this very slow, likely slower than what we currently have. One alternative is sketched below.
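If the goal is bounded IN lists without extra round trips, one option might be a single UNION ALL over the chunked selects, so the database still sees one statement; an untested sketch against SQLAlchemy 2.x:

```python
from sqlalchemy import select, tuple_, union_all

# one statement, one round trip, but each sub-select keeps a bounded IN list
stmts = [
    select(AssetModel).where(tuple_(AssetModel.name, AssetModel.uri).in_(chunk))
    for chunk in _chunk_list(asset_keys)
]
orm_assets = {
    (am.name, am.uri): am
    for am in session.scalars(select(AssetModel).from_statement(union_all(*stmts)))
}
```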
##########
airflow-core/src/airflow/dag_processing/collection.py:
##########
@@ -713,18 +728,33 @@ def _find_all_asset_aliases(dags: Iterable[LazyDeserializedDAG]) -> Iterator[Ser
def _find_active_assets(name_uri_assets: Iterable[tuple[str, str]], session: Session) -> set[tuple[str, str]]:
- return {
- (str(row[0]), str(row[1]))
- for row in session.execute(
- select(AssetModel.name, AssetModel.uri).where(
- tuple_(AssetModel.name, AssetModel.uri).in_(name_uri_assets),
- AssetModel.active.has(),
- AssetModel.scheduled_dags.any(
- DagScheduleAssetReference.dag.has(~DagModel.is_stale & ~DagModel.is_paused)
- ),
+ # Convert to list for chunking to avoid large IN clauses
+ name_uri_list = list(name_uri_assets)
+ if not name_uri_list:
+ return set()
+
+ log.debug(
+ "Finding active assets with chunked queries: total_assets=%d",
+ len(name_uri_list),
+ )
+
+ results = set()
+ for chunk in _chunk_list(name_uri_list, chunk_size=300):
+ chunk_results = {
+ (str(row[0]), str(row[1]))
+ for row in session.execute(
+ select(AssetModel.name, AssetModel.uri).where(
+ tuple_(AssetModel.name, AssetModel.uri).in_(chunk),
+ AssetModel.active.has(),
+ AssetModel.scheduled_dags.any(
+ DagScheduleAssetReference.dag.has(~DagModel.is_stale & ~DagModel.is_paused)
+ ),
+ )
)
- )
- }
+ }
+ results.update(chunk_results)
Review Comment:
I can see this approach becoming slow because it is bound by round-trip time: if we have to make 100 queries to the database one after the other, the RTT alone adds several seconds, and the sync would hang for longer than what we currently have (rough numbers below).
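Rough numbers to illustrate, assuming ~30 ms per database round trip (assumed figure):

```python
assets = 30_000
chunk_size = 300
queries = assets // chunk_size   # 100 sequential queries
latency_s = queries * 0.030      # ~3.0 s of pure network latency, before any query work
```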