Repository: incubator-airflow Updated Branches: refs/heads/master e8f91c623 -> 96c787f39
[AIRFLOW-703][AIRFLOW-1] Stop Xcom being cleared too early XComs should only be cleared when it is certain that the task will run. Previously, XComs were cleared before it was determined if tasks were runnable, queable, or just being marked success. Now XComs are cleared immediately before the task actually starts. Closes #1951 from blrnw3/fix/xcom_bug_AIRFLOW-703 Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/96c787f3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/96c787f3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/96c787f3 Branch: refs/heads/master Commit: 96c787f390ad9852ae6c0c0fbb0510e36df185b1 Parents: e8f91c6 Author: Ben Lee Rodgers <[email protected]> Authored: Wed Dec 21 10:46:05 2016 -0500 Committer: Jeremiah Lowin <[email protected]> Committed: Wed Dec 21 10:46:14 2016 -0500 ---------------------------------------------------------------------- airflow/models.py | 4 +++- tests/models.py | 8 ++++++++ 2 files changed, 11 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/96c787f3/airflow/models.py ---------------------------------------------------------------------- diff --git a/airflow/models.py b/airflow/models.py index 5d7075d..55b855b 100755 --- a/airflow/models.py +++ b/airflow/models.py @@ -1200,7 +1200,6 @@ class TaskInstance(Base): session.commit() return - self.clear_xcom_data() hr = "\n" + ("-" * 80) + "\n" # Line break # For reporting purposes, we report based on 1-indexed, @@ -1285,6 +1284,9 @@ class TaskInstance(Base): raise AirflowException("Task received SIGTERM signal") signal.signal(signal.SIGTERM, signal_handler) + # Don't clear Xcom until the task is certain to execute + self.clear_xcom_data() + self.render_templates() task_copy.pre_execute(context=context) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/96c787f3/tests/models.py ---------------------------------------------------------------------- diff --git a/tests/models.py b/tests/models.py index 74103fe..003fb21 100644 --- a/tests/models.py +++ b/tests/models.py @@ -588,6 +588,14 @@ class TaskInstanceTest(unittest.TestCase): # prior success) self.assertEqual(ti.xcom_pull(task_ids='test_xcom', key=key), value) + # Test AIRFLOW-703: Xcom shouldn't be cleared if the task doesn't + # execute, even if dependencies are ignored + ti.run(ignore_all_deps=True, mark_success=True) + self.assertEqual(ti.xcom_pull(task_ids='test_xcom', key=key), value) + # Xcom IS finally cleared once task has executed + ti.run(ignore_all_deps=True) + self.assertEqual(ti.xcom_pull(task_ids='test_xcom', key=key), None) + def test_xcom_pull_different_execution_date(self): """ tests xcom fetch behavior with different execution dates, using
