dondaum commented on PR #39244:
URL: https://github.com/apache/airflow/pull/39244#issuecomment-2078992353
> is there any way test the same?
> > is there any way test the same?
>
> Yeah it would be great to have a test for it.
>
> A good test should test the current behavior in SQLAlchemy 1.4 against 2.0.
>
> I guess locally one could install SQLAlchemy 2.0 and run it against.
Perhaps we could also add a new CI workflow that upgrades to SQLAlchemy 2.0 and
runs all tests?
I tried to make all tests run with SQLAlchemy 2.0, but adjusting and fixing
everything (i.e. making Airflow core compatible with SQLAlchemy 2.0) is a lot
of effort.
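Instead, I created a small test setup and verified that the fix indeed
preserves the current behavior. In SQLAlchemy 1.4 the new `TaskInstance` is
implicitly added to the session through the `dag_run` backref (hence the
`RemovedIn20Warning` below). SQLAlchemy 2.0 no longer does this, so the
instance has to be added explicitly with `session.add(ti)`.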
**SQLAlchemy 1.4**
```Python
# Minimal reproduction (run with SQLAlchemy 1.4): attach a new TaskInstance to a
# persistent DagRun through the relationship and check whether it gets persisted.
from sqlalchemy import (
    Column, Integer, String, create_engine, text,
    ForeignKeyConstraint, UniqueConstraint, select,
)
from sqlalchemy.orm import Session, declarative_base, relationship

Base = declarative_base()


class TaskInstance(Base):
    __tablename__ = "task_instance"

    task_id = Column(String(50), primary_key=True, nullable=False)
    dag_id = Column(String(50), primary_key=True, nullable=False)
    run_id = Column(String(50), primary_key=True, nullable=False)
    map_index = Column(Integer, primary_key=True, nullable=False, server_default=text("-1"))

    __table_args__ = (
        # (dag_id, run_id) references the unique key declared on dag_run.
        ForeignKeyConstraint(
            [dag_id, run_id],
            ["dag_run.dag_id", "dag_run.run_id"],
            name="task_instance_dag_run_fkey",
            ondelete="CASCADE",
        ),
    )

    dag_run = relationship(
        "DagRun", back_populates="task_instances", lazy="joined", innerjoin=True
    )


class DagRun(Base):
    __tablename__ = "dag_run"

    id = Column(Integer, primary_key=True)
    dag_id = Column(String(50), nullable=False)
    run_id = Column(String(50), nullable=False)

    task_instances = relationship(
        TaskInstance, back_populates="dag_run", cascade="save-update, merge, delete, delete-orphan"
    )

    __table_args__ = (
        UniqueConstraint("dag_id", "run_id", name="dag_run_dag_id_run_id_key"),
    )


engine = create_engine("sqlite://", echo=False, future=True)
Base.metadata.drop_all(engine)
Base.metadata.create_all(engine)

# Seed a single DagRun.
with Session(engine) as session:
    dag_run = DagRun(id=1, dag_id="dag_1", run_id="id_1")
    session.add(dag_run)
    session.commit()

# Simulate current behavior: the TaskInstance is only linked via ``dag_run=``;
# session.add(ti) is never called.
with Session(engine) as session:
    dag_run = session.scalars(select(DagRun).where(DagRun.id == 1)).one()
    print(dag_run)
    print("Dag run in session:", dag_run in session)
    ti = TaskInstance(task_id="ti_1", dag_id="dag_1", dag_run=dag_run)
    print("TaskInstance in session:", ti in session)
    session.commit()

# Check if task instance is in db
with Session(engine) as session:
    all_tis = session.scalars(
        select(TaskInstance).where(TaskInstance.dag_id == "dag_1")
    ).all()
    print(all_tis)
```
**SQLAlchemy 1.4 output**
```shell
vscode ➜ /workspaces/sqa2backref/backpop $ python sqa_1_4.py
<__main__.DagRun object at 0x7f68a696c0d0>
Dag run in session: True
/workspaces/sqa2backref/backpop/sqa_1_4.py:63: RemovedIn20Warning:
Deprecated API features detected! These feature(s) are not compatible with
SQLAlchemy 2.0. To prevent incompatible upgrades prior to updating
applications, ensure requirements files are pinned to "sqlalchemy<2.0". Set
environment variable SQLALCHEMY_WARN_20=1 to show all deprecation warnings.
Set environment variable SQLALCHEMY_SILENCE_UBER_WARNING=1 to silence this
message. (Background on SQLAlchemy 2.0 at: https://sqlalche.me/e/b8d9)
ti = TaskInstance(task_id="ti_1", dag_id="dag_1", dag_run=dag_run)
TaskInstance in session: True
[<__main__.TaskInstance object at 0x7f68a6992c90>]
```
**SQLAlchemy 2.0**
```Python
# Minimal reproduction (run with SQLAlchemy 2.0): attach a new TaskInstance to a
# persistent DagRun through the relationship and check whether it gets persisted.
from sqlalchemy import (
    Column, Integer, String, create_engine, text,
    ForeignKeyConstraint, UniqueConstraint, select,
)
from sqlalchemy.orm import Session, declarative_base, relationship

Base = declarative_base()


class TaskInstance(Base):
    __tablename__ = "task_instance"

    task_id = Column(String(50), primary_key=True, nullable=False)
    dag_id = Column(String(50), primary_key=True, nullable=False)
    run_id = Column(String(50), primary_key=True, nullable=False)
    map_index = Column(Integer, primary_key=True, nullable=False, server_default=text("-1"))

    __table_args__ = (
        # (dag_id, run_id) references the unique key declared on dag_run.
        ForeignKeyConstraint(
            [dag_id, run_id],
            ["dag_run.dag_id", "dag_run.run_id"],
            name="task_instance_dag_run_fkey",
            ondelete="CASCADE",
        ),
    )

    dag_run = relationship(
        "DagRun", back_populates="task_instances", lazy="joined", innerjoin=True
    )


class DagRun(Base):
    __tablename__ = "dag_run"

    id = Column(Integer, primary_key=True)
    dag_id = Column(String(50), nullable=False)
    run_id = Column(String(50), nullable=False)

    task_instances = relationship(
        TaskInstance, back_populates="dag_run", cascade="save-update, merge, delete, delete-orphan"
    )

    __table_args__ = (
        UniqueConstraint("dag_id", "run_id", name="dag_run_dag_id_run_id_key"),
    )


engine = create_engine("sqlite://", echo=False, future=True)
Base.metadata.drop_all(engine)
Base.metadata.create_all(engine)

# Seed a single DagRun.
with Session(engine) as session:
    dag_run = DagRun(id=1, dag_id="dag_1", run_id="id_1")
    session.add(dag_run)
    session.commit()

# Simulate current behavior: the TaskInstance is only linked via ``dag_run=``;
# session.add(ti) is never called.
with Session(engine) as session:
    dag_run = session.scalars(select(DagRun).where(DagRun.id == 1)).one()
    print(dag_run)
    print("Dag run in session:", dag_run in session)
    ti = TaskInstance(task_id="ti_1", dag_id="dag_1", dag_run=dag_run)
    print("TaskInstance in session:", ti in session)
    session.commit()

# Check if task instance is in db
with Session(engine) as session:
    all_tis = session.scalars(
        select(TaskInstance).where(TaskInstance.dag_id == "dag_1")
    ).all()
    print(all_tis)
```
**SQLAlchemy 2.0 output**
```shell
vscode ➜ /workspaces/sqa2backref/backpop $ python sqa_2_0.py
<__main__.DagRun object at 0x7f24bad1f850>
Dag run in session: True
TaskInstance in session: False
[]
```
**SQLAlchemy 2.0 with fix**
```Python
# Minimal reproduction (run with SQLAlchemy 2.0) showing that explicitly adding
# the new TaskInstance to the session restores the SQLAlchemy 1.4 outcome.
from sqlalchemy import (
    Column, Integer, String, create_engine, text,
    ForeignKeyConstraint, UniqueConstraint, select,
)
from sqlalchemy.orm import Session, declarative_base, relationship

Base = declarative_base()


class TaskInstance(Base):
    __tablename__ = "task_instance"

    task_id = Column(String(50), primary_key=True, nullable=False)
    dag_id = Column(String(50), primary_key=True, nullable=False)
    run_id = Column(String(50), primary_key=True, nullable=False)
    map_index = Column(Integer, primary_key=True, nullable=False, server_default=text("-1"))

    __table_args__ = (
        # (dag_id, run_id) references the unique key declared on dag_run.
        ForeignKeyConstraint(
            [dag_id, run_id],
            ["dag_run.dag_id", "dag_run.run_id"],
            name="task_instance_dag_run_fkey",
            ondelete="CASCADE",
        ),
    )

    dag_run = relationship(
        "DagRun", back_populates="task_instances", lazy="joined", innerjoin=True
    )


class DagRun(Base):
    __tablename__ = "dag_run"

    id = Column(Integer, primary_key=True)
    dag_id = Column(String(50), nullable=False)
    run_id = Column(String(50), nullable=False)

    task_instances = relationship(
        TaskInstance, back_populates="dag_run", cascade="save-update, merge, delete, delete-orphan"
    )

    __table_args__ = (
        UniqueConstraint("dag_id", "run_id", name="dag_run_dag_id_run_id_key"),
    )


engine = create_engine("sqlite://", echo=False, future=True)
Base.metadata.drop_all(engine)
Base.metadata.create_all(engine)

# Seed a single DagRun.
with Session(engine) as session:
    dag_run = DagRun(id=1, dag_id="dag_1", run_id="id_1")
    session.add(dag_run)
    session.commit()

# Simulate current behavior, then apply the fix.
with Session(engine) as session:
    dag_run = session.scalars(select(DagRun).where(DagRun.id == 1)).one()
    print(dag_run)
    print("Dag run in session:", dag_run in session)
    ti = TaskInstance(task_id="ti_1", dag_id="dag_1", dag_run=dag_run)
    print("TaskInstance in session:", ti in session)
    # SQLAlchemy 2.0 no longer cascades ``ti`` into the session through the
    # ``dag_run`` backref, so it has to be added explicitly to be persisted.
    session.add(ti)  # <-- fix
    session.commit()

# Check if task instance is in db
with Session(engine) as session:
    all_tis = session.scalars(
        select(TaskInstance).where(TaskInstance.dag_id == "dag_1")
    ).all()
    print(all_tis)
```
**SQLAlchemy 2.0 with fix output**
```shell
vscode ➜ /workspaces/sqa2backref/backpop $ python sqa_2_0_fix.py
<__main__.DagRun object at 0x7efc93103e10>
Dag run in session: True
TaskInstance in session: False
[<__main__.TaskInstance object at 0x7efc937081d0>]
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]