Lee-W commented on code in PR #57631:
URL: https://github.com/apache/airflow/pull/57631#discussion_r2526347960
##########
providers/standard/tests/unit/standard/operators/test_python.py:
##########
@@ -937,6 +937,88 @@ def poke(self, context):
virtualenv_string_args: list[str] = []
+def _callable_that_imports_from_bundle():
+ """
+ A callable function for testing DAG bundle imports.
Review Comment:
```suggestion
A callable function for testing Dag bundle imports.
```
##########
providers/standard/src/airflow/providers/standard/operators/python.py:
##########
@@ -488,8 +488,30 @@ def execute(self, context: Context) -> Any:
serializable_keys = set(self._iter_serializable_context_keys())
new = {k: v for k, v in context.items() if k in serializable_keys}
serializable_context = cast("Context", new)
+ # Store bundle_path for subprocess execution
+ self._bundle_path = self._get_bundle_path_from_context(context)
return super().execute(context=serializable_context)
+ def _get_bundle_path_from_context(self, context: Context) -> Path | None:
+ """
+ Extract bundle_path from the task instance's bundle_instance.
+
+ :param context: The task execution context
+ :return: Path to the bundle root directory, or None if not in a bundle
+ """
+ if not AIRFLOW_V_3_0_PLUS:
+ return None
+
+ # In Airflow 3.x, the RuntimeTaskInstance has a bundle_instance
attribute
+ # that contains the bundle information including its path
+ ti = context.get("ti")
+ if ti and hasattr(ti, "bundle_instance"):
+ bundle_instance = ti.bundle_instance
+ if bundle_instance and hasattr(bundle_instance, "path"):
+ return Path(bundle_instance.path)
+
+ return None
Review Comment:
```suggestion
if (
    (ti := context.get("ti"))
    and (bundle_instance := getattr(ti, "bundle_instance", None))
    and (path := getattr(bundle_instance, "path", None))
):
    return Path(path)
return None
```
##########
providers/standard/tests/unit/standard/operators/test_python.py:
##########
@@ -937,6 +937,88 @@ def poke(self, context):
virtualenv_string_args: list[str] = []
+def _callable_that_imports_from_bundle():
+ """
+ A callable function for testing DAG bundle imports.
+ This import will fail if PYTHONPATH is not set correctly by the operator.
+ The module 'bug_test_dag_repro' is created dynamically in the test.
+ """
+ try:
+ from bug_test_dag_repro.lib.helper import get_message
+
+ return get_message()
+ except ImportError as e:
+ # This helps debug if the import fails during the test
+ import sys
+
+ return f"Failed to import: {e}. PYTHONPATH: {sys.path}"
+
+
[email protected]_timeout(120)
[email protected](
+ ("opcls", "pytest_marks", "test_class_ref"),
+ [
+ pytest.param(
+ PythonVirtualenvOperator,
+ [pytest.mark.virtualenv_operator],
+ lambda: TestPythonVirtualenvOperator,
+ id="PythonVirtualenvOperator",
+ ),
+ pytest.param(
+ ExternalPythonOperator,
+ [pytest.mark.external_python_operator],
+ lambda: TestExternalPythonOperator,
+ id="ExternalPythonOperator",
+ ),
+ ],
+)
+class TestDagBundleImportInSubprocess(BasePythonTest):
+ """
+ Test DAG bundle imports for subprocess-based Python operators.
+
+ This test ensures that callables running in subprocesses can import modules
+ from their DAG bundle by verifying PYTHONPATH is correctly set (Airflow
3.x+).
+ """
+
+ @pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="DAG Bundle import fix
is for Airflow 3.x+")
+ def test_dag_bundle_import_in_subprocess(self, dag_maker, opcls,
pytest_marks, test_class_ref):
+ """
+ Tests that a callable in a subprocess can import modules from its
+ own DAG bundle (fix for Airflow 3.x).
Review Comment:
```suggestion
own Dag bundle (fix for Airflow 3.x).
```
##########
providers/standard/tests/unit/standard/operators/test_python.py:
##########
@@ -937,6 +937,88 @@ def poke(self, context):
virtualenv_string_args: list[str] = []
+def _callable_that_imports_from_bundle():
+ """
+ A callable function for testing DAG bundle imports.
+ This import will fail if PYTHONPATH is not set correctly by the operator.
+ The module 'bug_test_dag_repro' is created dynamically in the test.
+ """
+ try:
+ from bug_test_dag_repro.lib.helper import get_message
+
+ return get_message()
+ except ImportError as e:
+ # This helps debug if the import fails during the test
+ import sys
+
+ return f"Failed to import: {e}. PYTHONPATH: {sys.path}"
+
+
[email protected]_timeout(120)
[email protected](
+ ("opcls", "pytest_marks", "test_class_ref"),
+ [
+ pytest.param(
+ PythonVirtualenvOperator,
+ [pytest.mark.virtualenv_operator],
+ lambda: TestPythonVirtualenvOperator,
+ id="PythonVirtualenvOperator",
+ ),
+ pytest.param(
+ ExternalPythonOperator,
+ [pytest.mark.external_python_operator],
+ lambda: TestExternalPythonOperator,
+ id="ExternalPythonOperator",
+ ),
+ ],
+)
+class TestDagBundleImportInSubprocess(BasePythonTest):
+ """
+ Test DAG bundle imports for subprocess-based Python operators.
+
+ This test ensures that callables running in subprocesses can import modules
+ from their DAG bundle by verifying PYTHONPATH is correctly set (Airflow
3.x+).
+ """
+
+ @pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="DAG Bundle import fix
is for Airflow 3.x+")
+ def test_dag_bundle_import_in_subprocess(self, dag_maker, opcls,
pytest_marks, test_class_ref):
+ """
+ Tests that a callable in a subprocess can import modules from its
+ own DAG bundle (fix for Airflow 3.x).
+ """
+ with TemporaryDirectory() as tmp_dir:
Review Comment:
Should we just use the `tmp_path` fixture instead?
https://docs.pytest.org/en/stable/how-to/tmp_path.html#the-tmp-path-fixture
##########
providers/standard/src/airflow/providers/standard/operators/python.py:
##########
@@ -565,6 +587,18 @@ def _execute_python_callable_in_subprocess(self,
python_path: Path):
if self.env_vars:
env_vars.update(self.env_vars)
+ # Add bundle_path to PYTHONPATH for subprocess to import DAG
bundle modules
Review Comment:
```suggestion
# Add bundle_path to PYTHONPATH for subprocess to import Dag bundle modules
```
##########
providers/standard/tests/unit/standard/operators/test_python.py:
##########
@@ -937,6 +937,88 @@ def poke(self, context):
virtualenv_string_args: list[str] = []
+def _callable_that_imports_from_bundle():
+ """
+ A callable function for testing DAG bundle imports.
+ This import will fail if PYTHONPATH is not set correctly by the operator.
+ The module 'bug_test_dag_repro' is created dynamically in the test.
+ """
+ try:
+ from bug_test_dag_repro.lib.helper import get_message
+
+ return get_message()
+ except ImportError as e:
+ # This helps debug if the import fails during the test
+ import sys
+
+ return f"Failed to import: {e}. PYTHONPATH: {sys.path}"
+
+
[email protected]_timeout(120)
[email protected](
+ ("opcls", "pytest_marks", "test_class_ref"),
+ [
+ pytest.param(
+ PythonVirtualenvOperator,
+ [pytest.mark.virtualenv_operator],
+ lambda: TestPythonVirtualenvOperator,
+ id="PythonVirtualenvOperator",
+ ),
+ pytest.param(
+ ExternalPythonOperator,
+ [pytest.mark.external_python_operator],
+ lambda: TestExternalPythonOperator,
+ id="ExternalPythonOperator",
+ ),
+ ],
+)
+class TestDagBundleImportInSubprocess(BasePythonTest):
+ """
+ Test DAG bundle imports for subprocess-based Python operators.
+
+ This test ensures that callables running in subprocesses can import modules
+ from their DAG bundle by verifying PYTHONPATH is correctly set (Airflow
3.x+).
Review Comment:
```suggestion
from their Dag bundle by verifying PYTHONPATH is correctly set (Airflow
3.x+).
```
##########
providers/standard/tests/unit/standard/operators/test_python.py:
##########
@@ -937,6 +937,88 @@ def poke(self, context):
virtualenv_string_args: list[str] = []
+def _callable_that_imports_from_bundle():
+ """
+ A callable function for testing DAG bundle imports.
+ This import will fail if PYTHONPATH is not set correctly by the operator.
+ The module 'bug_test_dag_repro' is created dynamically in the test.
+ """
+ try:
+ from bug_test_dag_repro.lib.helper import get_message
+
+ return get_message()
+ except ImportError as e:
+ # This helps debug if the import fails during the test
+ import sys
+
+ return f"Failed to import: {e}. PYTHONPATH: {sys.path}"
+
+
[email protected]_timeout(120)
[email protected](
+ ("opcls", "pytest_marks", "test_class_ref"),
+ [
+ pytest.param(
+ PythonVirtualenvOperator,
+ [pytest.mark.virtualenv_operator],
+ lambda: TestPythonVirtualenvOperator,
+ id="PythonVirtualenvOperator",
+ ),
+ pytest.param(
+ ExternalPythonOperator,
+ [pytest.mark.external_python_operator],
+ lambda: TestExternalPythonOperator,
+ id="ExternalPythonOperator",
+ ),
+ ],
+)
+class TestDagBundleImportInSubprocess(BasePythonTest):
+ """
+ Test DAG bundle imports for subprocess-based Python operators.
+
+ This test ensures that callables running in subprocesses can import modules
+ from their DAG bundle by verifying PYTHONPATH is correctly set (Airflow
3.x+).
+ """
+
+ @pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="DAG Bundle import fix
is for Airflow 3.x+")
Review Comment:
```suggestion
@pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="Dag Bundle import fix is for Airflow 3.x+")
```
##########
providers/standard/tests/unit/standard/operators/test_python.py:
##########
@@ -937,6 +937,88 @@ def poke(self, context):
virtualenv_string_args: list[str] = []
+def _callable_that_imports_from_bundle():
+ """
+ A callable function for testing DAG bundle imports.
+ This import will fail if PYTHONPATH is not set correctly by the operator.
+ The module 'bug_test_dag_repro' is created dynamically in the test.
+ """
+ try:
+ from bug_test_dag_repro.lib.helper import get_message
+
+ return get_message()
+ except ImportError as e:
+ # This helps debug if the import fails during the test
+ import sys
+
+ return f"Failed to import: {e}. PYTHONPATH: {sys.path}"
+
+
[email protected]_timeout(120)
[email protected](
+ ("opcls", "pytest_marks", "test_class_ref"),
+ [
+ pytest.param(
+ PythonVirtualenvOperator,
+ [pytest.mark.virtualenv_operator],
+ lambda: TestPythonVirtualenvOperator,
+ id="PythonVirtualenvOperator",
+ ),
+ pytest.param(
+ ExternalPythonOperator,
+ [pytest.mark.external_python_operator],
+ lambda: TestExternalPythonOperator,
+ id="ExternalPythonOperator",
+ ),
+ ],
+)
+class TestDagBundleImportInSubprocess(BasePythonTest):
+ """
+ Test DAG bundle imports for subprocess-based Python operators.
Review Comment:
```suggestion
Test Dag bundle imports for subprocess-based Python operators.
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]