potiuk commented on a change in pull request #21362:
URL: https://github.com/apache/airflow/pull/21362#discussion_r800911499
##########
File path: airflow/models/taskinstance.py
##########
@@ -1671,11 +1671,24 @@ def _handle_reschedule(
         # Don't record reschedule request in test mode
         if test_mode:
             return
+
+        from airflow.models.dagrun import DagRun  # Avoid circular import
+
         self.refresh_from_db(session)
         self.end_date = timezone.utcnow()
         self.set_duration()
+        # Lock DAG run to be sure not to get into a deadlock situation when trying to insert
+        # TaskReschedule which apparently also creates lock on corresponding DagRun entity
+        with_row_locks(
+            session.query(DagRun).filter_by(
+                dag_id=self.dag_id,
+                run_id=self.run_id,
+            ),
+            session=session,
+        ).one()
Review comment:
I think we should always obtain a lock here. The only error I can see happening
here is the same one that can happen in the scheduler and mini-scheduler, where
we already make exactly the same call: failing to obtain the lock within a "long
time" (a lock wait timeout). That should not normally happen unless someone
takes the lock and never releases it (e.g. a manual DB query).
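To make the "cannot obtain the lock for a long time" failure mode concrete, here is a minimal stdlib sketch that uses SQLite as a stand-in. It is an analogy, not Airflow code: SQLite locks the whole database file rather than individual `dag_run` rows, and the table, temporary file, and 0.1 s timeout are assumptions chosen for illustration. A second writer that cannot get the lock before its timeout receives a DBAPI-level `OperationalError` (SQLAlchemy wraps such errors in `sqlalchemy.exc.OperationalError`, the class the mini-scheduler catches).

```python
import os
import sqlite3
import tempfile


def try_second_writer(timeout: float = 0.1) -> str:
    """Hold a write lock in one connection and show that a second writer times out.

    Assumption: SQLite stands in for the real database. It locks the whole
    file, whereas Airflow's with_row_locks() targets individual rows.
    """
    fd, path = tempfile.mkstemp(suffix=".db")
    os.close(fd)
    # isolation_level=None: we issue BEGIN ourselves instead of relying on implicit transactions.
    holder = sqlite3.connect(path, isolation_level=None)
    waiter = sqlite3.connect(path, timeout=timeout, isolation_level=None)
    try:
        holder.execute("CREATE TABLE dag_run (dag_id TEXT, run_id TEXT)")
        # The first connection takes the write lock and never releases it,
        # like a manual DB session that was left open without committing.
        holder.execute("BEGIN IMMEDIATE")
        try:
            # The second connection waits up to `timeout` seconds, then gives up.
            waiter.execute("BEGIN IMMEDIATE")
        except sqlite3.OperationalError as exc:
            return str(exc)
        return "lock acquired"
    finally:
        waiter.close()
        holder.close()  # closing with an open transaction rolls it back
        os.remove(path)


print(try_second_writer())
```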
Also, I think one_or_none() is not right here. We insert a TaskReschedule,
which (as I understand it) MUST have a corresponding entry in `DagRun`. If we
tried to insert a TaskReschedule without a DagRun, the insert would fail anyway,
because its foreign key to `DagRun` could not be satisfied. So if "none" were
returned here, we would be in deep trouble. The whole point of this line is to
obtain the lock on the DagRun row (as the scheduler and mini-scheduler do), so
that any operations on the related task instances are "protected" from others
who want the same DagRun row (and wait for the same lock).
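A minimal sketch of the foreign-key argument, again using SQLite from the standard library. Both tables are heavily simplified, hypothetical stand-ins for `dag_run` and `task_reschedule` (the real Airflow schema has more columns and its own constraint definitions); the point is only that the database rejects a child row whose parent run does not exist, so tolerating a `None` from `one_or_none()` would not let the TaskReschedule insert succeed anyway. Note that SQLite enforces foreign keys only after `PRAGMA foreign_keys = ON`.

```python
import sqlite3


def insert_reschedule(conn: sqlite3.Connection, dag_id: str, run_id: str) -> None:
    """Insert a child row that references dag_run through a composite foreign key."""
    conn.execute(
        "INSERT INTO task_reschedule (dag_id, run_id, task_id) VALUES (?, ?, ?)",
        (dag_id, run_id, "wait_for_file"),  # task_id is an illustrative value
    )


conn = sqlite3.connect(":memory:")
conn.execute("PRAGMA foreign_keys = ON")  # SQLite enforces FKs only when enabled
# Hypothetical, simplified schema (not Airflow's real DDL).
conn.execute(
    "CREATE TABLE dag_run (dag_id TEXT, run_id TEXT, PRIMARY KEY (dag_id, run_id))"
)
conn.execute(
    """CREATE TABLE task_reschedule (
           id INTEGER PRIMARY KEY,
           dag_id TEXT NOT NULL,
           run_id TEXT NOT NULL,
           task_id TEXT NOT NULL,
           FOREIGN KEY (dag_id, run_id) REFERENCES dag_run (dag_id, run_id)
       )"""
)
conn.execute("INSERT INTO dag_run VALUES ('example_dag', 'manual__1')")

insert_reschedule(conn, "example_dag", "manual__1")  # parent run exists: accepted

try:
    insert_reschedule(conn, "example_dag", "missing_run")  # no parent run
except sqlite3.IntegrityError as exc:
    print("rejected:", exc)  # FOREIGN KEY constraint failed
```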
I literally just copy-pasted it from there (the mini-scheduler):
```python
@provide_session
@Sentry.enrich_errors
def _run_mini_scheduler_on_child_tasks(self, session=None) -> None:
    try:
        # Re-select the row with a lock
        dag_run = with_row_locks(
            session.query(DagRun).filter_by(
                dag_id=self.dag_id,
                run_id=self.task_instance.run_id,
            ),
            session=session,
        ).one()
```
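Why taking the `DagRun` lock first helps: if every code path that touches a run's task instances (scheduler, mini-scheduler, and with this change the reschedule path) acquires the same parent lock before anything else, the paths queue on that one lock instead of each holding a different lock while waiting for the other's. The sketch below is only an in-process analogy built on `threading.Lock`; `dag_run_lock`, `child_rows_lock`, and `locked_section` are hypothetical names, and real row locks (as far as I know, `with_row_locks()` issues `SELECT ... FOR UPDATE` where the database supports it) differ in detail.

```python
import threading

# Hypothetical stand-ins: one lock for the DagRun row, one for the rows
# (task instances / task_reschedule) that belong to that run.
dag_run_lock = threading.Lock()
child_rows_lock = threading.Lock()
events: list[str] = []


def locked_section(name: str) -> None:
    """Every writer takes the parent (DagRun) lock first, then the child lock."""
    with dag_run_lock:  # analogous to with_row_locks(...).one() on the DagRun row
        with child_rows_lock:  # analogous to updating TIs / inserting TaskReschedule
            events.append(f"{name}: start")
            events.append(f"{name}: end")


workers = [
    threading.Thread(target=locked_section, args=(name,))
    for name in ("mini-scheduler", "reschedule")
]
for worker in workers:
    worker.start()
for worker in workers:
    worker.join(timeout=5)

# Both workers finished, and their critical sections never interleaved.
print(events)
```

Because the lock order is always parent then child, no worker can hold the child lock while waiting for the parent, which is the cycle a deadlock needs.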
In the mini-scheduler we wrap this in try/except OperationalError, but I think
that only makes sense because the mini-scheduler is genuinely optional: any
error there can be safely ignored (and the session rolled back).
```python
except OperationalError as e:
    # Any kind of DB error here is _non fatal_ as this block is just an optimisation.
    self.log.info(
        "Skipping mini scheduling run due to exception: %s",
        e.statement,
        exc_info=True,
    )
    session.rollback()
```
In this case, though, I don't think that would be appropriate. Inserting the
TaskReschedule is not optional, so I prefer the error to propagate up to the
top and FAIL the task instead (which is exactly what happens if we simply let
the exception propagate). I think this is good behaviour.
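The two error-handling policies can be contrasted in a small, self-contained sketch. `OperationalError`, `FakeSession`, `optional_mini_schedule`, and `record_reschedule` are all hypothetical stand-ins (the real code uses SQLAlchemy's `OperationalError` and an ORM session): the optional step logs the error, rolls back and carries on, while the mandatory step has no handler, so the error reaches the caller and, ultimately, fails the task.

```python
import logging

log = logging.getLogger(__name__)


class OperationalError(Exception):
    """Hypothetical stand-in for sqlalchemy.exc.OperationalError."""


class FakeSession:
    """Hypothetical session whose attempt to lock the DagRun row always fails."""

    def __init__(self) -> None:
        self.rolled_back = False

    def lock_dag_run(self) -> None:
        raise OperationalError("lock wait timeout exceeded")

    def rollback(self) -> None:
        self.rolled_back = True


def optional_mini_schedule(session: FakeSession) -> None:
    """Optimisation only: any DB error is logged, rolled back and ignored."""
    try:
        session.lock_dag_run()
    except OperationalError:
        log.info("Skipping mini scheduling run due to exception", exc_info=True)
        session.rollback()


def record_reschedule(session: FakeSession) -> None:
    """Mandatory step: no try/except, so the error propagates to the caller."""
    session.lock_dag_run()


optional_mini_schedule(FakeSession())  # returns normally
try:
    record_reschedule(FakeSession())
except OperationalError as exc:
    print("task would fail:", exc)
```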
--
This is an automated message from the Apache Git Service.