This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new ae2a4fd5044 Fix connection retrieval in `DagProcessorManager` for bundle initialization (#57459)
ae2a4fd5044 is described below
commit ae2a4fd50442b04346d58383f16fd9fa0de00c26
Author: Dheeraj Turaga <[email protected]>
AuthorDate: Thu Oct 30 17:56:15 2025 -0500
Fix connection retrieval in `DagProcessorManager` for bundle initialization (#57459)
The dag_processor was unable to retrieve connections from the database,
causing GitHook (and other hooks) to fail with:
AirflowNotFoundException: The conn_id `<conn_id>` isn't defined
Root cause: DagProcessorManager was running in FALLBACK context, which only
loads EnvironmentVariablesBackend, not MetastoreBackend. This meant
connections stored in the database were inaccessible.
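For illustration, a minimal Python sketch (not Airflow's actual lookup code; the backend class and the raised error are stand-ins) of why an environment-variables-only chain cannot see a connection that exists only in the metadata database:

import os

class EnvVarsOnlyBackend:
    # Stand-in for EnvironmentVariablesBackend: connections come from
    # AIRFLOW_CONN_<CONN_ID> environment variables, never from the database.
    def get_connection(self, conn_id):
        return os.environ.get(f"AIRFLOW_CONN_{conn_id.upper()}")

def lookup(conn_id, backends):
    for backend in backends:
        conn = backend.get_connection(conn_id)
        if conn is not None:
            return conn
    # Mirrors the AirflowNotFoundException message quoted above.
    raise LookupError(f"The conn_id `{conn_id}` isn't defined")

# A connection stored only in the database is invisible to this chain:
# lookup("git_default", [EnvVarsOnlyBackend()]) raises LookupError.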
The `DagProcessorManager` (parent process) needs database access for connection
retrieval during bundle initialization (e.g., `GitDagBundle.__init__` → `GitHook`
needs git credentials). Child `DagFileProcessorProcess` instances run user code
and should remain isolated from direct database access.
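A hypothetical sketch of that call path (class names are illustrative stand-ins, not the real GitDagBundle/GitHook), showing that the connection lookup is triggered by the bundle constructor itself, i.e. inside the manager process and before any child parser runs:

class GitHookSketch:
    def __init__(self, git_conn_id):
        # In the real hook this resolves git credentials through the secrets
        # backend chain of the *current* process -- here, the manager.
        self.git_conn_id = git_conn_id

class GitDagBundleSketch:
    def __init__(self, git_conn_id="git_default"):
        # __init__ -> hook -> connection lookup, all before any DAG file is parsed.
        self.hook = GitHookSketch(git_conn_id)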
This ensures correct secrets backend chains (when no external secrets backend is configured):
- Manager (parent): `EnvironmentVariablesBackend` → `MetastoreBackend` (database access)
- Parser (child): `EnvironmentVariablesBackend`
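A minimal sketch of how the process-context flag could select between those two chains; the real decision lives in ensure_secrets_backend_loaded(), so the function below is only a hypothetical mirror of the behaviour described here:

import os

def build_backend_chain():
    # Assumption: the "_AIRFLOW_PROCESS_CONTEXT" flag added by this commit
    # decides whether the database-backed backend joins the chain.
    chain = ["EnvironmentVariablesBackend"]
    if os.environ.get("_AIRFLOW_PROCESS_CONTEXT") == "server":
        chain.append("MetastoreBackend")  # database access, manager only
    return chain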
Note: This is temporary until AIP-92 removes direct DB access from DagProcessorManager.
Long-term, the manager should use the Execution API instead of direct database access.
Affects: DAG bundle processing with GitHook and any other hooks that rely on
database-stored connections during bundle initialization in the manager process.
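As background for the diff below: child processes inherit the parent's environment, which is why each parser process must explicitly flip the flag back to "client". A small self-contained sketch of that handoff (standard library only, not Airflow code):

import os
from multiprocessing import Process

def parser_child():
    # Explicit override in the child, mirroring _parse_file_entrypoint().
    os.environ["_AIRFLOW_PROCESS_CONTEXT"] = "client"
    print("child context:", os.environ["_AIRFLOW_PROCESS_CONTEXT"])

if __name__ == "__main__":
    os.environ["_AIRFLOW_PROCESS_CONTEXT"] = "server"  # manager-side marking
    p = Process(target=parser_child)
    p.start()
    p.join()
    # Without the override, the child would have seen "server" too.
    print("parent context:", os.environ["_AIRFLOW_PROCESS_CONTEXT"])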
---
airflow-core/src/airflow/dag_processing/manager.py | 10 ++++++++++
airflow-core/src/airflow/dag_processing/processor.py | 4 ++++
2 files changed, 14 insertions(+)
diff --git a/airflow-core/src/airflow/dag_processing/manager.py b/airflow-core/src/airflow/dag_processing/manager.py
index ff6ff81ab1e..232c0d978ac 100644
--- a/airflow-core/src/airflow/dag_processing/manager.py
+++ b/airflow-core/src/airflow/dag_processing/manager.py
@@ -251,6 +251,16 @@ class DagFileProcessorManager(LoggingMixin):
By processing them in separate processes, we can get parallelism and isolation
from potentially harmful user code.
"""
+ # TODO: Temporary until AIP-92 removes DB access from DagProcessorManager.
+ # The manager needs MetastoreBackend to retrieve connections from the database
+ # during bundle initialization (e.g., GitDagBundle.__init__ → GitHook needs git credentials).
+ # This marks the manager as "server" context so ensure_secrets_backend_loaded() provides
+ # MetastoreBackend instead of falling back to EnvironmentVariablesBackend only.
+ # Child parser processes explicitly override this by setting _AIRFLOW_PROCESS_CONTEXT=client
+ # in _parse_file_entrypoint() to prevent inheriting server privileges.
+ # Related: https://github.com/apache/airflow/pull/57459
+ os.environ["_AIRFLOW_PROCESS_CONTEXT"] = "server"
+
self.register_exit_signals()
self.log.info("Processing files using up to %s processes at a time ", self._parallelism)
diff --git a/airflow-core/src/airflow/dag_processing/processor.py b/airflow-core/src/airflow/dag_processing/processor.py
index bf937eac53f..122b2fd0768 100644
--- a/airflow-core/src/airflow/dag_processing/processor.py
+++ b/airflow-core/src/airflow/dag_processing/processor.py
@@ -174,6 +174,10 @@ def _pre_import_airflow_modules(file_path: str, log: FilteringBoundLogger) -> No
def _parse_file_entrypoint():
+ # Mark as client-side (runs user DAG code)
+ # Prevents inheriting server context from parent DagProcessorManager
+ os.environ["_AIRFLOW_PROCESS_CONTEXT"] = "client"
+
import structlog
from airflow.sdk.execution_time import comms, task_runner