jscheffl opened a new pull request, #38440:
URL: https://github.com/apache/airflow/pull/38440

   We are using [Cluster Policies](https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/cluster-policies.html#task-instance-mutation), specifically the "Task Instance Mutation" feature, to route workload to the respective endpoint: we use multiple Celery queues and distribute the work across them. As the distribution is based on workflow metadata and we don't want to add the routing complexity to the workflow (modelling the workflow statically for all routing combinations), task instance mutation is the only option.
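
For illustration, here is a minimal sketch of a metadata-based routing hook in `airflow_local_settings.py`. It assumes routing by DAG id prefix. The `QUEUE_BY_DAG_PREFIX` table, the prefixes and the queue names are hypothetical. The parameter is typed as `Any` only so the snippet runs without Airflow installed.

```python
from typing import Any

# Hypothetical routing table: DAG id prefix -> Celery queue name.
# Prefixes and queue names are illustrative assumptions only.
QUEUE_BY_DAG_PREFIX = {
    "gpu_": "gpu_queue",
    "bulk_": "bulk_queue",
}


def task_instance_mutation_hook(task_instance: Any) -> None:
    """Route a task instance to a Celery queue based on its DAG id (sketch)."""
    for prefix, queue in QUEUE_BY_DAG_PREFIX.items():
        if task_instance.dag_id.startswith(prefix):
            task_instance.queue = queue
            return
    # No rule matched: keep the queue that came from the task definition.
```

Task instances whose DAG id matches no prefix keep the queue from their task definition.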
   
   As discussed in #32471, task instance mutation generally works "well" for the first execution, but we saw a couple of errors:
   - When using `task_instance_mutation_hook`, the UI in DAGs -> Grid -> Task Details -> More Details always shows the `queue` value from the task definition, not the mutated value that is actually stored in the DB. Worse, navigating in the UI resets the existing `queue` value in the DB to the default value without the hook applied.
   - When a task fails, the retry does not apply the mutation hook and the task goes to the default queue again.
   - When using dynamic task mapping, only the first mapped task receives the queue from the mutation hook; the task instances created later during mapping do not.
   
   The root cause is that, after the initial task instance creation, defaults are reloaded from the Python task definition many times and at multiple levels. The culprit seems to be `TaskInstance._refresh_from_task()`.
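
To make the failure mode concrete, here is a deliberately simplified model in plain Python. `TaskDefinition`, `TaskInstanceModel` and `refresh_from_task` are hypothetical stand-ins, not Airflow's real `TaskInstance` code. They only mimic the behaviour described above, where a refresh copies `queue` back from the task definition. The optional `hook` argument sketches the assumed shape of the fix: re-applying the mutation hook after each refresh.

```python
from dataclasses import dataclass
from typing import Callable, Optional


# Hypothetical, simplified model; NOT Airflow's real classes.
@dataclass
class TaskDefinition:
    task_id: str
    queue: str = "default"


@dataclass
class TaskInstanceModel:
    task: TaskDefinition
    map_index: int = -1
    try_number: int = 1
    queue: str = ""


def mutation_hook(ti: TaskInstanceModel) -> None:
    # Example cluster policy: route every task instance to "other_queue".
    ti.queue = "other_queue"


def refresh_from_task(
    ti: TaskInstanceModel,
    hook: Optional[Callable[[TaskInstanceModel], None]] = None,
) -> None:
    # Defaults are copied from the task definition, overwriting any earlier mutation.
    ti.queue = ti.task.queue
    # Assumed fix: re-apply the policy after every refresh.
    if hook is not None:
        hook(ti)
```

Calling `refresh_from_task(ti)` for a retry or for a mapped instance with `map_index > 0` resets `queue` to `"default"`. Calling `refresh_from_task(ti, hook=mutation_hook)` keeps it on `"other_queue"`.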
   
   Fixing these two lines as in this PR removes all of the problems described above. The trade-off is that the policy code is executed a lot more often. However, as long as the policy itself does not carry a significant performance overhead, this should not have a noticeable performance impact.
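
Because the hook may now run many times per task instance, it is worth keeping it free of I/O (database, network, file access) and idempotent. Below is a minimal sketch that assumes routing by task id. The `OTHER_QUEUE_TASK_IDS` set and the `seconds_per_call` helper are hypothetical. `timeit` gives only a rough per-call figure on a stand-in object, not a measurement of the scheduler.

```python
import timeit
from types import SimpleNamespace
from typing import Any

# Hypothetical set of task ids routed to "other_queue". It is built once at
# import time, so each hook call is only a set lookup.
OTHER_QUEUE_TASK_IDS = frozenset({"get_names", "print_greetings"})


def task_instance_mutation_hook(task_instance: Any) -> None:
    # No I/O here. Assigning the same value again is harmless, so calling the
    # hook repeatedly on the same task instance gives the same result.
    if task_instance.task_id in OTHER_QUEUE_TASK_IDS:
        task_instance.queue = "other_queue"


def seconds_per_call(number: int = 100_000) -> float:
    """Rough average cost of one hook call on a stand-in object."""
    ti = SimpleNamespace(task_id="get_names", queue="default")
    total = timeit.timeit(lambda: task_instance_mutation_hook(ti), number=number)
    return total / number
```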
   
   How to test:
   - Apply a cluster policy that changes the `queue` on some (or all :-D) tasks.
   - Use, for example, the `example_params_trigger_ui` DAG and introduce some random errors in the code. An example is attached below.
   - Run it, making sure you have Celery workers serving both the default queue and `other_queue`. While testing, I set an environment variable `QUEUE` on the queue worker so the DAG can print it.
   - Check the logs of failed tasks and of mapped tasks that are not the first ones, as well as the UI display of the `queue` field.
   
   closes: #32471
   
   FYI @AutomationDev85 @wolfdn @clellmann
   
   Example cluster policy used for testing as `airflow_local_settings.py`:
   ```
   from airflow.models.taskinstance import TaskInstance
   def task_instance_mutation_hook(task_instance: TaskInstance):
       print("################# POLICY IS APPLIED! ##################################")
       task_instance.queue = "other_queue"
   ```
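
The hook logic can also be checked offline before the end-to-end run. Below is a minimal sketch using `unittest`, with `types.SimpleNamespace` standing in for `TaskInstance`. Airflow task instances do have `try_number` and `map_index`, but the stand-in itself is an assumption. The sketch reproduces the example policy above without the `print` and covers a first try, a retry and several mapped instances. It tests only the function itself, not whether Airflow calls it on retries or during mapping; the end-to-end steps above cover that.

```python
import unittest
from types import SimpleNamespace
from typing import Any


def task_instance_mutation_hook(task_instance: Any) -> None:
    # Same behaviour as the example policy: every task goes to "other_queue".
    task_instance.queue = "other_queue"


class PolicyHookTest(unittest.TestCase):
    """Offline checks of the policy; SimpleNamespace stands in for TaskInstance."""

    def test_queue_is_mutated_for_every_scenario(self) -> None:
        # A first try, a retry, and several mapped instances of the same task.
        scenarios = [
            {"try_number": 1, "map_index": -1},
            {"try_number": 2, "map_index": -1},
            *({"try_number": 1, "map_index": i} for i in range(3)),
        ]
        for attrs in scenarios:
            with self.subTest(**attrs):
                ti = SimpleNamespace(queue="default", **attrs)
                task_instance_mutation_hook(ti)
                self.assertEqual(ti.queue, "other_queue")
```

Save it as a test module and run it with `python -m unittest`.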
   
   Modified DAG for testing - `example_params_trigger_ui.py`:
   ```
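   """Example DAG to test the task instance mutation hook with random failures and dynamic task mapping."""
   
   from __future__ import annotations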
   
   import datetime
   from random import randint
   from pathlib import Path
   from os import getenv
   from typing import TYPE_CHECKING
   
   from airflow.decorators import task
   from airflow.models.dag import DAG
   from airflow.models.param import Param
   from airflow.utils.trigger_rule import TriggerRule
   
   if TYPE_CHECKING:
       from airflow.models.dagrun import DagRun
       from airflow.models.taskinstance import TaskInstance
   
   def print_where_executed():
       print("####################################################")
       print(f"This task is executed on queue {getenv('QUEUE', 'UNDEFINED!')}")
       print("####################################################")
   
   with DAG(
       dag_id=Path(__file__).stem,
       description=__doc__.partition(".")[0],
       doc_md=__doc__,
       schedule=None,
       start_date=datetime.datetime(2022, 3, 4),
       catchup=False,
       tags=["example_ui"],
       params={
           "names": Param(
               ["Linda", "Martha", "Thomas"],
               type="array",
               description="Define the list of names for which greetings should be generated in the logs."
               " Please have one name per line.",
               title="Names to greet",
           ),
           "english": Param(True, type="boolean", title="English"),
           "german": Param(True, type="boolean", title="German (Formal)"),
           "french": Param(True, type="boolean", title="French"),
       },
   ) as dag:
   
       @task(task_id="get_names", retries=4, retry_delay=5.0)
       def get_names(**kwargs) -> list[str]:
           ti: TaskInstance = kwargs["ti"]
           dag_run: DagRun = ti.dag_run
           print_where_executed()
           if randint(0, 1) > 0:
               raise Exception("Something went wrong!")
           if "names" not in dag_run.conf:
               print("Uuups, no names given, was no UI used to trigger?")
               return []
           return dag_run.conf["names"]
   
       @task.branch(task_id="select_languages", retries=4, retry_delay=5.0)
       def select_languages(**kwargs) -> list[str]:
           ti: TaskInstance = kwargs["ti"]
           dag_run: DagRun = ti.dag_run
           selected_languages = []
           print_where_executed()
           if randint(0, 1) > 0:
               raise Exception("Something went wrong!")
           for lang in ["english", "german", "french"]:
               if lang in dag_run.conf and dag_run.conf[lang]:
                   selected_languages.append(f"generate_{lang}_greeting")
           return selected_languages
   
       @task(task_id="generate_english_greeting", retries=4, retry_delay=5.0)
       def generate_english_greeting(name: str) -> str:
           print_where_executed()
           if randint(0, 1) > 0:
               raise Exception("Something went wrong!")
           return f"Hello {name}!"
   
       @task(task_id="generate_german_greeting", retries=4, retry_delay=5.0)
       def generate_german_greeting(name: str) -> str:
           print_where_executed()
           if randint(0, 1) > 0:
               raise Exception("Something went wrong!")
           return f"Sehr geehrter Herr/Frau {name}."
   
       @task(task_id="generate_french_greeting", retries=4, retry_delay=5.0)
       def generate_french_greeting(name: str) -> str:
           print_where_executed()
           if randint(0, 1) > 0:
               raise Exception("Something went wrong!")
           return f"Bonjour {name}!"
   
       @task(task_id="print_greetings", trigger_rule=TriggerRule.ALL_DONE, retries=4, retry_delay=5.0)
       def print_greetings(greetings1, greetings2, greetings3) -> None:
           print_where_executed()
           if randint(0, 1) > 0:
               raise Exception("Something went wrong!")
           for g in greetings1 or []:
               print(g)
           for g in greetings2 or []:
               print(g)
           for g in greetings3 or []:
               print(g)
           if not (greetings1 or greetings2 or greetings3):
               print("sad, nobody to greet :-(")
   
       lang_select = select_languages()
       names = get_names()
       english_greetings = generate_english_greeting.expand(name=names)
       german_greetings = generate_german_greeting.expand(name=names)
       french_greetings = generate_french_greeting.expand(name=names)
       lang_select >> [english_greetings, german_greetings, french_greetings]
       results_print = print_greetings(english_greetings, german_greetings, french_greetings)
   ```

