Copilot commented on code in PR #62164:
URL: https://github.com/apache/airflow/pull/62164#discussion_r3066497884


##########
airflow-core/src/airflow/models/dagcode.py:
##########
@@ -133,12 +133,11 @@ def get_code_from_file(fileloc):
     @provide_session
     def _get_code_from_db(cls, dag_id, session: Session = NEW_SESSION) -> str:
         dag_code = session.scalar(
-            select(cls).where(cls.dag_id == 
dag_id).order_by(cls.last_updated.desc()).limit(1)
+            select(cls.source_code).where(cls.dag_id == 
dag_id).order_by(cls.last_updated.desc()).limit(1)
         )
         if not dag_code:

Review Comment:
   `_get_code_from_db` now selects only `source_code`, so `dag_code` is the 
string itself. The check `if not dag_code:` will incorrectly raise 
`DagCodeNotFound` when the latest row exists but the code is an empty string 
(valid for an empty DAG file). Use an explicit `is None` check (or 
`one_or_none()` on a query selecting the row) so empty-but-present source code 
is returned.
   ```suggestion
           if dag_code is None:
   ```



##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -1870,30 +1870,33 @@ def _mark_backfills_complete(self, session: Session = 
NEW_SESSION) -> None:
         self.log.debug("checking for completed backfills.")
         unfinished_states = (DagRunState.RUNNING, DagRunState.QUEUED)
         now = timezone.utcnow()
-        # todo: AIP-78 simplify this function to an update statement
         initializing_cutoff = now - timedelta(minutes=2)
-        query = select(Backfill).where(
-            Backfill.completed_at.is_(None),
-            # Guard: backfill must have at least one association,
-            # otherwise it is still being set up (see #61375).
-            # Allow cleanup of orphaned backfills older than 2 minutes
-            # that failed during initialization and never got any associations.
-            or_(
-                
exists(select(BackfillDagRun.id).where(BackfillDagRun.backfill_id == 
Backfill.id)),
-                Backfill.created_at < initializing_cutoff,
-            ),
-            ~exists(
-                select(DagRun.id).where(
-                    and_(DagRun.backfill_id == Backfill.id, 
DagRun.state.in_(unfinished_states))
+        result = cast(
+            "CursorResult",
+            session.execute(
+                update(Backfill)
+                .where(
+                    Backfill.completed_at.is_(None),
+                    # Guard: backfill must have at least one association,
+                    # otherwise it is still being set up (see #61375).
+                    # Allow cleanup of orphaned backfills older than 2 minutes
+                    # that failed during initialization and never got any 
associations.
+                    or_(
+                        
exists(select(BackfillDagRun.id).where(BackfillDagRun.backfill_id == 
Backfill.id)),
+                        Backfill.created_at < initializing_cutoff,
+                    ),
+                    ~exists(
+                        select(DagRun.id).where(
+                            and_(DagRun.backfill_id == Backfill.id, 
DagRun.state.in_(unfinished_states))
+                        )
+                    ),
                 )
+                .values(completed_at=now)
             ),

Review Comment:
   This bulk `update(Backfill)` is executed via the ORM session, but it doesn’t 
set `synchronize_session=False`. With the complex `WHERE` (EXISTS subqueries), 
SQLAlchemy’s default session synchronization can trigger an extra SELECT (or 
costly synchronization work), undermining the intended optimization. Add 
`.execution_options(synchronize_session=False)` to the statement, consistent 
with other bulk updates in this file.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to