This is an automated email from the ASF dual-hosted git repository. ephraimanierobi pushed a commit to branch v3-1-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit e34da051d638803305b26e78c5be3ea4b67b1687 Author: Dev-iL <[email protected]> AuthorDate: Thu Jan 15 13:32:11 2026 +0200 Add warning when Bundle path may not be accessible to impersonated user (#60278) (cherry picked from commit 0e0fceb9b73aaf04e904715b9bfe48dcd1d027e4) --- .../administration-and-deployment/dag-bundles.rst | 17 ++++++++ .../src/airflow/sdk/execution_time/task_runner.py | 37 ++++++++++++++++ .../task_sdk/execution_time/test_task_runner.py | 50 ++++++++++++++++++++++ 3 files changed, 104 insertions(+) diff --git a/airflow-core/docs/administration-and-deployment/dag-bundles.rst b/airflow-core/docs/administration-and-deployment/dag-bundles.rst index 60d799e4c22..e51c6c75a39 100644 --- a/airflow-core/docs/administration-and-deployment/dag-bundles.rst +++ b/airflow-core/docs/administration-and-deployment/dag-bundles.rst @@ -119,6 +119,23 @@ Starting Airflow 3.0.2 git is pre installed in the base image. However, if you a ENV GIT_PYTHON_REFRESH=quiet +Using DAG Bundles with User Impersonation +----------------------------------------- + +When using ``run_as_user`` (user impersonation) with DAG bundles, ensure proper file permissions +are configured so that impersonated users can access bundle files created by the main Airflow process. + +1. All impersonated users and the Airflow user should be in the same group +2. Configure appropriate umask settings (e.g., ``umask 0002``) + + +.. note:: + + This permission-based approach is a temporary solution. Future versions of Airflow + will handle multi-user access through supervisor-based bundle operations, eliminating + the need for shared group permissions. + + Writing custom Dag bundles -------------------------- diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py b/task-sdk/src/airflow/sdk/execution_time/task_runner.py index 83002aa2ad0..f98da597639 100644 --- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py +++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py @@ -646,6 +646,7 @@ def parse(what: StartupDetails, log: Logger) -> RuntimeTaskInstance: version=bundle_info.version, ) bundle_instance.initialize() + _verify_bundle_access(bundle_instance, log) dag_absolute_path = os.fspath(Path(bundle_instance.path, what.dag_rel_path)) bag = BundleDagBag( @@ -716,6 +717,42 @@ SUPERVISOR_COMMS: CommsDecoder[ToTask, ToSupervisor] # 3. Shutdown and report status +def _verify_bundle_access(bundle_instance: BaseDagBundle, log: Logger) -> None: + """ + Verify bundle is accessible by the current user. + + This is called after user impersonation (if any) to ensure the bundle + is actually accessible. Uses os.access() which works with any permission + scheme (standard Unix permissions, ACLs, SELinux, etc.). + + :param bundle_instance: The bundle instance to check + :param log: Logger instance + :raises AirflowException: if bundle is not accessible + """ + from getpass import getuser + + from airflow.exceptions import AirflowException + + bundle_path = bundle_instance.path + + if not bundle_path.exists(): + # Already handled by initialize() with a warning + return + + # Check read permission (and execute for directories to list contents) + access_mode = os.R_OK + if bundle_path.is_dir(): + access_mode |= os.X_OK + + if not os.access(bundle_path, access_mode): + raise AirflowException( + f"Bundle '{bundle_instance.name}' path '{bundle_path}' is not accessible " + f"by user '{getuser()}'. When using run_as_user, ensure bundle directories " + f"are readable by the impersonated user. " + f"See: https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/dag-bundles.html" + ) + + def startup() -> tuple[RuntimeTaskInstance, Context, Logger]: # The parent sends us a StartupDetails message un-prompted. After this, every single message is only sent # in response to us sending a request. diff --git a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py index d5f5b5109e0..8bc7282903d 100644 --- a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py +++ b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py @@ -423,6 +423,56 @@ def test_parse_module_in_bundle_root(tmp_path: Path, make_ti_context): assert ti.task.dag.dag_id == "dag_name" +def test_verify_bundle_access_raises_when_not_accessible(tmp_path: Path, make_ti_context): + """Test that _verify_bundle_access raises AirflowException when bundle path is not accessible.""" + from airflow.sdk.execution_time.task_runner import _verify_bundle_access + + # Create a directory that exists + bundle_path = tmp_path / "test_bundle" + bundle_path.mkdir() + + # Create a mock bundle instance + mock_bundle = mock.Mock() + mock_bundle.path = bundle_path + mock_bundle.name = "test-bundle" + + # Mock os.access to simulate permission denied (avoids root user issues in CI) + with patch("airflow.sdk.execution_time.task_runner.os.access", return_value=False): + with pytest.raises(AirflowException) as exc_info: + _verify_bundle_access(mock_bundle, mock.Mock()) + + assert "not accessible" in str(exc_info.value) + assert "test-bundle" in str(exc_info.value) + + +def test_verify_bundle_access_succeeds_when_readable(tmp_path: Path, make_ti_context): + """Test that _verify_bundle_access succeeds when bundle path is accessible.""" + from airflow.sdk.execution_time.task_runner import _verify_bundle_access + + # Create a directory with read permissions + bundle_path = tmp_path / "accessible_bundle" + bundle_path.mkdir() + + mock_bundle = mock.Mock() + mock_bundle.path = bundle_path + mock_bundle.name = "test-bundle" + + # Should not raise + _verify_bundle_access(mock_bundle, mock.Mock()) + + +def test_verify_bundle_access_skips_nonexistent_path(tmp_path: Path): + """Test that _verify_bundle_access does nothing when bundle path doesn't exist.""" + from airflow.sdk.execution_time.task_runner import _verify_bundle_access + + mock_bundle = mock.Mock() + mock_bundle.path = tmp_path / "nonexistent" + mock_bundle.name = "test-bundle" + + # Should not raise - nonexistent paths are handled by initialize() + _verify_bundle_access(mock_bundle, mock.Mock()) + + def test_run_deferred_basic(time_machine, create_runtime_ti, mock_supervisor_comms): """Test that a task can transition to a deferred state.""" from airflow.providers.standard.sensors.date_time import DateTimeSensorAsync
