dondaum commented on PR #39244:
URL: https://github.com/apache/airflow/pull/39244#issuecomment-2078992353

   > is there any way test the same?
   
   
   
   > > is there any way test the same?
   > 
   > Yeah it would be great to have a test for it.
   > 
   > A good test should test the current behavior in SQLAlchemy 1.4 against 2.0.
   > 
   > I guess locally one could install SQLAlchemy 2.0 and run it against.
   > Perhaps we could also add a new CI workflow that upgrades to SQLAlchemy 2.0 and
   > run all tests ?
   
   I tried to make all tests run with SQLAlchemy 2.0, but it is a lot of effort
   to adjust and fix everything (to make Airflow core compatible with SQLAlchemy
   2.0).
   
   Instead, I created a small test setup and verified that the fix indeed
   preserves the current behavior.
   
   **SQLAlchemy 1.4**
   ```Python
   
   from sqlalchemy import Column, Integer, String, create_engine, text, ForeignKeyConstraint, UniqueConstraint, select
   from sqlalchemy.orm import Session, declarative_base, relationship
   
   
   Base = declarative_base()
   
   
   
   class TaskInstance(Base):
       __tablename__ = "task_instance"
   
       task_id = Column(String(50), primary_key=True, nullable=False)
       dag_id = Column(String(50), primary_key=True, nullable=False)
       run_id = Column(String(50), primary_key=True, nullable=False)
       map_index = Column(Integer, primary_key=True, nullable=False, server_default=text("-1"))
   
       __table_args__ = (
           ForeignKeyConstraint(
               [dag_id, run_id],
               ["dag_run.dag_id", "dag_run.run_id"],
               name="task_instance_dag_run_fkey",
               ondelete="CASCADE",
           ),
       )
   
       dag_run = relationship("DagRun", back_populates="task_instances", lazy="joined", innerjoin=True)
   
   class DagRun(Base):
       __tablename__ = "dag_run"
   
       id = Column(Integer, primary_key=True)
       dag_id = Column(String(50), nullable=False)
       run_id = Column(String(50), nullable=False)
       
       task_instances = relationship(
           TaskInstance, back_populates="dag_run", cascade="save-update, merge, delete, delete-orphan"
       )
   
       __table_args__ = (
           UniqueConstraint("dag_id", "run_id", name="dag_run_dag_id_run_id_key"),
       )
   
   
   
   engine = create_engine("sqlite://", echo=False, future=True)
   Base.metadata.drop_all(engine)
   Base.metadata.create_all(engine)
   
   
   with Session(engine) as session:
       dag_run = DagRun(id=1, dag_id="dag_1", run_id="id_1")
       session.add(dag_run)
       session.commit()
   
   
   # Simulate current behavior
   with Session(engine) as session:
       dag_run = session.scalars(select(DagRun).where(DagRun.id == 1)).one()
       print(dag_run)
       print("Dag run in session:", dag_run in session)
   
   
       ti = TaskInstance(task_id="ti_1", dag_id="dag_1", dag_run=dag_run)
       print("TaskInstance in session:", ti in session)
   
       session.commit()
   
   
   # Check if task instance is in db
   with Session(engine) as session:
       all_tis = session.scalars(select(TaskInstance).where(TaskInstance.dag_id == "dag_1")).all()
       print(all_tis)
   
   ```
   
   **SQLAlchemy 1.4 output**
   ```shell
   vscode ➜ /workspaces/sqa2backref/backpop $ python sqa_1_4.py 
   <__main__.DagRun object at 0x7f68a696c0d0>
   Dag run in session: True
   /workspaces/sqa2backref/backpop/sqa_1_4.py:63: RemovedIn20Warning: Deprecated API features detected! These feature(s) are not compatible with SQLAlchemy 2.0. To prevent incompatible upgrades prior to updating applications, ensure requirements files are pinned to "sqlalchemy<2.0". Set environment variable SQLALCHEMY_WARN_20=1 to show all deprecation warnings.  Set environment variable SQLALCHEMY_SILENCE_UBER_WARNING=1 to silence this message. (Background on SQLAlchemy 2.0 at: https://sqlalche.me/e/b8d9)
     ti = TaskInstance(task_id="ti_1", dag_id="dag_1", dag_run=dag_run)
   TaskInstance in session: True
   [<__main__.TaskInstance object at 0x7f68a6992c90>]
   ```
   
   **SQLAlchemy 2.0**
   ```Python
   
   from sqlalchemy import Column, Integer, String, create_engine, text, ForeignKeyConstraint, UniqueConstraint, select
   from sqlalchemy.orm import Session, declarative_base, relationship
   
   
   Base = declarative_base()
   
   
   
   class TaskInstance(Base):
       __tablename__ = "task_instance"
   
       task_id = Column(String(50), primary_key=True, nullable=False)
       dag_id = Column(String(50), primary_key=True, nullable=False)
       run_id = Column(String(50), primary_key=True, nullable=False)
       map_index = Column(Integer, primary_key=True, nullable=False, server_default=text("-1"))
   
       __table_args__ = (
           ForeignKeyConstraint(
               [dag_id, run_id],
               ["dag_run.dag_id", "dag_run.run_id"],
               name="task_instance_dag_run_fkey",
               ondelete="CASCADE",
           ),
       )
   
       dag_run = relationship("DagRun", back_populates="task_instances", lazy="joined", innerjoin=True)
   
   class DagRun(Base):
       __tablename__ = "dag_run"
   
       id = Column(Integer, primary_key=True)
       dag_id = Column(String(50), nullable=False)
       run_id = Column(String(50), nullable=False)
       
       task_instances = relationship(
           TaskInstance, back_populates="dag_run", cascade="save-update, merge, delete, delete-orphan"
       )
   
       __table_args__ = (
           UniqueConstraint("dag_id", "run_id", name="dag_run_dag_id_run_id_key"),
       )
   
   
   
   engine = create_engine("sqlite://", echo=False, future=True)
   Base.metadata.drop_all(engine)
   Base.metadata.create_all(engine)
   
   
   with Session(engine) as session:
       dag_run = DagRun(id=1, dag_id="dag_1", run_id="id_1")
       session.add(dag_run)
       session.commit()
   
   
   # Simulate current behavior
   with Session(engine) as session:
       dag_run = session.scalars(select(DagRun).where(DagRun.id == 1)).one()
       print(dag_run)
       print("Dag run in session:", dag_run in session)
   
   
       ti = TaskInstance(task_id="ti_1", dag_id="dag_1", dag_run=dag_run)
       print("TaskInstance in session:", ti in session)
   
       session.commit()
   
   
   # Check if task instance is in db
   with Session(engine) as session:
       all_tis = session.scalars(select(TaskInstance).where(TaskInstance.dag_id == "dag_1")).all()
       print(all_tis)
   
   ```
   
   **SQLAlchemy 2.0 output**
   ```shell
   vscode ➜ /workspaces/sqa2backref/backpop $ python sqa_2_0.py 
   <__main__.DagRun object at 0x7f24bad1f850>
   Dag run in session: True
   TaskInstance in session: False
   []
   ```
   
   **SQLAlchemy 2.0 with fix**
   ```Python
   from sqlalchemy import Column, Integer, String, create_engine, text, ForeignKeyConstraint, UniqueConstraint, select
   from sqlalchemy.orm import Session, declarative_base, relationship
   
   
   Base = declarative_base()
   
   
   
   class TaskInstance(Base):
       __tablename__ = "task_instance"
   
       task_id = Column(String(50), primary_key=True, nullable=False)
       dag_id = Column(String(50), primary_key=True, nullable=False)
       run_id = Column(String(50), primary_key=True, nullable=False)
       map_index = Column(Integer, primary_key=True, nullable=False, server_default=text("-1"))
   
       __table_args__ = (
           ForeignKeyConstraint(
               [dag_id, run_id],
               ["dag_run.dag_id", "dag_run.run_id"],
               name="task_instance_dag_run_fkey",
               ondelete="CASCADE",
           ),
       )
   
       dag_run = relationship("DagRun", back_populates="task_instances", lazy="joined", innerjoin=True)
   
   class DagRun(Base):
       __tablename__ = "dag_run"
   
       id = Column(Integer, primary_key=True)
       dag_id = Column(String(50), nullable=False)
       run_id = Column(String(50), nullable=False)
       
       task_instances = relationship(
           TaskInstance, back_populates="dag_run", cascade="save-update, merge, delete, delete-orphan"
       )
   
       __table_args__ = (
           UniqueConstraint("dag_id", "run_id", name="dag_run_dag_id_run_id_key"),
       )
   
   
   
   engine = create_engine("sqlite://", echo=False, future=True)
   Base.metadata.drop_all(engine)
   Base.metadata.create_all(engine)
   
   
   with Session(engine) as session:
       dag_run = DagRun(id=1, dag_id="dag_1", run_id="id_1")
       session.add(dag_run)
       session.commit()
   
   
   # Simulate current behavior
   with Session(engine) as session:
       dag_run = session.scalars(select(DagRun).where(DagRun.id == 1)).one()
       print(dag_run)
       print("Dag run in session:", dag_run in session)
   
   
       ti = TaskInstance(task_id="ti_1", dag_id="dag_1", dag_run=dag_run)
       print("TaskInstance in session:", ti in session)
   
       session.add(ti)  # <-- fix
   
       session.commit()
   
   
   # Check if task instance is in db
   with Session(engine) as session:
       all_tis = session.scalars(select(TaskInstance).where(TaskInstance.dag_id == "dag_1")).all()
       print(all_tis)
   ```
   
   **SQLAlchemy 2.0 with fix output**
   ```shell
   vscode ➜ /workspaces/sqa2backref/backpop $ python sqa_2_0_fix.py
   <__main__.DagRun object at 0x7efc93103e10>
   Dag run in session: True
   TaskInstance in session: False
   [<__main__.TaskInstance object at 0x7efc937081d0>]
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to