potiuk commented on code in PR #42082:
URL: https://github.com/apache/airflow/pull/42082#discussion_r1797783844


##########
airflow/models/taskinstance.py:
##########
@@ -1760,16 +1758,16 @@ def _handle_reschedule(
     ti.end_date = timezone.utcnow()
     ti.set_duration()
 
-    # Lock DAG run to be sure not to get into a deadlock situation when trying to insert
-    # TaskReschedule which apparently also creates lock on corresponding DagRun entity
-    with_row_locks(
-        session.query(DagRun).filter_by(
-            dag_id=ti.dag_id,
-            run_id=ti.run_id,
-        ),
-        session=session,
-    ).one()
-    # Log reschedule request
+    # set state
+    ti.state = TaskInstanceState.UP_FOR_RESCHEDULE
+
+    ti.clear_next_method_args()
+
+    session.merge(ti)
+    session.commit()

Review Comment:
   Some more comments. Generally, with Airflow 3 the "transaction" boundaries for any operation on the database will have to change heavily. Once you reach out to the API component, there is no way to make several calls in the same transaction, so a transaction can span at most a single API call. This was one of the major issues with the "internal-api" approach: we tried to keep the same transaction boundaries "with DB" and "without DB", and it led to some spaghetti code where we had to move related DB code from one transaction together into a single `@internal_api` call.
   
   Here, I am not sure how "reschedule" will be converted to the new API or where its transaction boundaries will be, but in any case I think this code will look quite different.
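
To make the transaction-boundary point concrete, here is a minimal sketch using the stdlib `sqlite3` module. It is not Airflow's real models or API: the tables, the `api_add_reschedule` / `api_set_state` functions and the `fail_midway` flag are all hypothetical stand-ins. With direct DB access, inserting the reschedule row and updating the task state can share one transaction. A client calling two separate API endpoints gets two transactions, so a failure between the calls leaves partial state behind.

```python
import sqlite3


def make_db() -> sqlite3.Connection:
    # In-memory stand-in for the metadata DB; table/column names are hypothetical.
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE task_instance (id INTEGER PRIMARY KEY, state TEXT)")
    conn.execute("CREATE TABLE task_reschedule (ti_id INTEGER, reschedule_date TEXT)")
    conn.execute("INSERT INTO task_instance VALUES (1, 'running')")
    conn.commit()
    return conn


def snapshot(conn: sqlite3.Connection) -> tuple:
    # Returns (task state, number of reschedule rows) for the single task.
    state = conn.execute("SELECT state FROM task_instance WHERE id = 1").fetchone()[0]
    count = conn.execute("SELECT COUNT(*) FROM task_reschedule").fetchone()[0]
    return state, count


def reschedule_in_one_transaction(conn, ti_id, reschedule_date, fail_midway=False):
    """Direct-DB style: both writes share one transaction (all or nothing)."""
    try:
        with conn:  # commits on success, rolls back if an exception escapes
            conn.execute("INSERT INTO task_reschedule VALUES (?, ?)", (ti_id, reschedule_date))
            if fail_midway:
                raise RuntimeError("simulated failure between the two writes")
            conn.execute(
                "UPDATE task_instance SET state = 'up_for_reschedule' WHERE id = ?", (ti_id,)
            )
    except RuntimeError:
        pass  # the rollback already undid the INSERT


def api_add_reschedule(conn, ti_id, reschedule_date):
    # Hypothetical API endpoint #1: runs and commits its own transaction.
    with conn:
        conn.execute("INSERT INTO task_reschedule VALUES (?, ?)", (ti_id, reschedule_date))


def api_set_state(conn, ti_id, state):
    # Hypothetical API endpoint #2: a separate transaction.
    with conn:
        conn.execute("UPDATE task_instance SET state = ? WHERE id = ?", (state, ti_id))


def reschedule_via_api(conn, ti_id, reschedule_date, fail_midway=False):
    """API style: the client makes two calls, so there are two transactions."""
    api_add_reschedule(conn, ti_id, reschedule_date)
    if fail_midway:
        return  # e.g. the worker dies between the two calls
    api_set_state(conn, ti_id, "up_for_reschedule")


if __name__ == "__main__":
    for name, fn in [("one transaction", reschedule_in_one_transaction),
                     ("two API calls", reschedule_via_api)]:
        conn = make_db()
        fn(conn, 1, "2024-10-14T00:00:00", fail_midway=True)
        print(name, snapshot(conn))
    # one transaction ('running', 0)
    # two API calls ('running', 1)
```

With the failure injected, the single-transaction path leaves nothing behind, while the two-call path leaves an orphaned reschedule row for a task that is still `running`. This is why related writes tend to end up grouped behind one server-side call instead of being spread across several client calls.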



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
