This is an automated email from the ASF dual-hosted git repository.
pankajkoti pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new a1671f1f7d Retrieve dataset event created through RESTful API when creating dag run (#38332)
a1671f1f7d is described below
commit a1671f1f7d4c3d74de3121c0f8aa18cef8823464
Author: Wei Lee <[email protected]>
AuthorDate: Thu Mar 21 23:44:22 2024 +0800
Retrieve dataset event created through RESTful API when creating dag run (#38332)
Fetch dataset events generated via the RESTful API when creating
DAG runs. Currently, dataset events produced via the RESTful API
are overlooked because they have no 'source_dag_run' set.
Furthermore, take the timestamps of dataset events created through
the RESTful API into account when calculating data intervals.
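
To make the scenario concrete, here is a hedged sketch of how such an
event gets created in the first place (endpoint shape as in the
Airflow 2.9 stable REST API; host, credentials, and dataset URI are
illustrative placeholders):

    # Post a dataset event directly, with no triggering DAG run.
    import requests

    resp = requests.post(
        "http://localhost:8080/api/v1/datasets/events",
        json={"dataset_uri": "s3://bucket/my-dataset"},
        auth=("admin", "admin"),  # assumes basic auth is enabled
    )
    resp.raise_for_status()
    # The resulting DatasetEvent row has no source DAG run, which is
    # exactly the case the two hunks below handle.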
---
 airflow/jobs/scheduler_job_runner.py |  1 -
 airflow/timetables/simple.py         | 18 +++++++++++-------
 2 files changed, 11 insertions(+), 8 deletions(-)
diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py
index c62ebc53b5..0596e7f59f 100644
--- a/airflow/jobs/scheduler_job_runner.py
+++ b/airflow/jobs/scheduler_job_runner.py
@@ -1270,7 +1270,6 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
                 DagScheduleDatasetReference,
                 DatasetEvent.dataset_id == DagScheduleDatasetReference.dataset_id,
             )
-            .join(DatasetEvent.source_dag_run)
             .where(*dataset_event_filters)
         ).all()
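
Why the dropped join matters: '.join(DatasetEvent.source_dag_run)' is
an inner join, so any event whose source_dag_run_id is NULL (the REST
API case) was silently excluded from the scheduler's query. A minimal,
self-contained sketch with toy SQLAlchemy models (not the real Airflow
schema) demonstrates the effect:

    from sqlalchemy import Column, ForeignKey, Integer, create_engine, select
    from sqlalchemy.orm import Session, declarative_base, relationship

    Base = declarative_base()

    class DagRun(Base):
        __tablename__ = "dag_run"
        id = Column(Integer, primary_key=True)

    class DatasetEvent(Base):
        __tablename__ = "dataset_event"
        id = Column(Integer, primary_key=True)
        source_dag_run_id = Column(Integer, ForeignKey("dag_run.id"), nullable=True)
        source_dag_run = relationship(DagRun)

    engine = create_engine("sqlite://")
    Base.metadata.create_all(engine)
    with Session(engine) as session:
        session.add_all(
            [
                DagRun(id=1),
                DatasetEvent(id=1, source_dag_run_id=1),  # created by a DAG run
                DatasetEvent(id=2, source_dag_run_id=None),  # created via REST API
            ]
        )
        session.commit()

        with_join = session.scalars(
            select(DatasetEvent).join(DatasetEvent.source_dag_run)
        ).all()
        without_join = session.scalars(select(DatasetEvent)).all()
        print(len(with_join))  # 1 -- the API-created event is dropped
        print(len(without_join))  # 2 -- both events are retrieved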
diff --git a/airflow/timetables/simple.py b/airflow/timetables/simple.py
index dea3ca2d6a..6452244262 100644
--- a/airflow/timetables/simple.py
+++ b/airflow/timetables/simple.py
@@ -16,7 +16,6 @@
 # under the License.
 from __future__ import annotations
 
-import operator
 from typing import TYPE_CHECKING, Any, Collection, Sequence
 
 from airflow.timetables.base import DagRunInfo, DataInterval, Timetable
@@ -183,12 +182,17 @@ class DatasetTriggeredTimetable(_TrivialTimetable):
         if not events:
             return DataInterval(logical_date, logical_date)
 
-        start = min(
-            events, key=operator.attrgetter("source_dag_run.data_interval_start")
-        ).source_dag_run.data_interval_start
-        end = max(
-            events, key=operator.attrgetter("source_dag_run.data_interval_end")
-        ).source_dag_run.data_interval_end
+        start_dates, end_dates = [], []
+        for event in events:
+            if event.source_dag_run is not None:
+                start_dates.append(event.source_dag_run.data_interval_start)
+                end_dates.append(event.source_dag_run.data_interval_end)
+            else:
+                start_dates.append(event.timestamp)
+                end_dates.append(event.timestamp)
+
+        start = min(start_dates)
+        end = max(end_dates)
         return DataInterval(start, end)
 
     def next_dagrun_info(
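
And a toy walk-through of the new interval logic (stub objects standing
in for Airflow models; the dates are made up for illustration):

    from datetime import datetime, timezone
    from types import SimpleNamespace

    run = SimpleNamespace(
        data_interval_start=datetime(2024, 3, 1, tzinfo=timezone.utc),
        data_interval_end=datetime(2024, 3, 2, tzinfo=timezone.utc),
    )
    events = [
        SimpleNamespace(source_dag_run=run, timestamp=None),  # from a DAG run
        SimpleNamespace(  # from the REST API: only a timestamp is available
            source_dag_run=None,
            timestamp=datetime(2024, 3, 3, tzinfo=timezone.utc),
        ),
    ]

    start_dates, end_dates = [], []
    for event in events:
        if event.source_dag_run is not None:
            start_dates.append(event.source_dag_run.data_interval_start)
            end_dates.append(event.source_dag_run.data_interval_end)
        else:
            start_dates.append(event.timestamp)
            end_dates.append(event.timestamp)

    print(min(start_dates))  # 2024-03-01: start comes from the DAG run
    print(max(end_dates))    # 2024-03-03: end comes from the API event's timestamp

Note the design choice: rather than making the join optional in the key
functions, the per-event fallback to 'event.timestamp' lets events with
and without a source DAG run contribute to the same min/max computation.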