ephraimbuddy commented on issue #56204:
URL: https://github.com/apache/airflow/issues/56204#issuecomment-3570910192

   > [@ephraimbuddy](https://github.com/ephraimbuddy) I'm afraid that won't
   > solve my problem. This solution calculates the number of tasks to run during
   > loading the DAG. I need it to do that while running the schedule_dags task
   > because the configuration which reports to run is extracted from the database.
   
   Ok. Here's another approach using SUPERVISOR_COMMS:
   
   ```python
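   import datetime
   import logging
   
   import pytz
   from airflow.sdk import DAG, Param, task
   # Assumed import path for utcnow; it may differ between Airflow versions.
   from airflow.utils.timezone import utcnow
   
   # This DAG should schedule the other one multiple times
   with DAG(
       dag_id="Task-Run-Demo",
       schedule="0 0 * * *",  # Daily at midnight (00:00)
       catchup=False,
       params={"schedule_date": Param(f"{datetime.date.today()}", type="string", format="date")},
   ) as dag: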
   
       @task
       def schedule_dags(**kwargs):
           from airflow.models.dagrun import DagRun, DagRunType
           from airflow.sdk.execution_time.comms import TriggerDagRun
           from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS
   
           for i in range(5):
               execution_date = utcnow() + datetime.timedelta(minutes=5)
               logging.info(f"Trigger report for counter #{i} at '{execution_date}'")
   
               execution_date_utc = execution_date.replace(tzinfo=pytz.UTC)
               run_id = DagRun.generate_run_id(
                   run_type=DagRunType.MANUAL,
                   logical_date=execution_date_utc,
                   run_after=execution_date_utc,
               )
   
               logging.info(
                   "Triggering report #%s at '%s' with run_id '%s'",
                   i,
                   execution_date,
                   run_id,
               )
   
               try:
                   # Send trigger request via supervisor communication
                   SUPERVISOR_COMMS.send(
                       TriggerDagRun(
                           dag_id="Create-Reports",
                           run_id=run_id,
                           conf={"counter": i},
                           logical_date=execution_date_utc,
                           reset_dag_run=False,
                       )
                   )
   
               except Exception as ex:
                   error_message = str(ex)
                   logging.error(f"Failed to trigger DAG #{i}: {error_message}")
   
       schedule_dags()
   
   
   # This DAG should not be executed directly but triggered by the Task-Run-Demo DAG
   with DAG(
       dag_id="Create-Reports",
       schedule=None,
       catchup=False,
       params={"counter": None},
   ) as target_dag:
   
       @task.python
       def report(params: dict):
           counter = params["counter"]
           logging.info(f"Current counter: {counter}")
   
       report()
   ```
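
Since the number of reports is only known at run time, the `range(5)` loop above would be replaced by a loop over whatever the database returns. The following is a minimal sketch of that step in plain Python, with no Airflow dependency. The function name `build_trigger_requests`, the `spacing_minutes` parameter, and the sample rows are all hypothetical, and spacing the logical dates apart is a choice made here for illustration, not something the example above does. Each resulting dict would then be passed to `TriggerDagRun` inside the task.

```python
import datetime


def build_trigger_requests(report_configs, start, spacing_minutes=5):
    """Turn runtime config rows into one trigger payload per report."""
    requests = []
    for index, config in enumerate(report_configs):
        # Offset each run so that logical dates never collide.
        offset = datetime.timedelta(minutes=spacing_minutes * (index + 1))
        requests.append(
            {
                "dag_id": "Create-Reports",
                "conf": {"counter": index, **config},
                "logical_date": start + offset,
            }
        )
    return requests


# Hypothetical rows, standing in for the result of a database query.
rows = [{"report": "sales"}, {"report": "inventory"}, {"report": "churn"}]
start = datetime.datetime(2025, 1, 1, tzinfo=datetime.timezone.utc)
for request in build_trigger_requests(rows, start):
    print(request["logical_date"].isoformat(), request["conf"])
```

Keeping the payload construction separate from the `SUPERVISOR_COMMS.send` call makes it possible to unit-test the scheduling logic without a running Airflow instance.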

