alexkruc commented on code in PR #25280:
URL: https://github.com/apache/airflow/pull/25280#discussion_r938690062
##########
airflow/models/dag.py:
##########
@@ -2557,6 +2569,14 @@ def bulk_write_to_db(cls, dags: Collection["DAG"], session=NEW_SESSION):
orm_dag.tags.append(dag_tag_orm)
session.add(dag_tag_orm)
+ orm_dag_links = orm_dag.dag_owner_links or []
+ for orm_dag_link in orm_dag_links:
+     if orm_dag_link not in dag.owner_links:
+         session.delete(orm_dag_link)
+ for owner_name, owner_link in dag.owner_links.items():
+     dag_owner_orm = DagOwnerAttributes(dag_id=dag.dag_id, owner=owner_name, link=owner_link)
+     session.add(dag_owner_orm)
Review Comment:
Can you elaborate a bit on your comment? I don't think I fully understood what
you meant :(
I tried changing it to use `session.bulk_save_objects` or
`session.bulk_insert_mappings`, but neither works here: both violate the
foreign key constraint, because the `dag` table is not yet populated at the
point where they run.
It does get populated in this function, but only when we call `session.flush()`
here:
https://github.com/apache/airflow/blob/main/airflow/models/dag.py#L2623
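
To make the foreign key problem concrete, here is a minimal sketch using plain `sqlite3` rather than SQLAlchemy or the real Airflow schema. The table layout, the `example_dag` id, and the helper names are all assumptions for illustration; only the ordering issue is the point: inserting an owner row before its parent `dag` row exists is rejected, while inserting the parent first succeeds.

```python
import sqlite3


def make_db():
    """Create an in-memory DB with a simplified, hypothetical dag/owner schema."""
    conn = sqlite3.connect(":memory:")
    # SQLite only enforces foreign keys when this pragma is switched on.
    conn.execute("PRAGMA foreign_keys = ON")
    conn.execute("CREATE TABLE dag (dag_id TEXT PRIMARY KEY)")
    conn.execute(
        "CREATE TABLE dag_owner_attributes ("
        " dag_id TEXT NOT NULL REFERENCES dag(dag_id),"
        " owner TEXT NOT NULL,"
        " link TEXT NOT NULL,"
        " PRIMARY KEY (dag_id, owner))"
    )
    return conn


OWNER_ROW = ("example_dag", "alice", "https://example.com/alice")


def insert_owner_first(conn):
    """Insert the child row while the parent row is missing; return the error text."""
    try:
        conn.execute("INSERT INTO dag_owner_attributes VALUES (?, ?, ?)", OWNER_ROW)
    except sqlite3.IntegrityError as exc:
        return str(exc)
    return None


def insert_dag_then_owner(conn):
    """Insert the parent row first, then the child; return the number of owner rows."""
    conn.execute("INSERT INTO dag VALUES (?)", ("example_dag",))
    conn.execute("INSERT INTO dag_owner_attributes VALUES (?, ?, ?)", OWNER_ROW)
    return conn.execute("SELECT COUNT(*) FROM dag_owner_attributes").fetchone()[0]
```

Calling `insert_owner_first(make_db())` returns the constraint error message, whereas `insert_dag_then_owner(make_db())` stores the owner row. This mirrors the situation above: anything that writes owner rows before the `dag` rows are flushed runs into the constraint.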
AFAIK, calling `session.add()` multiple times only queues the objects in the
session; it's "lazy" in that nothing is sent to the database until we run
`flush()`. BTW, this implementation follows the same flow we use to update the
tags, here:
https://github.com/apache/airflow/blob/main/airflow/models/dag.py#L2547
So this is why I used it 😅
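
As a rough check of the "lazy" behaviour, the sketch below counts the INSERT statements SQLAlchemy sends before and after `flush()`. It assumes SQLAlchemy 1.4 or newer and an in-memory SQLite engine; the `Owner` model and the `count_inserts_around_flush` helper are hypothetical stand-ins, not the actual `DagOwnerAttributes` model or any Airflow code.

```python
from sqlalchemy import Column, String, create_engine, event
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class Owner(Base):
    """Hypothetical stand-in for DagOwnerAttributes (no foreign key, for brevity)."""

    __tablename__ = "owner"
    dag_id = Column(String, primary_key=True)
    owner = Column(String, primary_key=True)
    link = Column(String, nullable=False)


def count_inserts_around_flush(owner_links):
    """Return (INSERT statements seen before flush, INSERT statements seen after)."""
    engine = create_engine("sqlite://")
    Base.metadata.create_all(engine)
    statements = []

    # Record every statement handed to the DBAPI cursor from this point on.
    @event.listens_for(engine, "before_cursor_execute")
    def record(conn, cursor, statement, parameters, context, executemany):
        statements.append(statement)

    def inserts_so_far():
        return sum(s.lstrip().upper().startswith("INSERT") for s in statements)

    with Session(engine) as session:
        for name, link in owner_links.items():
            # add() only marks the object as pending; no SQL is emitted yet.
            session.add(Owner(dag_id="example_dag", owner=name, link=link))
        before = inserts_so_far()
        session.flush()  # pending objects are written to the database here
        after = inserts_so_far()
    return before, after
```

With a couple of owner links, `before` should be `0` and `after` at least `1`; the exact number of statements depends on how SQLAlchemy batches the rows, so the sketch does not rely on it.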
Could you please explain a bit more, or give an example of what you have in
mind, so I can understand it better and add this enhancement?