jason810496 opened a new pull request, #58803:
URL: https://github.com/apache/airflow/pull/58803
## Why

I found that in a Triggerer HA setup, one Triggerer picks up all the jobs while the other starves, because we limit each selection query to `remaining_capacity` (i.e. up to `[triggerer] capacity`) instead of a smaller limit that would give the other Triggerer a chance to pick up jobs at the same time.

<details>
<summary>Steps to reproduce</summary>

Before reproducing, temporarily change `get_hostname` to `return str(os.getpid())`: with a `breeze` setup, both Triggerers run on the same machine and would otherwise report the same hostname, and this makes them distinguishable.

https://github.com/apache/airflow/blob/6798aa79586c37e7f7ca8068bb7f369ed9580926/airflow-core/src/airflow/utils/net.py#L52-L56

1. Start Airflow with one Triggerer (a normal `breeze start-airflow`) and run the following Dag, which produces 1000 mapped deferrable tasks.
2. Wait until all mapped tasks are in the `deferred` state.
3. Stop the Triggerer and wait 30 seconds (for the `[triggerer] job_heartbeat_sec` timeout).
4. Start two Triggerers in separate terminals at the same time (open a terminal for each, run `breeze exec`, then `airflow triggerer`).
5. Observe that one Triggerer picks up all the jobs while the other starves.

```python
from typing import Any
from datetime import timedelta

from airflow.providers.common.compat.sdk import BaseSensorOperator
from airflow.providers.standard.triggers.temporal import TimeDeltaTrigger
from airflow.sdk import DAG, Context, task


class WaitHoursSensor(BaseSensorOperator):
    def __init__(
        self,
        task_id: str,
        trigger_kwargs: dict[str, Any],
        **kwargs: Any,
    ) -> None:
        super().__init__(task_id=task_id, **kwargs)
        self.trigger_kwargs = trigger_kwargs

    def execute(self, context: Context) -> None:
        self.defer(
            trigger=TimeDeltaTrigger(timedelta(hours=self.trigger_kwargs["delta"])),
            method_name="execute_complete",
        )

    def execute_complete(
        self,
        context: Context,
        event: dict[str, Any] | None = None,
    ) -> None:
        print("Wait complete.")
        return


with DAG(dag_id="ha_triggerer", schedule=None):

    @task
    def items_to_process():
        return [{"delta": i + 1} for i in range(1000)]

    WaitHoursSensor.partial(task_id="deferable").expand(
        trigger_kwargs=items_to_process(),
    )
```

</details>

<img width="636" height="261" alt="max_trigger_to_select_per_loop" src="https://github.com/user-attachments/assets/7b4c2cb7-e81c-469c-8f98-ec84f4aeff94" />

- **Before fix**: one Triggerer picked up `1000` triggers and the other picked up `0` -> starving!
- **After fix**: one Triggerer picked up `480` triggers and the other `520`.

## What

- Introduce a `[triggerer] max_trigger_to_select_per_loop` config option that gives every Triggerer a chance to pick up triggers, making the workload more balanced in an HA setup. (A sample configuration snippet follows below.)
- `max_trigger_to_select_per_loop` is not a perfectly accurate name, because the same limit is applied separately to the `Callback`, `TaskInstance`, and `Asset` trigger queries in the [get_sorted_triggers](https://github.com/apache/airflow/blob/6798aa79586c37e7f7ca8068bb7f369ed9580926/airflow-core/src/airflow/models/trigger.py#L365-L366) loop.
- The main loop of the Triggerer lives in [load_triggers](https://github.com/apache/airflow/blob/6798aa79586c37e7f7ca8068bb7f369ed9580926/airflow-core/src/airflow/jobs/triggerer_job_runner.py#L527-L528).
- This means each main-loop iteration actually picks up to `number of trigger kinds (Callback, TI, and Asset = 3 for now) * max_trigger_to_select_per_loop (default 10)` triggers, as the sketch below illustrates.
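A minimal sketch of the capped selection, for illustration only: the three trigger kinds, the `max_trigger_to_select_per_loop` limit, and its default of 10 come from this PR's description, while the function and variable names here are hypothetical and do not correspond to the actual Airflow code.

```python
# Illustrative sketch of the per-loop selection cap, not the real implementation.

TRIGGER_KINDS = ("callback", "task_instance", "asset")  # 3 kinds for now


def max_claimed_per_loop(remaining_capacity: int, max_per_loop: int) -> int:
    """Upper bound on triggers one Triggerer claims in a single main-loop pass."""
    # Before the fix: each per-kind query was limited only by the full
    # remaining capacity, so one Triggerer could claim every pending trigger
    # in a single pass while the other got nothing.
    # After the fix: each per-kind query is additionally capped, leaving
    # unclaimed triggers for the other Triggerer in an HA pair.
    per_query_limit = min(remaining_capacity, max_per_loop)
    return len(TRIGGER_KINDS) * per_query_limit


# With the assumed default of 10: 3 kinds * 10 = at most 30 triggers per
# loop iteration, instead of up to the full `[triggerer] capacity`.
print(max_claimed_per_loop(remaining_capacity=1000, max_per_loop=10))  # 30
```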
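For completeness, a hypothetical `airflow.cfg` snippet showing how the new option could sit next to the existing `capacity` option; the default of 10 is taken from this PR's description, and the exact option name and default are whatever the PR finally merges with:

```ini
[triggerer]
# Existing option: how many triggers a single Triggerer can run at once.
capacity = 1000

# New option proposed by this PR: how many triggers each per-kind query may
# select in one main-loop iteration (assumed default: 10). Keeping it well
# below `capacity` leaves work for the other Triggerer in an HA setup.
max_trigger_to_select_per_loop = 10
```

Setting it via an environment variable would follow Airflow's usual convention, i.e. `AIRFLOW__TRIGGERER__MAX_TRIGGER_TO_SELECT_PER_LOOP=10`.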
