Repository: incubator-airflow Updated Branches: refs/heads/master 275aa15fa -> 18009d033
[AIRFLOW-131] Make XCom.clear more selective. XComs for a task were getting cleared on every run, no matter what. Selectively clear only when the task is actually going to be run. Closes #1570 from johnnason/AIRFLOW-131 Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/18009d03 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/18009d03 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/18009d03 Branch: refs/heads/master Commit: 18009d03311a0b29e14811865e0b13b19427b5e4 Parents: 275aa15 Author: John Nason <jna...@us.ibm.com> Authored: Mon Jun 6 17:50:06 2016 -0400 Committer: jlowin <jlo...@users.noreply.github.com> Committed: Mon Jun 6 17:50:10 2016 -0400 ---------------------------------------------------------------------- airflow/models.py | 3 ++- tests/models.py | 61 ++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 63 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/18009d03/airflow/models.py ---------------------------------------------------------------------- diff --git a/airflow/models.py b/airflow/models.py index fa3f6ca..08d0890 100644 --- a/airflow/models.py +++ b/airflow/models.py @@ -1186,7 +1186,6 @@ class TaskInstance(Base): self.test_mode = test_mode self.force = force self.refresh_from_db(session=session, lock_for_update=True) - self.clear_xcom_data() self.job_id = job_id iso = datetime.now().isoformat() self.hostname = socket.getfqdn() @@ -1195,6 +1194,7 @@ class TaskInstance(Base): if self.state == State.RUNNING: logging.warning("Another instance is running, skipping.") elif self.state == State.REMOVED: + self.clear_xcom_data() logging.debug("Task {} was removed from the dag".format(self)) elif not force and self.state == State.SUCCESS: logging.info( @@ -1219,6 +1219,7 @@ class 
TaskInstance(Base): "Next run after {0}".format(next_run) ) elif force or self.state in State.runnable(): + self.clear_xcom_data() HR = "\n" + ("-" * 80) + "\n" # Line break # For reporting purposes, we report based on 1-indexed, http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/18009d03/tests/models.py ---------------------------------------------------------------------- diff --git a/tests/models.py b/tests/models.py index bc3f1e1..2aae476 100644 --- a/tests/models.py +++ b/tests/models.py @@ -507,3 +507,64 @@ class TaskInstanceTest(unittest.TestCase): self.assertEqual(completed, expect_completed) self.assertEqual(ti.state, expect_state) + + def test_xcom_pull_after_success(self): + """ + tests xcom set/clear relative to a task in a 'success' rerun scenario + """ + key = 'xcom_key' + value = 'xcom_value' + + dag = models.DAG(dag_id='test_xcom', schedule_interval='@monthly') + task = DummyOperator( + task_id='test_xcom', + dag=dag, + pool='test_xcom', + owner='airflow', + start_date=datetime.datetime(2016, 6, 2, 0, 0, 0)) + exec_date = datetime.datetime.now() + ti = TI( + task=task, execution_date=exec_date) + ti.run(mark_success=True) + ti.xcom_push(key=key, value=value) + self.assertEqual(ti.xcom_pull(task_ids='test_xcom', key=key), value) + ti.run() + # The second run and assert is to handle AIRFLOW-131 (don't clear on + # prior success) + self.assertEqual(ti.xcom_pull(task_ids='test_xcom', key=key), value) + + def test_xcom_pull_different_execution_date(self): + """ + tests xcom fetch behavior with different execution dates, using + both xcom_pull with "include_prior_dates" and without + """ + key = 'xcom_key' + value = 'xcom_value' + + dag = models.DAG(dag_id='test_xcom', schedule_interval='@monthly') + task = DummyOperator( + task_id='test_xcom', + dag=dag, + pool='test_xcom', + owner='airflow', + start_date=datetime.datetime(2016, 6, 2, 0, 0, 0)) + exec_date = datetime.datetime.now() + ti = TI( + task=task, execution_date=exec_date) + 
ti.run(mark_success=True) + ti.xcom_push(key=key, value=value) + self.assertEqual(ti.xcom_pull(task_ids='test_xcom', key=key), value) + ti.run() + exec_date = exec_date.replace(day=exec_date.day + 1) + ti = TI( + task=task, execution_date=exec_date) + ti.run() + # We have set a new execution date (and did not pass in + # 'include_prior_dates'which means this task should now have a cleared + # xcom value + self.assertEqual(ti.xcom_pull(task_ids='test_xcom', key=key), None) + # We *should* get a value using 'include_prior_dates' + self.assertEqual(ti.xcom_pull(task_ids='test_xcom', + key=key, + include_prior_dates=True), + value)