This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 32a18d9e43 Fix returned value when ShortCircuitOperator condition is
falsy (#32569)
32a18d9e43 is described below
commit 32a18d9e4373bd705087992d0066663833c65abd
Author: Hussein Awala <[email protected]>
AuthorDate: Sat Jul 15 09:21:40 2023 +0200
Fix returned value when ShortCircuitOperator condition is falsy (#32569)
* Fix a bug in the ShortCircuitOperator
Signed-off-by: Hussein Awala <[email protected]>
Signed-off-by: Hussein Awala <[email protected]>
---------
Signed-off-by: Hussein Awala <[email protected]>
---
airflow/operators/python.py | 2 ++
tests/decorators/test_python.py | 13 +++++++++++++
tests/decorators/test_short_circuit.py | 28 ++++++++++++++++++++++++++++
tests/operators/test_python.py | 2 +-
4 files changed, 44 insertions(+), 1 deletion(-)
diff --git a/airflow/operators/python.py b/airflow/operators/python.py
index 2db5ca69c3..f7b9883eaf 100644
--- a/airflow/operators/python.py
+++ b/airflow/operators/python.py
@@ -285,6 +285,8 @@ class ShortCircuitOperator(PythonOperator, SkipMixin):
map_index=context["ti"].map_index,
)
self.log.info("Done.")
+ # returns the result of the super execute method as it is instead of
returning None
+ return condition
class _BasePythonVirtualenvOperator(PythonOperator, metaclass=ABCMeta):
diff --git a/tests/decorators/test_python.py b/tests/decorators/test_python.py
index 2c877c60a7..3cf49c98b0 100644
--- a/tests/decorators/test_python.py
+++ b/tests/decorators/test_python.py
@@ -254,6 +254,19 @@ class TestAirflowTaskDecorator(BasePythonTest):
with pytest.raises(AirflowException):
ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+ def test_multiple_outputs_empty_dict(self):
+ @task_decorator(multiple_outputs=True)
+ def empty_dict():
+ return {}
+
+ with self.dag:
+ ret = empty_dict()
+
+ dr = self.create_dag_run()
+ ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+ ti = dr.get_task_instances()[0]
+ assert ti.xcom_pull() == {}
+
def test_python_callable_arguments_are_templatized(self):
"""Test @task op_args are templatized"""
diff --git a/tests/decorators/test_short_circuit.py
b/tests/decorators/test_short_circuit.py
index dc3bf81b65..27240c69b8 100644
--- a/tests/decorators/test_short_circuit.py
+++ b/tests/decorators/test_short_circuit.py
@@ -72,3 +72,31 @@ def test_short_circuit_decorator(dag_maker):
tis = dr.get_task_instances()
for ti in tis:
assert ti.state == task_state_mapping[ti.task_id]
+
+
+def test_short_circuit_with_multiple_outputs(dag_maker):
+ @task.short_circuit(multiple_outputs=True)
+ def multiple_output():
+ return {"x": 1, "y": 2}
+
+ with dag_maker():
+ ret = multiple_output()
+
+ dr = dag_maker.create_dagrun()
+ ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+ ti = dr.get_task_instances()[0]
+ assert ti.xcom_pull() == {"x": 1, "y": 2}
+
+
+def test_short_circuit_with_multiple_outputs_and_empty_dict(dag_maker):
+ @task.short_circuit(multiple_outputs=True)
+ def empty_dict():
+ return {}
+
+ with dag_maker():
+ ret = empty_dict()
+
+ dr = dag_maker.create_dagrun()
+ ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+ ti = dr.get_task_instances()[0]
+ assert ti.xcom_pull() == {}
diff --git a/tests/operators/test_python.py b/tests/operators/test_python.py
index 1df74fef4f..d262aa0e46 100644
--- a/tests/operators/test_python.py
+++ b/tests/operators/test_python.py
@@ -619,7 +619,7 @@ class TestShortCircuitOperator(BasePythonTest):
tis = dr.get_task_instances()
assert tis[0].xcom_pull(task_ids=short_op_push_xcom.task_id,
key="return_value") == "signature"
- assert tis[0].xcom_pull(task_ids=short_op_no_push_xcom.task_id,
key="return_value") is None
+ assert tis[0].xcom_pull(task_ids=short_op_no_push_xcom.task_id,
key="return_value") is False
def test_xcom_push_skipped_tasks(self):
with self.dag: