Dev-iL commented on issue #56471:
URL: https://github.com/apache/airflow/issues/56471#issuecomment-3532386174

   Here's a DAG generator we're using that results in repeatedly incrementing versions:
   
   ```python
   from __future__ import annotations
   
    import logging
    from dataclasses import dataclass
   
   import pendulum
   from airflow.sdk import DAG, task
   from airflow.timetables.trigger import CronTriggerTimetable
   from upath import UPath
   
    from my_module import SOME_CONSTANT_UPATH, CustomSlackSuccessOperator

    # Module-level logger used by the download task below.
    logger = logging.getLogger(__name__)

   @dataclass
   class _DAGParams:
       dag_id: str
       schedule: CronTriggerTimetable
       url: str
       name: str
       separator: str
       columns: list[str] | None = None
   
   
   def dag_factory():
       _dag_params: tuple[_DAGParams, ...] = (
           _DAGParams(
               dag_id="restricted_type_1",
               schedule=CronTriggerTimetable("@daily", timezone="UTC"),
               url="https://<redacted>/list.txt",
               name="restricted_1
               separator=" ",
               columns=["symbol"],
           ),
           _DAGParams(
               dag_id="restricted_type_2",
               schedule=CronTriggerTimetable("@monthly", timezone="UTC"),
               url="https://<redacted>",
               name="restricted_2",
               separator="\t",
           ),
       )
   
       dag_holder = []
       for dag_params in _dag_params:
           with DAG(
               dag_params.dag_id,
               start_date=pendulum.datetime(2020, 9, 20, tz="UTC"),
               schedule=dag_params.schedule,
               catchup=False,
           ) as dag:
   
               @task.python(
                   retries=6,
                   retry_delay=pendulum.duration(minutes=10).total_seconds(),
                   templates_dict={
                       "datestr": "{{ data_interval_start | YMD }}",
                       "output_path": SOME_CONSTANT_UPATH.as_posix(),
                   },
               )
               def download_task(
                   download_params: _DAGParams,
                   templates_dict: dict[str, str],
               ) -> None:
                   import pandas as pd
   
                   filename: UPath = (
                       UPath(templates_dict["output_path"]) / 
f"{download_params.name}_{templates_dict['datestr']}.csv"
                   )
                   filename.parent.mkdir(parents=True, exist_ok=True)
   
                   df = pd.read_csv(
                       download_params.url,
                       sep=download_params.separator,
                       names=download_params.columns,
                       storage_options={"User-Agent": "Mozilla/5.0"},
                   )
                   df.to_csv(filename, index=False)
                   logger.info(f"Saved {download_params.url} to 
{filename.as_posix()}")
   
               (
                   download_task(dag_params)
                    >> CustomSlackSuccessOperator(text="Successfully downloaded restricted files.")
               )
   
           dag_holder.append(dag)
   
       return dag_holder
   
   
   dags: list[DAG] = dag_factory()
   
   
   if __name__ == "__main__":
       dags[0].test(
           logical_date=pendulum.datetime(2024, 1, 24),
       )
   
   ```
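
   For anyone reproducing this without access to our internal `my_module`, a minimal stand-in along these lines is enough to let the file above parse. The constant and operator names are taken from the snippet; everything else here is a placeholder, not the real implementation:

    ```python
    # Hypothetical stand-in for my_module -- placeholder values only.
    from __future__ import annotations

    from airflow.providers.slack.operators.slack import SlackAPIPostOperator
    from upath import UPath

    # Placeholder output directory; the real one is an internal UPath constant.
    SOME_CONSTANT_UPATH = UPath("/tmp/airflow_downloads")


    class CustomSlackSuccessOperator(SlackAPIPostOperator):
        """Placeholder: thin wrapper that posts a success message to Slack."""

        def __init__(self, text: str, **kwargs) -> None:
            kwargs.setdefault("task_id", "notify_success")
            super().__init__(text=text, **kwargs)
    ```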

