This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v3-1-test by this push:
     new d60088251ad Fix connection retrieval in `DagProcessorManager` for bundle initialization (#57459)
d60088251ad is described below

commit d60088251ad43fe2994c116bc37e42d732511a77
Author: Dheeraj Turaga <[email protected]>
AuthorDate: Thu Oct 30 17:56:15 2025 -0500

    Fix connection retrieval in `DagProcessorManager` for bundle initialization (#57459)
    
    The dag_processor was unable to retrieve connections from the database,
    causing GitHook (and other hooks) to fail with:
      AirflowNotFoundException: The conn_id `<conn_id>` isn't defined
    
    Root cause: DagProcessorManager was running in FALLBACK context, which only
    loads EnvironmentVariablesBackend, not MetastoreBackend. This meant
    connections stored in the database were inaccessible.
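
    Conceptually, which backends get loaded depends on the process context.
    Below is a minimal sketch of that selection, assuming only the
    EnvironmentVariablesBackend and MetastoreBackend classes and the
    _AIRFLOW_PROCESS_CONTEXT variable introduced by this change; the helper
    function itself is hypothetical, not Airflow's actual loading code:

        import os

        from airflow.secrets.environment_variables import EnvironmentVariablesBackend
        from airflow.secrets.metastore import MetastoreBackend

        def backends_for_context():
            # Hypothetical sketch: server-side processes (the manager) may read
            # the metastore; client-side parser processes running user code may not.
            backends = [EnvironmentVariablesBackend()]
            if os.environ.get("_AIRFLOW_PROCESS_CONTEXT") == "server":
                backends.append(MetastoreBackend())
            return backends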
    
    The `DagProcessorManager` (parent process) needs database access for connection
    retrieval during bundle initialization (e.g., `GitDagBundle.__init__` → `GitHook`
    needs git credentials). Child `DagFileProcessorProcess` instances run user code
    and should remain isolated from direct database access.
    
    This ensures the correct secrets backend chain for each process (when no
    external secrets backend is configured):
    - Manager (parent): `EnvironmentVariablesBackend` → `MetastoreBackend` (database access)
    - Parser (child): `EnvironmentVariablesBackend`
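
    For example, a connection lookup like the one GitHook performs during bundle
    initialization (the conn_id below is a placeholder) now resolves differently
    per process:

        from airflow.hooks.base import BaseHook

        # Manager (server context): database-stored connections are found via
        # MetastoreBackend. Parser (client context): only environment variables
        # such as AIRFLOW_CONN_MY_GIT_CONN are consulted.
        conn = BaseHook.get_connection("my_git_conn")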
    
    Note: This is temporary until AIP-92 removes direct DB access from
    DagProcessorManager. Long-term, the manager should use the Execution API
    instead of direct database access.
    
    Affects: DAG bundle processing with GitHook and any other hooks that rely on
    database-stored connections during bundle initialization in the manager process.
    
    (cherry picked from commit ae2a4fd50442b04346d58383f16fd9fa0de00c26)
---
 airflow-core/src/airflow/dag_processing/manager.py   | 10 ++++++++++
 airflow-core/src/airflow/dag_processing/processor.py |  4 ++++
 2 files changed, 14 insertions(+)

diff --git a/airflow-core/src/airflow/dag_processing/manager.py b/airflow-core/src/airflow/dag_processing/manager.py
index 5c32d7daa6f..c774aa29fc6 100644
--- a/airflow-core/src/airflow/dag_processing/manager.py
+++ b/airflow-core/src/airflow/dag_processing/manager.py
@@ -250,6 +250,16 @@ class DagFileProcessorManager(LoggingMixin):
         By processing them in separate processes, we can get parallelism and isolation
         from potentially harmful user code.
         """
+        # TODO: Temporary until AIP-92 removes DB access from DagProcessorManager.
+        # The manager needs MetastoreBackend to retrieve connections from the database
+        # during bundle initialization (e.g., GitDagBundle.__init__ → GitHook needs git credentials).
+        # This marks the manager as "server" context so ensure_secrets_backend_loaded() provides
+        # MetastoreBackend instead of falling back to EnvironmentVariablesBackend only.
+        # Child parser processes explicitly override this by setting _AIRFLOW_PROCESS_CONTEXT=client
+        # in _parse_file_entrypoint() to prevent inheriting server privileges.
+        # Related: https://github.com/apache/airflow/pull/57459
+        os.environ["_AIRFLOW_PROCESS_CONTEXT"] = "server"
+
         self.register_exit_signals()
 
         self.log.info("Processing files using up to %s processes at a time ", self._parallelism)
diff --git a/airflow-core/src/airflow/dag_processing/processor.py b/airflow-core/src/airflow/dag_processing/processor.py
index 9e7215784f2..ac30dc03358 100644
--- a/airflow-core/src/airflow/dag_processing/processor.py
+++ b/airflow-core/src/airflow/dag_processing/processor.py
@@ -171,6 +171,10 @@ def _pre_import_airflow_modules(file_path: str, log: FilteringBoundLogger) -> No
 
 
 def _parse_file_entrypoint():
+    # Mark as client-side (runs user DAG code)
+    # Prevents inheriting server context from parent DagProcessorManager
+    os.environ["_AIRFLOW_PROCESS_CONTEXT"] = "client"
+
     import structlog
 
     from airflow.sdk.execution_time import comms, task_runner
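
Taken together (an illustrative sketch, not the actual spawn code): os.environ is
per-process state, copied into each child at spawn time, so the parser's override
affects only itself and never leaks back to the manager:

    import os

    # Parent (manager) startup, before bundle initialization:
    os.environ["_AIRFLOW_PROCESS_CONTEXT"] = "server"

    # Child (parser) entrypoint: the child starts with a copy of the parent's
    # environment, and this overwrite is visible only in the child process.
    os.environ["_AIRFLOW_PROCESS_CONTEXT"] = "client"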
