potiuk commented on code in PR #59183:
URL: https://github.com/apache/airflow/pull/59183#discussion_r2609922352


##########
airflow-core/src/airflow/assets/manager.py:
##########
@@ -54,6 +55,59 @@
 log = structlog.get_logger(__name__)
 
 
+@contextmanager
+def _aquire_apdr_lock(
+    *,
+    session: Session,
+    dag_id: str,
+    partition_key: str,
+    asset_id: int,
+    max_retries: int = 10,
+    retry_delay: float = 0.05,
+):
+    """
+    Context manager to acquire a lock for AssetPartitionDagRun creation.
+
+    - SQLite: uses AssetPartitionDagRunMutexLock table as row-level lock is not supported
+    - Postgres/MySQL: uses row-level lock on AssetModel.
+    """
+    if get_dialect_name(session) == "sqlite":
+        from airflow.models.asset import AssetPartitionDagRunMutexLock
+
+        for _ in range(max_retries):
+            try:
+                mutex = AssetPartitionDagRunMutexLock(target_dag_id=dag_id, partition_key=partition_key)
+                session.add(mutex)
+                session.flush()
+                try:
+                    yield  # mutex acquired
+                finally:
+                    session.delete(mutex)

Review Comment:
   I am not necessarily for removal, and I am not 100% sure how WAL works, but
from what I understand there is only one WAL log that you can append to: once
one writer starts writing, any other writer cannot write and has to wait. WAL
is basically:
   
   * multiple readers
   * readers do not block writers
   * there can be only one writer at a time
   
   If I read it correctly, this means that from the moment any thread or
process starts a write operation (say, an INSERT flushed via SQLAlchemy) until
it commits, all other writers (i.e. any thread or process that attempts to
change the database with UPDATE, INSERT or DELETE) will wait until the commit
happens and the transaction is released.
   
   There is even a `busy_timeout` setting in SQLite (`PRAGMA busy_timeout`)
that determines how long a process that wants to write something will wait for
the database write lock to be released before it gets an `SQLITE_BUSY`
("database is locked") error.
   
   I am not entirely sure though - and that would likely need some testing. 
   
   But if I understand it correctly, we absolutely do not need a separate
mutex, because the SQLite database in WAL mode cannot have concurrent writers
anyway. It's enough to perform any write operation at the start of the
transaction, and no other write operation will start until that transaction
ends. So we do not need a separate mutex table or a migration: ANY write
operation is guaranteed to block any other write operation.
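   
   A minimal sketch of how this could be checked, using only the standard
`sqlite3` module rather than Airflow's session. The table `t`, the file name
and the helper name are hypothetical, and `timeout=0` is chosen so the second
writer fails immediately instead of waiting:

```python
import os
import sqlite3
import tempfile


def second_writer_is_blocked():
    """Return (blocked, row_count) for two connections writing to one WAL database."""
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "demo.db")
        # isolation_level=None: the driver issues no implicit BEGIN; we do it ourselves.
        first = sqlite3.connect(path, isolation_level=None)
        first.execute("PRAGMA journal_mode=WAL")
        first.execute("CREATE TABLE t (v TEXT)")
        # timeout=0: fail immediately instead of waiting for the write lock.
        second = sqlite3.connect(path, timeout=0, isolation_level=None)
        try:
            first.execute("BEGIN")
            first.execute("INSERT INTO t VALUES ('first')")  # takes the write lock
            try:
                second.execute("INSERT INTO t VALUES ('second')")
                blocked = False
            except sqlite3.OperationalError:  # SQLITE_BUSY, "database is locked"
                blocked = True
            first.execute("COMMIT")  # releases the write lock
            second.execute("INSERT INTO t VALUES ('second')")  # succeeds now
            row_count = second.execute("SELECT COUNT(*) FROM t").fetchone()[0]
        finally:
            first.close()
            second.close()
    return blocked, row_count


blocked, row_count = second_writer_is_blocked()
```

   With a non-zero timeout the second INSERT would wait for the lock instead
of failing, which is the blocking behaviour described above.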
   
   Effectively, for write operations SQLite works as if we had a global
advisory lock, which means that attempts to make it more "fine-grained" are
unnecessary.
   
   This is also confirmed by reading how the sqlite3 driver works with SQLAlchemy:
   
   
https://docs.sqlalchemy.org/en/20/dialects/sqlite.html#legacy-transaction-mode-with-the-sqlite3-driver
   
   > The most important aspect of transaction handling with the sqlite3 driver 
is that it defaults (which will continue through Python 3.15 before being 
removed in Python 3.16) to legacy transactional behavior which does not 
strictly follow [PEP 249](https://peps.python.org/pep-0249/). The way in which 
the driver diverges from the PEP is that it does not “begin” a transaction 
automatically as dictated by [PEP 249](https://peps.python.org/pep-0249/) 
except in the case of DML statements, e.g. INSERT, UPDATE, and DELETE. 
Normally, [PEP 249](https://peps.python.org/pep-0249/) dictates that a BEGIN 
must be emitted upon the first SQL statement of any kind, so that all 
subsequent operations will be established within a transaction until 
connection.commit() has been called. The sqlite3 driver, in an effort to be 
easier to use in highly concurrent environments, skips this step for DQL (e.g. 
SELECT) statements, and also skips it for DDL (e.g. CREATE TABLE etc.) 
statements for more legacy reasons. Statements such as SAVEPOINT are also
skipped.
   
   This means that if you do any DQL (not DML) operation, no "begin"
transaction is issued, i.e. your process is a "reader" rather than a "writer".
But at the moment you do **any** DML operation, begin() is issued, a
transaction is started and your thread becomes the "writer", as long as there
is no other "writer"; otherwise it will wait.
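   
   The legacy behaviour from the quoted passage can be observed directly on the
standard `sqlite3` driver through `Connection.in_transaction`. This is a small
sketch against the raw driver with its default settings, not against a
SQLAlchemy session; the table `t` is hypothetical:

```python
import sqlite3

# Default isolation_level: legacy transaction control of the sqlite3 driver.
conn = sqlite3.connect(":memory:")

conn.execute("CREATE TABLE t (v TEXT)")
after_ddl = conn.in_transaction  # DDL does not open a transaction

conn.execute("SELECT * FROM t").fetchall()
after_select = conn.in_transaction  # DQL does not open a transaction either

conn.execute("INSERT INTO t VALUES ('x')")
after_insert = conn.in_transaction  # DML: the driver emitted an implicit BEGIN

conn.commit()
after_commit = conn.in_transaction  # transaction closed again

conn.close()
```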
   
   So, IMHO this whole sqlite case can be vastly simplified: simply running an
UPDATE on the APDR table or any other table before running the SELECT that
checks whether the row exists will be enough. Maybe some other simpler way of
starting a write transaction will be even better, but I'd argue we absolutely
do not need a new mutex table and corresponding migration for sqlite.
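   
   One candidate for such a "simpler way" is SQLite's `BEGIN IMMEDIATE`, which
takes the write lock up front without needing a dummy UPDATE. The sketch below
is an assumption about how that could look with the raw `sqlite3` driver, not a
proposal for the actual Airflow code: the `apdr` table, `write_transaction` and
`ensure_row` are hypothetical names, and wiring this into a SQLAlchemy session
would need separate testing.

```python
import sqlite3
from contextlib import contextmanager


@contextmanager
def write_transaction(conn):
    """Hold SQLite's single write lock for the duration of the block."""
    conn.execute("BEGIN IMMEDIATE")  # acquire the write lock before any SELECT
    try:
        yield conn
    except BaseException:
        conn.execute("ROLLBACK")
        raise
    else:
        conn.execute("COMMIT")


def ensure_row(conn, dag_id, partition_key):
    """Insert the row if it is missing; return True only if it was created."""
    with write_transaction(conn):
        found = conn.execute(
            "SELECT 1 FROM apdr WHERE dag_id = ? AND partition_key = ?",
            (dag_id, partition_key),
        ).fetchone()
        if found:
            return False
        conn.execute(
            "INSERT INTO apdr (dag_id, partition_key) VALUES (?, ?)",
            (dag_id, partition_key),
        )
        return True


# isolation_level=None so the driver does not add its own implicit BEGIN.
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("CREATE TABLE apdr (dag_id TEXT, partition_key TEXT)")
created_first = ensure_row(conn, "my_dag", "2024-01-01")
created_second = ensure_row(conn, "my_dag", "2024-01-01")
row_count = conn.execute("SELECT COUNT(*) FROM apdr").fetchone()[0]
conn.close()
```

   Because the lock is taken before the SELECT, the check and the insert
cannot interleave with another writer's check and insert.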


