[AIRFLOW-1808] Convert all utcnow() to time zone aware

datetime.utcnow() returns naive datetimes that carry no time zone information; replace all call sites with the time zone aware timezone.utcnow().
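
For illustration only (not part of this patch): a minimal Python 3 sketch of
the naive/aware distinction the change addresses. The timezone.utcnow() helper
used throughout the diff returns an aware UTC datetime, whereas
datetime.utcnow() returns a naive one.

    import datetime

    naive = datetime.datetime.utcnow()                    # tzinfo is None
    aware = datetime.datetime.now(datetime.timezone.utc)  # pinned to UTC

    print(naive.tzinfo)  # None
    print(aware.tzinfo)  # UTC

    # Mixing the two fails, which is why every call site must be converted:
    try:
        naive < aware
    except TypeError as exc:
        print(exc)  # can't compare offset-naive and offset-aware datetimes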


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/c857436b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/c857436b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/c857436b

Branch: refs/heads/master
Commit: c857436b7565777695c8336dbac2ef60a74d71d1
Parents: a47255f
Author: Bolke de Bruin <[email protected]>
Authored: Mon Nov 27 15:54:20 2017 +0100
Committer: Bolke de Bruin <[email protected]>
Committed: Mon Nov 27 15:54:20 2017 +0100

----------------------------------------------------------------------
 airflow/api/common/experimental/mark_tasks.py   |  9 +-
 airflow/api/common/experimental/trigger_dag.py  |  6 +-
 airflow/jobs.py                                 | 71 ++++++++--------
 airflow/models.py                               | 88 ++++++++++++--------
 airflow/operators/dagrun_operator.py            |  5 +-
 airflow/operators/latest_only_operator.py       |  4 +-
 airflow/operators/sensors.py                    | 10 +--
 airflow/ti_deps/deps/not_in_retry_period_dep.py |  4 +-
 airflow/ti_deps/deps/runnable_exec_date_dep.py  |  4 +-
 airflow/utils/dag_processing.py                 |  8 +-
 airflow/utils/dates.py                          |  5 +-
 airflow/www/forms.py                            |  4 +-
 airflow/www/utils.py                            |  7 +-
 airflow/www/views.py                            | 31 +++----
 setup.py                                        |  1 +
 15 files changed, 142 insertions(+), 115 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c857436b/airflow/api/common/experimental/mark_tasks.py
----------------------------------------------------------------------
diff --git a/airflow/api/common/experimental/mark_tasks.py b/airflow/api/common/experimental/mark_tasks.py
index e0ea313..e9366e0 100644
--- a/airflow/api/common/experimental/mark_tasks.py
+++ b/airflow/api/common/experimental/mark_tasks.py
@@ -12,16 +12,16 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import datetime
-
 from airflow.jobs import BackfillJob
 from airflow.models import DagRun, TaskInstance
 from airflow.operators.subdag_operator import SubDagOperator
 from airflow.settings import Session
+from airflow.utils import timezone
 from airflow.utils.state import State
 
 from sqlalchemy import or_
 
+
 def _create_dagruns(dag, execution_dates, state, run_id_template):
     """
     Infers from the dates which dag runs need to be created and does so.
@@ -39,7 +39,7 @@ def _create_dagruns(dag, execution_dates, state, run_id_template):
         dr = dag.create_dagrun(
             run_id=run_id_template.format(date.isoformat()),
             execution_date=date,
-            start_date=datetime.datetime.utcnow(),
+            start_date=timezone.utcnow(),
             external_trigger=False,
             state=state,
         )
@@ -67,7 +67,7 @@ def set_state(task, execution_date, upstream=False, downstream=False,
     :param commit: Commit tasks to be altered to the database
     :return: list of tasks that have been created and updated
     """
-    assert isinstance(execution_date, datetime.datetime)
+    assert timezone.is_localized(execution_date)
 
     # microseconds are supported by the database, but is not handled
     # correctly by airflow on e.g. the filesystem and in other places
@@ -185,6 +185,7 @@ def set_state(task, execution_date, upstream=False, downstream=False,
 
     return tis_altered
 
+
 def set_dag_run_state(dag, execution_date, state=State.SUCCESS, commit=False):
     """
     Set the state of a dag run and all task instances associated with the dag

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c857436b/airflow/api/common/experimental/trigger_dag.py
----------------------------------------------------------------------
diff --git a/airflow/api/common/experimental/trigger_dag.py b/airflow/api/common/experimental/trigger_dag.py
index bfb6ad4..9d9934d 100644
--- a/airflow/api/common/experimental/trigger_dag.py
+++ b/airflow/api/common/experimental/trigger_dag.py
@@ -12,11 +12,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import datetime
 import json
 
 from airflow.exceptions import AirflowException
 from airflow.models import DagRun, DagBag
+from airflow.utils import timezone
 from airflow.utils.state import State
 
 
@@ -29,9 +29,9 @@ def trigger_dag(dag_id, run_id=None, conf=None, execution_date=None):
     dag = dagbag.get_dag(dag_id)
 
     if not execution_date:
-        execution_date = datetime.datetime.utcnow()
+        execution_date = timezone.utcnow()
 
-    assert isinstance(execution_date, datetime.datetime)
+    assert timezone.is_localized(execution_date)
     execution_date = execution_date.replace(microsecond=0)
 
     if not run_id:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c857436b/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 664fab5..4e1864e 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -28,8 +28,9 @@ import socket
 import sys
 import threading
 import time
+import datetime
+
 from collections import defaultdict
-from datetime import datetime
 from past.builtins import basestring
 from sqlalchemy import (
     Column, Integer, String, DateTime, func, Index, or_, and_, not_)
@@ -46,7 +47,7 @@ from airflow.models import DAG, DagRun
 from airflow.settings import Stats
 from airflow.task_runner import get_task_runner
 from airflow.ti_deps.dep_context import DepContext, QUEUE_DEPS, RUN_DEPS
-from airflow.utils import asciiart
+from airflow.utils import asciiart, timezone
 from airflow.utils.dag_processing import (AbstractDagFileProcessor,
                                           DagFileProcessorManager,
                                           SimpleDag,
@@ -100,22 +101,22 @@ class BaseJob(Base, LoggingMixin):
         self.hostname = socket.getfqdn()
         self.executor = executor
         self.executor_class = executor.__class__.__name__
-        self.start_date = datetime.utcnow()
-        self.latest_heartbeat = datetime.utcnow()
+        self.start_date = timezone.utcnow()
+        self.latest_heartbeat = timezone.utcnow()
         self.heartrate = heartrate
         self.unixname = getpass.getuser()
         super(BaseJob, self).__init__(*args, **kwargs)
 
     def is_alive(self):
         return (
-            (datetime.utcnow() - self.latest_heartbeat).seconds <
+            (timezone.utcnow() - self.latest_heartbeat).seconds <
             (conf.getint('scheduler', 'JOB_HEARTBEAT_SEC') * 2.1)
         )
 
     @provide_session
     def kill(self, session=None):
         job = session.query(BaseJob).filter(BaseJob.id == self.id).first()
-        job.end_date = datetime.utcnow()
+        job.end_date = timezone.utcnow()
         try:
             self.on_kill()
         except:
@@ -165,14 +166,14 @@ class BaseJob(Base, LoggingMixin):
         if job.latest_heartbeat:
             sleep_for = max(
                 0,
-                self.heartrate - (datetime.utcnow() - job.latest_heartbeat).total_seconds())
+                self.heartrate - (timezone.utcnow() - job.latest_heartbeat).total_seconds())
 
         sleep(sleep_for)
 
         # Update last heartbeat time
         with create_session() as session:
             job = session.query(BaseJob).filter(BaseJob.id == self.id).first()
-            job.latest_heartbeat = datetime.utcnow()
+            job.latest_heartbeat = timezone.utcnow()
             session.merge(job)
             session.commit()
 
@@ -194,7 +195,7 @@ class BaseJob(Base, LoggingMixin):
             self._execute()
 
             # Marking the success in the DB
-            self.end_date = datetime.utcnow()
+            self.end_date = timezone.utcnow()
             self.state = State.SUCCESS
             session.merge(self)
             session.commit()
@@ -399,7 +400,7 @@ class DagFileProcessor(AbstractDagFileProcessor, LoggingMixin):
             self._pickle_dags,
             self._dag_id_white_list,
             "DagFileProcessor{}".format(self._instance_id))
-        self._start_time = datetime.utcnow()
+        self._start_time = timezone.utcnow()
 
     def terminate(self, sigkill=False):
         """
@@ -615,16 +616,16 @@ class SchedulerJob(BaseJob):
             TI.execution_date == sq.c.max_ti,
         ).all()
 
-        ts = datetime.utcnow()
+        ts = timezone.utcnow()
         SlaMiss = models.SlaMiss
         for ti in max_tis:
             task = dag.get_task(ti.task_id)
             dttm = ti.execution_date
             if task.sla:
                 dttm = dag.following_schedule(dttm)
-                while dttm < datetime.utcnow():
+                while dttm < timezone.utcnow():
                     following_schedule = dag.following_schedule(dttm)
-                    if following_schedule + task.sla < datetime.utcnow():
+                    if following_schedule + task.sla < timezone.utcnow():
                         session.merge(models.SlaMiss(
                             task_id=ti.task_id,
                             dag_id=ti.dag_id,
@@ -772,9 +773,9 @@ class SchedulerJob(BaseJob):
             for dr in active_runs:
                 if (
                         dr.start_date and dag.dagrun_timeout and
-                        dr.start_date < datetime.utcnow() - dag.dagrun_timeout):
+                        dr.start_date < timezone.utcnow() - dag.dagrun_timeout):
                     dr.state = State.FAILED
-                    dr.end_date = datetime.utcnow()
+                    dr.end_date = timezone.utcnow()
                     timedout_runs += 1
             session.commit()
             if len(active_runs) - timedout_runs >= dag.max_active_runs:
@@ -799,9 +800,9 @@ class SchedulerJob(BaseJob):
             # don't do scheduler catchup for dag's that don't have dag.catchup = True
             if not dag.catchup:
                 # The logic is that we move start_date up until
-                # one period before, so that datetime.utcnow() is AFTER
+                # one period before, so that timezone.utcnow() is AFTER
                 # the period end, and the job can be created...
-                now = datetime.utcnow()
+                now = timezone.utcnow()
                 next_start = dag.following_schedule(now)
                 last_start = dag.previous_schedule(now)
                 if next_start <= now:
@@ -847,7 +848,7 @@ class SchedulerJob(BaseJob):
                 )
 
             # don't ever schedule in the future
-            if next_run_date > datetime.utcnow():
+            if next_run_date > timezone.utcnow():
                 return
 
             # this structure is necessary to avoid a TypeError from concatenating
@@ -870,11 +871,11 @@ class SchedulerJob(BaseJob):
             if next_run_date and min_task_end_date and next_run_date > min_task_end_date:
                 return
 
-            if next_run_date and period_end and period_end <= datetime.utcnow():
+            if next_run_date and period_end and period_end <= timezone.utcnow():
                 next_run = dag.create_dagrun(
                     run_id=DagRun.ID_PREFIX + next_run_date.isoformat(),
                     execution_date=next_run_date,
-                    start_date=datetime.utcnow(),
+                    start_date=timezone.utcnow(),
                     state=State.RUNNING,
                     external_trigger=False
                 )
@@ -894,7 +895,7 @@ class SchedulerJob(BaseJob):
         for run in dag_runs:
             self.log.info("Examining DAG run %s", run)
             # don't consider runs that are executed in the future
-            if run.execution_date > datetime.utcnow():
+            if run.execution_date > timezone.utcnow():
                 self.log.error(
                     "Execution date is in future: %s",
                     run.execution_date
@@ -1231,7 +1232,7 @@ class SchedulerJob(BaseJob):
         # set TIs to queued state
         for task_instance in tis_to_set_to_queued:
             task_instance.state = State.QUEUED
-            task_instance.queued_dttm = (datetime.utcnow()
+            task_instance.queued_dttm = (timezone.utcnow()
                                          if not task_instance.queued_dttm
                                          else task_instance.queued_dttm)
             session.merge(task_instance)
@@ -1468,7 +1469,7 @@ class SchedulerJob(BaseJob):
             last_runtime = processor_manager.get_last_runtime(file_path)
             processor_pid = processor_manager.get_pid(file_path)
             processor_start_time = processor_manager.get_start_time(file_path)
-            runtime = ((datetime.utcnow() - processor_start_time).total_seconds()
+            runtime = ((timezone.utcnow() - processor_start_time).total_seconds()
                        if processor_start_time else None)
             last_run = processor_manager.get_last_finish_time(file_path)
 
@@ -1585,34 +1586,34 @@ class SchedulerJob(BaseJob):
         self.log.info("Resetting orphaned tasks for active dag runs")
         self.reset_state_for_orphaned_tasks()
 
-        execute_start_time = datetime.utcnow()
+        execute_start_time = timezone.utcnow()
 
         # Last time stats were printed
-        last_stat_print_time = datetime(2000, 1, 1)
+        last_stat_print_time = datetime.datetime(2000, 1, 1, tzinfo=timezone.utc)
         # Last time that self.heartbeat() was called.
-        last_self_heartbeat_time = datetime.utcnow()
+        last_self_heartbeat_time = timezone.utcnow()
         # Last time that the DAG dir was traversed to look for files
-        last_dag_dir_refresh_time = datetime.utcnow()
+        last_dag_dir_refresh_time = timezone.utcnow()
 
         # Use this value initially
         known_file_paths = processor_manager.file_paths
 
         # For the execute duration, parse and schedule DAGs
-        while (datetime.utcnow() - execute_start_time).total_seconds() < \
+        while (timezone.utcnow() - execute_start_time).total_seconds() < \
                 self.run_duration or self.run_duration < 0:
             self.log.debug("Starting Loop...")
             loop_start_time = time.time()
 
             # Traverse the DAG directory for Python files containing DAGs
             # periodically
-            elapsed_time_since_refresh = (datetime.utcnow() -
+            elapsed_time_since_refresh = (timezone.utcnow() -
                                           last_dag_dir_refresh_time).total_seconds()
 
             if elapsed_time_since_refresh > self.dag_dir_list_interval:
                 # Build up a list of Python files that could contain DAGs
                 self.log.info("Searching for files in %s", self.subdir)
                 known_file_paths = list_py_file_paths(self.subdir)
-                last_dag_dir_refresh_time = datetime.utcnow()
+                last_dag_dir_refresh_time = timezone.utcnow()
                 self.log.info("There are %s files in %s", 
len(known_file_paths), self.subdir)
                 processor_manager.set_file_paths(known_file_paths)
 
@@ -1662,20 +1663,20 @@ class SchedulerJob(BaseJob):
             self._process_executor_events(simple_dag_bag)
 
             # Heartbeat the scheduler periodically
-            time_since_last_heartbeat = (datetime.utcnow() -
+            time_since_last_heartbeat = (timezone.utcnow() -
                                          last_self_heartbeat_time).total_seconds()
             if time_since_last_heartbeat > self.heartrate:
                 self.log.info("Heartbeating the scheduler")
                 self.heartbeat()
-                last_self_heartbeat_time = datetime.utcnow()
+                last_self_heartbeat_time = timezone.utcnow()
 
             # Occasionally print out stats about how fast the files are getting processed
-            if ((datetime.utcnow() - last_stat_print_time).total_seconds() >
+            if ((timezone.utcnow() - last_stat_print_time).total_seconds() >
                     self.print_stats_interval):
                 if len(known_file_paths) > 0:
                     self._log_file_processing_stats(known_file_paths,
                                                     processor_manager)
-                last_stat_print_time = datetime.utcnow()
+                last_stat_print_time = timezone.utcnow()
 
             loop_end_time = time.time()
             self.log.debug("Ran scheduling loop in %.2f seconds", 
loop_end_time - loop_start_time)
@@ -2049,7 +2050,7 @@ class BackfillJob(BaseJob):
         run = run or self.dag.create_dagrun(
             run_id=run_id,
             execution_date=run_date,
-            start_date=datetime.utcnow(),
+            start_date=timezone.utcnow(),
             state=State.RUNNING,
             external_trigger=False,
             session=session

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c857436b/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index 37a49cf..fe62ac5 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -24,7 +24,8 @@ from builtins import str
 from builtins import object, bytes
 import copy
 from collections import namedtuple
-from datetime import datetime, timedelta
+from datetime import timedelta
+
 import dill
 import functools
 import getpass
@@ -69,6 +70,7 @@ from airflow.ti_deps.deps.trigger_rule_dep import TriggerRuleDep
 from airflow.ti_deps.deps.task_concurrency_dep import TaskConcurrencyDep
 
 from airflow.ti_deps.dep_context import DepContext, QUEUE_DEPS, RUN_DEPS
+from airflow.utils import timezone
 from airflow.utils.dates import cron_presets, date_range as utils_date_range
 from airflow.utils.db import provide_session
 from airflow.utils.decorators import apply_defaults
@@ -154,7 +156,7 @@ def clear_task_instances(tis, session, activate_dag_runs=True, dag=None):
         ).all()
         for dr in drs:
             dr.state = State.RUNNING
-            dr.start_date = datetime.utcnow()
+            dr.start_date = timezone.utcnow()
 
 
 class DagBag(BaseDagBag, LoggingMixin):
@@ -341,7 +343,7 @@ class DagBag(BaseDagBag, LoggingMixin):
         self.log.info("Finding 'running' jobs without a recent heartbeat")
         TI = TaskInstance
         secs = configuration.getint('scheduler', 'scheduler_zombie_task_threshold')
-        limit_dttm = datetime.utcnow() - timedelta(seconds=secs)
+        limit_dttm = timezone.utcnow() - timedelta(seconds=secs)
         self.log.info("Failing jobs without heartbeat after %s", limit_dttm)
 
         tis = (
@@ -373,7 +375,7 @@ class DagBag(BaseDagBag, LoggingMixin):
         """
         self.dags[dag.dag_id] = dag
         dag.resolve_template_files()
-        dag.last_loaded = datetime.utcnow()
+        dag.last_loaded = timezone.utcnow()
 
         for task in dag.tasks:
             settings.policy(task)
@@ -398,7 +400,7 @@ class DagBag(BaseDagBag, LoggingMixin):
         ignoring files that match any of the regex patterns specified
         in the file.
         """
-        start_dttm = datetime.utcnow()
+        start_dttm = timezone.utcnow()
         dag_folder = dag_folder or self.dag_folder
 
         # Used to store stats around DagBag processing
@@ -426,11 +428,11 @@ class DagBag(BaseDagBag, LoggingMixin):
                             continue
                         if not any(
                                 [re.findall(p, filepath) for p in patterns]):
-                            ts = datetime.utcnow()
+                            ts = timezone.utcnow()
                             found_dags = self.process_file(
                                 filepath, only_if_updated=only_if_updated)
 
-                            td = datetime.utcnow() - ts
+                            td = timezone.utcnow() - ts
                             td = td.total_seconds() + (
                                 float(td.microseconds) / 1000000)
                             stats.append(FileLoadStat(
@@ -443,7 +445,7 @@ class DagBag(BaseDagBag, LoggingMixin):
                     except Exception as e:
                         self.log.exception(e)
         Stats.gauge(
-            'collect_dags', (datetime.utcnow() - start_dttm).total_seconds(), 1)
+            'collect_dags', (timezone.utcnow() - start_dttm).total_seconds(), 1)
         Stats.gauge(
             'dagbag_size', len(self.dags), 1)
         Stats.gauge(
@@ -1065,8 +1067,8 @@ class TaskInstance(Base, LoggingMixin):
     @provide_session
     def set_state(self, state, session=None):
         self.state = state
-        self.start_date = datetime.utcnow()
-        self.end_date = datetime.utcnow()
+        self.start_date = timezone.utcnow()
+        self.end_date = timezone.utcnow()
         session.merge(self)
         session.commit()
 
@@ -1231,7 +1233,7 @@ class TaskInstance(Base, LoggingMixin):
         to be retried.
         """
         return (self.state == State.UP_FOR_RETRY and
-                self.next_retry_datetime() < datetime.utcnow())
+                self.next_retry_datetime() < timezone.utcnow())
 
     @provide_session
     def pool_full(self, session):
@@ -1339,7 +1341,7 @@ class TaskInstance(Base, LoggingMixin):
         msg = "Starting attempt {attempt} of {total}".format(
             attempt=self.try_number + 1,
             total=self.max_tries + 1)
-        self.start_date = datetime.utcnow()
+        self.start_date = timezone.utcnow()
 
         dep_context = DepContext(
             deps=RUN_DEPS - QUEUE_DEPS,
@@ -1363,7 +1365,7 @@ class TaskInstance(Base, LoggingMixin):
                 total=self.max_tries + 1)
             self.log.warning(hr + msg + hr)
 
-            self.queued_dttm = datetime.utcnow()
+            self.queued_dttm = timezone.utcnow()
             self.log.info("Queuing into pool %s", self.pool)
             session.merge(self)
             session.commit()
@@ -1508,7 +1510,7 @@ class TaskInstance(Base, LoggingMixin):
             raise
 
         # Recording SUCCESS
-        self.end_date = datetime.utcnow()
+        self.end_date = timezone.utcnow()
         self.set_duration()
         if not test_mode:
             session.add(Log(self.state, self))
@@ -1569,7 +1571,7 @@ class TaskInstance(Base, LoggingMixin):
     def handle_failure(self, error, test_mode=False, context=None, session=None):
         self.log.exception(error)
         task = self.task
-        self.end_date = datetime.utcnow()
+        self.end_date = timezone.utcnow()
         self.set_duration()
         Stats.incr('operator_failures_{}'.format(task.__class__.__name__), 1, 1)
         Stats.incr('ti_failures')
@@ -1891,7 +1893,7 @@ class Log(Base):
     extra = Column(Text)
 
     def __init__(self, event, task_instance, owner=None, extra=None, **kwargs):
-        self.dttm = datetime.utcnow()
+        self.dttm = timezone.utcnow()
         self.event = event
         self.extra = extra
 
@@ -1929,7 +1931,7 @@ class SkipMixin(LoggingMixin):
             return
 
         task_ids = [d.task_id for d in tasks]
-        now = datetime.utcnow()
+        now = timezone.utcnow()
 
         if dag_run:
             session.query(TaskInstance).filter(
@@ -2544,7 +2546,7 @@ class BaseOperator(LoggingMixin):
         range.
         """
         TI = TaskInstance
-        end_date = end_date or datetime.utcnow()
+        end_date = end_date or timezone.utcnow()
         return session.query(TI).filter(
             TI.dag_id == self.dag_id,
             TI.task_id == self.task_id,
@@ -2591,7 +2593,7 @@ class BaseOperator(LoggingMixin):
         Run a set of task instances for a date range.
         """
         start_date = start_date or self.start_date
-        end_date = end_date or self.end_date or datetime.utcnow()
+        end_date = end_date or self.end_date or timezone.utcnow()
 
         for dt in self.dag.date_range(start_date, end_date=end_date):
             TaskInstance(self, dt).run(
@@ -2883,8 +2885,28 @@ class DAG(BaseDag, LoggingMixin):
         # set file location to caller source path
         self.fileloc = sys._getframe().f_back.f_code.co_filename
         self.task_dict = dict()
-        self.start_date = start_date
-        self.end_date = end_date
+
+        # set timezone
+        if start_date and start_date.tzinfo:
+            self.timezone = start_date.tzinfo
+        elif 'start_date' in self.default_args and self.default_args['start_date'].tzinfo:
+            self.timezone = self.default_args['start_date'].tzinfo
+        else:
+            self.timezone = settings.TIMEZONE
+
+        self.start_date = timezone.convert_to_utc(start_date)
+        self.end_date = timezone.convert_to_utc(end_date)
+
+        # also convert tasks
+        if 'start_date' in self.default_args:
+            self.default_args['start_date'] = (
+                timezone.convert_to_utc(self.default_args['start_date'])
+            )
+        if 'end_date' in self.default_args:
+            self.default_args['end_date'] = (
+                timezone.convert_to_utc(self.default_args['end_date'])
+            )
+
         self.schedule_interval = schedule_interval
         if schedule_interval in cron_presets:
             self._schedule_interval = cron_presets.get(schedule_interval)
@@ -2896,7 +2918,7 @@ class DAG(BaseDag, LoggingMixin):
             template_searchpath = [template_searchpath]
         self.template_searchpath = template_searchpath
         self.parent_dag = None  # Gets set when DAGs are loaded
-        self.last_loaded = datetime.utcnow()
+        self.last_loaded = timezone.utcnow()
         self.safe_dag_id = dag_id.replace('.', '__dot__')
         self.max_active_runs = max_active_runs
         self.dagrun_timeout = dagrun_timeout
@@ -2965,7 +2987,7 @@ class DAG(BaseDag, LoggingMixin):
 
     # /Context Manager ----------------------------------------------
 
-    def date_range(self, start_date, num=None, end_date=datetime.utcnow()):
+    def date_range(self, start_date, num=None, end_date=timezone.utcnow()):
         if num:
             end_date = None
         return utils_date_range(
@@ -2993,7 +3015,7 @@ class DAG(BaseDag, LoggingMixin):
 
         :param start_date: the start date of the interval
         :type start_date: datetime
-        :param end_date: the end date of the interval, defaults to datetime.utcnow()
+        :param end_date: the end date of the interval, defaults to timezone.utcnow()
         :type end_date: datetime
         :return: a list of dates within the interval following the dag's schedule
         :rtype: list
@@ -3005,7 +3027,7 @@ class DAG(BaseDag, LoggingMixin):
 
         # dates for dag runs
         using_start_date = using_start_date or min([t.start_date for t in self.tasks])
-        using_end_date = using_end_date or datetime.utcnow()
+        using_end_date = using_end_date or timezone.utcnow()
 
         # next run date for a subdag isn't relevant (schedule_interval for subdags
         # is ignored) so we use the dag run's start date in the case of a subdag
@@ -3274,9 +3296,9 @@ class DAG(BaseDag, LoggingMixin):
             self, session, start_date=None, end_date=None, state=None):
         TI = TaskInstance
         if not start_date:
-            start_date = (datetime.utcnow() - timedelta(30)).date()
+            start_date = (timezone.utcnow() - timedelta(30)).date()
             start_date = datetime.combine(start_date, datetime.min.time())
-        end_date = end_date or datetime.utcnow()
+        end_date = end_date or timezone.utcnow()
         tis = session.query(TI).filter(
             TI.dag_id == self.dag_id,
             TI.execution_date >= start_date,
@@ -3536,10 +3558,10 @@ class DAG(BaseDag, LoggingMixin):
         d = {}
         d['is_picklable'] = True
         try:
-            dttm = datetime.utcnow()
+            dttm = timezone.utcnow()
             pickled = pickle.dumps(self)
             d['pickle_len'] = len(pickled)
-            d['pickling_duration'] = "{}".format(datetime.utcnow() - dttm)
+            d['pickling_duration'] = "{}".format(timezone.utcnow() - dttm)
         except Exception as e:
             self.log.debug(e)
             d['is_picklable'] = False
@@ -3557,7 +3579,7 @@ class DAG(BaseDag, LoggingMixin):
         if not dp or dp.pickle != self:
             dp = DagPickle(dag=self)
             session.add(dp)
-            self.last_pickled = datetime.utcnow()
+            self.last_pickled = timezone.utcnow()
             session.commit()
             self.pickle_id = dp.id
 
@@ -3773,7 +3795,7 @@ class DAG(BaseDag, LoggingMixin):
         if owner is None:
             owner = self.owner
         if sync_time is None:
-            sync_time = datetime.utcnow()
+            sync_time = timezone.utcnow()
 
         orm_dag = session.query(
             DagModel).filter(DagModel.dag_id == self.dag_id).first()
@@ -4566,7 +4588,7 @@ class DagRun(Base, LoggingMixin):
 
         # pre-calculate
         # db is faster
-        start_dttm = datetime.utcnow()
+        start_dttm = timezone.utcnow()
         unfinished_tasks = self.get_task_instances(
             state=State.unfinished(),
             session=session
@@ -4590,7 +4612,7 @@ class DagRun(Base, LoggingMixin):
                     no_dependencies_met = False
                     break
 
-        duration = (datetime.utcnow() - start_dttm).total_seconds() * 1000
+        duration = (timezone.utcnow() - start_dttm).total_seconds() * 1000
         Stats.timing("dagrun.dependency-check.{}".format(self.dag_id), 
duration)
 
         # future: remove the check on adhoc tasks (=active_tasks)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c857436b/airflow/operators/dagrun_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/dagrun_operator.py b/airflow/operators/dagrun_operator.py
index 923b8a4..2b5a814 100644
--- a/airflow/operators/dagrun_operator.py
+++ b/airflow/operators/dagrun_operator.py
@@ -12,9 +12,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from datetime import datetime
-
 from airflow.models import BaseOperator, DagBag
+from airflow.utils import timezone
 from airflow.utils.db import create_session
 from airflow.utils.decorators import apply_defaults
 from airflow.utils.state import State
@@ -59,7 +58,7 @@ class TriggerDagRunOperator(BaseOperator):
         self.trigger_dag_id = trigger_dag_id
 
     def execute(self, context):
-        dro = DagRunOrder(run_id='trig__' + datetime.utcnow().isoformat())
+        dro = DagRunOrder(run_id='trig__' + timezone.utcnow().isoformat())
         dro = self.python_callable(context, dro)
         if dro:
             with create_session() as session:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c857436b/airflow/operators/latest_only_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/latest_only_operator.py b/airflow/operators/latest_only_operator.py
index 7abd92d..7b4e0ca 100644
--- a/airflow/operators/latest_only_operator.py
+++ b/airflow/operators/latest_only_operator.py
@@ -12,9 +12,9 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import datetime
 
 from airflow.models import BaseOperator, SkipMixin
+from airflow.utils import timezone
 
 
 class LatestOnlyOperator(BaseOperator, SkipMixin):
@@ -35,7 +35,7 @@ class LatestOnlyOperator(BaseOperator, SkipMixin):
             self.log.info("Externally triggered DAG_Run: allowing execution to 
proceed.")
             return
 
-        now = datetime.datetime.utcnow()
+        now = timezone.utcnow()
         left_window = context['dag'].following_schedule(
             context['execution_date'])
         right_window = context['dag'].following_schedule(left_window)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c857436b/airflow/operators/sensors.py
----------------------------------------------------------------------
diff --git a/airflow/operators/sensors.py b/airflow/operators/sensors.py
index bd073b8..c8a8df6 100644
--- a/airflow/operators/sensors.py
+++ b/airflow/operators/sensors.py
@@ -21,7 +21,7 @@ standard_library.install_aliases()
 from builtins import str
 from past.builtins import basestring
 
-from datetime import datetime
+from airflow.utils import timezone
 from urllib.parse import urlparse
 from time import sleep
 import re
@@ -75,9 +75,9 @@ class BaseSensorOperator(BaseOperator):
         raise AirflowException('Override me.')
 
     def execute(self, context):
-        started_at = datetime.utcnow()
+        started_at = timezone.utcnow()
         while not self.poke(context):
-            if (datetime.utcnow() - started_at).total_seconds() > self.timeout:
+            if (timezone.utcnow() - started_at).total_seconds() > self.timeout:
                 if self.soft_fail:
                     raise AirflowSkipException('Snap. Time is OUT.')
                 else:
@@ -602,7 +602,7 @@ class TimeSensor(BaseSensorOperator):
 
     def poke(self, context):
         self.log.info('Checking if the time (%s) has come', self.target_time)
-        return datetime.utcnow().time() > self.target_time
+        return timezone.utcnow().time() > self.target_time
 
 
 class TimeDeltaSensor(BaseSensorOperator):
@@ -627,7 +627,7 @@ class TimeDeltaSensor(BaseSensorOperator):
         target_dttm = dag.following_schedule(context['execution_date'])
         target_dttm += self.delta
         self.log.info('Checking if the time (%s) has come', target_dttm)
-        return datetime.utcnow() > target_dttm
+        return timezone.utcnow() > target_dttm
 
 
 class HttpSensor(BaseSensorOperator):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c857436b/airflow/ti_deps/deps/not_in_retry_period_dep.py
----------------------------------------------------------------------
diff --git a/airflow/ti_deps/deps/not_in_retry_period_dep.py b/airflow/ti_deps/deps/not_in_retry_period_dep.py
index 7f9bff6..6628ff3 100644
--- a/airflow/ti_deps/deps/not_in_retry_period_dep.py
+++ b/airflow/ti_deps/deps/not_in_retry_period_dep.py
@@ -11,9 +11,9 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-from datetime import datetime
 
 from airflow.ti_deps.deps.base_ti_dep import BaseTIDep
+from airflow.utils import timezone
 from airflow.utils.db import provide_session
 from airflow.utils.state import State
 
@@ -38,7 +38,7 @@ class NotInRetryPeriodDep(BaseTIDep):
 
         # Calculate the date first so that it is always smaller than the timestamp used by
         # ready_for_retry
-        cur_date = datetime.utcnow()
+        cur_date = timezone.utcnow()
         next_task_retry_date = ti.next_retry_datetime()
         if ti.is_premature:
             yield self._failing_status(

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c857436b/airflow/ti_deps/deps/runnable_exec_date_dep.py
----------------------------------------------------------------------
diff --git a/airflow/ti_deps/deps/runnable_exec_date_dep.py b/airflow/ti_deps/deps/runnable_exec_date_dep.py
index 13e5345..69321d9 100644
--- a/airflow/ti_deps/deps/runnable_exec_date_dep.py
+++ b/airflow/ti_deps/deps/runnable_exec_date_dep.py
@@ -11,9 +11,9 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-from datetime import datetime
 
 from airflow.ti_deps.deps.base_ti_dep import BaseTIDep
+from airflow.utils import timezone
 from airflow.utils.db import provide_session
 
 
@@ -23,7 +23,7 @@ class RunnableExecDateDep(BaseTIDep):
 
     @provide_session
     def _get_dep_statuses(self, ti, session, dep_context):
-        cur_date = datetime.utcnow()
+        cur_date = timezone.utcnow()
 
         if ti.execution_date > cur_date:
             yield self._failing_status(

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c857436b/airflow/utils/dag_processing.py
----------------------------------------------------------------------
diff --git a/airflow/utils/dag_processing.py b/airflow/utils/dag_processing.py
index 68cee76..965e88b 100644
--- a/airflow/utils/dag_processing.py
+++ b/airflow/utils/dag_processing.py
@@ -23,10 +23,10 @@ import time
 import zipfile
 from abc import ABCMeta, abstractmethod
 from collections import defaultdict
-from datetime import datetime
 
 from airflow.dag.base_dag import BaseDag, BaseDagBag
 from airflow.exceptions import AirflowException
+from airflow.utils import timezone
 from airflow.utils.log.logging_mixin import LoggingMixin
 
 
@@ -376,7 +376,7 @@ class DagFileProcessorManager(LoggingMixin):
         being processed
         """
         if file_path in self._processors:
-            return (datetime.utcnow() - self._processors[file_path].start_time)\
+            return (timezone.utcnow() - self._processors[file_path].start_time)\
                 .total_seconds()
         return None
 
@@ -466,7 +466,7 @@ class DagFileProcessorManager(LoggingMixin):
         for file_path, processor in self._processors.items():
             if processor.done:
                 self.log.info("Processor for %s finished", file_path)
-                now = datetime.utcnow()
+                now = timezone.utcnow()
                 finished_processors[file_path] = processor
                 self._last_runtime[file_path] = (now -
                                                  processor.start_time).total_seconds()
@@ -494,7 +494,7 @@ class DagFileProcessorManager(LoggingMixin):
             # If the file path is already being processed, or if a file was
             # processed recently, wait until the next batch
             file_paths_in_progress = self._processors.keys()
-            now = datetime.utcnow()
+            now = timezone.utcnow()
             file_paths_recently_processed = []
             for file_path in self._file_paths:
                 last_finish_time = self.get_last_finish_time(file_path)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c857436b/airflow/utils/dates.py
----------------------------------------------------------------------
diff --git a/airflow/utils/dates.py b/airflow/utils/dates.py
index 81e1c2c..7d0d9d9 100644
--- a/airflow/utils/dates.py
+++ b/airflow/utils/dates.py
@@ -17,6 +17,7 @@ from __future__ import division
 from __future__ import print_function
 from __future__ import unicode_literals
 
+from airflow.utils import timezone
 from datetime import datetime, timedelta
 from dateutil.relativedelta import relativedelta  # for doctest
 import six
@@ -66,7 +67,7 @@ def date_range(
     if end_date and num:
         raise Exception("Wait. Either specify end_date OR num")
     if not end_date and not num:
-        end_date = datetime.utcnow()
+        end_date = timezone.utcnow()
 
     delta_iscron = False
     if isinstance(delta, six.string_types):
@@ -219,7 +220,7 @@ def days_ago(n, hour=0, minute=0, second=0, microsecond=0):
     Get a datetime object representing `n` days ago. By default the time is
     set to midnight.
     """
-    today = datetime.utcnow().replace(
+    today = timezone.utcnow().replace(
         hour=hour,
         minute=minute,
         second=second,

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c857436b/airflow/www/forms.py
----------------------------------------------------------------------
diff --git a/airflow/www/forms.py b/airflow/www/forms.py
index 2c6118c..f5af35a 100644
--- a/airflow/www/forms.py
+++ b/airflow/www/forms.py
@@ -17,7 +17,7 @@ from __future__ import division
 from __future__ import print_function
 from __future__ import unicode_literals
 
-from datetime import datetime
+from airflow.utils import timezone
 from flask_admin.form import DateTimePickerWidget
 from wtforms import DateTimeField, SelectField
 from flask_wtf import Form
@@ -33,7 +33,7 @@ class DateTimeWithNumRunsForm(Form):
     # Date time and number of runs form for tree view, task duration
     # and landing times
     base_date = DateTimeField(
-        "Anchor date", widget=DateTimePickerWidget(), 
default=datetime.utcnow())
+        "Anchor date", widget=DateTimePickerWidget(), 
default=timezone.utcnow())
     num_runs = SelectField("Number of runs", default=25, choices=(
         (5, "5"),
         (25, "25"),

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c857436b/airflow/www/utils.py
----------------------------------------------------------------------
diff --git a/airflow/www/utils.py b/airflow/www/utils.py
index 52b22fc..ae1fb5f 100644
--- a/airflow/www/utils.py
+++ b/airflow/www/utils.py
@@ -21,7 +21,7 @@ from cgi import escape
 from io import BytesIO as IO
 import functools
 import gzip
-import dateutil.parser as dateparser
+import iso8601
 import json
 import time
 
@@ -46,6 +46,7 @@ DEFAULT_SENSITIVE_VARIABLE_FIELDS = (
     'access_token',
 )
 
+
 def should_hide_value_for_key(key_name):
     return any(s in key_name.lower() for s in DEFAULT_SENSITIVE_VARIABLE_FIELDS) \
            and configuration.getboolean('admin', 'hide_sensitive_variable_fields')
@@ -252,8 +253,8 @@ def action_logging(f):
             dag_id=request.args.get('dag_id'))
 
         if 'execution_date' in request.args:
-            log.execution_date = dateparser.parse(
-                request.args.get('execution_date'))
+            log.execution_date = iso8601.parse_date(
+                request.args.get('execution_date'), settings.TIMEZONE)
 
         with create_session() as session:
             session.add(log)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c857436b/airflow/www/views.py
----------------------------------------------------------------------
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 1191bde..a6378bf 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -21,7 +21,7 @@ import os
 import pkg_resources
 import socket
 from functools import wraps
-from datetime import datetime, timedelta
+from datetime import timedelta
 import dateutil.parser
 import copy
 import math
@@ -72,6 +72,7 @@ from airflow.ti_deps.dep_context import DepContext, QUEUE_DEPS, SCHEDULER_DEPS
 from airflow.models import BaseOperator
 from airflow.operators.subdag_operator import SubDagOperator
 
+from airflow.utils import timezone
 from airflow.utils.json import json_ser
 from airflow.utils.state import State
 from airflow.utils.db import create_session, provide_session
@@ -170,7 +171,7 @@ def duration_f(v, c, m, p):
 def datetime_f(v, c, m, p):
     attr = getattr(m, p)
     dttm = attr.isoformat() if attr else ''
-    if datetime.utcnow().isoformat()[:4] == dttm[:4]:
+    if timezone.utcnow().isoformat()[:4] == dttm[:4]:
         dttm = dttm[5:]
     return Markup("<nobr>{}</nobr>".format(dttm))
 
@@ -922,7 +923,7 @@ class Airflow(BaseView):
             flash("Cannot find dag {}".format(dag_id))
             return redirect(origin)
 
-        execution_date = datetime.utcnow()
+        execution_date = timezone.utcnow()
         run_id = "manual__{0}".format(execution_date.isoformat())
 
         dr = DagRun.find(dag_id=dag_id, run_id=run_id)
@@ -1161,7 +1162,7 @@ class Airflow(BaseView):
         if base_date:
             base_date = dateutil.parser.parse(base_date)
         else:
-            base_date = dag.latest_execution_date or datetime.utcnow()
+            base_date = dag.latest_execution_date or timezone.utcnow()
 
         dates = dag.date_range(base_date, num=-abs(num_runs))
         min_date = dates[0] if dates else datetime(2000, 1, 1)
@@ -1217,7 +1218,7 @@ class Airflow(BaseView):
             def set_duration(tid):
                 if (isinstance(tid, dict) and tid.get("state") == 
State.RUNNING and
                             tid["start_date"] is not None):
-                    d = datetime.utcnow() - dateutil.parser.parse(tid["start_date"])
+                    d = timezone.utcnow() - dateutil.parser.parse(tid["start_date"])
                     tid["duration"] = d.total_seconds()
                 return tid
 
@@ -1314,7 +1315,7 @@ class Airflow(BaseView):
         if dttm:
             dttm = dateutil.parser.parse(dttm)
         else:
-            dttm = dag.latest_execution_date or datetime.utcnow().date()
+            dttm = dag.latest_execution_date or timezone.utcnow().date()
 
         DR = models.DagRun
         drs = (
@@ -1390,7 +1391,7 @@ class Airflow(BaseView):
         if base_date:
             base_date = dateutil.parser.parse(base_date)
         else:
-            base_date = dag.latest_execution_date or datetime.utcnow()
+            base_date = dag.latest_execution_date or timezone.utcnow()
 
         dates = dag.date_range(base_date, num=-abs(num_runs))
         min_date = dates[0] if dates else datetime(2000, 1, 1)
@@ -1497,7 +1498,7 @@ class Airflow(BaseView):
         if base_date:
             base_date = dateutil.parser.parse(base_date)
         else:
-            base_date = dag.latest_execution_date or datetime.utcnow()
+            base_date = dag.latest_execution_date or timezone.utcnow()
 
         dates = dag.date_range(base_date, num=-abs(num_runs))
         min_date = dates[0] if dates else datetime(2000, 1, 1)
@@ -1560,7 +1561,7 @@ class Airflow(BaseView):
         if base_date:
             base_date = dateutil.parser.parse(base_date)
         else:
-            base_date = dag.latest_execution_date or datetime.utcnow()
+            base_date = dag.latest_execution_date or timezone.utcnow()
 
         dates = dag.date_range(base_date, num=-abs(num_runs))
         min_date = dates[0] if dates else datetime(2000, 1, 1)
@@ -1651,7 +1652,7 @@ class Airflow(BaseView):
             DagModel).filter(DagModel.dag_id == dag_id).first()
 
         if orm_dag:
-            orm_dag.last_expired = datetime.utcnow()
+            orm_dag.last_expired = timezone.utcnow()
             session.merge(orm_dag)
         session.commit()
 
@@ -1687,7 +1688,7 @@ class Airflow(BaseView):
         if dttm:
             dttm = dateutil.parser.parse(dttm)
         else:
-            dttm = dag.latest_execution_date or datetime.utcnow().date()
+            dttm = dag.latest_execution_date or timezone.utcnow().date()
 
         form = DateTimeForm(data={'execution_date': dttm})
 
@@ -1698,7 +1699,7 @@ class Airflow(BaseView):
 
         tasks = []
         for ti in tis:
-            end_date = ti.end_date if ti.end_date else datetime.utcnow()
+            end_date = ti.end_date if ti.end_date else timezone.utcnow()
             tasks.append({
                 'startDate': wwwutils.epoch(ti.start_date),
                 'endDate': wwwutils.epoch(end_date),
@@ -2172,7 +2173,7 @@ class ChartModelView(wwwutils.DataProfilingMixin, AirflowModelView):
             model.iteration_no += 1
         if not model.user_id and current_user and hasattr(current_user, 'id'):
             model.user_id = current_user.id
-        model.last_modified = datetime.utcnow()
+        model.last_modified = timezone.utcnow()
 
 
 chart_mapping = (
@@ -2433,9 +2434,9 @@ class DagRunModelView(ModelViewOnly):
                 count += 1
                 dr.state = target_state
                 if target_state == State.RUNNING:
-                    dr.start_date = datetime.utcnow()
+                    dr.start_date = timezone.utcnow()
                 else:
-                    dr.end_date = datetime.utcnow()
+                    dr.end_date = timezone.utcnow()
             session.commit()
             models.DagStat.update(dirty_ids, session=session)
             flash(

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c857436b/setup.py
----------------------------------------------------------------------
diff --git a/setup.py b/setup.py
index 9408192..de2bd54 100644
--- a/setup.py
+++ b/setup.py
@@ -222,6 +222,7 @@ def do_setup():
             'future>=0.16.0, <0.17',
             'gitpython>=2.0.2',
             'gunicorn>=19.4.0, <20.0',
+            'iso8601>=0.1.12',
             'jinja2>=2.7.3, <2.9.0',
             'lxml>=3.6.0, <4.0',
             'markdown>=2.5.2, <3.0',

