This is an automated email from the ASF dual-hosted git repository.

amoghdesai pushed a commit to branch v3-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v3-0-test by this push:
     new 16d5bc9ffe9 [v3-0-test] Add bundle path to sys.path in task runner (#51318) (#51341)
16d5bc9ffe9 is described below

commit 16d5bc9ffe94a2c2a7fb7f0b1e86f32cb246ac8a
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Jun 3 14:25:03 2025 +0530

    [v3-0-test] Add bundle path to sys.path in task runner (#51318) (#51341)
    
    (cherry picked from commit b1bbc82c80d28bd0d8066cec81e13640a08a04f4)
    
    Co-authored-by: Jed Cunningham <[email protected]>
    Co-authored-by: Amogh Desai <[email protected]>
---
 .../src/airflow/sdk/execution_time/task_runner.py  |  5 +++
 .../task_sdk/execution_time/test_task_runner.py    | 49 ++++++++++++++++++++++
 2 files changed, 54 insertions(+)

diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
index 2036ee22b47..f7faea7489c 100644
--- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py
+++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
@@ -562,6 +562,11 @@ def parse(what: StartupDetails, log: Logger) -> RuntimeTaskInstance:
     )
     bundle_instance.initialize()
 
+    # Put bundle root on sys.path if needed. This allows the dag bundle to add
+    # code in util modules to be shared between files within the same bundle.
+    if (bundle_root := os.fspath(bundle_instance.path)) not in sys.path:
+        sys.path.append(bundle_root)
+
     dag_absolute_path = os.fspath(Path(bundle_instance.path, what.dag_rel_path))
     bag = DagBag(
         dag_folder=dag_absolute_path,
diff --git a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
index 8df65b88ebf..3077a47f41b 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
@@ -21,6 +21,7 @@ import contextlib
 import functools
 import json
 import os
+import textwrap
 import uuid
 from collections.abc import Iterable
 from datetime import datetime, timedelta
@@ -274,6 +275,54 @@ def test_parse_not_found(test_dags_dir: Path, make_ti_context, dag_id, task_id,
     log.error.assert_has_calls([expected_error])
 
 
+def test_parse_module_in_bundle_root(tmp_path: Path, make_ti_context):
+    """Check that the bundle path is added to sys.path, so Dags can import shared modules."""
+    tmp_path.joinpath("util.py").write_text("NAME = 'dag_name'")
+
+    dag1_path = tmp_path.joinpath("path_test.py")
+    dag1_code = """
+    from util import NAME
+    from airflow.sdk import DAG
+    from airflow.sdk.bases.operator import BaseOperator
+    with DAG(NAME):
+        BaseOperator(task_id="a")
+    """
+    dag1_path.write_text(textwrap.dedent(dag1_code))
+
+    what = StartupDetails(
+        ti=TaskInstance(
+            id=uuid7(),
+            task_id="a",
+            dag_id="dag_name",
+            run_id="c",
+            try_number=1,
+        ),
+        dag_rel_path="path_test.py",
+        bundle_info=BundleInfo(name="my-bundle", version=None),
+        requests_fd=0,
+        ti_context=make_ti_context(),
+        start_date=timezone.utcnow(),
+    )
+
+    with patch.dict(
+        os.environ,
+        {
+            "AIRFLOW__DAG_PROCESSOR__DAG_BUNDLE_CONFIG_LIST": json.dumps(
+                [
+                    {
+                        "name": "my-bundle",
+                        "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle",
+                        "kwargs": {"path": str(tmp_path), "refresh_interval": 1},
+                    }
+                ]
+            ),
+        },
+    ):
+        ti = parse(what, mock.Mock())
+
+    assert ti.task.dag.dag_id == "dag_name"
+
+
 def test_run_deferred_basic(time_machine, create_runtime_ti, mock_supervisor_comms):
     """Test that a task can transition to a deferred state."""
     from airflow.providers.standard.sensors.date_time import DateTimeSensorAsync

Reply via email to