Repository: incubator-airflow
Updated Branches:
  refs/heads/master 275aa15fa -> 18009d033


[AIRFLOW-131] Make XCom.clear more selective

XCOMs for a task were getting cleared on every run, no matter what.
Selectively clear only when the task is actually going to be run.

Closes #1570 from johnnason/AIRFLOW-131


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/18009d03
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/18009d03
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/18009d03

Branch: refs/heads/master
Commit: 18009d03311a0b29e14811865e0b13b19427b5e4
Parents: 275aa15
Author: John Nason <jna...@us.ibm.com>
Authored: Mon Jun 6 17:50:06 2016 -0400
Committer: jlowin <jlo...@users.noreply.github.com>
Committed: Mon Jun 6 17:50:10 2016 -0400

----------------------------------------------------------------------
 airflow/models.py |  3 ++-
 tests/models.py   | 61 ++++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 63 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/18009d03/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index fa3f6ca..08d0890 100644
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -1186,7 +1186,6 @@ class TaskInstance(Base):
         self.test_mode = test_mode
         self.force = force
         self.refresh_from_db(session=session, lock_for_update=True)
-        self.clear_xcom_data()
         self.job_id = job_id
         iso = datetime.now().isoformat()
         self.hostname = socket.getfqdn()
@@ -1195,6 +1194,7 @@ class TaskInstance(Base):
         if self.state == State.RUNNING:
             logging.warning("Another instance is running, skipping.")
         elif self.state == State.REMOVED:
+            self.clear_xcom_data()
             logging.debug("Task {} was removed from the dag".format(self))
         elif not force and self.state == State.SUCCESS:
             logging.info(
@@ -1219,6 +1219,7 @@ class TaskInstance(Base):
                 "Next run after {0}".format(next_run)
             )
         elif force or self.state in State.runnable():
+            self.clear_xcom_data()
             HR = "\n" + ("-" * 80) + "\n"  # Line break
 
             # For reporting purposes, we report based on 1-indexed,

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/18009d03/tests/models.py
----------------------------------------------------------------------
diff --git a/tests/models.py b/tests/models.py
index bc3f1e1..2aae476 100644
--- a/tests/models.py
+++ b/tests/models.py
@@ -507,3 +507,64 @@ class TaskInstanceTest(unittest.TestCase):
 
         self.assertEqual(completed, expect_completed)
         self.assertEqual(ti.state, expect_state)
+
+    def test_xcom_pull_after_success(self):
+        """
+        Tests that an xcom value survives rerunning an already-successful task
+        """
+        key = 'xcom_key'
+        value = 'xcom_value'
+
+        dag = models.DAG(dag_id='test_xcom', schedule_interval='@monthly')
+        task = DummyOperator(
+            task_id='test_xcom',
+            dag=dag,
+            pool='test_xcom',
+            owner='airflow',
+            start_date=datetime.datetime(2016, 6, 2, 0, 0, 0))
+        exec_date = datetime.datetime.now()
+        ti = TI(
+            task=task, execution_date=exec_date)
+        ti.run(mark_success=True)
+        ti.xcom_push(key=key, value=value)
+        self.assertEqual(ti.xcom_pull(task_ids='test_xcom', key=key), value)
+        ti.run()
+        # The second run and assert cover AIRFLOW-131: rerunning a task
+        # instance that already succeeded must not clear its xcom values
+        self.assertEqual(ti.xcom_pull(task_ids='test_xcom', key=key), value)
+
+    def test_xcom_pull_different_execution_date(self):
+        """
+        Tests xcom fetch behavior across different execution dates, calling
+        xcom_pull both with and without "include_prior_dates".
+        """
+        key = 'xcom_key'
+        value = 'xcom_value'
+
+        dag = models.DAG(dag_id='test_xcom', schedule_interval='@monthly')
+        task = DummyOperator(
+            task_id='test_xcom',
+            dag=dag,
+            pool='test_xcom',
+            owner='airflow',
+            start_date=datetime.datetime(2016, 6, 2, 0, 0, 0))
+        exec_date = datetime.datetime.now()
+        ti = TI(
+            task=task, execution_date=exec_date)
+        ti.run(mark_success=True)
+        ti.xcom_push(key=key, value=value)
+        self.assertEqual(ti.xcom_pull(task_ids='test_xcom', key=key), value)
+        ti.run()
+        # Use timedelta: replace(day=day + 1) raises ValueError on month end
+        exec_date = exec_date + datetime.timedelta(days=1)
+        ti = TI(task=task, execution_date=exec_date)
+        ti.run()
+        # We have set a new execution date (and did not pass in
+        # 'include_prior_dates'), which means this task instance should not
+        # see the xcom value pushed for the earlier date
+        self.assertIsNone(ti.xcom_pull(task_ids='test_xcom', key=key))
+        # We *should* get a value using 'include_prior_dates'
+        self.assertEqual(ti.xcom_pull(task_ids='test_xcom',
+                                      key=key,
+                                      include_prior_dates=True),
+                         value)

Reply via email to