This is an automated email from the ASF dual-hosted git repository.
amoghdesai pushed a commit to branch v3-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-0-test by this push:
new 16d5bc9ffe9 [v3-0-test] Add bundle path to sys.path in task runner (#51318) (#51341)
16d5bc9ffe9 is described below
commit 16d5bc9ffe94a2c2a7fb7f0b1e86f32cb246ac8a
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Jun 3 14:25:03 2025 +0530
[v3-0-test] Add bundle path to sys.path in task runner (#51318) (#51341)
(cherry picked from commit b1bbc82c80d28bd0d8066cec81e13640a08a04f4)
Co-authored-by: Jed Cunningham <[email protected]>
Co-authored-by: Amogh Desai <[email protected]>
---
.../src/airflow/sdk/execution_time/task_runner.py | 5 +++
.../task_sdk/execution_time/test_task_runner.py | 49 ++++++++++++++++++++++
2 files changed, 54 insertions(+)
diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
index 2036ee22b47..f7faea7489c 100644
--- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py
+++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
@@ -562,6 +562,11 @@ def parse(what: StartupDetails, log: Logger) -> RuntimeTaskInstance:
)
bundle_instance.initialize()
+ # Put bundle root on sys.path if needed. This allows the dag bundle to add
+ # code in util modules to be shared between files within the same bundle.
+ if (bundle_root := os.fspath(bundle_instance.path)) not in sys.path:
+ sys.path.append(bundle_root)
+
dag_absolute_path = os.fspath(Path(bundle_instance.path, what.dag_rel_path))
bag = DagBag(
dag_folder=dag_absolute_path,
diff --git a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
index 8df65b88ebf..3077a47f41b 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
@@ -21,6 +21,7 @@ import contextlib
import functools
import json
import os
+import textwrap
import uuid
from collections.abc import Iterable
from datetime import datetime, timedelta
@@ -274,6 +275,54 @@ def test_parse_not_found(test_dags_dir: Path, make_ti_context, dag_id, task_id,
log.error.assert_has_calls([expected_error])
+def test_parse_module_in_bundle_root(tmp_path: Path, make_ti_context):
+ """Check that the bundle path is added to sys.path, so Dags can import shared modules."""
+ tmp_path.joinpath("util.py").write_text("NAME = 'dag_name'")
+
+ dag1_path = tmp_path.joinpath("path_test.py")
+ dag1_code = """
+ from util import NAME
+ from airflow.sdk import DAG
+ from airflow.sdk.bases.operator import BaseOperator
+ with DAG(NAME):
+ BaseOperator(task_id="a")
+ """
+ dag1_path.write_text(textwrap.dedent(dag1_code))
+
+ what = StartupDetails(
+ ti=TaskInstance(
+ id=uuid7(),
+ task_id="a",
+ dag_id="dag_name",
+ run_id="c",
+ try_number=1,
+ ),
+ dag_rel_path="path_test.py",
+ bundle_info=BundleInfo(name="my-bundle", version=None),
+ requests_fd=0,
+ ti_context=make_ti_context(),
+ start_date=timezone.utcnow(),
+ )
+
+ with patch.dict(
+ os.environ,
+ {
+ "AIRFLOW__DAG_PROCESSOR__DAG_BUNDLE_CONFIG_LIST": json.dumps(
+ [
+ {
+ "name": "my-bundle",
+ "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle",
+ "kwargs": {"path": str(tmp_path), "refresh_interval": 1},
+ }
+ ]
+ ),
+ },
+ ):
+ ti = parse(what, mock.Mock())
+
+ assert ti.task.dag.dag_id == "dag_name"
+
+
def test_run_deferred_basic(time_machine, create_runtime_ti, mock_supervisor_comms):
"""Test that a task can transition to a deferred state."""
from airflow.providers.standard.sensors.date_time import DateTimeSensorAsync