This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 32a18d9e43 Fix returned value when ShortCircuitOperator condition is falsy (#32569)
32a18d9e43 is described below

commit 32a18d9e4373bd705087992d0066663833c65abd
Author: Hussein Awala <[email protected]>
AuthorDate: Sat Jul 15 09:21:40 2023 +0200

    Fix returned value when ShortCircuitOperator condition is falsy (#32569)
    
    * Fix a bug in the ShortCircuitOperator
    
    Signed-off-by: Hussein Awala <[email protected]>
    Signed-off-by: Hussein Awala <[email protected]>
    
    ---------
    
    Signed-off-by: Hussein Awala <[email protected]>
---
 airflow/operators/python.py            |  2 ++
 tests/decorators/test_python.py        | 13 +++++++++++++
 tests/decorators/test_short_circuit.py | 28 ++++++++++++++++++++++++++++
 tests/operators/test_python.py         |  2 +-
 4 files changed, 44 insertions(+), 1 deletion(-)

diff --git a/airflow/operators/python.py b/airflow/operators/python.py
index 2db5ca69c3..f7b9883eaf 100644
--- a/airflow/operators/python.py
+++ b/airflow/operators/python.py
@@ -285,6 +285,8 @@ class ShortCircuitOperator(PythonOperator, SkipMixin):
             map_index=context["ti"].map_index,
         )
         self.log.info("Done.")
+        # returns the result of the super execute method as it is instead of returning None
+        return condition
 
 
 class _BasePythonVirtualenvOperator(PythonOperator, metaclass=ABCMeta):
diff --git a/tests/decorators/test_python.py b/tests/decorators/test_python.py
index 2c877c60a7..3cf49c98b0 100644
--- a/tests/decorators/test_python.py
+++ b/tests/decorators/test_python.py
@@ -254,6 +254,19 @@ class TestAirflowTaskDecorator(BasePythonTest):
         with pytest.raises(AirflowException):
             ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
 
+    def test_multiple_outputs_empty_dict(self):
+        @task_decorator(multiple_outputs=True)
+        def empty_dict():
+            return {}
+
+        with self.dag:
+            ret = empty_dict()
+
+        dr = self.create_dag_run()
+        ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+        ti = dr.get_task_instances()[0]
+        assert ti.xcom_pull() == {}
+
     def test_python_callable_arguments_are_templatized(self):
         """Test @task op_args are templatized"""
 
diff --git a/tests/decorators/test_short_circuit.py b/tests/decorators/test_short_circuit.py
index dc3bf81b65..27240c69b8 100644
--- a/tests/decorators/test_short_circuit.py
+++ b/tests/decorators/test_short_circuit.py
@@ -72,3 +72,31 @@ def test_short_circuit_decorator(dag_maker):
     tis = dr.get_task_instances()
     for ti in tis:
         assert ti.state == task_state_mapping[ti.task_id]
+
+
+def test_short_circuit_with_multiple_outputs(dag_maker):
+    @task.short_circuit(multiple_outputs=True)
+    def multiple_output():
+        return {"x": 1, "y": 2}
+
+    with dag_maker():
+        ret = multiple_output()
+
+    dr = dag_maker.create_dagrun()
+    ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+    ti = dr.get_task_instances()[0]
+    assert ti.xcom_pull() == {"x": 1, "y": 2}
+
+
+def test_short_circuit_with_multiple_outputs_and_empty_dict(dag_maker):
+    @task.short_circuit(multiple_outputs=True)
+    def empty_dict():
+        return {}
+
+    with dag_maker():
+        ret = empty_dict()
+
+    dr = dag_maker.create_dagrun()
+    ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+    ti = dr.get_task_instances()[0]
+    assert ti.xcom_pull() == {}
diff --git a/tests/operators/test_python.py b/tests/operators/test_python.py
index 1df74fef4f..d262aa0e46 100644
--- a/tests/operators/test_python.py
+++ b/tests/operators/test_python.py
@@ -619,7 +619,7 @@ class TestShortCircuitOperator(BasePythonTest):
 
         tis = dr.get_task_instances()
         assert tis[0].xcom_pull(task_ids=short_op_push_xcom.task_id, key="return_value") == "signature"
-        assert tis[0].xcom_pull(task_ids=short_op_no_push_xcom.task_id, key="return_value") is None
+        assert tis[0].xcom_pull(task_ids=short_op_no_push_xcom.task_id, key="return_value") is False
 
     def test_xcom_push_skipped_tasks(self):
         with self.dag:

Reply via email to