uranusjr commented on code in PR #31772:
URL: https://github.com/apache/airflow/pull/31772#discussion_r1243286840
##########
airflow/models/pool.py:
##########
@@ -114,7 +114,7 @@ def create_or_update_pool(
if not name:
raise ValueError("Pool name must not be empty")
- pool = session.query(Pool).filter_by(pool=name).one_or_none()
+ pool = session.scalars(select(Pool).filter_by(pool=name)).one_or_none()
Review Comment:
```suggestion
pool = session.scalar(select(Pool).filter_by(pool=name))
```
Use `session.scalar()` here for consistency (this is the style we’re using elsewhere).
##########
airflow/models/pool.py:
##########
@@ -162,19 +162,19 @@ def slots_stats(
pools: dict[str, PoolStats] = {}
- query = session.query(Pool.pool, Pool.slots)
+ query = select(Pool.pool, Pool.slots)
if lock_rows:
query = with_row_locks(query, session=session, **nowait(session))
- pool_rows: Iterable[tuple[str, int]] = query.all()
+ pool_rows = session.execute(query)
for (pool_name, total_slots) in pool_rows:
if total_slots == -1:
total_slots = float("inf") # type: ignore
pools[pool_name] = PoolStats(total=total_slots, running=0,
queued=0, open=0)
- state_count_by_pool = (
- session.query(TaskInstance.pool, TaskInstance.state,
func.sum(TaskInstance.pool_slots))
+ state_count_by_pool = session.execute(
+ select(TaskInstance.pool, TaskInstance.state,
func.sum(TaskInstance.pool_slots))
.filter(TaskInstance.state.in_(list(EXECUTION_STATES)))
.group_by(TaskInstance.pool, TaskInstance.state)
).all()
Review Comment:
This `all()` isn’t needed (in fact, it wasn’t needed in the previous
version either).
##########
airflow/models/dag.py:
##########
@@ -1402,19 +1415,18 @@ def get_num_active_runs(self, external_trigger=None,
only_running=True, session=
:param session:
:return: number greater than 0 for active dag runs
"""
- # .count() is inefficient
- query = session.query(func.count()).filter(DagRun.dag_id ==
self.dag_id)
+ query = select(func.count()).where(DagRun.dag_id == self.dag_id)
if only_running:
- query = query.filter(DagRun.state == State.RUNNING)
+ query = query.where(DagRun.state == State.RUNNING)
else:
- query = query.filter(DagRun.state.in_({State.RUNNING,
State.QUEUED}))
+ query = query.where(DagRun.state.in_({State.RUNNING,
State.QUEUED}))
Review Comment:
```suggestion
query = query.where(DagRun.state.in_({DagRunState.RUNNING,
DagRunState.QUEUED}))
```
##########
airflow/models/dagrun.py:
##########
@@ -1335,44 +1335,45 @@ def schedule_tis(
schedulable_ti_ids, max_tis_per_query or
len(schedulable_ti_ids)
)
for schedulable_ti_ids_chunk in schedulable_ti_ids_chunks:
- count += (
- session.query(TI)
- .filter(
+ count += session.execute(
+ update(TI)
+ .where(
TI.dag_id == self.dag_id,
TI.run_id == self.run_id,
tuple_in_condition((TI.task_id, TI.map_index),
schedulable_ti_ids_chunk),
)
- .update({TI.state: State.SCHEDULED},
synchronize_session=False)
- )
+ .values(state=State.SCHEDULED)
Review Comment:
```suggestion
.values(state=TaskInstanceState.SCHEDULED)
```
##########
airflow/models/dag.py:
##########
@@ -1402,19 +1415,18 @@ def get_num_active_runs(self, external_trigger=None,
only_running=True, session=
:param session:
:return: number greater than 0 for active dag runs
"""
- # .count() is inefficient
- query = session.query(func.count()).filter(DagRun.dag_id ==
self.dag_id)
+ query = select(func.count()).where(DagRun.dag_id == self.dag_id)
if only_running:
- query = query.filter(DagRun.state == State.RUNNING)
+ query = query.where(DagRun.state == State.RUNNING)
Review Comment:
```suggestion
query = query.where(DagRun.state == DagRunState.RUNNING)
```
##########
airflow/models/dagrun.py:
##########
@@ -1335,44 +1335,45 @@ def schedule_tis(
schedulable_ti_ids, max_tis_per_query or
len(schedulable_ti_ids)
)
for schedulable_ti_ids_chunk in schedulable_ti_ids_chunks:
- count += (
- session.query(TI)
- .filter(
+ count += session.execute(
+ update(TI)
+ .where(
TI.dag_id == self.dag_id,
TI.run_id == self.run_id,
tuple_in_condition((TI.task_id, TI.map_index),
schedulable_ti_ids_chunk),
)
- .update({TI.state: State.SCHEDULED},
synchronize_session=False)
- )
+ .values(state=State.SCHEDULED)
+ .execution_options(synchronize_session=False)
+ ).rowcount
# Tasks using EmptyOperator should not be executed, mark them as
success
if dummy_ti_ids:
dummy_ti_ids_chunks = chunks(dummy_ti_ids, max_tis_per_query or
len(dummy_ti_ids))
for dummy_ti_ids_chunk in dummy_ti_ids_chunks:
- count += (
- session.query(TI)
- .filter(
+ count += session.execute(
+ update(TI)
+ .where(
TI.dag_id == self.dag_id,
TI.run_id == self.run_id,
TI.task_id.in_(dummy_ti_ids_chunk),
)
- .update(
- {
- TI.state: State.SUCCESS,
- TI.start_date: timezone.utcnow(),
- TI.end_date: timezone.utcnow(),
- TI.duration: 0,
- },
+ .values(
+ state=State.SUCCESS,
Review Comment:
```suggestion
state=TaskInstanceState.SUCCESS,
```
##########
airflow/models/dag.py:
##########
@@ -1296,11 +1307,13 @@ def get_concurrency_reached(self, session=NEW_SESSION)
-> bool:
has been reached.
"""
TI = TaskInstance
- qry = session.query(func.count(TI.task_id)).filter(
- TI.dag_id == self.dag_id,
- TI.state == State.RUNNING,
+ total_tasks = session.scalar(
+ select(func.count(TI.task_id)).where(
+ TI.dag_id == self.dag_id,
+ TI.state == State.RUNNING,
Review Comment:
```suggestion
TI.state == TaskInstanceState.RUNNING,
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]