jason810496 opened a new pull request, #58803:
URL: https://github.com/apache/airflow/pull/58803

   ## Why
   
   I found that in a Triggerer HA setup, one Triggerer picks up all the jobs while the other Triggerer starves, because the selection is limited only by `remaining_capacity` (bounded by `[triggerer] capacity`) instead of a smaller per-loop limit that would give the other Triggerer a chance to pick up jobs at the same time.
   
   
   <details>
   <summary>Steps to reproduce</summary>
   
   Before reproducing, we have to temporarily change `get_hostname` to `return str(os.getpid())`, because with the `breeze` setup both Triggerers run on the same machine and would otherwise report the same hostname; this is only needed to distinguish the two Triggerers.
   
   
https://github.com/apache/airflow/blob/6798aa79586c37e7f7ca8068bb7f369ed9580926/airflow-core/src/airflow/utils/net.py#L52-L56
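
   In other words, the temporary local patch for this reproduction looks roughly like the following (repro-only change, not part of this PR):

   ```python
   # airflow-core/src/airflow/utils/net.py -- temporary local change for this reproduction only.
   # Returning the PID instead of the resolved hostname lets us tell the two Triggerers
   # apart when both run inside the same breeze container.
   import os


   def get_hostname() -> str:
       return str(os.getpid())
   ```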
   
   1. Start Airflow with one Triggerer (normal `breeze start-airflow`) and run the following Dag, which produces 1000 mapped deferrable tasks.
   2. Wait until all mapped tasks are in the `deferred` state.
   3. Stop the Triggerer and wait for 30 seconds (for the `[triggerer] job_heartbeat_sec` timeout to expire).
   4. Start two Triggerers in different terminals at the same time (open separate terminals, `breeze exec` into each, and run `airflow triggerer`).
   5. Observe that only one Triggerer picks up all the jobs while the other Triggerer starves.
   
   ```python
   from typing import Any
   
   from datetime import timedelta
   from airflow.providers.common.compat.sdk import BaseSensorOperator
   from airflow.sdk import DAG, task, Context
   from airflow.providers.standard.triggers.temporal import TimeDeltaTrigger
   
   class WaitHoursSensor(BaseSensorOperator):
       def __init__(
           self,
           task_id: str,
           trigger_kwargs: dict[str, Any],
           **kwargs: Any,
       ) -> None:
           super().__init__(task_id=task_id, **kwargs)
           self.trigger_kwargs = trigger_kwargs
   
       def execute(self, context: Context) -> None:
           self.defer(
               trigger=TimeDeltaTrigger(timedelta(hours=self.trigger_kwargs["delta"])),
               method_name="execute_complete",
           )
   
       def execute_complete(
           self,
           context: Context,
           event: dict[str, Any] | None = None,
       ) -> None:
           print("Wait complete.")
           return
   
   
   with DAG(dag_id="ha_triggerer", schedule=None):
   
       @task
       def items_to_process():
           return [{"delta": i+1} for i in range(1000)]
   
       WaitHoursSensor.partial(task_id="deferable").expand(
           trigger_kwargs=items_to_process(),
       )
   ```
   
   </details>
   
   <img width="636" height="261" alt="max_trigger_to_select_per_loop" src="https://github.com/user-attachments/assets/7b4c2cb7-e81c-469c-8f98-ec84f4aeff94" />
   
   - **Before Fix**: one Triggerer picked up all `1000` triggers and the other picked up `0` -> starvation!
   - **After Fix**: one Triggerer picked up `480` triggers and the other picked up `520`.
   
   
   
   ## What
   
   - Introduce a `[triggerer] max_trigger_to_select_per_loop` config option to give each Triggerer a chance to pick up triggers, so the workload is more balanced in an HA setup.
   - `max_trigger_to_select_per_loop` is not a perfectly accurate name, since the same limit is applied to each of the `Callback`, `TaskInstance`, and `Asset` trigger selections inside the [get_sorted_triggers](https://github.com/apache/airflow/blob/6798aa79586c37e7f7ca8068bb7f369ed9580926/airflow-core/src/airflow/models/trigger.py#L365-L366) loop.
     - However, the Triggerer's main loop is in [load_triggers](https://github.com/apache/airflow/blob/6798aa79586c37e7f7ca8068bb7f369ed9580926/airflow-core/src/airflow/jobs/triggerer_job_runner.py#L527-L528).
     - Which means each main-loop iteration can actually pick up to `number of trigger types (Callback, TI and Asset = 3 for now) * max_trigger_to_select_per_loop (default 10)` triggers; see the sketch below.
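
   For illustration, here is a minimal sketch (assuming the default values above; this is not the PR's actual implementation) of how the new option bounds each selection query and what that means for one main-loop iteration:

   ```python
   # Minimal sketch, not the PR's actual implementation: how
   # `max_trigger_to_select_per_loop` could cap each selection query next to the
   # existing `[triggerer] capacity` check.
   from airflow.configuration import conf

   CAPACITY = conf.getint("triggerer", "capacity", fallback=1000)
   MAX_PER_LOOP = conf.getint("triggerer", "max_trigger_to_select_per_loop", fallback=10)


   def selection_limit(currently_running: int) -> int:
       remaining_capacity = CAPACITY - currently_running
       # Before this change the query limit was effectively `remaining_capacity`;
       # capping it gives other Triggerers in an HA setup a chance to claim work too.
       return min(remaining_capacity, MAX_PER_LOOP)


   # The same limit is applied once per trigger source (Callback, TaskInstance, Asset),
   # so one main-loop iteration can select at most 3 * MAX_PER_LOOP = 30 triggers.
   ```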

