This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new e1f9151b1d3 Make LocalExecutor work under heavy load (#47678)
e1f9151b1d3 is described below

commit e1f9151b1d34ab39b5cdb400d636278fa59b6e01
Author: Ash Berlin-Taylor <[email protected]>
AuthorDate: Wed Mar 12 19:28:47 2025 +0000

    Make LocalExecutor work under heavy load (#47678)
    
    This change seems innocuous, and possibly even wrong, but it is the correct
    behaviour since #47320 landed. We _do not_ want to call dispose_orm, as that
    ends up reconnecting, and sometimes this results in the wrong connection
    being shared between the parent and the child. I don't love the "sometimes"
    nature of this bug, but the fix seems sound.
    
    Prior to this running one or two runs concurrently would result in the
    scheduler handing (stuck in SQLA code trying to roll back) or an error from
    psycopg about "error with status PGRES_TUPLES_OK and no message from the 
libpq".
    
    With this change we were able to repeatedly run 10 runs concurrently.
    
    The reason we don't want this is that we registered an at_fork handler 
already
    that closes/discards the socket object (without closing the DB level 
session)
    so calling dispose can, perversely, resurrect that object and try reusing 
it!
    
    Co-authored-by: Jed Cunningham 
<[email protected]>
    Co-authored-by: Kaxil Naik <[email protected]>
---
 airflow/executors/local_executor.py                   |  5 -----
 airflow/jobs/scheduler_job_runner.py                  | 14 ++++++++------
 task-sdk/src/airflow/sdk/execution_time/supervisor.py |  4 +---
 3 files changed, 9 insertions(+), 14 deletions(-)

diff --git a/airflow/executors/local_executor.py 
b/airflow/executors/local_executor.py
index c3efa3c9d5d..87a8e71d358 100644
--- a/airflow/executors/local_executor.py
+++ b/airflow/executors/local_executor.py
@@ -35,7 +35,6 @@ from typing import TYPE_CHECKING, Optional
 
 from setproctitle import setproctitle
 
-from airflow import settings
 from airflow.executors import workloads
 from airflow.executors.base_executor import PARALLELISM, BaseExecutor
 from airflow.utils.session import NEW_SESSION, provide_session
@@ -61,10 +60,6 @@ def _run_worker(
     log = logging.getLogger(logger_name)
     log.info("Worker starting up pid=%d", os.getpid())
 
-    # We know we've just started a new process, so lets disconnect from the 
metadata db now
-    settings.engine.pool.dispose()
-    settings.engine.dispose()
-
     while True:
         setproctitle("airflow worker -- LocalExecutor: <idle>")
         try:
diff --git a/airflow/jobs/scheduler_job_runner.py 
b/airflow/jobs/scheduler_job_runner.py
index 88fc6abb609..a8b5ccced84 100644
--- a/airflow/jobs/scheduler_job_runner.py
+++ b/airflow/jobs/scheduler_job_runner.py
@@ -1061,14 +1061,16 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
                 with create_session() as session:
                     # This will schedule for as many executors as possible.
                     num_queued_tis = self._do_scheduling(session)
+                    # Don't keep any objects alive -- we've possibly just 
looked at 500+ ORM objects!
+                    session.expunge_all()
 
-                    # Heartbeat all executors, even if they're not receiving 
new tasks this loop. It will be
-                    # either a no-op, or they will check-in on currently 
running tasks and send out new
-                    # events to be processed below.
-                    for executor in self.job.executors:
-                        executor.heartbeat()
+                # Heartbeat all executors, even if they're not receiving new 
tasks this loop. It will be
+                # either a no-op, or they will check-in on currently running 
tasks and send out new
+                # events to be processed below.
+                for executor in self.job.executors:
+                    executor.heartbeat()
 
-                    session.expunge_all()
+                with create_session() as session:
                     num_finished_events = 0
                     for executor in self.job.executors:
                         num_finished_events += self._process_executor_events(
diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py 
b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
index 39ad05aebf9..36986a3a5f2 100644
--- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py
+++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
@@ -234,8 +234,6 @@ def block_orm_access():
         from airflow import settings
         from airflow.configuration import conf
 
-        settings.dispose_orm()
-
         for attr in ("engine", "async_engine", "Session", "AsyncSession", 
"NonScopedSession"):
             if hasattr(settings, attr):
                 delattr(settings, attr)
@@ -329,7 +327,7 @@ def _fork_main(
         import traceback
 
         try:
-            last_chance_stderr.write("--- Last chance exception handler ---\n")
+            last_chance_stderr.write("--- Supervised process Last chance 
exception handler ---\n")
             traceback.print_exception(exc, value=v, tb=tb, 
file=last_chance_stderr)
             # Exit code 126 and 125 don't have any "special" meaning, they are 
only meant to serve as an
             # identifier that the task process died in a really odd way.

Reply via email to