[AIRFLOW-993] Update date inference logic DAGs should set task start_date and end_date when possible, making sure they agree with the DAG's own dates.
Closes #2157 from jlowin/run-bug Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/65039959 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/65039959 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/65039959 Branch: refs/heads/v1-8-test Commit: 650399590b06b11b67bbda699947574e50faed38 Parents: c5d6c3a Author: Jeremiah Lowin <[email protected]> Authored: Sat May 13 14:53:08 2017 +0200 Committer: Maxime Beauchemin <[email protected]> Committed: Thu Jun 8 08:36:20 2017 -0700 ---------------------------------------------------------------------- airflow/models.py | 15 ++++++++++++++- tests/models.py | 39 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 53 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/65039959/airflow/models.py ---------------------------------------------------------------------- diff --git a/airflow/models.py b/airflow/models.py index 484cb8c..bf50c4c 100755 --- a/airflow/models.py +++ b/airflow/models.py @@ -3269,8 +3269,21 @@ class DAG(BaseDag, LoggingMixin): """ if not self.start_date and not task.start_date: raise AirflowException("Task is missing the start_date parameter") - if not task.start_date: + # if the task has no start date, assign it the same as the DAG + elif not task.start_date: task.start_date = self.start_date + # otherwise, the task will start on the later of its own start date and + # the DAG's start date + elif self.start_date: + task.start_date = max(task.start_date, self.start_date) + + # if the task has no end date, assign it the same as the dag + if not task.end_date: + task.end_date = self.end_date + # otherwise, the task will end on the earlier of its own end date and + # the DAG's end date + elif task.end_date and self.end_date: + task.end_date = min(task.end_date, 
self.end_date) if task.task_id in self.task_dict: # TODO: raise an error in Airflow 2.0 http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/65039959/tests/models.py ---------------------------------------------------------------------- diff --git a/tests/models.py b/tests/models.py index 392de03..b3b8c2a 100644 --- a/tests/models.py +++ b/tests/models.py @@ -575,6 +575,45 @@ class DagBagTest(unittest.TestCase): class TaskInstanceTest(unittest.TestCase): + def test_set_task_dates(self): + """ + Test that tasks properly take start/end dates from DAGs + """ + dag = DAG('dag', start_date=DEFAULT_DATE, end_date=DEFAULT_DATE + datetime.timedelta(days=10)) + + op1 = DummyOperator(task_id='op_1', owner='test') + + self.assertTrue(op1.start_date is None and op1.end_date is None) + + # dag should assign its dates to op1 because op1 has no dates + dag.add_task(op1) + self.assertTrue( + op1.start_date == dag.start_date and op1.end_date == dag.end_date) + + op2 = DummyOperator( + task_id='op_2', + owner='test', + start_date=DEFAULT_DATE - datetime.timedelta(days=1), + end_date=DEFAULT_DATE + datetime.timedelta(days=11)) + + # dag should assign its dates to op2 because they are more restrictive + dag.add_task(op2) + self.assertTrue( + op2.start_date == dag.start_date and op2.end_date == dag.end_date) + + op3 = DummyOperator( + task_id='op_3', + owner='test', + start_date=DEFAULT_DATE + datetime.timedelta(days=1), + end_date=DEFAULT_DATE + datetime.timedelta(days=9)) + # op3 should keep its dates because they are more restrictive + dag.add_task(op3) + self.assertTrue( + op3.start_date == DEFAULT_DATE + datetime.timedelta(days=1)) + self.assertTrue( + op3.end_date == DEFAULT_DATE + datetime.timedelta(days=9)) + + def test_set_dag(self): """ Test assigning Operators to Dags, including deferred assignment
