This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 4f96450  ShortCircuitOperator push XCom by returning python_callable result (#20071)
4f96450 is described below

commit 4f964501e5a6d5685c9fa78a6272671a79b36dd1
Author: Dmytro Kazanzhy <[email protected]>
AuthorDate: Sat Dec 11 18:27:52 2021 +0200

    ShortCircuitOperator push XCom by returning python_callable result (#20071)
---
 airflow/operators/python.py    |  2 +-
 tests/operators/test_python.py | 19 +++++++++++++++++++
 2 files changed, 20 insertions(+), 1 deletion(-)

diff --git a/airflow/operators/python.py b/airflow/operators/python.py
index 32a55a2..31e3a6f 100644
--- a/airflow/operators/python.py
+++ b/airflow/operators/python.py
@@ -248,7 +248,7 @@ class ShortCircuitOperator(PythonOperator, SkipMixin):
 
         if condition:
             self.log.info('Proceeding with downstream tasks...')
-            return
+            return condition
 
         self.log.info('Skipping downstream tasks...')
 
diff --git a/tests/operators/test_python.py b/tests/operators/test_python.py
index a457e8a..5dc365d 100644
--- a/tests/operators/test_python.py
+++ b/tests/operators/test_python.py
@@ -703,6 +703,25 @@ class TestShortCircuitOperator(unittest.TestCase):
             else:
                 raise ValueError(f'Invalid task id {ti.task_id} found!')
 
+    def test_xcom_push(self):
+        dag = DAG(
+            'shortcircuit_operator_test_xcom_push',
+            default_args={'owner': 'airflow', 'start_date': DEFAULT_DATE},
+            schedule_interval=INTERVAL,
+        )
+        short_op = ShortCircuitOperator(task_id='make_choice', dag=dag, python_callable=lambda: 'signature')
+        dag.clear()
+        dr = dag.create_dagrun(
+            run_type=DagRunType.MANUAL,
+            start_date=timezone.utcnow(),
+            execution_date=DEFAULT_DATE,
+            state=State.RUNNING,
+        )
+        short_op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+        tis = dr.get_task_instances()
+        xcom_value = tis[0].xcom_pull(task_ids='make_choice', key='return_value')
+        assert xcom_value == 'signature'
+
 
 virtualenv_string_args: List[str] = []
 

Reply via email to