dstandish commented on code in PR #59183:
URL: https://github.com/apache/airflow/pull/59183#discussion_r2624589238


##########
airflow-core/src/airflow/assets/manager.py:
##########
@@ -54,6 +55,63 @@
 log = structlog.get_logger(__name__)
 
 
+@contextmanager
+def _acquire_apdr_lock(
+    *,
+    session: Session,
+    dag_id: str,
+    partition_key: str,
+    asset_id: int,
+    max_retries: int = 10,
+    retry_delay: float = 0.05,
+):
+    """
+    Context manager to acquire a lock for AssetPartitionDagRun creation.
+
+    - SQLite: Use a no-op ORM update to trigger a write-transaction and acquire SQLite's global writer lock.
+    - Postgres/MySQL: uses row-level lock on AssetModel.
+    """
+    if get_dialect_name(session) == "sqlite":
+        import time
+
+        from sqlalchemy import update
+
+        # no-op update
+        # This is used to acquire SQLite's global writer lock.
+        stmt = update(AssetModel).where(AssetModel.id == asset_id).values(id=AssetModel.id)
+        for _ in range(max_retries):
+            try:
+                session.execute(stmt)
+                session.flush()
+                try:
+                    # lock acquired
+                    yield
+                finally:
+                    session.flush()
+                return
+            except exc.OperationalError as err:
+                err_msg = str(err).lower()
+                if "locked" in err_msg or "busy" in err_msg:
+                    session.rollback()
+                    time.sleep(retry_delay)

Review Comment:
   It seems the retry logic might be in the wrong place.
   
   I think the operational error you expect would only occur in `session.execute(stmt)`, correct?
   
   But as written, the try / except also wraps the `yield`, so an error raised inside the caller's block would be caught there and could trigger a retry as well.
   
   Do you think I have that right?
   
   If so, you might want to narrow the try / except to cover only the `session.execute` call, and you could just `continue` when you hit the except.
   
   wdyt?
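
A rough sketch of the shape suggested above, using only the standard library so it runs on its own. `LockBusyError` and `try_lock` are hypothetical stand-ins (for `exc.OperationalError` and the `session.execute(stmt)` call respectively), not names from the PR, and raising after the last attempt is an assumption about the desired behavior:

```python
import time
from contextlib import contextmanager


class LockBusyError(Exception):
    """Hypothetical stand-in for a 'database is locked' OperationalError."""


@contextmanager
def acquire_with_retry(try_lock, *, max_retries=10, retry_delay=0.0):
    """Retry only the lock acquisition; never retry the caller's block."""
    for _ in range(max_retries):
        try:
            # Only the acquisition attempt sits inside the try / except.
            try_lock()
        except LockBusyError:
            time.sleep(retry_delay)
            continue
        break  # lock acquired, stop retrying
    else:
        # Assumption: give up loudly once every attempt has failed.
        raise LockBusyError(f"could not acquire lock after {max_retries} attempts")

    # The yield is outside the retry loop, so an exception raised in the
    # caller's block propagates unchanged instead of triggering a retry.
    yield
```

Keeping the `yield` out of the loop also avoids a second problem: if a `@contextmanager` generator catches an exception thrown in at `yield` and then reaches another `yield`, `contextlib` raises `RuntimeError` ("generator didn't stop after throw()") rather than re-running the block.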



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

