This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 4f96450 ShortCircuitOperator push XCom by returning python_callable
result (#20071)
4f96450 is described below
commit 4f964501e5a6d5685c9fa78a6272671a79b36dd1
Author: Dmytro Kazanzhy <[email protected]>
AuthorDate: Sat Dec 11 18:27:52 2021 +0200
ShortCircuitOperator push XCom by returning python_callable result (#20071)
---
airflow/operators/python.py | 2 +-
tests/operators/test_python.py | 19 +++++++++++++++++++
2 files changed, 20 insertions(+), 1 deletion(-)
diff --git a/airflow/operators/python.py b/airflow/operators/python.py
index 32a55a2..31e3a6f 100644
--- a/airflow/operators/python.py
+++ b/airflow/operators/python.py
@@ -248,7 +248,7 @@ class ShortCircuitOperator(PythonOperator, SkipMixin):
if condition:
self.log.info('Proceeding with downstream tasks...')
- return
+ return condition
self.log.info('Skipping downstream tasks...')
diff --git a/tests/operators/test_python.py b/tests/operators/test_python.py
index a457e8a..5dc365d 100644
--- a/tests/operators/test_python.py
+++ b/tests/operators/test_python.py
@@ -703,6 +703,25 @@ class TestShortCircuitOperator(unittest.TestCase):
else:
raise ValueError(f'Invalid task id {ti.task_id} found!')
+ def test_xcom_push(self):
+ dag = DAG(
+ 'shortcircuit_operator_test_xcom_push',
+ default_args={'owner': 'airflow', 'start_date': DEFAULT_DATE},
+ schedule_interval=INTERVAL,
+ )
+ short_op = ShortCircuitOperator(task_id='make_choice', dag=dag,
python_callable=lambda: 'signature')
+ dag.clear()
+ dr = dag.create_dagrun(
+ run_type=DagRunType.MANUAL,
+ start_date=timezone.utcnow(),
+ execution_date=DEFAULT_DATE,
+ state=State.RUNNING,
+ )
+ short_op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+ tis = dr.get_task_instances()
+ xcom_value = tis[0].xcom_pull(task_ids='make_choice',
key='return_value')
+ assert xcom_value == 'signature'
+
virtualenv_string_args: List[str] = []