ashb commented on a change in pull request #15397:
URL: https://github.com/apache/airflow/pull/15397#discussion_r644648007
##########
File path: airflow/ti_deps/deps/prev_dagrun_dep.py
##########
@@ -44,25 +44,16 @@ def _get_dep_statuses(self, ti, session, dep_context):
return
# Don't depend on the previous task instance if we are the first task
- dag = ti.task.dag
- if dag.catchup:
- if dag.previous_schedule(ti.execution_date) is None:
- yield self._passing_status(reason="This task does not have a
schedule or is @once")
- return
- if dag.previous_schedule(ti.execution_date) < ti.task.start_date:
Review comment:
This conditional was checking something like this case:
```python
with DAG(start_date='2020-01-01'):
task1 = Operator()
task2 = Operator(start_date='2021-07-01', depends_on_past=True)
```
I guess this isn't covered by the tests either :(
##########
File path: airflow/ti_deps/deps/prev_dagrun_dep.py
##########
@@ -44,25 +44,16 @@ def _get_dep_statuses(self, ti, session, dep_context):
return
# Don't depend on the previous task instance if we are the first task
- dag = ti.task.dag
- if dag.catchup:
- if dag.previous_schedule(ti.execution_date) is None:
- yield self._passing_status(reason="This task does not have a
schedule or is @once")
- return
- if dag.previous_schedule(ti.execution_date) < ti.task.start_date:
- yield self._passing_status(
- reason="This task instance was the first task instance for
its task."
- )
- return
+ dr = ti.get_dagrun(session=session)
+ if not dr:
+ last_dagrun = None
+ elif ti.task.dag.catchup:
+ last_dagrun = dr.get_previous_scheduled_dagrun(session)
else:
- dr = ti.get_dagrun(session=session)
- last_dagrun = dr.get_previous_dagrun(session=session) if dr else
None
-
- if not last_dagrun:
- yield self._passing_status(
- reason="This task instance was the first task instance for
its task."
- )
- return
+ last_dagrun = dr.get_previous_dagrun(session=session)
+ if not last_dagrun:
+ yield self._passing_status(reason="This task instance was the
first task instance for its task.")
+ return
previous_ti = ti.get_previous_ti(session=session)
Review comment:
This should be changed to get tis from `last_dagrun`, otherwise we are
looking up the previous dag run twice.
##########
File path: airflow/ti_deps/deps/prev_dagrun_dep.py
##########
@@ -44,25 +44,16 @@ def _get_dep_statuses(self, ti, session, dep_context):
return
# Don't depend on the previous task instance if we are the first task
- dag = ti.task.dag
- if dag.catchup:
- if dag.previous_schedule(ti.execution_date) is None:
- yield self._passing_status(reason="This task does not have a
schedule or is @once")
- return
- if dag.previous_schedule(ti.execution_date) < ti.task.start_date:
- yield self._passing_status(
- reason="This task instance was the first task instance for
its task."
- )
- return
+ dr = ti.get_dagrun(session=session)
+ if not dr:
+ last_dagrun = None
+ elif ti.task.dag.catchup:
+ last_dagrun = dr.get_previous_scheduled_dagrun(session)
else:
- dr = ti.get_dagrun(session=session)
- last_dagrun = dr.get_previous_dagrun(session=session) if dr else
None
-
- if not last_dagrun:
- yield self._passing_status(
- reason="This task instance was the first task instance for
its task."
- )
- return
+ last_dagrun = dr.get_previous_dagrun(session=session)
+ if not last_dagrun:
+ yield self._passing_status(reason="This task instance was the
first task instance for its task.")
+ return
previous_ti = ti.get_previous_ti(session=session)
Review comment:
```suggestion
previous_ti = last_dagrun.get_task_instance(ti.task_id,
session=session)
```
##########
File path: airflow/www/views.py
##########
@@ -2298,8 +2298,10 @@ def duration(self, session=None):
else:
base_date = dag.get_latest_execution_date() or timezone.utcnow()
- dates = dag.date_range(base_date, num=-abs(num_runs))
- min_date = dates[0] if dates else timezone.utc_epoch()
+ min_date = next(
+ dag.time_table.iter_next_n(base_date, 1),
+ timezone.utc_epoch(),
+ )
Review comment:
This was powering the "dag runs" drop down on the graph view etc.

So this change will break the values in the drop down (and yes, it's not
covered by tests; UI tests are hard/bad).
##########
File path: airflow/www/views.py
##########
@@ -2298,8 +2298,10 @@ def duration(self, session=None):
else:
base_date = dag.get_latest_execution_date() or timezone.utcnow()
- dates = dag.date_range(base_date, num=-abs(num_runs))
- min_date = dates[0] if dates else timezone.utc_epoch()
+ min_date = next(
+ dag.time_table.iter_next_n(base_date, 1),
+ timezone.utc_epoch(),
+ )
Review comment:
Oh no, it's worse than that.
This controls what runs are shown on these views, where "base_date" is
current and then we look back `num_runs` older runs.
##########
File path: airflow/www/views.py
##########
@@ -2298,8 +2298,10 @@ def duration(self, session=None):
else:
base_date = dag.get_latest_execution_date() or timezone.utcnow()
- dates = dag.date_range(base_date, num=-abs(num_runs))
- min_date = dates[0] if dates else timezone.utc_epoch()
+ min_date = next(
+ dag.time_table.iter_next_n(base_date, 1),
+ timezone.utc_epoch(),
+ )
Review comment:
I think this needs to be something like:
```
SELECT execution_date FROM dag_run WHERE .... ORDER BY execution_date DESC
OFFSET ? LIMIT 1
```
with `abs(num_runs)` as the offset.
I _think_
##########
File path: airflow/www/views.py
##########
@@ -2298,8 +2298,10 @@ def duration(self, session=None):
else:
base_date = dag.get_latest_execution_date() or timezone.utcnow()
- dates = dag.date_range(base_date, num=-abs(num_runs))
- min_date = dates[0] if dates else timezone.utc_epoch()
+ min_date = next(
+ dag.time_table.iter_next_n(base_date, 1),
+ timezone.utc_epoch(),
+ )
Review comment:
Possibly also adding in a "type != manual" filter.
##########
File path: airflow/www/views.py
##########
@@ -2298,8 +2298,10 @@ def duration(self, session=None):
else:
base_date = dag.get_latest_execution_date() or timezone.utcnow()
- dates = dag.date_range(base_date, num=-abs(num_runs))
- min_date = dates[0] if dates else timezone.utc_epoch()
+ min_date = next(
+ dag.time_table.iter_next_n(base_date, 1),
+ timezone.utc_epoch(),
+ )
Review comment:
I think this needs to be something like:
```
SELECT execution_date FROM dag_run WHERE execution_date <= $base_date ORDER
BY execution_date DESC OFFSET ? LIMIT 1
```
with `abs(num_runs)` as the offset.
I _think_
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]