Lee-W commented on code in PR #42404:
URL: https://github.com/apache/airflow/pull/42404#discussion_r1847584414
##########
airflow/www/views.py:
##########
@@ -1417,7 +1417,9 @@ def rendered_templates(self, session):
logger.info("Retrieving rendered templates.")
dag: DAG = get_airflow_app().dag_bag.get_dag(dag_id)
- dag_run = dag.get_dagrun(logical_date=dttm)
+ dag_run = dag.get_dagrun(
+ select(DagRun.run_id).where(DagRun.logical_date ==
dttm).order_by(DagRun.id.desc()).limit(1)
Review Comment:
```suggestion
run_id=select(DagRun.run_id).where(DagRun.logical_date ==
dttm).order_by(DagRun.id.desc()).limit(1)
```
##########
tests/models/test_dag.py:
##########
@@ -2671,8 +2663,8 @@ def get_ti_from_db(task):
# task_2 remains as SUCCESS
assert get_ti_from_db(task_2).state == State.SUCCESS
# task_3 and task_4 are cleared because they were in
FAILED/UPSTREAM_FAILED state
- assert get_ti_from_db(task_3).state == State.NONE
- assert get_ti_from_db(task_4).state == State.NONE
+ assert get_ti_from_db(task_3).state == State.UPSTREAM_FAILED
Review Comment:
May I know why we're changing the expected values here?
##########
airflow/www/views.py:
##########
@@ -1417,7 +1417,9 @@ def rendered_templates(self, session):
logger.info("Retrieving rendered templates.")
dag: DAG = get_airflow_app().dag_bag.get_dag(dag_id)
- dag_run = dag.get_dagrun(logical_date=dttm)
+ dag_run = dag.get_dagrun(
+ select(DagRun.run_id).where(DagRun.logical_date ==
dttm).order_by(DagRun.id.desc()).limit(1)
Review Comment:
Do we need to pass `session` here?
##########
tests/sensors/test_external_task_sensor.py:
##########
@@ -1373,9 +1374,17 @@ def
test_external_task_marker_clear_activate(dag_bag_parent_child, session):
run_tasks(dag_bag, logical_date=day_1)
run_tasks(dag_bag, logical_date=day_2)
+ from sqlalchemy import select
+
# Assert that dagruns of all the affected dags are set to SUCCESS before
tasks are cleared.
for dag, logical_date in itertools.product(dag_bag.dags.values(), [day_1,
day_2]):
- dagrun = dag.get_dagrun(logical_date=logical_date, session=session)
+ dagrun = dag.get_dagrun(
+ run_id=select(DagRun.run_id)
+ .where(DagRun.logical_date == logical_date)
+ .order_by(DagRun.id.desc())
+ .limit(1),
+ session=session,
+ )
Review Comment:
```suggestion
run_id = (
select(DagRun.run_id)
.where(DagRun.logical_date == logical_date)
.order_by(DagRun.id.desc())
.limit(1)
)
dagrun = dag.get_dagrun(
run_id=run_id,
session=session,
)
```
Splitting it into two variables might make it easier to read.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]