mcarter-twosigma opened a new issue, #61964:
URL: https://github.com/apache/airflow/issues/61964
### Apache Airflow version
2.11.X
### If "Other Airflow 3 version" selected, which one?
_No response_
### What happened?
On Apache Airflow 2.10.5 (that exact version is not selectable in this reporting tool), when using LocalExecutor with AIP-44 (Internal API) enabled via AIRFLOW_ENABLE_AIP_44=true and database_access_isolation=True, the Internal API is not properly activated in task supervisor processes. As a result, InternalApiConfig.get_use_internal_api() returns False, which triggers the reconfiguration of SQLAlchemy to use NullPool instead of QueuePool at airflow/cli/commands/task_command.py:476.
This defeats the purpose of AIP-44 and has serious consequences:
1. Task supervisors use direct database access (not via Internal API)
2. SQLAlchemy uses NullPool, which causes memory leaks with certain database
drivers (notably PostgreSQL with libpq 16)
3. The behavior is inconsistent with other Airflow components that properly
activate the Internal API
When database_access_isolation=True and AIRFLOW_ENABLE_AIP_44=true:
- InternalApiConfig.set_use_internal_api() is never called in task
supervisor processes
- InternalApiConfig.get_use_internal_api() returns False at
task_command.py:469
- The ORM is reconfigured to use NullPool at task_command.py:476 (see the sketch after this list)
- Task supervisors use direct database access with NullPool (worst of both
worlds)
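A sketch of that supervisor-side check, reconstructed from the workaround patch shown later in this report rather than copied verbatim from the upstream source:
```python
# Reconstructed sketch of the check around task_command.py:469-476
# (paraphrased from this report, not the exact upstream code).
if not InternalApiConfig.get_use_internal_api():
    # Direct DB access path: connection pooling is disabled, so SQLAlchemy
    # falls back to NullPool and every session opens a fresh connection.
    settings.reconfigure_orm(disable_connection_pool=True)
```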
**Root Cause**
The issue occurs in the execution flow when LocalExecutor forks to create
task supervisor processes:
Normal execution path (e.g., airflow tasks run from CLI):
1. __main__.py:main() is called
2. configure_internal_api() is called at line 60
3. If database_access_isolation=True, calls
InternalApiConfig.set_use_internal_api() at line 75
4. task_run() is called
5. Check at task_command.py:469 correctly sees get_use_internal_api()
returns True
LocalExecutor fork path (the problematic path):
1. LocalExecutor worker forks at airflow/executors/local_executor.py:117
2. Child process directly parses args and calls args.func(args) at line 142
3. Skips __main__.py:configure_internal_api() entirely
4. task_run() is called with InternalApiConfig._use_internal_api still at
default False
5. Check at task_command.py:469 sees get_use_internal_api() returns False
6. Triggers settings.reconfigure_orm(disable_connection_pool=True) at line
476
Note: There is a call to InternalApiConfig.set_use_internal_api() in
StandardTaskRunner._start_by_fork() at
airflow/task/task_runner/standard_task_runner.py:79, but this happens in the
task runner child process, not the task supervisor process where the check at
line 469 occurs.
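To make the inherited-default behavior concrete, here is a minimal, self-contained illustration of why a forked child still sees the flag's default value (the class below is a stand-in for the real InternalApiConfig, not Airflow code):
```python
import os

class InternalApiConfigStandIn:
    # Stand-in for airflow.api_internal.internal_api_call.InternalApiConfig;
    # the real flag is only flipped by __main__.py:configure_internal_api().
    _use_internal_api = False

pid = os.fork()
if pid == 0:
    # A child forked by the executor never runs __main__.main(), so the
    # class-level default is inherited unchanged.
    print(InternalApiConfigStandIn._use_internal_api)  # prints: False
    os._exit(0)
os.waitpid(pid, 0)
```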
**Impact**
1. Memory leak: with NullPool and PostgreSQL libpq 16, task supervisor processes leak ~22 MB/hour (a rough monitoring sketch follows this list)
2. Scalability issues: with 96 parallel tasks, the leaks quickly exhaust container memory limits, causing OOM kills
3. AIP-44 broken for LocalExecutor: the feature doesn't work as designed for one of the most common executors
4. Security concern: untrusted components (task supervisors running user code) have direct database access when they shouldn't
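One rough way to observe the leak from outside Airflow, sketched with psutil; the "task supervisor" substring matches the setproctitle value used in the fork path, but treat the matching logic as an assumption to adjust for your deployment:
```python
import time

import psutil  # third-party: pip install psutil

def supervisor_rss_mb() -> float:
    """Sum the resident set size of processes titled as task supervisors."""
    total = 0
    for proc in psutil.process_iter(["cmdline", "memory_info"]):
        try:
            cmdline = " ".join(proc.info["cmdline"] or [])
            mem = proc.info["memory_info"]
            if mem and "task supervisor" in cmdline:
                total += mem.rss
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            continue  # process exited or is not inspectable
    return total / (1024 * 1024)

# Log the aggregate supervisor RSS once a minute; a steady climb under a
# constant task load is consistent with the ~22 MB/hour figure above.
while True:
    print(f"{time.strftime('%H:%M:%S')} supervisor RSS: {supervisor_rss_mb():.1f} MB")
    time.sleep(60)
```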
**Proposed Fix**
In airflow/executors/local_executor.py, the _execute_work_in_fork() method
should check database_access_isolation and call
InternalApiConfig.set_use_internal_api() before invoking the task command:
```python
def _execute_work_in_fork(self, command: CommandType) -> TaskInstanceState:
    pid = os.fork()
    if pid:
        # In parent, wait for the child
        pid, ret = os.waitpid(pid, 0)
        return TaskInstanceState.SUCCESS if ret == 0 else TaskInstanceState.FAILED

    from airflow.sentry import Sentry

    ret = 1
    try:
        import signal

        from airflow.cli.cli_parser import get_parser
        from airflow.configuration import conf  # Add this import

        signal.signal(signal.SIGINT, signal.SIG_DFL)
        signal.signal(signal.SIGTERM, signal.SIG_DFL)
        signal.signal(signal.SIGUSR2, signal.SIG_DFL)

        # Add this block to configure the Internal API when isolation is on
        if conf.getboolean("core", "database_access_isolation", fallback=False):
            from airflow.api_internal.internal_api_call import InternalApiConfig

            # Point the SQL connection at none:// to prevent direct DB access
            if "AIRFLOW__DATABASE__SQL_ALCHEMY_CONN" in os.environ:
                os.environ["AIRFLOW__DATABASE__SQL_ALCHEMY_CONN"] = "none://"
            conf.set("database", "sql_alchemy_conn", "none://")
            InternalApiConfig.set_use_internal_api("LocalExecutor task supervisor")

        parser = get_parser()
        # [1:] - remove "airflow" from the start of the command
        args = parser.parse_args(command[1:])
        args.shut_down_logging = False
        setproctitle(f"airflow task supervisor: {command}")
        args.func(args)
        ret = 0
        return TaskInstanceState.SUCCESS
    except Exception as e:
        self.log.exception("Failed to execute task %s.", e)
        return TaskInstanceState.FAILED
    finally:
        Sentry.flush()
        logging.shutdown()
        os._exit(ret)
```
This mirrors the logic in __main__.py:configure_internal_api() but applies
it to the forked task supervisor process.
**Workaround**
As a temporary workaround, we've patched task_command.py:476-478 to skip the
NullPool reconfiguration:
```python
if not InternalApiConfig.get_use_internal_api():
    # PATCH: Skip NullPool to avoid the memory leak
    # Original: settings.reconfigure_orm(disable_connection_pool=True)
    pass  # Keep the existing QueuePool from the initial ORM configuration
```
and configured SQLAlchemy connection pooling in airflow.cfg:
```ini
[database]
sql_alchemy_pool_enabled = True
sql_alchemy_pool_size = 5
sql_alchemy_max_overflow = 0
sql_alchemy_pool_recycle = 3600
sql_alchemy_pool_pre_ping = True
```
This eliminates the memory leak but doesn't properly implement AIP-44.
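To confirm the patch actually keeps pooling enabled, a quick check of the engine's pool class (assuming the ORM has been configured in the inspecting process; airflow.settings.engine is the engine set up by Airflow's ORM configuration):
```python
from airflow import settings

# With the patch applied, the supervisor keeps its original pool instead
# of being reconfigured to NullPool.
print(type(settings.engine.pool).__name__)  # expected: "QueuePool"
```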
### What you think should happen instead?
When database_access_isolation=True and AIRFLOW_ENABLE_AIP_44=true:
- Task supervisor processes should call
InternalApiConfig.set_use_internal_api() during initialization
- InternalApiConfig.get_use_internal_api() should return True at
task_command.py:469
- The ORM should NOT be reconfigured to use NullPool
- Task supervisors should either:
- Use the Internal API for database access, OR
- Use QueuePool if direct database access is needed
### How to reproduce
1. Set up Airflow 2.10.5 with a PostgreSQL database
2. Configure in airflow.cfg:
```ini
[core]
executor = LocalExecutor
database_access_isolation = True
```
3. Set environment variable:
```bash
export AIRFLOW_ENABLE_AIP_44=true
```
4. Start Airflow scheduler
5. Trigger a DAG with a simple task
6. Add debug logging to task_command.py:469:
```python
print(f"PID {os.getpid()}: get_use_internal_api() = {InternalApiConfig.get_use_internal_api()}")
```
7. Observe that task supervisor processes print False
### Operating System
Debian GNU/Linux 12 (bookworm)
### Versions of Apache Airflow Providers
N/A
### Deployment
Other
### Deployment details
Environment:
- Kubernetes
- Executor: LocalExecutor
- Python: 3.11+
- Database: PostgreSQL with libpq 16
### Anything else?
- Related to AIP-44:
https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-44+Airflow+Internal+API
- The memory leak with NullPool and libpq 16 is a separate issue but is
triggered by this bug
- Other executors may have similar issues if they fork processes without
going through __main__.py
### Are you willing to submit PR?
- [x] Yes I am willing to submit a PR!
### Code of Conduct
- [x] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)