ephraimbuddy commented on issue #57618:
URL: https://github.com/apache/airflow/issues/57618#issuecomment-3655519351

   This seems to come from multiple schedulers creating TIs concurrently 
through `DagRun.verify_integrity`, which has no protection against concurrent 
execution. Can you try this patch (without your own patch applied)?
   
   ```diff
   diff --git a/airflow-core/src/airflow/models/dagrun.py 
b/airflow-core/src/airflow/models/dagrun.py
   index 89999bdbf2..cebaa58762 100644
   --- a/airflow-core/src/airflow/models/dagrun.py
   +++ b/airflow-core/src/airflow/models/dagrun.py
   @@ -47,7 +47,7 @@ from sqlalchemy import (
        update,
    )
    from sqlalchemy.dialects import postgresql
   -from sqlalchemy.exc import IntegrityError
   +from sqlalchemy.exc import IntegrityError, OperationalError
    from sqlalchemy.ext.associationproxy import association_proxy
    from sqlalchemy.ext.hybrid import hybrid_property
    from sqlalchemy.ext.mutable import MutableDict
   @@ -84,6 +84,7 @@ from airflow.utils.sqlalchemy import (
        ExtendedJSON,
        UtcDateTime,
        get_dialect_name,
   +    is_lock_not_available_error,
        mapped_column,
        nulls_first,
        with_row_locks,
   @@ -1701,6 +1702,34 @@ class DagRun(Base, LoggingMixin):
            """
            from airflow.settings import task_instance_mutation_hook
    
   +        try:
   +            locked_dag_run = session.scalar(
   +                with_row_locks(
   +                    select(DagRun).where(DagRun.dag_id == self.dag_id, 
DagRun.run_id == self.run_id),
   +                    session=session,
   +                    nowait=True,
   +                )
   +            )
   +            if not locked_dag_run:
   +                # DagRun doesn't exist (shouldn't happen, but handle 
gracefully)
   +                self.log.warning(
   +                    "DagRun %s-%s not found when trying to lock for 
verify_integrity",
   +                    self.dag_id,
   +                    self.run_id,
   +                )
   +                return
   +        except OperationalError as e:
   +            if is_lock_not_available_error(error=e):
   +                # Another scheduler is already processing this DagRun, skip 
it
   +                self.log.debug(
   +                    "Skipping verify_integrity for DagRun %s-%s as it is 
locked by another scheduler",
   +                    self.dag_id,
   +                    self.run_id,
   +                )
   +                return
   +            # Re-raise other OperationalErrors
   +            raise
   +
            # Set for the empty default in airflow.settings -- if it's not set 
this means it has been changed
            # Note: Literal[True, False] instead of bool because otherwise it 
doesn't correctly find the overload.
            hook_is_noop: Literal[True, False] = 
getattr(task_instance_mutation_hook, "is_noop", False)
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to