Lee-W commented on code in PR #59183: URL: https://github.com/apache/airflow/pull/59183#discussion_r2625489132
########## airflow-core/src/airflow/assets/manager.py: ########## @@ -54,6 +55,63 @@ log = structlog.get_logger(__name__) +@contextmanager +def _acquire_apdr_lock( + *, + session: Session, + dag_id: str, + partition_key: str, + asset_id: int, + max_retries: int = 10, + retry_delay: float = 0.05, +): + """ + Context manager to acquire a lock for AssetPartitionDagRun creation. + + - SQLite: Use a no-op ORM update to trigger a write-transaction and acquire SQLite's global writer lock. + - Postgres/MySQL: uses row-level lock on AssetModel. + """ + if get_dialect_name(session) == "sqlite": + import time + + from sqlalchemy import update + + # no-op update + # This is used to acquire SQLite's global writer lock. + stmt = update(AssetModel).where(AssetModel.id == asset_id).values(id=AssetModel.id) + for _ in range(max_retries): + try: + session.execute(stmt) + session.flush() + try: + # lock acquired + yield + finally: + session.flush() + return + except exc.OperationalError as err: + err_msg = str(err).lower() + if "locked" in err_msg or "busy" in err_msg: + session.rollback() + time.sleep(retry_delay) Review Comment: yes, thanks for pointing out! just updated it! ########## airflow-core/src/airflow/assets/manager.py: ########## @@ -54,6 +55,63 @@ log = structlog.get_logger(__name__) +@contextmanager +def _acquire_apdr_lock( Review Comment: updated ########## airflow-core/src/airflow/assets/manager.py: ########## @@ -54,6 +55,63 @@ log = structlog.get_logger(__name__) +@contextmanager +def _acquire_apdr_lock( + *, + session: Session, + dag_id: str, + partition_key: str, + asset_id: int, + max_retries: int = 10, + retry_delay: float = 0.05, Review Comment: updated it to 0.1. hope it's better now ########## airflow-core/src/airflow/assets/manager.py: ########## @@ -54,6 +55,63 @@ log = structlog.get_logger(__name__) +@contextmanager +def _acquire_apdr_lock( + *, + session: Session, + dag_id: str, + partition_key: str, Review Comment: yep, removed. Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
