Repository: incubator-airflow Updated Branches: refs/heads/master c458a22cf -> 772dbae29
[AIRFLOW-1927] Convert naive datetimes for TaskInstances TaskInstances are sometimes instantiated outside core Airflow with naive datetimes. In case this happens we now default to using the time zone of the DAG if that is available or the default system time zone. Closes #2946 from bolkedebruin/AIRFLOW-1927 Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/772dbae2 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/772dbae2 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/772dbae2 Branch: refs/heads/master Commit: 772dbae298680feb9d521e7cd5526f4059d7cb69 Parents: c458a22 Author: Bolke de Bruin <[email protected]> Authored: Tue Feb 6 17:26:08 2018 +0100 Committer: Bolke de Bruin <[email protected]> Committed: Tue Feb 6 17:26:08 2018 +0100 ---------------------------------------------------------------------- airflow/bin/cli.py | 2 +- airflow/models.py | 18 ++++++++++++++++-- docs/timezone.rst | 6 +++--- setup.py | 2 +- tests/jobs.py | 25 +++++++++++++++++++++++++ tests/models.py | 23 +++++++++++++++++++++++ 6 files changed, 69 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/772dbae2/airflow/bin/cli.py ---------------------------------------------------------------------- diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py index d0c11d3..6bfcdcc 100755 --- a/airflow/bin/cli.py +++ b/airflow/bin/cli.py @@ -27,7 +27,7 @@ from importlib import import_module import argparse from builtins import input from collections import namedtuple -from dateutil.parser import parse as parsedate +from airflow.utils.timezone import parse as parsedate import json from tabulate import tabulate http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/772dbae2/airflow/models.py 
---------------------------------------------------------------------- diff --git a/airflow/models.py b/airflow/models.py index 9854213..78d1580 100755 --- a/airflow/models.py +++ b/airflow/models.py @@ -797,8 +797,23 @@ class TaskInstance(Base, LoggingMixin): def __init__(self, task, execution_date, state=None): self.dag_id = task.dag_id self.task_id = task.task_id - self.execution_date = execution_date self.task = task + self._log = logging.getLogger("airflow.task") + + # make sure we have a localized execution_date stored in UTC + if execution_date and not timezone.is_localized(execution_date): + self.log.warning("execution date %s has no timezone information. Using " + "default from dag or system", execution_date) + if self.task.has_dag(): + execution_date = timezone.make_aware(execution_date, + self.task.dag.timezone) + else: + execution_date = timezone.make_aware(execution_date) + + execution_date = timezone.convert_to_utc(execution_date) + + self.execution_date = execution_date + self.queue = task.queue self.pool = task.pool self.priority_weight = task.priority_weight_total @@ -810,7 +825,6 @@ class TaskInstance(Base, LoggingMixin): self.state = state self.hostname = '' self.init_on_load() - self._log = logging.getLogger("airflow.task") # Is this TaskInstance being currently running within `airflow run --raw`. # Not persisted to the database so only valid for the current process self.is_raw = False http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/772dbae2/docs/timezone.rst ---------------------------------------------------------------------- diff --git a/docs/timezone.rst b/docs/timezone.rst index ca30686..9e8598e 100644 --- a/docs/timezone.rst +++ b/docs/timezone.rst @@ -2,9 +2,9 @@ Time zones ========== Support for time zones is enabled by default. Airflow stores datetime information in UTC internally and in the database. - It allows you to run your DAGs with time zone dependent schedules. 
At the moment Airflow does not them to the end - user’s time zone in the user interface. Also templates used in Operators are not translated. Time zone information - is exposed and it is left up to the writer of DAG what do with it. +It allows you to run your DAGs with time zone dependent schedules. At the moment Airflow does not convert them to the +end user’s time zone in the user interface. There it will always be displayed in UTC. Also templates used in Operators +are not converted. Time zone information is exposed and it is up to the writer of DAG what do with it. This is handy if your users live in more than one time zone and you want to display datetime information according to each user’s wall clock. http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/772dbae2/setup.py ---------------------------------------------------------------------- diff --git a/setup.py b/setup.py index 0b9c680..148e53d 100644 --- a/setup.py +++ b/setup.py @@ -221,7 +221,7 @@ def do_setup(): 'lxml>=3.6.0, <4.0', 'markdown>=2.5.2, <3.0', 'pandas>=0.17.1, <1.0.0', - 'pendulum==1.3.2', + 'pendulum==1.4.0', 'psutil>=4.2.0, <5.0.0', 'pygments>=2.0.1, <3.0', 'python-daemon>=2.1.1, <2.2', http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/772dbae2/tests/jobs.py ---------------------------------------------------------------------- diff --git a/tests/jobs.py b/tests/jobs.py index aa78721..b2ca15e 100644 --- a/tests/jobs.py +++ b/tests/jobs.py @@ -333,6 +333,31 @@ class BackfillJobTest(unittest.TestCase): ti_dependent.refresh_from_db() self.assertEquals(ti_dependent.state, State.SUCCESS) + def test_run_naive_taskinstance(self): + """ + Test that we can run naive (non-localized) task instances + """ + NAIVE_DATE = datetime.datetime(2016, 1, 1) + dag_id = 'test_run_ignores_all_dependencies' + + dag = self.dagbag.get_dag('test_run_ignores_all_dependencies') + dag.clear() + + task0_id = 'test_run_dependent_task' + args0 = ['run', + '-A', + dag_id, + task0_id, 
NAIVE_DATE.isoformat()] + + cli.run(self.parser.parse_args(args0)) + ti_dependent0 = TI( + task=dag.get_task(task0_id), + execution_date=NAIVE_DATE) + + ti_dependent0.refresh_from_db() + self.assertEquals(ti_dependent0.state, State.FAILED) + def test_cli_backfill_depends_on_past(self): """ Test that CLI respects -I argument http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/772dbae2/tests/models.py ---------------------------------------------------------------------- diff --git a/tests/models.py b/tests/models.py index 17a9043..c89296e 100644 --- a/tests/models.py +++ b/tests/models.py @@ -898,6 +898,29 @@ class TaskInstanceTest(unittest.TestCase): self.assertTrue( op3.end_date == DEFAULT_DATE + datetime.timedelta(days=9)) + def test_timezone_awareness(self): + NAIVE_DATETIME = DEFAULT_DATE.replace(tzinfo=None) + + # check ti without dag (just for bw compat) + op_no_dag = DummyOperator(task_id='op_no_dag') + ti = TI(task=op_no_dag, execution_date=NAIVE_DATETIME) + + self.assertEquals(ti.execution_date, DEFAULT_DATE) + + # check with dag without localized execution_date + dag = DAG('dag', start_date=DEFAULT_DATE) + op1 = DummyOperator(task_id='op_1') + dag.add_task(op1) + ti = TI(task=op1, execution_date=NAIVE_DATETIME) + + self.assertEquals(ti.execution_date, DEFAULT_DATE) + + # with dag and localized execution_date + tz = pendulum.timezone("Europe/Amsterdam") + execution_date = timezone.datetime(2016, 1, 1, 1, 0, 0, tzinfo=tz) + utc_date = timezone.convert_to_utc(execution_date) + ti = TI(task=op1, execution_date=execution_date) + self.assertEquals(ti.execution_date, utc_date) def test_set_dag(self): """
