This is an automated email from the ASF dual-hosted git repository. pierrejeambrun pushed a commit to branch v2-5-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit e9853c779428eb586955ffae9316ce89bfda510d Author: SoxMax <[email protected]> AuthorDate: Wed Jan 18 19:57:31 2023 -0500 Update how PythonSensor returns values from python_callable (#28932) * Update how PythonSensor returns values from python_callable * test poke returns the xcom value * update test to only run poke * reformat based on changes * use full if rather than ternary (cherry picked from commit b0f302e027d09a50493f4cdd808559984e433ee1) --- airflow/sensors/python.py | 5 ++++- tests/sensors/test_python.py | 11 +++++++++++ 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/airflow/sensors/python.py b/airflow/sensors/python.py index 615e4e20ee..0a91031fd6 100644 --- a/airflow/sensors/python.py +++ b/airflow/sensors/python.py @@ -71,4 +71,7 @@ class PythonSensor(BaseSensorOperator): self.log.info("Poking callable: %s", str(self.python_callable)) return_value = self.python_callable(*self.op_args, **self.op_kwargs) - return PokeReturnValue(bool(return_value)) + if isinstance(return_value, PokeReturnValue): + return return_value + else: + return PokeReturnValue(bool(return_value)) diff --git a/tests/sensors/test_python.py b/tests/sensors/test_python.py index 73a0ddffe4..ec515d8dee 100644 --- a/tests/sensors/test_python.py +++ b/tests/sensors/test_python.py @@ -23,6 +23,7 @@ from datetime import date import pytest from airflow.exceptions import AirflowSensorTimeout +from airflow.sensors.base import PokeReturnValue from airflow.sensors.python import PythonSensor from tests.operators.test_python import BasePythonTest @@ -41,6 +42,16 @@ class TestPythonSensor(BasePythonTest): with pytest.raises(ZeroDivisionError): self.run_as_task(lambda: 1 / 0) + def test_python_sensor_xcom(self): + with self.dag: + task = self.opcls( + task_id=self.task_id, + python_callable=lambda: PokeReturnValue(True, "xcom"), + **self.default_kwargs(), + ) + poke_result = task.poke({}) + assert poke_result.xcom_value == "xcom" + def test_python_callable_arguments_are_templatized(self): """Test PythonSensor op_args are templatized""" # Create a named tuple and ensure it is still preserved
