Copilot commented on code in PR #59823:
URL: https://github.com/apache/airflow/pull/59823#discussion_r2648366934


##########
airflow-core/tests/unit/jobs/test_scheduler_job.py:
##########
@@ -3943,9 +3952,12 @@ def test_verify_integrity_if_dag_not_changed(self, 
dag_maker, session):
     def test_verify_integrity_if_dag_changed(self, dag_maker):
         # CleanUp
         with create_session() as session:
-            session.query(SerializedDagModel).filter(
-                SerializedDagModel.dag_id == 
"test_verify_integrity_if_dag_changed"
-            ).delete(synchronize_session=False)
+            session.execute(
+                select(SerializedDagModel).where(
+                    SerializedDagModel.dag_id == 
"test_verify_integrity_if_dag_changed"
+                ),
+                execution_options={"synchronize_session": False},

Review Comment:
   This code is intended to delete records but uses `select()` instead of 
`delete()`, so it merely selects them and deletes nothing. Use 
`session.execute(delete(SerializedDagModel).where(...))` to actually delete 
the rows. The separate `execution_options` argument is also wrong here — 
chain `.execution_options(synchronize_session=False)` onto the delete 
statement instead.
   ```suggestion
                   delete(SerializedDagModel)
                   .where(SerializedDagModel.dag_id == 
"test_verify_integrity_if_dag_changed")
                   .execution_options(synchronize_session=False)
   ```



##########
airflow-core/tests/unit/jobs/test_scheduler_job.py:
##########
@@ -7996,8 +7990,8 @@ def 
test_start_queued_dagruns_uses_latest_max_active_runs_from_dag_model(self, d
         self.job_runner._create_dag_runs([dag_model], session)
 
         # Verify SerializedDAG has max_active_runs=1
-        dag_run_1 = (
-            session.query(DagRun).filter(DagRun.dag_id == 
dag.dag_id).order_by(DagRun.logical_date).first()
+        dag_run_1 = session.scalars(
+            select(DagRun).where(DagRun.dag_id == 
dag.dag_id).order_by(DagRun.logical_date).first()
         )

Review Comment:
   Incorrect chaining of `.order_by()` and `.first()`. A `select()` statement 
has no `.first()` method, so this line raises `AttributeError` at runtime. 
`.first()` must be called on the `ScalarResult` returned by 
`session.scalars()`, not on the select statement itself. The correct pattern 
is: `session.scalars(select(DagRun).where(DagRun.dag_id == 
dag.dag_id).order_by(DagRun.logical_date)).first()`
   ```suggestion
               select(DagRun).where(DagRun.dag_id == 
dag.dag_id).order_by(DagRun.logical_date)
           ).first()
   ```



##########
airflow-core/tests/unit/jobs/test_scheduler_job.py:
##########
@@ -3626,9 +3637,12 @@ def test_scheduler_task_start_date_catchup_false(self, 
testing_dag_bundle):
         run_job(scheduler_job, execute_callable=self.job_runner._execute)
 
         session = settings.Session()
-        tiq = session.query(TaskInstance).filter(TaskInstance.dag_id == dag_id)
-        ti1s = tiq.filter(TaskInstance.task_id == "dummy1").all()
-        ti2s = tiq.filter(TaskInstance.task_id == "dummy2").all()
+        ti1s = session.scalars(
+            select(TaskInstance).filter(TaskInstance.dag_id == dag_id, 
TaskInstance.task_id == "dummy1")
+        ).all()
+        ti2s = session.scalars(
+            select(TaskInstance).filter(TaskInstance.dag_id == dag_id, 
TaskInstance.task_id == "dummy2")

Review Comment:
   Inconsistent use of `.filter()` instead of `.where()` in the SQLAlchemy 2.0 
migration. Throughout the rest of this file, the migration pattern uses 
`.where()` consistently with `select()` statements. This should be changed to 
use `.where()` for consistency with the SQLAlchemy 2.0 migration pattern.
   ```suggestion
               select(TaskInstance).where(TaskInstance.dag_id == dag_id, 
TaskInstance.task_id == "dummy1")
           ).all()
           ti2s = session.scalars(
               select(TaskInstance).where(TaskInstance.dag_id == dag_id, 
TaskInstance.task_id == "dummy2")
   ```



##########
airflow-core/tests/unit/jobs/test_scheduler_job.py:
##########
@@ -3626,9 +3637,12 @@ def test_scheduler_task_start_date_catchup_false(self, 
testing_dag_bundle):
         run_job(scheduler_job, execute_callable=self.job_runner._execute)
 
         session = settings.Session()
-        tiq = session.query(TaskInstance).filter(TaskInstance.dag_id == dag_id)
-        ti1s = tiq.filter(TaskInstance.task_id == "dummy1").all()
-        ti2s = tiq.filter(TaskInstance.task_id == "dummy2").all()
+        ti1s = session.scalars(
+            select(TaskInstance).filter(TaskInstance.dag_id == dag_id, 
TaskInstance.task_id == "dummy1")
+        ).all()
+        ti2s = session.scalars(
+            select(TaskInstance).filter(TaskInstance.dag_id == dag_id, 
TaskInstance.task_id == "dummy2")

Review Comment:
   Inconsistent use of `.filter()` instead of `.where()` in the SQLAlchemy 2.0 
migration. Throughout the rest of this file, the migration pattern uses 
`.where()` consistently with `select()` statements. This should be changed to 
use `.where()` for consistency with the SQLAlchemy 2.0 migration pattern.
   ```suggestion
               select(TaskInstance).where(TaskInstance.dag_id == dag_id, 
TaskInstance.task_id == "dummy1")
           ).all()
           ti2s = session.scalars(
               select(TaskInstance).where(TaskInstance.dag_id == dag_id, 
TaskInstance.task_id == "dummy2")
   ```



##########
airflow-core/tests/unit/jobs/test_scheduler_job.py:
##########
@@ -3869,32 +3883,29 @@ def test_scheduler_verify_priority_and_slots(self, 
dag_maker, mock_executor):
         # Only second and third
         assert len(task_instances_list) == 2
 
-        ti0 = (
-            session.query(TaskInstance)
-            .filter(TaskInstance.task_id == 
"test_scheduler_verify_priority_and_slots_t0")
-            .first()
-        )
+        ti0 = session.scalars(
+            select(TaskInstance).where(TaskInstance.task_id == 
"test_scheduler_verify_priority_and_slots_t0")
+        ).first()
         assert ti0.state == State.SCHEDULED
 
-        ti1 = (
-            session.query(TaskInstance)
-            .filter(TaskInstance.task_id == 
"test_scheduler_verify_priority_and_slots_t1")
-            .first()
-        )
+        ti1 = session.scalars(
+            select(TaskInstance).where(TaskInstance.task_id == 
"test_scheduler_verify_priority_and_slots_t1")
+        ).first()
         assert ti1.state == State.QUEUED
 
-        ti2 = (
-            session.query(TaskInstance)
-            .filter(TaskInstance.task_id == 
"test_scheduler_verify_priority_and_slots_t2")
-            .first()
-        )
+        ti2 = session.scalars(
+            select(TaskInstance).where(TaskInstance.task_id == 
"test_scheduler_verify_priority_and_slots_t2")
+        ).first()
         assert ti2.state == State.QUEUED
 
     def test_verify_integrity_if_dag_not_changed(self, dag_maker, session):
         # CleanUp
-        session.query(SerializedDagModel).filter(
-            SerializedDagModel.dag_id == 
"test_verify_integrity_if_dag_not_changed"
-        ).delete(synchronize_session=False)
+        session.execute(
+            select(SerializedDagModel).where(
+                SerializedDagModel.dag_id == 
"test_verify_integrity_if_dag_not_changed"
+            ),
+            execution_options={"synchronize_session": False},

Review Comment:
   This code is intended to delete records but uses `select()` instead of 
`delete()`, so it merely selects them and deletes nothing. Use 
`session.execute(delete(SerializedDagModel).where(...))` to actually delete 
the rows. The separate `execution_options` argument is also wrong here — 
chain `.execution_options(synchronize_session=False)` onto the delete 
statement instead.
   ```suggestion
               delete(SerializedDagModel)
               .where(SerializedDagModel.dag_id == 
"test_verify_integrity_if_dag_not_changed")
               .execution_options(synchronize_session=False)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to