GitHub user plutaniano created a discussion: Would it be possible for a DAG to 
have a static data interval?

I'm working with a source system that I can't query using date filters, so I 
have to pull the entire dataset every time.
I wanted `data_interval_start` and `data_interval_end` for the DAG that fetches 
this dataset to match this condition, by having static values for the data 
interval. Is something like this possible? Is it a bad idea?

I tried implementing my own Timetable, but it seems that because the 
`logical_date` is the same every time, the DAG runs only once.

<details>

<summary>StaticIntervalTimetable</summary>

```python
# Not final implementation
class StaticIntervalTimetable(Timetable):
    """Timetable that fires on a cron expression but reports a fixed data interval.

    Every run produced by this timetable, scheduled or manual, carries the
    same ``DataInterval`` built from the two datetimes given at construction.
    Only ``run_after`` varies between scheduled runs.

    NOTE(review): because the data interval never changes, every run
    presumably resolves to the same ``logical_date``; the author reports the
    DAG being scheduled only once. Confirm against the Airflow version in use
    before relying on this for recurring runs.
    """

    def __init__(
        self,
        cron: str,
        *,
        data_interval_start: dt.datetime = dt.datetime(1900, 1, 1, tzinfo=dt.UTC),
        data_interval_end: dt.datetime = dt.datetime(2100, 1, 1, tzinfo=dt.UTC),
    ) -> None:
        """Store the cron expression and the fixed interval bounds.

        Args:
            cron: Cron expression handed to ``croniter`` to pick ``run_after``.
            data_interval_start: Start of the data interval reported for every run.
            data_interval_end: End of the data interval reported for every run.
        """
        self._cron = cron
        # Normalise both bounds to ``DateTime`` instances.
        self._data_interval_start = DateTime.instance(data_interval_start)
        self._data_interval_end = DateTime.instance(data_interval_end)

    @classmethod
    def deserialize(cls, data: dict[str, Any]) -> Timetable:
        """Rebuild a timetable from the dict produced by :meth:`serialize`."""
        return cls(
            cron=data["cron"],
            data_interval_start=dt.datetime.fromisoformat(data["data_interval_start"]),
            data_interval_end=dt.datetime.fromisoformat(data["data_interval_end"]),
        )

    def serialize(self) -> dict[str, Any]:
        """Return the cron expression and both interval bounds as ISO-8601 strings."""
        return {
            "cron": self._cron,
            "data_interval_start": self._data_interval_start.isoformat(),
            "data_interval_end": self._data_interval_end.isoformat(),
        }

    @property
    def _interval(self) -> DataInterval:
        """The fixed data interval attached to every run."""
        return DataInterval(
            start=self._data_interval_start,
            end=self._data_interval_end,
        )

    def _cron_tick(self, *, previous: bool) -> DateTime:
        """Return the cron tick just before (or just after) the current UTC time."""
        schedule = croniter(self._cron, start_time=dt.datetime.now(dt.UTC))
        timestamp = schedule.get_prev() if previous else schedule.get_next()
        return DateTime.fromtimestamp(timestamp, tz=dt.UTC)

    def next_dagrun_info(
        self,
        *,
        last_automated_data_interval: DataInterval | None,
        restriction: TimeRestriction,
    ) -> DagRunInfo | None:
        """Compute the next scheduled run.

        Args:
            last_automated_data_interval: Interval of the previous automated
                run, or ``None`` if there has been none.
            restriction: Scheduling restriction; only ``earliest`` is consulted.

        Returns:
            ``None`` when there has been no automated run and
            ``restriction.earliest`` is ``None``. Otherwise a ``DagRunInfo``
            with the fixed data interval, whose ``run_after`` is the most
            recent cron tick for the first run and the upcoming cron tick for
            every later run. Ticks are measured from the current UTC time,
            not from the previous run.
        """
        if last_automated_data_interval is None:
            if restriction.earliest is None:
                return None
            # First automated run: the latest tick that has already passed.
            run_after = self._cron_tick(previous=True)
        else:
            run_after = self._cron_tick(previous=False)

        # NOTE(review): ``restriction.latest`` is not checked here, so an end
        # date is presumably not honoured -- confirm whether that is intended.
        return DagRunInfo(
            run_after=run_after,
            data_interval=self._interval,
        )

    def infer_manual_data_interval(self, *, run_after: DateTime) -> DataInterval:
        """Return the fixed data interval for a manual run; ``run_after`` is ignored."""
        return self._interval
```
</details>

GitHub link: https://github.com/apache/airflow/discussions/59398

----
This is an automatically sent email for [email protected].
To unsubscribe, please send an email to: [email protected]

Reply via email to