Dev-iL commented on issue #56471:
URL: https://github.com/apache/airflow/issues/56471#issuecomment-3532386174
Here's a DAG generator we're using that results in repeatedly incrementing
versions:
```python
from __future__ import annotations

import logging
from dataclasses import dataclass

import pendulum
from airflow.sdk import DAG, task
from airflow.timetables.trigger import CronTriggerTimetable
from upath import UPath

from my_module import (
    SOME_CONSTANT_UPATH,
    CustomSlackSuccessOperator,
)
@dataclass
class _DAGParams:
    """Parameters describing one generated download DAG.

    Each instance is consumed by ``dag_factory``, which builds one DAG per
    instance and hands the whole instance to the download task.
    """

    # Identifier passed as the first argument to ``DAG(...)``.
    dag_id: str
    # Timetable passed as the DAG's ``schedule``.
    schedule: CronTriggerTimetable
    # Source location handed to ``pd.read_csv``.
    url: str
    # Prefix of the output CSV file name (``<name>_<datestr>.csv``).
    name: str
    # Field delimiter handed to ``pd.read_csv`` as ``sep``.
    separator: str
    # Column names handed to ``pd.read_csv`` as ``names``; ``None`` when unset.
    columns: list[str] | None = None
def dag_factory() -> list[DAG]:
    """Build one download DAG per entry in a hard-coded parameter table.

    Every generated DAG contains a Python task that reads a delimited text
    file from ``download_params.url`` with pandas and writes it as CSV under
    ``SOME_CONSTANT_UPATH``, followed by a ``CustomSlackSuccessOperator``.

    Returns:
        The created DAG objects, in the order of the parameter table.
    """
    _dag_params: tuple[_DAGParams, ...] = (
        _DAGParams(
            dag_id="restricted_type_1",
            schedule=CronTriggerTimetable("@daily", timezone="UTC"),
            url="https://<redacted>/list.txt",
            name="restricted_1",
            separator=" ",
            columns=["symbol"],
        ),
        _DAGParams(
            dag_id="restricted_type_2",
            schedule=CronTriggerTimetable("@monthly", timezone="UTC"),
            url="https://<redacted>",
            name="restricted_2",
            separator="\t",
        ),
    )

    dag_holder: list[DAG] = []
    for dag_params in _dag_params:
        with DAG(
            dag_params.dag_id,
            start_date=pendulum.datetime(2020, 9, 20, tz="UTC"),
            schedule=dag_params.schedule,
            catchup=False,
        ) as dag:

            @task.python(
                retries=6,
                # The delay is passed as a number of seconds (a float).
                retry_delay=pendulum.duration(minutes=10).total_seconds(),
                templates_dict={
                    # NOTE(review): ``YMD`` is not defined in this file;
                    # presumably a custom Jinja filter registered elsewhere.
                    "datestr": "{{ data_interval_start | YMD }}",
                    "output_path": SOME_CONSTANT_UPATH.as_posix(),
                },
            )
            def download_task(
                download_params: _DAGParams,
                templates_dict: dict[str, str],
            ) -> None:
                """Download one source file and store it as a dated CSV.

                Args:
                    download_params: Source URL, separator, column names and
                        output file-name prefix for this download.
                    templates_dict: Must provide ``"output_path"`` (target
                        directory) and ``"datestr"`` (file-name suffix).
                """
                # pandas is imported inside the task body, not at module level.
                import pandas as pd

                # ``logger`` was referenced without any visible definition;
                # bind one here so the task does not fail with a NameError.
                logger = logging.getLogger(__name__)

                filename: UPath = (
                    UPath(templates_dict["output_path"])
                    / f"{download_params.name}_{templates_dict['datestr']}.csv"
                )
                filename.parent.mkdir(parents=True, exist_ok=True)

                df = pd.read_csv(
                    download_params.url,
                    sep=download_params.separator,
                    names=download_params.columns,
                    storage_options={"User-Agent": "Mozilla/5.0"},
                )
                df.to_csv(filename, index=False)
                logger.info(
                    "Saved %s to %s", download_params.url, filename.as_posix()
                )

            # NOTE(review): the whole ``_DAGParams`` instance, including its
            # ``CronTriggerTimetable``, is passed as a task argument.
            # Presumably this is what makes the serialized DAG differ between
            # parses (e.g. a default object repr containing a memory address)
            # and so keeps bumping the DAG version -- TODO confirm.
            (
                download_task(dag_params)
                >> CustomSlackSuccessOperator(
                    text="Successfully downloaded restricted files."
                )
            )
        dag_holder.append(dag)
    return dag_holder
# Build the DAGs when the module is imported and keep them in a module-level
# list.
dags: list[DAG] = dag_factory()

if __name__ == "__main__":
    # When run as a script, call ``test()`` on the first generated DAG for a
    # fixed logical date.
    dags[0].test(
        logical_date=pendulum.datetime(2024, 1, 24),
    )
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]