This is an automated email from the ASF dual-hosted git repository.
weilee pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-1-test by this push:
new 497f2acf16b [v3-1-test] Add bundle_path temporarily to sys.path during
DagBag parsing. (#55894) (#61053)
497f2acf16b is described below
commit 497f2acf16b56b22ed020b68745bcca0525e2777
Author: Wei Lee <[email protected]>
AuthorDate: Tue Jan 27 09:14:10 2026 +0800
[v3-1-test] Add bundle_path temporarily to sys.path during DagBag parsing.
(#55894) (#61053)
Co-authored-by: Josef Šimánek <[email protected]>
fixup! [v3-1-test] Add bundle_path temporarily to sys.path during DagBag
parsing. (#55894)
fixup! fixup! [v3-1-test] Add bundle_path temporarily to sys.path during
DagBag parsing. (#55894)
fixup! fixup! fixup! [v3-1-test] Add bundle_path temporarily to sys.path
during DagBag parsing. (#55894)
---
.../src/airflow/cli/commands/dag_command.py | 18 ++--
.../src/airflow/dag_processing/processor.py | 11 +--
airflow-core/src/airflow/models/dagbag.py | 34 ++++++++
airflow-core/src/airflow/utils/cli.py | 15 ++--
.../tests/unit/cli/commands/test_dag_command.py | 9 +-
airflow-core/tests/unit/models/test_dag.py | 4 +-
airflow-core/tests/unit/models/test_dagbag.py | 95 +++++++++++++++++++++-
task-sdk/src/airflow/sdk/definitions/dag.py | 6 +-
.../src/airflow/sdk/execution_time/task_runner.py | 14 +---
.../task_sdk/execution_time/test_task_runner.py | 52 +++++++++++-
10 files changed, 207 insertions(+), 51 deletions(-)
diff --git a/airflow-core/src/airflow/cli/commands/dag_command.py
b/airflow-core/src/airflow/cli/commands/dag_command.py
index df7276d8643..2a7d6d24710 100644
--- a/airflow-core/src/airflow/cli/commands/dag_command.py
+++ b/airflow-core/src/airflow/cli/commands/dag_command.py
@@ -41,7 +41,7 @@ from airflow.exceptions import AirflowConfigException,
AirflowException
from airflow.jobs.job import Job
from airflow.models import DagBag, DagModel, DagRun, TaskInstance
from airflow.models.dag import get_next_data_interval
-from airflow.models.dagbag import sync_bag_to_db
+from airflow.models.dagbag import BundleDagBag, sync_bag_to_db
from airflow.models.errors import ParseImportError
from airflow.models.serialized_dag import SerializedDagModel
from airflow.utils import cli as cli_utils
@@ -375,10 +375,10 @@ def dag_list_dags(args, session: Session = NEW_SESSION)
-> None:
for bundle in all_bundles:
if bundle.name in bundles_to_search:
- dagbag = DagBag(bundle.path, bundle_path=bundle.path)
- dagbag.collect_dags()
- dags_list.extend(list(dagbag.dags.values()))
- dagbag_import_errors += len(dagbag.import_errors)
+ bundle_dagbag = BundleDagBag(bundle.path, bundle_path=bundle.path)
+ bundle_dagbag.collect_dags()
+ dags_list.extend(list(bundle_dagbag.dags.values()))
+ dagbag_import_errors += len(bundle_dagbag.import_errors)
else:
dagbag = DagBag()
dagbag.collect_dags()
@@ -471,8 +471,8 @@ def dag_list_import_errors(args, session: Session =
NEW_SESSION) -> None:
for bundle in all_bundles:
if bundle.name in bundles_to_search:
- dagbag = DagBag(bundle.path, bundle_path=bundle.path)
- for filename, errors in dagbag.import_errors.items():
+ bundle_dagbag = BundleDagBag(bundle.path, bundle_path=bundle.path)
+ for filename, errors in bundle_dagbag.import_errors.items():
data.append({"bundle_name": bundle.name, "filepath": filename, "error": errors})
else:
dagbag = DagBag()
@@ -523,7 +523,7 @@ def dag_report(args) -> None:
if bundle.name not in bundles_to_reserialize:
continue
bundle.initialize()
- dagbag = DagBag(bundle.path, include_examples=False)
+ dagbag = BundleDagBag(bundle.path, bundle_path=bundle.path)
all_dagbag_stats.extend(dagbag.dagbag_stats)
AirflowConsole().print_as(
@@ -687,5 +687,5 @@ def dag_reserialize(args, session: Session = NEW_SESSION)
-> None:
if bundle.name not in bundles_to_reserialize:
continue
bundle.initialize()
- dag_bag = DagBag(bundle.path, bundle_path=bundle.path, include_examples=False)
+ dag_bag = BundleDagBag(bundle.path, bundle_path=bundle.path)
sync_bag_to_db(dag_bag, bundle.name, bundle_version=bundle.get_current_version(), session=session)
diff --git a/airflow-core/src/airflow/dag_processing/processor.py
b/airflow-core/src/airflow/dag_processing/processor.py
index ac30dc03358..49f997f9a1e 100644
--- a/airflow-core/src/airflow/dag_processing/processor.py
+++ b/airflow-core/src/airflow/dag_processing/processor.py
@@ -19,7 +19,6 @@ from __future__ import annotations
import contextlib
import importlib
import os
-import sys
import traceback
from collections.abc import Callable, Sequence
from pathlib import Path
@@ -36,7 +35,7 @@ from airflow.callbacks.callback_requests import (
)
from airflow.configuration import conf
from airflow.exceptions import TaskNotFound
-from airflow.models.dagbag import DagBag
+from airflow.models.dagbag import BundleDagBag, DagBag
from airflow.sdk.execution_time.comms import (
ConnectionResult,
DeleteVariable,
@@ -191,11 +190,6 @@ def _parse_file_entrypoint():
task_runner.SUPERVISOR_COMMS = comms_decoder
log = structlog.get_logger(logger_name="task")
- # Put bundle root on sys.path if needed. This allows the dag bundle to add
- # code in util modules to be shared between files within the same bundle.
- if (bundle_root := os.fspath(msg.bundle_path)) not in sys.path:
- sys.path.append(bundle_root)
-
result = _parse_file(msg, log)
if result is not None:
comms_decoder.send(result)
@@ -204,10 +198,9 @@ def _parse_file_entrypoint():
def _parse_file(msg: DagFileParseRequest, log: FilteringBoundLogger) ->
DagFileParsingResult | None:
# TODO: Set known_pool names on DagBag!
- bag = DagBag(
+ bag = BundleDagBag(
dag_folder=msg.file,
bundle_path=msg.bundle_path,
- include_examples=False,
load_op_links=False,
)
if msg.callback_requests:
diff --git a/airflow-core/src/airflow/models/dagbag.py
b/airflow-core/src/airflow/models/dagbag.py
index 5ee277d949d..4576819a7de 100644
--- a/airflow-core/src/airflow/models/dagbag.py
+++ b/airflow-core/src/airflow/models/dagbag.py
@@ -650,6 +650,40 @@ class DagBag(LoggingMixin):
return report
+class BundleDagBag(DagBag):
+ """
+ Bundle-aware DagBag that permanently modifies sys.path.
+
+ This class adds the bundle_path to sys.path permanently to allow DAG files
+ to import modules from their bundle directory. No cleanup is performed.
+
+ WARNING: Only use for one-off usages like CLI commands. Using this in long-running
+ processes will cause sys.path to accumulate entries.
+
+ Same parameters as DagBag, but bundle_path is required and examples are not loaded.
+ """
+
+ def __init__(self, *args, bundle_path: Path | None = None, **kwargs):
+ if not bundle_path:
+ raise ValueError("bundle_path is required for BundleDagBag")
+
+ if str(bundle_path) not in sys.path:
+ sys.path.append(str(bundle_path))
+
+ # Warn if user explicitly set include_examples=True, since bundles never contain examples
+ if kwargs.get("include_examples") is True:
+ warnings.warn(
+ "include_examples=True is ignored for BundleDagBag. "
+ "Bundles do not contain example DAGs, so include_examples is always False.",
+ UserWarning,
+ stacklevel=2,
+ )
+
+ kwargs["bundle_path"] = bundle_path
+ kwargs["include_examples"] = False
+ super().__init__(*args, **kwargs)
+
+
@provide_session
def sync_bag_to_db(
dagbag: DagBag,
diff --git a/airflow-core/src/airflow/utils/cli.py
b/airflow-core/src/airflow/utils/cli.py
index b6423c5af3a..66fe0ac47f7 100644
--- a/airflow-core/src/airflow/utils/cli.py
+++ b/airflow-core/src/airflow/utils/cli.py
@@ -271,14 +271,15 @@ def get_bagged_dag(bundle_names: list | None, dag_id:
str, dagfile_path: str | N
find the correct path (assuming it's a file) and failing that, use the
configured
dags folder.
"""
- from airflow.models.dagbag import DagBag, sync_bag_to_db
+ from airflow.models.dagbag import BundleDagBag, sync_bag_to_db
manager = DagBundlesManager()
for bundle_name in bundle_names or ():
bundle = manager.get_bundle(bundle_name)
with _airflow_parsing_context_manager(dag_id=dag_id):
- dagbag = DagBag(
- dag_folder=dagfile_path or bundle.path, bundle_path=bundle.path, include_examples=False
+ dagbag = BundleDagBag(
+ dag_folder=dagfile_path or bundle.path,
+ bundle_path=bundle.path,
)
if dag := dagbag.dags.get(dag_id):
return dag
@@ -287,9 +288,7 @@ def get_bagged_dag(bundle_names: list | None, dag_id: str,
dagfile_path: str | N
for bundle in manager.get_all_dag_bundles():
bundle.initialize()
with _airflow_parsing_context_manager(dag_id=dag_id):
- dagbag = DagBag(
- dag_folder=dagfile_path or bundle.path, bundle_path=bundle.path, include_examples=False
- )
+ dagbag = BundleDagBag(dag_folder=dagfile_path or bundle.path, bundle_path=bundle.path)
sync_bag_to_db(dagbag, bundle.name, bundle.version)
if dag := dagbag.dags.get(dag_id):
return dag
@@ -315,7 +314,7 @@ def get_db_dag(bundle_names: list | None, dag_id: str,
dagfile_path: str | None
def get_dags(bundle_names: list | None, dag_id: str, use_regex: bool = False,
from_db: bool = False):
"""Return DAG(s) matching a given regex or dag_id."""
- from airflow.models import DagBag
+ from airflow.models.dagbag import BundleDagBag
bundle_names = bundle_names or []
@@ -325,7 +324,7 @@ def get_dags(bundle_names: list | None, dag_id: str,
use_regex: bool = False, fr
return [get_bagged_dag(bundle_names=bundle_names, dag_id=dag_id)]
def _find_dag(bundle):
- dagbag = DagBag(dag_folder=bundle.path, bundle_path=bundle.path)
+ dagbag = BundleDagBag(dag_folder=bundle.path, bundle_path=bundle.path)
matched_dags = [dag for dag in dagbag.dags.values() if re.search(dag_id, dag.dag_id)]
return matched_dags
diff --git a/airflow-core/tests/unit/cli/commands/test_dag_command.py
b/airflow-core/tests/unit/cli/commands/test_dag_command.py
index f6cae39b720..517fdee2a51 100644
--- a/airflow-core/tests/unit/cli/commands/test_dag_command.py
+++ b/airflow-core/tests/unit/cli/commands/test_dag_command.py
@@ -759,7 +759,7 @@ class TestCliDags:
mock_render_dag.assert_has_calls([mock.call(mock_get_dag.return_value,
tis=[])])
assert "SOURCE" in output
- @mock.patch("airflow.models.dagbag.DagBag")
+ @mock.patch("airflow.models.dagbag.BundleDagBag")
def test_dag_test_with_bundle_name(self, mock_dagbag,
configure_dag_bundles):
"""Test that DAG can be tested using bundle name."""
mock_dagbag.return_value.get_dag.return_value.test.return_value =
DagRun(
@@ -783,10 +783,9 @@ class TestCliDags:
mock_dagbag.assert_called_once_with(
bundle_path=TEST_DAGS_FOLDER,
dag_folder=TEST_DAGS_FOLDER,
- include_examples=False,
)
- @mock.patch("airflow.models.dagbag.DagBag")
+ @mock.patch("airflow.models.dagbag.BundleDagBag")
def test_dag_test_with_dagfile_path(self, mock_dagbag,
configure_dag_bundles):
"""Test that DAG can be tested using dagfile path."""
mock_dagbag.return_value.get_dag.return_value.test.return_value =
DagRun(
@@ -804,10 +803,9 @@ class TestCliDags:
mock_dagbag.assert_called_once_with(
bundle_path=TEST_DAGS_FOLDER,
dag_folder=str(dag_file),
- include_examples=False,
)
- @mock.patch("airflow.models.dagbag.DagBag")
+ @mock.patch("airflow.models.dagbag.BundleDagBag")
def test_dag_test_with_both_bundle_and_dagfile_path(self, mock_dagbag,
configure_dag_bundles):
"""Test that DAG can be tested using both bundle name and dagfile
path."""
mock_dagbag.return_value.get_dag.return_value.test.return_value =
DagRun(
@@ -835,7 +833,6 @@ class TestCliDags:
mock_dagbag.assert_called_once_with(
bundle_path=TEST_DAGS_FOLDER,
dag_folder=str(dag_file),
- include_examples=False,
)
@mock.patch("airflow.models.dagrun.get_or_create_dagrun")
diff --git a/airflow-core/tests/unit/models/test_dag.py
b/airflow-core/tests/unit/models/test_dag.py
index e9c77912e9f..c511e2a4df4 100644
--- a/airflow-core/tests/unit/models/test_dag.py
+++ b/airflow-core/tests/unit/models/test_dag.py
@@ -54,7 +54,7 @@ from airflow.models.dag import (
get_next_data_interval,
get_run_data_interval,
)
-from airflow.models.dagbag import DBDagBag
+from airflow.models.dagbag import BundleDagBag, DBDagBag
from airflow.models.dagbundle import DagBundleModel
from airflow.models.dagrun import DagRun
from airflow.models.serialized_dag import SerializedDagModel
@@ -2147,7 +2147,7 @@ class TestDagModel:
rel_path = "test_assets.py"
bundle_path = TEST_DAGS_FOLDER
file_path = bundle_path / rel_path
- bag = DagBag(dag_folder=file_path, bundle_path=bundle_path)
+ bag = BundleDagBag(dag_folder=file_path, bundle_path=bundle_path)
dag = bag.get_dag("dag_with_skip_task")
diff --git a/airflow-core/tests/unit/models/test_dagbag.py
b/airflow-core/tests/unit/models/test_dagbag.py
index 5c064c973ac..5e4639750c7 100644
--- a/airflow-core/tests/unit/models/test_dagbag.py
+++ b/airflow-core/tests/unit/models/test_dagbag.py
@@ -38,7 +38,7 @@ from airflow import settings
from airflow.exceptions import UnknownExecutorException
from airflow.executors.executor_loader import ExecutorLoader
from airflow.models.dag import DagModel
-from airflow.models.dagbag import DagBag, _capture_with_reraise,
_validate_executor_fields
+from airflow.models.dagbag import BundleDagBag, DagBag, _capture_with_reraise,
_validate_executor_fields
from airflow.models.dagwarning import DagWarning, DagWarningType
from airflow.models.serialized_dag import SerializedDagModel
from airflow.sdk import DAG, BaseOperator
@@ -921,3 +921,96 @@ class TestCaptureWithReraise:
self.raise_warnings()
assert len(cw) == 1
assert len(records) == 1
+
+
+class TestBundlePathSysPath:
+ """Tests for bundle_path sys.path handling in BundleDagBag."""
+
+ def test_bundle_path_added_to_syspath(self, tmp_path):
+ """Test that BundleDagBag adds bundle_path to sys.path when
provided."""
+ util_file = tmp_path / "bundle_util.py"
+ util_file.write_text('def get_message(): return "Hello from bundle!"')
+
+ dag_file = tmp_path / "test_dag.py"
+ dag_file.write_text(
+ textwrap.dedent(
+ """\
+ from airflow.sdk import DAG
+ from airflow.operators.empty import EmptyOperator
+
+ import sys
+ import bundle_util
+
+ with DAG('test_import', description=f"DAG with sys.path: {sys.path}"):
+ EmptyOperator(task_id="mytask")
+ """
+ )
+ )
+
+ assert str(tmp_path) not in sys.path
+
+ dagbag = BundleDagBag(dag_folder=str(dag_file), bundle_path=tmp_path)
+
+ # Check import was successful
+ assert len(dagbag.dags) == 1
+ assert not dagbag.import_errors
+
+ dag = dagbag.get_dag("test_import")
+ assert dag is not None
+ assert str(tmp_path) in dag.description # sys.path was enhanced
during parse
+
+ # Path remains in sys.path (no cleanup - intentional for ephemeral
processes)
+ assert str(tmp_path) in sys.path
+
+ # Cleanup for other tests
+ sys.path.remove(str(tmp_path))
+
+ def test_bundle_path_not_duplicated(self, tmp_path):
+ """Test that bundle_path is not added to sys.path if already
present."""
+ dag_file = tmp_path / "simple_dag.py"
+ dag_file.write_text(
+ textwrap.dedent(
+ """\
+ from airflow.sdk import DAG
+ from airflow.operators.empty import EmptyOperator
+
+ with DAG("simple_dag"):
+ EmptyOperator(task_id="mytask")
+ """
+ )
+ )
+
+ # Pre-add the path
+ sys.path.append(str(tmp_path))
+ count_before = sys.path.count(str(tmp_path))
+
+ BundleDagBag(dag_folder=str(dag_file), bundle_path=tmp_path)
+
+ # Should not add duplicate
+ assert sys.path.count(str(tmp_path)) == count_before
+
+ # Cleanup for other tests
+ sys.path.remove(str(tmp_path))
+
+ def test_dagbag_no_bundle_path_no_syspath_modification(self, tmp_path):
+ """Test that no sys.path modification occurs when DagBag is used
without bundle_path."""
+ dag_file = tmp_path / "simple_dag.py"
+ dag_file.write_text(
+ textwrap.dedent(
+ """\
+ from airflow.sdk import DAG
+ from airflow.operators.empty import EmptyOperator
+
+ import sys
+
+ with DAG("simple_dag", description=f"DAG with sys.path: {sys.path}") as dag:
+ EmptyOperator(task_id="mytask")
+ """
+ )
+ )
+ syspath_before = deepcopy(sys.path)
+ dagbag = DagBag(dag_folder=str(dag_file), include_examples=False)
+ dag = dagbag.get_dag("simple_dag")
+
+ assert str(tmp_path) not in dag.description
+ assert sys.path == syspath_before
diff --git a/task-sdk/src/airflow/sdk/definitions/dag.py
b/task-sdk/src/airflow/sdk/definitions/dag.py
index b51012d64a0..e8d954e880a 100644
--- a/task-sdk/src/airflow/sdk/definitions/dag.py
+++ b/task-sdk/src/airflow/sdk/definitions/dag.py
@@ -1209,7 +1209,7 @@ class DAG:
version = DagVersion.get_version(self.dag_id)
if not version:
from airflow.dag_processing.bundles.manager import
DagBundlesManager
- from airflow.models.dagbag import DagBag, sync_bag_to_db
+ from airflow.models.dagbag import BundleDagBag, sync_bag_to_db
from airflow.sdk.definitions._internal.dag_parsing_context
import (
_airflow_parsing_context_manager,
)
@@ -1223,9 +1223,7 @@ class DAG:
if not bundle.is_initialized:
bundle.initialize()
with _airflow_parsing_context_manager(dag_id=self.dag_id):
- dagbag = DagBag(
- dag_folder=bundle.path, bundle_path=bundle.path, include_examples=False
- )
+ dagbag = BundleDagBag(dag_folder=bundle.path, bundle_path=bundle.path)
sync_bag_to_db(dagbag, bundle.name, bundle.version)
version = DagVersion.get_version(self.dag_id)
if version:
diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py
b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
index 4ef67fb7496..a08f842acba 100644
--- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py
+++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
@@ -637,9 +637,8 @@ def _maybe_reschedule_startup_failure(
def parse(what: StartupDetails, log: Logger) -> RuntimeTaskInstance:
# TODO: Task-SDK:
- # Using DagBag here is about 98% wrong, but it'll do for now
-
- from airflow.models.dagbag import DagBag
+ # Using BundleDagBag here is about 98% wrong, but it'll do for now
+ from airflow.models.dagbag import BundleDagBag
bundle_info = what.bundle_info
bundle_instance = DagBundlesManager().get_bundle(
@@ -648,17 +647,12 @@ def parse(what: StartupDetails, log: Logger) ->
RuntimeTaskInstance:
)
bundle_instance.initialize()
- # Put bundle root on sys.path if needed. This allows the dag bundle to add
- # code in util modules to be shared between files within the same bundle.
- if (bundle_root := os.fspath(bundle_instance.path)) not in sys.path:
- sys.path.append(bundle_root)
-
dag_absolute_path = os.fspath(Path(bundle_instance.path, what.dag_rel_path))
- bag = DagBag(
+ bag = BundleDagBag(
dag_folder=dag_absolute_path,
- include_examples=False,
safe_mode=False,
load_op_links=False,
+ bundle_path=bundle_instance.path,
)
if TYPE_CHECKING:
assert what.ti.dag_id
diff --git a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
index b8f61a2bc50..fe8f1ad667d 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
@@ -183,12 +183,60 @@ def test_parse(test_dags_dir: Path, make_ti_context):
):
ti = parse(what, mock.Mock())
- assert ti.task
- assert ti.task.dag
assert isinstance(ti.task, BaseOperator)
assert isinstance(ti.task.dag, DAG)
[email protected]("airflow.models.dagbag.BundleDagBag")
+def test_parse_dag_bag(mock_dagbag, test_dags_dir: Path, make_ti_context):
+ """Test that checks that the BundleDagBag is constructed as expected
during parsing"""
+ mock_bag_instance = mock.Mock()
+ mock_dagbag.return_value = mock_bag_instance
+ mock_dag = mock.Mock(spec=DAG)
+ mock_task = mock.Mock(spec=BaseOperator)
+
+ mock_bag_instance.dags = {"super_basic": mock_dag}
+ mock_dag.task_dict = {"a": mock_task}
+
+ what = StartupDetails(
+ ti=TaskInstance(
+ id=uuid7(),
+ task_id="a",
+ dag_id="super_basic",
+ run_id="c",
+ try_number=1,
+ dag_version_id=uuid7(),
+ ),
+ dag_rel_path="super_basic.py",
+ bundle_info=BundleInfo(name="my-bundle", version=None),
+ ti_context=make_ti_context(),
+ start_date=timezone.utcnow(),
+ )
+
+ with patch.dict(
+ os.environ,
+ {
+ "AIRFLOW__DAG_PROCESSOR__DAG_BUNDLE_CONFIG_LIST": json.dumps(
+ [
+ {
+ "name": "my-bundle",
+ "classpath":
"airflow.dag_processing.bundles.local.LocalDagBundle",
+ "kwargs": {"path": str(test_dags_dir),
"refresh_interval": 1},
+ }
+ ]
+ ),
+ },
+ ):
+ parse(what, mock.Mock())
+
+ mock_dagbag.assert_called_once_with(
+ dag_folder=mock.ANY,
+ safe_mode=False,
+ load_op_links=False,
+ bundle_path=test_dags_dir,
+ )
+
+
@pytest.mark.parametrize(
("dag_id", "task_id", "expected_error"),
(