This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit e34da051d638803305b26e78c5be3ea4b67b1687
Author: Dev-iL <[email protected]>
AuthorDate: Thu Jan 15 13:32:11 2026 +0200

    Add warning when Bundle path may not be accessible to impersonated user (#60278)
    
    (cherry picked from commit 0e0fceb9b73aaf04e904715b9bfe48dcd1d027e4)
---
 .../administration-and-deployment/dag-bundles.rst  | 17 ++++++++
 .../src/airflow/sdk/execution_time/task_runner.py  | 37 ++++++++++++++++
 .../task_sdk/execution_time/test_task_runner.py    | 50 ++++++++++++++++++++++
 3 files changed, 104 insertions(+)

diff --git a/airflow-core/docs/administration-and-deployment/dag-bundles.rst b/airflow-core/docs/administration-and-deployment/dag-bundles.rst
index 60d799e4c22..e51c6c75a39 100644
--- a/airflow-core/docs/administration-and-deployment/dag-bundles.rst
+++ b/airflow-core/docs/administration-and-deployment/dag-bundles.rst
@@ -119,6 +119,23 @@ Starting Airflow 3.0.2 git is pre installed in the base image. However, if you a
   ENV GIT_PYTHON_REFRESH=quiet
 
 
+Using DAG Bundles with User Impersonation
+-----------------------------------------
+
+When using ``run_as_user`` (user impersonation) with DAG bundles, ensure proper file permissions
+are configured so that impersonated users can access bundle files created by the main Airflow process.
+
+1. All impersonated users and the Airflow user should be in the same group
+2. Configure appropriate umask settings (e.g., ``umask 0002``)
+
+
+.. note::
+
+    This permission-based approach is a temporary solution. Future versions of Airflow
+    will handle multi-user access through supervisor-based bundle operations, eliminating
+    the need for shared group permissions.
+
+
 Writing custom Dag bundles
 --------------------------
 
diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
index 83002aa2ad0..f98da597639 100644
--- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py
+++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
@@ -646,6 +646,7 @@ def parse(what: StartupDetails, log: Logger) -> RuntimeTaskInstance:
         version=bundle_info.version,
     )
     bundle_instance.initialize()
+    _verify_bundle_access(bundle_instance, log)
 
     dag_absolute_path = os.fspath(Path(bundle_instance.path, 
what.dag_rel_path))
     bag = BundleDagBag(
@@ -716,6 +717,42 @@ SUPERVISOR_COMMS: CommsDecoder[ToTask, ToSupervisor]
 # 3. Shutdown and report status
 
 
+def _verify_bundle_access(bundle_instance: BaseDagBundle, log: Logger) -> None:
+    """
+    Verify bundle is accessible by the current user.
+
+    This is called after user impersonation (if any) to ensure the bundle
+    is actually accessible. Uses os.access() which works with any permission
+    scheme (standard Unix permissions, ACLs, SELinux, etc.).
+
+    :param bundle_instance: The bundle instance to check
+    :param log: Logger instance
+    :raises AirflowException: if bundle is not accessible
+    """
+    from getpass import getuser
+
+    from airflow.exceptions import AirflowException
+
+    bundle_path = bundle_instance.path
+
+    if not bundle_path.exists():
+        # Already handled by initialize() with a warning
+        return
+
+    # Check read permission (and execute for directories to list contents)
+    access_mode = os.R_OK
+    if bundle_path.is_dir():
+        access_mode |= os.X_OK
+
+    if not os.access(bundle_path, access_mode):
+        raise AirflowException(
+            f"Bundle '{bundle_instance.name}' path '{bundle_path}' is not accessible "
+            f"by user '{getuser()}'. When using run_as_user, ensure bundle directories "
+            f"are readable by the impersonated user. "
+            f"See: https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/dag-bundles.html"
+        )
+
+
 def startup() -> tuple[RuntimeTaskInstance, Context, Logger]:
     # The parent sends us a StartupDetails message un-prompted. After this, 
every single message is only sent
     # in response to us sending a request.
diff --git a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
index d5f5b5109e0..8bc7282903d 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
@@ -423,6 +423,56 @@ def test_parse_module_in_bundle_root(tmp_path: Path, make_ti_context):
     assert ti.task.dag.dag_id == "dag_name"
 
 
+def test_verify_bundle_access_raises_when_not_accessible(tmp_path: Path, make_ti_context):
+    """Test that _verify_bundle_access raises AirflowException when bundle path is not accessible."""
+    from airflow.sdk.execution_time.task_runner import _verify_bundle_access
+
+    # Create a directory that exists
+    bundle_path = tmp_path / "test_bundle"
+    bundle_path.mkdir()
+
+    # Create a mock bundle instance
+    mock_bundle = mock.Mock()
+    mock_bundle.path = bundle_path
+    mock_bundle.name = "test-bundle"
+
+    # Mock os.access to simulate permission denied (avoids root user issues in CI)
+    with patch("airflow.sdk.execution_time.task_runner.os.access", return_value=False):
+        with pytest.raises(AirflowException) as exc_info:
+            _verify_bundle_access(mock_bundle, mock.Mock())
+
+        assert "not accessible" in str(exc_info.value)
+        assert "test-bundle" in str(exc_info.value)
+
+
+def test_verify_bundle_access_succeeds_when_readable(tmp_path: Path, make_ti_context):
+    """Test that _verify_bundle_access succeeds when bundle path is accessible."""
+    from airflow.sdk.execution_time.task_runner import _verify_bundle_access
+
+    # Create a directory with read permissions
+    bundle_path = tmp_path / "accessible_bundle"
+    bundle_path.mkdir()
+
+    mock_bundle = mock.Mock()
+    mock_bundle.path = bundle_path
+    mock_bundle.name = "test-bundle"
+
+    # Should not raise
+    _verify_bundle_access(mock_bundle, mock.Mock())
+
+
+def test_verify_bundle_access_skips_nonexistent_path(tmp_path: Path):
+    """Test that _verify_bundle_access does nothing when bundle path doesn't exist."""
+    from airflow.sdk.execution_time.task_runner import _verify_bundle_access
+
+    mock_bundle = mock.Mock()
+    mock_bundle.path = tmp_path / "nonexistent"
+    mock_bundle.name = "test-bundle"
+
+    # Should not raise - nonexistent paths are handled by initialize()
+    _verify_bundle_access(mock_bundle, mock.Mock())
+
+
 def test_run_deferred_basic(time_machine, create_runtime_ti, 
mock_supervisor_comms):
     """Test that a task can transition to a deferred state."""
     from airflow.providers.standard.sensors.date_time import 
DateTimeSensorAsync

Reply via email to