raphaelauv commented on issue #38342:
URL: https://github.com/apache/airflow/issues/38342#issuecomment-2306678401

   I made a small DAG to perform the 'Queue up new tasks' action on all 
non-successful dag_runs.
   
   It's inconvenient that Airflow no longer includes new tasks when we clear 
a dag_run (as it did in older versions of Airflow).
   
   ```python
   import os
   from os import path
   
   from airflow import DAG, settings
   from airflow.cli.commands.task_command import _get_ti
   from airflow.models import DagRun, DagBag
   from airflow.models.param import Param
   from airflow.operators.python import PythonOperator, get_current_context
   from airflow.serialization.pydantic.taskinstance import TaskInstancePydantic
   
   from airflow.utils.dates import days_ago
   
   # Base name of this maintenance DAG; joined with DAG_VERSION below to
   # form the dag_id ("<DAG_NAME>_<DAG_VERSION>").
   DAG_NAME = "operation_render_task"
   # Version suffix appended to DAG_NAME in the dag_id.
   DAG_VERSION = "v1"
   
   # Manually triggered DAG (no schedule) with two string params: the id of
   # the DAG to operate on and the id of the task to render in that DAG.
   with DAG(
           dag_id=f"{DAG_NAME}_{DAG_VERSION}",
           start_date=days_ago(1),
           schedule_interval=None,
           tags=[],
           params={
               "dag_name": Param("toto", type="string"),
               "task_to_render": Param("tata", type="string"),
           },
           max_active_runs=3,
   ):
       def render_task(dag_id, task_to_render):
           """Render one task's templates for every non-success run of a DAG.

           Queries all DagRun rows of ``dag_id`` whose state is not
           "success", loads the DAG from the ``dags`` folder located next to
           this file's parent directory, and for each run obtains a task
           instance of ``task_to_render`` and calls ``render_templates()``
           on it.

           :param dag_id: id of the DAG whose runs are processed.
           :param task_to_render: task_id of the task to render in that DAG.
           :raises ValueError: if the DAG cannot be found in the DagBag, or
               if ``_get_ti`` returns a ``TaskInstancePydantic`` instead of
               a real task instance.
           """
           session = settings.Session()
           try:
               pending_runs = (
                   session.query(DagRun)
                   .filter(
                       DagRun.state != "success",
                       DagRun.dag_id == dag_id,
                   )
                   .all()
               )
   
               print(f"nb dag_run to render : {len(pending_runs)}")
   
               # The DAG files are expected in "<grandparent dir>/dags"
               # relative to this file.
               src_folder = os.path.dirname(
                   os.path.dirname(os.path.realpath(__file__))
               )
               dag_bag = DagBag(
                   dag_folder=path.join(src_folder, 'dags'),
                   include_examples=False,
               )
               dag: DAG = dag_bag.get_dag(dag_id, session)
               if dag is None:
                   # Fail with a clear message instead of an AttributeError
                   # on dag.get_task below.
                   raise ValueError(f"dag {dag_id!r} not found in DagBag")
   
               task = dag.get_task(task_id=task_to_render)
               for dag_run in pending_runs:
                   # NOTE(review): create_if_necessary="memory" presumably
                   # builds a missing task instance without persisting it;
                   # confirm against _get_ti in the Airflow version in use.
                   ti, _ = _get_ti(
                       task,
                       -1,
                       exec_date_or_run_id=dag_run.run_id,
                       create_if_necessary="memory",
                   )
                   # Rendering needs a real TaskInstance (database access),
                   # not the pydantic representation.
                   if isinstance(ti, TaskInstancePydantic):
                       raise ValueError("not a TaskInstance")
   
                   ti.render_templates()
           finally:
               # Always release the session, even if rendering fails.
               session.close()
   
       # Single task: the target dag/task ids are templated from the params
       # supplied when the DAG run is triggered.
       PythonOperator(
           task_id="render",
           python_callable=render_task,
           op_kwargs={"dag_id": "{{params.dag_name}}",
                      "task_to_render": "{{params.task_to_render}}"}
       )
   
   
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to