dingo4dev commented on code in PR #62501:
URL: https://github.com/apache/airflow/pull/62501#discussion_r3321669966
##########
airflow-core/src/airflow/assets/manager.py:
##########
@@ -136,6 +137,34 @@ def _add_one(asset: SerializedAsset) -> AssetModel:
return [_add_one(a) for a in assets]
+ @classmethod
+ def create_asset_event(cls, *, session: Session, **event_kwargs) ->
AssetEvent:
+ """
+ Persist an :class:`AssetEvent` row and return it, bound to *session*.
+
+ For non-SQLite backends a short-lived independent session is used so
+ that the row is committed (and therefore visible to the scheduler's
+ session) before the caller continues. SQLite does not support
+ concurrent connections, so the event is added directly to the caller's
+ *session* and flushed instead.
+ """
+ # Create a short-lived session to populate asset event in db.
+ # This is to ensure the asset event is committed and visible to other
sessions.
+ # e.g. Scheduler's session when it looks for new asset events to
trigger dags via ADRQ.
+ # Use ``scoped=False`` to get a truly independent session with its own
connection/transaction.
+ with create_session(scoped=False) as ae_session:
+ # SQLite cannot have two concurrent connections to the same file,
so opening a second session would deadlock.
+ if get_dialect_name(session) == "sqlite":
+ ae_session.execute(text("BEGIN IMMEDIATE"))
Review Comment:
@kaxil Thank you for the comprehensive review. I'm still working on it and
this explanation help me a lot.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]