[
https://issues.apache.org/jira/browse/AIRFLOW-1837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16647559#comment-16647559
]
ASF GitHub Bot commented on AIRFLOW-1837:
-----------------------------------------
Fokko closed pull request #4010: [AIRFLOW-1837] Respect task start_date when
different from dag's
URL: https://github.com/apache/incubator-airflow/pull/4010
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/airflow/models.py b/airflow/models.py
index 22e8d2596a..775d9f63de 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -5254,6 +5254,8 @@ def verify_integrity(self, session=None):
for task in six.itervalues(dag.task_dict):
if task.adhoc:
continue
+ if task.start_date > self.execution_date and not self.is_backfill:
+ continue
if task.task_id not in task_ids:
Stats.incr(
diff --git a/tests/core.py b/tests/core.py
index c37b1f9c8b..918e9b4d49 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -62,7 +62,7 @@
from airflow.utils import timezone
from airflow.utils.timezone import datetime
from airflow.utils.state import State
-from airflow.utils.dates import infer_time_unit, round_time, scale_time_units
+from airflow.utils.dates import days_ago, infer_time_unit, round_time,
scale_time_units
from lxml import html
from airflow.exceptions import AirflowException
from airflow.configuration import AirflowConfigException, run_command
@@ -80,6 +80,7 @@
DEFAULT_DATE_ISO = DEFAULT_DATE.isoformat()
DEFAULT_DATE_DS = DEFAULT_DATE_ISO[:10]
TEST_DAG_ID = 'unit_tests'
+EXAMPLE_DAG_DEFAULT_DATE = days_ago(2)
try:
import cPickle as pickle
@@ -1805,21 +1806,21 @@ def setUp(self):
self.dagrun_python = self.dag_python.create_dagrun(
run_id="test_{}".format(models.DagRun.id_for_date(timezone.utcnow())),
- execution_date=DEFAULT_DATE,
+ execution_date=EXAMPLE_DAG_DEFAULT_DATE,
start_date=timezone.utcnow(),
state=State.RUNNING
)
self.sub_dag.create_dagrun(
run_id="test_{}".format(models.DagRun.id_for_date(timezone.utcnow())),
- execution_date=DEFAULT_DATE,
+ execution_date=EXAMPLE_DAG_DEFAULT_DATE,
start_date=timezone.utcnow(),
state=State.RUNNING
)
self.example_xcom.create_dagrun(
run_id="test_{}".format(models.DagRun.id_for_date(timezone.utcnow())),
- execution_date=DEFAULT_DATE,
+ execution_date=EXAMPLE_DAG_DEFAULT_DATE,
start_date=timezone.utcnow(),
state=State.RUNNING
)
@@ -1912,7 +1913,7 @@ def test_dag_views(self):
response = self.app.get(
'/admin/airflow/task?'
'task_id=runme_0&dag_id=example_bash_operator&'
- 'execution_date={}'.format(DEFAULT_DATE_DS))
+ 'execution_date={}'.format(EXAMPLE_DAG_DEFAULT_DATE))
self.assertIn("Attributes", response.data.decode('utf-8'))
response = self.app.get(
'/admin/airflow/dag_stats')
@@ -1924,22 +1925,21 @@ def test_dag_views(self):
"/admin/airflow/success?task_id=print_the_context&"
"dag_id=example_python_operator&upstream=false&downstream=false&"
"future=false&past=false&execution_date={}&"
- "origin=/admin".format(DEFAULT_DATE_DS))
+ "origin=/admin".format(EXAMPLE_DAG_DEFAULT_DATE))
response = self.app.get(url)
self.assertIn("Wait a minute", response.data.decode('utf-8'))
- response = self.app.get(url + "&confirmed=true")
response = self.app.get(
'/admin/airflow/clear?task_id=print_the_context&'
'dag_id=example_python_operator&future=true&past=false&'
'upstream=true&downstream=false&'
'execution_date={}&'
- 'origin=/admin'.format(DEFAULT_DATE_DS))
+ 'origin=/admin'.format(EXAMPLE_DAG_DEFAULT_DATE))
self.assertIn("Wait a minute", response.data.decode('utf-8'))
url = (
"/admin/airflow/success?task_id=section-1&"
"dag_id=example_subdag_operator&upstream=true&downstream=true&"
"future=false&past=false&execution_date={}&"
- "origin=/admin".format(DEFAULT_DATE_DS))
+ "origin=/admin".format(EXAMPLE_DAG_DEFAULT_DATE))
response = self.app.get(url)
self.assertIn("Wait a minute", response.data.decode('utf-8'))
self.assertIn("section-1-task-1", response.data.decode('utf-8'))
@@ -1953,7 +1953,7 @@ def test_dag_views(self):
"dag_id=example_python_operator&future=false&past=false&"
"upstream=false&downstream=true&"
"execution_date={}&"
- "origin=/admin".format(DEFAULT_DATE_DS))
+ "origin=/admin".format(EXAMPLE_DAG_DEFAULT_DATE))
response = self.app.get(url)
self.assertIn("Wait a minute", response.data.decode('utf-8'))
response = self.app.get(url + "&confirmed=true")
@@ -1962,7 +1962,7 @@ def test_dag_views(self):
"dag_id=example_subdag_operator.section-1&future=false&past=false&"
"upstream=false&downstream=true&recursive=true&"
"execution_date={}&"
- "origin=/admin".format(DEFAULT_DATE_DS))
+ "origin=/admin".format(EXAMPLE_DAG_DEFAULT_DATE))
response = self.app.get(url)
self.assertIn("Wait a minute", response.data.decode('utf-8'))
self.assertIn("example_subdag_operator.end",
@@ -1989,7 +1989,7 @@ def test_dag_views(self):
"/admin/airflow/run?task_id=runme_0&"
"dag_id=example_bash_operator&ignore_all_deps=false&ignore_ti_state=true&"
"ignore_task_deps=true&execution_date={}&"
- "origin=/admin".format(DEFAULT_DATE_DS))
+ "origin=/admin".format(EXAMPLE_DAG_DEFAULT_DATE))
response = self.app.get(url)
response = self.app.get(
"/admin/airflow/refresh?dag_id=example_bash_operator")
@@ -2024,7 +2024,7 @@ def test_fetch_task_instance(self):
url = (
"/admin/airflow/object/task_instances?"
"dag_id=example_python_operator&"
- "execution_date={}".format(DEFAULT_DATE_DS))
+ "execution_date={}".format(EXAMPLE_DAG_DEFAULT_DATE))
response = self.app.get(url)
self.assertIn("print_the_context", response.data.decode('utf-8'))
@@ -2032,19 +2032,20 @@ def
test_dag_view_task_with_python_operator_using_partial(self):
response = self.app.get(
'/admin/airflow/task?'
'task_id=test_dagrun_functool_partial&dag_id=test_task_view_type_check&'
- 'execution_date={}'.format(DEFAULT_DATE_DS))
+ 'execution_date={}'.format(EXAMPLE_DAG_DEFAULT_DATE))
self.assertIn("A function with two args",
response.data.decode('utf-8'))
def test_dag_view_task_with_python_operator_using_instance(self):
response = self.app.get(
'/admin/airflow/task?'
'task_id=test_dagrun_instance&dag_id=test_task_view_type_check&'
- 'execution_date={}'.format(DEFAULT_DATE_DS))
+ 'execution_date={}'.format(EXAMPLE_DAG_DEFAULT_DATE))
self.assertIn("A __call__ method", response.data.decode('utf-8'))
def tearDown(self):
configuration.conf.set("webserver", "expose_config", "False")
- self.dag_bash.clear(start_date=DEFAULT_DATE,
end_date=timezone.utcnow())
+ self.dag_bash.clear(start_date=EXAMPLE_DAG_DEFAULT_DATE,
+ end_date=timezone.utcnow())
session = Session()
session.query(models.DagRun).delete()
session.query(models.TaskInstance).delete()
diff --git a/tests/dags/test_scheduler_dags.py
b/tests/dags/test_scheduler_dags.py
index ae2bd202d9..94e6f8216d 100644
--- a/tests/dags/test_scheduler_dags.py
+++ b/tests/dags/test_scheduler_dags.py
@@ -17,18 +17,34 @@
# specific language governing permissions and limitations
# under the License.
-from datetime import datetime
+from datetime import datetime, timedelta
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
-DEFAULT_DATE = datetime(2100, 1, 1)
+DEFAULT_DATE = datetime(2016, 1, 1)
# DAG tests backfill with pooled tasks
# Previously backfill would queue the task but never run it
dag1 = DAG(
dag_id='test_start_date_scheduling',
- start_date=datetime(2100, 1, 1))
+ start_date=datetime.utcnow() + timedelta(days=1))
dag1_task1 = DummyOperator(
task_id='dummy',
dag=dag1,
owner='airflow')
+
+dag2 = DAG(
+ dag_id='test_task_start_date_scheduling',
+ start_date=DEFAULT_DATE
+)
+dag2_task1 = DummyOperator(
+ task_id='dummy1',
+ dag=dag2,
+ owner='airflow',
+ start_date=DEFAULT_DATE + timedelta(days=3)
+)
+dag2_task2 = DummyOperator(
+ task_id='dummy2',
+ dag=dag2,
+ owner='airflow'
+)
diff --git a/tests/jobs.py b/tests/jobs.py
index bb714bd201..9dcd15fbe6 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -2217,7 +2217,7 @@ def test_scheduler_start_date(self):
dag_id = 'test_start_date_scheduling'
dag = self.dagbag.get_dag(dag_id)
dag.clear()
- self.assertTrue(dag.start_date > DEFAULT_DATE)
+ self.assertTrue(dag.start_date > datetime.datetime.utcnow())
scheduler = SchedulerJob(dag_id,
num_runs=2)
@@ -2252,6 +2252,27 @@ def test_scheduler_start_date(self):
self.assertEqual(
len(session.query(TI).filter(TI.dag_id == dag_id).all()), 1)
+ def test_scheduler_task_start_date(self):
+ """
+ Test that the scheduler respects task start dates that are different
+ from DAG start dates
+ """
+ dag_id = 'test_task_start_date_scheduling'
+ dag = self.dagbag.get_dag(dag_id)
+ dag.clear()
+ scheduler = SchedulerJob(dag_id,
+ num_runs=2)
+ scheduler.run()
+
+ session = settings.Session()
+ tiq = session.query(TI).filter(TI.dag_id == dag_id)
+ ti1s = tiq.filter(TI.task_id == 'dummy1').all()
+ ti2s = tiq.filter(TI.task_id == 'dummy2').all()
+ self.assertEqual(len(ti1s), 0)
+ self.assertEqual(len(ti2s), 2)
+ for t in ti2s:
+ self.assertEqual(t.state, State.SUCCESS)
+
def test_scheduler_multiprocessing(self):
"""
Test that the scheduler can successfully queue multiple dags in
parallel
diff --git a/tests/www_rbac/test_views.py b/tests/www_rbac/test_views.py
index 6453124465..a952b9874c 100644
--- a/tests/www_rbac/test_views.py
+++ b/tests/www_rbac/test_views.py
@@ -39,7 +39,7 @@
from airflow.models import DAG, DagRun, TaskInstance
from airflow.operators.dummy_operator import DummyOperator
from airflow.settings import Session
-from airflow.utils import timezone
+from airflow.utils import dates, timezone
from airflow.utils.state import State
from airflow.utils.timezone import datetime
from airflow.www_rbac import app as application
@@ -267,8 +267,8 @@ def test_mount(self):
class TestAirflowBaseViews(TestBase):
- default_date = timezone.datetime(2018, 3, 1)
- run_id = "test_{}".format(models.DagRun.id_for_date(default_date))
+ EXAMPLE_DAG_DEFAULT_DATE = dates.days_ago(2)
+ run_id =
"test_{}".format(models.DagRun.id_for_date(EXAMPLE_DAG_DEFAULT_DATE))
def setUp(self):
super(TestAirflowBaseViews, self).setUp()
@@ -297,19 +297,19 @@ def prepare_dagruns(self):
self.bash_dagrun = self.bash_dag.create_dagrun(
run_id=self.run_id,
- execution_date=self.default_date,
+ execution_date=self.EXAMPLE_DAG_DEFAULT_DATE,
start_date=timezone.utcnow(),
state=State.RUNNING)
self.sub_dagrun = self.sub_dag.create_dagrun(
run_id=self.run_id,
- execution_date=self.default_date,
+ execution_date=self.EXAMPLE_DAG_DEFAULT_DATE,
start_date=timezone.utcnow(),
state=State.RUNNING)
self.xcom_dagrun = self.xcom_dag.create_dagrun(
run_id=self.run_id,
- execution_date=self.default_date,
+ execution_date=self.EXAMPLE_DAG_DEFAULT_DATE,
start_date=timezone.utcnow(),
state=State.RUNNING)
@@ -327,19 +327,19 @@ def test_home(self):
def test_task(self):
url =
('task?task_id=runme_0&dag_id=example_bash_operator&execution_date={}'
- .format(self.percent_encode(self.default_date)))
+ .format(self.percent_encode(self.EXAMPLE_DAG_DEFAULT_DATE)))
resp = self.client.get(url, follow_redirects=True)
self.check_content_in_response('Task Instance Details', resp)
def test_xcom(self):
url =
('xcom?task_id=runme_0&dag_id=example_bash_operator&execution_date={}'
- .format(self.percent_encode(self.default_date)))
+ .format(self.percent_encode(self.EXAMPLE_DAG_DEFAULT_DATE)))
resp = self.client.get(url, follow_redirects=True)
self.check_content_in_response('XCom', resp)
def test_rendered(self):
url =
('rendered?task_id=runme_0&dag_id=example_bash_operator&execution_date={}'
- .format(self.percent_encode(self.default_date)))
+ .format(self.percent_encode(self.EXAMPLE_DAG_DEFAULT_DATE)))
resp = self.client.get(url, follow_redirects=True)
self.check_content_in_response('Rendered Template', resp)
@@ -409,28 +409,28 @@ def test_paused(self):
def test_failed(self):
url = ('failed?task_id=run_this_last&dag_id=example_bash_operator&'
'execution_date={}&upstream=false&downstream=false&future=false&past=false'
- .format(self.percent_encode(self.default_date)))
+ .format(self.percent_encode(self.EXAMPLE_DAG_DEFAULT_DATE)))
resp = self.client.get(url)
self.check_content_in_response('Wait a minute', resp)
def test_success(self):
url = ('success?task_id=run_this_last&dag_id=example_bash_operator&'
'execution_date={}&upstream=false&downstream=false&future=false&past=false'
- .format(self.percent_encode(self.default_date)))
+ .format(self.percent_encode(self.EXAMPLE_DAG_DEFAULT_DATE)))
resp = self.client.get(url)
self.check_content_in_response('Wait a minute', resp)
def test_clear(self):
url = ('clear?task_id=runme_1&dag_id=example_bash_operator&'
'execution_date={}&upstream=false&downstream=false&future=false&past=false'
- .format(self.percent_encode(self.default_date)))
+ .format(self.percent_encode(self.EXAMPLE_DAG_DEFAULT_DATE)))
resp = self.client.get(url)
self.check_content_in_response(['example_bash_operator', 'Wait a
minute'], resp)
def test_run(self):
url =
('run?task_id=runme_0&dag_id=example_bash_operator&ignore_all_deps=false&'
'ignore_ti_state=true&execution_date={}'
- .format(self.percent_encode(self.default_date)))
+ .format(self.percent_encode(self.EXAMPLE_DAG_DEFAULT_DATE)))
resp = self.client.get(url)
self.check_content_in_response('', resp, resp_code=302)
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> Differing start_dates on tasks not respected by scheduler.
> ----------------------------------------------------------
>
> Key: AIRFLOW-1837
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1837
> Project: Apache Airflow
> Issue Type: Bug
> Affects Versions: 1.9.0
> Reporter: Ash Berlin-Taylor
> Assignee: Dima Kamalov
> Priority: Major
> Attachments: everything_ran.png
>
>
> It is possible to specify start_date directly on tasks in a dag, as well as on
> the DAG. This is correctly handled when creating dag runs, but it is
> seemingly ignored when scheduling tasks.
> Given this example:
> {code}
> dag_args = {
> "start_date": datetime(2017, 9, 4),
> }
> dag = DAG(
> "my-dag",
> default_args=dag_args,
> schedule_interval="0 0 * * Mon",
> )
> # ...
> with dag:
> op = PythonOperator(
> python_callable=fetcher.run,
> task_id="fetch_all_respondents",
> provide_context=True,
> # The "unfiltered" API calls are a lot quicker, so let's put them
> # ahead of any other filtered job in the queue.
> priority_weight=10,
> start_date=datetime(2014, 9, 1),
> )
> op = PythonOperator(
> python_callable=fetcher.run,
> task_id="fetch_by_demographics",
> op_kwargs={
> 'demo_names': demo_names,
> },
> provide_context=True,
> priority_weight=5,
> )
> {code}
> I only want the fetch_all_respondents tasks to run for 2014..2017, and then
> from September 2017 I also want the fetch_by_demographics task to run.
> However right now both tasks are being scheduled from 2014-09-01.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)