uranusjr commented on issue #49508:
URL: https://github.com/apache/airflow/issues/49508#issuecomment-2886094860

   I have a weird finding. I set up DAGs similar to 
https://github.com/apache/airflow/issues/49508#issuecomment-2834521949 above 
and created a `special_pool` with 2 slots. 16 runs are created against 
`scheduler1`, one for every day from 2023-02-01 to 2023-02-16. Everything is 
as expected so far: the 1st through 8th runs are running, and the 9th through 
16th are scheduled.
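   For reference, this is roughly the shape of the setup I mean (a sketch only: 
the actual DAGs are in the linked comment, and the import path, schedule, and 
task bodies here are assumptions; `special_pool` was created separately with 
something like `airflow pools set special_pool 2 "test pool"`):
   
   ```python
   # Rough sketch only -- the real DAGs are in the comment linked above, so the
   # schedule, task bodies, and other parameters here are assumptions.
   import time
   from datetime import datetime
   
   from airflow.sdk import dag, task
   
   
   @dag(schedule="@daily", start_date=datetime(2023, 2, 1), catchup=True)
   def scheduler1():
       @task  # runs in the default pool
       def sleep_it1():
           time.sleep(3600)
   
       sleep_it1()
   
   
   @dag(schedule=None, start_date=datetime(2023, 2, 1))
   def scheduler2():
       @task(pool="special_pool")  # special_pool has 2 slots
       def sleep_it2():
           time.sleep(3600)
   
       sleep_it2()
   
   
   scheduler1()
   scheduler2()
   ```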
   
   Now I trigger a manual run of `scheduler2` (which uses `special_pool` 
instead of the default pool). The scheduler shows that one task has been 
triggered, but if you look at the logs closely, the supervisor wants to 
trigger the 9th TI of `scheduler1` instead!
   
   ```
   [supervisor] msg=StartupDetails(
       ti=TaskInstance(
           id=UUID('0196d838-9ad3-7c16-a551-f9997f0b5960'),
           task_id='sleep_it1',
           dag_id='scheduler1',
           run_id='scheduled__2023-02-09T00:00:00+00:00',
           try_number=1, map_index=-1, pool_slots=1, queue='default', priority_weight=1,
           executor_config={}, parent_context_carrier={}, context_carrier=None, queued_dttm=None),
       dag_rel_path='schestuck.py',
       bundle_info=BundleInfo(name='dags-folder', version=None),
       requests_fd=39,
       start_date=datetime.datetime(2025, 5, 16, 8, 32, 30, 713298, tzinfo=datetime.timezone.utc),
       ti_context=TIRunContext(
           dag_run=DagRun(
               dag_id='scheduler1',
               run_id='scheduled__2023-02-09T00:00:00+00:00',
               logical_date=datetime.datetime(2023, 2, 9, 0, 0, tzinfo=TzInfo(UTC)),
               data_interval_start=datetime.datetime(2023, 2, 9, 0, 0, tzinfo=TzInfo(UTC)),
               data_interval_end=datetime.datetime(2023, 2, 9, 0, 0, tzinfo=TzInfo(UTC)),
               run_after=datetime.datetime(2023, 2, 9, 0, 0, tzinfo=TzInfo(UTC)),
               start_date=datetime.datetime(2025, 5, 16, 8, 31, 50, 488281, tzinfo=TzInfo(UTC)),
               end_date=None, clear_number=0, run_type=<DagRunType.SCHEDULED: 'scheduled'>,
               conf={}, consumed_asset_events=[]),
           task_reschedule_count=0, max_tries=0, variables=[], connections=[],
           upstream_map_indexes={}, next_method=None, next_kwargs=None,
           xcom_keys_to_clear=[], should_retry=False),
       type='StartupDetails')
   ```
   
   I think this is due to how LocalExecutor manages task information with a 
`multiprocessing.Queue`. When a pool reaches its limit, somehow _one more_ 
message gets pushed into `activity_queue`, and since the queue is strictly 
FIFO, it clogs up everything behind it until the executor can finally process 
it after all previous TIs are done.
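   
   To illustrate the head-of-line blocking I mean (a toy sketch, not the 
actual LocalExecutor code; the message shapes and worker logic here are made 
up):
   
   ```python
   import multiprocessing
   import time
   
   
   def consumer(queue):
       # Strictly FIFO: there is no way to skip past a message that cannot be
       # acted on yet, so everything enqueued after it just waits.
       while True:
           msg = queue.get()
           if msg is None:
               break
           name, delay = msg
           print(f"processing {name}")
           time.sleep(delay)  # stand-in for a message that takes a long time to clear
   
   
   if __name__ == "__main__":
       activity_queue = multiprocessing.Queue()
       worker = multiprocessing.Process(target=consumer, args=(activity_queue,))
       worker.start()
   
       # The "one more" message for the pool-limited TI goes in first...
       activity_queue.put(("scheduler1 scheduled ti", 5.0))
       # ...so the unrelated manual run enqueued after it sits behind it,
       # even though nothing stops it from running immediately.
       activity_queue.put(("scheduler2 manual run", 0.1))
   
       activity_queue.put(None)
       worker.join()
   ```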
   
   Also note that this is against 3.0. The implementation around this changed 
a bit between 2 and 3 (due to the Task SDK), so it’s possible we have two 
different bugs in different versions. It’s also possible the culprit is the 
same in 2 since the `activity_queue` usage did not change that much.

