dabla opened a new issue, #58812:
URL: https://github.com/apache/airflow/issues/58812

   ### Apache Airflow version
   
   3.1.3
   
   ### If "Other Airflow 2/3 version" selected, which one?
   
   _No response_
   
   ### What happened?
   
   We had a Dag that used dynamic task mapping with the SmtpOperator to 
send mails to multiple recipients. Because there were a lot of recipients, this 
put a lot of pressure on the scheduler and workers, so we refactored it to use 
an @task-decorated function with the SmtpHook and loop over the recipients 
inside the Python task, which solved our performance issues.
   
   But of course, since our last DagRun had failed due to the heavy load, I 
cleared it so it could re-run with the refactored DAG. The re-run succeeded, but 
the UI still assumes it's a mapped task.
   
   <img width="1819" height="297" alt="Image" 
src="https://github.com/user-attachments/assets/7ba0265e-e534-468c-a899-65c935c5a731"
 />
   
   ### What you think should happen instead?
   
   The UI (api-server?) should detect the change and display a single task 
instead of mapped tasks.
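
To make the expected behaviour concrete, here is a toy sketch in plain Python, not Airflow's actual implementation. It relies on one fact about Airflow (unmapped task instances have `map_index == -1`, mapped ones have `0..N-1`) and on one unconfirmed assumption: that rows left over from the failed mapped run are what make the UI treat the task as mapped. `TaskInstanceRow` and `rows_to_display` are hypothetical names used only for illustration.

```python
from dataclasses import dataclass

# Airflow uses map_index == -1 for unmapped task instances and
# 0..N-1 for mapped ones. Everything else below is a toy model.
UNMAPPED = -1


@dataclass(frozen=True)
class TaskInstanceRow:
    """Hypothetical stand-in for one task instance row of a DagRun."""

    task_id: str
    map_index: int


def rows_to_display(rows, task_is_mapped_now):
    """Keep only the rows that match the task's current definition."""
    if task_is_mapped_now:
        return [r for r in rows if r.map_index >= 0]
    return [r for r in rows if r.map_index == UNMAPPED]


# Assumed state after clearing: three rows from the failed mapped run,
# plus the single row created by the refactored (unmapped) task.
rows = [TaskInstanceRow("bcc_recipients", i) for i in range(3)]
rows.append(TaskInstanceRow("bcc_recipients", UNMAPPED))

visible = rows_to_display(rows, task_is_mapped_now=False)
print(visible)
```

With the task no longer mapped, only the `map_index == -1` row would be shown, which is the single-task view this issue asks for.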
   
   ### How to reproduce
   
   Have a DAG that uses the partial/expand mechanism on a PythonOperator and 
make it fail, then refactor it so that it loops within the PythonOperator 
callable instead of using the dynamic task mapping feature. Clear the failed 
DagRun and you'll see the error:
   
   
   ```
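   # Imports needed by the snippet (Airflow 3 task SDK and common SQL provider)
   from datetime import datetime
   from pathlib import Path

   from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
   from airflow.sdk import DAG, task


   def mailing_dag(language: str, title: str):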
       query_language = {"fr": "F", "nl": "N"}
   
       with DAG(
           dag_id=f"{Path(__file__).stem}_{language}",
           description=__doc__.partition(".")[0],
           doc_md=__doc__,
           schedule=None,
           start_date=datetime(2025, 11, 17),
           catchup=False,
       ) as dag:
           pull_email_recipients = SQLExecuteQueryOperator(
               task_id="pull_email_recipients",
               conn_id="jdbc",
               sql=f"""
                   SELECT DISTINCT "Email" AS "Mail"
                   FROM EXMailDistributionList
                   WHERE "Language" = '{query_language[language]}'
                   """,
               show_return_value_in_logs=True,
           )
           pull_last_closed_month = SQLExecuteQueryOperator(
               task_id="pull_last_closed_month",
               conn_id="jdbc",
               sql="""
                   SELECT distinct "MonthNumber", "YearNumber"
                   FROM "MailMonth"
                   """,
               show_return_value_in_logs=True,
           )
   
           @task(task_id="bcc_recipients")
           def send_mail(
               recipients: list[str],
               subject: str,
               html_content: str,
               conn_id: str = "smtp",
           ):
               import logging
   
               from airflow.providers.smtp.hooks.smtp import SmtpHook
               from airflow.sdk import get_current_context
   
               context = get_current_context()
   
               logging.debug("context: %s", context)
   
               # Create a Jinja environment (same as Airflow uses)
               jinja_env = dag.get_template_env(force_sandboxed=False)
   
               # Render the subject (title)
               rendered_subject = jinja_env.from_string(subject).render(context)
   
               logging.debug("rendered_subject: %s", rendered_subject)
   
               # Render the HTML template file (Airflow finds it via template_searchpath)
               rendered_html = jinja_env.get_template(html_content).render(context)
   
               logging.debug("rendered_html: %s", rendered_html)
   
               # Send the email
               with SmtpHook(smtp_conn_id=conn_id) as hook:
                   for to in recipients:
                       hook.send_email_smtp(
                           to=to,
                           subject=rendered_subject,
                           html_content=rendered_html,
                           conn_id=conn_id,
                       )
   
           send_email = send_mail(
               recipients=pull_email_recipients.output,
               subject=title,
               html_content=f"html/email_{language}.jinja.html",
           )
           pull_last_closed_month >> send_email
           pull_email_recipients >> send_email
   ```
   
   ### Operating System
   
   Redhat Linux
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Official Apache Airflow Helm Chart
   
   ### Deployment details
   
   _No response_
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [x] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
