dabla opened a new issue, #58812: URL: https://github.com/apache/airflow/issues/58812
### Apache Airflow version 3.1.3 ### If "Other Airflow 2/3 version" selected, which one? _No response_ ### What happened? We had a Dag where we used dynamic task mapping with the StmpOperator to send mails to multiple recipients. As there where a lot of recipients, this caused a lot of pressure on the scheduler and workers, so we decided to refactor this using an @task decorated method with the SmtpHook and do the for loop within the python task which solved our performance issues. But of course as our last DagRun failed due to the heavy load, I've cleared the last DagRun so it could re-run using the refactored DAG, which ran fine but the UI still assume's it's a mapped task. <img width="1819" height="297" alt="Image" src="https://github.com/user-attachments/assets/7ba0265e-e534-468c-a899-65c935c5a731" /> ### What you think should happen instead? The UI (api-server?) should detect the change and only want to display one task instead of mapped tasks. ### How to reproduce Have a DAG that uses the partial/expand mechanism on an PythonOperator but make it fail, then refactor it so it does the looping within the PythonOperator callable instead of using the dynamic task mapping feature. Clear the failed DagRun and you'll see the error: ``` def mailing_dag(language: str, title: str): query_language = {"fr": "F", "nl": "N"} with DAG( dag_id=f"{Path(__file__).stem}_{language}", description=__doc__.partition(".")[0], doc_md=__doc__, schedule=None, start_date=datetime(2025, 11, 17), catchup=False, ) as dag: pull_email_recipients = SQLExecuteQueryOperator( task_id="pull_email_recipients", conn_id="jdbc", sql=f""" SELECT DISTINCT "Email" AS "Mail" FROM EXMailDistributionList WHERE "Language" = '{query_language[language]}' """, show_return_value_in_logs=True, ) pull_last_closed_month = SQLExecuteQueryOperator( task_id="pull_last_closed_month", conn_id="jdbc", sql=""" SELECT distinct "MonthNumber", "YearNumber" FROM "MailMonth" """, show_return_value_in_logs=True, ) @task(task_id="bcc_recipients") def send_mail( recipients: list[str], subject: str, html_content: str, conn_id: str = "smtp", ): import logging from airflow.providers.smtp.hooks.smtp import SmtpHook from airflow.sdk import get_current_context context = get_current_context() logging.debug("context: %s", context) # Create a Jinja environment (same as Airflow uses) jinja_env = dag.get_template_env(force_sandboxed=False) # Render the subject (title) rendered_subject = jinja_env.from_string(subject).render(context) logging.debug("rendered_subject: %s", rendered_subject) # Render the HTML template file (Airflow finds it via template_searchpath) rendered_html = jinja_env.get_template(html_content).render(context) logging.debug("rendered_html: %s", rendered_html) # Send the email with SmtpHook(smtp_conn_id=conn_id) as hook: for to in recipients: hook.send_email_smtp( to=to, subject=rendered_subject, html_content=rendered_html, conn_id=conn_id, ) send_email = send_mail( recipients=pull_email_recipients.output, subject=title, html_content=f"html/email_{language}.jinja.html", ) pull_last_closed_month >> send_email pull_email_recipients >> send_email ``` ### Operating System Redhat Linux ### Versions of Apache Airflow Providers _No response_ ### Deployment Official Apache Airflow Helm Chart ### Deployment details _No response_ ### Anything else? _No response_ ### Are you willing to submit PR? - [x] Yes I am willing to submit a PR! ### Code of Conduct - [x] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
