potiuk commented on code in PR #59183: URL: https://github.com/apache/airflow/pull/59183#discussion_r2609922352
##########
airflow-core/src/airflow/assets/manager.py:
##########
@@ -54,6 +55,59 @@
 log = structlog.get_logger(__name__)


+@contextmanager
+def _aquire_apdr_lock(
+    *,
+    session: Session,
+    dag_id: str,
+    partition_key: str,
+    asset_id: int,
+    max_retries: int = 10,
+    retry_delay: float = 0.05,
+):
+    """
+    Context manager to acquire a lock for AssetPartitionDagRun creation.
+
+    - SQLite: uses AssetPartitionDagRunMutexLock table as row-level lock is not supported
+    - Postgres/MySQL: uses row-level lock on AssetModel.
+    """
+    if get_dialect_name(session) == "sqlite":
+        from airflow.models.asset import AssetPartitionDagRunMutexLock
+
+        for _ in range(max_retries):
+            try:
+                mutex = AssetPartitionDagRunMutexLock(target_dag_id=dag_id, partition_key=partition_key)
+                session.add(mutex)
+                session.flush()
+                try:
+                    yield  # mutex acquired
+                finally:
+                    session.delete(mutex)

Review Comment:
I am not necessarily for removal, and I am also not 100% sure how WAL works, but from what I understand there is only one WAL (write-ahead log) that you can append to, and once one writer starts writing, any other writer cannot write and has to wait. WAL is basically:

* multiple readers
* readers do not block writers, and a writer does not block readers
* there can be only one writer at a time

If I read it correctly, this means that from the moment any thread/process starts a "write" operation (say, an INSERT emitted by `flush()` via SQLAlchemy) until it commits, all other writers (i.e. any threads/processes that attempt to change the database with UPDATE, INSERT or DELETE) will wait until the commit happens and the transaction is released. There is even a busy timeout setting in SQLite (`PRAGMA busy_timeout`, exposed as the `timeout` argument of Python's `sqlite3.connect`) that determines how long a process that wants to write something will wait for the write lock to be released before it fails with a "database is locked" (`SQLITE_BUSY`) error. I am not entirely sure, though, and that would likely need some testing.
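One way to do that testing, outside Airflow and SQLAlchemy, is a small standalone script using Python's built-in `sqlite3` module. This is only a sketch under stated assumptions: the `probe` table and the `demo_wal_single_writer` helper are hypothetical, and it uses `isolation_level=None` with explicit `BEGIN`/`COMMIT` so the transaction boundaries do not depend on the driver's legacy implicit-BEGIN behaviour. Connection A opens a write transaction with an INSERT; connection B can still read, but its own INSERT waits for the busy timeout (`timeout=0.1` seconds here) and then fails until A commits.

```python
import os
import sqlite3
import tempfile


def demo_wal_single_writer() -> dict:
    """Hypothetical helper: probe SQLite's one-writer-at-a-time rule in WAL mode."""
    results: dict = {}
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "wal_demo.db")

        # isolation_level=None: the sqlite3 module never emits an implicit BEGIN,
        # so every transaction starts exactly where we execute "BEGIN".
        # timeout is the busy timeout (seconds) before "database is locked".
        conn_a = sqlite3.connect(path, timeout=0.1, isolation_level=None)
        try:
            results["journal_mode"] = conn_a.execute("PRAGMA journal_mode=WAL").fetchone()[0]
            conn_a.execute("CREATE TABLE probe (id INTEGER PRIMARY KEY, note TEXT)")

            conn_b = sqlite3.connect(path, timeout=0.1, isolation_level=None)
            try:
                # A: a deferred BEGIN takes no lock; the INSERT makes A the writer.
                conn_a.execute("BEGIN")
                conn_a.execute("INSERT INTO probe (note) VALUES ('from A')")

                # B is a reader: not blocked, and it cannot see A's uncommitted row.
                results["b_rows_before_commit"] = conn_b.execute(
                    "SELECT COUNT(*) FROM probe"
                ).fetchone()[0]

                # B tries to become a second writer while A's transaction is open.
                try:
                    conn_b.execute("BEGIN")
                    conn_b.execute("INSERT INTO probe (note) VALUES ('from B')")
                    results["b_write_error"] = None
                except sqlite3.OperationalError as exc:
                    results["b_write_error"] = str(exc)
                # Clean up B's transaction if SQLite did not already roll it back.
                if conn_b.in_transaction:
                    conn_b.execute("ROLLBACK")

                # A commits and releases the write lock; now B's write succeeds.
                conn_a.execute("COMMIT")
                conn_b.execute("BEGIN")
                conn_b.execute("INSERT INTO probe (note) VALUES ('from B')")
                conn_b.execute("COMMIT")

                results["final_rows"] = conn_b.execute(
                    "SELECT COUNT(*) FROM probe"
                ).fetchone()[0]
            finally:
                conn_b.close()
        finally:
            conn_a.close()
    return results


if __name__ == "__main__":
    print(demo_wal_single_writer())
```

If the behaviour is as described, the result should show `journal_mode` as `wal`, zero rows visible to B before A commits, a "database is locked" error for B's first write, and two rows at the end. Note that the write lock is taken by the INSERT, not by the deferred `BEGIN` itself.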
But if I understand it correctly, we absolutely do not need a separate mutex, because an SQLite database in WAL mode cannot have concurrent writers anyway. It's enough to perform "any" write operation at the start of the transaction, and no other write operation will start until we commit. So what we really need is to start ANY write operation; we need neither a separate mutex table nor a migration, because ANY write operation is guaranteed to block every other write operation. Effectively, for write operations SQLite behaves as if we held a global advisory lock, which means that attempts to make it more "fine-grained" are unnecessary.

This is also confirmed by how the sqlite3 driver works with SQLAlchemy: https://docs.sqlalchemy.org/en/20/dialects/sqlite.html#legacy-transaction-mode-with-the-sqlite3-driver

> The most important aspect of transaction handling with the sqlite3 driver is that it defaults (which will continue through Python 3.15 before being removed in Python 3.16) to legacy transactional behavior which does not strictly follow [PEP 249](https://peps.python.org/pep-0249/). The way in which the driver diverges from the PEP is that it does not “begin” a transaction automatically as dictated by [PEP 249](https://peps.python.org/pep-0249/) except in the case of DML statements, e.g. INSERT, UPDATE, and DELETE. Normally, [PEP 249](https://peps.python.org/pep-0249/) dictates that a BEGIN must be emitted upon the first SQL statement of any kind, so that all subsequent operations will be established within a transaction until connection.commit() has been called. The sqlite3 driver, in an effort to be easier to use in highly concurrent environments, skips this step for DQL (e.g. SELECT) statements, and also skips it for DDL (e.g. CREATE TABLE etc.) statements for more legacy reasons. Statements such as SAVEPOINT are also skipped.

This means that if you only run DQL (not DML) operations, no BEGIN is issued, i.e.
your process is a "reader" rather than a "writer". But the moment you run **any** DML operation, BEGIN is issued, a transaction is started and your thread becomes the "writer", as long as there is no other "writer"; otherwise it waits.

So, IMHO, this whole SQLite case can be vastly simplified: simply running an UPDATE on the APDR table (or any other table) before running the SELECT that checks whether the row exists will be enough. Maybe some other, simpler way of starting a write transaction would be even better, but I'd argue we absolutely do not need a new mutex table and a corresponding migration for SQLite.

--
This is an automated message from the Apache Git Service.
