GitHub user plutaniano created a discussion: Would it be possible for a DAG to
have a static data interval?
I'm working with a source system that I can't query using date filters; I have
to pull the entire dataset every time.
I want `data_interval_start` and `data_interval_end` for the DAG that fetches
this dataset to reflect this, by giving the data interval static values.
Is something like this possible? Is it a bad idea?
I tried implementing my own Timetable, but it seems that because the
`logical_date` is the same every time, the DAG runs only once.
<details>
<summary>StaticIntervalTimetable</summary>
```python
# Not final implementation
class StaticIntervalTimetable(Timetable):
    """Schedule runs on a cron expression while reporting a fixed data interval.

    Meant for sources that cannot be filtered by date, so every run pulls the
    whole dataset. ``cron`` decides *when* runs happen. Scheduled and manual
    runs alike receive the same data interval, which by default runs from
    1900-01-01 UTC to 2100-01-01 UTC.

    NOTE(review): as reported in the discussion, a constant data interval
    gives every scheduled run the same ``logical_date``. This is presumably
    because Airflow derives it from the interval start, so only one scheduled
    run ever gets created. Confirm against the Airflow version in use. The
    usual alternative is a plain cron timetable whose tasks simply ignore the
    data interval.
    """

    def __init__(
        self,
        cron: str,
        *,
        data_interval_start: dt.datetime = dt.datetime(1900, 1, 1, tzinfo=dt.UTC),
        data_interval_end: dt.datetime = dt.datetime(2100, 1, 1, tzinfo=dt.UTC),
    ) -> None:
        """Store the cron schedule and the fixed interval bounds.

        :param cron: cron expression (evaluated with ``croniter``) that
            decides when runs are scheduled.
        :param data_interval_start: fixed interval start reported for every run.
        :param data_interval_end: fixed interval end reported for every run.
        """
        self._cron = cron
        self._data_interval_start = DateTime.instance(data_interval_start)
        self._data_interval_end = DateTime.instance(data_interval_end)

    @classmethod
    def deserialize(cls, data: dict[str, Any]) -> Timetable:
        """Rebuild a timetable from the dict produced by ``serialize``."""
        return cls(
            cron=data["cron"],
            data_interval_start=dt.datetime.fromisoformat(data["data_interval_start"]),
            data_interval_end=dt.datetime.fromisoformat(data["data_interval_end"]),
        )

    def serialize(self) -> dict[str, Any]:
        """Return the cron string and both bounds as ISO-8601 strings."""
        return {
            "cron": self._cron,
            "data_interval_start": self._data_interval_start.isoformat(),
            "data_interval_end": self._data_interval_end.isoformat(),
        }

    @property
    def _interval(self) -> DataInterval:
        """The fixed data interval attached to every run."""
        return DataInterval(
            start=self._data_interval_start,
            end=self._data_interval_end,
        )

    def _cron_tick(self, anchor: dt.datetime, *, forward: bool) -> DateTime:
        """Return the cron tick after (``forward``) or before ``anchor``, in UTC."""
        ticks = croniter(self._cron, start_time=anchor)
        timestamp = ticks.get_next() if forward else ticks.get_prev()
        return DateTime.fromtimestamp(timestamp, tz=dt.UTC)

    def next_dagrun_info(
        self,
        *,
        last_automated_data_interval: DataInterval | None,
        restriction: TimeRestriction,
    ) -> DagRunInfo | None:
        """Return the next scheduled run, always carrying the fixed interval.

        First run:
            * It is placed at the most recent cron tick before now.
            * It is never placed before ``restriction.earliest`` (presumably
              the DAG's start_date); in that case it moves to the first tick
              at or after that bound.

        Later runs:
            * They go to the next tick after now. The previous data interval
              is constant, so it says nothing about when the last run happened.

        Returns ``None`` when ``restriction.earliest`` is unset (first run
        only), or when the chosen time falls after ``restriction.latest``.
        """
        now = dt.datetime.now(dt.UTC)

        if last_automated_data_interval is None:
            earliest = restriction.earliest
            if earliest is None:
                return None
            run_after = self._cron_tick(now, forward=False)
            if run_after < earliest:
                # Never schedule before the earliest allowed time: use the
                # first tick at or after it. croniter's get_next is strictly
                # "after", so check whether ``earliest`` is itself a tick.
                run_after = self._cron_tick(earliest, forward=True)
                if self._cron_tick(run_after, forward=False) == earliest:
                    run_after = earliest
        else:
            run_after = self._cron_tick(now, forward=True)

        # Honour the upper bound (presumably the DAG's end_date).
        if restriction.latest is not None and run_after > restriction.latest:
            return None

        return DagRunInfo(run_after=run_after, data_interval=self._interval)

    def infer_manual_data_interval(self, *, run_after: DateTime) -> DataInterval:
        """Return the fixed interval for a manual run; ``run_after`` is ignored."""
        return self._interval
```
</details>
GitHub link: https://github.com/apache/airflow/discussions/59398
----
This is an automatically sent email for [email protected].
To unsubscribe, please send an email to: [email protected]