ephraimbuddy commented on issue #57618:
URL: https://github.com/apache/airflow/issues/57618#issuecomment-3655519351
This seems to come from multiple schedulers creating TIs concurrently
through DagRun.verify_integrity, which has no protection against concurrent calls. Can you try this
patch (with your own patch removed)?
```diff
diff --git a/airflow-core/src/airflow/models/dagrun.py
b/airflow-core/src/airflow/models/dagrun.py
index 89999bdbf2..cebaa58762 100644
--- a/airflow-core/src/airflow/models/dagrun.py
+++ b/airflow-core/src/airflow/models/dagrun.py
@@ -47,7 +47,7 @@ from sqlalchemy import (
update,
)
from sqlalchemy.dialects import postgresql
-from sqlalchemy.exc import IntegrityError
+from sqlalchemy.exc import IntegrityError, OperationalError
from sqlalchemy.ext.associationproxy import association_proxy
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy.ext.mutable import MutableDict
@@ -84,6 +84,7 @@ from airflow.utils.sqlalchemy import (
ExtendedJSON,
UtcDateTime,
get_dialect_name,
+ is_lock_not_available_error,
mapped_column,
nulls_first,
with_row_locks,
@@ -1701,6 +1702,34 @@ class DagRun(Base, LoggingMixin):
"""
from airflow.settings import task_instance_mutation_hook
+ try:
+ locked_dag_run = session.scalar(
+ with_row_locks(
+ select(DagRun).where(DagRun.dag_id == self.dag_id,
DagRun.run_id == self.run_id),
+ session=session,
+ nowait=True,
+ )
+ )
+ if not locked_dag_run:
+ # DagRun doesn't exist (shouldn't happen, but handle
gracefully)
+ self.log.warning(
+ "DagRun %s-%s not found when trying to lock for
verify_integrity",
+ self.dag_id,
+ self.run_id,
+ )
+ return
+ except OperationalError as e:
+ if is_lock_not_available_error(error=e):
+ # Another scheduler is already processing this DagRun, skip
it
+ self.log.debug(
+ "Skipping verify_integrity for DagRun %s-%s as it is
locked by another scheduler",
+ self.dag_id,
+ self.run_id,
+ )
+ return
+ # Re-raise other OperationalErrors
+ raise
+
# Set for the empty default in airflow.settings -- if it's not set
this means it has been changed
# Note: Literal[True, False] instead of bool because otherwise it
doesn't correctly find the overload.
hook_is_noop: Literal[True, False] =
getattr(task_instance_mutation_hook, "is_noop", False)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]