[AIRFLOW-993] Update date inference logic

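DAGs should set task start_date and end_date when possible, making sure
they agree with the DAG's own dates: a task with no start_date/end_date
inherits the DAG's, and a task whose dates extend beyond the DAG's range
is clamped to the later start date and the earlier end date.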

Closes #2157 from jlowin/run-bug


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/65039959
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/65039959
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/65039959

Branch: refs/heads/v1-8-test
Commit: 650399590b06b11b67bbda699947574e50faed38
Parents: c5d6c3a
Author: Jeremiah Lowin <[email protected]>
Authored: Sat May 13 14:53:08 2017 +0200
Committer: Maxime Beauchemin <[email protected]>
Committed: Thu Jun 8 08:36:20 2017 -0700

----------------------------------------------------------------------
 airflow/models.py | 15 ++++++++++++++-
 tests/models.py   | 39 +++++++++++++++++++++++++++++++++++++++
 2 files changed, 53 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/65039959/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index 484cb8c..bf50c4c 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -3269,8 +3269,21 @@ class DAG(BaseDag, LoggingMixin):
         """
         if not self.start_date and not task.start_date:
             raise AirflowException("Task is missing the start_date parameter")
-        if not task.start_date:
+        # if the task has no start date, assign it the same as the DAG
+        elif not task.start_date:
             task.start_date = self.start_date
+        # otherwise, the task will start on the later of its own start date and
+        # the DAG's start date
+        elif self.start_date:
+            task.start_date = max(task.start_date, self.start_date)
+
+        # if the task has no end date, assign it the same as the dag
+        if not task.end_date:
+            task.end_date = self.end_date
+        # otherwise, the task will end on the earlier of its own end date and
+        # the DAG's end date
+        elif task.end_date and self.end_date:
+            task.end_date = min(task.end_date, self.end_date)
 
         if task.task_id in self.task_dict:
             # TODO: raise an error in Airflow 2.0

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/65039959/tests/models.py
----------------------------------------------------------------------
diff --git a/tests/models.py b/tests/models.py
index 392de03..b3b8c2a 100644
--- a/tests/models.py
+++ b/tests/models.py
@@ -575,6 +575,45 @@ class DagBagTest(unittest.TestCase):
 
 class TaskInstanceTest(unittest.TestCase):
 
+    def test_set_task_dates(self):
+        """
+        Test that tasks properly take start/end dates from DAGs
+        """
+        dag = DAG('dag', start_date=DEFAULT_DATE, end_date=DEFAULT_DATE + datetime.timedelta(days=10))
+
+        op1 = DummyOperator(task_id='op_1', owner='test')
+
+        self.assertTrue(op1.start_date is None and op1.end_date is None)
+
+        # dag should assign its dates to op1 because op1 has no dates
+        dag.add_task(op1)
+        self.assertTrue(
+            op1.start_date == dag.start_date and op1.end_date == dag.end_date)
+
+        op2 = DummyOperator(
+            task_id='op_2',
+            owner='test',
+            start_date=DEFAULT_DATE - datetime.timedelta(days=1),
+            end_date=DEFAULT_DATE + datetime.timedelta(days=11))
+
+        # dag should assign its dates to op2 because they are more restrictive
+        dag.add_task(op2)
+        self.assertTrue(
+            op2.start_date == dag.start_date and op2.end_date == dag.end_date)
+
+        op3 = DummyOperator(
+            task_id='op_3',
+            owner='test',
+            start_date=DEFAULT_DATE + datetime.timedelta(days=1),
+            end_date=DEFAULT_DATE + datetime.timedelta(days=9))
+        # op3 should keep its dates because they are more restrictive
+        dag.add_task(op3)
+        self.assertTrue(
+            op3.start_date == DEFAULT_DATE + datetime.timedelta(days=1))
+        self.assertTrue(
+            op3.end_date == DEFAULT_DATE + datetime.timedelta(days=9))
+
+
     def test_set_dag(self):
         """
         Test assigning Operators to Dags, including deferred assignment

Reply via email to