dima-asana closed pull request #4000: [AIRFLOW-1837] Respect task start_date when different from dag's
URL: https://github.com/apache/incubator-airflow/pull/4000
This pull request was merged from a forked repository. Because GitHub hides
the original diff of a merged foreign (forked) pull request, the diff is
reproduced below for the sake of provenance:
diff --git a/airflow/models.py b/airflow/models.py
index 22e8d2596a..4a94215b92 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -5252,7 +5252,9 @@ def verify_integrity(self, session=None):
# check for missing tasks
for task in six.itervalues(dag.task_dict):
- if task.adhoc:
+ if task.adhoc or task.start_date > self.execution_date:
+ continue
+ if task.start_date > self.execution_date and not self.is_backfill:
continue
if task.task_id not in task_ids:
diff --git a/tests/dags/test_scheduler_dags.py b/tests/dags/test_scheduler_dags.py
index ae2bd202d9..94e6f8216d 100644
--- a/tests/dags/test_scheduler_dags.py
+++ b/tests/dags/test_scheduler_dags.py
@@ -17,18 +17,34 @@
# specific language governing permissions and limitations
# under the License.
-from datetime import datetime
+from datetime import datetime, timedelta
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
-DEFAULT_DATE = datetime(2100, 1, 1)
+DEFAULT_DATE = datetime(2016, 1, 1)
# DAG tests backfill with pooled tasks
# Previously backfill would queue the task but never run it
dag1 = DAG(
dag_id='test_start_date_scheduling',
- start_date=datetime(2100, 1, 1))
+ start_date=datetime.utcnow() + timedelta(days=1))
dag1_task1 = DummyOperator(
task_id='dummy',
dag=dag1,
owner='airflow')
+
+dag2 = DAG(
+ dag_id='test_task_start_date_scheduling',
+ start_date=DEFAULT_DATE
+)
+dag2_task1 = DummyOperator(
+ task_id='dummy1',
+ dag=dag2,
+ owner='airflow',
+ start_date=DEFAULT_DATE + timedelta(days=3)
+)
+dag2_task2 = DummyOperator(
+ task_id='dummy2',
+ dag=dag2,
+ owner='airflow'
+)
diff --git a/tests/jobs.py b/tests/jobs.py
index bb714bd201..c23c6035c7 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -2217,7 +2217,7 @@ def test_scheduler_start_date(self):
dag_id = 'test_start_date_scheduling'
dag = self.dagbag.get_dag(dag_id)
dag.clear()
- self.assertTrue(dag.start_date > DEFAULT_DATE)
+ self.assertTrue(dag.start_date > datetime.datetime.utcnow() )
scheduler = SchedulerJob(dag_id,
num_runs=2)
@@ -2252,6 +2252,27 @@ def test_scheduler_start_date(self):
self.assertEqual(
len(session.query(TI).filter(TI.dag_id == dag_id).all()), 1)
+ def test_scheduler_task_start_date(self):
+ """
+ Test that the scheduler respects task start dates that are different
+ from DAG start dates
+ """
+ dag_id = 'test_task_start_date_scheduling'
+ dag = self.dagbag.get_dag(dag_id)
+ dag.clear()
+ scheduler = SchedulerJob(dag_id,
+ num_runs=2)
+ scheduler.run()
+
+ session = settings.Session()
+ tiq = session.query(TI).filter(TI.dag_id == dag_id)
+ ti1s = tiq.filter(TI.task_id == 'dummy1').all()
+ ti2s = tiq.filter(TI.task_id == 'dummy2').all()
+ self.assertEqual(len(ti1s), 0)
+ self.assertEqual(len(ti2s), 2)
+ for t in ti2s:
+ self.assertEqual(t.state, State.SUCCESS)
+
def test_scheduler_multiprocessing(self):
"""
Test that the scheduler can successfully queue multiple dags in
parallel
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log in to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services