jscheffl opened a new pull request, #38440:
URL: https://github.com/apache/airflow/pull/38440
We are using [Cluster Policies](https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/cluster-policies.html#task-instance-mutation), specifically the "Task Instance Mutation" feature, to route workload to the respective endpoint. "Respective endpoint" means that we use multiple Celery queues and distribute the work across them. The distribution is based on workflow metadata, and we don't want to add the routing complexity to the workflow itself (modelling the workflow statically for all routing combinations), so task instance mutation is the only option.

As discussed in #32471, task instance mutation generally works "well" for the first execution, but we saw a couple of errors:

- When using `task_instance_mutation_hook`, the UI in DAGs -> Grid -> Task Details -> More Details always shows the `queue` value from the task definition and not the mutated value, which is what is actually stored in the DB. Worse, navigating in the UI resets the existing `queue` value in the DB to the default value without the hook applied.
- When a task fails, the retry does not apply the mutation hook and the task goes to the default queue again.
- When using dynamic task mapping, only the first mapped task receives the queue from the mutation hook; the task instances created later during mapping do not.

The root cause is that, after the initial task instance creation, defaults are reloaded from the Python code many times and on multiple levels. The culprit seems to be `TaskInstance._refresh_from_task()`. Fixing these two lines as in this PR removes all of the problems described above.

The trade-off is that the policy code is executed a lot more often. Assuming the policy itself is implemented without significant overhead, this should not have a noticeable performance impact.

How to test:

- Apply a cluster policy that changes the `queue` on some (or all :-D) tasks.
- Use, for example, `example_params_trigger_ui` and introduce some random errors in the code. An example is attached below.
- Run this and ensure you have Celery workers serving both the default queue and `other_queue`. When testing, I set an environment variable `QUEUE` on each worker so that the DAG prints which queue the task is executed on.
- Check the logs of failed tasks and of mapped tasks that are not the first ones, as well as the UI display of the "queue" field.

closes: #32471

FYI @AutomationDev85 @wolfdn @clellmann

Example cluster policy used for testing, as `airflow_local_settings.py`:

```
from airflow.models.taskinstance import TaskInstance


def task_instance_mutation_hook(task_instance: TaskInstance):
    print("################# POLICY IS APPLIED! ##################################")
    # Route every task instance to the second Celery queue
    task_instance.queue = "other_queue"
```

Modified DAG for testing, `example_params_trigger_ui.py`:

```
"""Example DAG generating greetings, modified with random failures to test the queue mutation."""

# The module docstring above is required: the DAG below reads __doc__.
from __future__ import annotations

import datetime
from random import randint
from pathlib import Path
from os import getenv
from typing import TYPE_CHECKING

from airflow.decorators import task
from airflow.models.dag import DAG
from airflow.models.param import Param
from airflow.utils.trigger_rule import TriggerRule

if TYPE_CHECKING:
    from airflow.models.dagrun import DagRun
    from airflow.models.taskinstance import TaskInstance


def print_where_executed():
    # QUEUE is an environment variable set on each Celery worker for this test
    print("####################################################")
    print(f"This task is executed on queue {getenv('QUEUE', 'UNDEFINED!')}")
    print("####################################################")


with DAG(
    dag_id=Path(__file__).stem,
    description=__doc__.partition(".")[0],
    doc_md=__doc__,
    schedule=None,
    start_date=datetime.datetime(2022, 3, 4),
    catchup=False,
    tags=["example_ui"],
    params={
        "names": Param(
            ["Linda", "Martha", "Thomas"],
            type="array",
            description="Define the list of names for which greetings should be generated in the logs."
            " Please have one name per line.",
            title="Names to greet",
        ),
        "english": Param(True, type="boolean", title="English"),
        "german": Param(True, type="boolean", title="German (Formal)"),
        "french": Param(True, type="boolean", title="French"),
    },
) as dag:

    @task(task_id="get_names", retries=4, retry_delay=5.0)
    def get_names(**kwargs) -> list[str]:
        ti: TaskInstance = kwargs["ti"]
        dag_run: DagRun = ti.dag_run
        print_where_executed()
        # Fail in roughly half of the attempts to force retries
        if randint(0, 1) > 0:
            raise Exception("Something went wrong!")
        if "names" not in dag_run.conf:
            print("Uuups, no names given, was no UI used to trigger?")
            return []
        return dag_run.conf["names"]

    @task.branch(task_id="select_languages", retries=4, retry_delay=5.0)
    def select_languages(**kwargs) -> list[str]:
        ti: TaskInstance = kwargs["ti"]
        dag_run: DagRun = ti.dag_run
        selected_languages = []
        print_where_executed()
        if randint(0, 1) > 0:
            raise Exception("Something went wrong!")
        for lang in ["english", "german", "french"]:
            if lang in dag_run.conf and dag_run.conf[lang]:
                selected_languages.append(f"generate_{lang}_greeting")
        return selected_languages

    @task(task_id="generate_english_greeting", retries=4, retry_delay=5.0)
    def generate_english_greeting(name: str) -> str:
        print_where_executed()
        if randint(0, 1) > 0:
            raise Exception("Something went wrong!")
        return f"Hello {name}!"

    @task(task_id="generate_german_greeting", retries=4, retry_delay=5.0)
    def generate_german_greeting(name: str) -> str:
        print_where_executed()
        if randint(0, 1) > 0:
            raise Exception("Something went wrong!")
        return f"Sehr geehrter Herr/Frau {name}."

    @task(task_id="generate_french_greeting", retries=4, retry_delay=5.0)
    def generate_french_greeting(name: str) -> str:
        print_where_executed()
        if randint(0, 1) > 0:
            raise Exception("Something went wrong!")
        return f"Bonjour {name}!"

    @task(task_id="print_greetings", trigger_rule=TriggerRule.ALL_DONE, retries=4, retry_delay=5.0)
    def print_greetings(greetings1, greetings2, greetings3) -> None:
        print_where_executed()
        if randint(0, 1) > 0:
            raise Exception("Something went wrong!")
        for g in greetings1 or []:
            print(g)
        for g in greetings2 or []:
            print(g)
        for g in greetings3 or []:
            print(g)
        if not (greetings1 or greetings2 or greetings3):
            print("sad, nobody to greet :-(")

    lang_select = select_languages()
    names = get_names()
    # Each expand() creates one mapped task instance per name at runtime
    english_greetings = generate_english_greeting.expand(name=names)
    german_greetings = generate_german_greeting.expand(name=names)
    french_greetings = generate_french_greeting.expand(name=names)
    lang_select >> [english_greetings, german_greetings, french_greetings]
    results_print = print_greetings(english_greetings, german_greetings, french_greetings)
```
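The root cause described above can be illustrated with a small, self-contained model. This is not Airflow code: `Task`, `FakeTaskInstance`, `refresh_from_task` and `mutation_hook` are hypothetical stand-ins. The sketch assumes, in line with the trade-off mentioned above, that the fix re-applies the mutation hook every time defaults are reloaded from the task definition. It shows why a retry, or a mapped task instance created after the first one, falls back to the default queue without that step.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class Task:
    """Hypothetical stand-in for the task definition parsed from the DAG file."""

    task_id: str
    queue: str = "default"


@dataclass
class FakeTaskInstance:
    """Hypothetical stand-in for a task instance row persisted in the DB."""

    task_id: str
    map_index: int = -1
    queue: str = "default"


Hook = Callable[[FakeTaskInstance], None]


def mutation_hook(ti: FakeTaskInstance) -> None:
    # Same effect as the example cluster policy: route everything to other_queue
    ti.queue = "other_queue"


def refresh_from_task(ti: FakeTaskInstance, task: Task, hook: Optional[Hook]) -> None:
    # Defaults are copied from the Python task definition, which overwrites
    # any value the mutation hook stored earlier.
    ti.queue = task.queue
    if hook is not None:
        # Modelled fix: re-apply the policy after every refresh.
        hook(ti)


def create_task_instance(task: Task, map_index: int = -1) -> FakeTaskInstance:
    # In this model, initial creation of the DAG run applies the hook once.
    ti = FakeTaskInstance(task.task_id, map_index)
    refresh_from_task(ti, task, hook=None)
    mutation_hook(ti)
    return ti


def queue_after_retry(with_fix: bool) -> str:
    task = Task("get_names")
    ti = create_task_instance(task)
    # In this model, a retry reloads the task instance from the task definition.
    refresh_from_task(ti, task, hook=mutation_hook if with_fix else None)
    return ti.queue


def queues_after_expand(count: int, with_fix: bool) -> list[str]:
    task = Task("generate_english_greeting")
    # In this model, only the first mapped instance exists at DAG run creation...
    tis = [create_task_instance(task, map_index=0)]
    # ...and the others are created later, once the upstream result is known.
    for i in range(1, count):
        ti = FakeTaskInstance(task.task_id, map_index=i)
        refresh_from_task(ti, task, hook=mutation_hook if with_fix else None)
        tis.append(ti)
    return [ti.queue for ti in tis]


print(queue_after_retry(with_fix=False))  # default
print(queue_after_retry(with_fix=True))  # other_queue
print(queues_after_expand(3, with_fix=False))  # ['other_queue', 'default', 'default']
print(queues_after_expand(3, with_fix=True))  # ['other_queue', 'other_queue', 'other_queue']
```

The model also makes the trade-off visible: with the fix, the hook runs once per refresh instead of once per task instance.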

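The test policy sends every task to `other_queue`, whereas the motivation is routing based on workflow metadata. Since the hook is executed a lot more often with this change, such a policy should be cheap and idempotent. The following is a minimal sketch assuming routing by DAG id prefix; `QUEUE_BY_DAG_PREFIX` and its entries are hypothetical. In a real `airflow_local_settings.py` the hook receives a `TaskInstance`; a `SimpleNamespace` stands in for it here so the sketch runs without Airflow.

```python
from types import SimpleNamespace

# Hypothetical routing table: DAG id prefix -> Celery queue name.
# The first matching prefix wins (dicts keep insertion order).
QUEUE_BY_DAG_PREFIX = {
    "example_params": "other_queue",
    "gpu_": "gpu_queue",
}


def task_instance_mutation_hook(task_instance) -> None:
    """Route a task instance to a Celery queue based on its DAG id.

    A pure in-memory lookup without DB or network calls, so it stays cheap
    and idempotent even if it is executed many times per task instance.
    """
    for prefix, queue in QUEUE_BY_DAG_PREFIX.items():
        if task_instance.dag_id.startswith(prefix):
            task_instance.queue = queue
            return
    # No match: leave the queue from the task definition untouched.


# SimpleNamespace stands in for airflow.models.taskinstance.TaskInstance here.
ti = SimpleNamespace(dag_id="example_params_trigger_ui", task_id="get_names", queue="default")
task_instance_mutation_hook(ti)
task_instance_mutation_hook(ti)  # running it again changes nothing
print(ti.queue)  # other_queue
```

Keeping the lookup free of DB queries matters here: if the hook may run on every refresh of a task instance, any expensive call inside it is multiplied accordingly.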