kaxil commented on code in PR #62501:
URL: https://github.com/apache/airflow/pull/62501#discussion_r3321613223
##########
airflow-core/src/airflow/assets/manager.py:
##########
@@ -136,6 +137,34 @@ def _add_one(asset: SerializedAsset) -> AssetModel:
         return [_add_one(a) for a in assets]
+    @classmethod
+    def create_asset_event(cls, *, session: Session, **event_kwargs) -> AssetEvent:
+        """
+        Persist an :class:`AssetEvent` row and return it, bound to *session*.
+
+        For non-SQLite backends a short-lived independent session is used so
+        that the row is committed (and therefore visible to the scheduler's
+        session) before the caller continues. SQLite does not support
+        concurrent connections, so the event is added directly to the caller's
+        *session* and flushed instead.
+        """
+        # Create a short-lived session to populate asset event in db.
+        # This is to ensure the asset event is committed and visible to other sessions.
+        # e.g. Scheduler's session when it looks for new asset events to trigger dags via ADRQ.
+        # Use ``scoped=False`` to get a truly independent session with its own connection/transaction.
+        with create_session(scoped=False) as ae_session:
+            # SQLite cannot have two concurrent connections to the same file, so opening a second session would deadlock.
+            if get_dialect_name(session) == "sqlite":
+                ae_session.execute(text("BEGIN IMMEDIATE"))
+            _asset_event = AssetEvent(**event_kwargs)
+            ae_session.add(_asset_event)
+            ae_session.flush()
+            asset_event_id = _asset_event.id
+
+        # Re-load the now-committed AssetEvent into the caller's session so that
+        # subsequent relationship operations work correctly.
+        return session.get_one(AssetEvent, asset_event_id)
Review Comment:
Committing the AssetEvent in this independent session, before the caller's
transaction commits the alias association, ADRQ rows, and TI state change,
means that a failure after this point either leaves the event persisted with no
ADRQ row for the scheduler to consume, or produces a duplicate event if the
caller retries `register_asset_change`.

I know the event-before-ADRQ visibility is the whole point of the separate
session, and I'm not asking to change that. But the durability tradeoff (an
orphaned event, or a duplicate on retry) should be called out in the PR
description and in a newsfragment so operators understand the new failure mode.
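A minimal sketch of that failure mode, using Python's stdlib `sqlite3` with two connections standing in for the caller's session and the independent `create_session(scoped=False)` session. This is a model of the ordering only, not Airflow's code: the `asset_event` and `adrq` tables, `register_change`, and the simulated failure are all hypothetical.

```python
import os
import sqlite3
import tempfile


def create_schema(path: str) -> None:
    conn = sqlite3.connect(path)
    try:
        # Hypothetical stand-ins for the asset_event and ADRQ tables.
        conn.execute("CREATE TABLE asset_event (id INTEGER PRIMARY KEY, uri TEXT)")
        conn.execute("CREATE TABLE adrq (asset_uri TEXT)")
        conn.commit()
    finally:
        conn.close()


def register_change(path: str, uri: str, fail_before_commit: bool) -> None:
    """Hypothetical model of the ordering under review, not Airflow's code."""
    caller = sqlite3.connect(path)  # plays the caller's session
    try:
        # Independent short-lived connection: the event is committed immediately.
        independent = sqlite3.connect(path)
        try:
            independent.execute("INSERT INTO asset_event (uri) VALUES (?)", (uri,))
            independent.commit()
        finally:
            independent.close()

        # Caller's transaction: the ADRQ row (alias/TI updates omitted here).
        caller.execute("INSERT INTO adrq (asset_uri) VALUES (?)", (uri,))
        if fail_before_commit:
            raise RuntimeError("simulated failure before the caller commits")
        caller.commit()
    except RuntimeError:
        caller.rollback()  # discards the ADRQ row, but not the committed event
        raise
    finally:
        caller.close()


def row_counts(path: str) -> tuple[int, int]:
    conn = sqlite3.connect(path)
    try:
        events = conn.execute("SELECT COUNT(*) FROM asset_event").fetchone()[0]
        queued = conn.execute("SELECT COUNT(*) FROM adrq").fetchone()[0]
        return events, queued
    finally:
        conn.close()


with tempfile.TemporaryDirectory() as tmp:
    db = os.path.join(tmp, "demo.db")
    create_schema(db)

    try:
        register_change(db, "s3://bucket/key", fail_before_commit=True)
    except RuntimeError:
        pass
    after_failure = row_counts(db)  # orphaned event: (1, 0)

    register_change(db, "s3://bucket/key", fail_before_commit=False)  # retry
    after_retry = row_counts(db)  # duplicate event: (2, 1)
```

The first attempt leaves one event with nothing queued, and the retry adds a second event for the same change.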
##########
airflow-core/src/airflow/assets/manager.py:
##########
@@ -136,6 +137,34 @@ def _add_one(asset: SerializedAsset) -> AssetModel:
         return [_add_one(a) for a in assets]
+    @classmethod
+    def create_asset_event(cls, *, session: Session, **event_kwargs) -> AssetEvent:
+        """
+        Persist an :class:`AssetEvent` row and return it, bound to *session*.
+
+        For non-SQLite backends a short-lived independent session is used so
+        that the row is committed (and therefore visible to the scheduler's
+        session) before the caller continues. SQLite does not support
+        concurrent connections, so the event is added directly to the caller's
+        *session* and flushed instead.
+        """
+        # Create a short-lived session to populate asset event in db.
+        # This is to ensure the asset event is committed and visible to other sessions.
+        # e.g. Scheduler's session when it looks for new asset events to trigger dags via ADRQ.
+        # Use ``scoped=False`` to get a truly independent session with its own connection/transaction.
+        with create_session(scoped=False) as ae_session:
+            # SQLite cannot have two concurrent connections to the same file, so opening a second session would deadlock.
+            if get_dialect_name(session) == "sqlite":
+                ae_session.execute(text("BEGIN IMMEDIATE"))
Review Comment:
The docstring says that on SQLite the event is added directly to the caller's
session and flushed, and the inline comment right above this line says that
opening a second session on SQLite would deadlock. But the code does open a
second session via `create_session(scoped=False)` for SQLite too; it just runs
`BEGIN IMMEDIATE` on it first.
That `BEGIN IMMEDIATE` is the problem on SQLite. The new `session.flush()`
you added in `register_asset_changes_in_db` (taskinstance.py, the
`partition_key` backfill when `len(runtime_pks) == 1`) issues the
`dag_run.partition_key` UPDATE on the caller's connection, which acquires
SQLite's RESERVED lock. That lock is held until commit, and
`create_asset_event` runs before any commit, so the second connection's
`BEGIN IMMEDIATE` competes for the same RESERVED lock and raises
`sqlite3.OperationalError: database is locked`.
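The lock conflict can be reproduced outside Airflow with Python's stdlib `sqlite3` and two connections to the same file. This is a minimal sketch that assumes the two SQLAlchemy sessions behave like two plain connections, as they do with file-based SQLite. The one-table `dag_run` schema and the helper names are hypothetical, and `timeout=0` is only there so the error surfaces immediately instead of after Python's default 5-second busy wait.

```python
import os
import sqlite3
import tempfile


def create_dag_run_table(path: str) -> None:
    conn = sqlite3.connect(path)
    try:
        # Hypothetical minimal stand-in for the dag_run table.
        conn.execute("CREATE TABLE dag_run (id INTEGER PRIMARY KEY, partition_key TEXT)")
        conn.execute("INSERT INTO dag_run (id, partition_key) VALUES (1, NULL)")
        conn.commit()
    finally:
        conn.close()


def begin_immediate_on_second_connection(path: str, caller_committed: bool) -> str:
    # Caller's connection: the sqlite3 module opens a transaction before the
    # UPDATE, and the UPDATE holds the RESERVED lock until commit or rollback.
    caller = sqlite3.connect(path)
    caller.execute("UPDATE dag_run SET partition_key = ? WHERE id = 1", ("2024-01-01",))
    if caller_committed:
        caller.commit()

    # Second connection, analogous to create_session(scoped=False) on file-based
    # SQLite. isolation_level=None lets us issue BEGIN IMMEDIATE explicitly.
    other = sqlite3.connect(path, timeout=0, isolation_level=None)
    try:
        other.execute("BEGIN IMMEDIATE")
        other.execute("ROLLBACK")
        return "acquired"
    except sqlite3.OperationalError as exc:
        return str(exc)
    finally:
        other.close()
        caller.rollback()  # releases the lock if the UPDATE was left uncommitted
        caller.close()


with tempfile.TemporaryDirectory() as tmp:
    db = os.path.join(tmp, "airflow.db")
    create_dag_run_table(db)
    before_commit = begin_immediate_on_second_connection(db, caller_committed=False)
    after_commit = begin_immediate_on_second_connection(db, caller_committed=True)
```

With the caller's UPDATE still uncommitted, `before_commit` is `"database is locked"`; once the caller commits, the second connection's `BEGIN IMMEDIATE` succeeds.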
File-based SQLite uses a real QueuePool, so the two sessions are genuinely
separate connections. In-memory SQLite (used by most unit tests) shares a
single connection, which is why existing tests don't surface this; the existing
partition tests also pass `outlet_events=[]`, so they never reach this branch.
It would be worth adding a test that emits an outlet event with a single
runtime `partition_key` under `-b sqlite`, and reconciling the docstring and
the comment above with what the code actually does for SQLite.