jeremiahishere opened a new issue, #41894:
URL: https://github.com/apache/airflow/issues/41894

   ### Apache Airflow version
   
   Other Airflow 2 version (please specify below)
   
   ### If "Other Airflow 2 version" selected, which one?
   
   2.7.3
   
   ### What happened?
   
   My team is upgrading an old system to the latest stable version of Airflow, and we are currently stuck on the 2.7.3 upgrade.  The first test run in 2.7.3 that calls `_get_or_create_dagrun` in `airflow/models/dag.py` passes.  All subsequent test runs fail with the following error:
   
   `sqlalchemy.exc.NoReferencedTableError: Foreign key associated with column 'dag_run_note.user_id' could not find table 'ab_user' with which to generate a foreign key to target column 'id'`
   
   ### What you think should happen instead?
   
   The error occurs when a DAG is run with the same dag_id and execution date more than once, which is exactly what our integration tests do.  The `if dr:` block below is entered, and the error is raised when `session.commit()` is called.
   
   ```
   def _get_or_create_dagrun(...) -> DagRun:
       dr: DagRun = session.scalar(
           select(DagRun).where(
               DagRun.dag_id == dag.dag_id,
               DagRun.execution_date == execution_date,
           )
       )
       if dr:
           session.delete(dr)
           session.commit()  # this line raises the error
       dr = ...
   ```
   
   I believe that rerunning a dag_id on a given execution date should either work or raise an explicit error about overwriting the previous dag run.
   
   ### How to reproduce
   
   In our system this is reproducible in tests on any DAG with a hard-coded execution date.  Here is a sample setup; I can fill out the classes if that is helpful.
   
   dags/my_dag.py
   ```
   def my_dag(dag_config: MyConfig) -> DAG:
       with DAG(...) as dag:
           my_operator = MyOperator(...)
   
           other_operator = ...
   
           return dag
   ```
   
   test/integration/test_my_dag.py
   ```
   # test setup
   # ...
   
   DAG_RUN_DATE = datetime(
       year=2024,
       month=4,
       day=26,
       hour=3,
       minute=0,
       tzinfo=pendulum.timezone("America/New_York"),
   )

   dag_config = MyConfig(
       dag_id="my_test_dag",
       ...
   )
   
   dag = my_dag(dag_config)
   run_dag(dag, DAG_RUN_DATE)
   
   # test assertions
   # ...
   ```
   
   test/helpers.py
   ```
   from airflow.utils.session import provide_session
   @provide_session
   def run_dag(dag, date, session=None):
       dag.test(
           execution_date=date,
       )
   ```
   
   ### Operating System
   
   CentOS 7
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow-providers-amazon==8.10.0
   apache-airflow-providers-celery==3.4.1
   apache-airflow-providers-cncf-kubernetes==7.8.0
   apache-airflow-providers-common-sql==1.8.0
   apache-airflow-providers-datadog==3.4.0
   apache-airflow-providers-docker==3.8.0
   apache-airflow-providers-ftp==3.6.0
   apache-airflow-providers-hashicorp==3.5.0
   apache-airflow-providers-http==4.6.0
   apache-airflow-providers-imap==3.4.0
   apache-airflow-providers-jenkins==3.4.0
   apache-airflow-providers-microsoft-azure==8.1.0
   apache-airflow-providers-opsgenie==5.2.0
   apache-airflow-providers-postgres==5.7.1
   apache-airflow-providers-redis==3.4.0
   apache-airflow-providers-slack==8.3.0
   apache-airflow-providers-sqlite==3.5.0
   apache-airflow-providers-ssh==3.8.1
   
   ### Deployment
   
   Docker-Compose
   
   ### Deployment details
   
   _No response_
   
   ### Anything else?
   
   I have found a number of issues in this GitHub repository that describe similar loading problems with the ab_user table.  None of them mention running a DAG twice as part of the reproduction steps.
   - https://github.com/apache/airflow/issues/34191
   - https://github.com/apache/airflow/issues/34109
   - https://github.com/apache/airflow/issues/34859
   
   We wrote a patch to work around the issue.  It will get us through the Airflow 2.7.3 upgrade so we can continue upgrading, but we don't understand why other people aren't hitting the same problem.
   ```
   def patched_get_or_create_dagrun(...) -> DagRun:
       # CHANGES: importing the FAB models registers the ab_user table in
       # SQLAlchemy's metadata, so the foreign key on dag_run_note.user_id
       # can be resolved when the previous DagRun is deleted below.
       from airflow.auth.managers.fab.models import User  # noqa: F401
       from sqlalchemy import select
       # END OF CHANGES

       dr: DagRun = session.scalar(
           select(DagRun).where(
               DagRun.dag_id == dag.dag_id,
               DagRun.execution_date == execution_date,
           )
       )
       if dr:
           session.delete(dr)
           session.commit()  # no longer raises NoReferencedTableError
       ...

   import airflow.models.dag as af_dag
   af_dag._get_or_create_dagrun = patched_get_or_create_dagrun
   ```
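
   If monkeypatching `_get_or_create_dagrun` turns out to be the wrong approach, the same import could presumably be done once in the test helper instead of patching Airflow internals.  A sketch of that variant (not verified beyond the patch above):

   ```
   # test/helpers.py (variant without the monkeypatch)
   from airflow.utils.session import provide_session


   @provide_session
   def run_dag(dag, date, session=None):
       # Importing the FAB models registers the ab_user table before
       # _get_or_create_dagrun deletes the previous run.
       from airflow.auth.managers.fab.models import User  # noqa: F401

       dag.test(execution_date=date)
   ```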
   
   Our integration tests generally upload data to an internal S3 analog and use the file path to partition the data by date.  Making this system dynamic would be a pretty big rewrite, so we are looking for options.  Is there a standard practice for Airflow integration testing with hard-coded dates that we have missed?  Are we doing something out of the ordinary for Airflow?
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

