raphaelauv commented on issue #38342:
URL: https://github.com/apache/airflow/issues/38342#issuecomment-2306678401
I made a small DAG that performs the 'Queue up new tasks' action on all
non-successful dag_runs.
It's inconvenient that Airflow no longer includes new tasks when we clear a
dag_run (as it did in older versions of Airflow).
```python
import os
from os import path
from airflow import DAG, settings
from airflow.cli.commands.task_command import _get_ti
from airflow.models import DagRun, DagBag
from airflow.models.param import Param
from airflow.operators.python import PythonOperator, get_current_context
from airflow.serialization.pydantic.taskinstance import TaskInstancePydantic
from airflow.utils.dates import days_ago
# The DAG id is assembled from these two parts as "<DAG_NAME>_<DAG_VERSION>".
DAG_NAME = "operation_render_task"
DAG_VERSION = "v1"

# Manually triggered DAG (schedule_interval=None). The two params select which
# DAG and which task inside it should be rendered; they are forwarded to the
# python callable through templated op_kwargs below.
with DAG(
    dag_id=f"{DAG_NAME}_{DAG_VERSION}",
    start_date=days_ago(1),
    schedule_interval=None,
    tags=[],
    params={
        "dag_name": Param("toto", type="string"),
        "task_to_render": Param("tata", type="string"),
    },
    max_active_runs=3,
):

    def render_task(dag_id, task_to_render):
        """Render the templates of one task for every non-successful run of a DAG.

        Looks up every ``DagRun`` of ``dag_id`` whose state is not
        ``"success"``, loads the DAG from the ``dags`` folder located next to
        the parent directory of this file, and for each of those runs obtains
        a task instance of ``task_to_render`` and calls ``render_templates()``
        on it.

        Args:
            dag_id: Id of the DAG whose runs should be processed.
            task_to_render: Task id, inside that DAG, to render.

        Raises:
            ValueError: If ``dag_id`` cannot be found in the DagBag, or if the
                task instance handed back is a ``TaskInstancePydantic``.
        """
        session = settings.Session()
        try:
            pending_runs = (
                session.query(DagRun)
                .filter(DagRun.state != "success", DagRun.dag_id == dag_id)
                .all()
            )
            print(f"nb dag_run to render : {len(pending_runs)}")

            # Two levels up from this file, then into "dags".
            src_folder = os.path.dirname(
                os.path.dirname(os.path.realpath(__file__))
            )
            dag_bag = DagBag(
                dag_folder=path.join(src_folder, "dags"),
                include_examples=False,
            )

            dag = dag_bag.get_dag(dag_id, session)
            if dag is None:
                # Fail with an explicit message rather than an AttributeError
                # on the following ``get_task`` call.
                raise ValueError(f"dag {dag_id!r} not found in {dag_bag.dag_folder}")
            task = dag.get_task(task_id=task_to_render)

            for dag_run in pending_runs:
                # create_if_necessary="memory": presumably builds the task
                # instance in memory when none is stored for this run —
                # confirm against the Airflow version in use (_get_ti is a
                # private CLI helper and may change between releases).
                ti, _ = _get_ti(
                    task,
                    -1,
                    exec_date_or_run_id=dag_run.run_id,
                    create_if_necessary="memory",
                )
                # task_render is executed with access to the database.
                if isinstance(ti, TaskInstancePydantic):
                    raise ValueError("not a TaskInstance")
                ti.render_templates()
        finally:
            # Always hand the session back, including when rendering raises.
            session.close()

    PythonOperator(
        task_id="render",
        python_callable=render_task,
        op_kwargs={
            "dag_id": "{{params.dag_name}}",
            "task_to_render": "{{params.task_to_render}}",
        },
    )
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]