dondaum commented on PR #39244: URL: https://github.com/apache/airflow/pull/39244#issuecomment-2078992353
> is there any way test the same? > > is there any way test the same? > > Yeah it would be great to have a test for it. > > A good test should test the current behavior in SQLAlchemy 1.4 against 2.0. > > I guess locally one could install SQLAlchemy 2.0 and run it against. Perhaps we could also add a new CI workflow that upgrades to SQLAlchemy 2.0 and run all tests ? I tried to make all tests run with SQLAlchemy 2.0 but it is a lot of effort to adjust and fix everything (to make Airflow core compatible with SQLAlchemy 2.0). Instead, I created a small test setup and verified that the fix indeed preserves the current behavior. **SQLAlchemy 1.4** ```Python from sqlalchemy import Column, Integer, String, create_engine, text, ForeignKeyConstraint, UniqueConstraint, select from sqlalchemy.orm import Session, declarative_base, relationship Base = declarative_base() class TaskInstance(Base): __tablename__ = "task_instance" task_id = Column(String(50), primary_key=True, nullable=False) dag_id = Column(String(50), primary_key=True, nullable=False) run_id = Column(String(50), primary_key=True, nullable=False) map_index = Column(Integer, primary_key=True, nullable=False, server_default=text("-1")) __table_args__ = ( ForeignKeyConstraint( [dag_id, run_id], ["dag_run.dag_id", "dag_run.run_id"], name="task_instance_dag_run_fkey", ondelete="CASCADE", ), ) dag_run = relationship("DagRun", back_populates="task_instances", lazy="joined", innerjoin=True) class DagRun(Base): __tablename__ = "dag_run" id = Column(Integer, primary_key=True) dag_id = Column(String(50), nullable=False) run_id = Column(String(50), nullable=False) task_instances = relationship( TaskInstance, back_populates="dag_run", cascade="save-update, merge, delete, delete-orphan" ) __table_args__ = ( UniqueConstraint("dag_id", "run_id", name="dag_run_dag_id_run_id_key"), ) engine = create_engine("sqlite://", echo=False, future=True) Base.metadata.drop_all(engine) Base.metadata.create_all(engine) with Session(engine) as 
session: dag_run = DagRun(id=1, dag_id="dag_1", run_id="id_1") session.add(dag_run) session.commit() # Simulate current behavior with Session(engine) as session: dag_run = session.scalars(select(DagRun).where(DagRun.id == 1)).one() print(dag_run) print("Dag run in session:", dag_run in session) ti = TaskInstance(task_id="ti_1", dag_id="dag_1", dag_run=dag_run) print("TaskInstance in session:", ti in session) session.commit() # Check if task instance is in db with Session(engine) as session: all_tis = session.scalars(select(TaskInstance).where(TaskInstance.dag_id == "dag_1")).all() print(all_tis) ``` **SQLAlchemy 1.4 output** ```shell vscode ➜ /workspaces/sqa2backref/backpop $ python sqa_1_4.py <__main__.DagRun object at 0x7f68a696c0d0> Dag run in session: True /workspaces/sqa2backref/backpop/sqa_1_4.py:63: RemovedIn20Warning: Deprecated API features detected! These feature(s) are not compatible with SQLAlchemy 2.0. To prevent incompatible upgrades prior to updating applications, ensure requirements files are pinned to "sqlalchemy<2.0". Set environment variable SQLALCHEMY_WARN_20=1 to show all deprecation warnings. Set environment variable SQLALCHEMY_SILENCE_UBER_WARNING=1 to silence this message. 
(Background on SQLAlchemy 2.0 at: https://sqlalche.me/e/b8d9) ti = TaskInstance(task_id="ti_1", dag_id="dag_1", dag_run=dag_run) TaskInstance in session: True [<__main__.TaskInstance object at 0x7f68a6992c90>] ``` **SQLAlchemy 2.0** ```Python from sqlalchemy import Column, Integer, String, create_engine, text, ForeignKeyConstraint, UniqueConstraint, select from sqlalchemy.orm import Session, declarative_base, relationship Base = declarative_base() class TaskInstance(Base): __tablename__ = "task_instance" task_id = Column(String(50), primary_key=True, nullable=False) dag_id = Column(String(50), primary_key=True, nullable=False) run_id = Column(String(50), primary_key=True, nullable=False) map_index = Column(Integer, primary_key=True, nullable=False, server_default=text("-1")) __table_args__ = ( ForeignKeyConstraint( [dag_id, run_id], ["dag_run.dag_id", "dag_run.run_id"], name="task_instance_dag_run_fkey", ondelete="CASCADE", ), ) dag_run = relationship("DagRun", back_populates="task_instances", lazy="joined", innerjoin=True) class DagRun(Base): __tablename__ = "dag_run" id = Column(Integer, primary_key=True) dag_id = Column(String(50), nullable=False) run_id = Column(String(50), nullable=False) task_instances = relationship( TaskInstance, back_populates="dag_run", cascade="save-update, merge, delete, delete-orphan" ) __table_args__ = ( UniqueConstraint("dag_id", "run_id", name="dag_run_dag_id_run_id_key"), ) engine = create_engine("sqlite://", echo=False, future=True) Base.metadata.drop_all(engine) Base.metadata.create_all(engine) with Session(engine) as session: dag_run = DagRun(id=1, dag_id="dag_1", run_id="id_1") session.add(dag_run) session.commit() # Simulate current behavior with Session(engine) as session: dag_run = session.scalars(select(DagRun).where(DagRun.id == 1)).one() print(dag_run) print("Dag run in session:", dag_run in session) ti = TaskInstance(task_id="ti_1", dag_id="dag_1", dag_run=dag_run) print("TaskInstance in session:", ti in session) 
session.commit() # Check if task instance is in db with Session(engine) as session: all_tis = session.scalars(select(TaskInstance).where(TaskInstance.dag_id == "dag_1")).all() print(all_tis) ``` **SQLAlchemy 2.0 output** ```shell vscode ➜ /workspaces/sqa2backref/backpop $ python sqa_2_0.py <__main__.DagRun object at 0x7f24bad1f850> Dag run in session: True TaskInstance in session: False [] ``` **SQLAlchemy 2.0 with fix** ```Python from sqlalchemy import Column, Integer, String, create_engine, text, ForeignKeyConstraint, UniqueConstraint, select from sqlalchemy.orm import Session, declarative_base, relationship Base = declarative_base() class TaskInstance(Base): __tablename__ = "task_instance" task_id = Column(String(50), primary_key=True, nullable=False) dag_id = Column(String(50), primary_key=True, nullable=False) run_id = Column(String(50), primary_key=True, nullable=False) map_index = Column(Integer, primary_key=True, nullable=False, server_default=text("-1")) __table_args__ = ( ForeignKeyConstraint( [dag_id, run_id], ["dag_run.dag_id", "dag_run.run_id"], name="task_instance_dag_run_fkey", ondelete="CASCADE", ), ) dag_run = relationship("DagRun", back_populates="task_instances", lazy="joined", innerjoin=True) class DagRun(Base): __tablename__ = "dag_run" id = Column(Integer, primary_key=True) dag_id = Column(String(50), nullable=False) run_id = Column(String(50), nullable=False) task_instances = relationship( TaskInstance, back_populates="dag_run", cascade="save-update, merge, delete, delete-orphan" ) __table_args__ = ( UniqueConstraint("dag_id", "run_id", name="dag_run_dag_id_run_id_key"), ) engine = create_engine("sqlite://", echo=False, future=True) Base.metadata.drop_all(engine) Base.metadata.create_all(engine) with Session(engine) as session: dag_run = DagRun(id=1, dag_id="dag_1", run_id="id_1") session.add(dag_run) session.commit() # Simulate current behavior with Session(engine) as session: dag_run = session.scalars(select(DagRun).where(DagRun.id == 
1)).one() print(dag_run) print("Dag run in session:", dag_run in session) ti = TaskInstance(task_id="ti_1", dag_id="dag_1", dag_run=dag_run) print("TaskInstance in session:", ti in session) session.add(ti) # <-- fix session.commit() # Check if task instance is in db with Session(engine) as session: all_tis = session.scalars(select(TaskInstance).where(TaskInstance.dag_id == "dag_1")).all() print(all_tis) ``` **SQLAlchemy 2.0 with fix output** ```shell vscode ➜ /workspaces/sqa2backref/backpop $ python sqa_2_0_fix.py <__main__.DagRun object at 0x7efc93103e10> Dag run in session: True TaskInstance in session: False [<__main__.TaskInstance object at 0x7efc937081d0>] ``` Note that `TaskInstance in session: False` is printed before `session.add(ti)` runs; the final query shows the task instance was persisted, matching the SQLAlchemy 1.4 result. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org